In [2]:
from pyspark.sql import SparkSession

# Create the Spark session used by every DataFrame in this notebook, or reuse
# one that is already running.
spark = (
    SparkSession.builder
    .appName('rheeza')
    .getOrCreate()
)

In [3]:
# Load the trials dataset from JSON.
# multiLine=True tells Spark that a single JSON record may span several lines;
# without it a multi-line file is typically parsed into corrupt-record rows.
trials_df = spark.read.json(
    'dataset.json',
    multiLine=True,
)

## Understanding the Dataset

In [4]:
# Preview the first five rows; the nested struct columns are shown truncated.
trials_df.show(n=5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [5]:
# Inspect the structure of the DataFrame. `printSchema()` prints the column tree,
# including the fields nested inside struct columns; the `dtypes` attribute is the
# other way to see column types. Spark's `long` is a 64-bit integer.
trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [6]:
# List the top-level column names; each struct column appears as a single name.
trials_df.columns

['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

In [7]:
# Column paths used to flatten the DataFrame: every field nested inside the
# `clinician` and `result` structs is selected by its dotted path so that it
# becomes a top-level column. The order here is the order of the output columns.
columns = [
    'ageofparticipant',
    *(f'clinician.{field}' for field in ('branch', 'name', 'role')),
    'drug_used',
    'experimentenddate',
    'experimentstartdate',
    'noofhourspassedatfirstreaction',
    *(f'result.{field}' for field in ('conclusion', 'sideeffectsonparticipant')),
]

In [8]:
# Project the flattened column paths and preview five rows; the nested fields
# now show up as ordinary columns.
trials_df.select(*columns).show(n=5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [9]:
from pyspark.sql import functions as fn

In [10]:
# Count the null values in every flattened column.
# For each column path, `when(...)` yields a non-null value only on rows where
# the column is null, and `count` counts exactly those rows. Each count is
# aliased back to the column path so the output header identifies it.
trials_df.select(
    [
        fn.count(fn.when(fn.col(name).isNull(), name)).alias(name)
        for name in columns
    ]
).show()

+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|ageofparticipant|clinician.branch|clinician.name|clinician.role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|result.conclusion|result.sideeffectsonparticipant|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|               0|               0|             0|           109|        0|                0|                  0|                            73|               53|                              0|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+



## Data Cleaning and Transformation

In [11]:
#after flattening, get a copy of the flattened dataframe
#deal with null values
#rename columns

In [23]:
# Keep the flattened projection as its own DataFrame for the cleaning steps.
# Selecting a dotted path such as 'clinician.branch' yields a column named after
# the leaf field only ('branch'), as the printed schema confirms.
new_trials_df = trials_df.select(*columns)
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [24]:
# Rename the flattened columns to readable snake_case names.
# Keys are the current column names, values are the new names; columns that are
# not listed (drug_used, conclusion) keep their names.
# NOTE(review): 'clinicia_branch' looks like a typo for 'clinician_branch'. It is
# left unchanged here because other cells may reference it; confirm before renaming.
new_column_names = {
    'ageofparticipant': 'age_of_participant',
    'branch': 'clinicia_branch',  # from clinician.branch
    'name': 'head_clinician',  # from clinician.name
    'role': 'assistants_role',  # from clinician.role
    'experimentenddate': 'experiment_end_date',
    'experimentstartdate': 'experiment_start_date',
    'noofhourspassedatfirstreaction': 'hours_passed_at_first_reaction',
    'sideeffectsonparticipant': 'observed_side_effect',
}
new_trials_df = new_trials_df.withColumnsRenamed(new_column_names)
new_trials_df.show(n=2)



+------------------+---------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|age_of_participant|clinicia_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+------------------+---------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|                19|        Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|
|                14|        Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|
+------------------+---------------+--------------+---------------+---

In [20]:
# Cleaning null values: evaluate the DataFrame as the cell's last expression to
# display it before the nulls are filled.
new_trials_df

In [25]:
# Print the renamed DataFrame to check the new column names against the data.
new_trials_df.show()

+------------------+---------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|age_of_participant|clinicia_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+------------------+---------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|                19|        Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|
|                14|        Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|
|                17|        Ontario|          Saul|          nurse|  P

In [26]:
# Fill the nulls found by the null-count check: a missing conclusion or
# assistant role becomes 'unknown', and a missing first-reaction hour becomes 0.
# NOTE(review): 0 cannot be told apart from a reaction recorded at hour zero;
# confirm that this default is intended.
new_trials_df = new_trials_df.na.fill(
    {
        'conclusion': 'unknown',
        'assistants_role': 'unknown',
        'hours_passed_at_first_reaction': 0,
    }
)

# Transformation

In [29]:
 from pyspark.sql import types as dtype

In [30]:
# Convert the experiment dates to proper timestamp columns.
# The source columns are strings holding Unix epoch values in milliseconds, so
# each one is cast to long, divided by 1000 (ms -> s), formatted with
# from_unixtime and finally cast to a timestamp.
# Fix: the casts used to reference `dtypes.LongType()` / `dtypes.TimestampType()`,
# but `pyspark.sql.types` is imported under the alias `dtype`, so this cell
# raised NameError on a fresh kernel. Spark's type-name strings are used
# instead, which removes the dependency on any alias.
new_trials_df = (
    new_trials_df
    .withColumn(
        'start_ts',
        fn.from_unixtime(fn.col('experiment_start_date').cast('long') / 1000, 'yyyy-MM-dd HH:mm:ss'),
    )
    .withColumn('start_ts', fn.col('start_ts').cast('timestamp'))
    .withColumn(
        'end_ts',
        fn.from_unixtime(fn.col('experiment_end_date').cast('long') / 1000, 'yyyy-MM-dd HH:mm:ss'),
    )
    .withColumn('end_ts', fn.col('end_ts').cast('timestamp'))
)

In [31]:
# Check the result: start_ts and end_ts should now appear next to the raw
# epoch-millisecond columns.
new_trials_df.show()

+------------------+---------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-------------------+-------------------+
|age_of_participant|clinicia_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|           start_ts|             end_ts|
+------------------+---------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-------------------+-------------------+
|                19|        Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|2021-03-31 20:00:00|2021-04-30 20:00:00|
|                14|        Ontario|          Saul|          nurse| Naproxen|      1619827200000