In [None]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark import SparkConf
from hyperopt import fmin, tpe, hp, STATUS_OK, SparkTrials, STATUS_FAIL
from hyperopt.pyll.base import scope
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, Bidirectional
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
from attention import Attention
import tensorflow as tf
from sklearn.model_selection import TimeSeriesSplit

# Local Spark session used by SparkTrials to run Hyperopt trials in parallel.
# NOTE(review): with master "local[*]" the executor runs inside the driver JVM,
# so the spark.executor.* settings below likely have no effect — confirm.
conf = (
    SparkConf()
    .setAppName("Hyperopt with Spark")
    .setMaster("local[*]")
    .setAll([
        ("spark.driver.memory", "8g"),
        ("spark.executor.memory", "8g"),
        ("spark.executor.cores", "4"),
    ])
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [None]:
# Function to read data from a file
def read_data(file_location):
    """Load a CSV file (or any path/buffer accepted by ``pd.read_csv``) into a DataFrame."""
    return pd.read_csv(file_location)

In [None]:
def df_one_day(pandas_df_to_plot,year_to_plot,month_to_plot, day_to_plot):
    """Return the rows of a DatetimeIndex-ed frame that fall on one calendar day."""
    idx = pandas_df_to_plot.index
    on_requested_day = (
        (idx.year == year_to_plot)
        & (idx.month == month_to_plot)
        & (idx.day == day_to_plot)
    )
    return pandas_df_to_plot[on_requested_day]

In [None]:
def df_one_month(pandas_df_to_plot, year_to_plot, month_to_plot, day_to_plot=None):
    """Return every row of a DatetimeIndex-ed frame that falls in one calendar month.

    Args:
        pandas_df_to_plot: frame indexed by a DatetimeIndex.
        year_to_plot: year to keep.
        month_to_plot: month (1-12) to keep.
        day_to_plot: accepted only so existing four-argument calls keep working;
            it is ignored. Filtering on it would reduce the result to a single
            day, which is what ``df_one_day`` is for.

    Returns:
        The subset of ``pandas_df_to_plot`` whose index lies in the given month.
    """
    idx = pandas_df_to_plot.index
    in_month = (idx.year == year_to_plot) & (idx.month == month_to_plot)
    return pandas_df_to_plot[in_month]

In [None]:
def plot_one_day(pandas_df_to_plot,year_to_plot,month_to_plot, day_to_plot):
    """Plot the rows of one calendar day as a wide strip chart, then print which day was shown."""
    plt.rcParams.update({"figure.figsize": (40, 3)})
    day_rows = df_one_day(pandas_df_to_plot, year_to_plot, month_to_plot, day_to_plot)
    plt.plot(day_rows)
    plt.show()
    print("Plotting: year = ", year_to_plot, " month = ", month_to_plot, " day = ", day_to_plot)

In [None]:
def plot_one_month(pandas_df_to_plot,year_to_plot,month_to_plot):
    """Draw one ``plot_one_day`` chart for every day present in the given month."""
    idx = pandas_df_to_plot.index
    month_mask = (idx.year == year_to_plot) & (idx.month == month_to_plot)
    for day in idx[month_mask].day.unique().values:
        plot_one_day(pandas_df_to_plot, year_to_plot, month_to_plot, day)

In [None]:
def print_max_freq_month(df):
    """Find the month whose summed ``value`` is largest, print it, and plot its days.

    Assumes ``df`` has a DatetimeIndex and a ``value`` column. When several months tie,
    the earliest one is used.
    """
    monthly_totals = df.resample('M').sum()
    is_peak = monthly_totals.value == monthly_totals.value.max()
    peak_index = monthly_totals[is_peak].index
    peak_year = peak_index.year[0]
    peak_month = peak_index.month[0]
    print('Year with max door opening:', peak_year)
    print('Month with max door opening:', peak_month)
    plot_one_month(df, peak_year, peak_month)

In [None]:
def get_all_months(df, year):
    """Return the distinct month numbers present in ``df``'s DatetimeIndex for ``year``."""
    rows_in_year = df.index[df.index.year == year]
    return rows_in_year.month.unique().values

In [None]:
def get_all_years(df):
    """Return the distinct years present in ``df``'s DatetimeIndex, in order of first appearance."""
    years = df.index.year
    return years.unique().to_numpy()

In [None]:
def plot_one_year(df, year):
    """Plot every day of every month that ``df`` contains for ``year``."""
    for month_number in get_all_months(df, year):
        plot_one_month(df, year, month_number)

In [None]:
def show_all_years(df):
    """Plot every day in ``df``, year by year."""
    for year_number in get_all_years(df):
        plot_one_year(df, year_number)

In [None]:
# Function to preprocess and clean data
def preprocess_data_with_splits(df, aggregation='H', ws=24, number_of_predicted_days=2,
                                n_splits=4, test_size=48):
    """Clean raw readings, resample them and build walk-forward train/test splits.

    Processing steps:
      1. drop rows with NaNs and duplicate ``source_ts`` values;
      2. index by ``source_ts`` and upsample to one row per second (forward fill);
      3. drop the final row (see the note below);
      4. aggregate with ``resample(aggregation).sum()``.

    Splits come from ``TimeSeriesSplit(n_splits=n_splits)``. For each fold, the
    test window is the first ``test_size`` rows of that fold's test range,
    clamped to the end of the data. The training window is every row before
    the test window.

    Args:
        df: raw frame with a ``source_ts`` column. It is not modified.
        aggregation: pandas offset (alias or object) used for resampling.
        ws, number_of_predicted_days: unused; kept for signature compatibility.
        n_splits: number of folds.
        test_size: maximum number of rows per test window.

    Returns:
        ``(df_resampled, splits, n_splits)``. Each split is a tuple of inclusive
        positional bounds ``(train_start, train_end, test_start, test_end)``.
    """
    # Work on new frames so the caller's DataFrame is left untouched.
    cleaned = df.dropna().drop_duplicates(subset=['source_ts'])
    datetime_index = pd.DatetimeIndex(pd.to_datetime(cleaned['source_ts']).values)
    per_second = (
        cleaned
        .set_index(datetime_index)
        .drop(columns='source_ts')
        # Offset object instead of the 'S' alias, which is deprecated since pandas 2.2.
        .asfreq(freq=pd.offsets.Second(), method='ffill')
    )

    last_day = per_second.index[-1].strftime('%Y-%m-%d')
    # NOTE(review): `.loc[:last_day]` already includes the whole last day, so this
    # only drops the final second. If the intent was to exclude an incomplete
    # final day, this needs revisiting — confirm.
    per_second = per_second.loc[:last_day].iloc[:-1, :]

    df_resampled = per_second.resample(aggregation).sum()

    total_len = len(df_resampled)
    splits = []
    for _, test_index in TimeSeriesSplit(n_splits=n_splits).split(df_resampled):
        test_start = int(test_index[0])
        # Clamp so the window never points past the data. An overrun would
        # make the evaluated slice shorter than the requested forecast horizon.
        test_end = min(test_start + test_size, total_len) - 1
        splits.append((0, test_start - 1, test_start, test_end))

    return df_resampled, splits, n_splits

In [None]:
def preprocess_data_simple(df, aggregation='H', ws=24, number_of_predicted_days=2, plot=True):
    """Clean raw readings, resample them and split into one train/test hold-out.

    Cleaning and resampling:
      - drop rows with NaNs and duplicate ``source_ts`` values;
      - index by ``source_ts`` and upsample to one row per second (forward fill);
      - drop the final row;
      - aggregate with ``resample(aggregation).sum()``.

    The test set starts on the calendar day of the row
    ``ws * number_of_predicted_days`` positions from the end. The training set
    runs through the calendar day of the row a further ``ws`` positions back.

    Args:
        df: raw frame with a ``source_ts`` column. It is not modified.
        aggregation: pandas offset (alias or object) used for resampling.
        ws: window length in resampled rows.
        number_of_predicted_days: multiplier for ``ws`` giving the test horizon.
        plot: when True (the default), plot the last day before and after resampling.

    Returns:
        ``(df_resampled, TRAIN_END, TEST_START, training_set_df, test_set_df)``.
        ``TRAIN_END`` and ``TEST_START`` are ``'%Y-%m-%d'`` strings.

    Raises:
        ValueError: if ``ws * number_of_predicted_days`` is not positive.
    """
    prediction_in_future_time = ws * number_of_predicted_days
    if prediction_in_future_time <= 0:
        # A horizon of 0 would make the index below 0, i.e. the FIRST row.
        raise ValueError("ws * number_of_predicted_days must be positive")

    # Work on new frames so the caller's DataFrame is left untouched.
    cleaned = df.dropna().drop_duplicates(subset=['source_ts'])
    datetime_index = pd.DatetimeIndex(pd.to_datetime(cleaned['source_ts']).values)
    per_second = (
        cleaned
        .set_index(datetime_index)
        .drop(columns='source_ts')
        # Offset object instead of the 'S' alias, which is deprecated since pandas 2.2.
        .asfreq(freq=pd.offsets.Second(), method='ffill')
    )

    last_day = per_second.index[-1].strftime('%Y-%m-%d')
    # NOTE(review): `.loc[:last_day]` already includes the whole last day, so this
    # only drops the final second — confirm that is the intent.
    per_second = per_second.loc[:last_day].iloc[:-1, :]

    df_resampled = per_second.resample(aggregation).sum()

    if plot:
        last_ts = per_second.index[-1]
        plot_one_day(per_second, last_ts.year, last_ts.month, last_ts.day)
        plot_one_day(df_resampled, last_ts.year, last_ts.month, last_ts.day)

    index_of_start_prediction = -prediction_in_future_time
    TEST_START = df_resampled.index[index_of_start_prediction].strftime('%Y-%m-%d')
    TRAIN_END = df_resampled.index[index_of_start_prediction - ws].strftime('%Y-%m-%d')

    # Partial-string .loc slicing includes the whole TRAIN_END / TEST_START day.
    training_set_df = df_resampled.loc[:TRAIN_END]
    test_set_df = df_resampled.loc[TEST_START:]

    return df_resampled, TRAIN_END, TEST_START, training_set_df, test_set_df

In [None]:
# Function to make predictions using the trained model
def make_predictions(model, batch_one, prediction_in_future_time):
    """Recursively forecast ``prediction_in_future_time`` steps with a one-step model.

    At each step the model sees the current window and its first output row is
    taken as the next value. The window then slides forward one step: the
    oldest value is dropped and the prediction is appended.

    Args:
        model: object with a Keras-style ``predict(x, verbose=...)``. It is
            assumed to map a ``(1, window, 1)`` array to a ``(1, 1)`` array,
            as the append below requires — TODO confirm for other models.
        batch_one: seed window. Its length defines the window size. For models
            built by ``create_model`` it must equal that model's input length.
        prediction_in_future_time: number of steps to forecast.

    Returns:
        ``np.ndarray`` stacking the per-step outputs. For a ``(1, 1)`` model
        output this has shape ``(prediction_in_future_time, 1)``.
    """
    prediction_test = []
    # Derive the window from the seed instead of relying on a global `ws`.
    batch_new = np.asarray(batch_one).reshape((1, -1, 1))

    for _ in range(prediction_in_future_time):
        # verbose=0: avoid printing a Keras progress bar on every single step.
        first_pred = model.predict(batch_new, verbose=0)[0]
        prediction_test.append(first_pred)
        batch_new = np.append(batch_new[:, 1:, :], [[first_pred]], axis=1)

    return np.array(prediction_test)


In [None]:
# Function to evaluate the model
def evaluate_model(test_set, predictions):
    """Score a forecast against ground truth.

    Returns:
        ``(rmse, r_squared)``: the square root of sklearn's
        ``mean_squared_error`` and sklearn's ``r2_score``.
    """
    mse = mean_squared_error(test_set, predictions)
    return np.sqrt(mse), r2_score(test_set, predictions)

In [None]:
# Function to plot the actual vs predicted values
def plot_results(test_set, predictions):
    """Overlay actual (green) and predicted (orange) series on the current figure and show it."""
    series = (
        (test_set, 'green', 'Actual value'),
        (predictions, 'orange', 'Predicted value'),
    )
    for values, colour, label in series:
        plt.plot(values, color=colour, label=label)
    plt.legend()
    plt.show()

In [None]:
# Function to create input sequences for training the LSTM model
def create_input_sequences(data, ws):
    """Turn a 2-D array into supervised (window, next value) pairs using column 0.

    Args:
        data: 2-D array; only its first column is used.
        ws: window length.

    Returns:
        ``(x, y)``. ``x`` has shape ``(len(data) - ws, ws, 1)`` and holds the
        sliding windows. ``y`` holds the value right after each window. Both
        are empty 1-D arrays when ``len(data) <= ws``.
    """
    starts = range(len(data) - ws)
    windows = [data[s:s + ws, 0:1] for s in starts]
    targets = [data[s + ws, 0] for s in starts]
    return np.array(windows), np.array(targets)

In [None]:
def _fit_and_forecast(training_set, horizon, units, num_layers, dropout_rate,
                      epochs, batch_size, attention_units):
    """Fit a fresh model on ``training_set`` and forecast ``horizon`` steps after it.

    The MinMaxScaler is fit on the training data only. The same scaler maps the
    forecast back to the original scale, so no test data influences training
    or scaling. The forecast is seeded with the last ``ws`` (module-level)
    scaled training points.
    """
    sc = MinMaxScaler(feature_range=(0, 1))
    training_set_scaled = sc.fit_transform(training_set)
    x_train, y_train = create_input_sequences(training_set_scaled, ws)
    model = create_model(units, num_layers, dropout_rate, x_train.shape[1], attention_units)
    model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, verbose=1)
    predictions = make_predictions(model, training_set_scaled[-ws:], horizon)
    return sc.inverse_transform(predictions)


