We start by importing the necessary libraries:

1. pyspark, to build the application
2. os, to set environment variables
3. sys, to interact with the Python interpreter
4. SparkContext, which is the starting point for Spark functionality

Setting up environment variables:
os.environ['PYSPARK_PYTHON'] = sys.executable and os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable set both variables to the path of the Python executable that is currently running the script. This ensures that the PySpark workers and the PySpark driver use the same Python interpreter as the script itself.

In [None]:
import pyspark
import os
import sys
from pyspark import SparkContext
os.environ['PYSPARK_PYTHON']=sys.executable
os.environ['PYSPARK_DRIVER_PYTHON']=sys.executable

SparkSession is the entry point for interacting with Spark SQL and the DataFrame API.
We use the imported SparkSession to initialize a session named spark with 16 GB of driver memory and the application name 'chapter_8'.

In [None]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.config("spark.driver.memory","16g").appName('chapter_8').getOrCreate()

### PREPARING THE DATA

We read data from 3 CSV files specified in a list. The resulting DataFrame combines the rows of all 3 files. With header='true', the first line of each file supplies the column names of the DataFrame. inferSchema lets Spark infer the column types automatically instead of reading every column as a string.

stocks.show(2) shows the first 2 rows of the DataFrame.

In [None]:
stocks = spark.read.csv(["data/stocksA/ABAX.csv","data/stocksA/AAME.csv","data/stocksA/AEPI.csv"], header='true', inferSchema='true')
#stocks=spark.read.format("csv").option("inferSchema","true").
# option("header","true").load('C:/Users/HP/Desktop/aas-pyspark-edition/data/
# stocksA/AAIT.csv').load('C:/Users/HP/Desktop/aas-pyspark-edition/data/stocksA/
# AAME.csv')
stocks.show(2)

In the cell below:
1. We first create a new column in the stocks DataFrame called "Symbol" that holds, for each row, the path of the input file the row was read from.
2. We then strip the directory part of the path and keep only the file name by splitting the value on the '/' delimiter and taking the last element (e.g. the result would be "ABAX.csv").
3. We split the file name again on '.' and keep the first element, which leaves the name without the .csv extension (e.g. the result would be "ABAX").


In [None]:
from pyspark.sql import functions as fun

stocks = stocks.withColumn("Symbol", fun.input_file_name()).\
withColumn("Symbol",fun.element_at(fun.split("Symbol", "/"), -1)).\
withColumn("Symbol",fun.element_at(fun.split("Symbol", r"\."), 1))

stocks.show(2)
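
The two split steps can be checked on a plain string outside Spark. The sketch below is a pure-Python stand-in, not the Spark code itself: symbol_from_path is a hypothetical helper name and the example path is made up.

```python
def symbol_from_path(path):
    """Return the ticker symbol encoded in a CSV file path."""
    # Keep the last path component, e.g. "ABAX.csv"
    # (element_at(..., -1) in the Spark version).
    file_name = path.split("/")[-1]
    # Keep the part before the first dot, e.g. "ABAX"
    # (element_at(..., 1) in the Spark version, which is 1-based).
    return file_name.split(".")[0]


print(symbol_from_path("file:///tmp/data/stocksA/ABAX.csv"))  # ABAX
```

Note that element_at counts from 1, while Python lists count from 0, which is why the Spark code uses 1 where this sketch uses 0.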

We repeat the file reading and the creation of the Symbol column, this time into a different DataFrame called "factors".

In [None]:
factors = spark.read.csv(["data/stocksA/ABAX.csv","data/stocksA/AAME.csv","data/stocksA/AEPI.csv"], header='true', inferSchema='true')

factors = factors.withColumn("Symbol", fun.input_file_name()).\
withColumn("Symbol",
fun.element_at(fun.split("Symbol", "/"), -1)).\
withColumn("Symbol",
fun.element_at(fun.split("Symbol", r"\."), 1))

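We count the number of rows for each unique value in the "Symbol" column, then keep only the symbols whose row count exceeds the threshold of 260*5 + 10 (about five years of trading days at roughly 260 per year). Symbols with fewer rows are dropped.

We use a window partitioned by Symbol so that the count is attached to every row without collapsing the data.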

In [None]:
from pyspark.sql import Window

stocks = stocks.withColumn('count', fun.count('Symbol').\
over(Window.partitionBy('Symbol'))).\
filter(fun.col('count') > 260*5 + 10)
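
For readers more familiar with pandas, the same "count per group, then keep the large groups" idea can be sketched as follows. This is only an analogue on made-up data, with min_rows as a small stand-in for the 260*5 + 10 threshold.

```python
import pandas as pd

toy = pd.DataFrame({"Symbol": ["A", "A", "A", "B"],
                    "Close": [1.0, 2.0, 3.0, 4.0]})
min_rows = 2  # stand-in for 260*5 + 10

# transform("count") attaches the per-symbol row count to every row,
# much like count(...).over(Window.partitionBy('Symbol')) in Spark.
toy["count"] = toy.groupby("Symbol")["Symbol"].transform("count")

# Keep only the rows of symbols with more than min_rows rows.
kept = toy[toy["count"] > min_rows]
print(kept)
```

Here symbol A has three rows and survives the filter, while symbol B has a single row and is dropped.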

We set the time parser policy to LEGACY so that Spark uses its older (pre-3.0) date parser instead of the default one, which suits the 'dd-MMM-yy' pattern we parse below.

In [None]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

We overwrite the 'Date' column, converting its string values to date objects.

In [None]:
stocks = stocks.withColumn('Date',
fun.to_date(fun.to_timestamp(fun.col('Date'),
'dd-MMM-yy')))
stocks.printSchema()

We filter on the Date column to keep only the rows within the given range (October 23, 2009 through October 23, 2014).

In [None]:
from datetime import datetime
stocks = stocks.filter(fun.col('Date') >= datetime(2009, 10, 23)).\
filter(fun.col('Date') <= datetime(2014, 10, 23))
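
The parsing and range filter can be tried out in plain Python. This is a sketch under assumptions: the sample strings are invented, English month abbreviations are assumed, and "%d-%b-%y" is used as the strptime counterpart of Spark's 'dd-MMM-yy' pattern.

```python
from datetime import date, datetime


def parse_trade_date(text):
    # "%d-%b-%y" reads day, abbreviated month name and two-digit year.
    return datetime.strptime(text, "%d-%b-%y").date()


start, end = date(2009, 10, 23), date(2014, 10, 23)
raw_dates = ["22-Oct-09", "23-Oct-09", "23-Oct-14", "24-Oct-14"]

# Both bounds are inclusive, as in the two Spark filters.
in_range = [d for d in map(parse_trade_date, raw_dates) if start <= d <= end]
print(in_range)
```

Only the two middle dates fall inside the inclusive range.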


We repeat the above two operations on the factors DF.

In [None]:
factors = factors.withColumn('Date',
fun.to_date(fun.to_timestamp(fun.col('Date'),
'dd-MMM-yy')))
factors = factors.filter(fun.col('Date') >= datetime(2009, 10, 23)).\
filter(fun.col('Date') <= datetime(2014, 10, 23))


We convert the Spark DataFrames into pandas DataFrames so that we can continue working on the data with pandas functions.

In [None]:
stocks_pd_df = stocks.toPandas()
factors_pd_df = factors.toPandas()
factors_pd_df.head(5)

1. First, we define a function that calculates how much the price of a stock or factor changes over a window of 10 steps (e.g. 10 days), as a fraction of the price at the start of the window. This helps us understand how the value of the stock or factor changes over time.
2. Using this function, we calculate the rolling returns for each symbol and store them in two DataFrames, one for stocks and one for factors.
3. We then reset the index, sort each DF by the values in the 'level_1' column (the original row label), and reset the index again to obtain a sequential index.

In [None]:
n_steps = 10
def my_fun(x):
    return ((x.iloc[-1] - x.iloc[0]) / x.iloc[0])

stock_returns = stocks_pd_df.groupby('Symbol').Close.\
rolling(window=n_steps).apply(my_fun)

factors_returns = factors_pd_df.groupby('Symbol').Close.\
rolling(window=n_steps).apply(my_fun)

stock_returns = stock_returns.reset_index().\
sort_values('level_1').\
reset_index()

factors_returns = factors_returns.reset_index().\
sort_values('level_1').\
reset_index()
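
To see where the 'level_1' column comes from, here is a small sketch of the same pattern on made-up prices, with a window of 2 instead of 10. The name window_return is only illustrative.

```python
import pandas as pd


def window_return(x):
    # Relative change from the first to the last value in the window.
    return (x.iloc[-1] - x.iloc[0]) / x.iloc[0]


toy = pd.DataFrame({"Symbol": ["A", "A", "A", "B", "B", "B"],
                    "Close": [10.0, 11.0, 12.0, 20.0, 22.0, 24.0]})

rolled = toy.groupby("Symbol").Close.rolling(window=2).apply(window_return)
# The result is indexed by (Symbol, original row label); reset_index()
# turns the unnamed second level into a column called 'level_1'.
rolled = rolled.reset_index().sort_values("level_1").reset_index()
print(rolled)
```

The first row of each symbol is NaN because the window is not yet full; the same happens for the first 9 rows per symbol with n_steps = 10.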

1. We create a new DataFrame stocks_pd_df_with_returns by adding a column called 'stock_returns' that holds the values of the 'Close' column of the returns DF (which now contains the rolling returns).
2. We create a similar DF for the factors, containing the factor returns and their squares; the squared terms help capture nonlinear relationships in the returns.
3. We use "pivot" to reshape the factors DF so that Date is the index and each symbol gets its own columns.
4. We then rename the columns of the DF by joining each pair of column labels into a single string separated by an underscore, and use these strings as the new column index.
5. We then reset the index, which turns the current index (Date) into a column and generates a new sequential index starting from 0.

In [None]:
# Create combined stocks DF
stocks_pd_df_with_returns = stocks_pd_df.\
assign(stock_returns = \
stock_returns['Close'])
# Create combined factors DF
factors_pd_df_with_returns = factors_pd_df.\
assign(factors_returns = \
factors_returns['Close'],
factors_returns_squared = \
factors_returns['Close']**2)

factors_pd_df_with_returns = factors_pd_df_with_returns.\
pivot(index='Date',
columns='Symbol',
values=['factors_returns', \
'factors_returns_squared'])

factors_pd_df_with_returns.columns = factors_pd_df_with_returns.\
columns.\
to_series().\
str.\
join('_').\
reset_index()[0]

factors_pd_df_with_returns = factors_pd_df_with_returns.\
reset_index()

print(factors_pd_df_with_returns.head(1)) #print one row of data
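
The pivot and column-flattening steps are easier to follow on a tiny frame. The sketch below uses invented data and a list comprehension to join the column labels, which is an alternative to the to_series().str.join('_') chain in the cell above, not the same code.

```python
import pandas as pd

toy = pd.DataFrame({
    "Date": ["2014-01-01", "2014-01-01", "2014-01-02", "2014-01-02"],
    "Symbol": ["A", "B", "A", "B"],
    "factors_returns": [0.1, 0.2, -0.1, 0.3],
})
toy["factors_returns_squared"] = toy["factors_returns"] ** 2

# One row per date, one column per (value name, symbol) pair.
wide = toy.pivot(index="Date", columns="Symbol",
                 values=["factors_returns", "factors_returns_squared"])

# Flatten the two-level column labels, e.g. ('factors_returns', 'A')
# becomes 'factors_returns_A'.
wide.columns = ["_".join(pair) for pair in wide.columns]
wide = wide.reset_index()
print(wide.columns.tolist())
```

With three symbols instead of two, this layout yields the six feature columns that the regression step relies on.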


Print all columns in the DF.

In [None]:
print(factors_pd_df_with_returns.columns) 

We now perform linear regression using sklearn to find the relationship between stock returns and factors.

1. We first perform a left join of the two returns DFs on Date.
2. We then pre-process the data to exclude rows with missing or infinite values.
3. We use ordinary least squares (OLS) to fit a linear regression for each stock; the function returns a list holding the stock symbol followed by the coefficient of each independent variable.
4. We apply the OLS function to each symbol group and create a new DF containing the coefficients of each stock's regression model.
5. We reset the index and rename the columns.
6. Lastly, we create a new DF where each entry of the list becomes its own column. The index of the new DF remains the same as in the original DF, and the columns are labeled 'Symbol' followed by the names of the feature columns.

In [None]:
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# For each stock, create input DF for linear regression training
stocks_factors_combined_df = pd.merge(stocks_pd_df_with_returns,
factors_pd_df_with_returns,how="left", on="Date")

# The last six columns hold the factor returns and their squares
feature_columns = list(stocks_factors_combined_df.columns[-6:])
# Treat infinite values as missing, then drop incomplete rows
# (this avoids the deprecated 'mode.use_inf_as_na' option)
stocks_factors_combined_df = stocks_factors_combined_df.\
replace([np.inf, -np.inf], np.nan).\
dropna(subset=feature_columns \
+ ['stock_returns'])
    

def find_ols_coef(df):
    y = df[['stock_returns']].values
    X = df[feature_columns]
    regr = LinearRegression()
    regr_output = regr.fit(X, y)
    return list(df[['Symbol']].values[0]) + \
    list(regr_output.coef_[0])

coefs_per_stock = stocks_factors_combined_df.\
groupby('Symbol').\
apply(find_ols_coef)

coefs_per_stock = pd.DataFrame(coefs_per_stock).reset_index()
coefs_per_stock.columns = ['symbol', 'factor_coef_list']

coefs_per_stock = pd.DataFrame(coefs_per_stock.\
factor_coef_list.tolist(),
index=coefs_per_stock.index,
columns = ['Symbol'] + feature_columns)

coefs_per_stock
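
As a sanity check on what the regression recovers, the sketch below fits ordinary least squares on synthetic data with known coefficients. It uses np.linalg.lstsq as a stand-in for sklearn's LinearRegression, and the data, seed and coefficients are all invented for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(50, 2))                 # two made-up factors
y = 0.5 + 2.0 * X[:, 0] - 3.0 * X[:, 1]      # known linear relationship

# Add a column of ones so the fit includes an intercept, as
# LinearRegression does by default.
design = np.column_stack([np.ones(len(X)), X])
params, *_ = np.linalg.lstsq(design, y, rcond=None)

intercept, coefs = params[0], params[1:]
print(intercept, coefs)
```

Because the synthetic data is noise-free, the fit returns the intercept and coefficients used to build y, up to floating-point error. With real returns the coefficients are only estimates.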

We then draw a kernel density plot of the returns of one factor (the 'Close' column of factors_returns holds the rolling returns computed earlier). This helps us understand the distribution of that factor's returns. We can use this distribution to look for anomalies or patterns.

In [None]:
samples = factors_returns.loc[factors_returns.Symbol == \
factors_returns.Symbol.unique()[0]]['Close']
samples.plot.kde()

We create 3 series containing the returns of 3 different symbols.
We then print the size of each series to check whether they are equal and create a DF from them.
.corr() calculates the correlation (a measure of linear relationship) between each pair of variables.
 

In [None]:
f_1 = factors_returns.loc[factors_returns.Symbol == \
factors_returns.Symbol.unique()[0]]['Close']

f_2 = factors_returns.loc[factors_returns.Symbol == \
factors_returns.Symbol.unique()[1]]['Close']

f_3 = factors_returns.loc[factors_returns.Symbol == \
factors_returns.Symbol.unique()[2]]['Close']

print(f_1.size,len(f_2),f_3.size)

pd.DataFrame({'f1': list(f_1)[1:1040], 'f2': list(f_2)[1:1040], 'f3': 
list(f_3)}).corr()

1. We compute the covariance matrix (a measure of how much two variables change together) and convert it to a NumPy array.
2. We then calculate the mean return for each symbol.

These two quantities describe the joint behavior of the factors and serve as the parameters of the distribution we sample from next.

In [None]:
factors_returns_cov = pd.DataFrame({'f1': list(f_1)[1:1040],
'f2': list(f_2)[1:1040],
'f3': list(f_3)})\
.cov().to_numpy()

factors_returns_mean = pd.DataFrame({'f1': list(f_1)[1:1040],
'f2': list(f_2)[1:1040],
'f3': list(f_3)}).\
mean()
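
The link between covariance and correlation can be checked on a small made-up frame: dividing each covariance by the product of the two standard deviations gives the correlation. The values below are invented, with f2 built as exactly twice f1 so that their correlation is 1.

```python
import numpy as np
import pandas as pd

toy = pd.DataFrame({"f1": [0.01, 0.02, -0.01, 0.03],
                    "f2": [0.02, 0.04, -0.02, 0.06],
                    "f3": [0.03, -0.01, 0.02, 0.00]})

cov = toy.cov().to_numpy()
corr = toy.corr().to_numpy()
mean = toy.mean()

# corr[i, j] = cov[i, j] / (std[i] * std[j])
std = np.sqrt(np.diag(cov))
rebuilt_corr = cov / np.outer(std, std)
print(np.allclose(rebuilt_corr, corr))
```

Unlike correlation, the covariance keeps the scale of the returns, which is what the sampling step needs.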

We draw a random sample from a multivariate normal distribution using the mean and covariance obtained above. This produces a random vector that simulates the joint behavior of the factors based on their mean and covariance.
In this step we are essentially generating new sample data. We chose the normal distribution based on the KDE output.

In [None]:
from numpy.random import multivariate_normal
multivariate_normal(factors_returns_mean, factors_returns_cov)
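
To illustrate that such draws really follow the given parameters, the sketch below samples many vectors and compares the empirical mean and covariance with the inputs. The mean, covariance and seed are made-up values, and NumPy's Generator API is used here instead of the module-level function imported above.

```python
import numpy as np

mean = np.array([0.01, -0.02])
cov = np.array([[0.04, 0.01],
                [0.01, 0.09]])

rng = np.random.default_rng(1496)
# Each row is one simulated vector of factor returns.
draws = rng.multivariate_normal(mean, cov, size=200_000)

sample_mean = draws.mean(axis=0)
sample_cov = np.cov(draws, rowvar=False)
print(sample_mean)
print(sample_cov)
```

With this many draws, the empirical statistics should land close to the parameters that were passed in.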

We then create broadcast variables, which are shared with all nodes in the cluster. We broadcast coefs_per_stock, feature_columns, factors_returns_mean and factors_returns_cov so that every node in our Spark cluster has its own read-only copy.

In [None]:
b_coefs_per_stock = spark.sparkContext.broadcast(coefs_per_stock)
b_feature_columns = spark.sparkContext.broadcast(feature_columns)
b_factors_returns_mean = spark.sparkContext.broadcast(factors_returns_mean)
b_factors_returns_cov = spark.sparkContext.broadcast(factors_returns_cov)


We then prepare the random number generation across the cluster. The work is done in parallel, and each partition gets its own unique seed, so the partitions draw different numbers from one another but reproduce the same numbers on every run.

In [None]:
from pyspark.sql.types import IntegerType
parallelism = 1000
num_trials = 1000000
base_seed = 1496
seeds = [b for b in range(base_seed,
base_seed + parallelism)]
seedsDF = spark.createDataFrame(seeds, IntegerType())
seedsDF = seedsDF.repartition(parallelism)
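
The effect of giving each partition its own seed can be illustrated without Spark. In this sketch, draws_for_seed is a hypothetical helper, parallelism is reduced to 4, and NumPy's default_rng stands in for the seeding used in the notebook.

```python
import numpy as np

base_seed = 1496
parallelism = 4  # small stand-in for the 1000 partitions used above
seeds = list(range(base_seed, base_seed + parallelism))


def draws_for_seed(seed_value, n=3):
    # A generator built from the same seed always yields the same numbers.
    rng = np.random.default_rng(seed_value)
    return rng.normal(size=n)


first_run = {s: draws_for_seed(s) for s in seeds}
second_run = {s: draws_for_seed(s) for s in seeds}

same_across_runs = all(np.array_equal(first_run[s], second_run[s])
                       for s in seeds)
differ_across_seeds = not np.array_equal(first_run[seeds[0]],
                                         first_run[seeds[1]])
print(same_across_runs, differ_across_seeds)
```

Reproducibility comes from the seed, and variety between partitions comes from the seeds being distinct.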

The code below evaluates each stock under different conditions, using the coefficients from the linear regression and factor values sampled in parallel. The goal is to simulate the performance of each stock under various market conditions represented by the randomly generated factors. This helps us estimate how each stock's returns vary across scenarios.

In [None]:
from numpy.random import seed
from pyspark.sql.types import LongType, ArrayType, DoubleType
from pyspark.sql.functions import udf
def calculate_trial_return(x):
    trial_return_list = []
    # Seed once per partition so that each trial below draws new factors
    seed(x)
    coefs_per_stock_df = b_coefs_per_stock.value
    for i in range(int(num_trials/parallelism)):
        # Sample one vector of factor returns from the fitted distribution
        random_factors = multivariate_normal(b_factors_returns_mean.value,
        b_factors_returns_cov.value)
        # Weight the factors and their squares by each stock's coefficients
        returns_per_stock = (coefs_per_stock_df[b_feature_columns.value] *
        (list(random_factors) + list(random_factors**2)))
        # Record the total return for this trial, scaled by the size of
        # the coefficient table
        trial_return_list.append(float(returns_per_stock.sum(axis=1).sum()/
        b_coefs_per_stock.value.size))
    return trial_return_list
udf_return = udf(calculate_trial_return, ArrayType(DoubleType()))
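
The arithmetic of a single trial can be written out with NumPy on made-up numbers. This is a simplified sketch: the coefficients and factor values are invented, and the result is averaged over the number of stocks, whereas the cell above divides by the size of the coefficient table.

```python
import numpy as np

# One row of coefficients per stock: three factor terms, then three
# squared-factor terms.
coefs = np.array([[0.5, -0.2, 0.1, 0.0, 0.3, 0.0],
                  [1.0,  0.0, 0.0, 0.2, 0.0, 0.0]])

# One simulated vector of factor returns.
factors = np.array([0.1, -0.2, 0.3])

# Features are the factors followed by their squares.
features = np.concatenate([factors, factors ** 2])

# Predicted return of each stock, then the average over stocks.
per_stock = coefs @ features
trial_return = per_stock.mean()
print(per_stock, trial_return)
```

Repeating this with a fresh factor vector for every trial produces the distribution of returns analyzed in the following cells.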

1. We calculate the trial returns for each seed value using the UDF.
2. We then flatten the trial_return column using "explode". This results in multiple rows for each seed value, each row representing a single trial return.
3. Finally, we cache the resulting DF with cache() for faster access during further analysis.

In [None]:
from pyspark.sql.functions import col, explode
trials = seedsDF.withColumn("trial_return", udf_return(col("value")))
trials = trials.select('value', explode('trial_return').alias('trial_return'))
trials.cache()
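
pandas offers an explode method with the same idea, which makes the reshaping easy to inspect on a toy frame. This is an analogue with invented values, not the Spark call itself.

```python
import pandas as pd

per_seed = pd.DataFrame({"value": [1496, 1497],
                         "trial_return": [[0.1, 0.2], [0.3]]})

# Each list element becomes its own row; the seed value is repeated.
flat = per_seed.explode("trial_return").reset_index(drop=True)
print(flat)
```

Two input rows holding three list elements in total become three output rows.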

We compute the 5th percentile of all the simulated trial returns, often called the 5% value at risk (VaR): in the worst 5% of scenarios, the return is at or below this value. The last argument, a relative error of 0.0, requests the exact quantile. This gives us an idea of the potential risks that come with our investments.

In [None]:
trials.approxQuantile('trial_return', [0.05], 0.0)

In the cell below, we compute the average trial return within the bottom 5% (often called the conditional value at risk) to get an idea of the average loss associated with the investment strategy in those worst cases.

In [None]:
trials.orderBy(col('trial_return').asc()).\
limit(int(trials.count()/20)).\
agg(fun.avg(col("trial_return"))).show()
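
Both risk measures can be illustrated with NumPy on a small, made-up set of returns. Note that np.quantile interpolates between neighboring values by default, so its result may differ slightly from what Spark's approxQuantile returns.

```python
import numpy as np

# 100 made-up trial returns from -0.50 to 0.49 in steps of 0.01.
returns = np.arange(-50, 50) / 100.0

# 5% quantile: the return that the worst 5% of trials fall below.
var_5 = np.quantile(returns, 0.05)

# Average of the worst 5% of trials.
sorted_returns = np.sort(returns)
cutoff = len(sorted_returns) // 20
worst = sorted_returns[:cutoff]
cvar_5 = worst.mean()

print(var_5, cvar_5)
```

The average of the worst cases is always at least as bad as the quantile itself, because it looks only at returns beyond that cutoff.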

We then plot the simulated trial returns to get a visual sense of the results obtained above.

In [None]:
import pandas
mytrials=trials.toPandas()
mytrials.plot.line()