# Destatis Data Ingestion
Download und Vorverarbeitung der Datens√§tze f√ºr:
- JVA (Strafvollzug)
- Zensus (Bev√∂lkerung)
- Justizurteile (Verurteilte)

Das initiale Setup von Spark

In [None]:
import sys
import os
import requests
import zipfile
import io

# Pfad f√ºr lokale Imports erweitern
sys.path.append(os.path.abspath(".."))
from src.config_local import get_spark_session
from pyspark.sql.functions import col, substring, trim, when, lit, sum as _sum

spark = get_spark_session("Destatis_Ingest")

# API Config
TOKEN = "HIER_EINEN_TOKEN_EINF√úGEN" # Registrieren auf hhttps://www-genesis.destatis.de/datenbank/online#modal=login,register f√ºr einen API-Token 
BASE_URL = 'https://www-genesis.destatis.de/genesisWS/rest/2020/'

RAW_DIR = "../data/raw/"
PROC_DIR = "../data/processed/"
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(PROC_DIR, exist_ok=True)

Starting local Spark: Destatis_Ingest


Diese Hilfsfunktion dient zum erstellen der Tabellen aus den Datens√§tzen.

In [None]:
def download_table(table_code, filename, force_reload=True):
    if os.path.exists(filename) and not force_reload:
        print(f"√úberspringe download, file exists: {filename}")
        return True

    print(f"Starte Download: {table_code} ...")
    url = f"{BASE_URL}data/tablefile"
    payload = {
        "name": table_code,
        "area": "all",
        "compress": "false",
        "transpose": "false",
        "startyear": "1976",
        "endyear": "2024",
        "language": "de",
        "format": "ffcsv",
        "job": "false"
    }
    
    try:
        r = requests.post(url, data=payload, 
                          headers={'Content-Type': 'application/x-www-form-urlencoded', 'username': TOKEN, 'password': ""})
        
        if r.status_code == 200:
            # Pr√ºfe auf Zip-Header
            if r.content[:2] == b'PK': 
                with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                    with z.open(z.namelist()[0]) as zf, open(filename, 'wb') as f:
                        f.write(zf.read())
            else:
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(r.text)
            print(f"Download erfolgreich: {filename}")
            return True
        else:
            print(f"API Fehler {r.status_code}: {r.text[:200]}")
            return False
    except Exception as e:
        print(f"Download fehlgeschlagen: {e}")
        return False

def clean_number(c):
    # Destatis nutzt oft "-", "." oder "/" f√ºr fehlende Werte/Null
    return when(trim(col(c)).isin(".", "-", "", "/"), 0).otherwise(trim(col(c))).cast("double")

In [None]:
# Bev√∂lkerung (Zensus) 12411-0002
FILE_RAW = f"{RAW_DIR}population_raw.csv"
FILE_OUT = f"{PROC_DIR}population_clean.parquet"

download_table("12411-0002", FILE_RAW)

print("Verarbeite Bev√∂lkerungsdaten")
df = spark.read.option("header", "true").option("delimiter", ";").csv(FILE_RAW)

# Filter auf relevante Merkmale und Pivotierung
df_clean = df.withColumn("jahr", substring(col("time"), 1, 4).cast("int")) \
    .filter(col("2_variable_attribute_label").isin("Deutsche", "Ausl√§nder")) \
    .withColumn("anzahl", clean_number("value")) \
    .groupBy("jahr").pivot("2_variable_attribute_label").sum("anzahl") \
    .select(col("jahr"), col("Deutsche").alias("pop_D"), col("Ausl√§nder").alias("pop_A"))

df_clean.write.mode("overwrite").parquet(FILE_OUT)
print(f"Gespeichert: {FILE_OUT}")
df_clean.show(3)

Starte Download: 12411-0002 ...
Download erfolgreich: ../data/raw/population_raw.csv
Verarbeite Bev√∂lkerungsdaten
Gespeichert: ../data/processed/population_clean.parquet
+----+-----------+---------+
|jahr|      pop_D|    pop_A|
+----+-----------+---------+
|1990| 7.417087E7|5582357.0|
|1977|5.7460519E7|3892226.0|
|2003|7.5189851E7|7341820.0|
+----+-----------+---------+
only showing top 3 rows


In [None]:
# Justiz (Chunked Download)
# Tabelle: 24311-0002 (Verurteilte nach Straftaten & Merkmalen)
import time
import glob

TABLE_CODE = "24311-0002"
FILE_OUT = f"{PROC_DIR}justiz_clean.parquet"

# API limitiert gro√üe Zeitr√§ume, daher in Chunks aufteilen
YEAR_CHUNKS = [
    (1976, 1990),
    (1991, 2005),
    (2006, 2015),
    (2016, 2024)
]

