In [0]:
# Storage path of the 2021 clinical-trial dataset that the notebook loads.
fileroot = "/FileStore/tables/clinicaltrial_2021"

In [0]:
import os
# Export the dataset path as the 'fileroot' environment variable
# (presumably so shell cells can read it — TODO confirm where it is consumed).
os.environ['fileroot'] = fileroot

In [0]:
from pyspark.sql.types import *
                                    
# Read the pipe-delimited trial file, taking column names from its header row.
clinicaltrailDF = (
    spark.read
    .options(delimiter="|", header=True)
    .csv(fileroot)
)

# Preview the first five rows.
clinicaltrailDF.show(5)

+-----------+--------------------+--------------------+--------+----------+--------------------+----------+--------------------+-------------+
|         Id|             Sponsor|              Status|   Start|Completion|                Type|Submission|          Conditions|Interventions|
+-----------+--------------------+--------------------+--------+----------+--------------------+----------+--------------------+-------------+
|NCT02758028|The University of...|          Recruiting|Aug 2005|  Nov 2021|      Interventional|  Apr 2016|                null|         null|
|NCT02751957|     Duke University|           Completed|Jul 2016|  Jul 2020|      Interventional|  Apr 2016|Autistic Disorder...|         null|
|NCT02758483|Universidade Fede...|           Completed|Mar 2017|  Jan 2018|      Interventional|  Apr 2016|   Diabetes Mellitus|         null|
|NCT02759848|Istanbul Medeniye...|           Completed|Jan 2012|  Dec 2014|       Observational|  May 2016|Tuberculosis,Lung...|         null|

In [0]:
# Print the column names and data types of the trial DataFrame.
clinicaltrailDF.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Sponsor: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Start: string (nullable = true)
 |-- Completion: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Submission: string (nullable = true)
 |-- Conditions: string (nullable = true)
 |-- Interventions: string (nullable = true)



In [0]:
# Total number of rows in the trial DataFrame.
clinicaltrailDF.count()

Out[78]: 387261

In [0]:
# Number of trial rows for each distinct value of the Type column.
clinicaltrailDF1 = (
    clinicaltrailDF
    .groupBy("Type")
    .count()
)

In [0]:
# Display the per-Type counts (unsorted).
clinicaltrailDF1.show()

