In [1]:
import os
from calendar import monthrange

import numpy as np
import pandas as pd

from nc4 import *
from merra2 import *

In [2]:
def save_df_as_parquet(df, output, compression_level=11, output_dir="dataframes"):
    """Write ``df`` to ``{output_dir}/{output}`` as a Brotli-compressed parquet file.

    Args:
        df: A ``pandas.DataFrame``, or any object ``pd.DataFrame(...)`` accepts;
            non-DataFrame input is wrapped in a DataFrame before writing.
        output: File name of the parquet file inside ``output_dir``.
        compression_level: Brotli level forwarded to the fastparquet writer.
        output_dir: Directory that receives the file. It is not created here.
    """
    frame = df if isinstance(df, pd.DataFrame) else pd.DataFrame(df)

    # fastparquet takes per-column compression specs; "_default" covers every column.
    brotli_spec = {"type": "BROTLI", "args": {"level": compression_level}}
    destination = f"{output_dir}/{output}"

    frame.to_parquet(destination,
                     engine="fastparquet",
                     compression={"_default": brotli_spec})


def save_nc4_as_parquet(filename, variables, raw_dir="raw", output_dir="dataframes", **kwargs):
    """Export variables of a NetCDF-4 file to one float16 parquet file per variable.

    Each requested variable is converted to a DataFrame, its index is dropped,
    the values are cast to float16, and the result is written to
    ``{output_dir}/{var}/{output}`` where ``output`` is ``filename`` with its
    last three characters replaced by ``"parquet"``. The standard deviation of
    the data and of the float16 rounding error are printed for each variable.

    Args:
        filename: Name of the source file inside ``raw_dir``. The last three
            characters are assumed to be the ``nc4`` extension.
        variables: Variable selection, passed through ``get_merra_variables``
            (defined elsewhere) before use.
        raw_dir: Folder handed to ``open_xarray_dataset`` to locate the file.
        output_dir: Root folder; a sub-folder per variable is created under it.
        **kwargs: Forwarded to ``save_df_as_parquet`` (e.g. ``compression_level``).
    """
    output = filename[:-3] + "parquet"

    variables = get_merra_variables(variables)
    with open_xarray_dataset(filename, folder=raw_dir) as dataset:
        for var in variables:
            print(f"\tLoading '{var}'")
            variable = dataset[var].to_dataframe().reset_index(drop=True)

            variable16 = variable.astype("float16")
            print(f"\tStandard Deviation (SD): '{variable[var].std()}'")
            print(f"\tSD of float32 - float16: '{(variable - variable16)[var].std()}'")

            print(f"\tSaving '{var}'\n")

            # Build the per-variable folder in a separate name: reassigning
            # `output_dir` here would nest every later variable inside the
            # previous one's folder (dataframes/U, dataframes/U/V, ...).
            var_dir = f"{output_dir}/{var}"
            os.makedirs(var_dir, exist_ok=True)

            save_df_as_parquet(variable16, output, output_dir=var_dir, **kwargs)


def compress_nc4(filename,
                 variables,
                 raw_dir="raw",
                 output_dir="compressed",
                 compression="zlib",
                 compression_level=9,
                 pack_as_float16=True):
    """Copy selected variables of a raw NetCDF-4 file into a compressed file.

    Nothing is done when ``{output_dir}/{filename}`` already exists. The source
    is ``{raw_dir}/{filename}`` when that file exists, otherwise the same name
    with ``.nc4`` replaced by ``.SUB.nc4``. The output is always written under
    ``filename``.

    The shape of the first variable is read as ``(time, lev, lat, lon)`` and
    reused for every variable; variables with another rank or shape are not
    supported.

    Args:
        filename: File name used for the existence checks and for the output.
        variables: Variable selection, passed through ``get_merra_variables``
            (defined elsewhere) before use.
        raw_dir: Folder containing the source file.
        output_dir: Folder receiving the compressed file.
        compression: Compression scheme given to ``createVariable``.
        compression_level: Compression level given to ``createVariable``.
        pack_as_float16: When true, data is cast to float16 and each pair of
            adjacent float16 values along the last axis is stored as one
            float32 slot, so the stored ``lon`` dimension has half its real
            length. Readers must view the data as float16 to recover it.

    Raises:
        RuntimeError: If ``pack_as_float16`` is set and the last dimension has
            an odd length, so its values cannot be paired.
    """
    variables = get_merra_variables(variables)

    if os.path.isfile(f"{output_dir}/{filename}"):
        return

    if os.path.isfile(f"{raw_dir}/{filename}"):
        raw_file = filename
    else:
        raw_file = filename.replace(".nc4", ".SUB.nc4")

    # Open the file that was actually located above, not unconditionally
    # `filename`; otherwise the ".SUB.nc4" fallback is never used.
    with open_nc4_dataset(raw_file, folder=raw_dir) as dataset:
        dimensions = dataset[variables[0]].shape

        # Validate before the destination is opened for writing, so a rejected
        # input does not leave an output file behind that the existence check
        # above would treat as already done.
        if pack_as_float16 and dimensions[3] % 2 != 0:
            raise RuntimeError("Cannot pack float16 data as float32 because dimensions is an odd number")

        # // 2 because two float16 values are packed into each float32 slot.
        lon_size = dimensions[3] // 2 if pack_as_float16 else dimensions[3]
        chunksizes = (1, 1, dimensions[2], lon_size)

        with open_nc4_dataset(filename, folder=output_dir, mode="w") as dst:
            dst.createDimension("lon", lon_size)
            dst.createDimension("lat", dimensions[2])
            dst.createDimension("lev", dimensions[1])
            dst.createDimension("time", dimensions[0])

            for variable in variables:
                data = dataset.variables[variable]

                dst.createVariable(variable, "f", data.dimensions,
                                   chunksizes=chunksizes,
                                   compression=compression,
                                   complevel=compression_level)
                dst[variable].setncatts(data.__dict__)  # copy variable attributes via a dictionary

                if pack_as_float16:
                    data_float16 = np.array(data[:], dtype="float16")
                    # Reinterpret the float16 bytes as float32 (no conversion):
                    # this halves the length of the last axis.
                    packed_float32 = data_float16.view("float32")
                    dst[variable][:] = packed_float32
                else:
                    dst[variable][:] = data[:]


