In [13]:
import pandas as pd


import findspark
findspark.init()

import pyspark

from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql.functions import lag, lead
from pyspark.sql.window import Window
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.sql.functions import monotonically_increasing_id
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext



In [14]:
"""
Spark Setup
"""
#SparkConf().set('spark.driver.host','127.0.0.1').setAppName("NewsStreamingApp").setMaster("local[2]")
conf = SparkConf().set('spark.driver.host','127.0.0.1').setMaster("local[2]")
sc = SparkContext.getOrCreate(conf)
spark = SQLContext (sc)


def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=sparkConf)\
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']

# create the Streaming Context from the above spark context with window size 2 seconds
ssc = StreamingContext(sc, 2)
# read data from port 5000
dataStream = ssc.socketTextStream("127.0.0.1",5000)

In [15]:
#Reading the necessary files

#Input file about market sentiments
Sentiments = pd.read_csv("MarketNewsTimeStamp.csv")

#Stock data for IBM

fileName = "D:/GitHub/cmpe-256-Team-Proj/Stock prediction/IBM.csv"
IBM = pd.read_csv(fileName)



In [16]:
#Processing the data

Sentiments.drop(columns=['Date'],inplace=True)
Sentiments.rename(columns={"Time":"DateTime"},inplace=True)
Sentiments = Sentiments.loc[:, ['DateTime','Polarity']].groupby(['DateTime']).mean().reset_index()
Sentiments['DateTime'] = pd.to_datetime(Sentiments['DateTime'])
Sentiments.head()

Unnamed: 0,DateTime,Polarity
0,2009-01-01 09:00:00,0.2
1,2009-01-01 09:19:00,-0.075
2,2009-01-01 09:31:00,0.0
3,2009-01-01 09:34:00,0.0
4,2009-01-01 09:48:00,-0.05


In [17]:
IBM['DateTime'] = pd.to_datetime(IBM['Date'].apply(str)+' '+IBM['Time'])
IBM.drop(columns=['Date','Time'],inplace = True)
IBM = IBM.set_index('DateTime')\
          .between_time('09:00:00', '16:00:00')\
          .reset_index()
IBM.head()

Unnamed: 0,DateTime,Open,High,Low,Close,Volume
0,2016-01-04 09:03:00,112.28,112.28,112.28,112.28,120
1,2016-01-04 09:30:00,112.53,112.7,112.28,112.3,134096
2,2016-01-04 09:31:00,112.28,112.36,112.15,112.17,20521
3,2016-01-04 09:32:00,112.17,112.36,112.17,112.33,20044
4,2016-01-04 09:33:00,112.32,112.36,112.12,112.25,21591


In [18]:
Stocks_Data =pd.merge_asof(IBM, Sentiments.sort_values('DateTime'),
              on='DateTime',
              tolerance=pd.Timedelta('7m'))

Stocks_Data.fillna(0,inplace=True)
Stocks_Data.head()

Unnamed: 0,DateTime,Open,High,Low,Close,Volume,Polarity
0,2016-01-04 09:03:00,112.28,112.28,112.28,112.28,120,0.0
1,2016-01-04 09:30:00,112.53,112.7,112.28,112.3,134096,0.103571
2,2016-01-04 09:31:00,112.28,112.36,112.15,112.17,20521,0.103571
3,2016-01-04 09:32:00,112.17,112.36,112.17,112.33,20044,0.103571
4,2016-01-04 09:33:00,112.32,112.36,112.12,112.25,21591,0.103571


In [19]:
StocksInfo = spark.createDataFrame(Stocks_Data)

#StocksInfo = spark.read.csv('IBMStockData.csv',header = True, inferSchema=True)
StocksInfo.show()




