### Converting Argo data to parquet with dask

This notebook downloads and converts Argo Core and BGC profiles, given:

* the local path `gdac_path` to the Argo index files (if they don't exist, they'll be downloaded to that folder),
* the path `outdir_nc` where the most recent Argo profile files will be downloaded (this path is required to end with `GDAC/dac/`),
* the path `outdir_pqt` where the parquet database will be stored,
* the path `schema_path` to the parquet schemas; this should not need to be changed.

In [1]:
import argo_tools as at
from pprint import pprint

# Folder holding the Argo index files.
gdac_path = "/vortexfs1/share/boom/data/nc2pqt_test/"

# Download target for the Argo profile files; it has to end with GDAC/dac/.
outdir_nc = "/vortexfs1/share/boom/data/nc2pqt_test/GDAC/dac/"

# Root folder of the parquet datasets read and written further down.
outdir_pqt = "/vortexfs1/share/boom/data/nc2pqt_test/pqt2/"

# Parquet schema file describing the BGC dataset.
schema_path = (
    "/vortexfs1/home/enrico.milanese/projects/ARGO/nc2parquet/schemas/ArgoBGC_DATA_MODE_schema.metadata"
)

In [2]:
import dask
from dask.distributed import Client
# Start a local dask cluster made of separate worker processes:
# 10 workers x 10 threads each; the per-worker memory limit is left to dask.
client = Client(
    processes=True,
    n_workers=10,
    threads_per_worker=10,
    memory_limit='auto',
)

# Leave the client as the last expression so the notebook renders its summary.
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 10
Total threads: 100,Total memory: 271.27 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38526,Workers: 10
Dashboard: http://127.0.0.1:8787/status,Total threads: 100
Started: Just now,Total memory: 271.27 GiB

0,1
Comm: tcp://127.0.0.1:45618,Total threads: 10
Dashboard: http://127.0.0.1:40712/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:35674,
Local directory: /tmp/dask-scratch-space/worker-ei7gdrfo,Local directory: /tmp/dask-scratch-space/worker-ei7gdrfo

0,1
Comm: tcp://127.0.0.1:43405,Total threads: 10
Dashboard: http://127.0.0.1:33654/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:46568,
Local directory: /tmp/dask-scratch-space/worker-oiik4uj1,Local directory: /tmp/dask-scratch-space/worker-oiik4uj1

0,1
Comm: tcp://127.0.0.1:40813,Total threads: 10
Dashboard: http://127.0.0.1:46395/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:37218,
Local directory: /tmp/dask-scratch-space/worker-kz_kg9mw,Local directory: /tmp/dask-scratch-space/worker-kz_kg9mw

0,1
Comm: tcp://127.0.0.1:33205,Total threads: 10
Dashboard: http://127.0.0.1:33645/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:39403,
Local directory: /tmp/dask-scratch-space/worker-3ci1vf7r,Local directory: /tmp/dask-scratch-space/worker-3ci1vf7r

0,1
Comm: tcp://127.0.0.1:38766,Total threads: 10
Dashboard: http://127.0.0.1:33156/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:45555,
Local directory: /tmp/dask-scratch-space/worker-kx5eiv29,Local directory: /tmp/dask-scratch-space/worker-kx5eiv29

0,1
Comm: tcp://127.0.0.1:41058,Total threads: 10
Dashboard: http://127.0.0.1:44943/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:37212,
Local directory: /tmp/dask-scratch-space/worker-f188j0_t,Local directory: /tmp/dask-scratch-space/worker-f188j0_t

0,1
Comm: tcp://127.0.0.1:42094,Total threads: 10
Dashboard: http://127.0.0.1:35386/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:40695,
Local directory: /tmp/dask-scratch-space/worker-6h_dm6e7,Local directory: /tmp/dask-scratch-space/worker-6h_dm6e7

0,1
Comm: tcp://127.0.0.1:39455,Total threads: 10
Dashboard: http://127.0.0.1:44267/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:34137,
Local directory: /tmp/dask-scratch-space/worker-qj0jg5s7,Local directory: /tmp/dask-scratch-space/worker-qj0jg5s7

0,1
Comm: tcp://127.0.0.1:40163,Total threads: 10
Dashboard: http://127.0.0.1:33691/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:35958,
Local directory: /tmp/dask-scratch-space/worker-in6p2nci,Local directory: /tmp/dask-scratch-space/worker-in6p2nci

0,1
Comm: tcp://127.0.0.1:43890,Total threads: 10
Dashboard: http://127.0.0.1:35700/status,Memory: 27.13 GiB
Nanny: tcp://127.0.0.1:35509,
Local directory: /tmp/dask-scratch-space/worker-4mk53uck,Local directory: /tmp/dask-scratch-space/worker-4mk53uck


