In [None]:
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext
from datetime import datetime
from pyspark.context import SparkContext
from pyspark.ml.linalg import Vectors
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql.functions import *
from pyspark.sql.window import *
import sys
from datetime import datetime as dt

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, HiveContext
from pyspark.sql.dataframe import DataFrame
 
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.types import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

In [None]:
# Obtain the SparkSession. getOrCreate() reuses an already-running context
# (e.g. a kernel that pre-defines `sc`) and also works on a fresh kernel where no
# `sc` exists, which the original SparkSession(sc) call required.
spark = SparkSession.builder.getOrCreate()
# Later cells call sc.newAPIHadoopRDD directly, so bind `sc` from the session.
sc = spark.sparkContext

In [63]:
# Look up the GCS bucket and GCP project id from the cluster's Hadoop configuration.
# NOTE(review): these keys are presumably set by the cluster (Dataproc); they may be
# None elsewhere — confirm.
bucket, project = (
    spark._jsc.hadoopConfiguration().get(conf_key)
    for conf_key in ("fs.gs.system.bucket", "fs.gs.project.id")
)

In [None]:
# Timestamped GCS path, presumably meant as a per-run temporary directory.
# NOTE(review): input_directory is only referenced by the commented-out
# 'mapred.bq.temp.gcs.path' entry in `conf`; the "natality" prefix looks like a
# leftover from an example this notebook was adapted from.
todays_date = datetime.today().strftime("%Y-%m-%d-%H-%M-%S")
input_directory = "gs://%s/tmp/natality-%s" % (bucket, todays_date)

In [None]:
# Hadoop configuration handed to the BigQuery input format below.
# Source table: m9-assignment.prd_smrt_sock_dat.tab1_prd_smrt_sock_dat.
# NOTE(review): the GCS bucket / temp-path keys are not set, so the connector's
# own defaults apply — confirm this is intended.
conf = dict([
    # Input Parameters.
    ('mapred.bq.project.id', project),
    # ('mapred.bq.gcs.bucket', bucket),
    # ('mapred.bq.temp.gcs.path', input_directory),
    ('mapred.bq.input.project.id', 'm9-assignment'),
    ('mapred.bq.input.dataset.id', 'prd_smrt_sock_dat'),
    ('mapred.bq.input.table.id', 'tab1_prd_smrt_sock_dat'),
])


# Read the data from BigQuery into Spark as an RDD.

In [None]:
# Read the BigQuery table through the Hadoop InputFormat API as an RDD of
# (key, value) pairs; the value carries the row as JSON (used in the next cell).
table_data = sc.newAPIHadoopRDD(
    inputFormatClass='com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    keyClass='org.apache.hadoop.io.LongWritable',
    valueClass='com.google.gson.JsonObject',
    conf=conf,
)

# Extract the JSON strings from the RDD.

In [None]:
# Keep only the value half of each (key, value) pair — the JSON text of each row.
table_json = table_data.values()

# Load the JSON strings as a Spark DataFrame.

In [None]:
# `sqlContext` is never defined in this notebook, so read through the SparkSession.
# DataFrameReader.json accepts an RDD of JSON strings and infers the schema.
df = spark.read.json(table_json)

# Select the columns of interest and cast `current_ma` from string to double

In [None]:
# Narrow the frame to the three columns used by the rest of the notebook.
df_testset = df.select(["alias", "timestamp", "current_ma"])

In [None]:
# Cast current_ma to double. Unless Spark ANSI mode is on, unparsable values become null.
df_testset = df_testset.withColumn("current_ma", func.col("current_ma").cast(DoubleType()))

In [None]:
# DataFrames are immutable: dropna() returns a new DataFrame, so the result must be
# reassigned (the original bare call discarded it and removed nothing).
# Drop rows with a null in any of alias / timestamp / current_ma. This also removes
# rows whose current_ma failed the double cast above; the previous thresh=2 would
# have kept those, sending null labels into the regression.
df_testset = df_testset.dropna(how="any")

In [None]:
# (column, type) pairs; after the cast above, current_ma should report 'double'.
[(str(field.name), field.dataType.simpleString()) for field in df_testset.schema.fields]

In [None]:
#df_testset.show(50)

In [37]:
# Centred rolling window per alias, ordered by timestamp: the 2 preceding rows,
# the current row, and the 2 following rows.
# NOTE(review): "timestamp" is a string column (see the dtypes output), so ordering
# is lexicographic — correct only for a fixed-width, ISO-like format; confirm.
# NOTE(review): the window includes the current row (and future rows), so features
# built from it contain the label current_ma itself — target leakage. Consider
# rowsBetween(-N, -1) if the goal is to predict current_ma.
window_01 = (
    Window.partitionBy(func.col("alias"))
    .orderBy(func.col("timestamp"))
    .rowsBetween(-2, 2)
)

In [38]:
# Rolling mean of current_ma over window_01.
win_curr01 = df_testset.withColumn("avg_curr", func.avg("current_ma").over(window_01))

In [39]:
# Rolling maximum of current_ma over window_01.
win_curr02 = win_curr01.withColumn("max_curr", func.max("current_ma").over(window_01))

In [40]:
# Rolling minimum of current_ma over window_01.
win_curr03 = win_curr02.withColumn("min_curr", func.min("current_ma").over(window_01))

