In [1]:
#Imports
import string
import time
import csv
import requests
import gzip

#Python and Pandas Imports
import os, sys
import pandas as pd
import numpy as np
from random import randint, seed, Random

#PYspark imports
#from pyspark.sql.types import IntegerType
#from pyspark.sql.types import DoubleType
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import Window
#from pyspark.sql.functions import to_timestamp
from pyspark import SparkContext, SQLContext
from functools import reduce
sqlContext = SQLContext(sc)

In [2]:
#Print do Spark Context and Version
print(sc)
print(sc.version)

In [3]:
# File location and type
file_location = "/FileStore/tables/dataset_11_row.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

Date,Open,High,Low,Close,Adj_Close,Volume,Simbol
03-01-00,48.03125,48.25,47.03125,47.1875,26.088079,2173400,MMM
04-01-00,46.4375,47.40625,45.3125,45.3125,25.051456,2713800,MMM
05-01-00,45.5625,48.125,45.5625,46.625,25.77709,3699400,MMM
06-01-00,47.15625,51.25,47.15625,50.375,27.850321,5975800,MMM
07-01-00,50.5625,51.90625,49.96875,51.375,28.403187,4101200,MMM
10-01-00,50.21875,51.75,50.0,51.125,28.264967,3863800,MMM
11-01-00,50.375,51.25,50.25,50.25,27.781218,2357600,MMM
12-01-00,50.96875,51.8125,50.375,50.375,27.850321,2868400,MMM
13-01-00,50.65625,50.9375,50.1875,50.375,27.850321,2244400,MMM
14-01-00,50.375,50.46875,49.46875,49.65625,27.452953,2541800,MMM


In [4]:
#Temp table

#temp_table_name = "dataset_11_csv"
#df.createOrReplaceTempView(temp_table_name)

#OR

#Permanent table saved to be used upon cluster restarts
permanent_table_name = "dataset_11_row_csv_parquet"

#Convert to parquet file system - More eficient than csv
#df.write.format("parquet").saveAsTable(permanent_table_name)

In [5]:
#Show top 5 rows
df.show(5)

In [6]:
#Sort by date
#df_new=df.withColumn("Or_Date", f.to_date("Date", "dd-MM-yy").alias("Date")).orderBy("Or_Date", ascending=True)
#df_new.show()

#or

df_date=df.withColumn("Date", to_date("Date", "dd-MM-yy")).orderBy("Date", ascending=True)
df_date.show(11)

In [7]:
#New dataframe (Chached) - df_date
df_date.registerTempTable("df_date")
df_date.cache()

#Print Schema
df_date.printSchema()

In [8]:
# we have to change these columns from string to int/Double for modeling

df_date = df_date.withColumn("Open", df_date["Open"].cast(DoubleType()))
df_date = df_date.withColumn("High", df_date["High"].cast(DoubleType()))
df_date = df_date.withColumn("Low", df_date["Low"].cast(DoubleType()))
df_date = df_date.withColumn("Close", df_date["Close"].cast(DoubleType()))
df_date = df_date.withColumn("Adj_Close", df_date["Adj_Close"].cast(DoubleType()))
df_date = df_date.withColumn("Volume", df_date["Volume"].cast(IntegerType()))


In [9]:
#New schema after changing data type for each column
df_date.printSchema()

In [10]:
#Calculate the number of missing values for each column

df_date.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_date.columns[1:-1]]).show()

In [11]:
#Check the day when Volume is Null
df_date.where(reduce(lambda x, y: x | y, (col(x).isNull() for x in df.columns))).show()

In [12]:
#Code tests
df_date.select("*")

In [13]:
#Dataframe size before remove nans
print((df_date.count(), len(df_date.columns)))

In [14]:
#Remove Nan from dataframe - Creation of df_3
df_3 = df_date.filter(df_date.Volume.isNotNull())
df_3.show()

In [15]:
#Size of datafrme after removing Nans - 1 less row (56572 rows)
print((df_3.count(), len(df_3.columns)))

In [16]:
#Calculation of log returns in new col

df_4=df_3.select("Simbol", "Date", "Close").withColumn("Log Return", log(col("Close") / lag("Close", 1).over(Window.partitionBy("Simbol").orderBy(df_3["Date"])) ) * 100)
df_4.show(5)

In [17]:
#Return (%) - Mean and Std Dev -  for df_5
df_5=df_4.groupBy("Simbol").agg(avg("Log Return").alias("Mean Return"), sqrt(variance("Log Return")).alias("StdDev"))
df_5.show(11)

