# **Bitcoin price prediction - Final scores**
### Big Data Computing final project - A.Y. 2022 - 2023
Prof. Gabriele Tolomei

MSc in Computer Science

La Sapienza, University of Rome

### Author: Corsi Danilo (1742375) - corsi.1742375@studenti.uniroma1.it


---


Description: display of final scores and making predictions on the test set with the models trained on the whole train / validation set.

# Global constants, dependencies, libraries and tools

In [None]:
# Main constants
LOCAL_RUNNING = False
SLOW_OPERATIONS = True # Decide whether or not to use operations that might slow down notebook execution
ROOT_DIR = "D:/Documents/Repository/BDC/project" if LOCAL_RUNNING else "/content/drive"

In [None]:
if not LOCAL_RUNNING:
    # Point Colaboratory to Google Drive
    from google.colab import drive

    # Define GDrive paths
    drive.mount(ROOT_DIR, force_remount=True)

    # Install Spark and related dependencies
    !pip install pyspark
    !pip install -U -q PyDrive -qq
    !apt install openjdk-8-jdk-headless -qq

## Import my utilities

In [None]:
# Set main dir
MAIN_DIR = ROOT_DIR + "" if LOCAL_RUNNING else ROOT_DIR + "/MyDrive/BDC/project"

# Utilities dir
UTILITIES_DIR = MAIN_DIR + "/utilities"

# Import my utilities
import sys
sys.path.append(UTILITIES_DIR)

from imports import *

In [None]:
# Set main dir
MAIN_DIR = ROOT_DIR + "" if LOCAL_RUNNING else ROOT_DIR + "/MyDrive/BDC/project"

###################
# --- DATASET --- #
###################

# Datasets dirs
DATASET_OUTPUT_DIR = MAIN_DIR + "/datasets/output"

# Datasets names
DATASET_TEST_NAME = "bitcoin_blockchain_data_15min_test"

# Datasets paths
DATASET_TEST = DATASET_OUTPUT_DIR + "/" + DATASET_TEST_NAME + ".parquet"

####################
# --- FEATURES --- #
####################

# Features dir
FEATURES_DIR = MAIN_DIR + "/features"

# Features labels
FEATURES_LABEL = "features"
TARGET_LABEL = "next-market-price"

# Features names
ALL_FEATURES_NAME = "all_features"
MOST_REL_FEATURES_NAME = "most_rel_features"
LEAST_REL_FEATURES_NAME = "least_rel_features"

# Features paths
ALL_FEATURES = FEATURES_DIR + "/" + ALL_FEATURES_NAME + ".json"
MOST_REL_FEATURES = FEATURES_DIR + "/" + MOST_REL_FEATURES_NAME + ".json"
LEAST_REL_FEATURES = FEATURES_DIR + "/" + LEAST_REL_FEATURES_NAME + ".json"

##################
# --- MODELS --- #
##################

# Model names
LR_MODEL_NAME = "LinearRegression"
GLR_MODEL_NAME = "GeneralizedLinearRegression"
RF_MODEL_NAME = "RandomForestRegressor"
GBT_MODEL_NAME = "GradientBoostingTreeRegressor"

# Model dir
MODELS_DIR = MAIN_DIR + "/models"

# Model path
LR_MODEL = MODELS_DIR + "/" + LR_MODEL_NAME
GLR_MODEL = MODELS_DIR + "/" + GLR_MODEL_NAME
RF_MODEL = MODELS_DIR + "/" + RF_MODEL_NAME
GBT_MODEL = MODELS_DIR + "/" + GBT_MODEL_NAME

###################
# --- RESULTS --- #
###################

# splits names
BLOCK_SPLITS_NAME = "block_splits"
WALK_FORWARD_SPLITS_NAME = "walk_forward_splits"
SHORT_TERM_SPLITS_NAME = "single_split"

# Results dir
RESULTS_DIR = MAIN_DIR + "/results"
RESULTS_FINAL_DIR = RESULTS_DIR + "/final"

In [None]:
# Suppression of warnings for better reading
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

if LOCAL_RUNNING: pio.renderers.default='notebook' # To correctly export the notebook in html format

# Create the pyspark session

