In [None]:
!pip install pyspark
!pip install elephas


In [None]:
# Kaggle's python Docker image (https://github.com/kaggle/docker-python) ships
# numpy, pandas and many other analytics libraries preinstalled.
import os

import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)

# Print every file available under the read-only Kaggle input directory.
for root_dir, _subdirs, file_names in os.walk('/kaggle/input'):
    for file_name in file_names:
        print(os.path.join(root_dir, file_name))

# Up to 20GB written to /kaggle/working/ is kept as output when a version is
# created with "Save & Run All"; /kaggle/temp/ is scratch space that is not
# saved outside the current session.

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('Spark Federated DL')
    .getOrCreate()
)

# Allow plans/schemas with many columns (the traffic schema below has 115)
# to be rendered in full instead of being truncated.
maxlength = 1000
spark.conf.set("spark.sql.debug.maxToStringFields", maxlength)

In [None]:
# Load and display features.csv from the dataset directory.
features_df = pd.read_csv(filepath_or_buffer="/kaggle/input/nbaiot-dataset/features.csv")
features_df

In [None]:
# Load and display data_summary.csv from the dataset directory.
summary_df = pd.read_csv(filepath_or_buffer="/kaggle/input/nbaiot-dataset/data_summary.csv")
summary_df

In [None]:
# Load and display device_info.csv. It gets its own name so the data-summary
# table loaded in the previous cell (summary_df) is not silently overwritten.
device_info_df = pd.read_csv("/kaggle/input/nbaiot-dataset/device_info.csv")
device_info_df

In [None]:
# Metadata CSVs in the dataset folder that are not traffic samples; they are
# removed from the list of files to train on.
out_files = [
    "/kaggle/input/nbaiot-dataset/" + meta_name
    for meta_name in ("features.csv", "data_summary.csv", "device_info.csv")
]

In [None]:
import glob

# Traffic CSVs = every CSV in the dataset folder except the metadata files.
# Built with a filtering comprehension instead of a side-effect-only list of
# .remove() calls (which also raised if a metadata file was absent), and
# sorted because glob's ordering is filesystem-dependent; a stable file order
# keeps Spark's partition order (and seeded randomSplit results) reproducible.
csv_files = sorted(
    path
    for path in glob.glob("/kaggle/input/nbaiot-dataset/*.csv")
    if path not in out_files
)

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DoubleType, FloatType

# All 115 traffic features are nullable doubles named
# <stream>_<window>_<statistic>. The CSVs are read without a header, so Spark
# maps these fields to columns purely by position: the nesting order below
# (stream, then window, then statistic) is load-bearing.
_WINDOWS = ["L5", "L3", "L1", "L0_1", "L0_01"]
_BASIC_STATS = ["weight", "mean", "variance"]
_EXTENDED_STATS = ["weight", "mean", "std", "magnitude", "radius", "covariance", "pcc"]
_STREAMS = [
    ("MI_dir", _BASIC_STATS),
    ("H", _BASIC_STATS),
    ("HH", _EXTENDED_STATS),
    ("HH_jit", _BASIC_STATS),
    ("HpHp", _EXTENDED_STATS),
]

schema = StructType([
    StructField(f"{stream}_{window}_{stat}", DoubleType(), True)
    for stream, stats in _STREAMS
    for window in _WINDOWS
    for stat in stats
])

len(schema)

In [None]:
# Read every traffic CSV into a single DataFrame; the explicit schema supplies
# column names and types positionally because the files are read headerless.
df = (
    spark.read
    .schema(schema)
    .csv(csv_files, header=False)
)

df.printSchema()


In [None]:
# df.describe().toPandas().transpose()

In [None]:
# Discard every row that has a null in any column.
df = df.dropna()

In [None]:
from  pyspark.sql.functions import input_file_name

# Record the source CSV path of each row; the sample label is parsed from it below.
df = df.withColumn("path", input_file_name())

In [None]:
from pyspark.sql.functions import col, udf

def getType(path):
    """Derive a sample label from a CSV path.

    The file name is split on '.', the first piece and the extension are
    dropped, and the remaining pieces are joined with '_'
    (e.g. ``.../1.mirai.ack.csv`` -> ``"mirai_ack"``). Returns ``""`` when the
    name has fewer than three dot-separated pieces.
    """
    name_parts = path.rsplit('/', 1)[-1].split('.')
    return "_".join(name_parts[1:-1])