In [18]:
#Save the symbols returns distribuition in memory for use in our simulation - It would be the same procedure if there were 5000 symbols
map_sym_results = df_5.rdd.map(lambda r: (r[0], (r[1], r[2]))).collectAsMap()
#Example: Show mean and average for AAPL
map_sym_results['AAPL']

In [19]:
#Dataframe with last price foor each symbol and last trading day
Df_Price = df_3.orderBy("Date", ascending=False).groupBy("Simbol").agg(first("Close").alias("Last Price"), first("Date").alias("Date"))
Df_Price.show(11)

#Dataframe 'Last_Prices' -> in memory for later use in our simulation
Last_Prices = Df_Price.rdd.map(lambda r: (r[0], r[1])).collectAsMap()
Last_Prices['AAPL']

In [20]:
Last_Prices

In [21]:
#Function to create a random portfolio
def random_portfolio(symbols):
    result = {}
    for s in symbols:
        result[s] = Last_Prices[s]
    return result

#Function to calculate the portfolio value
def portfolio_value(pf):
  item2=0
  for item in pf.values():
    item2=item+item2
  return item2


In [22]:
#df_3.select("Simbol").distinct().sample(True, 0.5, sd).rdd.map(lambda r: r[0]).collect()

In [23]:
#Random Portfolio of stocks on last trading day
sd=123 #Seed number
prct_stocks=0.4 #% of stocks to randomly chosen from total portfolio

portfolio = random_portfolio(df_3.select("Simbol").distinct().sample(True, prct_stocks, sd).rdd.map(lambda r: r[0]).collect())
portfolio

In [24]:
#Value of the random portfolio generated (€) for 1 unit invested in each stock
portfolio_value(portfolio)

In [25]:
#Output stocks of our portfolio
list(portfolio.keys())

In [26]:
#Dataframe to prepare Correlation Matrix
df_corr=df_3.groupby("Date").pivot("Simbol").agg(first('Close'))
df_corr=df_corr.drop('DJI')
df_corr.show(5)

In [27]:
#Remove "Date" column from dataframe - To create the correlation matrix
#df_corr.columns[1:]
df_corr=df_corr.select(df_corr.columns[1:])
df_corr.show()

In [28]:
#Tests
df_corr.printSchema()

In [29]:
#Tests
df_corr.columns

In [30]:
#Create correlation matrix between assets
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

# convert to vector column first
vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df_corr.columns, outputCol=vector_col)
df_vector = assembler.transform(df_corr).select(vector_col)

# get correlation matrix
matrix = Correlation.corr(df_vector, vector_col).collect()[0][0]

corrmatrix = matrix.toArray().tolist()
print(corrmatrix)

In [31]:
import numpy as np 
from pandas import DataFrame
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

Index= ['aaa', 'bbb', 'ccc', 'ddd', 'eee']
Cols = ['A', 'B', 'C', 'D']
df = DataFrame(corrmatrix, index=df_corr.columns, columns=df_corr.columns)

fig, ax = plt.subplots(figsize=(10,10))
ax.set_title("Correlation Matrix for all stocks of the Dataset")
sns.heatmap(df, annot=True, ax=ax)

In [32]:
#Functions to prepare for MC simulations
#Function to produce a 'count' number of seeds (random numbers)
def seeds(count):
    return [randint(0, 1 << 32 - 1) for i in range(count)] #Random number - large enough

#Functio to simulate the steps
def simstep(pf, params, prng):
    def daily_return(sym):
        mean, stddev = params[sym]
        change = (prng.normalvariate(mean, stddev) + 100)/ 100.0
        return change
    return dict([(stock, daily_return(stock) * value) for stock, value in pf.items()])

#Function to simulate portfolio value (randomly)
def simulate(seeded, pf, params, days):
    
    prng = Random()
    prng.seed(seeded)
    pf = pf.copy()
    
    for day in range(days):
        pf = simstep(pf, params, prng)
    return pf

In [33]:
#Calculation of possible portfolio returns

pt_days=5 #Nr of simulation days 
sc = spark.sparkContext
seed_rdd = sc.parallelize(seeds(10000)) #Parallelize processing
bparams = sc.broadcast(map_sym_results) #Broadcast variables are used to save the copy of data across all nodes - Boradcast Mean return and std dev
bpf = sc.broadcast(portfolio) #Broadcast portfolio (Stocks with last price)
initial_value = portfolio_value(portfolio)

