In [0]:
# Year of the clinical-trial extract; it selects which input file is read.
clinical_data_date = 2021

clinicial_trial_data_path = "/FileStore/tables/clinicaltrial_{year}.csv".format(year=clinical_data_date)
mesh_data_path = '/FileStore/tables/mesh.csv'
pharma_data_path = '/FileStore/tables/pharma.csv'


# Every file has a header row and lets Spark infer column types; only the
# delimiter differs (the clinical-trial file is pipe-separated).
clinical_data_df = spark.read.options(inferSchema=True, header=True, delimiter='|').csv(clinicial_trial_data_path)
mesh_data_df = spark.read.options(inferSchema=True, header=True, delimiter=',').csv(mesh_data_path)
pharma_data_df = spark.read.options(inferSchema=True, header=True, delimiter=',').csv(pharma_data_path)

# RDD views of the three DataFrames; the analysis cells below work on these.
clinical_rdd, mesh_rdd, pharma_rdd = (
    df.rdd for df in (clinical_data_df, mesh_data_df, pharma_data_df)
)

##### These are two user-defined helper functions. A lambda could have been passed directly instead, but to show that named user-defined functions work too, these helpers supply the merge function for PySpark's `reduceByKey` and the sort key for `takeOrdered`.

In [0]:
# 'reduce_by_key' is the merge function handed to RDD.reduceByKey: it combines
# two values that share the same key.

def reduce_by_key(a,b):
  """Merge two values for the same key by adding them; returns ``a + b``."""
  merged = a + b
  return merged

# 'sort_results' is the key function for RDD.takeOrdered; negating the second
# element of each (key, count) pair makes the largest counts come first.

def sort_results(res_rdd):
  """Return the negated element [1] of a pair so results sort in descending order."""
  count = res_rdd[1]
  return -count

In [0]:
# 1
# Number of distinct rows (exact duplicates removed) in the clinical-trial data.
distinct_clinical_rdd = clinical_rdd.distinct()
distinct_clinical_rdd.count()

Out[3]: 387261

In [0]:
# 2
# Studies per Type: rows with a missing/empty Type are skipped, each value is
# whitespace-stripped and paired with 1, then counts are summed per type.
rdd_1 = clinical_rdd.filter(lambda row: row.Type).map(lambda row: row.Type)
rdd_2 = rdd_1.map(lambda study_type: (study_type.strip(), 1))
type_counts_rdd = rdd_2.reduceByKey(reduce_by_key)
type_counts_rdd.takeOrdered(4, key=sort_results)

Out[4]: [('Interventional', 301472),
 ('Observational', 77540),
 ('Observational [Patient Registry]', 8180),
 ('Expanded Access', 69)]

In [0]:
# 3

rdd_1 = clinical_rdd.filter(lambda c: c.Conditions).map(lambda c: c.Conditions.strip())
rdd_2 = rdd_1.flatMap(lambda c: c.split(',')).map(lambda c: (c,1))
rdd_2.reduceByKey(reduce_by_key).takeOrdered(5, key=sort_results)

Out[5]: [('Carcinoma', 13389),
 ('Diabetes Mellitus', 11080),
 ('Neoplasms', 9371),
 ('Breast Neoplasms', 8640),
 ('Syndrome', 8032)]

In [0]:
# 4
# Most frequent MeSH tree roots among study conditions.
# Condition fragments are stripped (matching question 3) so whitespace around
# commas does not make an otherwise-valid term miss the exact-match join.
rdd_1 = clinical_rdd.filter(lambda c: c.Conditions).map(lambda c: c.Conditions)
crdd_2 = (
    rdd_1.flatMap(lambda c: c.split(','))
    .map(lambda c: c.strip())
    .filter(lambda c: c)
    .map(lambda c: (c, 1))
)

# (term, root) pairs, where root is the first dot-separated segment of the
# tree code. Rows lacking a term or tree are skipped so split() never sees None.
mrdd_1 = (
    mesh_rdd.filter(lambda m: m.term and m.tree)
    .map(lambda m: (m.term, m.tree.split('.')[0]))
)

# join -> (condition, (1, root)); re-key by root and sum. A condition whose
# term has several tree codes contributes once per matching code.
joint_data_rdd = crdd_2.join(mrdd_1).map(lambda c: (c[1][1], c[1][0]))
joint_data_rdd.reduceByKey(reduce_by_key).takeOrdered(5, key=sort_results)

Out[6]: [('C04', 143994),
 ('C23', 136079),
 ('C01', 106674),
 ('C14', 94523),
 ('C10', 92310)]

In [0]:
# 5
# Parent companies listed in the pharma data; sponsors equal to one of these
# are excluded from the ranking below.
parents = pharma_rdd.map(lambda p: p.Parent_Company).collect()
# Set copy for O(1) membership tests. Checking against the list made every
# clinical row do a linear scan over all parent companies.
parents_set = set(parents)

def check_element_in_list(row):
  """Filter predicate: return True when `row` (a sponsor name) is NOT a pharma parent company."""
  return row not in parents_set


rdd_1 = clinical_rdd.filter(lambda c: c.Sponsor).map(lambda c: c.Sponsor)
rdd_2 = rdd_1.filter(check_element_in_list).map(lambda c: (c, 1))
rdd_2.reduceByKey(reduce_by_key).takeOrdered(10, key=sort_results)

Out[7]: [('National Cancer Institute (NCI)', 3218),
 ('M.D. Anderson Cancer Center', 2414),
 ('Assistance Publique - Hôpitaux de Paris', 2369),
 ('Mayo Clinic', 2300),
 ('Merck Sharp & Dohme Corp.', 2243),
 ('Assiut University', 2154),
 ('Novartis Pharmaceuticals', 2088),
 ('Massachusetts General Hospital', 1971),
 ('Cairo University', 1928),
 ('Hoffmann-La Roche', 1828)]

In [0]:
# 6
# Completed studies per month of `clinical_data_date`, most frequent first.
# Completion is split on whitespace into [month, year, ...]; values without a
# second part are skipped instead of raising IndexError on c[1].
completion_year = str(clinical_data_date)  # hoisted: constant for every row

rdd_1 = clinical_rdd.filter(lambda c: c.Completion).map(lambda c: (c.Completion, c.Status))
rdd_2 = rdd_1.filter(lambda c: c[1] == 'Completed').map(lambda c: c[0].split())
rdd_3 = rdd_2.filter(lambda c: len(c) > 1 and c[1] == completion_year).map(lambda c: (c[0], 1))
rdd_3.reduceByKey(reduce_by_key).takeOrdered(12, key=sort_results)

Out[8]: [('Mar', 1227),
 ('Jan', 1131),
 ('Jun', 1094),
 ('May', 984),
 ('Apr', 967),
 ('Feb', 934),
 ('Jul', 819),
 ('Aug', 700),
 ('Sep', 528),
 ('Oct', 187)]