# Bitcoin price forecasting with PySpark - Feature Engineering
## Big Data Computing final project - A.Y. 2022 - 2023
Prof. Gabriele Tolomei

MSc in Computer Science

La Sapienza, University of Rome

### Author
Corsi Danilo - corsi.1742375@studenti.uniroma1.it



Description: In this notebook I explore the data, visualize the correlations, check their stationarity and choose the features used to train the models.

## Global constants, dependencies, libraries and tools

In [23]:
# Root of the mounted Google Drive and of the project folder inside it
GDRIVE_DIR = "/content/drive"
GDRIVE_PROJECT_DIR = f"{GDRIVE_DIR}/MyDrive/BDC/project"

# Datasets: raw input, temporary Spark output folder and final output folder
GDRIVE_DATASET_RAW_DIR = f"{GDRIVE_PROJECT_DIR}/datasets/raw"
GDRIVE_DATASET_TEMP_DIR = f"{GDRIVE_PROJECT_DIR}/datasets/temp"
GDRIVE_DATASET_OUTPUT_DIR = f"{GDRIVE_PROJECT_DIR}/datasets/output"
GDRIVE_DATASET_NAME = "bitcoin_blockchain_data_30min"
GDRIVE_DATASET_NAME_EXT = f"/{GDRIVE_DATASET_NAME}.parquet"
GDRIVE_DATASET = f"{GDRIVE_DATASET_RAW_DIR}{GDRIVE_DATASET_NAME_EXT}"

# Feature lists (JSON files) written at the end of this notebook
GDRIVE_FEATURES_DIR = f"{GDRIVE_PROJECT_DIR}/features"

GDRIVE_ALL_FEATURES_NAME = "all_features"
GDRIVE_MORE_REL_FEATURES_NAME = "more_rel_features"
GDRIVE_LESS_REL_FEATURES_NAME = "less_rel_features"

GDRIVE_ALL_FEATURES_NAME_EXT = f"/{GDRIVE_ALL_FEATURES_NAME}.json"
GDRIVE_MORE_REL_FEATURES_NAME_EXT = f"/{GDRIVE_MORE_REL_FEATURES_NAME}.json"
GDRIVE_LESS_REL_FEATURES_NAME_EXT = f"/{GDRIVE_LESS_REL_FEATURES_NAME}.json"

GDRIVE_ALL_FEATURES = f"{GDRIVE_FEATURES_DIR}{GDRIVE_ALL_FEATURES_NAME_EXT}"
GDRIVE_MORE_REL_FEATURES = f"{GDRIVE_FEATURES_DIR}{GDRIVE_MORE_REL_FEATURES_NAME_EXT}"
GDRIVE_LESS_REL_FEATURES = f"{GDRIVE_FEATURES_DIR}{GDRIVE_LESS_REL_FEATURES_NAME_EXT}"

# Others
JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64"
MODEL_NAME = "LinearRegression"
# When True, the expensive inspection / plotting cells are executed
SLOW_OPERATION = True

In [4]:
from google.colab import drive

# Mount Google Drive at GDRIVE_DIR so the datasets and helper modules are
# reachable; force_remount=True remounts even if the drive is already mounted
drive.mount(GDRIVE_DIR, force_remount=True)

Mounted at /content/drive


In [5]:
import warnings

warnings.simplefilter(action='ignore', category=FutureWarning)

In [6]:
# Install Spark and related dependencies
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=a7f78c0c539a0c880fda46e02df98eb6c63fa5ab9aead3cf57fa5f6825f97922
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


## Import files

In [7]:
import sys

# Folder on the mounted drive that holds the project's shared `imports` module
GDRIVE_UTILITIES_DIR = GDRIVE_DIR + "/MyDrive/BDC/project/utilities"

# Register the folder only once, so re-running this cell does not keep
# appending duplicate entries to sys.path
if GDRIVE_UTILITIES_DIR not in sys.path:
    sys.path.append(GDRIVE_UTILITIES_DIR)

# NOTE(review): the star import hides where names used later (presumably
# SparkConf, SparkSession, F, Window, VectorAssembler, Correlation, go, iplot,
# glob, os, time, shutil, json) come from; an explicit import list would be clearer.
from imports import *

## Create the pyspark session

