In [1]:
import findspark
findspark.init()

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" pyspark-shell'

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

spark = (SparkSession.builder
    .appName("Stocks_per_industry")
    .getOrCreate())

Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9219126a-2dbd-444c-81bb-6a680bfdf69d;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.3 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 674ms :: artifacts dl 22ms
	:: modules in use

# AIRLINE INDUSTRY

## 1. Data Cleansing

In [3]:
airline_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/airline").toPandas().ffill()
airline_df.head(5)

                                                                                

Unnamed: 0,Datetime,EADSY_Close,BA_Close
0,2022-02-28 09:30:00-05:00,31.26,199.529999
1,2022-02-28 09:31:00-05:00,31.209999,198.389999
2,2022-02-28 09:32:00-05:00,31.1507,197.560196
3,2022-02-28 09:33:00-05:00,31.139999,197.699997
4,2022-02-28 09:34:00-05:00,31.139999,197.639999


In [4]:
import pandas as pd
airline_df["Datetime"] = pd.to_datetime(airline_df["Datetime"])
airline_df

Unnamed: 0,Datetime,EADSY_Close,BA_Close
0,2022-02-28 09:30:00-05:00,31.260000,199.529999
1,2022-02-28 09:31:00-05:00,31.209999,198.389999
2,2022-02-28 09:32:00-05:00,31.150700,197.560196
3,2022-02-28 09:33:00-05:00,31.139999,197.699997
4,2022-02-28 09:34:00-05:00,31.139999,197.639999
...,...,...,...
1165,2022-03-02 15:55:00-05:00,30.360001,198.110001
1166,2022-03-02 15:56:00-05:00,30.360001,197.960007
1167,2022-03-02 15:57:00-05:00,30.360001,197.759995
1168,2022-03-02 15:58:00-05:00,30.360001,197.664993


In [5]:
airline_df = airline_df.set_index("Datetime")
airline_df.head(5)

Unnamed: 0_level_0,EADSY_Close,BA_Close
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1
2022-02-28 09:30:00-05:00,31.26,199.529999
2022-02-28 09:31:00-05:00,31.209999,198.389999
2022-02-28 09:32:00-05:00,31.1507,197.560196
2022-02-28 09:33:00-05:00,31.139999,197.699997
2022-02-28 09:34:00-05:00,31.139999,197.639999


## 2. Featuring Engineering

In [6]:
airline_df["airline_value"] = airline_df.apply(lambda row: (row.EADSY_Close + row.BA_Close)/2, axis=1)
airline_df.head(1)

Unnamed: 0_level_0,EADSY_Close,BA_Close,airline_value
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2022-02-28 09:30:00-05:00,31.26,199.529999,115.395


In [7]:
lags = [1,2,3]

for lag in lags:
    airline_df[f"airline_value_{lag}"] = airline_df["airline_value"].shift(lag)

We select the rows starting from the third one to get rid of the NaNs. 

In [8]:
airline_df = airline_df.iloc[3:, :]

We calculate the slopes to see the changes on the stocks, first looking backwards and the last one, on one minute ahead.

In [9]:
airline_df["slope_x1"] = (airline_df["airline_value"] - airline_df["airline_value_1"]) / airline_df["airline_value_1"]
airline_df["slope_x2"] = (airline_df["airline_value_1"] - airline_df["airline_value_2"]) / airline_df["airline_value_2"]
airline_df["slope_x3"] = (airline_df["airline_value_2"] - airline_df["airline_value_3"]) / airline_df["airline_value_3"]
airline_df["airline_value_t+1"] = airline_df["airline_value"].shift(-1)

Again, we eliminate the NaNs from the last row as we shifted for the last row. 

In [10]:
airline_df = airline_df.iloc[:-1, :]

We create a dummy variable for the target variable. 

In [11]:
import numpy as np
airline_df["buy_sell_decision"] = np.where(airline_df["airline_value_t+1"] > airline_df["airline_value"], 1, 0)

In [12]:
from datetime import datetime

airline_df = airline_df.reset_index()
airline_df["date"] = airline_df["Datetime"].apply(datetime.date)
airline_df.head(1)

Unnamed: 0,Datetime,EADSY_Close,BA_Close,airline_value,airline_value_1,airline_value_2,airline_value_3,slope_x1,slope_x2,slope_x3,airline_value_t+1,buy_sell_decision,date
0,2022-02-28 09:33:00-05:00,31.139999,197.699997,114.419998,114.355448,114.799999,115.395,0.000564,-0.003872,-0.005156,114.389999,0,2022-02-28


### 2.1 Feature selection

In [13]:
airline_df = airline_df[["slope_x1", "slope_x2", "slope_x3", "buy_sell_decision"]]

We dump this new dataset with the columns selected above to further work with the algorithm.

In [14]:
airline_df.to_csv(r'clean_airline.csv')

### 2.2 Feature Transformation

In [15]:
airline_clean_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/clean_airline_csv")
airline_clean_df.limit(5).toPandas()

Unnamed: 0,_c0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,0,0.000564,-0.003872,-0.005156,0
1,1,-0.000262,0.000564,-0.003872,0
2,2,-0.000699,-0.000262,0.000564,1
3,3,0.001094,-0.000699,-0.000262,0
4,4,-0.00201,0.001094,-0.000699,1


In [16]:
airline_clean_df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- slope_x1: double (nullable = true)
 |-- slope_x2: double (nullable = true)
 |-- slope_x3: double (nullable = true)
 |-- buy_sell_decision: integer (nullable = true)



In [17]:
airline_clean_df = airline_clean_df.withColumn("slope_x1",airline_clean_df.slope_x1.cast('double'))
airline_clean_df =airline_clean_df.withColumn("slope_x2",airline_clean_df.slope_x2.cast('double'))
airline_clean_df =airline_clean_df.withColumn("slope_x3",airline_clean_df.slope_x3.cast('double'))

In [18]:
airline_clean_df = airline_clean_df.drop("_c0")
airline_clean_df =airline_clean_df.withColumn("buy_sell_decision",airline_clean_df.buy_sell_decision.cast('int'))

In [19]:
airline_clean_df.printSchema()

root
 |-- slope_x1: double (nullable = true)
 |-- slope_x2: double (nullable = true)
 |-- slope_x3: double (nullable = true)
 |-- buy_sell_decision: integer (nullable = true)



In [20]:
from pyspark.ml.feature import StringIndexer

label_colum = "buy_sell_decision"

numericCols = [field for (field, dataType) in airline_clean_df.dtypes if ((dataType == "double") & (field != "buy_sell_decision"))]

assemblerInputs = numericCols
print("Feature columns: ",assemblerInputs)

Feature columns:  ['slope_x1', 'slope_x2', 'slope_x3']


In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder, StringIndexer

vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

test_pipeline = Pipeline(stages = [vecAssembler])

features_df = test_pipeline.fit(airline_clean_df).transform(airline_clean_df)


In [22]:
features_df.limit(2).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features
0,0.000564,-0.003872,-0.005156,0,"[0.0005644715755962341, -0.0038723995718633643..."
1,-0.000262,0.000564,-0.003872,0,"[-0.0002621812600676737, 0.0005644715755962341..."


## 3. Model Training

In [23]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier

