'''
Convert raw csv data into parquet
Input(s): Spire_Cargos_AIS_01012019_31122021_hourlydownsampled_0*.csv
Output(s): aisparquet.parquet
Runtime: 9 hours
'''

In [1]:
#pip install dask

In [2]:
#pip install distributed

In [3]:
#pip install dask_jobqueue

In [4]:
#pip install fastparquet

In [1]:
#!pip install bokeh

In [18]:
import dask.dataframe as dd
import glob, os, time
from dask.distributed import Client
from dask_jobqueue import SLURMCluster

In [19]:
def convert_csv_parquet(files, outdir = os.getcwd() + "parquetdata", usecols = None, dtypes = None, date_cols = None, append = True):
    """Convert csv files to parquet"""
    (
    dd.read_csv(
        files,
        usecols = usecols,
        dtype = dtypes,
        parse_dates = date_cols,
        assume_missing = True,
        verbose = False
    )
    .to_parquet(
        outdir,
        write_index = False,
        append = append
    )
    )

# Parsing details

In [20]:
usecols = ['created_at', 'mmsi', 'msg_type', 'latitude', 'longitude', 'speed', 'heading', 'draught']
dtypes = {
    'mmsi' : 'int32',
    'msg_type' : 'int8',
    'latitude' : 'float32',
    'longitude' : 'float32',
    'speed' : 'float16', # can probably reduce size using float16
    'heading' : 'float16',
    'draught' : 'float16'
}
date_cols = ['created_at']

# Files to convert

In [21]:
filepath = '/scratch/petersal/ShippingEmissions/src/data/AIS/ais_csv'
filekeystring = "Spire_Cargos_AIS_01012019_31122021_hourlydownsampled_0"
files = glob.glob(os.path.join(filepath,'*' + filekeystring + '*'))

# Cluster setup

In [25]:
cluster = SLURMCluster(project='def-kasahara-ab',
                       cores=32,
                       memory="32GB",
                       walltime='00:15:00')
client = Client(cluster)
cluster

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39401 instead


Tab(children=(HTML(value='<div class="jp-RenderedHTMLCommon jp-RenderedHTML jp-mod-trusted jp-OutputArea-outpu…

In [26]:
client.dashboard_link

'http://172.16.139.5:39401/status'

In [27]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -A def-kasahara-ab
#SBATCH -n 1
#SBATCH --cpus-per-task=32
#SBATCH --mem=30G
#SBATCH -t 00:15:00

/cvmfs/soft.computecanada.ca/easybuild/software/2020/avx2/Core/python/3.8.10/bin/python -m distributed.cli.dask_worker tcp://172.16.139.5:35184 --nthreads 4 --nprocs 8 --memory-limit 3.73GiB --name dummy-name --nanny --death-timeout 60 --protocol tcp://



In [28]:
cluster.scale(32)

In [30]:
!squeue -u petersal

          JOBID     USER      ACCOUNT           NAME  ST  TIME_LEFT NODES CPUS TRES_PER_N MIN_MEM NODELIST (REASON) 
       36090545 petersal def-kasahara    dask-worker   R      14:48     1   32        N/A     30G cdr1601 (None) 
       36090550 petersal def-kasahara    dask-worker   R      14:48     1   32        N/A     30G cdr1520 (None) 
       36090043 petersal def-kasahara    interactive   R      42:34     1    1        N/A    256M cdr774 (None) 


# Convert

In [31]:
files = files[0:1]
print(f"Converting {len(files)} files from {filepath}:")
for file in list(map(lambda x : os.path.split(x)[1], files)):
    print(file)
start = time.time()
convert_csv_parquet(files, os.path.join(os.path.split(filepath)[0], 'ais_raw'), usecols, dtypes, date_cols = date_cols, append = False)
end = time.time()
print(f"Elapsed time: {(end - start)}")

Converting 1 files from /scratch/petersal/ShippingEmissions/src/data/AIS/ais_csv:
Spire_Cargos_AIS_01012019_31122021_hourlydownsampled_000000000300.csv


  result, tz_parsed = tslib.array_to_datetime(


Elapsed time: 419.3014016151428


In [32]:
cluster.close()
client.close()

2022-06-09 22:04:33,185 - bokeh.core.property.validation - ERROR - 'str' object has no attribute 'text'
Traceback (most recent call last):
  File "/home/petersal/.local/lib/python3.8/site-packages/distributed/utils.py", line 767, in wrapper
    return func(*args, **kwargs)
  File "/home/petersal/.local/lib/python3.8/site-packages/distributed/dashboard/components/scheduler.py", line 355, in update
    self.root.title.text = title
AttributeError: 'str' object has no attribute 'text'
2022-06-09 22:04:33,193 - bokeh.application.handlers.function - ERROR - 'str' object has no attribute 'text'
Traceback (most recent call last):
  File "/home/petersal/.local/lib/python3.8/site-packages/distributed/utils.py", line 767, in wrapper
    return func(*args, **kwargs)
  File "/home/petersal/.local/lib/python3.8/site-packages/distributed/dashboard/components/scheduler.py", line 3915, in status_doc
    cluster_memory.update()
  File "/home/petersal/.local/lib/python3.8/site-packages/bokeh/core/propert

2022-06-09 22:04:33,291 - tornado.application - ERROR - Uncaught exception GET /status (172.16.128.6)
HTTPServerRequest(protocol='http', host='172.16.139.5:39401', method='GET', uri='/status', version='HTTP/1.1', remote_ip='172.16.128.6')
Traceback (most recent call last):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2020/avx2/Core/ipykernel/2022a/lib/python3.8/site-packages/tornado/web.py", line 1704, in _execute
    result = await result
  File "/home/petersal/.local/lib/python3.8/site-packages/bokeh/server/views/doc_handler.py", line 54, in get
    session = await self.get_session()
  File "/home/petersal/.local/lib/python3.8/site-packages/bokeh/server/views/session_handler.py", line 144, in get_session
    session = await self.application_context.create_session_if_needed(session_id, self.request, token)
  File "/home/petersal/.local/lib/python3.8/site-packages/bokeh/server/contexts.py", line 243, in create_session_if_needed
    self._application.initialize_document(doc)
