In [1]:
# This notebook analyses medical data (imaging, diagnoses and hearing evaluations) using Databricks

In [2]:
import pyspark
from pyspark.sql.types import *
import pyspark.sql.functions as func
from pyspark.sql.functions import row_number
from pyspark.sql.window import *


In [3]:
# imaging.csv, diagnoses.csv and hearing_evaluation.csv have been uploaded to the Databricks file system (DBFS)
# The following cells load each CSV file into a Spark DataFrame

In [4]:
# Load imaging.csv into a Spark DataFrame named imaging_df.
# Reader settings are kept in module-level variables: CSV format, the first row is the
# header, column types are inferred, and fields are comma-separated.

file_directory = "/FileStore/tables/imaging-2.csv/"
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

imaging_df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_directory)
)

# Render the loaded DataFrame as a table
display(imaging_df)

In [5]:
# Load diagnoses.csv into a Spark DataFrame named diagnoses_df, using the same
# reader settings as the imaging file (CSV, header row, inferred types, comma separator).

file_directory1 = "/FileStore/tables/diagnoses.csv/"
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

diagnoses_df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_directory1)
)

display(diagnoses_df)

In [6]:
# Load hearing_evaluation.csv into a Spark DataFrame named hearing_evaluation_df,
# with the same reader settings (CSV, header row, inferred types, comma separator).

file_directory2 = "/FileStore/tables/hearing_evaluation.csv"
file_type = "csv"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

hearing_evaluation_df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_directory2)
)

display(hearing_evaluation_df)

In [7]:
# Problem statement 1 - the 5 most common diagnosis codes along with their frequencies.
# Count rows per diagnosis_code and order them from most to least frequent.
most_common_diagnosis_codes = (
    diagnoses_df
    .groupby("diagnosis_code")
    .count()
    .orderBy(func.desc("count"))
)
most_common_diagnosis_codes.show(5)

In [8]:
# Problem statement 2
# The 5 most common diagnosis codes recorded at the time of a hearing evaluation.

# A diagnosis is matched to an evaluation when both rows belong to the same patient and
# diagnosis_age equals evaluation_age (inner join on both conditions).
join_diagnoses_df_and_hearing_evaluation_df = diagnoses_df.join(
    hearing_evaluation_df,
    (diagnoses_df.patient_id == hearing_evaluation_df.patient_id)
    & (diagnoses_df.diagnosis_age == hearing_evaluation_df.evaluation_age),
    'inner',
)

# Frequency of each diagnosis code among the matched rows, most frequent first.
most_common_diagnosis_codes_at_hearing = (
    join_diagnoses_df_and_hearing_evaluation_df
    .groupby("diagnosis_code")
    .count()
    .orderBy(func.col('count').desc())
)

most_common_diagnosis_codes_at_hearing.show(5)

In [9]:
# Problem statement 3 - the highest number of diagnoses assigned to a single patient.
# Count diagnosis rows per patient_id, largest first; the top row answers the question.
diagnoses_to_a_single_patient = (
    diagnoses_df
    .groupBy("patient_id")
    .count()
    .orderBy(func.col("count").desc())
)

diagnoses_to_a_single_patient.show(1)

# Additional analysis: sanity-check the dataset by looking at the five patients
# with the most diagnoses.
diagnoses_to_a_single_patient.show(5)

In [10]:
# PROBLEM STATEMENT 4
# Number of distinct patients with a hearing problem compared with the number of distinct
# patients who have had a hearing evaluation.

# Severity labels counted as hearing loss; evaluations with any other value are excluded.
hearing_loss_levels = ['Moderate', 'Mild', 'Slight', 'Moderately Severe', 'Severe', 'Profound']

# Total number of hearing evaluations (rows - a patient may have several evaluations)
total_hearing_evaluation = hearing_evaluation_df.count()
print(total_hearing_evaluation)

# Number of distinct patients with at least one hearing evaluation.
# Each .count() launches a Spark job, so the value is computed once and reused below.
unique_hearing_evaluation = hearing_evaluation_df.select("patient_id").distinct()
unique_hearing_evaluation_count = unique_hearing_evaluation.count()
print(unique_hearing_evaluation_count)

# Evaluations that recorded some degree of hearing loss (may repeat patients)
total_hearing_problems = hearing_evaluation_df.where(
    hearing_evaluation_df["severity_of_hearing_loss"].isin(hearing_loss_levels)
)
print(total_hearing_problems.count())

# Number of distinct patients with at least one hearing-loss evaluation
distinct_hearing_problems = total_hearing_problems.select("patient_id").distinct()
distinct_hearing_problems_count = distinct_hearing_problems.count()
print(distinct_hearing_problems_count)

