In [1]:
# Load the stock CSV. No header/inferSchema options are passed, so Spark names the
# columns _c0, _c1, ... and reads every value as a string; later cells rename and cast them.
stock_csv_path = '/FileStore/tables/modifiedStockData.csv'
df = sqlContext.read.csv(stock_csv_path)

In [2]:
# Preview the data as loaded: string columns named _c0, _c1, ... (no header/schema options were set).
display(df)

In [3]:
# Keep only rows whose first CSV column is populated. Presumably this removes the
# header row (blank first cell, read as null) — TODO confirm against the source file.
df = df.where(df["_c0"].isNotNull())

In [4]:
# _c0 was only needed for the null filter above; it is not used after this point.
df = df.drop("_c0")

In [5]:
# Map the positional CSV column names to descriptive ones.
# withColumnRenamed is a no-op for any source name that is not present.
column_names = {
    "_c1": "open",
    "_c2": "high",
    "_c3": "low",
    "_c4": "close",
    "_c5": "adj_close",
    "_c6": "volume",
    "_c7": "year",
    "_c8": "month",
    "_c9": "date",
}
for old_name, new_name in column_names.items():
    df = df.withColumnRenamed(old_name, new_name)

In [6]:
# Check the renamed columns; they are still strings until the casting cell below runs.
display(df)

In [7]:
# Cast the string columns read from the CSV to numeric types.
# With ANSI mode off (Spark's default before 4.0), a value that cannot be parsed
# (e.g. a literal "null" string) becomes a SQL NULL instead of failing, and empty CSV
# fields are already read as NULL. VectorAssembler (default handleInvalid="error") and
# the regressors reject null features/labels, so such rows would otherwise make model
# fitting fail much later. Drop them here, where the cause is visible.
column_types = {
    'open': 'float',
    'high': 'float',
    'low': 'float',
    'close': 'float',
    'adj_close': 'float',
    'volume': 'double',
    'year': 'int',
    'month': 'int',
    'date': 'int',
}
for col_name, col_type in column_types.items():
    df = df.withColumn(col_name, df[col_name].cast(col_type))
df = df.dropna(subset=list(column_types))

In [8]:
# Check the columns after casting them to numeric types.
display(df)

In [9]:
# Confirm the final column names and order before assembling features.
df.columns

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

In [11]:
# Feature columns for the VectorAssembler; 'open' is left out because it is the label.
# NOTE(review): high/low/close/adj_close presumably come from the same trading row as
# 'open'; if the goal is to forecast the open, these leak information — confirm intent.
numcols = ['high', 'low', 'close', 'adj_close', 'volume', 'year', 'month', 'date']

# Pipeline stages; the assembler cell below appends to this list.
stages = []

In [12]:
# Pack the numeric columns into the single vector column that Spark ML estimators read.
assembler = VectorAssembler(outputCol="features", inputCols=numcols)
stages.append(assembler)

In [13]:
# Build the feature pipeline and add the "features" vector column to df.
# This cell overwrites df, so on a re-run df already has "features". VectorAssembler's
# schema check would then fail with "Output column features already exists". Dropping
# it first makes the cell re-runnable. drop() is a no-op when the column is absent,
# so a first run is unchanged.
df_base = df.drop("features")
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_base)
df = pipelineModel.transform(df_base)

In [14]:
# 70/30 train/test split. The fixed seed makes the split repeatable for the same
# input data and partitioning.
trainingData, testData = df.randomSplit(weights=[0.7, 0.3], seed=100)

In [15]:
from pyspark.ml.regression import LinearRegression

# Baseline model: linear regression of the 'open' column on the assembled features.
# (This is linear least-squares regression, not logistic regression.)
lr = LinearRegression(featuresCol='features', labelCol='open')
lr_model = lr.fit(trainingData)
predictions = lr_model.transform(testData)
predictions.select("prediction", "open", "features").show(5)

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the linear-regression predictions on the held-out test set by RMSE.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="open",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [17]:
from pyspark.ml.regression import GeneralizedLinearRegression

# Gaussian family with identity link is ordinary linear regression; regParam=0.3 adds
# regularization and maxIter caps the solver iterations.
glr = GeneralizedLinearRegression(
    featuresCol='features',
    labelCol='open',
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3,
)
model = glr.fit(trainingData)
glr_testPred = model.transform(testData)
glr_testPred.select("prediction", "open", "features").show(5)

In [18]:
# Select (prediction, true label) and compute test error
glrevaluator = RegressionEvaluator(
    labelCol="open", predictionCol="prediction", metricName="rmse")
rmse = glrevaluator.evaluate(glr_testPred)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [19]:
from pyspark.ml.regression import RandomForestRegressor

# Train a RandomForest model. Tree construction randomly samples rows (bootstrap) and
# features per split, so pin the seed (same value as the train/test split) so that
# re-running the notebook reproduces the same forest and RMSE.
rf = RandomForestRegressor(featuresCol='features', labelCol='open', seed=100)
rffit = rf.fit(trainingData)
rftest_pred = rffit.transform(testData)
rftest_pred.select("prediction", "open", "features").show(5)

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE of the random-forest predictions on the held-out test set.
rfevaluator = RegressionEvaluator(
    predictionCol="prediction",
    labelCol="open",
    metricName="rmse",
)
rmse = rfevaluator.evaluate(rftest_pred)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)