In [1]:
# Ben√∂tigte Pakete installieren
!pip install paho-mqtt pandas numpy scikit-learn matplotlib seaborn plotly

import paho.mqtt.client as mqtt
import json
import time
import threading
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Machine Learning Importe
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.cluster import KMeans

# Plotting einrichten
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")



In [5]:
import subprocess, time, atexit, os, signal

# Start mosquitto broker on localhost:1883
mosq_proc = subprocess.Popen(
    ["mosquitto", "-p", "1883", "-v"]  # -v = verbose logs in the cell
)

def stop_mosquitto():
    if mosq_proc.poll() is None:  # still running
        mosq_proc.terminate()
        try:
            mosq_proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            mosq_proc.kill()

atexit.register(stop_mosquitto)

time.sleep(1)
print("Mosquitto started with PID", mosq_proc.pid)


FileNotFoundError: [WinError 2] The system cannot find the file specified

In [None]:
# MQTT-Konfiguration
MQTT_BROKER = "127.0.0.1"  # Selbstreferenzierende IP-Adresse
MQTT_PORT = 1883
MQTT_KEEPALIVE = 60

# Themen f√ºr unsere Sensoren
TOPICS = {
    'temperature': 'tutorial/sensors/temperature',
    'humidity': 'tutorial/sensors/humidity',
    'pressure': 'tutorial/sensors/pressure',
    'vibration': 'tutorial/sensors/vibration'
}

# Globale Datenspeicherung
sensor_data = []
data_lock = threading.Lock()

print(f"MQTT Broker: {MQTT_BROKER}:{MQTT_PORT}")
print(f"Themen: {list(TOPICS.values())}")

In [None]:
class MQTTDataCollector:
    def __init__(self, broker, port, topics):
        self.broker = broker
        self.port = port
        self.topics = topics
        self.client = mqtt.Client()
        self.data = []
        self.is_connected = False
        
        # Callbacks einrichten
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
    
    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            print("Erfolgreich mit dem MQTT-Broker verbunden!")
            self.is_connected = True
            # Alle Themen abonnieren
            for topic_name, topic in self.topics.items():
                client.subscribe(topic)
                print(f"Abonniert: {topic}")
        else:
            print(f"Verbindung zum MQTT-Broker fehlgeschlagen. R√ºckgabecode: {rc}")
    
    def on_message(self, client, userdata, msg):
        try:
            # Nachricht dekodieren
            topic = msg.topic
            payload = json.loads(msg.payload.decode())
            
            # Zeitstempel hinzuf√ºgen
            payload['timestamp'] = datetime.now().isoformat()
            payload['topic'] = topic
            
            # Daten threadsicher speichern
            with data_lock:
                self.data.append(payload)
                sensor_data.append(payload)
            
            print(f"Empfangen: {topic} -> {payload}")
            
        except Exception as e:
            print(f"Fehler bei der Verarbeitung der Nachricht: {e}")
    
    def on_disconnect(self, client, userdata, rc):
        self.is_connected = False
        print("Vom MQTT-Broker getrennt")
    
    def connect(self):
        try:
            self.client.connect(self.broker, self.port, MQTT_KEEPALIVE)
            self.client.loop_start()
            return True
        except Exception as e:
            print(f"Verbindungsfehler: {e}")
            return False
    
    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()
    
    def get_data_as_dataframe(self):
        with data_lock:
            if self.data:
                return pd.DataFrame(self.data)
            else:
                return pd.DataFrame()

# Collector-Instanz erstellen
collector = MQTTDataCollector(MQTT_BROKER, MQTT_PORT, TOPICS)
print("MQTT Data Collector initialisiert")

