In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, trim, when, lit, year, month, dayofmonth, to_date, count, coalesce, floor, avg, regexp_replace, countDistinct, concat
from pyspark.sql.types import IntegerType
import pandas as pd

# Инициализация SparkSession
spark = SparkSession.builder \
    .appName("MetadataCleaningAndOptimization") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY") \
    .getOrCreate() 

input_hdfs_csv_path = "hdfs:///covid_dataset/metadata/metadata_raw.csv" # Путь к исходному CSV-файлу с метаданными в HDFS 
output_optimized_parquet_table_name = "covid_metadata_optimized_table" # Путь в HDFS, куда будут сохранены оптимизированные данные в формате Parquet
output_optimized_parquet_path = "hdfs:///covid_dataset/metadata_optimized/"
output_hdfs_csv_path = "hdfs:///covid_dataset/metadata/metadata_cleaned.csv" # Путь в HDFS, куда будут сохранены очищенные данные в формате CSV (для удобства просмотра)

# Загрузка исходных данных 
df = spark.read.csv(input_hdfs_csv_path, header=True, inferSchema=True)

# Удаление ненужных колонок, у нас это: '_c29' 
if '_c29' in df.columns:
    df = df.drop('_c29')

# Удаление полностью идентичных строк из DataFrame
df = df.dropDuplicates()

# Очистка и стандартизация колонки 'finding' (диагноз) значением "unknown"
df = df.withColumn("finding",
                    when(col("finding").isNull() | (trim(col("finding")) == ""), lit("unknown"))
                    .otherwise(col("finding")))

# Приведение всех значений в колонке 'finding' к нижнему регистру и удаление пробелов по краям
df = df.withColumn("finding", lower(trim(col("finding"))))

# Создание новой колонки 'finding_unified' для стандартизации диагнозов
# Это позволяет сгруппировать похожие диагнозы в общие категории. У нас это: "covid-19", "other pneumonia", "tuberculosis", "no finding", "other finding"
df = df.withColumn("finding_unified",
    when(col("finding").contains("covid"), "covid-19") # Если содержит "covid", то "covid-19"
    .when(col("finding").contains("pneumonia") & (~col("finding").contains("covid")), "other pneumonia") # Если пневмония, но не ковид, то "other pneumonia"
    .when(col("finding").contains("tuberculosis"), "tuberculosis") # Если туберкулез, то "tuberculosis"
    .when(col("finding").contains("no finding"), "no finding") # Если "no finding", то "no finding"
    .otherwise("other finding") # Все остальное - "other finding"
)

# Стандартизация колонки 'RT_PCR_positive' (результат ПЦР-теста) 
# Приведение значений 'Y'/'y' к 'Y', 'N'/'n' к 'N', остальные - 'UNKNOWN'
if "RT_PCR_positive" in df.columns:
    df = df.withColumn("RT_PCR_positive",
        when(lower(col("RT_PCR_positive")) == "y", "Y")
        .when(lower(col("RT_PCR_positive")) == "n", "N")
        .otherwise("UNKNOWN")
    )

# Стандартизация колонки 'survival' (выживаемость) 
# Приведение значений 'Y'/'y' к 'Y', 'N'/'n' к 'N', остальные - 'UNKNOWN'
if "survival" in df.columns:
    df = df.withColumn("survival",
        when(lower(col("survival")) == "y", "Y")
        .when(lower(col("survival")) == "n", "N")
        .otherwise("UNKNOWN")
    )

# Парсинг и извлечение даты 
# Создание новой колонки 'date_parsed' путем попытки парсинга колонки 'date' с использованием нескольких форматов
df = df.withColumn("date_parsed",
    coalesce(
        to_date(col("date"), "MMMM d, yyyy"), # Например, "July 5, 2025"
        to_date(col("date"), "MMMM dd, yyyy"),# Например, "July 05, 2025"
        to_date(col("date"), "MMM d, yyyy"),  # Например, "Jul 5, 2025"
        to_date(col("date"), "MMM dd, yyyy"), # Например, "Jul 05, 2025"
        to_date(col("date"), "yyyy-MM-dd"),   # Стандартный формат "2025-07-05"
        to_date(col("date"), "MM/dd/yyyy"),   # "07/05/2025"
        to_date(col("date"), "M/d/yy"),       # "7/5/25"
        to_date(col("date"), "yyyy")          # Только год "2025"
    )
)

# Извлечение года, месяца и дня из спарсенной даты
df = df.withColumn("year", year(col("date_parsed"))) \
       .withColumn("month", month(col("date_parsed"))) \
       .withColumn("day", dayofmonth(col("date_parsed")))

