# Forecasting Realised Volatility with ML

In [0]:
# Import functions and libraries
from pyspark.sql.window import Window
from pyspark.sql import functions as sf

In [0]:
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session and load the combined NIFTY 100 CSV,
# taking column names from the header row and letting Spark infer types.
spark = (
    SparkSession.builder
    .appName("QuantVolatility")
    .getOrCreate()
)
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/nifty100_combined_data.csv")
)

In [0]:
# Preview the raw data loaded above (first 20 rows).
# Catalog-table / notebook-UI alternative:
#df = spark.table("workspace.default.nifty_100_combined_data")
#display(df)
df.show(n=20)

## Feature Engineering

In [0]:
# Window definitions.
# Number of rows in each rolling frame. The "1h" in the names below suggests
# 12 rows span one hour (i.e. 5-minute bars) — TODO confirm the bar size.
_FRAME_ROWS = 12

# Per-ticker, time-ordered window used for the lag features.
window = Window.partitionBy("ticker").orderBy("date")
# Trailing frame: the current row plus the 11 rows before it.
rolling_1h_back_window = window.rowsBetween(-(_FRAME_ROWS - 1), Window.currentRow)
# Leading frame: the next 12 rows, excluding the current one.
rolling_1h_forward_window = window.rowsBetween(1, _FRAME_ROWS)


In [0]:
# TODO: Split dataframe modification
# Build the per-tick features and the forward-looking target in one chain.
#
# Row-to-row features (ordered by date within each ticker):
#   lag_close / lag_volume  previous row's close / volume
#   log_return              ln(close / lag_close); null when lag_close is null or 0
#   abs_return              |log_return|
#   hl_range                high - low (absolute price units, not normalised)
#   log_volume_change       ln(volume / lag_volume); null when lag_volume is null or 0
#   hour                    hour component of `date`
#
# Realised volatility = sqrt(sum of squared log returns) over a 12-row frame,
# left null unless all 12 log returns in the frame are non-null:
#   realised_rolling_vol_backward  trailing frame (model feature)
#   realised_rolling_vol_forward   the next 12 rows (prediction target)
# NOTE(review): the frames are row-based, so they may span session/overnight
# gaps in the data — confirm that is intended.


def _safe_log_ratio(numerator, denominator):
    """Return ln(numerator / denominator), or null when the denominator is null or 0."""
    usable = sf.col(denominator).isNotNull() & (sf.col(denominator) != 0)
    return sf.when(
        usable,
        sf.log(sf.col(numerator) / sf.col(denominator)),
    ).otherwise(sf.lit(None))


def _realised_vol(frame):
    """Return sqrt(sum(log_return^2)) over *frame*, null unless 12 returns are present."""
    complete = sf.count("log_return").over(frame) == 12
    return sf.when(
        complete,
        sf.sqrt(sf.sum(sf.pow(sf.col("log_return"), 2)).over(frame)),
    ).otherwise(sf.lit(None))


df = (
    df
    .withColumn("lag_close", sf.lag("close").over(window))
    .withColumn("log_return", _safe_log_ratio("close", "lag_close"))
    .withColumn("abs_return", sf.abs(sf.col("log_return")))
    .withColumn("hl_range", sf.col("high") - sf.col("low"))
    .withColumn("lag_volume", sf.lag("volume").over(window))
    .withColumn("log_volume_change", _safe_log_ratio("volume", "lag_volume"))
    .withColumn("hour", sf.hour(sf.col("date")))
    .withColumn("realised_rolling_vol_backward", _realised_vol(rolling_1h_back_window))
    .withColumn("realised_rolling_vol_forward", _realised_vol(rolling_1h_forward_window))
)

In [0]:
# Preview the engineered features (first 20 rows); display(df) is the notebook-UI alternative.
df.show(n=20)

## Data Cleaning

In [0]:
# Cleaning: discard every row with a null in any column. This removes each
# ticker's warm-up rows (no lag / incomplete trailing frame) and its final rows
# (incomplete forward frame). It also removes rows whose log ratios were undefined.
df = df.na.drop(how="any")

In [0]:
# Spot-check two cleaned rows; display(df) is the notebook-UI alternative.
df.show(n=2)

## Save Cleaned Data as Parquet

In [0]:
# NOTE: alternative noted for improved performance — save as a managed Spark
# table (optionally partitioned by ticker) instead of a Parquet path:
#df.write.partitionBy("ticker").mode("overwrite").saveAsTable("cleaned_volatility_data")
#df.write.mode("overwrite").saveAsTable("cleaned_volatility_data")

# Write the cleaned data as Parquet partitioned by ticker, then read it back.
# The result is cached because the cells below use clean_df several times.
cleaned_path = "../data/cleaned_volatility_data.par"
(
    df.write
    .partitionBy("ticker")
    .mode("overwrite")
    .parquet(cleaned_path)
)
clean_df = spark.read.parquet(cleaned_path).persist()