In [8]:
# Spark configuration: local mode on all available cores. The driver limits
# are generous because whole datasets are later collected with toPandas().
conf = (
    SparkConf()
    .set('spark.ui.port', "4050")
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '45G')
    .set('spark.driver.maxResultSize', '10G')
    .set("spark.kryoserializer.buffer.max", "1G")
    .setAppName("BitcoinPriceForecasting")
    .setMaster("local[*]")
)

# Build (or reuse) the session from the configuration above. Going through the
# builder instead of pyspark.SparkContext(conf=conf) lets this cell be re-run
# without "Cannot run multiple SparkContexts at once" errors.
# NOTE(review): when a session already exists, getOrCreate() returns it and
# JVM-level settings such as driver memory are not changed.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

## Loading dataset

In [9]:
# Load the raw dataset. sep / inferSchema / header are CSV-reader options that
# the Parquet reader ignores (Parquet files carry their own schema), so they
# are not passed.
# "id" is a 0-based, gap-free row index in the order the rows are read; later
# cells use it to order the windows and to split train/validation vs test.
# NOTE(review): this assumes the file is already sorted by time — confirm, or
# order by "timestamp" instead.
df = (
    spark.read.parquet(GDRIVE_DATASET)
    .withColumn(
        "id",
        F.row_number().over(Window.orderBy(F.monotonically_increasing_id())) - 1,
    )
)

In [10]:
def dataset_info(dataset):
    """Print a quick overview of a Spark DataFrame.

    Shows the first 3 rows, prints its (rows, columns) shape and finally
    prints the schema.

    Args:
        dataset: DataFrame exposing show(), count(), columns and printSchema().
    """
    dataset.show(3)

    # count() is an action and scans the whole dataset
    shape = (dataset.count(), len(dataset.columns))
    print("Shape:", shape)

    dataset.printSchema()

In [11]:
# Preview the raw dataset (rows, shape, schema) only when slow operations are on
if SLOW_OPERATION:
    dataset_info(df)

## Adding useful features

In [12]:
# Narrow working copy (keys + price) on which the engineered features are
# computed; it is joined back to the full dataset further below
new_features_df = df.select(["timestamp", "id", "market-price"])

In [13]:
# Helper column "next-market-price": the market price of the NEXT row, i.e. the
# next 30-minute observation (not "tomorrow"). lead(x, 1) is equivalent to
# lag(x, -1). dropna() removes rows containing any null, which includes the
# last row since it has no successor.
next_price = F.lead("market-price", 1).over(Window.orderBy("id"))
new_features_df = new_features_df.withColumn("next-market-price", next_price).dropna()

In [14]:
# Rate of Change (ROC) momentum indicator:
#   ROC_t = (price_t / price_{t-n} - 1) * 100
# A 12-day ROC is typical; for simplicity n = 1 row (one 30-minute interval).
# The ROC must look backwards. Dividing next-market-price by market-price would
# store the following interval's return in row t, leaking future prices into
# the features.
# The first row has no previous price (and Spark returns null on division by
# zero). Those nulls are replaced by 0 so the column stays null-free, because
# the VectorAssembler used later rejects nulls by default.
prev_price = F.lag("market-price", 1).over(Window.orderBy("id"))
new_features_df = new_features_df.withColumn(
    "rate-of-change",
    F.coalesce((F.col("market-price") / prev_price - 1) * 100, F.lit(0.0)),
)

