## Demografische Entwicklung Wiens seit 2008: Analyse der
## Bevölkerungsstruktur und Geburtenentwicklung auf
## Bezirksebene
<u>**Big Data Projekt von:**</u>
<br>
Johannes Reitterer <br>
Johannes Mantler <br>
Nicolas Nemeth <br>
<br>

# ETL Pipeline
Diese ETL-Pipeline lädt demografische Daten der Stadt Wien, bereinigt sie und speichert sie in MongoDB zur weiteren Analyse.

**Datenquellen:**
- Bevölkerung nach Geburtsbundesland (2008-heute): ~500.000 Datensätze https://www.data.gv.at/datasets/f54e6828-3d75-4a82-89cb-23c58057bad4?locale=de
- Geburtenstatistik (2002-heute): ~50.000 Datensätze https://www.data.gv.at/datasets/f54e6828-3d75-4a82-89cb-23c58057bad4?locale=de

## Pipeline-Ablauf

### 1. Extract (Daten laden)

Die Rohdaten werden aus CSV-Dateien von data.gv.at geladen.

Da es Probleme bei der API-Abfrage gibt, müssen die csv files manuell gedownloaded werden und in den Projekt Ordner eingefügt werden.

### 2. Transform (Daten bereinigen)

**Spaltenumbenennung:**
- Englische Spaltennamen werden zu deutschen Namen konvertiert
- Beispiel: `REF_YEAR` → `Jahr`, `DISTRICT_CODE` → `Bezirk_Roh`

**Bezirkscode-Transformation:**

Wien verwendet statistische Codes (90101-90223), die zu Postleitzahlen konvertiert werden:

```
90101 → 1010 (1. Bezirk)
90201 → 1020 (2. Bezirk)
90301 → 1030 (3. Bezirk)
...
```

**Datenbereinigung:**
- Ungültige Bezirkscodes entfernen
- Fehlende Werte mit 0 auffüllen
- Negative Werte korrigieren
- Datentypen zu Integer konvertieren

(1 = Männer, 2 = Frauen)

### 3. Load (Daten speichern)

Die bereinigten Daten werden in MongoDB gespeichert:
- **Collection `population`**: Bevölkerungsdaten nach Bezirk, Jahr, Alter, Geschlecht und Herkunft
- **Collection `births`**: Geburtendaten nach Bezirk, Jahr und Geschlecht
 
**Dokumentstruktur Beispiel:**
```json
{
  "Jahr": 2020,
  "Bezirk": 1010,
  "Geschlecht": 1,
  "Alter": 25,
  "Wien": 1234,
  "Ausland": 789
}
```

## Verwendung

```python
# Pipeline ausführen
run_pipeline()

# Ergebnis: Daten in MongoDB unter wien_demografie_db
# - population: Bevölkerungsdaten
# - births: Geburtendaten
```

In [10]:
"""
Authors: Johannes Mantler, Johannes Reitterer, Nicolas Nemeth

Data Sources: 
- Population by province of birth (2008-present) https://www.data.gv.at/datasets/98b782ca-8e46-43d7-a061-e196d0e0160a?locale=de
- Birth statistics (2002-present) https://www.data.gv.at/datasets/f54e6828-3d75-4a82-89cb-23c58057bad4?locale=de
"""

import pandas as pd
import sys
from pymongo import MongoClient
import os

MONGO_CONFIG = {
    'uri': "mongodb://admin:admin123@localhost:27017/",
    'auth_source': "admin",
    'database': "wien_demografie_db",
    'use_docker': True
}

#Da Probleme beim automatisiertem Download, direkt CSV Files im Projekt 
#URL_BEVOELKERUNG = "https://www.wien.gv.at/gogv/l9ogdviebezpopsexage5stkcobgeoat102008f"
#URL_GEBURTEN = "https://www.wien.gv.at/gogv/l9ogdviebezpopsexbir2002f"

DATA_FILES = {
    'population': 'vie-bez-pop-sex-age5-stk-cob-geoat10-2008f.csv',
    'births': 'vie-bez-pop-sex-bir-2002f.csv'
}

POPULATION_COLUMNS = {
    'REF_YEAR': 'Jahr',
    'DISTRICT_CODE': 'Bezirk_Roh',
    'SUB_DISTRICT_CODE': 'Sub_Bezirk',
    'REF_DATE': 'Datum',
    'SEX': 'Geschlecht',
    'AGE1': 'Alter',
    'UNK': 'Unbekannt',
    'BGD': 'Burgenland',
    'KTN': 'Kaernten',
    'NOE': 'Niederoesterreich',
    'OOE': 'Oberoesterreich',
    'SBG': 'Salzburg',
    'STK': 'Steiermark',
    'TIR': 'Tirol',
    'VBG': 'Vorarlberg',
    'VIE': 'Wien',
    'FOR': 'Ausland'
}

