# Summative

This exercise will go through a simulated streaming data workflow, as seen in the streaming data module. But it will be more complex, and closer to what a real world scenario might look like. You will need to 
- Investigate the incoming data
- Create an appropriate database to store the incoming records
- Write code to process records one by one as they arrive, including
 - Printing warnings when any reading goes above a predefined threshold
 - Storing the incoming data in a database
- Write code to analyse the stored data in a scalable manner
- Display relevant information in a dashboard

## Create an appropriate database

Each incoming record will look something like the following: 

{'Device_ID': 9,                  
  'Temp1': 33.01235436945101,  
  'Temp2': 46.313589806396116,  
  'Temp3': 16.506177184725505,  
  'Temp_Ambient': 23.782493817278034}
  
Each device is assigned an integer ID. Every device has multiple sensors, and reports the readings from each sensor as a float. Each call to gen_data.getReading() returns a time (an integer here to make things easier) and a record that follows the same pattern as above.

<b>Create a database to store the incoming data.</b> Include a time field for the time that the data arrives. If you create the database with python, show the code here, otherwise include any bash or sql code you run. You may wish to come back and add additional fields to make later analysis easier.

## Store and process the incoming data

As each record arrives (i.e. each loop of the for loop), you must
- <b>Store the record in the database you created above
- Use either moving windows or exponential averaging to keep track each sensor value for each device. Print out the values at the end of the loop
- Print a warning if any reported temperature exceeds 100 degrees for the first time for that device</b>

In [1]:
import pandas as pd
import numpy as np

In [2]:
import gen_data 

# Your code here for any initializations you may need

temp1_ave = 0
temp2_ave = 0
temp3_ave = 0
temp_am_ave = 0

window1 = []
window2 = []
window3 = []
windowA = []

window_t1_sd = 0
window_t1_av = 0
window_t2_sd = 0
window_t2_av = 0
window_t3_sd = 0
window_t3_av = 0
window_ta_sd = 0
window_ta_av = 0

df = pd.DataFrame(columns=['Time', 'Device_ID', 'Temp1', 'Temp1_ave', 'Temp1_sd',
                                                'Temp2', 'Temp2_ave', 'Temp2_sd',
                                                'Temp3', 'Temp3_ave', 'Temp3_sd',
                                                'Temp_Ambient', 'Temp_Am_ave', 'Temp_Am_sd'])

for i in range(20000): # Hint: make this lower for testing origional value: 20000
    
    # The simulated data arriving - don't change this
    arrival_time, record = gen_data.getReading()
    
    # Your code here 
    temp1_ave = temp1_ave*0.75 + record['Temp1']*0.25
    temp2_ave = temp2_ave*0.75 + record['Temp2']*0.25
    temp3_ave = temp3_ave*0.75 + record['Temp3']*0.25
    temp_am_avg = temp_am_ave*0.75 + record['Temp_Ambient']*0.25
   
    # Window for temp1
    window1.append(temp1_ave) # Add the temp1_ave to our moving window
    if len(window1)>10: # Keep the window size from growing beyond 10:
        del(window1[0]) # If the window is >10 items, delete the oldest
    window_t1_sd = np.std(window1) # Calculate the standard deviation of the ten items in the window
    window_t1_av = np.mean(window1) # Calculate the mean of the last ten readings  
    
    # Window for temp2
    window2.append(temp2_ave) # Add the temp2_ave to our moving window
    if len(window2)>10: # Keep the window size from growing beyond 10:
        del(window2[0]) # If the window is >10 items, delete the oldest
    window_t2_sd = np.std(window2) # Calculate the standard deviation of the ten items in the window
    window_t2_av = np.mean(window2) # Calculate the mean of the last ten readings     
    
    # Window for temp3
    window3.append(temp3_ave) # Add the temp3_ave to our moving window
    if len(window3)>10: # Keep the window size from growing beyond 10:
        del(window3[0]) # If the window is >10 items, delete the oldest
    window_t3_sd = np.std(window3) # Calculate the standard deviation of the ten items in the window
    window_t3_av = np.mean(window3) # Calculate the mean of the last ten readings     
    
    # Window for temp_ambient
    windowA.append(temp_am_avg) # Add the temp_am_ave to our moving window
    if len(windowA)>10: # Keep the window size from growing beyond 10:
        del(windowA[0]) # If the window is >10 items, delete the oldest
    window_ta_sd = np.std(windowA) # Calculate the standard deviation of the ten items in the window
    window_ta_av = np.mean(windowA) # Calculate the mean of the last ten readings      

    
    # Writing simulated data to df 
    df = pd.concat([df, pd.DataFrame([{'Time': arrival_time, 'Device_ID': record['Device_ID'], 
                                       'Temp1':record['Temp1'],'Temp1_ave':window_t1_av,'Temp1_sd':window_t1_sd, 
                                       'Temp2':record['Temp2'],'Temp2_ave':window_t2_av,'Temp2_sd':window_t2_sd, 
                                       'Temp3':record['Temp3'],'Temp3_ave':window_t3_av,'Temp3_sd':window_t3_sd, 
                                       'Temp_Ambient':record['Temp_Ambient'],'Temp_Am_ave':window_ta_av,'Temp_Am_sd':window_ta_sd}])], ignore_index=True)

