In [None]:
# KNN с рандомной вырезкой
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from tqdm import tqdm
from sklearn.impute import KNNImputer
import random

'''
Функции
'''

def try_parsing_date(text):
    """Parse ``text`` as a timestamp using the first matching known format.

    The formats are tried in order: full date-time (``YYYY-MM-DD HH:MM:SS``)
    first, then date only (``YYYY-MM-DD``).

    Args:
        text: Value to parse (normally a string read from the CSV).

    Returns:
        The result of ``pd.to_datetime`` for the first format that matches.

    Raises:
        ValueError: If none of the known formats matches ``text``.
    """
    candidate_formats = ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d')
    for candidate in candidate_formats:
        try:
            parsed = pd.to_datetime(text, format=candidate)
        except ValueError:
            # This format does not fit; fall through to the next one.
            continue
        else:
            return parsed
    raise ValueError('no valid date format found')

def fill_with_knn(chunk, k=3):
    """Impute missing ``P_l`` values with a KNN imputer over yearly lags.

    For every row the feature vector is
    ``[P_l one year earlier, two years earlier, three years earlier, P_l]``;
    a lag whose timestamp is not present in the index is NaN. ``KNNImputer``
    is fitted on that matrix and its last column replaces ``P_l``.

    Args:
        chunk: DataFrame with a ``P_l`` column and a datetime-like index
            (``index - pd.DateOffset(years=i)`` must be valid).
        k: Number of neighbours passed to ``KNNImputer``.

    Returns:
        A copy of ``chunk`` with ``P_l`` imputed and a boolean ``Original``
        column that is True where ``P_l`` was present in the input.
    """
    filled_chunk = chunk.copy()
    filled_chunk['Original'] = ~filled_chunk['P_l'].isnull()

    current = filled_chunk['P_l']
    if not current.notna().any():
        # Nothing observed: the imputer would drop every column and the
        # ``[:, -1]`` selection below would fail, so return the copy as is.
        return filled_chunk

    # ``reindex`` requires unique labels on the series being looked up.
    lookup = current[~current.index.duplicated(keep='first')]

    # Vectorised replacement for the per-row ``.loc`` lookups: one reindex
    # per lag instead of three label lookups for every single row.
    lag_columns = [
        lookup.reindex(filled_chunk.index - pd.DateOffset(years=lag)).to_numpy()
        for lag in range(1, 4)
    ]
    data = np.column_stack(lag_columns + [current.to_numpy()])

    imputer = KNNImputer(n_neighbors=k)
    filled_data_array = imputer.fit_transform(data)

    # P_l is the last column the imputer kept (all-NaN lag columns are
    # dropped by KNNImputer, but P_l has at least one observed value here).
    filled_chunk['P_l'] = filled_data_array[:, -1]

    return filled_chunk

def calculate_metrics(df_orig, df_imputed, drop_index):
    """Score imputed ``P_l`` values against the originals at the removed timestamps.

    Args:
        df_orig: Frame with the true ``P_l`` values; after ``reset_index()``
            it must expose a ``time`` column.
        df_imputed: Frame with the same layout holding the imputed values.
        drop_index: Collection of ``time`` values whose ``P_l`` was removed.

    Returns:
        Tuple ``(rmse, r2, mae, mape, wmape)``. MAPE and WMAPE are in percent.
        MAPE is computed only over rows whose true value is non-zero and is
        NaN when there is no such row.
    """
    df_orig = df_orig.reset_index()
    df_imputed = df_imputed.reset_index()

    # Both selections use the same timestamps, so after reset_index() the
    # two series carry matching row labels and subtract element-wise.
    y_true = df_orig[df_orig['time'].isin(drop_index)]['P_l']
    y_pred = df_imputed[df_imputed['time'].isin(drop_index)]['P_l']

    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    r2 = r2_score(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)

    # A zero target makes the relative error infinite/undefined and would
    # poison the mean, so those rows are excluded from MAPE.
    nonzero = y_true != 0
    if nonzero.any():
        mape = np.mean(np.abs((y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero])) * 100
    else:
        mape = np.nan
    wmape = np.sum(np.abs(y_true - y_pred)) / np.sum(np.abs(y_true)) * 100

    return rmse, r2, mae, mape, wmape

