# Converts ARGO/PR/PF to a Parquet dataset

In [None]:
import datetime
import os
import netCDF4
import numpy as np
import pandas as pd
import uuid

In [None]:
def array_to_list(array):
    """Convert a NumPy array into a Python list, one item per first-axis entry.

    Args:
        array: The array to convert.

    Returns:
        For an array of single characters (dtype ``S1``), a list of ``str``
        where each entry along the first axis has been joined and decoded.
        For any other dtype, the list of the entries along the first axis,
        left as NumPy objects.
    """
    if array.dtype != np.dtype("S1"):
        return list(array)
    # ``tobytes`` is the supported spelling of the deprecated ``tostring``
    # (same bytes, but ``tostring`` is gone from recent NumPy releases).
    return [item.tobytes().decode() for item in array]

In [None]:
def load(path):
    """Load a NetCDF file and transform it into a pandas DataFrame.

    Every NetCDF variable whose lower-cased name is a key of the schema
    defined below becomes a column of the DataFrame, which has one row per
    entry of the ``N_PROF`` dimension. Two-dimensional variables whose first
    dimension is ``N_PROF`` are stored as one array (or one string for
    character data) per row. Schema columns that are absent from the file are
    filled with a default value when the schema type is a list, ``float32``,
    ``datetime64`` or ``str``.

    Args:
        path: Path to the NetCDF file to read.

    Returns:
        The DataFrame, or ``None`` if the ``JULD`` variable contains masked
        values. When the file has a ``JULD`` variable, the DataFrame also
        holds a ``partition`` column containing the date of each row with the
        day forced to 1.

    Raises:
        AssertionError: If ``JULD``, ``LATITUDE`` or ``LONGITUDE`` has
            neither ``N_PROF`` values nor exactly one value.
        RuntimeError: If a non-character variable whose first dimension is
            ``N_LEVELS`` is found while ``N_LEVELS`` is not 1.
    """
    # Definitions of the dataframe schema. The order of the keys defines the
    # order of the columns of the DataFrame.
    cols = dict(datetime=None,
                config_mission_number=None,
                cycle_number=None,
                data_centre=None,
                data_mode=None,
                data_state_indicator=None,
                data_type=None,
                date_creation=None,
                date_qc=None,
                date_update=None,
                dc_reference=None,
                depth=None,
                depth_adjusted=None,
                depth_adjusted_error=None,
                depth_adjusted_qc=None,
                depth_qc=None,
                direction=None,
                firmware_version=None,
                float_serial_no=None,
                hdyn=None,
                latitude=None,
                longitude=None,
                original_file_name=None,
                pi_name=None,
                platform_number=None,
                platform_type=None,
                position_qc=None,
                positioning_system=None,
                pres=None,
                pres_adjusted=None,
                pres_adjusted_error=None,
                pres_adjusted_qc=None,
                pres_qc=None,
                project_name=None,
                psal=None,
                psal_adjusted=None,
                psal_adjusted_error=None,
                psal_adjusted_qc=None,
                psal_qc=None,
                station_parameters=None,
                sla=None,
                temp=None,
                temp_adjusted=None,
                temp_adjusted_error=None,
                temp_adjusted_qc=None,
                temp_qc=None,
                vertical_sampling_scheme=None,
                wmo_inst_type=None)

    # Expected type of each column. A type wrapped in a list marks a column
    # that stores one sequence of values per row.
    dtypes = dict(datetime=np.datetime64,
                  config_mission_number=np.int32,
                  cycle_number=np.int32,
                  data_centre=str,
                  data_mode=str,
                  data_state_indicator=str,
                  data_type=str,
                  date_creation=np.datetime64,
                  date_qc=str,
                  date_update=np.datetime64,
                  dc_reference=np.int64,
                  depth=[np.float32],
                  depth_adjusted=[np.float32],
                  depth_adjusted_error=[np.float32],
                  depth_adjusted_qc=[str],
                  depth_qc=[str],
                  direction=str,
                  firmware_version=str,
                  float_serial_no=str,
                  hdyn=np.float32,
                  latitude=np.float64,
                  longitude=np.float64,
                  original_file_name=str,
                  pi_name=str,
                  platform_number=str,
                  platform_type=str,
                  position_qc=str,
                  positioning_system=str,
                  pres=[np.float32],
                  pres_adjusted=[np.float32],
                  pres_adjusted_error=[np.float32],
                  pres_adjusted_qc=[str],
                  pres_qc=[str],
                  project_name=str,
                  psal=[np.float32],
                  psal_adjusted=[np.float32],
                  psal_adjusted_error=[np.float32],
                  psal_adjusted_qc=[str],
                  psal_qc=[str],
                  sla=np.float32,
                  station_parameters=[str],
                  temp=[np.float32],
                  temp_adjusted=[np.float32],
                  temp_adjusted_error=[np.float32],
                  temp_adjusted_qc=[str],
                  temp_qc=[str],
                  vertical_sampling_scheme=str,
                  wmo_inst_type=str)

    with netCDF4.Dataset(path, "r") as ds:
        # Axis of this dataset
        time, levels = len(ds.dimensions["N_PROF"]), len(
            ds.dimensions["N_LEVELS"])

        for name, item in ds.variables.items():
            values = item[:]

            # Axis : the axes must not contain undefined values
            if name in ["JULD", "LATITUDE", "LONGITUDE"]:
                if isinstance(values, np.ma.MaskedArray):
                    # A file with an undefined date is rejected as a whole
                    if np.ma.is_masked(values) and name == "JULD":
                        return None
                    values = values.data
                if len(values) != time:
                    # A single value is repeated for every row
                    assert len(values) == 1
                    values = np.full((time, ), values[0], dtype=values.dtype)
                if name == 'JULD':
                    cols["datetime"] = pd.Series(
                        netCDF4.num2date(values, item.units))
                    # Key used to group the rows by month when writing
                    cols["partition"] = cols["datetime"].apply(
                        lambda x: x.date().replace(day=1))
                else:
                    cols[name.lower()] = values
                continue

            # Process numpy MaskedArray: the masked cells are replaced by NaN
            # for floating point data and by the netCDF4 default fill value
            # for the other numeric types.
            if isinstance(
                    values,
                    np.ma.MaskedArray) and values.dtype != np.dtype("S1"):
                values[values.mask] = netCDF4.default_fillvals[
                    values.dtype.
                    str[1:]] if values.dtype.kind != 'f' else np.nan
                values = values.data

            # Transform the data type into a vector
            # (new column of the DataFrame)
            if name == "DATA_TYPE":
                # ``tobytes`` replaces the deprecated ``tostring``
                values = [values.tobytes().decode()] * time
            # Transforms the matrix of char into an array of strings
            elif name == "STATION_PARAMETERS":
                values = values.data
                values = [[
                    values[ix, jx, :].tobytes().decode().strip()
                    for jx in range(values.shape[1])
                ] for ix in range(values.shape[0])]
            # Transforms matrix into an array of Python lists
            elif (item.dimensions == ("N_PROF", "N_LEVELS")) or (len(
                    values.shape) == 2 and item.dimensions[0] == "N_PROF"):
                values = array_to_list(values)
            # Converts arrays of chars into string
            elif values.dtype == np.dtype("S1"):
                string = values.tobytes().decode()
                # One character per row when the variable is defined along
                # N_PROF, otherwise the same string for every row.
                values = list(string) if item.dimensions == (
                    "N_PROF", ) else string
            # Converts a scalar into a new column
            elif item.dimensions[0] == "N_LEVELS" and levels == 1:
                values = [values] * time
            # Not handled
            elif item.dimensions[0] == "N_LEVELS":
                raise RuntimeError((path, name))
            # Transforms column name into lower case; the variables that are
            # not part of the schema are ignored.
            if name.lower() in cols:
                cols[name.lower()] = values

        # The columns that were not found in the file receive a default value
        # matching the type declared in the schema.
        for k, v in cols.items():
            if v is None:
                dtype = dtypes[k]
                if isinstance(dtype, list):
                    dtype = dtype[0]
                    if dtype != str:
                        cols[k] = [
                            item for item in np.full(
                                (time, levels), np.nan, dtype=dtype)[:, ]
                        ]
                    else:
                        cols[k] = [' ' * levels for _ in range(time)]
                elif dtype == np.dtype("float32"):
                    cols[k] = np.full((time,), np.nan, dtype=dtype)
                elif dtype == np.datetime64:
                    cols[k] = np.datetime64()
                elif dtype == str:
                    cols[k] = ""

        df = pd.DataFrame(cols)
        # Strip strings
        for key in [
                "data_state_indicator", "data_type", "firmware_version",
                "float_serial_no", "pi_name", "platform_number",
                "platform_type", "positioning_system",
                "vertical_sampling_scheme", "wmo_inst_type"
        ]:
            df.loc[:, key] = df.loc[:, key].apply(lambda x: x.strip())
        df["original_file_name"] = os.path.basename(path)
        # Transformation of some types contained in the columns. It's faster
        # to do it here on the dataframe pandas.
        df.loc[:, "dc_reference"] = df.loc[:, "dc_reference"].apply(lambda x:
                                                                    int(x))
        df.loc[:, "date_creation"] = df.loc[:, "date_creation"].apply(
            lambda x: datetime.datetime.strptime(x, "%Y%m%d%H%M%S"))
        df.loc[:, "date_update"] = df.loc[:, "date_update"].apply(
            lambda x: datetime.datetime.strptime(x, "%Y%m%d%H%M%S"))
        return df