# Going though the df to check for Temperatures that are out of bound    
for g in range(4):
    if df.iloc[g][1] >= 100 :
        print('Warning! Deive ID:', df.iloc[g][0],'Temp1 is above 100 °C. Reading: ', df.iloc[g][1],'°C')
    elif df.iloc[g][2] >= 100 :
        print('Warning! Deive ID:', df.iloc[g][0],'Temp2 is above 100 °C. Reading: ', df.iloc[g][2],'°C')
    elif df.iloc[g][3] >= 100 :
        print('Warning! Deive ID:', df.iloc[g][0],'Temp3 is above 100 °C. Reading: ', df.iloc[g][3],'°C')
    elif df.iloc[g][4] >= 100 :
        print('Warning! Deive ID:', df.iloc[g][0],'Temp_Ambient is above 100 °C. Reading: ', df.iloc[g][4],'°C')


#### I did not like the arrival_time recorded so I used the index as my arrival time instead

In [3]:
df.reset_index(level=0, inplace=True)

#### create a datetime from the index 

- epoch conversion - the origion is fixed though
- re-arrange the df

In [4]:
df['Date'] = pd.to_datetime(df['index'], unit='s', origin = '2018-07-22')

df = df[['Date', 'Device_ID', 'Temp1', 'Temp1_ave', 'Temp1_sd',
                              'Temp2', 'Temp2_ave', 'Temp2_sd',
                              'Temp3', 'Temp3_ave', 'Temp3_sd',
                              'Temp_Ambient', 'Temp_Am_ave', 'Temp_Am_sd']]

# Show data frame
df.head(5)

Unnamed: 0,Date,Device_ID,Temp1,Temp1_ave,Temp1_sd,Temp2,Temp2_ave,Temp2_sd,Temp3,Temp3_ave,Temp3_sd,Temp_Ambient,Temp_Am_ave,Temp_Am_sd
0,2018-07-22 00:00:00,10,17.850188,4.462547,0.0,29.635207,7.408802,0.0,208.925094,52.231274,0.0,21.785917,5.446479,0.0
1,2018-07-22 00:00:01,0,68.392134,12.453745,7.991198,85.231347,17.13662,9.727818,34.196067,49.976873,2.254401,21.90478,5.461337,0.014858
2,2018-07-22 00:00:02,10,34.954155,16.326579,8.518833,48.44957,22.177987,10.673225,217.477077,63.371623,19.032259,22.039832,5.477544,0.025933
3,2018-07-22 00:00:03,10,21.076215,18.075744,7.975376,33.183837,24.756365,10.265593,210.538108,77.59256,29.637397,21.925831,5.478522,0.022522
4,2018-07-22 00:00:04,0,74.523142,21.685238,10.148847,91.975457,29.27759,12.886885,37.261571,81.975432,27.920217,21.752906,5.470463,0.025799


#### Writing to datebase - csv file

In [5]:
with open("sensor_data.csv","a") as f:
    df.to_csv(f,header=True,index=False)  

In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20000 entries, 0 to 19999
Data columns (total 14 columns):
Date            20000 non-null datetime64[ns]
Device_ID       20000 non-null object
Temp1           20000 non-null float64
Temp1_ave       20000 non-null float64
Temp1_sd        20000 non-null float64
Temp2           20000 non-null float64
Temp2_ave       20000 non-null float64
Temp2_sd        20000 non-null float64
Temp3           20000 non-null float64
Temp3_ave       20000 non-null float64
Temp3_sd        20000 non-null float64
Temp_Ambient    20000 non-null float64
Temp_Am_ave     20000 non-null float64
Temp_Am_sd      20000 non-null float64
dtypes: datetime64[ns](1), float64(12), object(1)
memory usage: 2.1+ MB


## Analyzing the stored data

You now have a nice big database. <b>Load it into spark for analysis.</b>

You are told that during the time the data was being collected, devices 3 and 10 had malfunctioning sensors - their temperature3 readings are all 200+. <b>Verify this.</b> Since the engineers knew about the faulty sensors, no harm has been done, but seeing those false readings in the historical data makes you unhappy. You decide to go the extra mile and replace these readings with slightly more believable (but still false) data, to practise your new machine learning skills.