+-------------------+------+------+------+------+------+-----------+
|           DateTime|  Open|  High|   Low| Close|Volume|   Polarity|
+-------------------+------+------+------+------+------+-----------+
|2016-01-04 09:03:00|112.28|112.28|112.28|112.28|   120|        0.0|
|2016-01-04 09:30:00|112.53| 112.7|112.28| 112.3|134096|0.103571429|
|2016-01-04 09:31:00|112.28|112.36|112.15|112.17| 20521|0.103571429|
|2016-01-04 09:32:00|112.17|112.36|112.17|112.33| 20044|0.103571429|
|2016-01-04 09:33:00|112.32|112.36|112.12|112.25| 21591|0.103571429|
|2016-01-04 09:34:00|112.25|112.32|112.12| 112.2| 17154|0.103571429|
|2016-01-04 09:35:00|112.15|112.37|112.15|112.27| 28000|0.103571429|
|2016-01-04 09:36:00|112.21|112.36|112.18|112.36| 19287|0.103571429|
|2016-01-04 09:37:00|112.37|112.79|112.36|112.72| 33264|        0.0|
|2016-01-04 09:38:00|112.71|112.71|112.53|112.63| 27673|        0.0|
|2016-01-04 09:39:00|112.61|112.66|112.51|112.62| 21850|        0.0|
|2016-01-04 09:40:00|112.62|112.62

In [20]:
"""
Creating a 5,15,30 minutes lag
"""

StockInfo_Lag5 = StocksInfo.withColumn("lead",lead(("Close"),5).over(Window.partitionBy().orderBy("DateTime")))

StockInfo_Lag15 = StocksInfo.withColumn("lead",lead(("Close"),15).over(Window.partitionBy().orderBy("DateTime")))

StockInfo_Lag30 = StocksInfo.withColumn("lead",lead(("Close"),30).over(Window.partitionBy().orderBy("DateTime")))



StockInfo_Lag5.show()

+-------------------+------+------+------+------+------+-----------+------+
|           DateTime|  Open|  High|   Low| Close|Volume|   Polarity|  lead|
+-------------------+------+------+------+------+------+-----------+------+
|2016-01-04 09:03:00|112.28|112.28|112.28|112.28|   120|        0.0| 112.2|
|2016-01-04 09:30:00|112.53| 112.7|112.28| 112.3|134096|0.103571429|112.27|
|2016-01-04 09:31:00|112.28|112.36|112.15|112.17| 20521|0.103571429|112.36|
|2016-01-04 09:32:00|112.17|112.36|112.17|112.33| 20044|0.103571429|112.72|
|2016-01-04 09:33:00|112.32|112.36|112.12|112.25| 21591|0.103571429|112.63|
|2016-01-04 09:34:00|112.25|112.32|112.12| 112.2| 17154|0.103571429|112.62|
|2016-01-04 09:35:00|112.15|112.37|112.15|112.27| 28000|0.103571429|112.57|
|2016-01-04 09:36:00|112.21|112.36|112.18|112.36| 19287|0.103571429|112.53|
|2016-01-04 09:37:00|112.37|112.79|112.36|112.72| 33264|        0.0|112.31|
|2016-01-04 09:38:00|112.71|112.71|112.53|112.63| 27673|        0.0|112.28|
|2016-01-04 

In [21]:
Data_Lag5 = StockInfo_Lag5.select("Open","Volume","Polarity",StockInfo_Lag5.lead.alias("label"))
Data_Lag5 = Data_Lag5.filter(Data_Lag5["label"].isNotNull())


Data_Lag15 = StockInfo_Lag15.select("Open","Volume","Polarity",StockInfo_Lag15.lead.alias("label"))
Data_Lag15 = Data_Lag15.filter(Data_Lag15["label"].isNotNull())

Data_Lag30 = StockInfo_Lag30.select("Open","Volume","Polarity",StockInfo_Lag30.lead.alias("label"))
Data_Lag30 = Data_Lag30.filter(Data_Lag30["label"].isNotNull())

In [22]:
assembler = VectorAssembler().setInputCols(['Open','Volume','Polarity']).setOutputCol('features')
scaler = MinMaxScaler(inputCol="features", outputCol="features_scaled")

pipeline = Pipeline(stages=[assembler, scaler])
lr = LinearRegression()