# Spark UDF wrapping getType; derives the string label column from each row's path.
getTypeUDF = udf(f=lambda file_path: getType(file_path), returnType=StringType())
df = df.withColumn("type", getTypeUDF(col("path")))

In [None]:
# The helper "path" column is no longer needed once "type" has been derived.
df = df.select([column_name for column_name in df.columns if column_name != "path"])

In [None]:
# Keep only rows whose file name yielded a non-empty label.
df = df.where(col("type") != "")

In [None]:
# Name of the raw string label column (derived from the file name); it is
# indexed into a numeric label by the StringIndexer stage later.
tmp_label_col = "type"

In [None]:
# Model inputs: every column except the string label column.
# (.remove raises ValueError if the label column is missing.)
features_col = [column_name for column_name in df.columns]
features_col.remove(tmp_label_col)

In [None]:
# Seeded 80/20 random split; only the 80% part is kept, the 20% part is discarded.
train_df, _ = df.randomSplit([.8, .2],seed=2)


In [None]:
# Preview 5 rows; limit() runs before toPandas() so only those rows reach the driver.
train_df.limit(5).toPandas()

In [None]:
# Output column of the feature selector, and of the label indexer.
feature_col, label_col = "selectedFeatures", "label_index"

In [None]:
preprocessing_stages = []

from pyspark.ml.feature import StandardScaler, VectorAssembler

# 1) pack the raw numeric columns into one vector,
# 2) scale it with StandardScaler (default settings),
# 3) copy the scaled vector into the "features" column consumed by the selector.
unscaled_assembler = VectorAssembler(outputCol="unscaled_features", inputCols=features_col)
scaler = StandardScaler(outputCol="scaled_features", inputCol="unscaled_features")
assembler = VectorAssembler(outputCol="features", inputCols=["scaled_features"])

preprocessing_stages.extend([unscaled_assembler, scaler, assembler])

In [None]:
from pyspark.ml.feature import StringIndexer 
from pyspark.ml.feature import OneHotEncoder


# Turn the string label into a numeric class index stored in label_col.
label_str_index = StringIndexer(outputCol=label_col, inputCol=tmp_label_col)
preprocessing_stages.append(label_str_index)

In [None]:
from pyspark.ml.feature import UnivariateFeatureSelector

# Keep the 60 features with the strongest univariate association to the label.
# The features are continuous, but the label is a class index produced by
# StringIndexer, so labelType must be "categorical": Spark then scores features
# with an ANOVA F-test. labelType="continuous" would run an F-value regression
# test that treats the arbitrary class indices as an ordered numeric target.
num_selected_features = 60

selector = (
    UnivariateFeatureSelector(featuresCol="features", outputCol=feature_col,
                              labelCol=label_col, selectionMode="numTopFeatures")
    .setFeatureType("continuous")
    .setLabelType("categorical")
    .setSelectionThreshold(num_selected_features)
)

preprocessing_stages += [selector]

In [None]:
from pyspark.ml import Pipeline

# Chain all preprocessing stages (assemble -> scale -> assemble -> index label
# -> select features), fit them on the full DataFrame and transform it. The
# transformed data replaces train_df, so the earlier randomSplit result is not
# used past this point.
# NOTE(review): df also contains the rows of the last client, which the
# training loop evaluates on, so the fitted scaler/selector statistics have
# seen that data as well — consider fitting without it.
preprocessing_pipeline = Pipeline(stages=preprocessing_stages).fit(df)
train_df = preprocessing_pipeline.transform(df)

In [None]:
# Number of Classes
# Model dimensions derived from the preprocessed data:
#   nb_classes = number of distinct indexed labels,
#   input_dim  = length of the selected-feature vector (taken from the first row).
nb_classes = train_df.select(label_col).distinct().count()
first_feature_vector = train_df.select(feature_col).first()[0]
input_dim = len(first_feature_vector)

print("nb_classes: ", nb_classes)
print("input_dim: ", input_dim)

In [None]:
from pyspark.sql.functions import col

# Indexed labels as an int pandas Series (only the commented-out sklearn check
# below refers to it).
# NOTE(review): toPandas() pulls the entire label column onto the driver.
y = (
    train_df
    .select(col("label_index"))
    .toPandas()["label_index"]
    .astype(int)
)

In [None]:
from sklearn.utils import class_weight