# Создание колонки 'is_covid': 1, если 'finding_unified' - "covid-19", иначе 0
df = df.withColumn("is_covid", when(col("finding_unified") == "covid-19", 1).otherwise(0))

# Обработка колонки 'age' 
if "age" in df.columns:
    # Попытка привести 'age' к целочисленному типу. Если не удается, устанавливается None
    df = df.withColumn("age_numeric",
                        when(col("age").cast(IntegerType()).isNotNull(), col("age").cast(IntegerType()))
                        .otherwise(None))

    # Вычисление медианы возраста для заполнения пропущенных значений
    filtered_df_for_median = df.filter(col("age_numeric").isNotNull())
    if filtered_df_for_median.count() > 0:
        median_age = filtered_df_for_median.approxQuantile("age_numeric", [0.5], 0.25)[0]
        # Заполнение пропущенных значений в 'age_numeric' вычисленной медианой
        df = df.withColumn("age_numeric", coalesce(col("age_numeric"), lit(int(median_age))))
    else:
        # Если все значения 'age_numeric' NULL, заполняем 0 
        df = df.withColumn("age_numeric", coalesce(col("age_numeric"), lit(0)))

    # Создание колонки 'age_group' на основе 'age_numeric'
    df = df.withColumn("age_group",
                        when(col("age_numeric") < 18, "child")
                        .when((col("age_numeric") >= 18) & (col("age_numeric") < 65), "adult")
                        .when(col("age_numeric") >= 65, "senior")
                        .otherwise("unknown"))

# Обработка колонки 'sex' 
if "sex" in df.columns:
    # Вычисление mode (наиболее часто встречающегося значения) для заполнения пропущенных
    mode_sex_row = df.filter(col("sex").isNotNull()).groupBy("sex").count().orderBy(col("count").desc()).first()
    if mode_sex_row:
        mode_sex = mode_sex_row["sex"]
        # Заполнение пропущенных значений в 'sex' вычисленной mode
        df = df.withColumn("sex", coalesce(col("sex"), lit(mode_sex)))
    else:
        # Если все значения 'sex' NULL, заполняем "UNKNOWN"
        df = df.withColumn("sex", coalesce(col("sex"), lit("UNKNOWN")))

# Обработка колонки 'temperature' 
if "temperature" in df.columns:
    # Очистка нереалистичных значений температуры (ниже 35 или выше 42 градусов Цельсия)
    df = df.withColumn("temperature",
                        when((col("temperature").isNotNull()) & ((col("temperature") < 35) | (col("temperature") > 42)), None)
                        .otherwise(col("temperature")))
    # Вычисление среднего значения температуры для заполнения пропущенных
    avg_temp_row = df.filter(col("temperature").isNotNull()).agg(avg("temperature")).collect()[0][0]
    if avg_temp_row is not None:
        avg_temp = avg_temp_row
        # Заполнение пропущенных значений в 'temperature' вычисленным средним
        df = df.withColumn("temperature", coalesce(col("temperature"), lit(avg_temp)))

# Обработка колонки 'pO2_saturation' (насыщение кислородом) 
if "pO2_saturation" in df.columns:
    # Очистка нереалистичных значений сатурации (ниже 70 или выше 100) 
    df = df.withColumn("pO2_saturation",
                        when((col("pO2_saturation").isNotNull()) & ((col("pO2_saturation") < 70) | (col("pO2_saturation") > 100)), None)
                        .otherwise(col("pO2_saturation")))
    # Вычисление среднего значения сатурации для заполнения пропущенных
    avg_po2_row = df.filter(col("pO2_saturation").isNotNull()).agg(avg("pO2_saturation")).collect()[0][0]
    if avg_po2_row is not None:
        avg_po2 = avg_po2_row
        # Заполнение пропущенных значений в 'pO2_saturation' вычисленным средним
        df = df.withColumn("pO2_saturation", coalesce(col("pO2_saturation"), lit(avg_po2)))

# Создание полного пути к изображению в HDFS 
# Создание колонки 'hdfs_image_path' путем конкатенации базового пути HDFS и имени файла изображения. Чтобы Spark знал где искать
df_cleaned_final = df.withColumn("hdfs_image_path", concat(lit("hdfs:///covid_dataset/images/"), col("filename")))

# Фильтрация строк с некорректными путями к изображениям 
df_cleaned_final = df_cleaned_final.filter(col("hdfs_image_path").isNotNull() & (trim(col("hdfs_image_path")) != ""))

# Вывод статистики по очищенному DataFrame 
total_rows = df_cleaned_final.count()
print(f"Финальное количество строк: {total_rows}")

print("Схема DataFrame")
df_cleaned_final.printSchema()

