## Real Time Stock Market Recommender Model

### Streaming data using kafka

In [None]:
import pandas as pd
from kafka import KafkaProducer
KAFKA_TOPIC_NAME_CONS = "stockTopic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092' 
if __name__ == "__main__":
    print("Kafka Producer Application Started ... ")
    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,value_serializer=lambda x: x.encode('utf-8'))
    filepath = "yahoo.csv"
    df = pd.read_csv(filepath)
    list = df.to_dict(orient="records")
    message_list = []
    message = None
    for message in list:
        message_fields_value_list = []
        message_fields_value_list.append(message["Date"])
        message_fields_value_list.append(message["Open"])
        message_fields_value_list.append(message["High"])
        message_fields_value_list.append(message["Low"])
        message_fields_value_list.append(message["Close"])
        message_fields_value_list.append(message["Adj Close"])
        message_fields_value_list.append(message["Volume"])
        message = ','.join(str(v) for v in message_fields_value_list)
        print("Message Type: ", type(message))
        print("Message: ", message)
        kafka_producer_obj.send(KAFKA_BOOTSTRAP_SERVERS_CONS, message)
        time.sleep(1)
        
    print("Kafka Producer Application Completed. ")

Kafka Producer Application Started ... 
Message Type:  <class 'str'>
Message:  1996-10-01,19.25,19.625,19.0,19.25,17.329784,54000


KafkaTimeoutError: KafkaTimeoutError: Failed to update metadata after 60.0 secs.

### Installing PySpark

In [None]:
conda install -c conda-forge findspark

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

## Package Plan ##

  environment location: C:\Users\sreer\anaconda3

  added / updated specs:
    - findspark


The following NEW packages will be INSTALLED:

  python_abi         conda-forge/win-64::python_abi-3.8-1_cp38

The following packages will be SUPERSEDED by a higher-priority channel:

  conda              pkgs/main::conda-4.10.1-py38haa95532_1 --> conda-forge::conda-4.10.1-py38haa244fe_0


Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done

Note: you may need to restart the kernel to use updated packages.


In [None]:
conda update -n base -c defaults conda

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

## Package Plan ##

  environment location: C:\Users\sreer\anaconda3

  added / updated specs:
    - conda


The following packages will be REMOVED:

  python_abi-3.8-1_cp38

The following packages will be UPDATED:

  conda              conda-forge::conda-4.10.1-py38haa244f~ --> pkgs/main::conda-4.10.1-py38haa95532_1


Preparing transaction: ...working... done
Verifying transaction: ...working... done
Executing transaction: ...working... done

Note: you may need to restart the kernel to use updated packages.


In [None]:
import findspark
findspark.init()
findspark.find()

'C:\\opt\\spark\\spark-3.1.1-bin-hadoop2.7'

In [None]:
import pyspark
findspark.find()

'C:\\opt\\spark\\spark-3.1.1-bin-hadoop2.7'

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('yahoo').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [None]:
#Loading Data
data = spark.read.csv("yahoo.csv", header=True, inferSchema=True)

In [None]:
data.show(5)

+----------+------+------+------+------+---------+------+
|      Date|  Open|  High|   Low| Close|Adj Close|Volume|
+----------+------+------+------+------+---------+------+
|1996-10-01| 19.25|19.625|  19.0| 19.25|17.329784| 54000|
|1996-10-02| 19.25| 19.75| 19.25|19.625|17.667372| 10000|
|1996-10-03| 19.75| 19.75| 19.25| 19.25|17.329784| 19300|
|1996-10-04| 19.75| 19.75| 19.25| 19.75|17.779911| 45400|
|1996-10-07|19.375|20.375|19.375|20.375|18.342564| 41500|
+----------+------+------+------+------+---------+------+
only showing top 5 rows



In [None]:
#Structure of data
data.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [None]:
#Number of rows
data.count()

6222

In [None]:
#Displaying Columns
data.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

### Finding Null Values if Any

