In [0]:
# Base names (no extension) of the CSV files stored under /FileStore/tables.
pharma_file = "pharma"
clinical_tiral_file = "clinicaltrial_2023"

# Peek at the raw bytes of the clinical-trial file to see how it is delimited/quoted.
ct_preview_path = f"/FileStore/tables/{clinical_tiral_file}.csv"
dbutils.fs.head(ct_preview_path)

[Truncated to first 65536 bytes]


'"Id\tStudy Title\tAcronym\tStatus\tConditions\tInterventions\tSponsor\tCollaborators\tEnrollment\tFunder Type\tType\tStudy Design\tStart\tCompletion",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,\r\n"NCT03630471\tEffectiveness of a Problem-solving Intervention for Common Adolescent Mental Health Problems in India\tPRIDE\tCOMPLETED\tMental Health Issue (E.G.", Depression, Psychosis, Personality Disorder," Substance Abuse)\tBEHAVIORAL: PRIDE \'Step 1\' problem-solving intervention|BEHAVIORAL: Enhanced usual care\tSangath\tHarvard Medical School (HMS and HSDM)|London School of Hygiene and Tropical Medicine\t250.0\tOTHER\tINTERVENTIONAL\tAllocation: RANDOMIZED|Intervention Model: PARALLEL|Masking: DOUBLE (INVESTIGATOR"," OUTCOMES_ASSESSOR)|Primary Purpose: TREATMENT\t2018-08-20\t2019-02-28",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,

# CREATING AND CLEANING DATAFRAMES

In [0]:
# CREATING DATAFRAME
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Field separator used by each supported dataset file.
delimiter = {
    "clinicaltrial_2023": "\t",
    "clinicaltrial_2021": "|",
    "clinicaltrial_2020": "|",
    "pharma": ","
}


def create_dataframe(_file):
    """Load one of the datasets in /FileStore/tables into a Spark DataFrame.

    Args:
        _file: Base name of the CSV file (without ".csv"); must be a key of
            ``delimiter``.

    Returns:
        A DataFrame whose column names come from the file's header line.
        For "clinicaltrial_2023" every column is a string and rows shorter
        than the header are right-padded with " " (a single space).

    Raises:
        KeyError: if ``_file`` has no entry in ``delimiter``.
    """
    path = f"/FileStore/tables/{_file}.csv"
    sep = delimiter[_file]

    # Every file except the 2023 one is read with the built-in CSV reader.
    if _file != "clinicaltrial_2023":
        return spark.read.csv(path, sep=sep, header=True)

    # The 2023 file is parsed by hand: each line has its trailing commas and
    # outer double quotes stripped before being split on the tab separator.
    rows = (
        sc.textFile(path)
        .map(lambda line: line.rstrip(",").strip('"'))
        .map(lambda line: line.split(sep))
    )
    header = rows.first()
    width = len(header)

    # Drop the header by value rather than by a generated row id, so the result
    # does not depend on how the file was partitioned. Short rows are padded so
    # that every record has one value per header column (a non-positive
    # multiplier yields an empty list, leaving full-width rows untouched).
    records = (
        rows
        .filter(lambda row: row != header)
        .map(lambda row: row + [" "] * (width - len(row)))
    )

    # Name the columns directly from the header instead of renaming _1.._N.
    return records.toDF(header)

In [0]:
# Creating the clinical trial dataframe using the function from the previous cell.
# The frame is reused by every question below, so it is cached to avoid re-reading
# and re-parsing the source file for each action.

ct_dataframe = create_dataframe(clinical_tiral_file).cache()
ct_dataframe.show(20)

+-----------+--------------------+----------+------------------+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+----------+
|         Id|         Study Title|   Acronym|            Status|          Conditions|       Interventions|             Sponsor|       Collaborators|Enrollment|Funder Type|          Type|        Study Design|     Start|Completion|
+-----------+--------------------+----------+------------------+--------------------+--------------------+--------------------+--------------------+----------+-----------+--------------+--------------------+----------+----------+
|NCT03630471|Effectiveness of ...|     PRIDE|         COMPLETED|Mental Health Iss...|BEHAVIORAL: PRIDE...|             Sangath|Harvard Medical S...|     250.0|      OTHER|INTERVENTIONAL|Allocation: RANDO...|2018-08-20|2019-02-28|
|NCT05992571|Oral Ketone Monoe...|          |        RECRUITING|Cerebrovascular 

