1. Часть. Запуск Spark

In [None]:
from pyspark.sql import SparkSession, functions as F

# Здесь пример для работы внутри кластера, можно адаптировать под свои условия, если проект на хосте.

spark = SparkSession.builder \
    .appName("CovidMonitoring") \
    .master("local[*]") \
    .getOrCreate()

spark

2 Часть. Чтение parquet из HDFS

In [None]:
# Указываем для работы внутри Docker-кластера:
path = "hdfs://namenode:8020/covid_dataset/metadata/metadata_cleaned_spark.parquet"

df = spark.read.parquet(path)

df.printSchema()
df.show(5)

3. Часть. Регистрация временной таблицы и SQL‑запросы. Результаты см в логах SPARK(приложение), так как скрипт вводится в командной строкt PowerShell.

In [None]:
df.createOrReplaceTempView("covid_metadata")

3.1. Часть. SQL-запросы в Spark SQL

In [None]:
# Смотреть распределение диагнозов
spark.sql("""
    SELECT finding_clean,
           COUNT(*) AS cnt
    FROM covid_metadata
    GROUP BY finding_clean
    ORDER BY cnt DESC
""").show()

3.2. Часть. возрастные категории × диагноз

In [None]:
spark.sql("""
    SELECT age_category,
           finding_clean,
           COUNT(*) AS cnt
    FROM covid_metadata
    GROUP BY age_category, finding_clean
    ORDER BY age_category, cnt DESC
""").show(50)

3.3. Часть. средний возраст по диагнозам

In [None]:
spark.sql("""
    SELECT finding_clean,
           ROUND(AVG(age), 1) AS avg_age,
           COUNT(*) AS cnt
    FROM covid_metadata
    GROUP BY finding_clean
    HAVING cnt >= 10
    ORDER BY avg_age
""").show()

4 часть. PySpark DataFrame API (пример фильтраций и агрегаций

In [None]:
# выборка COVID‑19 и ключевых полей
covid_df = df.filter(df.finding_clean == "COVID-19") \
             .select("patientid", "sex", "age", "age_category",
                     "RT_PCR_positive", "survival", "date", "location")

covid_df.show(5)

In [None]:
# агрегация по датам
by_date = covid_df.groupBy("date").count().orderBy("date")
by_date.show(10)

Визуализации (на основе Spark → pandas). в проекте визуализация делалась в табличном виде. На случай если нужны гистограммы и диаграммы именно через SPARK - скрипты на вывод ниже.

In [None]:
# Круговая диаграмма по диагнозам
import pandas as pd
import matplotlib.pyplot as plt

diag_pdf = spark.sql("""
    SELECT finding_clean, COUNT(*) AS cnt
    FROM covid_metadata
    GROUP BY finding_clean
""").toPandas()

diag_pdf

In [None]:
plt.figure(figsize=(6, 6))
plt.pie(
    diag_pdf["cnt"],
    labels=diag_pdf["finding_clean"],
    autopct="%1.1f%%",
    startangle=90
)
plt.title("Распределение диагнозов (finding_clean)")
plt.tight_layout()
plt.show()

In [None]:
# Гистограмма возратста пациентов
covid_pdf = covid_df.toPandas()

plt.figure(figsize=(8, 5))
covid_pdf["age"].hist(bins=10, edgecolor="black")
plt.xlabel("Возраст, лет")
plt.ylabel("Число пациетов с COVID-19")
plt.title("Распред возраста пациентов с COVID-19")
plt.tight_layout()
plt.show()