## Create a Spark Connection

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('walkthrough').config("spark.driver.bindAddress","localhost").\
config("spark.ui.port","4050").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/10 14:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Study the Provided Dataset

In [2]:
trial_results = spark.read.json('dataset.json', multiLine=True)

In [3]:
# What does a schema look like?
trial_results.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [4]:
# display the top rows

spark.createDataFrame(trial_results.tail(5)).show()

                                                                                

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              15|{Quebec, Drake, n...|  Placebo|    1619827200000|      1617235200000|                            86|{No effect, light...|
|              19|{Quebec, Said, do...| Naproxen|    1619827200000|      1617235200000|                            52|{BP normalized, l...|
|              14|{Quebec, William,...| Naproxen|    1619827200000|      1617235200000|                            14|{Follow-up, heada...|
|              16|{Quebec, Storey, ...| Naproxen|    1619827200000|      1617235200000|                             6| {null, light fever}|
|              17|{Q

In [5]:
from pyspark.sql.functions import isnull, count, isnan, when, col, date_format, to_date

## Split the Columns

In [6]:
columns = ['ageofparticipant',
 'clinician.branch',
 'clinician.name',
 'clinician.role',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result.conclusion',
 'result.sideeffectsonparticipant'
]

## Check for NULL values

In [7]:
null_columns = trial_results.select([count(when(col(c).isNull(), c)).alias(c) for c in trial_results.columns])
null_columns.show()

+----------------+---------+---------+-----------------+-------------------+------------------------------+------+
|ageofparticipant|clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|result|
+----------------+---------+---------+-----------------+-------------------+------------------------------+------+
|               0|        0|        0|                0|                  0|                            73|     0|
+----------------+---------+---------+-----------------+-------------------+------------------------------+------+



# Cleaning

In [8]:
# check the rows agaiin in flat format

trial_results = trial_results.select([*columns])

## Clean Column Names

In [9]:
# pick out the columns

columns = trial_results.columns


### remove dot annotations from column names

In [10]:
columns

['ageofparticipant',
 'branch',
 'name',
 'role',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'conclusion',
 'sideeffectsonparticipant']

### rename the columns

In [11]:
# following the provided description of the dataset

new_column_names = {
    'ageofparticipant': 'age_of_participant'
    , 'branch': 'clinic_branch'
    , 'role': 'assistants_role'
    , 'experimentenddate': 'experiment_end_date'
    , 'experimentstartdate': 'experiment_start_date'
    , 'noofhourspassedatfirstreaction': 'no_of_hours_passed_at_first_reaction'
    , 'conclusion': 'trial_conclusion'
    , 'sideeffectsonparticipant': 'observed_side_effect'
}

trial_results = trial_results.withColumnsRenamed(new_column_names)

In [12]:
trial_results.show(5)

+------------------+-------------+-------+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+
|age_of_participant|clinic_branch|   name|assistants_role|drug_used|experiment_end_date|experiment_start_date|no_of_hours_passed_at_first_reaction|trial_conclusion|observed_side_effect|
+------------------+-------------+-------+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+
|                19|      Ontario|   Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                                  52|   BP normalized|      rashes on neck|
|                14|      Ontario|   Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                                  78|       Follow-up|                 N/A|
|                17|      Ontario|   Saul|          nurse|  Placebo|  

## Address Missing Values

only columns "assistants_role" and "trial_conclusion" are addressed<br>
no_of_hours_passed_at_first_reaction is ignored to avoid confusing 0 with the actual number of hours

In [13]:
trial_results = trial_results.na.fill({'assistants_role': 'N/A', 'trial_conclusion': 'N/A'})

In [14]:
trial_results.select([count(when(col(a).isNull() | isnan(col(a)) | col(a).isin('', 'null', None), a)).alias(a) for a in trial_results.columns]).show()

+------------------+-------------+----+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+
|age_of_participant|clinic_branch|name|assistants_role|drug_used|experiment_end_date|experiment_start_date|no_of_hours_passed_at_first_reaction|trial_conclusion|observed_side_effect|
+------------------+-------------+----+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+
|                 0|            0|   0|              0|        0|                  0|                    0|                                  73|               0|                   0|
+------------------+-------------+----+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+



# Transformation

## Address Datatypes

In [15]:
trial_results.dtypes

[('age_of_participant', 'bigint'),
 ('clinic_branch', 'string'),
 ('name', 'string'),
 ('assistants_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_end_date', 'string'),
 ('experiment_start_date', 'string'),
 ('no_of_hours_passed_at_first_reaction', 'bigint'),
 ('trial_conclusion', 'string'),
 ('observed_side_effect', 'string')]

In [16]:
trial_results.describe().show()

23/09/10 14:42:03 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------------+----------------+--------------------+
|summary|age_of_participant|clinic_branch|   name|assistants_role|drug_used| experiment_end_date|experiment_start_date|no_of_hours_passed_at_first_reaction|trial_conclusion|observed_side_effect|
+-------+------------------+-------------+-------+---------------+---------+--------------------+---------------------+------------------------------------+----------------+--------------------+
|  count|              3586|         3586|   3586|           3586|     3586|                3586|                 3586|                                3513|            3586|                3586|
|   mean|17.507250418293363|         null|   null|           null|     null|1.618381578137200...| 1.615813671834913...|                   44.89097637346997|            null|                null|
| stddev|2.30664019275552

In [17]:
import pyspark.sql.types  as dtype
import pyspark.sql.functions as fn

In [18]:
# change the format from UNIX to datetime, for the start and end dates.
# change the datatypes from string to PySpark's timestamp.
# we created a new column as the old column experiment_start_date
# we converted to longtype to do the division cos it was string before, long type cos it holds more digit than regular integer
# function applied on start date and end date
# Using the same name in withcolumns will automatically update the column values to the converted values, another column name will add a new column
trial_results = trial_results.withColumn('experiment_start_date', fn.from_unixtime(col('experiment_start_date').cast(dtype.LongType())/1000, 'yyyy-MM-dd HH:mm:ss.SSSS'))\
    .withColumn('experiment_start_date', col('experiment_start_date').cast(dtype.TimestampType()))\
        .withColumn('experiment_end_date', fn.from_unixtime(col('experiment_end_date').cast(dtype.LongType())/1000, 'yyyy-MM-dd HH:mm:ss.SSSS'))\
            .withColumn('experiment_end_date', col('experiment_end_date').cast(dtype.TimestampType()))

In [19]:
trial_results.show(10)

+------------------+-------------+-------+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+
|age_of_participant|clinic_branch|   name|assistants_role|drug_used|experiment_end_date|experiment_start_date|no_of_hours_passed_at_first_reaction|trial_conclusion|observed_side_effect|
+------------------+-------------+-------+---------------+---------+-------------------+---------------------+------------------------------------+----------------+--------------------+
|                19|      Ontario|   Saul|      therapist|  Placebo|2021-05-01 03:00:00|  2021-04-01 03:00:00|                                  52|   BP normalized|      rashes on neck|
|                14|      Ontario|   Saul|          nurse| Naproxen|2021-05-01 03:00:00|  2021-04-01 03:00:00|                                  78|       Follow-up|                 N/A|
|                17|      Ontario|   Saul|          nurse|  Placebo|20

In [20]:
trial_results.dtypes

[('age_of_participant', 'bigint'),
 ('clinic_branch', 'string'),
 ('name', 'string'),
 ('assistants_role', 'string'),
 ('drug_used', 'string'),
 ('experiment_end_date', 'timestamp'),
 ('experiment_start_date', 'timestamp'),
 ('no_of_hours_passed_at_first_reaction', 'bigint'),
 ('trial_conclusion', 'string'),
 ('observed_side_effect', 'string')]

# Load the Dataset - For the Clinician

In [21]:
# reorder the columns

columns = ['experiment_start_date', 'experiment_end_date', 'clinic_branch', 'drug_used', 'name', 'assistants_role', 'age_of_participant', 'no_of_hours_passed_at_first_reaction', 'trial_conclusion', 'observed_side_effect']

In [22]:
# Save the results 


In [23]:

# partition by drug type for easy accessibility
# order by conclusion and sideeffect for easy readability/analysis
trial_results.select(columns).sort('trial_conclusion', 'observed_side_effect').write.option("header", True)\
    .partitionBy(['drug_used'])\
    .mode("overwrite")\
    .csv("clinicians_dataset")

# Load the Dataset - For the ML Engineers

## Derive Month from TimeStamp

In [24]:
ml_trial_results = trial_results.withColumn('exp_start_month', fn.month(fn.col('experiment_start_date')))\
    .withColumn('exp_start_year', fn.year(col('experiment_start_date'))).sort('experiment_start_date')

## Save Results

In [25]:
ml_trial_results.write.partitionBy(['exp_start_month', 'drug_used']).mode('overwrite').parquet('ml_dataset')

                                                                                

In [26]:
# read files after saving


spark.read.option('header', True).format("csv").load("clinicians_dataset/drug_used=Naproxen/*.csv").show(10)

+---------------------+--------------------+-------------+-------+---------------+------------------+------------------------------------+----------------+--------------------+
|experiment_start_date| experiment_end_date|clinic_branch|   name|assistants_role|age_of_participant|no_of_hours_passed_at_first_reaction|trial_conclusion|observed_side_effect|
+---------------------+--------------------+-------------+-------+---------------+------------------+------------------------------------+----------------+--------------------+
| 2021-04-01T03:00:...|2021-05-01T03:00:...|      Ontario|   Will|          nurse|                18|                                  14|   BP normalized|                 N/A|
| 2021-04-01T03:00:...|2021-05-01T03:00:...|      Ontario|   Will|      therapist|                18|                                  10|   BP normalized|                 N/A|
| 2021-04-01T03:00:...|2021-05-01T03:00:...|      Ontario|   Will|      therapist|                14|              

In [27]:
spark.read.option('header', True).format("parquet").load("ml_dataset/exp_start_month=2/drug_used=Naproxen/*.parquet").show(10)

+------------------+-------------+-------+---------------+-------------------+---------------------+------------------------------------+----------------+--------------------+--------------+
|age_of_participant|clinic_branch|   name|assistants_role|experiment_end_date|experiment_start_date|no_of_hours_passed_at_first_reaction|trial_conclusion|observed_side_effect|exp_start_year|
+------------------+-------------+-------+---------------+-------------------+---------------------+------------------------------------+----------------+--------------------+--------------+
|                20|      Alberta|Johnson|          nurse|2021-03-02 02:00:00|  2021-02-01 02:00:00|                                  45|       Follow-up|      rashes on neck|          2021|
|                14|      Alberta|Johnson|      therapist|2021-03-02 02:00:00|  2021-02-01 02:00:00|                                  31|       No effect|       arms and feet|          2021|
|                16|      Alberta| Greene|   

23/09/10 14:42:07 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
