In [None]:
import sys

lib_dir = "/home/daniele/documents/github/ftt01/phd/share/lib"
sys.path.insert( 0, lib_dir )

from lib import *
import subprocess
import psycopg2
import random
import multiprocessing

In [None]:
class Input():
    def add_start_date(self, start_date):
        self.start_date = start_date
    def add_end_date(self, end_date):
        self.end_date = end_date
    def add_file(self, file):
        self.file = file
    def add_variable(self, variable):
        self.variable = variable

input_parser = argparse.ArgumentParser()
input_parser.add_argument('-s','--start_date', type=str, nargs='?', default="2010-01-01T00:00:00")
input_parser.add_argument('-e','--end_date', type=str, nargs='?', default="2022-12-31T23:00:00")
input_parser.add_argument('-f','--file', type=str, nargs='?', default=None)
input_parser.add_argument('-v','--variables', type=list, nargs='?', default=['tp','2t'])
args = input_parser.parse_args()

input_path = "/media/lacie2022/data/meteo/ecmwf/era5/land/"
tmp_path = "/media/lacie2022/data/meteo/ecmwf/era5/land/tmp/"
mkNestedDir( tmp_path )

if args.file == None:
    args.files = [
    "20102011.grib",
    "20162017.grib",
    "2020_11.grib",
    "2020_2.grib",
    "2020_5.grib",
    "2020_8.grib",
    "2021_11.grib",
    "2021_2.grib",
    "2021_5.grib",
    "2021_8.grib",
    "20122013.grib",
    "20182019.grib",
    "2020_12.grib",
    "2020_3.grib",
    "2020_6.grib",
    "2020_9.grib",
    "2021_12.grib",
    "2021_3.grib",
    "2021_6.grib",
    "2021_9.grib",
    "20142015.grib",
    "2020_10.grib",
    "2020_1.grib",
    "2020_4.grib",
    "2020_7.grib",
    "2021_10.grib",
    "2021_1.grib",
    "2021_4.grib",
    "2021_7.grib"]
else:
    args.files = [args.file]

In [None]:
start_date = dt.datetime.strptime( args.start_date, '%Y-%m-%dT%H:%M:%S' )
end_date = dt.datetime.strptime( args.end_date, '%Y-%m-%dT%H:%M:%S' )
dates = pd.date_range(start_date, end_date, freq='1H')

In [None]:
def get_postgres_connection():

    db_name = 'meteo'
    db_user = 'postgres'
    db_password = 'pgAifa2Bias?'
    db_host = '172.20.0.2'

    return psycopg2.connect(database=db_name, user=db_user, password=db_password, host=db_host)

In [None]:
def add_point( y, x, epsg=4326, tolerance=0.01 ):

    c_id = None

    sql_exist = '''
        SELECT COUNT(*)
        FROM ecmwf.era5_land_points
        WHERE ST_Contains(
            ST_Transform(
                ST_MakeEnvelope({min_lon}, {min_lat}, {max_lon}, {max_lat}, {epsg}), 4326 ),
            era5_land_points.geom)
        LIMIT 1;'''

    min_lat = y - tolerance
    max_lat = y + tolerance
    min_lon = x - tolerance
    max_lon = x + tolerance

    sql_exist = sql_exist.format(
        min_lat=min_lat,
        min_lon=min_lon,
        max_lat=max_lat,
        max_lon=max_lon,
        epsg=epsg
    )
    
    # print(sql_exist)

    sql_insert = '''
        INSERT INTO ecmwf.era5_land_points(geom)
        VALUES ( ST_SetSRID(ST_MakePoint({x},{y}),{epsg}) )
        ON CONFLICT DO NOTHING;'''.format(
            x=x,
            y=y,
            epsg=epsg
    )

    sql_select = '''
        SELECT ecmwf.era5_land_points.id
        FROM ecmwf.era5_land_points
        ORDER BY ecmwf.era5_land_points.geom <#> ST_SetSRID(ST_MakePoint({x},{y}),{epsg})
        LIMIT 1;'''.format(
            x=x,
            y=y,
            epsg=epsg
        )
    
    # print(sql_select)

    conn = get_postgres_connection()
    try:
        with conn.cursor() as cur:
            # print(sql_insert)
            # cur.execute(sql_insert)
            # conn.commit()

            # cur.execute(sql_select)
            # c_id = int(cur.fetchall()[0][0])
            cur.execute(sql_exist)
            rows = cur.fetchall()
            if rows[0][0] != 0:
                cur.execute(sql_select)
                c_id = int(cur.fetchall()[0][0])
            else:
                print(sql_insert)
                cur.execute(sql_insert)
                conn.commit()
                cur.execute(sql_select)
                c_id = int(cur.fetchall()[0][0])
    finally:
        conn.close()

    return c_id

