pip install cdsapi  

In [None]:
import os, sys, cdsapi, typing, zipfile, calendar, multiprocessing
from QueryHandler import QueryHandler
from shapely.geometry import Point
from dotenv import load_dotenv
import sqlalchemy as sq 
import geopandas as gpd
import xarray as xr
import pandas as pd
import numpy as np

sys.path.append('../')
from DataService import DataService


# Read the PostgreSQL connection settings from the environment. load_dotenv()
# is called first so variables defined in a .env file are available as well.
# os.getenv() yields None for any variable that is not set; the five values
# are later passed positionally to DataService(...).
load_dotenv()
PG_DB = os.getenv('POSTGRES_DB')
PG_ADDR = os.getenv('POSTGRES_ADDR')
PG_PORT = os.getenv('POSTGRES_PORT')
PG_USER = os.getenv('POSTGRES_USER')
PG_PW = os.getenv('POSTGRES_PW')

In [None]:
# number of processes in the multiprocessing pool that runs the download jobs
NUM_WORKERS = 8

# inclusive month window that is pulled for every year
MIN_MONTH = 3
MAX_MONTH = 12

# inclusive year window
MIN_YEAR = 1995
MAX_YEAR = 2023

# both ranges are kept as strings because they are placed directly into the request
years = list(map(str, range(MIN_YEAR, MAX_YEAR + 1)))
months = list(map(str, range(MIN_MONTH, MAX_MONTH + 1)))

# the attributes requested from the dataset (sent as the request's 'variable')
ATTRS = [
    '2m_dewpoint_temperature',
    '2m_temperature',
    'evaporation_from_bare_soil',
    'skin_reservoir_content',
    'skin_temperature',
    'snowmelt',
    'soil_temperature_level_1',
    'soil_temperature_level_2',
    'soil_temperature_level_3',
    'soil_temperature_level_4',
    'surface_net_solar_radiation',
    'surface_pressure',
    'volumetric_soil_water_layer_1',
    'volumetric_soil_water_layer_2',
    'volumetric_soil_water_layer_3',
    'volumetric_soil_water_layer_4',
]

# every full hour of the day, '00:00' through '23:00'
HOURS = [f'{hour:02d}:00' for hour in range(24)]

# bounding box sent as the request's 'area'
# NOTE(review): presumably [north, west, south, east] in degrees - confirm against the CDS API
AREA = [61, -125, 48, -88]

In [None]:
def main():
    """Prepare the database, then pull every (year, month) in parallel.

    The copernicus table is created if it is missing and the agriculture
    regions are loaded once up front. One job is then queued for each
    combination of ``years`` and ``months`` and the jobs are executed by a
    pool of ``NUM_WORKERS`` processes running ``pullSateliteData``.
    """
    db = DataService(PG_DB, PG_ADDR, PG_PORT, PG_USER, PG_PW)
    queryHandler = QueryHandler()

    conn = db.connect()
    try:
        createTable(db, queryHandler)   # check the tables, if necessary make a new table for the data
        agRegions = loadGeometry(conn)  # load the geometry from the database
    finally:
        db.cleanup()                    # release the handle even when the setup fails

    # One argument tuple per (year, month). Previously a leftover debug call
    # followed by `break` queued only the first month of each year and pulled
    # that month twice (once inline, once through the pool).
    jobArgs = []
    for year in years:
        for month in months:
            numDays = calendar.monthrange(int(year), int(month))[1]
            days = [str(day) for day in range(1, numDays + 1)]
            outputFile = f'copernicus{len(jobArgs) + 1}'    # unique file stem per job

            jobArgs.append((agRegions, year, month, days, outputFile))

    # starmap blocks until every job has finished; the context manager then
    # tears the pool down, also when one of the jobs raised.
    with multiprocessing.Pool(NUM_WORKERS) as pool:
        pool.starmap(pullSateliteData, jobArgs)

In [None]:
def createTable(db: DataService, queryHandler: QueryHandler):
    """Create the copernicus_satelite_data table unless it already exists.

    Args:
        db: service used to execute both the existence check and the create
            statement.
        queryHandler: supplies the SQL text for both statements and
            interprets the result of the existence check.
    """
    existsQuery = sq.text(queryHandler.tableExistsReq('copernicus_satelite_data'))

    # nothing to do when the table is already there
    if queryHandler.readTableExists(db.execute(existsQuery)):
        return

    db.execute(sq.text(queryHandler.createCopernicusTableReq()))

