In [5]:
from os.path import expanduser, join
from pathlib import Path
import sys
import time

import boto3
import numpy as np
import requests
import s3fs
import xarray as xr
from dask.distributed import Client

user_home_dir = expanduser('~')
sys.path.insert(0,join(user_home_dir,'ECCOv4-py'))
import ecco_v4_py as ecco

client = Client("tcp://127.0.0.1:43445")

In [10]:
# Use this for the netcdf files stored on an s3 bucket
def get_credentials(use_earthdata: bool = False):
    """
    This routine automatically pulls your EDL crediential from .netrc file and use it to obtain an AWS S3 credential 
    through a PO.DAAC service accessible at https://archive.podaac.earthdata.nasa.gov/s3credentials.
    From the PO.DAAC Github (https://podaac.github.io/tutorials/external/July_2022_Earthdata_Webinar.html).
    
    Returns:
    =======
    
    credentials: a dictionary with AWS secret_key, access_key, and token
    """
    # NASA EarthData hosts ECCO V4r4 fileds
    if not use_earthdata:
        session = boto3.Session()
        credentials_b3 = session.get_credentials()        
        credentials = dict()
        credentials['secretAccessKey'] = credentials_b3.secret_key
        credentials['accessKeyId'] = credentials_b3.access_key
        credentials['sessionToken'] = credentials_b3.token

    # A 'public' AWS s3 bucket hosts V4r5 fields (they will eventually move to PO.DAAC)
    else:
        credentials = requests.get('https://archive.podaac.earthdata.nasa.gov/s3credentials').json()
    
    return credentials

def init_S3FileSystem(use_earthdata: bool = False, requester_pays: bool = True):
    """
    This routine automatically creates an 's3 file system' object and credentials dictionary.
    The s3 file system needs to be initialized with the special aws credentials.
    
    Returns:
    =======
    
    s3: an AWS S3 filesystem, 
    credentials: a dictionary with AWS secret_key, access_key, and token

    """
    credentials = get_credentials(use_earthdata=use_earthdata)

    if use_earthdata:
        requester_pays = False
        
    s3 = s3fs.S3FileSystem(
        requester_pays=requester_pays,
        anon=False,
        key=credentials['accessKeyId'],
        secret=credentials['secretAccessKey'], 
        token=credentials['sessionToken']
    )
    
    return s3, credentials


def llc_to_latlon(
    xc,
    yc,
    field,
    min_lat: float = -90,
    max_lat: float = 90,
    dlat: float = 1,
    min_lon: float = -180,
    max_lon: float = 180,
    dlon: float = 1,
    method: str | None = None,
    radius: float | None = None,
):
    if method is None:
        method = "nearest_neighbor"
    if radius is None:
        radius = 120000
    lon_c, lat_c, _, _, field_latlon = ecco.resample_to_latlon(
        xc,
        yc, 
        field,
        min_lat,
        max_lat,
        dlat,
        min_lon,
        max_lon,
        dlon,
        fill_value = np.nan,
        mapping_method = method,
        radius_of_influence = radius,
    )
    if "k" in field.dims:
        dims = ["time", "lat", "lon", "depth"]
        coords = dict(
            time=field.time,
            lat=lat_c[:, 0],
            lon=lon_c[0],
            depth=field["Z"],
        )
    else:
        dims = ["time", "lat", "lon"]
        coords = dict(
            time=field.time,
            lat=lat_c[:, 0],
            lon=lon_c[0],
        )
    ds_latlon = xr.DataArray(
        data=field_latlon,
        dims=dims,
        coords=coords,
    )
    ds_latlon.attrs = field.attrs
    return ds_latlon


def list_s3_fsspec_ecco(mzz_dir: Path) -> None:
    print(np.sort(list(mzz_dir.glob("*.json"))))


def get_podaac(short_name: str, start_date: str, end_date: str) -> xr.Dataset:
    # search granules for a collection
    response = requests.get(url="https://cmr.earthdata.nasa.gov/search/granules.json", 
                                params={'ShortName':short_name})
    
    print(f'a code=200 means CMR is working: code {response.status_code}')
    
    response_json = response.json()['feed']['entry']
    # CMR can only return up to 2000 files
    # so be careful about what you search for
    input_search_params = {'ShortName' : short_name,
                           'temporal'  : f"{start_date} , {end_date}",  
                           'page_size' :2000}
    # search granules for a collection
    response = requests.get(url="https://cmr.earthdata.nasa.gov/search/granules.json", 
                                params=input_search_params)
    response_json = response.json()['feed']['entry']
    if len(response_json) > 2000:
        raise RuntimeError("More than 2000 entries returned. Refine selection and try again.")
    print(f"Found {len(response_json)} entries.")
    
    # Extract the filenames for these 60 garnules
    
    s3_podaac_paths=[]
    
    # loop through the json
    for rj in response_json:
        links = rj['links']
        # loop through all the links
        for link in links:
            # see if there is a 'title' key and a 'direct download access' entry
            if 'title' in link.keys():
                if 'direct download access via S3' in link['title']:
                    s3_podaac_paths.append(link['href'])
        
    s3, credentials = init_S3FileSystem(use_earthdata=True)

    files_to_load = s3_podaac_paths

    start_time = time.time()
    
    # pre-open the files
    file_objects = [s3.open(p, mode='rb') for p in files_to_load]
    
    # lazy-load 
    ds = xr.open_mfdataset(
        file_objects,
        parallel=True,
        data_vars='minimal',
        coords='minimal',
        compat='override',
        combine='nested',
        concat_dim='time', 
        chunks={'time':12, 'tile':13,' k':50,'j':90,'i':90}
    )
    
    total_time = time.time() - start_time

    print(f'\nlazy-loaded {len(ds.time)} granule(s)')
    print(f'total time {total_time/60:.2f} min for {len(ds.time.values)} granules')
    print(f'time per granule {total_time/len(files_to_load):.2f} sec/gran')

    return ds