In [None]:
def add_data( y, x, epsg, datetime_UTC, value, variable, um ):

    # print(point)

    sql_insert = '''
        INSERT INTO ecmwf.era5_land_values(
	        datetime, value, point, variable, um)
	    VALUES ('{datetime}'::timestamp, {value},
            (
                SELECT ecmwf.era5_land_points.id
                FROM ecmwf.era5_land_points
                ORDER BY ST_Distance(ST_SetSRID(ST_MakePoint({x},{y}),{epsg}), ecmwf.era5_land_points.geom)
                LIMIT 1
            ), '{variable}', '{um}')
        ON CONFLICT DO NOTHING;'''.format(
            x=x,
            y=y,
            epsg=epsg,
            datetime=datetime_UTC, 
            value=value, 
            variable=variable,
            um=um
        )
    print(sql_insert)
    
    conn = get_postgres_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(sql_insert)
            conn.commit()
    finally:
        conn.close()

In [None]:
def load_on_db( date, var_meta, n_points:int, c_file ):

    c_ens_df = pd.read_csv(c_file, nrows=n_points, sep='\s+')
    c_ens_df = c_ens_df.rename(columns={'Value':1})
    
    c_ens_df = c_ens_df.set_index(['Latitude','Longitude'])

    for idx in c_ens_df.index:

        lat = idx[0]
        lon = idx[1]
        c_id = add_point( lat, lon, epsg=4326 )
        add_data( lat, lon, 4326, date, c_ens_df.loc[idx].values[0], var_meta[0], var_meta[1] )

In [None]:
def execute(c_file, model_varname):

    tmp_txt = dt.datetime.strftime(
        dt.datetime.now(),
        format="%Y%m%dT%H%M%S") + '_' + str(random.randint(1,100000)) + "_" + model_varname

    if model_varname == 'tp':
        variable = 'tp'
        um = 'm'

    elif model_varname == '2t':
        variable = '2t'
        um = 'K'

    ## create a file for each variable
    pre_cmd = '''cdo -selname,{var} {input_path}{file_grib} {input_path}{var}_{file_grib}'''.format(
        var = model_varname,
        input_path = input_path,
        file_grib = c_file
    )
    print(pre_cmd)
    pre_process = subprocess.Popen(pre_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = pre_process.communicate()

    if ('Error' in str(stderr)):
        print(stderr)
        return None
    else:
        ## read the meta info in a table with cdo
        meta_cmd = '''cdo -info {input_path}{var}_{file_grib} > {tmp_path}meta{tmp_txt}.txt'''.format(
            var = model_varname,
            input_path = input_path,
            file_grib = c_file,
            tmp_path = tmp_path,
            tmp_txt = tmp_txt
        )
        print(meta_cmd)
        process = subprocess.Popen(meta_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()

    if ('Error' in str(stderr)):
        print(stderr)
        return None
    else:
        ## load the meta
        df_meta = pd.read_csv('''{tmp_path}meta{tmp_txt}.txt'''.format(
            tmp_path = tmp_path,
            tmp_txt = tmp_txt
            ), sep='\s+'
        )

    for idx in df_meta.index:
        try:
            ts = int(df_meta.loc[idx]['-1'])
        except:
            break

        c_date = dt.datetime.strptime(
            df_meta.loc[idx]['Date'] + 'T' + df_meta.loc[idx]['Time'],
            "%Y-%m-%dT%H:%M:%S")
        
        if not(c_date in dates):
            continue

        n_points = df_meta.loc[idx]['Gridsize']
    
        pre_cmd = '''cdo -seltimestep,{step} {input_path}{var}_{file_grib} {input_path}tmp_{idx}_{var}_{file_grib}'''.format(
            var = model_varname,
            step = ts,
            input_path = input_path,
            idx = idx,
            file_grib = c_file
        )
        print(pre_cmd)
        process = subprocess.Popen(pre_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()
            
        cmd = '''grib_get_data {input_path}tmp_{idx}_{var}_{file_grib} >> {tmp_path}{tmp_txt}.txt'''.format(
            idx = idx,
            file_grib = c_file,
            input_path = input_path,
            tmp_path = tmp_path,
            tmp_txt=tmp_txt,
            var = model_varname
        )
        print(cmd)
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate()

        load_on_db( c_date, (variable, um), int(n_points), tmp_path + '{tmp_txt}.txt'.format(tmp_txt=tmp_txt) )

        try:
            os.remove( tmp_path + '{tmp_txt}.txt'.format(tmp_txt=tmp_txt) )
            os.remove( '{input_path}tmp_{idx}_{var}_{file_grib}'''.format(
                input_path = input_path,
                idx = idx,
                file_grib = c_file,
                var = model_varname
                )
            )
        except:
            continue

In [None]:
def extract( c_file ):
    print('Processing: ' + str(c_file))

    for v in args.variables:
        execute(c_file, v)

In [None]:
# create a process pool that uses all cpus
pool = multiprocessing.Pool()
with multiprocessing.Pool() as pool:
    pool.map(extract, args.files)
# close the process pool
pool.close()