In [None]:
# Install visualization libraries: PixieDust (whose display() is used throughout
# this notebook) and Bokeh (imported in the next cell).
# NOTE(review): versions are unpinned, so a re-run may pick up newer and possibly
# incompatible releases — TODO pin the versions this notebook was built with.

!pip install --upgrade pixiedust
!pip install --upgrade bokeh

# Not executed. Would install the Cloudant Spark connector through PixieDust; it would
# also need `import pixiedust` first, which only happens in the next cell.
# Presumably the runtime already provides the "com.cloudant.spark" source used below — verify.
# pixiedust.installPackage("cloudant-labs:spark-cloudant:2.0.0-s_2.11")

In [None]:
# Use visualization libraries

import pixiedust
import bokeh

In [None]:
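# The code was removed by Watson Studio for sharing.
# It presumably defined `credentials_1` (Cloudant 'host', 'username', 'password'),
# which the next cell reads — re-create it before running the notebook.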

In [None]:
# Create a Spark DataFrame from the Cloudant database "zzshake".
# `credentials_1` is expected to come from the hidden credentials cell above
# (removed by Watson Studio when the notebook was shared) — TODO confirm it has
# the keys 'host', 'username' and 'password'.
from pyspark.sql import SparkSession

# Returns the kernel's already-active session instead of relying on the
# shell-provided SQLContext/sc names.
sparkSession = SparkSession.builder.getOrCreate()

cloudantdata = (
    sparkSession.read.format("com.cloudant.spark")
    .option("cloudant.host", credentials_1['host'])
    .option("cloudant.username", credentials_1['username'])
    .option("cloudant.password", credentials_1['password'])
    # presumably the number of partitions used when reading from Cloudant — verify
    .option("jsonstore.rdd.partitions", "1")
    .option("inferSchema", "True")
    .load("zzshake")
)

# Register the DataFrame as the temp view "dataframe" so the SQL cells below can
# query it by name. This does NOT persist/cache the data (no .cache() is called).
cloudantdata.createOrReplaceTempView("dataframe")

# Print the inferred schema. The call parentheses matter: without them the cell
# only displays the bound-method repr and no schema is printed.
cloudantdata.printSchema()

In [None]:
# Use SQL to aggregate "energy" per ID: the sum of acceleration magnitudes
# sqrt(AX^2 + AY^2 + AZ^2), cast to an integer (EN), ordered highest first.
# Queries through `sparkSession` (created in the load cell, where the "dataframe"
# view was registered) rather than the implicit kernel global `spark`.
# NOTE: `modeldata` is reassigned to the full per-reading dataset in a later cell.

sqldata = "select cast (sum(sqrt((AX * AX) + (AY * AY) + (AZ * AZ))) as integer) as EN, ID from dataframe group by ID order by EN desc"
modeldata = sparkSession.sql(sqldata)
modeldata.show()

In [None]:
# Use PixieDust to visualize the per-ID energy aggregation (EN by ID) computed in the previous cell.

display(modeldata)

In [None]:
# Access the full per-reading dataset (no aggregation) and visualize it as a time series.
# EN here is the acceleration magnitude sqrt(AX^2 + AY^2 + AZ^2) of each reading.
# Queries through `sparkSession` (created in the load cell) rather than the
# implicit kernel global `spark`.
# This reassigns `modeldata`: every cell below works on this full dataset.

sqldata = "select TS, ID, AX, AY, AZ, OA, OB, OG, sqrt((AX * AX) + (AY * AY) + (AZ * AZ)) as EN from dataframe"
modeldata = sparkSession.sql(sqldata)
display(modeldata)

In [None]:
# Another PixieDust visualization of the same full time-series dataset.

display(modeldata)

In [None]:
# Check the structure of the Python object: the DataFrame repr lists its column
# names and types (no rows are shown).

modeldata

In [None]:
# Preview the data with PixieDust's display().

display(modeldata)

In [None]:
# Print the DataFrame's schema tree (column names, types and nullability).

modeldata.printSchema()

In [None]:
# Preview the first rows as a plain-text table; these are Spark's defaults
# (20 rows, long values truncated), spelled out explicitly.
modeldata.show(n=20, truncate=True)

In [None]:
# Basic summary statistics (count, mean, stddev, min, max) for each column.
summary_stats = modeldata.describe()
summary_stats.show()

In [None]:
# Number of records in the full dataset; the bare name on the last line
# displays the value as the cell output.
record_count = modeldata.count()
record_count

In [None]:
# 50/50 random split into training and test sets.
# The fixed seed (0) keeps the split repeatable across runs, as long as the
# input data and its partitioning do not change.

split_data = modeldata.randomSplit([0.5, 0.5], 0)
train_data, test_data = split_data

# Function-call print: the Python 2 print statement is a SyntaxError on Python 3
# kernels. With a single argument this form prints the same text on both, and it
# matches the print(...) used for the accuracy output.
print("Number of training records : " + str(train_data.count()))
print("Number of testing records  : " + str(test_data.count()))

In [None]:
# Preprocessing and machine learning functions

from pyspark.ml.feature import StringIndexer, IndexToString, VectorAssembler

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.ml import Pipeline, Model

In [None]:
# Map the string ID target onto a numeric "label" column. The indexer is fitted on
# the full dataset, so every ID in either split gets an index.
# The cell output lists the ID strings; position i is the string for label index i.

label_indexer = StringIndexer(outputCol = "label", inputCol = "ID")
stringIndexer_label = label_indexer.fit(modeldata)
stringIndexer_label.labels

In [None]:
# Define the model input: pack the six numeric columns into the single vector
# column "features" that the classifier reads.
feature_columns = ["AX", "AY", "AZ", "OA", "OB", "OG"]
vectorAssembler_features = VectorAssembler(outputCol = "features", inputCols = feature_columns)

In [None]:
# Convert numeric predictions back into the original ID strings (column "predlabel"),
# using the same label ordering the fitted indexer learned.
labelConverter = IndexToString(
    inputCol = "prediction",
    outputCol = "predlabel",
    labels = stringIndexer_label.labels,
)

In [None]:
# Define the algorithm used for modeling.
# Pass an explicit seed so the forest is reproducible. Otherwise PySpark derives
# the default seed from hash() of the class name, and that value changes between
# Python 3 processes because of string-hash randomization.
# 0 matches the seed used for randomSplit.
cf = RandomForestClassifier(labelCol = "label", featuresCol = "features", seed = 0)

# Alternative: gradient-boosted trees. Spark's GBTClassifier supports only binary
# classification, so it fails if the ID column has more than two distinct values.
# cf = GBTClassifier(labelCol="label", featuresCol="features")

In [None]:
# Build the modeling pipeline:
# index labels -> assemble features -> classify -> map predictions back to IDs.
pipeline_stages = [
    stringIndexer_label,        # already fitted, so the pipeline applies it as a transformer
    vectorAssembler_features,
    cf,
    labelConverter,
]
pipeline_cf = Pipeline(stages = pipeline_stages)

In [None]:
# Build a model: fit the whole pipeline on the training split only. The test split
# is kept unseen for the accuracy check below.

model_cf = pipeline_cf.fit(train_data)

In [None]:
# Score the held-out test set with the fitted pipeline and report accuracy:
# the fraction of test rows whose predicted label index equals the true one.

evaluator = MulticlassClassificationEvaluator(
    metricName = "accuracy",
    labelCol = "label",
    predictionCol = "prediction",
)
predictions = model_cf.transform(test_data)
accuracy = evaluator.evaluate(predictions)

print("Accuracy = %g" % accuracy)

In [None]:
# Sample the prediction output: a plain-text preview of the scored test set. It
# includes the "prediction" and "predlabel" columns added by the pipeline.

predictions.show()

In [None]:
# Visualize the classification results on the test set with PixieDust's display().

display(predictions)