# Notebook: Spark + Kafka + Charts
This notebook shows how to read data from Kafka with Spark, write it to JSON files, and then run aggregations and build a set of business charts.

Requirements:
- pyspark
- matplotlib
- numpy
- seaborn

If any of these packages is missing, install it in the same environment the notebook runs in.

## 1. Configure the Spark session in the notebook
Create a SparkSession with the Kafka connector package, set its configuration, and check that it works.

In [None]:
import pyspark
from pyspark.sql import SparkSession

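spark_version = pyspark.__version__
# The Kafka connector must match the Scala build of PySpark:
# pip-installed PySpark 4.x is built with Scala 2.13, PySpark 3.x with Scala 2.12.
scala_suffix = "2.13" if spark_version.startswith("4.") else "2.12"

kafka_package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_suffix}:{spark_version}"

# spark.jars.packages is applied only when the session (and its JVM) is first created;
# if a session already exists in this kernel, restart the kernel after changing it.
spark = (
    SparkSession.builder
    .appName("KafkaCrashCourse")
    .master("local[*]")
    .config("spark.jars.packages", kafka_package)
    .getOrCreate()
)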

spark, kafka_package
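The cell above derives the connector coordinate inline. As a small pure function, the same mapping can be checked without starting Spark. A minimal sketch, assuming (as the cell does) that PySpark 4.x is built with Scala 2.13 and 3.x with Scala 2.12; the name `kafka_package_for` is hypothetical:

```python
def kafka_package_for(spark_version: str) -> str:
    """Return the Maven coordinate of the Spark-Kafka connector for a PySpark version."""
    # Assumption: 4.x -> Scala 2.13, everything else -> Scala 2.12 (same rule as the cell).
    major = spark_version.split(".", 1)[0]
    scala_suffix = "2.13" if major == "4" else "2.12"
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_suffix}:{spark_version}"


print(kafka_package_for("3.5.1"))
print(kafka_package_for("4.0.0"))
```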

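## 2. Load the data and perform basic validation
This cell covers the streaming part: it reads events from Kafka, validates them, writes valid events to JSON files, and writes rejected events to a separate directory. The batch part (reading those JSON files) follows in the next section.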

In [None]:
from pyspark.sql.functions import col, from_json, lit, when
from pyspark.sql.types import BooleanType, DoubleType, IntegerType, LongType, StringType, StructField, StructType

schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("user", StringType(), True),
    StructField("item", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("unit_price", DoubleType(), True),
    StructField("discount_pct", IntegerType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("payment_method", StringType(), True),
    StructField("sales_channel", StringType(), True),
    StructField("store_city", StringType(), True),
    StructField("purchase_datetime", StringType(), True),
    StructField("purchase_date", StringType(), True),
    StructField("purchase_time", StringType(), True),
    StructField("weekday_name", StringType(), True),
    StructField("weekday_num", IntegerType(), True),
    StructField("hour_of_day", IntegerType(), True),
    StructField("is_weekend", BooleanType(), True),
    StructField("event_time_ms", LongType(), True)
])

# Streaming: Kafka -> JSON (append)
kafka_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092,localhost:9094")
    .option("subscribe", "orders")
    .option("startingOffsets", "latest")
    .load()
)

parsed_df = (
    kafka_df.selectExpr("CAST(value AS STRING) AS json_str")
    .select(from_json(col("json_str"), schema).alias("data"), col("json_str"))
    .select("data.*", "json_str")
)

validated_df = parsed_df.withColumn(
    "invalid_reason",
    when(col("quantity").isNull(), lit("missing_quantity"))
    .when(col("unit_price").isNull(), lit("missing_unit_price"))
    .when(col("quantity") <= 0, lit("non_positive_quantity"))
    .when(col("unit_price") <= 0, lit("non_positive_unit_price"))
    .otherwise(lit(None).cast(StringType()))
)

valid_df = validated_df.where(col("invalid_reason").isNull()).drop("invalid_reason", "json_str")
invalid_df = validated_df.where(col("invalid_reason").isNotNull())

output_path = "data/orders_json"
checkpoint_path = "data/_checkpoints/orders_json"
invalid_output_path = "data/invalid_events"
invalid_checkpoint_path = "data/_checkpoints/invalid_events"

valid_stream_query = (
    valid_df.writeStream
    .format("json")
    .option("path", output_path)
    .option("checkpointLocation", checkpoint_path)
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

invalid_stream_query = (
    invalid_df.writeStream
    .format("json")
    .option("path", invalid_output_path)
    .option("checkpointLocation", invalid_checkpoint_path)
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)

# valid_stream_query.awaitTermination()
(valid_stream_query, invalid_stream_query)
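The `when(...).when(...)` chain above assigns only the first matching reason, so an event with a missing quantity is never reported as having a non-positive price. A minimal pure-Python sketch of the same precedence, handy for testing the rules without Spark. The function name `invalid_reason` is hypothetical, and unparseable JSON is treated as a record with all fields missing, roughly like the null fields `from_json` yields for malformed input:

```python
import json
from typing import Optional


def invalid_reason(json_str: str) -> Optional[str]:
    """Return the first failed validation rule for one event, or None if it is valid."""
    try:
        event = json.loads(json_str)
    except json.JSONDecodeError:
        event = {}  # malformed JSON: every field is treated as missing (null)
    if not isinstance(event, dict):
        event = {}
    quantity = event.get("quantity")
    unit_price = event.get("unit_price")
    # Same order as the Spark when() chain: the first matching rule wins.
    if quantity is None:
        return "missing_quantity"
    if unit_price is None:
        return "missing_unit_price"
    if quantity <= 0:
        return "non_positive_quantity"
    if unit_price <= 0:
        return "non_positive_unit_price"
    return None


print(invalid_reason('{"quantity": 2, "unit_price": 9.99}'))
print(invalid_reason('{"quantity": 0, "unit_price": -1}'))
print(invalid_reason("not json"))
```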

## 3. Batch: read the JSON files and clean the data
Now we read the JSON files written by the streaming query and clean the data. Run this after the streaming query has written at least one micro-batch.

In [None]:
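# Batch: read the JSON files and clean them.
# The explicit schema keeps the columns known even when the directory is empty.
import os

if os.path.exists(output_path):
    batch_df = spark.read.schema(schema).json(output_path)
else:
    # The streaming query has not created the output directory yet.
    batch_df = spark.createDataFrame([], schema)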

clean_df = (
    batch_df
    .select(
        "order_id", "user", "item", "category", "quantity", "unit_price", "discount_pct", "total_amount",
        "payment_method", "sales_channel", "store_city", "purchase_datetime", "purchase_date", "purchase_time",
        "weekday_name", "weekday_num", "hour_of_day", "is_weekend", "event_time_ms"
    )
    .where(
        col("item").isNotNull()
        & col("quantity").isNotNull()
        & (col("quantity") > 0)
        & col("unit_price").isNotNull()
        & (col("unit_price") > 0)
        & col("weekday_name").isNotNull()
        & col("weekday_num").isNotNull()
        & col("total_amount").isNotNull()
    )
)

clean_df.printSchema()
print(f"Rows: {clean_df.count()}")
clean_df.show(5, truncate=False)
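Spark's JSON sink writes each micro-batch as part files with one JSON object per line (JSON Lines), and null fields are typically omitted from the written objects. For a quick look at the output without Spark, a stdlib sketch like the one below can read those files. It assumes the part files match `part-*.json` (Spark's usual naming, but treat it as an assumption) and skips the `_spark_metadata` directory. Unlike `spark.read`, which consults `_spark_metadata` for a sink directory, it may also pick up files from uncommitted batches. The helper name is hypothetical:

```python
import glob
import json
import os
from typing import Dict, List


def read_json_lines_dir(path: str) -> List[Dict]:
    """Read every JSON object from the part files in a Spark JSON output directory."""
    records = []
    # Assumption: part files follow the part-*.json pattern; _spark_metadata is not matched.
    for file_path in sorted(glob.glob(os.path.join(path, "part-*.json"))):
        with open(file_path, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:  # skip empty lines
                    records.append(json.loads(line))
    return records


rows = read_json_lines_dir("data/orders_json")
print(f"Rows in part files: {len(rows)}")
```

Missing keys should be read with `.get()`, since omitted fields correspond to nulls in Spark.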

## 4. Invalid (rejected) events
We read the events that were excluded from further analysis and written to `data/invalid_events`.

In [None]:
import os

invalid_schema = StructType(schema.fields + [
    StructField("json_str", StringType(), True),
    StructField("invalid_reason", StringType(), True),
])

if os.path.exists(invalid_output_path):
    invalid_batch_df = spark.read.schema(invalid_schema).json(invalid_output_path)
else:
    invalid_batch_df = spark.createDataFrame([], invalid_schema)

invalid_events_df = (
    invalid_batch_df
    .select("invalid_reason", "order_id", "item", "quantity", "unit_price", "json_str", "event_time_ms")
    .where(col("invalid_reason").isNotNull())
)

print(f"Invalid rows: {invalid_events_df.count()}")
invalid_events_df.show(20, truncate=False)
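To see which rule rejects the most events, the reasons can be counted; in Spark that is `invalid_events_df.groupBy("invalid_reason").count()`. A stdlib sketch of the same tally over plain dicts (the helper name and the sample records are made up for illustration):

```python
from collections import Counter
from typing import Dict, Iterable, List, Tuple


def count_invalid_reasons(events: Iterable[Dict]) -> List[Tuple[str, int]]:
    """Count rejected events per invalid_reason, most frequent first."""
    counts = Counter(e["invalid_reason"] for e in events if e.get("invalid_reason"))
    return counts.most_common()


# Hypothetical sample of rejected events.
sample = [
    {"invalid_reason": "missing_quantity"},
    {"invalid_reason": "non_positive_unit_price"},
    {"invalid_reason": "missing_quantity"},
]
print(count_invalid_reasons(sample))
```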

## 5. Aggregate the data in Spark and prepare a DataFrame for plotting
We aggregate by item and weekday. The 3D chart uses three axes: item, weekday, and total quantity.

In [None]:
from pyspark.sql.functions import sum as sum_agg

agg_df = (
    clean_df
    .groupBy("weekday_num", "weekday_name", "item")
    .agg(
        sum_agg("quantity").alias("total_quantity"),
        sum_agg("total_amount").alias("total_revenue")
    )
    .orderBy(col("weekday_num"), col("total_quantity").desc())
)

plot_df = agg_df.toPandas()
plot_df

In [None]:
# Quick summary of total quantity and total revenue
total_quantity_all = float(plot_df["total_quantity"].sum())
total_revenue_all = float(plot_df["total_revenue"].sum())
total_quantity_all, total_revenue_all
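What `groupBy(...).agg(sum(...))` computes for `agg_df` can be spelled out in plain Python: one row per (weekday_num, weekday_name, item) with summed quantity and revenue, sorted by weekday and then by quantity descending. A small sketch with made-up rows (the `weekday_num` values in the sample are illustrative only):

```python
from collections import defaultdict
from typing import Dict, List


def aggregate_orders(rows: List[Dict]) -> List[Dict]:
    """Sum quantity and total_amount per (weekday_num, weekday_name, item)."""
    totals = defaultdict(lambda: [0, 0.0])  # key -> [total_quantity, total_revenue]
    for row in rows:
        key = (row["weekday_num"], row["weekday_name"], row["item"])
        totals[key][0] += row["quantity"]
        totals[key][1] += row["total_amount"]
    result = [
        {"weekday_num": k[0], "weekday_name": k[1], "item": k[2],
         "total_quantity": q, "total_revenue": r}
        for k, (q, r) in totals.items()
    ]
    # Same ordering as orderBy(weekday_num, total_quantity desc).
    result.sort(key=lambda r: (r["weekday_num"], -r["total_quantity"]))
    return result


orders = [
    {"weekday_num": 0, "weekday_name": "Monday", "item": "tea", "quantity": 2, "total_amount": 10.0},
    {"weekday_num": 0, "weekday_name": "Monday", "item": "coffee", "quantity": 5, "total_amount": 30.0},
    {"weekday_num": 0, "weekday_name": "Monday", "item": "tea", "quantity": 1, "total_amount": 5.0},
    {"weekday_num": 1, "weekday_name": "Tuesday", "item": "tea", "quantity": 4, "total_amount": 20.0},
]
for row in aggregate_orders(orders):
    print(row)
```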

## 6. 3D chart: item x weekday x quantity
The bar color represents total revenue.

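The bar colors in the cell below come from `colors.Normalize(vmin, vmax)`, which maps each revenue value linearly to `(value - vmin) / (vmax - vmin)` before the colormap is applied; the cell widens the range by 1 when all values are equal so the division stays defined. The same arithmetic as a standalone sketch (the function name is hypothetical):

```python
from typing import List


def normalize_revenue(values: List[float]) -> List[float]:
    """Map values linearly onto [0, 1], like colors.Normalize with vmin/vmax taken from the data."""
    if not values:
        return []
    vmin, vmax = min(values), max(values)
    if vmax <= vmin:
        vmax = vmin + 1.0  # same guard as in the chart cell: avoid dividing by zero
    return [(v - vmin) / (vmax - vmin) for v in values]


print(normalize_revenue([10.0, 20.0, 30.0]))  # [0.0, 0.5, 1.0]
print(normalize_revenue([5.0, 5.0]))          # [0.0, 0.0]
```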
In [None]:
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import cm, colors
from mpl_toolkits.mplot3d import proj3d

if plot_df.empty:
    print("No data to plot. Start the streaming query and send some events first.")
else:
    weekday_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
    item_order = sorted(plot_df["item"].unique())
    x_spacing = 1.55
    item_to_x = {item: idx * x_spacing for idx, item in enumerate(item_order)}

    x = plot_df["item"].map(item_to_x).to_numpy(dtype=float)
    y = plot_df["weekday_num"].to_numpy(dtype=float)
    z = np.zeros(len(plot_df), dtype=float)
    dx = np.full(len(plot_df), 0.55)
    dy = np.full(len(plot_df), 0.6)
    dz = plot_df["total_quantity"].to_numpy(dtype=float)

    revenue = plot_df["total_revenue"].to_numpy(dtype=float)
    vmin = float(revenue.min()) if len(revenue) else 0.0
    vmax = float(revenue.max()) if len(revenue) else 1.0
    if vmax <= vmin:
        vmax = vmin + 1.0
    norm = colors.Normalize(vmin=vmin, vmax=vmax)
    bar_colors = cm.YlOrRd(norm(revenue))

    fig_3d = plt.figure(figsize=(16, 9))
    ax = fig_3d.add_subplot(111, projection="3d")
    ax.bar3d(x, y, z, dx, dy, dz, color=bar_colors, shade=True, alpha=0.95)

    ax.set_title("3D Orders: item x weekday x quantity")
    ax.set_xlabel("item (ref)", labelpad=24)
    ax.set_ylabel("weekday", labelpad=20)
    ax.set_zlabel("total_quantity")
    x_tick_positions = np.arange(len(item_order)) * x_spacing + 0.275
    ax.set_xticks(x_tick_positions)
    x_refs = [str(i + 1) for i in range(len(item_order))]
    ax.set_xticklabels(x_refs, fontsize=9)
    ax.tick_params(axis="x", pad=4)
    ax.set_xlim(-0.4, (len(item_order) - 1) * x_spacing + 1.0)
    ax.set_yticks(np.arange(len(weekday_order)) + 0.3)
    ax.set_yticklabels(weekday_order)
    ax.set_box_aspect((1.8, 1.0, 0.8))
    ax.view_init(elev=24, azim=228)

    # Rotate the weekday labels to follow the direction of the Y axis on the grid.
    x_min, x_max = ax.get_xlim3d()
    z_min, _ = ax.get_zlim3d()
    y0x, y0y, _ = proj3d.proj_transform(x_min, 0, z_min, ax.get_proj())
    y1x, y1y, _ = proj3d.proj_transform(x_min, 1, z_min, ax.get_proj())
    y_axis_angle = np.degrees(np.arctan2(y1y - y0y, y1x - y0x))
    if y_axis_angle > 90:
        y_axis_angle -= 180
    if y_axis_angle < -90:
        y_axis_angle += 180
    fig_3d.canvas.draw()
    for day_label in ax.get_yticklabels():
        day_label.set_rotation(y_axis_angle + 50)
        day_label.set_ha("right")
        day_label.set_va("center")
        day_label.set_rotation_mode("anchor")

    mapping_lines = [f"{i + 1:>2} -> {item}" for i, item in enumerate(item_order)]
    mapping_text = "Item reference:\n" + "\n".join(mapping_lines)
    fig_3d.text(
        0.02,
        0.5,
        mapping_text,
        ha="left",
        va="center",
        fontsize=9,
        family="monospace",
        bbox=dict(boxstyle="round,pad=0.4", facecolor="white", alpha=0.85),
    )

    mappable = cm.ScalarMappable(norm=norm, cmap=cm.YlOrRd)
    mappable.set_array([])
    colorbar = fig_3d.colorbar(mappable, ax=ax, shrink=0.65, pad=0.08)
    colorbar.set_label("total_revenue")
    fig_3d.subplots_adjust(left=0.20, right=0.92, bottom=0.08, top=0.92)
    # A bare name inside if/else is not auto-displayed by Jupyter; show the figure explicitly.
    plt.show()

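The label-rotation step in the cell above projects two points of the Y axis to 2D, takes their direction with `arctan2`, and folds the angle into [-90°, 90°] so the text is never upside down (the cell then adds a fixed 50° offset tuned for this view). The folding step on its own, as a standalone sketch with a hypothetical helper name and sample points:

```python
import math


def y_axis_label_angle(x0: float, y0: float, x1: float, y1: float) -> float:
    """Angle in degrees of the segment (x0, y0) -> (x1, y1), folded into [-90, 90]."""
    angle = math.degrees(math.atan2(y1 - y0, x1 - x0))
    # A line at 135 degrees is the same line as one at -45 degrees; keep text readable.
    if angle > 90:
        angle -= 180
    if angle < -90:
        angle += 180
    return angle


print(y_axis_label_angle(0.0, 0.0, -1.0, 1.0))   # 135 folded to about -45
print(y_axis_label_angle(0.0, 0.0, -1.0, -1.0))  # -135 folded to about 45
```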
## 7. Chart: revenue heatmap (weekday x hour)
Shows on which days of the week and at which hours the sales value is highest.

In [None]:
import seaborn as sns

weekday_order = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

heatmap_spark_df = (
    clean_df
    .groupBy("weekday_num", "weekday_name", "hour_of_day")
    .agg(sum_agg("total_amount").alias("total_revenue"))
    .orderBy("weekday_num", "hour_of_day")
)

heatmap_df = heatmap_spark_df.toPandas()

if heatmap_df.empty:
    print("No data to plot. Start the streaming query and send some events first.")
else:
    heatmap_pivot = (
        heatmap_df
        .pivot_table(index="weekday_name", columns="hour_of_day", values="total_revenue", aggfunc="sum", fill_value=0.0)
        # Fill missing weekdays and hours with 0 so the grid is always 7 x 24.
        .reindex(index=weekday_order, columns=list(range(24)), fill_value=0.0)
    )

    fig_heatmap, ax = plt.subplots(figsize=(14, 5))
    sns.heatmap(heatmap_pivot, cmap="YlOrRd", ax=ax)
    ax.set_title("Revenue heatmap: weekday x hour")
    ax.set_xlabel("hour_of_day")
    ax.set_ylabel("weekday")
    plt.tight_layout()
    plt.show()

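The pivot-plus-reindex step above turns the (weekday, hour, revenue) rows into a fixed 7 x 24 grid in which hours without sales are 0. A stdlib sketch of the same reshaping, with a hypothetical helper name and made-up rows:

```python
from typing import Dict, List, Tuple

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def revenue_grid(rows: List[Tuple[str, int, float]]) -> Dict[str, List[float]]:
    """Build weekday -> 24 hourly revenue sums; cells without sales stay 0.0."""
    grid = {day: [0.0] * 24 for day in WEEKDAYS}
    for weekday_name, hour_of_day, revenue in rows:
        if weekday_name in grid and 0 <= hour_of_day < 24:
            grid[weekday_name][hour_of_day] += revenue  # like aggfunc="sum"
    return grid


grid = revenue_grid([("Monday", 9, 120.0), ("Monday", 9, 30.0), ("Sunday", 23, 55.5)])
print(grid["Monday"][9], grid["Sunday"][23], grid["Tuesday"][12])  # 150.0 55.5 0.0
```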
## 8. Pareto chart: items by revenue
The bars show each item's revenue; the line shows the cumulative percentage of total revenue, with a dashed reference line at 80%.

In [None]:
pareto_spark_df = (
    clean_df
    .groupBy("item")
    .agg(sum_agg("total_amount").alias("total_revenue"))
    .orderBy(col("total_revenue").desc())
)

pareto_df = pareto_spark_df.toPandas()

if pareto_df.empty:
    print("No data to plot. Start the streaming query and send some events first.")
else:
    pareto_df["cum_pct"] = pareto_df["total_revenue"].cumsum() / pareto_df["total_revenue"].sum() * 100

    fig_pareto, ax1 = plt.subplots(figsize=(12, 6))
    ax1.bar(pareto_df["item"], pareto_df["total_revenue"], color="#4c72b0", alpha=0.9)
    ax1.set_xlabel("item")
    ax1.set_ylabel("total_revenue", color="#4c72b0")
    ax1.tick_params(axis="y", labelcolor="#4c72b0")
    ax1.tick_params(axis="x", rotation=45)

    ax2 = ax1.twinx()
    ax2.plot(pareto_df["item"], pareto_df["cum_pct"], color="#dd8452", marker="o", linewidth=2)
    ax2.axhline(80, color="#55a868", linestyle="--", linewidth=1.5)
    ax2.set_ylabel("cumulative %", color="#dd8452")
    ax2.tick_params(axis="y", labelcolor="#dd8452")

    ax1.set_title("Pareto chart: revenue by item")
    plt.tight_layout()
    plt.show()

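The `cum_pct` line above is the running sum of revenue (items sorted in descending order) divided by the total revenue; the dashed line marks the usual 80% Pareto threshold. A sketch that also returns the smallest leading set of items that reaches 80%; the helper name and sample numbers are hypothetical:

```python
from typing import List, Tuple


def pareto_items(revenue_by_item: List[Tuple[str, float]], threshold: float = 80.0):
    """Return (rows of (item, revenue, cumulative %), items needed to reach `threshold` %)."""
    ranked = sorted(revenue_by_item, key=lambda pair: pair[1], reverse=True)
    total = sum(revenue for _, revenue in ranked)
    rows, head, running = [], [], 0.0
    for item, revenue in ranked:
        before = running * 100.0 / total if total else 0.0
        running += revenue
        rows.append((item, revenue, running * 100.0 / total if total else 0.0))
        if before < threshold:  # still below the threshold before adding this item
            head.append(item)
    return rows, head


rows, head = pareto_items([("c", 15.0), ("a", 50.0), ("d", 5.0), ("b", 30.0)])
print(rows)
print(head)
```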
## 9. Boxplot: order value by sales channel and payment method
Compares the distribution of `total_amount` across sales channels and payment methods. Outliers are hidden (`showfliers=False`).

In [None]:
boxplot_df = clean_df.select("sales_channel", "payment_method", "total_amount").toPandas()

if boxplot_df.empty:
    print("No data to plot. Start the streaming query and send some events first.")
else:
    fig_boxplot, ax = plt.subplots(figsize=(12, 6))
    sns.boxplot(
        data=boxplot_df,
        x="sales_channel",
        y="total_amount",
        hue="payment_method",
        showfliers=False,
        ax=ax,
    )
    ax.set_title("Order value by sales channel and payment method")
    ax.set_xlabel("sales_channel")
    ax.set_ylabel("total_amount")
    plt.tight_layout()
    plt.show()

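With `showfliers=False` the boxplot above hides the points outside the whiskers. Under matplotlib's default rule (whis=1.5, which seaborn's boxplot also uses), the whiskers reach the most extreme values within 1.5 x IQR of the quartiles, and anything beyond is a flier. A stdlib sketch of that rule for one group of order values, assuming linearly interpolated quartiles (`statistics.quantiles(..., method="inclusive")`, at least two values) and made-up numbers:

```python
import statistics
from typing import Dict, List


def box_stats(values: List[float], whis: float = 1.5) -> Dict[str, object]:
    """Quartiles, whisker ends and outliers for one box, using the whis x IQR rule."""
    q1, median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    low_fence, high_fence = q1 - whis * iqr, q3 + whis * iqr
    inside = [v for v in values if low_fence <= v <= high_fence]
    return {
        "q1": q1,
        "median": median,
        "q3": q3,
        "whisker_low": min(inside),
        "whisker_high": max(inside),
        "outliers": sorted(v for v in values if v < low_fence or v > high_fence),
    }


print(box_stats([1, 2, 3, 4, 5, 6, 7, 8, 100]))
```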
## 10. Save the charts to files
We save all generated charts as PNG files in the `output` directory.

In [None]:
import os

output_dir = "output"
os.makedirs(output_dir, exist_ok=True)

charts_to_save = [
    ("fig_3d", "orders_3d_item_weekday.png"),
    ("fig_heatmap", "orders_revenue_heatmap_weekday_hour.png"),
    ("fig_pareto", "orders_pareto_revenue_item.png"),
    ("fig_boxplot", "orders_boxplot_channel_payment.png"),
]

saved_paths = []
for fig_name, filename in charts_to_save:
    figure_obj = globals().get(fig_name)
    if figure_obj is None:
        continue
    chart_path = os.path.join(output_dir, filename)
    figure_obj.savefig(chart_path, dpi=150, bbox_inches="tight")
    saved_paths.append(chart_path)

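if saved_paths:
    # A bare name inside if/else is not auto-displayed by Jupyter, so print the paths.
    print("Saved charts:", *saved_paths, sep="\n")
else:
    print("No charts to save. Run the chart cells first.")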