# Bitcoin Price Prediction - Local Mode ARIMA/VectorARIMA

* **Description**: COMP4103(Big Data)--Group Project
* **Author**: Aaron
* **Version**: 0.2

**Updates:**
1. Update the way to calcute Training time accurately.
2. Add Cross Validation part

**Issues:**  
1. N/A

**To be done:**  
1. Visualize the influence of the different number of partitions

## 1. Load related packages

In [72]:
# Apache Spark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Python
import numpy as np
import pandas as pd
from itertools import product
import time
import io
from contextlib import redirect_stdout

# Graph packages
# https://plotly.com/python/getting-started/#jupyterlab-support
# https://plotly.com/python/time-series/
import plotly.express as px

# ARIMA
# https://alkaline-ml.com/pmdarima/index.html
import pmdarima as pm
from pmdarima.arima import ndiffs

#Vector Autoregressions
# https://www.statsmodels.org/dev/vector_ar.html
from statsmodels.tsa.api import VAR

# Scikit-learn
from sklearn.metrics import explained_variance_score, mean_absolute_error, mean_squared_error, mean_squared_log_error, mean_absolute_percentage_error, r2_score

# Load the customized Time Series Cross Validation
from tsCrossValidation import mulTsCrossValidation, blockedTsCrossValidation, wfTsCrossValidation, modelComparison

## 2. Create a Spark Session

In [73]:
# Start a SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Bitcoin Prediction - Local Auto Regressions") \
    .getOrCreate()

sc = spark.sparkContext

In [74]:
spark

## 3. load Data

In [75]:
# Read csv file
filename = "bitcoin_1m_1min.csv"
dataset = spark.read.format("csv") \
          .option("inferSchema",'True') \
          .option("header",True) \
          .load(filename)

## 4. Train/Test Data

In [76]:
'''
Description: Split and keep the original time-series order
Args:
    dataSet: The dataSet which needs to be splited
    proportion: A number represents the split proportion

Return: 
    train_data: The train dataSet
    test_data: The test dataSet
'''
def trainSplit(dataSet, proportion):
    records_num = dataset.count()
    split_point = round(records_num * proportion)
    
    train_data = dataset.filter(F.col("id") < split_point)
    test_data = dataset.filter(F.col("id") >= split_point)
    
    return (train_data,test_data)

In [77]:
# Have a look on the data
dataset.select("id","Timestamp","Close").show(5)

+---+-------------------+--------+
| id|          Timestamp|   Close|
+---+-------------------+--------+
|  0|2021-03-02 00:00:00|49657.53|
|  1|2021-03-02 00:01:00|49706.37|
|  2|2021-03-02 00:02:00|49714.75|
|  3|2021-03-02 00:03:00|49814.72|
|  4|2021-03-02 00:04:00|49847.43|
+---+-------------------+--------+
only showing top 5 rows



In [78]:
# Split the dataSet
proportion = 0.9990
#proportion = 0.7
train_data,test_data = trainSplit(dataset, proportion)

# Cache it
train_data.cache()
test_data.cache()

# Number of train and test dataSets
print(f"Training data: {train_data.count()}\nTest data: {test_data.count()}")

Training data: 41718
Test data: 42


In [79]:
# Save column name 
column_names = dataset.columns
# labels and features
feature_cols = dataset.columns
# Gain the column list of features
non_feature_cols  = ['id',"NEXT_BTC_CLOSE",'Timestamp']
[feature_cols.remove(non_feature) for non_feature in non_feature_cols]

[None, None, None]

## 5. Local Mode building

In [80]:
# Define a function to plot line-like graph
# https://plotly.com/python/time-series/#time-series-with-range-selector-buttons
'''
Description: Plot the line graph by plotly(custom design)
Args:
    data: The data(pandas dataframe) which you want to ploy by line
    graph_title: The title of the graph
    
Return: None
'''
def line_plot(data,graph_title):
    plot = px.line(data,title=graph_title)
    plot.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(
            buttons=list([
                dict(count=7, label="1w", step="day", stepmode="backward"),
                dict(count=1, label="1m", step="month", stepmode="backward"),
                dict(count=6, label="6m", step="month", stepmode="backward"),
                dict(count=1, label="1y", step="year", stepmode="backward"),
                dict(step="all")
            ])
        )
    )
    plot.show()

In [81]:
'''
Description: Transform each partition of Spark to pandas dataframe
Args:
    partition_rdd: RDD of each partition
    
Return: 
    pandas_df: Data in pandas dataframe
'''
def partitionToPandas(partition_rdd):
    pandas_df = pd.DataFrame(columns = column_names)
    
    # each_row is Row() type in Spark
    for each_row in partition_rdd:
        pandas_df = pandas_df.append(each_row.asDict(),ignore_index=True)
    return [pandas_df]