'''
Расчёты
'''

# Load the data
df = pd.read_csv('/content/filled_P_l.csv')

df['time'] = df['time'].apply(try_parsing_date)
df = df.set_index('time')
# '5min' is the current spelling of the 5-minute frequency; the legacy
# alias '5T' is deprecated in recent pandas releases.
df = df.asfreq('5min')
df_without_nan = df.dropna()

# Seed the RNG so the same gaps are drawn on every run.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Share of values to remove
percentage_of_missing_values = 10  # e.g. 10%
total_values = len(df)
# The target count is based on the full series but the draw is made from the
# complete rows only, so cap it to keep random.sample from raising.
num_values_to_remove = min(
    int(total_values * percentage_of_missing_values / 100),
    len(df_without_nan),
)
drop_index = random.sample(df_without_nan.index.tolist(), num_values_to_remove)

# Work on a copy so the original values remain available for scoring
df_test = df.copy()
df_test.loc[drop_index, 'P_l'] = np.nan
df_test = fill_with_knn(df_test)

# Score the imputation at the removed timestamps
rmse, r2, mae, mape, wmape = calculate_metrics(df, df_test, drop_index)
print(f'RMSE = {rmse}')
print(f'R-squared = {r2}')
print(f'MAE = {mae}')
print(f'MAPE = {mape}')
print(f'WMAPE = {wmape}')

# Visualisation
plt.figure(figsize=(14, 7))
df_test['P_l'].plot(title="Dataset with Filled Values using KNN")
plt.xlabel("Time")
plt.ylabel("P_l Value")
plt.show()

# Save the results
df_test.to_csv('/content/imputed_filled_P_l_KNN.csv')


In [None]:
# KNN С продолжительной вырезкой

# Continuous data pruning
import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression
from datetime import datetime
import matplotlib.pyplot as plt
from matplotlib.dates import date2num
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from tqdm import tqdm
from sklearn.impute import KNNImputer
import random

format_string = '%Y-%m-%d %H:%M:%S%z'

def create_error_rate_data(initial_df, percent_gaps):
    """Blank out one contiguous run of ``P_l`` values to simulate a long gap.

    The run is chosen among the rows that contain no NaN: a random start
    position is drawn and ``percent_gaps`` percent of those rows, counted
    consecutively from that position, are selected. Their ``time`` values
    are then used to set ``P_l`` to NaN in the test copy.

    Args:
        initial_df: DataFrame with ``time`` and ``P_l`` columns.
        percent_gaps: Percentage (0-100) of the complete rows to remove.

    Returns:
        Tuple ``(df_orig, df_test, drop_indexes)``: an untouched copy, the
        copy with the gap, and the list of index labels that were blanked.
    """
    df_orig = initial_df.copy()
    df_test = initial_df.copy()

    complete_rows = df_orig.dropna()
    len_deleted = int(len(complete_rows) * (percent_gaps / 100))
    start_index_delete_random = random.randint(0, len(complete_rows) - len_deleted)

    # The start offset is a *position* within the complete rows, so slice
    # positionally. Comparing it with index labels (as index.isin would)
    # selects too few or the wrong rows once dropna() has removed anything.
    stop = start_index_delete_random + len_deleted
    date = complete_rows['time'].iloc[start_index_delete_random:stop].tolist()

    gap_mask = df_test['time'].isin(date)
    drop_indexes = df_test.index[gap_mask].tolist()
    df_test.loc[gap_mask, 'P_l'] = np.nan
    return df_orig, df_test, drop_indexes

