# PoC for fetching data from Zenodo based on a DOI

## An example catalog

We want a catalog that points to data on disk and carries the Zenodo DOI of that data as metadata.
We then want to download from Zenodo only those files that the catalog actually needs.

In [1]:
%%file fesom2_catalog.yaml

## TODO: We'd like to only specify the zenodo_doi

metadata:
  version: 1

plugins:
  source:
      - module: intake_xarray

sources:

  FESOM2_sample:
    driver: netcdf
    description: 'FESOM2 Sample dataset'
    metadata:
      zenodo_doi: "10.5281/zenodo.3819896"
    args:
      urlpath: "{{env('ESM_VFC_DATA_DIR')}}/FESOM2_PI_MESH/*.fesom.1948.nc"
      xarray_kwargs:
        decode_cf: False
        combine: 'by_coords'

  # CAUTION: The following is broken with current intake caching!
            
  MESH_NOD2D:
    driver: csv
    description: 'Node locations of sample FESOM pi mesh'
    metadata:
      zenodo_doi: "10.5281/zenodo.3819896"
    args:
      urlpath: "{{env('ESM_VFC_DATA_DIR')}}/FESOM2_PI_MESH/pi.tar.gz"
      csv_kwargs:
        delim_whitespace: True
        skiprows: 1
        names:
          - "node_number"
          - "x"
          - "y"
          - "flag"
    cache:
      - type: compressed
        decomp: tgz
        argkey: urlpath
        regex_filter: 'nod2d.out'

  MESH_ELEM2D:
    driver: csv
    description: 'Element locations of sample FESOM pi mesh'
    metadata:
      zenodo_doi: "10.5281/zenodo.3819896"
    args:
      urlpath: "{{env('ESM_VFC_DATA_DIR')}}/FESOM2_PI_MESH/pi.tar.gz"
      csv_kwargs:
        delim_whitespace: True
        skiprows: 1
        names:
          - "first_elem"
          - "second_elem"
          - "third_elem"
    cache:
      - type: compressed
        decomp: tgz
        argkey: urlpath
        regex_filter: 'elem2d.out'
            
  MESH_AUX3D:
    driver: csv
    description: 'Topography of sample FESOM pi mesh'
    metadata:
      zenodo_doi: "10.5281/zenodo.3819896"
    args:
      urlpath: "{{env('ESM_VFC_DATA_DIR')}}/FESOM2_PI_MESH/pi.tar.gz"
      csv_kwargs:
        delim_whitespace: True
        skiprows: 49
        names:
          - "topo"
    cache:
      - type: compressed
        decomp: tgz
        argkey: urlpath
        regex_filter: 'aux3d.out'

Overwriting fesom2_catalog.yaml
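The `cache` blocks above ask intake to unpack `pi.tar.gz` and keep only the member that matches `regex_filter`. Because that caching is flagged as broken, a manual fallback could look like the sketch below. `extract_matching_member` is a hypothetical helper, not part of intake; it applies `re.search` to member paths, which may differ from how intake interprets `regex_filter`.

```python
import io
import re
import tarfile
import tempfile
from pathlib import Path


def extract_matching_member(archive, pattern, target_directory):
    """Extract the first regular file in a .tar.gz whose path matches a regex.

    Hypothetical helper (not part of intake): searches member paths with
    re.search and writes the match, without its directory prefix, into
    target_directory.
    """
    regex = re.compile(pattern)
    with tarfile.open(archive, mode="r:gz") as tar:
        for member in tar.getmembers():
            # only regular files whose path matches the filter
            if member.isfile() and regex.search(member.name):
                target = Path(target_directory) / Path(member.name).name
                with tar.extractfile(member) as src:
                    target.write_bytes(src.read())
                return target
    raise FileNotFoundError(f"no member of {archive} matches {pattern!r}")


# Usage on a throw-away archive with made-up contents
with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "pi.tar.gz"
    with tarfile.open(archive, mode="w:gz") as tar:
        for name, text in [("pi/nod2d.out", "2\n"), ("pi/elem2d.out", "0\n")]:
            data = text.encode()
            info = tarfile.TarInfo(name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    extracted = extract_matching_member(archive, r"nod2d\.out", tmp)
    print(extracted.name, extracted.read_text().strip())  # nod2d.out 2
```

Writing the member under its bare file name keeps the result next to the archive, mirroring the flat layout of `FESOM2_PI_MESH/`.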


In [2]:
# wipe intake's local directory (including its cache) so the archives are re-extracted
!rm -rf ~/.intake

## Imports and paths

In [3]:
import intake
import requests
import pycurl
from urllib.parse import urlparse
import os
from pathlib import Path
import logging

import fnmatch
import hashlib

In [4]:
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

In [5]:
# parameters
data_path = Path("../esm_vfc_data/").resolve()

In [6]:
os.environ["ESM_VFC_DATA_DIR"] = str(data_path)
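The catalog's `urlpath` entries contain `{{env('ESM_VFC_DATA_DIR')}}`, which intake fills in from the environment when it resolves the entries; that is why the variable is set before the catalog is used. To see what a resolved path looks like, the substitution can be imitated with a regex. `expand_env_templates` is a hypothetical helper; intake's own template handling is more general than this sketch.

```python
import os
import re

# {{env('NAME')}} with optional spaces and either quote style
ENV_TEMPLATE = re.compile(r"""\{\{\s*env\(\s*['"]([^'"]+)['"]\s*\)\s*\}\}""")


def expand_env_templates(text, environ=os.environ):
    """Replace each {{env('NAME')}} in text with environ['NAME'].

    Hypothetical helper: it only imitates the env() template used in
    fesom2_catalog.yaml. An unset variable raises KeyError.
    """
    return ENV_TEMPLATE.sub(lambda m: environ[m.group(1)], text)


print(expand_env_templates(
    "{{env('ESM_VFC_DATA_DIR')}}/FESOM2_PI_MESH/pi.tar.gz",
    environ={"ESM_VFC_DATA_DIR": "/tmp/esm_vfc_data"},
))
# /tmp/esm_vfc_data/FESOM2_PI_MESH/pi.tar.gz
```

Passing an explicit mapping keeps the example from touching the notebook's real `ESM_VFC_DATA_DIR`.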

## Open catalog

In [7]:
cat = intake.open_catalog("fesom2_catalog.yaml")

In [8]:
list(cat)

['FESOM2_sample', 'MESH_NOD2D', 'MESH_ELEM2D', 'MESH_AUX3D']

## How to pre-fetch the data?

We need a way to verify file checksums. We also want to download files based only on the Zenodo DOI and a filename pattern like the one used in each catalog entry's `urlpath`.

In [9]:
def check_file(file_name, checksum, blocksize=65536):
    """Check if file satisfies checksum.
    
    Parameters
    ----------
    file_name : str | Path
        File name to check.
    checksum : str
        Format f"{algorithm}:{checksum}"
    blocksize : int
        Defaults to 65536 (Bytes).
        
    Returns
    -------
    bool : True if checksum matches.

    """  
    algorithm, target_hash = tuple(checksum.split(":"))
    file_hash = hashlib.new(algorithm)
    with open(file_name, "rb") as f:
        while True:
            chunk = f.read(blocksize)
            if not chunk:
                break
            file_hash.update(chunk)
    file_hash = file_hash.hexdigest()
    
    return file_hash == target_hash
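To exercise `check_file` without network access, one can produce a checksum string in the same `algorithm:hexdigest` format from a local file. `make_checksum` is a hypothetical helper written for that purpose; the `md5:` prefix matches the format Zenodo reports.

```python
import hashlib
import tempfile
from pathlib import Path


def make_checksum(file_name, algorithm="md5", blocksize=65536):
    """Return an f"{algorithm}:{hexdigest}" string for a local file.

    Hypothetical helper producing the format that check_file expects.
    """
    file_hash = hashlib.new(algorithm)
    with open(file_name, "rb") as f:
        # read in blocks so large files are not loaded into memory at once
        for chunk in iter(lambda: f.read(blocksize), b""):
            file_hash.update(chunk)
    return f"{algorithm}:{file_hash.hexdigest()}"


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "hello.txt"
    sample.write_bytes(b"hello")
    print(make_checksum(sample))  # md5:5d41402abc4b2a76b9719d911017c592
```

In the notebook, `check_file(sample, make_checksum(sample))` should then return `True`.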

In [10]:
def download_zenodo_files(
    zenodo_doi,
    target_directory=None,
    force_download=False,
    filter_pattern=None
):
    """Download Zenodo files for a given DOI.
    
    Parameters
    ----------
    zenodo_doi : str
        Zenodo DOI.  Example: "10.5281/zenodo.3819896"
    target_directory : path or str, optional
        Target directory where all files will end up.  Defaults to the
        current working directory.
    force_download : bool
        Re-download and overwrite files even if they already exist?
    filter_pattern : str, optional
        Pattern used to filter files.  Note that we use fnmatch and not regex.
        Defaults to "*" (all files).
        
    Returns
    -------
    list of paths : all target files.
    
    """
    # fill in defaults (Path(None) and fnmatch(..., None) would raise)
    if target_directory is None:
        target_directory = Path.cwd()
    if filter_pattern is None:
        filter_pattern = "*"

    # get zenodo record ID from doi
    zenodo_record = zenodo_doi.split('.')[-1]
    logging.debug(f"will download record {zenodo_record}")
    
    # get full record from zenodo
    # see https://developers.zenodo.org/#quickstart-upload for pointers
    r = requests.get(f"https://zenodo.org/api/records/{zenodo_record}")
    logging.debug(f"got status code {r.status_code}")
    r.raise_for_status()  # fail early instead of on a missing "files" key
    # should we debug-log the full json dump?

    # TODO: Check that we got the correct DOI
    
    # select the record's files whose name ("key") matches the pattern
    filtered_files = [
        file for file in r.json()["files"]
        if fnmatch.fnmatch(file["key"], filter_pattern)
    ]
    all_urls = [file["links"]["self"] for file in filtered_files]
    # name targets after the file key: depending on the API version,
    # the download URL may not end in the file name
    all_target_files = [
        Path(target_directory) / Path(file["key"]).name
        for file in filtered_files
    ]
    all_checksums = [file["checksum"] for file in filtered_files]
    
    # ensure target dir exists
    Path(target_directory).mkdir(exist_ok=True, parents=True)
    
    # download all wanted files with curl
    for url, file, checksum in zip(all_urls, all_target_files, all_checksums):
        if not file.exists() or force_download:
            with open(file, "wb") as f:
                logging.debug(f"will download {url} to {file}")
                c = pycurl.Curl()
                c.setopt(c.URL, url)
                c.setopt(c.FOLLOWLOCATION, True)  # follow redirects, if any
                c.setopt(c.WRITEDATA, f)
                c.perform()
                c.close()
                logging.debug(f"download of {url} to {file} done")
        # This checks all files even if they were not downloaded:
        if not check_file(file_name=file, checksum=checksum):
            raise ValueError(f"Checksum for {file} does not match {checksum}")
    
    return all_target_files
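`download_zenodo_files` takes everything after the last `.` of the DOI as the record ID and leaves the DOI check as a TODO. A stricter variant could look like the sketch below. `zenodo_record_id` and `check_record_doi` are hypothetical names, the `10.5281/zenodo.<id>` shape is assumed from the DOI used here, and the top-level `"doi"` key in the record JSON is an assumption about the Zenodo API response.

```python
import re

# Assumed DOI shape for Zenodo records: 10.5281/zenodo.<record id>
ZENODO_DOI = re.compile(r"10\.5281/zenodo\.(\d+)", re.IGNORECASE)


def zenodo_record_id(doi):
    """Return the record ID of a Zenodo DOI, or raise ValueError."""
    match = ZENODO_DOI.fullmatch(doi.strip())
    if match is None:
        raise ValueError(f"{doi!r} does not look like a Zenodo DOI")
    return match.group(1)


def check_record_doi(record, doi):
    """Raise ValueError unless the record JSON carries the expected DOI.

    Assumes the Zenodo record JSON has a top-level "doi" field.
    """
    found = record.get("doi")
    if found is None or found.lower() != doi.strip().lower():
        raise ValueError(f"record has DOI {found!r}, expected {doi!r}")


print(zenodo_record_id("10.5281/zenodo.3819896"))  # 3819896
check_record_doi({"doi": "10.5281/zenodo.3819896"}, "10.5281/zenodo.3819896")
```

Inside `download_zenodo_files`, the first function would replace the `split('.')` line and the second would go right after `r.json()` in place of the TODO.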

## Download data

In [11]:
%%time

download_zenodo_files(
    zenodo_doi=cat["MESH_AUX3D"].metadata["zenodo_doi"],
    target_directory=Path(cat["MESH_AUX3D"].urlpath).parent,
    force_download=False, 
    filter_pattern="*"
)

DEBUG:will download record 3819896
DEBUG:Starting new HTTPS connection (1): zenodo.org:443
DEBUG:https://zenodo.org:443 "GET /api/records/3819896 HTTP/1.1" 200 None
DEBUG:got status code 200


CPU times: user 113 ms, sys: 48.1 ms, total: 161 ms
Wall time: 964 ms


[PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/temp.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/salt.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/u.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/v.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/w.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/a_ice.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/m_ice.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/vice.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/uice.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/sst.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/ssh.fesom.1948.nc'),
 PosixPath('/work/esm-vfc-catalogs/esm_vfc_data/FESOM2_PI_MESH/MLD1.fesom.
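The call above passes `filter_pattern="*"` and therefore fetches every file in the record. To download only what the catalog needs, the file-name part of each entry's `urlpath` can serve as the fnmatch pattern; for a single entry that amounts to `filter_pattern=Path(cat["MESH_AUX3D"].urlpath).name`. The sketch below generalizes this to several entries. `pattern_from_urlpath` and `select_keys` are hypothetical helpers, the key list is illustrative, and wildcards are assumed to occur only in the last path component.

```python
import fnmatch
from pathlib import PurePosixPath


def pattern_from_urlpath(urlpath):
    """Use the file-name part of a catalog urlpath as an fnmatch pattern."""
    return PurePosixPath(urlpath).name


def select_keys(keys, urlpaths):
    """Return the file keys matched by at least one catalog urlpath."""
    patterns = {pattern_from_urlpath(p) for p in urlpaths}
    # fnmatchcase: case-sensitive on every OS, unlike fnmatch on Windows
    return [k for k in keys if any(fnmatch.fnmatchcase(k, pat) for pat in patterns)]


keys = ["temp.fesom.1948.nc", "salt.fesom.1948.nc", "pi.tar.gz", "README.md"]
urlpaths = [
    "/data/FESOM2_PI_MESH/*.fesom.1948.nc",  # like FESOM2_sample
    "/data/FESOM2_PI_MESH/pi.tar.gz",        # like the MESH_* entries
]
print(select_keys(keys, urlpaths))
# ['temp.fesom.1948.nc', 'salt.fesom.1948.nc', 'pi.tar.gz']
```

Because `download_zenodo_files` accepts a single pattern, it would need one call per distinct pattern, or a small change to accept a list.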

In [12]:
cat["MESH_AUX3D"].cache[0].clear_all()
cat["MESH_AUX3D"].read()


| | topo |
|---|---|
| 0 | -672 |
| 1 | -534 |
| 2 | -621 |
| 3 | -744 |
| 4 | -594 |
| ... | ... |
| 3135 | -1200 |
| 3136 | -850 |
| 3137 | -560 |
| 3138 | -121 |
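The catalog skips a fixed 49 lines of `aux3d.out`. Assuming the file starts with the number of vertical levels (48 would match `nz: 48` in the sample dataset), followed by one depth per level and then one topography value per node, the number of lines to skip can be read from the file instead of hard-coded. `read_aux3d` is a hypothetical helper and the layout is inferred, not taken from FESOM documentation.

```python
def read_aux3d(lines):
    """Split aux3d.out-style content into (level depths, per-node topography).

    Assumed layout: first line is the number of levels n, the next n lines
    are level depths, every remaining non-empty line is one topography value.
    """
    values = [line.strip() for line in lines if line.strip()]
    n_levels = int(values[0])
    depths = [float(v) for v in values[1:1 + n_levels]]
    topo = [float(v) for v in values[1 + n_levels:]]
    return depths, topo


# toy input: 3 levels, 4 nodes
depths, topo = read_aux3d(["3", "0.0", "10.0", "20.0", "-672", "-534", "-621", "-744"])
print(len(depths), topo)  # 3 [-672.0, -534.0, -621.0, -744.0]
```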


In [13]:
cat["MESH_NOD2D"].cache[0].clear_all()
cat["MESH_NOD2D"].read()


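| | node_number | x | y | flag |
|---|---|---|---|---|
| 0 | 1 | 267.4665 | 84.5252 | 0 |
| 1 | 2 | 270.9702 | 84.3700 | 0 |
| 2 | 3 | 268.3207 | 84.0503 | 0 |
| 3 | 4 | 263.8669 | 84.5900 | 0 |
| 4 | 5 | 264.3323 | 84.2140 | 0 |
| ... | ... | ... | ... | ... |
| 3135 | 3136 | 139.7089 | -79.0173 | 0 |
| 3136 | 3137 | 144.1223 | -79.2533 | 0 |
| 3137 | 3138 | 148.0000 | -80.3240 | 1 |
| 3138 | 3139 | 135.7037 | -79.9329 | 1 |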
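With `skiprows: 1` the catalog discards the first line of `nod2d.out`, which presumably holds the node count. Keeping it allows a consistency check; here one seems worthwhile, since the table ends at node 3139 while the element table below references node 3140 and the sample dataset has `nod2: 3140`. `read_nod2d` is a hypothetical pure-Python sketch, assuming the layout implied by the catalog's column names.

```python
def read_nod2d(lines):
    """Parse nod2d.out-style content and check the declared node count.

    Assumed layout: first line is the number of nodes, then one line per
    node with "node_number x y flag".
    """
    rows = [line.split() for line in lines if line.strip()]
    n_nodes = int(rows[0][0])
    nodes = [
        (int(num), float(x), float(y), int(flag))
        for num, x, y, flag in rows[1:]
    ]
    if len(nodes) != n_nodes:
        raise ValueError(f"header says {n_nodes} nodes, found {len(nodes)}")
    return nodes


toy = ["2", "1 267.4665 84.5252 0", "2 270.9702 84.3700 0"]
print(read_nod2d(toy)[1])  # (2, 270.9702, 84.37, 0)
```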


In [14]:
cat["MESH_ELEM2D"].cache[0].clear_all()
cat["MESH_ELEM2D"].read()


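| | first_elem | second_elem | third_elem |
|---|---|---|---|
| 0 | 1 | 12 | 2 |
| 1 | 2 | 12 | 10 |
| 2 | 2 | 10 | 9 |
| 3 | 3 | 1 | 2 |
| 4 | 3 | 5 | 1 |
| ... | ... | ... | ... |
| 5834 | 3138 | 3137 | 3132 |
| 5835 | 3138 | 3131 | 3133 |
| 5836 | 3139 | 3136 | 3140 |
| 5837 | 3139 | 3135 | 3134 |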


In [15]:
print(cat["FESOM2_sample"].read())

<xarray.Dataset>
Dimensions:  (elem: 5839, nod2: 3140, nz: 48, nz1: 47, time: 12)
Coordinates:
  * time     (time) float64 2.678e+06 5.097e+06 ... 2.886e+07 3.154e+07
Dimensions without coordinates: elem, nod2, nz, nz1
Data variables:
    Av       (time, elem, nz) float32 0.005085066 0.099402376 ... 0.0 0.0
    Kv       (time, nod2, nz) float32 nan nan nan nan nan ... 0.0 0.0 0.0 0.0
    MLD1     (time, nod2) float32 -75.19909 -77.151375 ... -33.665993 -32.839382
    a_ice    (time, nod2) float32 0.98289865 0.9805132 0.9803011 ... 0.0 0.0 0.0
    m_ice    (time, nod2) float32 1.1300349 1.1106514 1.1073328 ... 0.0 0.0 0.0
    salt     (time, nod2, nz1) float32 nan nan nan nan nan ... 0.0 0.0 0.0 0.0
    ssh      (time, nod2) float32 -0.5813815 -0.5792128 ... -1.5763197 -1.586672
    sst      (time, nod2) float32 nan nan nan ... -1.517087 -1.5979521 -1.535592
    temp     (time, nod2, nz1) float32 nan nan nan nan nan ... 0.0 0.0 0.0 0.0
    u        (time, elem, nz1) float32 nan nan nan 