In [3]:
class MockSensorPublisher:
    def __init__(self, broker, port, topics):
        self.broker = broker
        self.port = port
        self.topics = topics
        self.client = mqtt.Client()
        self.publishing = False
        
        # Sensorparameter f√ºr realistische Daten
        self.sensor_params = {
            'temperature': {'mean': 22.0, 'std': 3.0, 'min': 15, 'max': 35},
            'humidity': {'mean': 45.0, 'std': 10.0, 'min': 20, 'max': 80},
            'pressure': {'mean': 1013.25, 'std': 5.0, 'min': 990, 'max': 1040},
            'vibration': {'mean': 0.1, 'std': 0.05, 'min': 0, 'max': 1.0}
        }
    
    def connect(self):
        try:
            self.client.connect(self.broker, self.port, MQTT_KEEPALIVE)
            self.client.loop_start()
            print("Publisher mit MQTT-Broker verbunden")
            return True
        except Exception as e:
            print(f"Publisher-Verbindungsfehler: {e}")
            return False
    
    def generate_sensor_reading(self, sensor_type):
        """Realistische Sensormesswerte mit einigen Anomalien generieren"""
        params = self.sensor_params[sensor_type]
        
        # 95% normale Messwerte, 5% Anomalien
        if np.random.random() < 0.95:
            # Normaler Messwert
            value = np.random.normal(params['mean'], params['std'])
            anomaly = False
        else:
            # Anomaler Messwert
            if np.random.random() < 0.5:
                value = params['min'] - np.random.uniform(0, 5)  # Unter dem Normalwert
            else:
                value = params['max'] + np.random.uniform(0, 10)  # √úber dem Normalwert
            anomaly = True
        
        # Auf vern√ºnftige Grenzen beschr√§nken
        value = np.clip(value, params['min'] - 10, params['max'] + 15)
        
        return {
            'sensor_type': sensor_type,
            'value': round(value, 2),
            'unit': self.get_unit(sensor_type),
            'device_id': f"{sensor_type}_sensor_001",
            'anomaly': anomaly,
            'reading_time': datetime.now().isoformat()
        }
    
    def get_unit(self, sensor_type):
        units = {
            'temperature': '¬∞C',
            'humidity': '%',
            'pressure': 'hPa',
            'vibration': 'g'
        }
        return units.get(sensor_type, '')
    
    def publish_sensor_data(self, duration_seconds=60, interval_seconds=2):
        """Sensordaten f√ºr die angegebene Dauer ver√∂ffentlichen"""
        self.publishing = True
        start_time = time.time()
        
        print(f"Beginne mit der Ver√∂ffentlichung von Sensordaten f√ºr {duration_seconds} Sekunden...")
        
        while self.publishing and (time.time() - start_time) < duration_seconds:
            for sensor_type, topic in self.topics.items():
                reading = self.generate_sensor_reading(sensor_type)
                
                # An MQTT ver√∂ffentlichen
                self.client.publish(topic, json.dumps(reading))
                print(f"Ver√∂ffentlicht {sensor_type}: {reading['value']} {reading['unit']}")
            
            time.sleep(interval_seconds)
        
        print("Ver√∂ffentlichung der Sensordaten beendet")
        self.publishing = False
    
    def stop_publishing(self):
        self.publishing = False
    
    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

# Publisher-Instanz erstellen
publisher = MockSensorPublisher(MQTT_BROKER, MQTT_PORT, TOPICS)
print("Mock Sensor Publisher initialisiert")

NameError: name 'MQTT_BROKER' is not defined

In [4]:
# Publisher und Collector verbinden
print("Verbinde mit MQTT-Broker...")

# Zuerst den Collector verbinden
if collector.connect():
    time.sleep(2)  # Warten, bis sich die Verbindung stabilisiert hat
    
    # Publisher verbinden
    if publisher.connect():
        time.sleep(1)
        
        # Datenver√∂ffentlichung in einem separaten Thread starten
        def publish_data():
            publisher.publish_sensor_data(duration_seconds=5, interval_seconds=1)
        
        publish_thread = threading.Thread(target=publish_data)
        publish_thread.start()
        
        print("\nDatenerfassung gestartet. Sammle f√ºr 5 Sekunden...")
        print("Beobachten Sie die Echtzeitdaten unten:")
        
        # Warten, bis die Ver√∂ffentlichung abgeschlossen ist
        publish_thread.join()
        
        # Einen Moment f√ºr die letzten Nachrichten geben
        time.sleep(2)
        
        print("\nDatenerfassung abgeschlossen!")
    else:
        print("Verbindung des Publishers fehlgeschlagen")
else:
    print("Verbindung des Collectors fehlgeschlagen")

Verbinde mit MQTT-Broker...


NameError: name 'collector' is not defined

In [None]:
# Gesammelte Daten als DataFrame abrufen
df = collector.get_data_as_dataframe()

if not df.empty:
    print(f"Gesammelt: {len(df)} Sensormesswerte")
    print(f"Datenform: {df.shape}")
    print("\nErste paar Datens√§tze:")
    print(df.head())
    
    print("\nDatentypen:")
    print(df.dtypes)
    
    print("\nVerteilung der Sensortypen:")
    print(df['sensor_type'].value_counts())
else:
    print("Keine Daten gesammelt. Stellen Sie sicher, dass die MQTT-Verbindung erfolgreich war.")

In [None]:
# Datenvorverarbeitung und Visualisierung
if not df.empty:
    # Zeitstempel in datetime umwandeln
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    df['reading_time'] = pd.to_datetime(df['reading_time'])
    
    # Visualisierungen erstellen
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Echtzeit-Sensordaten-Visualisierung', fontsize=16)
    
    sensor_types = df['sensor_type'].unique()
    
    for i, sensor_type in enumerate(sensor_types[:4]):
        row = i // 2
        col = i % 2
        
        sensor_data_subset = df[df['sensor_type'] == sensor_type]
        
        # Zeitreihen-Plot
        axes[row, col].plot(sensor_data_subset['timestamp'], 
                           sensor_data_subset['value'], 
                           marker='o', linewidth=2, markersize=4)
        axes[row, col].set_title(f'{sensor_type.title()}-Messwerte')
        axes[row, col].set_xlabel('Zeit')
        axes[row, col].set_ylabel(f'Wert ({sensor_data_subset.iloc[0]["unit"]})')
        axes[row, col].grid(True, alpha=0.3)
        
        # Anomalien hervorheben
        anomalies = sensor_data_subset[sensor_data_subset['anomaly'] == True]
        if not anomalies.empty:
            axes[row, col].scatter(anomalies['timestamp'], 
                                 anomalies['value'], 
                                 color='red', s=50, alpha=0.7, 
                                 label='Anomalien')
            axes[row, col].legend()
    
    plt.tight_layout()
    plt.show()
    
    # Zusammenfassende Statistiken
    print("\nZusammenfassende Statistiken nach Sensortyp:")
    summary_stats = df.groupby('sensor_type')['value'].agg([
        'count', 'mean', 'std', 'min', 'max'
    ]).round(2)
    print(summary_stats)
    
    # Zusammenfassung der Anomalieerkennung
    print("\nZusammenfassung der Anomalieerkennung:")
    anomaly_summary = df.groupby('sensor_type')['anomaly'].agg([
        'sum', 'count', lambda x: (x.sum() / len(x) * 100)
    ])
    anomaly_summary.columns = ['Anomalien', 'Gesamtmesswerte', 'Anomalie_Prozentsatz']
    anomaly_summary['Anomalie_Prozentsatz'] = anomaly_summary['Anomalie_Prozentsatz'].round(2)
    print(anomaly_summary)

