In [None]:
#!/usr/bin/env python3
"""
Data‑Jobs Salary & Cost‑of‑Living Analysis with PySpark + Kafka
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Dict

import matplotlib.pyplot as plt
import pandas as pd
from confluent_kafka import Producer
from pyspark.sql import SparkSession, functions as F, types as T

In [None]:
# ---------------------------------------------------------------------------
# 1. Spark & Kafka setup
# ---------------------------------------------------------------------------
# Kafka broker address and the topic each aggregate is published to.
KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
TOPIC_MEDIAN_SALARY = "median_salary"
TOPIC_JOBS_PER_CAPITA = "jobs_per_capita"
TOPIC_SCORE_RANKING = "score_ranking"

# Spark master URL, kept as a named constant so the notebook is easy to retarget.
SPARK_MASTER_URL = "spark://172.29.16.102:7077"

spark = (
    SparkSession.builder.appName("Data‑Jobs‑PySpark‑App")
    .master(SPARK_MASTER_URL)  # run on cluster
    # Arrow-backed conversion for the toPandas() calls further down. The
    # un-prefixed key "spark.sql.execution.arrow.enabled" is deprecated since
    # Spark 3.0 in favour of the "pyspark"-scoped key used here.
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# Single producer instance shared by all publishing cells below.
producer = Producer({"bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS})

def _delivery_report(err, msg):
    """Delivery callback for ``producer.produce``: print the outcome of one message.

    Args:
        err: Delivery error; any falsy value (e.g. ``None``) is treated as success.
        msg: The produced message. On success its ``topic()``, ``partition()``
            and ``offset()`` are printed; it is not touched on failure.
    """
    if not err:
        location = f"{msg.topic()} partition {msg.partition()} offset {msg.offset()}"
        print(f"Message delivered to {location}")
        return
    print(f"Kafka delivery failed: {err}")

In [None]:
# ---------------------------------------------------------------------------
# 2. Read input data (CSV → Spark DataFrame)
# ---------------------------------------------------------------------------
# Both CSV files are expected in ../output, relative to the working directory.
root = Path("../output")
jobs_path = root.joinpath("ScrapedJobOffersAllCleaned.csv").as_posix()
cost_path = root.joinpath("CostOfLivingAPI.csv").as_posix()

# Job offers: header row, explicit UTF-8 decoding, column types inferred.
jobs = spark.read.csv(jobs_path, header=True, inferSchema=True, encoding="utf-8")

# Cost-of-living table: header row and inferred column types; no encoding
# option is set here, so the reader's default applies.
cost_of_living = spark.read.csv(cost_path, header=True, inferSchema=True)

In [None]:
# ---------------------------------------------------------------------------
# 3. Helper columns & UDFs
# ---------------------------------------------------------------------------
@F.udf(returnType=T.StringType())
def classify_role(title: str | None) -> str | None:
    """Map a job title to a coarse role label.

    The title is lower-cased and searched for the keywords "scientist",
    "engineer" and "analyst", in that priority order; the first keyword found
    decides the label ("Scientist", "Engineer" or "Analyst").

    Returns:
        The matching label, or ``None`` when the title is missing/empty or
        contains none of the keywords.
    """
    lowered = title.lower() if title else ""
    # Order matters: a title containing several keywords gets the first label.
    for keyword, label in (
        ("scientist", "Scientist"),
        ("engineer", "Engineer"),
        ("analyst", "Analyst"),
    ):
        if keyword in lowered:
            return label
    return None

@F.udf(returnType=T.StringType())
def extract_data_role(title: str | None) -> str | None:
    """Map a job title to a "Data ..." role name.

    The title is lower-cased and searched for the keywords "analyst",
    "engineer" and "scientist", in that priority order; the first keyword found
    decides the result ("Data Analyst", "Data Engineer" or "Data Scientist").

    Returns:
        The matching role name, or ``None`` when the title is missing/empty or
        contains none of the keywords.
    """
    if not title:
        return None
    haystack = title.lower()
    # Order matters: a title containing several keywords gets the first role.
    keyword_to_role = (
        ("analyst", "Data Analyst"),
        ("engineer", "Data Engineer"),
        ("scientist", "Data Scientist"),
    )
    return next(
        (role_name for needle, role_name in keyword_to_role if needle in haystack),
        None,
    )

# Derive both role labels from the raw job title.
jobs = jobs.withColumn("role", classify_role("job_title")).withColumn(
    "data_role", extract_data_role("job_title")
)

# Harmonise names
# Trim and init-cap city/country identically in both frames so they compare
# equal in later joins. This runs BEFORE the net-salary step so the country
# match below sees trimmed values; previously a padded value such as
# " Germany" did not match and silently produced a null net salary.
for col_name in ("city", "country"):
    jobs = jobs.withColumn(col_name, F.initcap(F.trim(col_name)))
    cost_of_living = cost_of_living.withColumn(col_name, F.initcap(F.trim(col_name)))

# Net-salary conversion: flat 62 % of the gross salary for Austria & Germany.
# There is no otherwise() branch, so rows from any other country get a null
# annual_salary_netto.
NET_SALARY_FACTOR = 0.62
jobs = jobs.withColumn(
    "annual_salary_netto",
    F.when(
        F.lower("country").isin("austria", "germany"),
        F.col("annual_salary") * NET_SALARY_FACTOR,
    ),
)

In [None]:
# ---------------------------------------------------------------------------
# 4. Median salary per city/role (Spark) → Kafka + Plot
# ---------------------------------------------------------------------------
# Only offers whose title matched a role keyword feed the medians.
classified_jobs = jobs.filter("role IS NOT NULL")
median_salary_expr = F.expr("percentile_approx(annual_salary, 0.5)").alias("median_salary")

median_salary_city = (
    classified_jobs.groupBy("city")
    .agg(median_salary_expr)
    .orderBy(F.desc("median_salary"))
)

# NOTE(review): median_salary_role is defined here but neither published nor
# plotted in this cell — confirm whether a later consumer needs it.
median_salary_role = (
    classified_jobs.groupBy("role")
    .agg(median_salary_expr)
    .orderBy(F.desc("median_salary"))
)

# ---------- publish to Kafka
for row in median_salary_city.toJSON().collect():
    producer.produce(TOPIC_MEDIAN_SALARY, value=row, callback=_delivery_report)
    # Serve delivery callbacks while producing so the local queue keeps draining.
    producer.poll(0)
producer.flush()

# ---------- plot on driver
median_city_pd = median_salary_city.toPandas()

# Fetch all salaries with ONE Spark job instead of one filter + toPandas() per
# city. Rows with a null city or salary are dropped: NaN values would otherwise
# reach matplotlib and can blank out a city's box.
salaries_pd = jobs.select("city", "annual_salary").dropna().toPandas()
salaries_by_city = {
    city: group["annual_salary"].to_numpy()
    for city, group in salaries_pd.groupby("city")
}
# Keep the median-based ordering; skip cities without any usable salary.
plot_cities = [city for city in median_city_pd["city"] if city in salaries_by_city]

if plot_cities:
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.boxplot([salaries_by_city[city] for city in plot_cities], showfliers=True)
    # Boxes sit at positions 1..N; labelling the ticks explicitly avoids the
    # `labels=` keyword, which newer matplotlib releases deprecate.
    ax.set_xticks(list(range(1, len(plot_cities) + 1)))
    ax.set_xticklabels(plot_cities, rotation=45)
    ax.set_title("Gehaltsverteilung nach Stadt")
    ax.set_ylabel("Jahresgehalt (€)")
    fig.tight_layout()
    plt.show()

In [None]:
# ---------------------------------------------------------------------------
# 5. Jobs per 100k inhabitants
# ---------------------------------------------------------------------------
# Population per city ("Einwohner" = inhabitants). Keys must match the
# harmonised `city` values in `jobs` exactly, because the inner join below
# silently drops every city that has no entry here.
# NOTE(review): keys use German spellings ("München", "Köln", "Wien") — confirm
# the job data uses the same spellings.
EINWOHNER: Dict[str, int] = {
    "Berlin": 3_782_202,
    "Hamburg": 1_819_160,
    "München": 1_510_378,
    "Frankfurt": 775_790,
    "Köln": 1_087_353,
    "Wien": 2_028_399,
    "Linz": 213_574,
    "Salzburg": 157_652,
    "Graz": 305_232,
}

pop_df = spark.createDataFrame(list(EINWOHNER.items()), ["city", "einwohner"])

# Offer count per (data_role, city), scaled to offers per 100,000 inhabitants.
job_counts = (
    jobs.filter("data_role IS NOT NULL")
    .groupBy("data_role", "city")
    .count()
    .join(pop_df, "city", "inner")
    .withColumn("jobs_pro_100k", F.col("count") / F.col("einwohner") * 100_000)
)

for row in job_counts.toJSON().collect():
    producer.produce(TOPIC_JOBS_PER_CAPITA, value=row, callback=_delivery_report)
    # Serve delivery callbacks while producing so the local queue keeps draining.
    producer.poll(0)
producer.flush()

# Plot (driver)
job_counts_pd = job_counts.orderBy(F.desc("jobs_pro_100k")).toPandas()

# Figure height grows with the number of bars, but never drops below 6 inches.
fig, ax = plt.subplots(figsize=(14, max(6, len(job_counts_pd) * 0.3)))
ax.barh(
    job_counts_pd["data_role"] + " – " + job_counts_pd["city"],
    job_counts_pd["jobs_pro_100k"],
)
ax.set_xlabel("Jobangebote pro 100.000 Einwohner")
ax.set_title("Relative Anzahl der Data‑Jobangebote pro Stadt")
ax.invert_yaxis()  # largest value at the top
fig.tight_layout()
plt.show()

In [None]:
# ---------------------------------------------------------------------------
# 6. Cost-of-living vs data-job salary score
# ---------------------------------------------------------------------------
# 6.1 Prepare cost-of-living totals
# total_costs = rent (1-room flat, city centre) + transit ticket + utilities + internet.
# NOTE(review): presumably all four columns and net_salary are monthly amounts —
# confirm against the cost-of-living source.
cost_of_living = cost_of_living.withColumn(
    "total_costs",
    F.col("apt1_city_centre") + F.col("ticket_monthly") + F.col("utilities") + F.col("internet"),
)

# General score: how many times the city's net salary covers total_costs.
cost_of_living = cost_of_living.withColumn(
    "score_allgemein", F.col("net_salary") / F.col("total_costs")
)

# 6.2 Average net data-job salary per city (Spark)
# NOTE(review): this averages over ALL rows of `jobs`, not only those with a
# recognised data_role — confirm that is intended.
avg_netto = (
    jobs.groupBy("city", "country")
    .agg(F.avg("annual_salary_netto").alias("avg_datajob_netto"))
)

# 6.3 Merge + derive scores
merged = (
    cost_of_living.join(avg_netto, ["city", "country"], "inner")
    .withColumn("monthly_datajob_netto", F.col("avg_datajob_netto") / 12)
    .withColumn("score_datajob", F.col("monthly_datajob_netto") / F.col("total_costs"))
    .withColumn("score_diff", F.col("score_datajob") - F.col("score_allgemein"))
)

# 6.4 Ranking & Kafka
RANKING_PUBLISH_LIMIT = 50  # rows published to Kafka
RANKING_PLOT_LIMIT = 8  # bars shown in the plot

ranking_cols = [
    "city",
    "country",
    "net_salary",
    "monthly_datajob_netto",
    "total_costs",
    "score_allgemein",
    "score_datajob",
    "score_diff",
]
# Cities without a net data-job salary (annual_salary_netto is null for every
# offer there) end up with a null score_datajob; drop them so they cannot
# appear in the ranking with missing score fields.
ranking = (
    merged.select(ranking_cols)
    .na.drop(subset=["score_datajob"])
    .orderBy(F.desc("score_datajob"))
)

for row in ranking.limit(RANKING_PUBLISH_LIMIT).toJSON().collect():
    producer.produce(TOPIC_SCORE_RANKING, value=row, callback=_delivery_report)
    # Serve delivery callbacks while producing so the local queue keeps draining.
    producer.poll(0)
producer.flush()

# A null score_diff would arrive in pandas as NaN and be coloured as negative.
ranking_pd = (
    ranking.na.drop(subset=["score_diff"])
    .orderBy(F.desc("score_diff"))
    .limit(RANKING_PLOT_LIMIT)
    .toPandas()
)

# 6.5 Plot score difference
# Green bars: data-job income beats the general score; red bars: it does not.
colors = ["#2ECC71" if val >= 0 else "#E74C3C" for val in ranking_pd["score_diff"]]
fig, ax = plt.subplots(figsize=(20, 12))
ax.barh(ranking_pd["city"], ranking_pd["score_diff"], color=colors)
ax.set_xlabel("Score‑Differenz (Data‑Job – Allgemein)")
ax.set_title("Vorteil durch Data‑Job‑Einkommen im Vergleich zum Durchschnitt")
ax.axvline(0, color="gray", linewidth=1)
ax.invert_yaxis()  # largest difference at the top
fig.tight_layout()
plt.show()

In [None]:
# ---------------------------------------------------------------------------
# 7. Graceful shutdown
# ---------------------------------------------------------------------------
try:
    # Block until every message still queued in the producer has been handled.
    producer.flush()
finally:
    # Stop the Spark session even if the final flush raises.
    spark.stop()

print("Analysis finished. Aggregates published to Kafka.")