<b>Using the other devices for training, build a model to predict temperature3 given readings from the other sensors. Use the model to replace the erroneous values with the predicted ones. 
    
Do you think this is a reasonable step to take? Explain.</b> 

In [7]:
import findspark
findspark.init()

#from pyspark.sql.session import SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [8]:
# read csv file
data = spark.read.csv('sensor_data.csv',header=True)

In [9]:
from pyspark.sql.types import DoubleType, IntegerType

#convert all columns
for col_name in data.columns:
    data = data.withColumn(col_name, data[col_name].cast(DoubleType()))

In [10]:
# inspect the first 10 rows
data.show()

# the printSchema() method tells you the data type of each column
data.printSchema()


+----+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|Date|Device_ID|             Temp1|         Temp1_ave|          Temp1_sd|             Temp2|         Temp2_ave|          Temp2_sd|             Temp3|         Temp3_ave|          Temp3_sd|      Temp_Ambient|       Temp_Am_ave|          Temp_Am_sd|
+----+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+
|null|     10.0|17.850188075591173| 4.462547018897793|               0.0|29.635206883150293| 7.408801720787573|               0.0| 208.9250940377956|  52.2312735094489|               0.0|21.785917113159456| 5.446479278289864|                 0.0|
|null|      

### Prepare data for model

In [11]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])


In [12]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [13]:
# assemble variables to one feature column
assembler = VectorAssembler(
    inputCols = ['Date', 'Device_ID', 'Temp1', 'Temp1_ave', 'Temp1_sd',
                                                'Temp2', 'Temp2_ave', 'Temp2_sd',
                                                'Temp3', 'Temp3_ave', 'Temp3_sd',
                                                'Temp_Ambient', 'Temp_Am_ave', 'Temp_Am_sd'],
    outputCol = "features")

#define the estimator - decision tree
dt = DecisionTreeRegressor(labelCol="Device_ID", featuresCol="features")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

### Fit pipeline and transform data

In [14]:
#fit the pipeline
PipelineModel = pipeline.fit(trainingData)

# transform using the pipeline
predictions = PipelineModel.transform(testData)

# evaluate model fit
predictions.select("prediction", "Device_ID")
evaluator = RegressionEvaluator(
    labelCol="Device_ID", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

Py4JJavaError: An error occurred while calling o137.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<Date:double,Device_ID:double,Temp1:double,Temp1_ave:double,Temp1_sd:double,Temp2:double,Temp2_ave:double,Temp2_sd:double,Temp3:double,Temp3_ave:double,Temp3_sd:double,Temp_Ambient:double,Temp_Am_ave:double,Temp_Am_sd:double>) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:163)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:146)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:146)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1358)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1331)
	at org.apache.spark.ml.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:112)
	at org.apache.spark.ml.tree.impl.RandomForest$.run(RandomForest.scala:105)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:111)
	at org.apache.spark.ml.regression.DecisionTreeRegressor.train(DecisionTreeRegressor.scala:46)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<Date:double,Device_ID:double,Temp1:double,Temp1_ave:double,Temp1_sd:double,Temp2:double,Temp2_ave:double,Temp2_sd:double,Temp3:double,Temp3_ave:double,Temp3_sd:double,Temp_Ambient:double,Temp_Am_ave:double,Temp_Am_sd:double>) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1358)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:163)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:146)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:146)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
	... 29 more


In [None]:
predictions.show()

In [None]:
##Root mean square error
print(rmse)

### Kmeans clustering

In [15]:
from pyspark.ml.clustering import KMeans

# Trains a k-means model with 4 clusters.
kmeans = KMeans(featuresCol='features', predictionCol='prediction',k=4)

#transform data using pipeline
pipeline = Pipeline(stages=[assembler, kmeans])

#fir pipeline
PipelineModel = pipeline.fit(data)

# transform using the pipeline
predictions = PipelineModel.transform(data)

Py4JJavaError: An error occurred while calling o223.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 4, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<Date:double,Device_ID:double,Temp1:double,Temp1_ave:double,Temp1_sd:double,Temp2:double,Temp2_ave:double,Temp2_sd:double,Temp3:double,Temp3_ave:double,Temp3_sd:double,Temp_Ambient:double,Temp_Am_ave:double,Temp_Am_sd:double>) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:163)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:146)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:146)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
	... 26 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
	at org.apache.spark.rdd.RDD$$anonfun$takeSample$1.apply(RDD.scala:571)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.takeSample(RDD.scala:560)
	at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:354)
	at org.apache.spark.mllib.clustering.KMeans.runAlgorithm(KMeans.scala:256)
	at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:227)
	at org.apache.spark.ml.clustering.KMeans.fit(KMeans.scala:325)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$3: (struct<Date:double,Device_ID:double,Temp1:double,Temp1_ave:double,Temp1_sd:double,Temp2:double,Temp2_ave:double,Temp2_sd:double,Temp3:double,Temp3_ave:double,Temp3_sd:double,Temp_Ambient:double,Temp_Am_ave:double,Temp_Am_sd:double>) => vector)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Values to assemble cannot be null.
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:163)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$assemble$1.apply(VectorAssembler.scala:146)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:146)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:99)
	at org.apache.spark.ml.feature.VectorAssembler$$anonfun$3.apply(VectorAssembler.scala:98)
	... 26 more


