In [None]:
#Importing libraries which are needed
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

import os
import sys
from pyspark.sql.types import *

from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array

You must now load the data (HVAC.csv), parse it, and use it to train the model.
For this, you define a function that checks whether the actual temperature of the building is greater than the
target temperature. If the actual temperature is greater, the building is hot, denoted by the value 1.0.
Otherwise (the actual temperature is less than or equal to the target temperature), the building is cold, denoted by the value 0.0.

In [4]:
# Column layout of one HVAC.csv line after it has been split on commas.
# The data arrives as an array, so the parser indexes fields by position:
#
#   0 Date
#   1 Time
#   2 TargetTemp
#   3 ActualTemp
#   4 System
#   5 SystemAge
#   6 BuildingID

# Row "template": calling LabeledDocument(building_id, system_info, label)
# yields a pyspark Row with these three named fields, in this order.
LabeledDocument = Row(
    "BuildingID",
    "SystemInfo",
    "label",
)

 # Define a function that parses the raw CSV file and returns an object of type LabeledDocument

def parseDocument(line):
    """Parse one raw HVAC.csv line into a LabeledDocument row.

    Expected comma-separated column order:
    0 Date, 1 Time, 2 TargetTemp, 3 ActualTemp, 4 System, 5 SystemAge,
    6 BuildingID.

    Args:
        line: one data line of the CSV (the header must already be removed).

    Returns:
        LabeledDocument(BuildingID, SystemInfo, label), where SystemInfo is
        "<System> <SystemAge>" and label is 1.0 (hot) when ActualTemp is
        strictly greater than TargetTemp, otherwise 0.0 (cold).

    Raises:
        ValueError: if TargetTemp or ActualTemp is not numeric.
        IndexError: if the line has fewer than 7 fields.
    """
    # Strip each field so stray whitespace / a trailing '\r' (CRLF files)
    # does not end up inside BuildingID or the SystemInfo tokens.
    values = [field.strip() for field in line.split(',')]

    # Compare the temperatures numerically: comparing the raw strings is
    # lexicographic, so e.g. "100" > "99" would wrongly evaluate to False.
    hot = 1.0 if float(values[3]) > float(values[2]) else 0.0

    text_value = values[4] + " " + values[5]

    return LabeledDocument(values[6], text_value, hot)

 # Load the raw HVAC.csv file, parse it using the function
# Location of the raw sensor file. Set the HVAC_CSV environment variable to
# point elsewhere instead of editing this machine-specific default path.
HVAC_CSV_PATH = os.environ.get("HVAC_CSV", "/home/osboxes/data/SensorFiles/HVAC.csv")
data = sc.textFile(HVAC_CSV_PATH)

# Drop the header line (recognised by containing "Date") and any blank
# lines; a blank line would make parseDocument fail with an IndexError.
documents = (
    data
    .filter(lambda s: bool(s.strip()) and "Date" not in s)
    .map(parseDocument)
)
training = documents.toDF()

Configure the Spark machine learning pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
For more information about what a pipeline is and how it works, see http://spark.apache.org/docs/latest/ml-pipeline.html.

In [None]:
# Stage 1: split the "System SystemAge" text into word tokens.
tokenizer = Tokenizer(outputCol="words", inputCol="SystemInfo")
# Stage 2: hash the tokens into a term-frequency vector; it is written to
# "features", the column LogisticRegression reads by default.
hashingTF = HashingTF(outputCol="features", inputCol=tokenizer.getOutputCol())
# Stage 3: classifier trained on "features" against the default "label"
# column (the label field of LabeledDocument); 10 iterations, regParam 0.01.
lr = LogisticRegression(regParam=0.01, maxIter=10)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

Fit the pipeline to the training data, then display the training DataFrame to check your progress with the application. Press SHIFT + ENTER to run the cell.

In [22]:
# Fit all three pipeline stages on the parsed training DataFrame.
model = pipeline.fit(training)

# Sanity check: print the first 20 rows of the training data.
training.show(n=20)

+----------+----------+-----+
|BuildingID|SystemInfo|label|
+----------+----------+-----+
|         4|     13 20|  0.0|
|        17|      3 20|  0.0|
|        18|     17 20|  1.0|
|        15|      2 23|  0.0|
|         3|      16 9|  1.0|
|         4|     13 28|  0.0|
|         2|     12 24|  0.0|
|        16|     20 26|  1.0|
|         9|      16 9|  1.0|
|        12|       6 5|  0.0|
|        15|     10 17|  1.0|
|         7|      2 11|  0.0|
|        15|      14 2|  1.0|
|         6|       3 2|  0.0|
|        20|     19 22|  0.0|
|         8|     19 11|  0.0|
|         6|      15 7|  0.0|
|        13|      12 5|  0.0|
|         4|      8 22|  0.0|
|         7|      17 5|  0.0|
+----------+----------+-----+
only showing top 20 rows



Notice that in the first data row of HVAC.csv the actual temperature is lower than the target temperature, which means the building is cold. Hence in the training output, the value of label in the first row is 0.0, which means the building is not hot.

Prepare a data set to run the trained model against. To do so, pass in a system ID and system age (combined as SystemInfo in the training output); the model then predicts whether the building with that system ID and system age is hot (denoted by 1.0) or cold (denoted by 0.0).

In [6]:
# SystemInfo here is a combination of system ID followed by system age,
# in the same "<System> <SystemAge>" format used for the training text.
Document = Row("id", "SystemInfo")
# Plain int ids: the original `1L` long literals are a SyntaxError on
# Python 3, and Spark's schema inference maps Python ints to LongType just
# as it does longs, so the resulting "id" column type is unchanged.
test = (
    sc.parallelize([(1, "20 25"),
                    (2, "4 15"),
                    (3, "16 9"),
                    (4, "9 22"),
                    (5, "17 10"),
                    (6, "7 22")])
    .map(lambda x: Document(*x))
    .toDF()
)

Finally, make predictions on the test data.

In [7]:
# Make predictions on test documents and print columns of interest:
# the input text, the predicted label (1.0 = hot, 0.0 = cold) and the
# per-class probability vector produced by the logistic regression stage.
prediction = model.transform(test)
selected = prediction.select("SystemInfo", "prediction", "probability")
for row in selected.collect():
    # Call form of print: prints the same on Python 2 and also runs on Python 3.
    print(row)

Row(SystemInfo=u'20 25', prediction=0.0, probability=DenseVector([0.5001, 0.4999]))
Row(SystemInfo=u'4 15', prediction=0.0, probability=DenseVector([0.5018, 0.4982]))
Row(SystemInfo=u'16 9', prediction=1.0, probability=DenseVector([0.4787, 0.5213]))
Row(SystemInfo=u'9 22', prediction=1.0, probability=DenseVector([0.455, 0.545]))
Row(SystemInfo=u'17 10', prediction=1.0, probability=DenseVector([0.4927, 0.5073]))
Row(SystemInfo=u'7 22', prediction=0.0, probability=DenseVector([0.5017, 0.4983]))