In [None]:
def loadGeometry(conn: sq.engine.Connection) -> gpd.GeoDataFrame:
    """Load the agriculture regions from the database (projection is EPSG:3347).

    Args:
        conn: open connection the query is run on.

    Returns:
        A GeoDataFrame with the selected ``car_name`` and ``geometry``
        columns of ``public.census_ag_regions``.

    NOTE(review): calcAgRegion reads a 'cr_num' value from each region row,
    which this query does not select - confirm which column is intended.
    """
    regionQuery = sq.text('select car_name, geometry FROM public.census_ag_regions')

    return gpd.GeoDataFrame.from_postgis(regionQuery, conn, crs='EPSG:3347', geom_col='geometry')
    

In [None]:
def calcAgRegion(agRegions: gpd.GeoDataFrame, point: typing.Union[Point, gpd.GeoSeries]) -> typing.Optional[int]:
    """Return the 'cr_num' of the first region whose geometry contains the point.

    Args:
        agRegions: regions to search; every row must provide 'geometry' and
            'cr_num'.
        point: the location to look up, in the same projection as the
            regions. Either a single geometry or a one-element GeoSeries
            (which is what addRegions passes).

    Returns:
        The 'cr_num' of the first matching region, or None when no region
        contains the point.
    """
    for _, region in agRegions.iterrows():
        # contains() gives a plain bool for a single geometry and an array-like
        # for a GeoSeries; np.ravel lets both be read the same way (indexing
        # the bare bool with [0] used to raise a TypeError).
        if np.ravel(region['geometry'].contains(point))[0]:
            return region['cr_num']

    return None   # the point is not inside any region of interest

In [None]:
def storeData(df, db: DataService):
    """Insert every row of ``df`` into the database, one statement per row.

    The row's 'time' value is split into year, month, day and hour, masked
    values are replaced by the string 'NULL', and the insert statement built
    by QueryHandler.createInsertRowReq is executed through ``db``.

    Args:
        df: the rows to store; must contain a 'time' column.
        db: service used to execute the insert statements.
    """
    handler = QueryHandler()
    columnNames = list(df.columns)

    for _, row in df.iterrows():
        stamp = pd.Timestamp(np.datetime64(row['time']))

        # `row` is a per-row copy, so the substitution only affects the values
        # that are handed to the query builder
        for name in columnNames:
            if np.ma.is_masked(row[name]):
                row[name] = 'NULL'

        insertSql = handler.createInsertRowReq(row, stamp.year, stamp.month, stamp.day, stamp.hour)
        db.execute(sq.text(insertSql))

In [None]:
def addRegions(df: pd.DataFrame, agRegions: gpd.GeoDataFrame) -> pd.DataFrame:
    """Tag every row with its agriculture region and drop rows outside all regions.

    Each row's ('longitude', 'latitude') pair is turned into an EPSG:4326
    point, reprojected to EPSG:3347 to match the agriculture regions and
    looked up with calcAgRegion.

    Args:
        df: data with 'longitude' and 'latitude' columns; modified in place.
        agRegions: the regions the points are matched against.

    Returns:
        The same ``df`` with a new 'cr_num' column, reduced to the rows whose
        point lies inside one of the regions.
    """
    regionNums = []

    for lon, lat in zip(df['longitude'], df['latitude']):
        point = Point(lon, lat)                             # creates geometry for the current point
        point = gpd.GeoSeries(point, crs='EPSG:4326')       # transforms the geometry into a geoseries and sets the projection to common (lon, lat)
        point = point.to_crs(crs='EPSG:3347')               # changes the points projection to match the agriculture regions
        regionNums.append(calcAgRegion(agRegions, point))   # region number (or None if the region is not of interest)

    # Rows yielded by iterrows() are copies, so assigning to them never reached
    # df; the whole column is written at once instead. dtype=object keeps the
    # region numbers from being converted to floats next to the None entries.
    df['cr_num'] = pd.Series(regionNums, index=df.index, dtype=object)

    # the column is called 'cr_num'; the former subset=['region'] raised a KeyError
    df.dropna(subset=['cr_num'], inplace=True)

    return df

In [None]:
def unzipFile(file: str):
    """Extract the first member of ``./{file}.netcdf.zip`` as ``{file}.nc``.

    The archive name matches the target pullSateliteData downloads to and the
    extracted name matches the file it reads afterwards. (The function used
    to open ``./{file}`` and extract the member onto that same path, i.e.
    onto the archive that was being read.)

    The archive itself is left in place; removing it is up to the caller.
    An archive without members is silently ignored.

    Args:
        file: file stem shared by the archive and the extracted file.
    """
    with zipfile.ZipFile(f'./{file}.netcdf.zip', 'r') as zip_ref:
        zipinfos = zip_ref.infolist()

        if not zipinfos:
            return

        # only the first member is wanted; changing ZipInfo.filename changes
        # the name it is written under without touching the archive
        zipinfo = zipinfos[0]
        zipinfo.filename = f'{file}.nc'
        zip_ref.extract(zipinfo)

