In [0]:
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, Normalizer, MinMaxScaler,RobustScaler
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.linalg import Vectors
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler
import time 
import h2o
from pysparkling import *
from pysparkling.ml import H2OXGBoost, H2ODeepLearning
from h2o.estimators import H2OXGBoostEstimator, H2ODeepLearningEstimator
from pyspark import SparkContext
from pyspark.sql import SparkSession
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
import mlflow
import mlflow.sklearn

In [0]:
# Every mlflow.start_run() below records into this experiment.
MLFLOW_EXPERIMENT = "/mlflow/Modeling"
mlflow.set_experiment(MLFLOW_EXPERIMENT)

In [0]:
# Load the sensor table, reduce it to 24 partitions, and cache it because
# several later cells (indexing, feature assembly, the H2O split) reuse `df`.
df = spark.sql("select * from HouseSensor").coalesce(24)
df.cache()

In [0]:
# Replace the string `house` column with its numeric index `houses` so that it
# can be fed to VectorAssembler later on.
house_stringIdx = StringIndexer(outputCol="houses", inputCol="house")
house = house_stringIdx.fit(df)
df = house.transform(df)
df = df.drop("house")

In [0]:
# Index the target `activity` into a numeric `label` for Spark ML. The result
# goes to `df_ML`; `df` keeps its string `activity` column for the H2O cells.
label_stringIdx = StringIndexer(outputCol="label", inputCol="activity")
label = label_stringIdx.fit(df)
df_ML = label.transform(df)
df_ML = df_ML.drop("activity")

In [0]:
# Every column except the target becomes a model input.
feature_names = [col_name for col_name in df_ML.columns if col_name != "label"]
va = VectorAssembler(inputCols=feature_names, outputCol="features")
va_df = va.transform(df_ML).select("features", "label")

In [0]:
# Seeded 80/20 split for the Spark ML models; both halves are reused by
# every run cell, so cache them.
train_ML, test_ML = va_df.randomSplit([0.8, 0.2], seed=42)
train_ML.cache()
test_ML.cache()

In [0]:
# Reference estimator configurations.
# NOTE(review): in this notebook each MLflow run cell re-creates its estimator
# with different hyperparameters, so these objects are overwritten before they
# are ever fitted.
LR_model = LogisticRegression(family="multinomial", maxIter=100)
DT_model = DecisionTreeClassifier(maxDepth=20, maxBins=100)
RF_model = RandomForestClassifier(seed=42, maxDepth=15, maxBins=100)

In [0]:
def modeling(model, train, test, metric_name="accuracy"):
  """Fit a Spark ML estimator, evaluate it on a held-out set, and time the fit.

  Args:
    model: unfitted Spark ML estimator (anything exposing ``fit``).
    train: DataFrame used to fit ``model``.
    test: DataFrame to predict on and evaluate.
    metric_name: metric for ``MulticlassClassificationEvaluator``. It defaults
      to "accuracy" because the evaluator's own default is "f1"; without an
      explicit value the score reported as accuracy would really be F1.

  Returns:
    ``[score * 100, elapsed_seconds, predictions_df]``
  """
  start_time = time.time()
  trained_model = model.fit(train)
  # NOTE: Spark's transform() is lazy, so the timed window effectively covers
  # fitting only; the prediction work is executed inside evaluate() below.
  preds = trained_model.transform(test)
  end_time = time.time()
  evaluator = MulticlassClassificationEvaluator(metricName=metric_name)
  score = evaluator.evaluate(preds)
  total_time = float(end_time - start_time)
  return [score * 100, total_time, preds]

In [0]:
with mlflow.start_run():
  # Logistic regression run. NOTE: this overrides the maxIter=100 / multinomial
  # configuration from the model-definition cell; the hyperparameter logged
  # below is read back from this estimator so it always matches what was trained.
  LR_model = LogisticRegression(maxIter=10)
  [acc_LR, time_taken, preds] = modeling(LR_model, train_ML, test_ML)
  print("Accuracy LR", acc_LR)
  print("time taken LR", time_taken)
  mlflow.log_param("Model", "Logistic Regression")
  mlflow.log_param("Max Iterations", LR_model.getMaxIter())
  mlflow.log_metric("Time Taken", time_taken)
  mlflow.log_metric("Accuracy", acc_LR)