# Objective function for Hyperopt
def objective(params):
    """Train and score one hyperparameter configuration across every dataset.

    Reads these module-level names:
      - ``datasets``: ``(csv_path, nbDays)`` pairs;
      - ``ws``: input window length;
      - ``log_file_path``: where per-dataset scores are appended.

    Datasets with ``nbDays < 182`` use the single hold-out from
    ``preprocess_data_simple``. Longer ones use the walk-forward folds from
    ``preprocess_data_with_splits`` and contribute their mean RMSE. In both
    cases the forecast starts right after the training window and covers
    exactly the test window.

    Args:
        params: sample from ``search_space`` with keys ``units``, ``num_layers``,
            ``dropout_rate``, ``epochs``, ``batch_size``, ``attention_units``.

    Returns:
        ``{'loss': mean RMSE over datasets, 'status': STATUS_OK}``, or
        ``{'loss': inf, 'status': STATUS_FAIL}`` if any step raised.
    """
    try:
        units = int(params['units'])
        num_layers = int(params['num_layers'])
        dropout_rate = params['dropout_rate']
        epochs = int(params['epochs'])
        batch_size = int(params['batch_size'])
        attention_units = int(params['attention_units'])
        model_args = (units, num_layers, dropout_rate, epochs, batch_size, attention_units)

        results = []

        for (dataset, nbDays) in datasets:
            df = read_data(dataset)

            if nbDays < 182:
                _, _, _, training_set_df, test_set_df = preprocess_data_simple(df)
                training_set = training_set_df.values
                test_set = test_set_df.values

                # The test window follows the training window (for hourly
                # aggregation), so forecast from the end of training for
                # exactly len(test_set) steps. Previously the scaler was re-fit
                # on the test set and the forecast was seeded from the test
                # set's own tail, which leaked and misaligned the evaluation.
                predictions = _fit_and_forecast(training_set, len(test_set), *model_args)

                rmse, rsquare = evaluate_model(test_set, predictions)
                log_message = (f"For units={units}, num_layers={num_layers}, dropout_rate={dropout_rate}, epochs={epochs}, batch_size={batch_size}, attention_units={attention_units}\n DATASET: {dataset}\n RMSE: {rmse}\n R-squared: {rsquare}\n")

                with open(log_file_path, 'a') as f:
                    f.write(log_message)
                results.append(rmse)

            else:
                df_resampled, splits, n_splits = preprocess_data_with_splits(df)

                rmse_values = []
                r_squared_values = []
                for train_start, train_end, test_start, test_end in splits[:n_splits]:
                    # Bounds are inclusive positions; training ends right before the test window.
                    training_set = df_resampled.iloc[train_start:train_end + 1].values
                    test_set = df_resampled.iloc[test_start:test_end + 1].values
                    prediction = _fit_and_forecast(training_set, len(test_set), *model_args)

                    rmse, rsquare = evaluate_model(test_set, prediction)
                    rmse_values.append(rmse)
                    r_squared_values.append(rsquare)

                log_message = (f"For units={units}, num_layers={num_layers}, dropout_rate={dropout_rate}, epochs={epochs}, batch_size={batch_size}, attention_units={attention_units}\n DATASET: {dataset} \n average RMSE: {np.mean(rmse_values)}\n average R-squared: {np.mean(r_squared_values)}\n")

                with open(log_file_path, 'a') as f:
                    f.write(log_message)

                results.append(np.mean(rmse_values))

        # An empty `results` raises ZeroDivisionError here and is reported as a failed trial.
        return {'loss': (sum(results)/len(results)), 'status': STATUS_OK}
    except Exception as e:
        print(f"Exception in objective function: {e}")
        return {'loss': float('inf'), 'status': STATUS_FAIL}

