RDD IMPLEMENTATION

In [0]:
%pip install bokeh

Python interpreter will be restarted.
Collecting bokeh
  Downloading bokeh-2.4.2-py3-none-any.whl (18.5 MB)
Collecting PyYAML>=3.10
  Downloading PyYAML-6.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (701 kB)
Collecting typing-extensions>=3.10.0
  Downloading typing_extensions-4.2.0-py3-none-any.whl (24 kB)
Installing collected packages: typing-extensions, PyYAML, bokeh
Successfully installed PyYAML-6.0 bokeh-2.4.2 typing-extensions-4.2.0
Python interpreter will be restarted.


In [0]:
# Analysis year: selects the clinical-trial file below and is also used by the
# completion-date filter and the chart title later in the notebook.
year = '2021'

# All input datasets live in the FileStore tables folder.
tables_dir = '/FileStore/tables/'
ctdataset = tables_dir + 'clinicaltrial_' + year + '.csv'   # '|'-delimited clinical-trial export
meshdataset = tables_dir + 'mesh.csv'                       # 'term,tree-code' rows
pharmadataset = tables_dir + 'pharma.csv'                   # quoted, comma-separated violations data

In [0]:
# Load the raw clinical-trial file as an RDD of text lines; its first line is the column header.
inputrdd = sc.textFile(ctdataset)
# Peek at the first rows to confirm the '|' delimiter and the column order.
inputrdd.take(5)

Out[2]: ['Id|Sponsor|Status|Start|Completion|Type|Submission|Conditions|Interventions',
 'NCT02758028|The University of Hong Kong|Recruiting|Aug 2005|Nov 2021|Interventional|Apr 2016||',
 'NCT02751957|Duke University|Completed|Jul 2016|Jul 2020|Interventional|Apr 2016|Autistic Disorder,Autism Spectrum Disorder|',
 'NCT02758483|Universidade Federal do Rio de Janeiro|Completed|Mar 2017|Jan 2018|Interventional|Apr 2016|Diabetes Mellitus|',
 'NCT02759848|Istanbul Medeniyet University|Completed|Jan 2012|Dec 2014|Observational|May 2016|Tuberculosis,Lung Diseases,Pulmonary Disease|']

In [0]:
# Helper used by every dataset load below to strip the CSV header line.
def removeheader(rdd):
    """Return ``rdd`` without its header line.

    The header is taken to be the first element of ``rdd``. Every element equal
    to it is dropped, so any later line identical to the header is removed too.

    An empty RDD is returned unchanged instead of raising ``ValueError`` the way
    ``rdd.first()`` does.
    """
    # take(1) is what first() does internally, but lets us handle "no rows" ourselves.
    head = rdd.take(1)
    if not head:
        return rdd
    header = head[0]
    return rdd.filter(lambda x: x != header)

In [0]:
# Drop the header line; all later clinical-trial queries in this notebook read ctnoheaderrdd.
ctnoheaderrdd = removeheader(inputrdd)

def CountID(rdd):
    """Count the distinct study Ids (the first '|'-separated field) in ``rdd``."""
    # Only the leading field is needed, so stop splitting after the first '|'.
    study_ids = rdd.map(lambda row: row.split('|', 1)[0])
    return study_ids.distinct().count()

# Note: despite the name, countrdd holds a plain int (the distinct study count).
countrdd = CountID(ctnoheaderrdd)
print(f'The number of studies in the Clinical Trial dataset is - {countrdd}')

The number of studies in the Clinical Trial dataset is - 387261


In [0]:
def ListTypes(rdd):
    """Return an RDD of (study type, frequency) pairs, most frequent first.

    The study type is the sixth '|'-separated field of each line.
    """
    def type_of(row):
        return row.split('|')[5]

    pairs = rdd.map(lambda row: (type_of(row), 1))
    totals = pairs.reduceByKey(lambda left, right: left + right)
    return totals.sortBy(lambda kv: kv[1], ascending=False)

# (study type, frequency) pairs, most frequent first.
typeslistrdd = ListTypes(ctnoheaderrdd)

print('Types of Studies and their Frequencies')
# One row per distinct study type (only a handful, per the output below), so collect() is cheap.
typeslistrdd.collect()