seed = 42
lr = LogisticRegression(labelCol="buy_sell_decision", featuresCol="features")
dt = DecisionTreeClassifier(labelCol="buy_sell_decision", featuresCol="features",seed=seed)
rf = RandomForestClassifier(labelCol="buy_sell_decision", featuresCol="features",maxDepth=10,seed=seed)
gbt = GBTClassifier(labelCol="buy_sell_decision", featuresCol="features",maxIter=10,seed=seed)

classifiers = [lr,dt,rf,gbt]

In [24]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]

## 4. Model Evaluation

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="buy_sell_decision",  metricName="accuracy")

## 5. Model Selection

In [26]:
seed = 42

(x_train, x_test) = airline_clean_df.randomSplit([0.8,0.2],seed=seed)

In [27]:
x_train.toPandas().head(2)

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,-0.01696,0.000169,0.00076,0
1,-0.005003,0.00204,0.001017,1


In [28]:
models = [pipeline.fit(x_train) for pipeline in pipelines]
models

[PipelineModel_09eeea85eed6,
 PipelineModel_dc531317fbac,
 PipelineModel_9384a4ef9c3e,
 PipelineModel_3cc806c6006d]

In [29]:
import pandas as pd
names = []
values = [] 

for model in models:
    prediction_df = model.transform(x_test)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df = pd.DataFrame(data)
df.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df

Unnamed: 0,name,accuracy,model
2,RandomForestClassificationModel,0.492683,PipelineModel_9384a4ef9c3e
1,DecisionTreeClassificationModel,0.458537,PipelineModel_dc531317fbac
3,GBTClassificationModel,0.458537,PipelineModel_3cc806c6006d
0,LogisticRegressionModel,0.44878,PipelineModel_09eeea85eed6


In [30]:
best_model=df.iloc[0]['model']

In [31]:
best_model.transform(x_test).groupby("buy_sell_decision").pivot("prediction").count().toPandas()

                                                                                

Unnamed: 0,buy_sell_decision,0.0,1.0
0,1,67,46
1,0,55,37


## 6. Model Persistence

In [32]:
modelPath = "hdfs://localhost:9000/model-registry/airline-stocks-classifier"
best_model.write().overwrite().save(modelPath)

                                                                                

## 7. Model Loading

In [33]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [34]:
airline_predictions = savedModel.transform(x_test)
airline_predictions.select("features", "buy_sell_decision", "prediction")
airline_predictions.limit(5).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features,rawPrediction,probability,prediction
0,-0.004429,-0.00029,-0.01696,0,"[-0.0044293937891949605, -0.000290166484739615...","[13.14603327706776, 6.85396672293224]","[0.657301663853388, 0.342698336146612]",0.0
1,-0.003087,-0.001494,-0.000518,0,"[-0.0030869997771073745, -0.001493732918087307...","[5.274330113276982, 14.725669886723017]","[0.26371650566384913, 0.7362834943361508]",1.0
2,-0.002993,-0.002036,-0.000852,0,"[-0.0029931920770331763, -0.002035685038958559...","[2.4512463343108504, 17.548753665689148]","[0.12256231671554252, 0.8774376832844574]",1.0
3,-0.002852,-0.002939,9.5e-05,0,"[-0.0028520831991036147, -0.002939033246046851...","[16.71754680386394, 3.2824531961360606]","[0.8358773401931969, 0.16412265980680302]",0.0
4,-0.002485,-0.00061,0.000829,1,"[-0.0024852426475081056, -0.000609745002131335...","[8.57965922965923, 11.420340770340768]","[0.4289829614829615, 0.5710170385170384]",1.0


## SERVING

To upload it into MongoDB, we create a json file and extract it.

In [35]:
airline_predictions.coalesce(1).write.format("json").option("delimiter", ",").mode("overwrite").save("MongoDB/airlines_json")

### FOR THE REST OF THE INDUSTRIES WE ARE GOING TO FOLLOW THE SAME STEPS

# AUTOMOTIVE INDUSTRY

In [36]:
automotive_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/automotive").toPandas().ffill()
automotive_df.head(5)

Unnamed: 0,Datetime,TSLA_Close,GM_Close,F_Close
0,2022-02-28 09:30:00-05:00,822.26001,46.27,17.370001
1,2022-02-28 09:31:00-05:00,818.580017,46.145,17.43
2,2022-02-28 09:32:00-05:00,823.400024,46.080002,17.4699
3,2022-02-28 09:33:00-05:00,826.72998,46.18,17.4599
4,2022-02-28 09:34:00-05:00,829.280029,46.259998,17.469999


In [37]:
import pandas as pd
automotive_df["Datetime"] = pd.to_datetime(automotive_df["Datetime"])
automotive_df

Unnamed: 0,Datetime,TSLA_Close,GM_Close,F_Close
0,2022-02-28 09:30:00-05:00,822.260010,46.270000,17.370001
1,2022-02-28 09:31:00-05:00,818.580017,46.145000,17.430000
2,2022-02-28 09:32:00-05:00,823.400024,46.080002,17.469900
3,2022-02-28 09:33:00-05:00,826.729980,46.180000,17.459900
4,2022-02-28 09:34:00-05:00,829.280029,46.259998,17.469999
...,...,...,...,...
1164,2022-03-02 15:55:00-05:00,880.530029,46.459999,18.165001
1165,2022-03-02 15:56:00-05:00,881.640015,46.439999,18.150000
1166,2022-03-02 15:57:00-05:00,880.429993,46.360001,18.110001
1167,2022-03-02 15:58:00-05:00,880.710022,46.325001,18.100000


In [38]:
automotive_df = automotive_df.set_index("Datetime")
automotive_df.head(1)

Unnamed: 0_level_0,TSLA_Close,GM_Close,F_Close
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2022-02-28 09:30:00-05:00,822.26001,46.27,17.370001


In [39]:
automotive_df["automotive_value"] = automotive_df.apply(lambda row: row.TSLA_Close + row.GM_Close + row.F_Close, axis=1)
automotive_df.head(1)

Unnamed: 0_level_0,TSLA_Close,GM_Close,F_Close,automotive_value
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2022-02-28 09:30:00-05:00,822.26001,46.27,17.370001,885.900011


In [40]:
lags = [1,2,3]

for lag in lags:
    automotive_df[f"automotive_value_{lag}"] = automotive_df["automotive_value"].shift(lag)

In [41]:
automotive_df = automotive_df.iloc[3:, :]

In [42]:
automotive_df.head(1)

Unnamed: 0_level_0,TSLA_Close,GM_Close,F_Close,automotive_value,automotive_value_1,automotive_value_2,automotive_value_3
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2022-02-28 09:33:00-05:00,826.72998,46.18,17.4599,890.369881,886.949926,882.155018,885.900011


In [43]:
automotive_df["slope_x1"] = (automotive_df["automotive_value"] - automotive_df["automotive_value_1"]) / automotive_df["automotive_value_1"]
automotive_df["slope_x2"] = (automotive_df["automotive_value_1"] - automotive_df["automotive_value_2"]) / automotive_df["automotive_value_2"]
automotive_df["slope_x3"] = (automotive_df["automotive_value_2"] - automotive_df["automotive_value_3"]) / automotive_df["automotive_value_3"]
automotive_df["automotive_value_t+1"] = automotive_df["automotive_value"].shift(-1)

In [44]:
automotive_df = automotive_df.iloc[:-1, :]

In [45]:
import numpy as np
automotive_df["buy_sell_decision"] = np.where(automotive_df["automotive_value_t+1"] > automotive_df["automotive_value"], 1, 0)

