In [1]:
import os
user_name = os.environ.get('USER')

from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

gs_path = f'gs://bdg-lab-{user_name}/survey/2020/survey_results_public.csv'
db_name = user_name.replace('-','_')
spark.sql(f'DROP DATABASE IF EXISTS {db_name} CASCADE')
spark.sql(f'CREATE DATABASE {db_name}')
spark.sql(f'USE {db_name}')
table_name = "survey_2020" 

spark.sql(f'DROP TABLE IF EXISTS {table_name}')

spark.sql(f'CREATE TABLE IF NOT EXISTS {table_name} \
          USING csv \
          OPTIONS (HEADER true, INFERSCHEMA true, NULLVALUE "NA") \
          LOCATION "{gs_path}"')

# Przygotowanie danych do analizy

spark_df= spark.sql(f'SELECT *, CAST((convertedComp > 60000) AS STRING) AS compAboveAvg \
                    FROM {table_name} WHERE convertedComp IS NOT NULL ')

In [2]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
y = 'compAboveAvg'      # chcemy przewidziec compAboveAvg
feature_columns = ['OpSys', 'EdLevel', 'MainBranch' , 'Country', 'JobSeek', 'YearsCode']

stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c).setHandleInvalid("keep") for c in feature_columns]
stringindexer_stages += [StringIndexer(inputCol=y, outputCol='label').setHandleInvalid("keep")]

onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in feature_columns]

# Polaczenie wszystkich kolumn predykcyjnych do jednej (features) ASEMBLACJA
extracted_columns = ['onehot_' + c for c in feature_columns]
vectorassembler_stage = VectorAssembler(inputCols=extracted_columns, outputCol='features') 

# Polaczenie wszystkich krokow przygotowania danych w jednym potoku przetwarzania
final_columns = [y] + feature_columns + extracted_columns + ['features', 'label']

transformed_df = Pipeline(stages=stringindexer_stages + \
                          onehotencoder_stages + \
                          [vectorassembler_stage]).fit(spark_df).transform(spark_df).select(final_columns)
training, test = transformed_df.randomSplit([0.8, 0.2], seed=1234) # Podzial na zbior treningowy/testowy

In [None]:
spark.stop()

In [49]:
import mlflow
import mlflow.spark
from pyspark.ml.classification import GBTClassifier

mlflow.set_experiment(experiment_name="test")
experiment = mlflow.get_experiment_by_name('test')

training, test = spark_df.randomSplit([0.8, 0.2], seed=1234) # Podzial na zbior treningowy/testowy
with mlflow.start_run(experiment_id = experiment.experiment_id):
    pipeline = Pipeline(stages=stringindexer_stages + \
                          onehotencoder_stages + \
                          [vectorassembler_stage] + [GBTClassifier(maxIter=2)] )
    model = pipeline.fit(training)
    mlflow.spark.log_model(spark_model=model, artifact_path='gbt_classifier')
    

In [50]:
#### serving as pandas udf 
import mlflow.pyfunc
from pyspark.sql.functions import struct

pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri="gs://bdg-lab-mwiewior/mlflow/artifacts/1/89e6825c999b43f681779fffe895ca6a/artifacts/gbt_classifier")
predicted_df = test.limit(10).withColumn("prediction", pyfunc_udf(struct('OpSys', 'EdLevel', 'MainBranch' , 'Country', 'JobSeek', 'YearsCode')) )
predicted_df.toPandas()


Unnamed: 0,Respondent,MainBranch,Hobbyist,Age,Age1stCode,CompFreq,CompTotal,ConvertedComp,Country,CurrencyDesc,...,Trans,UndergradMajor,WebframeDesireNextYear,WebframeWorkedWith,WelcomeChange,WorkWeekHrs,YearsCode,YearsCodePro,compAboveAvg,prediction
0,10,I am a developer by profession,Yes,22.0,14,Yearly,25000.0,32315.0,United Kingdom,Pound sterling,...,No,Mathematics or statistics,Flask;jQuery,Flask;jQuery,Somewhat more welcome now than last year,36.0,8,4,False,1.0
1,13,"I am not primarily a developer, but I write co...",Yes,53.0,14,Monthly,3000.0,38916.0,Netherlands,European Euro,...,No,,,,A lot less welcome now than last year,36.0,35,20,False,0.0
2,45,I am a developer by profession,Yes,22.0,13,Yearly,34000.0,36753.0,France,European Euro,...,No,"Computer science, computer engineering, or sof...",Angular;Flask,Angular;Flask;jQuery;Spring,A lot less welcome now than last year,35.0,9,2,False,0.0
3,48,I am a developer by profession,No,38.0,30,Yearly,125000.0,125000.0,United States,United States dollar,...,No,Fine arts or performing arts (such as graphic ...,Gatsby;Ruby on Rails,Gatsby;Ruby on Rails,,30.0,8,8,True,1.0
4,49,I am a developer by profession,No,23.0,15,Yearly,52000.0,52000.0,United States,United States dollar,...,No,"Computer science, computer engineering, or sof...",Angular;Angular.js;ASP.NET;ASP.NET Core,ASP.NET;ASP.NET Core,Just as welcome now as I felt last year,40.0,8,4,False,1.0
5,51,I am a developer by profession,No,,10,Yearly,137000.0,103615.0,Canada,Canadian dollar,...,,I never declared a major,,Django;React.js,Just as welcome now as I felt last year,35.0,23,7,True,1.0
6,52,I am a developer by profession,No,28.0,15,Yearly,70000.0,90482.0,United Kingdom,Pound sterling,...,No,"Computer science, computer engineering, or sof...",,,Just as welcome now as I felt last year,35.0,11,6,True,1.0
7,74,I am a developer by profession,Yes,34.0,8,Monthly,8000.0,124092.0,United Kingdom,Pound sterling,...,No,"Computer science, computer engineering, or sof...",Angular;ASP.NET;ASP.NET Core;Vue.js,Angular;Angular.js;ASP.NET;ASP.NET Core;Gatsby,Somewhat more welcome now than last year,35.0,17,11,True,1.0
8,93,"I am not primarily a developer, but I write co...",Yes,42.0,12,Yearly,20000.0,21620.0,Greece,European Euro,...,No,,jQuery,Drupal;jQuery,Just as welcome now as I felt last year,25.0,24,15,False,0.0
9,95,I am a developer by profession,Yes,,12,Yearly,95000.0,71850.0,Canada,Canadian dollar,...,No,"Computer science, computer engineering, or sof...",Express;React.js,Express;Gatsby;React.js,Just as welcome now as I felt last year,40.0,40,25,True,1.0


In [52]:
spark.stop()

### serving as a rest api
```bash
unset PYSPARK_SUBMIT_ARGS
source /opt/conda/etc/profile.d/conda.sh
conda activate $HOME/venv/$JUPYTER_KERNEL_NAME
mlflow models serve -m gs://bdg-lab-mwiewior/mlflow/artifacts/1/89e6825c999b43f681779fffe895ca6a/artifacts/gbt_classifier -p 9090  --no-conda
```

In [57]:
%%bash
curl -s -d \
'{"columns":["OpSys", "EdLevel", "MainBranch" , "Country", "JobSeek", "YearsCode"], "data":[["1","1","1","1","1","1"]]}' -H 'Content-Type: application/json; format=pandas-split' \
-X POST \
localhost:9090/invocations

[0.0]