#DSI-Abschlussprojekt
#Weltmeere - Auswirkung von Wassertemperatur und Salzgehalt

##Gruppenmitglieder
Bayr Klemens, BSc.
Hufnagl Ivo, BSc.
Pribil Nadine BSc.

##Geplante Datenquellen
Flatfiles
- API von ESA
- https://climate.esa.int/de/odp/#/dashboard

API Data
- Amentum
- https://developer.amentum.io


##Geplante Datensicherung
- GIT Repository
- Datenbanksicherung

##Geplante Vorgehensweise

###Schritt 1: Datenfindung
Recherche von ...

###Schritt 2: Datenbeschaffung
Mindestens 2 verschiedene Datenquellen, die miteinander verbunden sind.

###Schritt 3: Datenanalyse
Die Daten wollen wir in eine Datenbank spielen. Eventuell CouchDB. Die Daten werden wir mit Kafka bereitstellen und für die Analyse wird dann SparkSQL verwendet.

###Schritt 4: Ergebnisdarstellung
Die erwarteten Ergebnisse werden mit Grafiken veranschaulicht. Diese werden wir mit Phyton erstellen.

##Weitere Eckpunkte
- Speichern / Lesen / Verarbeiten der Daten mit einer Datenbank (relational oder NoSQL)
- Kafka für die Bereitstellung der Daten verwenden (zumindest einen Teil, Kafka Producer, Kafka Consumer)
- Spark für Datenhandling/analyse verwenden.
- Abbildung von MapReduce mit Spark RDDs.
- SparkSQL mindestens einmal verwenden.
- Spark Dataframes verwenden.

##Meilensteine
1. 14.12.2022: Präsentation des Themas und der Vorgehensweise.
2. 21.12.2022: Zwischenabgabe des Projekts und Feedback-Einholung.
3. 25.01.2023: Abgabe und Präsentation der Endergebnisse.

##Erwarteter Output
Grafiken...

In [5]:
!pip install kafka-python

Collecting kafka-python
  Using cached kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [1]:
from kafka import KafkaConsumer, KafkaProducer

In [3]:
import json
import requests

url = "https://ocean.amentum.io/rtofs"

headers = {"API-Key": "SSDqrYzBLYFDuO0iEnCKnYWdQLWNcGSO"}

params = {
  "latitude": -34.0,
  "longitude": 152.0,
  "depth": 30
}

# handle exceptions
response = requests.get(url, headers=headers, params=params)
json_payload = response.json()

print(json.dumps(json_payload, indent=4, sort_keys=True))


{
    "current_u": {
        "units": "m/s",
        "value": 0.050109099596738815
    },
    "current_v": {
        "units": "m/s",
        "value": -0.2798057496547699
    },
    "point": {
        "depth": 30.0,
        "latitude": -34.04350280761719,
        "longitude": 151.977294921875
    },
    "salinity": {
        "units": "g/kg",
        "value": 35.560813903808594
    },
    "temperature": {
        "units": "deg C",
        "value": 21.43215560913086
    }
}


In [14]:
# connect to Kafka and open producer
from json import dumps
group_name = "rtofs"
topic_name = "pos1"
servers = ['localhost:29092']  # has to be adapted

producer = KafkaProducer(bootstrap_servers=servers,
                         value_serializer=lambda x:dumps(x).encode('utf-8'))

In [15]:
# write data into Kafka
data = json_payload
producer.send(topic_name, value=data)
print ("data sent to topic "+topic_name+" ",data)

data sent to topic pos1  {'current_u': {'units': 'm/s', 'value': 0.050109099596738815}, 'current_v': {'units': 'm/s', 'value': -0.2798057496547699}, 'point': {'depth': 30.0, 'latitude': -34.04350280761719, 'longitude': 151.977294921875}, 'salinity': {'units': 'g/kg', 'value': 35.560813903808594}, 'temperature': {'units': 'deg C', 'value': 21.43215560913086}}


In [16]:
# import necessary packages
from kafka import KafkaConsumer
from json import loads
# connect to Kafka and open consumer
group_name = "rtofs"
topic_name = "pos1"
servers = ['localhost:29092']  # has to be adapted

consumer = KafkaConsumer(
     topic_name,
     bootstrap_servers=servers,
     auto_offset_reset='earliest',
     auto_commit_interval_ms=1000,
     enable_auto_commit=True,
     group_id=group_name,
     value_deserializer=lambda x: loads(x.decode('utf-8')))

In [None]:
# read data from Kafka in a loop
for message in consumer:
    data = message.value
    print('{} read from topic {}'.format(data, topic_name))

{'current_u': {'units': 'm/s', 'value': 0.050109099596738815}, 'current_v': {'units': 'm/s', 'value': -0.2798057496547699}, 'point': {'depth': 30.0, 'latitude': -34.04350280761719, 'longitude': 151.977294921875}, 'salinity': {'units': 'g/kg', 'value': 35.560813903808594}, 'temperature': {'units': 'deg C', 'value': 21.43215560913086}} read from topic pos1