def fill_with_knn(chunk, k=3):
    """Impute missing ``P_l`` values with a KNN imputer over yearly lags.

    For every row the feature vector is
    ``[P_l one year earlier, two years earlier, three years earlier, P_l]``;
    a lag whose timestamp is not present in the data is NaN. ``KNNImputer``
    is fitted on that matrix and its last column replaces ``P_l``.

    Row timestamps are taken from the index when it is a ``DatetimeIndex``,
    otherwise from the ``time`` column. Without this, a frame read straight
    from CSV (integer index plus ``time`` column) never finds a previous-year
    match and the imputation degenerates to a column mean.

    Args:
        chunk: DataFrame with a ``P_l`` column and either a ``DatetimeIndex``
            or a ``time`` column.
        k: Number of neighbours passed to ``KNNImputer``.

    Returns:
        A copy of ``chunk`` (same index) with ``P_l`` imputed and a boolean
        ``Original`` column that is True where ``P_l`` was present.
    """
    filled_chunk = chunk.copy()
    filled_chunk['Original'] = ~filled_chunk['P_l'].isnull()

    if isinstance(filled_chunk.index, pd.DatetimeIndex):
        timestamps = filled_chunk.index
    elif 'time' in filled_chunk.columns:
        # utc=True yields a single tz-aware dtype even if the strings carry
        # different UTC offsets. NOTE(review): the one-year shift is then
        # applied in UTC, not local time - confirm this is acceptable.
        timestamps = pd.DatetimeIndex(pd.to_datetime(filled_chunk['time'], utc=True))
    else:
        # Same fallback the row-wise version used: interpret the index itself.
        timestamps = pd.DatetimeIndex(pd.to_datetime(filled_chunk.index))

    current = filled_chunk['P_l'].to_numpy(dtype=float)
    if np.isnan(current).all():
        # Nothing observed: the imputer would drop every column and the
        # ``[:, -1]`` selection below would fail, so return the copy as is.
        return filled_chunk

    by_time = pd.Series(current, index=timestamps)
    # ``reindex`` requires unique labels on the series being looked up.
    by_time = by_time[~by_time.index.duplicated(keep='first')]

    # One vectorised lookup per lag instead of per-row label lookups.
    lag_columns = [
        by_time.reindex(timestamps - pd.DateOffset(years=lag)).to_numpy()
        for lag in range(1, 4)
    ]
    data = np.column_stack(lag_columns + [current])

    imputer = KNNImputer(n_neighbors=k)
    filled_data_array = imputer.fit_transform(data)

    # P_l is the last column the imputer kept (all-NaN lag columns are
    # dropped by KNNImputer, but P_l has at least one observed value here).
    filled_chunk['P_l'] = filled_data_array[:, -1]

    return filled_chunk

def calculate_metrics(df_orig, df_test, drop_index):
    """Score imputed ``P_l`` values against the originals at the removed rows.

    Args:
        df_orig: Frame holding the true ``P_l`` values.
        df_test: Frame with the same index holding the imputed values.
        drop_index: Index labels of the rows whose ``P_l`` was removed.

    Returns:
        Tuple ``(rmse, r2, mae, mape, wmape)``. MAPE and WMAPE are in percent.
        MAPE is computed only over rows whose true value is non-zero and is
        NaN when there is no such row.
    """
    y_true = df_orig.loc[drop_index, 'P_l']
    y_pred = df_test.loc[drop_index, 'P_l']

    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)

    # A zero target makes the relative error infinite/undefined and would
    # poison the mean, so those rows are excluded from MAPE.
    nonzero = y_true != 0
    if nonzero.any():
        mape = np.mean(np.abs((y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero])) * 100
    else:
        mape = np.nan

    # Coefficient of determination computed from its definition.
    y_true_mean = y_true.mean()
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - y_true_mean) ** 2)
    r2 = 1 - (ss_res / ss_tot)
    wmape = np.sum(np.abs(y_true - y_pred)) / np.sum(np.abs(y_true)) * 100

    return rmse, r2, mae, mape, wmape