# Percentage of evaluated patients who have a hearing problem
# (0.0 rather than ZeroDivisionError when the evaluation table is empty).
percentage_of_hearing_problems = (
    distinct_hearing_problems_count / unique_hearing_evaluation_count * 100
    if unique_hearing_evaluation_count else 0.0
)
print(percentage_of_hearing_problems)

In [11]:
# Percentage share of each hearing-loss severity among all evaluations that recorded hearing loss.

# Number of evaluations with any degree of hearing loss (rows, not distinct patients)
total = total_hearing_problems.count()

# Hearing-loss evaluations counted per severity level. Grouping the already-filtered
# total_hearing_problems avoids repeating the list of severity labels here.
diagnoses_severity_of_hearing_loss = total_hearing_problems.groupBy("severity_of_hearing_loss").count()

diagnoses_severity_of_hearing_loss.show()

# Collect the small aggregated table once, instead of running one Spark job per level.
# A level missing from the data counts as 0 instead of raising IndexError.
severity_counts = {
    row['severity_of_hearing_loss']: row['count']
    for row in diagnoses_severity_of_hearing_loss.collect()
}


def _severity_percentage(level):
    """Return the share (0-100) of hearing-loss evaluations with the given severity level.

    Returns 0.0 when there are no hearing-loss evaluations at all (avoids dividing by zero).
    """
    if not total:
        return 0.0
    return severity_counts.get(level, 0) / total * 100


# Percentage of each severity of hearing loss
mild = _severity_percentage('Mild')
slight = _severity_percentage('Slight')
moderate = _severity_percentage('Moderate')
mod_severe = _severity_percentage('Moderately Severe')
severe = _severity_percentage('Severe')
profound = _severity_percentage('Profound')
print('Mild: %f' %(mild))
print('Slight: %f'%(slight))
print('Moderate: %f'%(moderate))
print('Moderately Severe: %f'%(mod_severe))
print('Severe: %f'%(severe))
print('Profound: %f'%(profound))


In [12]:
# PROBLEM STATEMENT 5
# Focusing on the patients with hearing loss, the client would like to know the average number
# of CT / MT / SC investigations performed on them. Both the number of investigations and the
# total number of patients with hearing loss as well as the average should be presented.

# Number of distinct patients with hearing loss (computed once and reused)
hearing_loss_patient_count = distinct_hearing_problems.count()
print(hearing_loss_patient_count)

# Total number of investigations in the imaging table (all patients)
print(imaging_df.count())

# Investigations (imaging rows) performed on patients with hearing loss. Joining on the
# column name keeps a single patient_id column instead of two ambiguous ones.
join_hearing_and_imaging = distinct_hearing_problems.join(imaging_df, 'patient_id', 'inner')
hearing_loss_investigation_count = join_hearing_and_imaging.count()
print(hearing_loss_investigation_count)

# Average number of investigations per patient with hearing loss. This is a ratio, not a
# percentage, so it is reported unscaled (0.0 if there are no hearing-loss patients).
average = (
    hearing_loss_investigation_count / hearing_loss_patient_count
    if hearing_loss_patient_count else 0.0
)
print(average)

# Kept so code that read this variable keeps working. It is only the average multiplied
# by 100, not a percentage of anything.
percentage_average = average * 100


In [13]:
# Breakdown of the investigations performed on hearing-loss patients by imaging modality.
pt = join_hearing_and_imaging.groupBy('modality').count()
pt.show(5)

total_inv = float(join_hearing_and_imaging.count())
total1 = float(distinct_hearing_problems.count())

# Collect the small per-modality table once. A modality with no investigations counts as 0
# instead of raising IndexError.
modality_counts = {row['modality']: row['count'] for row in pt.collect()}


def _modality_percentage(modality):
    """Return the share (0-100) of hearing-loss investigations that used ``modality``.

    Returns 0.0 when there are no investigations at all (avoids dividing by zero).
    """
    if not total_inv:
        return 0.0
    return modality_counts.get(modality, 0) / total_inv * 100


MR = _modality_percentage('MR')
CT = _modality_percentage('CT')
SC = _modality_percentage('SC')
print('Total patients = %d' %total1)
print('Total Investigations = %d' %total_inv)
print('MR scans = %f'%MR)
print('CT scans = %f'%CT)
print('SC scans = %f'%SC)


In [14]:
# Problem statement 6
# Looking specifically at CT imaging, the client would like to know the year group (ages of
# encounter grouped into whole years) that is given the greatest number of CTs.

# Casting imaging_age to IntegerType drops the fractional part, grouping encounters into whole
# years. alias() names the column explicitly. withColumnRenamed on Spark's auto-generated
# 'CAST(imaging_age AS INT)' name silently does nothing if the generated name differs.
greatest_number_of_CTs_on_age = (
    imaging_df
    .where(func.col('modality') == 'CT')
    .groupBy(func.col('imaging_age').cast(IntegerType()).alias('age'))
    .count()
    .sort('count', ascending=False)
)

