In [132]:
import pandas as pd
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline

In [138]:
# Start (or reuse) the Spark session and load the raw GOOG price history.
# Header row supplies column names; inferSchema makes Spark type the numeric columns.
stock_session = (
    SparkSession.builder
    .appName('Google Stock')
    .getOrCreate()
)
google_df = (
    stock_session.read
    .options(header=True, inferSchema=True, sep=',')
    .csv('GOOG.csv')
)

In [4]:
# Preview the first rows of the raw data (Spark's default preview size, truncated cells).
google_df.show(n=20, truncate=True)

+------+--------------------+------+--------+--------+------+-------+--------+--------+--------+-------+---------+-------+-----------+
|symbol|                date| close|    high|     low|  open| volume|adjClose| adjHigh|  adjLow|adjOpen|adjVolume|divCash|splitFactor|
+------+--------------------+------+--------+--------+------+-------+--------+--------+--------+-------+---------+-------+-----------+
|  GOOG|2016-06-14 00:00:...|718.27|  722.47|  713.12|716.48|1306065|  718.27|  722.47|  713.12| 716.48|  1306065|    0.0|        1.0|
|  GOOG|2016-06-15 00:00:...|718.92|  722.98|  717.31| 719.0|1214517|  718.92|  722.98|  717.31|  719.0|  1214517|    0.0|        1.0|
|  GOOG|2016-06-16 00:00:...|710.36|  716.65|  703.26|714.91|1982471|  710.36|  716.65|  703.26| 714.91|  1982471|    0.0|        1.0|
|  GOOG|2016-06-17 00:00:...|691.72|  708.82|688.4515|708.65|3402357|  691.72|  708.82|688.4515| 708.65|  3402357|    0.0|        1.0|
|  GOOG|2016-06-20 00:00:...|693.71|  702.48|  693.41|6

In [12]:
# Basic shape of the raw data. The column list is stored as `column_names`
# rather than `col` so it does not shadow pyspark.sql.functions.col imported above.
rows = google_df.count()
column_names = google_df.columns
print('Total row in dataset is', rows)
print('Columns in dataset are', column_names)

Total row in dataset is 1258
Columns in dataset are ['symbol', 'date', 'close', 'high', 'low', 'open', 'volume', 'adjClose', 'adjHigh', 'adjLow', 'adjOpen', 'adjVolume', 'divCash', 'splitFactor']


In [13]:
# printSchema reports each column's type and its schema-level `nullable` flag;
# it does not check whether any nulls are actually present in the data.
print('Columns with datatype and nullability')
google_df.printSchema()

Columns with datatype and null value
root
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- close: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- open: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjClose: double (nullable = true)
 |-- adjHigh: double (nullable = true)
 |-- adjLow: double (nullable = true)
 |-- adjOpen: double (nullable = true)
 |-- adjVolume: integer (nullable = true)
 |-- divCash: double (nullable = true)
 |-- splitFactor: double (nullable = true)



In [17]:
# Remove exact duplicate rows; dropDuplicates() with no subset compares every column.
non_duplicated_df = google_df.dropDuplicates()
remaining_rows = non_duplicated_df.count()
print('Rows left after dropping are', remaining_rows)

Rows left after dropping are 1258


In [23]:
# Count distinct values per column to spot constant (zero-variance) columns.
# select(...).distinct() also counts NULL as a value, unlike F.countDistinct.
# The loop variable is `column_name` so it does not shadow the `col` function
# imported from pyspark.sql.functions.
for column_name in non_duplicated_df.columns:
    distinct_count = non_duplicated_df.select(column_name).distinct().count()
    print(f'Distinct Count in columns {column_name} are {distinct_count}')

Distinct Count in columns symbol are 1
Distinct Count in columns date are 1258
Distinct Count in columns close are 1247
Distinct Count in columns high are 1240
Distinct Count in columns low are 1244
Distinct Count in columns open are 1227
Distinct Count in columns volume are 1258
Distinct Count in columns adjClose are 1247
Distinct Count in columns adjHigh are 1240
Distinct Count in columns adjLow are 1244
Distinct Count in columns adjOpen are 1227
Distinct Count in columns adjVolume are 1258
Distinct Count in columns divCash are 1
Distinct Count in columns splitFactor are 1


In [69]:
# symbol, divCash and splitFactor each have a single distinct value in this
# dataset, so they have zero variance and provide no new information.
col_to_drop = ['symbol', 'divCash', 'splitFactor']
clean_df = non_duplicated_df.drop(*col_to_drop)