BIRTH_COLUMNS = {
    'REF_YEAR': 'Jahr',
    'DISTRICT_CODE': 'Bezirk_Roh',
    'SUB_DISTRICT_CODE': 'Sub_Bezirk',
    'REF_DATE': 'Datum',
    'SEX': 'Geschlecht',
    'BIR': 'Anzahl_Geburten'
}

BUNDESLAND_COLUMNS = [
    'Unbekannt', 'Burgenland', 'Kaernten', 'Niederoesterreich',
    'Oberoesterreich', 'Salzburg', 'Steiermark', 'Tirol',
    'Vorarlberg', 'Wien', 'Ausland'
]


def setup_mongodb():
    try:
        if MONGO_CONFIG['use_docker']:
            client = MongoClient(
                MONGO_CONFIG['uri'],
                serverSelectionTimeoutMS=5000,
                authSource=MONGO_CONFIG['auth_source']
            )
        else:
            client = MongoClient(
                MONGO_CONFIG['uri'],
                serverSelectionTimeoutMS=5000
            )

        client.server_info()
        db = client[MONGO_CONFIG['database']]
        return client, db

    except Exception as e:
        print(f"ERROR: MongoDB connection failed - {e}")
        sys.exit(1)


def clean_district_code(code):
    try:
        code_str = str(code).strip()

        if code_str.startswith('9') and len(code_str) == 5:
            district_num = int(code_str[1:3])
            return 1000 + district_num * 10

        if code_str.startswith('1') and len(code_str) == 4:
            return int(code_str)

        return int(code_str)

    except (ValueError, TypeError):
        return 0

def extract_data():
    df_pop = pd.read_csv(
        DATA_FILES['population'],
        sep=';',
        encoding='utf-8-sig',
        skiprows=1,
        header=0
    )

    df_birth = pd.read_csv(
        DATA_FILES['births'],
        sep=';',
        encoding='utf-8-sig',
        skiprows=1,
        header=0
    )

    return df_pop, df_birth



def transform_population_data(df):
    rename_map = {k: v for k, v in POPULATION_COLUMNS.items() if k in df.columns}
    df = df.rename(columns=rename_map)

    if 'Sub_Bezirk' in df.columns:
        df['Bezirk'] = df['Sub_Bezirk'].apply(clean_district_code)
    elif 'Bezirk_Roh' in df.columns:
        df['Bezirk'] = df['Bezirk_Roh'].apply(clean_district_code)
    else:
        print("  WARNING: No district code column found")
        df['Bezirk'] = 0

    df = df[df['Bezirk'] > 0]

    for col in BUNDESLAND_COLUMNS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)

    if 'Jahr' in df.columns:
        df['Jahr'] = pd.to_numeric(df['Jahr'], errors='coerce').fillna(0).astype(int)

    return df


def transform_birth_data(df):
    rename_map = {k: v for k, v in BIRTH_COLUMNS.items() if k in df.columns}
    df = df.rename(columns=rename_map)

    if 'Sub_Bezirk' in df.columns:
        df['Bezirk'] = df['Sub_Bezirk'].apply(clean_district_code)
    elif 'Bezirk_Roh' in df.columns:
        df['Bezirk'] = df['Bezirk_Roh'].apply(clean_district_code)
    else:
        df['Bezirk'] = 0



    df = df[df['Bezirk'] > 0]

    if 'Anzahl_Geburten' in df.columns:
        df['Anzahl_Geburten'] = pd.to_numeric(
            df['Anzahl_Geburten'],
            errors='coerce'
        ).fillna(0).astype(int)

    if 'Jahr' in df.columns:
        df['Jahr'] = pd.to_numeric(df['Jahr'], errors='coerce').fillna(0).astype(int)

    return df


def merge_data_sources(df_pop, df_birth):
    pop_agg = df_pop.groupby(['Jahr', 'Bezirk']).agg({
        'Wien': 'sum',
        'Ausland': 'sum',
        'Geschlecht': 'count'
    }).reset_index()

    pop_agg.rename(columns={'Geschlecht': 'Gesamt_Bevoelkerung'}, inplace=True)

    birth_agg = df_birth.groupby(['Jahr', 'Bezirk']).agg({
        'Anzahl_Geburten': 'sum'
    }).reset_index()

    merged = pd.merge(pop_agg, birth_agg, on=['Jahr', 'Bezirk'], how='outer')
    merged = merged.fillna(0)

    return merged