In [None]:
# Define the search space. Every dimension is an hp.choice over explicit values;
# the integer-valued ones are wrapped in scope.int.
search_space = dict(
    units=scope.int(hp.choice('units', [32, 64, 128])),
    num_layers=scope.int(hp.choice('num_layers', [3, 4, 5, 6, 10])),
    dropout_rate=hp.choice('dropout_rate', [0.1, 0.2, 0.3, 0.5]),
    attention_units=scope.int(hp.choice('attention_units', [8, 16, 32, 64])),
    epochs=scope.int(hp.choice('epochs', [15, 30, 60])),
    batch_size=scope.int(hp.choice('batch_size', [16, 32, 64])),
)

# (csv path, number of days covered). objective() uses the day count to choose
# between a single hold-out (< 182 days) and walk-forward splits.
datasets = [
    ('./testData/Dataset1.csv', 121),
    ('./testData/Dataset2.csv', 121),
    ('./testData/Dataset3.csv', 61),
    ('./testData/Dataset4.csv', 366),
    ('./testData/Dataset5.csv', 361),
    ('./testData/Dataset6.csv', 361),
    ('./testData/new/Dataset7.csv', 31),
    ('./testData/new/Dataset8.csv', 361),
    ('./testData/new/Dataset9.csv', 61),
    ('./testData/new/Dataset10.csv', 721),
    ('./testData/new/Dataset11.csv', 91),
    ('./testData/new/Dataset12.csv', 359),
]