# Per-label row counts, aggregated in Spark and collected to the driver.
label_rows = train_df.select("label_index").groupBy("label_index").count().collect()
counts_by_label = {row["label_index"]: row["count"] for row in label_rows}

# "Balanced" class weights: n_samples / (n_classes * n_samples_in_class), the
# formula sklearn's compute_class_weight('balanced', ...) uses.
n_samples = sum(counts_by_label.values())
balanced = n_samples / (len(counts_by_label) * np.array(list(counts_by_label.values())))

# Order the weights by label index so position i holds the weight of class i.
class_weights_spark = [weight for _, weight in sorted(zip(counts_by_label, balanced))]
class_weights_spark

In [None]:
# Rebuild the traffic-CSV list (duplicates an earlier cell so this section can
# run on its own): every CSV except the metadata files. Filtering replaces the
# side-effect-only list of .remove() calls, and sorting makes the otherwise
# filesystem-dependent glob order reproducible.
csv_files = sorted(
    path
    for path in glob.glob("/kaggle/input/nbaiot-dataset/*.csv")
    if path not in out_files
)

In [None]:
from  pyspark.sql.functions import input_file_name
from pyspark.sql.functions import col, udf

def get_client_csvs(pack_num, files=None):
    """Return the traffic CSV paths that belong to one client/device.

    File names are assumed to start with the device number followed by a dot
    (``<pack_num>.<type>[.<subtype>].csv``, the layout ``getType`` parses).

    Args:
        pack_num: device number as a string (an int is also accepted), e.g. "3".
        files: candidate paths; defaults to the notebook-level ``csv_files``.

    Returns:
        A new list of matching paths, in the order they appear in ``files``.
    """
    if files is None:
        files = csv_files
    # Compare against the file-name prefix rather than `pack_num in path`: a
    # plain substring test lets "1" also match "10.benign.csv", "21.x.csv" or a
    # digit anywhere in the directory part of the path.
    prefix = "{}.".format(pack_num)
    return [f for f in files if os.path.basename(f).startswith(prefix)]

def read_csv(pack_csv_file, schema):
    """Load the given CSV path(s) as one headerless DataFrame typed by ``schema``."""
    reader = spark.read.schema(schema)
    return reader.csv(pack_csv_file, header=False)

def getType(path):
    """Derive a sample label from a CSV path (redefinition of the earlier helper).

    Takes the file name after the last '/', splits it on '.', drops the first
    piece and the extension, and joins the rest with '_'
    (``.../1.mirai.ack.csv`` -> ``"mirai_ack"``).
    """
    file_name = path[path.rfind('/') + 1:]
    pieces = file_name.split('.')
    return "_".join(pieces[1:-1])

def df_preparation(df):
    """Clean a raw traffic DataFrame and attach its string label.

    Drops rows with any null, derives the "type" label from each row's source
    file name, and removes rows whose label came out empty.
    """
    type_udf = udf(lambda file_path: getType(file_path), StringType())
    return (
        df.na.drop()
        .withColumn("path", input_file_name())
        .withColumn("type", type_udf(col("path")))
        .drop("path")
        .filter("type != ''")
    )
    
def read_client_df(client_name, schema):
    """Load and clean all CSVs of one client.

    ``client_name`` is expected to end in ``_<pack_num>`` (e.g. ``client_3``);
    that suffix selects the client's files.
    """
    pack_num = client_name.rsplit('_', 1)[-1]
    raw_df = read_csv(get_client_csvs(pack_num), schema)
    return df_preparation(raw_df)


def df_preprocessing(df, preprocessing_pipeline):
    """Apply the fitted preprocessing pipeline, keeping only the model input and label columns."""
    return (
        preprocessing_pipeline
        .transform(df)
        .select(feature_col, label_col)
    )

In [None]:
# One entry per simulated client: client_1 ... client_<num_clients>.
num_clients = 9
clients = {"client_{}".format(idx): None for idx in range(1, num_clients + 1)}
clients

In [None]:
# Load and clean each client's raw data.
for client_name in clients:
    clients[client_name] = {"df": read_client_df(client_name, schema)}

# FedAvg scales each client's weights by n_k / n, so n must be the row count
# of the clients that actually train. The training loop holds out the LAST
# client for evaluation, so it is excluded here. If it were included, the
# scaling factors would sum to less than 1 and the aggregated global weights
# would shrink every round.
training_client_names = list(clients.keys())[:-1]
full_dataset_size = sum(clients[name]["df"].count() for name in training_client_names)
full_dataset_size