In [82]:
'''
Description: Build ARIMA model on each partition
Args:
    partition_rdd: RDD of each partition
    
Return: 
    arima_model: ARIMA model
'''
def buildARIMA(pandas_df):
    
    # Only choose Close as prediction
    pandas_df = pandas_df[['Timestamp','Close']].set_index("Timestamp")
    
    # Choose the best degree of differencing
    kpss_diffs = ndiffs(pandas_df, alpha=0.05, test='kpss', max_d=6)
    adf_diffs = ndiffs(pandas_df, alpha=0.05, test='adf', max_d=6)
    n_diffs = max(adf_diffs, kpss_diffs)
    
    # Auto training
    # p: AR (i.e., the number of lag observations)
    # d: The degree of differencing.
    # q: MA (the size of the “window”)
    output = io.StringIO()
    # Capture the trace time output to get the model training time
    with redirect_stdout(output):
        arima_model = pm.auto_arima(pandas_df, start_p=1, seasonal=False,
                             d=n_diffs, trace=True,
                             suppress_warnings=True,
                             error_action="ignore",
                             max_order=None,
                             stepwise=True)
    # Get the model training time from trace time output
    model_results = output.getvalue().split('\n')
    model_results = [ line for line in model_results if "AIC" in line ]
    model_results = [line.split(':')[1].split(',') for line in model_results]
    AIC_lst = [ line[0].split('=')[1] for line in model_results ]
    time_lst = [ line[1].split('=')[1].split(' ')[0] for line in model_results ]
    model_results_dict = {"AIC": AIC_lst, "time": time_lst}
    model_results_df = pd.DataFrame(model_results_dict)
    train_time = model_results_df.sort_values("AIC").iloc[0,1]
    
    # Save the (p,d,q)
    order_info = arima_model.order
    return (arima_model,float(train_time))

In [83]:
'''
Description: Make prediction on each partition
Args:
    pandas_df: Data in pandas dataframe
    broadcast_models: Trained Models
    model_name: specify which model to make prediction
    
Return: 
    partition_pred: Predictions on the partition in a list
'''
def makePrediction(pandas_df, broadcast_models, model_name):
    prediction_lst = []
    num_pred = pandas_df.shape[0]
    num_models = len(broadcast_models.value)
    
    if model_name == "VectorARIMA":
        pandas_df.drop(['id'], axis=1, inplace=True)
        pandas_df.set_index("Timestamp", inplace=True)
        
        # Get the prediction from each model, then save to a list
        for model in broadcast_models.value:
            results = model.fit(maxlags=6, ic='aic')
            lag_order = results.k_ar
            prediction = results.forecast(pandas_df.values[-lag_order:],num_pred)
            close_prediction = [lst[3] for lst in prediction]
            prediction_lst.append(close_prediction)
            
    elif model_name == "ARIMA":
        # Get the prediction from each model, then save to a list
        for model in broadcast_models.value:
            prediction_lst.append(model.predict(num_pred).tolist())
            
    else:
        return "Wrong model name"
    
    # Define weight value
    weight = list(range(1,num_models+1))
    # Weighted the results from each Model
    weighted_pred_lst = [[i*b for i in a] for a,b in zip(prediction_lst,weight)]
    
    # Aggregate the weighted predictions, then get Weighted value
    partition_pred = [value / sum(weight) for value in map(sum,zip(*weighted_pred_lst))]
    # Simple average 
    #partition_pred = [value / num_models for value in map(sum,zip(*prediction_lst))]
    
    return partition_pred

In [84]:
'''
Description: Build Vector ARIMA model on each partition
Args:
    partition_rdd: RDD of each partition
    
Return: 
    vector_arima: Vector ARIMA model
'''
def buildVectorARIMA(pandas_df):
    
    # Drop the column that don't need to predict
    pandas_df.drop(['id'], axis=1, inplace=True)
    pandas_df.set_index("Timestamp", inplace=True)
    start = time.time()
    vector_arima = VAR(pandas_df)
    end = time.time()
    return (vector_arima, end-start)

