In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import datetime, pytz

from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.metrics import RootMeanSquaredError

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error

## Hyperparameters

In [None]:
# Tunable experiment settings; the trailing values in parentheses are the ones that were tried.
SEQ_LEN = 21  # length of each sliding window, including the final target step (6, 11, 21)
SPLIT = 0.80  # fraction of the windows used for training (0.80, 0.90)
LSTM_Layer = 3  # number of stacked LSTM layers (1, 2, 3)
WINDOW_SIZE = SEQ_LEN - 1  # time steps fed to the model: a window without its target step

# The genetic search and the Keras training both draw random numbers. Seed the
# global NumPy and TensorFlow generators so a fresh "Restart & Run All" is as
# repeatable as the backend allows.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [None]:
# Google Spreadsheet ID
SHEET_ID = '1JDNv_mArl-GPIpxuWS5GxgVEwvjXocS1MrXGc6TYs8M'
# Candidate sheet names; index 2 ('SGD/IDR') is the one requested below and shown in the plot titles.
SHEET_NAME = ['USD/IDR', 'EUR/IDR', 'SGD/IDR']

# Request the selected sheet as CSV (tqx=out:csv) and read it straight into a DataFrame.
url = f'https://docs.google.com/spreadsheets/d/{SHEET_ID}/gviz/tq?tqx=out:csv&sheet={SHEET_NAME[2]}'
data = pd.read_csv(url)

# Convert Date columns to datetime format
data['Date'] = pd.to_datetime(data['Date'], format='%d/%m/%Y %H:%M:%S')
# Print the column dtypes and non-null counts as a quick sanity check of the download.
data.info()

## Preprocessing

In [None]:
# Scale the closing prices with a min-max scaler. The fitted scaler is kept
# because the predictions are mapped back to prices with it later on.
# NOTE(review): the scaler is fitted on the whole series, which includes the
# rows that later end up in the test split - confirm this is intended.
close_price = data['Close'].values.reshape(-1, 1)
scaler = MinMaxScaler().fit(close_price)
scaled_close = scaler.transform(close_price)

In [None]:
# Show the shape and the contents of the scaled series, each under its own banner line.
print("----------- Normalize Data Shape -----------", scaled_close.shape, sep="\n")
print("\n----------- Normalize Data -----------", scaled_close, sep="\n")

In [None]:
def to_sequences(data, seq_len):
    """
    Slices data into overlapping windows of seq_len consecutive items.

    Window i is data[i: i + seq_len]. Start positions run over
    range(len(data) - seq_len), so the window that would begin at
    len(data) - seq_len is not produced.

    Args:
        data: A sliceable sequence (list or numpy array) of values.
        seq_len: An integer indicating the length of each window.

    Returns:
        A numpy array with one entry per window along its first axis
        (len(data) - seq_len entries). It is empty when data has at most
        seq_len items.
    """
    window_count = len(data) - seq_len
    windows = [data[start: start + seq_len] for start in range(window_count)]
    return np.array(windows)

def preprocess(data_raw, seq_len, train_split):
    """
    Builds the training and testing arrays from the raw series.

    The series is cut into windows of seq_len steps with to_sequences. The
    first int(train_split * number_of_windows) windows form the training
    part and the remainder the testing part. Within every window all steps
    but the last are the model input and the last step is the target.

    Args:
        data_raw: The series to window; it must yield a 3-D array of windows
            (e.g. a 2-D array of shape (n, 1)).
        seq_len: An integer indicating the length of each window.
        train_split: A float between 0 and 1 indicating the fraction of windows used for training.

    Returns:
        A tuple of four numpy arrays: (X_train, y_train, X_test, y_test).
    """
    windows = to_sequences(data_raw, seq_len)
    cutoff = int(train_split * windows.shape[0])
    train_part, test_part = windows[:cutoff], windows[cutoff:]

    # Inputs are every step except the last one; the last step is the target.
    X_train, y_train = train_part[:, :-1, :], train_part[:, -1, :]
    X_test, y_test = test_part[:, :-1, :], test_part[:, -1, :]
    return X_train, y_train, X_test, y_test

# Window the scaled series and split it into training and testing arrays.
X_train, y_train, X_test, y_test = preprocess(
    data_raw=scaled_close,
    seq_len=SEQ_LEN,
    train_split=SPLIT,
)

## Genetic Algorithm