In [None]:
class SensorMLPipeline:
    def __init__(self):
        self.anomaly_detector = IsolationForest(contamination=0.1, random_state=42)
        self.classifier = RandomForestClassifier(n_estimators=100, random_state=42)
        self.scaler = StandardScaler()
        self.is_trained = False
    
    def prepare_features(self, df):
        """Merkmale f√ºr das maschinelle Lernen vorbereiten"""
        if df.empty:
            return pd.DataFrame()
        
        # Merkmalsmatrix erstellen
        features_df = df.copy()
        
        # Zeitbasierte Merkmale
        features_df['hour'] = pd.to_datetime(features_df['timestamp']).dt.hour
        features_df['minute'] = pd.to_datetime(features_df['timestamp']).dt.minute
        
        # Gleitende Statistiken (wenn gen√ºgend Daten vorhanden sind)
        if len(features_df) > 5:
            features_df = features_df.sort_values('timestamp')
            features_df['rolling_mean_3'] = features_df.groupby('sensor_type')['value'].rolling(3, min_periods=1).mean().reset_index(0, drop=True)
            features_df['rolling_std_3'] = features_df.groupby('sensor_type')['value'].rolling(3, min_periods=1).std().fillna(0).reset_index(0, drop=True)
        else:
            features_df['rolling_mean_3'] = features_df['value']
            features_df['rolling_std_3'] = 0
        
        # Verz√∂gerungsmerkmale (Lag-Features)
        features_df['value_lag1'] = features_df.groupby('sensor_type')['value'].shift(1).fillna(features_df['value'])
        
        return features_df
    
    def train_anomaly_detector(self, df):
        """Anomalieerkennungsmodell trainieren"""
        features_df = self.prepare_features(df)
        
        if features_df.empty:
            print("Keine Daten zum Trainieren verf√ºgbar")
            return
        
        # Merkmale f√ºr die Anomalieerkennung ausw√§hlen
        feature_cols = ['value', 'hour', 'minute', 'rolling_mean_3', 'rolling_std_3', 'value_lag1']
        X = features_df[feature_cols].fillna(0)
        
        # Merkmale skalieren
        X_scaled = self.scaler.fit_transform(X)
        
        # Anomalieerkennungsmodell trainieren
        self.anomaly_detector.fit(X_scaled)
        
        print("Anomalieerkennungsmodell erfolgreich trainiert")
        return X_scaled
    
    def train_classifier(self, df):
        """Sensortyp-Klassifikator trainieren"""
        features_df = self.prepare_features(df)
        
        if features_df.empty:
            print("Keine Daten zum Trainieren verf√ºgbar")
            return
        
        # Merkmale f√ºr die Klassifizierung ausw√§hlen
        feature_cols = ['value', 'hour', 'minute', 'rolling_mean_3', 'rolling_std_3', 'value_lag1']
        X = features_df[feature_cols].fillna(0)
        y = features_df['sensor_type']
        
        # Daten aufteilen
        if len(X) > 10:  # Nur aufteilen, wenn gen√ºgend Daten vorhanden sind
            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)
        else:
            X_train, X_test, y_train, y_test = X, X, y, y
        
        # Klassifikator trainieren
        self.classifier.fit(X_train, y_train)
        
        # Evaluieren
        y_pred = self.classifier.predict(X_test)
        
        print("Sensortyp-Klassifikator erfolgreich trainiert")
        print("\nKlassifizierungsbericht:")
        print(classification_report(y_test, y_pred))
        
        self.is_trained = True
        return X_train, X_test, y_train, y_test, y_pred
    
    def detect_anomalies(self, df):
        """Anomalien in neuen Daten erkennen"""
        features_df = self.prepare_features(df)
        
        if features_df.empty:
            return pd.DataFrame()
        
        feature_cols = ['value', 'hour', 'minute', 'rolling_mean_3', 'rolling_std_3', 'value_lag1']
        X = features_df[feature_cols].fillna(0)
        
        # Merkmale skalieren
        X_scaled = self.scaler.transform(X)
        
        # Anomalien vorhersagen (-1 = Anomalie, 1 = normal)
        anomaly_predictions = self.anomaly_detector.predict(X_scaled)
        anomaly_scores = self.anomaly_detector.score_samples(X_scaled)
        
        features_df['predicted_anomaly'] = anomaly_predictions == -1
        features_df['anomaly_score'] = anomaly_scores
        
        return features_df
    
    def predict_sensor_type(self, df):
        """Sensortyp f√ºr neue Daten vorhersagen"""
        if not self.is_trained:
            print("Modell noch nicht trainiert")
            return None
        
        features_df = self.prepare_features(df)
        
        if features_df.empty:
            return pd.DataFrame()
        
        feature_cols = ['value', 'hour', 'minute', 'rolling_mean_3', 'rolling_std_3', 'value_lag1']
        X = features_df[feature_cols].fillna(0)
        
        # Sensortypen vorhersagen
        predictions = self.classifier.predict(X)
        probabilities = self.classifier.predict_proba(X)
        
        features_df['predicted_sensor_type'] = predictions
        
        return features_df

