In [5]:
# Auswertung der DB Rad+ Daten auf Bezirksebene
# Zur Erstellung des Codes wurde die generative Künstliche Intelligenz (KI) „Claude AI“ des Anbieters Anthropic in Version 3.7 genutzt

# Calculate metrics for each Berlin district from network data
# This script creates a new GeoJSON with metrics derived from network segments in each district

import pandas as pd
import geopandas as gpd
import numpy as np
from shapely import wkt
import concurrent.futures
from tqdm import tqdm
import pyarrow.parquet as pq
import pyarrow as pa
import gc
import os
from pathlib import Path

# Configuration
# All paths are relative to the current working directory.
# Network segments; opened with pyarrow and read one row group at a time.
INPUT_PARQUET = "data/network_all_months_plus_25833_length_with_fahrradstrasse.parquet"
# District polygons; each feature must provide a 'bbez_name' property.
DISTRICTS_GEOJSON = "data/bezirke_berlin.geojson"
# Destination of the district polygons enriched with the computed metrics.
OUTPUT_GEOJSON = "data/bezirke_berlin_metrics.geojson"
# Directory that receives the CSV export of the same metrics.
OUTPUT_DIR = "analysis_results/Bezirke"
COLUMN_HIST = '2304-2412_speeds'  # Column holding the serialized speed histogram
COLUMN_LENGTH = 'length_m'  # Column for segment length in meters
COLUMN_ROUTE_COUNT = '2304-2412_route_count'  # Column for route count
CHUNK_SIZE = 50000  # Rows handled per chunk within a row group (adjust based on available memory)
N_WORKERS = 2  # Number of parallel workers. NOTE(review): not referenced in the visible code

# Fields to read from parquet - keeping minimal to reduce memory usage
PARQUET_FIELDS = ['id', 'geometry_wkt', COLUMN_HIST, COLUMN_LENGTH, COLUMN_ROUTE_COUNT]

# Ensure output directory exists.
# This runs at import time, so merely importing the module creates the directory.
Path(OUTPUT_DIR).mkdir(exist_ok=True, parents=True)

def log(message):
    """
    Write a progress message to standard output.

    The message is handed to ``print`` unchanged; nothing is returned.
    """
    print(message)

def force_gc():
    """
    Trigger an immediate garbage-collection pass.

    The value returned by ``gc.collect`` is discarded, so this function
    always returns None.
    """
    _ = gc.collect()

def parse_histogram(hist_str, n_bins=32):
    """
    Parse a serialized histogram string into a float NumPy array.

    The expected format is a comma-separated list of numbers, optionally
    wrapped in square brackets and surrounded by whitespace, e.g.
    ``"[0, 3, 12.5, ...]"``.

    Args:
        hist_str: The serialized histogram. Any non-string value (None,
            NaN, ...) is treated as a missing histogram.
        n_bins: Number of bins the histogram must have (default 32, the
            bin count used throughout this file).

    Returns:
        A float array of length ``n_bins``. An all-zero array is returned
        for missing, empty, malformed or wrongly sized histograms, so the
        result can always be added to an ``n_bins``-long accumulator.
    """
    # Anything that is not a string counts as a missing histogram
    if not isinstance(hist_str, str):
        return np.zeros(n_bins)

    # Drop surrounding whitespace and brackets; ignore empty tokens such as
    # the one produced by a trailing comma
    body = hist_str.strip().strip('[]')
    tokens = [token for token in body.split(',') if token.strip()]
    if not tokens:
        # "" or "[]": an empty histogram, nothing to report
        return np.zeros(n_bins)

    # Parse every token explicitly. np.fromstring is avoided on purpose: in
    # text mode it appears to stop at the first token it cannot read and hand
    # back the values parsed so far, so malformed input went unnoticed.
    try:
        values = np.array([float(token) for token in tokens], dtype=float)
    except ValueError as e:
        log(f"Error parsing histogram: {e}")
        return np.zeros(n_bins)

    # A histogram of the wrong size cannot be combined with the others
    if values.size != n_bins:
        log(f"Error parsing histogram: expected {n_bins} bins, got {values.size}")
        return np.zeros(n_bins)

    return values

