# TMDB 10000 Movies (2022) — EDA + wizualizacje (pomocnicze)

**Uwaga dot. wymagań projektu:**
- Ten notebook służy do **EDA, kontroli jakości danych i wizualizacji**.
- **Główny trening modeli** oraz tuning hiperparametrów realizowany jest w **Apache Spark / Spark MLlib** (`tmdb_spark_ml.py`).
- **Streaming + Kafka + Spark Structured Streaming** realizuje `front_app.py` + `stream_predict.py`.

## Cel notebooka
- sprawdzić strukturę CSV,
- ocenić braki, typy danych, statystyki,
- wykonać podstawowe wykresy,
- zbudować *lekki* model porównawczy (scikit-learn) jako sanity-check.

## Definicja sukcesu (spójna z pipeline Spark)
- **SUKCES / hit**: `vote_average >= 7.0`
- **NIE-SUKCES**: `vote_average < 7.0`


In [None]:
# Jeśli uruchamiasz lokalnie: upewnij się, że masz pakiety.
# Jeśli na Colab: odkomentuj instalację.

!pip -q install pandas numpy matplotlib seaborn scikit-learn

import os
import re
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay,
    roc_auc_score,
)
from sklearn.linear_model import LogisticRegression

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 140)
sns.set_theme(style="whitegrid")

RANDOM_STATE = 42
LABEL_THRESHOLD = 7.0  # spójnie z Spark MLlib


## 1) Wczytanie danych

Dataset: CSV (snapshot). W projekcie Big Data:
- **Batch**: Spark wczytuje CSV i buduje modele (MLlib)
- **Streaming**: Producer symuluje zdarzenia w JSON (Kafka)


In [None]:
DATA_PATH = "./data/tmdb_10000_movies.csv"  # <- ustaw prawidłową ścieżkę
assert os.path.exists(DATA_PATH), f"Nie znaleziono pliku: {DATA_PATH}"

df = pd.read_csv(DATA_PATH)
df.head()

In [None]:
print("Shape:", df.shape)
df.info()

## 2) Szybka kontrola jakości
- duplikaty
- braki danych
- wstępny przegląd rozkładów


In [None]:
n_duplicates = df.duplicated().sum()
print("Liczba duplikatów wierszy:", n_duplicates)

missing = df.isna().mean().sort_values(ascending=False)
display(missing.head(20).to_frame("missing_rate"))

## 3) Automatyczna analiza kolumn (dtype, braki, unikalne, statystyki)

Dla każdej kolumny raportujemy:
- dtype
- liczba braków i % braków
- liczba unikalnych wartości
- jeśli numeryczna: min/max/std/median/mean


In [None]:
def try_parse_numeric(series: pd.Series) -> pd.Series:
    """Próba konwersji string->numeric (usuwa $, przecinki, spacje)."""
    if series.dtype != "object":
        return series

    s = series.astype(str)
    s = s.replace({"None": np.nan, "nan": np.nan, "NaN": np.nan, "NULL": np.nan, "null": np.nan})
    s = s.str.replace(r"[\$,]", "", regex=True).str.strip()
    numeric = pd.to_numeric(s, errors="coerce")

    non_na_original = series.notna().sum()
    non_na_numeric = numeric.notna().sum()
    if non_na_original == 0:
        return series
    if non_na_numeric / non_na_original >= 0.85:
        return numeric
    return series


df2 = df.copy()
for c in df2.columns:
    df2[c] = try_parse_numeric(df2[c])


def column_report(frame: pd.DataFrame) -> pd.DataFrame:
    rows = []
    for col in frame.columns:
        s = frame[col]
        n = len(s)
        n_missing = int(s.isna().sum())
        missing_rate = n_missing / n if n else np.nan
        nunique = int(s.nunique(dropna=True))
        dtype = str(s.dtype)

        stats = {"min": np.nan, "max": np.nan, "std": np.nan, "median": np.nan, "mean": np.nan}
        if pd.api.types.is_numeric_dtype(s):
            stats["min"] = float(np.nanmin(s.values)) if s.notna().any() else np.nan
            stats["max"] = float(np.nanmax(s.values)) if s.notna().any() else np.nan
            stats["std"] = float(np.nanstd(s.values, ddof=1)) if s.notna().sum() > 1 else np.nan
            stats["median"] = float(np.nanmedian(s.values)) if s.notna().any() else np.nan
            stats["mean"] = float(np.nanmean(s.values)) if s.notna().any() else np.nan

        rows.append({
            "column": col,
            "dtype": dtype,
            "missing_count": n_missing,
            "missing_rate": missing_rate,
            "nunique": nunique,
            **stats,
        })
    return (
        pd.DataFrame(rows)
        .sort_values(["missing_rate", "nunique"], ascending=[False, False])
        .reset_index(drop=True)
    )


report = column_report(df2)
display(report)

out_report_path = os.path.join(os.path.dirname(DATA_PATH), "column_report.csv")
report.to_csv(out_report_path, index=False)
print("Zapisano raport:", out_report_path)

## 4) Wizualizacje: rozkłady i korelacje (numeryczne)