s3, credentials = init_S3FileSystem(use_earthdata=False, requester_pays=True)
mzz_dir = Path("/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY")
list_s3_fsspec_ecco(mzz_dir)

[PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_ATM_STATE_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_BOLUS_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_DENS_STRAT_PRESS_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_FRESH_FLUX_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_HEAT_FLUX_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_MIXED_LAYER_DEPTH_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_OBP_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_OCEAN_3D_MOMENTUM_TEND_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/efs_ecco/mzz-jsons/MZZ_LLC0090GRID_MONTHLY/ECCO_L4_OCEAN_3D_SALINITY_FLUX_LLC0090GRID_MONTHLY_V4R4.json')
 PosixPath('/e

In [11]:
short_name = 'ECCO_L4_HEAT_FLUX_LLC0090GRID_MONTHLY_V4R4'
start_date = "1992-01-01"
end_date = "2018-01-01"
ds = get_podaac(short_name, start_date, end_date)
# ds_fw["oceFWflx"].to_netcdf(f"/efs_ecco/ascherer/datasets/v4r4/ocefwflx_monthly_{start_date}-{end_date}.nc")

a code=200 means CMR is working: code 200
Found 312 entries.

lazy-loaded 312 granule(s)
total time 0.69 min for 312 granules
time per granule 0.13 sec/gran


In [12]:
ds.load()

In [13]:
ds.to_netcdf(f"/efs_ecco/ascherer/datasets/v4r4/heat_flux_monthly_{start_date}-{end_date}.nc")

In [None]:
# # It helps if you know the "Short Name" of the dataset
# #https://search.earthdata.nasa.gov/search/granules?p=C1991543742-POCLOUD
# short_name = 'ECCO_L4_FRESH_FLUX_LLC0090GRID_DAILY_V4R4'
# start_years = ["1992", "1997", "2002", "2007", "2012", "2017"]
# end_years = ["1996", "2001", "2006", "2011", "2016", "2019"]
# for s, e in zip(start_years, end_years):
#     start_date = f"{s}-01-01"
#     end_date = f"{e}-12-31"
#     ds_fw = get_podaac(short_name, start_date, end_date)
#     ds_fw["oceFWflx"].to_netcdf(f"/efs_ecco/ascherer/datasets/v4r4/ocefwflx_{s}-{e}.nc")

In [10]:
short_name = 'ECCO_L4_TEMP_SALINITY_LLC0090GRID_MONTHLY_V4R4'
start_date = "1992-01-01"
end_date = "2018-01-01"
ds_ts = get_podaac(short_name, start_date, end_date).isel(k=0)
# ds_ts.to_netcdf(f"/efs_ecco/ascherer/datasets/v4r4/ocefwflx_monthly_{start_date}-{end_date}.nc")

a code=200 means CMR is working: code 200
Found 312 entries.

lazy-loaded 312 granule(s)
total time 0.82 min for 312 granules
time per granule 0.16 sec/gran


In [15]:
ds_ts.get(["THETA", "SALT"]).to_netcdf(f"/efs_ecco/ascherer/datasets/v4r4/ts_monthly_{start_date}-{end_date}.nc")

In [43]:
ds_fw = xr.open_dataset("/efs_ecco/ascherer/datasets/v4r4/ocefwflx_monthly_1992-01-01-2018-01-01.nc")
ds_ts = xr.open_dataset("/efs_ecco/ascherer/datasets/v4r4/ts_monthly_1992-01-01-2018-01-01.nc")

In [44]:
ds_ts

In [29]:
ds = xr.merge([ds_fw, ds_ts])
ds = ds.rename({
    "oceFWflx": "surface_freshwater_flux",
    "THETA": "sea_surface_temperature",
    "SALT": "sea_surface_salinity",
})
# ds.to_netcdf("/efs_ecco/ascherer/datasets/v4r4/surface_sal_temp_fw_monthly_latlon_0-20S_100-120E.nc .nc

In [40]:
da_fw = llc_to_latlon(
    ds["XC"],
    ds["YC"],
    ds["oceFWflx"],
    min_lat = -20,
    max_lat = 10,
    min_lon = 90,
    max_lon = 120,
)
da_fw.name = "surface_freshwater_flux"
da_t = llc_to_latlon(
    ds["XC"],
    ds["YC"],
    ds["THETA"],
    min_lat = -20,
    max_lat = 10,
    min_lon = 90,
    max_lon = 120,
)
da_t.name = "sea_surface_temperature"
da_s = llc_to_latlon(
    ds["XC"],
    ds["YC"],
    ds["SALT"],
    min_lat = -20,
    max_lat = 10,
    min_lon = 90,
    max_lon = 120,
)
da_s.name = "sea_surface_salinity"

In [45]:
ds = xr.merge([da_fw, da_t, da_s])
ds.attrs = ds_ts.attrs

In [47]:
ds.to_netcdf("/efs_ecco/ascherer/datasets/v4r4/surface_sal_temp_fw_monthly_latlon_10N-20S_90E-120E.nc")