### 1. Подготовка среды

In [None]:
import os
import warnings
warnings.filterwarnings("ignore")

# Инициализация Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("COVID19_XRay_Analysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

### 2. Загрузка и предобработка данных

In [None]:
# Загрузка данных
df = spark.read.option("header", "true") \
               .option("inferSchema", "true") \
               .csv("metadata.csv")

# Просмотр структуры
df.printSchema()
df.show(5)

In [None]:
# Обработка пропусков и дубликатов
df_clean = df.dropDuplicates()

# Заполнение пропусков
from pyspark.sql.functions import when, col, mean, mode

# Возраст: заполняем медианой (приближённо — средним, так как PySpark не имеет median напрямую)
mean_age = df_clean.select(mean(col("age"))).collect()[0][0]
df_clean = df_clean.fillna({"age": mean_age})

# Пол: заполняем модой (наиболее частым значением)
mode_sex = df_clean.groupBy("sex").count().orderBy(desc("count")).first()["sex"]
df_clean = df_clean.fillna({"sex": mode_sex})

# Унификация диагнозов
def unify_finding(finding):
    if finding is None:
        return "Unknown"
    f = finding.lower()
    if "covid" in f or "sars-cov-2" in f:
        return "COVID-19"
    elif "pneumonia" in f:
        return "Pneumonia"
    elif "normal" in f:
        return "Normal"
    else:
        return "Other"

unify_finding_udf = udf(unify_finding, StringType())
df_clean = df_clean.withColumn("finding_unified", unify_finding_udf(col("finding")))

# Фильтрация: оставляем только основные категории
df_clean = df_clean.filter(col("finding_unified").isin(["COVID-19", "Pneumonia", "Normal"]))

### 3. Анализ качества данных

In [None]:
# Распределение пропущенных значений
from pyspark.sql.functions import isnan, when, count, col

missing = df_clean.select([count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in df_clean.columns])
missing.show()

# Аномалии: возраст < 0 или > 120
df_clean = df_clean.filter((col("age") >= 0) & (col("age") <= 120))

# Дата: фильтрация некорректных дат (опционально)
# Здесь предполагаем, что дата в формате 'yyyy-MM-dd' или 'dd.MM.yyyy'
# Для простоты — преобразуем и фильтруем
df_clean = df_clean.withColumn("date_parsed", to_date(col("date"), "yyyy-MM-dd"))
df_clean = df_clean.filter(col("date_parsed").isNotNull())

### SQL-аналитика

In [None]:
# Регистрация таблицы для SQL
df_clean.createOrReplaceTempView("xray_data")

In [None]:
# Базовая статистика по диагнозам
q1 = spark.sql("""
    SELECT finding_unified AS diagnosis, COUNT(*) AS count
    FROM xray_data
    GROUP BY finding_unified
    ORDER BY count DESC
""")
q1.show()

In [None]:
# Распределение по полу и диагнозам
q2 = spark.sql("""
    SELECT sex, finding_unified AS diagnosis, COUNT(*) AS count
    FROM xray_data
    WHERE sex IN ('M', 'F')
    GROUP BY sex, finding_unified
    ORDER BY diagnosis, sex
""")
q2.show()

In [None]:
# Топ-3 по возрасту в каждой группе диагнозов
window_spec = Window.partitionBy("finding_unified").orderBy(desc("age"))
q3 = df_clean.withColumn("age_rank", row_number().over(window_spec)) \
             .filter(col("age_rank") <= 3) \
             .select("finding_unified", "age", "patientid", "age_rank")
q3.show()

In [None]:
# Временные тренды
q4 = spark.sql("""
    SELECT 
        date_trunc('month', date_parsed) AS month,
        finding_unified AS diagnosis,
        COUNT(*) AS count
    FROM xray_data
    WHERE date_parsed IS NOT NULL
    GROUP BY month, diagnosis
    ORDER BY month, diagnosis
""")
q4.show()

In [None]:
# Проекции и диагнозы
q5 = spark.sql("""
    SELECT 
        view,
        finding_unified AS diagnosis,
        COUNT(*) AS count
    FROM xray_data
    WHERE view IS NOT NULL
    GROUP BY view, finding_unified
    ORDER BY view, count DESC
""")
q5.show()

### Обработка в PySpark с UDF

In [None]:
# Категоризация возраста
def categorize_age(age):
    if age < 18:
        return "Child"
    elif age < 65:
        return "Adult"
    else:
        return "Elderly"

age_category_udf = udf(categorize_age, StringType())
df_final = df_clean.withColumn("age_group", age_category_udf(col("age")))

# Сохранение в Parquet (оптимизированный формат)
df_final.write.mode("overwrite").parquet("output/cleaned_data.parquet")

### Визуализация

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Конвертация в Pandas для визуализации (только для Notebook!)
# Убедитесь, что данные помещаются в память!
pdf = df_final.toPandas()

# 1. Круговая диаграмма диагнозов
plt.figure(figsize=(6, 6))
pdf['finding_unified'].value_counts().plot.pie(autopct='%1.1f%%', startangle=90)
plt.title("Распределение диагнозов")
plt.ylabel('')
plt.show()

# 2. Столбчатая диаграмма по возрастным группам
plt.figure(figsize=(8, 5))
sns.countplot(data=pdf, x='age_group', hue='finding_unified')
plt.title("Распределение по возрастным группам и диагнозам")
plt.xticks(rotation=0)
plt.show()

# 3. Временные тренды
pdf_trend = q4.toPandas()
pdf_trend['month'] = pd.to_datetime(pdf_trend['month'])
plt.figure(figsize=(10, 5))
for diagnosis in pdf_trend['diagnosis'].unique():
    subset = pdf_trend[pdf_trend['diagnosis'] == diagnosis]
    plt.plot(subset['month'], subset['count'], label=diagnosis, marker='o')
plt.title("Временные тренды исследований")
plt.xlabel("Месяц")
plt.ylabel("Количество снимков")
plt.legend()
plt.grid(True)
plt.show()

# 4. Heatmap: диагнозы vs проекции
heatmap_data = pdf.groupby(['view', 'finding_unified']).size().unstack(fill_value=0)
plt.figure(figsize=(8, 6))
sns.heatmap(heatmap_data, annot=True, fmt="d", cmap="Blues")
plt.title("Heatmap: диагнозы vs проекции снимков")
plt.xlabel("Диагноз")
plt.ylabel("Проекция (view)")
plt.show()

Очистка

In [None]:
spark.stop()