In [None]:
# This cell is necessary only when running Spark on Windows
# with the correct configuration.

#%pip install findspark

# findspark.init() runs here, before the pyspark imports in the following cell.
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
import mlflow
# Name of the MLflow experiment selected for this notebook.
experiment_name = "mlflow-experiment-ny-taxis"
mlflow.set_experiment(experiment_name)
# Spark autologging is explicitly switched off (disable=True).
mlflow.spark.autolog(disable=True)

In [None]:
# Build (or reuse) a Spark session with master "local[*]".
# Optional MLflow Spark package, left disabled:
#   .config("spark.jars.packages", "org.mlflow.mlflow-spark:1.11.0")
session_builder = SparkSession.builder.appName("Taxis Classifier to tripType")
session_builder = session_builder.master("local[*]")
spark = session_builder.getOrCreate()

In [None]:
# Load the taxi trips CSV; the first line is the header.
# No schema is supplied or inferred in this call.
taxis_csv_path = "../data/taxis.csv"
df = spark.read.option("header", True).csv(taxis_csv_path)

In [None]:
# Print a preview of the loaded DataFrame.
df.show()

In [None]:
# Check dimensions: (number of rows, number of columns)
row_count = df.count()
column_count = len(df.columns)
print((row_count, column_count))

In [None]:
# Inspect the schema (column names, types, nullability) of the raw DataFrame
df.printSchema()

In [None]:
# Data types of the columns, as a list of (column name, type name) pairs
df.dtypes

In [None]:
# Drop the columns that are not used for modelling
unwanted_columns = ("_c0", "lpepPickupDatetime", "lpepDropoffDatetime", "ehailFee")
my_data = df.drop(*unwanted_columns)
my_data.columns

In [None]:
# Dimensions of the reduced data: (rows, columns)
kept_rows, kept_columns = my_data.count(), len(my_data.columns)
(kept_rows, kept_columns)

In [None]:
# Summary statistics for every remaining column
my_data.describe().show()

In [None]:
# import sql function pyspark
import pyspark.sql.functions as f

# Null values in each column.
# The marker passed to when() must be non-null: count() only counts non-null
# values, and when(isnull(c), c) would yield c itself -- which is null exactly
# when the condition holds -- so every column would always report 0.
data_agg = my_data.agg(*[f.count(f.when(f.isnull(c), 1)).alias(c) for c in my_data.columns])
data_agg.show()

In [None]:
# Frequency table of the distinct values of each column
for column in my_data.columns:
    print(column, ":")
    per_value_counts = my_data.groupBy(column).count()
    per_value_counts.show()

In [None]:
# Column types of the reduced data, checked before the indexing step below
my_data.dtypes

In [None]:
#Preprocessing steps
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Index every column with ONE multi-column StringIndexer instead of fitting a
# separate estimator per column: a single fit/transform replaces one full
# fit + transform cycle per column.
# Each input column <name> gets an output column <name>_Index;
# handleInvalid='skip' is kept from the per-column version.
columns_to_index = list(my_data.columns)
SI = StringIndexer(
    inputCols=columns_to_index,
    outputCols=[name + '_Index' for name in columns_to_index],
    handleInvalid='skip',
)
my_data = SI.fit(my_data).transform(my_data)

# Same per-column progress output as before
for column in columns_to_index:
    print(column, "transformed")

In [None]:
# Compare two raw columns with their indexed counterparts (first 10 rows)
indexed_preview_columns = ["vendorID", "vendorID_Index", "tipAmount", "tipAmount_Index"]
my_data.select(*indexed_preview_columns).show(10)

In [None]:
# Columns whose name ends with 'Index' are the inputs of the one-hot encoder
inputCols = list(filter(lambda name: name.endswith('Index'), my_data.columns))
inputCols

In [None]:
# Derive every one-hot output name directly from its indexed input column so
# the two lists are aligned pair by pair by construction, rather than relying
# on the column order of my_data (and on this cell never being re-run after
# the *_OHE columns exist).
index_suffix = "_Index"
outputCols = [
    (col[:-len(index_suffix)] if col.endswith(index_suffix) else col) + "_OHE"
    for col in inputCols
]
outputCols

In [None]:
# One-hot encode every indexed column into its *_OHE counterpart
OHE = OneHotEncoder(inputCols=inputCols, outputCols=outputCols)

# fit the encoder, then append the encoded columns to the data
ohe_model = OHE.fit(my_data)
my_data = ohe_model.transform(my_data)

# compare raw, indexed and one-hot encoded values for two columns
encoded_preview_columns = [
    'vendorID', 'vendorID_Index', 'vendorID_OHE',
    'tipAmount', 'tipAmount_Index', 'tipAmount_OHE',
]
my_data.select(*encoded_preview_columns).show(10)


In [None]:
# First 5 rows with all raw, indexed and one-hot encoded columns
my_data.show(5)

In [None]:
# Feature columns: only the one-hot encoded columns, excluding anything derived
# from the label (tripType*).
# The previous filter (every non-string column) also kept the *_Index columns,
# so each feature entered the model twice -- once one-hot encoded and once as
# an arbitrary category index treated as a number.
# NOTE(review): the *_Index columns presumably also carry categorical metadata
# that tree models check against maxBins -- confirm before re-adding them.
cols_to_vector = [
    name for name, _dtype in my_data.dtypes
    if name.endswith("_OHE") and not name.startswith("tripType")
]
len(cols_to_vector)


In [None]:
# Replace nulls with 0 before assembling
my_data = my_data.fillna(0)