Types of Studies and their Frequencies
Out[24]: [('Interventional', 301472),
 ('Observational', 77540),
 ('Observational [Patient Registry]', 8180),
 ('Expanded Access', 69)]

In [0]:
def Conditions(rdd):
    """Return an RDD of (condition, frequency) pairs, most frequent first.

    Conditions come from the eighth '|'-separated field, which holds a
    comma-separated list; empty entries are skipped.
    """
    # NOTE: splitting on ',' also splits condition names that themselves contain a comma.
    names = rdd.flatMap(lambda row: row.split('|')[7].split(','))
    ones = names.filter(lambda name: name != '').map(lambda name: (name, 1))
    totals = ones.reduceByKey(lambda left, right: left + right)
    return totals.sortBy(lambda kv: kv[1], ascending=False)

# (condition, frequency) pairs, most frequent first; also joined with the MeSH data further down.
conditionsrdd = Conditions(ctnoheaderrdd)
conditionsrdd.take(5)

Out[6]: [('Carcinoma', 13389),
 ('Diabetes Mellitus', 11080),
 ('Neoplasms', 9371),
 ('Breast Neoplasms', 8640),
 ('Syndrome', 8032)]

In [0]:
# Load the MeSH file; after the header, each line looks like 'term,tree-code'.
mesh = sc.textFile(meshdataset)
meshrdd = removeheader(mesh)
meshrdd.take(10)

Out[7]: ['Calcimycin,D03.633.100.221.173',
 'A-23187,D03.633.100.221.173',
 'Temefos,D02.705.400.625.800',
 'Temefos,D02.705.539.345.800',
 'Temefos,D02.886.300.692.800',
 'Abate,D02.705.400.625.800',
 'Abate,D02.705.539.345.800',
 'Abate,D02.886.300.692.800',
 'Difos,D02.705.400.625.800',
 'Difos,D02.705.539.345.800']

In [0]:
# Key each MeSH line by its term (the text before the first comma) so it can be
# joined with the (condition, frequency) pairs in conditionsrdd.
keyedMeshrdd = meshrdd.keyBy(lambda line: line.split(',')[0])

# Inner join -> (term, (mesh line, condition frequency)). A term listed under
# several tree codes yields one row per code, so its frequency is credited to
# each of those codes' roots.
CTMeshrdd = keyedMeshrdd.join(conditionsrdd)

# Root = tree code (second comma-separated field of the MeSH line) up to its
# first '.'; sum the frequencies per root and list the most frequent first.
CTMeshRootrdd = CTMeshrdd.map(lambda entry: (entry[1][0].split(',')[1].split('.')[0], entry[1][1])).\
                          reduceByKey(lambda v1, v2: v1 + v2).\
                          sortBy(lambda x: x[1], False)
CTMeshRootrdd.take(5)

Out[8]: [('C04', 143994),
 ('C23', 136079),
 ('C01', 106674),
 ('C14', 94523),
 ('C10', 92310)]

In [0]:
# Load the pharma file; rows are quoted, comma-separated fields ('"a","b",...'),
# which is why they are later split on '","' rather than ','.
pharma = sc.textFile(pharmadataset)
pharmardd = removeheader(pharma)
pharmardd.take(2)

