Defining Reusable code to unzip all files

In [0]:
file = "/FileStore/tables/clinicaltrial_2023/clinicaltrial_2023.csv"

dbutils.fs.head(file)

[Truncated to first 65536 bytes]
Out[1]: '"Id\tStudy Title\tAcronym\tStatus\tConditions\tInterventions\tSponsor\tCollaborators\tEnrollment\tFunder Type\tType\tStudy Design\tStart\tCompletion",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,\r\n"NCT03630471\tEffectiveness of a Problem-solving Intervention for Common Adolescent Mental Health Problems in India\tPRIDE\tCOMPLETED\tMental Health Issue (E.G.", Depression, Psychosis, Personality Disorder," Substance Abuse)\tBEHAVIORAL: PRIDE \'Step 1\' problem-solving intervention|BEHAVIORAL: Enhanced usual care\tSangath\tHarvard Medical School (HMS and HSDM)|London School of Hygiene and Tropical Medicine\t250.0\tOTHER\tINTERVENTIONAL\tAllocation: RANDOMIZED|Intervention Model: PARALLEL|Masking: DOUBLE (INVESTIGATOR"," OUTCOMES_ASSESSOR)|Primary Purpose: TREATMENT\t2018-08-20\t2019-02-

In [0]:
# Create an rdd for clinicaltrial dataset
 
rdd1 = sc.textFile(file)

rdd1.take(3)