results = seed_rdd.map(lambda s: portfolio_value(simulate(s, bpf.value, bparams.value, pt_days)) - initial_value)


In [34]:
# Histogram of portfolio return distribuiton for each 1 € invested in the 'portfolio'
simulated_results = list(zip(results.collect(), seed_rdd.collect()))
simulated_values = [v for v, ss in simulated_results]
simulated_values.sort()

%matplotlib inline
import seaborn as sns
import numpy as np
sns.set(color_codes=True)
import matplotlib
import numpy as np
import matplotlib.pyplot as plt

#plt.hist(simulated_values, bins=25)
fig, ax = plt.subplots(figsize=(10,10))
ax.set_title("Histogram with the return on investment (€) of 1 unit of each asset of our Portfolio (10 000 simulations) at the last Close price for 5 days ahead")
sns.distplot(simulated_values, kde=False, ax=ax)

In [35]:
# 5-day 95% VaR calculation

simulated_values[int(len(simulated_values) * 0.05)]

In [36]:
#Tests
#simulated_values

In [37]:
#Monte carlo simulation - To simulate and store portfolio valuation over time
#The only dif to previous "simulate" function is the saved portfolio history
def simulate_with_history(seed, pf, params, days):
    from random import Random
    prng = Random()
    prng.seed(seed)
    pf = pf.copy()
    values = [portfolio_value(pf)]
    
    for day in range(days):
        pf = simstep(pf, params, prng)
        values.append(portfolio_value(pf))
    return values

In [38]:
#Monte carlo simulation - Version 2 - To simulate and store portfolio valuation over time for multiple assets
def seeds(count):
    return [randint(0, 1 << 32 - 1) for i in range(count)] #Random number - large enough
  
def simstep2(pf, params, prng):
    def daily_return2(sym):
        mean, stddev = params[sym]
        change = (prng.normalvariate(mean, stddev) + 100)/ 100.0
        return change
    return dict([(stock, daily_return2(stock) * value) for stock, value in pf.items()])

def simulate2(seeded, pf, params, days):
    
    prng = Random()
    prng.seed(seeded)
    pf = pf.copy()
    
    for day in range(days):
        pf = simstep(pf, params, prng)
    return pf


#Monte carlo simulation - To simulate portfolio valuation over time
#The only dif to previous "simulate" function is the saved portfolio history
def simulate_with_history_stock(seed, Last_Prices, Df_5, days):
    from random import Random
    prng = Random()
    prng.seed(seed)
    Last_Prices2 = Last_Prices.copy()
    #values = [portfolio_value(pf)]
    #last_pr=Last_Prices
    #list_dic=[0]
    list_dic=[Last_Prices.copy()]
    #list_dic[1]=Last_Prices
    elem=[]
    list_aux=[Last_Prices.copy()]
    #pf_aux2={}
     
    cont=0  
    for day in range(1,days+1):
      #pf_aux2={}
      for ele in list_aux[-1].copy().items():
        elem.append(ele)
        pf_aux=dict(elem)
        pf = simstep2(pf_aux, Df_5, prng)
        #last_pr.update(pf)
        list_dic[0].update(pf)
        #last_pr=last_pr,pf
                
        elem=[]
        #print(cont)
        #print(ele)
      #pf_aux2=list_aux[cont+1]
      print(list_aux)
      aux=list_dic[0].copy()
      list_aux.append(aux.copy())
      #print(list_aux[-1])
      #cont=cont+1
    return list_aux
    

In [39]:
#Running Test
Last_Prices = Df_Price.rdd.map(lambda r: (r[0], r[1])).collectAsMap()

tes_df=simulate_with_history_stock(123, Last_Prices, map_sym_results, 3) #- Runing ok - 3 days
tes_df

In [40]:
#Convert simulation to dataframe - for each path
myJson = sc.parallelize(tes_df)
myDf = sqlContext.read.json(myJson)
display(myDf)