In [46]:
from datetime import datetime

automotive_df = automotive_df.reset_index()
automotive_df["date"] = automotive_df["Datetime"].apply(datetime.date)
automotive_df.head(1)

Unnamed: 0,Datetime,TSLA_Close,GM_Close,F_Close,automotive_value,automotive_value_1,automotive_value_2,automotive_value_3,slope_x1,slope_x2,slope_x3,automotive_value_t+1,buy_sell_decision,date
0,2022-02-28 09:33:00-05:00,826.72998,46.18,17.4599,890.369881,886.949926,882.155018,885.900011,0.003856,0.005435,-0.004227,893.010027,1,2022-02-28


In [47]:
automotive_df = automotive_df[["slope_x1", "slope_x2", "slope_x3", "buy_sell_decision"]]

In [48]:
automotive_df.to_csv(r'clean_automotive.csv')

In [49]:
automotive_clean_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/clean_automotive_csv")
automotive_clean_df.limit(2).toPandas()

Unnamed: 0,_c0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,0,0.003856,0.005435,-0.004227,1
1,1,0.002965,0.003856,0.005435,0


In [50]:
automotive_clean_df= automotive_clean_df.withColumn("slope_x1",automotive_clean_df.slope_x1.cast('double'))
automotive_clean_df =automotive_clean_df.withColumn("slope_x2",automotive_clean_df.slope_x2.cast('double'))
automotive_clean_df =automotive_clean_df.withColumn("slope_x3",automotive_clean_df.slope_x3.cast('double'))

In [51]:
automotive_clean_df = automotive_clean_df.drop("_c0")
automotive_clean_df =automotive_clean_df.withColumn("buy_sell_decision",automotive_clean_df.buy_sell_decision.cast('int'))

In [52]:
from pyspark.ml.feature import StringIndexer

label_colum = "buy_sell_decision"

numericCols = [field for (field, dataType) in automotive_clean_df.dtypes if ((dataType == "double") & (field != "buy_sell_decision"))]

assemblerInputs = numericCols
print("Feature columns: ",assemblerInputs)

Feature columns:  ['slope_x1', 'slope_x2', 'slope_x3']


In [53]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

test_pipeline = Pipeline(stages = [vecAssembler])

features_df = test_pipeline.fit(automotive_clean_df).transform(automotive_clean_df)

In [54]:
seed = 42
lr2 = LogisticRegression(labelCol="buy_sell_decision", featuresCol="features")
dt2 = DecisionTreeClassifier(labelCol="buy_sell_decision", featuresCol="features",seed=seed)
rf2 = RandomForestClassifier(labelCol="buy_sell_decision", featuresCol="features",maxDepth=10,seed=seed)
gbt2 = GBTClassifier(labelCol="buy_sell_decision", featuresCol="features",maxIter=10,seed=seed)

classifiers = [lr2,dt2,rf2,gbt2]

In [55]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]

In [56]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="buy_sell_decision",  metricName="accuracy")

In [57]:
seed = 42
(x_train, x_test) = automotive_clean_df.randomSplit([0.8,0.2],seed=seed)

In [58]:
models = [pipeline.fit(x_train) for pipeline in pipelines]
models

[PipelineModel_3e9d3e675bda,
 PipelineModel_4887b18a8ca2,
 PipelineModel_e7d4aba01599,
 PipelineModel_c9d37ae69327]

In [59]:
import pandas as pd
names = []
values = [] 

for model in models:
    prediction_df = model.transform(x_test)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df2 = pd.DataFrame(data)
df2.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df2

Unnamed: 0,name,accuracy,model
0,LogisticRegressionModel,0.546341,PipelineModel_3e9d3e675bda
2,RandomForestClassificationModel,0.507317,PipelineModel_e7d4aba01599
1,DecisionTreeClassificationModel,0.487805,PipelineModel_4887b18a8ca2
3,GBTClassificationModel,0.487805,PipelineModel_c9d37ae69327


In [60]:
best_model=df2.iloc[0]['model']

In [61]:
best_model.transform(x_test).groupby("buy_sell_decision").pivot("prediction").count().toPandas()

                                                                                

Unnamed: 0,buy_sell_decision,0.0,1.0
0,1,7,95
1,0,17,86


In [62]:
modelPath = "hdfs://localhost:9000/model-registry/automotive-stocks-classifier"
best_model.write().overwrite().save(modelPath)

In [63]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [64]:
automotive_predictions = savedModel.transform(x_test)
automotive_predictions.select("features", "buy_sell_decision", "prediction")
automotive_predictions.limit(5).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features,rawPrediction,probability,prediction
0,-0.005561,0.003665,0.004109,0,"[-0.005561279040621582, 0.0036648114560086584,...","[0.16915211804557026, -0.16915211804557026]","[0.5421874868730672, 0.4578125131269329]",0.0
1,-0.00456,0.000108,-0.002689,1,"[-0.004559918508228743, 0.0001078023551620459,...","[-0.0661983828365035, 0.0661983828365035]","[0.48345644531580667, 0.5165435546841933]",1.0
2,-0.004487,-0.003931,-0.000805,1,"[-0.004487040076603708, -0.00393101139148905, ...","[-0.07730456228587912, 0.07730456228587912]","[0.4806834780909789, 0.519316521909021]",1.0
3,-0.003861,0.000414,-0.001055,1,"[-0.003860529512974832, 0.0004140807065025922,...","[-0.02713442549840981, 0.02713442549840981]","[0.49321680981256055, 0.5067831901874394]",1.0
4,-0.003632,0.00175,0.002311,1,"[-0.003632120966492107, 0.0017504593161793707,...","[0.07590644809204394, -0.07590644809204394]","[0.5189675056672156, 0.48103249433278433]",0.0


In [65]:
automotive_predictions.coalesce(1).write.format("json").option("delimiter", ",").mode("overwrite").save("MongoDB/automotive_json")

# ENERGY INDUSTRY

In [66]:
energy_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/energy").toPandas().ffill()
energy_df.head(5)

Unnamed: 0,Datetime,XOM_Close,CVX_Close,SHELL_Close
0,2022-02-28 09:30:00-05:00,76.659599,139.550003,52.34
1,2022-02-28 09:31:00-05:00,76.610001,139.714996,52.380001
2,2022-02-28 09:32:00-05:00,76.559998,139.449997,52.289799
3,2022-02-28 09:33:00-05:00,76.6045,139.589996,52.23
4,2022-02-28 09:34:00-05:00,76.389999,139.2099,52.169998


In [67]:
import pandas as pd
energy_df["Datetime"] = pd.to_datetime(energy_df["Datetime"])
energy_df

Unnamed: 0,Datetime,XOM_Close,CVX_Close,SHELL_Close
0,2022-02-28 09:30:00-05:00,76.659599,139.550003,52.340000
1,2022-02-28 09:31:00-05:00,76.610001,139.714996,52.380001
2,2022-02-28 09:32:00-05:00,76.559998,139.449997,52.289799
3,2022-02-28 09:33:00-05:00,76.604500,139.589996,52.230000
4,2022-02-28 09:34:00-05:00,76.389999,139.209900,52.169998
...,...,...,...,...
1164,2022-03-02 15:55:00-05:00,80.735001,154.240005,54.840000
1165,2022-03-02 15:56:00-05:00,80.665001,154.164993,54.744999
1166,2022-03-02 15:57:00-05:00,80.570000,154.184998,54.689999
1167,2022-03-02 15:58:00-05:00,80.544998,154.145004,54.685001


