In [1]:
# show every output in cell, not one
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [2]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

import utils
import drop_utils
import os

from datetime import datetime, timedelta

import warnings
warnings.simplefilter("ignore", pd.core.common.SettingWithCopyWarning)

In [3]:
# module load
def meteo_from_dir(dir_path):
    meteo = load_meteo_from_dir(dir_path)
    meteo = utils.reduce_memory_usage(meteo)
    meteo["datetime"] = _build_meteo_datetime(meteo)
    meteo = _keep_useful_data(meteo)
    
    return meteo
    
    
def load_meteo_from_dir(dir_path):
    files = _find_csvs(dir_path)
    meteo = _load_and_concat_dfs(files, axis="index")
    return meteo
    
def _find_csvs(dir_path):
    all_files = _get_dir_files(dir_path)
    csv_files = utils.filter_by_substring(all_files, "csv")
    return csv_files


def _get_dir_files(dir_path):
    dir_files = []
    
    for obj_path in os.listdir(dir_path):
        obj_full_path = os.path.join(dir_path, obj_path)
        if os.path.isfile(obj_full_path):
            dir_files.append(obj_full_path)
            
    return dir_files
            
    
def _load_and_concat_dfs(csv_paths, axis):
    dataframes = _load_dfs(csv_paths)
    concated_df = pd.concat(dataframes, axis=axis)
    
    return concated_df

def _load_dfs(paths):
    dataframes = []
    for file_path in paths:
        try:
            df = pd.read_csv(file_path, index_col=0)
            dataframes.append(df)
        except pd.errors.ParserError:
            print(f"error with {file_path} occured")
    
    return dataframes


def _build_meteo_datetime(df):
    datetime_col = _datetime_from_local(df)
    gmt_datetime_col = _meteo_datetime_to_gmt(datetime_col)
    
    return gmt_datetime_col


def _datetime_from_local(df):
    date_parts = df[["localYear", "localMonth", "localDay"]]
    date_parts.rename(columns={"localYear": "year", "localMonth": "month", "localDay": "day"}, inplace=True)
    datetime_col = pd.to_datetime(date_parts) + pd.to_timedelta(df['localTime'], unit="hour")
    
    return datetime_col


def _meteo_datetime_to_gmt(datetime_col):
    msk_time_mask = datetime_col.dt.date < datetime(1993,1,1).date()
    datetime_col[msk_time_mask] -= timedelta(hours=3)
    return datetime_col

def _keep_useful_data(meteo):
    drop_cols = get_meteo_drop_cols(meteo)
    meteo.drop(columns=drop_cols, inplace=True)
    
    return meteo
    
    
def get_meteo_drop_cols(meteo):
    cols = meteo.columns
    
    drop_cols = get_meteo_source_datetime_features()
    drop_cols += utils.filter_by_substring(cols, "Sign")
    drop_cols += utils.filter_by_substring(cols, "Quality")
    drop_cols += drop_utils.filter_cols_min_nan_freq(meteo, 0.2)
    
    return drop_cols


def get_meteo_source_datetime_features():
    datetime_cols = ["year", "month", "day", "time", "localYear", "localMonth", "localDay", 
                 "localTimePeriod", "timePeriodNum", "localTime", "tz", "startMeteoDay"]
    return datetime_cols


In [4]:
# meteo = meteo_from_dir(meteo_dir)

In [5]:
# meteo.head(2)

In [6]:
def preprocess_dataset(df):
    df = process_cloud_cover(df)
    df = wind_direction_to_x_y(df)
    df = drop_cols_unable_to_forecast(df)
    df = df.set_index(["stationNumber", "datetime"])
    return df


def process_cloud_cover(df):
    column = df["cloudCoverTotal"]
    column = column.astype(np.float32)
    column[column == 12] = 9.5 # согласно README, это "10" с просветами
    column[column == 11] = 0.05 # следы облаков
    column[column == 13] = np.nan # облака невозможно определить
    df["cloudCoverTotal"] = column
    
    return df


def wind_direction_to_x_y(df):
    wind_angle_x, wind_value_y = angle_to_x_y(df["windDirection"])
    df.drop(columns="windDirection", inplace=True)
    
    df["windAngleX"] = wind_angle_x
    df["windAngleY"] = wind_value_y
    
    return df


def angle_to_x_y(angles):
    out_x, out_y = np.zeros(len(angles)), np.zeros(len(angles))
    
    not_null_mask = (angles != 0) & (angles != 999)
     
    #working only with values in range (1, 360]
    angles = angles[not_null_mask]
    
    # from classical wind angles to geometry angles
    right_coords_angles = 90 - angles
    right_coords_angles[right_coords_angles < 0] += 360
    
    radians = np.radians(right_coords_angles)
    coses, sines = np.cos(radians), np.sin(radians)
    
    out_x[not_null_mask] = coses
    out_y[not_null_mask] = sines
    
    return out_x, out_y