In [None]:
# Apply the fitted preprocessing to every client's data.
for client_name, client_entry in clients.items():
    client_entry["df"] = df_preprocessing(client_entry["df"], preprocessing_pipeline)

# Values later cells need, bundled so they can be persisted.
hyper_params = dict(
    full_dataset_size=full_dataset_size,
    class_weights_spark=class_weights_spark,
    nb_classes=nb_classes,
    input_dim=input_dim,
)

## Save the fitted pipeline, preprocessed client data and hyper-parameters so the notebook can resume from here after a connection reset instead of re-running from the beginning

In [None]:
import pickle

# Persist the fitted pipeline, each client's preprocessed data and the derived
# hyper-parameters. Every write overwrites existing output so this cell can be
# re-run. The parquet writer's default save mode ("errorifexists") would
# otherwise fail on the second run.
preprocessing_pipeline.write().overwrite().save("preprocessing_pipeline")

for c_key in clients.keys():
    clients[c_key]["df"].write.mode("overwrite").parquet("preprocessed_df_{}.parquet".format(c_key))

with open('hyper_params.pkl', 'wb') as fp:
    pickle.dump(hyper_params, fp)

In [None]:
import pickle
from pyspark.ml import PipelineModel

# Resume point: reload the artifacts written by the save cell.
preprocessing_pipeline = PipelineModel.load('./preprocessing_pipeline')

for client_name in clients:
    clients[client_name]["df"] = spark.read.parquet("preprocessed_df_{}.parquet".format(client_name))

# hyper_params.pkl is written by this notebook itself; never unpickle untrusted files.
with open('hyper_params.pkl', 'rb') as fp:
    hyper_params = pickle.load(fp)

full_dataset_size, class_weights, nb_classes, input_dim = (
    hyper_params[key]
    for key in ("full_dataset_size", "class_weights_spark", "nb_classes", "input_dim")
)

In [None]:
# Keras / Deep Learning
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from tensorflow.keras.optimizers import Adam, RMSprop

In [None]:
import keras.backend as K
import tensorflow as tf
from tensorflow.keras.metrics import Recall, Precision


def f1_score(y_true, y_pred):
    """Batch-level F1 metric built from Keras backend ops.

    Clips predictions and targets to [0, 1] and rounds them to 0/1, counts
    true positives, actual positives and predicted positives over the whole
    batch, and returns 2PR / (P + R). K.epsilon() guards every denominator.
    """
    eps = K.epsilon()
    true_pos = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_pos = K.sum(K.round(K.clip(y_true, 0, 1)))
    predicted_pos = K.sum(K.round(K.clip(y_pred, 0, 1)))

    precision = true_pos / (predicted_pos + eps)
    recall = true_pos / (actual_pos + eps)
    return 2 * (precision * recall) / (precision + recall + eps)



In [None]:
from keras import backend as K

def weighted_categorical_crossentropy(weights):
    """Build a categorical cross-entropy loss with per-class multipliers.

    Args:
        weights: one weight per class, in class-index order
            (e.g. the balanced class weights computed earlier).

    Returns:
        ``loss(y_true, y_pred)``, returning the weighted cross-entropy of each
        sample (summed over the last axis).
    """
    class_weight_tensor = K.variable(weights)

    def loss(y_true, y_pred):
        # Renormalise so each sample's predicted probabilities sum to 1.
        probs = y_pred / K.sum(y_pred, axis=-1, keepdims=True)
        # Keep probabilities strictly inside (0, 1) so log() stays finite.
        probs = K.clip(probs, K.epsilon(), 1 - K.epsilon())
        weighted_log_likelihood = y_true * K.log(probs) * class_weight_tensor
        return -K.sum(weighted_log_likelihood, -1)

    return loss

In [None]:
# Set up Deep Learning Model / Architecture

class CustomModel:
    """Factory for the small fully-connected Keras classifier used by every client."""

    @staticmethod
    def build(input_dim, num_classes):
        """Build an uncompiled ``Sequential`` classifier.

        Architecture: Dense(64, relu) -> Dense(32, relu) -> Dropout(0.2)
        -> Dense(num_classes, softmax).

        Args:
            input_dim: length of the input feature vector.
            num_classes: number of output classes.

        Returns:
            The (uncompiled) Keras model.
        """
        model = Sequential()
        model.add(Dense(64, input_shape=(input_dim,), activation="relu"))
        model.add(Dense(32, activation="relu"))
        model.add(Dropout(rate=0.2))
        # Use the argument instead of the notebook-global nb_classes, which the
        # original silently read; the builder now honours any class count.
        model.add(Dense(num_classes, activation="softmax"))
        return model

