# Automatically download data (ERA5 and NDVI) from GEE through the Python API

## 1. Authentications and imports

In [None]:
# import the gee API to the notebook session
import ee

# authenticate to the Earth Engine servers
ee.Authenticate()

# initialize the API
ee.Initialize()

In [None]:
# insert your desired path to work on
import os
from os.path import join
project_path = os.path.dirname(os.getcwd())
os.chdir(join('..','..','data'))
os.getcwd()

Import necessary python libraries.

In [None]:
!pip install unidecode --quiet
import unidecode
import numpy as np
import pandas as pd
from glob import glob

Set folder structure.

In [None]:
config = {
    'main_brazil': 'Brazil',
    'main_peru': 'Peru',
    'out_brazil': join('Brazil', 'collections_Brazil'),
    'out_peru': join('Peru', 'collections_Peru')
}

# List comprehension for the folder structure code
[os.makedirs(val, exist_ok=True) for key, val in config.items()]

## 2. Code to download data

### 2.1. Functions

In [None]:
def get_days(image):
    return image.set('day', ee.Date(ee.Image(image).get('system:time_start')).format('yyyy-MM-dd'))


def addHumidity(feature):
    # dew point (TD), temperature (T) 
    T = ee.Number(feature.get('temperature_2m')).subtract(273.15)
    TD = ee.Number(feature.get('dewpoint_temperature_2m')).subtract(273.15)
    c = ee.Number(243.04)
    b = ee.Number(17.625)
  
    # FORMULA: 100*Math.exp(c*b*(TD-T)/((c+T)*(c+TD)))
    return feature.set({'humidity': ee.Number(100).multiply((c.multiply(b).multiply(TD.subtract(T)).divide((c.add(T)).multiply(c.add(TD)))).exp())})    


def filter_join(feature):
    f = ee.Feature(feature.get('match'))
    flag = ee.Algorithms.IsEqual(f, None)
    ndvi = ee.Algorithms.If(flag, ee.Dictionary({'NDVI': -999}), f.toDictionary())
    return feature.set(ndvi)


def remove_match(feature):
    properties = feature.propertyNames()
    return feature.select(properties.filter(ee.Filter.neq('item', 'match')))

    
def removeKeys(pair):
    f1 = ee.Feature(pair.get('primary'))
    f2 = ee.Feature(pair.get('secondary'))
    return f1.set(f2.toDictionary())


def renameBand(image, band, new_band):
    return image.select([band]).rename([new_band])


def getMetric(m, collection, startDate, func, vars):
    start = ee.Date(startDate).advance(m, 'months')
    end   = start.advance(1, 'months')

    if func == 'mean':
        return collection.select(vars)\
                         .filter(ee.Filter.date(start, end))\
                         .mean()\
                         .set('Date', ee.Date(startDate).advance(m, 'months'))\
                         .set('system:time_start', ee.Date(startDate).advance(m, 'months'))
    elif func == 'min':
        return collection.select(vars).filter(ee.Filter.date(start, end)).min()\
                         .set('Date', ee.Date(startDate).advance(m, 'months'))\
                         .set('system:time_start', ee.Date(startDate).advance(m, 'months'))
    elif func == 'max':
        return collection.select(vars).filter(ee.Filter.date(start, end)).max()\
                         .set('Date', ee.Date(startDate).advance(m, 'months'))\
                         .set('system:time_start', ee.Date(startDate).advance(m, 'months'))
    else: #func == 'sum'
        return collection.select(vars).filter(ee.Filter.date(start, end)).sum()\
                         .set('Date', ee.Date(startDate).advance(m, 'months'))\
                         .set('system:time_start', ee.Date(startDate).advance(m, 'months'))


def concat_join(feature):
    return ee.Image.cat(feature.get('primary'), feature.get('secondary'))