AAPL,DJI,INTC,JNJ,KO,MCD,MMM,MRK,MSFT,PG,UNH
335.899994,25128.16992,335.899994,140.869995,45.540001,187.509995,152.380005,77.349998,186.270004,116.260002,283.730011
337.02123300531576,24957.47494040632,342.6180915498088,138.52275069781547,45.693177908024325,188.0777349120331,152.0036341870173,76.78982047993023,181.6296393378697,116.08308647835702,284.9218885000521
345.81957505735784,25122.28735749682,350.9122064406966,137.75857812703387,45.81341365707912,187.5014511848759,150.8272127486524,80.25562114722823,176.88573624743103,115.9843396124362,287.01558304751893
348.1709015602683,25115.89730868164,348.1353489514739,136.45353689931747,45.392353450960805,188.3839242026341,149.52678048976364,80.5341446198818,174.69502072357167,119.1476267075205,296.05900889073155


In [41]:
rg=10 #Nº of days
paths=15 #Nº of paths ot be simulated

simulated_results=sorted(simulated_results, key = lambda t: t[0], reverse=False)
rg_results = [simulated_results[int((len(simulated_results) - 1) * i / (paths-1))] for i in range(paths)]
rg_seeds = sc.parallelize([seed for (_,seed) in rg_results])
walks = rg_seeds.map(lambda s: simulate_with_history(s, bpf.value, bparams.value, rg))

walk_results = walks.collect()


In [42]:
#Plot 15 alternative paths for portfolio value along 10 days

plt.xlabel('Days')
plt.ylabel('Portfolio Total Value')
plt.plot([list(c) for c in zip(*walk_results)])

In [43]:
#simulate_with_history_stock(123, Last_Prices, map_sym_results, 3) 

rg2=10 #Nº of days
paths2=15 #Nº of paths ot be simulated

#simulated_results=sorted(simulated_results, key = lambda t: t[0], reverse=False)
#eleven_results = [simulated_results[int((len(simulated_results) - 1) * i / rg)][0] for i in range(rg+1)]
rg_seeds2 = sc.parallelize([seed for seed in seeds(paths2)])
bparams = sc.broadcast(map_sym_results)#Broadcast variables are used to save the copy of data across all nodes - Boradcast Mean return and std dev
bpf2 = sc.broadcast(Last_Prices)#Broadcast portfolio (Stocks with last price)
walks2 = rg_seeds2.map(lambda s: simulate_with_history_stock(s, Last_Prices, bparams.value, rg2))

walk_results2 = walks2.collect()



In [44]:
#Beginning of final Monte Carlo Simlation
#1st Script - Remove DJI from all lists

walk_copy3=walk_results2.copy() #[]caminhos e o 2o [] é o n de dias
walk_copy=walk_copy3.copy()

for path in range(0,len(walk_results2.copy())):
  for day in range(0,len(walk_results2.copy()[0])):
    for k, v in  list(walk_results2.copy()[path][day].items()):
      #print(k)
      if k == 'DJI':
        
        del walk_copy[path][day][k]
    
#walk_copy

In [45]:
list_main=[]
listaaa=[]
walk_copy#=walk_results2.copy()

#2nd script - Remove keys from dicts
for caminho in range(0,len(walk_copy)):
  for day in range(0,len(walk_copy[0])): #From 0 to 5 days
    listaaa=walk_copy[caminho]
  list_main.append(list(listaaa))
  listaaa=[]

#3rd Script - Reorder values to plot

nova_asset=[]
nova_day=[]
nova_path=[]
nova_master=[]


for asset in range(0,len(list(walk_copy[0][0].values()))):
  nova_day=[]
  for day in range(0,len(walk_copy[0])): #From 0 to 5 days:
    
    aux_asset=0
    nova_path=[]
    
    for path in range(0,len(walk_copy)):
      aux_asset=list(list_main[path][day].values())[asset] #list(list_main[path][day].values())[asset]
      nova_path.append(aux_asset)
    
    nova_day.append(nova_path)
        
  nova_master.append(nova_day)


In [46]:
#Plot the 10 assets (all assets from the dataset minu the DJI index) Monte Carlo Simulation
nova_master

plt.figure(figsize=(8,8), dpi=100)
plt.xlabel('Days')
plt.ylabel('Close Price (€)')
plt.title('Individual Stocks of the Dataset Monte Carlo Simulation')
for element in nova_master:
  #print(element)
  #plt.ylim((0,400))
  plt.plot(element)
  

In [47]:
#Tests

#type(max_date)

#max_date-min_date

In [48]:
#Prepare to make a linear regression to calculate our Portfolio's Beta
#Log returns table to later use in Beta calculation - 'df_beta'
df_beta=df_4.orderBy('Date', ascending=True).groupby('Date').pivot('Simbol').agg(first("Log Return"))
#df_beta.show(5)
display(df_beta)