greatest_number_of_CTs_on_age.show(10)

In [15]:
# PROBLEM STATEMENT 7
# For each year group, the client would like to visualize the most frequent diagnosis code.

# Truncate diagnosis_age to whole years (age_group), then count each diagnosis code
# within each age group. alias() names the cast column explicitly rather than relying on
# Spark's auto-generated 'CAST(diagnosis_age AS INT)' name.
frequency_of_diagnosis_code = (
    diagnoses_df
    .groupby(func.col('diagnosis_age').cast(IntegerType()).alias('age_group'), 'diagnosis_code')
    .count()
    .sort('count', ascending=False)
)

# For each age group keep only the row(s) whose count equals the group's maximum count.
# If several codes tie for the maximum, all of them are kept.
most_frequent_diagnosis_code = (
    frequency_of_diagnosis_code
    .withColumn('max_count', func.max('count').over(Window.partitionBy('age_group')))
    .where(func.col('count') == func.col('max_count'))
    .drop('max_count')
    .sort('count', ascending=False)
)

most_frequent_diagnosis_code.show()


In [16]:
# Legacy Spark entry points, imported ahead of the RDD-based re-implementation below.
# NOTE(review): SQLContext is deprecated since Spark 3.0 (the SparkSession `spark` replaces it),
# and neither sqlCtx nor RDD is referenced in the cells shown here. Consider removing them.
from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
from pyspark.rdd import RDD

In [17]:
# Re-read the same three CSV files as raw text RDDs (one element per line) for the
# RDD-based versions of the analyses.
imaging, diagnoses, hearing_evaluation = (
    sc.textFile(path)
    for path in (
        "/FileStore/tables/imaging-2.csv/",
        "/FileStore/tables/diagnoses.csv/",
        "/FileStore/tables/hearing_evaluation.csv",
    )
)

In [18]:
# Turn each raw CSV text RDD into an RDD of field lists, dropping the header row.


def _parse_csv_line(line):
    """Split one CSV line into its field values, honouring double-quoted fields.

    Stripping every '"' and splitting on ',' breaks a quoted field that contains a comma.
    The csv module keeps such a field in one piece and removes the surrounding quotes.
    """
    import csv  # local import so the helper carries its own dependency when serialized
    return next(csv.reader([line]))


def _csv_rows(text_rdd):
    """Return ``(header_line, rows_rdd)`` for an RDD of CSV text lines.

    The first line is taken as the header. Rows equal to it and blank lines are removed,
    and every remaining line is parsed into a list of field strings.
    """
    header = text_rdd.first()
    rows = text_rdd.filter(lambda row: row != header and row.strip() != "").map(_parse_csv_line)
    return header, rows


imaging_header, imaging_rdd = _csv_rows(imaging)
diagnoses_header, diagnoses_rdd = _csv_rows(diagnoses)
hearing_header, hearing_evaluation_rdd = _csv_rows(hearing_evaluation)

# Check output
imaging_rdd.take(5)

