Insert Data in Databse and start unification from Keys etc.

Manage Imports

In [11]:
import os
import h5py
from pymongo import MongoClient
from dotenv import load_dotenv
from zoneinfo import ZoneInfo
from datetime import datetime

Get Database URI from .env File and setup other variables. Be sure tu insert the DB URI for a MongoDB Database

In [12]:
load_dotenv()

uri = os.environ['DB_URI']

# Verzeichnis, in dem sich die .h5 Dateien befinden
directory = '../data'

# MongoDB Datenbank-Verbindung
client = MongoClient(uri)
db = client['rosen']
collection = db['big_data']


In [13]:
def calc_average(value1, value2):
    return (value1 + value2) / 2

def traverse_group(group, path=''):
    data = {}
    for key, item in group.items():
        new_key = key.lower()
        if isinstance(item, h5py.Dataset):  # Prüfen, ob es sich um einen Datensatz handelt
            data[new_key] = item[()].tolist()  # Konvertieren in eine Liste für MongoDB
            
            #handle inconsistencies (for now only binary data)
            for index, i in enumerate(data[new_key]):
                # decode binary data
                if isinstance(i, (bytes, bytearray)):
                    try:
                        # try parse as float
                        data[new_key][index] = float(i)
                    except ValueError:
                        if i == b'Easteregg :)':
                            print(f"[{new_key}] hit {i}, taking average between last and next value")
                            prev = data[new_key][index-1]
                            nextItem = data[new_key][index+1]
                            # try calculating average between last and next val (does not work on strings)
                            try:
                                data[new_key][index] = calc_average(float(prev), float(nextItem))
                            except ValueError:
                                print("error on calculating average, leaving as decoded string")
                                # on error decode as string
                                data[new_key][index] = str(i.decode())
                                pass
                        else:
                            if new_key == "timestamp":
                                try:
                                    #timestamp in binary data as date string -> convert to unix timestamp
                                    timezone = ZoneInfo("UTC")
                                    date = datetime.strptime(str(i.decode()), '%Y-%m-%dT%H:%M:%S').replace(tzinfo=timezone) #2014-03-24T03:58:1
                                    # print(f"converted to timestamp: {i} is {date.timestamp()}")
                                    data[new_key][index] = date.timestamp()
                                except ValueError:
                                    print(f"error converting to timestamp {i}")
                            else:
                                print(f"[{new_key}] missed binary data {i}, taking average between last and next value")
                                prev = data[new_key][index-1]
                                nextItem = data[new_key][index+1]
                                # try calculating average between last and next val (does not work on strings)
                                try:
                                    data[new_key][index] = calc_average(float(prev), float(nextItem))
                                except ValueError:
                                    print("error on calculating average, leaving as decoded string")
                                    # on error decode as string
                                    data[new_key][index] = str(i.decode())
                                    pass   
                else:
                    # not binary handling
                    #everything that is not float
                    # case does not exist in dataset
                    if not isinstance(i, float):
                        print(f"value is not binary and not float: {i}")
                        
                        
        elif isinstance(item, h5py.Group):  # Prüfen, ob es sich um eine Gruppe handelt
            data[new_key] = traverse_group(item, path + '/' + new_key)  # Rekursiver Aufruf
            
    return data


In [14]:
def process_h5_file(file_path):
    with h5py.File(file_path, 'r') as h5file:
        data = traverse_group(h5file)  # Start der Rekursion von der Wurzelgruppe
        collection.insert_one(data)  # Daten in MongoDB einfügen
        print(f'Daten aus {file_path} wurden erfolgreich in MongoDB gespeichert.')

In [15]:
# Durchlaufe alle .h5 Dateien im angegebenen Verzeichnis
for filename in os.listdir(directory):
    if filename.endswith('.h5'):
        process_h5_file(os.path.join(directory, filename))

print('Verarbeitung abgeschlossen.')

