In [3]:
import json
import os

In [28]:
import logging
import time

current_ts = int(time.time())

logging.basicConfig(
    format='%(asctime)s %(levelname)-8s %(message)s',
    level=logging.DEBUG,
    datefmt='%Y-%m-%d %H:%M:%S',
    filename=f'script_prepare_jobs_state_{current_ts}.log',
    encoding='utf-8',
)

In [20]:
with open(os.path.join("/home/jovyan/work/ship/calculate_points", "jobs_state_new.json")) as f:
    state = json.load(f)
    completed_jobs = state["completed_jobs"]

In [5]:
class JobSchedulerContext:
    def __init__(self):
        pass

    @classmethod
    def make_job_context(cls, job_id, wrf_params, local_work_dir, job_work_dir, only_geo_params=False):

        kuber_job_prefix = 'calculate-geo-middle' if only_geo_params else 'calculate-points'
        results_dir_name = 'geo_results_middle' if only_geo_params else 'results'

        return {
            "id": job_id,
            "kuber_job_id": f"{kuber_job_prefix}-{job_id}",
            "wrf_params": wrf_params,
            "local_dir_with_results": os.path.join(local_work_dir, results_dir_name, job_id),
            "job_dir_with_results": os.path.join(job_work_dir, results_dir_name, job_id),
            "score": None,
            "error_msg": None
        }

    @classmethod
    def get_scheduler_job_id(cls, job_context):
        return job_context["id"]

    @classmethod
    def get_kuber_job_id(cls, job_context):
        return job_context["kuber_job_id"]

    @classmethod
    def get_wrf_params(cls, job_context):
        return job_context["wrf_params"]

    @classmethod
    def get_local_dir_with_results(cls, job_context):
        return job_context["local_dir_with_results"]

    @classmethod
    def get_job_dir_with_results(cls, job_context):
        return job_context["job_dir_with_results"]

    @classmethod
    def get_score(cls, job_context):
        return job_context["score"]

    @classmethod
    def set_score_result(cls, job_context, score_result):
        job_context["score_result"] = score_result

    @classmethod
    def set_error_msg(cls, job_context, error_msg):
        job_context["error_msg"] = error_msg

    @classmethod
    def calculate_expected_count_of_results_files(cls, job_context):
        wrf_params = JobSchedulerContext.get_wrf_params(job_context)
        start_date = datetime.strptime(wrf_params["START_DATE"], '%Y-%m-%d_%H:%M:%S')
        end_date = datetime.strptime(wrf_params["END_DATE"], '%Y-%m-%d_%H:%M:%S')
        interval_in_seconds = 3600
        delta_in_seconds = (end_date - start_date).total_seconds()
        return delta_in_seconds // interval_in_seconds + 1

In [6]:
import xarray as xr
import pandas
from sklearn.metrics import mean_squared_error
from scipy.interpolate import LinearNDInterpolator
import numpy as np

