## Import Libraries

In [None]:
%pyspark
import pyspark
import pandas as pd
from pyspark.sql.functions import lit
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window
from pyspark.sql.types import (StructField,FloatType,TimestampType,StructType)
from pyspark.sql import functions as f
import matplotlib.pyplot as plt
from pyspark.sql.functions import format_number
from arch import arch_model
import numpy as np
from pyspark.sql.functions import sqrt

In [None]:
%pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
# Split very long line paths into 100k-vertex chunks when rendering with Agg;
# matplotlib documents this as a guard against Agg rendering failures on very
# large data sets (the minute-level series plotted below).
plt.rcParams.update({'agg.path.chunksize': 100000})

## Job Start Time

In [None]:
%pyspark
import time
# Wall-clock timestamp marking the start of the job; the final paragraph
# prints the elapsed time relative to this value.
start_time = time.time()

## Reading data from S3 Bucket (SPY ETF Apr'19 - Jul'20)

In [None]:
%pyspark
# Load the raw price file (first row is a header). inferSchema is not
# requested, so Spark reads every column as a string.
# NOTE(review): per the section header this is SPY ETF data, Apr'19 - Jul'20.
spy_csv_path = "s3://aws-logs-664959780319-us-east-1/data/34lkh.csv"
df = spark.read.option("header", True).csv(spy_csv_path)
df.printSchema()
df.show(n=10)

## Creating Log Returns (Scaled by 10^3)

In [None]:
%pyspark
# Log returns scaled by 10^3: returns_t = 1000 * ln(close_t / close_{t-1}).
#
# lag() needs a well-defined row order. Ordering the window by a constant
# column leaves the order undefined, so the "previous" close could come from an
# arbitrary row. monotonically_increasing_id() is built from the partition id
# (upper bits) and the record position within the partition (lower bits), so it
# follows the order in which rows were read.
# NOTE(review): assumes the CSV rows are already in chronological order - confirm.
# An unpartitioned window moves all rows into one partition; acceptable here,
# but it becomes a bottleneck on large inputs.
df3 = df.withColumn("Id", lit('1'))  # kept so the downstream schema is unchanged
df3 = df3.withColumn("_row_order", f.monotonically_increasing_id())
w = Window.orderBy(col("_row_order"))
df3 = df3.select("*", lag("close").over(w).alias("close_shift")).drop("_row_order")
# `close` / `close_shift` are strings (no inferSchema); cast explicitly instead
# of relying on implicit string -> double coercion in the division.
df3 = df3.withColumn(
    'returns',
    f.log(col('close').cast("double") / col('close_shift').cast("double")) * 1000,
)
df3.show()

## Squared Returns

In [None]:
%pyspark
# Square the scaled log returns; the squared series serves as the per-bar
# "actual" variance proxy that the forecasts are compared against later.
df3 = df3.withColumn('sqd_returns_vol', f.pow(col('returns'), 2))

## Converting the Spark DataFrame to a pandas DataFrame for ARCH and GARCH Modeling

In [None]:
%pyspark
# Drop every row containing a null in any column (this removes the first row,
# whose lagged close and therefore return are null), then collect the whole
# result to the driver as a pandas DataFrame for the `arch` package.
df_vol = df3 = df3.dropna()
dfp = df_vol.toPandas()

## Volatility Modeling - ARCH(1)

In [None]:
%pyspark
# ARCH(1) fitted on the full return series (arch_model's default mean model is
# a constant). The preceding na.drop() already removed the null return that
# lag() produces for the first row, so the series is used as-is; the former
# `[1:]` slice discarded a valid observation.
model_arch_1 = arch_model(dfp['returns'], vol="ARCH", p=1)
results_arch_1 = model_arch_1.fit()
results_arch_1.summary()


## Volatility Modeling - GARCH (1,1)

In [None]:
%pyspark
# GARCH(1,1) fitted on the full return series (constant mean by default).
# No `[1:]` slice: the null first return was already removed by na.drop(),
# so slicing would only discard a valid observation.
mod_garch_1 = arch_model(dfp.returns, vol="GARCH", p=1, q=1)
results_garch_1 = mod_garch_1.fit()
results_garch_1.summary()

