# Combine the 20000 netCDF files from the GDP hourly dataset into a data set

In [1]:
import netCDF4 as nc
import xarray as xr
import numpy as np
import sys
import matplotlib.pyplot as plt
from pathlib import Path
import dask
import dask.bag as db
from datetime import datetime
import os 

# figure out where this warning comes from
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [2]:
path_gdp = '/Users/pmiron/OneDrive - Florida State University/projects/clouddrift/data/raw'
files = sorted(list(Path(path_gdp).rglob('*.nc')))

# Calculate the number of observations

In [3]:
def number_of_observations(file):
    """
    Load a file and get the size of the observations.
    """
    df = xr.open_dataset(file, decode_times=False)
    return df.sizes['obs']

In [7]:
file_traj_sz = 'process/count.csv'
if not os.path.isfile(file_traj_sz) or (os.path.getmtime(path_gdp) > os.path.getmtime(file_traj_sz)): 
    # files were modified
    print('Updating the count variable.')
    count = []
    for file in files:
        count_i = dask.delayed(number_of_observations)(file)
        count.append(count_i)  
    count = dask.compute(*count)
    
    # save to file
    count = np.array(count)
    np.savetxt(file_traj_sz, count, fmt='%d')
else: 
    # load if exists already and up to date
    print('Already up-to-date.')
    count = np.loadtxt(file_traj_sz, dtype='int')

Already up-to-date.


# Create data structures

In [5]:
nb_traj = len(count)
nb_obs = np.sum(count)

# scalar values
id = np.zeros(nb_traj, dtype='uint32')
wmo = np.zeros(nb_traj, dtype='uint32')
expno = np.zeros(nb_traj, dtype='uint32')
deploy_date = np.zeros(nb_traj, dtype='datetime64[ns]')
deploy_lat = np.zeros(nb_traj, dtype='float32')
deploy_lon = np.zeros(nb_traj, dtype='float32')
end_date = np.zeros(nb_traj, dtype='datetime64[ns]')
end_lon = np.zeros(nb_traj, dtype='float32')
end_lat = np.zeros(nb_traj, dtype='float32')
drogue_lost_date = np.zeros(nb_traj, dtype='datetime64[ns]')
type_death = np.zeros(nb_traj, dtype='uint8')
type_buoy = np.chararray(nb_traj, itemsize=10)

# attributes converted to metadata/data
'''
location_type = np.zeros(nb_traj, dtype='bool') # 0 Argos, 1 GPS
deployment_ship = 
deployment_status = 
buoy_type_manufacturer = 
buoy_type_sensor_array = 
current_program = np.zeros(nb_traj, dtype='uint32')
purchaser_funding = 
sensor_upgrade
transmissions 
deploying_country
deployment_comments
manufacture_year = np.zeros(nb_traj, dtype='uint16')
manufacture_month = np.zeros(nb_traj, dtype='uint8')
manufacture_voltage = np.zeros(nb_traj, dtype='uint16')



subsfc_float_presence = 
drogue_detect_sensor = 
'''

float_diameter = np.zeros(nb_traj, dtype='float32')
drogue_type = np.zeros(nb_traj, dtype='uint16') 
drogue_length = np.zeros(nb_traj, dtype='float32')
drogue_ballast = np.zeros(nb_traj, dtype='float32')
drag_area_above_drogue = np.zeros(nb_traj, dtype='float32')
drag_area_drogue = np.zeros(nb_traj, dtype='float32')
drag_area_ratio = np.zeros(nb_traj, dtype='float32')
drag_center_depth = np.zeros(nb_traj, dtype='float32')

# vectors
longitude = np.zeros(nb_obs, dtype='float32')
latitude = np.zeros(nb_obs, dtype='float32')
time = np.zeros(nb_obs, dtype='datetime64[ns]')
ve = np.zeros(nb_obs, dtype='float32')
vn = np.zeros(nb_obs, dtype='float32')
err_lat = np.zeros(nb_obs, dtype='float32')
err_lon = np.zeros(nb_obs, dtype='float32')
err_ve = np.zeros(nb_obs, dtype='float32')
err_vn = np.zeros(nb_obs, dtype='float32')
gap = np.zeros(nb_obs, dtype='datetime64[ns]')

In [6]:
def str_to_float(value, default=np.nan):
    try:
        return float(value)
    except ValueError:
        return default

