In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_unixtime, col, to_timestamp
from pyspark.ml.feature import VectorAssembler, StandardScaler

In [2]:
import pandas as pd

In [3]:
# Start (or reuse) a Spark session running locally; "local[*]" asks for one
# worker thread per available core.
spark = (
    SparkSession.builder
    .appName("COMP4107")
    .master("local[*]")
    .getOrCreate()
)

In [4]:
# Quick pandas peek at the raw CSV. Only the first rows are parsed (nrows=5)
# because the full dataset is loaded through Spark further down; a distinct
# name keeps this pandas frame from being confused with the Spark `df`.
btc_preview_pdf = pd.read_csv('data/btcusd_1-min_data.csv', nrows=5)
btc_preview_pdf.head()

Unnamed: 0,Timestamp,Open,High,Low,Close,Volume
0,1325412000.0,4.58,4.58,4.58,4.58,0.0
1,1325412000.0,4.58,4.58,4.58,4.58,0.0
2,1325412000.0,4.58,4.58,4.58,4.58,0.0
3,1325412000.0,4.58,4.58,4.58,4.58,0.0
4,1325412000.0,4.58,4.58,4.58,4.58,0.0


- `VectorAssembler` merges the feature columns into a single vector per row: `[Open, High, Low, Close, Volume]`
    - LSTMs expect their input as a 3-D tensor with shape: $$ \text{(batch\_size, sequence\_length, feature\_dimension)} $$
    - `feature_dimension` corresponds to the number of features, 5 in this case
- The model learns separate weights for each feature dimension, allowing it to capture the relationships between features

In [5]:
def load_and_preprocess_data(filepath: str):
    """Load OHLCV rows from a CSV file and return standardized feature vectors.

    Pipeline:
      1. Read the CSV with header and schema inference via the notebook-level
         ``spark`` session.
      2. Derive a ``datetime`` timestamp column from the UNIX-seconds
         ``Timestamp`` column, then drop the raw ``Timestamp`` column.
      3. Sort by ``datetime``, replace missing numeric values with 0 and
         discard rows whose ``datetime`` is null.
      4. Assemble ``Open``, ``High``, ``Low``, ``Close`` and ``Volume`` into a
         ``features`` vector and standardize it (zero mean, unit variance)
         into ``scaledFeatures``.

    Args:
        filepath: Path to a CSV file containing at least the columns
            ``Timestamp``, ``Open``, ``High``, ``Low``, ``Close`` and
            ``Volume``.

    Returns:
        A Spark DataFrame with exactly two columns: ``datetime`` and
        ``scaledFeatures``.

    Note:
        The scaler statistics are computed over every row of the file.
        NOTE(review): ``from_unixtime`` renders in the Spark session time
        zone, so ``datetime`` values depend on that setting -- confirm this is
        intended.
    """
    feature_cols = ["Open", "High", "Low", "Close", "Volume"]

    # loading data into pyspark dataframe
    df = spark.read.csv(filepath, header=True, inferSchema=True)

    # converting the UNIX timestamp to proper datetime format
    df = df.withColumn("datetime", to_timestamp(from_unixtime(col("Timestamp"))))
    # Spark DataFrames are immutable: drop() returns a new DataFrame, so the
    # result has to be re-assigned or the column is never actually removed.
    df = df.drop("Timestamp")

    # sorting to maintain time order
    df = df.orderBy("datetime")

    # handling missing values: numeric nulls become 0 (VectorAssembler rejects
    # null inputs by default), rows without a usable datetime are removed
    df = df.na.fill(0)
    df = df.filter(col("datetime").isNotNull())

    # feature assembly: combine the columns into one vector, as Spark ML expects
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    df = assembler.transform(df)

    # scaling features for zero mean and unit variance
    scaler = StandardScaler(
        inputCol="features",
        outputCol="scaledFeatures",
        withMean=True,
        withStd=True,
    )
    # fit() computes the per-feature mean and standard deviation of `features`
    scaler_model = scaler.fit(df)
    # transform() applies the scaling using the statistics computed by fit()
    df = scaler_model.transform(df)

    return df.select("datetime", "scaledFeatures")
    
    

In [6]:
# Run the preprocessing pipeline on the BTC/USD CSV; `df` is the resulting
# Spark DataFrame.
df = load_and_preprocess_data(filepath='data/btcusd_1-min_data.csv')

In [7]:
# Preview the preprocessed rows (show() defaults spelled out: first 20 rows,
# long cell values truncated).
df.show(n=20, truncate=True)

+-------------------+--------------------+
|           datetime|      scaledFeatures|
+-------------------+--------------------+
|2012-01-01 05:01:00|[-0.7298777848017...|
|2012-01-01 05:02:00|[-0.7298777848017...|
|2012-01-01 05:03:00|[-0.7298777848017...|
|2012-01-01 05:04:00|[-0.7298777848017...|
|2012-01-01 05:05:00|[-0.7298777848017...|
|2012-01-01 05:06:00|[-0.7298777848017...|
|2012-01-01 05:07:00|[-0.7298777848017...|
|2012-01-01 05:08:00|[-0.7298777848017...|
|2012-01-01 05:09:00|[-0.7298777848017...|
|2012-01-01 05:10:00|[-0.7298777848017...|
|2012-01-01 05:11:00|[-0.7298777848017...|
|2012-01-01 05:12:00|[-0.7298777848017...|
|2012-01-01 05:13:00|[-0.7298777848017...|
|2012-01-01 05:14:00|[-0.7298777848017...|
|2012-01-01 05:15:00|[-0.7298777848017...|
|2012-01-01 05:16:00|[-0.7298777848017...|
|2012-01-01 05:17:00|[-0.7298777848017...|
|2012-01-01 05:18:00|[-0.7298777848017...|
|2012-01-01 05:19:00|[-0.7298777848017...|
|2012-01-01 05:20:00|[-0.7298777848017...|
+----------