# Seed the RNG so the same gap is drawn on every run.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Load the data
df_with_season = pd.read_csv('/Users/dmitrii/Desktop/PhD/Python/PhD_code_project/Machine-Learning-Techniques-for-Ensuring-the-Health-of-Citizens/notebooks/results/filled_P_l.csv')
df_with_season, df_test_with_season, drop_index_with_season = create_error_rate_data(df_with_season, 30)

# Fill the gap with KNN imputation
filled_df_with_season = fill_with_knn(df_test_with_season)

# Compute the metrics
rmse, r2, mae, mape, wmape = calculate_metrics(df_with_season, filled_df_with_season, drop_index_with_season)

# Report the results
print(f'RMSE = {rmse}')
print(f'R-squared = {r2}')
print(f'MAE = {mae}')
print(f'MAPE = {mape}')
print(f'WMAPE = {wmape}')

# Visualise the results
plt.figure(figsize=(14, 7))
plt.plot(date2num(filled_df_with_season['time']), filled_df_with_season['P_l'], linewidth=0.1)
# The method used in this cell is KNN, so the title says so.
plt.title("Dataset with Filled Values using KNN. Count gaps = 30%")
plt.xlabel("Time")
plt.ylabel("P_l Value")
plt.show()

# Save the frame produced in this cell. `df_test` is not defined here and
# would only exist as a leftover from another cell. A distinct file name is
# used so the random-gap experiment's output is not overwritten.
filled_df_with_season.to_csv('/content/imputed_filled_P_l_KNN_continuous.csv')

In [None]:
# KNN with season and CONTINUOUS DATA PRUNING. Test data is defined
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import holidays
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

def create_error_rate_data(initial_df, percent_gaps):
    """Blank out one contiguous run of ``P_l`` values to simulate a long gap.

    The run covers ``percent_gaps`` percent of the rows that contain no NaN,
    taken consecutively from a random start position among those rows.

    Args:
        initial_df: DataFrame with a ``P_l`` column; index labels identify rows.
        percent_gaps: Percentage (0-100) of the complete rows to remove.

    Returns:
        Tuple ``(df_orig, df_test, drop_indexes)``: an untouched copy, the
        copy with the gap, and the list of index labels that were blanked.
    """
    df_orig = initial_df.copy()
    df_test = initial_df.copy()

    # Compute the complete-row index once instead of calling dropna() thrice.
    complete_index = df_orig.dropna().index
    len_deleted = int(len(complete_index) * (percent_gaps / 100))

    # np.random.randint excludes its upper bound; the +1 makes the last
    # possible window reachable and keeps percent_gaps=100 from raising.
    start_index_delete_random = np.random.randint(0, len(complete_index) - len_deleted + 1)

    date = complete_index[start_index_delete_random:start_index_delete_random + len_deleted]
    gap_mask = df_test.index.isin(date)
    drop_indexes = df_test.index[gap_mask].tolist()
    df_test.loc[gap_mask, 'P_l'] = np.nan

    return df_orig, df_test, drop_indexes

def add_additional_features(df):
    """Add calendar, cyclic and holiday features derived from the index.

    The frame is modified in place and also returned. Columns written:
    ``year``, ``week`` (ISO week), ``day_of_week``, ``hour``, ``minute``,
    sine/cosine encodings of hour (period 24), day of week (period 7) and
    week (period 52), and ``is_holiday`` (1 on dates found in
    ``holidays.Italy``, else 0).

    Args:
        df: DataFrame whose index exposes datetime accessors
            (``year``, ``isocalendar()``, ``dayofweek``, ...).

    Returns:
        The same ``df`` object with the feature columns added.
    """
    timestamps = df.index

    # Plain calendar components, written in the column order used downstream.
    calendar_parts = {
        'year': timestamps.year,
        'week': timestamps.isocalendar().week,
        'day_of_week': timestamps.dayofweek,
        'hour': timestamps.hour,
        'minute': timestamps.minute,
    }
    for column, values in calendar_parts.items():
        df.loc[:, column] = values

    # Map each periodic component onto the unit circle.
    for column, period in (('hour', 24), ('day_of_week', 7), ('week', 52)):
        angle = 2 * np.pi * df[column] / period
        df.loc[:, f'{column}_sin'] = np.sin(angle)
        df.loc[:, f'{column}_cos'] = np.cos(angle)

    # Holiday flag for every calendar year that occurs in the data.
    it_holidays = holidays.Italy(years=df['year'].unique())
    row_dates = pd.Series(timestamps.date)
    df.loc[:, 'is_holiday'] = row_dates.isin(it_holidays).astype(int).values
    return df

