In [1]:
import functools
import os
import re
import time
import warnings

import pandas as pd

import data_preprocessing.features_generations.ts_lag_features_generator as lag_gen
import data_preprocessing.features_generations.ts_date_features_generator as date_gen
import data_preprocessing.aqi_calculations.aqi_calculator as aqc
warnings.filterwarnings('ignore')

In [2]:
# Input directory with the cleaned per-pollutant CSVs, and output directory for the enriched data.
CSV_EDA_CLEANUP_RES_PATH = '../../datasources/eda_cleanup_res/'
CSV_AQI_ENRICH_PATH = '../../datasources/aqi_enrich/'

# Pollutant code -> short name. This is the single source of truth: the dict order defines
# POL_CODES, i.e. the order in which per-pollutant columns are produced.
POL_NAMES = {7: "O3", 6001: "PM25", 5: "PM10", 8: "NO2"}
POL_CODES = list(POL_NAMES)
# Derived inverse (short name -> code) so the two mappings cannot drift apart.
POL_NAMES_REVERSE = {name: code for code, name in POL_NAMES.items()}
# Measurement unit per pollutant code; codes 10 and 1 are listed here but are not part of POL_CODES.
POL_MEASURES = {7: "µg/m3", 6001: "µg/m3", 5: "µg/m3", 10: "mg/m3", 1: "µg/m3", 8: "µg/m3"}

In [13]:
FORECAST_DURATION_DAYS = 7
# Bounds of the daily time series (the daily index is built with inclusive="both").
TS_DATE_FROM = '2015-01-01'
TS_DATE_END = '2023-02-12'
# NOTE(review): FORECAST_DURATION_DAYS is not used by any code in this notebook; a forecast end
# date (TS_DATE_END + FORECAST_DURATION_DAYS) was sketched here earlier but never wired in.

# Daily aggregates of the raw concentrations, merged as C_<method>_<pollutant> columns.
# Previously ['mean', 'median', 'min', 'max']; currently only the mean is computed.
CONCENTRATION_AGGREGATES = ['mean']
# Aggregates that additionally get rolling/lag features (previously ['mean', 'median']);
# currently disabled.
CONCENTRATION_AGGREGATES_FOR_LAGS = []

In [4]:
def save_calc(df: pd.DataFrame, file_name: str, base_dir=None):
    """Write ``df`` (including its index) to ``file_name`` as CSV.

    Args:
        df: frame to save.
        file_name: file name relative to the output directory.
        base_dir: output directory; defaults to CSV_AQI_ENRICH_PATH when None.
    """
    target_dir = CSV_AQI_ENRICH_PATH if base_dir is None else base_dir
    # Create the output directory on first use instead of failing inside to_csv.
    os.makedirs(target_dir, exist_ok=True)
    df.to_csv(os.path.join(target_dir, file_name))

In [7]:
def timeit(show_args):
    """Decorator factory that prints how long each call of the decorated function takes.

    Args:
        show_args: when true, the call's positional and keyword arguments are included in the
            printed message.

    Returns:
        A decorator. The wrapped function keeps its ``__name__``/``__doc__`` (``functools.wraps``),
        returns the original result unchanged and re-raises any exception.
    """
    def timeit_func(func):
        @functools.wraps(func)
        def timeit_wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Report the duration even when func raises, so failed runs are timed too.
                total_time = time.perf_counter() - start_time
                if show_args:
                    print(f'Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds')
                else:
                    print(f'Function {func.__name__} Took {total_time:.4f} seconds')
        return timeit_wrapper
    return timeit_func

## Расчет по дням: индексов качества воздуха, статистик концентраций, объединение данных

In [8]:
def read_dataframe_for_pollutant(pollutant_id: int):
    """Load ``<pollutant_id>.csv`` from the cleanup directory, indexed by the parsed ``DatetimeEnd`` column."""
    csv_path = os.path.join(CSV_EDA_CLEANUP_RES_PATH, f'{pollutant_id}.csv')
    return pd.read_csv(csv_path, index_col='DatetimeEnd', parse_dates=True)