In [None]:
# Create the session
conf = SparkConf().\
                set('spark.ui.port', "4050").\
                set('spark.executor.memory', '12G').\
                set('spark.driver.memory', '12G').\
                set('spark.driver.maxResultSize', '109G').\
                set("spark.kryoserializer.buffer.max", "1G").\
                setAppName("BitcoinPricePrediction").\
                setMaster("local[*]")

# Create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

# Loading dataset

In [None]:
# Load datasets into pyspark dataset objects
df = spark.read.load(DATASET_TEST,
                         format="parquet",
                         sep=",",
                         inferSchema="true",
                         header="true"
                    )

In [None]:
def dataset_info(dataset):
  # Print dataset
  dataset.show(3)

  # Get the number of rows
  num_rows = dataset.count()

  # Get the number of columns
  num_columns = len(dataset.columns)

  # Print the shape of the dataset
  print("Shape:", (num_rows, num_columns))

  # Print the schema of the dataset
  dataset.printSchema()

In [None]:
if SLOW_OPERATIONS:
  dataset_info(df)

# Load train / validation data

❗TOFIX

In [None]:
# Load models results
splits_list = [BLOCK_SPLITS_NAME, WALK_FORWARD_SPLITS_NAME, SHORT_TERM_SPLITS_NAME]
models_list = [LR_MODEL_NAME, GLR_MODEL_NAME, RF_MODEL_NAME, GBT_MODEL_NAME]
train_valid_results = pd.DataFrame(columns=['Model', 'Type', 'Dataset', 'Splitting', 'Features', 'Parameters', 'RMSE', 'MSE', 'MAE', 'MAPE', 'R2', 'Adjusted_R2', 'Time'])
train_valid_accuracy = pd.DataFrame(columns=['Model', 'Features', 'Splitting', 'Accuracy (default)', 'Accuracy (validated)'])
for split in splits_list:
    for model in models_list:
        if split == BLOCK_SPLITS_NAME:
            train_valid_results = pd.concat([train_valid_results, pd.read_csv(RESULTS_DIR + "/" + split + "/" + model + "_rel.csv")], ignore_index=True)
            train_valid_accuracy = pd.concat([train_valid_accuracy, pd.read_csv(RESULTS_DIR + "/" + split + "/" + model + "_accuracy.csv")], ignore_index=True)
        elif split == WALK_FORWARD_SPLITS_NAME:
            train_valid_results = pd.concat([train_valid_results, pd.read_csv(RESULTS_DIR + "/" + split + "/" + model + "_rel.csv")], ignore_index=True)
            train_valid_accuracy = pd.concat([train_valid_accuracy, pd.read_csv(RESULTS_DIR + "/" + split + "/" + model + "_accuracy.csv")], ignore_index=True)
        elif split == SHORT_TERM_SPLITS_NAME:
            train_valid_results = pd.concat([train_valid_results, pd.read_csv(RESULTS_DIR + "/" + split + "/" + model + "_rel.csv")], ignore_index=True)
            train_valid_accuracy = pd.concat([train_valid_accuracy, pd.read_csv(RESULTS_DIR + "/" + split + "/" + model + "_accuracy.csv")], ignore_index=True)


In [None]:
train_valid_results

In [None]:
train_valid_accuracy

In [None]:
default_mask = train_valid_results['Type'].str.contains('default_norm|default')
tuned_mask = train_valid_results['Type'].str.contains('final|tuned|cross_val')

default_results = train_valid_results[default_mask]
tuned_results = train_valid_results[tuned_mask]

In [None]:
default_results

In [None]:
tuned_results

In [None]:
def results_bar_plot(dataset, title):
  fig = px.bar(
    x=dataset['Model'],
    y=dataset['RMSE'],
    color=dataset['Splitting'],
    barmode='group'
  )

  fig.update_yaxes(side="left", showticklabels=True)
  fig.update_layout(title=title)
  fig.update_layout(legend_title_text="Splitting methods")
  fig.update_yaxes(showticklabels=False)
  fig.update_xaxes(title_text="Models")
  fig.update_yaxes(title='RMSE')

  fig.show()

In [None]:
results_bar_plot(default_results, "RMSE value for each default model")

In [None]:
results_bar_plot(tuned_results, "RMSE value for each tuned model")