def fill_gaps_with_knn(df, n_neighbors=5):
    """Impute missing values with KNN over min-max scaled calendar features.

    The calendar/holiday feature columns together with ``P_l`` are scaled
    with ``MinMaxScaler``, passed through ``KNNImputer`` and then mapped back
    to the original scale.

    Args:
        df: DataFrame containing the feature columns listed below and ``P_l``.
        n_neighbors: Number of neighbours passed to ``KNNImputer``.

    Returns:
        A new DataFrame indexed like ``df`` that holds only the model columns,
        with the gaps filled.
    """
    model_columns = [
        'year', 'week', 'day_of_week', 'hour', 'minute',
        'hour_sin', 'hour_cos', 'day_of_week_sin', 'day_of_week_cos',
        'week_sin', 'week_cos', 'is_holiday', 'P_l',
    ]
    model_input = df[model_columns]

    scaler = MinMaxScaler()
    imputer = KNNImputer(n_neighbors=n_neighbors)

    scaled = scaler.fit_transform(model_input)
    completed_scaled = imputer.fit_transform(scaled)
    # Undo the scaling so the result is in the original units again.
    restored = scaler.inverse_transform(completed_scaled)

    return pd.DataFrame(restored, columns=model_input.columns, index=df.index)

def calculate_mape_improved(original_series, predicted_series):
    """Mean absolute percentage error that skips unusable points.

    A point is used only when the original value is non-zero and neither the
    original nor the predicted value is NaN.

    Args:
        original_series: Series of true values.
        predicted_series: Series of predicted values.

    Returns:
        MAPE in percent, or ``None`` when no point is usable.
    """
    usable = original_series.ne(0) & original_series.notna() & predicted_series.notna()
    truth = original_series[usable]
    if len(truth) == 0:
        return None

    estimate = predicted_series[usable]
    relative_error = np.abs((truth - estimate) / truth)
    return np.mean(relative_error) * 100

# Seed NumPy's RNG (used by create_error_rate_data) for a repeatable gap.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Load the data
df_with_season = pd.read_csv('/content/filled_P_l.csv', parse_dates=['time'], index_col='time')

# Restrict the data to the test period, starting at 2022-09-07.
# .copy() detaches the slice so the feature columns are written into an
# independent frame rather than into a slice of df_with_season.
test_start_date = '2022-09-07'
df_test_period = df_with_season.loc[test_start_date:].copy()

# Build features, cut a contiguous gap and impute it
df_test_period = add_additional_features(df_test_period)
_, df_test_with_gaps, drop_indexes = create_error_rate_data(df_test_period, 10)
filled_df_test_period = fill_gaps_with_knn(df_test_with_gaps, n_neighbors=5)

# True and imputed values at the removed timestamps, selected once
y_true = df_test_period.loc[drop_indexes, 'P_l']
y_pred = filled_df_test_period.loc[drop_indexes, 'P_l']

# MAPE
mape_improved = calculate_mape_improved(y_true, y_pred)

print(f'Улучшенный MAPE = {mape_improved}%')

# RMSE
rmse = np.sqrt(mean_squared_error(y_true, y_pred))

# R^2
r2 = r2_score(y_true, y_pred)

# MAE
mae = mean_absolute_error(y_true, y_pred)