def drop_cols_unable_to_forecast(df):
    unable_to_forecast_cols = ["pastWeather", "presentWeather", "maximumWindGustSpeed", 
                               "characteristicOfPressureTendency", "HourPressureChange3", 'vapourPressure']
    for col in unable_to_forecast_cols:
        if col in df.columns:
            df.drop(columns=col, inplace=True)
    return df

In [7]:
# meteo = preprocess_dataset(meteo)
# meteo = meteo.set_index(["stationNumber", "datetime"])
# meteo.head(2)

In [8]:
def meteo_add_differential(meteo):
    diff_cols = ["cloudCoverTotal", "windSpeed", "totalAccumulatedPrecipitation", "soilTemperature", "airTemperature", 
                        "relativeHumidity", "pressureReducedToMeanSeaLevel", "windAngleX", "windAngleY"]
    
    diff_values = calc_df_differentials(meteo[diff_cols])
    meteo = pd.concat([meteo, diff_values], axis=1)
    return meteo
    


def calc_df_differentials(df):
    diff_cols = {}
    
    for col in df.columns:
        col_name = str(col) + "_diff"
        diff_val = differential(df[col])
        diff_val[diff_val.isna()] = 0 # nans are at first values
        
        diff_cols[col_name] = diff_val
        
    new_df = pd.DataFrame.from_dict(diff_cols, orient="columns")
    return new_df


def differential(ts, step=1):
    shifted_ts = ts.shift(step)
    return ts - shifted_ts

In [9]:
# meteo = meteo_add_differential(meteo)
# meteo.head(2)

In [10]:
# non_feature_cols = ["stationId"]
# feature_cols = pd.Index([col for col in meteo.columns if not col in non_feature_cols])
# feature_cols

In [11]:
def agg_daily_mean(meteo, feature_cols):
    grouped = groupby_station_date(meteo[feature_cols])
    daily_mean = grouped.agg(np.nanmean) 
    return daily_mean
    
    
def groupby_station_date(df):
    df = df.reset_index()
    df["date"] = df["datetime"].dt.date
    grouped = df.groupby(by=["stationNumber", "date"])
    
    return grouped

# meteo_daily = agg_daily_mean(meteo)
# del meteo
# meteo_daily.head(2)

In [12]:
# meteo_daily.columns

In [13]:
def create_feature_extraction_config(df_columns):
    funcs_and_listcol = []
    funcs_and_listcol.append(get_standard_fe_settings(df_columns))
    funcs_and_listcol.append(get_minimal_fe_settings(df_columns))
        
    return create_config(funcs_and_listcol)

def get_standard_fe_settings(df_columns):
    feature_extraction = [{"func": np.nanmean, "lag": 1, "winsize": 7}, 
                        {"func": np.nanmean, "lag": 7, "winsize": 30}, 
                        {"func": np.nanstd, "lag": 7, "winsize": 30}]
    
    cols = ["cloudCoverTotal", "windSpeed", "totalAccumulatedPrecipitation", "soilTemperature",
                            "airTemperature", "dewpointTemperature", "pressure", "pressureReducedToMeanSeaLevel",
                            "windAngleX", "windAngleY"]
    
    return feature_extraction, cols


def get_minimal_fe_settings(df_columns):
    feature_extraction = [{"func": np.nanmean, "lag": 1, "winsize": 7}]
    
    cols = ["minimumTemperatureAtHeightAndOverPeriodSpecified", "maximumTemperatureOverPeriodSpecified"]
    diff_cols = utils.filter_by_substring(df_columns, "_diff")
    cols += diff_cols
    
    return feature_extraction, cols


def create_config(funcs_and_listcol):
    config = {}
    for funcs, cols in funcs_and_listcol:
        for col in cols:
            config[col] = funcs
            
    return config

In [14]:
# config = create_feature_extraction_config()

In [15]:
def aggregate_with_config(df, config):
    all_features = {}
    
    for col, list_agg_settings in config.items():
        col_features = agg_col_with_settings(df[col], list_agg_settings)
        all_features.update(col_features)
        
    aggregated_df = pd.DataFrame.from_dict(all_features, orient="columns")
    return aggregated_df
        
def agg_col_with_settings(col, list_aggregations):
    agg_features = {}
    for agg_settings in list_aggregations:
        new_name = create_agg_name(col, agg_settings)
        aggregated_col = agg_col(col, agg_settings)
        
        agg_features[new_name] = aggregated_col
        
    return agg_features
        
        
