In [185]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rheeza').getOrCreate()

## Understand the Dataset

In [186]:
trials_df = spark.read.json('dataset.json', multiLine=True)   # exraction of the dataset

In [187]:
trials_df.show(5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [188]:
trials_df.printSchema() # to show schema

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [189]:
trials_df.columns

['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

In [190]:
columns = ['ageofparticipant',
 'clinician.branch',
 'clinician.name',
 'clinician.role',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result.conclusion',
 'result.sideeffectsonparticipant'
 ]

In [191]:
trials_df.select(columns).show(5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [192]:
from pyspark.sql import functions as fn

In [193]:
# counting null values per column
trials_df.select([ fn.count(fn.when(fn.col(column).isNull(), column)) for column in columns]).show()


+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------------------------------------------------------+-----------------------------------------------------------------+-------------------------------------------------------+-----------------------------------------------------------------------+---------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|count(CASE WHEN (ageofparticipant IS NULL) THEN ageofparticipant END)|count(CASE WHEN (clinician.branch IS NULL) THEN clinician.branch END)|count(CASE WHEN (clinician.name IS NULL) THEN clinician.name END)|count(CASE WHEN (clinician.role IS NULL) THEN

## Cleaning

In [194]:
# flatten df
# address null issues
# rename columns

In [195]:
# flatten df
new_trials_df = trials_df.select(columns)
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [196]:
# rename columns
new_column_names = {
    'ageofparticipant': 'age_of_partcipant',
    'branch': 'clinic_branch',
    'name': 'head_clinician',
    'role': 'assistants_role',
    'experimentenddate': 'experiment_and_date',
    'experimentstartdate': 'experiment_start_date',
    'noofhourspassedatfirstreaction': 'hours_passed_at_first_reaction',
    'sideeffectsonparticipant': 'observed_side_effect'
}
new_trials_df = new_trials_df.withColumnsRenamed(new_column_names)
new_trials_df.show(3)

+-----------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|age_of_partcipant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_and_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-----------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|               19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|
|               14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|
|               17|      Ontario|          Saul|          nurse|  Placebo|      16198

In [197]:
new_trials_df.dtypes

[('age_of_partcipant', 'bigint'),
 ('clinic_branch', 'string'),
 ('head_clinician', 'string'),
 ('assistants_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_and_date', 'string'),
 ('experiment_start_date', 'string'),
 ('hours_passed_at_first_reaction', 'bigint'),
 ('conclusion', 'string'),
 ('observed_side_effect', 'string')]

In [198]:
new_trials_df.describe().show()

+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary| age_of_partcipant|clinic_branch|head_clinician|assistants_role|drug_used| experiment_and_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|          3586|           3477|     3586|                3586|                 3586|                          3513|         3533|                3586|
|   mean|17.507250418293363|         null|          null|           null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|         null|                null|
| stddev|2.3066401927555233|       

In [199]:
# fill null values
new_trials_df = new_trials_df.na.fill({'conclusion': 'unknown','assistants_role': 'unknown', 'hours_passed_at_first_reaction': 0})

In [200]:
new_trials_df.describe().show()

+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary| age_of_partcipant|clinic_branch|head_clinician|assistants_role|drug_used| experiment_and_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|          3586|           3586|     3586|                3586|                 3586|                          3586|         3586|                3586|
|   mean|17.507250418293363|         null|          null|           null|     null|1.618381578137200...| 1.615813671834913...|              43.9771332961517|         null|                null|
| stddev|2.3066401927555233|       

## Transformation

In [201]:
new_trials_df.printSchema()

root
 |-- age_of_partcipant: long (nullable = true)
 |-- clinic_branch: string (nullable = true)
 |-- head_clinician: string (nullable = true)
 |-- assistants_role: string (nullable = false)
 |-- drug_used: string (nullable = true)
 |-- experiment_and_date: string (nullable = true)
 |-- experiment_start_date: string (nullable = true)
 |-- hours_passed_at_first_reaction: long (nullable = false)
 |-- conclusion: string (nullable = false)
 |-- observed_side_effect: string (nullable = true)



In [202]:
from pyspark.sql import types as dtypes

In [203]:
# convert the datatypes to integers (long)
# divide by 1000
# convert from unit to datetime


new_trials_df.withColumn('start_ts', fn.col('experiment_start_date').cast(dtypes.LongType())/1000 ).show(5)

+-----------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-----------+
|age_of_partcipant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_and_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|   start_ts|
+-----------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-----------+
|               19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|1.6172352E9|
|               14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|1.6172352E9|
|               17|      

In [204]:
new_trials_df = new_trials_df\
    .withColumn('start_ts', fn.from_unixtime(fn.col('experiment_start_date').cast(dtypes.LongType())/1000, 'yyyy-MM-dd HH:mm:ss'))\
    .withColumn('start_ts', fn.col('start_ts').cast(dtypes.TimestampType()))\
            .withColumn('end_ts', fn.from_unixtime(fn.col('experiment_and_date').cast(dtypes.LongType())/1000, 'yyyy-MM-dd HH:mm:ss'))\
            .withColumn('end_ts', fn.col('end_ts').cast(dtypes.TimestampType()))