In [None]:
import os

from pyspark.sql import SparkSession

# The user name drives both the bucket path and the database name below.
# Fail fast with a clear message when USER is missing instead of silently
# building a "bucket-None" path and crashing later on `None.replace(...)`.
user_name = os.environ.get('USER')
if not user_name:
    raise RuntimeError(
        "Environment variable USER is not set; it is required to build "
        "the bucket path and the database name."
    )

spark = SparkSession \
.builder \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

gs_path = f'gs://bucket-{user_name}/survey/2020/survey_results_public.csv'
# Dashes are replaced with underscores to derive the database name.
db_name = user_name.replace('-','_')

# Start from a clean database on every run (CASCADE also drops its tables).
spark.sql(f'DROP DATABASE IF EXISTS {db_name} CASCADE')
spark.sql(f'CREATE DATABASE {db_name}')
spark.sql(f'USE {db_name}')
table_name = "survey_2020"

spark.sql(f'DROP TABLE IF EXISTS {table_name}')

# Table over the CSV file: header row, inferred schema, "NA" read as NULL.
spark.sql(f'CREATE TABLE IF NOT EXISTS {table_name} \
          USING csv \
          OPTIONS (HEADER true, INFERSCHEMA true, NULLVALUE "NA") \
          LOCATION "{gs_path}"')

# Prepare the data for analysis: keep only rows with a known convertedComp
# and derive the string target compAboveAvg (convertedComp > 60000).

spark_df= spark.sql(f'SELECT *, CAST((convertedComp > 60000) AS STRING) AS compAboveAvg \
                    FROM {table_name} WHERE convertedComp IS NOT NULL ')

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline

y = 'compAboveAvg'      # target column we want to predict
feature_columns = ['OpSys', 'EdLevel', 'MainBranch' , 'Country', 'JobSeek', 'YearsCode']

# Index every categorical predictor. handleInvalid="keep" sends values the
# indexer cannot map into an extra bucket instead of failing.
stringindexer_stages = [
    StringIndexer(inputCol=c, outputCol='strindexed_' + c).setHandleInvalid("keep")
    for c in feature_columns
]
# The target is indexed with the default handleInvalid: "keep" would reserve an
# additional "unknown" index on the label column, i.e. declare a third class
# for what is a binary target. spark_df is built from rows where convertedComp
# is not NULL, so the label is not expected to contain invalid values.
stringindexer_stages += [StringIndexer(inputCol=y, outputCol='label')]

onehotencoder_stages = [
    OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c)
    for c in feature_columns
]

# Assemble all one-hot encoded predictor columns into a single 'features' vector
extracted_columns = ['onehot_' + c for c in feature_columns]
vectorassembler_stage = VectorAssembler(inputCols=extracted_columns, outputCol='features')

# Chain all data-preparation steps into one processing pipeline
final_columns = [y] + feature_columns + extracted_columns + ['features', 'label']

# NOTE(review): the pipeline is fitted on the full data set before the
# train/test split, so the indexers also see categories from the test rows.
# Consider splitting first and fitting on the training part only.
preparation_pipeline = Pipeline(
    stages=stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]
)
transformed_df = preparation_pipeline.fit(spark_df).transform(spark_df).select(final_columns)
training, test = transformed_df.randomSplit([0.8, 0.2], seed=1234) # train/test split

## Drzewo decyzyjne - bez parametrów

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree with default hyper-parameters, trained through a
# single-stage pipeline on the training split.
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
dt_pipeline = Pipeline(stages=[dt])
dt_model = dt_pipeline.fit(training)

# Score the test split and keep the true label next to the prediction.
pred_dt = dt_model.transform(test)
label_and_pred = pred_dt.select(['label', 'prediction'])

In [None]:
## Evaluators
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Area under the ROC curve, computed from the raw prediction column.
evaluator_auroc = BinaryClassificationEvaluator(metricName="areaUnderROC", rawPredictionCol="rawPrediction")

# The remaining evaluators read the same label/prediction columns and differ
# only in the metric they compute.
_multiclass_columns = {"labelCol": "label", "predictionCol": "prediction"}
evaluator_acc = MulticlassClassificationEvaluator(metricName="accuracy", **_multiclass_columns)
evaluator_recall = MulticlassClassificationEvaluator(metricName="weightedRecall", **_multiclass_columns)
evaluator_prec = MulticlassClassificationEvaluator(metricName="weightedPrecision", **_multiclass_columns)
evaluator_f = MulticlassClassificationEvaluator(metricName="weightedFMeasure", **_multiclass_columns)

In [None]:
# Set MLFLOW_TRACKING_URI for this kernel so the MLflow calls below are sent to
# the tracking server at localhost:5000 (presumably started separately - verify).
%env MLFLOW_TRACKING_URI=http://localhost:5000

In [None]:
import mlflow
import mlflow.spark

# All runs of this notebook are grouped under the "Classifier" experiment.
mlflow.set_experiment(experiment_name="Classifier")
experiment = mlflow.get_experiment_by_name('Classifier')