In [0]:
# Spot-check two rows of the reloaded Parquet data.
clean_df.show(n=2)

## Random Forest Regressor Model

In [0]:
# Import SparkML tools
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from functools import reduce

In [0]:
# Show the schema of the data the model is actually built from. After the
# partitioned Parquet round-trip, the partition column `ticker` appears at the
# end of clean_df's schema, so `df`'s schema no longer matches it.
clean_df.printSchema()

In [0]:
# Map the string `ticker` column to a numeric `ticker_index` so it can be
# assembled into the feature vector. The indexer is fitted on clean_df, so
# indices cover exactly the tickers present there.
indexer = StringIndexer(inputCol="ticker", outputCol="ticker_index")
ticker_index_model = indexer.fit(clean_df)
clean_df = ticker_index_model.transform(clean_df)

In [0]:
# Model inputs: the engineered per-tick features plus the encoded ticker.
features = [
    "abs_return",
    "hl_range",
    "log_volume_change",
    "hour",
    "realised_rolling_vol_backward",
    "ticker_index",
]

# Pack the feature columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=features, outputCol="features")

# Keep only the vector, the target and the ticker, then drop incomplete rows.
assembled = assembler.transform(clean_df)
model_df = assembled.select("features", "realised_rolling_vol_forward", "ticker").dropna()
model_df.show()

In [0]:
# Split into train and test data chronologically within each ticker.
#
# A random row-level split leaks information for this target. The label at
# row t (realised vol over the next 12 rows) shares 11 of its 12 squared
# returns with the label at row t+1. Near-duplicate rows would land on both
# sides of a random split, and the test metrics would come out optimistic.
# Instead, each ticker's earliest TRAIN_FRACTION of rows is used for training
# and the rest for testing. The last FORWARD_HORIZON training rows are purged,
# so no training label is built from returns in the test period.
# NOTE: row numbers are counted on the cleaned data, so the purge is approximate
# if dropna removed rows in the middle of a series.
TRAIN_FRACTION = 0.8
FORWARD_HORIZON = 12  # rows in rolling_1h_forward_window

ticker_by_time = Window.partitionBy("ticker").orderBy("date")
per_ticker = Window.partitionBy("ticker")

# model_df has no `date` column to order by, so re-assemble from clean_df and
# keep `date` alongside the same features / target.
ordered_df = (
    assembler.transform(clean_df)
    .select("features", "realised_rolling_vol_forward", "ticker", "date")
    .dropna()
    .withColumn("row_num", sf.row_number().over(ticker_by_time))
    .withColumn(
        "split_at",
        sf.floor(sf.count(sf.lit(1)).over(per_ticker) * TRAIN_FRACTION),
    )
)

# The model only needs features + label. Assign the results (DataFrames are
# immutable), so ticker/date/helper columns really are dropped.
train_df = (
    ordered_df
    .filter(sf.col("row_num") <= sf.col("split_at") - FORWARD_HORIZON)
    .select("features", "realised_rolling_vol_forward")
)
test_df = (
    ordered_df
    .filter(sf.col("row_num") > sf.col("split_at"))
    .select("features", "realised_rolling_vol_forward")
)

train_df.show()
test_df.show()

In [0]:
# Setup and train the RF model.
# `ticker_index` comes from StringIndexer, whose output carries nominal
# metadata, so the tree learner presumably treats it as categorical. Spark then
# requires maxBins >= the number of categories, or fit() fails. A hard-coded 99
# breaks as soon as there are 100+ tickers, so raise the floor from the data.
# (The result is unchanged whenever there are <= 99 tickers.)
n_tickers = clean_df.select("ticker").distinct().count()

rf = RandomForestRegressor(
    featuresCol="features",
    labelCol="realised_rolling_vol_forward",
    numTrees=100,
    maxDepth=10,
    maxBins=max(99, n_tickers),
    seed=69,
)

rf_model = rf.fit(train_df)

In [0]:
# Score the held-out rows; the model appends a "prediction" column.
predictions = rf_model.transform(test_df)
predictions.show(n=20)

In [0]:
# Root-mean-squared error of the forward realised-vol predictions on the test set.
rmse_params = {
    "labelCol": "realised_rolling_vol_forward",
    "predictionCol": "prediction",
    "metricName": "rmse",
}
evaluator = RegressionEvaluator(**rmse_params)

rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")

In [0]:
# Coefficient of determination (R^2) of the forward realised-vol predictions on the test set.
r2_params = {
    "labelCol": "realised_rolling_vol_forward",
    "predictionCol": "prediction",
    "metricName": "r2",
}
evaluator_r2 = RegressionEvaluator(**r2_params)

r2 = evaluator_r2.evaluate(predictions)
print(f"R2: {r2:.4f}")