In [None]:
def decimal_to_binary(chromosome: list):
    """
    Converts a list of decimal numbers to a binary string.

    Every number is written as exactly 8 bits, most significant bit first,
    and the pieces are concatenated in order.

    Parameters:
        chromosome (list): Integers in the range 0-255 (plain or numpy integers).

    Returns:
        binary_string (str): A string of '0'/'1' characters, 8 per input number.

    Raises:
        ValueError: If a number is outside 0-255. Such a value cannot be
            stored in 8 bits and would shift every following gene when the
            string is split back into bytes.
    """
    pieces = []
    for gene in chromosome:
        if not 0 <= gene <= 255:
            raise ValueError(f"gene {gene} does not fit in 8 bits (expected 0-255)")
        pieces.append(bin(gene)[2:].zfill(8))
    return ''.join(pieces)

def binary_to_decimal(binary_string: str):
    """
    Converts a binary string to a list of decimal numbers.

    The string is read in consecutive chunks of 8 characters and each chunk
    is parsed as a base-2 integer. A trailing chunk shorter than 8 characters
    is parsed as it is.

    Args:
        binary_string (str): The binary string to be converted.

    Returns:
        list: One integer per chunk, in order; an empty list for an empty string.
    """
    values = []
    for start in range(0, len(binary_string), 8):
        chunk = binary_string[start:start + 8]
        values.append(int(chunk, 2))
    return values

In [None]:
# Genetic Algorithm
def genetic_algorithm(generations, mutation_rate, population_size=10):
    """
    Performs a genetic algorithm optimization.

    A random population of 3-gene chromosomes (each gene in 1-250) is created
    and scored with fitness_function. In every generation two parents are
    picked with selection. In generations where
    generation % mutation_rate == 0 (the first generation included) both
    parents are mutated; in all other generations they are crossed over. Each
    of the two resulting chromosomes replaces the currently worst member of
    the population if its fitness is strictly better.

    Args:
        generations (int): The number of generations to run the genetic algorithm.
        mutation_rate (int): Mutation period; must not be 0. Mutation is used
            instead of crossover every mutation_rate-th generation.
        population_size (int): The size of the population. Defaults to 10.

    Returns:
        tuple: (best_chromosome, best_fitness, max_fitness_each_gen), where
        max_fitness_each_gen holds the best fitness in the population as
        recorded at the start of each generation.

    Raises:
        ValueError: If mutation_rate is 0 or population_size is smaller than 1.
    """
    # Fail before any model is trained; otherwise a zero period only surfaces
    # as a ZeroDivisionError after the whole initial population was evaluated.
    if mutation_rate == 0:
        raise ValueError("mutation_rate must not be 0")
    if population_size < 1:
        raise ValueError("population_size must be at least 1")

    print("Inizialized Population...")
    population = np.random.randint(1, high=251, size=(population_size, 3))
    fitness_scores = np.array([fitness_function(chromosome) for chromosome in population])
    max_fitness_each_gen = []

    for generation in range(generations):
        print(f"Generation - {generation + 1}")
        max_fitness_each_gen.append(np.max(fitness_scores))

        chromosome_1 = selection(population, fitness_scores)
        chromosome_2 = selection(population, fitness_scores)

        if generation % mutation_rate == 0:
            chromosome_1 = mutate(chromosome_1)
            chromosome_2 = mutate(chromosome_2)
        else:
            chromosome_1, chromosome_2 = crossover(chromosome_1, chromosome_2)

        # Steady-state replacement: each offspring competes against whichever
        # member is currently the worst (re-evaluated after every replacement).
        for offspring in (chromosome_1, chromosome_2):
            offspring_fitness = fitness_function(offspring)
            worst_index = np.argmin(fitness_scores)
            if offspring_fitness > fitness_scores[worst_index]:
                population[worst_index] = offspring
                fitness_scores[worst_index] = offspring_fitness

    best_chromosome = population[np.argmax(fitness_scores)]
    best_fitness = np.max(fitness_scores)

    return best_chromosome, best_fitness, max_fitness_each_gen

# Selection (Tournament selection)
def selection(population, fitness_scores, tournament_size=3):
    """
    Picks one individual by tournament selection.

    tournament_size positions are drawn at random (the same position may be
    drawn more than once) and the drawn individual with the highest fitness
    wins. On a tie the earliest drawn one wins.

    Args:
        population: A numpy array with one individual per row.
        fitness_scores: A numpy array with the fitness of each individual, in the same order.
        tournament_size: An integer indicating how many individuals take part in the tournament.

    Returns:
        The winning individual.
    """
    drawn = np.random.randint(len(population), size=tournament_size)
    contenders = population[drawn]
    winner = np.argmax(fitness_scores[drawn])
    return contenders[winner]