class JobScoreCalculator:
    def __init__(self, real_temperature_filename):
        self._temp_df = pandas.read_csv(real_temperature_filename)
        self._temp_df["time"] = pandas.to_datetime(self._temp_df["time"], format='%Y-%m-%d %H:%M:%S', errors='ignore')

    @staticmethod
    def _get_data_from_result_file(path):
        logging.debug(f"_get_data_from_result_file({path})")
        data = xr.open_dataset(path)
        return data

    def _get_meteostations_data_in_grid_area(self, wrf_data, timestamp):
        logging.debug(f"_get_meteostations_data_in_grid_area")
        min_long = wrf_data.XLONG.data.min()
        max_long = wrf_data.XLONG.data.max()

        min_lat = wrf_data.XLAT.data.min()
        max_lat = wrf_data.XLAT.data.max()

        logging.debug(f"min_long={min_long}, max_long={max_long}, min_lat={min_lat}, max_lat={max_lat}, timestamp={timestamp}")

        df = self._temp_df[
            (self._temp_df["longitude"] > min_long) & (self._temp_df["longitude"] < max_long) &
            (self._temp_df["latitude"] > min_lat) & (self._temp_df["latitude"] < max_lat) &
            (self._temp_df["time"] == timestamp)
        ]

        return df.reset_index()

    @staticmethod
    def _calculate_interpolated_wrf_temps(wrf_data, meteostations_coords):
        logging.debug(f"_calculate_interpolated_wrf_temps")
        flat_grid_lon = wrf_data.XLONG.data.reshape(-1)
        flat_grid_lat = wrf_data.XLAT.data.reshape(-1)
        temp = wrf_data["T2"].data.reshape(-1)
        logging.debug(f"flat_grid_lon {flat_grid_lon}")
        logging.debug(f"flat_grid_lat {flat_grid_lat}")
        temp = temp - 273.15
        logging.debug(f"temp_ {temp}")
        temp_interp = LinearNDInterpolator(list(zip(flat_grid_lon, flat_grid_lat)), temp)
        return temp_interp([coord[0] for coord in meteostations_coords], [coord[1] for coord in meteostations_coords])

    def calculate_job_score(self, timestamped_paths, return_raw=False):
        logging.debug("job score calc started")
        real_temperatures = np.array([])
        wrf_calculated_temperatures = np.array([])

        dropped_points_cnt_by_path = []
        points_cnt_by_path = []
        used_meteostations = set()

        for path in timestamped_paths:
            filename = os.path.basename(path)
            timestamp_str = filename[-19:]
            timestamp = pandas.to_datetime(timestamp_str, format='%Y-%m-%d_%H:%M:%S', errors='ignore')
            wrf_data = self._get_data_from_result_file(path)

            meteostations_data = self._get_meteostations_data_in_grid_area(wrf_data, timestamp)

            if meteostations_data.empty:
                logging.info(f"No meteo data found for {timestamp}")
                points_cnt_by_path.append(0)
                dropped_points_cnt_by_path.append(0)
                continue

            meteostations_temperatures = []
            meteostations_coordinates = []

            for _, row in meteostations_data.iterrows():
                meteostations_temperatures.append(row["temperature"])
                meteostations_coordinates.append((row["longitude"], row["latitude"]))

            meteostations_temperatures = np.array(meteostations_temperatures)
            meteostations_coordinates = np.array(meteostations_coordinates)

            wrf_interpolated_temps = self._calculate_interpolated_wrf_temps(wrf_data, meteostations_coordinates)

            bool_nan_indexes = np.isnan(wrf_interpolated_temps)
            logging.debug(f"will drop {bool_nan_indexes.sum()} points")
            bool_not_nan_indexes = ~bool_nan_indexes
            wrf_interpolated_temps = wrf_interpolated_temps[bool_not_nan_indexes]
            meteostations_temperatures = meteostations_temperatures[bool_not_nan_indexes]
            meteostations_coordinates = meteostations_coordinates[bool_not_nan_indexes]

            for coords in meteostations_coordinates:
                used_meteostations.add((coords[0], coords[1]))
            points_cnt_by_path.append(len(real_temperatures))
            dropped_points_cnt_by_path.append(int(bool_nan_indexes.sum()))

            real_temperatures = np.concatenate((real_temperatures, meteostations_temperatures))
            wrf_calculated_temperatures = np.concatenate((wrf_calculated_temperatures, wrf_interpolated_temps))

        logging.debug(f"real_temperatures {real_temperatures}")
        logging.debug(f"wrf_calculated_temperatures {wrf_calculated_temperatures}")
        assert len(wrf_calculated_temperatures) == len(real_temperatures)
        assert wrf_calculated_temperatures.size
        score = {}
        if return_raw:
            return real_temperatures, wrf_calculated_temperatures
        value = mean_squared_error(real_temperatures, wrf_calculated_temperatures, squared=False)
        score["value"] = value
        logging.debug(f"score {value}")
        score["points_count_by_path"] = points_cnt_by_path
        score["dropped_points_count_by_path"] = dropped_points_cnt_by_path
        score["total_used_meteostations"] = [station for station in used_meteostations]
        return score

In [8]:
calculator = JobScoreCalculator(real_temperature_filename="/home/jovyan/work/ship/calculate_points/last_prepared_noaa_data_morozyto.csv")

In [24]:
import glob

def get_paths(pattern): # f"{JobSchedulerContext.get_local_dir_with_results(job_context)}/auxhist7_d01_2020-07-{day}_*"
    paths = glob.glob(pattern)
    paths.sort()
    return paths

In [26]:
def process_job(job):
    if "score_result" in job:
        del job["score_result"]

    three_hours_paths = f"{JobSchedulerContext.get_local_dir_with_results(job)}/auxhist7_d01_2020-07-15_0[123]:00:00"
    job["score_result_3h"] = calculator.calculate_job_score(timestamped_paths=get_paths(three_hours_paths))

    one_day_paths = f"{JobSchedulerContext.get_local_dir_with_results(job)}/auxhist7_d01_2020-07-15_2[123]:00:00"
    job["score_result_1d"] = calculator.calculate_job_score(timestamped_paths=get_paths(one_day_paths))

    three_days_paths = f"{JobSchedulerContext.get_local_dir_with_results(job)}/auxhist7_d01_2020-07-17_2[123]:00:00"
    job["score_result_3d"] = calculator.calculate_job_score(timestamped_paths=get_paths(three_days_paths))

    seven_days_paths = f"{JobSchedulerContext.get_local_dir_with_results(job)}/auxhist7_d01_2020-07-21_2[123]:00:00"
    job["score_result_7d"] = calculator.calculate_job_score(timestamped_paths=get_paths(seven_days_paths))

    return job

In [None]:
new_state = {}
new_state["completed_jobs"] = []
new_state["failed_jobs"] = []
new_state["running_jobs"] = []

for job in completed_jobs:
    new_state["completed_jobs"].append(process_job(job))