def merge_column_by_index(pollutant_id: int, df_gen: pd.DataFrame, df_to_merge: pd.DataFrame, source_column: str, new_column=None) -> pd.DataFrame:
    """Join ``df_to_merge[source_column]`` onto ``df_gen`` by index and suffix it with the pollutant name.

    The merged column is renamed to ``<new_column or source_column>_<POL_NAMES[pollutant_id]>``.
    ``DataFrame.merge`` defaults to an inner join, so index values missing from either frame are dropped.
    """
    prefix = source_column if new_column is None else new_column
    merged = df_gen.merge(df_to_merge[source_column], left_index=True, right_index=True)
    return merged.rename(columns={source_column: f'{prefix}_{POL_NAMES[pollutant_id]}'})

def calc_aqi_per_pollutant_and_merge_pollutants(g: pd.DataFrame) -> pd.DataFrame:
    """Compute the daily AQI for every pollutant in POL_CODES and merge it into ``g`` as ``AQI_<name>``.

    Each pollutant's cleaned data and unit (POL_MEASURES) are passed to ``aqc.calc_aqi_for_day_pd``;
    the timezone is dropped from the result before it is joined onto ``g`` by index.
    """
    result = g
    for pollutant_id in POL_CODES:
        daily_aqi = aqc.calc_aqi_for_day_pd(
            pollutant_id,
            read_dataframe_for_pollutant(pollutant_id),
            POL_MEASURES[pollutant_id],
        ).tz_localize(None)
        result = merge_column_by_index(pollutant_id, result, daily_aqi, 'AQI')
    return result

def calc_mean_concentration_and_merge_pollutants(g: pd.DataFrame) -> pd.DataFrame:
    """Merge daily concentration aggregates into ``g``, one column per pollutant and method.

    For every pollutant in POL_CODES and every method in CONCENTRATION_AGGREGATES, the
    ``Concentration`` column is aggregated into 24-hour bins and joined onto ``g`` by index as
    ``C_<method>_<pollutant name>`` (callables are named by their ``__name__``).
    """
    for pollutant_id in POL_CODES:
        concentration = read_dataframe_for_pollutant(pollutant_id)['Concentration']
        # Fixed 24-hour bins (same as the former '24H'; the upper-case hour alias is deprecated
        # since pandas 2.2). Built once per pollutant and reused for every aggregation method.
        daily = concentration.groupby(pd.Grouper(freq='24h'))
        for method in CONCENTRATION_AGGREGATES:
            # Drop any timezone so the result aligns with the naive daily index of ``g``.
            g_p = daily.agg(method).tz_localize(None).to_frame()
            method_label = method if isinstance(method, str) else method.__name__
            g = merge_column_by_index(pollutant_id, g, g_p, 'Concentration', f'C_{method_label}')
    return g

def calc_aqi_and_mean_concentration_and_merge(ts_date_from, fs_date_end) -> pd.DataFrame:
    """Build the daily frame with per-pollutant AQI, overall AQI, dominant pollutant and concentrations.

    Args:
        ts_date_from: first day of the daily index (inclusive).
        fs_date_end: last day of the daily index (inclusive).

    Returns:
        A frame indexed by ``DatetimeEnd`` with ``AQI_<name>`` columns, ``AQI`` (row-wise maximum of
        those), ``Pollutant`` (code of the pollutant holding that maximum; the first in POL_CODES order
        wins ties) and the ``C_<method>_<name>`` concentration columns. Days missing from any
        pollutant's data are dropped by the inner joins.
    """
    daily_index = pd.date_range(start=ts_date_from, end=fs_date_end, freq='D', inclusive="both", name='DatetimeEnd')
    g = calc_aqi_per_pollutant_and_merge_pollutants(pd.DataFrame(index=daily_index))

    # Work on the per-pollutant AQI columns explicitly rather than on "all columns of g", so the
    # result does not depend on column order or on the AQI column added just below.
    aqi_col_to_code = {f'AQI_{POL_NAMES[code]}': code for code in POL_CODES}
    aqi_by_pollutant = g[list(aqi_col_to_code)]
    g['AQI'] = aqi_by_pollutant.max(axis=1)
    # Map the winning column straight to its pollutant code (NaN instead of a crash for all-NaN rows).
    g['Pollutant'] = aqi_by_pollutant.idxmax(axis=1).map(aqi_col_to_code)

    g = calc_mean_concentration_and_merge_pollutants(g)
    return g