# Crossover (Single-point crossover)
def crossover(parent_1: list, parent_2: list):
    """
    Recombines two parents with a single-point crossover on their bit strings.

    Both parents are encoded with decimal_to_binary, cut at one random
    position (never at position 0), their tails are exchanged and the results
    are decoded with binary_to_decimal.

    Args:
        parent_1 (list): The first parent list.
        parent_2 (list): The second parent list.

    Returns:
        tuple: A tuple containing two offspring list.
    """
    bits_a = decimal_to_binary(parent_1)
    bits_b = decimal_to_binary(parent_2)

    cut = np.random.randint(1, len(bits_a))
    head_a, tail_a = bits_a[:cut], bits_a[cut:]
    head_b, tail_b = bits_b[:cut], bits_b[cut:]

    child_a = binary_to_decimal(head_a + tail_b)
    child_b = binary_to_decimal(head_b + tail_a)
    return child_a, child_b

# Mutation (Swap Mutation)
def mutate(chromosome: list):
    """
    Mutates a chromosome by swapping two randomly chosen bits.

    The chromosome is encoded with decimal_to_binary, the bits at two random
    positions trade places (the positions may coincide, which leaves the
    chromosome unchanged) and the result is decoded with binary_to_decimal.

    Args:
        chromosome (list): The chromosome to be mutated.

    Returns:
        list: The mutated chromosome.
    """
    bits = list(decimal_to_binary(chromosome))

    first, second = np.random.randint(len(bits), size=2)
    bits[first], bits[second] = bits[second], bits[first]

    return binary_to_decimal(''.join(bits))

# Fitness Function
def fitness_function(chromosome: list):
    """
    Scores a chromosome by training and evaluating an LSTM model built from it.

    The first three genes give the number of units of up to three stacked
    LSTM layers; only the first LSTM_Layer of them are used. A gene of 0 is
    replaced by a default width (128, 64 and 32 for the first, second and
    third layer). The model is trained for 10 epochs on X_train / y_train and
    its mean squared error on X_test / y_test is measured.

    NOTE(review): the fitness is measured on the same test split that is used
    for the final evaluation, so the selected chromosome is tuned to that
    split - confirm this is intended.

    Args:
        chromosome (list): A list of at least three values used to determine the number of units in each LSTM layer.

    Returns:
        float: The negated loss, so that a higher fitness means a lower error.
    """
    # A gene of 0 is not a usable layer width, so fall back to the default of its position.
    lstm_units = [int(chromosome[i]) or default for i, default in enumerate([128, 64, 32])]
    layer_units = lstm_units[:LSTM_Layer]

    # Build the LSTM model
    tf.keras.backend.clear_session()
    model = Sequential()
    last_layer = len(layer_units) - 1
    for i, units in enumerate(layer_units):
        # Only the first layer declares the input shape. Every layer except the
        # one actually built last must return the full sequence for its successor.
        shape_kwargs = {'input_shape': (WINDOW_SIZE, 1)} if i == 0 else {}
        model.add(LSTM(units, return_sequences=(i < last_layer), **shape_kwargs))
    model.add(Dense(1))

    print('Training Model...')
    # Compile and train the model
    model.compile(loss='mean_squared_error',
                  optimizer='adam')
    model.fit(X_train, y_train,
              epochs=10,
              batch_size=32,
              verbose=0,
              validation_split=0.1)

    # Evaluate the model (silently, like the training above; this runs once per candidate)
    loss = model.evaluate(X_test, y_test, verbose=0)

    # Return the negative value of the loss as the fitness score
    return -loss

In [None]:
import typing
from datetime import datetime

def timer(start_time: typing.Optional[datetime] = None) -> typing.Union[datetime, str]:
    """
    Starts a stopwatch or reports the time elapsed since it was started.

    Called without a start time, this returns the current time, which can be
    passed back in later. Called with a start time, it returns the time
    elapsed since then as text.

    Args:
        start_time (datetime, optional): The start time. Defaults to None.

    Returns:
        typing.Union[datetime, str]: The current time if no start time is
        provided, otherwise the elapsed time formatted as
        "H hours M minutes and S seconds." with the seconds rounded to two
        decimals.
    """
    if start_time is None:
        return datetime.now()

    elapsed_seconds = (datetime.now() - start_time).total_seconds()
    thour, temp_sec = divmod(elapsed_seconds, 3600)
    tmin, tsec = divmod(temp_sec, 60)
    return "%i hours %i minutes and %s seconds." % (
        thour,
        tmin,
        round(tsec, 2),
    )

