In [9]:
import pandas as pd # Pandas für Datenmanipulation
import dask.dataframe as dd # Dask für große Datenmengen
import zipfile
import requests # HTTP-Anfragen
import shutil # Löschen von Ordnern
import datetime as dt
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed # Multithreading-Unterstützung
import os
from tqdm import tqdm # Fortschrittsbalken
import time
import glob # Dateimustererkennung
import re
import xml.etree.ElementTree as ET # Der native XML-Parser
import pyarrow as pa # 
import pyarrow.parquet as pq

Citibike-Daten verarbeiten

In [3]:
# Liste der Jahre/Muster, die Sie herunterladen möchten. Mögliche Formate:
# 1. "*"" - Alle Jahre 
# 2. "YYYY-YYYY" - Jahr-Bereich (z.B. [2015-2017])
# 3. [2016, 2017, 2018] - Spezifische Jahre
TARGET_YEARS = [2014]

# --- Generierung der Download-Muster basierend auf TARGET_YEARS ---
def generate_download_patterns(target_input):
    """
    Generiert die regulären Ausdrücke für die Zielfilterung basierend auf den drei definierten Formaten.
    """
    years = []
    current_year = datetime.now().year
    first_data_year = 2013 # Startjahr der CitiBike-Daten

    # --- Verarbeitung der Eingabe ---
    if isinstance(target_input, list):
        # Format: [2016, 2017]
        years = target_input
    
    elif isinstance(target_input, str) and target_input == "*":
        # Format: "*"" -> Alle Jahre
        years = list(range(first_data_year, current_year + 1))
        
    elif isinstance(target_input, str) and re.match(r"^\d{4}-\d{4}$", target_input):
        # Format: "YYYY-YYYY" -> Jahr-Bereich
        try:
            start_year, end_year = map(int, target_input.split('-'))
            if start_year > end_year:
                raise ValueError("Startjahr muss kleiner oder gleich dem Endjahr sein.")
            years = list(range(start_year, end_year + 1))
        except ValueError as e:
            print(f"FEHLER beim Parsen des Jahresbereichs: {e}")
            return []
            
    else:
        # Ungültige Eingabe
        print("FEHLER: Ungültiges TARGET_YEARS Format. Erlaubt sind: [*], [YYYY-YYYY] oder eine Liste von Jahren [2016, 2017].")
        return []

    # Optional: Ungültige/Zukünftige Jahre filtern
    years = [y for y in years if first_data_year <= y <= current_year]
    
    if not years:
        print("Keine gültigen Jahre gefunden, die heruntergeladen werden können.")
        return []

    # --- Generierung der Regex-Muster (wie zuvor, aber auf die gefilterten Jahre angewandt) ---
    patterns = []
    
    for year in years:
        year_str = str(year)
        # Muster für monatliche Dateien (202301-...) und ältere/JC-Formate
        patterns.append(rf"^{year_str}\d{{2}}-.*\.zip$")
        patterns.append(rf"^{year_str}-.*\.zip$")
        patterns.append(rf"^JC-{year_str}\d{{2}}-.*\.zip$")
        
    # Entferne Duplikate
    return list(set(patterns))

if __name__ == "__main__":
    YEAR_PATTERNS = generate_download_patterns(TARGET_YEARS)
    print("Generierte Download-Muster:", YEAR_PATTERNS)

Generierte Download-Muster: ['^JC-2014\\d{2}-.*\\.zip$', '^2014-.*\\.zip$', '^2014\\d{2}-.*\\.zip$']


In [4]:
# --- Konfiguration ---
BASE_URL = "https://s3.amazonaws.com/tripdata/"  # S3-Indexseite mit den Citibike-Daten
RAW_DATA_DIR = "../data/raw/citibike" 


# --- Setup ---
if not os.path.exists(RAW_DATA_DIR):
    os.makedirs(RAW_DATA_DIR)
    print(f"Lokaler Download-Ordner erstellt: {RAW_DATA_DIR}")




