In [None]:
import xarray as xr
import mean_sst
import mean_sst_dask
import requests as rq
import json
import hvplot.xarray

## Server, Daten und SST

Die erste Version eines `docker-compose.yml` ist jetzt fertig und verbindet die API mit dem SST Prozess. Außerdem funktioniert jetzt das Job Mangement des Servers, um diesen Prozess zu starten.

### Durchlauf eines Jobs

1. Zuerst startet man den Server über:

```bash
docker-compose up
```

(in dem Verzeichnis, wo dieses Notebook liegt)

2. Ein Job JSON erstellen:\
Ein Beispiel JSON für einen SST Job (freundlicher Weise vom Server Team zur Verfügung gestellt):

In [None]:
testjob = {
  "title": "Example Title",
  "description": "Example Description",
  "process": {
    "process_graph": {
      "loadcollection1": {
        "process_id": "load_collection",
        "arguments": {
          "timeframe" : ["01-12-1981 00:00:00","30-12-1981 00:00:00","%d-%m-%Y %H:%M:%S"],
          "DataType": "SST"
        }
        },
        "SST": {
        "process_id": "mean_sst",
        "arguments": {
          "data":{
              "from_node": "loadcollection1"
          },
          "timeframe":["1981-12-01","1981-12-17"],
          "bbox":[-999,-999,-999,-999]
          }
        },
        "save":{
            "process_id": "save_result",
            "arguments":{
                "SaveData":{
                    "from_node":"SST"
                },
                "Format": "netcdf"
            }
        }
      }
      }
    }

3. Den Job mittels HTTP Post an den jobs Endpoint schicken:

In [None]:
#Den Testdaten an /jobs Posten
rq.post("http://localhost/api/v1/jobs", json=testjob, headers={"Content-Type": "application/json"})

4. Die Id des Jobs erfragen, in dem eine GET Anfrage an den /jobs Endpoint gestellt wird:

In [None]:
j = rq.get("http://localhost/api/v1/jobs")
rjson = j.json()
# Die Id des neusten Job abspeichern für spätere Verwendung
job_id = rjson['jobs'][-1]['id']
rjson

5. Den Job ausführen über eine POST Anfrage an den results Endpoint des Jobs:

In [None]:
rq.post("http://localhost/api/v1/jobs/" + job_id + "/results" , json=None, headers={"Content-Type": "application/json"})

6. Verlauf des Jobs im Server:

Was im der Server im Hintergund macht: 

```bash
frontend_1        | 172.21.0.1 - - [20/Jan/2021 12:14:29] "POST /api/v1/jobs/051a9bc0-5b19-11eb-a9c8-0242ac150006/results HTTP/1.1" 204 -
frontend_1        | 172.21.0.5 - - [20/Jan/2021 12:14:32] "GET /jobRunning/051a9bc0-5b19-11eb-a9c8-0242ac150006 HTTP/1.1" 200 -
database_1        | 172.21.0.5 - - [20/Jan/2021 12:14:32] "POST /doJob/051a9bc0-5b19-11eb-a9c8-0242ac150006 HTTP/1.1" 200 -
database_1        | 172.21.0.5 - - [20/Jan/2021 12:14:32] "GET /jobStatus HTTP/1.1" 200 -
...
database_1        | 172.21.0.5 - - [20/Jan/2021 12:16:37] "GET /jobStatus HTTP/1.1" 200 -
sst_0_1           | 172.21.0.5 - - [20/Jan/2021 12:16:37] "POST /doJob/051a9bc0-5b19-11eb-a9c8-0242ac150006 HTTP/1.1" 200 -
sst_0_1           | 172.21.0.5 - - [20/Jan/2021 12:16:37] "GET /jobStatus HTTP/1.1" 200 -
sst_0_1           | 172.21.0.5 - - [20/Jan/2021 12:16:42] "GET /jobStatus HTTP/1.1" 200 -
sst_0_1           | 172.21.0.5 - - [20/Jan/2021 12:16:47] "GET /jobStatus HTTP/1.1" 200 -
sst_0_1           | 172.21.0.5 - - [20/Jan/2021 12:16:52] "GET /jobStatus HTTP/1.1" 200 -
sst_0_1           | 172.21.0.5 - - [20/Jan/2021 12:16:57] "GET /jobStatus HTTP/1.1" 200 -
frontend_1        | 172.21.0.5 - - [20/Jan/2021 12:16:57] "POST /takeData/051a9bc0-5b19-11eb-a9c8-0242ac150006 HTTP/1.1" 200 -
```
- Zuerst übergibt der `frontend` container, den Job an den `database` container. \
- Dannach wird vom `database` container der entsprechende Datacube vorbereitet. \
- Dann wird übernimmt der `sst` container und berechnet den SST für den ausgewählten Zeitraum. \
- Das Ergebnis wird dann dem `/takeData` Endpoint des `frontend` containers übergeben und das Ergebnis zum Download bereitgestellt.\

7. Den Downloadlink über eine GET Anfrage an /jobs results bekommen

