### Install packages

In [None]:
!pip install google.cloud pandas

### Set env variables

In [None]:
# Client ID that the next code cell sends as the `audience` of the ID token request.
# NOTE(review): IPython's %env appears to store everything after '=' verbatim, so the
# surrounding double quotes likely become part of the value - confirm before changing it.
%env IAP_CLIENT_ID="389410459067-mltiuc7631od8mhp9aokhb03qdlj81qp.apps.googleusercontent.com"

In [None]:
import json
import os
import subprocess
import urllib.parse
import urllib.request


def _gcloud(*args):
    """Run a gcloud sub-command and return its stripped stdout.

    Raises subprocess.CalledProcessError when gcloud exits with a non-zero status,
    so an authentication problem is reported instead of ending up in the token.
    """
    completed = subprocess.run(
        ["gcloud", *args], check=True, capture_output=True, text=True
    )
    return completed.stdout.strip()


# The value may carry literal quotes typed after `%env IAP_CLIENT_ID=`; drop them so the
# JSON body below is built from the bare client ID.
iap_client_id = os.environ["IAP_CLIENT_ID"].strip().strip('"\'')

access_token = _gcloud("auth", "print-access-token")
active_account = _gcloud("auth", "list", "--filter=status:ACTIVE", "--format=value(account)")
if not active_account:
    raise RuntimeError("gcloud reports no active account; run `gcloud auth login` first")

# Ask the IAM Credentials API for an ID token issued to the active account.
# The body is serialized with json.dumps rather than assembled through shell quoting.
id_token_request = urllib.request.Request(
    "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/"
    f"{urllib.parse.quote(active_account, safe='@')}:generateIdToken",
    data=json.dumps({"audience": iap_client_id, "includeEmail": True}).encode("utf-8"),
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    },
    method="POST",
)
# urlopen raises on an HTTP error status and the lookup raises KeyError when the
# response has no "token" field, so a failed request can never become the token.
with urllib.request.urlopen(id_token_request) as response:
    mlflow_token = json.load(response)["token"]

os.environ['MLFLOW_TRACKING_TOKEN'] = mlflow_token

In [None]:
# Address of the remote tracking server; presumably read by the `mlflow` CLI and
# Python client used in the cells below.
%env MLFLOW_TRACKING_URI=https://mlflow-dot-tbd-2023l-mlops.ew.r.appspot.com/

### Test connectivity with MLflow tracking server

In [None]:
%%bash 
# Connectivity check: run an experiment search through the mlflow CLI.
mlflow experiments search

### Prepare training data

In [None]:
%%bash
# Create the data bucket only when it is missing, so re-running the notebook
# does not fail on an already existing bucket.
bucket=gs://tbd-2023l-2001-data
gsutil ls -b "$bucket" >/dev/null 2>&1 || gsutil mb -l europe-west1 "$bucket"

In [None]:
%%bash
# Stream the survey CSV from GitHub straight into the bucket.
# `curl -f` turns HTTP errors into a non-zero exit status and `pipefail` propagates it,
# so a failed download fails the cell instead of silently uploading an error page.
set -eo pipefail
curl -fL https://github.com/datascienceverse/stack-overflow-dataset-2022/raw/master/survey_results_public.csv | gsutil cp - gs://tbd-2023l-2001-data/survey_results_public.csv

In [None]:
%%bash
# Report the size of the uploaded object in human-readable units.
gsutil du -h gs://tbd-2023l-2001-data/survey_results_public.csv

### GCS connector

In [None]:
%%bash
# Fetch the shaded GCS connector jar referenced by the Spark session below.
# The download is skipped when the jar is already present, so re-running the notebook
# neither re-downloads it nor leaves numbered duplicates behind.
set -eo pipefail
jar=gcs-connector-hadoop3-2.2.9-shaded.jar
if [ ! -f "$jar" ]; then
    # Download under a temporary name so an interrupted transfer is never mistaken for the jar.
    wget -O "$jar.part" "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.9/$jar"
    mv "$jar.part" "$jar"
fi

