# MLOps

![MLOps](images/MLOps.jpg)

![hidden technical debt paper](images/hidden_technical_debt_2015.jpg)

* __Reproducibility__:
    * Versioning of data and code
* __Monitoring__:
    * Monitoring the model's behavior in production

# Code

* GitHub: https://github.com/datanizing/datascienceday/
* Directory: `06_MLOps`

## [DVC](https://dvc.org/)

![DVC_project_versions](https://dvc.org/static/39d86590fa8ead1cd1247c883a8cf2c0/fa73e/project-versions.webp)

## DVC Pipelines

#### Load data

```
python load_data.py
```

In [1]:
!dvc run -n load_data --no-exec --force -o data/raw/transport-short.csv -d load_data.py python load_data.py

Modifying stage 'load_data' in 'dvc.yaml'

To track the changes with git, run:

    git add dvc.yaml

To enable auto staging, run:

	dvc config core.autostage true

## DVC Pipelines

#### Train the model

```
python train.py
```

In [2]:
!dvc run -n train --no-exec --force -d data/raw/transport-short.csv -d train.py -o models/model/ python train.py

Modifying stage 'train' in 'dvc.yaml'

To track the changes with git, run:

    git add dvc.yaml

To enable auto staging, run:

	dvc config core.autostage true

## DVC Pipelines

In [3]:
!dvc dag --full | cat

+-----------+  
| load_data |  
+-----------+  
      *        
      *        
      *        
  +-------+    
  | train |    
  +-------+    


In [4]:
!dvc dag --out | cat

+------------------------------+ 
| data/raw/transport-short.csv | 
+------------------------------+ 
                *                
                *                
                *                
        +--------------+         
        | models/model |         
        +--------------+         


## Pipelines
DVC lets you define pipelines that connect training scripts and data with one another.

#### `dvc.yaml`
```yaml
stages:
  load_data:
    cmd: python load_data.py
    deps:
    - load_data.py
    outs:
    - data/raw/transport-short.csv
  train:
    cmd: python train.py
    deps:
    - data/raw/transport-short.csv
    - train.py
    outs:
    - models/model/
```
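
How the stage order follows from such a definition can be sketched in a few lines: a stage sits downstream of another when one of its `deps` appears among the other's `outs`. This is an illustration only and not DVC's own code. The `stages` dict mirrors the two stages created by the `dvc run` commands (including the script dependencies passed with `-d`), and `stage_edges` is a hypothetical helper name.

```python
# Stages written as a plain dict for illustration (same content as dvc.yaml).
stages = {
    "load_data": {
        "cmd": "python load_data.py",
        "deps": ["load_data.py"],
        "outs": ["data/raw/transport-short.csv"],
    },
    "train": {
        "cmd": "python train.py",
        "deps": ["data/raw/transport-short.csv", "train.py"],
        "outs": ["models/model/"],
    },
}


def stage_edges(stages):
    """Return sorted (upstream, downstream) pairs where an output feeds a dependency."""
    # Map every output path (without trailing slash) to the stage producing it.
    producer = {}
    for name, spec in stages.items():
        for out in spec.get("outs", []):
            producer[out.rstrip("/")] = name

    # A dependency that is some other stage's output creates an edge.
    edges = set()
    for name, spec in stages.items():
        for dep in spec.get("deps", []):
            upstream = producer.get(dep.rstrip("/"))
            if upstream is not None and upstream != name:
                edges.add((upstream, name))
    return sorted(edges)


print(stage_edges(stages))  # [('load_data', 'train')]
```

The single edge matches the graph printed by `dvc dag`: `load_data` has to run before `train`. Script files such as `train.py` create no edge because no stage produces them.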

Reproduce the steps with:

In [5]:
!dvc repro

Running stage 'load_data':
> python load_data.py
Generating lock file 'dvc.lock'
Updating lock file 'dvc.lock'

Running stage 'train':
> python train.py
Computing file/dir hashes (only done once)


## Data versions

Data versions are recorded in `dvc.lock` via hashes.
```yaml
stages:
  load_data:
    cmd: python load_data.py
    deps:
    - path: load_data.py
      md5: ddeb3c7968c47788fb055752566e725d
      size: 153
    outs:
    - path: data/raw/transport-short.csv
      md5: 3057d4f316405b0a282328d2f9ee5748
      size: 551260620
  train:
     ...
```
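
To make the hash idea concrete, here is a minimal sketch that computes the MD5 digest of a file and compares it with a value copied from `dvc.lock`. The helper names `file_md5` and `matches_lock_entry` are hypothetical. It assumes a single-file entry whose hash is a plain MD5 of the file content; directory outputs such as `models/model/` are recorded differently, and older DVC versions may normalise line endings of text files before hashing, so a mismatch does not necessarily mean the data changed.

```python
import hashlib


def file_md5(path, chunk_size=1024 * 1024):
    """Return the hex MD5 digest of a file, read in chunks to bound memory use."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_lock_entry(path, expected_md5):
    """Check a local file against an `md5` value copied from dvc.lock."""
    return file_md5(path) == expected_md5
```

With the lock file shown here, a call would look like `matches_lock_entry("load_data.py", "ddeb3c7968c47788fb055752566e725d")`.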

## DVC Data

The following storage backends are supported:
* Amazon S3 (and compatible stores, e.g. MinIO)
* Azure Blob Storage
* Google Drive
* Google Cloud Storage
* Aliyun OSS
* SSH
* HDFS
* WebHDFS
* HTTP
* WebDAV
* local remote (e.g. network drives)

## DVC Remote

Here: [DVC remote with MinIO](http://localhost:9000/minio/dvcrepo/)

* ACCESS KEY: `minio-access-key`
* SECRET KEY: `minio-secret-key`

In [None]:
!dvc push

  0% Querying remote cache|                          |0/1 [00:00<?,     ?file/s]

In [None]:
!dvc pull

# Sentiment API

An API for the sentiment model is served with [FastAPI](https://fastapi.tiangolo.com/).

### Data model for input and output

```python
from pydantic import BaseModel, Field

# Input data model
class Input(BaseModel):
    sentence: str = Field(example="Das ist ein toller Satz.")

# Output data model
class Sentiment(BaseModel):
    label: str = Field(description="Sentiment", example="NEGATIVE")
    score: float = Field(description="Score", example=0.9526780247688293)
```

### API endpoint

```python
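from fastapi import FastAPI, Response
from transformers import pipeline

# Create the FastAPI application
app = FastAPI(
    title="Sentiment Model API",
    description="Sentiment Model API",
    version="0.1",
)

# Load the model.
# Assumption: the classifier is a Hugging Face `transformers` pipeline loaded
# from `model_path`; the original snippet omits this step.
model_path = "models/model"
sentiment_classifier = pipeline("sentiment-analysis", model=model_path)

@app.post('/predict', response_model=Sentiment, operation_id="predict_post")
async def predict(response: Response, input: Input):
    # The classifier returns a list with one dict per input sentence
    pred = sentiment_classifier(input.sentence)[0]
    sentiment = Sentiment.parse_obj(pred)
    return sentiment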
```


## Testing the API

[uvicorn Webserver](https://www.uvicorn.org/)

```python
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8080)
```

Start the web server:
```
python app.py
```

## Interface

FastAPI serves documentation of the interface at [/docs](http://localhost:8080/docs).

### Generating client code
The client can be generated with the [`openapi-python-client` generator](https://github.com/openapi-generators/openapi-python-client), for example as follows.
```
openapi-python-client generate --url http://127.0.0.1:8080/openapi.json
```
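
If generating a client is not an option, the endpoint can also be called with the standard library alone. The sketch below is an assumption-laden alternative, not the approach used in this notebook: `build_predict_request` and `predict` are hypothetical helper names, and the JSON body simply follows the `Input` model (`{"sentence": ...}`).

```python
import json
import urllib.request


def build_predict_request(base_url, sentence):
    """Build a POST request for /predict with the JSON body the Input model expects."""
    body = json.dumps({"sentence": sentence}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/predict",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(base_url, sentence, timeout=30):
    """Send the request and return the decoded JSON response (needs a running server)."""
    request = build_predict_request(base_url, sentence)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the server running, `predict("http://localhost:8080", "Some sentence")` should return a dict with the `label` and `score` fields of the `Sentiment` model.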

## Calling the API

In [None]:
import sys
sys.path.append("sentiment-model-api-client")
from sentiment_model_api_client import Client
from sentiment_model_api_client.models import Input
from sentiment_model_api_client.api.default import predict_post

client = Client(base_url="http://localhost:8080", timeout=30)

predict_post.sync(client=client, 
                  json_body=Input(sentence=
                                  "I wonder how close a drone has to get to private property before someone "
                                  "can shoot it down, because that will definitely happen."))

# Monitoring

Pass the model outputs along in response headers so that the Prometheus client can pick them up.

#### `app.py`
```python
# Prediction endpoint
@app.post('/predict', response_model=Sentiment, operation_id="predict_post")
async def predict(response: Response, input: Input):
    pred = sentiment_classifier(input.sentence)[0]
    sentiment = Sentiment(**pred)

    # Headers for monitoring
    response.headers["X-model-score"] = str(sentiment.score)
    response.headers["X-model-sentiment"] = str(sentiment.label)

    return sentiment
```

Define the metrics:

```python
from prometheus_client import Histogram, Counter

def model_output(metric_namespace: str = "", metric_subsystem: str = ""):
    SCORE = Histogram(
        "model_score",
        "Predicted score of model",
        buckets=(0, .1, .2, .3, .4, .5, .6, .7, .8, .9),
        namespace=metric_namespace,
        subsystem=metric_subsystem,
    )
    ...
```

```python
    ...
    SENTIMENT = Counter(
        "sentiment",
        "Predicted sentiment",
        namespace=metric_namespace,
        subsystem=metric_subsystem,
        labelnames=("sentiment",)        
    )
    ...
```
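
Prometheus histograms are cumulative: each bucket counts all observations less than or equal to its upper bound, and an implicit `+Inf` bucket counts everything. The following sketch reproduces that counting in plain Python for the bounds used by `SCORE`. It is an illustration rather than the client library's implementation; `BOUNDS` and `cumulative_bucket_counts` are hypothetical names and the three scores are made-up sample values.

```python
import math

# Upper bounds from the SCORE histogram, plus the implicit +Inf bucket.
BOUNDS = (0, .1, .2, .3, .4, .5, .6, .7, .8, .9, math.inf)


def cumulative_bucket_counts(scores, bounds=BOUNDS):
    """Count, per upper bound, how many scores are less than or equal to it."""
    return {bound: sum(1 for score in scores if score <= bound) for bound in bounds}


counts = cumulative_bucket_counts([0.05, 0.55, 0.95])
for bound, count in counts.items():
    print(f"le={bound}: {count}")
```

With these bounds, every score above 0.9 is counted only in the `+Inf` bucket, so the histogram cannot tell 0.91 from 0.99. If most scores are expected in that range, finer bounds near 1.0 may be worth considering.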

Read the metrics from the headers:

```python
    ...
    def instrumentation(info) -> None:
        if info.modified_handler == "/predict":
            model_score = info.response.headers.get("X-model-score")
            model_sentiment = info.response.headers.get("X-model-sentiment")
            if model_score:
                SCORE.observe(float(model_score))
                SENTIMENT.labels(model_sentiment).inc()

    return instrumentation
```

Observe the metrics on `app`:
```python
from prometheus_fastapi_instrumentator import Instrumentator

instrumentator = Instrumentator()
instrumentator.add(model_output(metric_namespace="mlops", metric_subsystem="model"))

# Attach the Prometheus instrumentator to the app
instrumentator.instrument(app).expose(app)
```

# Monitoring

[Model API](http://localhost:8080/docs)

[Metrics Endpoint](http://localhost:8080/metrics)

[Grafana](http://localhost:3000)

## 500 calls

#### Load data

In [None]:
import pandas as pd
data_df = pd.read_csv("data/raw/transport-short.csv", header=None, nrows=1000, names=['id', 'kind', 'title', 'link_id', 'parent_id', 'ups', 'downs', 'score',
       'author', 'num_comments', 'created_utc', 'permalink', 'url', 'text',
       'level', 'top_parent'])
data_df.head(2)

### Calling the API with 500 samples

In [None]:
for idx, row in data_df.sample(500).iterrows():
    if not pd.isna(row["text"]) and row["text"] not in ["[deleted]", "[removed]"]:
        sentiment = predict_post.sync(client=client, json_body=Input(sentence=row["text"]))
        data_df.loc[idx, "sentiment"] = sentiment.label

Results

In [None]:
with pd.option_context("display.max_colwidth", None):
    display(data_df[pd.notna(data_df["sentiment"])][["text", "sentiment"]].head())

# [Dashboard](http://localhost:3000/d/PGUZYQznk/model-score?orgId=1&refresh=5s)

![dashboard showing distributions of model scores, outlier scores, labels and drifts over time](images/dashboard.png)



## What can be measured?

* Score distribution
   * Early-warning indicator for problems
   * Easier to measure than model quality ("What is the correct sentiment?")
* Basic call statistics
   * Is the model perhaps being used differently?
   
More complex, but potentially helpful:
* Drift
   * Requires a separate model ("drift detector")
   * e.g. topic focus shifts strongly compared with the training dataset
* Outlier score
   * Separate models that check whether the data matches the training data
   * e.g. the share of non-English posts increases
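
One way to turn the score distribution into a single early-warning number is to compare a reference window of scores with a recent one. The sketch below uses the population stability index (PSI), which is one common choice and not something this notebook prescribes. The function names, the ten equal-width bins on [0, 1], and the smoothing constant `eps` are all assumptions.

```python
import math


def histogram_shares(scores, n_bins=10):
    """Share of scores per equal-width bin on [0, 1]; a score of 1.0 goes into the last bin."""
    if not scores:
        raise ValueError("scores must not be empty")
    counts = [0] * n_bins
    for score in scores:
        index = min(int(score * n_bins), n_bins - 1)
        counts[index] += 1
    return [count / len(scores) for count in counts]


def population_stability_index(reference, current, n_bins=10, eps=1e-6):
    """PSI between two score samples; 0.0 means identical binned distributions."""
    ref = histogram_shares(reference, n_bins)
    cur = histogram_shares(current, n_bins)
    psi = 0.0
    for p, q in zip(ref, cur):
        # Floor empty bins at eps to keep the logarithm finite.
        p, q = max(p, eps), max(q, eps)
        psi += (q - p) * math.log(q / p)
    return psi
```

Identical samples give a PSI of exactly 0.0, and the value grows as the recent scores move away from the reference. Which threshold should trigger an alert depends on the application and is left open here.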