In [None]:
import findspark
# Make the local Spark installation's pyspark package importable; this must run
# before `import pyspark` in the next cell.
# NOTE(review): called with no arguments, so it presumably locates Spark via
# SPARK_HOME (or findspark's default search) — confirm on the target machine.
findspark.init()

In [None]:
import re
import pyspark
from pyspark.sql.types import *

In [None]:
# Initialize the Spark context: run locally with 4 worker threads ("local[4]").
# getOrCreate() reuses an already-active context instead of raising
# "Cannot run multiple SparkContexts at once", so re-running this cell is safe.
# Note: if a context is already active, this conf (app name / master) is ignored.
sc = pyspark.SparkContext.getOrCreate(
    pyspark.SparkConf().setAppName("RDDFromBasics").setMaster("local[4]")
)

In [None]:
# Wrap the SparkContext `sc` in a SparkSession, the entry point for the
# DataFrame / SQL API used by the remaining cells.
ss = pyspark.sql.SparkSession(sparkContext=sc)

In [None]:
# Initialize a dataframe reader.
# ss.read returns a DataFrameReader bound to this session; it is used to load the CSV.
dfr = ss.read

In [None]:
# Explicit schema for nsedata.csv, passed to the CSV reader instead of inferring types.
# Every column is nullable. Order matters: with an explicit schema, Spark applies
# these fields to the file's columns by position.
schemaStruct = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("SYMBOL", StringType()),
        ("SERIES", StringType()),
        ("OPEN", DoubleType()),
        ("HIGH", DoubleType()),
        ("LOW", DoubleType()),
        ("CLOSE", DoubleType()),
        ("LAST", DoubleType()),
        ("PREVCLOSE", DoubleType()),
        ("TOTTRDQTY", LongType()),
        ("TOTTRDVAL", DoubleType()),
        ("TIMESTAMP", StringType()),  # kept as text; later filtered with LIKE
        ("ADDNL", StringType()),
    ]
])


In [None]:
# Read the CSV using the explicit schemaStruct (no schema inference is performed).
# header=True skips the file's first line; column names and types come from
# schemaStruct, which Spark applies to the file's columns by position.
NSE_CSV_PATH = "/home/hduser/spark/nsedata.csv"  # TODO: make configurable (env var / data dir)
df = dfr.csv(NSE_CSV_PATH, schema=schemaStruct, header=True)

In [None]:
# A large number of functions are defined in pyspark.sql.functions.
# Import that module (aliased as sqlFns) and use some of these functions.
import pyspark.sql.functions as sqlFns

In [None]:
# TCS rows whose TIMESTAMP string ends in '2011', projected to the absolute
# open/close prices. Cached because several later cells reuse df1.
df1 = (
    df.where("SYMBOL = 'TCS'")
      .where("TIMESTAMP like '%2011'")
      .select(
          sqlFns.abs(df["OPEN"]).alias("open"),
          sqlFns.abs(df["CLOSE"]).alias("close"),
      )
      .cache()
)

In [None]:
# Confirm the projected frame has just the two renamed columns: open and close.
df1.printSchema()

In [None]:
# Pearson correlation (Spark's default method) between open and close prices.
df1.stat.corr("open", "close")

In [None]:
# Now let us run linear regression on this
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import LinearRegression

In [None]:
# Peek at one Row (fields: open, close) through df1's underlying RDD.
# NOTE: RDD.first() raises ValueError if the TCS/2011 filter matched no rows.
df1.rdd.first()

In [None]:
# Prepare the data for LinearRegression: label = close price, features = a
# 1-element dense vector holding the open price.
# Rows with a null open or close are dropped first. Every schema field is nullable,
# and empty or malformed CSV values load as null. Vectors.dense(None) is not a valid
# feature vector, and a null label would break the fit.
myTuples = (
    df1.rdd
    .filter(lambda row: row["open"] is not None and row["close"] is not None)
    .map(lambda row: (row["close"], Vectors.dense(row["open"])))
)

In [None]:
# Turn the (label, features) tuples into the two-column DataFrame that
# LinearRegression reads by default; cached because later cells reuse it.
myLRData = ss.createDataFrame(
    data=myTuples,
    schema=["label", "features"],
).cache()

In [None]:
# Print the first rows of the training data (DataFrame.show defaults, spelled out).
myLRData.show(n=20, truncate=True)

In [None]:
# Unregularized least squares on raw (unstandardized) features, with an intercept.
# solver="normal" is pinned because summary.pValues is only available from the
# normal-equation solver. "auto" currently picks it for this 1-feature model, but
# stating it explicitly keeps the p-value output from depending on that choice.
lr = LinearRegression(standardization=False, fitIntercept=True, solver="normal")

In [None]:
# Fit the model, then show the concrete classes of the model and its training summary.
lrModel = lr.fit(dataset=myLRData)
for fitted_obj in (lrModel, lrModel.summary):
    print(type(fitted_obj))

In [None]:
# Print the fitted parameters and fit-quality metrics, one per line.
# Each value sits behind a lambda, so it is fetched only when its line is printed.
for caption, getter in (
    ("Coefficients: ", lambda: lrModel.coefficients),
    ("Intercept: ", lambda: lrModel.intercept),
    ("pValues: ", lambda: lrModel.summary.pValues),
    ("R-square: ", lambda: lrModel.summary.r2),
):
    print(caption, getter())

In [None]:
# Training rows with the model's prediction column appended (show defaults, spelled out).
lrModel.summary.predictions.show(n=20, truncate=True)

In [None]:
# Bring every prediction Row (label, features, prediction) back to the driver for plotting.
# NOTE(review): presumably small (one symbol, one year); collect() is unsafe on large results.
t = lrModel.summary.predictions.collect()

In [None]:
# Number of collected prediction rows, i.e. the number of training rows.
len(t)

In [None]:
import matplotlib.pyplot as plt

In [None]:
# Unpack the collected prediction rows into plain Python lists for matplotlib:
#   yold - actual close price (label)
#   ynew - predicted close price (prediction)
#   x    - open price, taken as element 0 of the 1-element `features` vector, so
#          matplotlib gets scalars rather than DenseVector objects
yold = [row["label"] for row in t]
ynew = [row["prediction"] for row in t]
x = [float(row["features"][0]) for row in t]

In [None]:
# Actual vs. predicted close price. Points on the dashed y = x line are perfect predictions.
fig, ax = plt.subplots()
ax.plot(yold, ynew, "r.", label="observations")
if yold:  # min()/max() need at least one point
    lo, hi = min(yold + ynew), max(yold + ynew)
    ax.plot([lo, hi], [lo, hi], "k--", linewidth=1, label="y = x (perfect prediction)")
ax.set_xlabel("Actual close (label)")
ax.set_ylabel("Predicted close (prediction)")
ax.set_title("Linear regression: actual vs. predicted close")
ax.legend()
plt.show()

In [None]:
# Close price against open price: model predictions (red) overlaid with actual values (blue).
fig, ax = plt.subplots()
ax.plot(x, ynew, "r.", label="predicted close")
ax.plot(x, yold, "b.", label="actual close")
ax.set_xlabel("Open price (feature)")
ax.set_ylabel("Close price")
ax.set_title("Linear regression fit: close vs. open")
ax.legend()
plt.show()

In [None]:
# Shut down Spark: stop the session first, then its SparkContext. SparkSession.stop()
# already stops the underlying context, so the second stop() has nothing left to do.
for spark_handle in (ss, sc):
    spark_handle.stop()