In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import functools
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank, col

In [6]:
spark_application_name = "Spark_Project_ML"
spark = (SparkSession.builder.appName(spark_application_name).getOrCreate())
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [7]:
def get_data(path, schema, header=True, delimiter=';'):
    """Get datas from path

    Parameters
    ----------
    path : string
        file location
    schema : StructType
        Schema for dataframe
    header: bool
        Take the header or not, default True
    delimiter: 
        the delimiter of the input file, default ;
    Returns
    -------
        DataFrame
    """
    
    extension = path.split('.')[-1]
    if (extension == "csv"):
        df = spark.read.csv(path, schema, header=header, sep=delimiter)
    elif (extension == 'json'):
        df = spark.read.json(path, schema)
    else:
        # join all csv in a folder
        df = spark.read.csv(path, schema, header=header, sep=delimiter)
    return df

In [8]:
microsoft_path = 'stocks_data/MICROSOFT.csv'
amazon_path = 'stocks_data/AMAZON.csv'
zoom_path = 'stocks_data/ZOOM.csv'
facebook_path = 'stocks_data/FACEBOOK.csv'
apple_path = 'stocks_data/APPLE.csv'
google_path = 'stocks_data/GOOGLE.csv'
tesla_path = 'stocks_data/TESLA.csv'


stocksColumns = [StructField("Date",TimestampType()), StructField("High",DoubleType()), 
              StructField("Low",DoubleType()), StructField("Open",DoubleType()),
              StructField("Close",DoubleType()), StructField("Volume", DoubleType()), 
              StructField("Adj Close",DoubleType()), StructField("company_name", StringType())]

stocksSchema = StructType(stocksColumns)


microsoft = get_data(microsoft_path, stocksSchema, delimiter=',').drop(col("company_name"))
amazon = get_data(amazon_path, stocksSchema, delimiter=',').drop(col("company_name"))
zoom = get_data(zoom_path, stocksSchema, delimiter=',').drop(col("company_name"))
facebook = get_data(facebook_path, stocksSchema, delimiter=',').drop(col("company_name"))
apple = get_data(apple_path, stocksSchema, delimiter=',').drop(col("company_name"))
google = get_data(google_path, stocksSchema, delimiter=',').drop(col("company_name"))
tesla = get_data(tesla_path, stocksSchema, delimiter=',').drop(col("company_name"))

In [9]:
w = Window.partitionBy().orderBy("Date")

In [10]:
microsoft = microsoft.withColumn('diffOpenClose', microsoft.Open - microsoft.Close)
microsoft = microsoft.withColumn('diffHighLow', microsoft.High - microsoft.Low)
microsoft = microsoft.withColumn('target', F.when(F.lag(microsoft.Close).over(w) < microsoft.Close, 'yes').otherwise('no'))
microsoft.drop('Date')
categoricalColumns = ['High', 'Low', 'Open', 'Close']
stages = []

In [11]:
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [12]:
label_stringIdx = StringIndexer(inputCol='target', outputCol='label')
stages += [label_stringIdx]

In [13]:
assembler = VectorAssembler(inputCols=[c + "classVec" for c in categoricalColumns], outputCol="features")
stages += [assembler]

In [14]:
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(microsoft)
microsoft = pipelineModel.transform(microsoft)

22/05/30 19:03:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:08 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [15]:
microsoft.select('Close', 'label', 'features').show()
microsoft = microsoft.withColumn("rank", percent_rank().over(w))
trainingData = microsoft.where("rank <= .8").drop("rank")
testData = microsoft.where("rank > .8").drop("rank")
#(trainingData, testData) = microsoft.randomSplit([0.8, 0.2])

22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------------+-----+--------------------+
|             Close|label|            features|
+------------------+-----+--------------------+
| 62.58000183105469|  1.0|(3736,[612,1543,2...|
| 62.29999923706055|  1.0|(3736,[610,963,24...|
| 62.29999923706055|  1.0|(3736,[609,962,24...|
| 62.84000015258789|  0.0|(3736,[618,1542,2...|
| 62.63999938964844|  1.0|(3736,[616,1549,2...|
|62.619998931884766|  1.0|(3736,[615,1545,2...|
|63.189998626708984|  0.0|(3736,[619,1548,2...|
| 62.61000061035156|  1.0|(3736,[620,1541,2...|
| 62.70000076293945|  0.0|(3736,[613,1546,2...|
|62.529998779296875|  1.0|(3736,[35,962,249...|
|              62.5|  1.0|(3736,[35,963,188...|
| 62.29999923706055|  1.0|(3736,[614,1544,2...|
|  62.7400016784668|  0.0|(3736,[611,1547,1...|
|62.959999084472656|  0.0|(3736,[617,1550,2...|
| 63.52000045776367|  0.0|(3736,[624,1552,2...|
| 63.68000030517578|  0.0|(3736,[627,1558,2...|
|  64.2699966430664|  0.0|(3736,[36,1560,18...|
| 65.77999877929688|  0.0|(3736,[666,970

In [16]:
dr = RandomForestRegressor(labelCol="label", featuresCol="features")

In [17]:
model = dr.fit(trainingData)
predictions = model.transform(testData)

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Root Mean Squared Error (RMSE) on test data = 0.496615


22/05/30 19:03:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [18]:
predictions.select("features", "label", "prediction").show(1000)

+--------------------+-----+-------------------+
|            features|label|         prediction|
+--------------------+-----+-------------------+
|(3736,[452,1355,2...|  1.0|0.42209687669355833|
|(3736,[454,1367,2...|  1.0|0.42209687669355833|
|(3736,[445,1369,2...|  0.0|0.42209687669355833|
|(3736,[433,1341,2...|  1.0|0.42209687669355833|
|(3736,[424,1321,2...|  0.0|0.42209687669355833|
|(3736,[444,1352,2...|  0.0|0.42209687669355833|
|(3736,[455,1350,2...|  1.0|0.42209687669355833|
|(3736,[441,1363,2...|  0.0|0.42209687669355833|
|(3736,[442,958,22...|  1.0|0.42209687669355833|
|(3736,[419,1329,2...|  1.0|0.42209687669355833|
|(3736,[25,1307,22...|  1.0|0.42209687669355833|
|(3736,[416,1323,2...|  0.0|0.42209687669355833|
|(3736,[402,1317,2...|  1.0|0.42209687669355833|
|(3736,[395,1264,2...|  1.0|0.42209687669355833|
|(3736,[417,1280,2...|  0.0|0.42209687669355833|
|(3736,[375,935,18...|  1.0|0.42209687669355833|
|(3736,[372,935,18...|  0.0|0.42209687669355833|
|(3736,[368,1221,2..

22/05/30 19:03:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/05/30 19:03:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
