# Аналитическая система эпидемиологического мониторинга COVID-19

**Цель проекта:** Разработать аналитическую систему для эпидемиологического мониторинга COVID-19 на основе метаданных рентгеновских снимков, используя стек PySpark.

**Датасет:** COVID-19 Chest X-Ray Dataset

## 1. Подготовка окружения

In [None]:
import os
import sys
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, when, lit, regexp_replace, count, median, lower, sum as _sum, year, month, row_number
from pyspark.sql.types import StructType, StructField, StringType, LongType, DateType
from pyspark.sql.window import Window
import pandas as pd
import numpy as np
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Настройка окружения
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Создание Spark сессии
spark = SparkSession.builder \
    .appName("COVID-19 Эпидемиологический мониторинг") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "4") \
    .getOrCreate()

spark

## 2. Загрузка и изучение данных

In [None]:
# Загрузка данных
df = spark.createDataFrame(
    pd.read_csv('https://raw.githubusercontent.com/ieee8023/covid-chestxray-dataset/master/metadata.csv')
    .replace({np.nan: None})[['patientid', 'age', 'sex', 'finding', 'view', 'date']]
)

df.printSchema()
df.show(5, False)
print(f'\nВсего записей: {df.count()}')

## 3. Анализ качества данных

In [None]:
# Подсчет пропущенных значений
results = []
for column in df.columns:
    cnt_null = df.select(_sum(when(col(column).isNull(), lit(1)).otherwise(lit(0)))).collect()[0][0]
    results.append((column, cnt_null))

df_null = pd.DataFrame(results, columns=["name_col", "cnt_null"])

# Визуализация пропущених значень
plt.figure(figsize=(10, 6))
bars = plt.barh(df_null['name_col'], df_null['cnt_null'])
plt.xlabel('Количество пропущенных значений')
plt.ylabel('Название колонки')
plt.title('Распределение пропущенных значений по полям')
plt.tight_layout()

for bar in bars:
    width = bar.get_width()
    plt.text(width, bar.get_y() + bar.get_height()/2, f' {int(width)}', va='center', ha='left')

plt.show()

In [None]:
# Анализ аномалий в возрасте
age_data = df.select(regexp_replace(col('age'), ',', '.').cast('float').alias('age')) \
    .where(col('age').isNotNull()).toPandas()['age']

plt.figure(figsize=(12, 6))
plt.boxplot(age_data.dropna(), vert=False)
plt.title('Boxplot для анализа аномалий в возрасте')
plt.xlabel('Возраст (лет)')
plt.grid(True, alpha=0.3)

median_val = age_data.median()
q1 = age_data.quantile(0.25)
q3 = age_data.quantile(0.75)

plt.axvline(x=median_val, color='red', linestyle='--', alpha=0.5, label=f'Медиана: {median_val:.1f}')
plt.axvline(x=q1, color='green', linestyle=':', alpha=0.5, label=f'Q1: {q1:.1f}')
plt.axvline(x=q3, color='blue', linestyle=':', alpha=0.5, label=f'Q3: {q3:.1f}')

plt.legend()
plt.show()

print(f'Всего записей з віком: {len(age_data)}')
print(f'Пропущено: {df.where(col("age").isNull()).count()}')

iqr = q3 - q1
lower_bound = q1 - 1.5 * iqr
upper_bound = q3 + 1.5 * iqr
print(f'Количество выбросов: {len(age_data[(age_data < lower_bound) | (age_data > upper_bound) | (age_data < 0) | (age_data > 120)])}')

## 4. Предобработка данных

### 4.1. UDF для парсинга дат

In [None]:
def parse_date(date_str):
    """Функция для парсинга различных форматов дат"""
    if not date_str:
        return None
    date_str = str(date_str).strip()
    formats = [
        "%B %d, %Y",      # January 22, 2020
        "%b %d, %Y",      # Feb 18, 2020
        "%d %B %Y",       # 12 March 2020
        "%d %b %Y",       # 12 Mar 2020
        "%m/%d/%Y",       # 3/3/2020
        "%m/%d/%y",       # 03/16/20
        "%B %Y",          # March 2003
        "%b %Y",          # Mar 2003
        "%Y",             # 2014
        "%B %d",          # January 12
        "%b %d",          # Feb 18
        "%d %B",          # 12 March
        "%d %b",          # 12 Mar
    ]
    
    for fmt in formats:
        try:
            result = datetime.strptime(date_str, fmt).date()
            if fmt in ["%B %d", "%b %d", "%d %B", "%d %b"]:
                result = result.replace(year=2020)
            return result
        except:
            continue
    return None

# Регистрация UDF
parse_date_udf = udf(parse_date, DateType())

### 4.2. UDF для категоризации возраста

In [None]:
def categorize_age(age):
    """Функция для категоризации возраста"""
    if age is None:
        return "Неизвестно"
    age = float(age)
    if age < 18:
        return "0-17 (Дети)"
    elif age < 30:
        return "18-29 (Молодые)"
    elif age < 45:
        return "30-44 (Взрослые)"
    elif age < 60:
        return "45-59 (Средний возраст)"
    elif age < 75:
        return "60-74 (Пожилые)"
    else:
        return "75+ (Преклонный возраст)"