def create_agg_name(col, agg_settings):
    return col.name + f"_{agg_settings['func'].__name__}_{agg_settings['lag']}_{agg_settings['winsize']}"
        
    
def agg_col(col, agg_settings):
    func, winsize, lag = agg_settings["func"], agg_settings["winsize"], agg_settings["lag"]
    col = col.reset_index(level=1, drop=True)
    
    grouped_col = col.groupby(by="stationNumber",)
    aggregated_col = grouped_col.rolling(winsize, min_periods=1).agg(func)
    shifted_col = aggregated_col.shift(lag)
    
    shifted_col.index = old_index
    return shifted_col

In [16]:
# aggregate_with_config(meteo_daily, config)

In [17]:
def load_extract_features_meteo(meteo_dir):
    meteo = load_preprocess_meteo(meteo_dir)
    meteo_daily = get_daily_meteo(meteo)
    aggregated_meteo = agg_meteo(meteo_daily)
    
    meteo_all_features = pd.concat([aggregated_meteo, meteo_daily], axis=1)
    
    return meteo_all_features


def load_preprocess_meteo(meteo_dir):
    meteo = meteo_from_dir(meteo_dir)
    meteo = preprocess_dataset(meteo)
    meteo = meteo_add_differential(meteo)
    
    return meteo

def get_daily_meteo(meteo):
    feature_cols = get_feature_cols(meteo)
    meteo_daily = agg_daily_mean(meteo, feature_cols=feature_cols)
    
    return meteo_daily

def agg_meteo(meteo):
    config = create_feature_extraction_config(meteo.columns)
    aggregated_meteo = aggregate_with_config(meteo, config)
    
    return aggregated_meteo
    

def get_feature_cols(meteo):
    non_feature_cols = ["stationId"]
    feature_cols = pd.Index([col for col in meteo.columns if not col in non_feature_cols])
    
    return feature_cols

In [18]:
root_dir = "../"
data_dir = os.path.join(root_dir, "working_data/")
meteo_dir = root_dir + "datasets/meteo_new"

In [19]:
all_meteo_features = load_extract_features_meteo(meteo_dir)
all_meteo_features.head()

NameError: name 'old_index' is not defined

In [None]:
all_meteo_features["cloudCoverTotal_nanmean_1_7"]

### Forecast

In [None]:
name_forecast_to_dataset = {"Total_cloud_cover_entire_atmosphere_Mixed_intervals_Average": "cloudCoverTotal",
                            'u-component_of_wind_height_above_ground': "windAngleX", 
                            'v-component_of_wind_height_above_ground': "windAngleY",
                            'Wind_speed_gust_surface': "windSpeed", 
                            'Total_precipitation_surface_Mixed_intervals_Accumulation': "totalAccumulatedPrecipitation", 
                            "Temperature_height_above_ground": 'airTemperature', 
                            'Maximum_temperature_height_above_ground_Mixed_intervals_Maximum': 'maximumTemperatureOverPeriodSpecified', 
                            'Minimum_temperature_height_above_ground_Mixed_intervals_Minimum': 'minimumTemperatureAtHeightAndOverPeriodSpecified',
                            'Temperature_surface': 'soilTemperature', 
                            'Relative_humidity_height_above_ground': 'relativeHumidity', 
                            'Pressure_height_above_ground': 'pressure', 
                            'Pressure_reduced_to_MSL_msl': 'pressureReducedToMeanSeaLevel',
                            "Dewpoint_temperature_height_above_ground": "dewpointTemperature"
                           }

required_data = list(name_forecast_to_dataset.keys())

In [21]:
# TODO: распараллелить получение предсказаний, попробовать пробивать по региону

def get_weather_forecast(points: list, required_data):
    url = "http://thredds.ucar.edu/thredds/catalog/grib/NCEP/GFS/Global_0p5deg/catalog.xml"
    point_outputs = []
    
    client = get_latest_dataset_client(url)
    query = build_base_query(client, required_data)
    
    for point in points:
        query = update_query_point(query, point)
        print("get data")#, query)
        t1 = time()
        out = client.get_data(query)
        print("Got data, time:", time() - t1)
        point_outputs.append(out)
    
    return point_outputs


def get_latest_dataset_client(catalog_url):
    gfs_cat = TDSCatalog(catalog_url)
    ncss_client = gfs_cat.latest.subset()
    return ncss_client


def build_base_query(client, required_data):
    query = client.query()
    
    query = query.variables(*required_data)
    query = query.all_times()
    query = query.accept("netCDF4")

    return query


