# Connect to Hive

In [None]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team9"

# HDFS location of the team's Hive warehouse.
warehouse = "project/hive/warehouse"

# Hive-enabled Spark session running on YARN, pointed at the course metastore.
spark = (
    SparkSession.builder
    .appName("{} - spark ML".format(team))
    .master("yarn")
    .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")
    .config("spark.sql.warehouse.dir", warehouse)
    .config("spark.sql.avro.compression.codec", "snappy")
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext

In [None]:
# Inspect the metastore, switch to the project database and preview the data.
for statement in (
    "SHOW DATABASES",
    "USE team9_projectdb",
    "SHOW TABLES",
    "SELECT * FROM team9_projectdb.ctr_part",
):
    spark.sql(statement).show()

In [None]:
# Display the active SparkSession (notebook rich representation).
spark

# list Hive databases

In [None]:
# Databases visible through the metastore: once via the catalog API, once via SQL.
database_list = spark.catalog.listDatabases()
print(database_list)
spark.sql("SHOW DATABASES;").show()

In [None]:
# Tables registered in the project database.
project_tables = spark.catalog.listTables("team9_projectdb")
print(project_tables)

In [None]:
# Load the CTR table from Hive.
# NOTE(review): .format("avro") likely has no effect on .table(); confirm.
ctr_table = 'team9_projectdb.ctr_part'
ctr = spark.read.format("avro").table(ctr_table)
ctr

In [None]:
ctr.printSchema()

# Quick sanity checks: one user's rows and the mean of product_category_1.
# Unqualified table names resolve against the database selected with USE.
for sanity_query in (
    "SELECT * FROM ctr_part WHERE user_id=287005",
    "SELECT AVG(product_category_1) FROM ctr_part;",
):
    spark.sql(sanity_query).show()

# Specify the input and output features

In [None]:
# Candidate input columns. 'product_category_2' and 'city_development_index'
# are left out because they contain many nulls.
features = [
    'session_id', 'DateTime', 'user_id', 'product', 'campaign_id',
    'webpage_id', 'product_category_1',
    'user_group_id', 'gender', 'age_level', 'user_depth', 'var_1',
]

# Binary target column predicted by the classifiers below.
label = 'is_click'

# Read hive tables

In [None]:
# main = spark.read.format("avro").table('team7_projectdb.main')

# depts = spark.read.format("avro").table('team7_projectdb.departments')

In [None]:
# emps.show()

In [None]:
# depts.show()

# Feature selection

In [None]:
# NOTE: Firas's experiments with his own dataset (emps); not part of this pipeline.
# import pyspark.sql.functions as F

# # Remove the quotes before and after each string in job and ename columns.
# emps = emps.withColumn("job", F.translate("job","'",""))
# emps.show()
# emps = emps.withColumn("ename", F.translate("ename","'",""))
# emps.show()

In [None]:
# Same here: leftover experiments on the emps dataset, kept commented out.
# emps = emps.select(features + [label]).na.drop()
# emps = emps.withColumn("ename_job", F.concat(F.col('ename'), F.lit("_"), F.col('job')))
# emps = emps.withColumnRenamed("sal","label")

# emps.show()

In [None]:
# from pyspark.ml import Pipeline
# from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
# from pyspark.sql.functions import col

# categoricalCols = ['deptno']
# textCols = ['ename_job']
# others = ['empno', 'mgr']

In [None]:
# ctr_template = ctr 

# Feature extraction

In [None]:
import math
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.window import Window
from pyspark.sql.functions import when, avg, coalesce, date_format
from pyspark.sql.functions import sin, cos

In [None]:
from pyspark.sql.functions import col

# Working DataFrame for feature engineering, ordered chronologically.
ctr1_temp = ctr.orderBy(col("DateTime"))

In [None]:
# Preview the chronologically ordered data.
ctr1_temp.show()

Experimenting with splitting the date into year, month, and day.

In [None]:
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import col
# # Convert the datetime column to date type
# ctr_template = ctr_template.withColumn("datetime", ctr_template["datetime"].cast("Date"))
# # # Split dates to year, month, day# 
# ctr_template=ctr_template.withColumn("year", year(ctr_template["datetime"]).cast('int'))
# ctr_template=ctr_template.withColumn("month", month(ctr_template["datetime"]).cast('int'))
# ctr_template=ctr_template.withColumn("day", dayofmonth(ctr_template["datetime"]).cast('int'))
# ctr_template.drop("datetime")

# ctr1_temp = ctr

# ctr1_temp = ctr1_temp.orderBy(col("datetime"))
# Break DateTime into integer parts (added in this order), then drop it.
datetime_parts = [
    ("year", "yyyy"),
    ("month", "MM"),
    ("day", "dd"),
    ("hour", "HH"),
    ("minute", "mm"),
]
for part_name, pattern in datetime_parts:
    ctr1_temp = ctr1_temp.withColumn(part_name, date_format("datetime", pattern).cast('int'))
ctr1_temp = ctr1_temp.drop("datetime")

In [None]:
# Number of distinct years; a single value means the column carries no signal.
ctr1_temp.select('year').distinct().count()

In [None]:
# Number of distinct months; a single value means the column carries no signal.
ctr1_temp.select('month').distinct().count()

In [None]:
# Preview the extracted year/month/day/hour/minute columns.
ctr1_temp.show()

In [None]:
# Cyclical (sin/cos) encoding of day-of-month, hour and minute, so values that
# wrap around (e.g. hour 23 and hour 0) end up close together.
# 'year' and 'month' have a single distinct value in this data, so they are
# dropped instead of being encoded.
cyclical_periods = [("day", 31), ("hour", 24), ("minute", 60)]
for part, period in cyclical_periods:
    angle = 2 * math.pi * ctr1_temp[part] / period
    ctr1_temp = ctr1_temp.withColumn(part + "_sin", sin(angle))
    ctr1_temp = ctr1_temp.withColumn(part + "_cos", cos(angle))
ctr1_temp = ctr1_temp.drop("month", "day", "year", "hour", "minute")

In [None]:
# Preview the data after cyclical encoding.
ctr1_temp.show()

In [None]:
# Inspect column types after the date feature engineering.
ctr1_temp.printSchema()

In [None]:
# Preview the engineered features.
ctr1_temp.show()

# Feature extraction

In [None]:
# ctr1_temp = ctr1_temp.withColumn("year", date_format("datetime", "yyyy").cast('int'))    
# ctr1_temp = ctr1_temp.withColumn("month", date_format("datetime", "MM").cast('int')) 
# ctr1_temp = ctr1_temp.withColumn("day", date_format("datetime", "DD").cast('int')) 
# ctr1_temp=ctr1_temp.drop("datetime")
# ctr1_temp.show()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col
from pyspark.ml.feature import Imputer
from pyspark.sql.functions import when


In [None]:
# Numeric columns with missing values; they are median-imputed below.
nanCols = ['user_group_id', 'user_depth', 'age_level']

# String columns that are index-encoded below.
categoricalCols = ['product', 'gender']

In [None]:
# Median-impute the numeric columns listed in nanCols. Imputed values are
# written to "<col>_imputed" and the original columns are then dropped.
imputer_median = Imputer(
    inputCols=nanCols, outputCols=["{}_imputed".format(c) for c in nanCols]
    ).setStrategy("median")

ctr1_temp = imputer_median.fit(ctr1_temp).transform(ctr1_temp)
ctr1_temp = ctr1_temp.drop(*nanCols)

# Fill missing gender with 'Male' (NOTE(review): presumably the most frequent
# value -- confirm). A missing value can appear either as the literal string
# "NaN" or as SQL NULL. A bare `== "NaN"` test is NULL on NULL rows, so `when`
# falls through to `otherwise` and leaves them NULL; the StringIndexer with
# handleInvalid="skip" used later would then silently drop those rows.
gender_missing = ctr1_temp["gender"].isNull() | (ctr1_temp["gender"] == "NaN")
ctr1_temp = ctr1_temp.withColumn(
    "gender", when(gender_missing, 'Male').otherwise(ctr1_temp["gender"])
)

# Drop the sparse columns that are excluded from the feature set.
ctr1_temp = ctr1_temp.drop(*["product_category_2", "city_development_index"])

In [None]:
# ctr1_temp = ctr1_temp.drop(*["product_category_2", "city_development_index"])

In [None]:
# Preview the data after imputation and the gender fill.
ctr1_temp.show()

In [None]:
from pyspark.sql.functions import isnan, when, count, col

# Per-column count of NaN values, as a sanity check after imputation.
nan_count_exprs = [count(when(isnan(name), name)).alias(name) for name in ctr1_temp.columns]
ctr1_temp.select(nan_count_exprs).show()

In [None]:
# Index each categorical string column into "<col>_indexed". Rows with invalid
# (unseen or NULL) values are skipped. The raw string columns and session_id
# are dropped afterwards.
indexers_temp = [
    StringIndexer(inputCol=col_name, outputCol="{0}_indexed".format(col_name)).setHandleInvalid("skip")
    for col_name in categoricalCols
]
pipeline = Pipeline(stages=indexers_temp)
indexer_model = pipeline.fit(ctr1_temp)
ctr1_temp = indexer_model.transform(ctr1_temp).drop(*(categoricalCols + ["session_id"]))

In [None]:
# Preview the data after categorical indexing.
ctr1_temp.show()

In [None]:
# ctr1_temp.printSchema()

In [None]:
import pandas as pd

# Summary statistics for numeric columns, excluding the label and the
# hour/minute cosine terms.
excluded_columns = {'is_click', 'hour_cos', 'minute_cos'}
numeric_types = {'int', 'float', 'double'}
numeric_features = [
    name for name, dtype in ctr1_temp.dtypes
    if name not in excluded_columns and dtype in numeric_types
]
ctr1_temp.select(numeric_features).describe().toPandas().transpose()

In [None]:
# numeric_data = ctr1_temp.select(numeric_features).toPandas()
# axs = pd.plotting.scatter_matrix(numeric_data, figsize=(12, 12));
# n = len(numeric_data.columns)

# for i in range(n):
#     v = axs[i, 0]
#     v.yaxis.label.set_rotation(0)
#     v.yaxis.label.set_ha('right')
#     v.set_yticks(())
#     h = axs[n-1, i]
#     h.xaxis.label.set_rotation(90)
#     h.set_xticks(())

In [None]:
# ctr1_temp.schema.names

In [None]:
# Assemble every column except the label into a single "features" vector.
feature_columns = [name for name in ctr1_temp.schema.names if name != "is_click"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
pipeline = Pipeline(stages=[assembler])

In [None]:
# Preview the columns that are about to be assembled into "features".
ctr1_temp.show()

In [None]:
# Fit and apply the assembler pipeline, which adds the "features" vector column.
assembler_model = pipeline.fit(ctr1_temp)
ctr1_temp = assembler_model.transform(ctr1_temp)

In [None]:
# Keep only the target (renamed to "label") and the feature vector.
ctr1_temp = ctr1_temp.select("is_click", "features")
ctr1_temp = ctr1_temp.withColumnRenamed("is_click", "label")

ctr1_temp.show()

In [None]:
# # from pyspark.ml import Pipeline
# # from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
# # from pyspark.sql.functions import col
# # from pyspark.ml.feature import Imputer

# categoricalCols = ['product', 'gender']
# # textCols = ['ename_job']
# #fill missing values
# nanCols = ['user_group_id', 'user_depth', 'age_level']

# # Since the tokenizer only return tokens separated by white spaces, I used RegexTokenizer to tokenize by '_'
# # Then created word2Vec model

# # tokenizer = Tokenizer(inputCol="ename", outputCol="ename_tokens")
# # emps_tok = tokenizer.transform(emps)
# # tokenizer = RegexTokenizer(inputCol=textCols[0], outputCol="ename_job_tokens", pattern="_")
# # emps_tok = tokenizer.transform(emps)
# # emps_tok.show()

# # word2Vec = Word2Vec(vectorSize=5, seed=42, minCount=1, inputCol="ename_job_tokens", outputCol="ename_enc")
# # word2VecModel = word2Vec.fit(emps_tok)
# # print(word2VecModel)

# # emps_tok = word2VecModel.transform(emps_tok)
# # emps_tok.show()

# # Adding the encoded ename_job to the list of other columns
# # others += [ename_enc]


# # Create String indexer to assign index for the string fields where each unique string will get a unique index
# # String Indexer is required as an input for One-Hot Encoder 
# # We set the case as `skip` for any string out of the input strings
# # indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]

# # Encode the strings using One Hot encoding
# # default setting: dropLast=True ==> For example with 5 categories, an input value of 2.0 would map to an output vector of [0.0, 0.0, 1.0, 0.0]. The last category is not included by default (configurable via dropLast), because it makes the vector entries sum up to one, and hence linearly dependent. So an input value of 4.0 maps to [0.0, 0.0, 0.0, 0.0].
# # encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]

# # This will concatenate the input cols into a single column.
# # assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol= "features")

# # You can create a pipeline to use only a single fit and transform on the data.
# # pipeline = Pipeline(stages=[tokenizer, word2Vec] + indexers + encoders + [assembler])


# # Fit the pipeline ==> This will call the fit functions for all transformers if exist
# # model=pipeline.fit(emps)
# # Fit the pipeline ==> This will call the transform functions for all transformers
# # data = model.transform(emps)

# # data.show()


# #Сашин код
# ###################################################
# # # Table main
# # # Encode categorical features in table 
# # mainindexers_main = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_main]
# # pipeline = Pipeline(stages=indexers_main)main = pipeline.fit(main).transform(main).drop(*encode_main + ["id"])
# # # Table oil
# # # Fill missing values in oil table with average of 
# # neighborswindow = Window.rowsBetween(-1, 1)
# # oil = oil.withColumn("avg_dcoilwtico", avg(oil["dcoilwtico"]).over(window))oil = oil.withColumn("dcoilwtico", coalesce(oil["dcoilwtico"], oil["avg_dcoilwtico"]))
# # oil = oil.drop("avg_dcoilwtico")
# # # Table holidays_events# Convert boolean "transferred" column to integer column
# # hol_events = hol_events.withColumn('transferred', when(hol_events.transferred==True, 1).otherwise(0))
# # # Encode categorical features in table 
# # holidays_eventsindexers_hol_events = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_hol_events]
# # pipeline = Pipeline(stages=indexers_hol_events)hol_events = pipeline.fit(hol_events).transform(hol_events).drop(*(encode_hol_events + ["description", "id"]))
# # # Table stores
# # # Encode categorical features in table 
# # storesindexers_stores = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_stores]
# # pipeline = Pipeline(stages=indexers_stores)stores = pipeline.fit(stores).transform(stores).drop(*encode_stores)

# # # We delete all features and keep only the features and label columns
# # data = data.select(["features", "label"])






# ctr1_temp = ctr

# # Table main
# # Encode categorical features in table main
# #Это безобразие у меня сработало!!!!
# indexers_temp = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols]
# pipeline = Pipeline(stages=categoricalCols)
# ctr1_temp = pipeline.fit(ctr1_temp).transform(ctr1_temp).drop(*categoricalCols + ["session_id"])
# #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!




# # Table
# # Fill missing values in oil table with median

# imputer = Imputer(
#     inputCols=nanCols, outputCols=["{}_imputed".format(c) for c in nanCols]
#     ).setStrategy("median")

# # Add imputation cols to df
# ctr1_temp = imputer.fit(ctr1_temp).transform(ctr1_temp)

# # Замена пустых значений (например, пустых строк) на указанное значение
# ctr1_temp = ctr1_temp.withColumn("gender", when(ctr1_temp["gender"] == "NaN", 'Male').otherwise(ctr1_temp["gender"]))
# #drop these columns
# ctr1_temp = ctr1_temp.drop(*["product_category_2", "city_development_index"])

# # window = Window.rowsBetween(-1, 1)
# # oil = oil.withColumn("avg_dcoilwtico", avg(oil["dcoilwtico"]).over(window))
# # oil = oil.withColumn("dcoilwtico", coalesce(oil["dcoilwtico"], oil["avg_dcoilwtico"]))
# # oil = oil.drop(*["avg_dcoilwtico", "id"])

# # Table holidays_events
# # Convert boolean "transferred" column to integer column
# # hol_events = hol_events.withColumn('transferred', when(hol_events.transferred, 1).otherwise(0)).drop("id")

# # Encode categorical features in table holidays_events
# # indexers_hol_events = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_hol_events]
# # pipeline = Pipeline(stages=indexers_hol_events)
# # hol_events = pipeline.fit(hol_events).transform(hol_events).drop(*(encode_hol_events + ["description", "id"]))

# # Table stores
# # Encode categorical features in table stores
# # stores = stores.withColumnRenamed("type", "type_store")
# # indexers_stores = [StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in encode_stores]
# # pipeline = Pipeline(stages=indexers_stores)
# # stores = pipeline.fit(stores).transform(stores).drop(*encode_stores)

# # Table transactions
# # transactions = transactions.drop("id")

# # Join tables
# # total_data = main. \
# #     join(oil, on="dates"). \
# #     join(hol_events, on="dates"). \
# #     join(transactions, on=["dates", "store_nbr"]). \
# #     join(stores, on="store_nbr")

# # Split dates to year, month, day
# # total_data = total_data. \
# #     withColumn("year", date_format("dates", "yyyy").cast('int')). \
# #     withColumn("month", date_format("dates", "MM").cast('int')). \
# #     withColumn("day", date_format("dates", "DD").cast('int')). \
# #     drop("dates")

# # Encode cyclical month and days
# ctr1_temp["month_sin"] = sin(2 * math.pi * ctr1_temp.month / 12)
# ctr1_temp["month_cos"] = cos(2 * math.pi * ctr1_temp.month / 12)
# ctr1_temp["day_sin"] = sin(2 * math.pi * ctr1_temp.day / 31)
# ctr1_temp["day_cos"] = cos(2 * math.pi * ctr1_temp.day / 31)

# # Assemble all features into single column
# assembler = VectorAssembler(inputCols=[i for i in ctr1_temp.schema.names if i != "label"], outputCol="features")
# pipeline = Pipeline(stages=[assembler])
# ctr1_temp = pipeline.fit(ctr1_temp).transform(ctr1_temp)
# ctr1_temp = ctr1_temp.select(["is_click", "features"]).withColumnRenamed("is_click", "label")

# # Display final table
# ctr1_temp.show()

# ###########################################################################



# # from pyspark.ml.feature import VectorIndexer

# # Automatically identify categorical features, and index them.
# # We specify maxCategories so features with > 4
# # distinct values are treated as continuous.
# # featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
# # transformed = featureIndexer.transform(data)

# # Display the output Spark DataFrame
# # transformed.show()

# Split the dataset

In [None]:
# 60/40 random train/test split (not stratified), with a fixed seed.
train_data, test_data = ctr1_temp.randomSplit([0.6, 0.4], seed=10)

def run(command, cwd=".."):
    """Run a shell command and return its standard output as text.

    Args:
        command: Shell command line; it may contain redirections such as ``>``.
        cwd: Directory to run the command in. Defaults to ``".."`` (the parent
            of the current working directory), matching the previous
            ``cd ..`` behaviour that the calling cells treat as the repository
            root.

    Returns:
        Everything the command wrote to stdout (``""`` when it was redirected).

    A non-zero exit status does not raise, so the notebook keeps going as it
    did with ``os.popen``. It does emit a ``RuntimeWarning``, so a failed
    ``hdfs dfs`` copy no longer goes unnoticed. stderr is not captured and
    goes to the parent process's stderr, as before.
    """
    import subprocess
    import warnings

    # TODO: revisit the default cwd once the whole pipeline is assembled.
    completed = subprocess.run(
        command,
        shell=True,
        cwd=cwd,
        stdout=subprocess.PIPE,
        universal_newlines=True,
    )
    if completed.returncode != 0:
        warnings.warn(
            f"command {command!r} exited with status {completed.returncode}",
            RuntimeWarning,
            stacklevel=2,
        )
    return completed.stdout


# Write each split as a single JSON file in HDFS, then copy it into the local
# data/ directory with run().
train_export = train_data.select("features", "label").coalesce(1)
train_export.write.mode("overwrite").json("project/data/train")
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

test_export = test_data.select("features", "label").coalesce(1)
test_export.write.mode("overwrite").json("project/data/test")
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

# First model

## Build a model

In [None]:
from pyspark.ml.classification import LogisticRegression

# Baseline logistic regression with default hyperparameters, trained on the
# "features" vector and "label" columns prepared above.
lr = LogisticRegression()
model_lr = lr.fit(train_data)

## Predict for test data

In [None]:
# Score the test split with the baseline logistic regression.
predictions = model_lr.transform(test_data)
predictions.show()

## Evaluate the model

In [None]:
# !pip install matplotlib

In [None]:
# NOTE(review): the original author marked this plot as a candidate for
# removal because its meaning was unclear to the team.
import matplotlib.pyplot as plt
import numpy as np

# Logistic-regression coefficients, sorted ascending.
beta = np.sort(model_lr.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

Summarize the model over the training set. From the training summary we can also obtain the ROC (Receiver Operating Characteristic) curve and the area under it (areaUnderROC).

In [None]:
# Training-set ROC curve from the fitted model's summary. FPR goes on the
# x-axis and TPR on the y-axis (the previous labels had them swapped).
trainingSummary = model_lr.summary
lrROC = trainingSummary.roc.toPandas()

plt.plot(lrROC['FPR'], lrROC['TPR'])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()

print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

Precision and recall

In [None]:
# Training-set precision-recall curve (recall on x, precision on y).
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'], pr['precision'])
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator 

# Ranking metrics for the baseline logistic regression on the test split.
lrEval = BinaryClassificationEvaluator(labelCol='label')
auroc = lrEval.evaluate(predictions, {lrEval.metricName: 'areaUnderROC'})
aupr = lrEval.evaluate(predictions, {lrEval.metricName: 'areaUnderPR'})

print(f"AUROC: {auroc}")
print(f"AUPR: {aupr}")

# Metrics on the hard 0/1 predictions (precision/recall are weighted averages).
evaluator1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy1, precision1, recall1, f11 = (
    evaluator1.evaluate(predictions, {evaluator1.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy: {accuracy1}")
print(f"Precision: {precision1}")
print(f"Recall: {recall1}")
print(f"F1 Score: {f11}")

## Hyperparameter optimization

In [None]:
# List the Params exposed by the fitted logistic-regression model.
model_lr.params

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np

# Hyperparameter grid built from the *estimator's* Params (lr). CrossValidator
# copies these maps onto lr for every candidate fit. Keying the grid on the
# fitted model's Params (model_lr.*) only works as long as the model shares
# lr's uid.
paramGrid = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.1, 0.01])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# 3-fold cross-validation scored with lrEval, evaluating up to 5 candidate
# models in parallel.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=lrEval,
                    parallelism=5,
                    numFolds=3)

# Fit on the training split and keep the best model.
cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

## Best model 1


In [None]:
from pprint import pprint

# The cross-validated logistic regression and its effective parameters.
model1 = bestModel
model1_param_map = model1.extractParamMap()
pprint(model1_param_map)

## Save the model to HDFS

In [None]:
# Persist model1 in HDFS, then copy it into the local models/ directory.
model1_hdfs_path = "project/models/model1"
model1.write().overwrite().save(model1_hdfs_path)
run("hdfs dfs -get project/models/model1 models/model1")

## Predict for test data using best model1

In [None]:
# Score the test split with the cross-validated logistic regression (model1).
predictions = model1.transform(test_data)
predictions.show()

In [None]:
# Write (label, prediction) pairs as one headered CSV in HDFS, then copy it
# into the local output/ directory.
model1_writer = (
    predictions.select("label", "prediction")
    .coalesce(1)
    .write.mode("overwrite")
    .format("csv")
    .options(sep=",", header="true")
)
model1_writer.save("project/output/model1_predictions.csv")
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

## Evaluate the best model1

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Ranking metrics for model1 on the test split.
lrEval1 = BinaryClassificationEvaluator(labelCol='label')
auroc1 = lrEval1.evaluate(predictions, {lrEval1.metricName: 'areaUnderROC'})
aupr1 = lrEval1.evaluate(predictions, {lrEval1.metricName: 'areaUnderPR'})

print(f"AUROC: {auroc1}")
print(f"AUPR: {aupr1}")

# Metrics on the hard 0/1 predictions (precision/recall are weighted averages).
evaluator1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy1, precision1, recall1, f11 = (
    evaluator1.evaluate(predictions, {evaluator1.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy: {accuracy1}")
print(f"Precision: {precision1}")
print(f"Recall: {recall1}")
print(f"F1 Score: {f11}")

# Second model

## Build a model

In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted-trees classifier with default hyperparameters. By default
# it reads the "features" vector and "label" columns.
gbt = GBTClassifier()

# Fit the classifier on the training split.
model_gbt = gbt.fit(train_data)

## Predict for test data

In [None]:
# Score the test split with the default GBT classifier.
predictions = model_gbt.transform(test_data)
predictions.show()

## Evaluate the model

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

# Ranking metrics for the default GBT model on the test split.
lrEval2 = BinaryClassificationEvaluator(labelCol='label')
auroc2 = lrEval2.evaluate(predictions, {lrEval2.metricName: 'areaUnderROC'})
aupr2 = lrEval2.evaluate(predictions, {lrEval2.metricName: 'areaUnderPR'})

print(f"AUROC: {auroc2}")
print(f"AUPR: {aupr2}")

# Metrics on the hard 0/1 predictions (precision/recall are weighted averages).
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy2, precision2, recall2, f12 = (
    evaluator2.evaluate(predictions, {evaluator2.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy: {accuracy2}")
print(f"Precision: {precision2}")
print(f"Recall: {recall2}")
print(f"F1 Score: {f12}")

## Hyperparameter optimization

In [None]:
# List the Params exposed by the fitted GBT model.
model_gbt.params

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


# Hyperparameter grid built from the *estimator's* Params (gbt). CrossValidator
# copies these maps onto gbt for every candidate fit. Keying the grid on the
# fitted model's Params (model_gbt.*) only works as long as the model shares
# gbt's uid.
grid = (
    ParamGridBuilder()
    .addGrid(gbt.maxDepth, [2, 5, 10])
    .addGrid(gbt.maxBins, [20, 30])
    .build()
)

# 3-fold cross-validation scored with lrEval2, evaluating up to 5 candidate
# models in parallel.
cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=grid,
                    evaluator=lrEval2,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

## Best model 2


In [None]:
from pprint import pprint

# The cross-validated GBT model and its effective parameters.
model2 = bestModel
model2_param_map = model2.extractParamMap()
pprint(model2_param_map)

## Save the model to HDFS

In [None]:
# Persist model2 in HDFS, then copy it into the local models/ directory.
model2_hdfs_path = "project/models/model2"
model2.write().overwrite().save(model2_hdfs_path)
run("hdfs dfs -get project/models/model2 models/model2")

## Predict for test data using best model2

In [None]:
# Score the test split with the cross-validated GBT model (model2).
predictions = model2.transform(test_data)
predictions.show()

In [None]:
# Write (label, prediction) pairs as one headered CSV in HDFS, then copy it
# into the local output/ directory.
model2_writer = (
    predictions.select("label", "prediction")
    .coalesce(1)
    .write.mode("overwrite")
    .format("csv")
    .options(sep=",", header="true")
)
model2_writer.save("project/output/model2_predictions.csv")
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

## Evaluate the best model2

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

# Ranking metrics for model2 on the test split.
lrEval2 = BinaryClassificationEvaluator(labelCol='label')
auroc2 = lrEval2.evaluate(predictions, {lrEval2.metricName: 'areaUnderROC'})
aupr2 = lrEval2.evaluate(predictions, {lrEval2.metricName: 'areaUnderPR'})

print(f"AUROC: {auroc2}")
print(f"AUPR: {aupr2}")

# Metrics on the hard 0/1 predictions (precision/recall are weighted averages).
evaluator2 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy2, precision2, recall2, f12 = (
    evaluator2.evaluate(predictions, {evaluator2.metricName: metric})
    for metric in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print(f"Accuracy: {accuracy2}")
print(f"Precision: {precision2}")
print(f"Recall: {recall2}")
print(f"F1 Score: {f12}")

# Third Model

## Model training

In [None]:
from pyspark.ml.classification import RandomForestClassifier

# Baseline random forest: only the input/target columns are set, so every
# other hyperparameter keeps Spark's default. Trained on the "features"
# column to predict "label".
rf = RandomForestClassifier(labelCol = 'label', featuresCol = 'features')
rfModel = rf.fit(train_data)

## Prediction

In [None]:
# Score the test split with the baseline random forest and preview the
# first rows (Spark's default of 20 rows, truncated cells).
predictions = rfModel.transform(test_data)
predictions.show(n=20, truncate=True)

## Evaluation

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator 

# Threshold-free metrics for the baseline random forest; each call overrides
# metricName for that evaluation only, leaving lrEval3 itself unchanged.
lrEval3 = BinaryClassificationEvaluator(labelCol='label')
auroc3, aupr3 = (
    lrEval3.evaluate(predictions, {lrEval3.metricName: metric})
    for metric in ('areaUnderROC', 'areaUnderPR')
)

print(f"AUROC: {auroc3}")
print(f"AUPR: {aupr3}")


In [None]:
# !pip install sklearn

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Label-based metrics for the baseline random forest, computed from the hard
# "prediction" column of `predictions`.
evaluator3 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")

# Compute accuracy, precision, recall, and F1 score
accuracy3 = evaluator3.evaluate(predictions, {evaluator3.metricName: "accuracy"})
precision3 = evaluator3.evaluate(predictions, {evaluator3.metricName: "weightedPrecision"})
recall3 = evaluator3.evaluate(predictions, {evaluator3.metricName: "weightedRecall"})
f13 = evaluator3.evaluate(predictions, {evaluator3.metricName: "f1"})

# Print this model's metrics -- the *3 variables computed just above.
print(f"Accuracy: {accuracy3}")
print(f"Precision: {precision3}")
print(f"Recall: {recall3}")
print(f"F1 Score: {f13}")

In [None]:
# Returns confusion matrix: predicted classes are in columns, they are ordered by class label ascending, as in “labels”.
# Meaning 0 1 in columns. Rows (actual labels) also 0 then 1.

# Evaluate using MultiClass Metrics. MulticlassMetrics expects
# (prediction, label) pairs of doubles, so cast explicitly in case the label
# column has an integer type.
pred_rdd = predictions.select('prediction', 'label').rdd.map(
    lambda row: (float(row[0]), float(row[1]))
)
metrics = MulticlassMetrics(pred_rdd)

print(metrics.confusionMatrix())

confusion_matrix = metrics.confusionMatrix().toArray()
labels = ['Class 0', 'Class 1']
fig = plt.figure()
ax = fig.add_subplot(111)
cax = ax.matshow(confusion_matrix, cmap=plt.cm.Blues)
# Add actual values to the cells (column j = predicted, row i = actual).
for i in range(len(labels)):
    for j in range(len(labels)):
        ax.text(j, i, str(int(confusion_matrix[i, j])), fontsize=12, ha='center', va='center', color='red')
fig.colorbar(cax)
# Pin exactly one tick per class before labelling. Pairing the default tick
# locator with a leading '' placeholder label can misplace labels and
# triggers matplotlib's FixedFormatter warning.
ax.set_xticks(range(len(labels)))
ax.set_yticks(range(len(labels)))
ax.set_xticklabels(labels)
ax.set_yticklabels(labels)
ax.set_xlabel('Predicted')
ax.set_ylabel('Expected')
plt.show()

In [None]:
# from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
# from pyspark.mllib.evaluation import MulticlassMetrics

# from sklearn.metrics import confusion_matrix

# # Evaluate using MultiClass Metrics
# pred_rdd = predictions.select('prediction','label').rdd.map(tuple)
# metrics = MulticlassMetrics(pred_rdd)
# accuracy_o = metrics.accuracy   # Positive class
# precision_o = metrics.precision(1.0)  # Positive class
# recall_o = metrics.recall(1.0)  # Positive class
# f1_o = metrics.fMeasure(1.0)  # Positive class

# print("Accuracy:", accuracy_o)
# print("Precision:", precision_o)
# print("Recall:", recall_o)
# print("F1-score:", f1_o)

## Hyperparameter optimization

In [None]:
# List the Param objects of the baseline random forest model `rfModel`; as
# the last expression of the cell, the list is displayed as the cell output.
rfModel.params

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


# Hyperparameter grid for the random forest, built from the estimator's own
# Params: 3 x 3 = 9 candidate settings.
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# estimatorParamMaps must be the *built* list of param maps (paramGrid), not
# a ParamGridBuilder. Models are scored by lrEval3; its metricName is never
# set in the cells shown, so Spark's default (areaUnderROC) applies.
cv = CrossValidator(estimator = rf, 
                    estimatorParamMaps = paramGrid, 
                    evaluator = lrEval3,
                    parallelism = 5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

## Best model 3

In [None]:
from pprint import pprint

# Adopt the random forest CrossValidator selected as "model3" for the cells
# that follow, then print every Param it carries together with its value.
model3 = bestModel
pprint(bestModel.extractParamMap())

## Save the model to HDFS

In [None]:
# Persist the tuned random forest to HDFS, replacing any copy saved earlier.
(model3.write()
    .overwrite()
    .save("project/models/model3"))

# Copy the saved model directory from HDFS into the local models/ folder.
# `run` is a helper defined elsewhere in the notebook (presumably it executes
# the shell command). The local path is relative, so:
# Run it from root directory of the repository
run("hdfs dfs -get project/models/model3 models/model3")

## Predict for test data using best model3

In [None]:
# Score the test split with the tuned random forest and preview the first
# rows (Spark's default of 20 rows, truncated cells).
predictions = model3.transform(test_data)
predictions.show(n=20, truncate=True)

# Export only the (label, prediction) columns as a single CSV part file with
# a header row; Spark writes this as a directory named *.csv on HDFS.
(
    predictions
    .select("label", "prediction")
    .coalesce(1)
    .write
    .mode("overwrite")
    .format("csv")
    .options(sep=",", header="true")
    .save("project/output/model3_predictions.csv")
)

# Concatenate the CSV part file(s) from that HDFS directory into output/.
# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model3_predictions.csv/*.csv > output/model3_predictions.csv")

## Evaluate the best model3

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Evaluate the tuned random forest (model3) with the same classification
# metrics used for model2. RMSE from a RegressionEvaluator is not a
# meaningful score for a 0/1 classifier. Recomputing auroc3/aupr3/f13 here
# makes the comparison table below report the tuned model3 rather than the
# untuned rfModel.
lrEval3 = BinaryClassificationEvaluator(labelCol='label')
auroc3 = lrEval3.evaluate(predictions, {lrEval3.metricName: 'areaUnderROC'})
aupr3 = lrEval3.evaluate(predictions, {lrEval3.metricName: 'areaUnderPR'})

print(f"AUROC: {auroc3}")
print(f"AUPR: {aupr3}")

# Label-based metrics computed from the hard "prediction" column.
evaluator3 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
accuracy3 = evaluator3.evaluate(predictions, {evaluator3.metricName: "accuracy"})
precision3 = evaluator3.evaluate(predictions, {evaluator3.metricName: "weightedPrecision"})
recall3 = evaluator3.evaluate(predictions, {evaluator3.metricName: "weightedRecall"})
f13 = evaluator3.evaluate(predictions, {evaluator3.metricName: "f1"})

print(f"Accuracy: {accuracy3}")
print(f"Precision: {precision3}")
print(f"Recall: {recall3}")
print(f"F1 Score: {f13}")

# Compare best models

In [None]:
# One row per trained model: its string description plus the three headline
# test metrics. Shown untruncated so the full model description is visible.
models = [
    [str(model), auroc, aupr, f1_score]
    for model, auroc, aupr, f1_score in (
        (model1, auroc1, aupr1, f11),
        (model2, auroc2, aupr2, f12),
        (model3, auroc3, aupr3, f13),
    )
]

df = spark.createDataFrame(models, ["model", "AUROC", "AUPR", "F1"])
df.show(truncate=False)

In [None]:
# Persist the comparison table as a single CSV part file with a header row;
# Spark writes this as a directory named *.csv on HDFS.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .format("csv")
    .options(sep=",", header="true")
    .save("project/output/evaluation.csv")
)

# Concatenate the CSV part file(s) from that HDFS directory into output/.
# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")