def download_chunk(start, end):
    filename = f"{RAW_DIR}justiz_flat_{start}_{end}.csv"
    
    # Check ob schon da
    if os.path.exists(filename) and os.path.getsize(filename) > 100:
        print(f"Chunk bereits vorhanden: {start}-{end}")
        return

    print(f"Lade Chunk: {start}-{end}")
    payload = {
        "name": TABLE_CODE,
        "area": "all",
        "compress": "false",
        "transpose": "false",
        "startyear": str(start),
        "endyear": str(end),
        "language": "de",
        "format": "ffcsv",
        "job": "false"
    }
    
    try:
        r = requests.post(f"{BASE_URL}data/tablefile", data=payload, 
                          headers={'Content-Type': 'application/x-www-form-urlencoded', 
                                   'username': TOKEN, 
                                   'password': ""})
        
        if r.status_code == 200 and "{\"Code\":" not in r.text[:100]:
            if r.content[:2] == b'PK':
                with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                    with z.open(z.namelist()[0]) as zf, open(filename, 'wb') as f:
                        f.write(zf.read())
            else:
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(r.text)
            print(f"   -> Gespeichert: {filename}")
        else:
            print(f"   -> API Fehler {r.status_code}")
    except Exception as e:
        print(f"   -> Download fehlgeschlagen: {e}")

# Downloads durchf√ºhren
for s, e in YEAR_CHUNKS:
    download_chunk(s, e)
    time.sleep(1)

# Spark Verarbeitung - alle "Chunks" laden
# Absolute Pfade n√∂tig, weil Spark relative Pfade anders aufl√∂st als Python
try:
    JUSTIZ_FILES = [os.path.abspath(f) for f in glob.glob(f"{RAW_DIR}justiz_flat_*.csv")]
    df_raw = spark.read.option("header", "true") \
                       .option("delimiter", ";") \
                       .option("inferSchema", "false") \
                       .csv(JUSTIZ_FILES)

    df_clean = df_raw.select(
        col("time").alias("jahr"),
        col("2_variable_attribute_label").alias("straftat"),
        col("3_variable_attribute_label").alias("nationalitaet"),
        col("4_variable_attribute_label").alias("geschlecht"),
        col("5_variable_attribute_label").alias("alter_gruppe"),
        clean_number("value").alias("verurteilte")
    )
    
    df_final = df_clean.withColumn("jahr", substring(col("jahr"), 1, 4).cast("int"))

    df_final.write.mode("overwrite").parquet(FILE_OUT)
    print(f"Gespeichert: {FILE_OUT}")
    df_final.show(5)

except Exception as e:
    print(f"Fehler beim Verarbeiten der Justiz-Daten: {e}")
    try:
        print("Vorhandene Spalten in CSV:", df_raw.columns)
    except:
        pass

Chunk bereits vorhanden: 1976-1990
Chunk bereits vorhanden: 1991-2005
Chunk bereits vorhanden: 2006-2015
Chunk bereits vorhanden: 2016-2024
Gespeichert: ../data/processed/justiz_clean.parquet
+----+--------------------+-------------+----------+--------------------+-----------+
|jahr|            straftat|nationalitaet|geschlecht|        alter_gruppe|verurteilte|
+----+--------------------+-------------+----------+--------------------+-----------+
|1982|Andere Straftaten...|    Ausl√§nder|  weiblich|25 bis unter 30 J...|       65.0|
|1989|Straftaten ohne S...|     Deutsche|  weiblich|30 bis unter 40 J...|    16120.0|
|1981|Straftaten ohne S...|     Deutsche|  weiblich|30 bis unter 40 J...|    14380.0|
|1988|   Urkundenf√§lschung|     Deutsche| Insgesamt|   50 Jahre und mehr|      827.0|
|1978|Straftaten ohne S...|     Deutsche|  weiblich|30 bis unter 40 J...|    15188.0|
+----+--------------------+-------------+----------+--------------------+-----------+
only showing top 5 rows


In [None]:
# Strafvollzug (JVA) 24321-0001
FILE_RAW = f"{RAW_DIR}prison_raw.csv"
FILE_OUT = f"{PROC_DIR}prison_clean.parquet"

download_table("24321-0001", FILE_RAW)

print("Verarbeite Gef√§ngnisdaten...")
df = spark.read.option("header", "true").option("delimiter", ";").csv(FILE_RAW)

# Aggregation nach Nationalit√§t (Position 3 in Destatis Schema)
# Filter auf Gesamtwerte um Doppelz√§hlungen zu vermeiden
df_clean = df.withColumn("jahr", substring(col("time"), 1, 4).cast("int")) \
    .filter(col("3_variable_attribute_label").isin("Deutsche", "Ausl√§nder")) \
    .filter(col("4_variable_attribute_label") == "Insgesamt") \
    .filter(col("5_variable_attribute_label") == "Insgesamt") \
    .withColumn("insassen_raw", clean_number("value")) \
    .groupBy("jahr", col("3_variable_attribute_label").alias("nationalitaet")) \
    .agg(_sum("insassen_raw").alias("insassen"))

df_clean.write.mode("overwrite").parquet(FILE_OUT)
print(f"Gespeichert: {FILE_OUT}")
df_clean.show(3)

Starte Download: 24321-0001 ...
Download erfolgreich: ../data/raw/prison_raw.csv
‚öôÔ∏è Verarbeite Gef√§ngnisdaten...
üíæ Gespeichert: ../data/processed/prison_clean.parquet
+----+-------------+--------+
|jahr|nationalitaet|insassen|
+----+-------------+--------+
|2018|     Deutsche| 34690.0|
|2022|     Deutsche| 27995.0|
|2020|     Deutsche| 30420.0|
+----+-------------+--------+
only showing top 3 rows