# Testing the GARCH(1,1) Model on Test Data (Through the End of the Test Window)

In [None]:
%pyspark
# Convert the raw `datetime` strings to Timestamps. With dayfirst=True an
# ambiguous date such as "10/11/12" is read as day=10, month=11.
dfp["datetime"] = pd.to_datetime(dfp["datetime"], dayfirst=True)

In [None]:
%pyspark
# Promote the parsed timestamps to the index; passing the column label makes
# set_index drop the source column, leaving an index named "datetime".
dfp = dfp.set_index("datetime")
dfp.head(3)

In [None]:
%pyspark
# Quick look at the first five scaled log returns.
dfp["returns"].head(n=5)

## Splitting the data into train and test

In [None]:
%pyspark
# Label-based slices of the DatetimeIndex. .loc slicing includes both
# endpoints, so a bar stamped exactly 2019-12-01 09:30:00 would fall in both
# sets.
# NOTE(review): the test end of 06:31 on 2019-12-31 precedes that day's bars
# at 09:30 or later - confirm that excluding Dec 31 is intended.
# df_test is copied because a later paragraph adds a column to it; without the
# copy that assignment targets a slice of dfp and triggers pandas'
# SettingWithCopyWarning.
df_train = dfp.loc['2019-01-01 09:30:00':'2019-12-01 09:30:00']
df_test = dfp.loc['2019-12-01 09:30:00':'2019-12-31 06:31:00'].copy()


In [None]:
# Display the test split (the bare expression is rendered as the paragraph output).
# NOTE(review): unlike the other paragraphs, this one has no %pyspark directive,
# so it relies on the note's default interpreter being pyspark - confirm.
df_test

## Defining the Start and End Dates for Volatility Forecasting

In [None]:
%pyspark
# Out-of-sample window: start_date is the estimation cutoff passed to the
# GARCH fit, and both bounds are used to slice the resulting forecasts.
start_date, end_date = "2019-12-01 09:30:00", "2019-12-31 06:31:00"

## GARCH(1,1) Model Fit

In [None]:
%pyspark
# Refit GARCH(1,1) using data only up to the `start_date` cutoff (arch's
# `last_obs`), so forecasts issued from start_date onward are out of sample.
# No `[1:]` slice: na.drop() already removed the null first return.
# NOTE: the result is stored as `results_arch_1` (overwriting the ARCH(1) fit)
# because the forecasting paragraph reads that name.
mod_garch_1 = arch_model(dfp.returns, vol="GARCH", p=1, q=1)
results_arch_1 = mod_garch_1.fit(last_obs=start_date)
results_arch_1.summary()

## Forecasting 1 Period Ahead Volatility

In [None]:
%pyspark
# One-step-ahead (column "h.1") residual-variance forecasts for each origin in
# the test window.
# start= is passed explicitly so forecasts begin at the estimation cutoff
# instead of relying on arch's version-dependent default start/reindex rules.
# .loc makes the row-label slice explicit; .copy() yields an independent frame,
# so the columns added to dff later do not trigger chained-assignment warnings.
pred_garch = results_arch_1.forecast(horizon=1, start=start_date)
dff = pred_garch.residual_variance.loc[start_date:end_date].copy()

## Standardizing the Predicted Variance (stored in column `standardized_residuals`)

In [None]:
%pyspark
# z-score the one-step variance forecasts over the test window (sample std,
# ddof=1). The column name is historical: these are standardized forecasts.
h1 = dff["h.1"]
dff["standardized_residuals"] = (h1 - h1.mean()) / h1.std()
dff

## Aggregate Predicted 1-Minute Variance Forecasts to Daily Level (Sum of Intraday Forecasts)

In [None]:
%pyspark
# Sum the per-bar forecasts into calendar-day bins. Every column is summed, and
# days without any rows (e.g. weekends) come out as 0.
daily_bins = dff.resample("D")
df_aggregated_to_daily_predicted = daily_bins.sum()

In [None]:
%pyspark
# Display the daily-aggregated forecast frame.
df_aggregated_to_daily_predicted

## Aggregate Actual 1-Minute Squared Returns to Daily Level (Sum of Intraday Squared Returns = Realized Variance)