# Raw `date` strings look like "2016-06-14 00:00:00+00:00". Parse only the
# leading "yyyy-MM-dd" part: the old pattern "...ss+SS:SS" read the UTC offset
# as fraction-of-second and only worked because every offset was +00:00.
# withColumn replaces the previous self-join on `date`, which needed a shuffle
# and would duplicate rows if a date were repeated or null.
clean_df = clean_df.withColumn(
    'updated_date',
    F.to_date(F.substring('date', 1, 10), 'yyyy-MM-dd'),
)
final_df = clean_df.drop('date')

In [70]:
# Preview the cleaned frame: price/volume columns plus the parsed `updated_date`.
final_df.show(n=20, truncate=True)

+------+--------+--------+------+-------+--------+--------+--------+-------+---------+------------+
| close|    high|     low|  open| volume|adjClose| adjHigh|  adjLow|adjOpen|adjVolume|updated_date|
+------+--------+--------+------+-------+--------+--------+--------+-------+---------+------------+
|718.27|  722.47|  713.12|716.48|1306065|  718.27|  722.47|  713.12| 716.48|  1306065|  2016-06-14|
|718.92|  722.98|  717.31| 719.0|1214517|  718.92|  722.98|  717.31|  719.0|  1214517|  2016-06-15|
|710.36|  716.65|  703.26|714.91|1982471|  710.36|  716.65|  703.26| 714.91|  1982471|  2016-06-16|
|691.72|  708.82|688.4515|708.65|3402357|  691.72|  708.82|688.4515| 708.65|  3402357|  2016-06-17|
|693.71|  702.48|  693.41|698.77|2082538|  693.71|  702.48|  693.41| 698.77|  2082538|  2016-06-20|
|695.94|  702.77|  692.01| 698.4|1465634|  695.94|  702.77|  692.01|  698.4|  1465634|  2016-06-21|
|697.46|  700.86|693.0819|699.06|1184318|  697.46|  700.86|693.0819| 699.06|  1184318|  2016-06-22|


In [92]:
# Count missing values per column in a single Spark job.
# Nulls are counted for every column; NaN is also counted for floating-point
# columns, because isnan only applies to numeric values.
# (Previous version always inspected 'volume', used isnan instead of isNull,
# and `.collect().count(0)` always evaluated to 0, hiding any real nulls.)
missing_exprs = []
for column_name, dtype in final_df.dtypes:
    is_missing = F.col(column_name).isNull()
    if dtype in ('double', 'float'):
        is_missing = is_missing | F.isnan(column_name)
    # count() skips nulls, so when(cond, True) counts only rows where cond holds.
    missing_exprs.append(F.count(F.when(is_missing, True)).alias(column_name))

null_counts = final_df.select(missing_exprs).first().asDict()
for column_name, null_values in null_counts.items():
    print(f'Null value in {column_name} are {null_values}')

Null value in close are 0
Null value in high are 0
Null value in low are 0
Null value in open are 0
Null value in volume are 0
Null value in adjClose are 0
Null value in adjHigh are 0
Null value in adjLow are 0
Null value in adjOpen are 0
Null value in adjVolume are 0
Null value in updated_date are 0


In [95]:
# Side-by-side view of the predictor (open) and the target (close).
final_df.select(['open', 'close']).show(20)

+------+------+
|  open| close|
+------+------+
|716.48|718.27|
| 719.0|718.92|
|714.91|710.36|
|708.65|691.72|
|698.77|693.71|
| 698.4|695.94|
|699.06|697.46|
|697.45|701.87|
|675.17|675.22|
| 671.0|668.26|
|678.97|680.04|
| 683.0|684.11|
|685.47| 692.1|
| 692.2|699.21|
|696.06|694.49|
|689.98|697.77|
|698.08|695.36|
| 699.5|705.63|
|708.05|715.09|
|719.12|720.64|
+------+------+
only showing top 20 rows



## Predicting the closing price from the opening price

In [106]:
# Pack the single predictor ('open') into the vector column Spark ML expects.
# NOTE(review): despite its name, `dependent_feat` holds the *input* feature;
# the label used by the models below is `close`.
assembler = VectorAssembler(outputCol='dependent_feat', inputCols=['open'])
df = assembler.transform(final_df)
df.show(n=20)