with mlflow.start_run(experiment_id = experiment.experiment_id, run_name="dt_model"):

    mlflow.log_param("depth", dt.getMaxDepth())

    # Score the test set once and cache the result: every evaluator triggers
    # its own Spark job, and without the cache each of them would redo the
    # whole scoring of the test split.
    dt_test_predictions = dt_model.transform(test).cache()
    try:
        test_metric_auroc = evaluator_auroc.evaluate(dt_test_predictions)
        test_metric_acc = evaluator_acc.evaluate(dt_test_predictions)
        test_metric_recall = evaluator_recall.evaluate(dt_test_predictions)
        test_metric_prec = evaluator_prec.evaluate(dt_test_predictions)
        test_metric_f = evaluator_f.evaluate(dt_test_predictions)
    finally:
        # Release the cached predictions even when an evaluation fails.
        dt_test_predictions.unpersist()

    # Each metric is logged under the evaluator's own metric name.
    mlflow.log_metric(evaluator_auroc.getMetricName(), test_metric_auroc)
    mlflow.log_metric(evaluator_acc.getMetricName(), test_metric_acc)
    mlflow.log_metric(evaluator_recall.getMetricName(), test_metric_recall)
    mlflow.log_metric(evaluator_prec.getMetricName(), test_metric_prec)
    mlflow.log_metric(evaluator_f.getMetricName(), test_metric_f)

    # Store the fitted model as a run artifact.
    mlflow.spark.log_model(spark_model=dt_model, artifact_path='classifier')

## Drzewo decyzyjne - walidacja krzyżowa

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Candidate tree depths tried by the cross-validation.
param_grid = ParamGridBuilder().\
    addGrid(dt.maxDepth, [2,3,4,5,6]).\
    build()

# 4-fold cross-validation selecting the depth by area under ROC. The seed pins
# the fold assignment so repeated runs are comparable; the value matches the
# one used for the train/test split.
cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator_auroc, numFolds=4, seed=1234)

with mlflow.start_run(experiment_id = experiment.experiment_id, run_name="best_model"):
    cv_model = cv.fit(training)
    best_model = cv_model.bestModel

    mlflow.log_param("depth", best_model.depth)

    # Score the test set once and cache the result: every evaluator triggers
    # its own Spark job, and without the cache each of them would redo the
    # whole scoring of the test split.
    cv_test_predictions = best_model.transform(test).cache()
    try:
        test_metric_auroc = evaluator_auroc.evaluate(cv_test_predictions)
        test_metric_acc = evaluator_acc.evaluate(cv_test_predictions)
        test_metric_recall = evaluator_recall.evaluate(cv_test_predictions)
        test_metric_prec = evaluator_prec.evaluate(cv_test_predictions)
        test_metric_f = evaluator_f.evaluate(cv_test_predictions)
    finally:
        # Release the cached predictions even when an evaluation fails.
        cv_test_predictions.unpersist()

    # Each metric is logged under the evaluator's own metric name.
    mlflow.log_metric(evaluator_auroc.getMetricName(), test_metric_auroc)
    mlflow.log_metric(evaluator_acc.getMetricName(), test_metric_acc)
    mlflow.log_metric(evaluator_recall.getMetricName(), test_metric_recall)
    mlflow.log_metric(evaluator_prec.getMetricName(), test_metric_prec)
    mlflow.log_metric(evaluator_f.getMetricName(), test_metric_f)

    # Store the best model found by the cross-validation as a run artifact.
    mlflow.spark.log_model(spark_model=best_model, artifact_path='classifier')


## Gradient Boosted Trees

In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees limited to 10 boosting iterations.
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbt_model = gbt.fit(training)

with mlflow.start_run(experiment_id = experiment.experiment_id, run_name="gbt_model"):

    mlflow.log_param("depth", gbt.getMaxDepth())

    # Score the test set once and cache the result: every evaluator triggers
    # its own Spark job, and without the cache each of them would redo the
    # whole scoring of the test split.
    gbt_test_predictions = gbt_model.transform(test).cache()
    try:
        test_metric_auroc = evaluator_auroc.evaluate(gbt_test_predictions)
        test_metric_acc = evaluator_acc.evaluate(gbt_test_predictions)
        test_metric_recall = evaluator_recall.evaluate(gbt_test_predictions)
        test_metric_prec = evaluator_prec.evaluate(gbt_test_predictions)
        test_metric_f = evaluator_f.evaluate(gbt_test_predictions)
    finally:
        # Release the cached predictions even when an evaluation fails.
        gbt_test_predictions.unpersist()

    # Each metric is logged under the evaluator's own metric name.
    mlflow.log_metric(evaluator_auroc.getMetricName(), test_metric_auroc)
    mlflow.log_metric(evaluator_acc.getMetricName(), test_metric_acc)
    mlflow.log_metric(evaluator_recall.getMetricName(), test_metric_recall)
    mlflow.log_metric(evaluator_prec.getMetricName(), test_metric_prec)
    mlflow.log_metric(evaluator_f.getMetricName(), test_metric_f)

    # Store the fitted model as a run artifact.
    mlflow.spark.log_model(spark_model=gbt_model, artifact_path='classifier')