In [68]:
energy_df = energy_df.set_index("Datetime")
energy_df.head(1)

Unnamed: 0_level_0,XOM_Close,CVX_Close,SHELL_Close
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2022-02-28 09:30:00-05:00,76.659599,139.550003,52.34


In [69]:
energy_df["energy_value"] = energy_df.apply(lambda row: row.XOM_Close + row.CVX_Close + row.SHELL_Close, axis=1)
energy_df.head(1)

Unnamed: 0_level_0,XOM_Close,CVX_Close,SHELL_Close,energy_value
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2022-02-28 09:30:00-05:00,76.659599,139.550003,52.34,268.549603


In [70]:
lags = [1,2,3]

for lag in lags:
    energy_df[f"energy_value_{lag}"] = energy_df["energy_value"].shift(lag)

In [71]:
energy_df = energy_df.iloc[3:, :]

In [72]:
energy_df["slope_x1"] = (energy_df["energy_value"] - energy_df["energy_value_1"]) / energy_df["energy_value_1"]
energy_df["slope_x2"] = (energy_df["energy_value_1"] - energy_df["energy_value_2"]) / energy_df["energy_value_2"]
energy_df["slope_x3"] = (energy_df["energy_value_2"] - energy_df["energy_value_3"]) / energy_df["energy_value_3"]
energy_df["energy_value_t+1"] = energy_df["energy_value"].shift(-1)

In [73]:
energy_df =energy_df.iloc[:-1, :]

In [74]:
import numpy as np
energy_df["buy_sell_decision"] = np.where(energy_df["energy_value_t+1"] > energy_df["energy_value"], 1, 0)

In [75]:
from datetime import datetime

energy_df = energy_df.reset_index()
energy_df["date"] = energy_df["Datetime"].apply(datetime.date)
energy_df.head(1)

Unnamed: 0,Datetime,XOM_Close,CVX_Close,SHELL_Close,energy_value,energy_value_1,energy_value_2,energy_value_3,slope_x1,slope_x2,slope_x3,energy_value_t+1,buy_sell_decision,date
0,2022-02-28 09:33:00-05:00,76.6045,139.589996,52.23,268.424496,268.299793,268.704998,268.549603,0.000465,-0.001508,0.000579,267.769897,0,2022-02-28


In [76]:
energy_df = energy_df[["slope_x1", "slope_x2", "slope_x3", "buy_sell_decision"]]

In [77]:
energy_df.to_csv(r'clean_energy.csv')

In [78]:
energy_clean_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/clean_energy_csv")
energy_clean_df.limit(2).toPandas()

Unnamed: 0,_c0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,0,0.000465,-0.001508,0.000579,0
1,1,-0.002439,0.000465,-0.001508,1


In [79]:
energy_clean_df= energy_clean_df.withColumn("slope_x1",energy_clean_df.slope_x1.cast('double'))
energy_clean_df =energy_clean_df.withColumn("slope_x2",energy_clean_df.slope_x2.cast('double'))
energy_clean_df =energy_clean_df.withColumn("slope_x3",energy_clean_df.slope_x3.cast('double'))

In [80]:
energy_clean_df = energy_clean_df.drop("_c0")
energy_clean_df =energy_clean_df.withColumn("buy_sell_decision",energy_clean_df.buy_sell_decision.cast('int'))

In [81]:
from pyspark.ml.feature import StringIndexer

label_colum = "buy_sell_decision"

numericCols = [field for (field, dataType) in energy_clean_df.dtypes if ((dataType == "double") & (field != "buy_sell_decision"))]

assemblerInputs = numericCols
print("Feature columns: ",assemblerInputs)

Feature columns:  ['slope_x1', 'slope_x2', 'slope_x3']


In [82]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

test_pipeline = Pipeline(stages = [vecAssembler])

features_df = test_pipeline.fit(energy_clean_df).transform(energy_clean_df)

In [83]:
seed = 42
lr3 = LogisticRegression(labelCol="buy_sell_decision", featuresCol="features")
dt3 = DecisionTreeClassifier(labelCol="buy_sell_decision", featuresCol="features",seed=seed)
rf3 = RandomForestClassifier(labelCol="buy_sell_decision", featuresCol="features",maxDepth=10,seed=seed)
gbt3 = GBTClassifier(labelCol="buy_sell_decision", featuresCol="features",maxIter=10,seed=seed)

classifiers = [lr3,dt3,rf3,gbt3]

In [84]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]

In [85]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="buy_sell_decision",  metricName="accuracy")

In [86]:
seed = 42
(x_train, x_test) = energy_clean_df.randomSplit([0.8,0.2],seed=seed)

In [87]:
models = [pipeline.fit(x_train) for pipeline in pipelines]
models

[PipelineModel_6418ecf6b534,
 PipelineModel_71ea02bb583f,
 PipelineModel_8d5b8678ee9c,
 PipelineModel_2dfddb235638]

In [88]:
import pandas as pd
names = []
values = [] 

for model in models:
    prediction_df = model.transform(x_test)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df3 = pd.DataFrame(data)
df3.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df3

Unnamed: 0,name,accuracy,model
1,DecisionTreeClassificationModel,0.536585,PipelineModel_71ea02bb583f
0,LogisticRegressionModel,0.526829,PipelineModel_6418ecf6b534
2,RandomForestClassificationModel,0.502439,PipelineModel_8d5b8678ee9c
3,GBTClassificationModel,0.497561,PipelineModel_2dfddb235638


In [89]:
best_model=df3.iloc[0]['model']

In [90]:
best_model.transform(x_test).groupby("buy_sell_decision").pivot("prediction").count().toPandas()

                                                                                

Unnamed: 0,buy_sell_decision,0.0,1.0
0,1,43,69
1,0,41,52


In [91]:
modelPath = "hdfs://localhost:9000/model-registry/energy-stocks-classifier"
best_model.write().overwrite().save(modelPath)

In [92]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [93]:
energy_predictions = savedModel.transform(x_test)
energy_predictions.select("features", "buy_sell_decision", "prediction")
energy_predictions.limit(5).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features,rawPrediction,probability,prediction
0,-0.002807,0.001774,-0.002336,1,"[-0.00280688490142731, 0.0017742077411634035, ...","[2.0, 0.0]","[1.0, 0.0]",0.0
1,-0.002473,-7.8e-05,0.001562,0,"[-0.0024734713314778795, -7.805700184633366e-0...","[55.0, 66.0]","[0.45454545454545453, 0.5454545454545454]",1.0
2,-0.002368,0.002965,0.000363,0,"[-0.002368046942765487, 0.0029647028680362566,...","[4.0, 11.0]","[0.26666666666666666, 0.7333333333333333]",1.0
3,-0.002192,0.001754,-0.001045,1,"[-0.0021923351926209475, 0.0017543671002370567...","[4.0, 11.0]","[0.26666666666666666, 0.7333333333333333]",1.0
4,-0.001828,-0.000924,0.002045,1,"[-0.001828130058620124, -0.0009236437990173076...","[55.0, 66.0]","[0.45454545454545453, 0.5454545454545454]",1.0


In [94]:
energy_predictions.coalesce(1).write.format("json").option("delimiter", ",").mode("overwrite").save("MongoDB/energy_json")