In [9]:
# Daily AQI per pollutant, overall AQI, dominant pollutant code and mean concentrations.
df_aqi_mean = calc_aqi_and_mean_concentration_and_merge(ts_date_from=TS_DATE_FROM, fs_date_end=TS_DATE_END)
df_aqi_mean.head()

Unnamed: 0_level_0,AQI_O3,AQI_PM25,AQI_PM10,AQI_NO2,AQI,Pollutant,C_mean_O3,C_mean_PM25,C_mean_PM10,C_mean_NO2
DatetimeEnd,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
2015-01-01,17,128,39,26,128,6001,25.610435,46.612591,42.384783,35.496087
2015-01-02,26,46,20,15,46,6001,47.734348,11.071181,21.348261,22.372083
2015-01-03,22,46,19,38,46,6001,23.824167,10.96964,20.216667,44.20875
2015-01-04,26,32,20,39,39,8,37.75375,7.645373,21.07,37.37125
2015-01-05,4,80,25,32,80,6001,4.281667,26.080304,27.15,53.706522


#### Добавление доп. информации по датам

In [10]:
# Append calendar columns (the cell output shows weekday, day, month, year, season, is_weekend,
# is_new_year). NOTE: re-running this cell applies add_date_info to an already enriched frame.
df_aqi_mean = df_aqi_mean.pipe(date_gen.add_date_info)
df_aqi_mean.head()

Unnamed: 0_level_0,AQI_O3,AQI_PM25,AQI_PM10,AQI_NO2,AQI,Pollutant,C_mean_O3,C_mean_PM25,C_mean_PM10,C_mean_NO2,weekday,day,month,year,season,is_weekend,is_new_year
DatetimeEnd,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
2015-01-01,17,128,39,26,128,6001,25.610435,46.612591,42.384783,35.496087,3,1,1,2015,0,0,1
2015-01-02,26,46,20,15,46,6001,47.734348,11.071181,21.348261,22.372083,4,2,1,2015,0,0,0
2015-01-03,22,46,19,38,46,6001,23.824167,10.96964,20.216667,44.20875,5,3,1,2015,0,1,0
2015-01-04,26,32,20,39,39,8,37.75375,7.645373,21.07,37.37125,6,4,1,2015,0,1,0
2015-01-05,4,80,25,32,80,6001,4.281667,26.080304,27.15,53.706522,0,5,1,2015,0,0,0


#### Добавление лагов

In [11]:
def get_all_concentration_and_aqi_columns(df):
    """Return the per-pollutant columns of ``df`` followed by 'AQI' and 'Pollutant'.

    A column counts as per-pollutant when its name ends with the short name of a pollutant in
    POL_CODES (e.g. AQI_O3, C_mean_PM25). 'AQI' and 'Pollutant' are appended unconditionally.
    """
    # str.endswith accepts a tuple and short-circuits on the first matching suffix.
    pollutant_suffixes = tuple(POL_NAMES[code] for code in POL_CODES)
    return [col for col in df.columns.values if col.endswith(pollutant_suffixes)] + ['AQI', 'Pollutant']

def get_aqi_columns(df):
    """Return the per-pollutant AQI columns of ``df`` (``AQI...<pollutant name>``) followed by 'AQI'."""
    pollutant_suffixes = tuple(POL_NAMES[code] for code in POL_CODES)
    return [
        col for col in df.columns.values
        if col.startswith('AQI') and col.endswith(pollutant_suffixes)
    ] + ['AQI']