In [None]:


def readNetCDF(file: str) -> pd.DataFrame:
    """Read a netCDF file into a flat DataFrame.

    Args:
        file: path of the netCDF file.

    Returns:
        The dataset as a DataFrame whose former index levels have been moved
        into regular columns by reset_index().
    """
    # The dataset is closed as soon as the DataFrame has been built, so no
    # handle on the file stays open (the caller deletes the file afterwards).
    with xr.open_dataset(file) as dataset:
        df = dataset.to_dataframe().reset_index()

    return df

In [None]:
def renameCols(df: pd.DataFrame) -> pd.DataFrame:
    """Rename columns 3 to 18 (by position) to their attribute names, in place.

    The first three columns are left untouched.
    NOTE(review): the mapping is purely positional, so it relies on the column
    order of the incoming frame - confirm it matches the order below.

    Args:
        df: frame with at least 19 columns; an IndexError is raised as soon
            as a position is missing (earlier renames are kept).

    Returns:
        The same ``df`` object with the new column names.
    """
    attributeNames = (
        'dewpoint_temperature',
        'temperature',
        'evaporation_from_bare_soil',
        'skin_reservoir_content',
        'skin_temperature',
        'snowmelt',
        'soil_temperature_level_1',
        'soil_temperature_level_2',
        'soil_temperature_level_3',
        'soil_temperature_level_4',
        'surface_net_solar_radiation',
        'surface_pressure',
        'volumetric_soil_water_layer_1',
        'volumetric_soil_water_layer_2',
        'volumetric_soil_water_layer_3',
        'volumetric_soil_water_layer_4',
    )

    # one rename per position, in ascending order
    for position, label in enumerate(attributeNames, start=3):
        df.rename(columns={df.columns[position]: label}, inplace=True)

    return df

In [None]:
def pullSateliteData(agRegions: gpd.GeoDataFrame, year: str, month : str, days: list, outputFile: str):
    """Download one month of 'reanalysis-era5-land' data and store it in the database.

    The data is requested through cdsapi for ``ATTRS``, ``HOURS`` and
    ``AREA``, read into a DataFrame, tagged with agriculture regions and
    written to the database. The downloaded and extracted files are removed
    again and the database handle is released even when a step fails.

    Args:
        agRegions: regions used to tag (and filter) the downloaded points.
        year: year to request.
        month: month to request.
        days: the days of that month to request.
        outputFile: file stem for the temporary files; '<stem>.netcdf.zip'
            is downloaded and '<stem>.nc' is read.
    """
    zipPath = f'{outputFile}.netcdf.zip'
    ncPath = f'{outputFile}.nc'
    c = cdsapi.Client()

    print(f'Starting to pull data for {year}/{month}')
    try:
        c.retrieve(
            'reanalysis-era5-land',
            {
                'format': 'netcdf.zip',
                'variable': ATTRS,
                'year': year,
                'month': month,
                'day': days,
                'time': HOURS,
                'area': AREA,
            },
            zipPath
        )

        unzipFile(outputFile)   # extracts the downloaded archive; ncPath is expected to exist afterwards

        df = readNetCDF(ncPath)                 # converts the netcdf content into a dataframe
        df = renameCols(df)                     # renames the columns according to their attribute
        df = addRegions(df, agRegions)          # adds regions to all coordinates (data without an associated region is dropped)
    finally:
        # Clean up the temporary files, also when the download or parsing failed
        for leftover in (zipPath, ncPath):
            if os.path.exists(leftover):
                os.remove(leftover)

    # The database handle is only opened once the data is ready, so it is not
    # held for the duration of the download.
    db = DataService(PG_DB, PG_ADDR, PG_PORT, PG_USER, PG_PW)
    # Mirrors main(), which connects before issuing db.execute calls.
    # NOTE(review): confirm DataService.execute does not already connect lazily.
    db.connect()
    try:
        storeData(df, db)
    finally:
        db.cleanup()

    print(f'[SUCCESS] data was pulled for {year}/{month}')

In [None]:
# Run the pipeline only when this file is executed directly. main() starts a
# multiprocessing pool, so it must not run as a side effect of an import.
if __name__ == '__main__':
    main()