In [3]:
import pyarrow as pa
import pyarrow.parquet as pq

# Load only the schema (column names and types) stored at schema_path.
schema_BGC = pq.read_schema(where=schema_path)

In [4]:
from datetime import datetime, timedelta, timezone

# Keep only profiles sampled within the last 5*52 weeks (about five years).
# datetime.utcnow() is deprecated since Python 3.12; the expression below yields
# the same naive-UTC value, which is what the timezone-less JULD column needs.
reference_time = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(weeks=5*52)

ADJUSTED_QC_SUFFIX = '_ADJUSTED_QC'
QC_SUFFIX = '_QC'
schema_names = set(schema_BGC.names)

# filterQC: list of predicate groups handed to read_parquet(filters=...).
# Predicates inside one group are AND-ed, the groups themselves are OR-ed.
filterQC = []
# cols: names of the columns that are read and kept in the output schema.
cols = []
for param in schema_BGC.names:
    # Match on the suffix (not on a substring) so that a column that merely
    # contains "_QC" somewhere in its name is treated as a plain data column.
    if param.endswith(ADJUSTED_QC_SUFFIX):
        # <PARAM>_ADJUSTED_QC: accept flags 1/2 when <PARAM>_DATA_MODE is "A" or "D".
        param_base_name = param[:-len(ADJUSTED_QC_SUFFIX)]
        print(param_base_name)
        param_data_mode = param_base_name + '_DATA_MODE'
        print(param_data_mode)
        filterQC.append( [ ("JULD",">=",reference_time), (param, "in", [1,2]), (param_data_mode, "in", ["A","D"]) ] )
        cols.append(param)
    elif param.endswith(QC_SUFFIX):
        param_base_name = param[:-len(QC_SUFFIX)]
        # if <PARAM>_ADJUSTED already exists, no need to filter by real-time data
        # (and the raw <PARAM>_QC column is then left out of cols altogether)
        if (param_base_name + '_ADJUSTED') not in schema_names:
            param_data_mode = param_base_name + '_DATA_MODE'
            filterQC.append( [ ("JULD",">=",reference_time), (param, "in", [1,2]), (param_data_mode, "==", "R") ] )
            cols.append(param)
    else:
        # Every non-QC column is read as is.
        cols.append(param)

# Output schema without the added variables: the source schema minus every
# field that was not selected in cols.
kept_names = set(cols)
schema_BGC_QC12 = schema_BGC
for name in schema_BGC.names:
    if name not in kept_names:
        # The index is looked up on the shrinking schema because positions
        # shift after every removal.
        id_name = schema_BGC_QC12.get_field_index(name)
        schema_BGC_QC12 = schema_BGC_QC12.remove(id_name)

# Output schema with the added variables: same fields plus three float32
# columns for the quantities derived with GSW.
schema_BGC_QC12_ADDEDVARS = schema_BGC_QC12
toadd = ['ABS_SAL_COMPUTED','CONSERVATIVE_TEMP_COMPUTED','SIGMA1_COMPUTED']
for name in toadd:
    schema_BGC_QC12_ADDEDVARS = schema_BGC_QC12_ADDEDVARS.append( pa.field(name, pa.float32()) )

PRES
PRES_DATA_MODE
TEMP
TEMP_DATA_MODE
PSAL
PSAL_DATA_MODE
DOXY
DOXY_DATA_MODE
BBP
BBP_DATA_MODE
BBP470
BBP470_DATA_MODE
BBP532
BBP532_DATA_MODE
BBP700
BBP700_DATA_MODE
TURBIDITY
TURBIDITY_DATA_MODE
CP
CP_DATA_MODE
CP660
CP660_DATA_MODE
CHLA
CHLA_DATA_MODE
CDOM
CDOM_DATA_MODE
NITRATE
NITRATE_DATA_MODE
BISULFIDE
BISULFIDE_DATA_MODE
PH_IN_SITU_TOTAL
PH_IN_SITU_TOTAL_DATA_MODE
DOWN_IRRADIANCE
DOWN_IRRADIANCE_DATA_MODE
DOWN_IRRADIANCE380
DOWN_IRRADIANCE380_DATA_MODE
DOWN_IRRADIANCE412
DOWN_IRRADIANCE412_DATA_MODE
DOWN_IRRADIANCE443
DOWN_IRRADIANCE443_DATA_MODE
DOWN_IRRADIANCE490
DOWN_IRRADIANCE490_DATA_MODE
DOWN_IRRADIANCE555
DOWN_IRRADIANCE555_DATA_MODE
UP_IRRADIANCE
UP_IRRADIANCE_DATA_MODE
UP_IRRADIANCE380
UP_IRRADIANCE380_DATA_MODE
UP_IRRADIANCE412
UP_IRRADIANCE412_DATA_MODE
UP_IRRADIANCE443
UP_IRRADIANCE443_DATA_MODE
UP_IRRADIANCE490
UP_IRRADIANCE490_DATA_MODE
UP_IRRADIANCE555
UP_IRRADIANCE555_DATA_MODE
DOWNWELLING_PAR
DOWNWELLING_PAR_DATA_MODE