In [None]:
# def results_bar_plot(title):
#   from plotly.subplots import make_subplots

#   fig = make_subplots(rows=1, cols=2)

#   fig1 = px.bar(
#     x=default_results['Model'],
#     y=default_results['RMSE'],
#     color=default_results['Splitting'],
#     barmode='group'
#   )

#   fig2 = px.bar(
#     x=tuned_results['Model'],
#     y=tuned_results['RMSE'],
#     color=tuned_results['Splitting'],
#     barmode='group'
#   )

#   # fig1.update_yaxes(side="left", showticklabels=True)
#   # fig1.update_layout(title=title)
#   # fig1.update_layout(legend_title_text="Splitting methods")
#   # fig1.update_yaxes(showticklabels=False)
#   # fig1.update_xaxes(title_text="Models")
#   # fig1.update_yaxes(title='RMSE')

#   # fig2.update_yaxes(side="left", showticklabels=True)
#   # fig2.update_layout(title=title)
#   # fig2.update_layout(legend_title_text="Splitting methods")
#   # fig2.update_yaxes(showticklabels=False)
#   # fig2.update_xaxes(title_text="Models")
#   # fig2.update_yaxes(title='RMSE')

#   # Add the traces to the subplot
#   fig.add_trace(fig1.data[0], row=1, col=1)
#   fig.add_trace(fig2.data[0], row=1, col=2)

#   fig.show()

In [None]:
# results_bar_plot("RMSE value for each model")

In [None]:
def accuracy_bar_plot(dataset, accuracy, title):
  fig = px.bar(
    x=dataset['Model'],
    y=dataset[accuracy],
    color=dataset['Splitting'],
    barmode='group'
  )

  fig.update_yaxes(side="left", showticklabels=True)
  fig.update_layout(title=title)
  fig.update_layout(legend_title_text="Splitting methods")
  fig.update_yaxes(showticklabels=False)
  fig.update_xaxes(title_text="Models")
  fig.update_yaxes(title='Accuracy')

  fig.show()

In [None]:
accuracy_bar_plot(train_valid_accuracy, 'Accuracy (default)', "Accuracy value for each default model")

In [None]:
accuracy_bar_plot(train_valid_accuracy, 'Accuracy (validated)', "Accuracy value for each tuned model")

# Test models
For each model, predictions on the various mini-sets are made and the obtained results are compared.

The test set is divided into further mini-sets of 1 week, 15 days, 1 month and 3 months to see how the models' performance degrades as the time taken into account increases.

In [None]:
# Retrieve the last value of the timestamp column
first_timestamp = df.select(col("timestamp")).first()[0]

# Split the test set into mini-sets of 1 week, 15 days, 1 month, and 3 months
one_week_df = df.filter(col("timestamp") <= first_timestamp + relativedelta(weeks=1))
fifteen_days_df = df.filter(col("timestamp") <= first_timestamp + relativedelta(days=15))
one_month_df = df.filter(col("timestamp") <= first_timestamp + relativedelta(months=1))
three_months_df = df.filter(col("timestamp") <= first_timestamp + relativedelta(months=3))

In [None]:
'''
Description: Evaluate final model by making predictions on the test set
Args:
    dataset: The dataSet which needs to be splited
    dataset_name: Name of selected dataset [one_week | fifteen_days | one_month | three_months]
    model: Trained model
    model_name: Model name selected
    features_normalization: Indicates whether features should be normalized (True) or not (False)
    features: Features to be used to make predictions
    features_name: Name of features used
    features_label: The column name of features
    target_label: The column name of target variable
Return:
    results_df: Results obtained from the evaluation
    predictions: Predictions obtained from the model
'''
def evaluate_final_model(dataset, dataset_name, model, model_name, features_normalization, features, features_name, features_label, target_label):
    # Select the type of features to be used
    dataset = utilities.select_features(dataset, features_normalization, features, features_label, target_label)

    # Make predictions
    predictions = model.transform(dataset).select(target_label, "market-price", "prediction", 'timestamp')

    # Compute validation error by several evaluators
    eval_res = utilities.model_evaluation(target_label, predictions)

    # Use dict to store each result
    results = {
        "Model": model_name,
        "Dataset": dataset_name,
        "Features": features_name,
        "RMSE": eval_res['rmse'],
        "MSE": eval_res['mse'],
        "MAE": eval_res['mae'],
        "MAPE": eval_res['mape'],
        "R2": eval_res['r2'],
        "Adjusted_R2": eval_res['adj_r2'],
    }

    # Transform dict to pandas dataset
    results_pd = pd.DataFrame(results, index=[0])

    return results_pd, predictions

