## Zarządzanie i wdrażanie modeli ML
Platforma ML Flow (https://www.mlflow.org/docs/latest/index.html) umożliwia całościowe zarządzanie cyklem życia modeli.
* logowanie eksperymentów, wartości parametrów modeli i osiąganych przez nie wyników
* serializowanie modeli (na potrzeby współdzielenia modelu, przeniesienia na inne środowisko lub serwowania)
* wersjonowanie modelu, adnotowanie i przechowywanie w Rejestrze

### Klasyfikacja jako serwis REST

Należy uruchomic proces udostępniający interfejs REST do otrzymowania danych do klasyfikacji.
Komendę należy uruchomić w terminalu.

```bash
unset PYSPARK_SUBMIT_ARGS && \
source /opt/conda/etc/profile.d/conda.sh && \
conda activate $HOME/venv/$JUPYTER_KERNEL_NAME && \
mlflow models serve -m gs://bdg-lab-{USER}/mlflow/artifacts/6/c93fadbf518048f48ec9c433b7ebb3f8/artifacts/model -p 9090  --no-conda
```

In [6]:
# szykujemy wiadomość z danymi do klasyfikacji

url = "http://localhost:9090/invocations"   # adres uslugi ktora wystawia interfejs REST
headers = {'Content-Type': 'application/json; format=pandas-split'}  # naglowki wiadomosci

input_data = '{"columns":["OpSys", "EdLevel", "MainBranch" , "Country", "JobSeek", "YearsCode", "ConvertedComp"], \
    "data":[[ \
    "MacOS",\
    "Master’s degree (M.A., M.S., M.Eng., MBA, etc.)",\
    "I am a developer by profession","United Kingdom",\
    "I am not interested in new job opportunities",\
    "10", \
    "6000"]]}'\
    .encode('utf-8')

import requests 
r = requests.post(url, data=input_data, headers=headers) # wyslanie wiadomosci i odebranie wyniku

r.text # wypisanie treści wyniku

'[1.0]'

### Klasyfikacja jako funkcja pyspark


In [17]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.config('spark.driver.memory','1g') \
.config('spark.executor.memory', '2g') \
.getOrCreate()

In [18]:
import mlflow.pyfunc
from pyspark.sql.functions import struct

import os
user_name = os.environ.get('USER')

# zdefiniowanie funkcji 
registry_uri=f"gs://bdg-lab-{user_name}/mlflow/artifacts/6/c93fadbf518048f48ec9c433b7ebb3f8/artifacts/model"
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=registry_uri)

In [16]:
spark.stop()

In [19]:
# szykujemy nowe dane testowe do klasyfikacji
new_data = spark.createDataFrame(
    [('MacOS', 'Master’s degree (M.A., M.S., M.Eng., MBA, etc.)', 'I am a developer by profession', "United Kingdom", "I am not interested in new job opportunities", "10", "6000"), 
     ('MacOS', 'Master’s degree (M.A., M.S., M.Eng., MBA, etc.)', 'I am a developer by profession', "Poland", "I am not interested in new job opportunities", "10", "6000"), 
     ('Windows', 'Master’s degree (M.A., M.S., M.Eng., MBA, etc.)', 'I am a developer by profession', "India", "I am not interested in new job opportunities", "10", "5000")],
    schema='OpSys string, EdLevel string, MainBranch string, Country string, JobSeek string, YearsCode string, ConvertedComp string')

In [20]:
# tworzymy DF ktory zawiera nową kolumnę predict, ktora zawiera wynik dzialania funkcji predict_udf
predicted_df = new_data\
    .withColumn("prediction", predict_udf(struct('OpSys', 'EdLevel', 'MainBranch' , 'Country', 'JobSeek', 'YearsCode', 'ConvertedComp'))) \
    .select ('OpSys', 'EdLevel', 'MainBranch' , 'Country', 'JobSeek', 'YearsCode', 'prediction')
predicted_df.toPandas()

Unnamed: 0,OpSys,EdLevel,MainBranch,Country,JobSeek,YearsCode,prediction
0,MacOS,"Master’s degree (M.A., M.S., M.Eng., MBA, etc.)",I am a developer by profession,United Kingdom,I am not interested in new job opportunities,10,1.0
1,MacOS,"Master’s degree (M.A., M.S., M.Eng., MBA, etc.)",I am a developer by profession,Poland,I am not interested in new job opportunities,10,0.0
2,Windows,"Master’s degree (M.A., M.S., M.Eng., MBA, etc.)",I am a developer by profession,India,I am not interested in new job opportunities,10,2.0