In [5]:
# Display the pruned schema extended with the three float32 computed fields.
schema_BGC_QC12_ADDEDVARS

JULD: timestamp[ns]
LATITUDE: double
LONGITUDE: double
CYCLE_NUMBER: int64
PLATFORM_NUMBER: int64
N_PROF: int64
N_LEVELS: int64
PRES: float
PRES_ADJUSTED: float
PRES_ADJUSTED_QC: uint8
PRES_ADJUSTED_ERROR: float
TEMP: float
TEMP_dPRES: float
TEMP_ADJUSTED: float
TEMP_ADJUSTED_QC: uint8
TEMP_ADJUSTED_ERROR: float
PSAL: float
PSAL_dPRES: float
PSAL_ADJUSTED: float
PSAL_ADJUSTED_QC: uint8
PSAL_ADJUSTED_ERROR: float
DOXY: float
DOXY_dPRES: float
DOXY_ADJUSTED: float
DOXY_ADJUSTED_QC: uint8
DOXY_ADJUSTED_ERROR: float
BBP: float
BBP_dPRES: float
BBP_ADJUSTED: float
BBP_ADJUSTED_QC: uint8
BBP_ADJUSTED_ERROR: float
BBP470: float
BBP470_dPRES: float
BBP470_ADJUSTED: float
BBP470_ADJUSTED_QC: uint8
BBP470_ADJUSTED_ERROR: float
BBP532: float
BBP532_dPRES: float
BBP532_ADJUSTED: float
BBP532_ADJUSTED_QC: uint8
BBP532_ADJUSTED_ERROR: float
BBP700: float
BBP700_dPRES: float
BBP700_ADJUSTED: float
BBP700_ADJUSTED_QC: uint8
BBP700_ADJUSTED_ERROR: float
TURBIDITY: float
TURBIDITY_dPRES: float
TURBIDITY

In [6]:
import dask.dataframe as dd

In [7]:
import gsw

def compute_sa(row):
    """Return Absolute Salinity for ``row`` via ``gsw.conversions.SA_from_SP``.

    ``row`` only needs key-based access to ``PSAL_ADJUSTED``, ``PRES_ADJUSTED``,
    ``LONGITUDE`` and ``LATITUDE``; the values are forwarded to GSW in that
    order (practical salinity, pressure, longitude, latitude).
    """
    practical_salinity, pressure = row['PSAL_ADJUSTED'], row['PRES_ADJUSTED']
    longitude, latitude = row['LONGITUDE'], row['LATITUDE']
    return gsw.conversions.SA_from_SP(practical_salinity, pressure, longitude, latitude)

def compute_ct(row):
    """Return Conservative Temperature for ``row`` via ``gsw.conversions.CT_from_t``.

    Reads ``ABS_SAL_COMPUTED``, ``TEMP_ADJUSTED`` and ``PRES_ADJUSTED`` from
    ``row`` by key and passes them to GSW in that order, so the absolute
    salinity column must already be present.
    """
    absolute_salinity = row['ABS_SAL_COMPUTED']
    temperature = row['TEMP_ADJUSTED']
    pressure = row['PRES_ADJUSTED']
    return gsw.conversions.CT_from_t(absolute_salinity, temperature, pressure)

def compute_sigma1(row):
    """Return ``gsw.density.sigma1`` evaluated for ``row``.

    Reads ``ABS_SAL_COMPUTED`` and ``CONSERVATIVE_TEMP_COMPUTED`` from ``row``
    by key, so both computed columns must already be present.
    """
    absolute_salinity = row['ABS_SAL_COMPUTED']
    conservative_temperature = row['CONSERVATIVE_TEMP_COMPUTED']
    return gsw.density.sigma1(absolute_salinity, conservative_temperature)

In [13]:
%%time
ddf = dd.read_parquet(
                outdir_pqt+'debugBGC_DATA_MODE/',
                engine="pyarrow",
                storage_options={"anon": True, "use_ssl": True},
                columns = cols,
                filters = filterQC
            )

ddf = ddf.repartition(partition_size="300MB")

name_function = lambda x: f"ArgoBGC_QC12_dask_{x}.parquet"

ddf.to_parquet(
    outdir_pqt + 'debug_ArgoBGC_QC12AD_300MB',
    engine="pyarrow",
    name_function = name_function,
    write_metadata_file = True,
    write_index=False,
    schema = schema_BGC_QC12
)