In [None]:
'''
Description: How good the models are at predicting whether the price will go up or down
Args:
    dataset: The dataset which needs to be splited
Return:
    accuracy: Return the percentage of correct predictions
'''
def model_accuracy(dataset):
    # Compute the number of total rows in the DataFrame.
    total_rows = dataset.count()

    # Create a column "correct_prediction" which is worth 1 if the prediction is correct, otherwise 0
    dataset = dataset.withColumn(
        "correct_prediction",
        (
            (col("market-price") < col("next-market-price")) & (col("market-price") < col("prediction"))
        ) | (
            (col("market-price") > col("next-market-price")) & (col("market-price") > col("prediction"))
        )
    )

    # Count the number of correct predictions
    correct_predictions = dataset.filter(col("correct_prediction")).count()

    # Compite percentage of correct predictions
    accuracy = (correct_predictions / total_rows) * 100

    return accuracy

In [None]:
# Loading all the features
with open(ALL_FEATURES, "r") as f:
    ALL_FEATURES = json.load(f)
print(ALL_FEATURES)

In [None]:
# Loading the most relevant features
with open(MOST_REL_FEATURES, "r") as f:
    MOST_REL_FEATURES = json.load(f)
print(MOST_REL_FEATURES)

In [None]:
# Load models
lr = PipelineModel.load(LR_MODEL)
glr = PipelineModel.load(GLR_MODEL)
rf = PipelineModel.load(RF_MODEL)
gbt = PipelineModel.load(GBT_MODEL)

In [None]:
# Test models
model_name_list = [LR_MODEL_NAME, GLR_MODEL_NAME, RF_MODEL_NAME, GBT_MODEL_NAME]
model_list = [lr, glr, rf, gbt]
dataset_list = [one_week_df, fifteen_days_df, one_month_df, three_months_df]
dataset_name_list = ["one_week", "fifteen_days", "one_month", "three_months"]
predictions_df = pd.DataFrame(columns=[TARGET_LABEL, "market-price", "prediction", 'timestamp'])
test_results = pd.DataFrame(columns=['Model', 'Dataset', 'Features', 'RMSE', 'MSE', 'MAE', 'MAPE', 'R2', 'Adjusted_R2'])
test_accuracy = pd.DataFrame(columns=['Model', 'Features', 'Dataset', 'Accuracy'])

# For each model makes predictions based on the dataset type
for i, model in enumerate(model_list):
    for j, dataset in enumerate(dataset_list):
        # Select the type of feature to be used based on the model
        if (model_name_list[i] == LR_MODEL_NAME) or (model_name_list[i] == GLR_MODEL_NAME):
          FEATURES_NORMALIZATION = True
          CHOSEN_FEATURES = ALL_FEATURES
          CHOSEN_FEATURES_LABEL = ALL_FEATURES_NAME
        elif (model_name_list[i] == RF_MODEL_NAME) or (model_name_list[i] == GBT_MODEL_NAME):
          FEATURES_NORMALIZATION = True
          CHOSEN_FEATURES = MOST_REL_FEATURES
          CHOSEN_FEATURES_LABEL = MOST_REL_FEATURES_NAME

        results, predictions = evaluate_final_model(dataset, dataset_name_list[j], model, model_name_list[i], FEATURES_NORMALIZATION, CHOSEN_FEATURES, CHOSEN_FEATURES_LABEL, FEATURES_LABEL, TARGET_LABEL)
        test_results = pd.concat([test_results, results], ignore_index=True)

        predictions = predictions.withColumn("Model", lit(model_name_list[i])).withColumn("Dataset", lit(dataset_name_list[j]))
        predictions_df = pd.concat([predictions_df, predictions.toPandas()], ignore_index=True)

        accuracy = model_accuracy(predictions)
        accuracy_data = {
            'Model': model_name_list[i],
            'Features': CHOSEN_FEATURES_LABEL,
            'Dataset': dataset_name_list[j],
            'Accuracy': accuracy
        }
        accuracy_data_df = pd.DataFrame(accuracy_data, index=['Model'])

        test_accuracy = pd.concat([test_accuracy, accuracy_data_df], ignore_index=True)