# --- Hauptlogik ---
def find_and_download_files():
    """Crawlt die S3-Seite und lädt alle relevanten ZIP-Dateien herunter."""
    print(f"Starte Crawling der S3-Seite: {BASE_URL}")
    
    

    try:
        # Führe eine GET-Anfrage an die S3-Indexseite durch
        response = requests.get(BASE_URL)
        print(f"HTTP-Statuscode der Antwort: {response.status_code}")
        response.raise_for_status() # Löst Fehler bei ungültigem Status (4xx, 5xx) aus
    except requests.exceptions.RequestException as e:
        print(f"FEHLER beim Zugriff auf die S3-URL: {e}")
        return

    download_list = []

    # # Parse den XML-Inhalt hat weder mit html.parser noch mit lxml-xml funktioniert
    # # soup = BeautifulSoup(response.content, 'html.parser')
    # soup = BeautifulSoup(response.content, 'lxml-xml')
    

    # Alternative Methode: Verwende den nativen XML-Parser
    try:
        # 1. Parse den XML-Inhalt mit ElementTree
        root = ET.fromstring(response.content)
        
        # 2. S3 verwendet Namespaces; wir müssen den Namespace aus dem Root-Tag extrahieren
        # Beispiel: {http://s3.amazonaws.com/doc/2006-03-01/}
        namespace = re.match(r'\{.*\}', root.tag).group(0)
        
        # 3. Finde alle 'Key'-Tags innerhalb des S3-Listings
        # Der Tag-Name muss mit dem extrahierten Namespace verwendet werden
        keys = root.findall(f'.//{namespace}Key')

        print(f"Gefundene <Key>-Elemente im XML: {len(keys)}")
        
        # 4. Filterung und Sammeln der Dateinamen
        for key in keys:
            filename = key.text
            
            if not filename or not filename.endswith(".zip"):
                continue
                
            is_relevant = False
            for pattern in YEAR_PATTERNS:
                if any(re.match(pattern.replace('*', '.*'), filename) for pattern in YEAR_PATTERNS):
                # if re.match(pattern, filename): 
                    is_relevant = True
                    break
            
            if is_relevant:
                print(f"Gefundene relevante Datei: {filename}")
                download_list.append(filename)

    except Exception as e:
        print(f"KRITISCHER FEHLER beim Parsen der S3-Antwort: {e}")
        return
    if not download_list:
        print("Keine Dateien gefunden, die den Suchmustern entsprechen. Prüfen Sie die YEAR_PATTERNS.")
        return

    print(f"Insgesamt {len(download_list)} Dateien zum Herunterladen gefunden.")
    
    # Starte den Download-Prozess
    for filename in download_list:
        file_url = BASE_URL + filename
        local_path = os.path.join(RAW_DATA_DIR, filename)

        if os.path.exists(local_path):
            print(f"  > Überspringe: {filename} (existiert bereits)")
            continue

        print(f"  > Downloade: {filename}")
        
        try:
            # Streaming-Download, um den Speicher nicht zu überlasten
            with requests.get(file_url, stream=True) as r:
                r.raise_for_status()
                total_size = int(r.headers.get('content-length', 0))
                
                with open(local_path, 'wb') as f:
                    # Fortschrittsbalken mit tqdm
                    with tqdm(total=total_size, unit='B', unit_scale=True, desc=filename) as t:
                        for chunk in r.iter_content(chunk_size=8192):
                            f.write(chunk)
                            t.update(len(chunk))
            
            # Kurze Pause, um den Server nicht zu überlasten
            time.sleep(0.5)

        except requests.exceptions.RequestException as e:
            print(f"  FEHLER beim Download von {filename}: {e}")
            time.sleep(5) # Längere Pause bei Fehlern
            continue

    print("\n--- Download abgeschlossen ---")

if __name__ == "__main__":
    find_and_download_files()

Starte Crawling der S3-Seite: https://s3.amazonaws.com/tripdata/
HTTP-Statuscode der Antwort: 200
Gefundene <Key>-Elemente im XML: 158
Gefundene relevante Datei: 2014-citibike-tripdata.zip
Insgesamt 1 Dateien zum Herunterladen gefunden.
  > Überspringe: 2014-citibike-tripdata.zip (existiert bereits)

--- Download abgeschlossen ---


2019 führt Citibike E-Bikes ein

Spaltennamen ändern sich:
2014-2019: tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
2020-2025: ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual

In [None]:
UNZIPPED_DIR = "../data/unzipped/citibike"
PROCESSED_DATA_DIR = "../data/processed/citibike"
FINAL_PARQUET_PATH = os.path.join(PROCESSED_DATA_DIR, 'citibike_all_years_combined.parquet')

# Daten-Typen-Optimierung um Speicher (RAM) zu sparen
# WICHTIG: Datums- und Zeit-Spalten werden NICHT in der DTYPE_MAP auf datetime
# gesetzt, da dies in read_csv zu Problemen führt. Sie werden separat konvertiert.

