# HW1 - Clinical Trials Analytics in PySpark

### 1. Spark Installation


Download and install Spark with all its dependencies (Java 8 and `findspark`).

In [1]:
# Install Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark 3.4.4 (pre-built for Hadoop 3)
!wget -q https://apache.osuosl.org/spark/spark-3.4.4/spark-3.4.4-bin-hadoop3.tgz

# Extract the archive
!tar xf spark-3.4.4-bin-hadoop3.tgz

# Install findspark
!pip install -q findspark

We need to set environment variables so that the Java and Spark runtimes are visible to the Linux environment. Because `SPARK_HOME` points to one specific installation folder, we could also install several Spark versions and choose later which one to use simply by changing it.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.4-bin-hadoop3"

We import the `findspark` library, which uses `SPARK_HOME` to locate the Spark installation and make PySpark importable, so we don't have to configure paths and other options manually.

In [3]:
import findspark
findspark.init()

# Spark version verification on cluster
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

assert sc.version.startswith("3."), "Verify that the cluster's Spark version is 3.x"
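Roughly speaking, `findspark.init()` puts the PySpark sources shipped with the Spark distribution on `sys.path`. The sketch below illustrates that idea in plain Python; it is a simplified illustration, not findspark's real code. The helper names `spark_python_paths` and `init_spark_paths` are hypothetical, and the sketch assumes the usual layout of a Spark binary distribution: a `python/` folder plus a bundled `python/lib/py4j-*.zip`.

```python
import glob
import os
import sys


def spark_python_paths(spark_home):
    """Hypothetical helper: return the paths a findspark-style init would add.

    PySpark lives in $SPARK_HOME/python and depends on the py4j zip
    bundled in $SPARK_HOME/python/lib (assumed distribution layout).
    """
    spark_python = os.path.join(spark_home, "python")
    py4j_zips = glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    if not py4j_zips:
        raise FileNotFoundError(f"no py4j zip found under {spark_python}/lib")
    return [spark_python, py4j_zips[0]]


def init_spark_paths(spark_home=None):
    """Hypothetical helper: prepend the PySpark paths to sys.path."""
    spark_home = spark_home or os.environ["SPARK_HOME"]
    paths = spark_python_paths(spark_home)
    # Insert in reverse so the final order is [spark_python, py4j_zip, ...]
    for p in reversed(paths):
        if p not in sys.path:
            sys.path.insert(0, p)
    return paths
```

This is why setting `SPARK_HOME` correctly matters: every path is derived from it.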

In [4]:
sc

### 2. Loading the Dataset on Spark


The RDD is Spark's low-level data representation, but to make code simpler to write and design, a higher-level data model had to be introduced.

Spark SQL came to life and lets users work with Datasets/DataFrames. These are table-like objects: each column has a name and a type, and each row is a combination of column values.

Spark's SQL engine translates these SQL-like operations into operations on RDDs, which compute the final results.
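To make the DataFrame/RDD relationship concrete, here is a plain-Python sketch (no Spark needed) of what a query like `groupBy("Start Year").count()` boils down to in RDD terms: map each row to a `(key, 1)` pair, then combine the values per key as `reduceByKey` would. It is only a conceptual illustration; Spark's optimizer chooses the actual physical plan. The helper names and sample rows are made up.

```python
from functools import reduce


def map_to_pairs(rows, key):
    """Map step: emit (key, 1) for every row whose key is not null."""
    return [(row[key], 1) for row in rows if row[key] is not None]


def reduce_by_key(pairs, func):
    """Reduce step: combine all values sharing a key, like RDD.reduceByKey."""
    grouped = {}
    for k, v in pairs:
        grouped.setdefault(k, []).append(v)
    return {k: reduce(func, vs) for k, vs in grouped.items()}


# Table-like rows: every "column" has a name (the dict key) and a type
rows = [
    {"Trial ID": "T1", "Start Year": 2021.0},
    {"Trial ID": "T2", "Start Year": 2020.0},
    {"Trial ID": "T3", "Start Year": 2021.0},
    {"Trial ID": "T4", "Start Year": None},
]

counts = reduce_by_key(map_to_pairs(rows, "Start Year"), lambda a, b: a + b)
print(counts)  # {2021.0: 2, 2020.0: 1}
```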

In [5]:
# Spark SQL libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions
# Note: importing `sum` shadows Python's built-in sum() from here on
from pyspark.sql.functions import split, explode, trim, count, sum, col, current_date, lower, regexp_replace
import time

spark = SparkSession(sc)
print(spark)

<pyspark.sql.session.SparkSession object at 0x7d3351da9050>


The dataset could be loaded from several sources (remote or local). Since it is a single, fairly small CSV file, we upload it to Google Drive and mount Drive in the Colab file system, where the local Spark instance can read it.

In [6]:
from google.colab import drive
drive.mount('/content/drive')

# List the folders in the course directory
print(os.listdir('/content/drive/MyDrive/Colab Notebooks/Big Data'))

homework1Path = "/content/drive/MyDrive/Colab Notebooks/Big Data/Homework 1"

# Path to the CSV file
csvPath = os.path.join(homework1Path,"dimensions_clinicalTrials.csv")
print(csvPath)

Mounted at /content/drive
['Esercitazione Spark 25 03 2025', 'Esercitazione Hive 25 03 2025', 'Homework 1']
/content/drive/MyDrive/Colab Notebooks/Big Data/Homework 1/dimensions_clinicalTrials.csv


Create the DataFrame `ctDS` with the session's `read` object (a `DataFrameReader`).

In [7]:
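# inferSchema: infer each column's type (integer, double, string, date, ...)
# header: take the column names from the first CSV row
# multiline: allow quoted fields to span several lines (e.g. long abstracts)
# quote/escape: fields are wrapped in '"' and an embedded '"' is written as '""'
ctDS = spark.read \
  .option("inferSchema", "true") \
  .option("header", "true") \
  .option("multiline", "true") \
  .option("quote", "\"") \
  .option("escape", "\"") \
  .csv(csvPath)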

# Print the schema
ctDS.printSchema()

root
 |-- Rank: integer (nullable = true)
 |-- Trial ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Brief title: string (nullable = true)
 |-- Acronym: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Start date: date (nullable = true)
 |-- Start Year: double (nullable = true)
 |-- End Date: date (nullable = true)
 |-- Completion Year: double (nullable = true)
 |-- Phase: string (nullable = true)
 |-- Study Type: string (nullable = true)
 |-- Study Design: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Recruitment Status: string (nullable = true)
 |-- Number of Participants: double (nullable = true)
 |-- Intervention: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Registry: string (nullable = true)
 |-- Investigators/Contacts: string (nullable = true)
 |-- Sponsors/Collaborators: string (nullable = true)
 |-- City of Sponsor/Collaborator: string (nullable = true
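The `multiline`, `quote` and `escape` options matter because long text fields such as `Abstract` can contain line breaks, commas and quotes. With `quote` and `escape` both set to `"`, the reader follows the common CSV convention of wrapping such fields in double quotes and writing an embedded quote as two quotes. Python's standard `csv` module uses the same convention by default, so it can illustrate how a record spanning two physical lines is parsed; the sample text is made up.

```python
import csv
import io

# A made-up CSV: the Abstract of T1 spans two lines and contains an
# embedded quote written as "" (a doubled quote).
raw = (
    'Trial ID,Abstract\n'
    'T1,"First line, with a comma\nsecond line with a ""quoted"" word"\n'
    'T2,Short abstract\n'
)

# csv.reader uses '"' as the quote character and reads '""' as a literal
# quote (doublequote=True is the default), so the newline inside the
# quoted field does not end the record.
records = list(csv.reader(io.StringIO(raw)))

for rec in records:
    print(rec)
```

Without the equivalent Spark options, the same file would be split at the inner line break and the columns would shift.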

In [8]:
# Let's print some rows
ctDS.select("*").show(20, truncate = True)

+----+--------------+--------------------+--------------------+-----------------+--------------------+----------+----------+----------+---------------+-------+--------------+--------------------+--------------------+--------------------+----------------------+--------------------+------+--------------------+------------------+----------------------+----------------------+----------------------------+-----------------------------+-------------------------------+---------------------+------------+--------------+--------------------+-------------------------+--------------------+--------------------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+
|Rank|      Trial ID|               Title|         Brief title|          Acronym|            Abstract|Start date|Start Year|  End Date|Completion Year|  Phase|    Study Type|        Study Design|          Conditions|  Recruitment Status|Number of Participants|  

### 3. Preprocessing


It may not be obvious at first sight, but this dataset contains many duplicate rows. We clean it with `dropDuplicates()`, keeping a single row for each `Trial ID`.



In [9]:
# Row count before deduplication
count_dirty = ctDS.count()

# Keep a single row for each Trial ID
#ctDS = ctDS.fillna("NA_TEMP")
ctDS = ctDS.dropDuplicates(["Trial ID"])
#ctDS = ctDS.replace("NA_TEMP", None)
count_clean = ctDS.count()

print("Dirty: " + str(count_dirty))
print("Clean: " + str(count_clean))


Dirty: 15990
Clean: 8356
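`dropDuplicates(["Trial ID"])` keeps exactly one row per `Trial ID` and discards the rest. The plain-Python sketch below mimics that by keeping the first row seen for each key; the helper `drop_duplicates_by_key` and the sample rows are hypothetical. In Spark, with the data spread over partitions, which of the duplicate rows survives is not guaranteed, which only matters if rows sharing an ID differ in other columns.

```python
def drop_duplicates_by_key(rows, key):
    """Hypothetical helper: keep the first row seen for each value of `key`."""
    seen = set()
    result = []
    for row in rows:
        k = row[key]
        if k not in seen:
            seen.add(k)
            result.append(row)
    return result


rows = [
    {"Trial ID": "NCT001", "Title": "Study A"},
    {"Trial ID": "NCT001", "Title": "Study A"},
    {"Trial ID": "NCT002", "Title": "Study B"},
]

clean = drop_duplicates_by_key(rows, "Trial ID")
print(len(rows), len(clean))  # 3 2
```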


In [10]:
# Before cleaning, there were 7 rows with this Title
ctDS.select("*").where(ctDS["Title"] == "Phase III Study on STem cElls Mobilization in Acute Myocardial Infarction").show(20,truncate=False)

+----+-----------+-------------------------------------------------------------------------+--------------------------------------------------------------------+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

There's still something wrong: some columns contain pseudo-structured data, such as dictionaries and lists, stored as plain strings. In particular:

**Dictionaries**

*   Study Design

**Lists**

*   Conditions
*   Investigators/Contacts
*   Sponsors/Collaborators
*   City of Sponsor/Collaborator
*   State of Sponsor/Collaborator
*   Country of Sponsor/Collaborator
*   Fields of Research (ANZSRC 2020)
*   RCDC Categories
*   HRCS HC Categories
*   HRCS RAC Categories
*   Cancer Types
*   CSO Categories

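To make analytics easier, we convert these columns to proper types with a preprocessing step: each list column becomes an array of strings (items separated by `;`), and the dictionary column becomes a map of `key: value` pairs.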




In [11]:
from pyspark.sql.functions import expr

# All columns that are pseudo-lists
columnsList = ["Conditions","Investigators/Contacts","Sponsors/Collaborators","City of Sponsor/Collaborator","State of Sponsor/Collaborator","Country of Sponsor/Collaborator","Fields of Research (ANZSRC 2020)","RCDC Categories","HRCS HC Categories","HRCS RAC Categories","Cancer Types","CSO Categories"]

# Loop over all list columns
for column in columnsList:
  ctDS = ctDS.withColumn(
      column,
      # expr() lets us write the step as a Spark SQL expression: split on ';', trim each item, turn empty items into NULL
      expr(f"""
        transform(
            split(`{column}`, ';'),
            x -> CASE WHEN trim(x) = '' THEN NULL ELSE trim(x) END
        )
      """
    )
  )
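For a single cell, the SQL expression above splits the string on `;`, trims each item and replaces empty items with `NULL`; a `NULL` cell stays `NULL`, since `split` and `transform` return `NULL` for a `NULL` input. A plain-Python mirror of these rules (the function name `parse_list_cell` is hypothetical, the sample string is made up) makes them easy to check. Spark's `trim` is documented as removing space characters, so the sketch uses `strip(" ")` rather than `strip()`.

```python
def parse_list_cell(cell):
    """Hypothetical plain-Python mirror of the Spark SQL list expression.

    Split on ';', trim spaces from each item, turn empty items into None.
    A None cell stays None, like split/transform on NULL in Spark.
    """
    if cell is None:
        return None
    # strip(" ") removes spaces only, matching Spark's trim()
    return [item.strip(" ") or None for item in cell.split(";")]


print(parse_list_cell(" Epilepsy ; Colorectal cancer NOS;"))
# ['Epilepsy', 'Colorectal cancer NOS', None]
```

Empty trailing items become `None` rather than disappearing, which is why null elements have to be filtered out later, after `explode`.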



In [12]:
# Parse the only dictionary column into a map of key/value strings
ctDS = ctDS.withColumn(
  "Study Design",
  expr("""
    map_from_entries(
            transform(
                split(`Study Design`, ';'),
                entry -> struct(
                    trim(split(entry, ':')[0]) as key,
                    IF(size(split(entry, ':')) > 1, trim(split(entry, ':')[1]), NULL) as value
                )
            )
        )
  """)
)
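The same kind of plain-Python mirror shows what the map expression produces for one cell; the function name `parse_study_design` is hypothetical and the sample string is made up. Two behaviours are worth knowing. First, because only element `[1]` of `split(entry, ':')` is used, a value that itself contains `:` is cut at that colon. Second, to our understanding Spark 3 rejects duplicate map keys by default (`spark.sql.mapKeyDedupPolicy` set to `EXCEPTION`), which the sketch imitates by raising an error.

```python
def parse_study_design(cell):
    """Hypothetical plain-Python mirror of the map_from_entries expression."""
    if cell is None:
        return None  # split/transform/map_from_entries on NULL give NULL
    result = {}
    for entry in cell.split(";"):
        parts = entry.split(":")
        key = parts[0].strip(" ")
        # Like split(entry, ':')[1]: only the text between the first and
        # second ':' is kept; anything after a second ':' is dropped.
        value = parts[1].strip(" ") if len(parts) > 1 else None
        if key in result:
            # Assumed to match Spark 3's default policy for duplicate map keys
            raise ValueError(f"duplicate key: {key!r}")
        result[key] = value
    return result


print(parse_study_design("Allocation: Randomized; Masking: None (Open Label)"))
# {'Allocation': 'Randomized', 'Masking': 'None (Open Label)'}
```

If values with colons turn out to matter, Spark 3's `split(str, regex, limit)` with a limit of 2 would keep the remainder of the entry as the value.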

In [23]:
ctDS.select("Conditions").show(100,truncate = False)
#ctDS.select(col("Conditions")[0].alias("first_condition")).show(truncate = False)

+--------------------------------------------+
|Conditions                                  |
+--------------------------------------------+
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|[Chronic myeloid leukaemia]                 |
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|null                                        |
|[Epilepsy]                                  |
|[Colorectal cancer NOS]                     |
|null                                        |
|null                                        |
|null        

In [14]:
#ctDS.select("Study Design").show(truncate = False)
ctDS.select(col("Study Design")["Allocation"].alias("Allocation_type")).show(truncate = False)

+---------------+
|Allocation_type|
+---------------+
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
|null           |
+---------------+
only showing top 20 rows



### 4. Analytics

In [15]:
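# All results are saved as CSV files in a "results" folder

# Create the results folder (pandas' to_csv does not create missing folders)
resultsPath = os.path.join(homework1Path,"results")
os.makedirs(resultsPath, exist_ok=True)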

In [16]:
# Number of studies started per year
studiesPerYear = ctDS.select("Start Year") \
  .filter(ctDS["Start Year"].isNotNull()) \
  .groupBy(ctDS["Start Year"]) \
  .count() \
  .withColumnRenamed("count","NumStudies per Year") \
  .orderBy(col("NumStudies per Year").desc())

studiesPerYear.show(50,truncate = False)

# Save the result as a CSV file (index=False skips the pandas row index)
studiesPerYearResult = studiesPerYear.toPandas()
studiesPerYearResult.to_csv(os.path.join(resultsPath,"studiesPerYear.csv"), index=False)


+----------+-------------------+
|Start Year|NumStudies per Year|
+----------+-------------------+
|2021.0    |722                |
|2020.0    |661                |
|2019.0    |640                |
|2018.0    |589                |
|2022.0    |585                |
|2017.0    |548                |
|2015.0    |457                |
|2016.0    |446                |
|2023.0    |399                |
|2014.0    |397                |
|2013.0    |370                |
|2012.0    |367                |
|2011.0    |325                |
|2010.0    |295                |
|2009.0    |287                |
|2008.0    |287                |
|2007.0    |226                |
|2006.0    |200                |
|2005.0    |130                |
|2004.0    |101                |
|2003.0    |56                 |
|2001.0    |45                 |
|2002.0    |40                 |
|2024.0    |38                 |
|2000.0    |35                 |
|1998.0    |19                 |
|1999.0    |18                 |
|1997.0   

In [17]:
# Average number of participants per study title

# Check whether the Title column is unique; the question may refer to titles shared by several trials
tot_rows = ctDS.count()

distinct_rows = ctDS.select("Title").distinct().count()

if tot_rows == distinct_rows:
  print("They're the same")
else:
  print("They're NOT the same")

# The counts differ, so some titles are shared by several trials and grouping by Title is meaningful
print("Tot_rows: " + str(tot_rows))
print("Distinct_rows: " + str(distinct_rows))

They're NOT the same
Tot_rows: 8356
Distinct_rows: 8334


In [18]:
# Find the titles shared by more than one distinct trial
duplicates = ctDS.groupBy("Title") \
  .count() \
  .filter(col("count") > 1)

duplicates.select("Title","count").show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Title                                                                                                                                                                                                                                                                        |count|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|SAlute e LaVoro in Chirurgia Oncologica (SALVO)                                                                                                                      

In [24]:
from pyspark.sql.functions import avg

# Average number of participants per study title
averagePerTitle = ctDS.select("Title","Number of Participants") \
  .filter(ctDS["Title"].isNotNull()) \
  .groupBy("Title") \
  .agg(avg("Number of Participants").alias("Average per Title")) \
  .orderBy(col("Average per Title").desc())

averagePerTitle.show(20,truncate=False)

averagePerTitleResult = averagePerTitle.toPandas()
averagePerTitleResult.to_csv(os.path.join(resultsPath,"averagePerTitle.csv"), index=False)


+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|Title                                                                                                                                                                                                                                                                                                                                                                                                                                                               
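`avg` skips null values: a group's average is computed only over rows whose `Number of Participants` is not null, and a group in which every value is null gets a null average. The plain-Python sketch below spells out that rule; the helper `average_per_key` and the rows are hypothetical.

```python
def average_per_key(rows, key, value):
    """Hypothetical helper: group by `key`, average `value`, ignoring None."""
    sums, counts, keys = {}, {}, []
    for row in rows:
        k = row[key]
        if k is None:
            continue  # mirrors the .filter(...isNotNull()) on Title
        if k not in sums:
            keys.append(k)
            sums[k], counts[k] = 0.0, 0
        v = row[value]
        if v is not None:
            sums[k] += v
            counts[k] += 1
    # A group with no non-null values gets None, as Spark's avg returns NULL
    return {k: (sums[k] / counts[k] if counts[k] else None) for k in keys}


rows = [
    {"Title": "Study A", "Number of Participants": 100.0},
    {"Title": "Study A", "Number of Participants": None},
    {"Title": "Study A", "Number of Participants": 50.0},
    {"Title": "Study B", "Number of Participants": None},
    {"Title": None, "Number of Participants": 10.0},
]

result = average_per_key(rows, "Title", "Number of Participants")
print(result)  # {'Study A': 75.0, 'Study B': None}
```

Study A averages to 75.0 rather than 50.0 because the null row is ignored, not counted as zero.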

In [28]:
# Top 10 most frequent medical conditions
ctDS.select(explode(ctDS["Conditions"]).alias("Condition")) \
  .filter(col("Condition").isNotNull()) \
  .groupBy("Condition") \
  .count() \
  .withColumnRenamed("count","Count per Condition") \
  .orderBy(col("Count per Condition").desc()) \
  .limit(10).show(truncate=False)

+-----------------------+-------------------+
|Condition              |Count per Condition|
+-----------------------+-------------------+
|Breast Cancer          |157                |
|Multiple Myeloma       |83                 |
|Coronary Artery Disease|71                 |
|Heart Failure          |62                 |
|Ovarian Cancer         |60                 |
|Lung Cancer            |60                 |
|Colorectal Cancer      |56                 |
|Ulcerative Colitis     |52                 |
|Melanoma               |51                 |
|Prostate Cancer        |47                 |
+-----------------------+-------------------+
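`explode` emits one row per array element and no row at all for a null or empty array, so trials without conditions simply drop out; null elements (from empty items) are then removed by the filter. The counting and ranking steps amount to the plain-Python sketch below (hypothetical helper `top_conditions`, made-up data). One difference: `Counter.most_common` breaks ties by first occurrence, whereas Spark's `orderBy` does not guarantee any order among equal counts, as with Ovarian Cancer and Lung Cancer above.

```python
from collections import Counter


def top_conditions(conditions_column, n=10):
    """Hypothetical helper: explode list cells, drop nulls, count, take top n."""
    exploded = (
        item
        for cell in conditions_column
        if cell  # like explode: a None or empty list yields no rows
        for item in cell
    )
    counts = Counter(item for item in exploded if item is not None)
    return counts.most_common(n)


column = [
    ["Breast Cancer", None],
    None,
    ["Melanoma", "Breast Cancer"],
    [],
    ["Epilepsy"],
]

print(top_conditions(column, n=2))  # [('Breast Cancer', 2), ('Melanoma', 1)]
```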