In [None]:
# Function to create an LSTM model
def create_model(units, num_layers, dropout_rate, shape, attention_units):
    """Build and compile a stacked bidirectional-LSTM + attention regressor.

    Layer stack:
      - ``num_layers`` Bidirectional LSTM layers, all returning sequences;
      - Dropout after every LSTM layer except the last;
      - an ``Attention(attention_units)`` layer;
      - a single-unit Dense output.

    The model is compiled with Adam and MSE loss.

    Args:
        units: LSTM units per direction.
        num_layers: total number of Bidirectional LSTM layers.
        dropout_rate: dropout fraction.
        shape: number of time steps per input window (input is ``(shape, 1)``).
        attention_units: size passed to the ``Attention`` layer.
    """
    stacked = [
        Bidirectional(LSTM(units=units, return_sequences=True), input_shape=(shape, 1)),
        Dropout(dropout_rate),
    ]
    for _ in range(num_layers - 2):
        stacked += [Bidirectional(LSTM(units, return_sequences=True)), Dropout(dropout_rate)]
    stacked += [
        Bidirectional(LSTM(units, return_sequences=True)),
        Attention(attention_units),
        Dense(units=1),
    ]

    model = Sequential()
    for layer in stacked:
        model.add(layer)
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model

In [None]:
# Main function
def main(max_evals=15, parallelism=4):
    """Run the Hyperopt TPE search over ``search_space`` with trials distributed by SparkTrials.

    Prints the best configuration, decoded from ``hp.choice`` indices to the
    actual values with ``space_eval``. Also prints the lowest loss (mean RMSE)
    recorded across trials.

    Args:
        max_evals: total number of configurations to evaluate.
        parallelism: number of trials SparkTrials runs concurrently.
    """
    from hyperopt import space_eval

    spark_trials = SparkTrials(parallelism=parallelism)
    best_params = fmin(
        fn=objective,
        space=search_space,
        algo=tpe.suggest,
        max_evals=max_evals,
        trials=spark_trials)

    # Results live on `spark_trials`. Failed trials report loss=inf, so they never win.
    best_loss = min(result.get("loss", float("inf")) for result in spark_trials.results)
    # fmin returns the *index* chosen for each hp.choice; decode to real values.
    print("Best parameters: ", space_eval(search_space, best_params))

    print("Best model: ", best_loss)
    

In [None]:
# Run configuration: `ws` is the input window length read by objective(), and
# `log_file_path` is the file objective() appends per-dataset scores to.
ws, log_file_path = 24, "Results.txt"
main()