Date,AAPL,DJI,INTC,JNJ,KO,MCD,MMM,MRK,MSFT,PG,UNH
2000-01-03,,,,,,,,,,,
2000-01-04,-8.807799502547725,-3.217213379092668,-8.807799502547725,-3.729714067681954,0.1108033354361864,-2.0717872503747143,-4.054609439434991,-3.57516909638428,-3.436389390963265,-1.9429519067146888,-1.287312755340573
2000-01-05,1.4528255534767336,1.127655976273101,1.4528255534767336,1.0500621511068888,0.8820343842972885,1.5974780607734465,2.8553945349086174,3.8520497531975897,1.048864155932463,-1.9208273638267053,-0.2358491481293451
2000-01-06,-9.051412656916993,1.1674239164837057,-9.051412656916993,3.0856064659629263,0.1097092814373149,-1.4365769802033752,7.735814230286748,0.8260717007366531,-3.4071991695410437,4.503130474874703,3.5945823422262357
2000-01-07,4.628067406624787,2.364897147195609,4.628067406624787,4.165891631692261,6.371581438610778,2.53981906056104,1.9656652549551592,9.164717204558778,1.2983529830943046,7.691812360216323,11.09259311697958
2000-01-10,-1.774444745820547,0.4298871860296959,-1.774444745820547,-2.558355340174536,-3.241266487505944,0.4691173575880239,-0.4878058453432854,-2.962552673613112,0.7264630999004577,-0.4840020204000994,-1.7481230194366808
2000-01-11,-5.250537461577474,-0.5295630611832945,-5.250537461577474,0.331675262599382,3.3440942600625467,2.3131332023418927,-1.726306742378071,0.3430535096789222,-2.594618194458516,1.551248009477288,0.2072539601972375
2000-01-12,-6.18468212281364,0.34705800325284,-6.18468212281364,-0.9314771274547484,1.1241814866248323,4.1797128678461455,0.2484473327661965,1.950040596712784,-3.311368478442444,-0.638979809877101,-0.936037588391298
2000-01-13,10.406941030681296,0.2708631160304892,10.406941030681296,-0.1337792841659942,-1.5361285161487206,-1.0286644710275423,0.0,0.4189365155276924,1.8724947332324844,-1.7241806434506104,3.086664594783283
2000-01-14,3.740547240879021,1.2061790207022844,3.740547240879021,0.3341132409835129,0.8222040152016457,0.6991748813422904,-1.4370756529857616,-0.8396354952906279,4.03349193966848,1.7241806434505953,-0.1013749673802462


In [49]:
#Drop null row of 'df_beta'
df_beta2 = df_beta.na.drop()
display(df_beta2)

Date,AAPL,DJI,INTC,JNJ,KO,MCD,MMM,MRK,MSFT,PG,UNH
2000-01-04,-8.807799502547725,-3.217213379092668,-8.807799502547725,-3.729714067681954,0.1108033354361864,-2.0717872503747143,-4.054609439434991,-3.57516909638428,-3.436389390963265,-1.9429519067146888,-1.287312755340573
2000-01-05,1.4528255534767336,1.127655976273101,1.4528255534767336,1.0500621511068888,0.8820343842972885,1.5974780607734465,2.8553945349086174,3.8520497531975897,1.048864155932463,-1.9208273638267053,-0.2358491481293451
2000-01-06,-9.051412656916993,1.1674239164837057,-9.051412656916993,3.0856064659629263,0.1097092814373149,-1.4365769802033752,7.735814230286748,0.8260717007366531,-3.4071991695410437,4.503130474874703,3.5945823422262357
2000-01-07,4.628067406624787,2.364897147195609,4.628067406624787,4.165891631692261,6.371581438610778,2.53981906056104,1.9656652549551592,9.164717204558778,1.2983529830943046,7.691812360216323,11.09259311697958
2000-01-10,-1.774444745820547,0.4298871860296959,-1.774444745820547,-2.558355340174536,-3.241266487505944,0.4691173575880239,-0.4878058453432854,-2.962552673613112,0.7264630999004577,-0.4840020204000994,-1.7481230194366808
2000-01-11,-5.250537461577474,-0.5295630611832945,-5.250537461577474,0.331675262599382,3.3440942600625467,2.3131332023418927,-1.726306742378071,0.3430535096789222,-2.594618194458516,1.551248009477288,0.2072539601972375
2000-01-12,-6.18468212281364,0.34705800325284,-6.18468212281364,-0.9314771274547484,1.1241814866248323,4.1797128678461455,0.2484473327661965,1.950040596712784,-3.311368478442444,-0.638979809877101,-0.936037588391298
2000-01-13,10.406941030681296,0.2708631160304892,10.406941030681296,-0.1337792841659942,-1.5361285161487206,-1.0286644710275423,0.0,0.4189365155276924,1.8724947332324844,-1.7241806434506104,3.086664594783283
2000-01-14,3.740547240879021,1.2061790207022844,3.740547240879021,0.3341132409835129,0.8222040152016457,0.6991748813422904,-1.4370756529857616,-0.8396354952906279,4.03349193966848,1.7241806434505953,-0.1013749673802462
2000-01-18,3.4254109432127517,-1.3937936445732977,3.4254109432127517,-2.567708635189824,6.248934009263567,-2.638087224926401,-2.2272635609123177,-4.571731852966444,2.691730761248261,-0.0534330763447738,-4.353018845713475