def transform_data(df_pop, df_birth):
    df_pop_clean = transform_population_data(df_pop)
    df_birth_clean = transform_birth_data(df_birth)
    df_merged = merge_data_sources(df_pop_clean, df_birth_clean)
    return df_pop_clean, df_birth_clean, df_merged


def load_data(db, df_pop, df_birth, df_merged):
    db.population.delete_many({})
    db.births.delete_many({})
    db.merged_analysis.delete_many({})

    if len(df_pop) > 0:
        db.population.insert_many(df_pop.to_dict("records"))

    if len(df_birth) > 0:
        db.births.insert_many(df_birth.to_dict("records"))

    if len(df_merged) > 0:
        db.merged_analysis.insert_many(df_merged.to_dict("records"))


def run_pipeline():
    client, db = setup_mongodb()
    try:
        df_pop, df_birth = extract_data()
        df_pop_clean, df_birth_clean, df_merged = transform_data(df_pop, df_birth)
        load_data(db, df_pop_clean, df_birth_clean, df_merged)
    finally:
        client.close()


run_pipeline()


In [11]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Wien-Geburten-Zeitverlauf")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.6.0")
    .config(
        "spark.mongodb.read.connection.uri",
        "mongodb://admin:admin123@localhost:27017/wien_demografie_db?authSource=admin"
    )
    .getOrCreate()
)

print(spark.version)  # muss 3.5.x sein


Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: Hadoop home directory C:\hadoop does not exist -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:789)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:298)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:1305)
	at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:1291)
	at org.apache.spark.util.Utils$.fetchFile(Utils.scala:417)
	at org.apache.spark.SparkContext.addFile(SparkContext.scala:1864)
	at org.apache.spark.SparkContext.$anonfun$new$16(SparkContext.scala:545)
	at org.apache.spark.SparkContext.$anonfun$new$16$adapted(SparkContext.scala:545)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:545)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:59)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:499)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:480)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: Hadoop home directory C:\hadoop does not exist -see https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:601)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:622)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:645)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:742)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:80)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1954)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1912)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1885)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$install$1(ShutdownHookManager.scala:194)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at scala.Option.fold(Option.scala:263)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:195)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:55)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:53)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:159)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala:63)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:250)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:99)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:379)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:961)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:204)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:227)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:96)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1132)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1141)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: Hadoop home directory C:\hadoop does not exist
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:544)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:492)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:569)
	... 27 more


In [3]:
births_df = (
    spark.read.format("mongodb")
    .option("spark.mongodb.read.collection", "births")
    .load()
)

births_df.show(5)


[Stage 0:>                                                          (0 + 1) / 1]

+---------------+------+----------+-----+----------+----+----+----------+--------------------+
|Anzahl_Geburten|Bezirk|Bezirk_Roh|Datum|Geschlecht|Jahr|NUTS|Sub_Bezirk|                 _id|
+---------------+------+----------+-----+----------+----+----+----------+--------------------+
|             58|  1010|     90100| 2002|         1|2002|AT13|     90100|69601f2050812e9d0...|
|             72|  1010|     90100| 2002|         2|2002|AT13|     90100|69601f2050812e9d0...|
|            490|  1020|     90200| 2002|         1|2002|AT13|     90200|69601f2050812e9d0...|
|            491|  1020|     90200| 2002|         2|2002|AT13|     90200|69601f2050812e9d0...|
|            370|  1030|     90300| 2002|         1|2002|AT13|     90300|69601f2050812e9d0...|
+---------------+------+----------+-----+----------+----+----+----------+--------------------+
only showing top 5 rows



                                                                                

In [6]:
from pyspark.sql import functions as F

population_df = (
    spark.read.format("mongodb")
    .option("spark.mongodb.read.collection", "population")
    .load()
)

population_df.printSchema()
population_df.show(5)


pop_origin_df = (
    population_df
    .groupBy("Jahr", "Bezirk")
    .agg(
        F.sum("Ausland").alias("Bev_Ausland"),
        F.sum("Wien").alias("Bev_Wien")
    )
)

births_agg_df = (
    births_df
    .groupBy("Jahr", "Bezirk")
    .agg(F.sum("Anzahl_Geburten").alias("Geburten"))
)

birth_rate_origin_df = (
    births_agg_df
    .join(pop_origin_df, ["Jahr", "Bezirk"])
    .withColumn(
        "Geburtenrate_Ausland",
        F.col("Geburten") / F.col("Bev_Ausland")
    )
    .withColumn(
        "Geburtenrate_Wien",
        F.col("Geburten") / F.col("Bev_Wien")
    )
)