### Spark session

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the YARN-backed session used by every Spark cell below.
spark = (
    SparkSession.builder
    .master('yarn')
    # The property name is plural; the misspelled `spark.executor.instance` was not applied.
    .config('spark.executor.instances', 2)
    # Local connector jar downloaded in the previous section.
    .config('spark.jars', 'gcs-connector-hadoop3-2.2.9-shaded.jar')
    .config('spark.jars.packages', 'org.mlflow:mlflow-spark:1.11.0')
    .config('spark.driver.memory', '1g')
    .config('spark.executor.memory', '2g')
    .getOrCreate()
)

In [None]:
# Display the session object as the cell output.
spark

In [None]:
# Database, table and source object used for the survey data.
db_name = "tbd"
table_name = "survey_2022" 
gs_path = "gs://tbd-2023l-2001-data/survey_results_public.csv"

# Recreate the database from scratch and make it current, then clear the table name.
for reset_statement in (
    f'DROP DATABASE IF EXISTS {db_name} CASCADE',
    f'CREATE DATABASE {db_name}',
    f'USE {db_name}',
    f'DROP TABLE IF EXISTS {table_name}',
):
    spark.sql(reset_statement)

# Table over the CSV object: header row, inferred schema, "NA" read as NULL.
create_table_sql = (
    f'CREATE TABLE IF NOT EXISTS {table_name} '
    '          USING csv '
    '          OPTIONS (HEADER true, INFERSCHEMA true, NULLVALUE "NA") '
    f'          LOCATION "{gs_path}"'
)
spark.sql(create_table_sql)

# Keep rows with a known yearly compensation and add the target column:
# whether that compensation is above 60000, stored as a string.
select_sql = (
    'SELECT *, CAST((ConvertedCompYearly > 60000) AS STRING) AS compAboveAvg '
    f'                    FROM {table_name} WHERE ConvertedCompYearly IS NOT NULL '
)
spark_df = spark.sql(select_sql)

In [None]:
# Print the column names and types of the filtered survey DataFrame.
spark_df.printSchema()

In [None]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
# Target column and the survey columns used as predictors.
y = 'compAboveAvg' 
feature_columns = ['OpSys', 'EdLevel', 'MainBranch' , 'Country', 'YearsCode']
extracted_columns = [f'onehot_{name}' for name in feature_columns]
final_columns = [y, *feature_columns, *extracted_columns, 'features', 'label']

# Stage 1: string-index every predictor and the target (handleInvalid set to "keep").
stringindexer_stages = [
    StringIndexer(inputCol=name, outputCol=f'strindexed_{name}', handleInvalid="keep")
    for name in feature_columns
]
stringindexer_stages.append(
    StringIndexer(inputCol=y, outputCol='label', handleInvalid="keep")
)

# Stage 2: one-hot encode each indexed predictor.
onehotencoder_stages = [
    OneHotEncoder(inputCol=f'strindexed_{name}', outputCol=onehot_name)
    for name, onehot_name in zip(feature_columns, extracted_columns)
]

# Stage 3: assemble the one-hot vectors into a single `features` column.
vectorassembler_stage = VectorAssembler(inputCols=extracted_columns, outputCol='features') 

preprocessing = Pipeline(
    stages=[*stringindexer_stages, *onehotencoder_stages, vectorassembler_stage]
)
transformed_df = preprocessing.fit(spark_df).transform(spark_df).select(final_columns)

# Split into training and test sets (80/20, fixed seed).
training, test = transformed_df.randomSplit([0.8, 0.2], seed=1234)

In [None]:
import mlflow  # MLflow client API
import mlflow.spark

# Select a dedicated experiment - it should show up in the MLflow UI.
# Without an explicit experiment, new runs are recorded under the default one.
artifacts_location = "artifacts"
ename = "tbd-2023l-2001"
mlflow.set_experiment(experiment_name=ename)
experiment = mlflow.get_experiment_by_name(ename)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
def _multiclass_evaluator(metric_name):
    """Return an evaluator comparing `label` with `prediction` for the given metric."""
    return MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName=metric_name
    )


