In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rheeza').getOrCreate()

## Understand the Dataset

In [2]:
trials_df = spark.read.json('dataset.json', multiLine=True) # read json file

In [3]:
trials_df.show(5)

+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|ageofparticipant|           clinician|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|              result|
+----------------+--------------------+---------+-----------------+-------------------+------------------------------+--------------------+
|              19|{Ontario, Saul, t...|  Placebo|    1619827200000|      1617235200000|                            52|{BP normalized, r...|
|              14|{Ontario, Saul, n...| Naproxen|    1619827200000|      1617235200000|                            78|    {Follow-up, N/A}|
|              17|{Ontario, Saul, n...|  Placebo|    1619827200000|      1617235200000|                            14|    {Follow-up, N/A}|
|              18|{Ontario, Will, n...| Naproxen|    1619827200000|      1617235200000|                            14|{BP normalized, N/A}|
|              17|{O

In [4]:
trials_df.printSchema() # view schema

root
 |-- ageofparticipant: long (nullable = true)
 |-- clinician: struct (nullable = true)
 |    |-- branch: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- result: struct (nullable = true)
 |    |-- conclusion: string (nullable = true)
 |    |-- sideeffectsonparticipant: string (nullable = true)



In [5]:
trials_df.columns

['ageofparticipant',
 'clinician',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result']

In [6]:
columns = ['ageofparticipant',
 'clinician.branch',
 'clinician.name',
 'clinician.role',
 'drug_used',
 'experimentenddate',
 'experimentstartdate',
 'noofhourspassedatfirstreaction',
 'result.conclusion',
 'result.sideeffectsonparticipant'
 ] 

In [7]:
trials_df.select(columns).show(5)

+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|ageofparticipant| branch|   name|     role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|   conclusion|sideeffectsonparticipant|
+----------------+-------+-------+---------+---------+-----------------+-------------------+------------------------------+-------------+------------------------+
|              19|Ontario|   Saul|therapist|  Placebo|    1619827200000|      1617235200000|                            52|BP normalized|          rashes on neck|
|              14|Ontario|   Saul|    nurse| Naproxen|    1619827200000|      1617235200000|                            78|    Follow-up|                     N/A|
|              17|Ontario|   Saul|    nurse|  Placebo|    1619827200000|      1617235200000|                            14|    Follow-up|                     N/A|
|              18|Onta

In [8]:
from pyspark.sql import functions as fn

In [9]:
# counting null values per column

trials_df.select([ fn.count(fn.when(fn.col(column).isNull(), column)).alias(column) for column in columns]).show()

+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|ageofparticipant|clinician.branch|clinician.name|clinician.role|drug_used|experimentenddate|experimentstartdate|noofhourspassedatfirstreaction|result.conclusion|result.sideeffectsonparticipant|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+
|               0|               0|             0|           109|        0|                0|                  0|                            73|               53|                              0|
+----------------+----------------+--------------+--------------+---------+-----------------+-------------------+------------------------------+-----------------+-------------------------------+



## Cleaning

In [10]:
# flaten df
# address null values
# rename columns

In [11]:
# flaten df

new_trials_df = trials_df.select(columns)
new_trials_df.printSchema()

root
 |-- ageofparticipant: long (nullable = true)
 |-- branch: string (nullable = true)
 |-- name: string (nullable = true)
 |-- role: string (nullable = true)
 |-- drug_used: string (nullable = true)
 |-- experimentenddate: string (nullable = true)
 |-- experimentstartdate: string (nullable = true)
 |-- noofhourspassedatfirstreaction: long (nullable = true)
 |-- conclusion: string (nullable = true)
 |-- sideeffectsonparticipant: string (nullable = true)



In [12]:
# rename columns
new_column_names = {
    'ageofparticipant': 'age_of_participant'
    , 'branch': 'clinic_branch'
    , 'name': 'head_clinician'
    , 'role': 'assistants_role'
    , 'experimentenddate': 'experiment_end_date'
    , 'experimentstartdate': 'experiment_start_date'
    , 'noofhourspassedatfirstreaction': 'hours_passed_at_first_reaction'
    , 'sideeffectsonparticipant': 'observed_side_effect'
}
new_trials_df = new_trials_df.withColumnsRenamed(new_column_names)
new_trials_df.show(2)

+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|                19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|
|                14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|
+------------------+-------------+--------------+---------------+---------+-----

In [13]:
new_trials_df.describe().show() 

+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|          3586|           3477|     3586|                3586|                 3586|                          3513|         3533|                3586|
|   mean|17.507250418293363|         null|          null|           null|     null|1.618381578137200...| 1.615813671834913...|             44.89097637346997|         null|                null|
| stddev|2.3066401927555233|       

In [14]:
# fill columns with null values

new_trials_df = new_trials_df.na.fill({'conclusion': 'unknown', 'assistants_role': 'unknown', 'hours_passed_at_first_reaction': 0})

In [15]:
new_trials_df.describe().show()

+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|summary|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used| experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+-------+------------------+-------------+--------------+---------------+---------+--------------------+---------------------+------------------------------+-------------+--------------------+
|  count|              3586|         3586|          3586|           3586|     3586|                3586|                 3586|                          3586|         3586|                3586|
|   mean|17.507250418293363|         null|          null|           null|     null|1.618381578137200...| 1.615813671834913...|              43.9771332961517|         null|                null|
| stddev|2.3066401927555233|       

In [16]:
new_trials_df.show(5)

+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|
+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+
|                19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|
|                14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1617235200000|                            78|    Follow-up|                 N/A|
|                17|      Ontario|          Saul|          nurse|  Placebo|     

## Data Transformation

In [17]:
new_trials_df.printSchema()

root
 |-- age_of_participant: long (nullable = true)
 |-- clinic_branch: string (nullable = true)
 |-- head_clinician: string (nullable = true)
 |-- assistants_role: string (nullable = false)
 |-- drug_used: string (nullable = true)
 |-- experiment_end_date: string (nullable = true)
 |-- experiment_start_date: string (nullable = true)
 |-- hours_passed_at_first_reaction: long (nullable = false)
 |-- conclusion: string (nullable = false)
 |-- observed_side_effect: string (nullable = true)



#### Tansforming the experiment_end_date and experiment_start_date

In [18]:
from pyspark.sql import types as dtypes
from pyspark.sql.functions import col, from_unixtime
from pyspark.sql.types import TimestampType

In [19]:
new_trials_df = new_trials_df\
    .withColumn('start_ts', fn.from_unixtime(fn.col('experiment_start_date').cast(dtypes.LongType())/1000, 'yyyy-MM-dd HH:mm:ss'))\
    .withColumn('start_ts', fn.col('start_ts').cast(dtypes.TimestampType()))\
            .withColumn('end_ts', fn.from_unixtime(fn.col('experiment_end_date').cast(dtypes.LongType())/1000, 'yyyy-MM-dd HH:mm:ss'))\
            .withColumn('end_ts', fn.col('end_ts').cast(dtypes.TimestampType()))

In [20]:

# Show the DataFrame with the converted timestamp columns
new_trials_df.show()

+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-------------------+-------------------+
|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|           start_ts|             end_ts|
+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-------------------+-------------------+
|                19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|2021-04-01 02:00:00|2021-05-01 02:00:00|
|                14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1

In [22]:
# Convert 'experiment_start_date' to a timestamp column
new_trials_df = new_trials_df.withColumn(
    'start_ts',
    from_unixtime(col('experiment_start_date') / 1000, 'yyyy-MM-dd HH:mm:ss').cast(TimestampType())
)

# Convert 'experiment_end_date' to a timestamp column
new_trials_df = new_trials_df.withColumn(
    'end_ts',
    from_unixtime(col('experiment_end_date') / 1000, 'yyyy-MM-dd HH:mm:ss').cast(TimestampType())
)

# Show the DataFrame with the converted timestamp columns
new_trials_df.show()

+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-------------------+-------------------+
|age_of_participant|clinic_branch|head_clinician|assistants_role|drug_used|experiment_end_date|experiment_start_date|hours_passed_at_first_reaction|   conclusion|observed_side_effect|           start_ts|             end_ts|
+------------------+-------------+--------------+---------------+---------+-------------------+---------------------+------------------------------+-------------+--------------------+-------------------+-------------------+
|                19|      Ontario|          Saul|      therapist|  Placebo|      1619827200000|        1617235200000|                            52|BP normalized|      rashes on neck|2021-04-01 02:00:00|2021-05-01 02:00:00|
|                14|      Ontario|          Saul|          nurse| Naproxen|      1619827200000|        1

## Partitioning for clinicians

In [23]:
new_trials_df.columns

['age_of_participant',
 'clinic_branch',
 'head_clinician',
 'assistants_role',
 'drug_used',
 'experiment_end_date',
 'experiment_start_date',
 'hours_passed_at_first_reaction',
 'conclusion',
 'observed_side_effect',
 'start_ts',
 'end_ts']

In [25]:
reordered_columns = ['start_ts', 'end_ts','clinic_branch', 'head_clinician', 'assistants_role', 'drug_used', 'age_of_participant', 'hours_passed_at_first_reaction', 'conclusion', 'observed_side_effect' ]
new_trials_df.select(reordered_columns).sort('start_ts').write.option('header', True).partitionBy('drug_used').mode("overwrite").format("csv").save('clinician')

Py4JJavaError: An error occurred while calling o223.save.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:578)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1623)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:341)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:331)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:370)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 23 more


## LOad for Machine Learning Engineer

In [None]:
#Extracting the Month from the timestamp since e partitioning by month for the ML engineers pyspark dataframe

reordered_columns = ['start_ts', 'end_ts','clinic_branch', 'head_clinician', 'assistants_role', 'drug_used', 'age_of_participant', 'hours_passed_at_first_reaction', 'conclusion', 'observed_side_effect' ]
new_trials_df.select(reordered_columns)\
    .withColumn('start_month', fn.month('start_ts'))\
        .sort('start_ts').write.partitionBy('start_month').mode("overwrite").format("parquet").save('ml_engineers')