# load altimetric data and store it as zarr

In [1]:
import os
from glob import glob
import pandas as pd
import xarray as xr


In [2]:
from dask.distributed import Client
client = Client()  # set up local cluster on your laptop
client

0,1
Client  Scheduler: tcp://127.0.0.1:57330  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 4  Memory: 17.18 GB


In [3]:
root_dir = '/Users/aponte/data/alti/'
out_dir = '/Users/aponte/data/alti/zarr/'
m_dir = sorted(glob(root_dir+'dataset*'))
missions = [m.split('-')[-3] for m in m_dir]
print(missions)

['al', 'alg', 'c2', 'e1', 'e1g', 'e2', 'en', 'enn', 'g2', 'h2', 'h2g', 'j1', 'j1g', 'j1n', 'j2', 'j2g', 'j2n', 'j3', 's3a', 's3b', 'tp', 'tpn']


In [4]:
# preprocessing is required for jason3 which has lwe and mdt variables only in the first half of the dataset
def pprocess(ds):
    variables = ['lwe', 'mdt']
    for v in variables:
        if v not in ds:
            ds[v] = ds.ocean_tide.where(ds.ocean_tide>1e10, drop=False)
    return ds

def load_data(path):
    _files = glob(path+'/**/*.nc', recursive=True)
    ds = xr.open_mfdataset(_files, combine='nested', 
                           concat_dim='time', parallel=True,
                           preprocess = pprocess)
    return ds.chunk({'time': 1e6})

In [5]:
mdir = m_dir[0]
ds = load_data(mdir)

In [6]:
print(ds.nbytes/1e9)

1.923045992


In [7]:
ds

<xarray.Dataset>
Dimensions:         (time: 34340107)
Coordinates:
  * time            (time) datetime64[ns] 2013-10-22T23:42:07.635517696 ... 2015-03-05T23:52:06.335301632
    longitude       (time) float64 dask.array<chunksize=(1000000,), meta=np.ndarray>
    latitude        (time) float64 dask.array<chunksize=(1000000,), meta=np.ndarray>
Data variables:
    cycle           (time) int16 dask.array<chunksize=(1000000,), meta=np.ndarray>
    track           (time) int16 dask.array<chunksize=(1000000,), meta=np.ndarray>
    dac             (time) float32 dask.array<chunksize=(1000000,), meta=np.ndarray>
    lwe             (time) float32 dask.array<chunksize=(1000000,), meta=np.ndarray>
    mdt             (time) float32 dask.array<chunksize=(1000000,), meta=np.ndarray>
    ocean_tide      (time) float64 dask.array<chunksize=(1000000,), meta=np.ndarray>
    sla_filtered    (time) float32 dask.array<chunksize=(1000000,), meta=np.ndarray>
    sla_unfiltered  (time) float32 dask.array<chun

In [None]:
overwrite=False

for mdir, m in zip(m_dir,missions):
    file_out = out_dir+'/'+m
    if not os.path.isdir(file_out) or overwrite:
        ds = load_data(mdir)
        ds.to_zarr(file_out, mode='w')
    print(m+' done')

In [None]:
# upload command:


## look at issue with j3

In [12]:
mdir

'/Users/aponte/data/alti/dataset-duacs-rep-global-j3-phy-l3'

In [50]:
# look into issue with jason3 data

_files = glob(mdir+'/**/*.nc', recursive=True)
print(len(_files))
def pprocess(ds):
    variables = ['lwe', 'mdt']
    for v in variables:
        if v not in ds:
            ds[v] = ds.ocean_tide.where(ds.ocean_tide>1e10, drop=False)
    return ds
            
ds = xr.open_mfdataset(_files[:420], combine='nested', 
                       concat_dim='time', parallel=True, preprocess = pprocess)
ds

962




<xarray.Dataset>
Dimensions:         (time: 20989090)
Coordinates:
  * time            (time) datetime64[ns] 2017-08-20T23:50:44.270703872 ... 2018-12-23T23:49:36.260031744
    longitude       (time) float64 dask.array<chunksize=(47519,), meta=np.ndarray>
    latitude        (time) float64 dask.array<chunksize=(47519,), meta=np.ndarray>
Data variables:
    cycle           (time) int16 dask.array<chunksize=(47519,), meta=np.ndarray>
    track           (time) int16 dask.array<chunksize=(47519,), meta=np.ndarray>
    dac             (time) float32 dask.array<chunksize=(47519,), meta=np.ndarray>
    lwe             (time) float64 dask.array<chunksize=(47519,), meta=np.ndarray>
    mdt             (time) float64 dask.array<chunksize=(47519,), meta=np.ndarray>
    ocean_tide      (time) float64 dask.array<chunksize=(47519,), meta=np.ndarray>
    sla_filtered    (time) float32 dask.array<chunksize=(47519,), meta=np.ndarray>
    sla_unfiltered  (time) float32 dask.array<chunksize=(47519,), me

In [45]:
ds = xr.open_mfdataset(_files[410:], combine='nested', 
                       concat_dim='time', parallel=True)
ds



<xarray.Dataset>
Dimensions:         (time: 27242256)
Coordinates:
  * time            (time) datetime64[ns] 2018-01-26T23:46:43.693778688 ... 2016-08-02T23:34:32.574447872
    longitude       (time) float64 dask.array<chunksize=(70965,), meta=np.ndarray>
    latitude        (time) float64 dask.array<chunksize=(70965,), meta=np.ndarray>
Data variables:
    cycle           (time) int16 dask.array<chunksize=(70965,), meta=np.ndarray>
    track           (time) int16 dask.array<chunksize=(70965,), meta=np.ndarray>
    dac             (time) float32 dask.array<chunksize=(70965,), meta=np.ndarray>
    ocean_tide      (time) float64 dask.array<chunksize=(70965,), meta=np.ndarray>
    sla_filtered    (time) float32 dask.array<chunksize=(70965,), meta=np.ndarray>
    sla_unfiltered  (time) float32 dask.array<chunksize=(70965,), meta=np.ndarray>
Attributes:
    Conventions:                     CF-1.6
    Metadata_Conventions:            Unidata Dataset Discovery v1.0
    cdm_data_type:         