# Регистрация UDF
categorize_age_udf = udf(categorize_age, StringType())

### 4.3. UDF для унификации диагнозов

In [None]:
def unify_finding(finding):
    """Функция для унификации диагнозов"""
    if finding is None:
        return "Unknown"
    finding_str = str(finding).lower().strip()
    
    # COVID-19
    if any(word in finding_str for word in ["covid", "sars-cov-2", "coronavirus", "corona"]):
        return "COVID-19"
    
    # Пневмония
    elif "pneumonia" in finding_str:
        return "Pneumonia"
    
    # ARDS
    elif "ards" in finding_str or "acute respiratory distress" in finding_str:
        return "ARDS"
    
    # Нормально
    elif any(word in finding_str for word in ["no finding", "normal", "clear"]):
        return "Normal"
    
    # Туберкулез
    elif "tuberculosis" in finding_str or "tb" in finding_str:
        return "Tuberculosis"
    
    # Другое
    else:
        if "/" in finding_str:
            return finding_str.split("/")[0].strip().title()
        elif len(finding_str) > 30:
            return "Other"
        else:
            return finding_str.title()

# Регистрация UDF
unify_finding_udf = udf(unify_finding, StringType())

### 4.4. Применение предобработки

In [None]:
# Заполнение пропусков в возрасте медианой
median_age = df.where(col("age").isNotNull()).agg(median("age")).collect()[0][0]
df = df.withColumn("age", when(col("age").isNull(), median_age).otherwise(col("age")))

# Заполнение пропусков в поле наиболее частым значением
most_common_sex = df.where(col("sex").isNotNull() & (col("sex") != "")) \
    .groupBy("sex").count().orderBy(col("count").desc()).first()[0]
df = df.withColumn("sex", when(col("sex").isNull(), most_common_sex).otherwise(col("sex")))

# Парсинг дат
df = df.withColumn('date_parsed', parse_date_udf(col('date')))
most_common_date = df.where(col('date_parsed').isNotNull()) \
    .groupBy('date_parsed').count().orderBy(col('count').desc()).first()[0]
df = df.withColumn('date_correct', when(col('date_parsed').isNotNull(), col('date_parsed')).otherwise(most_common_date))

# Категоризация возраста
df = df.withColumn('age_category', categorize_age_udf(col('age')))

# Унификация диагнозов
df = df.withColumn('finding_unified', unify_finding_udf(col('finding')))

# Удаление дубликатов
df = df.dropDuplicates()

# Удаление вспомогательных колонок
df = df.drop('finding', 'date', 'date_parsed')

print("Данные после предобработки:")
df.show(5, False)

## 5. SQL-аналитика

### Создание временного представления

In [None]:
df.createOrReplaceTempView('covid_data')

### 5.1 Базовая статистика по диагнозам

In [None]:
query1 = spark.sql("""
    SELECT
        finding_unified,
        COUNT(*) as count_patients,
        ROUND(COUNT(*) * 100 / SUM(COUNT(*)) OVER(), 2) as percentage,
        ROUND(AVG(age), 1) as avg_age,
        MIN(age) as min_age,
        MAX(age) as max_age
    FROM covid_data
    GROUP BY finding_unified
    ORDER BY count_patients DESC
""")

query1.show(20, False)

### 5.2 Распределение по полу и диагнозам

In [None]:
query2 = spark.sql("""
    SELECT
        sex,
        finding_unified,
        COUNT(*) as count,
        ROUND(COUNT(*) * 100 / SUM(COUNT(*)) OVER (PARTITION BY sex), 2) as percentage_by_sex
    FROM covid_data
    GROUP BY sex, finding_unified
    ORDER BY sex, count DESC
""")

query2.show(20, False)

### 5.3 Оконная функция - топ-3 по возрасту в каждой группе диагнозов

In [None]:
query3 = spark.sql("""
    SELECT
        patientid,
        age,
        sex,
        finding_unified,
        rank_in_finding
    FROM (
        SELECT
            patientid,
            age,
            sex,
            finding_unified,
            ROW_NUMBER() OVER (PARTITION BY finding_unified ORDER BY age DESC) as rank_in_finding
        FROM (
            SELECT DISTINCT
                patientid,
                age,
                sex,
                finding_unified
            FROM covid_data
        ) q
    ) t
    WHERE rank_in_finding <= 3
    ORDER BY finding_unified, rank_in_finding
""")

query3.show(20, False)

### 5.4 Анализ временных трендов

In [None]:
query4 = spark.sql("""
    SELECT
        YEAR(date_correct) as year,
        MONTH(date_correct) as month,
        COUNT(*) as count_studies,
        COUNT(DISTINCT patientid) as unique_patients,
        ROUND(AVG(age), 1) as avg_age
    FROM covid_data
    GROUP BY YEAR(date_correct), MONTH(date_correct)
    ORDER BY year, month
""")