def get_concentration_columns_by_method(df, method):
    """Return the ``C_<method>_<pollutant name>`` concentration columns of ``df``.

    ``method`` may be an aggregation name or a callable; callables are matched by ``__name__``,
    the same naming used when the concentration columns are created.
    """
    method_label = method if isinstance(method, str) else method.__name__
    # Include the '_' separator so e.g. a 'C_mean' prefix cannot also match a longer method name.
    prefix = f'C_{method_label}_'
    pollutant_suffixes = tuple(POL_NAMES[code] for code in POL_CODES)
    return [
        col for col in df.columns.values
        if col.startswith(prefix) and col.endswith(pollutant_suffixes)
    ]

In [14]:
@timeit(show_args=False)
def get_lag_data_shift(df: pd.DataFrame, lags=None) -> pd.DataFrame:
    """Add plain shifted copies of every pollutant/AQI column, plus 'AQI' and 'Pollutant'.

    Each target column ``c`` gets one ``c_lag<k>`` column per lag ``k``.

    Args:
        df: daily frame; it is not modified.
        lags: row offsets to shift by; defaults to [7, 8, 9, 10, 11, 12, 13, 14, 21, 28].

    Returns:
        A new frame with the original columns followed by the lag columns.
    """
    method_name = 'get_lag_data_shift'
    print('-------------------------------------------')
    print(f'{method_name} started')
    if lags is None:
        lags = [7, 8, 9, 10, 11, 12, 13, 14, 21, 28]

    # shift() is positional: lag k means k rows back, which equals k days only while the index
    # has exactly one row per day.
    lag_columns = {
        f'{column}_lag{lag}': df[column].shift(lag)
        for column in get_all_concentration_and_aqi_columns(df)
        for lag in lags
    }
    # Attach all lag columns with one concat instead of one insert per column; repeated inserts
    # fragment the frame (pandas may emit a PerformanceWarning, hidden by the global filter).
    df_c = pd.concat(
        [df.drop(columns=list(lag_columns), errors='ignore'), pd.DataFrame(lag_columns, index=df.index)],
        axis=1,
    )

    print(f'{method_name} finished')
    return df_c

LAG_DATE_COL = 'DatetimeEnd'
LAG_DYNAMIC_FILTERS = ['NoFilter', 'weekday', 'month']


def _generate_lag_features(df: pd.DataFrame, target_cols, windows, lags, agg_methods, ewm_params) -> pd.DataFrame:
    """Run ``lag_gen.generate_lagged_features`` with the settings shared by the AQI and concentration steps.

    Args:
        df: input frame; it is not modified (a copy with a constant ``NoFilter`` column is passed on).
        target_cols: columns to build features for.
        windows: mapping dynamic filter -> list of window sizes.
        lags: lags passed through to the generator.
        agg_methods: aggregations passed through to the generator.
        ewm_params: mapping dynamic filter -> values passed as ``ewm_params``.

    Returns:
        The generator's output indexed by ``DatetimeEnd``.
    """
    # Constant column presumably used by the generator as the "no grouping" dynamic filter
    # alongside 'weekday'/'month' — confirm against lag_gen. assign() keeps the caller's frame intact.
    df = df.assign(NoFilter=1)

    # Rough size estimate: one column per (target, lag, aggregation, filter/window pair).
    # NOTE(review): excludes EWM features and any pre-aggregation multiplier — confirm against lag_gen.
    window_count = sum(len(windows.get(name, [])) for name in LAG_DYNAMIC_FILTERS)
    total = len(target_cols) * len(lags) * len(agg_methods) * window_count
    print(f'New columns count: {total}')

    # NOTE(review): DatetimeEnd is the index of the frames built above, not a column; the last run
    # failed inside the generator with "datetime64 type does not support sum operations". Check
    # whether generate_lagged_features expects date_col as a column (i.e. df.reset_index()).
    df_lagged_features = lag_gen.generate_lagged_features(
        df,
        target_cols=target_cols,
        id_cols=[],
        date_col=LAG_DATE_COL,
        lags=lags,
        windows=windows,
        preagg_methods=['mean'],
        agg_methods=agg_methods,
        dynamic_filters=list(LAG_DYNAMIC_FILTERS),
        ewm_params=ewm_params,
    )
    return df_lagged_features.set_index(LAG_DATE_COL)