In [None]:
%pyspark
# Daily realized variance: sum of intraday squared returns per calendar day.
# Only numeric columns are aggregated. The CSV was read without inferSchema, so
# several columns (e.g. close, Id) are strings. Since pandas 2.0 groupby/resample
# reductions no longer silently drop such columns (strings get concatenated or
# the call raises). select_dtypes gives the same result across pandas versions.
# Calendar days with no bars (weekends/holidays) sum to 0.
df_aggregated_to_daily_actual = df_test.select_dtypes("number").resample('D').sum()

## Actual vs Predicted Volatility (Daily Level)

In [None]:
%pyspark
# Overlay daily realized variance (actual) and the daily sum of one-step
# variance forecasts (predicted) on a single axis.
# The frames come from the two daily-aggregation paragraphs; the names
# previously used here (t1/t2) are not defined in any visible paragraph and
# raised NameError.
# NOTE(review): returns are scaled by 10^3, so squared returns and variances
# carry a 10^6 scale - the y-axis label may understate it; confirm.
_fig, ax = plt.subplots(figsize=(20, 5))
df_aggregated_to_daily_actual['sqd_returns_vol'].plot(ax=ax, color="green")
df_aggregated_to_daily_predicted['h.1'].plot(ax=ax, color="red")
ax.legend()
plt.title("Actual Squared Returns vs Predicted Variance Residuals Aggregated to daily level", size = 15)
plt.xlabel("Date",size=15)
plt.ylabel("Variance (In range of 10^3)",size=15)
# Render now (as the other plotting paragraphs do) so later plots do not draw
# onto this figure.
plt.show()

## Standardizing the Predicted Variance and the Actual Squared Returns

In [None]:
%pyspark
# z-score both series over the test window so they share a scale when overlaid.
# standardized_residuals is recomputed with the same formula as the earlier
# standardization paragraph, so this paragraph does not depend on that one.
# Both frames are rebound with .assign() rather than mutated column-wise.
# df_test and dff are derived from .loc / label slices of larger frames, and
# in-place column assignment on such slices can raise pandas'
# SettingWithCopyWarning.
dff = dff.assign(
    standardized_residuals=lambda d: (d['h.1'] - d['h.1'].mean()) / d['h.1'].std()
)
df_test = df_test.assign(
    normalized_squarred_returns=lambda d: (
        d['sqd_returns_vol'] - d['sqd_returns_vol'].mean()
    ) / d['sqd_returns_vol'].std()
)

In [None]:
%pyspark
# Display the forecast frame: raw one-step forecasts ("h.1") plus their z-scores.
dff

## Predicted Standardized Variance

In [None]:
%pyspark
# Line plot of the standardized one-step variance forecasts over the test window.
ax = dff['standardized_residuals'].plot(figsize=(20, 5), color="red")
ax.set_title("Predicted standardized residuals", size=24)
plt.show()

## Actual Standardized Squared Returns

In [None]:
%pyspark
# Line plot of the standardized actual squared returns over the test window.
ax = df_test['normalized_squarred_returns'].plot(figsize=(20, 5), color="orange")
ax.set_title("Actual normalized squarred returns", size=24)
plt.show()

## Actual vs Predicted Volatility (1-Minute Level)

In [None]:
%pyspark
# Overlay standardized actual squared returns (green) and standardized one-step
# variance forecasts (red) at bar level.
# A dedicated figure/axis keeps the overlay from being drawn onto a figure left
# open by an earlier paragraph. plt.show() renders it the same way the other
# plotting paragraphs do.
_fig, ax = plt.subplots(figsize=(20, 5))
df_test['normalized_squarred_returns'].plot(ax=ax, color="green")
dff['standardized_residuals'].plot(ax=ax, color="red")
ax.legend()
plt.title("Actual Squared Returns vs Predicted Variance Residuals 1 Minute - Normalized", size = 12)
plt.show()


## End Clock - Decision Latency

In [None]:
%pyspark
# Report the total wall-clock time since the job-start paragraph recorded start_time.
elapsed_seconds = time.time() - start_time
print(f"--- {elapsed_seconds} seconds ---")