In [15]:
# Computing Simple Moving Averages
# Adapted from: https://stackoverflow.com/questions/45806194/pyspark-rolling-average-using-timeseries-data
def simple_moving_average(dataset, period, days, col="next-market-price", orderby="id"):
    """Add an ``sma-{days}-days`` column holding the rolling mean of ``col``.

    Args:
        dataset: Spark DataFrame to extend.
        period: window length in rows. The mean covers the current row and the
            ``period - 1`` rows before it (ordered by ``orderby``). For the
            first rows the window is truncated to the rows available.
        days: only used to build the output column name.
        col: name of the column to average.
        orderby: column that defines the row order.

    Returns:
        The DataFrame with the new ``sma-{days}-days`` column.

    Raises:
        ValueError: if ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    # rowsBetween bounds are inclusive: -(period - 1)..0 spans exactly `period`
    # rows (-period..0 would average period + 1 rows).
    # NOTE: without partitionBy, Spark evaluates this window on a single partition.
    window = Window.orderBy(orderby).rowsBetween(-(period - 1), 0)
    return dataset.withColumn(f"sma-{days}-days", F.avg(col).over(window))

In [16]:
# Moving-average horizons (5/7/10/20/50/100 days) expressed in ROWS.
# The dataset has one row every 30 minutes (bitcoin_blockchain_data_30min), so
# a day spans 48 rows. Counting 60 * 24 rows per day (one row per minute) would
# make e.g. the "5 days" SMA actually cover 150 days of data.
MINUTES_PER_ROW = 30
ROWS_PER_DAY = 24 * 60 // MINUTES_PER_ROW  # 48

MA5 = ROWS_PER_DAY * 5
MA7 = ROWS_PER_DAY * 7
MA10 = ROWS_PER_DAY * 10
MA20 = ROWS_PER_DAY * 20
MA50 = ROWS_PER_DAY * 50
MA100 = ROWS_PER_DAY * 100

# Periods selected based on this article:
# https://www.investopedia.com/ask/answers/122414/what-are-most-common-periods-used-creating-moving-average-
# ma-lines.asp#:~:text=Traders%20and%20market%20analysts%20commonly,averages%20are%20the%20most%20common.
# 5/7/10/20/50 days analyze short-term trends, 100 days the long-term trend.
#
# The SMAs average the current market-price. Averaging next-market-price would
# include the following interval's price in every window (look-ahead leakage).
for ma_rows, ma_days in [(MA5, 5), (MA7, 7), (MA10, 10), (MA20, 20), (MA50, 50), (MA100, 100)]:
    new_features_df = simple_moving_average(new_features_df, ma_rows, ma_days, col="market-price")

In [17]:
# Drop the columns that are no longer needed. market-price is already present
# in df (keeping it here would duplicate it in the join below), and
# next-market-price was only an intermediate helper column.
columns_to_drop = ["market-price", "next-market-price"]
new_features_df = new_features_df.drop(*columns_to_drop)

In [18]:
# Preview the engineered-features dataset only when slow operations are on
if SLOW_OPERATION:
    dataset_info(new_features_df)

In [19]:
# Attach the engineered features to the original columns. (timestamp, id)
# identifies a row in both DataFrames. The inner join also drops the rows that
# dropna() removed from new_features_df.
merged_df = df.join(new_features_df, ["timestamp", "id"], "inner")

In [24]:
# Put the key columns first and keep every other column in its current order
key_columns = ["timestamp", "id"]
new_columns = key_columns + [c for c in merged_df.columns if c not in key_columns]
merged_df = merged_df.select(*new_columns)

In [25]:
# Preview the merged dataset only when slow operations are on
if SLOW_OPERATION:
    dataset_info(merged_df)

+-------------------+---+------------------+--------------+-------------------+--------------+------------------+------------------+--------------------+------------------------+-----------------+-------------------+------------------+--------------------+------------------+------------------+--------------------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|          timestamp| id|      market-price|total-bitcoins|         market-cap|  trade-volume|       blocks-size|    avg-block-size|n-transactions-total|n-transactions-per-block|        hash-rate|         difficulty|    miners-revenue|transaction-fees-usd|n-unique-addresses|    n-transactions|estimated-transaction-volume-usd|      rate-of-change|        sma-5-days|        sma-7-days|       sma-10-days|       sma-20-days|       sma-50-days|      sma-100-days|
+-------------------+---+------------------+--------------+---------------

In [26]:
# Keep the merged dataset in memory: the split, the saved outputs, the plots
# and the correlation analysis below all reuse it. cache() is lazy; the first
# action (the count() of the split) materialises it.
# No pandas conversion here: collecting the whole dataset to the driver only to
# set an index on a copy that is immediately discarded has no effect and is
# very slow.
merged_df = merged_df.cache()

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

## Showing train / validation and test sets

In [27]:
# Split on the row id (assumed to follow time order): the first 95% of the rows
# form the train / validation set, the remaining 5% the test set
total_rows = merged_df.count()
index = int(total_rows * 0.95)

# Split the dataset based on the index
train_valid_df = merged_df.filter(merged_df.id < index)
test_df = merged_df.filter(merged_df.id >= index)

In [28]:
# Data visualization with rangeslider
def data_visualization(train_valid, test):
    """Plot the market price of the train/validation and test sets on one chart.

    Args:
        train_valid: pandas DataFrame with 'timestamp' and 'market-price' columns.
        test: pandas DataFrame with 'timestamp' and 'market-price' columns.

    The x-axis has a range slider and 1m / 6m / 1y / 3y / all selector buttons.
    """
    # NOTE(review): a second function with this same name (dataset, key, value)
    # is defined further down and shadows this one once that cell has run.
    traces = [
        go.Scatter(
            x=frame['timestamp'],
            y=frame['market-price'].astype(float),
            mode='lines',
            name=label,
        )
        for frame, label in ((train_valid, "Train / Validation set"), (test, "Test set"))
    ]

    # Range-selector buttons: look back N months, then a "show everything" button
    range_buttons = [
        dict(count=months, label=label, step='month', stepmode='backward')
        for months, label in ((1, '1m'), (6, '6m'), (12, '1y'), (36, '3y'))
    ]
    range_buttons.append(dict(step='all'))

    layout = dict(
        title="Train, valid and test set with rangeslider",
        xaxis=dict(
            rangeselector=dict(buttons=range_buttons),
            rangeslider=dict(visible=True),
            type='date',
        ),
    )

    fig = dict(data=traces, layout=layout)
    iplot(fig, filename="Train, valid and test set with rangeslider")

In [29]:
# Plot the split. Only the two plotted columns are collected to the driver, and
# the rows are ordered by id first: Spark does not guarantee row order after the
# join/filter, and unordered points make the line traces zig-zag.
if SLOW_OPERATION:
    plot_columns = ["timestamp", "market-price"]
    data_visualization(
        train_valid_df.orderBy("id").select(*plot_columns).toPandas(),
        test_df.orderBy("id").select(*plot_columns).toPandas(),
    )

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

## Saving the engineered dataset

In [30]:
def output(dataset, type, max_wait_seconds=60):
    """Save ``dataset`` as a single Parquet file in the output folder.

    The DataFrame is written to GDRIVE_DATASET_TEMP_DIR (overwriting it). The
    resulting part file is then moved to
    ``GDRIVE_DATASET_OUTPUT_DIR/<GDRIVE_DATASET_NAME>_<type>.parquet``.

    Args:
        dataset: Spark DataFrame to save.
        type: suffix of the output file name (e.g. "train_valid", "test").
        max_wait_seconds: how long to keep polling for the part file before
            giving up.

    Raises:
        FileNotFoundError: if no part file shows up within ``max_wait_seconds``.
        RuntimeError: if more than one part file is found.
    """
    # coalesce(1) makes Spark write exactly one part file. Otherwise every
    # partition writes its own part-*.parquet, only one of them would be moved,
    # and the rows in the others would be silently lost.
    dataset.coalesce(1).write.parquet(GDRIVE_DATASET_TEMP_DIR, mode='overwrite')

    # Poll for the part file, with an upper bound instead of looping forever.
    # NOTE(review): presumably needed because files can appear late on the
    # mounted Drive — confirm this delay is still observed.
    pattern = os.path.join(GDRIVE_DATASET_TEMP_DIR, "part*.parquet")
    deadline = time.monotonic() + max_wait_seconds
    parquet_files = glob.glob(pattern)
    while not parquet_files:
        if time.monotonic() >= deadline:
            raise FileNotFoundError(
                f"No part*.parquet file found in {GDRIVE_DATASET_TEMP_DIR} "
                f"after {max_wait_seconds} seconds"
            )
        print(".parquet file not found. I'll try again after 1 second...")
        time.sleep(1)
        parquet_files = glob.glob(pattern)

    if len(parquet_files) > 1:
        raise RuntimeError(
            f"Expected a single part file in {GDRIVE_DATASET_TEMP_DIR}, "
            f"found {len(parquet_files)}"
        )

    # .parquet file found!
    file_path = parquet_files[0]
    print(".parquet file found:", file_path)

    new_file_path = GDRIVE_DATASET_OUTPUT_DIR + "/" + GDRIVE_DATASET_NAME + "_" + type + ".parquet"

    # Rename and move the file
    shutil.move(file_path, new_file_path)

    print("File renamed and moved successfully!")

In [31]:
# Save the train / validation split as <dataset name>_train_valid.parquet
output(train_valid_df, type="train_valid")

.parquet file found: /content/drive/MyDrive/BDC/project/datasets/temp/part-00000-0cf02b9e-ede6-4392-b542-dd0af8aed499-c000.snappy.parquet
File renamed and moved successfully!


In [33]:
# Save the test split as <dataset name>_test.parquet
output(test_df, type="test")

.parquet file found: /content/drive/MyDrive/BDC/project/datasets/temp/part-00000-12d860aa-b5c2-4f6e-86fe-faaf6e2ed2c7-c000.snappy.parquet
File renamed and moved successfully!


## Data visualization

In [34]:
# Collect the merged dataset to pandas for the plots below. Rows are ordered by
# id so the line charts follow the row order; Spark does not guarantee any
# order after the join. Only the SLOW_OPERATION plotting cells use this frame,
# so the expensive collect is guarded the same way.
if SLOW_OPERATION:
    merged_df_pd = merged_df.orderBy("id").toPandas()

KeyboardInterrupt: ignored

In [None]:
# Plot label -> column name, grouped by category. Insertion order matters:
# later cells slice additional_features by position.
currency_statistics = {
    'Market price (USD)': 'market-price',
    'Market cap (USD)': 'market-cap',
    'N. total bitcoins': 'total-bitcoins',
    'Trade volume (USD)': 'trade-volume',
}
block_details = {
    'Blocks size (MB)': 'blocks-size',
    'Avg. block size (MB)': 'avg-block-size',
    'N. total transactions': 'n-transactions-total',
    'N. transactions per block': 'n-transactions-per-block',
}
mining_information = {
    'Hash rate (TH/s)': 'hash-rate',
    'Difficulty (T)': 'difficulty',
    'Miners revenue (USD)': 'miners-revenue',
    'Transaction fees (USD)': 'transaction-fees-usd',
}
network_activity = {
    "N. unique addresses": 'n-unique-addresses',
    'N. transactions': 'n-transactions',
    'Estimated transaction volume (USD)': 'estimated-transaction-volume-usd',
}
additional_features = {
    "Rate of change (%)": "rate-of-change",
    "Simple moving avg. (5d)": "sma-5-days",
    "Simple moving avg. (7d)": "sma-7-days",
    "Simple moving avg. (10d)": "sma-10-days",
    "Simple moving avg. (20d)": "sma-20-days",
    "Simple moving avg. (50d)": "sma-50-days",
    "Simple moving avg. (100d)": "sma-100-days",
}

In [None]:
# Data visualization with rangeslider
def data_visualization(dataset, key, value):
    """Plot one column of ``dataset`` against its 'timestamp' column.

    Args:
        dataset: pandas DataFrame with 'timestamp' and ``value`` columns.
        key: human-readable label, used as the chart title and the trace name.
        value: name of the column to plot.
    """
    # NOTE(review): this redefines data_visualization(train_valid, test) from the
    # train/test section; after this cell runs, that two-argument form is gone.
    trace = go.Scatter(
        x=dataset['timestamp'],
        y=dataset[value].astype(float),
        mode='lines',
        name=key,
    )

    # Range-selector buttons: look back N months, then a "show everything" button
    range_buttons = [
        dict(count=months, label=label, step='month', stepmode='backward')
        for months, label in ((1, '1m'), (6, '6m'), (12, '1y'), (36, '3y'))
    ]
    range_buttons.append(dict(step='all'))

    layout = dict(
        title=key,
        xaxis=dict(
            rangeselector=dict(buttons=range_buttons),
            rangeslider=dict(visible=True),
            type='date',
        ),
    )

    fig = dict(data=[trace], layout=layout)
    iplot(fig, filename="Data visualization with rangeslider")

In [None]:
# One chart per currency statistic
if SLOW_OPERATION:
    for title, column in currency_statistics.items():
        data_visualization(merged_df_pd, title, column)

In [None]:
# One chart per block detail
if SLOW_OPERATION:
    for title, column in block_details.items():
        data_visualization(merged_df_pd, title, column)

In [None]:
# One chart per mining statistic
if SLOW_OPERATION:
    for title, column in mining_information.items():
        data_visualization(merged_df_pd, title, column)

In [None]:
# One chart per network-activity statistic
if SLOW_OPERATION:
    for title, column in network_activity.items():
        data_visualization(merged_df_pd, title, column)

In [None]:
# Chart of the first additional feature (the rate of change)
if SLOW_OPERATION:
    first_pair = next(iter(additional_features.items()))
    data_visualization(merged_df_pd, *first_pair)

In [None]:
# Data visualization with rangeslider
def sma_visualization(dataset, features, title):
    """Plot the market price together with any number of SMA columns.

    Args:
        dataset: pandas DataFrame with 'timestamp', 'market-price' and every
            column referenced in ``features``.
        features: sequence of (display name, column name) pairs; each pair
            becomes one line trace. Any length is accepted, so the plot no
            longer fails on fewer than 3 pairs or ignores pairs beyond the third.
        title: chart title, also used as the iplot filename.
    """
    # Market price first, then one trace per requested feature
    series = [("Market price (usd)", "market-price")]
    series += [(feature[0], feature[1]) for feature in features]

    traces = [
        go.Scatter(
            x=dataset['timestamp'],
            y=dataset[column].astype(float),
            mode='lines',
            name=name,
        )
        for name, column in series
    ]

    # Range-selector buttons: look back N months, then a "show everything" button
    range_buttons = [
        dict(count=months, label=label, step='month', stepmode='backward')
        for months, label in ((1, '1m'), (6, '6m'), (12, '1y'), (36, '3y'))
    ]
    range_buttons.append(dict(step='all'))

    layout = dict(
        title=title,
        xaxis=dict(
            rangeselector=dict(buttons=range_buttons),
            rangeslider=dict(visible=True),
            type='date',
        ),
    )

    fig = dict(data=traces, layout=layout)
    iplot(fig, filename=title)

In [None]:
if SLOW_OPERATION:
    # Entries 1..6 of additional_features are the SMAs (entry 0 is the rate of change)
    sma_items = list(additional_features.items())[1:]

    # The 5, 7 and 10 days SMAs
    short_term_sma = sma_items[:3]
    # The 20, 50 and 100 days SMAs
    long_term_sma = sma_items[-3:]

    sma_visualization(merged_df_pd, short_term_sma, "Short term SMA (usd)")
    sma_visualization(merged_df_pd, long_term_sma, "Long term SMA (usd)")

## Feature selection

In [None]:
# Everything except the key columns is fed to the correlation analysis below
new_columns = [c for c in merged_df.columns if c not in ("timestamp", "id")]
merged_df_no_index = merged_df.select(*new_columns)

In [None]:
# Candidate features: every column except market-price itself. Excluding it by
# name does not rely on market-price being the first column.
all_features = [c for c in merged_df_no_index.columns if c != "market-price"]
print(all_features)

In [None]:
# Assemble every column (market-price included) into a single vector column
feature_columns = merged_df_no_index.columns
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_vector = assembler.transform(merged_df_no_index).select("features")

# Pearson correlation matrix between every pair of columns
correlation_matrix = Correlation.corr(df_vector, "features", method="pearson").head()

# Row of the matrix belonging to market-price. It is looked up by name, so the
# code does not depend on market-price being the first column.
target_index = feature_columns.index("market-price")
correlation_with_market_price = correlation_matrix[0].toArray()[target_index]

# Feature name -> correlation with market-price
feature_correlations = dict(zip(feature_columns, correlation_with_market_price))

# Sort by the strength of the linear relationship, strongest first. A strong
# negative correlation is as informative as a strong positive one.
sorted_features = dict(
    sorted(feature_correlations.items(), key=lambda item: abs(item[1]), reverse=True)
)

# Print every correlation and split the features by |correlation| vs threshold
threshold = 0.7
more_rel_features = []
less_rel_features = []
for feature, correlation in sorted_features.items():
    print(f"{feature}: {correlation}")
    if feature == 'market-price':
        continue
    # NaN correlations (e.g. constant columns) fail the comparison -> less relevant
    if abs(correlation) > threshold:
        more_rel_features.append(feature)
    else:
        less_rel_features.append(feature)

In [None]:
# Features classified as more relevant (correlation above the threshold)
print(f"{more_rel_features}")

In [None]:
# Features classified as less relevant (all remaining candidates)
print(f"{less_rel_features}")

## Output

In [None]:
# Persist the full candidate feature list as a JSON array
with open(GDRIVE_ALL_FEATURES, 'w') as features_file:
    features_file.write(json.dumps(all_features))

In [None]:
# Persist the "more relevant" feature list as a JSON array
with open(GDRIVE_MORE_REL_FEATURES, 'w') as features_file:
    features_file.write(json.dumps(more_rel_features))

In [None]:
# Persist the "less relevant" feature list as a JSON array
with open(GDRIVE_LESS_REL_FEATURES, 'w') as features_file:
    features_file.write(json.dumps(less_rel_features))