In [23]:
scalerModel = pipeline.fit(Data_Lag5)
scaledData = scalerModel.transform(Data_Lag5)

dataset =scaledData.select('label',scaledData.features_scaled.alias('features'))
dataset.show()


train, test = dataset.randomSplit([0.7,0.3])

+------+--------------------+
| label|            features|
+------+--------------------+
| 112.2|[0.30815884476534...|
|112.27|[0.31176895306859...|
|112.36|[0.30815884476534...|
|112.72|[0.30657039711191...|
|112.63|[0.30873646209386...|
|112.62|[0.30772563176895...|
|112.57|[0.30628158844765...|
|112.53|[0.30714801444043...|
|112.31|[0.30945848375451...|
|112.28|[0.31436823104693...|
|112.29|[0.31292418772563...|
|112.24|[0.31306859205776...|
|112.31|[0.31191335740072...|
|112.18|[0.31148014440433...|
|112.37|[0.30801444043321...|
|112.37|[0.30801444043321...|
|112.33|[0.30859205776173...|
|112.33|[0.30758122743682...|
| 112.3|[0.30873646209386...|
|112.31|[0.30685920577617...|
+------+--------------------+
only showing top 20 rows



In [24]:
model = lr.fit(train)

pred = model.transform(test)

evaluator = RegressionEvaluator()
print("RMSE for 5mins stock price prediction",evaluator.evaluate(pred,
{evaluator.metricName: "rmse"})
)

RMSE for 5mins stock price prediction 0.23412054438254057


In [9]:
scalerModel = pipeline.fit(Data_Lag15)
scaledData = scalerModel.transform(Data_Lag15)

dataset =scaledData.select('label',scaledData.features_scaled.alias('features'))
dataset.show()


train, test = dataset.randomSplit([0.7,0.3])

+------+--------------------+
| label|            features|
+------+--------------------+
|102.96|[0.18223826714801...|
|102.93|[0.18093862815884...|
|102.99|[0.17761732851985...|
|103.04|[0.17833935018050...|
| 103.0|[0.17906137184115...|
|102.97|[0.17703971119133...|
|102.87|[0.17429602888086...|
|102.91|[0.17227436823104...|
|102.96|[0.17270758122743...|
|102.94|[0.17270758122743...|
|102.93|[0.17285198555956...|
|102.83|[0.17314079422382...|
| 102.8|[0.17429602888086...|
|102.76|[0.17328519855595...|
|102.76|[0.17270758122743...|
| 102.9|[0.17126353790613...|
|102.81|[0.17371841155234...|
|102.81|[0.17342960288808...|
|102.75|[0.17386281588447...|
|102.74|[0.17458483754512...|
+------+--------------------+
only showing top 20 rows



In [25]:
model15 = lr.fit(train)

pred = model15.transform(test)

evaluator = RegressionEvaluator()
print("RMSE for 15mins stock price prediction",evaluator.evaluate(pred,
{evaluator.metricName: "rmse"})
)

RMSE for 15mins stock price prediction 0.23412054438254057


In [26]:
scalerModel = pipeline.fit(Data_Lag30)
scaledData = scalerModel.transform(Data_Lag30)

dataset =scaledData.select('label',scaledData.features_scaled.alias('features'))
dataset.show()


train, test = dataset.randomSplit([0.7,0.3])

+------+--------------------+
| label|            features|
+------+--------------------+
|112.35|[0.30815884476534...|
| 112.2|[0.31176895306859...|
|112.21|[0.30815884476534...|
|112.02|[0.30657039711191...|
| 112.2|[0.30873646209386...|
|112.04|[0.30772563176895...|
|112.04|[0.30628158844765...|
|112.14|[0.30714801444043...|
|112.08|[0.30945848375451...|
|112.16|[0.31436823104693...|
|112.22|[0.31292418772563...|
|112.22|[0.31306859205776...|
|112.17|[0.31191335740072...|
| 112.2|[0.31148014440433...|
|112.27|[0.30801444043321...|
|112.24|[0.30801444043321...|
|112.17|[0.30859205776173...|
|112.21|[0.30758122743682...|
|112.24|[0.30873646209386...|
|112.32|[0.30685920577617...|
+------+--------------------+
only showing top 20 rows



In [27]:
model30 = lr.fit(train)

pred = model30.transform(test)

evaluator = RegressionEvaluator()
print("RMSE for 15mins stock price prediction",evaluator.evaluate(pred,
{evaluator.metricName: "rmse"})
)

RMSE for 15mins stock price prediction 0.5383431598597492


In [None]:
def readMyStream(rdd):
    
    spark = getSparkSessionInstance(rdd.context.getConf())
    if not rdd.isEmpty():
        liveData = spark.read.json(rdd)
        liveData.show()
        scaledData = scalerModel.transform(liveData)

        datetime = liveData.select('DateTime')
        dataset = scaledData.select(scaledData.features_scaled.alias('features'))
        datetime =datetime.withColumn('id',monotonically_increasing_id())
        
        p5 = model.transform(dataset)
        p5 =p5.withColumn('id',monotonically_increasing_id())
        p = p5.join(datetime,"id","outer").drop("id")
        p.show()
        #p15 = model15.transform(dataset)
        #p15.union(datetime)
        #p30 = model30.transform(dataset)
        #p30.union(datetime)

        #p5.select(p5.prediction.alias("Prediction 5 mins")).show()
        #p15.select('prediction').show()
        #p30.select('prediction').show()
        #p = p5.union(p15)
        #p.show()
        

dataStream.foreachRDD( lambda rdd: readMyStream(rdd) )

words = dataStream.flatMap(lambda line: line.split(" "))
words.pprint()

ssc.start()
# wait for the streaming to finish
ssc.awaitTermination()

+-------------+------+--------+------+
|     DateTime|  Open|Polarity|Volume|
+-------------+------+--------+------+
|1585645200000|111.77|     0.0|   149|
+-------------+------+--------+------+

+--------------------+------------------+-------------+
|            features|        prediction|     DateTime|
+--------------------+------------------+-------------+
|[0.30079422382671...|112.05381005199716|1585645200000|
+--------------------+------------------+-------------+

-------------------------------------------
Time: 2020-05-01 20:21:30
-------------------------------------------
[{"DateTime":1585645200000,"Open":111.77,"Volume":149,"Polarity":0.0}]

-------------------------------------------
Time: 2020-05-01 20:21:32
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:21:34
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:21:36
-------------------------------------

+--------------------+------------------+----------+
|            features|        prediction|  DateTime|
+--------------------+------------------+----------+
|[0.30079422382671...|112.05381005199716|2020-03-31|
+--------------------+------------------+----------+

-------------------------------------------
Time: 2020-05-01 20:23:32
-------------------------------------------
[{"DateTime":"2020-03-31","Open":111.77,"Volume":149,"Polarity":0.0}]

-------------------------------------------
Time: 2020-05-01 20:23:34
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:23:36
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:23:38
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:23:40
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:23:42
-------------------

-------------------------------------------
Time: 2020-05-01 20:25:30
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:25:32
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:25:34
-------------------------------------------

+-------------------+------+--------+------+
|           DateTime|  Open|Polarity|Volume|
+-------------------+------+--------+------+
|2020-03-31 09:00:00|111.77|     0.0|   149|
+-------------------+------+--------+------+

+--------------------+------------------+-------------------+
|            features|        prediction|           DateTime|
+--------------------+------------------+-------------------+
|[0.30079422382671...|112.05381005199716|2020-03-31 09:00:00|
+--------------------+------------------+-------------------+

-------------------------------------------
Time: 2020-05-01 20:25:36
-------------------------------------------
[{"D

-------------------------------------------
Time: 2020-05-01 20:27:12
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:14
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:16
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:18
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:20
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:22
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:24
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:26
-------------------------------------------

-------------------------------------------
Time: 2020-05-01 20:27:28
----------