# ML-Pipeline initialisieren
ml_pipeline = SensorMLPipeline()
print("Machine-Learning-Pipeline initialisiert")

In [None]:
# Modelle trainieren, wenn Daten vorhanden sind
if not df.empty:
    print("Trainiere Anomalieerkennungsmodell...")
    ml_pipeline.train_anomaly_detector(df)
    
    print("\nTrainiere Sensortyp-Klassifikator...")
    ml_pipeline.train_classifier(df)
    
    print("\nModelle erfolgreich trainiert!")
else:
    print("Keine Daten zum Trainieren verf√ºgbar. Bitte f√ºhren Sie zuerst den Abschnitt zur Datenerfassung aus.")

In [None]:
# Anomalieerkennung demonstrieren
if not df.empty and ml_pipeline.is_trained:
    print("F√ºhre Anomalieerkennung auf gesammelten Daten durch...")
    
    # Anomalien erkennen
    anomaly_results = ml_pipeline.detect_anomalies(df)
    
    # Echte vs. vorhergesagte Anomalien vergleichen
    comparison = pd.DataFrame({
        'sensor_type': anomaly_results['sensor_type'],
        'value': anomaly_results['value'],
        'true_anomaly': anomaly_results['anomaly'],
        'predicted_anomaly': anomaly_results['predicted_anomaly'],
        'anomaly_score': anomaly_results['anomaly_score']
    })
    
    print("\nErgebnisse der Anomalieerkennung:")
    print(comparison.head(10))
    
    # Genauigkeit berechnen
    accuracy = (comparison['true_anomaly'] == comparison['predicted_anomaly']).mean()
    print(f"\nGenauigkeit der Anomalieerkennung: {accuracy:.2%}")
    
    # Ergebnisse der Anomalieerkennung visualisieren
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Ergebnisse der Anomalieerkennung nach Sensortyp', fontsize=16)
    
    for i, sensor_type in enumerate(anomaly_results['sensor_type'].unique()[:4]):
        row = i // 2
        col = i % 2
        
        sensor_subset = anomaly_results[anomaly_results['sensor_type'] == sensor_type]
        
        # Normale Punkte plotten
        normal_points = sensor_subset[~sensor_subset['predicted_anomaly']]
        axes[row, col].scatter(range(len(normal_points)), normal_points['value'], 
                              c='blue', alpha=0.6, label='Normal')
        
        # Anomalien plotten
        anomaly_points = sensor_subset[sensor_subset['predicted_anomaly']]
        if not anomaly_points.empty:
            axes[row, col].scatter(range(len(anomaly_points)), anomaly_points['value'], 
                                  c='red', alpha=0.8, label='Anomalie', s=60)
        
        axes[row, col].set_title(f'{sensor_type.title()} Anomalieerkennung')
        axes[row, col].set_xlabel('Messungsindex')
        axes[row, col].set_ylabel('Sensorwert')
        axes[row, col].legend()
        axes[row, col].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

