In [0]:
# Defines the Clinical Year Version

year_int = 2021
year_str = str(year_int)

In [0]:
# Shell - Defines the Clinical Year Version

import os
os.environ['year'] = year_str

In [0]:
# Creates a User-Defined Function to Check for the 'clinicaltrial_<year>' & 'pharma' Files

def check_file(files, path):
    file_present = [dbutils.fs.ls(f"{path}/{file}") for file in files]
    for file, is_present in zip(files, file_present):
        if len(is_present) > 0:
            print(f"{file} is present in {path}/")
        else:
            print(f"{file} is not present in {path}/")

In [0]:
# Checks for the 'clinicaltrial_<year>' & 'pharma' ZIPs

path = "/FileStore/tables"
files = ["clinicaltrial_" + year_str + ".zip", "pharma.zip"]
check_file(files, path)

clinicaltrial_2021.zip is present in /FileStore/tables/
pharma.zip is present in /FileStore/tables/


In [0]:
# Clears the Existing CSVs in /FileStore/tables/ to Avoid Errors

deleted_file = [  
    "/FileStore/tables/clinicaltrial_" + year_str + ".csv",  
    "/FileStore/tables/pharma.csv",
    "/FileStore/tables/Q2_Output.csv",  
    "/FileStore/tables/Q3_Output.csv",  
    "/FileStore/tables/Q4_Output.csv",  
    "/FileStore/tables/Q5_Output.csv",  
    "/FileStore/tables/FA1_Output.csv",  
    "/FileStore/tables/FA2_Output.csv",  
    "/FileStore/tables/FA3_Output.csv"]

for delete in deleted_file:
    dbutils.fs.rm(delete, True)

In [0]:
# Shell - Clears the Existing CSVs & ZIPs in TEMP to Avoid Errors

In [0]:
%sh 
rm /tmp/clinicaltrial_$year.zip
rm /tmp/clinicaltrial_$year.csv
rm /tmp/pharma.zip
rm /tmp/pharma.csv

rm: cannot remove '/tmp/clinicaltrial_2021.csv': No such file or directory
rm: cannot remove '/tmp/pharma.csv': No such file or directory


In [0]:
# Copies the ZIPs from /FileStore/tables/ to TEMP

dbutils.fs.cp("/FileStore/tables/clinicaltrial_" + year_str + ".zip", "file:/tmp/")
dbutils.fs.cp("/FileStore/tables/pharma.zip" , "file:/tmp/")

Out[8]: True

In [0]:
# Shell - Checks for the 'clinicaltrial_<year>' & 'pharma' ZIPs in TEMP then Unzips

In [0]:
%sh
ls /tmp/clinicaltrial_$year.zip
ls /tmp/pharma.zip

/tmp/clinicaltrial_2021.zip
/tmp/pharma.zip


In [0]:
%sh
unzip -d/tmp/ /tmp/clinicaltrial_$year.zip
unzip -d/tmp/ /tmp/pharma.zip

Archive:  /tmp/clinicaltrial_2021.zip
  inflating: /tmp/clinicaltrial_2021.csv  
Archive:  /tmp/pharma.zip
  inflating: /tmp/pharma.csv         


In [0]:
# Moves the 'clinicaltrial_<year>' & 'pharma' CSVs from TEMP to /FileStore/tables/

dbutils.fs.mv("file:/tmp/clinicaltrial_" + year_str + ".csv", "/FileStore/tables/", True)
dbutils.fs.mv("file:/tmp/pharma.csv", "/FileStore/tables/", True)

Out[12]: True

In [0]:
# Checks for the 'clinicaltrial_<year>' & 'pharma' CSVs

path = "/FileStore/tables"
files = ["clinicaltrial_" + year_str + ".csv", "pharma.csv"]
check_file(files, path)

clinicaltrial_2021.csv is present in /FileStore/tables/
pharma.csv is present in /FileStore/tables/


In [0]:
# Creates the RDD for 'clinicaltrial_<year>' & 'pharma'

clinicaltrial = sc.textFile("/FileStore/tables/clinicaltrial_" + year_str + ".csv") \
                .map(lambda line: line.split("|"))
pharma = sc.textFile("/FileStore/tables/pharma.csv") \
                .map(lambda line: line.split("\",\""))

In [0]:
# Views the Content of 'clinicaltrial_<year>'

for row in clinicaltrial.take(10):
    print(row)