# Combine the selected columns into a single 'features' vector column
assembler = VectorAssembler(outputCol='features', inputCols=cols_to_vector)
final_data = assembler.transform(my_data)

In [None]:
# View the assembled feature vector next to the indexed label column
final_data.select('features','tripType_Index').show()

In [None]:
# Number of rows per indexed tripType value (class balance of the label)
final_data.groupby("tripType_Index").count().show()

In [None]:
# Null count per column of the assembled data.
# when(..., 1) emits a non-null marker for each null cell so count() tallies
# them; passing the column itself as the value would always count 0 because
# the value is null whenever the condition is true.
f_data_agg = final_data.agg(*[f.count(f.when(f.isnull(c), 1)).alias(c) for c in my_data.columns])
f_data_agg.show()

In [None]:
# Modelling DataFrame: feature vector plus the indexed trip type, renamed to 'label'
model_df = (
    final_data
    .select('features', 'tripType_Index')
    .withColumnRenamed("tripType_Index", "label")
)
model_df.printSchema()

In [None]:
# Split into training (75%) and testing (25%) DataFrames.
# A fixed seed keeps the partition -- and every metric derived from it --
# repeatable across kernel restarts; without it each run samples differently.
training_df, test_df = model_df.randomSplit([0.75, 0.25], seed=42)

In [None]:
#Create a logistic regression model object
from pyspark.ml.classification import LogisticRegression

# Train a logistic regression inside one MLflow run and log:
#   * the training-summary metrics (original metric names, unchanged), and
#   * the same metrics on the held-out test split under a "test_" prefix,
#     because the summary alone only describes the data the model was fit on.
with mlflow.start_run():
    log_reg = LogisticRegression().fit(training_df)
    lr_summary = log_reg.summary

    # Metrics taken from the model's training summary
    mlflow.log_metric("accuracy", lr_summary.accuracy)
    mlflow.log_metric("recall", lr_summary.weightedRecall)
    mlflow.log_metric("precision", lr_summary.weightedPrecision)
    mlflow.log_metric("f1", lr_summary.weightedFMeasure())
    # NOTE(review): areaUnderROC presumably requires a binary label -- confirm
    # that tripType has exactly two classes after indexing.
    mlflow.log_metric("area under ROC", lr_summary.areaUnderROC)

    # Held-out metrics on test_df
    lr_test_predictions = log_reg.transform(test_df)
    lr_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
    for logged_name, evaluator_metric in (
        ("test_accuracy", "accuracy"),
        ("test_recall", "weightedRecall"),
        ("test_precision", "weightedPrecision"),
        ("test_f1", "f1"),
    ):
        mlflow.log_metric(
            logged_name,
            lr_evaluator.evaluate(lr_test_predictions, {lr_evaluator.metricName: evaluator_metric}),
        )

    mlflow.spark.log_model(log_reg, "model")

In [None]:
# Summary object of the fitted logistic regression, read by the metric cells below
lr_summary=log_reg.summary

In [None]:
# Overall accuracy of the classification model, as reported by its summary
lr_summary.accuracy

In [None]:
# Area under the ROC curve, as reported by the model summary
lr_summary.areaUnderROC

In [None]:
# Precision for each label, as reported by the model summary
print(lr_summary.precisionByLabel)

In [None]:
# Recall for each label, as reported by the model summary
print(lr_summary.recallByLabel)

In [None]:
# Get predictions of the logistic regression on the held-out test split
predictions = log_reg.transform(test_df)

In [None]:
# Compare true labels with predicted labels for the first 50 test rows
predictions.select('label','prediction').show(50)

In [None]:
# Train a random forest inside its own MLflow run and log:
#   * the training-summary metrics (original metric names, unchanged), and
#   * the same metrics on the held-out test split under a "test_" prefix.
# The estimator is seeded so repeated runs build the same forest.
# NOTE(review): `.summary` on random-forest models appears to need a recent
# Spark release (3.1+ per the PySpark docs) -- confirm the runtime version.
with mlflow.start_run():
    rf_clf = RandomForestClassifier(seed=42).fit(training_df)
    rf_summary = rf_clf.summary

    # Metrics taken from the model's training summary
    mlflow.log_metric("accuracy", rf_summary.accuracy)
    mlflow.log_metric("recall", rf_summary.weightedRecall)
    mlflow.log_metric("precision", rf_summary.weightedPrecision)
    mlflow.log_metric("f1", rf_summary.weightedFMeasure())
    mlflow.log_metric("area under ROC", rf_summary.areaUnderROC)

    # Held-out metrics on test_df
    rf_test_predictions = rf_clf.transform(test_df)
    rf_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
    for logged_name, evaluator_metric in (
        ("test_accuracy", "accuracy"),
        ("test_recall", "weightedRecall"),
        ("test_precision", "weightedPrecision"),
        ("test_f1", "f1"),
    ):
        mlflow.log_metric(
            logged_name,
            rf_evaluator.evaluate(rf_test_predictions, {rf_evaluator.metricName: evaluator_metric}),
        )

    mlflow.spark.log_model(rf_clf, "model")

In [None]:
# Summary object of the fitted random forest, read by the metric cells below
rf_summary = rf_clf.summary

In [None]:
# Overall accuracy of the random forest, as reported by its summary
rf_summary.accuracy

In [None]:
# Area under the ROC curve of the random forest, as reported by its summary
rf_summary.areaUnderROC

In [None]:
# Precision for each label of the random forest, as reported by its summary
rf_summary.precisionByLabel