@timeit(show_args=False)
def get_lag_data_aqi(df: pd.DataFrame) -> pd.DataFrame:
    """Generate rolling-window/EWM lag features for the per-pollutant AQI columns and 'AQI'."""
    method_name = 'get_lag_data_aqi'
    print('-------------------------------------------')
    print(f'{method_name} started')

    df_lagged_features = _generate_lag_features(
        df,
        target_cols=get_aqi_columns(df),
        windows={
            'NoFilter': ['3D', '5D', '7D', '14D', '28D'],
            'weekday': ['28D', '56D'],
            'month': ['90D'],
        },
        lags=[7, 10, 14, 21, 28],
        agg_methods=['mean', 'median', lag_gen.percentile(10), lag_gen.percentile(90)],
        ewm_params={
            'NoFilter': [7, 14, 21, 28],
            'weekday': [28, 56],
            'month': [90],
        },
    )

    print(f'{method_name} finished')
    return df_lagged_features


@timeit(show_args=False)
def get_lag_data_concentration(df: pd.DataFrame, method) -> pd.DataFrame:
    """Generate rolling-window/EWM lag features for the ``C_<method>_<pollutant>`` columns.

    ``method`` is both the concentration aggregate whose columns are targeted and the single
    aggregation applied within each window.
    """
    method_name = 'get_lag_data_concentration'
    print('-------------------------------------------')
    print(f'{method_name} started for {method}')

    df_lagged_features = _generate_lag_features(
        df,
        target_cols=get_concentration_columns_by_method(df, method),
        windows={
            'NoFilter': ['3D', '5D', '7D', '14D', '28D'],
            'weekday': ['28D', '42D'],
            'month': ['7D', '14D', '28D'],
        },
        lags=[7, 10, 14, 21, 28],
        agg_methods=[method],
        # The same EWM parameters for every dynamic filter (a fresh list per key).
        ewm_params={name: [7, 14, 21, 28] for name in LAG_DYNAMIC_FILTERS},
    )

    print(f'{method_name} finished')
    return df_lagged_features

def get_all_lag_data(df: pd.DataFrame) -> pd.DataFrame:
    """Run the lag-feature pipeline: plain shifts, then AQI features, then one pass per
    aggregate listed in CONCENTRATION_AGGREGATES_FOR_LAGS."""
    enriched = get_lag_data_aqi(get_lag_data_shift(df))
    for aggregate in CONCENTRATION_AGGREGATES_FOR_LAGS:
        enriched = get_lag_data_concentration(enriched, aggregate)
    return enriched

In [15]:
# Build all lag features and persist an intermediate copy before the column names are sanitised.
df_aqi_mean_lags = get_all_lag_data(df=df_aqi_mean)
save_calc(df=df_aqi_mean_lags, file_name='aqi_all_enrich_test.csv')
df_aqi_mean_lags.head()

-------------------------------------------
get_lag_data_shift started
get_lag_data_shift finished
Function get_lag_data_shift Took 0.0057 seconds
-------------------------------------------
get_lag_data_aqi started
New columns count: 120


TypeError: datetime64 type does not support sum operations

In [99]:
# Keep only letters, digits and '_' in column names so the saved CSV has simple headers.
NON_IDENTIFIER_CHARS = re.compile(r'[^A-Za-z0-9_]+')
df_aqi_mean_lags = df_aqi_mean_lags.rename(columns=lambda name: NON_IDENTIFIER_CHARS.sub('', name))
# Stripping characters can map two different names onto the same one; a duplicated header would be
# silently renamed when the CSV is read back (pandas appends '.1'), so stop here instead.
assert df_aqi_mean_lags.columns.is_unique, 'column names collide after removing special characters'
save_calc(df_aqi_mean_lags, 'aqi_all_enrich.csv')