#**CS 431 Final Project Winter 2021**



Authors:

Arsheya Jain (a76jain) and Gregory Hogg (gahogg)

In the cells below, we are installing Spark. We are also setting the environment path that enables us to run Pyspark, as well as creating a SparkContext. 

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [54]:
import os
import findspark
import random
from datetime import datetime
import pandas as pd
import numpy as np

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"
findspark.init()
from pyspark import SparkContext
sc = SparkContext(appName="YourTest", master="local[*]")
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("YourTest").master("local[2]").config('spark.ui.port', random.randrange(4000,5000)).getOrCreate()

ValueError: ignored

##Problem statement 

Which company out of Microsoft, Nintendo, and Sony is the best to invest in,
given that you plan to sell in exactly x years from today (April 14th, 2021)? 

 

## Data Processing with Spark

In [4]:
# For converting strings to datetime
def to_date(d):
  return datetime.strptime(d, '%Y-%m-%d')

In [47]:
# Using Spark to remove unnecessary columns, seperate into Microsoft, Sony, and Nintendo,
# and only keeping data from the last decade

original_cols = ['Stock', 'Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']
data = sc.textFile('stocks.csv').map(lambda x: x.split(',')).map(lambda x: [x[0], x[1], x[5]])
microsoft = data.filter(lambda x: 'Microsoft' == x[0]).map(lambda x: [to_date(x[1]), float(x[2])])
sony = data.filter(lambda x: 'Sony' == x[0]).map(lambda x: [to_date(x[1]), float(x[2])])
nintendo = data.filter(lambda x: 'Nintendo' == x[0]).map(lambda x: [to_date(x[1]), float(x[2])])
nintendo.take(5)

[[datetime.datetime(1996, 11, 18, 0, 0), 9.0],
 [datetime.datetime(1996, 11, 19, 0, 0), 9.0],
 [datetime.datetime(1996, 11, 20, 0, 0), 9.0625],
 [datetime.datetime(1996, 11, 21, 0, 0), 9.0],
 [datetime.datetime(1996, 11, 22, 0, 0), 9.125]]

In [48]:
# Converting processed RDDs to Spark DataFrames

nintendo_df = spark.createDataFrame(nintendo).withColumnRenamed('_1', 'Date').withColumnRenamed('_2', 'Closing Value (USD)')
sony_df = spark.createDataFrame(sony).withColumnRenamed('_1', 'Date').withColumnRenamed('_2', 'Closing Value (USD)')
microsoft_df = spark.createDataFrame(microsoft).withColumnRenamed('_1', 'Date').withColumnRenamed('_2', 'Closing Value (USD)')
microsoft_df.show()

+-------------------+-------------------+
|               Date|Closing Value (USD)|
+-------------------+-------------------+
|1986-03-13 00:00:00|           0.097222|
|1986-03-14 00:00:00|           0.100694|
|1986-03-17 00:00:00|           0.102431|
|1986-03-18 00:00:00|           0.099826|
|1986-03-19 00:00:00|            0.09809|
|1986-03-20 00:00:00|           0.095486|
|1986-03-21 00:00:00|           0.092882|
|1986-03-24 00:00:00|           0.090278|
|1986-03-25 00:00:00|           0.092014|
|1986-03-26 00:00:00|           0.094618|
|1986-03-27 00:00:00|           0.096354|
|1986-03-31 00:00:00|           0.095486|
|1986-04-01 00:00:00|           0.094618|
|1986-04-02 00:00:00|           0.095486|
|1986-04-03 00:00:00|           0.096354|
|1986-04-04 00:00:00|           0.096354|
|1986-04-07 00:00:00|           0.094618|
|1986-04-08 00:00:00|           0.095486|
|1986-04-09 00:00:00|           0.097222|
|1986-04-10 00:00:00|            0.09809|
+-------------------+-------------

In [49]:
# Converting Spark DataFrames to Pandas DataFrames

nintendo_pd_df = nintendo_df.select('*').toPandas()
microsoft_pd_df = microsoft_df.select('*').toPandas()
sony_pd_df = sony_df.select('*').toPandas()
sony_pd_df

Unnamed: 0,Date,Closing Value (USD)
0,1973-02-21,7.763636
1,1973-02-22,7.581818
2,1973-02-23,7.436364
3,1973-02-26,7.327273
4,1973-02-27,7.163636
...,...,...
12139,2021-04-08,109.820000
12140,2021-04-09,111.790001
12141,2021-04-12,111.389999
12142,2021-04-13,111.879997


In [50]:
# Save Pandas DataFrames into Pickle files with Date as Pandas Index Column

dataframes = [["Nintendo", nintendo_pd_df], ["Sony", sony_pd_df], ["Microsoft", microsoft_pd_df]]
for pair in dataframes:
  name, pd_df = pair
  pd_df.set_index('Date', drop=True, inplace=True)
  if not os.path.exists('data'):
    os.makedirs('data')
  pd_df.to_pickle('data/' +name + ".pkl")
pd.read_pickle('data/Sony.pkl')

Unnamed: 0_level_0,Closing Value (USD)
Date,Unnamed: 1_level_1
1973-02-21,7.763636
1973-02-22,7.581818
1973-02-23,7.436364
1973-02-26,7.327273
1973-02-27,7.163636
...,...
2021-04-08,109.820000
2021-04-09,111.790001
2021-04-12,111.389999
2021-04-13,111.879997