In [None]:
from keras.metrics import Recall, Precision
from keras.utils.vis_utils import plot_model

# Training hyper-parameters.
init_learning_rate = 0.001
learning_rate = init_learning_rate
comms_round = 30
batch_size = 64

# Compile-time objects: class-weighted loss, metrics and optimizer.
# NOTE(review): global_model is never compiled in this notebook, and later
# cells do not reference comms_round / loss / optimizer (the training loop
# defines its own round count) — confirm before relying on them.
loss = weighted_categorical_crossentropy(class_weights)
metrics = ["accuracy", Recall(), Precision(), f1_score]
optimizer = Adam(learning_rate=learning_rate)

# Initialise the global model; build() is a staticmethod, so no instance is needed.
global_model = CustomModel.build(input_dim, nb_classes)

global_model.summary()


In [None]:
from keras.utils.vis_utils import plot_model

# Render the global model's architecture (with tensor shapes and layer names) to global_model.png.
plot_model(global_model, to_file='global_model.png', show_shapes=True, show_layer_names=True)

In [None]:
def weight_scalling_factor(full_dataset_size, client_dataset_size):
    """Return the FedAvg coefficient n_k / n: this client's share of the training rows.

    Raises ZeroDivisionError when ``full_dataset_size`` is 0.
    """
    client_share = client_dataset_size / full_dataset_size
    return client_share

In [None]:
def scale_model_weights(weight, scalar):
    '''Return a new list with every per-layer weight array in ``weight`` multiplied by ``scalar``.'''
    return [scalar * layer_weights for layer_weights in weight]

In [None]:
def sum_scaled_weights(scaled_weight_list):
    '''Sum already-scaled client weights layer by layer.

    Each client's weights were pre-multiplied by its data share, so the
    per-layer sum equals the data-size-weighted average of the client models.
    Returns one tensor per layer.
    '''
    # zip(*...) regroups the per-client lists so each group holds one layer from every client.
    return [
        tf.math.reduce_sum(layer_group, axis=0)
        for layer_group in zip(*scaled_weight_list)
    ]

In [None]:
from sklearn.metrics import accuracy_score

def test_model(X_test, y_test,  model, comm_round):
    """Evaluate ``model`` on held-out data, print the metrics and return them.

    Args:
        X_test: inputs accepted by ``model.predict``.
        y_test: one-hot targets — assumed shape (n_samples, n_classes); TODO confirm.
        model: Keras model to evaluate.
        comm_round: communication round number, used only in the printout.

    Returns:
        (loss, accuracy, precision, recall, f1)
    """
    probs = model.predict(X_test)

    # The original referenced an undefined `cce` (NameError on the first call);
    # build the categorical cross-entropy explicitly.
    cce = tf.keras.losses.CategoricalCrossentropy()
    loss = cce(y_test, probs).numpy()

    y_hat = np.argmax(probs, axis=1)
    y_true = np.argmax(y_test, axis=1)
    accuracy = accuracy_score(y_true, y_hat)

    r = Recall()
    r.update_state(y_test, probs)
    recall = r.result().numpy()

    p = Precision()
    p.update_state(y_test, probs)
    precision = p.result().numpy()

    f1 = f1_score(y_test, probs).numpy()

    print('comm_round: {} | global_loss: {} | global_accuracy: {:.4} | global_recall: {:.4} | global_precision: {:.4} | global_f1_score: {:.4} \n'.format(comm_round, loss, accuracy, recall, precision, f1))
    return loss, accuracy, precision, recall, f1

In [None]:
# Elephas for Deep Learning on Spark
from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers

def get_optimizer(lr):
    """Return a serialized Adam config (learning rate ``lr``) for ElephasEstimator.set_optimizer_config."""
    return optimizers.serialize(optimizers.Adam(learning_rate=lr))
    