root
 |-- AGE5: integer (nullable = true)
 |-- Ausland: integer (nullable = true)
 |-- Bezirk: integer (nullable = true)
 |-- Bezirk_Roh: integer (nullable = true)
 |-- Burgenland: integer (nullable = true)
 |-- Datum: integer (nullable = true)
 |-- Geschlecht: integer (nullable = true)
 |-- Jahr: integer (nullable = true)
 |-- Kaernten: integer (nullable = true)
 |-- NUTS: string (nullable = true)
 |-- Niederoesterreich: integer (nullable = true)
 |-- Oberoesterreich: integer (nullable = true)
 |-- Salzburg: integer (nullable = true)
 |-- Steiermark: integer (nullable = true)
 |-- Sub_Bezirk: integer (nullable = true)
 |-- Tirol: integer (nullable = true)
 |-- Unbekannt: integer (nullable = true)
 |-- Vorarlberg: integer (nullable = true)
 |-- Wien: integer (nullable = true)
 |-- _id: string (nullable = true)

+----+-------+------+----------+----------+--------+----------+----+--------+----+-----------------+---------------+--------+----------+----------+-----+---------+----------+---

In [7]:
zuwanderung_df = (
    population_df
    .groupBy("Bezirk")
    .agg(
        F.sum("Ausland").alias("Bev_Ausland"),
        F.sum(
            F.col("Ausland") +
            F.col("Wien") +
            F.col("Burgenland") +
            F.col("Kaernten") +
            F.col("Niederoesterreich") +
            F.col("Oberoesterreich") +
            F.col("Salzburg") +
            F.col("Steiermark") +
            F.col("Tirol") +
            F.col("Vorarlberg")
        ).alias("Bev_Gesamt")
    )
    .withColumn(
        "Auslaenderanteil",
        F.col("Bev_Ausland") / F.col("Bev_Gesamt")
    )
    .orderBy(F.desc("Auslaenderanteil"))
)

zuwanderung_df.show(10)


[Stage 3:>                                                          (0 + 1) / 1]

+------+-----------+----------+-------------------+
|Bezirk|Bev_Ausland|Bev_Gesamt|   Auslaenderanteil|
+------+-----------+----------+-------------------+
|  1150|     625236|   1337225|0.46756230252949205|
|  1200|     657751|   1511808|0.43507575035983403|
|  1050|     413544|    964825|0.42862073433005987|
|  1100|    1423243|   3488017| 0.4080378622007863|
|  1160|     727318|   1785032|  0.407453759932595|
|  1020|     732516|   1819078|0.40268531640754274|
|  1120|     654201|   1676218| 0.3902839606781457|
|  1040|     217183|    573607|0.37862682986783636|
|  1170|     370226|    984228|0.37615877621851845|
|  1030|     594349|   1597165|0.37212748839349724|
+------+-----------+----------+-------------------+
only showing top 10 rows



                                                                                

In [8]:
births_agg_df = (
    births_df
    .groupBy("Jahr", "Bezirk")
    .agg(F.sum("Anzahl_Geburten").alias("Geburten"))
)

migration_birth_df = (
    births_agg_df
    .join(
        population_df.groupBy("Jahr", "Bezirk")
        .agg(F.sum("Ausland").alias("Bev_Ausland")),
        ["Jahr", "Bezirk"]
    )
    .withColumn(
        "Geburten_pro_1000_Ausland",
        (F.col("Geburten") / F.col("Bev_Ausland")) * 1000
    )
    .orderBy(F.desc("Geburten_pro_1000_Ausland"))
)

migration_birth_df.show(10)


+----+------+--------+-----------+-------------------------+
|Jahr|Bezirk|Geburten|Bev_Ausland|Geburten_pro_1000_Ausland|
+----+------+--------+-----------+-------------------------+
|2008|  1230|     836|      16493|        50.68817073910144|
|2008|  1220|    1485|      30923|        48.02250751867542|
|2010|  1230|     833|      17476|        47.66536964980544|
|2011|  1230|     843|      17994|       46.848949649883295|
|2012|  1230|     869|      18663|       46.562717676686496|
|2014|  1230|     927|      20036|        46.26671990417249|
|2011|  1220|    1545|      34303|        45.03979243797918|
|2013|  1130|     473|      10511|        45.00047569213205|
|2012|  1220|    1608|      35928|         44.7561790247161|
|2015|  1220|    1875|      42016|        44.62585681645088|
+----+------+--------+-----------+-------------------------+
only showing top 10 rows