Daten aus ../data/39619a6a-815e-4206-a91b-8b73ea7dd032.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/0d25b80b-c195-4321-a5f8-3925e4f1d230.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/77c4c9fe-5ac1-4547-8c78-efa89eed98cf.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/1489d59b-2d5c-4b86-9724-113eda61227d.h5 wurden erfolgreich in MongoDB gespeichert.
[magnetization] hit b'Easteregg :)', taking average between last and next value
Daten aus ../data/cb8aa16d-c969-4bed-974e-12dfcbec03e8.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/6d5a0ebb-9a7a-48b6-9e02-ca1ad3deb564.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/79b0b625-6044-48de-9402-04246117bfb5.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/bf1741fb-2a97-4aea-9b0c-e82c0609910f.h5 wurden erfolgreich in MongoDB gespeichert.
Daten aus ../data/481865b3-e38f-46aa-b32b-58c06f8d5693.h5 wurden erfolgreich in MongoDB gespeichert.
[magnetizat

Refactor Document Structure. In den Dateien existieren die keys "Daten" und "Data" FML

In [16]:
def transform_data(data):
    # Ermittle die Länge der Arrays und verwende die minimale Länge, um Indexfehler zu vermeiden
    min_length = min(len(v) for v in data.values() if isinstance(v, list))

    # Erstelle `measuring_points` basierend auf der minimalen Länge
    measuring_points = []
    for i in range(min_length):
        point = {key: data[key][i] if i < len(data[key]) else None for key in data if isinstance(data[key], list)}
        measuring_points.append(point)

    return measuring_points


def update_documents():
    for doc in collection.find():
        if 'data' in doc:
            # Transformiere die Daten
            new_measuring_points = transform_data(doc['data'])
            # Aktualisiere das Dokument mit den transformierten Daten unter dem neuen Schlüssel `measuring_points`
            collection.update_one({'_id': doc['_id']}, {'$set': {'measuring_points': new_measuring_points}})
            print(f'Dokument {doc["_id"]} wurde aktualisiert.')
        if 'daten' in doc:
            new_measuring_points = transform_data(doc['daten'])
            collection.update_one({'_id': doc['_id']}, {'$set': {'measuring_points': new_measuring_points}})
            print(f'Dokument {doc["_id"]} wurde aktualisiert.')


update_documents()
print('Alle Dokumente wurden aktualisiert.')


Dokument 65c4899d859e0d483c7b984f wurde aktualisiert.
Dokument 65c4899e859e0d483c7b9850 wurde aktualisiert.
Dokument 65c4899e859e0d483c7b9851 wurde aktualisiert.
Dokument 65c4899e859e0d483c7b9852 wurde aktualisiert.
Dokument 65c4899e859e0d483c7b9853 wurde aktualisiert.
Dokument 65c4899f859e0d483c7b9854 wurde aktualisiert.
Dokument 65c4899f859e0d483c7b9855 wurde aktualisiert.
Dokument 65c4899f859e0d483c7b9856 wurde aktualisiert.
Dokument 65c4899f859e0d483c7b9857 wurde aktualisiert.
Dokument 65c4899f859e0d483c7b9858 wurde aktualisiert.
Dokument 65c4899f859e0d483c7b9859 wurde aktualisiert.
Dokument 65c489a0859e0d483c7b985a wurde aktualisiert.
Dokument 65c489a0859e0d483c7b985b wurde aktualisiert.
Dokument 65c489a0859e0d483c7b985c wurde aktualisiert.
Dokument 65c489a0859e0d483c7b985d wurde aktualisiert.
Dokument 65c489a1859e0d483c7b985e wurde aktualisiert.
Dokument 65c489a1859e0d483c7b985f wurde aktualisiert.
Dokument 65c489a1859e0d483c7b9860 wurde aktualisiert.
Dokument 65c489a1859e0d483c7

Unset unused (original) data

In [17]:
collection.update_many({}, {"$unset": {"daten": "", "data": ""}})

UpdateResult({'n': 2000, 'electionId': ObjectId('7fffffff0000000000000006'), 'opTime': {'ts': Timestamp(1707380027, 418), 't': 6}, 'nModified': 2000, 'ok': 1.0, '$clusterTime': {'clusterTime': Timestamp(1707380027, 419), 'signature': {'hash': b'\x85v^\x12\x12a\xfc\xc5\xf0\xd1\x1e\xc5\xce45\xc4c\xecs.', 'keyId': 7320298377621536774}}, 'operationTime': Timestamp(1707380027, 418), 'updatedExisting': True}, acknowledged=True)