In [57]:
# daily_situation_report_spark_with_map.py

import cml.data_v1 as cmldata
from datetime import datetime
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, to_timestamp, row_number, coalesce, lit, round as spark_round, sin, cos, sqrt, asin, radians

# --- Imports für die Kartenerstellung ---
# Wichtiger Hinweis: Diese Bibliotheken müssen in Ihrer Umgebung installiert sein (pip install matplotlib geopandas contextily)
import matplotlib.pyplot as plt
import geopandas as gpd
import contextily as cx
import pandas as pd
from shapely.geometry import Point


In [58]:
# --- 1. SPARK-VERBINDUNG HERSTELLEN ---
try:
    CONNECTION_NAME = "se-aws-edl"
    conn = cmldata.get_connection(CONNECTION_NAME)
    spark = conn.get_spark_session()
    print("Spark-Session erfolgreich erstellt.")
except Exception as e:
    print(f"Fehler beim Erstellen der Spark-Session: {e}")
    print("Erstelle eine lokale Spark-Session als Fallback.")
    spark = SparkSession.builder.appName("DailyReportFallback").getOrCreate()


Spark Application Id:spark-4a34720e29cb4ec08cbd00d28992f52a
Spark-Session erfolgreich erstellt.


In [59]:
def get_latest_ship_data(spark_session: SparkSession, db_name: str = "defense"):
    """
    Frägt die letzte bekannte Position für jedes Schiff aus marine_vessel_status ab.
    """
    table_name = f"{db_name}.marine_vessel_status"
    print(f"Lese neueste Daten aus Tabelle: {table_name}")
    try:
        all_ships_df = spark_session.read.table(table_name)
        # Fensterfunktion, um nur den neuesten Status pro Schiff zu erhalten
        window_spec = Window.partitionBy("MMSI").orderBy(col("Event_Timestamp").desc())
        latest_ships_df = all_ships_df.withColumn("row_num", row_number().over(window_spec)) \
                                      .filter(col("row_num") == 1).drop("row_num")
        return latest_ships_df
    except Exception as e:
        print(f"Fehler bei der Abfrage von {table_name}: {e}")
        return None

def get_sanctioned_vessels(spark_session: SparkSession, db_name: str = "defense"):
    """Lädt die Liste der sanktionierten Schiffe."""
    table_name = f"{db_name}.sanctioned_vessels"
    print(f"Lese Daten aus Tabelle: {table_name}")
    return spark_session.sql(f"SELECT MMSI, Name, Sanction_Reason FROM {table_name} WHERE MMSI IS NOT NULL")

def get_latest_buoy_data(spark_session: SparkSession, db_name: str = "defense"):
    """Frägt die neuesten Bojen-Detektionsdaten ab."""
    table_name = f"{db_name}.buoy_data"
    print(f"Lese Daten aus Tabelle: {table_name}")
    try:
        df = spark_session.read.table(table_name)
        result_row_list = df.selectExpr("max(to_date(ts)) as max_date").collect()
        
        # Korrigierte Zeile: Greife auf das erste Element der Liste und dann auf den Feldnamen zu
        if not result_row_list or result_row_list[0]['max_date'] is None:
            return spark_session.createDataFrame([], df.schema)
        
        latest_date = result_row_list[0]['max_date']
        return df.withColumn("event_date", to_timestamp(col("ts"))).filter(f"to_date(event_date) = '{latest_date}'")
    except Exception as e:
        print(f"Fehler bei der Abfrage von {table_name}: {e}")
        return None


def get_latest_social_media_data(spark_session: SparkSession, db_name: str = "defense"):
    """Frägt die neuesten Social-Media-Daten ab."""
    table_name = f"{db_name}.social_media_messages"
    print(f"Lese Daten aus Tabelle: {table_name}")
    try:
        df = spark_session.read.table(table_name)
        result_row_list = df.selectExpr("max(to_date(ts)) as max_date").collect()
        
        # Corrected line
        if not result_row_list or result_row_list[0]['max_date'] is None:
            return spark_session.createDataFrame([], df.schema)
        
        latest_date = result_row_list[0]['max_date']
        return df.withColumn("event_date", to_timestamp(col("ts"))).filter(f"to_date(event_date) = '{latest_date}' AND priority IN ('hoch', 'mittel')")
    except Exception as e:
        print(f"Fehler bei der Abfrage von {table_name}: {e}")
        return None