In [85]:
'''
Description: Calculate evaluation metrics
Args:
    y_test: Label of test data
    y_pred: Prediction on test data
    partition_num_train: Number of partition of Train data
    partition_num_test: Number of partition of Test data
    train_time: Time of training model
    model_name: specify which model to make prediction
    
Return: 
    results: All the evaluation metrics in a dict
'''
def evaluationAssemble(y_test, y_pred, partition_num_train, partition_num_test, train_time, model_name):
    # Explained variance score
    exp_var = explained_variance_score(y_test,y_pred)

    # Mean absolute error
    mae = mean_absolute_error(y_test,y_pred)

    # Root Mean squared error
    rmse = mean_squared_error(y_test,y_pred,squared=False)

    # Mean squared logarithmic error
    msle = mean_squared_log_error(y_test,y_pred)

    # Mean absolute percentage error
    mape = mean_absolute_percentage_error(y_test,y_pred)

    # R2 score, the coefficient of determination
    r2 = r2_score(y_test,y_pred)

    # Adjusted R2 score
    n = len(y_pred)
    if model_name == "ARIMA":
        p = 1
    elif model_name == "VectorARIMA":
        p = len(feature_cols)
        
    adj_r2 = 1-(1-r2)*(n-1)/(n-p-1)

    # Use dict to store each result
    results = {
        "Model": model_name,
        "P_train": partition_num_train,
        "P_test": partition_num_test,
        "Proportion": proportion,
        "RMSE": rmse,
        "MAPE": mape,
        "MAE": mae,
        "MSLE": msle,
        "Variance": exp_var,
        "R2": r2,
        "Adjusted_R2": adj_r2,
        "Time": train_time,
    }
    return results

In [86]:
'''
Description: Transform a Spark Row type list to pandas dataframe 
Args:
    row_list: Data in pandas dataframe
    column_names: Column names will display in pandas dataframe. The format need to be a list
    
Return: 
    pandas_df: Data in pandas dataframe
'''
def row2Pandasdf(row_list, column_names):
    pandas_df = pd.DataFrame(columns = column_names)
    
    # each_row is Row() type in Spark
    for each_row in row_list:
        pandas_df = pandas_df.append(each_row.asDict(), ignore_index=True)
    return pandas_df

In [87]:
'''
Description: Local mode on Spark using Scikit-learn
Args:
    train_data: Train data in Spark datafram
    test_data: Test data in Spark datafram
    partition_num_train: Number of partition of Train data
    partition_num_test: Number of partition of Test data
    model_name: specify which model to make prediction
    
Return: 
    results: All the evaluation metrics in a dict
'''
def localMode(train_data, test_data, partition_num_train, partition_num_test, model_name):
    
    # Transform Train/Test to RDD type, manually set partition number
    train_rdd = train_data.orderBy("id").rdd.coalesce(partition_num_train)
    test_rdd  = test_data.orderBy("id").rdd.coalesce(partition_num_test)
    
    # Collect all the models which generated from each partition, to driver
    if model_name == "ARIMA":
        models = train_rdd.mapPartitions(partitionToPandas).map(buildARIMA).collect()
    elif model_name == "VectorARIMA":
        models = train_rdd.mapPartitions(partitionToPandas).map(buildVectorARIMA).collect()
    else:
        return "Wrong model name"
    
    train_time = max([model[1] for model in models])
    models = [model[0] for model in models]
    # broadcast models
    broadcast_models = sc.broadcast(models)

    # Transform each partition of test_rdd to pandas dataframe, then make prediction on each partition, then merge the results in a single list
    y_pred = test_rdd.mapPartitions(partitionToPandas).map(lambda x: makePrediction(x, broadcast_models, model_name)).reduce(lambda x,y: x+y)

    # Get the label of test data. (Row() type also works for calculating evaluation metrics)
    y_test = test_data.select("NEXT_BTC_CLOSE").collect()
    
    # Generate a pandas dataframe on predictions. Can help to plot graph easier later.
    y_test_rows = test_data.select("Timestamp","Close").collect()
    y_df = row2Pandasdf(y_test_rows, ["Timestamp","Close"])
    
    # Add prediction to y_test_df
    y_df["prediction"] = y_pred
    
    # Plot the prediction
    #line_plot(y_df.set_index("Timestamp"), model_name)
    
    # Calculate evaluation metrics
    results = evaluationAssemble(y_test, y_pred, partition_num_train, partition_num_test, train_time, model_name)
    return results

In [88]:
# Only use VectorARIMA
# model_name == "ARIMA" or "VectorARIMA"
localMode(train_data, test_data, 3, 1, "VectorARIMA")

{'Model': 'VectorARIMA',
 'P_train': 3,
 'P_test': 1,
 'Proportion': 0.999,
 'RMSE': 99.30026088094228,
 'MAPE': 0.00146075831591763,
 'MAE': 85.67083008310995,
 'MSLE': 2.861754882841444e-06,
 'Variance': -0.13429644874403057,
 'R2': -2.8299854231302666,
 'Adjusted_R2': -3.6185118337747335,
 'Time': 0.009519815444946289}

In [89]:
# Only use ARIMA
# model_name == "ARIMA" or "VectorARIMA"
localMode(train_data, test_data, 3, 1, "ARIMA")

{'Model': 'ARIMA',
 'P_train': 3,
 'P_test': 1,
 'Proportion': 0.999,
 'RMSE': 854.7430334750338,
 'MAPE': 0.014540466125094412,
 'MAE': 853.2356479229529,
 'MSLE': 0.00021529217008165193,
 'Variance': -7.221118289546524e-06,
 'R2': -282.77065439343886,
 'Adjusted_R2': -289.8649207532748,
 'Time': 0.49}