# One evaluator per metric logged to MLflow by the training cells.
evaluator_acc = _multiclass_evaluator("accuracy")
evaluator_recall = _multiclass_evaluator("weightedRecall")
evaluator_prec = _multiclass_evaluator("weightedPrecision")
evaluator_f = _multiclass_evaluator("weightedFMeasure")

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Train a decision tree and record its tag, hyper-parameter, test metrics and model
# in a single MLflow run of the experiment selected above.
with mlflow.start_run(experiment_id = experiment.experiment_id):
    dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')
    mlflow.set_tag("classifier", "decision_tree")  # tag the run
    mlflow.log_param("depth", dt.getMaxDepth())    # record the hyper-parameter

    dt_model = Pipeline(stages=[dt]).fit(training)
    # Score the test set once; every evaluator below reads this DataFrame.
    res = dt_model.transform(test)

    test_metric_acc = evaluator_acc.evaluate(res)
    test_metric_recall = evaluator_recall.evaluate(res)
    test_metric_prec = evaluator_prec.evaluate(res)
    test_metric_f = evaluator_f.evaluate(res)

    mlflow.log_metric(evaluator_acc.getMetricName(), test_metric_acc)
    mlflow.log_metric(evaluator_recall.getMetricName(), test_metric_recall)
    mlflow.log_metric(evaluator_prec.getMetricName(), test_metric_prec)
    mlflow.log_metric(evaluator_f.getMetricName(), test_metric_f)
    mlflow.spark.log_model(dt_model, artifact_path=artifacts_location)

In [None]:
# The Spark session is deliberately NOT stopped here: the cells below still use `spark`,
# `training`, `test` and `spark_df`, and stopping the session at this point makes them
# fail when the notebook is run top to bottom. The final cell stops the session.

In [None]:
from pyspark.ml.classification import GBTClassifier

# Fit a gradient-boosted tree classifier (maxIter=10) on the training split.
gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=10)
gbt_model = gbt.fit(training)

# Record the hyper-parameter, the test metrics and the fitted model in a run named "gbt_model".
with mlflow.start_run(run_name="gbt_model", experiment_id=experiment.experiment_id):
    mlflow.log_param("depth", gbt.getMaxDepth())

    res = gbt_model.transform(test)

    # Evaluate all metrics first, then log them in the same order.
    gbt_evaluators = (evaluator_acc, evaluator_recall, evaluator_prec, evaluator_f)
    test_metric_acc, test_metric_recall, test_metric_prec, test_metric_f = [
        evaluator.evaluate(res) for evaluator in gbt_evaluators
    ]
    gbt_scores = (test_metric_acc, test_metric_recall, test_metric_prec, test_metric_f)
    for evaluator, score in zip(gbt_evaluators, gbt_scores):
        mlflow.log_metric(evaluator.getMetricName(), score)

    mlflow.spark.log_model(spark_model=gbt_model, artifact_path='gbt_classifier')

In [None]:
# Split the untransformed survey rows 80/20 with a fixed seed; `raw_test` supplies
# the rows scored in the prediction cell below.
raw_training, raw_test = spark_df.randomSplit([0.8, 0.2], seed=1234)


In [None]:
import mlflow
from pyspark.sql.functions import struct

# Wrap a previously logged model as a Spark UDF.
# NOTE(review): the run ID is hard-coded, so this does not follow the runs created by the
# cells above. The GBT model logged in this notebook is fitted on the assembled `features`
# column, while the UDF below is fed raw survey columns - presumably this run holds a
# model that includes the preprocessing stages; confirm.
logged_model = 'runs:/e08c7cc74cad4df6a1adb0939dca91f9/gbt_classifier'
pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model)


In [None]:
# Score ten raw test rows with the model UDF and show the inputs next to the prediction.
udf_input_columns = ['OpSys', 'EdLevel', 'MainBranch' , 'Country', 'YearsCode']
predicted_df = (
    raw_test.limit(10)
    .withColumn("prediction", pyfunc_udf(struct(*udf_input_columns)))
    .select(*udf_input_columns, 'prediction')
)
predicted_df.toPandas()

In [None]:
# Shut down the Spark session at the end of the notebook.
spark.stop()