# Homework 1

Check installazione spark

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

assert  "3." in sc.version, "Verify that the cluster Spark's version is 3.x"

In [3]:
print("Spark version:", sc.version)

Spark version: 3.5.5


In [4]:
from pyspark.sql import SparkSession
spark_session = SparkSession(sc)

## Lettura Dataset

Il blocco di codice successivo serve a:
1. Usare la prima riga automaticamente come header  
2. Fare inferenza automatica sul tipo di dato di ogni colonna  
3. Gestire records multi-linea  
4. Specificare che sono usate le doppie virgolette per racchiudere i campi  
5. Gestire le virgolette all'interno dei campi racchiusi dalle stesse  
6. Permettere il parsing di record non formattati perfettamente senza fallire  
7. Specificare il path del file `.csv` da cui leggere  

In [5]:
# Load the CSV file into a DataFrame
df_clinical = spark_session.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("multiLine", "true") \
  .option("quote", '"') \
  .option("escape", '"') \
  .option("mode", "PERMISSIVE") \
  .csv('dimensions_clinicalTrials.csv') 

In [6]:
df_clinical.printSchema()  # Prints the schema of the DataFrame

root
 |-- Rank: integer (nullable = true)
 |-- Trial ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Brief title: string (nullable = true)
 |-- Acronym: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Start date: date (nullable = true)
 |-- Start Year: double (nullable = true)
 |-- End Date: date (nullable = true)
 |-- Completion Year: double (nullable = true)
 |-- Phase: string (nullable = true)
 |-- Study Type: string (nullable = true)
 |-- Study Design: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Recruitment Status: string (nullable = true)
 |-- Number of Participants: double (nullable = true)
 |-- Intervention: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Registry: string (nullable = true)
 |-- Investigators/Contacts: string (nullable = true)
 |-- Sponsors/Collaborators: string (nullable = true)
 |-- City of Sponsor/Collaborator: string (nullable = true

In [7]:
df_clinical.show(5)  # Displays the first 5 rows of the DataFrame

+----+--------------+--------------------+--------------------+-----------------+--------------------+----------+----------+----------+---------------+-------+--------------+--------------------+--------------------+------------------+----------------------+--------------------+------+-------------------+------------------+----------------------+----------------------+----------------------------+-----------------------------+-------------------------------+---------------------+------------+--------------+--------------------+-------------------------+--------------------+--------------------------------+--------------------+--------------------+-------------------+------------+--------------+---------------+
|Rank|      Trial ID|               Title|         Brief title|          Acronym|            Abstract|Start date|Start Year|  End Date|Completion Year|  Phase|    Study Type|        Study Design|          Conditions|Recruitment Status|Number of Participants|        Intervention|G

## Analytics

### Analytics di esempio

Numero di studi cominciati per anno

In [8]:
df_clinical.filter(df_clinical["Start Year"].isNotNull()) \
    .groupBy("Start Year") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(10)  # Displays the count of clinical trials by start year

+----------+-----+
|Start Year|count|
+----------+-----+
|    2021.0| 1460|
|    2020.0| 1438|
|    2019.0| 1321|
|    2018.0| 1207|
|    2022.0| 1151|
|    2017.0| 1146|
|    2016.0|  850|
|    2015.0|  841|
|    2013.0|  784|
|    2014.0|  767|
+----------+-----+
only showing top 10 rows



Numero medio di partecipanti per tipo di studio

In [9]:
from pyspark.sql.functions import avg, round

df_clinical.filter(df_clinical["Number of Participants"].isNotNull()) \
    .groupBy("Study Type") \
    .agg(round(avg("Number of Participants"), 2).alias("avg_participants")) \
    .orderBy("avg_participants", ascending = False) \
    .show(10)  # Displays the average number (roundend to the .2 decimal) of participants by study type

+-------------------+----------------+
|         Study Type|avg_participants|
+-------------------+----------------+
|Active surveillance|        115000.0|
| Non-interventional|          3260.8|
|      Observational|         2632.76|
|     Interventional|          660.18|
|                CCT|           520.0|
|              Other|           202.0|
+-------------------+----------------+



Top 10 medical conditions più presenti

In [10]:
df_clinical.filter(df_clinical["Conditions"].isNotNull()) \
    .groupBy("Conditions") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(10)  # Displays the top 10 count of clinical trials by condition

+--------------------+-----+
|          Conditions|count|
+--------------------+-----+
|       Breast Cancer|  281|
|    Multiple Myeloma|  179|
|      Ovarian Cancer|  126|
|            Melanoma|  118|
|  Ulcerative Colitis|  112|
|Acute Myeloid Leu...|  106|
|            Leukemia|  102|
|Carcinoma, Non-Sm...|  101|
| Follicular Lymphoma|   92|
|Metastatic Colore...|   90|
+--------------------+-----+
only showing top 10 rows



Paesi con il numero più alto di partecipanti

In [11]:
df_clinical.filter(df_clinical["Number of Participants"].isNotNull() & df_clinical["Funder Country"].isNotNull()) \
    .groupBy("Funder Country") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(10)  # Displays the top 10 count of clinical trials by funder country

+--------------------+-----+
|      Funder Country|count|
+--------------------+-----+
|       United States| 2785|
|               Italy| 1797|
|             Germany|  391|
|               Japan|  325|
|             Belgium|  240|
|        Italy; Italy|  182|
|         Switzerland|  165|
|United States; Un...|  107|
|      United Kingdom|   91|
|United States; Japan|   68|
+--------------------+-----+
only showing top 10 rows



Si può notare che esistono anche duplicati o paesi che hanno probabilmente una collaborazione (separati da ;) che sono presenti in conteggi separati. Pertanto è utile provare a separarli.

In [12]:
from pyspark.sql.functions import split, explode, trim

# Split the 'Funder Country' column by ';' and explode it into separate rows
df_cleaned = df_clinical.withColumn("Funder Country", explode(split(df_clinical["Funder Country"], ";")))

# Remove leading/trailing whitespace from country names
df_cleaned = df_cleaned.withColumn("Funder Country", trim(df_cleaned["Funder Country"]))

# Filter out null values and group by 'Funder Country'
df_cleaned.filter(df_cleaned["Number of Participants"].isNotNull() & df_cleaned["Funder Country"].isNotNull()) \
    .groupBy("Funder Country") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(10)  # Displays the top 10 count of clinical trials by funder country

+--------------+-----+
|Funder Country|count|
+--------------+-----+
| United States| 3662|
|         Italy| 2939|
|       Germany|  602|
|         Japan|  547|
|United Kingdom|  408|
|       Belgium|  347|
|   Switzerland|  278|
|   Netherlands|  197|
|        France|  150|
|       Denmark|   82|
+--------------+-----+
only showing top 10 rows



## Analytics proposte

## Distribuzione delle Age Groups in base alle condizioni

In [13]:
#Si sono filtrati i dati per età e condizioni, e sono stati esclusi i valori nulli e le righe con "None - None" o "N/A - N/A"
df_clinical.filter(df_clinical["Age"].isNotNull() & df_clinical["Conditions"].isNotNull() & (df_clinical["Age"] != "None - None") & (df_clinical["Age"] != "N/A - N/A")) \
    .groupBy("Age", "Conditions") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(10) 

+--------------+--------------------+-----+
|           Age|          Conditions|count|
+--------------+--------------------+-----+
|18 Years - N/A|       Breast Cancer|  212|
|18 Years - N/A|    Multiple Myeloma|  159|
|18 Years - N/A|      Ovarian Cancer|   99|
|18 Years - N/A|Carcinoma, Non-Sm...|   97|
|18 Years - N/A|Non-Small Cell Lu...|   78|
|18 Years - N/A|Metastatic Breast...|   77|
|18 Years - N/A| Follicular Lymphoma|   69|
|18 Years - N/A|       Heart Failure|   66|
|18 Years - N/A|            Melanoma|   66|
|18 Years - N/A|Chronic Myeloid L...|   64|
+--------------+--------------------+-----+
only showing top 10 rows



In [14]:
#Qui si è sceso nel particolare delle top 3 Condizioni, per vedere le età più comuni per ciascuna di esse
df_clinical.filter(df_clinical["Age"].isNotNull() & (df_clinical["Conditions"] == "Breast Cancer") & (df_clinical["Age"] != "None - None") & (df_clinical["Age"] != "N/A - N/A") ) \
    .groupBy("Age", "Conditions") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(5) 

df_clinical.filter(df_clinical["Age"].isNotNull() & (df_clinical["Conditions"] == "Multiple Myeloma") & (df_clinical["Age"] != "None - None") & (df_clinical["Age"] != "N/A - N/A") ) \
    .groupBy("Age", "Conditions") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(5) 

df_clinical.filter(df_clinical["Age"].isNotNull() & (df_clinical["Conditions"] == "Ovarian Cancer") & (df_clinical["Age"] != "None - None") & (df_clinical["Age"] != "N/A - N/A") ) \
    .groupBy("Age", "Conditions") \
    .count() \
    .orderBy("count", ascending = False) \
    .show(5) 

+-------------------+-------------+-----+
|                Age|   Conditions|count|
+-------------------+-------------+-----+
|     18 Years - N/A|Breast Cancer|  212|
|18 Years - 70 Years|Breast Cancer|    8|
|18 Years - 75 Years|Breast Cancer|    7|
|     N/A - 70 Years|Breast Cancer|    4|
|18 Years - 45 Years|Breast Cancer|    4|
+-------------------+-------------+-----+
only showing top 5 rows

+-------------------+----------------+-----+
|                Age|      Conditions|count|
+-------------------+----------------+-----+
|     18 Years - N/A|Multiple Myeloma|  159|
|18 Years - 65 Years|Multiple Myeloma|    8|
|18 Years - 70 Years|Multiple Myeloma|    5|
|     65 Years - N/A|Multiple Myeloma|    2|
|18 Years - 80 Years|Multiple Myeloma|    1|
+-------------------+----------------+-----+

+-------------------+--------------+-----+
|                Age|    Conditions|count|
+-------------------+--------------+-----+
|     18 Years - N/A|Ovarian Cancer|   99|
|18 Years - 75 Year

Partendo dai risultati precedenti, si è proceduto ad estrarre la Start Age dell'Age Group per raggruppare il campione su tale base.

In [15]:
from pyspark.sql.functions import split

df_clinical_with_start_age = df_clinical.withColumn("Start Age", split(df_clinical["Age"], " ")[0])

df_clinical_with_start_age.filter(
    df_clinical_with_start_age["Age"].isNotNull() & 
    (df_clinical_with_start_age["Conditions"] == "Breast Cancer") & 
    (df_clinical_with_start_age["Age"] != "None - None") & 
    (df_clinical_with_start_age["Age"] != "N/A - N/A")
) \
    .groupBy("Start Age", "Conditions") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(5)

df_clinical_with_start_age.filter(
    df_clinical_with_start_age["Age"].isNotNull()& 
    (df_clinical_with_start_age["Conditions"] == "Multiple Myeloma") & 
    (df_clinical_with_start_age["Age"] != "None - None") & 
    (df_clinical_with_start_age["Age"] != "N/A - N/A")
) \
    .groupBy("Start Age", "Conditions") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(5)

df_clinical_with_start_age.filter(
    df_clinical_with_start_age["Age"].isNotNull() & 
    (df_clinical_with_start_age["Conditions"] == "Ovarian Cancer") & 
    (df_clinical_with_start_age["Age"] != "None - None") & 
    (df_clinical_with_start_age["Age"] != "N/A - N/A")
) \
    .groupBy("Start Age", "Conditions") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(5)

+---------+-------------+-----+
|Start Age|   Conditions|count|
+---------+-------------+-----+
|       18|Breast Cancer|  246|
|       65|Breast Cancer|    6|
|       45|Breast Cancer|    6|
|      N/A|Breast Cancer|    5|
|       40|Breast Cancer|    4|
+---------+-------------+-----+
only showing top 5 rows

+---------+----------------+-----+
|Start Age|      Conditions|count|
+---------+----------------+-----+
|       18|Multiple Myeloma|  173|
|       65|Multiple Myeloma|    2|
+---------+----------------+-----+

+---------+--------------+-----+
|Start Age|    Conditions|count|
+---------+--------------+-----+
|       18|Ovarian Cancer|  111|
|      N/A|Ovarian Cancer|    7|
|       70|Ovarian Cancer|    2|
+---------+--------------+-----+



## Anomaly Detection Analysis

In questo caso si è cercato di fare una anomaly detection basandosi sul numero di partecipanti ad ogni trial. Usando il metodo IQR (Inter-Quartile Range) si identificano i Trial che hanno un alto o un basso numero di partecipanti con la stessa condizione.

In [44]:
from pyspark.sql.functions import col, expr, percentile_approx, first

# Calculate Q1, Q3, and IQR for each condition
iqr_stats = df_clinical.filter(df_clinical["Number of Participants"].isNotNull()) \
    .groupBy("Conditions") \
    .agg(
        percentile_approx("Number of Participants", 0.25).alias("Q1"),
        percentile_approx("Number of Participants", 0.75).alias("Q3")
    ) \
    .withColumn("IQR", col("Q3") - col("Q1")) \
    .withColumn("Lower_Bound", col("Q1") - 1.5 * col("IQR")) \
    .withColumn("Upper_Bound", col("Q3") + 1.5 * col("IQR"))

# Join the IQR stats back to the original DataFrame
df_with_iqr = df_clinical.filter(df_clinical["Number of Participants"].isNotNull()) \
    .join(iqr_stats, on="Conditions", how="inner")

# Flag anomalies based on IQR
anomalies = df_with_iqr.filter(
    (col("Number of Participants") < col("Lower_Bound")) |
    (col("Number of Participants") > col("Upper_Bound"))
)

# Select records within the bounds
valid_records = df_with_iqr.filter(
    (col("Number of Participants") >= col("Lower_Bound")) &
    (col("Number of Participants") <= col("Upper_Bound"))
)

# Group by Conditions and select the first anomaly for each condition
unique_anomalies = anomalies.groupBy("Conditions").agg(
    first("Number of Participants").alias("Number of Participants"),
    first("Lower_Bound").alias("Lower_Bound"),
    first("Upper_Bound").alias("Upper_Bound"),
    first("Trial ID").alias("Trial ID")
)

# Show unique anomalies with full-length Conditions column
print("Anomalies:")
unique_anomalies.show(10,truncate=False)

# Show valid records with study information
print("Valid Records:")
valid_records.select("Conditions", "Number of Participants", "Lower_Bound", "Upper_Bound", "Trial ID").show(10,truncate=False)

Anomalies:
+----------------------------------+----------------------+-----------+-----------+-----------+
|Conditions                        |Number of Participants|Lower_Bound|Upper_Bound|Trial ID   |
+----------------------------------+----------------------+-----------+-----------+-----------+
|AL Amyloidosis                    |5000.0                |-20.0      |140.0      |NCT04839003|
|AML                               |246.0                 |367.0      |367.0      |NCT02152956|
|ARDS                              |740.0                 |-110.0     |226.0      |NCT03963622|
|Acute Coronary Syndromes          |10000.0               |2597.0     |2597.0     |NCT02438085|
|Acute Leukemia                    |40.0                  |101.0      |101.0      |NCT01385891|
|Acute Lymphoblastic Leukemia      |280.0                 |-70.0      |210.0      |NCT00358072|
|Acute Myelogenous Leukemia        |326.0                 |561.0      |593.0      |NCT00317642|
|Acute Promyelocytic Leukemia