def calculate_histogram_statistics(histogram):
    """
    Calculate summary statistics from a histogram.

    Bin ``i`` is treated as the value ``i`` (bin index == speed value), and
    the entries of ``histogram`` are used as weights.

    Args:
        histogram: 1-D sequence or array of non-negative weights, one per
            bin. Any number of bins is accepted.

    Returns:
        A dictionary with the keys 'mean', 'median' and 'std'. All three
        are NaN when the histogram is empty or sums to zero.
    """
    histogram = np.asarray(histogram, dtype=float)
    total_count = histogram.sum()

    if histogram.size == 0 or total_count == 0:
        return {
            'mean': np.nan,
            'median': np.nan,
            'std': np.nan
        }

    # One bin value per histogram entry (0-31 for the usual 32-bin input)
    speed_bins = np.arange(len(histogram))

    # Normalize the histogram so the weights sum to 1
    norm_hist = histogram / total_count

    # Weighted mean
    mean = np.sum(speed_bins * norm_hist)

    # Weighted variance and standard deviation
    variance = np.sum(((speed_bins - mean) ** 2) * norm_hist)
    std = np.sqrt(variance)

    # Median: the bin value at which the cumulative share reaches 0.5,
    # linearly interpolated between neighbouring bins.
    # NOTE(review): the cumulative share of bin i is attached to the value i
    # itself, so for a histogram concentrated in one bin the median comes out
    # half a bin below the mean - confirm this convention is intended.
    cum_dist = np.cumsum(norm_hist)
    median = np.interp(0.5, cum_dist, speed_bins)

    return {
        'mean': mean,
        'median': median,
        'std': std
    }

def process_district_chunk(district_name, district_geom, chunk_df):
    """
    Process a chunk of network data for a specific district.

    Args:
        district_name: Name of the district; only used in log messages.
        district_geom: Geometry of the district. It must be in the same
            coordinate system as the WKT geometries of the segments.
        chunk_df: DataFrame with the columns 'id', 'geometry_wkt',
            COLUMN_HIST, COLUMN_LENGTH and COLUMN_ROUTE_COUNT.

    Returns:
        A dictionary describing the segments that intersect the district:
        'segment_count', 'total_length_km', 'total_route_count',
        'aggregated_histogram' (sum of the segment histograms, each weighted
        by the segment length) and 'segments' (list of segment ids).
        A segment that intersects several districts is reported for each.
    """
    n_bins = 32  # Length of every speed histogram handled in this file

    # Filter to segments that intersect with this district; falsy geometries
    # are treated as not intersecting
    geometries = chunk_df['geometry_wkt'].apply(wkt.loads)
    intersects = geometries.apply(lambda g: g.intersects(district_geom) if g else False)
    district_segments = chunk_df.loc[intersects]

    if district_segments.empty:
        # Return empty results if no segments in this district from this chunk
        return {
            'segment_count': 0,
            'total_length_km': 0,
            'total_route_count': 0,
            'aggregated_histogram': np.zeros(n_bins),
            'segments': []
        }

    # Calculate metrics
    segment_count = len(district_segments)
    total_length_km = district_segments[COLUMN_LENGTH].sum() / 1000
    total_route_count = district_segments[COLUMN_ROUTE_COUNT].sum()

    # Aggregate histograms weighted by length
    aggregated_histogram = np.zeros(n_bins)
    skipped = 0
    histograms = (parse_histogram(raw) for raw in district_segments[COLUMN_HIST])
    for hist, length in zip(histograms, district_segments[COLUMN_LENGTH]):
        if hist.shape != aggregated_histogram.shape:
            # A histogram of the wrong size cannot be added to the
            # accumulator; skip it instead of failing the whole chunk
            skipped += 1
            continue
        if np.sum(hist) > 0:
            aggregated_histogram += hist * length

    if skipped:
        log(f"Skipped {skipped} histogram(s) with unexpected length in district {district_name}")

    # Save segment IDs for potential future reference
    segment_ids = district_segments['id'].tolist()

    return {
        'segment_count': segment_count,
        'total_length_km': total_length_km,
        'total_route_count': total_route_count,
        'aggregated_histogram': aggregated_histogram,
        'segments': segment_ids
    }