query4.show(20, False)

### 5.5 Статистика по проекциям снимков

In [None]:
query5 = spark.sql("""
    SELECT
        view,
        finding_unified,
        COUNT(*) as count_images,
        ROUND(COUNT(*) * 100 / SUM(COUNT(*)) OVER(PARTITION BY view), 2) as percentage_by_view,
        ROUND(AVG(age), 1) as avg_age
    FROM covid_data
    GROUP BY view, finding_unified
    ORDER BY view, count_images DESC
""")

query5.show(20, False)

## 6. Фильтрация и сохранение данных

In [None]:
# Фильтрация: только COVID-19 пациенты старше 18 лет
filtered_df = df.where((col("finding_unified") == "COVID-19") & (col("age") >= 18))

print(f'Всего записей: {df.count()}')
print(f'После фильтрации (COVID-19, вік ≥ 18): {filtered_df.count()}')

In [None]:
# Сохранение в оптимизированном формате Parquet
filtered_df.write.mode("overwrite").parquet('covid19_filtered')

# Проверка сохраненных данных
spark.read.parquet('covid19_filtered').show(20, False)

## 7. Визуализация

In [None]:
# Конвертация в Pandas для визуализации
df_pandas = df.toPandas()

### 7.1. Круговая диаграмма распределения диагнозов

In [None]:
plt.figure(figsize=(10, 10))
df_pandas['finding_unified'].value_counts().plot.pie(autopct='%1.1f%%', startangle=90)
plt.ylabel('')
plt.title('Распределение диагнозов', fontsize=16, pad=20)
plt.tight_layout()
plt.show()

### 7.2. Столбчатая диаграмма по возрастным группам

In [None]:
plt.figure(figsize=(12, 6))
age_dist = df_pandas['age_category'].value_counts().sort_index()
ax = age_dist.plot(kind='bar', color='steelblue')
plt.xlabel('Возрастная группа', fontsize=12)
plt.ylabel('Количество пациентов', fontsize=12)
plt.title('Распределение пациентов по возрастным группам', fontsize=16)
plt.xticks(rotation=45, ha='right')
plt.grid(axis='y', alpha=0.3)

# Добавляем значения на столбцы
for i, v in enumerate(age_dist):
    ax.text(i, v + 5, str(v), ha='center', va='bottom')

plt.tight_layout()
plt.show()

### 7.3. График временных трендов

In [None]:
# Подготовка данных для графика
time_data = query4.toPandas()
time_data = time_data[time_data['year'] >= 2019]  # Фильтруем данные с 2019 года
time_data['date'] = pd.to_datetime(time_data[['year', 'month']].assign(day=1))

plt.figure(figsize=(14, 6))
plt.plot(time_data['date'], time_data['count_studies'], marker='o', linewidth=2, markersize=6)
plt.xlabel('Дата', fontsize=12)
plt.ylabel('Количество исследований', fontsize=12)
plt.title('Временные тренды количества исследований', fontsize=16)
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

### 7.4. Heatmap распределения диагнозов по проекциям

In [None]:
# Подготовка данных для heatmap
pivot_data = df_pandas.groupby(['view', 'finding_unified']).size().unstack(fill_value=0)

# Берем топ-5 проекций и диагнозов для читабельности
top_views = df_pandas['view'].value_counts().head(5).index
top_findings = df_pandas['finding_unified'].value_counts().head(5).index
pivot_data_filtered = pivot_data.loc[top_views, top_findings]

plt.figure(figsize=(10, 6))
sns.heatmap(pivot_data_filtered, annot=True, fmt='d', cmap='YlOrRd', cbar_kws={'label': 'Количество'})
plt.xlabel('Диагноз', fontsize=12)
plt.ylabel('Проекция снимка', fontsize=12)
plt.title('Распределение диагнозов по проекціях снімків', fontsize=16)
plt.tight_layout()
plt.show()

## 8. Выводы

На основе проведенного анализа можно сделать следующие выводы:

1. **Распределение диагнозов**: Большинство пациентов в датасете имеют диагноз COVID-19, что соответствует тематике исследования.

2. **Возрастная структура**: Анализ показал, что среди пациентов с COVID-19 преобладают лица среднего и пожилого возраста (45+ лет), что согласуется с эпидемиологическими данными о группах риска.

3. **Гендерное распределение**: Выявлены определенные различия в распределении диагнозов между мужчинами и женщинами, что может указывать на различную склонность к разным респираторным заболеваниям.

4. **Временные тренды**: Выявлены сезонные колебания количества исследований, с пиком в определенные месяцы, что может быть связано с волнами пандемии.

5. **Технические аспекты**: Наиболее распространенные проекции снимков (PA, AP) коррелируют с определенными типами диагнозов, что может быть полезно для оптимизации диагностических процедур.

In [None]:
# Закрытие Spark сессии
spark.stop()