# FINANCE INDUSTRY

In [95]:
finance_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/finance").toPandas().ffill()
finance_df

Unnamed: 0,Datetime,GS_Close,JPM_Close,AXP_Close
0,2022-02-28 09:30:00-05:00,341.410004,144.029999,189.759995
1,2022-02-28 09:31:00-05:00,340.480011,143.940002,189.630005
2,2022-02-28 09:32:00-05:00,340.880005,143.630005,189.660004
3,2022-02-28 09:33:00-05:00,340.369995,143.720093,189.229996
4,2022-02-28 09:34:00-05:00,340.410004,143.824997,190.175003
...,...,...,...,...
1165,2022-03-02 15:55:00-05:00,336.690002,139.414993,180.110001
1166,2022-03-02 15:56:00-05:00,336.690002,139.289993,180.089996
1167,2022-03-02 15:57:00-05:00,336.350006,139.205002,180.005005
1168,2022-03-02 15:58:00-05:00,336.239990,139.110001,179.860001


In [96]:
import pandas as pd
finance_df["Datetime"] = pd.to_datetime(finance_df["Datetime"])
finance_df.head(5)

Unnamed: 0,Datetime,GS_Close,JPM_Close,AXP_Close
0,2022-02-28 09:30:00-05:00,341.410004,144.029999,189.759995
1,2022-02-28 09:31:00-05:00,340.480011,143.940002,189.630005
2,2022-02-28 09:32:00-05:00,340.880005,143.630005,189.660004
3,2022-02-28 09:33:00-05:00,340.369995,143.720093,189.229996
4,2022-02-28 09:34:00-05:00,340.410004,143.824997,190.175003


In [97]:
finance_df = finance_df.set_index("Datetime")
finance_df

Unnamed: 0_level_0,GS_Close,JPM_Close,AXP_Close
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2022-02-28 09:30:00-05:00,341.410004,144.029999,189.759995
2022-02-28 09:31:00-05:00,340.480011,143.940002,189.630005
2022-02-28 09:32:00-05:00,340.880005,143.630005,189.660004
2022-02-28 09:33:00-05:00,340.369995,143.720093,189.229996
2022-02-28 09:34:00-05:00,340.410004,143.824997,190.175003
...,...,...,...
2022-03-02 15:55:00-05:00,336.690002,139.414993,180.110001
2022-03-02 15:56:00-05:00,336.690002,139.289993,180.089996
2022-03-02 15:57:00-05:00,336.350006,139.205002,180.005005
2022-03-02 15:58:00-05:00,336.239990,139.110001,179.860001


In [98]:
finance_df["finance_value"] =finance_df.apply(lambda row: row.GS_Close + row.JPM_Close + row.AXP_Close, axis=1)
finance_df.head(1)

Unnamed: 0_level_0,GS_Close,JPM_Close,AXP_Close,finance_value
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2022-02-28 09:30:00-05:00,341.410004,144.029999,189.759995,675.199997


In [99]:
lags = [1,2,3]

for lag in lags:
    finance_df[f"finance_value_{lag}"] = finance_df["finance_value"].shift(lag)

In [100]:
finance_df.head(2)

Unnamed: 0_level_0,GS_Close,JPM_Close,AXP_Close,finance_value,finance_value_1,finance_value_2,finance_value_3
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2022-02-28 09:30:00-05:00,341.410004,144.029999,189.759995,675.199997,,,
2022-02-28 09:31:00-05:00,340.480011,143.940002,189.630005,674.050018,675.199997,,


In [101]:
finance_df = finance_df.iloc[3:, :]

finance_df["slope_x1"] = (finance_df["finance_value"] - finance_df["finance_value_1"]) / finance_df["finance_value_1"]
finance_df["slope_x2"] = (finance_df["finance_value_1"] - finance_df["finance_value_2"]) / finance_df["finance_value_2"]
finance_df["slope_x3"] = (finance_df["finance_value_2"] - finance_df["finance_value_3"]) / finance_df["finance_value_3"]
finance_df["finance_value_t+1"] = finance_df["finance_value"].shift(lag)

finance_df =finance_df.iloc[:-1, :]


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  after removing the cwd from sys.path.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[r

In [102]:
finance_df["buy_sell_decision"] = np.where(finance_df["finance_value_t+1"] > finance_df["finance_value"], 1, 0)

In [103]:
finance_df = finance_df.reset_index()
finance_df["date"] = finance_df["Datetime"].apply(datetime.date)
finance_df.head(1)

Unnamed: 0,Datetime,GS_Close,JPM_Close,AXP_Close,finance_value,finance_value_1,finance_value_2,finance_value_3,slope_x1,slope_x2,slope_x3,finance_value_t+1,buy_sell_decision,date
0,2022-02-28 09:33:00-05:00,340.369995,143.720093,189.229996,673.320084,674.170013,674.050018,675.199997,-0.001261,0.000178,-0.001703,,0,2022-02-28


In [104]:
finance_df =finance_df[["slope_x1", "slope_x2", "slope_x3", "buy_sell_decision"]]
finance_df.to_csv(r'clean_finance.csv')

In [105]:
finance_clean_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/clean_finance_csv")
finance_clean_df.limit(5).toPandas()

Unnamed: 0,_c0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,0,-0.001261,0.000178,-0.001703,0
1,1,0.001619,-0.001261,0.000178,0
2,2,0.001676,0.001619,-0.001261,0
3,3,0.003242,0.001676,0.001619,0
4,4,-0.001719,0.003242,0.001676,0


In [106]:
finance_clean_df= finance_clean_df.withColumn("slope_x1",finance_clean_df.slope_x1.cast('double'))
finance_clean_df =finance_clean_df.withColumn("slope_x2",finance_clean_df.slope_x2.cast('double'))
finance_clean_df =finance_clean_df.withColumn("slope_x3",finance_clean_df.slope_x3.cast('double'))

In [107]:
finance_clean_df = finance_clean_df.drop("_c0")
finance_clean_df =finance_clean_df.withColumn("buy_sell_decision",finance_clean_df.buy_sell_decision.cast('int'))

In [108]:
from pyspark.ml.feature import StringIndexer

label_colum = "buy_sell_decision"

numericCols = [field for (field, dataType) in finance_clean_df.dtypes if ((dataType == "double") & (field != "buy_sell_decision"))]

assemblerInputs = numericCols
print("Feature columns: ",assemblerInputs)

Feature columns:  ['slope_x1', 'slope_x2', 'slope_x3']


In [109]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

test_pipeline = Pipeline(stages = [vecAssembler])

features_df = test_pipeline.fit(finance_clean_df).transform(finance_clean_df)

In [110]:
seed = 42
lr4 = LogisticRegression(labelCol="buy_sell_decision", featuresCol="features")
dt4 = DecisionTreeClassifier(labelCol="buy_sell_decision", featuresCol="features",seed=seed)
rf4 = RandomForestClassifier(labelCol="buy_sell_decision", featuresCol="features",maxDepth=10,seed=seed)
gbt4 = GBTClassifier(labelCol="buy_sell_decision", featuresCol="features",maxIter=10,seed=seed)

classifiers = [lr4,dt4,rf4,gbt4]

In [111]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]

In [112]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="buy_sell_decision",  metricName="accuracy")

In [113]:
seed = 42
(x_train, x_test) = finance_clean_df.randomSplit([0.8,0.2],seed=seed)