def calculate_district_metrics():
    """
    Calculate metrics for each district by processing the network data
    and identifying segments that intersect with each district.

    The districts are read from DISTRICTS_GEOJSON and reprojected to
    EPSG:25833 if necessary. The network is read from INPUT_PARQUET one row
    group at a time; every row group is split into chunks of CHUNK_SIZE rows
    and each chunk is evaluated against every district.

    A row group that cannot be read or processed is logged and skipped. All
    skipped row groups are reported again in a summary warning, because the
    returned metrics then cover only part of the network.

    Returns:
        A tuple ``(districts_gdf, district_results)``. ``districts_gdf`` is
        the district GeoDataFrame in EPSG:25833. ``district_results`` maps
        each district name to a dictionary with 'segment_count',
        'total_length_km', 'total_route_count', 'aggregated_histogram',
        'segments', 'mean_speed', 'median_speed', 'speed_std' and
        'normalized_histogram'.
    """
    log(f"Loading districts from {DISTRICTS_GEOJSON}")
    # Load districts
    districts_gdf = gpd.read_file(DISTRICTS_GEOJSON)

    # Ensure CRS is EPSG:25833
    if districts_gdf.crs != "EPSG:25833":
        districts_gdf = districts_gdf.to_crs("EPSG:25833")

    # Extract (name, geometry) pairs once instead of once per chunk
    districts = list(zip(districts_gdf['bbez_name'], districts_gdf['geometry']))

    # Initialize results dictionary
    district_results = {
        district_name: {
            'segment_count': 0,
            'total_length_km': 0,
            'total_route_count': 0,
            'aggregated_histogram': np.zeros(32),
            'segments': []
        }
        for district_name, _ in districts
    }

    # Open parquet file
    log(f"Opening parquet file: {INPUT_PARQUET}")
    parquet_file = pq.ParquetFile(INPUT_PARQUET)
    total_rows = parquet_file.metadata.num_rows
    num_row_groups = parquet_file.metadata.num_row_groups
    log(f"Parquet file has {num_row_groups} row groups and approximately {total_rows:,} rows")

    required_columns = ['geometry_wkt', COLUMN_HIST, COLUMN_LENGTH, COLUMN_ROUTE_COUNT]
    failed_row_groups = []

    # Process each row group
    for rg in tqdm(range(num_row_groups), desc="Processing row groups"):
        try:
            # Read only necessary columns from this row group
            table = parquet_file.read_row_group(rg, columns=PARQUET_FIELDS)
            df = table.to_pandas()
            del table

            # Rows with a missing value in any required column are ignored
            df = df.dropna(subset=required_columns)

            # Process in smaller chunks to manage memory
            chunk_starts = range(0, len(df), CHUNK_SIZE)
            for start_idx in tqdm(chunk_starts, desc=f"Chunks in group {rg}", leave=False):
                chunk_df = df.iloc[start_idx:start_idx + CHUNK_SIZE]

                # Evaluate the chunk for every district first and merge only
                # afterwards, so that a failure half-way through cannot leave
                # the chunk counted for some districts but not for others
                chunk_results = [
                    (district_name, process_district_chunk(district_name, district_geom, chunk_df))
                    for district_name, district_geom in districts
                ]

                # Aggregate results
                for district_name, result in chunk_results:
                    totals = district_results[district_name]
                    totals['segment_count'] += result['segment_count']
                    totals['total_length_km'] += result['total_length_km']
                    totals['total_route_count'] += result['total_route_count']
                    totals['aggregated_histogram'] += result['aggregated_histogram']
                    totals['segments'].extend(result['segments'])

                # Clean up the chunk data
                del chunk_df, chunk_results
                force_gc()

            # Clean up the row group data
            del df
            force_gc()

        except Exception as e:
            # Best effort: keep going with the remaining row groups, but
            # remember the failure so it can be reported prominently below
            log(f"Error processing row group {rg}: {e}")
            failed_row_groups.append(rg)

    if failed_row_groups:
        log(
            f"WARNING: {len(failed_row_groups)} of {num_row_groups} row groups failed "
            f"and are missing from the results: {failed_row_groups}"
        )

    # Calculate final statistics for each district
    log("Calculating final statistics for each district")
    for district_name, metrics in district_results.items():
        # Calculate statistics from aggregated histogram
        histogram = metrics['aggregated_histogram']
        histogram_total = np.sum(histogram)
        if histogram_total > 0:
            stats = calculate_histogram_statistics(histogram)
            metrics['mean_speed'] = stats['mean']
            metrics['median_speed'] = stats['median']
            metrics['speed_std'] = stats['std']

            # Normalize the aggregated histogram
            metrics['normalized_histogram'] = histogram / histogram_total
        else:
            # No usable speed data for this district
            metrics['mean_speed'] = np.nan
            metrics['median_speed'] = np.nan
            metrics['speed_std'] = np.nan
            metrics['normalized_histogram'] = np.zeros(32)

    return districts_gdf, district_results