In [None]:
class RealTimeMQTTMLSystem:
    def __init__(self, ml_pipeline, mqtt_collector):
        self.ml_pipeline = ml_pipeline
        self.mqtt_collector = mqtt_collector
        self.predictions = []
        self.running = False
    
    def process_real_time_data(self, duration_seconds=30):
        """Eingehende MQTT-Daten in Echtzeit verarbeiten"""
        self.running = True
        start_time = time.time()
        
        print(f"Starte Echtzeitverarbeitung f√ºr {duration_seconds} Sekunden...")
        
        while self.running and (time.time() - start_time) < duration_seconds:
            # Neueste Daten abrufen
            current_data = self.mqtt_collector.get_data_as_dataframe()
            
            if not current_data.empty and len(current_data) > 0:
                # Die aktuellsten Messwerte abrufen
                recent_data = current_data.tail(4)  # Letzte 4 Messwerte
                
                if self.ml_pipeline.is_trained:
                    # Anomalien erkennen
                    anomaly_results = self.ml_pipeline.detect_anomalies(recent_data)
                    
                    # Jede aktuelle Messung verarbeiten
                    for _, row in anomaly_results.iterrows():
                        prediction = {
                            'timestamp': row['timestamp'],
                            'sensor_type': row['sensor_type'],
                            'value': row['value'],
                            'predicted_anomaly': row['predicted_anomaly'],
                            'anomaly_score': row['anomaly_score']
                        }
                        
                        self.predictions.append(prediction)
                        
                        # Alarmieren, wenn eine Anomalie erkannt wird
                        if row['predicted_anomaly']:
                            print(f"üö® ANOMALIE ERKANNT: {row['sensor_type']} = {row['value']} (Score: {row['anomaly_score']:.3f})")
                        else:
                            print(f"‚úÖ Normal: {row['sensor_type']} = {row['value']}")
            
            time.sleep(2)  # Alle 2 Sekunden pr√ºfen
        
        self.running = False
        print("\nEchtzeitverarbeitung abgeschlossen!")
    
    def get_predictions_summary(self):
        """Zusammenfassung der getroffenen Vorhersagen abrufen"""
        if not self.predictions:
            return "Noch keine Vorhersagen getroffen"
        
        pred_df = pd.DataFrame(self.predictions)
        
        summary = {
            'total_predictions': len(pred_df),
            'anomalies_detected': pred_df['predicted_anomaly'].sum(),
            'anomaly_rate': pred_df['predicted_anomaly'].mean() * 100,
            'sensors_monitored': pred_df['sensor_type'].nunique()
        }
        
        return summary

# Echtzeitsystem erstellen
if ml_pipeline.is_trained:
    realtime_system = RealTimeMQTTMLSystem(ml_pipeline, collector)
    print("Echtzeit-MQTT-ML-System initialisiert")
else:
    print("Bitte trainieren Sie zuerst die ML-Pipeline")

In [None]:
# Echtzeitverarbeitung demonstrieren
if 'realtime_system' in locals() and ml_pipeline.is_trained:
    print("Starte Echtzeit-Demo...")
    
    # Neue Daten im Hintergrund ver√∂ffentlichen
    def publish_demo_data():
        publisher.publish_sensor_data(duration_seconds=35, interval_seconds=1.5)
    
    # Echtzeitverarbeitung im Hintergrund starten
    def process_demo_data():
        realtime_system.process_real_time_data(duration_seconds=30)
    
    # Beide parallel ausf√ºhren
    publish_thread = threading.Thread(target=publish_demo_data)
    process_thread = threading.Thread(target=process_demo_data)
    
    publish_thread.start()
    time.sleep(1)  # Kleine Verz√∂gerung
    process_thread.start()
    
    # Auf Abschluss warten
    process_thread.join()
    publish_thread.join()
    
    # Zusammenfassung anzeigen
    summary = realtime_system.get_predictions_summary()
    print("\n" + "="*50)
    print("ZUSAMMENFASSUNG DER ECHTZEITVERARBEITUNG")
    print("="*50)
    for key, value in summary.items():
        print(f"{key.replace('_', ' ').title()}: {value}")
else:
    print("Echtzeitsystem nicht verf√ºgbar. Bitte f√ºhren Sie zuerst die vorherigen Abschnitte aus.")

## 7. √úbungsaufgaben {#practice}

Jetzt ist es Zeit zu √ºben! Hier sind einige √úbungen, um Ihr Verst√§ndnis zu testen.

### √úbungsaufgabe 1: Integration eines benutzerdefinierten Sensors

**Aufgabe**: F√ºgen Sie einen neuen Sensortyp namens "Licht" hinzu, der die Lichtintensit√§t (0-1000 Lux) misst.

**Anforderungen**:
1. F√ºgen Sie den neuen Sensor zum `TOPICS`-W√∂rterbuch hinzu
2. Aktualisieren Sie den `MockSensorPublisher`, um Lichtsensordaten zu generieren
3. Sammeln Sie Daten f√ºr diesen neuen Sensor
4. Trainieren Sie die ML-Modelle neu, um den neuen Sensortyp einzubeziehen

**Starter-Code**:

In [None]:
# √úbungsaufgabe 1 - L√∂sung

# Schritt 1: Lichtsensor zu den Themen hinzuf√ºgen
TOPICS_EXTENDED = TOPICS.copy()
TOPICS_EXTENDED['light'] = 'tutorial/sensors/light'

# Schritt 2: Sensorparameter aktualisieren
# Wir erstellen eine neue Instanz und f√ºgen den Parameter manuell hinzu
publisher_extended = MockSensorPublisher(MQTT_BROKER, MQTT_PORT, TOPICS_EXTENDED)
publisher_extended.sensor_params['light'] = {'mean': 500.0, 'std': 100.0, 'min': 0, 'max': 1000}

# Helper f√ºr Einheit patchen
original_get_unit = publisher_extended.get_unit
def get_unit_extended(sensor_type):
    if sensor_type == 'light':
        return 'Lux'
    return original_get_unit(sensor_type)
publisher_extended.get_unit = get_unit_extended