In [None]:
# Directory that contains the NetCDF files to convert
dirname = "dataset"

In [None]:
# Process one file to test the conversion: load it and display the first row
# of the resulting DataFrame
load(os.path.join(dirname, "CO_DMQCGL01_20000510_PR_PF.nc")).iloc[0]

In [None]:
def write_db(dirname, df):
    """Write the DataFrame into the partitioned Parquet dataset.

    The rows are grouped by the value of the ``partition`` column and each
    group is stored in ``<dirname>/year=<YYYY>/month=<M>/``. If that
    directory already holds one file, the group is appended to its content
    and the file is rewritten; otherwise a new, uniquely named file is
    created.

    Args:
        dirname: Root directory of the Parquet dataset.
        df: Data to write. It must contain a ``partition`` column whose
            values expose ``year`` and ``month`` attributes; this column is
            not written to the files.

    Raises:
        RuntimeError: If a partition directory contains more than one file.
    """
    # During the conversion, we defined a column labeled "partition" defining
    # the date of our data. This column will be used to group our dataset by
    # month. It's possible to do it differently, for example by days.
    #
    # The Series itself is used as grouping key (not a one-element list):
    # with a list, pandas >= 2 yields 1-tuples as keys, which have neither a
    # "year" nor a "month" attribute.
    partition = df["partition"]
    data_df = df.drop("partition", axis='columns')
    for key, subgroup in data_df.groupby(partition):
        subdir = os.path.join(dirname, f'year={key.year}',
                              f'month={key.month}')
        os.makedirs(subdir, exist_ok=True)
        files = os.listdir(subdir)
        if len(files) > 1:
            raise RuntimeError(files)

        # "outfile" always holds the full path of the file to write, so that
        # the partition directory is never prepended twice on update.
        if files:
            # Handles of dataframe update.
            outfile = os.path.join(subdir, files[0])
            subgroup = pd.concat([pd.read_parquet(outfile), subgroup])
        else:
            outfile = os.path.join(subdir, f'{uuid.uuid4().hex}.parquet')
        # TODO: lock file before write
        subgroup.to_parquet(outfile, index=False, compression='snappy')