DTYPE_MAP = {
    # --- Geo-Koordinaten (Alt & Neu) ---
    'start_lat': 'float32',
    'start_lng': 'float32',
    'end_lat': 'float32',
    'end_lng': 'float32',
    'start station latitude': 'float32',
    'start station longitude': 'float32',
    'end station latitude': 'float32',
    'end station longitude': 'float32',
    'Start Station Latitude': 'float32',
    'Start Station Longitude': 'float32',
    'End Station Latitude': 'float32',
    'End Station Longitude': 'float32',
    
    # --- IDs ---
    'ride_id': 'string',
    'bikeid': 'int32',         # Alte bikeid ist oft nur eine Zahl
    'Bike ID': 'int32',
    'start_station_id': 'string', # IDs sollten in Int umgewandelt werden
    'end_station_id': 'string',
    'start station id': 'string', 
    'end station id': 'string',
    'Start Station Id': 'string',
    'End Station Id': 'string',
    
    # --- Typen und Kategorien ---
    'rideable_type': 'category',
    'usertype': 'category',
    'User Type': 'category',
    'member_casual': 'category',
    'gender': 'category',
    'Gender': 'category',
    
    # --- Numerische Werte ---
    'tripduration': 'int32',
    'Trip Duration': 'int32',
    'birth year': 'float16', # sollte in Int umgewandelt werden
    'Birth Year': 'float16', # sollte in Int umgewandelt werden
    
    # --- Stationsnamen  ---
    'start station name': 'string', 
    'end station name': 'string', 
    'start_station_name': 'string',
    'end_station_name': 'string',
    'Start Station Name': 'string',
    'End Station Name': 'string',
    }


# 2014-2018: tripduration,starttime,stoptime,start station id,start station name,start station latitude,start station longitude,end station id,end station name,end station latitude,end station longitude,bikeid,usertype,birth year,gender
# 2090-2025: ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual


# def unzip_file(zip_path, output_dir):
#     """
#     Entpackt eine ZIP-Datei (auch verschachtelte ZIPs) und prüft, 
#     ob die Entpackung bereits erfolgt ist.
#     """
#     zip_filename = os.path.basename(zip_path)
    
#     try:
#         with zipfile.ZipFile(zip_path, 'r') as zf:
#             members_to_extract = zf.namelist()
#             csv_members = [m for m in members_to_extract if m.endswith('.csv')]
            
#             if csv_members:
#                 # NEUE PRÜFUNG: Wenn die erste erwartete CSV existiert, überspringen
#                 first_csv_name = os.path.basename(csv_members[0])
#                 expected_csv_path = os.path.join(output_dir, first_csv_name)
                
#                 if os.path.exists(expected_csv_path):
#                     return f"Überspringe: {zip_filename} (CSV-Datei '{first_csv_name}' existiert bereits)."
            
#             # Alle Mitglieder entpacken
#             for member in members_to_extract:
                
#                 # Handling verschachtelter ZIPs
#                 if member.endswith('.zip'):
#                     inner_zip_path = zf.extract(member, path=output_dir)
#                     with zipfile.ZipFile(inner_zip_path, 'r') as inner_zf:
#                         inner_zf.extractall(output_dir)
#                     os.remove(inner_zip_path) # Temporäre innere ZIP-Datei löschen
                    
#                 # Handling von CSV-Dateien
#                 elif member.endswith('.csv'):
#                     zf.extract(member, path=output_dir)
                    
#         return f"Erfolg: {zip_filename} entpackt."
#     except Exception as e:
#         return f"FEHLER: {zip_filename} konnte nicht entpackt werden. {e}"


def unzip_file(zip_path, output_dir):
    """
    Entpackt eine ZIP-Datei und verarbeitet alle darin enthaltenen Strukturen: 
    verschachtelte Ordner, verschachtelte ZIPs und direkte CSVs.
    """
    zip_filename = os.path.basename(zip_path)
    
    # 1. Prüfe auf bereits entpackte Daten (der Skip-Check bleibt)
    # Da die Namenskonventionen variieren, ist dieser Check komplexer. 
    # Für Einfachheit überspringen wir den Check für das Entpacken und lassen die 
    # CSV-Verarbeitung (Phase 2) die Duplikate handhaben. Wir führen den Check nur 
    # für die temporären Verzeichnisse durch, die wir erstellen.
    
    temp_extract_dir = os.path.join(output_dir, f"temp_extract_{zip_filename.replace('.zip', '')}")
    os.makedirs(temp_extract_dir, exist_ok=True) # Temporäres Verzeichnis für das Entpacken

    try:
        # 2. Entpacke das Hauptarchiv in ein temporäres Verzeichnis
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zf.extractall(temp_extract_dir)

        # 3. Durchsuche das temporäre Verzeichnis rekursiv nach CSV- und ZIP-Dateien

        extracted_files_count = 0
        
        for root, dirs, files in os.walk(temp_extract_dir):
            
            if '__MACOSX' in dirs:
                dirs.remove('__MACOSX')

            for file_name in files:
                source_path = os.path.join(root, file_name)
                
                # a) Wenn es eine ZIP-Datei ist (Szenario 3: Neue Jahresarchive)
                if file_name.endswith('.zip'):
                    # Entpacke die innere ZIP direkt in das finale Zielverzeichnis (output_dir)
                    with zipfile.ZipFile(source_path, 'r') as inner_zf:
                        inner_zf.extractall(output_dir)
                        extracted_files_count += len([m for m in inner_zf.namelist() if m.endswith('.csv')])
                        
                # b) Wenn es eine CSV-Datei ist (Szenario 1 oder 2: Alte Jahresarchive oder direkte Monats-ZIPs)
                elif file_name.endswith('.csv'):
                    # Verschiebe die CSV in das finale Zielverzeichnis (output_dir)
                    # Wir überschreiben Duplikate (z.B. wenn es doppelte Benennung in der Hierarchie gab)
                    shutil.move(source_path, os.path.join(output_dir, file_name))
                    extracted_files_count += 1
        
        # 4. Cleanup: Lösche das temporäre Verzeichnis
        shutil.rmtree(temp_extract_dir)
        
        return f"Erfolg: {zip_filename} entpackt. {extracted_files_count} CSVs verschoben/extrahiert."
        
    except Exception as e:
        # 5. Cleanup bei Fehler: Versuche, das temporäre Verzeichnis zu löschen
        if os.path.exists(temp_extract_dir):
             shutil.rmtree(temp_extract_dir)
             
        return f"FEHLER: {zip_filename} konnte nicht verarbeitet werden. {e}"