# Schritt 3: Neuen Collector mit erweiterten Themen erstellen
collector_extended = MQTTDataCollector(MQTT_BROKER, MQTT_PORT, TOPICS_EXTENDED)

# Schritt 4: Daten sammeln und Modelle neu trainieren
print("Verbinde erweiterte Komponenten...")
if collector_extended.connect():
    time.sleep(1)
    if publisher_extended.connect():
        time.sleep(1)
        
        print("Sammle Daten f√ºr alle Sensoren inklusive Licht...")
        
        # Thread f√ºr das Publishen
        def publish_extended():
            publisher_extended.publish_sensor_data(duration_seconds=10, interval_seconds=0.5)
            
        pub_thread = threading.Thread(target=publish_extended)
        pub_thread.start()
        pub_thread.join()
        
        time.sleep(2)
        
        # Daten abrufen
        df_extended = collector_extended.get_data_as_dataframe()
        
        if not df_extended.empty:
            print(f"\nGesammelte Daten: {len(df_extended)} Zeilen")
            print("Sensortypen:", df_extended['sensor_type'].unique())
            
            # Modelle neu trainieren
            print("\nTrainiere Modelle mit neuen Daten...")
            ml_pipeline_extended = SensorMLPipeline()
            ml_pipeline_extended.train_anomaly_detector(df_extended)
            ml_pipeline_extended.train_classifier(df_extended)
            
            # Aufr√§umen
            collector_extended.disconnect()
            publisher_extended.disconnect()
            print("\n√úbungsaufgabe 1 erfolgreich abgeschlossen!")
        else:
            print("Keine Daten gesammelt.")
    else:
        print("Publisher Verbindung fehlgeschlagen")
else:
    print("Collector Verbindung fehlgeschlagen")

### √úbungsaufgabe 2: Fortgeschrittene Anomalieerkennung

**Aufgabe**: Implementieren Sie ein anspruchsvolleres Anomalieerkennungssystem, das Folgendes ber√ºcksichtigt:
1. Korrelationen zwischen Sensoren (z. B. sind Temperatur und Luftfeuchtigkeit oft korreliert)
2. Zeitbasierte Muster (z. B. variiert die Temperatur je nach Tageszeit)
3. Saisonale Trends

**Anforderungen**:
1. Erstellen Sie Merkmale, die Sensorkorrelationen erfassen
2. F√ºgen Sie zeitbasierte Merkmale hinzu (Stunde, Wochentag usw.)
3. Implementieren Sie einen Anomalie-Detektor f√ºr mehrere Sensoren
4. Vergleichen Sie die Leistung mit dem einfachen Anomalie-Detektor

In [None]:
# √úbungsaufgabe 2 - L√∂sung

class AdvancedAnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(contamination=0.05, random_state=42)
        self.scaler = StandardScaler()
        self.is_trained = False
    
    def create_advanced_features(self, df):
        """Erstellen Sie fortgeschrittene Merkmale f√ºr die Anomalieerkennung"""
        if df.empty:
            return pd.DataFrame()
            
        df_processed = df.copy()
        df_processed['timestamp'] = pd.to_datetime(df_processed['timestamp'])
        
        # Pivotieren um Sensoren als Spalten zu haben (f√ºr Korrelationen)
        # Da Zeitstempel leicht variieren k√∂nnen, runden wir auf Sekunden oder nutzen resample
        df_pivot = df_processed.pivot_table(
            index=pd.Grouper(key='timestamp', freq='2S'), # 2 Sekunden Fenster
            columns='sensor_type',
            values='value',
            aggfunc='mean'
        ).interpolate(method='linear').fillna(method='bfill').fillna(method='ffill')
        
        # Wenn wir nicht genug Spalten haben (z.B. nur ein Sensor), abbrechen
        if df_pivot.shape[1] < 2:
            return pd.DataFrame()

        # Feature Engineering auf dem Pivot-DataFrame
        features = pd.DataFrame(index=df_pivot.index)
        
        # 1. Rohwerte
        for col in df_pivot.columns:
            features[f'{col}_val'] = df_pivot[col]
            
        # 2. Verh√§ltnisse/Korrelationen (vereinfacht als Verh√§ltnis)
        if 'temperature' in df_pivot.columns and 'humidity' in df_pivot.columns:
            features['temp_humid_ratio'] = df_pivot['temperature'] / (df_pivot['humidity'] + 1)
            
        # 3. Zeitbasierte Merkmale (Zyklisch)
        features['hour_sin'] = np.sin(2 * np.pi * features.index.hour / 24)
        features['hour_cos'] = np.cos(2 * np.pi * features.index.hour / 24)
        
        # 4. Gleitende Statistiken √ºber alle Sensoren (Systemzustand)
        features['system_mean'] = df_pivot.mean(axis=1)
        features['system_std'] = df_pivot.std(axis=1)
        
        return features.dropna()
    
    def train(self, df):
        """Trainieren Sie den fortgeschrittenen Anomalie-Detektor"""
        X = self.create_advanced_features(df)
        if X.empty:
            print("Nicht gen√ºgend Daten f√ºr fortgeschrittene Merkmale.")
            return
            
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled)
        self.is_trained = True
        print(f"Fortgeschrittenes Modell trainiert mit {X.shape[1]} Merkmalen.")
        return X
    
    def predict(self, df):
        """Anomalien mit fortgeschrittenen Merkmalen vorhersagen"""
        if not self.is_trained:
            print("Modell nicht trainiert.")
            return pd.DataFrame()
            
        X = self.create_advanced_features(df)
        if X.empty:
            return pd.DataFrame()
            
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        scores = self.model.score_samples(X_scaled)
        
        results = X.copy()
        results['is_anomaly'] = predictions == -1
        results['anomaly_score'] = scores
        
        return results