['Id', 'Sponsor', 'Status', 'Start', 'Completion', 'Type', 'Submission', 'Conditions', 'Interventions']
['NCT02758028', 'The University of Hong Kong', 'Recruiting', 'Aug 2005', 'Nov 2021', 'Interventional', 'Apr 2016', '', '']
['NCT02751957', 'Duke University', 'Completed', 'Jul 2016', 'Jul 2020', 'Interventional', 'Apr 2016', 'Autistic Disorder,Autism Spectrum Disorder', '']
['NCT02758483', 'Universidade Federal do Rio de Janeiro', 'Completed', 'Mar 2017', 'Jan 2018', 'Interventional', 'Apr 2016', 'Diabetes Mellitus', '']
['NCT02759848', 'Istanbul Medeniyet University', 'Completed', 'Jan 2012', 'Dec 2014', 'Observational', 'May 2016', 'Tuberculosis,Lung Diseases,Pulmonary Disease', '']
['NCT02758860', 'University of Roma La Sapienza', 'Active, not recruiting', 'Jun 2016', 'Sep 2020', 'Observational [Patient Registry]', 'Apr 2016', 'Diverticular Diseases,Diverticulum,Diverticulosis', '']
['NCT02757209', 'Consorzio Futuro in Ricerca', 'Completed', 'Apr 2016', 'Jan 2018', 'Interventional

In [0]:
# Views the Content of 'pharma'

for row in pharma.take(10):
    print(row)

['"Company', 'Parent_Company', 'Penalty_Amount', 'Subtraction_From_Penalty', 'Penalty_Amount_Adjusted_For_Eliminating_Multiple_Counting', 'Penalty_Year', 'Penalty_Date', 'Offense_Group', 'Primary_Offense', 'Secondary_Offense', 'Description', 'Level_of_Government', 'Action_Type', 'Agency', 'Civil/Criminal', 'Prosecution_Agreement', 'Court', 'Case_ID', 'Private_Litigation_Case_Title', 'Lawsuit_Resolution', 'Facility_State', 'City', 'Address', 'Zip', 'NAICS_Code', 'NAICS_Translation', 'HQ_Country_of_Parent', 'HQ_State_of_Parent', 'Ownership_Structure', 'Parent_Company_Stock_Ticker', 'Major_Industry_of_Parent', 'Specific_Industry_of_Parent', 'Info_Source', 'Notes"']
['"Abbott Laboratories', 'Abbott Laboratories', '$5,475,000', '$0', '$5,475,000', '2013', '20131227', 'government-contracting-related offenses', 'False Claims Act and related', 'kickbacks and bribery', "Abbott Laboratories agreed to $5.475 million to resolve allegations that it violated the False Claims Act by paying kickbacks 

In [0]:
# Maps the Headers in 'clinicaltrial_<year>' & 'pharma'

clinicaltrial_header = clinicaltrial.first()
pharma_header = pharma.first()

In [0]:
# Counts the Null Values in 'clinicaltrial_<year>'

missing_counts_clinicaltrial = clinicaltrial \
                .flatMap(lambda x: [col_name for col_name, value in zip(clinicaltrial_header, x) if value == '']) \
                .countByValue()

for col_name, missing_count in missing_counts_clinicaltrial.items():
    print(f"{col_name}: {missing_count}")

Conditions: 65131
Interventions: 253837
Completion: 13260


In [0]:
# Counts the Null Values in 'pharma'

missing_counts_pharma = pharma \
                .flatMap(lambda x: [col_name for col_name, value in zip(pharma_header, x) if value == '']) \
                .countByValue()

for col_name, missing_count in missing_counts_pharma.items():
    print(f"{col_name}: {missing_count}")

Prosecution_Agreement: 949
Court: 936
Case_ID: 771
Private_Litigation_Case_Title: 936
Lawsuit_Resolution: 936
Facility_State: 467
City: 769
Address: 845
Zip: 826
NAICS_Code: 848
NAICS_Translation: 865
Secondary_Offense: 808
Description: 298
HQ_State_of_Parent: 434
Parent_Company_Stock_Ticker: 246
Agency: 31
Info_Source: 3


In [0]:
# Removes the Headers in 'clinicaltrial_<year>' & 'pharma'

clinicaltrial = clinicaltrial.filter(lambda x: x != clinicaltrial_header)
pharma = pharma.filter(lambda x: x != pharma_header)

In [0]:
# Question 1 - Counts the Number of Distinct Studies - Base Version

total_studies = clinicaltrial.map(lambda x: x[0]) \
                .distinct() \
                .count() 

print(f"Total Studies {year_int}: {total_studies}")

Total Studies 2021: 387261


In [0]:
# Question 1 - Counts the Number of Distinct Studies - User-Defined Function Utilized

def count_unique_values(data, column):
    return data.map(lambda x: x[column]).distinct().count()

unique_count = count_unique_values(clinicaltrial, 0)
print(f"Total Value {year_int}: {unique_count}")

Total Value 2021: 387261


In [0]:
# Question 2 - Lists the Types of Studies with Frequency

types_frequency = clinicaltrial \
        .map(lambda x: x[5]) \
        .countByValue()
    
types_sorted = sorted(types_frequency.items(), key=lambda x: -x[1])

print(f"Types of Studies {year_int}")
for types, count in types_sorted:
    print(f"{types}: {count}")

Types of Studies 2021
Interventional: 301472
Observational: 77540
Observational [Patient Registry]: 8180
Expanded Access: 69


In [0]:
# Question 3 - Lists the Top 5 Conditions with Frequency

conditions_frequency = clinicaltrial \
                        .flatMap(lambda x: x[7].split(',')) \
                        .filter(lambda x: x != '') \
                        .countByValue()

conditions_sorted = sorted(conditions_frequency.items(), key=lambda x: -x[1])[:5]

print(f"Top 5 Conditions {year_int}")
for condition, count in conditions_sorted:
    print(f"{condition}: {count}")

Top 5 Conditions 2021
Carcinoma: 13389
Diabetes Mellitus: 11080
Neoplasms: 9371
Breast Neoplasms: 8640
Syndrome: 8032


In [0]:
# Question 4 - Lists the Top 10 Most Common Non-Pharmaceutical Sponsors with Clinical Trials

pharma_filtered = pharma \
                .map(lambda x: x[1]) \
                .collect()

clinicaltrial_filtered = clinicaltrial \
                .filter(lambda x: x[1] not in pharma_filtered)

sponsor_count = clinicaltrial_filtered \
                .map(lambda x: (x[1], 1)) \
                .reduceByKey(lambda a, b: a + b) \
                .sortBy(lambda x: -x[1]) \
                .take(10)

print(f"Top 10 Most Common Non-Pharmaceutical Sponsors {year_int}")
for sponsor, count in sponsor_count:
    print(f"{sponsor}: {count}")

Top 10 Most Common Non-Pharmaceutical Sponsors 2021
National Cancer Institute (NCI): 3218
M.D. Anderson Cancer Center: 2414
Assistance Publique - Hôpitaux de Paris: 2369
Mayo Clinic: 2300
Merck Sharp & Dohme Corp.: 2243
Assiut University: 2154
Novartis Pharmaceuticals: 2088
Massachusetts General Hospital: 1971
Cairo University: 1928
Hoffmann-La Roche: 1828


In [0]:
# Question 5 - Lists the Completed Studies Each Month in a Given Year

from datetime import datetime

completed_studies = clinicaltrial \
                    .filter(lambda x: x[2] == "Completed" and all(x[4:6])) \
                    .map(lambda x: (datetime.strptime(x[4], '%b %Y').replace(day=1), 1)) \
                    .filter(lambda x: x[0].year == year_int) \
                    .reduceByKey(lambda a, b: a + b) \
                    .sortByKey() \
                    .collect()

print(f"Completed Studies {year_int}")
for month, count in completed_studies:
    print(f"{month.strftime('%b')}: {count}")

Completed Studies 2021
Jan: 1131
Feb: 934
Mar: 1227
Apr: 967
May: 984
Jun: 1094
Jul: 819
Aug: 700
Sep: 528
Oct: 187


In [0]:
# Further Analyis 1 - Lists the Sponsors Recruiting with Clinical Trials

sponsor_count = clinicaltrial \
                 .filter(lambda x: x[2] == "Recruiting") \
                 .map(lambda x: (x[1], 1)) \
                 .reduceByKey(lambda a, b: a + b) \
                 .sortBy(lambda x: -x[1]) \
                 .take(10)

print(f"Sponsors Currently Recruiting {year_int}")
for sponsor, count in sponsor_count:
    print(f"{sponsor}: {count}")

Sponsors Currently Recruiting 2021
M.D. Anderson Cancer Center: 601
Mayo Clinic: 499
Assistance Publique - Hôpitaux de Paris: 450
National Cancer Institute (NCI): 391
Memorial Sloan Kettering Cancer Center: 354
Massachusetts General Hospital: 352
Hospices Civils de Lyon: 321
Sun Yat-sen University: 315
VA Office of Research and Development: 312
University of Colorado, Denver: 278


In [0]:
# Part 1 of Creating the Visuals on Power Bi

In [0]:
# Transforms the Further Analysis 1 Output from RDD to DF

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("Sponsor", StringType(), True),
    StructField("Count", IntegerType(), True)
])

sponsor_count = spark.createDataFrame(sponsor_count, schema)

sponsor_count.display()

Sponsor,Count
M.D. Anderson Cancer Center,601
Mayo Clinic,499
Assistance Publique - Hôpitaux de Paris,450
National Cancer Institute (NCI),391
Memorial Sloan Kettering Cancer Center,354
Massachusetts General Hospital,352
Hospices Civils de Lyon,321
Sun Yat-sen University,315
VA Office of Research and Development,312
"University of Colorado, Denver",278


In [0]:
# Creates the Further Analysis 1 Output CSV

sponsor_count.write.csv("/FileStore/tables/FA1_Output.csv")

In [0]:
# Part 2 on DF Notebook