def process_and_save_csv(csv_path):
    """Liest eine CSV, bereinigt Spaltennamen und speichert als temporäres Parquet."""
    
    try:
        # 1. Spaltennamen-Mapping erstellen (um alte und neue Formate zu vereinheitlichen)
        df_temp = pd.read_csv(csv_path, nrows=0) # Nur Header lesen
        df_cols = df_temp.columns.tolist()
        
        # Erzeuge ein Mapping, um alte Spaltennamen auf neue zu vereinheitlichen
        column_map = {}
        use_cols_final = []

        # Nutzerspalte usertype/member_casual ist in verschiedenen Dateien unterschiedlich kodiert
        requires_usertype_conversion = False # Flag für die Wertekonvertierung
        
        for col in df_cols:
            # --- Datum/Zeit-Daten standardisieren ---
            if 'start_time' in col or 'started_at' in col or 'starttime' in col or 'Start Time' in col:
                column_map[col] = 'started_at'
                use_cols_final.append('started_at')
            elif 'stop_time' in col or 'ended_at' in col or 'stoptime' in col or 'Stop Time' in col:
                column_map[col] = 'ended_at'
                use_cols_final.append('ended_at')
            
            # --- Geo-Daten standardisieren ---
            elif 'start station latitude' in col or 'start_lat' in col or 'Start Station Latitude' in col:
                column_map[col] = 'start_lat'
                use_cols_final.append('start_lat')
            elif 'end station latitude' in col or 'end_lat' in col or 'End Station Latitude' in col:
                column_map[col] = 'end_lat'
                use_cols_final.append('end_lat')
            elif 'start station longitude' in col or 'start_lng' in col or 'Start Station Longitude' in col:
                column_map[col] = 'start_lng'
                use_cols_final.append('start_lng')    
            elif 'end station longitude' in col or 'end_lng' in col or 'End Station Longitude' in col:
                column_map[col] = 'end_lng'
                use_cols_final.append('end_lng')    
            
            # --- Stations Namen/ID standardisieren ---
            elif 'start station name' in col or 'start_station_name' in col or 'Start Station Name' in col:
                column_map[col] = 'start_station_name'
                use_cols_final.append('start_station_name')    
            elif 'end station name' in col or 'end_station_name' in col or 'End Station Name' in col:
                column_map[col] = 'end_station_name'
                use_cols_final.append('end_station_name')      
            elif 'start station id' in col or 'start_station_id' in col or 'Start Station ID' in col:
                column_map[col] = 'start_station_id'
                use_cols_final.append('start_station_id')    
            elif 'end station id' in col or 'end_station_id' in col or 'End Station ID' in col:
                column_map[col] = 'end_station_id'
                use_cols_final.append('end_station_id')    
            
            # --- Rideable/Bike Typ standardisieren ---
            elif 'rideable_type' in col:
                column_map[col] = 'rideable_type'
                use_cols_final.append('rideable_type') 
            

            # --- Nutzerspalte standardisieren ---
            elif 'member_casual' in col:
                column_map[col] = 'member_casual'
                use_cols_final.append('member_casual')
            elif 'usertype' in col or 'User Type' in col:
                column_map[col] = 'member_casual'
                use_cols_final.append('member_casual')
                requires_usertype_conversion = True
            else:
                print(f"Unbekannte Spalte gefunden: {col}. Bitte erweitern Sie das Mapping.")
        
        
      
        # Entferne Duplikate aus use_cols_final
        use_cols_final = list(set(use_cols_final))
        
            

        # 2. Datei einlesen
        df = pd.read_csv(csv_path, 
                            usecols=list(column_map.keys()), 
                            dtype=DTYPE_MAP,
                            low_memory=False)

        # 2019 wurden bei Citibike die E-Bikes eingeführt. Davor gab es nur klassische Bikes. Daher fügen wir diese Spalte bei alten Daten hinzu.
        if 'rideable_type' not in df.columns:
            df['rideable_type'] = 'classic_bike'

        # Problematisch - Konvertiere Station-IDs zu integer (ungültige Strings wie 'SYS016' werden zu NaN)
        # for id_col in ['start_station_id', 'end_station_id']:
        #     if id_col in ddf.columns:
        #         df[id_col] = pd.to_numeric(df[id_col], errors='coerce').astype('Int16')

        # 3. Spalten umbenennen 
        df.rename(columns=column_map, inplace=True)
        
        # Anpassung der Nutzerspalte falls erforderlich
        if requires_usertype_conversion:
                    df['member_casual'] = df['member_casual'].astype('object').replace({'Subscriber': 'member', 'Customer': 'casual'})

        df['member_casual'] = df['member_casual'].astype('category')
        # 4. Nur die vereinheitlichten Spalten beibehalten
        #   Einträge die keinen Mehrwert bieten oder die nur in alten Daten existieren  wurden nicht zu use_cols_final hinzugefügt
        #   2014-2019: tripduration,bikeid,gender,birth year
        #   2020-2025: ride_id
        

        # Diese Spalten liegen teils als rein numerische und teils als alpha-numerische Daten vor und müssen absolut konsistent als String vorliegen
        id_cols_to_fix = ['start_station_id', 'end_station_id']
        for col in id_cols_to_fix:
            if col in df.columns:
                # 1. Alles in Strings umwandeln (auch NaNs werden zu 'nan')
                df[col] = df[col].astype(str)
                                
                # 2. 'nan' Strings (von echten NaNs) wieder zu echten Python-None machen
                #    Das erlaubt PyArrow, sie als NULL-Werte im String-Schema zu speichern
                df[col] = df[col].replace(['nan', 'NaN', 'None', ''], None)
        
        
        df = df[use_cols_final]

        # 5. Speichern als temporäres Parquet
        temp_parquet_path = os.path.join(PROCESSED_DATA_DIR, f'temp_{os.path.basename(csv_path)}.parquet')
        df.to_parquet(temp_parquet_path, index=False)

        
        # 6. Speicher freigeben
        del df
        return f"Erfolg: {os.path.basename(csv_path)} verarbeitet und als Parquet gespeichert."
        
    except Exception as e:
        return f"FEHLER beim Verarbeiten: {os.path.basename(csv_path)}. {e}"
    