## Exploratory Data Analysis

In [51]:
sony_df = pd.read_pickle('data/Sony.pkl')
nintendo_df = pd.read_pickle('data/Nintendo.pkl')
microsoft_df = pd.read_pickle('data/Microsoft.pkl')
sony_df

Unnamed: 0_level_0,Closing Value (USD)
Date,Unnamed: 1_level_1
1973-02-21,7.763636
1973-02-22,7.581818
1973-02-23,7.436364
1973-02-26,7.327273
1973-02-27,7.163636
...,...
2021-04-08,109.820000
2021-04-09,111.790001
2021-04-12,111.389999
2021-04-13,111.879997


Given N=5 days of history required, for Sony, the first possible prediction day is 1973-02-28.

In [52]:
import plotly.graph_objects as go

fig = go.Figure(layout=go.Layout(
        title=go.layout.Title(text="Closing Value Vs. Time"),
        xaxis_title="Time (Daily)",
        yaxis_title="Closing Stock Value (USD)",
    ))

fig.add_trace(go.Scatter(x=sony_df.index, y=sony_df['Closing Value (USD)'], mode='lines', name='Sony'))
fig.add_trace(go.Scatter(x=nintendo_df.index, y=nintendo_df['Closing Value (USD)'], mode='lines', name='Nintendo'))
fig.add_trace(go.Scatter(x=microsoft_df.index, y=microsoft_df['Closing Value (USD)'], mode='lines', name='Microsoft'))

fig.show()

We observe a few interesting details from this graph:
1. When the Playstation 2 was released in 2000, Sony had a dramatic increase.
2. When the Wii was released in 2006, Nintendo dramatically increased, and with the release of its successor, the Wii U, they decreased back to what they were before.
3. For Microsoft, they launched Outlook.com in 2012, which helped spark the Cloud revolution which led to an exponential increase, largely due to Azure.
4. There was a drop in all 3 companies when Covid-19 hit in March 2020.

In [146]:
weights = [0.02, 0.04, 0.08, 0.16, 0.7]
def weighted_average(business_week, weights=weights):
  return np.dot(business_week, weights)

In [87]:
def get_windowed_time_series(first_prediction_day, pd_dataframe, N=5):
  pd_dataframe = pd_dataframe.copy()
  days = pd_dataframe.loc[first_prediction_day:].index
  windows = []
  true_values = []
  for prediction_day in days:
    previous_N_rows = pd_dataframe.loc[:prediction_day].tail(N+1)[:-1]
    previous_N_stocks = list(previous_N_rows['Closing Value (USD)'].to_numpy())
    windows.append(previous_N_stocks)
    true_values.append(pd_dataframe.loc[prediction_day, 'Closing Value (USD)'])
  return days, np.array(windows), np.array(true_values)

In [116]:
days, X, Y = get_windowed_time_series('1973-02-28', sony_df)

In [102]:
days, X.shape, Y.shape

(DatetimeIndex(['1973-02-28', '1973-03-01', '1973-03-02', '1973-03-05',
                '1973-03-06', '1973-03-07', '1973-03-08', '1973-03-09',
                '1973-03-12', '1973-03-13',
                ...
                '2021-03-31', '2021-04-01', '2021-04-05', '2021-04-06',
                '2021-04-07', '2021-04-08', '2021-04-09', '2021-04-12',
                '2021-04-13', '2021-04-14'],
               dtype='datetime64[ns]', name='Date', length=12139, freq=None),
 (12139, 5),
 (12139,))

In [103]:
X

array([[  7.763636,   7.581818,   7.436364,   7.327273,   7.163636],
       [  7.581818,   7.436364,   7.327273,   7.163636,   7.309091],
       [  7.436364,   7.327273,   7.163636,   7.309091,   7.145455],
       ...,
       [111.389999, 109.900002, 110.860001, 109.82    , 111.790001],
       [109.900002, 110.860001, 109.82    , 111.790001, 111.389999],
       [110.860001, 109.82    , 111.790001, 111.389999, 111.879997]])

In [151]:
def get_inputs_outputs_using_weighted_average(first_prediction_day, pd_dataframe, weights=weights, N=5):
  days = pd_dataframe.loc[first_prediction_day:].index
  predictions = []
  for prediction_day in days:
    previous_N_rows = pd_dataframe.loc[:prediction_day].tail(N+1)[:-1]
    previous_N_stocks = list(previous_N_rows['Closing Value (USD)'].to_numpy())
    prediction = weighted_average(previous_N_stocks)
    predictions.append(prediction)
  return days, predictions

In [152]:
sony_wa_days_all, sony_wa_predictions_all = get_inputs_outputs_using_weighted_average('1973-02-28', sony_df)
#sony_wa_days_all

In [153]:
import plotly.graph_objects as go

fig = go.Figure(layout=go.Layout(
        title=go.layout.Title(text="Closing Value Vs. Time"),
        xaxis_title="Time (Daily)",
        yaxis_title="Closing Stock Value (USD)",
    ))

fig.add_trace(go.Scatter(x=sony_df.index, y=sony_df['Closing Value (USD)'], mode='lines', name='Sony'))
fig.add_trace(go.Scatter(x=sony_wa_days_all, y=sony_wa_predictions_all, mode='lines', name='Next Day Predictions'))