In [114]:
models = [pipeline.fit(x_train) for pipeline in pipelines]
models

[PipelineModel_aa6f8fd39354,
 PipelineModel_1eb75404f0f0,
 PipelineModel_dd6aba238ed1,
 PipelineModel_bba9738c54df]

In [115]:
import pandas as pd
names = []
values = [] 

for model in models: 
    prediction_df = model.transform(x_test)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df4 = pd.DataFrame(data)
df4.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df4

Unnamed: 0,name,accuracy,model
0,LogisticRegressionModel,1.0,PipelineModel_aa6f8fd39354
2,RandomForestClassificationModel,0.931707,PipelineModel_dd6aba238ed1
3,GBTClassificationModel,0.878049,PipelineModel_bba9738c54df
1,DecisionTreeClassificationModel,0.873171,PipelineModel_1eb75404f0f0


In [116]:
best_model=df4.iloc[0]['model']

In [117]:
best_model.transform(x_test).groupby("buy_sell_decision").pivot("prediction").count().toPandas()

                                                                                

Unnamed: 0,buy_sell_decision,0.0,1.0
0,1,,100.0
1,0,105.0,


In [118]:
modelPath = "hdfs://localhost:9000/model-registry/finance-stocks-classifier"
best_model.write().overwrite().save(modelPath)

In [119]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [120]:
finance_predictions = savedModel.transform(x_test)
finance_predictions.select("features", "buy_sell_decision", "prediction")
finance_predictions.limit(5).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features,rawPrediction,probability,prediction
0,-0.002996,0.000391,0.001256,1,"[-0.002995534786798572, 0.0003912146189028336,...","[-17.68487390210297, 17.68487390210297]","[2.0871652820906436e-08, 0.9999999791283471]",1.0
1,-0.002706,-0.000325,0.000653,1,"[-0.0027059208787953855, -0.000324876481046172...","[-31.481026381110173, 31.481026381110173]","[2.1279624396994405e-14, 0.9999999999999787]",1.0
2,-0.002536,-0.001819,0.002016,1,"[-0.002536347164417309, -0.001819394998980686,...","[-30.806716633431506, 30.806716633431506]","[4.1765045660499457e-14, 0.9999999999999583]",1.0
3,-0.002132,0.000256,4.4e-05,1,"[-0.002132137916398404, 0.00025557501024714365...","[-24.292490865241287, 24.292490865241287]","[2.81776821265639e-11, 0.9999999999718223]",1.0
4,-0.001973,0.000972,0.000106,1,"[-0.0019733493035105592, 0.0009724133415611252...","[-11.781547910783491, 11.781547910783491]","[7.644259720883805e-06, 0.9999923557402791]",1.0


In [121]:
finance_predictions.coalesce(1).write.format("json").option("delimiter", ",").mode("overwrite").save("MongoDB/finance_json")

# ENTERTAINMENT INDUSTRY

In [122]:
entertainment_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/entertainment").toPandas().ffill()
entertainment_df

entertainment_df["Datetime"] = pd.to_datetime(entertainment_df["Datetime"])
entertainment_df.head(5)


Unnamed: 0,Datetime,NFLX_Close,DIS_Close,EA_Close
0,2022-02-28 09:30:00-05:00,391.589905,148.429993,128.279999
1,2022-02-28 09:31:00-05:00,391.600006,148.210007,128.449997
2,2022-02-28 09:32:00-05:00,391.019989,148.419998,128.690002
3,2022-02-28 09:33:00-05:00,390.0,148.490005,129.220001
4,2022-02-28 09:34:00-05:00,389.975006,148.479996,129.249893


In [123]:
entertainment_df = entertainment_df.set_index("Datetime")
entertainment_df

Unnamed: 0_level_0,NFLX_Close,DIS_Close,EA_Close
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2022-02-28 09:30:00-05:00,391.589905,148.429993,128.279999
2022-02-28 09:31:00-05:00,391.600006,148.210007,128.449997
2022-02-28 09:32:00-05:00,391.019989,148.419998,128.690002
2022-02-28 09:33:00-05:00,390.000000,148.490005,129.220001
2022-02-28 09:34:00-05:00,389.975006,148.479996,129.249893
...,...,...,...
2022-03-02 15:55:00-05:00,380.220001,147.289993,129.419998
2022-03-02 15:56:00-05:00,380.369995,147.190002,129.380005
2022-03-02 15:57:00-05:00,380.450012,147.080002,129.369995
2022-03-02 15:58:00-05:00,380.540009,147.020004,129.279999


In [124]:
entertainment_df["entertainment_value"] = entertainment_df.apply(lambda row: row.NFLX_Close + row.DIS_Close + row.EA_Close, axis=1)
entertainment_df.head(2)

Unnamed: 0_level_0,NFLX_Close,DIS_Close,EA_Close,entertainment_value
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2022-02-28 09:30:00-05:00,391.589905,148.429993,128.279999,668.299896
2022-02-28 09:31:00-05:00,391.600006,148.210007,128.449997,668.26001


In [125]:
lags = [1,2,3]

for lag in lags:
    entertainment_df[f"entertainment_value_{lag}"] = entertainment_df["entertainment_value"].shift(lag)

In [126]:
entertainment_df = entertainment_df.iloc[3:, :]

In [127]:
entertainment_df["slope_x1"] = (entertainment_df["entertainment_value"] - entertainment_df["entertainment_value_1"]) / entertainment_df["entertainment_value_1"]
entertainment_df["slope_x2"] = (entertainment_df["entertainment_value_1"] - entertainment_df["entertainment_value_2"]) / entertainment_df["entertainment_value_2"]
entertainment_df["slope_x3"] = (entertainment_df["entertainment_value_2"] - entertainment_df["entertainment_value_3"]) / entertainment_df["entertainment_value_3"]
entertainment_df["entertainment_value_t+1"] = entertainment_df["entertainment_value"].shift(-1)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until
A value is trying to be set on a copy of a slice from a DataFrame.
Try using

In [128]:
entertainment_df = entertainment_df.iloc[:-1, :]

In [129]:
entertainment_df["buy_sell_decision"] = np.where(entertainment_df["entertainment_value_t+1"] > entertainment_df["entertainment_value"], 1, 0)

In [130]:
entertainment_df = entertainment_df.reset_index()
entertainment_df["date"] = entertainment_df["Datetime"].apply(datetime.date)
entertainment_df.head(1)

Unnamed: 0,Datetime,NFLX_Close,DIS_Close,EA_Close,entertainment_value,entertainment_value_1,entertainment_value_2,entertainment_value_3,slope_x1,slope_x2,slope_x3,entertainment_value_t+1,buy_sell_decision,date
0,2022-02-28 09:33:00-05:00,390.0,148.490005,129.220001,667.710007,668.12999,668.26001,668.299896,-0.000629,-0.000195,-6e-05,667.704895,0,2022-02-28


In [131]:
entertainment_df = entertainment_df[["slope_x1", "slope_x2", "slope_x3", "buy_sell_decision"]]

In [132]:
entertainment_df.to_csv(r'clean_entertainment.csv')

In [133]:
entertainment_clean_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/clean_entertainment_csv")
entertainment_clean_df.limit(5).toPandas()


Unnamed: 0,_c0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,0,-0.000629,-0.000195,-6e-05,0
1,1,-8e-06,-0.000629,-0.000195,0
2,2,-0.000599,-8e-06,-0.000629,0
3,3,-0.001386,-0.000599,-8e-06,0
4,4,-0.002026,-0.001386,-0.000599,0