+------+--------+--------+------+-------+--------+--------+--------+-------+---------+------------+--------------+
| close|    high|     low|  open| volume|adjClose| adjHigh|  adjLow|adjOpen|adjVolume|updated_date|dependent_feat|
+------+--------+--------+------+-------+--------+--------+--------+-------+---------+------------+--------------+
|718.27|  722.47|  713.12|716.48|1306065|  718.27|  722.47|  713.12| 716.48|  1306065|  2016-06-14|      [716.48]|
|718.92|  722.98|  717.31| 719.0|1214517|  718.92|  722.98|  717.31|  719.0|  1214517|  2016-06-15|       [719.0]|
|710.36|  716.65|  703.26|714.91|1982471|  710.36|  716.65|  703.26| 714.91|  1982471|  2016-06-16|      [714.91]|
|691.72|  708.82|688.4515|708.65|3402357|  691.72|  708.82|688.4515| 708.65|  3402357|  2016-06-17|      [708.65]|
|693.71|  702.48|  693.41|698.77|2082538|  693.71|  702.48|  693.41| 698.77|  2082538|  2016-06-20|      [698.77]|
|695.94|  702.77|  692.01| 698.4|1465634|  695.94|  702.77|  692.01|  698.4|  14

In [107]:
# 80/20 split with a fixed seed so the split, and every score below, is
# reproducible; without a seed randomSplit draws a new split on each run.
# NOTE(review): rows are daily prices, so a random split lets the model train on
# days adjacent to (and after) each test day. A chronological split on
# `updated_date` would give a more honest out-of-sample estimate.
training, testing = df.select('dependent_feat', 'close').randomSplit([0.8, 0.2], seed=42)

In [None]:
# Shared R2 evaluator used to score every model below on the same metric.
# RegressionEvaluator is already imported in the top imports cell, so it is not
# re-imported here.
reg_evaluated = RegressionEvaluator(predictionCol="prediction", labelCol="close", metricName="r2")

In [135]:
# Elastic-net regularised linear regression of close on open
# (elasticNetParam=0.8 weights the penalty mostly towards L1, per Spark's definition).
lienar_model = LinearRegression(
    maxIter=10, regParam=0.1, elasticNetParam=0.8,
    featuresCol='dependent_feat', labelCol='close',
)
lienar_fitted_model = lienar_model.fit(training)

intercept = lienar_fitted_model.intercept
# Only one input feature ('open'), so the coefficient vector has a single entry.
coeef = lienar_fitted_model.coefficients[0]
print('Intercept', intercept)
print('Coefficients', coeef)

# Score train and test with the same evaluator used for the GBT and random-forest
# models, so every reported R2 is computed the same way.
linear_train_pred = lienar_fitted_model.transform(training)
r2_score = reg_evaluated.evaluate(linear_train_pred)
print('R2 Score for train data for Linear Model', r2_score)

linear_pred = lienar_fitted_model.transform(testing)
print("R2 Score on test data for Linear Model", reg_evaluated.evaluate(linear_pred))

Intercept -0.4213149669173777
Coefficients 1.0012657896103523
R2 Score for train data for Linear Model 0.9981210204746053
R2 Score on test data for Linear Model 0.9983231837964388


In [136]:
# Gradient-boosted trees on the same single feature, default hyper-parameters.
gbt_model = GBTRegressor(labelCol='close', featuresCol='dependent_feat')
gbt_fitted_model = gbt_model.fit(training)

# Predict on both splits, then score each with the shared R2 evaluator.
train_pred, test_pred = map(gbt_fitted_model.transform, (training, testing))

print("R2 Score on train data for GBT Model", reg_evaluated.evaluate(train_pred))
print("R2 Score on test data for GBT Model", reg_evaluated.evaluate(test_pred))

R2 Score on train data for GBT Model 0.9929548890754568
R2 Score on test data for GBT Model 0.9934773044615969


In [137]:
# Random forest on the same single feature. Bootstrapping makes training
# stochastic, and PySpark's default seed is derived from Python's per-process
# randomised hash, so an explicit seed is needed for reproducible scores.
random_model = RandomForestRegressor(featuresCol='dependent_feat', labelCol='close', seed=42)

# Fit the model
random_model_fitted = random_model.fit(training)

train_pred = random_model_fitted.transform(training)
test_pred = random_model_fitted.transform(testing)

print("R2 Score on train data for Random Forest Model", reg_evaluated.evaluate(train_pred))
print("R2 Score on test data for  Random Forest Model", reg_evaluated.evaluate(test_pred))

R2 Score on train data for Random Forest Model 0.9928553316532696
R2 Score on test data for  Random Forest Model 0.9933849049307797