# Merge results and accuracy
merged_results = pd.merge(test_results, test_accuracy)
merged_results


# Models comparison

In [None]:
def show_results(predictions, model0_name, model0_predictions, model1_name, model1_predictions, model2_name, model2_predictions, model3_name, model3_predictions, title):
  trace1 = go.Scatter(
      x = predictions['timestamp'],
      y = predictions['next-market-price'].astype(float),
      mode = 'lines',
      name = 'Next Market price (usd)'
  )

  trace2 = go.Scatter(
      x = model0_predictions['timestamp'],
      y = model0_predictions['prediction'].astype(float),
      mode = 'lines',
      name = model0_name + ' predictions'
  )

  trace3 = go.Scatter(
      x = model1_predictions['timestamp'],
      y = model1_predictions['prediction'].astype(float),
      mode = 'lines',
      name = model1_name + ' predictions'
  )

  trace4 = go.Scatter(
      x = model2_predictions['timestamp'],
      y = model2_predictions['prediction'].astype(float),
      mode = 'lines',
      name = model2_name + ' predictions'
  )

  trace5 = go.Scatter(
      x = model3_predictions['timestamp'],
      y = model3_predictions['prediction'].astype(float),
      mode = 'lines',
      name = model3_name + ' predictions'
  )

  layout = dict(
      title=title + " predictions",
      xaxis=dict(
          rangeselector=dict(
              buttons=list([
                  # Change the count to desired amount of months.
                  dict(count=1,
                      label='1m',
                      step='month',
                      stepmode='backward'),
                  dict(count=6,
                      label='6m',
                      step='month',
                      stepmode='backward'),
                  dict(count=12,
                      label='1y',
                      step='month',
                      stepmode='backward'),
                  dict(count=36,
                      label='3y',
                      step='month',
                      stepmode='backward'),
                  dict(step='all')
              ])
          ),
          rangeslider=dict(
              visible = True
          ),
          type='date'
      )
  )

  data = [trace1,trace2,trace3,trace4,trace5]
  fig = dict(data=data, layout=layout)
  iplot(fig, filename = title + " predictions")

In [None]:
# For each dataset type, it displays the predicitons of each model
for dataset_name in dataset_name_list:
    predictions_to_show = predictions_df[predictions_df['Dataset'] == dataset_name]

    model_0_predictions = predictions_to_show[predictions_to_show['Model'] == model_name_list[0]]
    model_1_predictions = predictions_to_show[predictions_to_show['Model'] == model_name_list[1]]
    model_2_predictions = predictions_to_show[predictions_to_show['Model'] == model_name_list[2]]
    model_3_predictions = predictions_to_show[predictions_to_show['Model'] == model_name_list[3]]

    show_results(predictions_to_show, model_name_list[0], model_0_predictions, model_name_list[1], model_1_predictions, model_name_list[2], model_2_predictions, model_name_list[3], model_3_predictions, dataset_name)

# Summary

In [None]:
scatter_plot(test_results, "RMSE", "Model", "RMSE value for each model")

In [None]:
adv_scatter_plot(test_results, "RMSE", "Model", "Dataset", "RMSE value for each model (and dataset type)")

In [None]:
scatter_plot(test_accuracy, "Accuracy", "Model", "Accuracy for each model")

In [None]:
adv_scatter_plot(test_accuracy, "Accuracy", "Model", "Dataset", "Accuracy for each model (and dataset type)")

In [None]:
# Saving final test results
test_results.to_csv(RESULTS_FINAL_DIR + "/final.csv", index=False)
test_accuracy.to_csv(RESULTS_FINAL_DIR + "/final_accuracy.csv", index=False)

In [None]:
# Export notebook in html format (remember to save the notebook and change the model name)
if LOCAL_RUNNING:
    !jupyter nbconvert --to html 6-final-scores.ipynb --output 6-final-scores --output-dir='./exports'