Out[9]: ['"Abbott Laboratories","Abbott Laboratories","$5,475,000","$0","$5,475,000","2013","20131227","government-contracting-related offenses","False Claims Act and related","kickbacks and bribery","Abbott Laboratories agreed to $5.475 million to resolve allegations that it violated the False Claims Act by paying kickbacks to induce doctors to implant the company\'s carotid, biliary and peripheral vascular products.","federal","agency action","Justice Department Civil Division","civil","","","","","","","","","","","","USA","Illinois","publicly traded","ABT","pharmaceuticals","pharmaceuticals","https://www.justice.gov/opa/pr/abbott-laboratories-pays-us-5475-million-settle-claims-company-paid-kickbacks-physicians",""',
 '"Abbott Laboratories Inc.","AbbVie","$1,500,000,000","$0","$1,500,000,000","2012","20120507","healthcare-related offenses","off-label or unapproved promotion of medical products","","Global Health Care Company Abbott Laboratories Inc. has pleaded guilty and agreed to 

In [0]:
# (parent company, 1) from the pharma data: the second quoted field, i.e. the
# text between the first and second '","' separators.
pharmacomp = pharmardd.map(lambda line: (line.split('","')[1],1))

# (sponsor, 1) for every clinical trial; sponsor is the second '|'-separated field.
CTsponsorsrdd = ctnoheaderrdd.map(lambda line: (line.split('|')[1],1))

# Drop every trial whose sponsor string exactly equals a pharma parent-company
# name (subtractByKey is the anti-join; no outer join is needed), then count the
# remaining trials per sponsor, most active first.
tennonpharmardd = CTsponsorsrdd.subtractByKey(pharmacomp).\
                  reduceByKey(lambda v1,v2: v1+v2).\
                  sortBy(lambda x: x[1],False)
tennonpharmardd.take(10)

Out[10]: [('National Cancer Institute (NCI)', 3218),
 ('M.D. Anderson Cancer Center', 2414),
 ('Assistance Publique - Hôpitaux de Paris', 2369),
 ('Mayo Clinic', 2300),
 ('Merck Sharp & Dohme Corp.', 2243),
 ('Assiut University', 2154),
 ('Novartis Pharmaceuticals', 2088),
 ('Massachusetts General Hospital', 1971),
 ('Cairo University', 1928),
 ('Hoffmann-La Roche', 1828)]

In [0]:
# Month abbreviation -> month number (Jan=1 ... Dec=12), used to order the result by calendar month.
import calendar
d = dict(zip(calendar.month_abbr[1:], range(1, 13)))

# Trials whose status contains 'Completed' and whose completion date ('Mon YYYY')
# contains `year`, counted per completion month.
completedCTrdd = (
    ctnoheaderrdd
    .map(lambda line: line.split('|'))
    .map(lambda cols: (cols[2], cols[4]))              # (status, completion date)
    .filter(lambda pair: year in pair[1] and 'Completed' in pair[0])
    .map(lambda pair: (pair[1].split(' ')[0], 1))      # (completion month, 1)
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda kv: d.get(kv[0]))
)
completedCTrdd.collect()

Out[11]: [('Jan', 1131),
 ('Feb', 934),
 ('Mar', 1227),
 ('Apr', 967),
 ('May', 984),
 ('Jun', 1094),
 ('Jul', 819),
 ('Aug', 700),
 ('Sep', 528),
 ('Oct', 187)]

In [0]:
import pandas as pd
from bokeh.plotting import figure, output_file, show
from bokeh.models import ColumnDataSource
from bokeh.palettes import cividis
from bokeh.plotting import figure
from bokeh.transform import factor_cmap
from bokeh.models.tools import HoverTool
from bokeh.embed import components, file_html
from bokeh.resources import CDN

# Bring the (month, count) pairs to the driver as a pandas frame for Bokeh.
convertedrdd = completedCTrdd.toDF()
CTrddDF = convertedrdd.withColumnRenamed('_1','Month').withColumnRenamed('_2','Count')
data = CTrddDF.toPandas()
source = ColumnDataSource(data)

# Months in the order collected (completedCTrdd is sorted by month) become the x-axis categories.
months = source.data['Month'].tolist()
p = figure(x_range= months)

# One colour per month from the cividis palette.
color_map = factor_cmap(field_name='Month', palette=cividis(12), factors=months)

p.vbar(x='Month', top='Count', source=source, width=0.70, color=color_map)

p.title.text ='Completed studies per month - '+ year
p.title.align = 'center'
p.title.text_font_size = '24px'
p.xaxis.axis_label = 'Months'
p.yaxis.axis_label = 'No of completed studies'

hover = HoverTool()
hover.tooltips = [('No of completed studies is', '@Count')]
hover.mode = 'vline'
p.add_tools(hover)

# Render the plot as a standalone HTML document. displayHTML() expects HTML
# markup, not a file path, so pass the generated document itself; a copy is
# still saved to completed_studies.html.
html = file_html(p, CDN, 'Completed studies per month - ' + year)
with open('completed_studies.html', 'w', encoding='utf-8') as fh:
    fh.write(html)
displayHTML(html)