In [None]:
res = rq.get("http://localhost/api/v1/jobs/" + job_id + "/results" )
dl = res.json()["assets"]
dl

8. Link im Browser eingeben und Ergebnis als netCDF herunterladen: http://localhost:80/download/< Euer Downloadlink>

In [None]:
fin = xr.open_dataset('../demodata/25ff212c-5b2c-11eb-a863-0242ac160005.nc') # Hier ggf. euren eigenen Pfad zum DOwnload einfügen
fin

( 9. Mit `docker-compose down -v` die lokale Umgebung aufräumen)

  
 

## Dask SST Laufzeitanalyse
---
*von Alexia*

### Dask Delayed

"The Dask delayed function decorates your functions so that they operate lazily. Rather than executing your function immediately, it will defer execution, placing the function and its arguments into a task graph." (https://docs.dask.org/en/latest/delayed.html)
-  Ein einfacher Weg Code, der nicht voneinander anhängt, parallel auszuführen

### Chunks

"Dask divides arrays into many small pieces, called chunks, each of which is presumed to be small enough to fit into memory." 
"If your chunks are too small, queueing up operations will be extremely slow, because Dask will translate each operation into a huge number of operations mapped across chunks. Computation on Dask arrays with small chunks can also be slow, because each operation on a chunk has some fixed overhead from the Python interpreter and the Dask task executor. Conversely, if your chunks are too big, some of your computation may be wasted, because Dask only computes results one chunk at a time." (http://xarray.pydata.org/en/stable/dask.html#chunking-and-performance)
-  Erlaubt es ein Dataset in Stücke zu teilen, auf denen Operationen parallel angewendet werden können

In [None]:
'''Dask Cluster'''
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
client

## 12 Monate, kleines Subset

### Ohne Dask delayed und chunks => keine Parallelisierung

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc")

x = mean_sst.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'], [0, -50, 60, 50])

### Mit Dask delayed, ohne chunks

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc")

x = mean_sst_dask.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'], [0, -50, 60, 50])

dauert länger, da dask delayed overhead erzeugt, der code an sich damit aber nicht parallelisiert werden kann, da er aufeinander aufbaut 

### Ohne Dask delayed, mit chunks={"time": "auto"}

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc", chunks={"time": "auto"})

x = mean_sst.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'], [0, -50, 60, 50])

das verwenden wir. "auto" findet die beste chunk-größe automatisch

### Ohne Dask delayed, mit chunks={"lat": 200, "lon": 400}

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc", chunks={"lat": 200, "lon": 400})

x = mean_sst.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'], [0, -50, 60, 50])

schlecht gewählte chunk-größe, hier manuell vorgegeben

### Ohne Dask delayed, mit chunks={"lat": "auto", "lon": "auto"}

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc", chunks={"lat": "auto", "lon": "auto"})

x = mean_sst.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'], [0, -50, 60, 50])

Chunken von lat und lon an sich scheint nicht ideal zu sein

### Mit Dask delayed, mit chunks={"time": "auto"}

Problem, dieser fall verbraucht zu viel memory: distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting 

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc", chunks={"time": "auto"})

x = mean_sst_dask.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'], [0, -50, 60, 50])

## 12 Monate, kein Subset

### Ohne Dask delayed und chunks => keine Parallelisierung

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc")

x = mean_sst.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'])

### Ohne Dask delayed, mit chunks={"time": "auto"}

In [None]:
%%time

ds = xr.open_dataset("../demodata/sst.day.mean.1989.nc", chunks={"time": "auto"})

x = mean_sst.wrapper_mean_sst(ds, ['1989-01-01','1989-12-31'])

Der Abstand zwischen den Laufzeiten von dem Fall ohne Parallelsisierung und dem Fall chunks={"time": "auto"} nimmt mit zunehmender größe des datasets zu

## Testen 

Inzwischen gibt es zwei fertige Github Actions, im [Testrepo](https://github.com/GeoSoftII2020-21/TestRepo/actions).
So soll auch das One Click Demo Szenario aus dem Pflichtenheft umgesetzt werden.

###### [1. Pytest Workflow](https://github.com/GeoSoftII2020-21/TestRepo/actions?query=workflow%3A%22Pytest+Workflow%22)

Diese Action soll alle Submodule des Projekts ihre Unittests mit Pytest ausführen lassen. Dazu werden zuerst alle Teilprojekte geupdated. Dann wird auf in einem zweiten Schritt Pytest über alle gesammelten Unitests ausgeführt.

🚧 *Momentan gibt es noch ein Problem mit den Testdatensätzen, die für manche Tests verwendet wurden. Da diese, aufgrund ihrer Größe, nicht auf Github liegen, schlägt die Action bis jetzt noch fehl. An einer Lösung wird gearbeitet.*

##### [2. Backendvalidator](https://github.com/GeoSoftII2020-21/TestRepo/actions?query=workflow%3ABackend-Validator)

Diese Action führt den openeo Backendvalidator(s. Demo II für die lokale Ausführung) nun über Github aus. Dabei wird das openeo Backend Validator Repository geklont und der validate befehl auf die den momentanen Stand der API ausgeführt.