Out[3]: ['"Id\tStudy Title\tAcronym\tStatus\tConditions\tInterventions\tSponsor\tCollaborators\tEnrollment\tFunder Type\tType\tStudy Design\tStart\tCompletion",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,',
 '"NCT03630471\tEffectiveness of a Problem-solving Intervention for Common Adolescent Mental Health Problems in India\tPRIDE\tCOMPLETED\tMental Health Issue (E.G.", Depression, Psychosis, Personality Disorder," Substance Abuse)\tBEHAVIORAL: PRIDE \'Step 1\' problem-solving intervention|BEHAVIORAL: Enhanced usual care\tSangath\tHarvard Medical School (HMS and HSDM)|London School of Hygiene and Tropical Medicine\t250.0\tOTHER\tINTERVENTIONAL\tAllocation: RANDOMIZED|Intervention Model: PARALLEL|Masking: DOUBLE (INVESTIGATOR"," OUTCOMES_ASSESSOR)|Primary Purpose: TREATMENT\t2018-08-20\t2019-02-28",,,,,,,,,,,,,,,,,,,,,,,,,,,,

In [0]:
# Create an rdd for pharma dataset

pharma_file = "/FileStore/tables/pharma/pharma.csv"

pharma_rdd = sc.textFile(pharma_file)

pharma_rdd.take(2)

Out[50]: ['"Company","Parent_Company","Penalty_Amount","Subtraction_From_Penalty","Penalty_Amount_Adjusted_For_Eliminating_Multiple_Counting","Penalty_Year","Penalty_Date","Offense_Group","Primary_Offense","Secondary_Offense","Description","Level_of_Government","Action_Type","Agency","Civil/Criminal","Prosecution_Agreement","Court","Case_ID","Private_Litigation_Case_Title","Lawsuit_Resolution","Facility_State","City","Address","Zip","NAICS_Code","NAICS_Translation","HQ_Country_of_Parent","HQ_State_of_Parent","Ownership_Structure","Parent_Company_Stock_Ticker","Major_Industry_of_Parent","Specific_Industry_of_Parent","Info_Source","Notes"',
 '"Abbott Laboratories","Abbott Laboratories","$5,475,000","$0","$5,475,000","2013","20131227","government-contracting-related offenses","False Claims Act and related","kickbacks and bribery","Abbott Laboratories agreed to $5.475 million to resolve allegations that it violated the False Claims Act by paying kickbacks to induce doctors to implant the c

In [0]:
# Clean the rdd before converting to dataframe

def eliminate_double_quotes(resp):
    return [i.replace('"', '') for i in resp]

# convert date to same format
def convert_dates(date):
    date[12] = date[12] + '-01' if len(date[12]) == 7 else date[12]
    date[13] = date[13] + '-01' if len(date[13]) == 7 else date[13]
    return date

def populate_with_strings(item, max_fig):
    return item + [''] * (max_fig - len(item))

clinicaltrial_2023_rdd = rdd1. \
                         map(lambda y: y.replace(',', '')). \
                         map(lambda y: y.split('\t')). \
                         map(eliminate_double_quotes). \
                         map(lambda y: populate_with_strings(y, 14)). \
                         filter(lambda y: y[0] != 'Id'). \
                         map(convert_dates)

In [0]:
clinicaltrial_2023_rdd.take(1)

Out[10]: [['NCT03630471',
  'Effectiveness of a Problem-solving Intervention for Common Adolescent Mental Health Problems in India',
  'PRIDE',
  'COMPLETED',
  'Mental Health Issue (E.G. Depression Psychosis Personality Disorder Substance Abuse)',
  "BEHAVIORAL: PRIDE 'Step 1' problem-solving intervention|BEHAVIORAL: Enhanced usual care",
  'Sangath',
  'Harvard Medical School (HMS and HSDM)|London School of Hygiene and Tropical Medicine',
  '250.0',
  'OTHER',
  'INTERVENTIONAL',
  'Allocation: RANDOMIZED|Intervention Model: PARALLEL|Masking: DOUBLE (INVESTIGATOR OUTCOMES_ASSESSOR)|Primary Purpose: TREATMENT',
  '2018-08-20',
  '2019-02-28']]

In [0]:
rdd1.first()

Out[13]: '"Id\tStudy Title\tAcronym\tStatus\tConditions\tInterventions\tSponsor\tCollaborators\tEnrollment\tFunder Type\tType\tStudy Design\tStart\tCompletion",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,'

In [0]:
# Create DataFrame

from pyspark.sql.types import *

mySchema = StructType ([
            StructField("Id", StringType()),
            StructField("Study Title", StringType()),
            StructField("Acronynm", StringType()),
            StructField("Status", StringType()),
            StructField("Conditions", StringType()),
            StructField("Interventions", StringType()),
            StructField("Sponsor", StringType()),
            StructField("Collaborators", StringType()),
            StructField("Enrollment", StringType()),
            StructField("Funder Type", StringType()),
            StructField("Type", StringType()),
            StructField("Study Design", StringType()),
            StructField("Start", StringType()),
            StructField("Completion", StringType()),
])

df = spark.createDataFrame(clinicaltrial_2023_rdd , mySchema)

df.show(3)

+-----------+--------------------+--------+----------+--------------------+--------------------+-------------------+--------------------+----------+-----------+--------------+--------------------+----------+----------+
|         Id|         Study Title|Acronynm|    Status|          Conditions|       Interventions|            Sponsor|       Collaborators|Enrollment|Funder Type|          Type|        Study Design|     Start|Completion|
+-----------+--------------------+--------+----------+--------------------+--------------------+-------------------+--------------------+----------+-----------+--------------+--------------------+----------+----------+
|NCT03630471|Effectiveness of ...|   PRIDE| COMPLETED|Mental Health Iss...|BEHAVIORAL: PRIDE...|            Sangath|Harvard Medical S...|     250.0|      OTHER|INTERVENTIONAL|Allocation: RANDO...|2018-08-20|2019-02-28|
|NCT05992571|Oral Ketone Monoe...|        |RECRUITING|Cerebrovascular F...|OTHER: Placebo|DI...|McMaster University|Alzheime

In [0]:
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Study Title: string (nullable = true)
 |-- Acronynm: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Interventions: string (nullable = true)
 |-- Sponsor: string (nullable = true)
 |-- Collaborators: string (nullable = true)
 |-- Enrollment: string (nullable = true)
 |-- Funder Type: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Study Design: string (nullable = true)
 |-- Start: string (nullable = true)
 |-- Completion: string (nullable = true)



In [0]:
# Change datetype 
from pyspark.sql.functions import to_date

df = df.withColumn("Enrollment", df.Enrollment.cast("int"))

df = df.withColumn("Start", to_date(df["Start"], "yyyy-MM-dd"))
df = df.withColumn("Completion", to_date(df["Completion"], "yyyy-MM-dd"))

df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Study Title: string (nullable = true)
 |-- Acronynm: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Interventions: string (nullable = true)
 |-- Sponsor: string (nullable = true)
 |-- Collaborators: string (nullable = true)
 |-- Enrollment: integer (nullable = true)
 |-- Funder Type: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Study Design: string (nullable = true)
 |-- Start: date (nullable = true)
 |-- Completion: date (nullable = true)



In [0]:
# Create an rdd of pharmaceutical companies and convert to df
from pyspark.sql.types import StringType

pharma_com = pharma_rdd \
    .flatMap(lambda x: x.split('\n')) \
    .filter(lambda line: not line.startswith('"Company')) \
    .map(lambda line: line.split(",")) \
    .map(lambda x: (x[1][1:-1])) \
    .filter(lambda x: x[0].strip() != "") \
    .filter(lambda x: x[0] != '')


pharma_df = spark.createDataFrame(pharma_com, StringType())

pharma_df.show(5)

+-------------------+
|              value|
+-------------------+
|Abbott Laboratories|
|             AbbVie|
|             AbbVie|
|               Inc.|
|  Johnson & Johnson|
+-------------------+
only showing top 5 rows



In [0]:
number_of_study = df.select("ID").distinct().count()

print("Total number of Study:", number_of_study)

Total number of Study: 483422


In [0]:
Study_Type = df.groupBy("Type").count()

Study_Type.show()

+---------------+------+
|           Type| count|
+---------------+------+
| INTERVENTIONAL|371382|
|  OBSERVATIONAL|110221|
|EXPANDED_ACCESS|   928|
|               |   891|
+---------------+------+



In [0]:
from pyspark.sql.functions import col

conditions = df.groupBy("Conditions").count()

Top_conditions = conditions.orderBy(col("count").desc()).limit(5)

Top_conditions.show()

+---------------+-----+
|     Conditions|count|
+---------------+-----+
|        Healthy| 7997|
|  Breast Cancer| 4556|
|Prostate Cancer| 2650|
|         Asthma| 2309|
|        Obesity| 2284|
+---------------+-----+



In [0]:

completion_df = df.filter(col("Completion").startswith("2023"))
completion_df = completion_df.withColumn("Month", month("Completion"))
monthly_df = completion_df.groupBy("Month").count()
sorted_monthly_df = monthly_df.orderBy("Month")


# Output
sorted_monthly_df.show()

+-----+-----+
|Month|count|
+-----+-----+
|    1| 2567|
|    2| 2112|
|    3| 2947|
|    4| 2402|
|    5| 2627|
|    6| 3634|
|    7| 2752|
|    8| 2696|
|    9| 2878|
|   10| 2616|
|   11| 2313|
|   12| 9283|
+-----+-----+