In [134]:
entertainment_clean_df = entertainment_clean_df.withColumn("slope_x1",entertainment_clean_df.slope_x1.cast('double'))
entertainment_clean_df =entertainment_clean_df.withColumn("slope_x2",entertainment_clean_df.slope_x2.cast('double'))
entertainment_clean_df =entertainment_clean_df.withColumn("slope_x3",entertainment_clean_df.slope_x3.cast('double'))


In [135]:
entertainment_clean_df = entertainment_clean_df.drop("_c0")

In [136]:
entertainment_clean_df =entertainment_clean_df.withColumn("buy_sell_decision",entertainment_clean_df.buy_sell_decision.cast('int'))

In [137]:
# entertainment
numericCols = [field for (field, dataType) in entertainment_clean_df.dtypes if ((dataType == "double") & (field != "buy_sell_decision"))]

assemblerInputs = numericCols
print("Feature columns: ",assemblerInputs)


Feature columns:  ['slope_x1', 'slope_x2', 'slope_x3']


In [138]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

test_pipeline = Pipeline(stages = [vecAssembler])

# entertainment
features_df = test_pipeline.fit(entertainment_clean_df).transform(entertainment_clean_df) # energy


In [139]:
seed = 42
lr5 = LogisticRegression(labelCol="buy_sell_decision", featuresCol="features")
dt5 = DecisionTreeClassifier(labelCol="buy_sell_decision", featuresCol="features",seed=seed)
rf5 = RandomForestClassifier(labelCol="buy_sell_decision", featuresCol="features",maxDepth=10,seed=seed)
gbt5 = GBTClassifier(labelCol="buy_sell_decision", featuresCol="features",maxIter=10,seed=seed)
classifiers = [lr5,dt5,rf5,gbt5]

In [140]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]

In [141]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="buy_sell_decision",  metricName="accuracy")

In [142]:
seed = 42
(x_train, x_test) = entertainment_clean_df.randomSplit([0.8,0.2],seed=seed)

In [143]:
models = [pipeline.fit(x_train) for pipeline in pipelines]
models

[PipelineModel_4d98b41f3958,
 PipelineModel_0386b251e8ec,
 PipelineModel_34af66ef69a3,
 PipelineModel_8eee553d2594]

In [144]:
import pandas as pd
names = []
values = [] 

for model in models: 
    prediction_df = model.transform(x_test)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df5 = pd.DataFrame(data)
df5.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df5


Unnamed: 0,name,accuracy,model
1,DecisionTreeClassificationModel,0.536585,PipelineModel_0386b251e8ec
2,RandomForestClassificationModel,0.492683,PipelineModel_34af66ef69a3
3,GBTClassificationModel,0.453659,PipelineModel_8eee553d2594
0,LogisticRegressionModel,0.429268,PipelineModel_4d98b41f3958


In [145]:
best_model=df5.iloc[0]['model']

In [146]:
best_model.transform(x_test).groupby("buy_sell_decision").pivot("prediction").count().toPandas()

                                                                                

Unnamed: 0,buy_sell_decision,0.0,1.0
0,1,63,22
1,0,88,32


In [147]:
modelPath = "hdfs://localhost:9000/model-registry/entertainment-stocks-classifier"
best_model.write().overwrite().save(modelPath)

In [148]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [149]:
entertainment_predictions = savedModel.transform(x_test)
entertainment_predictions.select("features", "buy_sell_decision", "prediction")
entertainment_predictions.limit(5).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features,rawPrediction,probability,prediction
0,-0.002873,-0.000682,0.000948,0,"[-0.0028725793855331073, -0.000681692060843952...","[5.0, 10.0]","[0.3333333333333333, 0.6666666666666666]",1.0
1,-0.002626,-0.000809,-0.001436,1,"[-0.0026255737184478277, -0.000809247298701227...","[3.0, 5.0]","[0.375, 0.625]",1.0
2,-0.002577,0.002202,-0.000704,0,"[-0.0025766981387399597, 0.002201971968792754,...","[16.0, 10.0]","[0.6153846153846154, 0.38461538461538464]",0.0
3,-0.00234,-0.00149,-0.000381,1,"[-0.0023397728851896333, -0.001489761847033939...","[25.0, 3.0]","[0.8928571428571429, 0.10714285714285714]",0.0
4,-0.002083,-0.00086,0.000713,1,"[-0.002083118997816012, -0.0008595967911876768...","[5.0, 10.0]","[0.3333333333333333, 0.6666666666666666]",1.0


In [150]:
entertainment_predictions.coalesce(1).write.format("json").option("delimiter", ",").mode("overwrite").save("MongoDB/entertainment_json")

# TECHNOLOGY INDUSTRY

In [151]:
tech_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/tech").toPandas().ffill()


tech_df["Datetime"] = pd.to_datetime(tech_df["Datetime"])
tech_df.head(5)


Unnamed: 0,Datetime,IBM_Close,MSFT_Close,INTC_Close
0,2022-02-28 09:30:00-05:00,122.379997,296.075012,47.34
1,2022-02-28 09:31:00-05:00,122.360001,295.76001,47.330002
2,2022-02-28 09:32:00-05:00,122.32,295.72641,47.32
3,2022-02-28 09:33:00-05:00,122.139999,295.459991,47.240002
4,2022-02-28 09:34:00-05:00,122.07,296.269989,47.150002


In [152]:
tech_df = tech_df.set_index("Datetime")
tech_df.head(5)

Unnamed: 0_level_0,IBM_Close,MSFT_Close,INTC_Close
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2022-02-28 09:30:00-05:00,122.379997,296.075012,47.34
2022-02-28 09:31:00-05:00,122.360001,295.76001,47.330002
2022-02-28 09:32:00-05:00,122.32,295.72641,47.32
2022-02-28 09:33:00-05:00,122.139999,295.459991,47.240002
2022-02-28 09:34:00-05:00,122.07,296.269989,47.150002


In [153]:
tech_df["tech_value"] = tech_df.apply(lambda row: row.IBM_Close + row.MSFT_Close + row.INTC_Close, axis=1)
tech_df.head(2)

Unnamed: 0_level_0,IBM_Close,MSFT_Close,INTC_Close,tech_value
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2022-02-28 09:30:00-05:00,122.379997,296.075012,47.34,465.79501
2022-02-28 09:31:00-05:00,122.360001,295.76001,47.330002,465.450012


In [154]:
lags = [1,2,3]

for lag in lags:
    tech_df[f"tech_value_{lag}"] = tech_df["tech_value"].shift(lag)

In [155]:
tech_df = tech_df.iloc[3:, :]

In [156]:
tech_df["slope_x1"] = (tech_df["tech_value"] - tech_df["tech_value_1"]) / tech_df["tech_value_1"]
tech_df["slope_x2"] = (tech_df["tech_value_1"] - tech_df["tech_value_2"]) / tech_df["tech_value_2"]
tech_df["slope_x3"] = (tech_df["tech_value_2"] - tech_df["tech_value_3"]) / tech_df["tech_value_3"]
tech_df["tech_value_t+1"] = tech_df["tech_value"].shift(-1)

In [157]:
tech_df = tech_df.iloc[:-1, :]