def get_area_violations(spark_session: SparkSession, db_name: str = "defense"):
    """Frägt die View 'area_violation' ab."""
    view_name = f"{db_name}.area_violation"
    print(f"Frage View ab: {view_name}")
    try:
        return spark_session.sql(f"SELECT * FROM {view_name} ORDER BY event_timestamp DESC")
    except Exception as e:
        print(f"Warnung: Konnte View {view_name} nicht abfragen. Grund: {e}")
        return None

def get_buoys_near_harbours(spark_session: SparkSession, db_name: str = "defense"):
    """Frägt die View 'buoy_near_harbours' ab."""
    view_name = f"{db_name}.buoy_near_harbours"
    print(f"Frage View ab: {view_name}")
    try:
        return spark_session.sql(f"SELECT * FROM {view_name} ORDER BY ts DESC")
    except Exception as e:
        print(f"Warnung: Konnte View {view_name} nicht abfragen. Grund: {e}")
        return None

def get_german_fleet_status(spark_session: SparkSession, latest_ship_df, db_name: str = "defense"):
    """Kombiniert die statische Flottenliste mit den neuesten dynamischen Positions- und Statusdaten."""
    fleet_table = f"{db_name}.german_navy_fleet"
    print(f"Erstelle Flottenstatus aus '{fleet_table}' und Echtzeitdaten.")
    fleet_df = spark_session.read.table(fleet_table)

    fleet_status_df = fleet_df.join(latest_ship_df, fleet_df.mmsi == latest_ship_df.mmsi, "left") \
                              .select(
                                  fleet_df.name,
                                  fleet_df['class'].alias("klasse"),
                                  coalesce(latest_ship_df.status, lit("Keine Daten")).alias("status"),
                                  latest_ship_df.latitude.alias("latitude"),
                                  latest_ship_df.longitude.alias("longitude")
                              ).orderBy("klasse", "name")
    return fleet_status_df

In [60]:
# --- ind_ships_near_sanctioned ANALYSEFUNKTION ---
def find_ships_near_sanctioned(spark_session: SparkSession, latest_ship_df, sanctioned_info_df, distance_km=1):
    """Findet alle Schiffe, die sich innerhalb einer bestimmten Distanz zu einem sanktionierten Schiff befinden."""
    if latest_ship_df is None or sanctioned_info_df is None:
        return None

    sanctioned_positions_df = latest_ship_df.join(
        sanctioned_info_df, latest_ship_df.mmsi == sanctioned_info_df.MMSI, "inner"
    ).select(
        latest_ship_df.mmsi.alias("sanctioned_mmsi"),
        sanctioned_info_df.Name.alias("sanctioned_name"),
        latest_ship_df.latitude.alias("sanctioned_lat"),
        latest_ship_df.longitude.alias("sanctioned_lon")
    )

    if sanctioned_positions_df.count() == 0:
        print("Keine sanktionierten Schiffe mit aktuellen Positionsdaten gefunden.")
        return None

    nearby_ships_df = latest_ship_df.crossJoin(sanctioned_positions_df) \
        .filter(latest_ship_df.mmsi != col("sanctioned_mmsi"))

    haversine_expr = (
        lit(6371 * 2) * asin(sqrt(
            sin(radians(latest_ship_df.latitude) - radians(sanctioned_positions_df.sanctioned_lat)) / 2)**2 + \
            cos(radians(latest_ship_df.latitude)) * cos(radians(sanctioned_positions_df.sanctioned_lat)) * \
            sin(radians(latest_ship_df.longitude) - radians(sanctioned_positions_df.sanctioned_lon)) / 2)**2
        )

    result_df = nearby_ships_df.withColumn("distance", haversine_expr) \
                               .filter(col("distance") <= distance_km) \
                               .select(
                                   latest_ship_df.mmsi.alias("nearby_ship_mmsi"),
                                   col("sanctioned_mmsi"),
                                   col("sanctioned_name"),
                                   spark_round(col("distance"), 3).alias("distance_km")
                               ).orderBy("sanctioned_mmsi", "nearby_ship_mmsi")
    
    return result_df