# QUESTION 1

In [0]:
# QUESTION 1: The number of studies in the dataset

# Remove fully duplicated rows, then count what is left.
ct_dataframe.dropDuplicates().count()

483422

# QUESTION 2

In [0]:
# QUESTION 2: list all the types (as contained in the Type column) of studies in the dataset along with the frequencies of each type

# Empty values and the single-space padding value are removed *before* aggregating,
# and the column is referenced by name with col() rather than through ct_dataframe.Type,
# which is a column object that belongs to a different DataFrame than the aggregate.
typeColumnCount = (
    ct_dataframe
    .filter(~col("Type").isin(["", " "]))
    .groupBy("Type")
    .count()  # frequency of each distinct Type value
    .orderBy("count", ascending=False)
)

typeColumnCount.show(truncate=False) # Displaying the result

+---------------+------+
|Type           |count |
+---------------+------+
|INTERVENTIONAL |371382|
|OBSERVATIONAL  |110221|
|EXPANDED_ACCESS|928   |
+---------------+------+



# QUESTION 3

In [0]:
# QUESTION 3: The top 5 conditions (from Conditions) with their frequencies

# Regex pattern separating individual conditions inside the Conditions column, per dataset.
# A raw string is used for the pipe pattern: "\|" is not a valid Python escape sequence.
conditions_delimeter = {
    "clinicaltrial_2023": r"\|",
    "clinicaltrial_2021": ",",
    "clinicaltrial_2020": ",",
}

# Split each Conditions value on the dataset's delimiter and explode so that every
# individual condition gets its own row.
explodedConditionsColumn = ct_dataframe.withColumn(
    "Conditions",
    explode(split(col("Conditions"), conditions_delimeter[clinical_tiral_file])),
)

# Count the occurrences of each distinct condition, most frequent first.
ConditionsColumnCount = (
    explodedConditionsColumn
    .groupBy("Conditions")
    .count()
    .orderBy("count", ascending=False)
)

# Remove blank entries. trim() makes this also catch whitespace-only values such as
# the " " that create_dataframe uses to pad short rows, not just the empty string.
ConditionsColumnCount = ConditionsColumnCount.filter(trim(col("Conditions")) != "")

# Displaying the results
ConditionsColumnCount.show(5, truncate=False)

+-------------+-----+
|Conditions   |count|
+-------------+-----+
|Healthy      |9731 |
|Breast Cancer|7502 |
|Obesity      |6549 |
|Stroke       |4071 |
|Hypertension |4020 |
+-------------+-----+
only showing top 5 rows



# QUESTION 4

In [0]:
# QUESTION 4: Find the 10 most common sponsors that are not pharmaceutical companies, along with the number of clinical trials they have sponsored.

# Distinct, non-null "Parent_Company" values from the pharma file as a Python list.
# Nulls are dropped on purpose: a NULL inside isin() makes the negated test evaluate
# to NULL for every non-matching sponsor, which would filter all of them out.
pharma_list = [
    row["Parent_Company"]
    for row in create_dataframe(pharma_file).select("Parent_Company").distinct().collect()
    if row["Parent_Company"] is not None
]

ct_sponsor_dataframe = ct_dataframe.select("Sponsor") # Extracting the "Sponsor" column from the clinical trial file

# Number of trials per sponsor, most frequent first
sponsorColumnCount = ct_sponsor_dataframe.groupBy("Sponsor").count().orderBy("count", ascending=False)

# Keep only sponsors that are not in the pharma list. The column is referenced by name
# with col() instead of through ct_sponsor_dataframe, which is a different DataFrame
# than the aggregate being filtered.
# NOTE(review): this is an exact string match, so a sponsor whose name differs from its
# Parent_Company entry is not excluded - verify against the pharma data.
nonPharmaSponsors = sponsorColumnCount.filter(~col("Sponsor").isin(pharma_list))