In [50]:
#Date manipulation to select just the last 365 days of our df_beta2 dataframe
from pyspark.sql.functions import current_date, datediff, unix_timestamp
from datetime import datetime, timedelta


min_date, max_date = df_beta2.select(min("Date"), max("Date")).first()
#min_date, max_date

date_filter=max_date - timedelta(days=365) #Minus 365 days
df_beta3=df_beta2.filter(col("Date").between(date_filter,max_date))
display(df_beta3)

Date,AAPL,DJI,INTC,JNJ,KO,MCD,MMM,MRK,MSFT,PG,UNH
2019-06-12,-0.3187642801979357,-0.1678266961472563,-0.3187642801979357,1.3501193172578545,0.6021131077827973,0.8182500143277377,0.0591802812584213,0.875878166794967,-0.4628415823882821,0.2739004762750305,-0.5587611715226657
2019-06-13,-0.020604625098441,0.3912356588213966,-0.020604625098441,-0.6869859519591176,-0.9925117963711008,-0.2149324271065627,-0.0947072820013836,-0.972334645925946,0.6292443052903389,1.1152062427868732,0.1879651570540416
2019-06-14,-0.7288868908911432,-0.0657522647225609,-0.7288868908911432,-0.4416039892921858,0.3514255793354488,0.390448055161416,-1.287497041275743,-0.1448612026327605,0.0981908890586455,0.261125715613664,0.1672294552135697
2019-06-17,0.5948826420526055,0.0878121933405359,0.5948826420526055,-0.4650629100097287,-0.8612345266529718,-0.7235402049183415,0.0539760709599791,0.6262110946828383,0.3015524301518135,-0.1890265693817724,0.2442326304129472
2019-06-18,2.3246181326566204,1.342822494144677,2.3246181326566204,0.5649487659140309,-0.4531581088036621,0.342867210889412,3.000462504085005,1.4184575306110634,1.7238574071103332,-1.2602732878506735,-0.1098314024611372
2019-06-19,-0.2926939976684095,0.1452191106118351,-0.2926939976684095,0.1567628976345616,0.9434032418778564,0.0195609447627065,-0.6479699252867199,1.0244458215274108,0.3913595456697659,0.7453902477497913,1.810936988529176
2019-06-20,0.8003525933244934,0.9357303005894096,0.8003525933244934,1.2453355417245937,1.050799716023051,0.2782690314581529,1.8509925208937288,-0.8943375737071465,0.9242986905940932,1.188346738907147,-0.976012171828596
2019-06-21,-0.3415069559304764,-0.1273147558691591,-0.3415069559304764,-0.084425610877192,-0.2131596603367774,-0.4201481684982944,-0.3397716916748406,-0.0354649177384627,0.0146057244316686,-0.484437129424478,1.807906234747378
2019-06-24,-0.100662883221658,0.031463887579707,-0.100662883221658,0.680347720127771,0.7151842810705944,-0.1665917411254776,0.0173039535795148,1.105375776281007,0.5896271859503753,1.0110630620095955,-1.051970177152696
2019-06-25,-1.5273644279587897,-0.6731730827535103,-1.5273644279587897,0.8214504859419905,-0.3086422322264495,0.8739693732776666,-0.781687188287538,-0.3162567789687713,-3.2081259614739497,-0.5445235430235583,-0.7963022121946575


