Checkout changes in fetchers.py design
- Data fetchers are responsible for:
  - loading all available data from a given source
  - making data compliant to Argo standards (data type, variable name, attributes, etc.)
  - providing the end-user API (argopy.fetchers.ArgoDataFetcher) methods to filter data
    according to user level or requests. These must include:
      1. filter_data_mode
      1. filter_qc
      1. filter_variables
gmaze committed Mar 18, 2020
1 parent 2563c9f commit b332495
Showing 7 changed files with 221 additions and 77 deletions.
12 changes: 12 additions & 0 deletions argopy/data_fetchers/CONTRIBUTION_GUIDELINE
@@ -0,0 +1,12 @@
# Contribution guideline for data fetchers

- Data fetchers are responsible for:
- loading all available data from a given source
- making data compliant to Argo standards (data type, variable name, attributes, etc.)
- providing the end-user API (argopy.fetchers.ArgoDataFetcher) methods to filter data
according to user level or requests. These must include:
1. filter_data_mode
1. filter_qc
1. filter_variables

It is the responsibility of the higher-level end-user API (argopy.fetchers.ArgoDataFetcher) to run filters according to user level or requests.
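
For illustration, a minimal sketch of the contract described above; the class name and everything except the three filter methods are hypothetical, not part of this commit:

```python
import xarray as xr
from abc import ABC, abstractmethod

class NewSourceArgoDataFetcher(ABC):
    """ Hypothetical skeleton for a data fetcher of a new source """

    @abstractmethod
    def to_xarray(self) -> xr.Dataset:
        """ Load all available data from the source, Argo-compliant """
        ...

    def filter_data_mode(self, ds: xr.Dataset) -> xr.Dataset:
        """ Keep the best version of each measurement (R/A/D data modes) """
        return ds

    def filter_qc(self, ds: xr.Dataset, QC_list=[1, 2]) -> xr.Dataset:
        """ Keep only measurements with QC flags in QC_list """
        return ds

    def filter_variables(self, ds: xr.Dataset, mode: str) -> xr.Dataset:
        """ Drop variables not relevant to the given user mode """
        return ds
```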
14 changes: 11 additions & 3 deletions argopy/data_fetchers/erddap.py
@@ -21,7 +21,6 @@
from abc import ABC, abstractmethod
from pathlib import Path
import getpass
import warnings

from argopy.utilities import urlopen

@@ -34,7 +33,6 @@ class ErddapArgoDataFetcher(ABC):
ERDDAP transaction are managed with the erddapy library
__author__: gmaze@ifremer.fr
"""

###
@@ -556,13 +554,23 @@ def filter_qc(self, this, QC_list=[1, 2], drop=True, mode='all', mask=False):
else:
return this_mask

def filter_variables(self, this, mode):
""" Removing variables according to a user mode """
drop_list = ['DATA_MODE', 'DIRECTION'] # This depends on self._minimal_vlist()
this = this.drop_vars(drop_list)
# Also drop all QC variables:
for v in this.data_vars:
if "QC" in v:
this = this.drop_vars(v)
return this
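
As a self-contained illustration of what filter_variables removes (the toy dataset below mirrors the targeted variable names; the dimension name and values are made up):

```python
import numpy as np
import xarray as xr

ds = xr.Dataset({
    'TEMP': ('N_POINTS', np.random.rand(5)),
    'TEMP_QC': ('N_POINTS', np.ones(5)),
    'DATA_MODE': ('N_POINTS', np.repeat('R', 5)),
    'DIRECTION': ('N_POINTS', np.repeat('A', 5)),
})
ds = ds.drop_vars(['DATA_MODE', 'DIRECTION'])
for v in list(ds.data_vars):  # list() so we can drop while iterating
    if "QC" in v:
        ds = ds.drop_vars(v)
print(list(ds.data_vars))  # -> ['TEMP']
```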

class ArgoDataFetcher_wmo(ErddapArgoDataFetcher):
""" Manage access to Argo data through Ifremer ERDDAP for: a list of WMOs
__author__: gmaze@ifremer.fr
"""

def init(self, WMO=[6902746, 6902757, 6902766], CYC=None):
def init(self, WMO=[], CYC=None):
""" Create Argo data loader for WMOs
Parameters
121 changes: 73 additions & 48 deletions argopy/data_fetchers/localftp.py
@@ -5,7 +5,7 @@
#
# This is not intended to be used directly, only by the facade at fetchers.py
#
# Since the GDAC ftp ir organised by DAC/WMO folders, we start by implementing the 'float' and 'profile' entry points.
# Since the GDAC ftp is organised by DAC/WMO folders, we start by implementing the 'float' and 'profile' entry points.
#
# Created by gmaze on 18/03/2020
# Building on earlier work from S. Tokunaga (as part of the MOCCA and EARISE H2020 projects)
@@ -16,57 +16,38 @@

import os
import sys
import glob
from glob import glob
import numpy as np
import pandas as pd
import xarray as xr
from abc import ABC, abstractmethod
import warnings

from argopy.xarray import ArgoMultiProfLocalLoader

class LocalFTPArgoDataFetcher(ABC):
""" Manage access to Argo data from a local copy of GDAC ftp
"""
def __init__(self, argo_root, sdl=None):
self.argo_root_path = argo_root
self.sdl_axis = sdl

# List netcdf files available for processing:
self.argo_files = sorted(glob(self.argo_root_path + "/*/*/*_prof.nc"))
def __init__(self, ftp_root):
""" Init fetcher
Parameters
----------
ftp_root : str
Path to the directory with the 'dac' folder and index file
"""
self.local_ftp_path = ftp_root
self.definition = 'Local ftp Argo data fetcher'

# List available netcdf files to process:
self.argo_files = sorted(glob(self.local_ftp_path + "/*/*/*_prof.nc"))
if self.argo_files is None:
raise ValueError("Argo root path doesn't contain any netcdf profiles (under */*/*_prof.nc)")
raise ValueError("Argo root path doesn't contain any netcdf profile files (under */*/*_prof.nc)")
self.argo_wmos = [int(os.path.basename(x).split("_")[0]) for x in self.argo_files]
self.argo_dacs = [x.split("/")[-3] for x in self.argo_files]
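
For reference, a sketch of how the WMO and DAC are recovered from such paths; the example path is made up, following the GDAC dac/&lt;dac_name&gt;/&lt;wmo&gt;/&lt;wmo&gt;_prof.nc layout assumed by the glob pattern above:

```python
import os

f = "/data/argo/dac/coriolis/6902746/6902746_prof.nc"  # hypothetical path
wmo = int(os.path.basename(f).split("_")[0])  # -> 6902746
dac = f.split("/")[-3]                        # -> 'coriolis'
```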

def _to_sdl(self, sdl, var, var_prs, mask, lat=45., name='var'):
""" Interpolate a 2D variable onto standard depth levels, from pressure levels """

def VI(zi, z, c):
zi, z = np.abs(zi), np.abs(z) # abs ensure depths are sorted for the interpolation to work
if c.shape[0] > 0:
ci = np.interp(zi, z, c, left=c[0], right=9999.)
if np.any(ci >= 9999.):
return np.array(())
else:
return ci
else:
return np.array(())

var_sdl = []
ip = []
for i in range(0, var.shape[0]):
c = var[i, mask[i, :] == True]
p = var_prs[i, mask[i, :] == True]
z = gsw.z_from_p(p, lat[i])
ci = VI(sdl, z, c)
if ci.shape[0] > 0:
var_sdl.append(ci)
ip.append(i)
if len(var_sdl) > 0:
return xr.DataArray(var_sdl, dims=['samples', 'depth'], coords={'depth': sdl, 'samples': ip}, name=name)
else:
return None

def _add_dsattributes(self, ds, argo_xarr, title='Argo float data'):
def _add_attributes(self, ds, argo_xarr, title='Argo float data'):
ds.attrs['title'] = title
ds.attrs['Conventions'] = 'CF-1.6'
ds.attrs['CreationDate'] = pd.to_datetime('now').strftime("%Y/%m/%d")
@@ -152,13 +133,13 @@ def _add_dsattributes(self, ds, argo_xarr, title='Argo float data'):

return ds

def _xload_multiprof(self, dac_wmo):
def _xload_multiprof_legacy(self, dac_wmo):
"""Load an Argo multi-profile file as a collection of points or sdl profiles"""
dac_name, wmo_id = dac_wmo
wmo_id = int(wmo_id)

# instantiate the data loader:
argo_loader = ArgoMultiProfLocalLoader(argo_root_path=self.argo_root_path)
argo_loader = ArgoMultiProfLocalLoader(local_ftp_path=self.local_ftp_path)

with argo_loader.load_from_inst(dac_name, wmo_id) as argo_xarr:
try:
@@ -251,7 +232,7 @@ def _xload_multiprof(self, dac_wmo):
return None

# Preserve Argo attributes:
ds = self._add_dsattributes(ds, argo_xarr,
ds = self._add_attributes(ds, argo_xarr,
title='Argo float profiles interpolated onto Standard Depth Levels')
ds.attrs['DAC'] = dac_name
ds.attrs['WMO'] = wmo_id
@@ -286,21 +267,65 @@ def _xload_multiprof(self, dac_wmo):
xr.DataArray(temp_pts, name='to', dims='samples'),
xr.DataArray(psal_pts, name='so', dims='samples')))
# Preserve Argo attributes:
ds = self._add_dsattributes(ds, argo_xarr, title='Argo float profiles ravelled data')
ds = self._add_attributes(ds, argo_xarr, title='Argo float profiles ravelled data')
ds.attrs['DAC'] = dac_name
ds.attrs['WMO'] = wmo_id
ds['samples'].attrs = {'long_name': "Measurement samples"}
return ds
else:
return None

def _ravel(self, this):
""" Ravel variables from a single multiprofile xarray.Dataset """
pres_pts = pres[good]
temp_pts = temps[good]
psal_pts = psals[good]

repeat_n = [np.sum(x) for x in good]
lons = np.concatenate([np.repeat(x, repeat_n[i]) for i, x in enumerate(argo.lon[good_profiles])])
lats = np.concatenate([np.repeat(x, repeat_n[i]) for i, x in enumerate(argo.lat[good_profiles])])
dts = np.concatenate(
[np.repeat(x, repeat_n[i]) for i, x in enumerate(argo.datetime[good_profiles])])
profile_id_pp = np.concatenate([np.repeat(x, repeat_n[i]) for i, x in enumerate(profile_id)])
profile_numid = np.concatenate([np.repeat(x, repeat_n[i]) for i, x in enumerate(
1000 * argo.wmo[good_profiles] + argo.cycle[good_profiles])])

ds = xr.merge((
xr.DataArray(lons, name='longitude', dims='samples'),
xr.DataArray(lats, name='latitude', dims='samples'),
xr.DataArray(pres_pts, name='pressure', dims='samples'),
xr.DataArray(gsw.z_from_p(pres_pts, lats, geo_strf_dyn_height=0), name='depth', dims='samples'),
xr.DataArray(dts, name='time', dims='samples'),
xr.DataArray(profile_numid, name='id', dims='samples'),
xr.DataArray(temp_pts, name='to', dims='samples'),
xr.DataArray(psal_pts, name='so', dims='samples')))
# Preserve Argo attributes:
ds = self._add_attributes(ds, argo_xarr, title='Argo float profiles ravelled data')
ds.attrs['DAC'] = dac_name
ds.attrs['WMO'] = wmo_id
ds['samples'].attrs = {'long_name': "Measurement samples"}
return ds
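
The ravelling idea in isolation, as a runnable sketch with made-up numbers: per-profile coordinates are repeated once per retained level, so every sample carries its own longitude/latitude:

```python
import numpy as np

good = np.array([[True, True, False],    # profile 0: 2 good levels
                 [True, False, False]])  # profile 1: 1 good level
lon_per_profile = np.array([-30.0, -31.5])

repeat_n = [np.sum(x) for x in good]     # [2, 1]
lons = np.concatenate([np.repeat(x, repeat_n[i])
                       for i, x in enumerate(lon_per_profile)])
print(lons)  # [-30.  -30.  -31.5]
```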

def _xload_multiprof(self, dac_wmo):
"""Load an Argo multi-profile file as a collection of points"""
dac_name, wmo_id = dac_wmo
wmo_id = int(wmo_id)

# instantiate the data loader:
argo_loader = ArgoMultiProfLocalLoader(self.local_ftp_path)

return argo_loader.load_from_inst(dac_name, wmo_id)

def to_xarray(self, client=None, n=None):
"""Fetch data using a dask distributed client"""
""" Load Argo data and return a xarray.DataSet
Possibly use a dask distributed client for performance
"""

dac_wmo_files = list(zip(*[self.argo_dacs, self.argo_wmos]))
if n is not None: # Sub-sample for test purposes
dac_wmo_files = list(np.array(dac_wmo_files)[np.random.choice(range(0, len(dac_wmo_files) - 1), n)])
print("NB OF FLOATS TO FETCH:", len(dac_wmo_files))
warnings.warn("NB OF FLOATS TO FETCH: %i" % len(dac_wmo_files))

if client is not None:
futures = client.map(self._xload_multiprof, dac_wmo_files)
@@ -312,12 +337,12 @@ def to_xarray(self, client=None, n=None):

results = [r for r in results if r is not None] # Only keep non-empty results
if len(results) > 0:
ds = xr.concat(results, dim='samples', data_vars='all', compat='equals')
ds = xr.concat(results, dim='index', data_vars='all', compat='equals')
ds.attrs.pop('DAC')
ds.attrs.pop('WMO')
ds = ds.sortby('time')
ds['samples'].values = np.arange(0, len(ds['samples']))
ds['index'].values = np.arange(0, len(ds['index']))
return ds
else:
print("CAN'T FETCH ANY DATA !")
warnings.warn("CAN'T FETCH ANY DATA !")
return None
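
A possible usage sketch for this method, assuming `fetcher` is an instance of a concrete LocalFTPArgoDataFetcher subclass; the dask cluster settings are illustrative:

```python
from dask.distributed import Client

client = Client()                            # start a local dask cluster
ds = fetcher.to_xarray(client=client, n=10)  # n=10: random subsample, for tests
client.close()
```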
8 changes: 7 additions & 1 deletion argopy/errors.py
@@ -1,4 +1,4 @@
__author__ = 'sean.tokunaga@ifremer.fr'
# Legacy author (2019) sean.tokunaga@ifremer.fr

"""
A bunch of custom errors used in argopy.
@@ -33,3 +33,9 @@ class UnrecognisedProfileDirection(ValueError):
def __init__(self, institute=None, wmo=None):
self.institute = institute
self.wmo = wmo

class InvalidDatasetStructure(ValueError):
"""
This is to be used when the in-memory xarray dataset is not structured as expected
"""
pass
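
A hypothetical guard showing how this new error class might be used; the check itself is illustrative, not part of this commit:

```python
import xarray as xr
from argopy.errors import InvalidDatasetStructure

def expect_points(ds: xr.Dataset) -> xr.Dataset:
    if 'index' not in ds.dims:
        raise InvalidDatasetStructure("Expected a collection of points with an 'index' dimension")
    return ds
```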
31 changes: 9 additions & 22 deletions argopy/fetchers.py
@@ -106,21 +106,18 @@ def __init__(self, mode='standard', backend='erddap', ds='phy', **fetcher_kwargs
self.mode = mode # User mode determining the level of post-processing required
self.dataset_id = ds # Database to use
self.backend = backend # data_fetchers to use
if self.backend != 'erddap':
raise ValueError("Invalid backend, only 'erddap' available at this point")

# Load backend access points:
if backend not in available_backends:
raise ValueError("The %s data fetcher is not available" % backend)

if backend == 'erddap':
    self.Fetcher_wmo = Erddap_Fetcher.ArgoDataFetcher_wmo
    self.Fetcher_box = Erddap_Fetcher.ArgoDataFetcher_box
elif backend == 'localftp':
    self.Fetcher_wmo = LocalFTP_Fetcher.ArgoDataFetcher_wmo
    # self.Fetcher_box = Erddap_Fetcher.ArgoDataFetcher_box

def __repr__(self):
if self.fetcher:
@@ -136,16 +133,6 @@ def __empty_processor(self, xds):
""" Do nothing to a dataset """
return xds

def __drop_vars(self, xds):
""" Drop Jargon variables for standard users """
drop_list = ['DATA_MODE', 'DIRECTION']
xds = xds.drop_vars(drop_list)
# Also drop all QC variables:
for v in xds.data_vars:
if "QC" in v:
xds = xds.drop_vars(v)
return xds

def float(self, wmo):
""" Load data from a float, given one or more WMOs """
self.fetcher = self.Fetcher_wmo(WMO=wmo, **self.fetcher_options)
@@ -154,34 +141,34 @@ def float(self, wmo):
def postprocessing(xds):
xds = self.fetcher.filter_data_mode(xds)
xds = self.fetcher.filter_qc(xds)
xds = self.__drop_vars(xds)
xds = self.fetcher.filter_variables(xds, self.mode)
return xds
self.postproccessor = postprocessing
return self

def profile(self, wmo, cyc):
""" Load data from a profile, given one ormore WMOs and CYCLE_NUMBER """
""" Load data from a profile, given one or more WMOs and CYCLE_NUMBER """
self.fetcher = self.Fetcher_wmo(WMO=wmo, CYC=cyc, **self.fetcher_options)

if self.mode == 'standard' and (self.dataset_id == 'phy' or self.dataset_id == 'bgc'):
def postprocessing(xds):
xds = self.fetcher.filter_data_mode(xds)
xds = self.fetcher.filter_qc(xds)
xds = self.__drop_vars(xds)
xds = self.fetcher.filter_variables(xds, self.mode)
return xds
self.postproccessor = postprocessing

return self

def region(self, box):
""" Load data for a rectangular region, given latitude, longitude, pressure and possibly time bounds """
""" Load data from a rectangular region, given latitude, longitude, pressure and possibly time bounds """
self.fetcher = self.Fetcher_box(box=box, **self.fetcher_options)

if self.mode == 'standard' and (self.dataset_id == 'phy' or self.dataset_id == 'bgc'):
def postprocessing(xds):
xds = self.fetcher.filter_data_mode(xds)
xds = self.fetcher.filter_qc(xds)
xds = self.__drop_vars(xds)
xds = self.fetcher.filter_variables(xds, self.mode)
return xds
self.postproccessor = postprocessing

@@ -195,7 +182,7 @@ def deployments(self, box):
def postprocessing(xds):
xds = self.fetcher.filter_data_mode(xds)
xds = self.fetcher.filter_qc(xds)
xds = self.__drop_vars(xds)
xds = self.fetcher.filter_variables(xds, self.mode)
return xds
self.postproccessor = postprocessing