print(f'RMSE = {rmse}')
print(f'R^2 = {r2}')
print(f'MAE = {mae}')

# Visualise the results
plt.figure(figsize=(14, 7))

# Original data (test period only)
df_test_period['P_l'].plot(label='Original Data', alpha=0.7)

# Data with the gap (test period only)
df_test_with_gaps['P_l'].plot(label='Data with Gaps', linestyle='--', alpha=0.7)

# Data after imputation (test period only)
filled_df_test_period['P_l'].plot(label='Filled Data with KNN', alpha=0.7)

plt.title('Comparison of Original, Data with Gaps, and Filled Data (Test Period Only)')
plt.xlabel('Time')
plt.ylabel('P_l Value')
plt.legend()
plt.show()

In [None]:
# KNN with season and Random DATA PRUNING. Test data is defined
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import holidays
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import random

def create_error_rate_data(initial_df, percent_gaps):
    """Blank out randomly scattered ``P_l`` values to simulate missing data.

    ``percent_gaps`` percent of the rows that contain no NaN are drawn at
    random (without replacement) and their ``P_l`` is set to NaN in the
    test copy.

    Args:
        initial_df: DataFrame with a ``P_l`` column.
        percent_gaps: Percentage (0-100) of the complete rows to remove.

    Returns:
        Tuple ``(df_orig, df_test, drop_indexes)``: an untouched copy, the
        copy with the gaps, and the index labels that were blanked (in the
        order they were drawn).
    """
    pristine = initial_df.copy()
    gapped = initial_df.copy()

    # Only rows without any NaN are candidates for removal.
    complete_rows = pristine.dropna()
    candidate_count = len(complete_rows)
    removal_count = int(candidate_count * (percent_gaps / 100))

    # Draw positions within the complete rows, then map them to labels.
    chosen_positions = random.sample(range(candidate_count), removal_count)
    drop_indexes = complete_rows.iloc[chosen_positions].index

    gapped.loc[drop_indexes, 'P_l'] = np.nan

    return pristine, gapped, drop_indexes


def add_additional_features(df):
    """Add calendar, cyclic and holiday features derived from the index.

    Writes the columns ``year``, ``week`` (ISO week), ``day_of_week``,
    ``hour``, ``minute``, their sine/cosine encodings (periods 24, 7 and 52)
    and ``is_holiday`` (1 on dates found in ``holidays.Italy``, else 0) into
    ``df`` in place.

    Args:
        df: DataFrame whose index exposes datetime accessors.

    Returns:
        The same ``df`` object with the feature columns added.
    """
    stamps = df.index
    full_turn = 2 * np.pi

    # Raw calendar components.
    df.loc[:, 'year'] = stamps.year
    df.loc[:, 'week'] = stamps.isocalendar().week
    df.loc[:, 'day_of_week'] = stamps.dayofweek
    df.loc[:, 'hour'] = stamps.hour
    df.loc[:, 'minute'] = stamps.minute

    # Cyclic encodings of the periodic components.
    hour_angle = full_turn * df['hour'] / 24
    df.loc[:, 'hour_sin'] = np.sin(hour_angle)
    df.loc[:, 'hour_cos'] = np.cos(hour_angle)

    weekday_angle = full_turn * df['day_of_week'] / 7
    df.loc[:, 'day_of_week_sin'] = np.sin(weekday_angle)
    df.loc[:, 'day_of_week_cos'] = np.cos(weekday_angle)

    week_angle = full_turn * df['week'] / 52
    df.loc[:, 'week_sin'] = np.sin(week_angle)
    df.loc[:, 'week_cos'] = np.cos(week_angle)

    # Holiday flag, looked up for every year present in the data.
    years_present = df['year'].unique()
    it_holidays = holidays.Italy(years=years_present)
    holiday_flags = pd.Series(stamps.date).isin(it_holidays)
    df.loc[:, 'is_holiday'] = holiday_flags.astype(int).values
    return df

