In [None]:
# Uncomment the next line to upgrade PixieDust before importing it.
# !pip install --upgrade pixiedust
import pixiedust

# Maven-style coordinates of the Apache Bahir Cloudant connector for Spark SQL.
# NOTE(review): the ":0" version suffix is kept unchanged -- confirm which release it resolves to.
CLOUDANT_CONNECTOR_PACKAGE = "org.apache.bahir:spark-sql-cloudant_2.11:0"
pixiedust.installPackage(CLOUDANT_CONNECTOR_PACKAGE)

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Obtain the Spark session used by every later cell
# (getOrCreate hands back an existing session when one is already available).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [None]:
### TODO Please provide your Cloudant credentials in this cell: either replace
### the XXXX placeholders below, pass host/username/password explicitly, or set
### the CLOUDANT_HOST / CLOUDANT_USERNAME / CLOUDANT_PASSWORD environment variables.
def readDataFrameFromCloudant(database, host=None, username=None, password=None):
    """Load a Cloudant database into a Spark DataFrame via the Bahir connector.

    Connection settings are resolved in this order: the explicit argument,
    then the matching ``CLOUDANT_*`` environment variable, then the
    placeholder value that must be edited by hand.

    Args:
        database: Name of the Cloudant database to load.
        host: Cloudant host name (optional).
        username: Cloudant user name (optional).
        password: Cloudant password (optional).

    Returns:
        Whatever ``spark.read...load(database)`` returns for that database.
    """
    # Local import keeps this cell self-contained and avoids touching the import cells.
    import os

    host = host or os.environ.get('CLOUDANT_HOST', 'XXXX-bluemix.cloudant.com')
    username = username or os.environ.get('CLOUDANT_USERNAME', 'XXXX-bluemix')
    password = password or os.environ.get('CLOUDANT_PASSWORD', 'XXXX')

    # `spark` is the session created in an earlier cell.
    reader = (
        spark.read.format("org.apache.bahir.cloudant")
        .option("cloudant.host", host)
        .option("cloudant.username", username)
        .option("cloudant.password", password)
    )
    return reader.load(database)

In [None]:
# Load the 'training' database from Cloudant into a DataFrame.
df = readDataFrameFromCloudant(database='training')

In [None]:
# Enable SQL on the data frame: register it as the temporary view 'df'
# so it can be referenced by name in spark.sql(...) queries.
df.createOrReplaceTempView('df')

In [None]:
from pyspark.sql.functions import translate, col

# Add double-typed copies of the 'tmp' and 'hmdty' columns under the names
# 'temp' and 'humidity'; the original columns are left in place.
df_cleaned = (
    df
    .withColumn("temp", df.tmp.cast('double'))
    .withColumn("humidity", df.hmdty.cast('double'))
)

# Make the cleaned frame queryable by name, then preview the distinct
# (tmp, hmdty) pairs it contains.
df_cleaned.createOrReplaceTempView('df_cleaned')
distinct_readings = df_cleaned.select('tmp', 'hmdty').distinct()
distinct_readings.show()

In [None]:
# One query template, filled in once per class label, so both frames are
# guaranteed to select the same columns from the 'df_cleaned' view.
_class_query = 'select time, tmp, hmdty, class from df_cleaned where class = {}'

df_class_0 = spark.sql(_class_query.format(0))
df_class_0.createOrReplaceTempView('df_class_0')

df_class_1 = spark.sql(_class_query.format(1))
df_class_1.createOrReplaceTempView('df_class_1')

In [None]:
# Preview the distinct (tmp, hmdty) pairs among the class-1 rows.
class_1_pairs = df_class_1.select('tmp', 'hmdty').distinct()
class_1_pairs.show()

In [None]:
# Preview the distinct (tmp, hmdty) pairs among the class-0 rows.
class_0_pairs = df_class_0.select('tmp', 'hmdty').distinct()
class_0_pairs.show()

In [None]:
# Print the column names and types of the cleaned frame, including the
# added 'temp' and 'humidity' columns.
df_cleaned.printSchema()

In [None]:
# Count the rows per class label in the 'df_cleaned' view
# (count(class) only counts rows whose class is not null).
spark.sql('select class, count(class) from df_cleaned group by class').show()

In [None]:
# Render the class-0 rows with display().
# NOTE(review): presumably the PixieDust display() made available by `import pixiedust` -- confirm.
display(df_class_0)

In [None]:
# Imports for modelling
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Build the binary classifier.
# Stage 1: pack the two numeric inputs into a single 'features' vector column.
vectorAssembler = VectorAssembler(
    inputCols=["humidity", "temp"],
    outputCol="features",
)
# Stage 2: logistic regression on the 'class' label, capped at 1000 iterations.
lr = LogisticRegression(maxIter=1000, labelCol="class")

# Chain both stages, fit them on the cleaned data, then score that same data.
pipeline = Pipeline(stages=[vectorAssembler, lr])
model = pipeline.fit(df_cleaned)
result = model.transform(df_cleaned)

In [None]:
# Coefficients of the fitted logistic regression (the last pipeline stage).
model.stages[-1].coefficients

In [None]:
# Intercept of the fitted logistic regression (the last pipeline stage).
model.stages[-1].intercept

In [None]:
# Evaluate classification accuracy (1.0 = 100% accurate) by comparing the
# 'prediction' column against the 'class' label.
binEval = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="class",
)
binEval.evaluate(result)

In [None]:
# Test the model: re-read the data from Cloudant and score it with the fitted pipeline.
new_df = readDataFrameFromCloudant('training')

# Recreate the numeric columns the pipeline was fitted on. Its VectorAssembler
# stage reads "humidity" and "temp", so the cast humidity value has to be stored
# as "humidity"; writing it back to "hmdty" leaves that input column missing and
# model.transform() fails.
new_df_cleaned = (
    new_df
    .withColumn("temp", new_df.tmp.cast('double'))
    .withColumn("humidity", new_df.hmdty.cast('double'))
)

# NOTE: this replaces the 'df_cleaned' temp view registered by the earlier cleaning cell.
new_df_cleaned.createOrReplaceTempView('df_cleaned')

# Score the re-read rows and show the first 50 inputs, labels and predictions.
result = model.transform(new_df_cleaned)
result.createOrReplaceTempView('result')
spark.sql("select hmdty, tmp, class, prediction from result").show(50)