def get_sparkML_estimator(model, nb_classes, opt_conf, class_weights):
    """Configure an ElephasEstimator that trains ``model`` on feature_col / label_col.

    Args:
        model: Keras model whose architecture (``to_json()``) is handed to Elephas.
        nb_classes: number of classes; labels are declared categorical.
        opt_conf: serialized optimizer config (e.g. from ``get_optimizer``).
        class_weights: per-class weights for the weighted cross-entropy loss.

    Returns:
        The configured, unfitted ElephasEstimator.
    """
    estimator = ElephasEstimator()
    estimator.setFeaturesCol(feature_col)
    estimator.setLabelCol(label_col)
    # NOTE(review): Keras to_json() serializes the architecture only, not the
    # weights, so weights previously set on `model` are not part of this
    # config — confirm how Elephas initialises the network it trains.
    estimator.set_keras_model_config(model.to_json())
    estimator.set_categorical_labels(True)
    estimator.set_nb_classes(nb_classes)
    # One local epoch per call, a single synchronous worker, per-epoch updates.
    # (num_workers used to be set twice with the same value; once suffices.)
    estimator.set_num_workers(1)
    estimator.set_epochs(1)
    estimator.set_batch_size(batch_size)
    estimator.set_verbosity(1)
    estimator.set_optimizer_config(opt_conf)
    estimator.set_mode("synchronous")
    estimator.set_frequency("epoch")
    estimator.set_validation_split(0.0)
    estimator.set_loss(weighted_categorical_crossentropy(class_weights))
    estimator.set_metrics(['accuracy', Recall(), Precision(), f1_score])
    return estimator

In [None]:
# We can add the preprocessing pipeline stages with the estimator as a single pipeline for whole process
def get_pipeline_model(estimator):
    """Wrap ``estimator`` as the single stage of a Spark ML Pipeline.

    The preprocessing stages could be prepended here to run the whole process
    as one pipeline.
    """
    single_stage = [estimator]
    return Pipeline(stages=single_stage)
    

In [None]:
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from elephas.ml_model import ElephasTransformer
from elephas.utils.model_utils import ModelType, argmax


def fit_pipeline_model(pipeline, train_df):
    """Fit ``pipeline`` on ``train_df`` and return the fitted model."""
    fitted_model = pipeline.fit(train_df)
    return fitted_model

def train_client(global_weights, client_df, learning_rate):
    """Run one local training pass for a client and return its FedAvg-scaled weights.

    Args:
        global_weights: current global model weights (list of per-layer arrays).
        client_df: the client's preprocessed DataFrame (feature_col, label_col).
        learning_rate: learning rate for this round's optimizer.

    Returns:
        (fitted_pipeline, scaled_weights). The weights are multiplied by
        client rows / full_dataset_size.
    """
    local_model = CustomModel.build(input_dim, nb_classes)
    # Start the local model from the current global weights.
    # NOTE(review): only local_model.to_json() (architecture, no weights) is
    # passed to the estimator, so these weights may never reach the Elephas
    # training run — confirm against ElephasEstimator.
    local_model.set_weights(global_weights)

    estimator = get_sparkML_estimator(local_model, nb_classes, get_optimizer(learning_rate), class_weights)
    fitted_pipeline = fit_pipeline_model(get_pipeline_model(estimator), client_df)

    client_dataset_size = client_df.count()
    # NOTE(review): confirm estimator.get_model() returns the *trained* network;
    # the fitted weights may live on the transformer (fitted_pipeline.stages[-1]).
    local_model_weights = estimator.get_model().get_weights()

    scaling_factor = weight_scalling_factor(full_dataset_size, client_dataset_size)
    return fitted_pipeline, scale_model_weights(local_model_weights, scaling_factor)


In [None]:
def learning_rate_scheduler(nb_round, learning_rate):
    """Halve ``learning_rate`` on odd, positive rounds; otherwise return it unchanged.

    Called once per round with the previous round's rate, so the decay compounds
    (halved at rounds 1, 3, 5, ...).
    """
    if not nb_round > 0 or not nb_round % 2:
        return learning_rate
    return learning_rate * 0.5

In [None]:
import random
from elephas.spark_model import SparkModel


# Federated training loop (FedAvg). Each round:
#   1. every training client (all but the last) trains from the current global weights,
#   2. the data-size-scaled client weights are summed into the new global weights,
#   3. the result is evaluated on the last (held-out) client.
global_accuracy = []
global_precision = []
global_recall = []
global_f1 = []
best_global_accuracy = 0

number_rounds = 10
learning_rate = 0.001

global_pipeline = None
local_pipeline = None