def create_output_geojson(districts_gdf, district_results):
    """
    Attach the computed metrics to the districts and write them to disk.

    The district column 'bbez_name' is renamed to 'Bezirk' and used to look
    up each district's entry in ``district_results``. The result is written
    to OUTPUT_GEOJSON and, without the geometry column, as
    'bezirke_berlin_metrics.csv' inside OUTPUT_DIR.

    Returns the enriched GeoDataFrame.
    """
    result_gdf = districts_gdf.copy().rename(columns={'bbez_name': 'Bezirk'})

    # Scalar metrics, listed in the order in which the columns are appended
    scalar_metrics = (
        'mean_speed',
        'median_speed',
        'total_route_count',
        'segment_count',
        'total_length_km',
        'speed_std',
    )
    for metric in scalar_metrics:
        result_gdf[metric] = result_gdf['Bezirk'].map(
            lambda name, key=metric: district_results[name][key]
        )

    # The histogram is an array, so it is stored as its text representation.
    # NOTE(review): np.array2string may wrap long arrays over several lines,
    # so the stored string can contain line breaks - confirm consumers cope.
    def histogram_as_text(name):
        return np.array2string(
            district_results[name]['normalized_histogram'], precision=6, separator=','
        )

    result_gdf['normalized_histogram'] = result_gdf['Bezirk'].map(histogram_as_text)

    # Write the GeoJSON
    log(f"Saving output to {OUTPUT_GEOJSON}")
    result_gdf.to_file(OUTPUT_GEOJSON, driver='GeoJSON')

    # Write the attribute table as CSV for easier analysis
    csv_path = os.path.join(OUTPUT_DIR, 'bezirke_berlin_metrics.csv')
    attributes_only = result_gdf.drop(columns='geometry')
    attributes_only.to_csv(csv_path, index=False)
    log(f"Saved CSV to {csv_path}")

    return result_gdf

def main():
    """
    Run the whole pipeline: compute the district metrics, write the output
    files and print a summary table sorted by mean speed (highest first).
    """
    log("Starting district metrics calculation")

    # Compute the metrics and persist them
    districts_gdf, district_results = calculate_district_metrics()
    output_gdf = create_output_geojson(districts_gdf, district_results)

    # Summary table
    log("\nDistrict Metrics Summary:")
    summary_columns = ['Bezirk', 'mean_speed', 'median_speed', 'total_length_km', 'segment_count']
    summary_df = output_gdf[summary_columns].copy()
    summary_df = summary_df.sort_values(by='mean_speed', ascending=False)
    print(summary_df)

    log("District metrics calculation complete")

# Run the pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Starting district metrics calculation
Loading districts from data/bezirke_berlin.geojson
Opening parquet file: data/network_all_months_plus_25833_length_with_fahrradstrasse.parquet
Parquet file has 12 row groups and approximately 466,957 rows


Processing row groups:   0%|          | 0/12 [00:00<?, ?it/s]
Chunks in group 0:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 0: 100%|██████████| 1/1 [00:29<00:00, 29.79s/it]
Processing row groups:   8%|▊         | 1/12 [00:29<05:29, 29.91s/it]
Chunks in group 1:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 1: 100%|██████████| 1/1 [00:23<00:00, 23.35s/it]
Processing row groups:  17%|█▋        | 2/12 [00:53<04:21, 26.12s/it]
Chunks in group 2:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 2: 100%|██████████| 1/1 [00:25<00:00, 25.70s/it]
Processing row groups:  25%|██▌       | 3/12 [01:19<03:53, 25.98s/it]
Chunks in group 3:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 3: 100%|██████████| 1/1 [00:29<00:00, 29.58s/it]
Processing row groups:  33%|███▎      | 4/12 [01:48<03:39, 27.46s/it]
Chunks in group 4:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 4: 100%|██████████| 1/1 [00:28<00:00, 28.74s/it]
Processing row groups:  42%|████▏     | 5/12 [02

Error processing row group 8: Corrupt snappy compressed data.



Chunks in group 9:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 9: 100%|██████████| 1/1 [00:36<00:00, 36.03s/it]
Processing row groups:  83%|████████▎ | 10/12 [04:20<00:46, 23.41s/it]
Chunks in group 10:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 10: 100%|██████████| 1/1 [00:34<00:00, 34.75s/it]
Processing row groups:  92%|█████████▏| 11/12 [04:55<00:26, 26.33s/it]
Chunks in group 11:   0%|          | 0/1 [00:00<?, ?it/s]
Chunks in group 11: 100%|██████████| 1/1 [00:30<00:00, 30.53s/it]
Processing row groups: 100%|██████████| 12/12 [05:26<00:00, 27.18s/it]


Calculating final statistics for each district
Saving output to data/bezirke_berlin_metrics.geojson
Saved CSV to analysis_results/Bezirke/bezirke_berlin_metrics.csv

District Metrics Summary:
                        Bezirk  mean_speed  median_speed  total_length_km  \
8             Treptow-Köpenick   20.720511     20.753022         1769.911   
6                      Spandau   20.218258     20.111272          916.086   
9          Marzahn-Hellersdorf   20.137714     20.051772         1137.002   
10                 Lichtenberg   19.966903     20.057429         1019.599   
11               Reinickendorf   19.872396     19.818555         1015.620   
7          Steglitz-Zehlendorf   19.869438     19.734697         1502.659   
4                     Neukölln   19.857536     19.824575          782.853   
5   Charlottenburg-Wilmersdorf   19.424771     19.328688         1465.697   
2                       Pankow   19.364373     19.347080         1598.759   
3         Tempelhof-Schöneberg   19.34