In [None]:
# Do not increase this value
# (presumably because of run time: every generation trains two more LSTM models)
generations = 10
mutation_rate = 5

# Time the whole search; `time` ends up holding the formatted duration.
start_time = timer()
best_chromosome, best_fitness, max_fitness_each_gen = genetic_algorithm(
    generations,
    mutation_rate,
)
time = timer(start_time)

In [None]:
# Summarise the outcome of the search, one labelled line per result.
for label, value in (
    ("Best Chromosome:", best_chromosome),
    ("Best Fitness:", best_fitness),
    ("Time Taken:", time),
):
    print(label, value)

In [None]:
# Best fitness in the population, as recorded at the start of each generation.
max_fitness_each_gen

## Re-Training the Model with the Best Parameters

In [None]:
# Crossover and mutation can leave a gene at 0. During the search such a gene
# was scored with a default width (128, 64, 32 by layer position), so the same
# fallback is applied here. Otherwise the retrained model would not be the one
# that earned the best fitness, and a layer with 0 units cannot be built.
best_units = [
    int(units) or default
    for units, default in zip(best_chromosome, [128, 64, 32])
][:LSTM_Layer]

model = Sequential()
for i, units in enumerate(best_units):
    model.add(LSTM(units, return_sequences=(i < LSTM_Layer - 1), input_shape=(WINDOW_SIZE, 1)))
model.add(Dense(1))

model.summary()

In [None]:
# Train the final model with MSE as the loss while also tracking MAE and RMSE.
# The last 10% of the training windows are held out as validation data.
model.compile(
    optimizer='adam',
    loss='mean_squared_error',
    metrics=['mae', RootMeanSquaredError()],
)

history = model.fit(
    X_train,
    y_train,
    batch_size=32,
    epochs=50,
    validation_split=0.1,
)

In [None]:
# Score the retrained model on the test windows; with the metrics compiled above
# the result lists the loss (MSE) followed by MAE and RMSE.
model.evaluate(X_test, y_test)

## Model Evaluation

In [None]:
# Training loss against validation loss per epoch. 'val_loss' is measured on
# the slice held out by validation_split during fit, not on X_test, so the
# second curve is labelled 'validation'.
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title(f'{SHEET_NAME[2]} Model Loss', fontsize=15)
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'validation'])
plt.show()

In [None]:
# Predict the test windows, then map both the targets and the predictions
# back to price units with the scaler fitted earlier.
y_hat = model.predict(X_test)
y_test_inverse, y_hat_inverse = (
    scaler.inverse_transform(values) for values in (y_test, y_hat)
)

In [None]:
# Actual against predicted prices over the test period, drawn on one axes.
fig, ax = plt.subplots()
ax.plot(y_test_inverse, label="Actual Price", color='green')
ax.plot(y_hat_inverse, label="Predicted Price", color='red')

ax.set_title(f'{SHEET_NAME[2]} Price Prediction\nLSTM = {LSTM_Layer}, Split Data = {SPLIT}, Window = {WINDOW_SIZE}', fontsize=15)
ax.set_xlabel('Time [days]')
ax.set_ylabel('Price')
ax.legend(loc='best')

plt.show()

In [None]:
def matrices(actual, predicted):
    """
    Computes and prints the error metrics of a prediction.

    Args:
        actual: The true values.
        predicted: The predicted values, aligned with actual.

    Returns:
        tuple: (mae, mse, rmse) - mean absolute error, mean squared error and
        the square root of the mean squared error.
    """
    mae = mean_absolute_error(actual, predicted)
    mse = mean_squared_error(actual, predicted)
    rmse = np.sqrt(mse)

    report = (
        ("Mean Absolute Error for prediction :", mae),
        ("Mean Squared Error for prediction :", mse),
        ("Root Mean Squared Error for prediction :", rmse),
    )
    for label, value in report:
        print(label, value)
    return mae, mse, rmse

# Report the settings of this run, then the errors twice: once on the scaled
# values the model works with and once on the values mapped back through the
# scaler's inverse transform.
print(f"LSTM = {LSTM_Layer}, Split Data = {SPLIT}, Window = {WINDOW_SIZE}")
print("\n----------------- Normalized Error -----------------")
mae, mse, rmse = matrices(y_test, y_hat)
print("\n----------------- Actual Error -----------------")
mae_inverse, mse_inverse, rmse_inverse = matrices(y_test_inverse, y_hat_inverse)