In [None]:
#view result
predictions.show()

### END

In [25]:
spark.stop()

## Step 4: Visualization

Time to get creative. Your final task is to build up a set of visualizations that could let an engineer get a quick overview of the current status of the system. Include the current sensor readings for each device and any metrics you think would be important to display. Choose one device and show more detail - a downsampled graph showing the readings over time, perhaps.

You don't need to have your visualizations update in real time - merely show them as they would be presented at a given instant (i.e. feel free to use all the data you stored in the first section).

In [16]:
import plotly.graph_objs as go
import plotly.plotly as py

In [17]:
import plotly
plotly.tools.set_credentials_file(username='dgebert18', api_key='wWcelh2OcxQaebCFxNBF')

### Temperature 1's info

In [18]:
trace1 = go.Scatter(
    y = df['Temp1'],
    mode='lines',
    name = 'Temp 1 actual',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace2 = go.Scatter(
    y = df['Temp1_ave'],
    mode='lines',
    name = 'Temp 1 average',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace3 = go.Scatter(
    y = df['Temp1_sd'],
    mode='lines',
    name = 'Temp 1 standard deviation',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
data = [trace1, trace2, trace3]

layout = dict(
    title='Temperature 1 High and Low values',
    xaxis=dict(
        rangeselector=dict(),
        rangeslider=dict(),
        type='date'
    )
)

#layout = dict(title = 'Temperature 1 High and Low values',
#              xaxis = dict(title = 'Time'),
#              yaxis = dict(title = 'Temperature (degrees C)'),
#              )


fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-line')

#url_1 = py.plot(data, filename='temp1_info', auto_open=False)
#py.iplot(data, filename='temp1_info')

The draw time for this plot will be slow for clients without much RAM.



Estimated Draw Time Slow



### Temperature 2's info

In [19]:
trace4 = go.Scatter(
    y = df['Temp2'],
    mode='lines',
    name = 'Temp 2 actual',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace5 = go.Scatter(
    y = df['Temp2_ave'],
    mode='lines',
    name = 'Temp 2 average',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace6 = go.Scatter(
    y = df['Temp2_sd'],
    mode='lines',
    name = 'Temp 2 standard deviation',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)

data = [trace4, trace5, trace6]

layout = dict(
    title='Temperature 2 High and Low values',
    xaxis=dict(
        rangeselector=dict(),
        rangeslider=dict(),
        type='date'
    )
)

fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-line')

The draw time for this plot will be slow for clients without much RAM.



Estimated Draw Time Slow



### Temperature 3's info

In [20]:
trace7 = go.Scatter(
    y = df['Temp3'],
    mode='lines',
    name = 'Temp 3 actual',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace8 = go.Scatter(
    y = df['Temp3_ave'],
    mode='lines',
    name = 'Temp 3 average',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace9 = go.Scatter(
    y = df['Temp3_sd'],
    mode='lines',
    name = 'Temp 3 standard deviation',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)

data = [trace7, trace8, trace9]

layout = dict(
    title='Temperature 3 High and Low values',
    xaxis=dict(
        rangeselector=dict(),
        rangeslider=dict(),
        type='date'
    )
)

fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-line')

The draw time for this plot will be slow for clients without much RAM.



Estimated Draw Time Slow



### Ambient Temperature info

In [21]:
trace10 = go.Scatter(
    y = df['Temp_Ambient'],
    mode='lines',
    name = 'Ambient Temp actual',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace11 = go.Scatter(
    y = df['Temp_Am_ave'],
    mode='lines',
    name = 'Ambient Temp average',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)
trace12 = go.Scatter(
    y = df['Temp_Am_sd'],
    mode='lines',
    name = 'Ambient Temp standard deviation',
    marker=dict(
        size='16',
        color = np.random.randn(500),
        showscale=True
    )
)

data = [trace10, trace11, trace12]

layout = dict(
    title='Temperature 3 High and Low values',
    xaxis=dict(
        rangeselector=dict(),
        rangeslider=dict(),
        type='date'
    )
)

fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-line')

The draw time for this plot will be slow for clients without much RAM.



Estimated Draw Time Slow