def orchestrate_data_pipeline():
    
    # 1. Ordner erstellen
    os.makedirs(UNZIPPED_DIR, exist_ok=True)
    os.makedirs(PROCESSED_DATA_DIR, exist_ok=True)
    
    # 2. Alle heruntergeladenen ZIP-Dateien finden
    all_zip_files = glob.glob(os.path.join(RAW_DATA_DIR, '*.zip'))

    if not all_zip_files:
        print(f"KRITISCHER FEHLER: Keine ZIP-Dateien im Ordner {RAW_DATA_DIR} gefunden.")
        return
    
    print(f"Starte Verarbeitung von {len(all_zip_files)} ZIP-Dateien...")
        
    # 3. Alle ZIPs parallel entpacken
    print("\n--- Phase 1: Entpacken der ZIPs ---")
    # Wir filtern die 'Überspringen'-Meldungen in der tqdm-Schleife für eine saubere Ausgabe.
    skipped_count = 0
    # ThreadPoolExecutor anstatt ProcessPoolExecutor (macht Probleme) verwenden
    # with ThreadPoolExecutor(max_workers=32) as executor: 
    #     futures = [executor.submit(unzip_file, zp, UNZIPPED_DIR) for zp in all_zip_files]
    #     for future in tqdm(as_completed(futures), total=len(all_zip_files), desc="Entpacke"):
    #         result = future.result()
    #         if "FEHLER" in result:
    #             print(f"\n{result}")
    #         elif "Überspringe" in result:
    #             skipped_count += 1

    # TEST: Sequenzielle Schleife nutzen, falls Multithreading/-processing Probleme macht:
    # print("\n--- Phase 1: Sequenzieller Testlauf Entpacken ---")
    # skipped_count = 0
    # for zip_path in tqdm(all_zip_files, desc="Entpacke Sequenziell"):
    #     result = unzip_file(zip_path, UNZIPPED_DIR)
    #     if "FEHLER" in result:
    #         print(f"\n{result}")
    #         # Beenden Sie hier, um den Fehler zu sehen
    #         raise Exception("Sequenzieller Entpack-Fehler aufgetreten.")
    #     elif "Überspringe" in result:
    #         skipped_count += 1

    if skipped_count > 0:
        print(f"INFO: {skipped_count} ZIP-Dateien wurden übersprungen, da die entpackten Daten bereits existieren.")

    # 4. Alle entpackten CSVs finden
    all_csv_files = glob.glob(os.path.join(UNZIPPED_DIR, '*.csv'))
    print(f"\n{len(all_csv_files)} CSV-Dateien gefunden und bereit zur Parallelverarbeitung.")
    
    # 5. CSVs verarbeiten und als temporäre Parquet-Files speichern

    # Parallel verarbeiten macht Probleme 
    # print("\n--- Phase 2: Parallelverarbeitung der CSVs ---")
    # with ProcessPoolExecutor(max_workers=1) as executor:
    #     futures = [executor.submit(process_and_save_csv, csvp) for csvp in all_csv_files]
    #     for future in tqdm(as_completed(futures), total=len(all_csv_files), desc="Verarbeite CSVs"):
    #         result = future.result()
    #         if "FEHLER" in result:
    #             print(f"\n{result}")
    

    # Sequenzielle Schleife nutzen, da Multiprocessing Probleme macht (auskommentiert um diesen Prozess nur einmal zu durchlaufen):
    print("\n--- Phase 2: Sequenzielle Verarbeitung der CSVs ---")
    # for csv_path in tqdm(all_csv_files, desc="Verarbeite CSVs Sequenziell"):
    #     result = process_and_save_csv(csv_path)
    #     if "FEHLER" in result:
    #         print(f"\n{result}")
    #         # Wenn hier ein Fehler auftritt, ist es ein Daten-/Code-Fehler, kein ProcessPool-Problem!
    #         raise Exception("Sequenzieller Verarbeitungsfehler aufgetreten.")            
    
    # 6. Konsolidierung & Speichern
    print("\n--- Phase 3: Finale Konsolidierung ---")
    final_parquet_files = glob.glob(os.path.join(PROCESSED_DATA_DIR, 'temp_*.parquet'))
    print(f"Füge {len(final_parquet_files)} temporäre Parquet-Dateien zusammen...")
    
    # Verwende Pandas.Concat für speichereffiziente Konsolidierung
    # df_final = pd.concat([pd.read_parquet(f) for f in tqdm(final_parquet_files, desc="Lade Parquet Chunks")], 
    #                     ignore_index=True)
    # df_final.to_parquet(FINAL_PARQUET_PATH, index=False)

    # Verwende PyArrow für speichereffiziente Konsolidierung, um Memory Overflow zu vermeiden
    # --> Immernoch zu hohe Speicherauslastung --> Dask Parquet 
    # for f in tqdm(final_parquet_files, desc="Lade Parquet Chunks"):
    #     table = pq.read_table(f)
    #     tables.append(table)
    # # Kombiniere alle geladenen Tabellen
    # combined_table = pa.concat_tables(tables)
    # # Schreibe die kombinierte Tabelle in die Zieldatei
    # pq.write_table(combined_table, FINAL_PARQUET_PATH)


    
    # Verwende Dask Parquet für speichereffiziente Konsolidierung, um Memory Overflow zu vermeiden
    print(f"Lese {len(final_parquet_files)} Parquet-Dateien (lazy)...")
    ddf = dd.read_parquet(final_parquet_files)
    print(f"Schreibe die kombinierte Tabelle nach {FINAL_PARQUET_PATH}...")
    ddf.to_parquet(FINAL_PARQUET_PATH, write_index=False)
 
    print(f"\n--- ERFOLG ---")
    print(f"Gesamtdatensatz gespeichert unter: {FINAL_PARQUET_PATH}")
           
    # 7. Aufräumen (optional)
    # Entfernen Sie die temporären Dateien, um Speicherplatz zu sparen
    # for f in final_parquet_files: os.remove(f)
    # for f in all_csv_files: os.remove(f)

