In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# Create (or reuse) the Spark session used by every cell in this notebook.
session_builder = SparkSession.builder.appName("BTC_Prediction")
spark = session_builder.getOrCreate()

# Raise the log threshold to ERROR so cell output is not flooded with Spark logs.
spark.sparkContext.setLogLevel("ERROR")

R2: 0.6362680773186531
RMSE: 79.3166343387894
MAE: 65.43346401334597


In [None]:
# Read the silver-layer dataset (stored as parquet) into a Spark DataFrame.
silver_path = "../data/silver/silver_dataset"
df = spark.read.format("parquet").load(silver_path)

# Preview the first 5 rows to sanity-check the load.
df.show(n=5)

In [None]:
# Global window ordered by open_time; reused by the lag/rolling features later on.
# There is no partitionBy, so Spark evaluates this window in a single partition.
window_spec = Window.orderBy("open_time")

# Regression target: the "close" value found 10 rows ahead in open_time order
# (presumably 10 minutes ahead if rows are 1-minute candles -- confirm).
target_expr = F.lead("close", 10).over(window_spec)

# The trailing rows have no row 10 steps ahead, so their target is null; drop them.
df = df.withColumn("close_t_plus_10", target_expr).dropna(subset=["close_t_plus_10"])


In [None]:
#feature engineering
# Close of the previous row in open_time order (null for the very first row).
prev_close = F.lag("close", 1).over(window_spec)

# One-step simple return: (close_t - close_{t-1}) / close_{t-1}.
df = df.withColumn("return_1m", (F.col("close") - prev_close) / prev_close)

# Trailing moving averages of close over the current row and the 4 / 9 rows before it.
df = df.withColumn("MA_5", F.avg("close").over(window_spec.rowsBetween(-4, 0)))   # 5-row MA
df = df.withColumn("MA_10", F.avg("close").over(window_spec.rowsBetween(-9, 0)))  # 10-row MA

# Share of volume bought by takers. A zero volume is turned into a null divisor so the
# ratio becomes null instead of dividing by zero: with ANSI mode enabled a zero divisor
# raises DIVIDE_BY_ZERO, while with ANSI mode disabled the result was already null.
# The null-drop cell further down removes such rows either way.
safe_volume = F.when(F.col("volume") != 0, F.col("volume"))
df = df.withColumn("taker_ratio", F.col("taker_buy_base_volume") / safe_volume)

In [None]:
# Collect the feature columns: every column whose Spark dtype is one of the
# listed numeric types, excluding the regression target itself.
numeric_types = ("int", "bigint", "double", "float")
numeric_cols = []
for col_name, col_type in df.dtypes:
    if col_name != "close_t_plus_10" and col_type in numeric_types:
        numeric_cols.append(col_name)

# Drop every row that has a missing value in any feature column or in the target
# (e.g. the first row, whose lag-based return is null).
df = df.dropna(subset=[*numeric_cols, "close_t_plus_10"])

In [None]:
# Pack all numeric feature columns into a single vector column named "features".
assembler = VectorAssembler(outputCol="features", inputCols=numeric_cols)
assembled = assembler.transform(df)

# Keep only what the regressor needs: the feature vector and the target.
df_ml = assembled.select("features", "close_t_plus_10")

In [None]:
#split data
# Chronological 80/20 split instead of a random one. The rows are a time series and
# the features/target are built from neighbouring rows (lag, rolling means, lead), so
# a random split puts rows from the "future" into the training set and makes the test
# metrics look better than they would be on genuinely unseen data.
# percent_rank() over open_time is 0.0 for the earliest row and 1.0 for the latest.
ranked = assembler.transform(df).withColumn(
    "_time_pct", F.percent_rank().over(Window.orderBy("open_time"))
)

# Earliest 80% of rows -> train, latest 20% -> test; same columns as df_ml.
train_data = ranked.filter(F.col("_time_pct") <= 0.8).select("features", "close_t_plus_10")
test_data = ranked.filter(F.col("_time_pct") > 0.8).select("features", "close_t_plus_10")
# NOTE(review): the last 10 training rows still have targets that fall inside the test
# period (the label looks 10 rows ahead); drop them if a strict embargo is required.

In [None]:
# Fit a linear regression of the T+10 close on the assembled feature vector.
lr = LinearRegression(
    labelCol="close_t_plus_10",
    featuresCol="features",
)
model = lr.fit(train_data)

# Score the held-out rows; transform() appends a "prediction" column.
predictions = model.transform(test_data)

In [None]:
# Evaluate the test-set predictions against the true T+10 close.
# One evaluator is built per metric; after this cell rmse, mae and r2 hold the
# resulting float scores.
rmse, mae, r2 = [
    RegressionEvaluator(
        labelCol="close_t_plus_10",
        predictionCol="prediction",
        metricName=metric,
    ).evaluate(predictions)
    for metric in ("rmse", "mae", "r2")
]

In [None]:
# Report the evaluation metrics, one "label: value" line each.
for label, value in (
    ("r2", r2),
    ("root mean square error (RMSE)", rmse),
    ("mean absolute error (MAE)", mae),
):
    print(f"{label}: {value}")