In [33]:
tmp_path = "/home/jovyan/work/ship/calculate_points/jobs_state_new.json"
with open(tmp_path, 'w') as f:
    json.dump(new_state, f)

In [31]:
new_state["completed_jobs"]

[{'id': '0',
  'kuber_job_id': 'calculate-points-0',
  'wrf_params': {'MP_PHYSICS': 18,
   'RA_LW_PHYSICS': 4,
   'RA_SW_PHYSICS': 5,
   'RADT': 6,
   'SF_SFCLAY_PHYSICS': 1,
   'SF_SURFACE_PHYSICS': 2,
   'SF_URBAN_PHYSICS': 1,
   'BL_PBL_PHYSICS': 7,
   'CU_PHYSICS': 16,
   'DIFF_6TH_OPT': 2,
   'DIFF_OPT': 2,
   'KM_OPT': 4,
   'LAT': '66.00',
   'LON': '59.00',
   'E_WE': 100,
   'E_SN': 100,
   'DX': 6000,
   'TIME_STEP': 30,
   'START_DATE': '2020-07-15_00:00:00',
   'END_DATE': '2020-07-22_00:00:00',
   'INTERVAL_SECONDS': '10800'},
  'local_dir_with_results': '/home/jovyan/work/ship/calculate_points/results/0',
  'job_dir_with_results': '/share/calculate_points/results/0',
  'score': None,
  'error_msg': None,
  'score_result_3h': {'value': 1.6515771790171356,
   'points_count_by_path': [0, 0, 0],
   'dropped_points_count_by_path': [0, 0, 2],
   'total_used_meteostations': [(60.883333, 64.283333),
    (52.5625, 66.258333),
    (57.135, 65.126111),
    (65.05, 63.933333),
    (5

In [34]:
with open(os.path.join("/home/jovyan/work/ship/calculate_points", "jobs_state_new.json")) as f:
    state = json.load(f)
    completed_jobs = state["completed_jobs"]

In [39]:
used_meteostations_coords = set()

job = completed_jobs[1702]

def process_coords(field_name):
    for meteo_coords in job[field_name]['total_used_meteostations']:
        used_meteostations_coords.add((meteo_coords[0], meteo_coords[1]))

process_coords('score_result_3h')

process_coords('score_result_1d')

process_coords('score_result_3d')

process_coords('score_result_7d')

In [42]:
sorted(used_meteostations_coords) # long, lat

[(33.75, 53.533333),
 (33.766667, 52.583333),
 (34.016667, 54.416667),
 (34.176399, 53.214199),
 (34.316667, 53.25),
 (34.4, 55.166667),
 (34.55, 57.533333),
 (34.916667, 56.483333),
 (35.033333, 55.516667),
 (35.35, 54.1),
 (35.866667, 56.883333),
 (36.0, 52.933333),
 (36.3, 52.316667),
 (36.366669, 54.549999),
 (36.483333, 55.016667),
 (36.533333, 53.383333),
 (36.7, 55.383333),
 (36.75, 56.35),
 (36.816667, 55.9),
 (37.261501, 55.591499),
 (37.4146, 55.972599),
 (37.507198, 55.5117),
 (37.566667, 57.333333),
 (37.6, 52.433333),
 (37.616667, 54.233333),
 (37.616667, 55.833333),
 (37.9063, 55.408798),
 (38.116667, 53.15),
 (38.133333, 54.0),
 (38.15, 54.833333),
 (38.150002, 55.553299),
 (38.3, 57.5),
 (38.516667, 52.633333),
 (38.683333, 55.766667),
 (39.25, 53.783333),
 (39.416667, 57.2),
 (39.516667, 52.7),
 (39.5378, 52.702801),
 (39.7, 54.633333),
 (40.116667, 53.716667),
 (40.157398, 57.560699),
 (40.35, 56.116667),
 (40.483333, 52.883333),
 (40.65, 55.6),
 (40.966667, 56.95),
 

In [43]:
dd = pandas.read_csv("/home/jovyan/work/ship/calculate_points/last_prepared_noaa_data_morozyto.csv")

In [52]:
def g(coords):
    filtered_dd = dd[(dd["longitude"] == coords[0]) & (dd["latitude"] == coords[1])]
    station_ids = filtered_dd["station_id"].unique()
    assert(len(station_ids) == 1)
    return station_ids[0]

station_ids = [g(coords) for coords in used_meteostations_coords]

In [54]:
print(station_ids)

[34189, 34192, 49432, 49501, 49527, 34191, 49513, 49484, 49498, 49486, 49502, 34182, 34204, 49489, 49503, 49528, 49422, 49427, 49471, 49419, 49488, 49415, 49526, 49534, 49549, 49536, 49487, 34193, 49511, 49537, 49535, 49463, 49496, 34214, 49510, 34198, 49515, 49525, 49466, 49430, 34190, 49429, 49524, 49533, 49462, 49539, 49473, 49461, 49547, 49497]