def compress_all_nc4(collection: str,
                     variables,
                     start_year, end_year,
                     start_month=1, end_month=12,
                     start_day=1, end_day=None,
                     **kwargs):
    """Run ``compress_nc4`` on every daily file in an inclusive date range.

    File names are built as
    ``MERRA2_{stream}.{collection}.{YYYY}{MM}{DD}.nc4`` with the stream taken
    from ``get_merra_stream_from_year`` (defined elsewhere).

    Args:
        collection: Collection identifier inserted into each file name.
        variables: Variable selection forwarded to ``compress_nc4``.
        start_year, end_year: First and last year, both inclusive.
        start_month: First month; applies to ``start_year`` only.
        end_month: Last month; applies to ``end_year`` only.
        start_day: First day; applies to the first month of ``start_year`` only.
        end_day: Last day of the last month of ``end_year``. When falsy, the
            real length of that month is used.
        **kwargs: Forwarded unchanged to ``compress_nc4``.
    """
    for yyyy in range(start_year, end_year + 1):

        stream = get_merra_stream_from_year(yyyy)

        in_first_year = yyyy == start_year
        in_last_year = yyyy == end_year

        # Only the boundary years are clipped; years in between run January-December.
        first_month = start_month if in_first_year else 1
        last_month = end_month if in_last_year else 12

        for mm in range(first_month, last_month + 1):

            if in_first_year and mm == first_month:
                first_day = start_day
            else:
                first_day = 1

            if end_day and in_last_year and mm == last_month:
                last_day = end_day
            else:
                # monthrange returns (weekday of day 1, number of days in month).
                last_day = monthrange(yyyy, mm)[1]

            for dd in range(first_day, last_day + 1):
                filename = f"MERRA2_{stream}.{collection}.{yyyy}{mm:0>2}{dd:0>2}.nc4"
                compress_nc4(filename, variables, **kwargs)

In [4]:
# Show the metadata of one raw file (dimensions, coordinates and data
# variables, per the captured output below) to check which variables exist
# and their shapes.
print_nc4_metadata("raw/MERRA2_100.tavg3_3d_asm_Nv.19800101.nc4")

[08:47:34] LOG: Loading ./raw/MERRA2_100.tavg3_3d_asm_Nv.19800101.nc4
<xarray.Dataset>
Dimensions:  (lon: 576, lat: 361, lev: 72, time: 8)
Coordinates:
  * lon      (lon) float64 -180.0 -179.4 -178.8 -178.1 ... 178.1 178.8 179.4
  * lat      (lat) float64 -90.0 -89.5 -89.0 -88.5 -88.0 ... 88.5 89.0 89.5 90.0
  * lev      (lev) float64 1.0 2.0 3.0 4.0 5.0 6.0 ... 68.0 69.0 70.0 71.0 72.0
  * time     (time) datetime64[ns] 1980-01-01T01:30:00 ... 1980-01-01T22:30:00
Data variables: (12/17)
    CLOUD    (time, lev, lat, lon) float32 ...
    DELP     (time, lev, lat, lon) float32 ...
    EPV      (time, lev, lat, lon) float32 ...
    H        (time, lev, lat, lon) float32 ...
    O3       (time, lev, lat, lon) float32 ...
    OMEGA    (time, lev, lat, lon) float32 ...
    ...       ...
    QV       (time, lev, lat, lon) float32 ...
    RH       (time, lev, lat, lon) float32 ...
    SLP      (time, lat, lon) float32 ...
    T        (time, lev, lat, lon) float32 ...
    U        (time, lev,

In [3]:
# Single-file run: compress the U and V variables of one day into
# compressed/wind, using the defaults (zlib level 9, float16 packing).
# The call is a no-op if the output file already exists.
compress_nc4("MERRA2_100.tavg3_3d_asm_Nv.19800101.nc4",
             ["U", "V"],
             output_dir="compressed/wind")

[17:41:50] LOG: Loading raw/MERRA2_100.tavg3_3d_asm_Nv.19800101.nc4
[17:41:50] LOG: Loading compressed/wind/MERRA2_100.tavg3_3d_asm_Nv.19800101.nc4


In [None]:
# Batch run: compress CLOUD and RH for every day from an interactively chosen
# start date through the end of 2022. The prompts are asked in the order
# year, month, day, and each answer must parse as an integer.
# NOTE(review): the input() prompts block "Run All" and leave the chosen start
# date unrecorded; consider fixed values if the run must be reproducible.
compress_all_nc4("tavg3_3d_cld_Np",
                 ["CLOUD", "RH"],
                 start_year=int(input("Enter the 1st year: ")), end_year=2022,
                 start_month=int(input("Enter the 1st month: ")),
                 start_day=int(input("Enter the 1st day: ")))