In [158]:
import numpy as np
tech_df["buy_sell_decision"] = np.where(tech_df["tech_value_t+1"] > tech_df["tech_value"], 1, 0)

In [159]:
tech_df = tech_df.reset_index()
tech_df.head(1)

Unnamed: 0,Datetime,IBM_Close,MSFT_Close,INTC_Close,tech_value,tech_value_1,tech_value_2,tech_value_3,slope_x1,slope_x2,slope_x3,tech_value_t+1,buy_sell_decision
0,2022-02-28 09:33:00-05:00,122.139999,295.459991,47.240002,464.839993,465.366409,465.450012,465.79501,-0.001131,-0.00018,-0.000741,465.48999,1


In [160]:
tech_df = tech_df.reset_index()
tech_df["date"] = tech_df["Datetime"].apply(datetime.date)
tech_df.head(1)

Unnamed: 0,index,Datetime,IBM_Close,MSFT_Close,INTC_Close,tech_value,tech_value_1,tech_value_2,tech_value_3,slope_x1,slope_x2,slope_x3,tech_value_t+1,buy_sell_decision,date
0,0,2022-02-28 09:33:00-05:00,122.139999,295.459991,47.240002,464.839993,465.366409,465.450012,465.79501,-0.001131,-0.00018,-0.000741,465.48999,1,2022-02-28


In [161]:
tech_df = tech_df[["slope_x1", "slope_x2", "slope_x3", "buy_sell_decision"]]

In [162]:
tech_df.to_csv(r'clean_tech.csv')

In [163]:
tech_clean_df = spark.read.parquet("hdfs://localhost:9000/datalake/std/stocks/clean_tech_csv")
tech_clean_df.limit(5).toPandas()

Unnamed: 0,_c0,slope_x1,slope_x2,slope_x3,buy_sell_decision
0,0,-0.001131,-0.00018,-0.000741,1
1,1,0.001398,-0.001131,-0.00018,0
2,2,-0.000161,0.001398,-0.001131,1
3,3,0.001128,-0.000161,0.001398,0
4,4,-0.001438,0.001128,-0.000161,0


In [164]:
tech_clean_df = tech_clean_df.withColumn("slope_x1",tech_clean_df.slope_x1.cast('double'))
tech_clean_df =tech_clean_df.withColumn("slope_x2",tech_clean_df.slope_x2.cast('double'))
tech_clean_df =tech_clean_df.withColumn("slope_x3",tech_clean_df.slope_x3.cast('double'))

In [165]:
tech_clean_df = tech_clean_df.drop("_c0")

In [166]:
tech_clean_df =tech_clean_df.withColumn("buy_sell_decision",tech_clean_df.buy_sell_decision.cast('int'))

In [167]:
numericCols = [field for (field, dataType) in tech_clean_df.dtypes if ((dataType == "double") & (field != "buy_sell_decision"))]

assemblerInputs = numericCols
print("Feature columns: ",assemblerInputs)

Feature columns:  ['slope_x1', 'slope_x2', 'slope_x3']


In [168]:
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

test_pipeline = Pipeline(stages = [vecAssembler])

features_df = test_pipeline.fit(tech_clean_df).transform(tech_clean_df)

In [169]:
seed = 42
lr6 = LogisticRegression(labelCol="buy_sell_decision", featuresCol="features")
dt6 = DecisionTreeClassifier(labelCol="buy_sell_decision", featuresCol="features",seed=seed)
rf6 = RandomForestClassifier(labelCol="buy_sell_decision", featuresCol="features",maxDepth=10,seed=seed)
gbt6 = GBTClassifier(labelCol="buy_sell_decision", featuresCol="features",maxIter=10,seed=seed)

classifiers = [lr6,dt6,rf6,gbt6]

In [170]:
from pyspark.ml import Pipeline

def create_pipeline(classifier):
    return Pipeline(stages = [vecAssembler, classifier])

pipelines = [create_pipeline(classifier) for classifier in classifiers]

In [171]:
evaluator = MulticlassClassificationEvaluator(labelCol="buy_sell_decision",  metricName="accuracy")

In [172]:
seed = 42
(x_train, x_test) = tech_clean_df.randomSplit([0.8,0.2],seed=seed)

In [173]:
models = [pipeline.fit(x_train) for pipeline in pipelines]
models

[PipelineModel_142db0e1dbbd,
 PipelineModel_a43bfabceee5,
 PipelineModel_03ef0ead8ec5,
 PipelineModel_e31d5b147eaf]

In [174]:
import pandas as pd
names = []
values = [] 

for model in models: 
    prediction_df = model.transform(x_test)
    accuracy = evaluator.evaluate(prediction_df)
    names.append(type(model.stages[-1]).__name__) # the algorithm is the last stage in the pipeline
    values.append(accuracy)

data = {'name':names,'accuracy':values,'model':models}
df6 = pd.DataFrame(data)
df6.sort_values(by=['accuracy'], inplace=True, ascending=False)  
df6


Unnamed: 0,name,accuracy,model
2,RandomForestClassificationModel,0.517073,PipelineModel_03ef0ead8ec5
3,GBTClassificationModel,0.507317,PipelineModel_e31d5b147eaf
0,LogisticRegressionModel,0.458537,PipelineModel_142db0e1dbbd
1,DecisionTreeClassificationModel,0.439024,PipelineModel_a43bfabceee5


In [175]:
best_model=df6.iloc[0]['model']

In [176]:
best_model.transform(x_test).groupby("buy_sell_decision").pivot("prediction").count().toPandas()

                                                                                

Unnamed: 0,buy_sell_decision,0.0,1.0
0,1,38,51
1,0,55,61


In [177]:
modelPath = "hdfs://localhost:9000/model-registry/tech-stocks-classifier"
best_model.write().overwrite().save(modelPath)

                                                                                

In [178]:
from pyspark.ml import PipelineModel
savedModel = PipelineModel.load(modelPath)

In [179]:
tech_predictions = savedModel.transform(x_test)
tech_predictions.select("features", "buy_sell_decision", "prediction")
tech_predictions.limit(5).toPandas()

Unnamed: 0,slope_x1,slope_x2,slope_x3,buy_sell_decision,features,rawPrediction,probability,prediction
0,-0.002366,0.001669,-0.000621,0,"[-0.0023663471629630822, 0.0016689413114238123...","[14.079017857142858, 5.920982142857143]","[0.703950892857143, 0.29604910714285715]",0.0
1,-0.001963,-0.00028,-0.00041,0,"[-0.0019627022400983346, -0.000280268675628146...","[5.246145189737825, 14.753854810262176]","[0.26230725948689126, 0.7376927405131088]",1.0
2,-0.001721,-0.000246,-0.000291,1,"[-0.0017211480300518324, -0.000246052110935261...","[9.46198984572326, 10.53801015427674]","[0.473099492286163, 0.526900507713837]",1.0
3,-0.001569,-0.001016,-0.000432,1,"[-0.0015693044191044006, -0.001016356207831703...","[11.105504124254125, 8.894495875745877]","[0.5552752062127062, 0.44472479378729385]",0.0
4,-0.001472,-0.000615,0.000173,0,"[-0.0014719299092907055, -0.000614703231330523...","[7.588445380824494, 12.411554619175506]","[0.37942226904122467, 0.6205777309587753]",1.0


In [180]:
tech_predictions.coalesce(1).write.format("json").option("delimiter", ",").mode("overwrite").save("MongoDB/tech_json")