for comm_round in range(number_rounds):
    learning_rate = learning_rate_scheduler(comm_round, learning_rate)

    # The global model's weights are the starting point for all local models.
    global_weights = global_model.get_weights()

    # Per-client weights, already scaled by each client's data share.
    scaled_local_weight_list = list()

    # Training clients = every key except the last; visiting order is shuffled.
    client_names = list(clients.keys())[:-1]
    random.shuffle(client_names)

    for c_key in client_names:
        print(f"Round: {comm_round} | Client: {c_key} training")
        client_df = clients[c_key]["df"]
        local_pipeline, scaled_weights = train_client(global_weights, client_df, learning_rate)
        scaled_local_weight_list.append(scaled_weights)
        # The last fitted pipeline is reused as the evaluation pipeline.
        global_pipeline = local_pipeline

    # Summing the pre-scaled weights yields the data-size-weighted average.
    average_weights = sum_scaled_weights(scaled_local_weight_list)

    global_model.set_weights(average_weights)
    # NOTE(review): if ElephasTransformer.get_model() builds a fresh Keras model on
    # each call, this set_weights() is discarded and the evaluation below scores the
    # last client's local weights rather than the average — confirm against elephas.
    global_pipeline.stages[-1].get_model().set_weights(average_weights)

    # Evaluate on the held-out (last) client.
    test_client_key = list(clients.keys())[-1]
    test_df = clients[test_client_key]["df"]
    prediction_and_label = global_pipeline.transform(test_df)

    prediction_and_label = prediction_and_label.select(label_col, argmax('prediction').astype(DoubleType()).alias('prediction'))
    # MulticlassMetrics expects (prediction, label) pairs. The original passed
    # (label, prediction), which swaps the roles and corrupts the per-class
    # precision/recall (only accuracy is symmetric).
    prediction_and_label = prediction_and_label.rdd.map(lambda row: (row["prediction"], row[label_col]))
    # Named eval_metrics so the Keras `metrics` list defined earlier is not clobbered.
    eval_metrics = MulticlassMetrics(prediction_and_label)

    print("round {} results on client '{}'".format(comm_round, test_client_key))
    g_accuracy = eval_metrics.accuracy
    weightedPrecision = eval_metrics.weightedPrecision
    weightedRecall = eval_metrics.weightedRecall
    # Weighted F1 = label-frequency-weighted mean of per-class F1 scores. The
    # harmonic mean of weighted precision and weighted recall is a different
    # quantity, and it divides by zero when both are 0.
    weightedF1Score = eval_metrics.weightedFMeasure()
    print("global accuracy : ", g_accuracy)
    print("global weightedPrecision : ",weightedPrecision)
    print("global weightedRecall : ",weightedRecall)
    print("global weightedF1Score : ",weightedF1Score)

    global_accuracy.append(g_accuracy)
    global_precision.append(weightedPrecision)
    global_recall.append(weightedRecall)
    global_f1.append(weightedF1Score)

    # Checkpoint the evaluation transformer whenever accuracy improves.
    if g_accuracy > best_global_accuracy:
        best_global_accuracy = g_accuracy
        global_pipeline.stages[-1].save('global_transformer_best_weights.h5')
        print("New Weights Saved")

In [None]:
from elephas.ml_model import load_ml_transformer

# Reload the best-accuracy transformer saved during training and score the
# evaluation client's data with it. The result is bound to a new name so the
# imported load_ml_transformer function is not shadowed; shadowing it made a
# re-run of this cell fail with a TypeError (object not callable).
best_transformer = load_ml_transformer("global_transformer_best_weights.h5")
prediction_and_label = best_transformer.transform(test_df)

prediction_and_label.head(5)


In [None]:
from matplotlib import pyplot as plt

# One panel per tracked metric, each plotted against the communication round.
figure, axis = plt.subplots(2, 2, figsize=(15, 15))

panels = [
    (axis[0, 0], global_accuracy, "global accuracy"),
    (axis[0, 1], global_precision, "global weighted Precision"),
    (axis[1, 0], global_recall, "global weighted Recall"),
    (axis[1, 1], global_f1, "global weighted F1Score"),
]
for ax, values, title in panels:
    ax.plot(values)
    ax.set_title(title)
    # Axis labels so each panel stands alone when the notebook is skimmed.
    ax.set_xlabel("communication round")
    ax.set_ylabel("score")

plt.show()