def get_era5land(startDate, endDate):
    # Difference between start and end in months 
    numberOfInstances = endDate.difference(ee.Date(startDate), 'month')
    seqInstances = ee.List.sequence(0, numberOfInstances.subtract(1))

    collection = ee.ImageCollection('ECMWF/ERA5_LAND/HOURLY')
    vars = ee.List(['temperature_2m', 'dewpoint_temperature_2m', 'surface_pressure', 'u_component_of_wind_10m', 'v_component_of_wind_10m'])

    era5_mean = ee.ImageCollection.fromImages(seqInstances.map(lambda y: getMetric(y, collection, startDate, 'mean', vars)))
    
    era5_min = ee.ImageCollection.fromImages(seqInstances.map(lambda y: getMetric(y, collection, startDate, 'min', 'temperature_2m')))\
                                 .map(lambda image: renameBand(image, 'temperature_2m', 'min_temperature_2m'))

    era5_max = ee.ImageCollection.fromImages(seqInstances.map(lambda y: getMetric(y, collection, startDate, 'max', 'temperature_2m')))\
                                 .map(lambda image: renameBand(image, 'temperature_2m', 'max_temperature_2m'))

    era5_sum = ee.ImageCollection.fromImages(seqInstances.map(lambda y: getMetric(y, collection, startDate, 'sum', 'total_precipitation')))

    # Define inner join
    innerJoin = ee.Join.inner(primaryKey='primary', secondaryKey='secondary')
    filterTimeEq = ee.Filter.equals(leftField='Date', rightField='Date')

    join = innerJoin.apply(era5_mean, era5_min, filterTimeEq).map(concat_join)
    mid_join = innerJoin.apply(join, era5_max, filterTimeEq).map(concat_join)
    era5_land = innerJoin.apply(mid_join, era5_sum, filterTimeEq).map(concat_join)
    return era5_land


def get_ndvi(starDate, endDate):
    # Difference between start and end in months 
    numberOfInstances = endDate.difference(ee.Date(startDate), 'month')
    seqInstances = ee.List.sequence(0, numberOfInstances.subtract(1))

    collection = ee.ImageCollection('MODIS/MOD09GA_006_NDVI')
    return ee.ImageCollection.fromImages(seqInstances.map(lambda y: getMetric(y, collection, startDate, 'mean', 'NDVI')))


def extract_var(image, geometry):
    stats = ee.Image(image).reduceRegion(reducer=ee.Reducer.mean(), geometry=geometry, scale=9000)
    stats1 = stats.map(lambda key, val: ee.List([val, -999]).reduce(ee.Reducer.firstNonNull()))
    return ee.Feature(None, stats1.combine({'Date': ee.Date(ee.Image(image).get('system:time_start')).format('yyyy-MM-dd')}))

### 2.2. Main code

#### 2.2.1. Brazil

In [None]:
startDate = ee.Date('2000-01-01')
endDate = ee.Date('2020-01-01')

# retrieve table with shapefiles info
table = ee.FeatureCollection("users/raquelarscarmo/BR_Municipios_2020")

# extract list of regions by code
regions = table.aggregate_array("CD_MUN").distinct()

era5Land_monthly = get_era5land(startDate, endDate)
ndvi_monthly = get_ndvi(ee.Date('2000-02-01'), endDate)

# loop over the regions
for index_region in range(0, regions.size().getInfo()):
    print(index_region)

    # get region ID
    region_ID = ee.String(regions.get(index_region))
    region = ee.Feature(ee.List(table.toList(table.size())).get(index_region))
    geometry = region.geometry()

    # monthly stats from ERA5-Land
    era5LandStats = ee.FeatureCollection(era5Land_monthly\
                                            .map(lambda image: extract_var(image, geometry)))\
                                            .map(addHumidity)

    # monthly NDVI (only available from February 2000)
    firstNDVI = ee.FeatureCollection([ee.Feature(None, ee.Dictionary({'Date': ee.Date('2000-01-01').format('yyyy-MM-dd'), 'NDVI': -999}))])
    ndvi = ee.FeatureCollection(ndvi_monthly.map(lambda image: extract_var(image, geometry)))
    ndviStats = firstNDVI.merge(ndvi)

    # Join ERA5-Land Collection and NDVI Collection
    saveFirst = ee.Join.saveFirst(matchKey='match', ordering='system:time_start', ascending=True, measureKey=None, outer=True)

    # Specify an equals filter for image timestamps
    filterTimeEq = ee.Filter.equals(leftField='Date', rightField='Date')

    timeSeries = saveFirst.apply(era5LandStats, ndviStats, filterTimeEq).map(filter_join).map(remove_match)

    # Export timeseries to Google Drive
    task = ee.batch.Export.table.toDrive(**{
            'collection': timeSeries,
            'description': 'ERA5_NDVI_collection_' + region_ID.getInfo(),
            'fileFormat': 'CSV',
            'folder': config['out_brazil']
            })

    # run tasks automatically
    task.start()
    print(task.status())

#### 2.2.2. Peru

In [None]:
startDate = ee.Date('2010-01-01')
endDate = ee.Date('2020-02-01')

# retrieve table with shapefiles info
# List of Department codes (ADM1_PCODE)
# Loreto - PE16
# Madre de Dios - PE17
# Piura - PE20
# San Martin - PE22
# Tumbes - PE24
# Ucayali - PE25
table = ee.FeatureCollection("users/raquelarscarmo/PROVINCES")
tablefiltered = table.filter(ee.Filter.inList('ADM1_PCODE', ee.List(['PE16', 'PE17', 'PE20', 'PE22', 'PE24', 'PE25'])))

