In [6]:
import pandas as pd
import numpy as np
import os, glob, sys
sys.path.append(os.getcwd()+'/../cds-backend/code/')
import cds_eua4 as eua
import trajectory
import h5py

import ray
import time

import multiprocessing
from functools import partial

def find_nearest(array, value):
    """Return the element of ``array`` with the smallest absolute distance to ``value``.

    ``array`` is converted with ``np.asarray`` first. When several elements are
    equally close, the one ``argmin`` reports first is returned.
    """
    candidates = np.asarray(array)
    distances = np.abs(candidates - value)
    nearest_pos = distances.argmin()
    return candidates[nearest_pos]

def datetime_to_seconds(dates, ref='1900-01-01T00:00:00'):
    """Convert datetime64 value(s) to integer seconds elapsed since ``ref``.

    ``ref`` is parsed with ``np.datetime64`` (default 1900-01-01 00:00:00).
    The elapsed time is divided by one second and cast to ``np.int64``, so any
    fractional second is dropped by the cast.
    """
    elapsed = dates - np.datetime64(ref)
    one_second = np.timedelta64(1, 's')
    return (elapsed / one_second).astype(np.int64)

def seconds_to_datetime(seconds, ref='1900-01-01'):
    """Convert second offsets counted from ``ref`` into pandas datetimes.

    ``seconds`` is converted with ``np.asarray`` and handed to
    ``pd.to_datetime`` with ``unit='s'`` and ``origin=ref``.
    """
    offsets = np.asarray(seconds)
    return pd.to_datetime(offsets, origin=ref, unit='s')


In [22]:
@ray.remote
def create_disp_files(s):
    """Compute per-report displacement data for one station and write it to CSV.

    Steps, all best-effort (any failure is reported on stdout and the task
    returns normally so the remaining stations keep running):

    1. The station id is the last ``-``-separated field of ``s``; the first file
       in the converted-data directory whose name contains it is opened.
    2. Records whose ``recordtimestamp`` lies between 2020-01-01 and 2021-12-31
       (in seconds since 1900-01-01) are selected.
    3. For every variable, rows whose ``z_coordinate`` is one of the listed
       levels are kept.
    4. Temperature rows are inner-joined with both wind components and
       left-joined with the remaining variables on (date_time, z_coordinate).
    5. For each ``report_id`` the rows are ordered by descending
       ``z_coordinate`` and passed to ``trajectory.trajectory``; its results are
       stored in the rows they were computed from.
    6. A non-empty result is written to
       ``./displacement_files_no_licience_2020/<statid>_displacements_2020_2022.csv``.

    Parameters
    ----------
    s : str
        Station identifier; only the part after the last ``-`` is used.

    Returns
    -------
    None
    """
    try:
        statid = s.split("-")[-1]
        print(statid)

        matches = glob.glob('/mnt/users/scratch/leo/scratch/converted_v9/newindex/*'+statid+'*')
        if not matches:
            print('create_disp_files: no converted file found for station', statid)
            return
        conv_file = matches[0]

        # NOTE(review): the upper bound is 2021-12-31T00:00:00, so records later on
        # that day are excluded, while the output file name says "2020_2022" - confirm.
        dt_from = datetime_to_seconds(np.datetime64('2020-01-01'))
        dt_to = datetime_to_seconds(np.datetime64('2021-12-31'))

        # Levels to keep (presumably standard pressure levels in Pa - TODO confirm).
        plevs = [1000,2000,3000,5000,7000,10000,15000,20000,25000,30000,40000,50000,70000,85000,92500,100000]
        # Variable name -> key of that variable inside the 'recordindices' group.
        var_d = {'air_temperature':'126', 'relative_humidity':'138', 'geopotential':'117', 'eastward_wind_speed':'139', 'northward_wind_speed':'140', 'dew_point': '137', 'specific_humidity':'39'}
        # Rows without both wind components are dropped (inner join) ...
        required_vars = ['eastward_wind_speed', 'northward_wind_speed']
        # ... while these only add columns where available (left join).
        optional_vars = ['dew_point', 'geopotential', 'relative_humidity', 'specific_humidity']
        merge_keys = ['date_time', 'z_coordinate']

        with h5py.File(conv_file, 'r') as file:
            rts = file['recordindices']['recordtimestamp'][:]
            idx = np.where(np.logical_and((rts >= dt_from), (rts <= dt_to)))[0]
            if len(idx) == 0:
                print('create_disp_files: no records in the requested period for station', statid)
                return

            obs = file['observations_table']

            # Row range and level mask per variable.
            # NOTE(review): if 'recordindices' holds the first row of each record
            # (TODO confirm), the range ends before the last selected record.
            bounds = {}
            masks = {}
            for name, code in var_d.items():
                rec_idx = file['recordindices'][code][idx]
                bounds[name] = (rec_idx[0], rec_idx[-1])
                masks[name] = np.isin(obs['z_coordinate'][rec_idx[0]:rec_idx[-1]], plevs)

            def read(group, column, name):
                """Read file[group][column] over the row range of `name`, keeping masked rows."""
                start, stop = bounds[name]
                return file[group][column][start:stop][masks[name]]

            def variable_frame(name):
                """Frame with the merge keys and the observed values of one variable."""
                return pd.DataFrame.from_dict({
                    'z_coordinate': list(read('observations_table', 'z_coordinate', name)),
                    'date_time': seconds_to_datetime(list(read('observations_table', 'date_time', name))),
                    name: list(read('observations_table', 'observation_value', name)),
                })

            # Temperature rows carry the station/report metadata; the key order
            # below fixes the column order of the CSV.
            temp = 'air_temperature'
            df_dict = {}
            df_dict['z_coordinate'] = list(read('observations_table', 'z_coordinate', temp))
            df_dict['date_time'] = seconds_to_datetime(list(read('observations_table', 'date_time', temp)))
            df_dict['latitude'] = list(read('observations_table', 'latitude', temp))
            df_dict['longitude'] = list(read('observations_table', 'longitude', temp))
            # report_id is stored as a 2-D array of single bytes; join each row into one string.
            repid = np.asarray(read('observations_table', 'report_id', temp))
            df_dict['report_id'] = list(repid.view('|S{}'.format(repid.shape[1])).flatten().astype(str))
            df_dict['RASE_bias_estimate'] = list(read('advanced_homogenisation', 'RASE_bias_estimate', temp))
            df_dict['air_temperature'] = list(read('observations_table', 'observation_value', temp))

            out_df = pd.DataFrame.from_dict(df_dict)
            for name in required_vars:
                out_df = pd.merge(out_df, variable_frame(name), on=merge_keys, how='inner')
            for name in optional_vars:
                new = variable_frame(name)
                print(name, len(new))
                out_df = pd.merge(out_df, new, on=merge_keys, how='left')

        # Everything needed is in memory now; the HDF5 file is closed.
        dfj = out_df.reset_index(drop=True)
        if len(dfj) == 0:
            # Nothing to compute or write for this station.
            return

        # Collect the results together with the labels of the rows they belong to,
        # so the assignment below does not depend on how dfj happens to be ordered.
        row_labels = []
        latds = []
        londs = []
        rtss = []
        for rid in dfj.report_id.unique():
            dfi = dfj[dfj.report_id == rid].sort_values(by=['z_coordinate'], ascending=False)
            latd, lond, us, vs, rts = trajectory.trajectory(
                lat=dfi.latitude.iloc[0],
                lon=dfi.longitude.iloc[0],
                u=np.array(dfi.eastward_wind_speed),
                v=np.array(dfi.northward_wind_speed),
                pressure=np.array(dfi.z_coordinate),
                temperature=np.array(dfi.air_temperature),
            )
            # Assumes trajectory() returns one value per input row, in input order
            # (the length is checked when the Series is built below).
            row_labels.extend(dfi.index)
            latds.extend(latd)
            londs.extend(lond)
            rtss.extend(rts)

        # Series assignment aligns on the index, putting each value in its own row.
        dfj['latitude_displacement'] = pd.Series(latds, index=row_labels)
        dfj['longitude_displacement'] = pd.Series(londs, index=row_labels)
        # Column name (including its spelling) kept unchanged for existing consumers.
        dfj['seconds_since_launche'] = pd.Series(rtss, index=row_labels)

        out_dir = "./displacement_files_no_licience_2020/"
        os.makedirs(out_dir, exist_ok=True)
        dfj.to_csv(out_dir+str(statid)+"_displacements_2020_2022.csv")

    except Exception as exc:
        # Best-effort per station: report the failure instead of hiding it, and
        # let the remaining stations continue.
        print('create_disp_files: station {!r} failed: {}: {}'.format(s, type(exc).__name__, exc))

