In [0]:
from pyspark.sql.functions import explode
from pyspark.sql.functions import split ,col
from pyspark.sql.functions import unix_timestamp ,col

In [0]:
filepath = '/FileStore/tables/clinicaltrial_2021.csv'
clinicalDf =spark.read.options(delimiter='|', header ='true').csv(filepath)
clinicalDf.show(5)

+-----------+--------------------+--------------------+--------+----------+--------------------+----------+--------------------+-------------+
|         Id|             Sponsor|              Status|   Start|Completion|                Type|Submission|          Conditions|Interventions|
+-----------+--------------------+--------------------+--------+----------+--------------------+----------+--------------------+-------------+
|NCT02758028|The University of...|          Recruiting|Aug 2005|  Nov 2021|      Interventional|  Apr 2016|                null|         null|
|NCT02751957|     Duke University|           Completed|Jul 2016|  Jul 2020|      Interventional|  Apr 2016|Autistic Disorder...|         null|
|NCT02758483|Universidade Fede...|           Completed|Mar 2017|  Jan 2018|      Interventional|  Apr 2016|   Diabetes Mellitus|         null|
|NCT02759848|Istanbul Medeniye...|           Completed|Jan 2012|  Dec 2014|       Observational|  May 2016|Tuberculosis,Lung...|         null|

In [0]:
#CASE 1 -> Number of Studies
clinicalDf.count()

Out[3]: 387261

In [0]:
#CASE 2 -> Types of Studies
typeDF = clinicalDf.groupBy('Type').count()
typeDF.sort('count', ascending = False).display()

Type,count
Interventional,301472
Observational,77540
Observational [Patient Registry],8180
Expanded Access,69


In [0]:
#CASE 3 -> Top 5 Conditions
explodeDf = clinicalDf.withColumn('Conditions',explode(split(clinicalDf['conditions'],',')))
condDF = explodeDf.select('conditions')
sorted_condDF = condDF.groupBy('Conditions').count()
sorted_condDF.sort('count', ascending = False).na.drop().limit(5).display()

Conditions,count
Carcinoma,13389
Diabetes Mellitus,11080
Neoplasms,9371
Breast Neoplasms,8640
Syndrome,8032


In [0]:
#CASE 4 -> 5 most frequent roots
meshDF = spark.read.options(delimiter=',', header ='true').csv('/FileStore/tables/mesh.csv')
rootDF =  meshDF.withColumn('roots',col('tree').substr(1, 3))
joinedDF = condDF.join(rootDF,condDF.conditions == rootDF.term,'inner')
condition_rootDF = joinedDF.groupBy('roots').count()
condition_rootDF.sort('count', ascending = False).limit(5).display()

roots,count
C04,143994
C23,136079
C01,106674
C14,94523
C10,92310


In [0]:
#CASE 5 -> 10 most common Sponsors (excluding pharmaceutical companies)
pharmaDF = spark.read.options(delimiter=',', header ='true').csv('/FileStore/tables/pharma.csv')
parentcoy_DF = pharmaDF.select('Parent_Company')

In [0]:
#CASE 5 -> 10 most common Sponsors (excluding pharmaceutical companies)
sponsordf = clinicalDf.select('Sponsor')
sponsor_pharmadf = sponsordf.join(parentcoy_DF,sponsordf.Sponsor == parentcoy_DF.Parent_Company, 'leftanti') 
nopharma_sponsorDF = sponsor_pharmadf.groupBy('Sponsor').count()
nopharma_sponsorDF.sort('count', ascending = False).limit(10).display()

Sponsor,count
National Cancer Institute (NCI),3218
M.D. Anderson Cancer Center,2414
Assistance Publique - Hôpitaux de Paris,2369
Mayo Clinic,2300
Merck Sharp & Dohme Corp.,2243
Assiut University,2154
Novartis Pharmaceuticals,2088
Massachusetts General Hospital,1971
Cairo University,1928
Hoffmann-La Roche,1828


In [0]:
#CASE 6 -> Number of completed studies each month of 2021
completeddf = clinicalDf.select('Status','Completion')
completeddf_status = completeddf.filter(completeddf.Status=='Completed')
completeddf_2021 =completeddf_status.filter(completeddf.Completion.contains('2021'))
completeddf_month = completeddf_2021.select('completion').withColumn('month', col('completion').substr(1, 3))
completeddf_sorted = completeddf_month.groupBy('month').count()
completeddf_sorted.sort(unix_timestamp(col('month'),'MMM')).display()

month,count
Jan,1131
Feb,934
Mar,1227
Apr,967
May,984
Jun,1094
Jul,819
Aug,700
Sep,528
Oct,187
