In [None]:
#Aktueller Code!!!
import json
import matplotlib.pyplot as plt
from PIL import Image
import requests
from io import BytesIO
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from IPython.display import display
import pandas as pd
from IPython.core.display import display, HTML
from dateutil import parser  # 🔥 Neu: Für ISO 8601 Parsing

def main(timeout, func, window):
    sc = SparkContext("local[*]", "Streaming-Aggregation")
    ssc = StreamingContext(sc, window)
    stream = ssc.socketTextStream("127.0.0.1", 9999)
    func(stream)  # ✅ Hier wird die Aggregation pro Window durchgeführt

    try:
        ssc.start()
        ssc.awaitTerminationOrTimeout(timeout)
    except Exception as e:
        print(str(e))
    finally:
        ssc.stop(False)


def aggregate_common_name_with_image(stream):
    def safe_parse(line):
        """Parst JSON und extrahiert commonName, Bild-URL und timestamp (ISO 8601 -> Unix-Zeitstempel)"""
        try:
            obj = json.loads(line)
            species = obj['species']['commonName']
            image_url = obj['species'].get('thumbnailUrl', '')

            # 🔥 Timestamp als Unix-Zeitstempel parsen (ISO 8601 → Unix)
            timestamp_str = obj.get('timestamp', None)
            try:
                timestamp = parser.parse(timestamp_str).timestamp() if timestamp_str else None
            except Exception:
                timestamp = None  # Falls Parsing fehlschlägt

            return (species, (1, image_url, [timestamp] if timestamp is not None else []))
        except Exception:
            return ("Fehler", (0, "", []))  # Fehlerhandling

    parsed_stream = stream.map(safe_parse)

    # **Aggregation innerhalb des aktuellen Windows**
    aggregated_stream = parsed_stream.reduceByKey(
        lambda a, b: (a[0] + b[0], a[1], a[2] + b[2])  # Zählen & Timestamps sammeln
    )

    # ✅ Hier die Output-Operation hinzufügen
    aggregated_stream.foreachRDD(show_images)  # 🔥 Jetzt wird die Ausgabe in Tabellenform gemacht!


def show_images(rdd):
    """Zeigt die aggregierten Ergebnisse + min/max Timestamp an"""
    results = rdd.collect()
    
    if results:
        sorted_results = sorted(results, key=lambda x: x[1][0], reverse=True)  # 🔥 Nach Häufigkeit sortieren
        
        # 🔥 Alle Timestamps aus dem Window extrahieren
        all_timestamps = [ts for _, (_, _, timestamps) in results for ts in timestamps]
        
        if all_timestamps:
            min_timestamp = min(all_timestamps)
            max_timestamp = max(all_timestamps)
            print(f"Window Fenster von: {pd.to_datetime(min_timestamp, unit='s')} bis {pd.to_datetime(max_timestamp, unit='s')}")
            
            time_diff = max_timestamp - min_timestamp
            minutes = int(time_diff // 60)  # Ganze Minuten
            seconds = int(time_diff % 60)  # Verbleibende Sekunden
            print(f"Zeitfenster: {minutes} Minuten {seconds} Sekunden")
        else:
            print("Keine gültigen Timestamps in diesem Window.")

        # 🔹 Tabelle mit Spezies, Anzahl und Bild-URL
        data = []
        for species, (count, url, _) in sorted_results:
            img_html = f'<img src="{url}" width="50" height="50">' if url else ""  # 🔥 Bild als HTML-Element
            data.append([species, count, img_html])

        # 🔹 DataFrame erstellen
        df = pd.DataFrame(data, columns=["Spezies", "Anzahl", "Bild"])
        
        # 🔹 Als HTML-Tabelle rendern
        display(HTML(df.to_html(escape=False, index=False)))  # escape=False erlaubt HTML-Inhalte (Bilder)


def show_image_from_url(url):
    """Lädt ein Bild von der URL und zeigt es direkt in Jupyter Notebook an."""
    try:
        response = requests.get(url)
        img = Image.open(BytesIO(response.content))
        display(img)  # 🔥 Zeigt das Bild direkt in Jupyter Notebook an
    except Exception as e:
        print(f"Fehler beim Laden des Bildes: {e}")

# **Starte Spark Streaming mit 30s Timeout, Aggregation und 10s Fenstergröße**
main(1000, aggregate_common_name_with_image, 10)



  from IPython.core.display import display, HTML


 Window Fenster von: 2025-03-02 15:23:46.500000 bis 2025-03-02 15:36:09
Zeitfenster: 12 Minuten 22 Sekunden


Spezies,Anzahl,Bild
House Sparrow,23,
Great Tit,2,
Redwing,1,
Long-tailed Tit,1,


 Window Fenster von: 2025-03-02 15:36:36.500000 bis 2025-03-02 16:25:31
Zeitfenster: 48 Minuten 54 Sekunden