In [23]:
# Station identifiers to process, taken from the `statid` column of the station list.
station_csv = glob.glob('RAOBS_EUR-11.csv')[0]
stats = list(pd.read_csv(station_csv).statid)

# Stop any Ray instance left over from an earlier run, then start one with 20 CPUs.
ray.shutdown()
ray.init(num_cpus=20)

# Submit one remote task per station and block until every task has finished.
result_ids = [create_disp_files.remote(station) for station in stats]
results = ray.get(result_ids)
ray.shutdown()

2023-02-02 12:14:29,896	INFO worker.py:1528 -- Started a local Ray instance.


[2m[36m(create_disp_files pid=1452262)[0m 08190
[2m[36m(create_disp_files pid=1452267)[0m 08023
[2m[36m(create_disp_files pid=1452254)[0m 08001
[2m[36m(create_disp_files pid=1452264)[0m 07761
[2m[36m(create_disp_files pid=1452256)[0m 07510
[2m[36m(create_disp_files pid=1452248)[0m 07481
[2m[36m(create_disp_files pid=1452261)[0m 07145
[2m[36m(create_disp_files pid=1452265)[0m 06458
[2m[36m(create_disp_files pid=1452255)[0m 06181
[2m[36m(create_disp_files pid=1452266)[0m 03769
[2m[36m(create_disp_files pid=1452259)[0m 03743
[2m[36m(create_disp_files pid=1452252)[0m 03316
[2m[36m(create_disp_files pid=1452257)[0m 03238
[2m[36m(create_disp_files pid=1452258)[0m 03171
[2m[36m(create_disp_files pid=1452263)[0m 02963
[2m[36m(create_disp_files pid=1452250)[0m 02935
[2m[36m(create_disp_files pid=1452260)[0m 02836
[2m[36m(create_disp_files pid=1452253)[0m 02591
[2m[36m(create_disp_files pid=1452251)[0m 02365
[2m[36m(create_disp_files pid