if __name__ == "__main__":
    # Achtung: Die Ausführung dieses Skripts kann bei allen Jahren sehr lange dauern!
    orchestrate_data_pipeline()    

Starte Verarbeitung von 155 ZIP-Dateien...

--- Phase 1: Entpacken der ZIPs ---

488 CSV-Dateien gefunden und bereit zur Parallelverarbeitung.

--- Phase 2: Parallelverarbeitung der CSVs ---

--- Phase 3: Finale Konsolidierung ---
Füge 488 temporäre Parquet-Dateien zusammen...
Lese 488 Parquet-Dateien (lazy)...
Schreibe die kombinierte Tabelle nach ../data/processed/citibike\citibike_all_years_combined.parquet...

--- ERFOLG ---
Gesamtdatensatz gespeichert unter: ../data/processed/citibike\citibike_all_years_combined.parquet


MemoryError: Unable to allocate 13.6 MiB for an array with shape (4, 888085) and data type float32

NYPD CRASH DATA verarbeiten

In [5]:
## Einlesen Datenbank 1: NYPD Motor Vehicle Collisions

NYPD_IN_FILE = 'Motor_Vehicle_Collisions_-_Crashes_20251209.csv'
nypd_path = os.path.join('../data/unzipped/nypd/', NYPD_IN_FILE)