def fill_gaps_with_knn(df, n_neighbors=5):
    """Impute missing values with KNN over min-max scaled calendar features.

    Args:
        df: DataFrame containing the calendar/holiday feature columns and
            ``P_l``.
        n_neighbors: Number of neighbours passed to ``KNNImputer``.

    Returns:
        A new DataFrame indexed like ``df`` holding only the model columns,
        back on the original scale, with the gaps filled.
    """
    calendar_columns = [
        'year', 'week', 'day_of_week', 'hour', 'minute',
        'hour_sin', 'hour_cos', 'day_of_week_sin', 'day_of_week_cos',
        'week_sin', 'week_cos', 'is_holiday',
    ]
    # The target goes last, after all explanatory columns.
    features = df[calendar_columns + ['P_l']]

    scaler = MinMaxScaler()
    filled_scaled = KNNImputer(n_neighbors=n_neighbors).fit_transform(
        scaler.fit_transform(features)
    )

    return pd.DataFrame(
        scaler.inverse_transform(filled_scaled),
        columns=features.columns,
        index=df.index,
    )

def calculate_mape_improved(original_series, predicted_series):
    """Compute MAPE over the points where it is well defined.

    Points with a zero or NaN original value, or a NaN prediction, are
    left out of the average.

    Args:
        original_series: Series of true values.
        predicted_series: Series of predicted values.

    Returns:
        MAPE in percent, or ``None`` if every point was left out.
    """
    scorable = (original_series != 0) & original_series.notna()
    scorable = scorable & predicted_series.notna()

    actual = original_series[scorable]
    forecast = predicted_series[scorable]

    return np.abs((actual - forecast) / actual).mean() * 100 if len(actual) > 0 else None

# Seed the RNG (used by create_error_rate_data) for repeatable gaps.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Load the data
df_with_season = pd.read_csv('/content/filled_P_l.csv', parse_dates=['time'], index_col='time')

# Restrict the data to the test period, starting at 2022-09-07.
# .copy() detaches the slice so the feature columns are written into an
# independent frame rather than into a slice of df_with_season.
test_start_date = '2022-09-07'
df_test_period = df_with_season.loc[test_start_date:].copy()

# Build features and remove random values
df_test_period = add_additional_features(df_test_period)
_, df_test_with_gaps, drop_indexes = create_error_rate_data(df_test_period, 10)

# Debug aid: report the shape of the frame that goes into the imputer
print("Shape of df_test_with_gaps:", df_test_with_gaps.shape)

filled_df_test_period = fill_gaps_with_knn(df_test_with_gaps, n_neighbors=5)

# True and imputed values at the removed timestamps, selected once
y_true = df_test_period.loc[drop_indexes, 'P_l']
y_pred = filled_df_test_period.loc[drop_indexes, 'P_l']

# MAPE
mape_improved = calculate_mape_improved(y_true, y_pred)

print(f'Улучшенный MAPE = {mape_improved}%')

# RMSE
rmse = np.sqrt(mean_squared_error(y_true, y_pred))

# R^2
r2 = r2_score(y_true, y_pred)

# MAE
mae = mean_absolute_error(y_true, y_pred)

print(f'RMSE = {rmse}')
print(f'R^2 = {r2}')
print(f'MAE = {mae}')

# Visualise the results
plt.figure(figsize=(14, 7))

# Original data (test period only) - blue
df_test_period['P_l'].plot(label='Original Data', alpha=0.7, color='blue')

# Data with gaps (test period only)
df_test_with_gaps['P_l'].plot(label='Data with Gaps', linestyle='--', alpha=0.7)

# Data after imputation (test period only) - red
filled_df_test_period['P_l'].plot(label='Filled Data with KNN', alpha=0.7, color='red')

plt.title('Comparison of Original, Data with Gaps, and Filled Data (Test Period Only)')
plt.xlabel('Time')
plt.ylabel('P_l Value')
plt.legend()
plt.show()