In [24]:
from kafka import KafkaConsumer
import pymongo
import json


In [25]:
# Hilfsmethoden
def is_valid_data(data):
    """ Überprüft, ob alle Einträge in den Daten nicht None sind. """
    for key, value in data.items():
        if value is None:
            print(f"Ungültiger Eintrag gefunden: {key} ist None")
            return False
    return True
def verify_gx_sum(gx_kafka_sum, first_counter, last_counter, collection):
    """ Überprüft, ob die Summe der 'gx'-Datenpunkte in den letzten 100 Datensätzen in Kafka und in der DB übereinstimmen. """
    try:
        gx_db_sum = sum(
            doc['gx'] for doc in collection.find({"counter": {"$gte": first_counter, "$lte": last_counter}})
        )
        if gx_db_sum == gx_kafka_sum:
            print(f"Die Summe der gx-Werte für die Datensätze {first_counter} bis {last_counter} stimmt überein: {gx_db_sum}")
        else:
            print(f"Die Summe der gx-Werte für die Datensätze {first_counter} bis {last_counter} stimmt NICHT überein! (Kafka: {gx_kafka_sum}, DB: {gx_db_sum})")
    except Exception as e:
        print(f"Fehler beim Überprüfen der gx-Werte: {e}")

def verify_number_of_data(first_counter, last_counter, collection):
    """ Überprüft, ob die Anzahl der Datensätze in den letzten 100 in Kafka und in der DB übereinstimmen. """
    try:
        differnz = last_counter - first_counter + 1
        counter_db_count = collection.count_documents({"counter": {"$gte": first_counter, "$lte": last_counter}})
        if differnz == counter_db_count:
            print("Die Daten sind vollständig, keine Daten sind verloren gegangen.")
        else:
            print(f"Die Anzahl der verlorenen Daten: {differnz - counter_db_count}")
    except Exception as e:
        print(f"Fehler beim Überprüfen der Anzahl der Daten: {e}")


#------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# Verbindung zu MongoDB und Kafka
#------------------------------------------------------------------------------------------------------------------------------------------------------------------------
mongo_conn_uri = "mongodb+srv://admin:987654321@cluster0.0j8bqsf.mongodb.net/"
client = pymongo.MongoClient(mongo_conn_uri)
db = client.swtp_a
collection = db.swtp_a_collection

decode = lambda x: json.loads(x.decode('utf-8'))
consumer = KafkaConsumer(bootstrap_servers="slo.swe.th-luebeck.de:9092", auto_offset_reset = "latest") #value_deserializer=decode)
consumer.subscribe(topics = ["swtp_team_a"])
#------------------------------------------------------------------------------------------------------------------------------------------------------------------------

gx_kafka_sum = 0
message_count = 0
first_counter = None

#------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# Daten überprüfen und in Staging Area speichern
#------------------------------------------------------------------------------------------------------------------------------------------------------------------------
for message in consumer:
    try:
        data = json.loads(message.value)
        data['label'] = -1
        if is_valid_data(data):
            result = collection.insert_one(data)
            print(f"Daten eingefügt: {data}, ID: {result.inserted_id}")
            gx_kafka_sum += data['gx']
            if first_counter is None:
                first_counter = data['counter']
                print(f"Erster Counter: {first_counter}")
            last_counter = data['counter']
            print(f"Letzter Counter: {last_counter}")
            message_count += 1
            
            if message_count == 100:
                verify_gx_sum(gx_kafka_sum, first_counter, last_counter, collection)
                verify_number_of_data( first_counter, last_counter, collection)
                message_count = 0
                gx_kafka_sum = 0
                first_counter = None
        else:
            print("Daten nicht eingefügt, da ein oder mehrere Einträge None sind.")
    except Exception as e:
        print(f"Fehler beim Einfügen der Daten: {e}")
        
        

KeyboardInterrupt: 

In [None]:
import pandas as pd
import pymongo
# MongoDB Verbindungsinformatione
mongo_conn_uri = "mongodb+srv://admin:987654321@cluster0.0j8bqsf.mongodb.net/"
client = pymongo.MongoClient(mongo_conn_uri)
db = client.swtp_a

# Getting data und -1 labeln


In [61]:
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import load_model


# Daten aus MongoDB abrufen und in DataFrame umwandeln
def load_data_from_mongodb(collection_name, query={}, no_id=True):
    collection = db[collection_name]
    cursor = collection.find(query)
    df = pd.DataFrame(list(cursor))
    if no_id and '_id' in df.columns:
        df.drop('_id', axis=1, inplace=True)
    if df.empty:
        print("Keine Daten gefunden für die Query:", query)
    else:
        df['label'] = -1

        # Wertebereich definieren
        obergrenze = 0.5
        untergrenze = -0.5
        datenpunkte_block = 300
        prozent = (40 / 100) * datenpunkte_block

        # Blöcke identifizieren, die innerhalb des definierten Bereichs liegen
        for i in range(0, len(df), datenpunkte_block):
            block = df.iloc[i:i+datenpunkte_block]
            if block['ax'].between(untergrenze, obergrenze).sum() > prozent and \
                    block['ay'].between(untergrenze, obergrenze).sum() > prozent and \
                    block['az'].between(untergrenze, obergrenze).sum() > prozent:
                df.loc[i:i+datenpunkte_block, 'label'] = 2

        # Neue Spalte für Wurfnummer hinzufügen
        df['wurf_nummer'] = 0
        current_wurf_nummer = 1
        in_resting_state = False

        # Iteriere durch die Datenpunkte
        for i in range(len(df)):
            if df['label'].iloc[i] == 2:
                in_resting_state = True
            else:
                if in_resting_state:
                    current_wurf_nummer += 1
                    in_resting_state = False
                df.at[i, 'wurf_nummer'] = current_wurf_nummer

        # Entfernen Sie alle Zeilen, bei denen das Label 2 ist (Ruhezustand)
        df = df[df['label'] != 2]

    return df

# Beispiel für die Nutzung der Funktion
collection_name ='live_test_collection' #'live_test_collection'  
query = {}  
df = load_data_from_mongodb(collection_name, query)


model = load_model('model.keras')
#model = load_model('modelGRU.keras')

predictor_columns = ['ax', 'ay', 'az', 'gx', 'gy', 'gz']

unnoetige_spalten = [ 'n', 'counter', 'timestamp', 'label']

scaler = StandardScaler()

gruppierte_daten = df.groupby('wurf_nummer')


for wurfnummer, gruppe in gruppierte_daten:
    
    bereinigte_gruppe = gruppe.drop(columns=unnoetige_spalten)

    bereinigte_gruppe[predictor_columns] = scaler.fit_transform(bereinigte_gruppe[predictor_columns])

    
    X = bereinigte_gruppe[predictor_columns].values
    
    predictions = model.predict(X)
    print(predictions.shape)
    predicted_classes = (predictions > 0.5)


    
    
    
    if predicted_classes.mean() > 0.5:
        print(f"Wurfnummer {wurfnummer}: Normaler Wurf")
    else:
        print(f"Wurfnummer {wurfnummer}: Anomalie Wurf")


[1m132/132[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step
(4200, 1)
Wurfnummer 1: Normaler Wurf
[1m207/207[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 3ms/step
(6599, 1)
Wurfnummer 2: Normaler Wurf
[1m3/3[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step 
(76, 1)
Wurfnummer 3: Anomalie Wurf
