# 03_spark_analysis.ipynb

Аналитика и обработка данных в Apache Spark для проекта по COVID-19 Chest X-Ray Dataset.

В этом ноутбуке мы:

- создаём локальную Spark-сессию (`master = local[*]`, без кластера);
- загружаем очищенные метаданные из файла `covid_dataset/metadata/metadata.parquet`;
- регистрируем их как временную Spark SQL-вью `covid_metadata`;
- выполняем несколько аналитических SQL-запросов:
  - распределение диагнозов и их долей;
  - распределение пациентов по возрастным группам и диагнозам;
  - пример оконной функции: ранжирование пациентов по возрасту внутри диагноза;
- показываем пример оптимизации хранения:
  - запись данных в формате Parquet с партиционированием по `finding_unified` и `age_group`;
- демонстрируем пример обработки в PySpark:
  - использование UDF для повторной категоризации возраста;
- показываем пример ML-задачи в Spark ML:
  - кластеризация пациентов по возрасту и полу с помощью алгоритма K-Means;
- сохраняем результаты аналитики в каталог `covid_dataset/processed/` для дальнейшей визуализации.

> Для удобства локального тестирования при отсутствии рабочего Spark-окружения предусмотрен запасной режим анализа на Pandas, но основной код ноутбука использует API Apache Spark и готов к запуску в кластере / Docker-окружении курса.

---

**Предпосылки**

Ноутбук предполагает, что:

- он находится в той же директории, что и папка `covid_dataset/`;
- внутри папки `covid_dataset/metadata/` лежит файл `metadata.parquet`, созданный в ноутбуке `01_preprocess_metadata.ipynb`;
- структура каталогов `covid_dataset/images/…` и `covid_dataset/processed/` создана во втором ноутбуке `02_upload_to_hdfs.ipynb`.


In [None]:
# 1. Импорты для обоих режимов: Spark и Pandas
import pandas as pd                    # библиотека для табличных данных
import os                              # работа с файловой системой

# Пытаемся импортировать PySpark
try:
    from pyspark.sql import SparkSession      # класс для создания Spark-сессии
    from pyspark.sql import functions as F    # встроенные функции Spark (SUM, COUNT, окна и т.д.)
    from pyspark.sql import Window            # объект для описания оконных функций

    PYSPARK_IMPORTED = True                   # флаг: PySpark импортировался
except ModuleNotFoundError:
    PYSPARK_IMPORTED = False                  # PySpark не установлен в окружении

# Флаг, будем ли реально использовать Spark
USE_SPARK = False   # по умолчанию считаем, что нет

print("Пытаемся создать SparkSession...")

if PYSPARK_IMPORTED:
    try:
        # Пытаемся поднять Spark в локальном режиме (без кластера)
        spark = SparkSession.builder \
            .appName("Covid19SparkAnalysis") \
            .master("local[*]") \
            .getOrCreate()

        print("✅ Spark поднялся. Версия:", spark.version)
        USE_SPARK = True                      # отмечаем, что Spark доступен

    except Exception as e:
        # Если при старте Spark всё равно упали (как у тебя с Java/Hadoop),
        # тогда честно переключаемся на Pandas-режим
        print("⚠️ Spark не смог стартануть в этом окружении.")
        print("   Будем работать в режиме Pandas.")
        print("   Краткая причина:", type(e).__name__)
        USE_SPARK = False
else:
    # Если сам PySpark не импортировался
    print("⚠️ PySpark не установлен. Работаем в режиме Pandas.")
    USE_SPARK = False

# Загружаем данные в зависимости от режима
METADATA_PARQUET_PATH = "covid_dataset/metadata/metadata.parquet"  # путь к parquet-файлу

if USE_SPARK:
    # Если Spark живой — читаем данные в Spark DataFrame
    df_spark = spark.read.parquet(METADATA_PARQUET_PATH)
    df_spark.printSchema()
    df_spark.show(5, truncate=False)
else:
    # Если Spark не работает — читаем те же данные в Pandas DataFrame
    df_pd = pd.read_parquet(METADATA_PARQUET_PATH)
    print("Используем режим Pandas. Размер таблицы:", df_pd.shape)
    display(df_pd.head())


In [None]:
# Распределение укрупнённых диагнозов: COVID19 / PNEUMONIA / NORMAL / OTHER

if USE_SPARK:
    # --- Ветка Spark SQL ---
    df_spark.createOrReplaceTempView("covid_metadata")  # регистрируем временную таблицу

    query_diagnosis_distribution = """
    SELECT
        finding_unified,                          -- укрупнённый диагноз
        COUNT(*) AS cnt,                          -- число записей
        ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct  -- доля от общего числа
    FROM covid_metadata
    GROUP BY finding_unified
    ORDER BY cnt DESC
    """

    diagnosis_distribution_df = spark.sql(query_diagnosis_distribution)
    print("Распределение диагнозов (Spark SQL):")
    diagnosis_distribution_df.show()