# Распределение пропущенных значений по колонкам 
print("Распределение пропущенных значений (процент)")
missing_data = []
for column in df_cleaned_final.columns:
    missing_count = df_cleaned_final.filter(col(column).isNull() | (trim(col(column)) == "")).count()
    missing_percentage = (missing_count / total_rows) * 100 if total_rows > 0 else 0
    missing_data.append({"Column": column, "Missing Count": missing_count, "Missing Percentage": f"{missing_percentage:.2f}%"})

# Вывод результатов в виде Pandas DataFrame
print(pd.DataFrame(missing_data).set_index("Column"))

# Общая статистика для числовых колонок 
print("Общая статистика для числовых колонок")
numeric_cols = [c for c, t in df_cleaned_final.dtypes if t in ["int", "double", "long", "float"] and c not in ["year", "month", "day", "is_covid"]]
if numeric_cols:
    # summary(): Вычисляет статистические сводки (count, mean, stddev, min, max) для выбранных колонок
    df_cleaned_final.select(numeric_cols).summary("count", "mean", "stddev", "min", "max").show()
else:
    print("Числовых колонок для статистики не найдено.")

# Распределение уникальных значений для категориальных колонок 
print("Распределение уникальных значений для некоторых категориальных колонок")
categorical_cols_to_check = ["finding_unified", "sex", "view", "modality", "location", "age_group", "RT_PCR_positive", "survival"]
for column in categorical_cols_to_check:
    if column in df_cleaned_final.columns:
        print(f"\nКолонка '{column}':")
        # Подсчет количества уникальных значений
        num_distinct = df_cleaned_final.select(countDistinct(column)).collect()[0][0]
        if num_distinct < 50:
            df_cleaned_final.groupBy(column).count().orderBy(col("count").desc()).show(truncate=False)
        else:
            # Если уникальных значений много, показываем только топ-20
            df_cleaned_final.groupBy(column).count().orderBy(col("count").desc()).limit(20).show(truncate=False)
    else:
        print(f"Колонка '{column}' не найдена.")

# Сохранение очищенных данных в HDFS 
# Сохранение DataFrame в формате Parquet
# Перезаписывает данные, если они уже существуют по указанному пути
# Разбивает данные на директории по году и месяцу (например, /year=2020/month=01)
# Дополнительно организует данные внутри партиций в "бакеты"
# Сохраняет DataFrame как управляемую таблицу в Hive Metastore
print(f"\nСохраняем очищенные данные в HDFS в формате Parquet с партиционированием и бакетированием по пути: {output_optimized_parquet_path}")
df_cleaned_final.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .bucketBy(10, "finding_unified", "sex") \
    .saveAsTable(output_optimized_parquet_table_name, path=output_optimized_parquet_path, format="parquet")
print(f"Данные успешно сохранены в Parquet в виде таблицы '{output_optimized_parquet_table_name}'.")

# Сохранение DataFrame в формате CSV
print(f"\nСохраняем очищенные данные в HDFS в формате CSV: {output_hdfs_csv_path}")
df_cleaned_final.write.mode("overwrite").csv(output_hdfs_csv_path, header=True)
print("Данные успешно сохранены в CSV.")

# Остановка SparkSession
spark.stop()
print("SparkSession остановлена. Очистка и сохранение метаданных завершены.")

Финальное количество строк: 950
Схема DataFrame
root
 |-- patientid: string (nullable = true)
 |-- offset: integer (nullable = true)
 |-- sex: string (nullable = false)
 |-- age: integer (nullable = true)
 |-- finding: string (nullable = true)
 |-- RT_PCR_positive: string (nullable = false)
 |-- survival: string (nullable = false)
 |-- intubated: string (nullable = true)
 |-- intubation_present: string (nullable = true)
 |-- went_icu: string (nullable = true)
 |-- in_icu: string (nullable = true)
 |-- needed_supplemental_O2: string (nullable = true)
 |-- extubated: string (nullable = true)
 |-- temperature: double (nullable = false)
 |-- pO2_saturation: double (nullable = false)
 |-- leukocyte_count: double (nullable = true)
 |-- neutrophil_count: double (nullable = true)
 |-- lymphocyte_count: double (nullable = true)
 |-- view: string (nullable = true)
 |-- modality: string (nullable = true)
 |-- date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- folder: stri

                                                                                

Данные успешно сохранены в Parquet в виде таблицы 'covid_metadata_optimized_table'.

Сохраняем очищенные данные в HDFS в формате CSV: hdfs:///covid_dataset/metadata/metadata_cleaned.csv


                                                                                

Данные успешно сохранены в CSV.
SparkSession остановлена. Очистка и сохранение метаданных завершены.