In [None]:
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----+----+----+---+-----+---------+------+
|Date|Open|High|Low|Close|Adj Close|Volume|
+----+----+----+---+-----+---------+------+
|   0|   0|   0|  0|    0|        0|     0|
+----+----+----+---+-----+---------+------+



In [None]:
#Identifying string column
strCols=[item[0] for item in data.dtypes if item[1].startswith('string')]
strCols

['Date']

In [None]:
data=data.drop(*['Date'])

### Vector Assembler - Data Transformation

In [None]:
from pyspark.ml.feature import VectorAssembler
X=data.drop(*['VWAP'])
vectorAssembler = VectorAssembler(inputCols=X.columns, outputCol = 'features')
v_data= vectorAssembler.transform(data)
v_data.show(5)

+------+------+------+------+---------+------+--------------------+
|  Open|  High|   Low| Close|Adj Close|Volume|            features|
+------+------+------+------+---------+------+--------------------+
| 19.25|19.625|  19.0| 19.25|17.329784| 54000|[19.25,19.625,19....|
| 19.25| 19.75| 19.25|19.625|17.667372| 10000|[19.25,19.75,19.2...|
| 19.75| 19.75| 19.25| 19.25|17.329784| 19300|[19.75,19.75,19.2...|
| 19.75| 19.75| 19.25| 19.75|17.779911| 45400|[19.75,19.75,19.2...|
|19.375|20.375|19.375|20.375|18.342564| 41500|[19.375,20.375,19...|
+------+------+------+------+---------+------+--------------------+
only showing top 5 rows



### StandardScaler

In [None]:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(inputCol='features', outputCol="scaledFeatures", withStd=False, withMean=True)
scalerModel = scaler.fit(v_data)
scaledData = scalerModel.transform(v_data)
scaledData.select(['scaledFeatures']).show(5, truncate = False)

+----------------------------------------------------------------------------------------------------------------+
|scaledFeatures                                                                                                  |
+----------------------------------------------------------------------------------------------------------------+
|[6.267681009321739,6.458783556412733,6.2032797965284345,6.267034314850514,5.208434203310778,-13413.596914175607]|
|[6.267681009321739,6.583783556412733,6.4532797965284345,6.642034314850514,5.546022203310779,-57413.59691417561] |
|[6.767681009321739,6.583783556412733,6.4532797965284345,6.267034314850514,5.208434203310778,-48113.59691417561] |
|[6.767681009321739,6.583783556412733,6.4532797965284345,6.767034314850514,5.658561203310777,-22013.596914175607]|
|[6.392681009321739,7.208783556412733,6.5782797965284345,7.392034314850514,6.221214203310778,-25913.596914175607]|
+-------------------------------------------------------------------------------

### PCA- Dimensionality Reduction

In [None]:
from pyspark.ml.feature import PCA
pca = PCA(k=2, inputCol = scaler.getOutputCol(), outputCol="pcaFeatures")
model = pca.fit(scaledData)
transformed_feature = model.transform(scaledData)
transformed_feature.select(['pcaFeatures']).show(5, truncate = False)

+----------------------------------------+
|pcaFeatures                             |
+----------------------------------------+
|[13413.596069293028,-14.421448068458078]|
|[57413.59596010959,-17.558389599284773] |
|[48113.595982613086,-16.907903720113996]|
|[22013.596004577776,-15.754070142834886]|
|[25913.595955007295,-16.685433062001366]|
+----------------------------------------+
only showing top 5 rows



## Machine Learning Algorithms

### As, the target column 'High' is numerical type, this is a Regression problem.

### Logistic Regression

In [None]:
from pyspark.ml.regression import LinearRegression
trainTest = v_data.randomSplit([0.5, 0.5])
trainingDF = trainTest[0]
testDF = trainTest[1]
lir = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8).setFeaturesCol('features').setLabelCol('High')
model = lir.fit(trainingDF)
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
trainingSummary = model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
fullPredictions = model.transform(testDF).cache()
print(fullPredictions)
fullPredictions.show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="High",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(fullPredictions))

Coefficients: [0.1996344038993007,0.20427241960720058,0.19864979943991856,0.1983754856197377,0.1875054051309425,0.0]
Intercept: 0.49484103143261166
RMSE: 0.320786
r2: 0.998789
DataFrame[Open: double, High: double, Low: double, Close: double, Adj Close: double, Volume: int, features: vector, prediction: double]
+------+------+------+------+---------+------+--------------------+------------------+
|  Open|  High|   Low| Close|Adj Close|Volume|            features|        prediction|
+------+------+------+------+---------+------+--------------------+------------------+
|1.1875|1.3125| 1.125|1.1875| 1.069045| 87300|[1.1875,1.3125,1....|1.6595180661690372|
|1.3125|1.5625|1.3125|   1.5| 1.350373| 17700|[1.3125,1.5625,1....|1.8875301688240809|
|1.3125| 1.625|1.3125|1.5625| 1.406639|140800|[1.3125,1.625,1.3...| 1.923245842025862|
| 1.375| 1.375|1.0625|1.1875| 1.069045|153600|[1.375,1.375,1.06...|1.6973009306606115|
| 1.375|1.4375|1.3125|1.4375| 1.294107| 30800|[1.375,1.4375,1.3...|1.8515246196

### RandomForest Regressor

In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
rf = RandomForestRegressor(featuresCol="features",impurity='gini',maxDepth=5,numTrees=30)
predictions = model.transform(testDF)
predictions.show(5)
from pyspark.ml.evaluation import RegressionEvaluator
rf_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="High",metricName="r2")
print("R Squared (R2) on test data = %g" % rf_evaluator.evaluate(predictions))

+------+------+------+------+---------+------+--------------------+------------------+
|  Open|  High|   Low| Close|Adj Close|Volume|            features|        prediction|
+------+------+------+------+---------+------+--------------------+------------------+
|1.1875|1.3125| 1.125|1.1875| 1.069045| 87300|[1.1875,1.3125,1....|1.6595180661690372|
|1.3125|1.5625|1.3125|   1.5| 1.350373| 17700|[1.3125,1.5625,1....|1.8875301688240809|
|1.3125| 1.625|1.3125|1.5625| 1.406639|140800|[1.3125,1.625,1.3...| 1.923245842025862|
| 1.375| 1.375|1.0625|1.1875| 1.069045|153600|[1.375,1.375,1.06...|1.6973009306606115|
| 1.375|1.4375|1.3125|1.4375| 1.294107| 30800|[1.375,1.4375,1.3...|1.8515246196405557|
+------+------+------+------+---------+------+--------------------+------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.998812


### Gradient-Boosted Tree Regressor

In [None]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
gbt = GBTRegressor(featuresCol="features", maxIter=10)
predictions = model.transform(testDF)
predictions.show(5)
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="High",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))

+------+------+------+------+---------+------+--------------------+------------------+
|  Open|  High|   Low| Close|Adj Close|Volume|            features|        prediction|
+------+------+------+------+---------+------+--------------------+------------------+
|1.1875|1.3125| 1.125|1.1875| 1.069045| 87300|[1.1875,1.3125,1....|1.6595180661690372|
|1.3125|1.5625|1.3125|   1.5| 1.350373| 17700|[1.3125,1.5625,1....|1.8875301688240809|
|1.3125| 1.625|1.3125|1.5625| 1.406639|140800|[1.3125,1.625,1.3...| 1.923245842025862|
| 1.375| 1.375|1.0625|1.1875| 1.069045|153600|[1.375,1.375,1.06...|1.6973009306606115|
| 1.375|1.4375|1.3125|1.4375| 1.294107| 30800|[1.375,1.4375,1.3...|1.8515246196405557|
+------+------+------+------+---------+------+--------------------+------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.998812


### Clustering- KMeans

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(v_data)
predictions = model.transform(v_data)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.8397873255490698
Cluster Centers: 
[1.24325350e+01 1.25910137e+01 1.22665241e+01 1.24287798e+01
 1.15773071e+01 4.67322499e+04]
[1.74336404e+01 1.78233406e+01 1.70894481e+01 1.74699269e+01
 1.65261871e+01 2.34859942e+05]