else:
    # --- Ветка Pandas ---
    # Считаем, сколько записей в каждом диагнозе
    counts = df_pd["finding_unified"].value_counts(dropna=False)        # Series: индекс = диагноз, значение = количество
    pct = round(counts / counts.sum() * 100, 2)                         # проценты

    diagnosis_distribution_pd = (
        pd.DataFrame({
            "finding_unified": counts.index,    # диагноз
            "cnt": counts.values,               # количество
            "pct": pct.values                   # процент
        })
        .sort_values("cnt", ascending=False)    # сортируем по убыванию количества
        .reset_index(drop=True)
    )

    print("Распределение диагнозов (Pandas):")
    display(diagnosis_distribution_pd)


In [None]:
# Количество пациентов по возрастным группам и диагнозам

if USE_SPARK:
    query_age_diagnosis = """
    SELECT
        age_group,
        finding_unified,
        COUNT(*) AS cnt
    FROM covid_metadata
    GROUP BY age_group, finding_unified
    ORDER BY age_group, finding_unified
    """

    age_diagnosis_df = spark.sql(query_age_diagnosis)
    print("Возрастные группы × диагноз (Spark SQL):")
    age_diagnosis_df.show()
else:
    # В Pandas делаем то же самое через groupby
    age_diagnosis_pd = (
        df_pd
        .groupby(["age_group", "finding_unified"])    # группируем по возрастной группе и диагнозу
        .size()                                       # считаем количество строк
        .reset_index(name="cnt")                      # превращаем в колонку cnt
        .sort_values(["age_group", "finding_unified"])
    )

    print("Возрастные группы × диагноз (Pandas):")
    display(age_diagnosis_pd)


In [None]:
# Ранжирование пациентов по возрасту внутри каждого диагноза

if USE_SPARK:
    from pyspark.sql import Window

    age_rank_window = Window.partitionBy("finding_unified").orderBy(F.col("age_filled").desc())

    age_rank_df = (
        df_spark
        .select("patientid", "age_filled", "age_group", "finding_unified")
        .withColumn("age_rank_desc", F.rank().over(age_rank_window))  # RANK() OVER (PARTITION BY ... ORDER BY ...)
    )

    print("Примеры рангов по возрасту (Spark SQL, оконная функция):")
    age_rank_df.orderBy("finding_unified", "age_rank_desc").show(20, truncate=False)
else:
    # В Pandas делаем аналог ранга через groupby + rank()
    age_rank_pd = (
        df_pd[["patientid", "age_filled", "age_group", "finding_unified"]]
        .dropna(subset=["age_filled"])                                  # убираем пустые возраста
        .sort_values(["finding_unified", "age_filled"], ascending=[True, False])
    )

    # Добавляем колонку rank: ранг возраста по убыванию внутри диагноза
    age_rank_pd["age_rank_desc"] = (
        age_rank_pd
        .groupby("finding_unified")["age_filled"]
        .rank(method="dense", ascending=False)
        .astype(int)
    )

    print("Примеры рангов по возрасту (Pandas, аналог оконной функции):")
    display(age_rank_pd.head(20))


In [None]:
PARTITIONED_OUTPUT_PATH_SPARK = "covid_dataset/processed/metadata_partitioned_spark"
PARTITIONED_OUTPUT_PATH_PANDAS = "covid_dataset/processed/metadata_partitioned_pandas"

if USE_SPARK:
    # Запись партиционированного набора Spark'ом
    (
        df_spark
        .write
        .mode("overwrite")
        .partitionBy("finding_unified", "age_group")
        .parquet(PARTITIONED_OUTPUT_PATH_SPARK)
    )
    print("✅ Spark записал партиционированные данные в:", PARTITIONED_OUTPUT_PATH_SPARK)
else:
    # Имитация партиционирования через Pandas:
    # создаём такую же структуру папок и сохраняем маленькие parquet'ы в каждый каталог
    print("Spark недоступен. Имитация партиционирования через Pandas...")

    for (diag, age_group), group in df_pd.groupby(["finding_unified", "age_group"]):
        # Формируем путь в стиле Spark: finding_unified=.../age_group=...
        subdir = os.path.join(
            PARTITIONED_OUTPUT_PATH_PANDAS,
            f"finding_unified={diag}",
            f"age_group={age_group}"
        )
        os.makedirs(subdir, exist_ok=True)
        out_path = os.path.join(subdir, "part-00000.parquet")
        group.to_parquet(out_path, index=False)

    print("✅ Pandas записал данные по партициям в:", PARTITIONED_OUTPUT_PATH_PANDAS)