In [0]:
with mlflow.start_run():
  # Single decision tree. The model is trained with maxDepth=10; the params are
  # read back from the estimator (previously "Max Depth" was hard-coded as 20,
  # which did not match the trained model).
  DT_model = DecisionTreeClassifier(maxBins=100, maxDepth=10)
  [acc_DT, time_taken, preds] = modeling(DT_model, train_ML, test_ML)
  print("Accuracy DT", acc_DT)
  print("time taken DT", time_taken)
  mlflow.log_param("Model", "Decision tree")
  mlflow.log_param("Max Bins", DT_model.getMaxBins())
  mlflow.log_param("Max Depth", DT_model.getMaxDepth())
  mlflow.log_metric("Time Taken", time_taken)
  mlflow.log_metric("Accuracy", acc_DT)

In [0]:
with mlflow.start_run():
  # Random forest run. The logged params are read back from the estimator so
  # they match the trained configuration (maxDepth=10, numTrees=10).
  RF_model = RandomForestClassifier(maxDepth=10, maxBins=100, numTrees=10, seed=42)
  [acc_RF, time_taken, preds] = modeling(RF_model, train_ML, test_ML)
  print("Accuracy RF", acc_RF)
  print("time taken RF", time_taken)
  mlflow.log_param("Model", "Random Forest")
  mlflow.log_param("Max Bins", RF_model.getMaxBins())
  mlflow.log_param("Max Depth", RF_model.getMaxDepth())
  mlflow.log_param("Num Trees", RF_model.getNumTrees())
  mlflow.log_metric("Time Taken", time_taken)
  # Log this run's own score (was acc_DT, the decision tree's score).
  mlflow.log_metric("Accuracy", acc_RF)

In [0]:
# Start, or attach to, the Sparkling Water H2O context. It is presumably
# required before the pysparkling H2OXGBoost / H2ODeepLearning estimators below
# can be fitted. TODO confirm against the Sparkling Water version in use.
hc = H2OContext.getOrCreate()

In [0]:
# The pysparkling estimators are fitted on Spark DataFrames directly (see
# modeling_H2O), so no H2OFrame conversion is needed. hc.asSparkFrame converts
# an H2OFrame *to* Spark, which is not the case here.
# `df` still holds the string `activity` label, which H2O uses as the target.
sparkDF = df.withColumn("activity", df.activity.cast("string"))
# Seeded so that the H2O results are reproducible, like the Spark ML split.
[trainingDF, testingDF] = sparkDF.randomSplit([0.8, 0.2], seed=42)
trainingDF.cache()
testingDF.cache()

In [0]:
# Default-configured H2O estimators targeting the string `activity` label.
# NOTE(review): the MLflow run cells below construct fresh instances with the
# same arguments, so these objects are replaced before they are used.
DL_model = H2ODeepLearning(labelCol="activity")
XGB_model = H2OXGBoost(labelCol="activity")

In [0]:
def modeling_H2O(model, train, test):
  """Fit a Sparkling Water estimator, then score its predictions with scikit-learn.

  The timed window spans ``fit`` and the (lazy) ``transform`` only. The
  predictions, minus ``detailed_prediction``, are then collected to pandas and
  compared against the ``activity`` column.

  Returns:
    ``[accuracy * 100, elapsed_seconds, predictions_as_pandas]``
  """
  from sklearn.metrics import accuracy_score

  t0 = time.time()
  fitted = model.fit(train)
  scored = fitted.transform(test)
  elapsed = float(time.time() - t0)

  df_pred = scored.drop('detailed_prediction').toPandas()
  hit_rate = accuracy_score(df_pred['activity'], df_pred['prediction'])
  return [hit_rate * 100, elapsed, df_pred]

In [0]:
with mlflow.start_run():
  # H2O XGBoost with default hyperparameters on the H2O train/test split.
  XGB_model = H2OXGBoost(labelCol="activity")
  acc_XGB, time_taken, pred = modeling_H2O(XGB_model, trainingDF, testingDF)
  print("Accuracy XGB", acc_XGB)
  print("time taken XGB", time_taken)
  mlflow.log_param("Model", "XGBoost")
  for metric_key, metric_val in (("Time Taken", time_taken), ("Accuracy", acc_XGB)):
    mlflow.log_metric(metric_key, metric_val)

In [0]:
with mlflow.start_run():
  # H2O deep learning (feed-forward network) with default hyperparameters.
  DL_model = H2ODeepLearning(labelCol="activity")
  acc_DLE, time_taken, pred = modeling_H2O(DL_model, trainingDF, testingDF)
  print("Accuracy DL", acc_DLE)
  print("time taken DL", time_taken)
  mlflow.log_param("Model", "DNN")
  for metric_key, metric_val in (("Time Taken", time_taken), ("Accuracy", acc_DLE)):
    mlflow.log_metric(metric_key, metric_val)