def update_query_point(query, point): #also deletes old point
    query = query.lonlat_point(*point)
    return query

# 

In [22]:
def parse_forecast(forecasted_dataset):
    data = xarray.backends.NetCDF4DataStore(forecasted_dataset)
    
    values = {}
    for var_name in required_data:
        var_value = get_var_values(data, var_name)
        values[var_name] = var_value
        
    values = pd.DataFrame.from_dict(values, orient="columns")
    timestamps = get_timestamps(data)
    values.index = timestamps
    
    values = fill_forecast_nan(values)
    return values
    
    
def get_timestamps(data):
    attrs = data.get_attrs()
    start_time = pd.to_datetime(attrs["time_coverage_start"])
    
    hours_from_start = data.get_variables()["time"]
#     print(hours_from_start)
    time_deltas = timedelta_from_hours(hours_from_start)[:-1]
    
    timestamps = start_time + time_deltas
    
    timestamps.name = "datetime"
    return timestamps


def timedelta_from_hours(hours):
    return pd.to_timedelta(hours.values.flatten(), "h") 


def get_var_values(data, name):
    values = data.get_variables()[name].values
    shape = values.shape
    if len(shape) == 3:
        values = values[:, :, 0]
        
    if len(shape) > 3:
        raise ValueError("В Forecast размерность values > 3")
        
    return values.flatten()[:-1]


def fill_forecast_nan(df):
    df = fill_with_first_notnan(df, df.columns)
    return df


def fill_with_first_notnan(df, cols):
    cols = list(cols)
    
    for col in cols:
        notnan = df.loc[df[col].notnull(), col]
        first_notnan = notnan.iloc[0]
        df.loc[df[col].isna(), col] = first_notnan
    return df

In [23]:
def make_dataset_from_forecast(point_forecast):
    point_forecast = rescale_forecast(point_forecast)
    point_forecast = rename_forecast(point_forecast)
    point_forecast = agg_forecast_features(point_forecast)
    
    return point_forecast
    
    
def rescale_forecast(forecast):
    forecast = rescale_cloud_cover(forecast)
    forecast = rescale_wind_vector(forecast)
    forecast = rescale_temperature(forecast)
    forecast = rescale_pressure(forecast)
    return forecast


def rename_forecast(df):
    new_names = name_forecast_to_dataset # add them to utils??
    df.rename(columns=new_names, inplace=True)
    return df
    


def agg_forecast_features(forecast):
    # сделать аггрегирование, как у обычного dataset'а
    return forecast 
    
def rescale_cloud_cover(df):
    df["Total_cloud_cover_entire_atmosphere_Mixed_intervals_Average"] /= 10 # rescaling from 0-100 to 0-10
    return df


def rescale_wind_vector(df):
    wind_x = df['u-component_of_wind_height_above_ground']
    wind_y = df['v-component_of_wind_height_above_ground']
    vector_module = (wind_x ** 2 + wind_y ** 2)**0.5
    
    wind_x = np.sign(wind_x) * wind_x / vector_module
    wind_y = np.sign(wind_y) * wind_y / vector_module
    
    # if vector_module == 0, wind_x and wind_y are np.nan
    wind_x[vector_module == 0] = 0
    wind_y[vector_module == 0] = 0
    
    
    df['u-component_of_wind_height_above_ground'] = wind_x
    df['v-component_of_wind_height_above_ground'] = wind_y
    
    return df


def rescale_temperature(df):
    temperature_cols = ["Temperature_height_above_ground", "Temperature_surface",
                        "Maximum_temperature_height_above_ground_Mixed_intervals_Maximum",
                        "Minimum_temperature_height_above_ground_Mixed_intervals_Minimum",
                        "Dewpoint_temperature_height_above_ground"
                       ]
    
    df[temperature_cols] -= 273 # temperature there is in absolute form
    return df


def rescale_pressure(df):
    pressure_cols = ["Pressure_height_above_ground", "Pressure_reduced_to_MSL_msl"]
    df[pressure_cols] /= 100
    return df

In [None]:
# TODO: add ability to specify days from forecast

def get_forecast_dataset(forecast_coords):
    forecast = get_weather_forecast(forecast_coords, required_data)
    point_datasets = []
    
    for point in forecast:
        forecast_data = parse_forecast(point)
        point_dataset = make_dataset_from_forecast(forecast_data)
        point_datasets.append(point_dataset)

    
forecast_coords = [(135, 50), (66, 45), (-105, 40)] # переделать в dataframe с парами id: coords
forecast_df = get_forecast_dataset(forecast_coords)