# Testen der L√∂sung
if 'df' in locals() and not df.empty:
    adv_detector = AdvancedAnomalyDetector()
    adv_detector.train(df)
    results = adv_detector.predict(df)
    if not results.empty:
        print(f"Anomalien gefunden: {results['is_anomaly'].sum()}")
        print(results.head())

### √úbungsaufgabe 3: Vorausschauendes Wartungssystem

**Aufgabe**: Erstellen Sie ein vorausschauendes Wartungssystem, das:
1. Vorhersagt, wann ein Sensor aufgrund seiner Messwerte ausfallen k√∂nnte
2. Die verbleibende Nutzungsdauer (RUL) von Ger√§ten sch√§tzt
3. Wartungswarnungen sendet, bevor Ausf√§lle auftreten

**Anforderungen**:
1. Erstellen Sie synthetische Ausfalldaten (simulieren Sie die Sensoralterung)
2. Erstellen Sie ein Regressionsmodell zur Vorhersage der RUL
3. Implementieren Sie Alarmschwellen
4. Erstellen Sie ein Dashboard, das den Zustand der Ger√§te anzeigt

In [None]:
# √úbungsaufgabe 3 - L√∂sung

class PredictiveMaintenanceSystem:
    def __init__(self):
        self.rul_model = None # In einem echten Szenario: LinearRegression oder √§hnliches
        self.health_thresholds = {'warning': 80, 'critical': 50}
    
    def simulate_sensor_degradation(self, df):
        """Sensoralterung im Laufe der Zeit simulieren"""
        df_deg = df.copy()
        df_deg['timestamp'] = pd.to_datetime(df_deg['timestamp'])
        
        # Wir simulieren, dass Vibration √ºber die Zeit zunimmt (Verschlei√ü)
        # Sortieren nach Zeit
        df_deg = df_deg.sort_values('timestamp')
        
        # Nur f√ºr Vibration
        mask = df_deg['sensor_type'] == 'vibration'
        n_samples = mask.sum()
        
        if n_samples > 0:
            # Linearer Anstieg (Drift) + Zuf√§lliges Rauschen
            drift = np.linspace(0, 0.5, n_samples) # Drift von 0 bis 0.5g
            df_deg.loc[mask, 'value'] += drift
            
            # Rauschen nimmt auch zu
            noise_scale = np.linspace(1, 3, n_samples)
            noise = np.random.normal(0, 0.05, n_samples) * noise_scale
            df_deg.loc[mask, 'value'] += noise
            
        return df_deg
    
    def calculate_health_score(self, df):
        """Ger√§tezustandsbewertung (0-100) berechnen"""
        # Einfache Logik: Je h√∂her die Vibration, desto schlechter der Zustand
        # Normal: 0.1g -> 100%, Kritisch: > 0.8g -> <50%
        
        health_scores = []
        
        # Gruppieren nach Zeitfenstern (z.B. letzte 10 Messungen)
        # Hier vereinfacht: Score pro Messung
        for _, row in df.iterrows():
            score = 100
            if row['sensor_type'] == 'vibration':
                # Basiswert abziehen (0.1 ist normal)
                degradation = max(0, row['value'] - 0.1)
                # Skalierung: bei 1.0g ist Score 0
                score = max(0, 100 - (degradation * 100))
            elif row['sensor_type'] == 'temperature':
                # Abweichung von 22 Grad
                diff = abs(row['value'] - 22.0)
                score = max(0, 100 - (diff * 2))
                
            health_scores.append(score)
            
        return pd.Series(health_scores, index=df.index)
    
    def predict_remaining_life(self, current_health, degradation_rate=0.1):
        """Verbleibende Nutzungsdauer in Tagen vorhersagen"""
        # RUL = (Current Health - Critical Threshold) / Degradation Rate per Day
        if current_health <= self.health_thresholds['critical']:
            return 0
        
        rul = (current_health - self.health_thresholds['critical']) / degradation_rate
        return rul
    
    def generate_maintenance_alerts(self, health_scores):
        """Wartungswarnungen basierend auf Vorhersagen generieren"""
        alerts = []
        for i, score in enumerate(health_scores):
            if score < self.health_thresholds['critical']:
                alerts.append(f"KRITISCH: Index {i}, Score {score:.1f} - Sofortige Wartung erforderlich!")
            elif score < self.health_thresholds['warning']:
                alerts.append(f"WARNUNG: Index {i}, Score {score:.1f} - Wartung planen.")
        return alerts