print(f"Lese NYPD-Daten ein von: {nypd_path}")

try:
    # Nur die relevantesten Spalten zur Optimierung der Ladezeit auswählen
    nypd_cols = ["CRASH DATE","CRASH TIME","BOROUGH","ZIP CODE","LATITUDE","LONGITUDE","LOCATION","ON STREET NAME","CROSS STREET NAME","OFF STREET NAME","NUMBER OF PERSONS INJURED","NUMBER OF PERSONS KILLED","NUMBER OF PEDESTRIANS INJURED","NUMBER OF PEDESTRIANS KILLED","NUMBER OF CYCLIST INJURED","NUMBER OF CYCLIST KILLED","NUMBER OF MOTORIST INJURED","NUMBER OF MOTORIST KILLED","CONTRIBUTING FACTOR VEHICLE 1","CONTRIBUTING FACTOR VEHICLE 2","CONTRIBUTING FACTOR VEHICLE 3","CONTRIBUTING FACTOR VEHICLE 4","CONTRIBUTING FACTOR VEHICLE 5","COLLISION_ID","VEHICLE TYPE CODE 1","VEHICLE TYPE CODE 2","VEHICLE TYPE CODE 3","VEHICLE TYPE CODE 4","VEHICLE TYPE CODE 5"]
    #   nypd_cols = ['CRASH DATE', 'CRASH TIME', 'LATITUDE', 'LONGITUDE', 'NUMBER OF PERSONS INJURED', 'NUMBER OF CYCLIST INJURED','VEHICLE TYPE CODE 1', 'VEHICLE TYPE CODE 2']

    df_nypd = pd.read_csv(nypd_path, usecols=lambda x: x in nypd_cols, encoding='latin1', low_memory=False)
#     ddf_nypd = dd.read_csv(
#     nypd_path, 
#     usecols=nypd_cols,    # Dask akzeptiert die Liste nypd_cols direkt
#     encoding='latin1', 
#     # low_memory=False,   # In Dask nicht nötig/vorhanden, da Dask sowieso chunkweise arbeitet
#     dtype='object'         # Empfehlung: Erstmal als object laden, um Typ-Konflikte zu vermeiden
# )
    print(f"\n=======================================================")
    print(f"\nNYPD-Daten erfolgreich geladen. {len(df_nypd):,} Zeilen.")
    print(f"=======================================================")
    print(df_nypd.head())
except FileNotFoundError:
    print(f"FEHLER: Datei {NYPD_IN_FILE} nicht im angegebenen Pfad gefunden.")

Lese NYPD-Daten ein von: ../data/unzipped/nypd/Motor_Vehicle_Collisions_-_Crashes_20251209.csv


NYPD-Daten erfolgreich geladen. 2,226,246 Zeilen.
   CRASH DATE CRASH TIME   BOROUGH ZIP CODE  LATITUDE  LONGITUDE  \
0  09/11/2021       2:39       NaN      NaN       NaN        NaN   
1  03/26/2022      11:45       NaN      NaN       NaN        NaN   
2  11/01/2023       1:29  BROOKLYN    11230  40.62179 -73.970024   
3  06/29/2022       6:55       NaN      NaN       NaN        NaN   
4  09/21/2022      13:21       NaN      NaN       NaN        NaN   

                     LOCATION           ON STREET NAME CROSS STREET NAME  \
0                         NaN    WHITESTONE EXPRESSWAY         20 AVENUE   
1                         NaN  QUEENSBORO BRIDGE UPPER               NaN   
2      (40.62179, -73.970024)            OCEAN PARKWAY          AVENUE K   
3                         NaN       THROGS NECK BRIDGE               NaN   
4                         NaN          BROOKLYN BRIDGE          

In [None]:
## Bereinigen Datenbank 1: NYPD Motor Vehicle Collisions
os.makedirs('../data/processed/nypd', exist_ok=True)
print(f"NYPD-Daten (unbereinigt): {len(df_nypd):,} Zeilen.")

# Koordinaten filtern (entfernt Zeilen ohne Geo-Daten)
df_nypd.dropna(subset=['LATITUDE', 'LONGITUDE'], inplace=True)
print(f"NYPD-Daten nach Geo-Bereinigung: {len(df_nypd):,} Zeilen.")

# Crash Date/Time in Datetime-Typ konvertieren; Die Fehler-Toleranz (errors='coerce') setzt ungültige Daten auf NaT (Not a Time)
df_nypd['CRASH DATE'] = pd.to_datetime(df_nypd['CRASH DATE'], errors='coerce')

# Prüfen, ob die 'CRASH TIME'-Spalte bereits datetime.time-Objekte enthält.
# Dies verhindert, dass pd.to_datetime bei bereits konvertierten Objekten fehlschlägt.
if not df_nypd['CRASH TIME'].dropna().empty and \
   isinstance(df_nypd['CRASH TIME'].dropna().iloc[0], dt.time):
    pass
