In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lower, regexp_replace, trim, udf, when
from pyspark.sql.types import StringType
# Start the Spark session (getOrCreate reuses an already running one)
spark = (
    SparkSession.builder
    .appName("SymptomDataCleaning")
    .getOrCreate()
)


In [None]:
# Load the CSV file; the "header" option makes Spark take column names from the first row
df = (
    spark.read
    .option("header", "true")
    .csv("/content/drive/MyDrive/BigData/dataset_cleaned.csv")
)

# Identify the symptom columns: every column whose name contains "Symptom_"
symptom_cols = list(filter(lambda name: "Symptom_" in name, df.columns))
df.select(symptom_cols).show(5)


In [None]:
# Symptom cleanup: lowercase, underscores -> spaces, collapse whitespace runs into a
# single space, then strip the space left over at either end of the value.
# - The pattern is a raw string: a plain "\s" is an invalid Python escape sequence and
#   triggers a warning on recent Python versions.
# - "[_\s]+" handles underscores and whitespace in one pass; the result is the same as
#   replacing "_" with " " first and collapsing "\s+" afterwards.
# - trim() runs last on purpose: it only removes plain spaces, and by then every
#   whitespace run has already been reduced to one. Without it, a value with stray edge
#   whitespace (e.g. " skin rash") would never equal its clean form in later exact-match
#   lookups.
for col_name in symptom_cols:
    normalized = regexp_replace(lower(col(col_name)), r"[_\s]+", " ")
    df = df.withColumn(col_name, trim(normalized))


In [None]:
# Synonym table (basic example): variant spelling -> canonical symptom name
synonym_map = dict([
    ("vomiting", "vomit"),
    ("head ache", "headache"),
    ("high fever", "fever"),
    ("skin rash", "rash"),
])

# Lookup function used for the synonym replacement
def unify_symptom(s):
    """Return the canonical name for symptom ``s``.

    Falsy input (``None`` or an empty string) is handed back unchanged.
    A string with no entry in ``synonym_map`` is also returned as is; the
    lookup is an exact match on the whole string.
    """
    if not s:
        return s
    return synonym_map.get(s, s)


# Spark UDF wrapper around unify_symptom (returns a string column).
# It is no longer used below, but stays defined so any cell that references
# unify_udf keeps working.
unify_udf = udf(unify_symptom, StringType())

# Apply the synonym table to all symptom columns.
# DataFrame.replace performs the same exact-match substitution as the UDF
# (values without an entry, and NULLs, are left untouched), but it runs natively
# in Spark: no per-row Python round trip and no extra projection per column.
df = df.replace(synonym_map, subset=symptom_cols)


In [None]:
# Compute symptom_count: the number of symptom columns that hold an actual value.
# rlike(r"\S") is true only when the cell contains at least one non-whitespace
# character. For a NULL cell the condition evaluates to NULL, which when() does not
# treat as a match, so NULL, empty and whitespace-only cells all contribute 0.
# (Checking isNotNull() alone would count blank strings as symptoms.)
# NOTE: sum must be Python's builtin here (it adds the Column expressions with +),
# not pyspark.sql.functions.sum.
symptom_flags = [when(col(c).rlike(r"\S"), 1).otherwise(0) for c in symptom_cols]
df = df.withColumn("symptom_count", sum(symptom_flags))


In [None]:
# Keep only the rows that report at least 2 symptoms
df = df.where(df["symptom_count"] >= 2)


In [None]:
# Save to CSV: the Spark DataFrame is converted to pandas first, then written
# as one file without the pandas index column
(
    df.toPandas()
    .to_csv("/content/drive/MyDrive/BigData/dataset_curated_spark.csv", index=False)
)

# Quick look at the disease label next to its symptom count
df.select(["Disease", "symptom_count"]).show(5)