In [61]:
# --- 3. NEUE FUNKTION ZUR KARTENERSTELLUNG ---
def create_fleet_map(fleet_status_df, filename="flottenkarte.png"):
    """
    Erstellt eine Karte mit den Positionen der Flotte und speichert sie als Bild.
    """
    if fleet_status_df is None:
        print("Keine Flottendaten für die Kartenerstellung vorhanden.")
        return None
        
    fleet_pd = fleet_status_df.toPandas()
    fleet_pd.dropna(subset=['latitude', 'longitude'], inplace=True)
    
    if fleet_pd.empty:
        print("Keine Schiffe mit gültigen Koordinaten gefunden. Karte wird nicht erstellt.")
        return None

    geometry = [Point(xy) for xy in zip(fleet_pd['longitude'], fleet_pd['latitude'])]
    gdf = gpd.GeoDataFrame(fleet_pd, geometry=geometry, crs="EPSG:4326")
    
    fig, ax = plt.subplots(1, 1, figsize=(15, 12))
    gdf_web_mercator = gdf.to_crs(epsg=3857)
    gdf_web_mercator.plot(ax=ax, marker='o', color='red', markersize=50, edgecolor='black', zorder=2, label='Position der Marineeinheiten')
    
    for x, y, label in zip(gdf_web_mercator.geometry.x, gdf_web_mercator.geometry.y, gdf_web_mercator.name):
        ax.text(x + 5000, y, label, fontsize=9, ha='left', zorder=3)
        
    cx.add_basemap(ax, source=cx.providers.OpenStreetMap.Mapnik, zoom=6)
    
    ax.set_title(f"Positionen der Deutschen Marine (Stand: {datetime.now().strftime('%Y-%m-%d %H:%M')})", fontsize=16)
    ax.set_axis_off()
    plt.legend()
    plt.tight_layout()
    
    plt.savefig(filename, dpi=300, bbox_inches='tight')
    plt.close(fig)
    print(f"Karte der Flottenpositionen wurde erfolgreich in '{filename}' gespeichert.")
    return filename



In [62]:
def create_critical_detections_map(critical_detections, filename="critical_detections_map.png"):
    """
    Erstellt eine Karte, die die Positionen der kritischen Detektionen anzeigt und speichert sie als Bild.
    """
    if not critical_detections:
        return None

    detections_data = []
    for row in critical_detections:
        try:
            lat = row['geo_position_lat']
            lon = row['geo_position_lon']
            obj_type = row['payload_object_type']
        except KeyError:
            continue

        if lat is not None and lon is not None:
            detections_data.append({
                'latitude': lat,
                'longitude': lon,
                'object_type': obj_type
            })

    if not detections_data:
        return None

    detections_pd = pd.DataFrame(detections_data)
    geometry = [Point(xy) for xy in zip(detections_pd['longitude'], detections_pd['latitude'])]
    gdf = gpd.GeoDataFrame(detections_pd, geometry=geometry, crs="EPSG:4326")

    fig, ax = plt.subplots(1, 1, figsize=(15, 12))
    gdf_web_mercator = gdf.to_crs(epsg=3857)

    unique_object_types = gdf_web_mercator['object_type'].unique()
    colors = plt.cm.get_cmap('viridis', len(unique_object_types))
    for i, obj_type in enumerate(unique_object_types):
        subset = gdf_web_mercator[gdf_web_mercator['object_type'] == obj_type]
        subset.plot(ax=ax, marker='X', markersize=200, color=colors(i), edgecolor='white', zorder=3, label=f'Typ: {obj_type.upper()}')

    cx.add_basemap(ax, source=cx.providers.OpenStreetMap.Mapnik)
    ax.set_title('Positionen kritischer Detektionen', fontsize=16)
    ax.set_axis_off()
    plt.legend()
    plt.tight_layout()
    plt.savefig(filename, dpi=300, bbox_inches='tight')
    plt.close(fig)

    return filename

