<h2>Monte Carlo</h2>

<ul>
    <li>This code snippet sets up a PySpark environment in a Python script.</li><li> It first imports the necessary modules like pyspark, os, and sys.</li><li> Then, it sets the Python executable for PySpark to the same one being used by the script.</li><li> Finally, it imports the SparkContext class for creating RDDs and the SparkSession class for programming Spark with the DataFrame API.</li>
    </ul>

In [None]:
import pyspark
import os
import sys
from pyspark import SparkContext
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
from pyspark.sql import SparkSession

<ul><li>This code creates a SparkSession named spark with specific configuration options. It sets the driver memory to 16 GB and names the application 'chapter_8'.</li></ul>

In [None]:
spark = SparkSession.builder.config("spark.driver.memory", "16g").appName('chapter_8').getOrCreate()

<h3>Preparing the Data</h3>

<ul><li>This code reads three CSV files (ABAX.csv, AAME.csv, and AEPI.csv) located in the data/stocksA/ directory into a Spark DataFrame named stocks.</li><li> The header='true' argument indicates that the first row of each CSV file contains the column names, and inferSchema='true' tells Spark to infer the data types of each column.</li></ul>

In [None]:
stocks = spark.read.csv(["data/stocksA/ABAX.csv","data/stocksA/AAME.csv","data/stocksA/AEPI.csv"], header='true', inferSchema='true')
stocks.show(2)

<ul><li>This code snippet adds a new column "Symbol" to the DataFrame stocks.</li><li> It extracts the file name from the full file path and assigns it to the "Symbol" column.</li></ul>

In [None]:
from pyspark.sql import functions as fun
stocks = stocks.withColumn("Symbol", fun.input_file_name()).\
 withColumn("Symbol", fun.element_at(fun.split("Symbol", "/"), -1)).\
 withColumn("Symbol", fun.element_at(fun.split("Symbol", r"\."), 1))
stocks.show(2)
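
<ul><li>The path-to-symbol logic can be checked outside Spark. The sketch below mirrors the two split steps in plain Python; the helper name symbol_from_path and the sample path are hypothetical and used only for illustration.</li></ul>

```python
def symbol_from_path(path):
    """Mimic the two split steps: keep the file name, then drop the extension."""
    file_name = path.split("/")[-1]   # last path element, e.g. "ABAX.csv"
    return file_name.split(".")[0]    # text before the first dot, e.g. "ABAX"

# Hypothetical path in the style returned by input_file_name()
example_symbol = symbol_from_path("file:///home/user/data/stocksA/ABAX.csv")
print(example_symbol)
```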

<b>This code snippet reads the same three CSV files into a second DataFrame, factors, and adds a new column "Symbol" to it.</b><ul><li> The first withColumn call adds the full file path of each row as a new column called "Symbol".</li><li> The second withColumn call extracts the file name from the full path by splitting the path using "/" as the delimiter and taking the last element.</li><li> Finally, the third withColumn call further refines the "Symbol" column by splitting the file name using "." as the delimiter and taking the first element.</li></ul>


In [None]:
factors = spark.read.csv(["data/stocksA/ABAX.csv","data/stocksA/AAME.csv","data/stocksA/AEPI.csv"], header='true', inferSchema='true')
factors = factors.withColumn("Symbol", fun.input_file_name()).\
 withColumn("Symbol", fun.element_at(fun.split("Symbol", "/"), -1)).\
 withColumn("Symbol", fun.element_at(fun.split("Symbol", r"\."), 1))

<ul><li>This code snippet adds a new column "count" to the stocks DataFrame, using a window partitioned by "Symbol" to count the rows for each symbol.</li><li> It then keeps only the symbols with more than 260*5 + 10 = 1310 rows, which is roughly five years of daily prices.</li></ul>

In [None]:
from pyspark.sql import Window
stocks = stocks.withColumn('count', fun.count('Symbol').over(Window.partitionBy('Symbol'))).filter(fun.col('count') > 260*5 + 10)
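
<ul><li>As a rough illustration of the same count-then-filter idea, the sketch below uses pandas instead of a Spark window. The toy data and the threshold min_rows are assumptions chosen so the example stays small.</li></ul>

```python
import pandas as pd

# Toy data: symbol "A" has three rows, symbol "B" has one
toy = pd.DataFrame({"Symbol": ["A", "A", "A", "B"],
                    "Close": [1.0, 2.0, 3.0, 4.0]})

min_rows = 2  # hypothetical threshold; the notebook uses 260*5 + 10

# Attach the per-symbol row count to every row, like count().over(partitionBy)
toy["count"] = toy.groupby("Symbol")["Symbol"].transform("count")

# Keep only rows whose symbol has enough history
kept = toy[toy["count"] > min_rows]
print(kept)
```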

<ul><li>This code sets the Spark SQL configuration spark.sql.legacy.timeParserPolicy to LEGACY, which makes Spark parse datetime patterns with the behavior it had before version 3.0.</li><li>This configuration affects how the date strings in the following cells are parsed.</li></ul>

In [None]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

<ul><li>This code converts the 'Date' column in the stocks DataFrame to a standard date format. It first converts the 'Date' column to a timestamp using the 'dd-MMM-yy' format, and then converts this timestamp to a date format. </li></ul>

In [None]:
stocks = stocks.withColumn('Date',fun.to_date(fun.to_timestamp(fun.col('Date'),'dd-MMM-yy')))
stocks.printSchema()
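
<ul><li>To see what the 'dd-MMM-yy' pattern accepts, the sketch below parses one date with Python's standard library. The sample string "23-Oct-09" is an assumption about how the raw column looks, and "%d-%b-%y" is the strptime counterpart of the Spark pattern under an English locale.</li></ul>

```python
from datetime import date, datetime

raw_date = "23-Oct-09"  # hypothetical sample value in dd-MMM-yy form

# %d = day of month, %b = abbreviated month name, %y = two-digit year
parsed = datetime.strptime(raw_date, "%d-%b-%y").date()

# The same five-year range check that the notebook applies with filter()
start, end = date(2009, 10, 23), date(2014, 10, 23)
in_range = start <= parsed <= end
print(parsed, in_range)
```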

<ul><li>This code filters the stocks DataFrame to include only rows where the 'Date' column falls within the range from October 23, 2009, to October 23, 2014.</li></ul>

In [None]:
from datetime import datetime
stocks = stocks.filter(fun.col('Date') >= datetime(2009, 10, 23)).\
 filter(fun.col('Date') <= datetime(2014, 10, 23))

<ul><li>This code converts the 'Date' column in the factors DataFrame to a date format and then filters the DataFrame to include only rows where the 'Date' falls within the range from October 23, 2009, to October 23, 2014, similar to what was done with the stocks DataFrame.</li></ul>

In [None]:
factors = factors.withColumn('Date',fun.to_date(fun.to_timestamp(fun.col('Date'),'dd-MMM-yy')))
factors = factors.filter(fun.col('Date') >= datetime(2009, 10, 23)).\
 filter(fun.col('Date') <= datetime(2014, 10, 23))

<ul><li>This code converts the Spark DataFrames stocks and factors into Pandas DataFrames stocks_pd_df and factors_pd_df, respectively, and then displays the first 5 rows of the factors_pd_df DataFrame.</li></ul>

In [None]:
stocks_pd_df = stocks.toPandas()
factors_pd_df = factors.toPandas()
factors_pd_df.head(5)

<h3>Determining Factor Weights</h3>

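<ul><li>This code calculates rolling returns for both the stock and factor dataframes using a custom function my_fun, which computes the relative change in the close price between the first and last rows of each 10-row window.</li><li> It then resets the index and sorts by level_1, the column that holds the original row index, so the returns line up with the rows of the source dataframes.</li></ul>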

In [None]:
n_steps = 10
def my_fun(x):
    return ((x.iloc[-1]- x.iloc[0]) / x.iloc[0])
stock_returns = stocks_pd_df.groupby('Symbol').Close.\
 rolling(window=n_steps).apply(my_fun)
factors_returns = factors_pd_df.groupby('Symbol').Close.\
 rolling(window=n_steps).apply(my_fun)
stock_returns = stock_returns.reset_index().\
 sort_values('level_1').\
 reset_index()
factors_returns = factors_returns.reset_index().\
 sort_values('level_1').\
 reset_index()
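
<ul><li>A small worked example may make the rolling return easier to follow. The price series and the window of 3 are made up for illustration; the notebook itself uses a window of n_steps = 10.</li></ul>

```python
import pandas as pd

def window_return(x):
    # Relative change from the first to the last price in the window
    return (x.iloc[-1] - x.iloc[0]) / x.iloc[0]

prices = pd.Series([100.0, 110.0, 121.0, 133.1])  # hypothetical close prices

# The first window-1 entries are NaN because the window is not yet full
returns = prices.rolling(window=3).apply(window_return)
print(returns)
```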

<b>This code combines the stock and factor dataframes with their respective rolling returns.</b><ul><li> It assigns the rolling returns as new columns in the dataframes. For the factors dataframe, it also calculates the squared rolling returns. </li><li>The dataframes are then pivoted to have symbols as columns and dates as rows. Finally, the columns are renamed to have a clear structure.</li></ul>

In [None]:
# Create combined stocks DF
stocks_pd_df_with_returns = stocks_pd_df.assign(stock_returns = stock_returns['Close'])
# Create combined factors DF
factors_pd_df_with_returns = factors_pd_df.assign(factors_returns = factors_returns['Close'],factors_returns_squared = factors_returns['Close']**2)
factors_pd_df_with_returns = factors_pd_df_with_returns.pivot(index='Date',columns='Symbol',values=['factors_returns', 'factors_returns_squared'])
factors_pd_df_with_returns.columns = factors_pd_df_with_returns.columns.to_series().str.join('_').reset_index()[0]
factors_pd_df_with_returns = factors_pd_df_with_returns.reset_index()

print(factors_pd_df_with_returns.head(1))

<ul><li>The code prints the columns of the factors_pd_df_with_returns DataFrame.</li></ul>

In [None]:
print(factors_pd_df_with_returns.columns)
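
<ul><li>The pivot and column-flattening steps can be tried on a tiny table. The sketch below uses hypothetical symbols AAA and BBB and a column named r in place of factors_returns; it joins the two-level column labels with a list comprehension rather than the to_series().str.join() chain.</li></ul>

```python
import pandas as pd

# Long format: one row per (Date, Symbol)
long_df = pd.DataFrame({
    "Date": ["2014-01-02", "2014-01-02", "2014-01-03", "2014-01-03"],
    "Symbol": ["AAA", "BBB", "AAA", "BBB"],
    "r": [0.1, 0.2, 0.3, 0.4],
})
long_df["r_squared"] = long_df["r"] ** 2

# Wide format: one row per Date, one column per (value, Symbol) pair
wide = long_df.pivot(index="Date", columns="Symbol", values=["r", "r_squared"])

# Flatten the two-level column labels, e.g. ("r", "AAA") -> "r_AAA"
wide.columns = ["_".join(pair) for pair in wide.columns]
wide = wide.reset_index()
print(wide.columns.tolist())
```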

<b>The code imports necessary libraries and defines a function to perform linear regression for each stock based on the provided features.</b><ul><li> It then merges the stock and factor DataFrames, drops any rows with missing values in the feature columns or 'stock_returns', and calculates the coefficients for each stock using linear regression.</li><li> The final output is a DataFrame (coefs_per_stock) containing the symbol and coefficients for each feature column.</li></ul>

In [None]:
import pandas as pd
from sklearn.linear_model import LinearRegression
 # For each stock, create input DF for linear regression training
stocks_factors_combined_df = pd.merge(stocks_pd_df_with_returns,
 factors_pd_df_with_returns,how="left", on="Date")
feature_columns = list(stocks_factors_combined_df.columns[-6:])
with pd.option_context('mode.use_inf_as_na', True):
    stocks_factors_combined_df = stocks_factors_combined_df.dropna(subset=feature_columns  + ['stock_returns'])
def find_ols_coef(df):
    y = df[['stock_returns']].values
    X = df[feature_columns]
    regr = LinearRegression()
    regr_output = regr.fit(X, y)
    return list(df[['Symbol']].values[0]) + list(regr_output.coef_[0])

coefs_per_stock = stocks_factors_combined_df.groupby('Symbol').apply(find_ols_coef)
coefs_per_stock = pd.DataFrame(coefs_per_stock).reset_index()
coefs_per_stock.columns = ['symbol', 'factor_coef_list']
coefs_per_stock = pd.DataFrame(coefs_per_stock.factor_coef_list.tolist(),index=coefs_per_stock.index,columns = ['Symbol'] + feature_columns)
coefs_per_stock
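
<ul><li>To illustrate what the per-stock regression estimates, the sketch below fits ordinary least squares on synthetic, noise-free data. It swaps in numpy.linalg.lstsq for scikit-learn's LinearRegression, and the true coefficients (2.0 and -3.0) and intercept (0.5) are invented for the example.</li></ul>

```python
import numpy as np

rng = np.random.default_rng(0)

# Two synthetic factor columns and a target built from known coefficients
X = rng.normal(size=(50, 2))
y = 0.5 + 2.0 * X[:, 0] - 3.0 * X[:, 1]

# Prepend a column of ones so the fit also estimates an intercept
A = np.column_stack([np.ones(len(X)), X])
solution, *_ = np.linalg.lstsq(A, y, rcond=None)

intercept, coefs = solution[0], solution[1:]
print(intercept, coefs)
```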

<h3>Sampling</h3>

<ul><li>This code snippet selects the 'Close' values from the 'factors_returns' DataFrame for a specific symbol (the first symbol in the DataFrame) and plots the kernel density estimate (KDE) of these values.</li><li> The KDE plot gives an estimate of the probability density function of the variable.</li></ul>

In [None]:
samples = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[0]]['Close']
samples.plot.kde()

<b>This code calculates the correlation matrix for the rolling returns of the first three symbols in the 'factors_returns' DataFrame.</b><ul><li> It selects the 'Close' values for each symbol into the series f_1, f_2, and f_3 and builds a DataFrame with one column per series.</li><li> The correlation matrix shows how strongly each pair of factors moves together.</li></ul>

In [None]:
f_1 = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[0]]['Close']
f_2 = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[1]]['Close']
f_3 = factors_returns.loc[factors_returns.Symbol == factors_returns.Symbol.unique()[2]]['Close']

print(f_1.size,len(f_2),f_3.size)
pd.DataFrame({'f1': list(f_1)[1:1040], 'f2': list(f_2)[1:1040], 'f3': list(f_3)}).corr()

In [None]:
factors_returns_cov = pd.DataFrame({'f1': list(f_1)[1:1040],
 'f2': list(f_2)[1:1040],
 'f3': list(f_3)})\
 .cov().to_numpy()
factors_returns_mean = pd.DataFrame({'f1': list(f_1)[1:1040],
 'f2': list(f_2)[1:1040],
 'f3': list(f_3)}).\
 mean()
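
<ul><li>The correlation, covariance, and mean calculations all operate on one DataFrame with a column per factor. The sketch below repeats them on three short, made-up series so the results are easy to verify by eye: f2 moves exactly with f1, and f3 moves exactly against it.</li></ul>

```python
import pandas as pd

# Hypothetical aligned factor returns, one column per factor
toy_factors = pd.DataFrame({"f1": [1.0, 2.0, 3.0, 4.0],
                            "f2": [2.0, 4.0, 6.0, 8.0],
                            "f3": [4.0, 3.0, 2.0, 1.0]})

toy_corr = toy_factors.corr()           # pairwise correlations
toy_cov = toy_factors.cov().to_numpy()  # covariance matrix as a NumPy array
toy_mean = toy_factors.mean()           # per-factor mean
print(toy_corr)
```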

<ul><li>This line of code generates a random sample from a multivariate normal distribution with the mean vector factors_returns_mean and covariance matrix factors_returns_cov.</li><li> The result is a set of values that simulate a random observation of the factors.</li></ul>

In [None]:
from numpy.random import multivariate_normal
multivariate_normal(factors_returns_mean, factors_returns_cov)
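
<ul><li>One way to check that sampling behaves as expected is to draw many observations and compare their sample statistics with the inputs. The mean vector and covariance matrix below are invented for illustration, and the sketch uses NumPy's Generator API rather than the module-level multivariate_normal function.</li></ul>

```python
import numpy as np

# Hypothetical two-factor mean vector and covariance matrix
toy_mean_vec = np.array([0.01, -0.02])
toy_cov_mat = np.array([[0.04, 0.01],
                        [0.01, 0.09]])

rng = np.random.default_rng(42)

# Each row is one simulated observation of the two factors
draws = rng.multivariate_normal(toy_mean_vec, toy_cov_mat, size=100_000)

sample_mean = draws.mean(axis=0)
sample_cov = np.cov(draws, rowvar=False)
print(sample_mean)
print(sample_cov)
```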

<h3>Running the Trials</h3>

In [None]:
b_coefs_per_stock = spark.sparkContext.broadcast(coefs_per_stock)
b_feature_columns = spark.sparkContext.broadcast(feature_columns)
b_factors_returns_mean = spark.sparkContext.broadcast(factors_returns_mean)
b_factors_returns_cov = spark.sparkContext.broadcast(factors_returns_cov)

<ul><li>The code creates a list of seeds for random number generation (seeds) and converts it into a Spark DataFrame (seedsDF) with an IntegerType schema.</li><li> It then repartitions the DataFrame into parallelism partitions. </li><li>This partitioning can help distribute the seed values evenly across the Spark cluster, potentially improving parallelism and performance for tasks that use these seeds.</li></ul>

In [None]:
from pyspark.sql.types import IntegerType
parallelism = 1000
num_trials = 1000000
base_seed = 1496
seeds = [b for b in range(base_seed,base_seed + parallelism)]
seedsDF = spark.createDataFrame(seeds, IntegerType())
seedsDF = seedsDF.repartition(parallelism)

<b>This code defines a function calculate_trial_return that is intended to calculate trial returns based on some random factors and coefficients per stock.</b>
<ul><li> It then iterates through a specified number of trials, each time setting a random seed and generating random factors.</li><li> It calculates the returns for each stock based on these factors and coefficients, and appends the total return for all stocks to a list.</li></ul>

In [None]:
import random
from numpy.random import seed
from pyspark.sql.types import LongType, ArrayType, DoubleType
from pyspark.sql.functions import udf
def calculate_trial_return(x):
    # Seed NumPy once per input seed. Reseeding inside the loop would make
    # every trial for this seed draw the same factors.
    seed(x)
    trial_return_list = []
    for i in range(int(num_trials/parallelism)):
        random_factors = multivariate_normal(b_factors_returns_mean.value,b_factors_returns_cov.value)

        coefs_per_stock_df = b_coefs_per_stock.value
        # Features are the sampled factor returns followed by their squares
        returns_per_stock = (coefs_per_stock_df[b_feature_columns.value] *(list(random_factors) + list(random_factors**2)))
        # Sum over features and stocks, then scale by the size of the coefficient table
        trial_return_list.append(float(returns_per_stock.sum(axis=1).sum()/b_coefs_per_stock.value.size))
    return trial_return_list

udf_return = udf(calculate_trial_return, ArrayType(DoubleType()))
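
<ul><li>The trial loop can be sketched without Spark to show how seeding affects the results. Everything below is a simplified stand-in: the mean, covariance, and coefficient values are invented, run_trials is a hypothetical name, and the trial return is averaged over stocks rather than scaled the way the notebook does.</li></ul>

```python
import numpy as np

toy_mean = np.array([0.0, 0.0])
toy_cov = np.array([[0.04, 0.01],
                    [0.01, 0.09]])
# Hypothetical coefficients: one row per stock, columns f1, f2, f1^2, f2^2
toy_coefs = np.array([[1.0, 0.5, 0.1, 0.0],
                      [0.2, -0.3, 0.0, 0.2]])

def run_trials(seed_value, trials_per_seed):
    rng = np.random.default_rng(seed_value)  # seeded once, outside the loop
    results = []
    for _ in range(trials_per_seed):
        factors = rng.multivariate_normal(toy_mean, toy_cov)
        features = np.concatenate([factors, factors ** 2])
        per_stock = toy_coefs @ features      # one return per stock
        results.append(float(per_stock.mean()))
    return results

same_seed_a = run_trials(1496, 5)
same_seed_b = run_trials(1496, 5)
other_seed = run_trials(1497, 5)
print(same_seed_a)
```

<ul><li>The same seed reproduces the same list of trial returns, a different seed gives a different list, and the trials within one list differ from each other because the generator is not reset between iterations.</li></ul>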

<b>This code runs the trials that simulate stock returns. </b><ul><li>It applies the udf_return function to every seed in seedsDF, so each seed produces an array of trial returns.</li><li> The calculate_trial_return function generates random factors, multiplies them by the coefficients per stock, and calculates the sum of returns for each trial.</li><li> The explode call then turns each array into one row per trial, and the results are stored and cached in the trials DataFrame.</li></ul>

In [None]:
from pyspark.sql.functions import col, explode
trials = seedsDF.withColumn("trial_return", udf_return(col("value")))
trials = trials.select('value', explode('trial_return').alias('trial_return'))
trials.cache()

<h3>Takes Some Time</h3>

<b>This code finds the 5th percentile of the 'trial_return' values in the trials DataFrame. The relative error of 0.0 requests the exact quantile, which can be expensive on large data.</b>

In [None]:
trials.approxQuantile('trial_return', [0.05], 0.0)

<b>This code calculates the average of the 5% lowest trial_return values in the trials DataFrame.</b>

In [None]:
trials.orderBy(col('trial_return').asc()).limit(int(trials.count()/20)).agg(fun.avg(col("trial_return"))).show()
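
<ul><li>Both summary statistics can be reproduced on a small array. The sketch below uses 40 made-up trial returns and one simple empirical definition of the 5th percentile (the largest value among the lowest 5% of trials), which may differ slightly from the value Spark reports.</li></ul>

```python
import numpy as np

rng = np.random.default_rng(0)

# 40 hypothetical trial returns from -0.20 to 0.19 in steps of 0.01, shuffled
toy_trial_returns = rng.permutation(np.linspace(-0.20, 0.19, 40))

sorted_returns = np.sort(toy_trial_returns)
tail_size = len(sorted_returns) // 20   # lowest 5% of the trials
tail = sorted_returns[:tail_size]

percentile_5 = tail[-1]      # largest return within the lowest 5%
tail_average = tail.mean()   # average of the lowest 5%
print(percentile_5, tail_average)
```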

<h3>Visualizing the Distribution of Returns</h3>

<b>This code converts the Spark DataFrame trials to a Pandas DataFrame mytrials and then plots a line graph of the data.</b>

In [None]:
import pandas
mytrials=trials.toPandas()
mytrials.plot.line()