In [51]:
#Temporary list to insert DJI to the first column
temp_list=list(portfolio.keys())
temp_list2=temp_list.copy()
temp_list2.insert(0, 'DJI')
temp_list2

In [52]:
#Select our portfolio assets and the market index (DJI)
df_beta4=df_beta3.select([c for c in df_beta3.columns if c in temp_list2])
display(df_beta4)

AAPL,DJI,JNJ,MCD,PG
-0.3187642801979357,-0.1678266961472563,1.3501193172578545,0.8182500143277377,0.2739004762750305
-0.020604625098441,0.3912356588213966,-0.6869859519591176,-0.2149324271065627,1.1152062427868732
-0.7288868908911432,-0.0657522647225609,-0.4416039892921858,0.390448055161416,0.261125715613664
0.5948826420526055,0.0878121933405359,-0.4650629100097287,-0.7235402049183415,-0.1890265693817724
2.3246181326566204,1.342822494144677,0.5649487659140309,0.342867210889412,-1.2602732878506735
-0.2926939976684095,0.1452191106118351,0.1567628976345616,0.0195609447627065,0.7453902477497913
0.8003525933244934,0.9357303005894096,1.2453355417245937,0.2782690314581529,1.188346738907147
-0.3415069559304764,-0.1273147558691591,-0.084425610877192,-0.4201481684982944,-0.484437129424478
-0.100662883221658,0.031463887579707,0.680347720127771,-0.1665917411254776,1.0110630620095955
-1.5273644279587897,-0.6731730827535103,0.8214504859419905,0.8739693732776666,-0.5445235430235583


In [53]:
#Calculation of the sum of all columns except for DJI (Our market index column) - It returns the column named 'total_col'
def column_add(a,b):
     return  a.__add__(b)

df_beta5=df_beta4.withColumn('total_col',reduce(column_add, ( df_beta4[col] for col in df_beta4.columns if col!='DJI'  ) ))
display(df_beta5)
#df_beta4.withColumn('total', sum(df_beta4[col] for col in df_beta4.columns))

AAPL,DJI,JNJ,MCD,PG,total_col
-0.3187642801979357,-0.1678266961472563,1.3501193172578545,0.8182500143277377,0.2739004762750305,2.123505527662686
-0.020604625098441,0.3912356588213966,-0.6869859519591176,-0.2149324271065627,1.1152062427868732,0.1926832386227517
-0.7288868908911432,-0.0657522647225609,-0.4416039892921858,0.390448055161416,0.261125715613664,-0.518917109408249
0.5948826420526055,0.0878121933405359,-0.4650629100097287,-0.7235402049183415,-0.1890265693817724,-0.7827470422572371
2.3246181326566204,1.342822494144677,0.5649487659140309,0.342867210889412,-1.2602732878506735,1.9721608216093895
-0.2926939976684095,0.1452191106118351,0.1567628976345616,0.0195609447627065,0.7453902477497913,0.62902009247865
0.8003525933244934,0.9357303005894096,1.2453355417245937,0.2782690314581529,1.188346738907147,3.5123039054143863
-0.3415069559304764,-0.1273147558691591,-0.084425610877192,-0.4201481684982944,-0.484437129424478,-1.330517864730441
-0.100662883221658,0.031463887579707,0.680347720127771,-0.1665917411254776,1.0110630620095955,1.424156157790231
-1.5273644279587897,-0.6731730827535103,0.8214504859419905,0.8739693732776666,-0.5445235430235583,-0.3764681117626907


In [54]:
df_beta5=df_beta5.withColumn('Port_return',( col('total_col') * (1/(len(df_beta5.columns[:-1])-1))))
display(df_beta5)