In [63]:
# The main create_daily_report function
def create_daily_report(spark_session: SparkSession):
    """Führt alle Analysen durch und generiert den Textbericht sowie die Karte."""
    
    # 1. Daten abrufen
    # ...
    latest_ship_df = get_latest_ship_data(spark_session)
    if latest_ship_df is None:
        print("Kritischer Fehler: Konnte keine Schiffsdaten laden. Bericht wird abgebrochen.")
        return
    latest_ship_df.cache()

    sanctioned_df = get_sanctioned_vessels(spark_session)
    buoy_df = get_latest_buoy_data(spark_session)
    social_media_df = get_latest_social_media_data(spark_session)
    area_violations_df = get_area_violations(spark_session)
    buoys_near_harbours_df = get_buoys_near_harbours(spark_session)
    german_fleet_status_df = get_german_fleet_status(spark_session, latest_ship_df)

    # 2. Analysen durchführen
    active_ship_count = latest_ship_df.count()
    suspicious_activities = latest_ship_df.filter((col("Status").isin("Moored", "At anchor")) & (col("Speed") > 1.0)).collect()
    nearby_sanctioned_df = find_ships_near_sanctioned(spark_session, latest_ship_df, sanctioned_df)
    
    fleet_status = german_fleet_status_df.collect() if german_fleet_status_df else []
    critical_detections = buoy_df.filter(col("payload_object_type").isin("SUBMARINE", "ORDNANCE")).collect() if buoy_df else []
    relevant_tweets = social_media_df.collect() if social_media_df else []
    area_violations = area_violations_df.collect() if area_violations_df else []
    buoys_near_harbours = buoys_near_harbours_df.collect() if buoys_near_harbours_df else []
    nearby_sanctioned = nearby_sanctioned_df.collect() if nearby_sanctioned_df else []
  
    # 3. Visuelle Ausgabe (Karte)
    fleet_map_path = create_fleet_map(german_fleet_status_df)
    critical_detections_map_path = create_critical_detections_map(critical_detections)

    # 4. Text-Bericht formatieren
    report = f"""
=====================================================
            TÄGLICHES LAGEBILD (via Spark)
=====================================================
Datum: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Datenbank: defense
-----------------------------------------------------

1. ALLGEMEINE LAGE
-------------------
- Aktive maritime Einheiten im Datensatz: {active_ship_count}
- Kritische Unterwasser-Detektionen heute: {len(critical_detections)}
- Relevante Social-Media-Meldungen heute: {len(relevant_tweets)}

2. FLOTTENSTATUS DEUTSCHE MARINE
----------------------------------
"""
    # Embed the fleet map
    if fleet_map_path:
        report += f"![Positionen der Deutschen Marine]({fleet_map_path})\n\n"
    else:
        report += "- Flottenstatus konnte nicht ermittelt werden.\n"

    # Report text for fleet status
    if fleet_status:
        current_class = ""
        for ship in fleet_status:
            if ship['klasse'] != current_class:
                current_class = ship['klasse']
                report += f"\n--- {current_class.upper()} ---\n"
            report += f"- {ship['name']:<30} | Status: {ship['status']:<25} | Position: ({ship['latitude']}, {ship['longitude']})\n"
    else: report += "- Flottenstatus konnte nicht ermittelt werden.\n"

    report += "\n3. RISIKOANALYSE: SCHIFFE IN DER NÄHE SANKTIONIERTER EINHEITEN (< 1 km)\n-------------------------------------------------------------------------\n"
    if nearby_sanctioned:
        for row in nearby_sanctioned:
            report += (f"- Schiff MMSI {row['nearby_ship_mmsi']} ist {row['distance_km']:.3f} km entfernt von "
                       f"sanktionierter Einheit '{row['sanctioned_name']}' (MMSI {row['sanctioned_mmsi']}).\n")
    else: report += "- Keine Schiffe in unmittelbarer Nähe (< 1 km) zu bekannten sanktionierten Einheiten detektiert.\n"

    report += "\n4. MARITIME LAGE: VERDÄCHTIGE AKTIVITÄTEN\n--------------------------------------------\n"
    if suspicious_activities:
        for row in suspicious_activities:
            report += f"- Inkonsistente Bewegung: MMSI {row['mmsi']} meldet '{row['status']}', bewegt sich aber mit {row['speed']:.1f} Knoten.\n"
    else: report += "- Keine verdächtigen maritimen Aktivitäten identifiziert.\n"

    report += "\n5. ÜBERWACHUNGSGEBIETE: GEBIETSÜBERGÄNGE\n---------------------------------------------\n"
    if area_violations:
        for row in area_violations:
            report += f"- {row['event_timestamp']}: MMSI {row['mmsi']} hat Gebiet '{row['area_id']}' betreten/verlassen (Aktion: {row['transition']}).\n"
    else: report += "- Keine neuen Gebietsübergänge in den definierten Beobachtungsgebieten festgestellt.\n"

    report += "\n6. UNTERWASSERLAGE: KRITISCHE DETEKTIONEN\n-------------------------------------------\n"
    
    # Embed the critical detections map
    if critical_detections_map_path:
        report += f"![Karte kritischer Detektionen]({critical_detections_map_path})\n\n"
    
    if critical_detections:
        for row in critical_detections:
            confidence_value = "N/A"
            try:
                confidence_value = row['payload_object_confidence']
            except KeyError:
                pass
            
            report += (f"- Detektion bei ({row['geo_position_lat']:.4f}, {row['geo_position_lon']:.4f}): Typ '{row['payload_object_type']}', "
                       f"Klassifizierung '{row['payload_object_classification']}' mit {confidence_value}% Konfidenz.\n")
    else: report += "- Keine kritischen Unterwasserobjekte (U-Boote/Kampfmittel) detektiert.\n"

    report += "\n7. UNTERWASSERLAGE: DETEKTIONEN IN HAFENNÄHE\n------------------------------------------------\n"
    if buoys_near_harbours:
        for row in buoys_near_harbours:
            report += (f"- Boje {row['buoyid']} meldet Objekt '{row['payload_object_type']}' {row['distance_km']:.2f} km von Hafen {row['harbor_name']}.\n")
    else: report += "- Keine Bojen-Detektionen innerhalb von 10 km zu bekannten Häfen.\n"

    report += "\n8. SOCIAL MEDIA INTELLIGENCE (OSINT)\n--------------------------------------\n"
    if relevant_tweets:
        for row in relevant_tweets:
            report += f"- Tweet (Priorität: {row['priority'].upper()}) von @{row['user_username']}: \"{row['tweet']}\"\n"
    else: report += "- Keine Social-Media-Meldungen mit hoher oder mittlerer Priorität.\n"
        
    report += "\n==================== ENDE DES BERICHTS ====================\n"
    
    latest_ship_df.unpersist()
    return report