In [19]:
# Problem statement 1 (RDD version): the 5 most common diagnosis codes with their frequencies.
# Pair each diagnosis row's code (column 1) with 1, sum per code, and order by the sum, largest first.
most_common_diagnosis_codes_rdd = (
    diagnoses_rdd
    .map(lambda row: (row[1], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)

most_common_diagnosis_codes_rdd.take(5)

In [20]:
# 5 most common diagnosis codes at the time of a hearing evaluation (RDD version).
# Diagnoses are keyed by (column 0, column 2) with the code (column 1) as value. Hearing
# evaluations are keyed by (column 0, column 1) with column 3 as value. Then they are inner-joined.
# NOTE(review): the ages are compared as raw text here, unlike the numeric comparison in the
# DataFrame version. Confirm that both files format ages identically.
join_rdd = diagnoses_rdd.map(
    lambda row: ((row[0], row[2]), row[1])
).join(
    hearing_evaluation_rdd.map(lambda row: ((row[0], row[1]), row[3]))
)

# Each joined element is (key, (diagnosis_code, evaluation_value)). Count the diagnosis codes.
most_common_diagnosis_codes_at_hearing_rdd = (
    join_rdd
    .map(lambda kv: (kv[1][0], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)

# Five most common diagnosis codes at the time of hearing
most_common_diagnosis_codes_at_hearing_rdd.take(5)

In [21]:
# The highest number of diagnoses assigned to a single patient (RDD version):
# count rows per column-0 value (patient id), largest count first, and show the top one.
diagnoses_to_a_single_patient_rdd = (
    diagnoses_rdd
    .map(lambda row: (row[0], 1))
    .reduceByKey(lambda a, b: a + b)
    .sortBy(lambda pair: pair[1], ascending=False)
)
diagnoses_to_a_single_patient_rdd.take(1)


In [22]:
# Number of distinct patients with a hearing problem compared with the number of distinct
# patients who have had a hearing evaluation (RDD version).

# Total number of hearing evaluations (rows - a patient may appear more than once)
total_hearing_evaluation_rdd = hearing_evaluation_rdd.count()
print(total_hearing_evaluation_rdd)
# Number of distinct patients with at least one evaluation (column 0 holds the patient id)
unique_hearing_evaluation_rdd = hearing_evaluation_rdd.map(lambda x: x[0]).distinct().count()
print(unique_hearing_evaluation_rdd)

# Evaluations recording some degree of hearing loss (column 2 is the severity label).
# Patients evaluated several times are counted once per evaluation here.
severity_of_hearing_loss = ['Moderate','Mild','Slight','Moderately Severe','Severe','Profound']
total_hearing_problems_rdd = hearing_evaluation_rdd.filter(lambda x: x[2] in severity_of_hearing_loss)

print(total_hearing_problems_rdd.count())

# Distinct patients with at least one hearing-loss evaluation.
# Its count is computed once (each .count() is a Spark job) and reused below.
distinct_hearing_problems_rdd = total_hearing_problems_rdd.map(lambda x: x[0]).distinct()
distinct_hearing_problems_count_rdd = distinct_hearing_problems_rdd.count()
print(distinct_hearing_problems_count_rdd)

# Percentage of evaluated patients who have a hearing problem
# (0.0 rather than ZeroDivisionError if there are no evaluations).
percentage_of_hearing_problems = (
    distinct_hearing_problems_count_rdd / unique_hearing_evaluation_rdd * 100
    if unique_hearing_evaluation_rdd else 0.0
)
print(percentage_of_hearing_problems)

In [23]:
# Focusing on the patients with hearing loss, the client would like to know the average number
# of CT / MT / SC investigations performed on them. Both the number of investigations and the
# total number of patients with hearing loss as well as the average should be presented.

# Number of distinct patients with hearing loss. total_hearing_problems_rdd has one element per
# hearing-loss *evaluation*, so counting it over-counts patients evaluated more than once.
hearing_loss_patients_rdd_count = distinct_hearing_problems_rdd.count()
print(hearing_loss_patients_rdd_count)

# Total number of investigations in the imaging table (all patients)
print(imaging_rdd.count())

# Investigations performed on hearing-loss patients. The hearing-loss rows are keyed by
# patient id, and reduceByKey collapses them to one entry per patient. They are then joined
# with imaging rows keyed by column 0, giving one element per matching imaging row.
join_hearing_and_imaging_rdd = total_hearing_problems_rdd.map(lambda x: (x[0],1)).reduceByKey(lambda x1,y1: x1+y1).join(imaging_rdd.map(lambda x: ((x[0]),(x[1]))))

hearing_loss_investigations_rdd_count = join_hearing_and_imaging_rdd.count()
print(hearing_loss_investigations_rdd_count)

# Average number of investigations per hearing-loss patient. It is a ratio, so it is not
# multiplied by 100 (0.0 if there are no hearing-loss patients).
average = (
    hearing_loss_investigations_rdd_count / hearing_loss_patients_rdd_count
    if hearing_loss_patients_rdd_count else 0.0
)

print(average)

In [24]:
# Looking specifically at CT imaging, the client would like to know the year group (ages of
# encounter grouped into whole years) that is given the greatest number of CTs (RDD version).

# Column positions are looked up by name from the CSV header instead of being hard-coded.
# Elsewhere, imaging rows are keyed on column 0 as the patient id, so column 0 cannot also
# be the imaging age. A missing column name raises ValueError instead of giving silently
# wrong numbers.
imaging_columns = [name.strip() for name in imaging_header.replace('"', '').split(',')]
modality_idx = imaging_columns.index('modality')
imaging_age_idx = imaging_columns.index('imaging_age')

# Keep CT rows, truncate the age to whole years, count per year, largest count first.
greatest_number_of_CTs_on_age_rdd = (
    imaging_rdd
    .filter(lambda x: x[modality_idx] == 'CT')
    .map(lambda x: (int(float(x[imaging_age_idx])), 1))
    .reduceByKey(lambda x1, y1: x1 + y1)
    .sortBy(lambda x: x[1], False)
)

# Show the RDD result computed above. The cell previously called .take on the DataFrame
# greatest_number_of_CTs_on_age instead.
greatest_number_of_CTs_on_age_rdd.take(10)