AAPL,DJI,JNJ,MCD,PG,total_col,Port_return
-0.3187642801979357,-0.1678266961472563,1.3501193172578545,0.8182500143277377,0.2739004762750305,2.123505527662686,0.5308763819156717
-0.020604625098441,0.3912356588213966,-0.6869859519591176,-0.2149324271065627,1.1152062427868732,0.1926832386227517,0.0481708096556879
-0.7288868908911432,-0.0657522647225609,-0.4416039892921858,0.390448055161416,0.261125715613664,-0.518917109408249,-0.1297292773520622
0.5948826420526055,0.0878121933405359,-0.4650629100097287,-0.7235402049183415,-0.1890265693817724,-0.7827470422572371,-0.1956867605643092
2.3246181326566204,1.342822494144677,0.5649487659140309,0.342867210889412,-1.2602732878506735,1.9721608216093895,0.4930402054023474
-0.2926939976684095,0.1452191106118351,0.1567628976345616,0.0195609447627065,0.7453902477497913,0.62902009247865,0.1572550231196625
0.8003525933244934,0.9357303005894096,1.2453355417245937,0.2782690314581529,1.188346738907147,3.5123039054143863,0.8780759763535966
-0.3415069559304764,-0.1273147558691591,-0.084425610877192,-0.4201481684982944,-0.484437129424478,-1.330517864730441,-0.3326294661826102
-0.100662883221658,0.031463887579707,0.680347720127771,-0.1665917411254776,1.0110630620095955,1.424156157790231,0.3560390394475577
-1.5273644279587897,-0.6731730827535103,0.8214504859419905,0.8739693732776666,-0.5445235430235583,-0.3764681117626907,-0.0941170279406726


In [55]:
#Tests
#temp_list3=[]
#temp_list4=temp_list.copy()
#temp_list4.append(list(df_beta5.columns)[-1])
#temp_list4.insert(0, 'DJI')
#temp_list4

In [56]:
#Selecto only the columns to import to ML model
df_beta6=df_beta5.select(['DJI','Port_return'])
df_beta6.show(5)

In [57]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

df_beta6.registerTempTable("df_beta6")
df_beta6.cache()
df_beta6.printSchema()


def transData(data):
  # Combine columns to a dense vector 
  dataFeaturesRDD = data.rdd.map(lambda r: [Vectors.dense(r[0]),r[1]]) # X(Feature): DJI index; Y(label): Portfolio
  
  # Convert the RDD back to a DataFrame, labelling the columns
  featuresDF =  dataFeaturesRDD.toDF(['features','label'])
  
  return featuresDF

dataLR = transData(df_beta6)
display(dataLR)



features,label
"List(1, 1, List(), List(-0.16782669614725634))",0.5308763819156717
"List(1, 1, List(), List(0.39123565882139666))",0.0481708096556879
"List(1, 1, List(), List(-0.0657522647225609))",-0.1297292773520622
"List(1, 1, List(), List(0.08781219334053596))",-0.1956867605643092
"List(1, 1, List(), List(1.342822494144677))",0.4930402054023474
"List(1, 1, List(), List(0.14521911061183518))",0.1572550231196625
"List(1, 1, List(), List(0.9357303005894096))",0.8780759763535966
"List(1, 1, List(), List(-0.1273147558691591))",-0.3326294661826102
"List(1, 1, List(), List(0.031463887579707014))",0.3560390394475577
"List(1, 1, List(), List(-0.6731730827535103))",-0.0941170279406726


In [58]:
## The slope of the model will be beta of our portfolio
# Import LinearRegression class
from pyspark.ml.regression import LinearRegression

# Define LinearRegression algorithm
lr = LinearRegression()

# Fit the model without using  regularization parameters
model = lr.fit(dataLR, {lr.regParam:0.0})


# Print the fitted model parameters
print(">>>> Model intercept: %r, coefficient: %r" % (model.intercept, model.coefficients[0]))



In [59]:
#Make predictions
predictions = model.transform(dataLR)
display(predictions)


features,label,prediction
"List(1, 1, List(), List(-0.16782669614725634))",0.5308763819156717,-0.0669359543480477
"List(1, 1, List(), List(0.39123565882139666))",0.0481708096556879,0.3764564858604868
"List(1, 1, List(), List(-0.0657522647225609))",-0.1297292773520622,0.0140192930050531
"List(1, 1, List(), List(0.08781219334053596))",-0.1956867605643092,0.1358112884502044
"List(1, 1, List(), List(1.342822494144677))",0.4930402054023474,1.1311601521589694
"List(1, 1, List(), List(0.14521911061183518))",0.1572550231196625,0.181340723418506
"List(1, 1, List(), List(0.9357303005894096))",0.8780759763535966,0.8082952704760741
"List(1, 1, List(), List(-0.1273147558691591))",-0.3326294661826102,-0.0348059282465852
"List(1, 1, List(), List(0.031463887579707014))",0.3560390394475577,0.0911214384346163
"List(1, 1, List(), List(-0.6731730827535103))",-0.0941170279406726,-0.4677262517388925


In [60]:
#Evaluation metrics
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse")
RMSE = evaluator.evaluate(predictions)


print("Model: Root Mean Squared Error = " + str(RMSE))