In [None]:
# A minimalist solution to avoid reprocessing files twice.
def write_buffer(dirname, buffer, files):
    """Write the buffered DataFrames, then flag their source files as done.

    Args:
        dirname: Root directory of the Parquet dataset, handed over to
            ``write_db``.
        buffer: DataFrames to concatenate and write. ``None`` entries (what
            ``load`` returns for a rejected file) are ignored.
        files: Paths of the source files. An empty ``<path>.done`` marker is
            created for each of them once the data has been written.
    """
    frames = [item for item in buffer if item is not None]
    # pd.concat raises a ValueError when it is given no DataFrame at all,
    # which happens when every buffered file has been rejected.
    if frames:
        write_db(dirname, pd.concat(frames))
    for item in files:
        with open(f"{item}.done", "w"):
            pass

In [None]:
# Root directory of the Parquet dataset written by the conversion
root = "argo"

In [None]:
# We can now start the conversion. We'll process the data in blocks to avoid
# doing too many I/Os. We make blocks of 64 files; it is the computer RAM that
# acts as a limit here.
blocs = []
files = []


def netcdf_2_parquet(dirname, blocs, files):
    """Flush the buffered DataFrames to ``dirname`` and empty both buffers.

    Args:
        dirname: Root directory of the Parquet dataset.
        blocs: DataFrames waiting to be written; cleared in place.
        files: Paths of the files the DataFrames were loaded from; cleared
            in place.
    """
    write_buffer(dirname, blocs, files)
    blocs.clear()
    files.clear()


for item in sorted(os.listdir(dirname)):
    # Only the PR_PF files are converted; the ".done" markers are not data
    if 'PR_PF' not in item or item.endswith(".done"):
        continue
    path = os.path.join(dirname, item)
    # Skip the files already processed: a "<path>.done" marker is created
    # once the content of a file has been written.
    if os.path.exists(f"{path}.done"):
        continue
    blocs.append(load(path))
    files.append(path)
    if len(blocs) >= 64:
        netcdf_2_parquet(root, blocs, files)
if blocs:
    netcdf_2_parquet(root, blocs, files)

In [None]:
# Reading our dataset: load the rows stored under the "year=2000" partitions
# into a pandas DataFrame
import pyarrow.parquet as pq
# NOTE(review): the filter compares "year" with the string '2000'; depending
# on how pyarrow infers the type of the partition keys, the integer 2000 may
# be required instead - confirm against the installed pyarrow version.
pq.read_table(root, filters=[('year', '==', '2000')]).to_pandas()