## 6. Time Series Cross Validation

In [19]:
'''
Description: Cross Validation on Time Series data
Args:
    dataSet: The dataSet which needs to be splited
    cv_info: The type of Cross Validation
    ml_model: The module to use
    partition_num_train: Number of partition of Train data
    partition_num_test: Number of partition of Test data
Return: 
    tsCv_df: All the splits performance of each model in a pandas dataframe
'''
def tsCrossValidation(dataSet, ml_model, partition_num_train, partition_num_test, cv_info):
    
    # Get the number of samples
    num = dataSet.count()
    
    # Save results in a list
    result_lst = []
        
    # Identify the type of Cross Validation 
    if cv_info['cv_type'] == 'mulTs':
        split_position_df = mulTsCrossValidation(num, cv_info['kSplits'])
    elif cv_info['cv_type'] == 'blkTs':
        split_position_df = blockedTsCrossValidation(num, cv_info['kSplits'])
    elif cv_info['cv_type'] == 'wfTs':
        split_position_df = wfTsCrossValidation(num, cv_info['min_obser'], cv_info['expand_window'])


    for position in split_position_df.itertuples():
        # Get the start/split/end position from a kind of Time Series Cross Validation
        start = getattr(position, 'start')
        splits = getattr(position, 'split')
        end = getattr(position, 'end')
        idx  = getattr(position, 'Index')

        # Train/Test size
        train_size = splits - start
        test_size = end - splits

        # Get training data and test data
        train_data = dataSet.filter(F.col("id").between(start, splits-1))
        test_data = dataSet.filter(F.col("id").between(splits, end-1))

        # Cache it
        train_data.cache()
        test_data.cache()
        
        # train the local mode
        if ml_model == "VectorARIMA":
            results = localMode(train_data, test_data, partition_num_train, partition_num_test, "VectorARIMA")
        elif ml_model == "ARIMA":
            results = localMode(train_data, test_data, partition_num_train, partition_num_test, "ARIMA")
        else:
            return "Wrong model name"
        
        # Store each splits result
        result_lst.append(results)
            
        # Release Cache
        train_data.unpersist()
        test_data.unpersist()

    # Transform dict to pandas dataframe
    tsCv_df = pd.DataFrame(result_lst)
    return tsCv_df

In [20]:
## Cross Validation Parameter
# Multiple Splits Time Series Cross Validation
mul_cv = {'cv_type':'mulTs',
          'kSplits': 5}

# Blocked Time Series Cross Validation
blk_cv = {'cv_type':'blkTs',
          'kSplits': 10}

# Walk Forward Validation
wf_cv = {'cv_type':'wfTs',
         'min_obser': 41710,
         'expand_window': 10}

In [21]:
# Vector ARIMA CV
arima_mul_cv = tsCrossValidation(dataset, "VectorARIMA", 3, 1, mul_cv)
arima_blk_cv = tsCrossValidation(dataset, "VectorARIMA", 3, 1, blk_cv)
arima_wf_cv = tsCrossValidation(dataset, "VectorARIMA", 3, 1, wf_cv)

In [37]:
# ARIMA CV
var_mul_cv = tsCrossValidation(dataset, "ARIMA", 3, 1, mul_cv)
var_blk_cv = tsCrossValidation(dataset, "ARIMA", 3, 1, blk_cv)
var_wf_cv = tsCrossValidation(dataset, "ARIMA", 3, 1, wf_cv)

In [50]:
# Define what model_info and evaluators in the Model Comparison Table
model_info = ['Model']
evaluator_lst = ['RMSE','MAPE','MAE','Variance','R2','Adjusted_R2','Time']

# The the Cross Validation results would like to compare
comparison_lst = [arima_mul_cv, arima_blk_cv, arima_wf_cv, var_mul_cv, var_blk_cv, var_wf_cv]
pd.concat([modelComparison(cv_result,model_info,evaluator_lst) for cv_result in comparison_lst])

Unnamed: 0,Model,RMSE,MAPE,MAE,Variance,R2,Adjusted_R2,Time
0,VectorARIMA,3687.882986,0.057466,3230.708155,-0.715322,-2.758465,-2.76225,0.003124
0,VectorARIMA,1532.644759,0.024457,1335.276478,-0.902524,-3.814275,-3.854976,0.001555
0,VectorARIMA,65.436719,0.0009,52.796204,-0.034613,-1.622355,-10.800596,0.005222
0,ARIMA,3386.892678,0.051853,2890.069438,-0.163985,-2.03449,-2.034926,3.766
0,ARIMA,1710.349874,0.027724,1500.74542,-0.051543,-7.588432,-7.59873,0.371
0,ARIMA,809.841188,0.013782,808.719076,0.001849,-416.845531,-469.076223,4.28