CPU times: user 31.2 s, sys: 2.99 s, total: 34.2 s
Wall time: 1min 22s


In [8]:
%%time
ddf = dd.read_parquet(
                outdir_pqt+'debugBGC_DATA_MODE/',
                engine="pyarrow",
                storage_options={"anon": True, "use_ssl": True},
                columns = cols,
                filters = filterQC
            )

ddf['ABS_SAL_COMPUTED'] = ddf.apply( compute_sa, axis=1, meta=('ABS_SAL_COMPUTED', 'float32') )
ddf['CONSERVATIVE_TEMP_COMPUTED'] = ddf.apply( compute_ct, axis=1, meta=('CONSERVATIVE_TEMP_COMPUTED', 'float32') )
ddf['SIGMA1_COMPUTED'] = ddf.apply( compute_sigma1, axis=1, meta=('SIGMA1_COMPUTED', 'float32') )

ddf = ddf.repartition(partition_size="300MB")

name_function = lambda x: f"ArgoBGC_QC12_dask_{x}.parquet"

ddf.to_parquet(
    outdir_pqt + 'debug_ArgoBGC_QC12AD_ADDEDVARS_300MB',
    engine="pyarrow",
    name_function = name_function,
    write_metadata_file = True,
    write_index=False,
    schema = schema_BGC_QC12_ADDEDVARS
)

CPU times: user 10min 58s, sys: 1min 25s, total: 12min 24s
Wall time: 1h 6min 15s


In [9]:
# Variable used as the reference measurement in the read-back check below.
ref_var = "TEMP_ADJUSTED"

# Columns loaded from the dataset that carries the added variables.
cols_read = [
    # profile / level indices
    "N_PROF",
    "N_LEVELS",
    # reference variable, position and pressure
    ref_var,
    "LATITUDE",
    "LONGITUDE",
    "PRES_ADJUSTED",
    # variables computed with GSW
    "ABS_SAL_COMPUTED",
    "CONSERVATIVE_TEMP_COMPUTED",
    "SIGMA1_COMPUTED",
    # data-mode flags
    "TEMP_DATA_MODE",
    "PRES_DATA_MODE",
    "PSAL_DATA_MODE",
]

In [10]:
# Lazily re-open the dataset written with the added variables, restricted to
# the columns listed in cols_read.
ddfAD = dd.read_parquet(
    outdir_pqt + 'debug_ArgoBGC_QC12AD_ADDEDVARS_300MB',
    columns=cols_read,
    engine="pyarrow",
    storage_options={"anon": True, "use_ssl": True},
)

In [11]:
# Print the column labels exposed by ddfAD to confirm the requested selection.
pprint(ddfAD.columns.values)

array(['N_PROF', 'N_LEVELS', 'TEMP_ADJUSTED', 'LATITUDE', 'LONGITUDE',
       'PRES_ADJUSTED', 'ABS_SAL_COMPUTED', 'CONSERVATIVE_TEMP_COMPUTED',
       'SIGMA1_COMPUTED', 'TEMP_DATA_MODE', 'PRES_DATA_MODE',
       'PSAL_DATA_MODE'], dtype=object)


In [12]:
# Preview the first rows of the re-opened dataset, including the computed columns.
ddfAD.head()

Unnamed: 0,N_PROF,N_LEVELS,TEMP_ADJUSTED,LATITUDE,LONGITUDE,PRES_ADJUSTED,ABS_SAL_COMPUTED,CONSERVATIVE_TEMP_COMPUTED,SIGMA1_COMPUTED,TEMP_DATA_MODE,PRES_DATA_MODE,PSAL_DATA_MODE
0,0,0,12.376,49.236,-14.742,2.27,35.706829,12.353127,31.340513,D,D,D
1,0,1,12.379,49.236,-14.742,4.07,35.707024,12.355882,31.340067,D,D,D
2,0,2,12.378,49.236,-14.742,6.07,35.706913,12.35462,31.340254,D,D,D
3,0,3,12.38,49.236,-14.742,7.97,35.707027,12.356363,31.339966,D,D,D
4,0,4,12.379,49.236,-14.742,10.07,35.707054,12.355083,31.340263,D,D,D


In [None]:
# NOTE(review): ddf is the lazy frame built in the cell that adds the computed
# variables; compute() evaluates it in full and returns it to the notebook
# process as one in-memory DataFrame. Presumably a leftover debugging cell
# (it has no execution count) -- confirm it is needed before running it.
ddf.compute()

#### Done!

When we are done, we can shut down the dask cluster.

In [None]:
# Shut down the dask cluster through the client created at the top of the notebook.
client.shutdown()