Wybieramy kilka numerycznych kolumn o największej wariancji i pokazujemy histogramy.


In [None]:
num_cols = [c for c in df2.columns if pd.api.types.is_numeric_dtype(df2[c])]
print("Liczba kolumn numerycznych:", len(num_cols))

if len(num_cols) > 0:
    variances = df2[num_cols].var(numeric_only=True).sort_values(ascending=False)
    top = variances.head(8).index.tolist()
    display(variances.head(15).to_frame("variance"))

    fig, axes = plt.subplots(nrows=2, ncols=4, figsize=(18, 8))
    axes = axes.flatten()
    for ax, col in zip(axes, top):
        sns.histplot(df2[col], kde=False, ax=ax, bins=40)
        ax.set_title(col)
    for ax in axes[len(top):]:
        ax.axis("off")
    plt.tight_layout()
    plt.show()
else:
    print("Brak kolumn numerycznych do pokazania.")

In [None]:
if len(num_cols) >= 2:
    corr = df2[num_cols].corr(numeric_only=True)
    plt.figure(figsize=(12, 10))
    sns.heatmap(corr, cmap="coolwarm", center=0)
    plt.title("Korelacje (kolumny numeryczne)")
    plt.tight_layout()
    plt.show()
else:
    print("Za mało kolumn numerycznych do korelacji.")

## 5) Target (etykieta) — SPÓJNY z projektem Spark

Definiujemy `target_hit`:
- `target_hit = 1` jeśli `vote_average >= 7.0`
- `target_hit = 0` w przeciwnym wypadku

To jest zgodne z `tmdb_spark_ml.py`.


In [None]:
assert "vote_average" in df2.columns, "Brak kolumny vote_average w CSV"

# upewnij się, że vote_average jest numeryczne
df2["vote_average"] = pd.to_numeric(df2["vote_average"], errors="coerce")

df2 = df2[df2["vote_average"].notna()].copy()
df2["target_hit"] = (df2["vote_average"] >= LABEL_THRESHOLD).astype(int)

print("Próg vote_average:", LABEL_THRESHOLD)
df2["target_hit"].value_counts(normalize=True).rename("share")

## 6) Feature engineering (baseline)

Zasady:
- usuwamy `vote_average` (bo to źródło label → leakage),
- usuwamy identyfikatory i bardzo długie teksty w baseline,
- model porównawczy ma charakter pomocniczy.

W produkcyjnym pipeline (Spark) cechy są budowane w `tmdb_spark_ml.py`.


In [None]:
target_col = "target_hit"

drop_cols = {target_col, "vote_average"}  # leakage

for c in ["id", "imdb_id", "tmdb_id", "title", "original_title", "poster_path", "backdrop_path", "overview"]:
    if c in df2.columns:
        drop_cols.add(c)

# Heurystyka: bardzo długi tekst -> drop w baseline
for c in df2.columns:
    if df2[c].dtype == "object":
        avg_len = df2[c].dropna().astype(str).str.len().mean() if df2[c].notna().any() else 0
        if avg_len and avg_len > 80:
            drop_cols.add(c)

print("Kolumny odrzucone (baseline):")
print(sorted(drop_cols))

X = df2.drop(columns=[c for c in drop_cols if c in df2.columns])
y = df2[target_col].astype(int)

print("X shape:", X.shape)
display(y.value_counts().to_frame("count"))

num_features = [c for c in X.columns if pd.api.types.is_numeric_dtype(X[c])]
cat_features = [c for c in X.columns if X[c].dtype == "object"]

print("Num features:", len(num_features))
print("Cat features:", len(cat_features))
display(pd.DataFrame({"num": num_features}).head(20))
display(pd.DataFrame({"cat": cat_features}).head(20))

## 7) Model porównawczy (scikit-learn) + macierz pomyłek

**To jest tylko sanity-check.**
Ocena i tuning docelowych modeli odbywa się w Spark MLlib.


In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=RANDOM_STATE,
    stratify=y,
)

numeric_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
    ]
)

categorical_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("onehot", OneHotEncoder(handle_unknown="ignore", min_frequency=10)),
    ]
)

preprocess = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, num_features),
        ("cat", categorical_transformer, cat_features),
    ]
)

clf = LogisticRegression(max_iter=1000)
model = Pipeline(steps=[("preprocess", preprocess), ("clf", clf)])

model.fit(X_train, y_train)

y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred, digits=4))

cm = confusion_matrix(y_test, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(values_format="d", cmap="Blues", colorbar=False)
for t in disp.text_.ravel():
    t.set_color("black")
    t.set_fontsize(12)

plt.title("Macierz pomyłek (sanity-check)", color="black")
plt.xticks(color="black")
plt.yticks(color="black")
plt.tight_layout()
plt.show()

# ROC-AUC (jeśli możliwe)
if hasattr(model.named_steps["clf"], "predict_proba"):
    y_proba = model.predict_proba(X_test)[:, 1]
    auc = roc_auc_score(y_test, y_proba)
    print("ROC-AUC:", round(auc, 4))