else:
    df_nypd['CRASH TIME'] = pd.to_datetime(df_nypd['CRASH TIME'], format='%H:%M', errors='coerce').dt.time


# Die CRASH TIME liegt als Python 'time' Objekt vor, das nicht direkt addiert werden kann.
def time_to_seconds(t):
    """Konvertiert ein datetime.time Objekt in die Gesamtanzahl der Sekunden."""
    # fängt fehlerhafte time Objekte ab
    if pd.isna(t):
        return pd.NA
    return t.hour * 3600 + t.minute * 60 + t.second

# Konvertiere in Sekunden, bilde ein Zeitdelta und addiere dieses zum Datum
df_nypd['time_seconds'] = df_nypd['CRASH TIME'].apply(time_to_seconds)
df_nypd['time_delta'] = pd.to_timedelta(df_nypd['time_seconds'], unit='s')
df_nypd['crash_datetime'] = df_nypd['CRASH DATE'] + df_nypd['time_delta']

# Entferne die temporären Spalten und Zeilen, die nach der Kombination ungültig sind.
df_nypd.drop(columns=['time_seconds', 'time_delta'], inplace=True)
df_nypd.dropna(subset=['crash_datetime'], inplace=True)
print(f"NYPD-Daten nach Datetime-Konvertierung und -Bereinigung: {len(df_nypd):,} Zeilen.")


# Bereinigung: Unfallbeteiligte filtern (entfernt Zeilen ohne  Fahrradbeteiligung)
# -- Fall A: Prüfen, ob "bike" / "bicycle" in den Fahrzeugtyp-Codes vorkommt
BIKE_IDENTIFIER = "bike|bicycle"

bicycle_in_codes = (
    df_nypd['VEHICLE TYPE CODE 1'].astype(str).str.contains(BIKE_IDENTIFIER, case=False, na=False) |
    df_nypd['VEHICLE TYPE CODE 2'].astype(str).str.contains(BIKE_IDENTIFIER, case=False, na=False) |
    df_nypd['VEHICLE TYPE CODE 3'].astype(str).str.contains(BIKE_IDENTIFIER, case=False, na=False) |
    df_nypd['VEHICLE TYPE CODE 4'].astype(str).str.contains(BIKE_IDENTIFIER, case=False, na=False) |
    df_nypd['VEHICLE TYPE CODE 5'].astype(str).str.contains(BIKE_IDENTIFIER, case=False, na=False)
)

# -- Fall B: Prüfen, ob eine Person als Radfahrer verletzt oder getötet wurde
# (Stellt sicher, dass auch nicht explizit als "bike" / "bicycle"  gekennzeichnete, aber involvierte Fahrräder erfasst werden)
cyclist_injured = df_nypd['NUMBER OF CYCLIST INJURED'].fillna(0).astype(int) > 0
cyclist_killed = df_nypd['NUMBER OF CYCLIST KILLED'].fillna(0).astype(int) > 0

# Kombiniere die beiden Fälle (OR-Verknüpfung)
df_nypd_filtered = df_nypd[bicycle_in_codes | cyclist_injured | cyclist_killed].copy()
print(f"\nNYPD-Daten nach Filterung auf Fahrrad-Unfälle: {len(df_nypd_filtered):,} Zeilen.")

# Reduziere den DataFrame auf die wesentlichen Geo- und Zeit-Spalten für die Analyse
df_nypd_clean = df_nypd_filtered[[
    'crash_datetime',
    'LATITUDE',
    'LONGITUDE',
    'NUMBER OF CYCLIST INJURED',
    'NUMBER OF CYCLIST KILLED',
    'VEHICLE TYPE CODE 1',
    'VEHICLE TYPE CODE 2',
    'VEHICLE TYPE CODE 3',
    'VEHICLE TYPE CODE 4', 
    'VEHICLE TYPE CODE 5'
]].copy()

nypd_parquet_path = os.path.join('../data/processed/nypd', 'nypd_clean.parquet')
df_nypd_clean.to_parquet(nypd_parquet_path, index=False)

print(f"\n=======================================================")
print(f"NYPD-Daten erfolgreich bereinigt. {len(df_nypd_clean):,} Zeilen.")
print(f"=======================================================")

NYPD-Daten (unbereinigt): 1,985,841 Zeilen.
NYPD-Daten nach Geo-Bereinigung: 1,985,841 Zeilen.
NYPD-Daten nach Datetime-Konvertierung und -Bereinigung: 1,985,841 Zeilen.

NYPD-Daten nach Filterung auf Fahrrad-Unfälle: 84,361 Zeilen.


OSError: Cannot save file into a non-existent directory: '..\data\processed\nypd'