def fill_ragged_array(file, tid, oid):
    """
    Fill the ragged array from the xr.Dataset corresponding to one trajectory
    
    Input filename: path and filename of the netCDF file
          tid: trajectory index
          oid: observation index in the ragged array
    """
    ds = xr.open_dataset(file)
    size = ds.dims['obs']
    
    # scalar
    id[tid] = ds.ID.data[0]
    wmo[tid] = ds.WMO.data[0]
    expno[tid] = ds.expno.data[0]
    deploy_date[tid] = ds.deploy_date.data[0]
    deploy_lon[tid] = ds.deploy_lon.data[0]
    deploy_lat[tid] = ds.deploy_lat.data[0]
    end_date[tid] = ds.end_date.data[0]
    end_lon[tid] = ds.end_lon.data[0]
    end_lat[tid] = ds.end_lat.data[0]
    drogue_lost_date[tid] = ds.drogue_lost_date.data[0]
    type_death[tid] = ds.typedeath.data[0]
    type_buoy[tid] = ds.typebuoy.data[0]
    
    # those values were store in the attributes
    float_diameter[tid] = str_to_float(ds.attrs['FloatDiameter'][:-3])
    drogue_length[tid] = str_to_float(ds.attrs['DrogueLength'][:-2])
    drogue_ballast[tid] = str_to_float(ds.attrs['DrogueBallast'][:-3])
    drag_area_above_drogue[tid] = str_to_float(ds.attrs['DragAreaAboveDrogue'][:-4])
    drag_area_drogue[tid] = str_to_float(ds.attrs['DragAreaOfDrogue'][:-4])
    drag_area_ratio[tid] = str_to_float(ds.attrs['DragAreaRatio'])
    drag_center_depth[tid] = str_to_float(ds.attrs['DrogueCenterDepth'][:-2])
    
    # vectors
    longitude[oid:oid+size] = ds.longitude.data[0]
    latitude[oid:oid+size] = ds.latitude.data[0]
    time[oid:oid+size] = ds.time.data[0]
    ve[oid:oid+size] = ds.ve.data[0]
    vn[oid:oid+size] = ds.vn.data[0]
    err_lat[oid:oid+size] = ds.err_lat.data[0]
    err_lon[oid:oid+size] = ds.err_lon.data[0]
    err_ve[oid:oid+size] = ds.err_ve.data[0]
    err_vn[oid:oid+size] = ds.err_vn.data[0]
    gap[oid:oid+size] = ds.gap.data[0]

In [7]:
# create the index for each trajectory in the continuous ragged array reprensentation
index_traj = np.cumsum(count)
index_traj = np.insert(index_traj, 0, 0)

# create a repeated ids[obs] maybe this is remove later
ids = np.zeros(nb_obs, dtype='uint32')
for i in range(0, nb_traj):
    r = range(index_traj[i], index_traj[i + 1])
    ids[r] = i

In [8]:
%%time
for i, filename in enumerate(files):
    if i not in [12503, 12504, 12743]:
        fill_ragged_array(filename, i, index_traj[i])

CPU times: user 3min 23s, sys: 11.9 s, total: 3min 35s
Wall time: 4min 2s


