# Pulls data from the Copernicus Data Provider (European Commision) available at cds.climate.copernicus.eu
The output is a csv file containing the requested time series.

As currently downloading the complete (daily) data set is not supported due to size limitations - only sampled data for every 10th day is available

To use the data set legally it is required to create an account here: https://cds.climate.copernicus.eu/
Please then obtain your API key from your user provide and provide it as a parameter to this component

WARNING: This component currently only supports local execution (not Kubeflow/Airflow)

Future work  
[ ] Download complete data set by creating multile requests and then merge the results

In [None]:
!pip3 install xarray==0.17.0 netcdf4==1.5.6 cdsapi==0.5.1 wget==3.2 ray==1.2.0 modin==0.9.1

In [None]:
# @param api key in form UID:APIKey obtained from
# https://cds.climate.copernicus.eu/
# @param data_dir temporal data storage for local execution
# @param file_name csv file name
# @param master url of master (default: local mode)

In [None]:
import wget
wget.download(
    'https://raw.githubusercontent.com/elyra-ai/' +
    'component-library/master/claimed_utils.py'
)

In [1]:
!export MODIN_OUT_OF_CORE=true

In [2]:
import ray
ray.init()
import modin.pandas as pd
import cdsapi
import xarray as xr
from claimed_utils import unzip
import os
import glob
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os
import shutil
import glob
import numpy as np
import databricks.koalas as ks
from pyspark.sql import SparkSession
import time

2021-03-22 10:32:37,231	INFO services.py:1172 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [3]:
apikey = os.environ.get('api key')
file_name = os.environ.get('file_name', 'data.csv')
data_dir = os.environ.get('data_dir', '../../data/')
master = os.environ.get('master', "local[*]")

In [None]:
skip = False

if os.path.exists(data_dir+file_name):
    skip = True

In [None]:
if not skip:
    config = SparkConf().setMaster(master).setAll([
        ('spark.executor.memory', '8g'), 
        ('spark.driver.memory','8g'),
        ('spark.sql.execution.arrow.pyspark.enabled', 'true')
    ])
    
    sc = SparkContext.getOrCreate(config)   
    spark = SparkSession.builder.getOrCreate()

In [None]:
if not skip:
    with open(os.path.expanduser('~/.cdsapirc'), "w") as myfile:
        myfile.write("url: https://cds.climate.copernicus.eu/api/v2\n")
        myfile.write("key: "+apikey+"\n")
        myfile.write("verify: 0")

In [None]:
if not skip:
    
    c = cdsapi.Client()

    c.retrieve(
        'satellite-soil-moisture',
        {
            'variable': 'volumetric_surface_soil_moisture',
            'type_of_sensor': 'passive',
            'time_aggregation': 'month_average',
            'year': [
                '1978', '1979', '1980',
                '1981', '1982', '1983',
                '1984', '1985', '1986',
                '1987', '1988', '1989',
                '1990', '1991', '1992',
                '1993', '1994', '1995',
                '1996', '1997', '1998',
                '1999', '2000', '2001',
                '2002', '2003', '2004',
                '2005', '2006', '2007',
                '2008', '2009', '2010',
                '2011', '2012', '2013',
                '2014', '2015', '2016',
                '2017', '2018', '2019',
            ],
            'month': [
                '01', '02', '03',
                '04', '05', '06',
                '07', '08', '09',
                '10', '11', '12',
            ],
            'day': '01',
            'type_of_record': 'cdr',
            'version': 'v201912.0.0',
            'format': 'zip',
        },
        data_dir+'download.zip')

In [None]:
if not skip:
    for f in glob.glob(data_dir+'*.nc'):
        os.remove(f)

In [None]:
if not skip:
    unzip(data_dir, data_dir+'download.zip')

In [4]:
skip = False

if not skip:
    df = None

    for filename in os.listdir(data_dir):
        if filename.endswith(".nc") :
            dset = xr.open_dataset(os.path.join(data_dir, filename))
            df1 = pd.DataFrame(dset['sm'].to_series())
            df1.reset_index(inplace=True)
            print('Adding data from {} to data frame'.format(filename))
            if df is None:
                df = df1
            else:
                start = time.perf_counter()
                df = df.append(df1, ignore_index=True)
                print('Appending data from {} to data frame took {}'.format(filename, time.perf_counter()-start))
        else:
            continue


    df.to_csv(data_dir+file_name, index=False)



Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-19790201000000-TCDR-v201912.0.0.nc to data frame
Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-20041201000000-TCDR-v201912.0.0.nc to data frame
Appending data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-20041201000000-TCDR-v201912.0.0.nc to data frame took 0.013064845988992602
Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-19960501000000-TCDR-v201912.0.0.nc to data frame




Appending data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-19960501000000-TCDR-v201912.0.0.nc to data frame took 0.04189064400270581
Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-20170401000000-TCDR-v201912.0.0.nc to data frame
Appending data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-20170401000000-TCDR-v201912.0.0.nc to data frame took 0.03237331204582006
Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-19881001000000-TCDR-v201912.0.0.nc to data frame
Appending data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-19881001000000-TCDR-v201912.0.0.nc to data frame took 0.08100520801963285
Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-20060301000000-TCDR-v201912.0.0.nc to data frame
Appending data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-20060301000000-TCDR-v201912.0.0.nc to data frame took 0.09102080302545801
Adding data from C3S-SOILMOISTURE-L3S-SSMV-PASSIVE-MONTHLY-19850701000000-TCDR-v201912.0.0.nc to data frame
Appending data from C3S-

2021-03-22 10:33:35,185	INFO worker.py:1491 -- Put failed since the value was either too large or the store was full of pinned objects.


ObjectStoreFullError: Failed to put object ffffffffffffffffffffffffffffffffffffffff0100000074000000 in object store because it is full. Object size is 4684559 bytes.
The local object store is full of objects that are still in scope and cannot be evicted. Tip: Use the `ray memory` command to list active objects in the cluster.

In [None]:
skip = False
if not skip:
    df_list = []

    for filename in os.listdir(data_dir):
        if filename.endswith(".nc") :
            dset = xr.open_dataset(os.path.join(data_dir, filename))
            df = pd.DataFrame(dset['sm'].to_series())
            df.reset_index(inplace=True)
            df_list.append(df)
            print('Adding data from {} to data frame'.format(filename))
        else:
            continue

    df = pd.concat(df_list)
    df.to_csv(data_dir+file_name, index=False) 

In [None]:
if not skip:
    df_list = []

    for filename in os.listdir(data_dir):
        if filename.endswith(".nc") :
            dset = xr.open_dataset(os.path.join(data_dir, filename))
            df = pd.DataFrame(dset['sm'].to_series())
            df.reset_index(inplace=True)
            df_list.append(df)
            print('Adding data from {} to data frame'.format(filename))
        else:
            continue

    df = pd.concat(df_list)
    df.to_csv(data_dir+file_name, index=False) 

In [None]:
if not skip:
    df = None

    
    for filename in os.listdir(data_dir):
        if filename.endswith(".nc") :
            dset = xr.open_dataset(os.path.join(data_dir, filename))
            df1 = pd.DataFrame(dset['sm'].to_series())
            df1.reset_index(inplace=True)
            df1 = spark.createDataFrame(df1)
            df1 = df1.to_koalas()

            print('Adding data from {} to data frame'.format(filename))
            if df is None:
                df = df1
            else:
                start = time.perf_counter()
                df = df.append(df1, ignore_index=True)
                print('Appending data from {} to data frame took {}'.format(filename, time.perf_counter()-start))
        else:
            continue


    df.to_csv(data_dir+file_name, index=False)