era5Land_monthly = get_era5land(startDate, endDate)
ndvi_monthly = get_ndvi(startDate, endDate)

# extract list of regions by code
regions = tablefiltered.aggregate_array("ADM2_PCODE").distinct()

# loop over the regions
for index_region in range(0, regions.size().getInfo()):
    print(index_region)

    # get region ID
    region_ID = ee.String(regions.get(index_region))
    region = ee.Feature(ee.List(tablefiltered.toList(tablefiltered.size())).get(index_region))
    geometry = region.geometry()

    # monthly stats from ERA5-Land
    era5LandStats = ee.FeatureCollection(era5Land_monthly.map(lambda image: extract_var(image, geometry)))\
                                                         .map(addHumidity)

    # monthly NDVI
    ndviStats = ee.FeatureCollection(ndvi_monthly.map(lambda image: extract_var(image, geometry)))

    # Join ERA5 Collection and NO2 Collection
    saveFirst = ee.Join.saveFirst(matchKey='match', ordering='system:time_start', ascending=True, measureKey=None, outer=True)

    # Specify an equals filter for image timestamps
    filterTimeEq = ee.Filter.equals(leftField='Date', rightField='Date')

    timeSeries = saveFirst.apply(era5LandStats, ndviStats, filterTimeEq).map(filter_join).map(remove_match)

    # Export timeseries to Google Drive
    task = ee.batch.Export.table.toDrive(**{
            'collection': timeSeries,
            'description': 'ERA5land_NDVI_collection_' + region_ID.getInfo(),
            'fileFormat': 'CSV',
            'folder': config['out_peru']
            })

    # run tasks automatically
    task.start()
    print(task.status())

#### Control of Earth Engine tasks

In [None]:
!earthengine task list

In [None]:
!earthengine task cancel all

Run the remaining code cells once all files have been successfully downloaded.

## 3. Gather and merge time series from all cities

### 3.1. Brazil: Merge all dataframes

In [None]:
csv_files = sorted(glob(join(config['out_brazil'], "*.csv")))
print(f'Number of csv files in folder "{config['out_brazil']}":', len(csv_files))

#### 3.1.1. Brazilian cities without ERA5-land

In [None]:
processed_files = [os.path.basename(x).split('_', 3)[-1].split('.')[0] for x in csv_files]

# retrieve table with shapefiles info
table = ee.FeatureCollection("users/raquelarscarmo/BR_Municipios_2020")

# extract list of regions by code
all = table.aggregate_array("CD_MUN").distinct().getInfo()

non_processed = [file for file in all if file not in processed_files]
print('Municipios without ERA5-land:', non_processed)

# RegionID == '2605459' refers to Fernando de Noronha which is an island, so we can ignore this one
geo2buffer = [x for x in non_processed if x != '2605459']
print('Municipios to run with 10km buffer:', geo2buffer)

Create a 10km buffer around each city without ERA5-land and re-download the data.

In [None]:
startDate = ee.Date('2000-01-01')
endDate = ee.Date('2020-01-01')

era5Land_monthly = get_era5land(startDate, endDate)
ndvi_monthly = get_ndvi(ee.Date('2000-02-01'), endDate)

# extract list of regions by code
regions = table.aggregate_array("CD_MUN").distinct()

# loop over the regions
for index_region in range(0, regions.size().getInfo()):
    #print(index_region)

    # get region ID
    region_ID = ee.String(regions.get(index_region)).getInfo()
    if region_ID in geo2buffer:
        print(region_ID)
        region = ee.Feature(ee.List(table.toList(table.size())).get(index_region))
        geometry = region.geometry()
        buffered_geo = geometry.buffer(distance=10000)

        # monthly stats from ERA5-Land
        era5LandStats = ee.FeatureCollection(era5Land_monthly\
                                                .map(lambda image: extract_var(image, buffered_geo)))\
                                                .map(addHumidity)

        # monthly NDVI (only available from Febuary 2000)
        firstNDVI = ee.FeatureCollection([ee.Feature(None, ee.Dictionary({'Date': ee.Date('2000-01-01').format('yyyy-MM-dd'), 'NDVI': -999}))])
        ndvi = ee.FeatureCollection(ndvi_monthly.map(lambda image: extract_var(image, buffered_geo)))
        ndviStats = firstNDVI.merge(ndvi)

        # Join ERA5-Land Collection and NDVI Collection
        saveFirst = ee.Join.saveFirst(matchKey='match', ordering='system:time_start', ascending=True, measureKey=None, outer=True)

        # Specify an equals filter for image timestamps
        filterTimeEq = ee.Filter.equals(leftField='Date', rightField='Date')

        timeSeries = saveFirst.apply(era5LandStats, ndviStats, filterTimeEq).map(filter_join).map(remove_match)

        # Export timeseries to Google Drive
        task = ee.batch.Export.table.toDrive(**{
                'collection': timeSeries,
                'description': 'ERA5_NDVI_collection_' + region_ID,
                'fileFormat': 'CSV',
                'folder': config['out_brazil']
                })

        # run tasks automatically
        task.start()
        print(task.status())