+--------------------+------+
|                Type| count|
+--------------------+------+
|Observational [Pa...|  8180|
|     Expanded Access|    69|
|      Interventional|301472|
|       Observational| 77540|
+--------------------+------+



In [0]:
from pyspark.sql.functions import desc

# Display the per-Type counts, largest first.
clinicaltrailDF1.sort("count", ascending=False).show()

+--------------------+------+
|                Type| count|
+--------------------+------+
|      Interventional|301472|
|       Observational| 77540|
|Observational [Pa...|  8180|
|     Expanded Access|    69|
+--------------------+------+



In [0]:
# Keep only the Conditions column of the trial data.
conditions_DF = clinicaltrailDF.select("Conditions")

In [0]:
# Display the Conditions values without truncating long strings.
conditions_DF.show(truncate = False)

+-----------------------------------------------------------------------------------------------------+
|Conditions                                                                                           |
+-----------------------------------------------------------------------------------------------------+
|null                                                                                                 |
|Autistic Disorder,Autism Spectrum Disorder                                                           |
|Diabetes Mellitus                                                                                    |
|Tuberculosis,Lung Diseases,Pulmonary Disease                                                         |
|Diverticular Diseases,Diverticulum,Diverticulosis                                                    |
|Asthma                                                                                               |
|Hypoventilation                                                

In [0]:
# Turn the single-column DataFrame into an RDD of raw Conditions values
# (each element is the column value itself, which may be None).
conditions_RDD = conditions_DF.rdd.map(lambda record: record["Conditions"])

# Peek at the first five elements.
conditions_RDD.take(5)

Out[84]: [None,
 'Autistic Disorder,Autism Spectrum Disorder',
 'Diabetes Mellitus',
 'Tuberculosis,Lung Diseases,Pulmonary Disease',
 'Diverticular Diseases,Diverticulum,Diverticulosis']

In [0]:
# Drop rows with no condition, split each comma-separated list into individual
# condition names, and wrap every name in a 1-tuple so the RDD can later be
# converted into a single-column DataFrame.
conditions_RDD1 = conditions_RDD.filter(lambda field: field is not None) \
                                .flatMap(lambda line: line.split(',')) \
                                .map(lambda x: (x, ))

In [0]:
# Inspect the first 20 individual-condition tuples.
conditions_RDD1.take(20)

Out[86]: [('Autistic Disorder',),
 ('Autism Spectrum Disorder',),
 ('Diabetes Mellitus',),
 ('Tuberculosis',),
 ('Lung Diseases',),
 ('Pulmonary Disease',),
 ('Diverticular Diseases',),
 ('Diverticulum',),
 ('Diverticulosis',),
 ('Asthma',),
 ('Hypoventilation',),
 ('Lymphoma',),
 ('Myositis',),
 ('Diabetes Mellitus',),
 ('Hypertension',),
 ('Periodontal Diseases',),
 ('Diabetes Mellitus',),
 ('Appendicitis',),
 ('Stomach Ulcer',),
 ('Cholecystolithiasis',)]

In [0]:
# Convert the RDD of 1-tuples into a DataFrame with one column, "Conditions".
conditions_DF1 = conditions_RDD1.toDF(schema=["Conditions"])

# Display the individual conditions without truncation.
conditions_DF1.show(truncate=False)

+------------------------+
|Conditions              |
+------------------------+
|Autistic Disorder       |
|Autism Spectrum Disorder|
|Diabetes Mellitus       |
|Tuberculosis            |
|Lung Diseases           |
|Pulmonary Disease       |
|Diverticular Diseases   |
|Diverticulum            |
|Diverticulosis          |
|Asthma                  |
|Hypoventilation         |
|Lymphoma                |
|Myositis                |
|Diabetes Mellitus       |
|Hypertension            |
|Periodontal Diseases    |
|Diabetes Mellitus       |
|Appendicitis            |
|Stomach Ulcer           |
|Cholecystolithiasis     |
+------------------------+
only showing top 20 rows



In [0]:
# Number of occurrences of each individual condition.
conditions_DF2 = (
    conditions_DF1
    .groupBy("Conditions")
    .count()
)

In [0]:
# Display the per-condition counts (unsorted).
conditions_DF2.show()

+--------------------+-----+
|          Conditions|count|
+--------------------+-----+
|          Stillbirth|   80|
|  Pulmonary Embolism|  527|
|Adenocarcinoma of...|  207|
|       Hair Diseases|    7|
|         Adenomyosis|   70|
|  Alcohol Abstinence|    9|
|    Stomach Diseases|   36|
|Carbon Monoxide P...|   31|
|     Xanthogranuloma|    3|
|Mediastinal Emphy...|    3|
|Lecithin Choleste...|    3|
|Anti-Neutrophil C...|   44|
|  Iatrogenic Disease|   21|
|          Infections| 5323|
|Spinal Cord Diseases|  141|
|   Stomach Neoplasms| 1707|
|Bone Marrow Failu...|   61|
|Diabetic Ketoacid...|   40|
|Giant Cell Arteritis|  114|
|Lymphomatoid Gran...|   84|
+--------------------+-----+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import desc

# Display the five most frequent conditions.
conditions_DF2.sort("count", ascending=False).show(5)

+-----------------+-----+
|       Conditions|count|
+-----------------+-----+
|        Carcinoma|13389|
|Diabetes Mellitus|11080|
|        Neoplasms| 9371|
| Breast Neoplasms| 8640|
|         Syndrome| 8032|
+-----------------+-----+
only showing top 5 rows



In [0]:
# Storage path of the pharma dataset.
fileroot1 = "/FileStore/tables/pharma"

In [0]:
import os
# Re-point the 'fileroot' environment variable at the pharma path; this
# overwrites the clinical-trial path exported earlier under the same name.
os.environ['fileroot'] = fileroot1

In [0]:
from pyspark.sql.types import *
                                    
# Read the comma-delimited pharma file, taking column names from its header row.
pharma_DF = (
    spark.read
    .options(delimiter=",", header=True)
    .csv(fileroot1)
)

# Preview the first five rows.
pharma_DF.show(5)

+--------------------+-------------------+--------------+------------------------+---------------------------------------------------------+------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------+--------------------+------------------+---------------------+-----+-------+-----------------------------+------------------+--------------+--------+-------+----+----------+--------------------+--------------------+------------------+-------------------+---------------------------+------------------------+---------------------------+--------------------+--------------------+
|             Company|     Parent_Company|Penalty_Amount|Subtraction_From_Penalty|Penalty_Amount_Adjusted_For_Eliminating_Multiple_Counting|Penalty_Year|Penalty_Date|       Offense_Group|     Primary_Offense|   Secondary_Offense|         Description|Level_of_Government|  Action_Type|              Agency|    Civil/Criminal|Prosecution_Ag

In [0]:
# Sponsor column of every trial row (one entry per row, duplicates kept).
comps = clinicaltrailDF.select("Sponsor")

In [0]:
# Parent-company names from the pharma data, with the column renamed to
# "Sponsor" so it can be joined against the trial sponsors by name.
pharmaComps = (
    pharma_DF
    .select("Parent_Company")
    .withColumnRenamed("Parent_Company", "Sponsor")
)

In [0]:
# Top 10 sponsors, by number of trial rows, that never appear as a
# Parent_Company in the pharma data. A left anti join keeps exactly the trial
# rows with no matching pharma row, replacing the earlier
# left-outer-join-then-filter-on-null form with the idiomatic equivalent.
# Matching is exact string equality on the name, so a sponsor spelled
# differently from its parent company is still counted here.
nonPharmaCompaniesTop10 = (
    comps
    .join(pharmaComps, ["Sponsor"], "left_anti")
    .groupby("Sponsor")
    .count()
    .sort("count", ascending=False)
    .limit(10)
)

In [0]:
# Display the ten sponsors and their trial counts without truncating names.
nonPharmaCompaniesTop10.show(truncate = False)

+---------------------------------------+-----+
|Sponsor                                |count|
+---------------------------------------+-----+
|National Cancer Institute (NCI)        |3218 |
|M.D. Anderson Cancer Center            |2414 |
|Assistance Publique - Hôpitaux de Paris|2369 |
|Mayo Clinic                            |2300 |
|Merck Sharp & Dohme Corp.              |2243 |
|Assiut University                      |2154 |
|Novartis Pharmaceuticals               |2088 |
|Massachusetts General Hospital         |1971 |
|Cairo University                       |1928 |
|Hoffmann-La Roche                      |1828 |
+---------------------------------------+-----+



In [0]:
from pyspark.sql.functions import split
# Split the "Mon YYYY" Completion string on the space: the first piece becomes
# Month and the second becomes Year. Completion and Status are carried along.
CompletedStatusTrial_DF = clinicaltrailDF.select(
    'Completion',
    "Status",
    split(clinicaltrailDF['Completion'], ' ').getItem(0).alias('Month'),
    split(clinicaltrailDF['Completion'], ' ').getItem(1).alias('Year'),
)

In [0]:
# Display Completion, Status and the derived Month/Year columns untruncated.
CompletedStatusTrial_DF.show(truncate =False)

+----------+----------------------+-----+----+
|Completion|Status                |Month|Year|
+----------+----------------------+-----+----+
|Nov 2021  |Recruiting            |Nov  |2021|
|Jul 2020  |Completed             |Jul  |2020|
|Jan 2018  |Completed             |Jan  |2018|
|Dec 2014  |Completed             |Dec  |2014|
|Sep 2020  |Active, not recruiting|Sep  |2020|
|Jan 2018  |Completed             |Jan  |2018|
|Jul 2017  |Unknown status        |Jul  |2017|
|Nov 2019  |Unknown status        |Nov  |2019|
|Jul 2017  |Completed             |Jul  |2017|
|Jan 2021  |Completed             |Jan  |2021|
|Oct 2016  |Completed             |Oct  |2016|
|Dec 2015  |Completed             |Dec  |2015|
|Mar 2016  |Completed             |Mar  |2016|
|May 2019  |Completed             |May  |2019|
|Dec 2021  |Active, not recruiting|Dec  |2021|
|Jun 2017  |Unknown status        |Jun  |2017|
|Jul 2016  |Terminated            |Jul  |2016|
|Oct 2019  |Completed             |Oct  |2019|
|Dec 2018  |U

In [0]:
# Trials with Status "Completed" whose completion year is 2021, counted per
# completion month and ordered from the month with the most trials down.
CompletedStatusTrial_DF1 = (
    CompletedStatusTrial_DF
    .select("Month", "Year", "Status")
    .filter((CompletedStatusTrial_DF.Year == "2021") & (CompletedStatusTrial_DF.Status == "Completed"))
    .groupby("Month")
    .count()
    .sort("count", ascending=False)
)

# show() only prints and returns None, so it is called separately; chaining it
# onto the assignment would bind the variable to None instead of the DataFrame.
CompletedStatusTrial_DF1.show(truncate=False)

+-----+-----+
|Month|count|
+-----+-----+
|Mar  |1227 |
|Jan  |1131 |
|Jun  |1094 |
|May  |984  |
|Apr  |967  |
|Feb  |934  |
|Jul  |819  |
|Aug  |700  |
|Sep  |528  |
|Oct  |187  |
+-----+-----+

