*von Benedikt Funke*

### Senden der Temperaturdaten an Kafka

**Wichtig!:** Bevor dieses Notebook ausgeführt werden kann muss Kafka auf dem System laufen und das `kafka-consumer.ipynb` ausgeführt werden.

Die Temperaturdaten werden nicht direkt in die Datenbank geschrieben, sondern gehen einen "Umweg" über Kafka. Der Grund dafür liegt darin, dass es sich bei den Temperaturdaten um eine große Anzahl an Daten handelt, die so über Kafka gepuffert werden.

Rein rechnerisch ergeben sich bei 494 Wetterstationen die an 62 Tagen 24 Temperaturdaten gesendet haben 735072 Datensätze. Um zu verhindern, dass Teile der Anwendung durch Datenbankoperationen blockiert werden nutzt man Kafka um die Schreiboperationen der Daten asynchron zu gestalten. In dem hier verwendeten Beispiel handelt sich jedoch zum großen Teil um eine Veranschaulichung der Nutzung von Kafka. Alle Operationen werden auf einem System ausgeführt und es sind somit keine großen Verbesserungen gegenüber der direkten Speicherung der Daten zu erwarten.

Um die gewünschten Temperaturdaten zu erhalten wird zunächst eine Liste aller `STATION_ID` erstellt, für die diese abgefragt werden sollen. Dafür wird die zuvor erstelle `stations`-Collection der MongoDB abgefragt und alle `STATION_ID`s gesucht.



In [None]:
from kafka import KafkaProducer
from wetterdienst.dwd.observations import DWDObservationData, DWDObservationParameterSet, DWDObservationResolution
from pymongo import MongoClient
import csv

client = MongoClient('mongodb://localhost:27017/')
stations = client.bda.stations.find({}, {'STATION_ID':1, "_id":0})


Folgend wird zunächst die `KafkaProducer`-Klasse instanziiert. Bei keiner Übergabe von Parametern wie in diesem Fall, versucht der `KafkaProducer` eine Verbindung zu Kafka auf `localhost` aufzubauen. In dem hier gezeigten Beispiel läuft Kafka innerhalb des `Confluent All-In-One-Pakets` in einem Docker-Container auf dem gleichen System wie dieses Notebook.

Anschließend werden zwei Hilfsfunktionen definiert, die später über den Erfolg oder Misserfolg einer Nachricht an Kafka informieren. 

In [None]:
producer = KafkaProducer()

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)

Im folgenden Abschnitt werden für jede Station die Temperaturdaten abgerufen und anschließend an Kafka gesendet. Dafür wird erneut die `wetterdienst`-Bibliothek verwendet. Theoretisch ließe sich bei der Abfrage der `DWDObservationData` direkt ein Array an `station_ids` einfügen. An dieser Stelle wurde sich jedoch bewusst dagegen entschieden um diesen synchronen Prozess in mehrere kleinere aufzuteilen und die Vorteile von Kafka besser nutzen zu können.

Diese Abfrage liefert als Ergebnis mehrere `pandas Dataframes`. Da bei jeder Abfrage jedoch nur eine Station abgefragt wird, handelt es sich bei dieser Abfrage immer nur um einen `Dataframe`. Dieser enthält nun Informationen über die Temperatur und die Luftfeuchtigkeit zu bestimmten Zeitpunkten an der angefragten Station. Da die Luftfeuchtigkeit wird die weitere Analyse nicht verwendet werden soll, werden alle Zeilen mit Informationen über die Luftfeuchtigkeit verworfen. Anschließend wird das verbleibende `Dataframe` in eine CSV-Datei konvertiert und in die einzelnen Zeilen aufgeteilt. Diese werden nun Zeile für Zeile an Kafka gesendet. Bei einem erfolgreichen Sendevorgang wird die `on_send_success` Funktion und bei einem Fehler die `on_send_error` Funktion aufgerufen.

In [None]:
for station in stations:
    station_id = station['STATION_ID']

    observations = DWDObservationData(
    station_ids=[44],
    parameters=[DWDObservationParameterSet.TEMPERATURE_AIR],
    resolution=DWDObservationResolution.HOURLY,
    start_date="2020-07-01",
    end_date="2020-08-31",
    tidy_data=True,
    humanize_column_names=True,
    )

    for df in observations.collect_data():
        df = df[df['ELEMENT'].str.match('TEMPERATURE_AIR_200')]
        raw = df.to_csv()
        lines = csv.reader(raw.splitlines()[1:])

        for line in lines:
            producer.send('weather', value=bytearray(str(line).strip('[]'), encoding='utf-8')).add_callback(on_send_success).add_errback(on_send_error)