In [11]:
# create the xarray Dataset
ds = xr.Dataset(
    data_vars=dict(
        id=(['traj'], id, {"long_name": "Global Drifter Program Buoy ID", "units":""}), 
        count=(['traj'], count, {"long_name": "Number of observations for each trajectory", "units":""}),
        wmo=(['traj'], wmo, {"long_name": "    World Meteorological Organization buoy identification number", "units":""}),
        expno=(['traj'], expno, {"long_name": "Experiment number", "units":""}),
        deploy_date=(['traj'], deploy_date, {"long_name": "Deployment date and time"}),#, "units":"seconds since 1970-01-01 00:00:00 UTC"}),
        deploy_lon=(['traj'], deploy_lon, {"long_name": "Deployment longitude", "units":"degrees_east"}),
        deploy_lat=(['traj'], deploy_lat, {"long_name": "Deployment latitude", "units":"degrees_north"}),
        end_date=(['traj'], end_date, {"long_name": "End date and time"}),#, "units":"seconds since 1970-01-01 00:00:00 UTC"}),
        end_lat=(['traj'], end_lat, {"long_name": "End latitude", "units":"degrees_north"}),
        end_lon=(['traj'], end_lon, {"long_name": "End longitude", "units":"degrees_east"}),
        drogue_lost_date=(['traj'], drogue_lost_date, {"long_name": "Date and time of drogue loss"}),#, "units":"seconds since 1970-01-01 00:00:00 UTC"}),
        type_death=(['traj'], type_death, {"long_name": "Type of death (0=buoy still alive, 1=buoy ran aground, 2=picked up by vessel, 3=stop transmitting, 4=sporadic transmissions, 5=bad batteries, 6=inactive status)", "units":""}),
        type_buoy=(['traj'], type_buoy, {"long_name": "Buoy type (see https://www.aoml.noaa.gov/phod/dac/dirall.html)", "units":""}),
        float_diameter=(['traj'], float_diameter, {"long_name": "Diameter of surface floater.", "units":"cm"}),
        drogue_length=(['traj'], drogue_length, {"long_name": "Length of drogue.", "units":"m"}),
        drogue_ballast=(['traj'], drogue_ballast, {"long_name": "Weight of the drogue's ballast.", "units":"kg"}),
        drag_area_above_drogue=(['traj'], drag_area_above_drogue, {"long_name": "Drag area above drogue.", "units":"m^2"}),
        drag_area_drogue=(['traj'], drag_area_drogue, {"long_name": "Drag area drogue.", "units":"m^2"}),
        drag_area_ratio=(['traj'], drag_area_ratio, {"long_name": "Drag area ratio", "units":"m"}),
        drag_center_depth=(['traj'], drag_center_depth, {"long_name": "Average depth of the drogue.", "units":"m"}),
        
        # if it's efficient to binned cut the Dataset maybe this is not needed
        ve=(['obs'], ve, {"long_name": "Eastward velocity", "units":"m/s"}),
        vn=(['obs'], vn, {"long_name": "Northward velocity", "units":"m/s"}),
        err_lat=(['obs'], err_lat, {"long_name": "Standard error in latitude", "units":"degrees_north"}),
        err_lon=(['obs'], err_lon, {"long_name": "Standard error in longitude", "units":"degrees_east"}),
        err_ve=(['obs'], err_ve, {"long_name": "Standard error in eastward velocity", "units":"m/s"}),
        err_vn=(['obs'], err_vn, {"long_name": "Standard error in northward velocity", "units":"m/s"}),
        
        # convert gap to nanoseconds from nanoseconds from 1970-01-01
        gap=(['obs'], (gap-np.datetime64('1970-01-01')), {"long_name": "time interval between previous and next location"}),#, "units":"nanoseconds"}),
    ),

    coords=dict(
        longitude=(['obs'], longitude, {"long_name": "Longitude", "units":"degrees_east"}),
        latitude=(['obs'], latitude, {"long_name": "Latitude", "units":"degrees_north"}),
        time=(['obs'], time, {"long_name": "Time"}),#, "units":"seconds since 1970-01-01 00:00:00 UTC"}),
        ids=(['obs'], ids, {"long_name": "Trajectory index of vars['traj'] for all observations", "units":""}),
    ),

    attrs={
        'title': 'Global Drifter Program hourly drifting buoy collection',
        'id': 'Global Drifter Program ID 13555',
        'history': 'Version 1.04.  Metadata from dirall.dat and deplog.dat',
        'Conventions': 'CF-1.6',
        'date_created': datetime.now().isoformat(),
        'publisher_name': 'GDP Drifter DAC',
        'publisher_email': 'aoml.dftr@noaa.gov',
        'publisher_url': 'https://www.aoml.noaa.gov/phod/gdp',
        'licence': 'MIT License',
        'processing_level': 'Level 2 QC by GDP drifter DAC',
        'metadata_link': 'https://www.aoml.noaa.gov/phod/dac/dirall.html',
        'contributor_name': 'NOAA Global Drifter Program',
        'contributor_role': 'Data Acquisition Center',
        'institution': 'NOAA Atlantic Oceanographic and Meteorological Laboratory',
        'acknowledgement': 'Elipot et al. (2016). Global Drifter Program quality-controlled hourly interpolated data from ocean surface drifting buoys, version 1.04. NOAA National Centers for Environmental Information. https://agupubs.onlinelibrary.wiley.com/doi/full/10.1002/2016JC011716TBA. Accessed [date].',
        'doi': '10.1002/2016JC011716TBA',
        'summary': 'Global Drifter Program hourly data'
    }
)

ds.deploy_date.encoding['units'] = "seconds since 1970-01-01 00:00:00"
ds.end_date.encoding['units'] = "seconds since 1970-01-01 00:00:00"
ds.drogue_lost_date.encoding['units'] = "seconds since 1970-01-01 00:00:00"
ds.time.encoding['units'] = "seconds since 1970-01-01 00:00:00"
ds.gap.encoding['units'] = "nanoseconds"

In [10]:
# save to one giant netcdf
ds.to_netcdf()

# and to zarr
ds.to_zarr('../data/process/gdp_1.04c', consolidated=True)

<xarray.backends.zarr.ZarrStore at 0x112e22880>