# Testen
if 'df' in locals() and not df.empty:
    pms = PredictiveMaintenanceSystem()
    
    # 1. Daten mit Alterung simulieren
    df_degraded = pms.simulate_sensor_degradation(df)
    
    # 2. Gesundheitszustand berechnen
    health = pms.calculate_health_score(df_degraded)
    df_degraded['health_score'] = health
    
    # 3. Alerts generieren (nur die letzten paar anzeigen)
    alerts = pms.generate_maintenance_alerts(health.tail(20))
    
    print("Simulierte Alterung und Gesundheitszustand:")
    print(df_degraded[df_degraded['sensor_type'] == 'vibration'].tail())
    
    print("\nGenerierte Alarme (Auszug):")
    for alert in alerts[:5]:
        print(alert)

### √úbungsaufgabe 4: Edge-Computing-Integration

**Aufgabe**: Simulieren Sie ein Edge-Computing-Szenario, in dem:
1. Lokale Verarbeitung die Daten√ºbertragung reduziert
2. Nur wichtige Ereignisse an die Cloud gesendet werden
3. Modelle lokal ausgef√ºhrt werden und regelm√§√üig Updates aus der Cloud erhalten

**Anforderungen**:
1. Implementieren Sie eine Logik f√ºr lokale vs. Cloud-Verarbeitung
2. Erstellen Sie eine Datenfilterung basierend auf der Wichtigkeit
3. Simulieren Sie die Modell-Synchronisation
4. Vergleichen Sie die Bandbreitennutzung mit/ohne Edge-Verarbeitung

In [None]:
# √úbungsaufgabe 4 - L√∂sung

class EdgeComputingSystem:
    def __init__(self, ml_pipeline):
        self.local_model = ml_pipeline
        self.bandwidth_usage = {'local_only': 0, 'cloud_full': 0}
        self.cloud_storage = []
        
    def process_locally(self, sensor_data):
        """Daten lokal auf dem Edge-Ger√§t verarbeiten"""
        # Wir simulieren die Gr√∂√üe der Daten: ca. 100 Bytes pro Datensatz
        data_size = len(sensor_data) * 100 
        self.bandwidth_usage['cloud_full'] += data_size
        
        # Lokal Anomalien erkennen
        if self.local_model.is_trained:
            results = self.local_model.detect_anomalies(sensor_data)
            
            # Filter: Nur Anomalien oder periodische "Heartbeats" senden
            # Wir senden alle Anomalien und 10% der normalen Daten als Stichprobe
            important_data = results[
                (results['predicted_anomaly'] == True) | 
                (np.random.random(len(results)) < 0.1)
            ]
            
            return important_data
        
        return sensor_data # Fallback: Alles senden
    
    def send_to_cloud(self, filtered_data):
        """Nur wichtige Daten an die Cloud senden"""
        if filtered_data.empty:
            return
            
        # Gr√∂√üe der gefilterten Daten
        filtered_size = len(filtered_data) * 100
        self.bandwidth_usage['local_only'] += filtered_size
        
        # "Senden" (Speichern)
        self.cloud_storage.append(filtered_data)
        print(f"Gesendet: {len(filtered_data)} Datens√§tze an Cloud.")
    
    def sync_models(self):
        """Lokales Modell mit Cloud-Updates synchronisieren"""
        # Simulation: Modellparameter aktualisieren
        print("Synchronisiere Modell mit Cloud...")
        time.sleep(0.5)
        print("Modell aktualisiert.")
    
    def calculate_bandwidth_savings(self):
        """Bandbreiteneinsparungen durch Edge-Verarbeitung berechnen"""
        full = self.bandwidth_usage['cloud_full']
        optimized = self.bandwidth_usage['local_only']
        
        if full == 0: return 0
        
        savings = (full - optimized) / full * 100
        return {
            'full_transmission_bytes': full,
            'edge_filtered_bytes': optimized,
            'savings_percent': savings
        }

# Testen
if 'df' in locals() and not df.empty and 'ml_pipeline' in locals():
    edge_system = EdgeComputingSystem(ml_pipeline)
    
    # Wir simulieren einen Datenstrom in Chunks
    chunk_size = 10
    print("Simuliere Edge-Verarbeitung...")
    
    for i in range(0, len(df), chunk_size):
        chunk = df.iloc[i:i+chunk_size]
        
        # 1. Lokal verarbeiten und filtern
        filtered_chunk = edge_system.process_locally(chunk)
        
        # 2. An Cloud senden
        edge_system.send_to_cloud(filtered_chunk)
        
    # 3. Auswertung
    stats = edge_system.calculate_bandwidth_savings()
    print("\nEdge-Computing Auswertung:")
    print(f"Volle √úbertragung: {stats['full_transmission_bytes']} Bytes")
    print(f"Gefilterte √úbertragung: {stats['edge_filtered_bytes']} Bytes")
    print(f"Bandbreiteneinsparung: {stats['savings_percent']:.2f}%")

In [None]:
# Verbindungen aufr√§umen
try:
    collector.disconnect()
    publisher.disconnect()
    print("Erfolgreich vom MQTT-Broker getrennt")
except:
    print("Aufr√§umen abgeschlossen")