In [113]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rheeze').getOrCreate()

# Understand the Dataset

In [114]:
spark.read.json('dataset.json') # reading the json

DataFrame[_corrupt_record: string, ageofparticipant: bigint, clinician: struct<branch:string,name:string,role:string>, drug_used: string, experimentenddate: string, experimentstartdate: string, noofhourspassedatfirstreaction: bigint, result: struct<conclusion:string,sideeffectsonparticipant:string>]

In [115]:
trials_df = spark.read.json('dataset.json',multiLine=True)

In [116]:
trials_df.show(5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [117]:
trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [118]:
trials_df.columns

['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

In [119]:
trials_df.dtypes

[('ageofparticipant', 'bigint'),
 ('clinician', 'struct<branch:string,name:string,role:string>'),
 ('drug_used', 'string'),
 ('experimentenddate', 'string'),
 ('experimentstartdate', 'string'),
 ('noofhourspassedatfirstreaction', 'bigint'),
 ('result', 'struct<conclusion:string,sideeffectsonparticipant:string>')]

In [120]:
columns = ['ageofparticipant',
 'clinician.branch', 
 'clinician.name', 
 'clinician.role',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result.conclusion',
 'result.sideeffectsonparticipant']

In [121]:
# To show column
trials_df.select(columns).show(5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [122]:
from pyspark.sql import functions as fn

In [123]:
trials_df.select([fn.count(fn.when(fn.col(column).isNull(),fn.col(column))) for column in columns])

DataFrame[count(CASE WHEN (ageofparticipant IS NULL) THEN ageofparticipant END): bigint, count(CASE WHEN (clinician.branch IS NULL) THEN clinician.branch END): bigint, count(CASE WHEN (clinician.name IS NULL) THEN clinician.name END): bigint, count(CASE WHEN (clinician.role IS NULL) THEN clinician.role END): bigint, count(CASE WHEN (drug_used IS NULL) THEN drug_used END): bigint, count(CASE WHEN (experimentenddate IS NULL) THEN experimentenddate END): bigint, count(CASE WHEN (experimentstartdate IS NULL) THEN experimentstartdate END): bigint, count(CASE WHEN (noofhourspassedatfirstreaction IS NULL) THEN noofhourspassedatfirstreaction END): bigint, count(CASE WHEN (result.conclusion IS NULL) THEN result.conclusion END): bigint, count(CASE WHEN (result.sideeffectsonparticipant IS NULL) THEN result.sideeffectsonparticipant END): bigint]

In [101]:
# counting null values per column

trials_df.select([fn.count(fn.when(fn.col(column).isNull(),column)).alias (column) for column in columns]).show() 

+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|ageofparticipant|clinician.branch|clinician.name|clinician.role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|result.conclusion|result.sideeffectsonparticipant|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|               0|               0|             0|           109|        0|                0|                  0|                            73|               53|                              0|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+



## Cleaning

In [124]:
## flaten df
new_trials_df = trials_df.select(columns)
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [125]:
## address null value
print(new_trials_df)

DataFrame[ageofparticipant: bigint, branch: string, name: string, role: string, drug_used: string, experimentenddate: string, experimentstartdate: string, noofhourspassedatfirstreaction: bigint, conclusion: string, sideeffectsonparticipant: string]


In [126]:
## rename the columns
new_column_names = {'ageofparticipant':'age_of_participant'
    , 'branch': 'clinic branch'
    , ' name': 'head_clinician'
    , 'role': 'assistants_role'
    , 'experimentenddate':'experiment_end_date'
    , 'experimentstartdate':'experiment_start_date'
    , 'noofhourspassedatfirstreaction': 'hours_passed_at_first_reaction'
    , 'sideeffectsonparticipant': 'observed_side_effect'

}

new_trials_df = new_trials_df.withColumnsRenamed(new_column_names)
new_trials_df.show(2)
    
    

+------------------+-------------+----+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|age_of_participant|clinic branch|name|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+------------------+-------------+----+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|                19|      Ontario|Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|
|                14|      Ontario|Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|
+------------------+-------------+----+---------------+---------+-------------------+---------------------+-----------------------

In [127]:
new_trials_df.dtypes

[('age_of_participant', 'bigint'),
 ('clinic branch', 'string'),
 ('name', 'string'),
 ('assistants_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_end_date', 'string'),
 ('experiment_start_date', 'string'),
 ('hours_passed_at_first_reaction', 'bigint'),
 ('conclusion', 'string'),
 ('observed_side_effect', 'string')]

In [128]:
new_trials_df.describe().show()

+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary|age_of_participant|clinic branch|   name|assistants_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|   3586|           3477|     3586|                3586|                 3586|                          3513|         3533|                3586|
|   mean|17.507250418293363|         null|   null|           null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|         null|                null|
| stddev|2.3066401927555233|         null|   null|           null|    

In [129]:

new_trials_df.na.fill({'conclusion':'unknown','assistants_role':'unknown'}).describe().show()

+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary|age_of_participant|clinic branch|   name|assistants_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|   3586|           3586|     3586|                3586|                 3586|                          3513|         3586|                3586|
|   mean|17.507250418293363|         null|   null|           null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|         null|                null|
| stddev|2.3066401927555233|         null|   null|           null|    

In [130]:
new_trials_df = new_trials_df.na.fill({'conclusion':'unknown','assistants_role':'unknown'}).describe().show()

+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary|age_of_participant|clinic branch|   name|assistants_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|   3586|           3586|     3586|                3586|                 3586|                          3513|         3586|                3586|
|   mean|17.507250418293363|         null|   null|           null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|         null|                null|
| stddev|2.3066401927555233|         null|   null|           null|    