In [41]:
# Rolling sample standard deviation of current_ma over window_01.
# NOTE(review): a window with a single row (an alias with only one reading) has no
# sample std — Spark yields null or NaN depending on version — which the vector
# assembly / regression below will not handle cleanly; confirm none exist.
win_curr04 = win_curr03.withColumn("std_curr", func.stddev("current_ma").over(window_01))

In [42]:
#win_curr01.show(10, truncate=False)

In [43]:
# Column types after adding the rolling features. With current_ma cast to double,
# the numeric columns should all be 'double'; the saved output showing 'int'
# appears stale (from a run before the cast) — re-run to refresh.
[(str(field.name), field.dataType.simpleString()) for field in win_curr04.schema.fields]

[('alias', 'string'),
 ('timestamp', 'string'),
 ('current_ma', 'int'),
 ('avg_curr', 'double'),
 ('max_curr', 'int'),
 ('min_curr', 'int'),
 ('std_curr', 'double')]

In [44]:
# Pack the four rolling statistics into one vector column, then keep only the
# feature vector and the label for the regression.
vectorAssembler = VectorAssembler(
    inputCols=["avg_curr", "max_curr", "min_curr", "std_curr"],
    outputCol="features",
)
va_df = vectorAssembler.transform(win_curr04).select("features", "current_ma")

In [45]:
# Cache the assembled features: va_df is split below and its splits feed training
# and evaluation, so caching avoids re-running the full lineage for each action.
# Caching is lazy — it is materialised on the first action.
va_df.cache()

DataFrame[features: vector, current_ma: int]

In [46]:
# 70/30 train/test split with a fixed seed so results are reproducible across runs.
# NOTE(review): reproducibility also assumes va_df's partitioning/row order is
# stable between runs; confirm if exact repeatability matters.
# NOTE(review): a random row-level split on overlapping rolling-window features lets
# neighbouring readings land in both train and test; a time-based split per alias
# would give a more honest test score.
SPLIT_SEED = 42
splits = va_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [47]:
# First split (weight 0.7) is the training set.
train_df = next(iter(splits))

In [48]:
# Last of the two splits (weight 0.3) is the held-out test set.
test_df = splits[-1]

In [53]:
# Peek at a few training rows.
train_df.show(n=3)

# Construct a new LinearRegression object and fit the training data.

In [49]:
from pyspark.ml.regression import LinearRegression

In [50]:
# Elastic-net regularised linear regression of current_ma on the rolling features.
# regParam sets the penalty strength; elasticNetParam mixes L1/L2 (0 = pure L2, 1 = pure L1).
lr = LinearRegression(
    labelCol="current_ma",
    featuresCol="features",
    elasticNetParam=0.8,
    regParam=0.3,
    maxIter=10,
)

In [51]:
# Fit on the training split.
# NOTE(review): this cell's saved output is a KeyboardInterrupt, and the following
# cells have lower execution counts, so their outputs appear to come from an
# earlier run — re-run before trusting them.
lr_model = lr.fit(dataset=train_df)


KeyboardInterrupt



In [None]:
# Fitted weight per feature, in VectorAssembler input order.
print("Coefficients: {}".format(lr_model.coefficients))

In [33]:
# Fitted intercept term.
print("Intercept: {}".format(lr_model.intercept))

Intercept: 181722.18730392525


# Summarize the model over the training set and print out some metrics

In [34]:
# Training-set metrics recorded by the fit (RMSE, r2, iteration history, residuals),
# used by the cells below.
trainingSummary = lr_model.summary

In [35]:
# RMSE: square root of the mean squared difference between predictions and actual
# labels, here on the training set.
print("RMSE: {:f}".format(trainingSummary.rootMeanSquaredError))

RMSE: 4720.978859


In [36]:
# Coefficient of determination on the training set.
print("r2: {:f}".format(trainingSummary.r2))

r2: 0.223893


In [37]:
# Summary statistics of the training split; describe() covers numeric/string columns
# only, so the vector column is skipped.
# NOTE(review): the saved output shows a `total_wh` column that this notebook never
# produces — it appears stale; re-run to refresh.
train_df.describe().show(n=20, truncate=True)

+-------+-----------------+
|summary|         total_wh|
+-------+-----------------+
|  count|           428150|
|   mean|6326.256998715404|
| stddev|5358.853389745615|
|    min|                0|
|    max|            20940|
+-------+-----------------+



# Prediction

In [46]:
# Score the held-out split; adds a "prediction" column.
lr_predictions = lr_model.transform(dataset=test_df)

In [None]:
#lr_predictions = lr_model.transform(test_df)
#lr_predictions.select("prediction","current_ma","features").show(5)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

# R^2 on the held-out test predictions. The label must be the column the model was
# trained on (labelCol='current_ma'); the previous 'total_wh' is not a column of
# lr_predictions (features, current_ma, prediction), so evaluation would fail.
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="current_ma", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

In [None]:
# Optimiser diagnostics from the training summary, then per-row training residuals.
print("numIterations: {:d}".format(trainingSummary.totalIterations))
print("objectiveHistory: {}".format(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

In [None]:
# Show test predictions next to the true current_ma values.
# NOTE(review): this recomputes the same result as lr_predictions above.
predictions = lr_model.transform(dataset=test_df)
predictions.select(["prediction", "current_ma", "features"]).show()