nonPharmaSponsors.show(10, truncate=False) # Displaying the result

+-------------------------------------------------------------+-----+
|Sponsor                                                      |count|
+-------------------------------------------------------------+-----+
|National Cancer Institute (NCI)                              |3410 |
|Assiut University                                            |3335 |
|Cairo University                                             |3023 |
|Assistance Publique - Hôpitaux de Paris                      |2951 |
|Mayo Clinic                                                  |2766 |
|M.D. Anderson Cancer Center                                  |2702 |
|Novartis Pharmaceuticals                                     |2393 |
|National Institute of Allergy and Infectious Diseases (NIAID)|2340 |
|Massachusetts General Hospital                               |2263 |
|National Taiwan University Hospital                          |2181 |
+-------------------------------------------------------------+-----+
only showing top 10 

# QUESTION 5

In [0]:
# QUESTION 5: Plot number of completed studies for each month in 2023

orderedMonths = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] # Defining an ordered list of months

# The Completion column is split differently per file:
#   2023 file   -> split on "-": Year is part 0, Month (a number) is part 1
#   other files -> split on " ": Month (a name from orderedMonths) is part 0, Year is part 1
if clinical_tiral_file.endswith("2023"):
    completion_parts = split(col("Completion"), "-")
    dateDataframe = (
        ct_dataframe
        .withColumn("Year", completion_parts[0])
        .withColumn("Month", completion_parts[1])
    )
else:
    completion_parts = split(col("Completion"), " ")
    dateDataframe = (
        ct_dataframe
        .withColumn("Year", completion_parts[1])
        .withColumn("Month", completion_parts[0])
    )

    # Keep only rows whose Month is a known month name (this also removes empty values)
    dateDataframe = dateDataframe.filter(col("Month").isin(orderedMonths))

    # Udf converting a month name to its 1-based number using orderedMonths
    month_index_udf = udf(lambda name: orderedMonths.index(name) + 1)

    dateDataframe = dateDataframe.withColumn("Month", month_index_udf(col("Month")))

# Selecting rows with Status column as "COMPLETED" or "Completed", and converting
# Month to integer so it can be range-checked, sorted and used as a list index.
completionDateDataframe = (
    dateDataframe
    .filter(col("Status").isin(["COMPLETED", "Completed"]))
    .select("Month", "Year", "Status")
    .withColumn("Month", col("Month").cast("int"))
)

month_name_udf = udf(lambda number: orderedMonths[number - 1]) # Udf to replace month number with month name using the month number as index

# The year to report on is taken from the file name suffix (e.g. "2023").
target_year = clinical_tiral_file.split("_")[1]

# Keep the target year, discard rows without a usable month, count per month in calendar
# order, then turn the month number back into its name. The 1..12 guard matters because
# the udf would fail on a null Month and would silently map month 0 to "Dec".
completionDateDataframe = (
    completionDateDataframe
    .filter(col("Year") == target_year)
    .filter(col("Month").between(1, 12))
    .groupBy("Month")
    .count()
    .orderBy("Month", ascending=True)
    .withColumn("Month", month_name_udf(col("Month")))
)

completionDateDataframe.show() # Displaying final results

+-----+-----+
|Month|count|
+-----+-----+
|  Jan| 1494|
|  Feb| 1272|
|  Mar| 1552|
|  Apr| 1324|
|  May| 1415|
|  Jun| 1619|
|  Jul| 1360|
|  Aug| 1230|
|  Sep| 1152|
|  Oct| 1058|
|  Nov|  909|
|  Dec| 1082|
+-----+-----+



# FURTHER ANALYSIS DATAFRAME

In [0]:
# Further Analysis: Distinct number of studies which have been terminated

# Keep only the rows whose Status marks the study as terminated (either casing).
terminatedClinicalTrials = ct_dataframe.where(
    ct_dataframe["Status"].isin("Terminated", "TERMINATED")
)

# Count the remaining rows after removing exact duplicates.
terminatedClinicalTrialsCount = terminatedClinicalTrials.dropDuplicates().count()

terminatedClinicalTrialsCount # Displaying the results

28022