Run the remaining code cells once all files have been successfully downloaded.

In [None]:
csv_files = sorted(glob(join(config['out_brazil'], "*.csv")))
print(f'Number of csv files in folder "{config['out_brazil']}":', len(csv_files))

In [None]:
# get dataframe with names and codes of the Cities
df_mun = pd.read_csv(join(config['main_brazil'], "BR_Municipios_2020.csv"), encoding='iso-8859-1', converters={'CD_MUN':str})
df_mun

In [None]:
# get dataframe with names and codes of the States
df_uf = pd.read_csv(join(config['main_brazil'], "BR_UF_2020.csv"), encoding='iso-8859-1')
df_uf

In [None]:
# merge Cities and States into one dataframe
df_regions = pd.merge(df_uf, df_mun, how='inner', on='SIGLA_UF').rename(columns={'AREA_KM2': 'area_km2'})[['CD_UF', 'CD_MUN', 'area_km2']]
df_regions

In [None]:
fullTimeSeries = None
municipiosLeftOut = []
for file in csv_files:
    df = pd.read_csv(file).drop(columns=['system:index', '.geo'], axis=1)
    df.replace({-999: np.nan}, inplace=True)

    municipioCode = os.path.basename(file).split('_', 3)[-1].split('.')[0]
    df.insert(0, 'CD_MUN', value = municipioCode)

    if df['temperature_2m'].isnull().values.all():
        print(f"{municipioCode}: This station doesn't have ERA5-Land data")
        municipiosLeftOut.append(municipioCode)
        continue

    df2 = pd.merge(df_regions, df, how='inner', on='CD_MUN')
    fullTimeSeries = pd.concat([fullTimeSeries, df2], ignore_index=True)

print('Municipios left out:', municipiosLeftOut)
fullTimeSeries

In [None]:
fullTimeSeries.to_csv(join(config['main_brazil'], 'ERA5land_NDVI_monthly_cities_Brazil.csv'), index=False)

### 3.3. Peru: Merge all dataframes

In [None]:
csv_files = sorted(glob(join(config['out_peru'], "*.csv")))
print(f'Number of csv files in folder "{config['out_peru']}":', len(csv_files))

In [None]:
df_regions = pd.read_csv(join(config['main_peru'], "Peru_DepID_ProvID.csv"), encoding='iso-8859-1')\
                .drop(columns=['ADM0_EN', 'ADM0_PCODE', 'ADM1_ES', 'ADM2_ES', 'Shape_Area'], axis=1)
df_regions

In [None]:
fullTimeSeries = None
provincesLeftOut = []

for file in csv_files:
    #print(file)
    df = pd.read_csv(file).drop(columns=['system:index', '.geo'], axis=1)
    df.replace({-999: np.nan}, inplace=True)
    df = df[df.Date != '2020-01-01']

    code = os.path.basename(file).split('_')[-1].split('.')[0]
    df.insert(0, 'ADM2_PCODE', value=code)

    if df['temperature_2m'].isnull().values.all():
        print(f"{code}: This station doesn't have ERA5-Land data")
        provincesLeftOut.append(code)
        continue

    df2 = pd.merge(df_regions, df, how='inner', on='ADM2_PCODE')
    fullTimeSeries = pd.concat([fullTimeSeries, df2], ignore_index=True)

print('Provinces left out:', provincesLeftOut)
fullTimeSeries

In [None]:
fullTimeSeries.to_csv(join(config['main_peru'], 'ERA5land_NDVI_monthly_provinces_Peru.csv'), index=False)

## 4. State centroids and distances

In [None]:
df = pd.read_csv(join(config['main_brazil'], "BR_UF_distCentroids.csv"), converters={'CD_UF':str, 'neighboringState':str})[['CD_UF', 'neighboringState', 'distance']]
df.drop(df[df.CD_UF	== df.neighboringState].index, inplace=True)
df

In [None]:
df1 = df.sort_values(['CD_UF', 'distance'], ascending=True).groupby('CD_UF').head(7)
df1.reset_index(drop=True, inplace=True)
df1

In [None]:
df1.to_csv(join(config['main_brazil'], "BR_UF_7ClosestNeighbors.csv"), index=False)