In [64]:
if __name__ == "__main__":
    print("Starte die Erstellung des täglichen Lagebilds...")
    daily_report_text = create_daily_report(spark)
    print(daily_report_text)
    spark.stop()
    print("Spark-Session beendet.")


Starte die Erstellung des täglichen Lagebilds...
Lese neueste Daten aus Tabelle: defense.marine_vessel_status


Hive Session ID = e0808921-e625-4520-8386-f38951f12b26


Lese Daten aus Tabelle: defense.sanctioned_vessels
Lese Daten aus Tabelle: defense.buoy_data


                                                                                

Lese Daten aus Tabelle: defense.social_media_messages


                                                                                

Frage View ab: defense.area_violation
Warnung: Konnte View defense.area_violation nicht abfragen. Grund: [UNRESOLVED_ROUTINE] Cannot resolve function `ST_GeomFromText` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 4 pos 8
Frage View ab: defense.buoy_near_harbours
Erstelle Flottenstatus aus 'defense.german_navy_fleet' und Echtzeitdaten.


                                                                                

Keine sanktionierten Schiffe mit aktuellen Positionsdaten gefunden.


                                                                                

Karte der Flottenpositionen wurde erfolgreich in 'flottenkarte.png' gespeichert.


  colors = plt.cm.get_cmap('viridis', len(unique_object_types))



            TÄGLICHES LAGEBILD (via Spark)
Datum: 2025-09-17 21:34:34
Datenbank: defense
-----------------------------------------------------

1. ALLGEMEINE LAGE
-------------------
- Aktive maritime Einheiten im Datensatz: 50
- Kritische Unterwasser-Detektionen heute: 299
- Relevante Social-Media-Meldungen heute: 3

2. FLOTTENSTATUS DEUTSCHE MARINE
----------------------------------
![Positionen der Deutschen Marine](flottenkarte.png)


--- FLOTTENDIENSTBOOTE ---
- A50 Oker                       | Status: Moored                    | Position: (54.41481, 18.08236)
- A52 Oste                       | Status: Moored                    | Position: (54.19982, 12.1676)
- A53 Alster                     | Status: Moored                    | Position: (56.08067, 16.56429)

--- FREGATTEN ---
- FGS Baden-Württemberg          | Status: Moored                    | Position: (53.89909, 12.82041)
- FGS Bayern                     | Status: Moored                    | Position: (56.75677, 19.79897)
-