In [1]:
# Uncomment to stop an already-running session before building a new one below.
# spark.stop()

In [2]:
import os
from pyspark.sql import SparkSession, types as t, functions as F

In [3]:
# GCS connector jar handed to Spark through `spark.jars`.
GCS_CONNECTOR_JAR = "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.14.jar"

# Resolve the service-account key file up front so a missing variable surfaces here with a
# readable message instead of as a JVM-side error when `None` reaches the Hadoop configuration.
gcs_keyfile = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if not gcs_keyfile:
    raise RuntimeError(
        "GOOGLE_APPLICATION_CREDENTIALS is not set; point it at the service-account JSON key file."
    )

spark = (
    SparkSession
    .builder
    # .master("spark://spark-master:7077")
    .appName("Testing Transformations")
    .config("spark.jars", GCS_CONNECTOR_JAR)
    .getOrCreate()
)

# Google Cloud service-account credentials used by the GCS connector.
spark._jsc.hadoopConfiguration().set("google.cloud.auth.service.account.json.keyfile", gcs_keyfile)

# Display the session summary as the cell output.
spark

/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/30 13:45:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Root GCS prefix of the Parquet datasets; the load cells append 'patient/', 'drug/' and 'reaction/'.
bucket = "gs://zoomcamp-454219-ade-pipeline/data/pq/"

# Patient

### Load patient

In [15]:
# Load every patient Parquet part found two levels below <bucket>/patient/ and let Spark infer
# the schema.
# NOTE(review): the saved output of the groupBy cell further down shows the scan failing with
# "Column: [patientagegroup], Expected: string, Found: INT32" for
# patient/2004/drug-event-part-8-of-20.parquet, i.e. at least one part file stores that column
# with a different physical type than the inferred schema. The part files need a consistent
# type (fix where they are written, or read and cast them separately) before full scans work.
patient = (
    spark
    .read
    .parquet(bucket + 'patient/*/*')
)

print(f"Count: {patient.count()}")
# printSchema() prints the tree itself and returns None, so it must not be wrapped in print().
patient.printSchema()
patient.show()

                                                                                

Count: 1643038
root
 |-- patientid: string (nullable = true)
 |-- patientagegroup: string (nullable = true)
 |-- patientonsetage: string (nullable = true)
 |-- patientonsetageunit: string (nullable = true)
 |-- patientsex: string (nullable = true)
 |-- patientweight: string (nullable = true)
 |-- serious: integer (nullable = true)
 |-- seriousnessdeath: integer (nullable = true)
 |-- seriousnesshospitalization: integer (nullable = true)
 |-- seriousnessdisabling: integer (nullable = true)
 |-- seriousnesslifethreatening: integer (nullable = true)
 |-- seriousnessother: integer (nullable = true)
 |-- receivedate: integer (nullable = true)
 |-- receiptdate: integer (nullable = true)
 |-- safetyreportid: integer (nullable = true)

None
+--------------------+---------------+---------------+-------------------+----------+-------------+-------+----------------+--------------------------+--------------------+--------------------------+----------------+-----------+-----------+--------------+
|

### Perform Transformation

In [16]:
# Re-check the inferred column types right before transforming them.
patient.printSchema()

root
 |-- patientid: string (nullable = true)
 |-- patientagegroup: string (nullable = true)
 |-- patientonsetage: string (nullable = true)
 |-- patientonsetageunit: string (nullable = true)
 |-- patientsex: string (nullable = true)
 |-- patientweight: string (nullable = true)
 |-- serious: integer (nullable = true)
 |-- seriousnessdeath: integer (nullable = true)
 |-- seriousnesshospitalization: integer (nullable = true)
 |-- seriousnessdisabling: integer (nullable = true)
 |-- seriousnesslifethreatening: integer (nullable = true)
 |-- seriousnessother: integer (nullable = true)
 |-- receivedate: integer (nullable = true)
 |-- receiptdate: integer (nullable = true)
 |-- safetyreportid: integer (nullable = true)



In [None]:
# Patient column transformations.
# NOTE: every step rebinds `patient`. The age-group step maps anything other than '1'-'6' to
# null and the age step drops its own input columns, so re-run the load cell before re-running
# this cell.

# Decode the age-group code into a readable label; unmatched values become null.
age_group_code = F.col("patientagegroup")
patient = patient.withColumn(
    "patientagegroup",
    (
        F
        .when(age_group_code == '1', "Neonate")
        .when(age_group_code == '2', "Infant")
        .when(age_group_code == '3', "Child")
        .when(age_group_code == '4', "Adolescent")
        .when(age_group_code == '5', "Adult")
        .when(age_group_code == '6', "Elderly")
        .otherwise(None)
    )
)

# Normalize the onset age to years. The unit meanings below are inferred from the factors
# (8766 = 365.25 * 24). Rows with any other unit get a null age.
# NOTE(review): the saved printSchema output after this cell still lists the column as
# `patientage`; that output predates the current column name "patientage(yrs)".
onset_age = F.col("patientonsetage")
onset_unit = F.col("patientonsetageunit")
patient = patient.withColumn(
    "patientage(yrs)",
    (
        F
        .when(onset_unit == 800, onset_age * 10)       # decades
        .when(onset_unit == 801, onset_age * 1)        # years
        .when(onset_unit == 802, onset_age / 12)       # months
        .when(onset_unit == 803, onset_age / 52.143)   # weeks
        .when(onset_unit == 804, onset_age / 365.25)   # days
        .when(onset_unit == 805, onset_age / 8766)     # hours
        .otherwise(None)
    ).cast(t.FloatType())
).drop("patientonsetageunit", "patientonsetage")

# Decode the sex code; the branches already yield strings, so no extra cast is needed.
sex_code = F.col("patientsex")
patient = patient.withColumn(
    "patientsex",
    (
        F
        .when(sex_code == 1, "Male")
        .when(sex_code == 2, "Female")
        .otherwise(None)
    )
)

# Keep the weight only when it is a plain non-negative decimal number; everything else is null.
patient = patient.withColumn(
    "patientweight",
    (
        F
        .when(
            F.col("patientweight").rlike(r"^\d+(\.\d+)?$"),
            F.col("patientweight").cast(t.FloatType()))
        .otherwise(None)
    )
)

# `serious` is three-valued: 1 -> True, 2 -> False, anything else -> null.
patient = patient.withColumn(
    "serious",
    (
        F
        .when(F.col("serious") == 1, True)
        .when(F.col("serious") == 2, False)
        .otherwise(None)
    )
)

# The individual seriousness criteria are plain flags: 1 -> True, anything else (including
# null) -> False. One loop replaces five identical copy-pasted blocks.
for flag_col in (
    "seriousnessdeath",
    "seriousnesshospitalization",
    "seriousnessdisabling",
    "seriousnesslifethreatening",
    "seriousnessother",
):
    patient = patient.withColumn(
        flag_col,
        F.when(F.col(flag_col) == 1, True).otherwise(False),
    )

In [18]:
# Confirm the column names and types produced by the transformation cell.
patient.printSchema()

root
 |-- patientid: string (nullable = true)
 |-- patientagegroup: string (nullable = true)
 |-- patientsex: string (nullable = true)
 |-- patientweight: float (nullable = true)
 |-- serious: boolean (nullable = true)
 |-- seriousnessdeath: boolean (nullable = false)
 |-- seriousnesshospitalization: boolean (nullable = false)
 |-- seriousnessdisabling: boolean (nullable = false)
 |-- seriousnesslifethreatening: boolean (nullable = false)
 |-- seriousnessother: boolean (nullable = false)
 |-- receivedate: integer (nullable = true)
 |-- receiptdate: integer (nullable = true)
 |-- safetyreportid: integer (nullable = true)
 |-- patientage: float (nullable = true)



### Display rows

In [19]:
# Preview the first rows of the transformed patient frame.
patient.show()

+--------------------+---------------+----------+-------------+-------+----------------+--------------------------+--------------------+--------------------------+----------------+-----------+-----------+--------------+----------+
|           patientid|patientagegroup|patientsex|patientweight|serious|seriousnessdeath|seriousnesshospitalization|seriousnessdisabling|seriousnesslifethreatening|seriousnessother|receivedate|receiptdate|safetyreportid|patientage|
+--------------------+---------------+----------+-------------+-------+----------------+--------------------------+--------------------+--------------------------+----------------+-----------+-----------+--------------+----------+
|c03ef5e8-17e8-4b0...|           null|    Female|         null|   null|           false|                     false|               false|                     false|           false|       null|       null|          null|      null|
|be397b97-b8c8-42a...|           null|      Male|         null|   null|     

In [20]:
# Row count per age-group label.
# NOTE(review): in the saved output this action aborts with "Parquet column cannot be
# converted ... Column: [patientagegroup], Expected: string, Found: INT32" while scanning
# patient/2004/drug-event-part-8-of-20.parquet; the part files must agree on the column type
# before this aggregation can complete.
patient.groupBy("patientagegroup").count().show()

25/04/30 13:53:24 ERROR Executor: Exception in task 3.0 in stage 19.0 (TID 618)]
org.apache.spark.SparkException: Parquet column cannot be converted in file gs://zoomcamp-454219-ade-pipeline/data/pq/patient/2004/drug-event-part-8-of-20.parquet. Column: [patientagegroup], Expected: string, Found: INT32.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:868)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:301)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown S

Py4JJavaError: An error occurred while calling o241.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 19.0 failed 1 times, most recent failure: Lost task 3.0 in stage 19.0 (TID 618) (c18afd726dd1 executor driver): org.apache.spark.SparkException: Parquet column cannot be converted in file gs://zoomcamp-454219-ade-pipeline/data/pq/patient/2004/drug-event-part-8-of-20.parquet. Column: [patientagegroup], Expected: string, Found: INT32.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:868)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:301)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [patientagegroup], physicalType: INT32, logicalType: string
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:199)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:297)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Parquet column cannot be converted in file gs://zoomcamp-454219-ade-pipeline/data/pq/patient/2004/drug-event-part-8-of-20.parquet. Column: [patientagegroup], Expected: string, Found: INT32.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:868)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:301)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [patientagegroup], physicalType: INT32, logicalType: string
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:199)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:297)
	... 20 more


[Stage 19:>                                                        (0 + 5) / 20]

25/04/30 13:53:25 WARN TaskSetManager: Lost task 14.0 in stage 19.0 (TID 629) (c18afd726dd1 executor driver): TaskKilled (Stage cancelled)
25/04/30 13:53:25 WARN TaskSetManager: Lost task 5.0 in stage 19.0 (TID 620) (c18afd726dd1 executor driver): TaskKilled (Stage cancelled)
25/04/30 13:53:25 WARN TaskSetManager: Lost task 12.0 in stage 19.0 (TID 627) (c18afd726dd1 executor driver): TaskKilled (Stage cancelled)
25/04/30 13:53:25 WARN TaskSetManager: Lost task 19.0 in stage 19.0 (TID 634) (c18afd726dd1 executor driver): TaskKilled (Stage cancelled)
25/04/30 13:53:25 WARN TaskSetManager: Lost task 7.0 in stage 19.0 (TID 622) (c18afd726dd1 executor driver): TaskKilled (Stage cancelled)


# Drug

### Load Drug

In [46]:
# Expected layout of the drug Parquet files: every field is a nullable string.
# `drugindication` was missing from this list although the files contain it (see the printed
# schema), so applying the schema would have silently dropped that column.
# The read below does not apply `drug_schema`; it relies on Spark's schema inference.
drug_schema = t.StructType(
    [
        t.StructField(field_name, t.StringType(), True)
        for field_name in (
            'patientid',
            'medicinalproduct',
            'activesubstancename',
            'drugindication',
            'drugadministrationroute',
            'drugstartdate',
            'drugenddate',
            'drugdosagetext',
            'drugstructuredosagenumb',
            'drugstructuredosageunit',
            'drugtreatmentduration',
            'drugtreatmentdurationunit',
            'drugrecurreadministration',
        )
    ]
)

# Load every drug Parquet part found two levels below <bucket>/drug/.
drug = (
    spark
    .read
    .parquet(bucket + 'drug/*/*')
)

drug.printSchema()

                                                                                

root
 |-- patientid: string (nullable = true)
 |-- medicinalproduct: string (nullable = true)
 |-- activesubstancename: string (nullable = true)
 |-- drugindication: string (nullable = true)
 |-- drugadministrationroute: string (nullable = true)
 |-- drugstartdate: string (nullable = true)
 |-- drugenddate: string (nullable = true)
 |-- drugdosagetext: string (nullable = true)
 |-- drugstructuredosagenumb: string (nullable = true)
 |-- drugstructuredosageunit: string (nullable = true)
 |-- drugtreatmentduration: string (nullable = true)
 |-- drugtreatmentdurationunit: string (nullable = true)
 |-- drugrecurreadministration: string (nullable = true)



### Perform Cast

In [47]:
# Re-type the numeric-looking string columns, then rename the duration value column so it
# pairs with `drugtreatmentdurationunit`.
# NOTE: rebinds `drug`; after the rename this cell cannot be re-run without reloading `drug`.
drug_casts = {
    "drugstructuredosagenumb": t.FloatType(),
    "drugstructuredosageunit": t.StringType(),
    "drugtreatmentduration": t.IntegerType(),
    "drugtreatmentdurationunit": t.StringType(),
    "drugrecurreadministration": t.IntegerType(),
}

for column_name, target_type in drug_casts.items():
    drug = drug.withColumn(column_name, F.col(column_name).cast(target_type))

drug = drug.withColumnRenamed("drugtreatmentduration", "drugtreatmentdurationnumb")

In [48]:
# Confirm the column types and the renamed duration column after the casts.
drug.printSchema()

root
 |-- patientid: string (nullable = true)
 |-- medicinalproduct: string (nullable = true)
 |-- activesubstancename: string (nullable = true)
 |-- drugindication: string (nullable = true)
 |-- drugadministrationroute: string (nullable = true)
 |-- drugstartdate: string (nullable = true)
 |-- drugenddate: string (nullable = true)
 |-- drugdosagetext: string (nullable = true)
 |-- drugstructuredosagenumb: float (nullable = true)
 |-- drugstructuredosageunit: string (nullable = true)
 |-- drugtreatmentdurationnumb: integer (nullable = true)
 |-- drugtreatmentdurationunit: string (nullable = true)
 |-- drugrecurreadministration: integer (nullable = true)



### Perform Transformation

In [49]:
# Administration-route labels, listed in code order: the first entry is code "001", the last
# is code "067". The zero-padded three-digit keys are generated from the position.
# NOTE(review): "Oropharingeal" and "Sunconjunctival" look misspelled; they are runtime data
# and are left as-is here. Confirm against the source code list before correcting them.
_route_labels = [
    # 001-010
    "Auricular (otic)",
    "Buccal",
    "Cutaneous",
    "Dental",
    "Endocervical",
    "Endosinusial",
    "Endotracheal",
    "Epidural",
    "Extra-amniotic",
    "Hemodialysis",
    # 011-020
    "Intra corpus cavernosum",
    "Intra-amniotic",
    "Intra-arterial",
    "Intra-articular",
    "Intra-uterine",
    "Intracardiac",
    "Intracavernous",
    "Intracerebral",
    "Intracervical",
    "Intracisternal",
    # 021-030
    "Intracorneal",
    "Intracoronary",
    "Intradermal",
    "Intradiscal (intraspinal)",
    "Intrahepatic",
    "Intralesional",
    "Intralymphatic",
    "Intramedullar (bone marrow)",
    "Intrameningeal",
    "Intramuscular",
    # 031-040
    "Intraocular",
    "Intrapericardial",
    "Intraperitoneal",
    "Intrapleural",
    "Intrasynovial",
    "Intratumor",
    "Intrathecal",
    "Intrathoracic",
    "Intratracheal",
    "Intravenous bolus",
    # 041-050
    "Intravenous drip",
    "Intravenous (not otherwise specified)",
    "Intravesical",
    "Iontophoresis",
    "Nasal",
    "Occlusive dressing technique",
    "Ophthalmic",
    "Oral",
    "Oropharingeal",
    "Other",
    # 051-060
    "Parenteral",
    "Periarticular",
    "Perineural",
    "Rectal",
    "Respiratory (inhalation)",
    "Retrobulbar",
    "Sunconjunctival",
    "Subcutaneous",
    "Subdermal",
    "Sublingual",
    # 061-067
    "Topical",
    "Transdermal",
    "Transmammary",
    "Transplacental",
    "Unknown",
    "Urethral",
    "Vaginal",
]

route_mapping = {
    f"{position:03d}": label
    for position, label in enumerate(_route_labels, start=1)
}

def _complete_partial_date(column_name):
    """Build a date column from a 'yyyy', 'yyyyMM' or 'yyyyMMdd' string column.

    Four-character values get "0101" appended and six-character values get "01" appended,
    so partial dates fall on the first month/day. Values of any other length are passed to
    the parser unchanged. The result is parsed with the pattern "yyyyMMdd".
    """
    raw_value = F.col(column_name)
    padded_value = (
        F
        .when(F.length(raw_value) == 4, F.concat(raw_value, F.lit("0101")))
        .when(F.length(raw_value) == 6, F.concat(raw_value, F.lit("01")))
        .otherwise(raw_value)
    )
    return F.to_date(padded_value, "yyyyMMdd")


# Fill in the missing parts of both drug dates and convert them to date columns.
for date_column in ("drugstartdate", "drugenddate"):
    drug = drug.withColumn(date_column, _complete_partial_date(date_column))

# Flatten the (code, label) pairs into the alternating key/value literals create_map expects.
route_literals = [
    F.lit(part)
    for code_and_label in route_mapping.items()
    for part in code_and_label
]
map_expr = F.create_map(route_literals)

# Replace the route code with its label by looking the code up in the literal map.
drug = drug.withColumn(
    "drugadministrationroute",
    map_expr[F.col("drugadministrationroute")],
)

# Create drugstructuredosage(mg): the structured dose normalised to milligrams.
# Unit codes per the openFDA drug-event field reference: 001 = kg, 002 = g, 003 = mg,
# 004 = µg. The previous factors were inverted (kg was multiplied by 1e-6 and µg by 1e3).
# Any other unit code (volumes, activity units, ...) cannot be expressed in mg and yields null.
dosage_numb = F.col("drugstructuredosagenumb")
dosage_unit = F.col("drugstructuredosageunit")

drug = (
    drug
    .withColumn(
        "drugstructuredosage(mg)",
        (
            F
            .when(dosage_unit == "001", dosage_numb * 1e6)    # kg -> mg
            .when(dosage_unit == "002", dosage_numb * 1e3)    # g  -> mg
            .when(dosage_unit == "003", dosage_numb * 1)      # mg
            .when(dosage_unit == "004", dosage_numb * 1e-3)   # µg -> mg
            .otherwise(None)
        )
    )
    .drop("drugstructuredosageunit", "drugstructuredosagenumb")
)

# Normalize drugtreatmentduration to days.
# Unit meanings are inferred from the factors: 801 years, 802 months, 803 weeks, 804 days,
# 805 hours, 806 minutes. Any other unit yields null.
# A month is taken as one twelfth of the 365.25-day year so that 12 months equal 1 year;
# the previous literal 30.46 did not satisfy that.
DAYS_PER_YEAR = 365.25
duration_numb = F.col("drugtreatmentdurationnumb")
duration_unit = F.col("drugtreatmentdurationunit")

drug = (
    drug
    .withColumn(
        "drugtreatmentduration(days)",
        (
            F
            .when(duration_unit == "801", duration_numb * DAYS_PER_YEAR)
            .when(duration_unit == "802", duration_numb * (DAYS_PER_YEAR / 12))
            .when(duration_unit == "803", duration_numb * 7)
            .when(duration_unit == "804", duration_numb * 1)
            .when(duration_unit == "805", duration_numb / 24)
            .when(duration_unit == "806", duration_numb / 1440)
            .otherwise(None)
        )
    )
    .drop("drugtreatmentdurationunit", "drugtreatmentdurationnumb")
)

# Decode the readministration code into a label column (1 -> "Yes", 2 -> "No",
# 3 -> "Unknown", anything else -> null), then drop the raw code column.
recur_code = F.col("drugrecurreadministration")
recur_label = (
    F
    .when(recur_code == 1, "Yes")
    .when(recur_code == 2, "No")
    .when(recur_code == 3, "Unknown")
    .otherwise(None)
)

drug = (
    drug
    .withColumn("drug_reaction_after_readministration", recur_label)
    .drop("drugrecurreadministration")
)

In [50]:
# Preview the first rows of the transformed drug frame.
drug.show()

[Stage 35:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+-----------------------+-------------+-----------+--------------------+-----------------------+---------------------------+------------------------------------+
|           patientid|    medicinalproduct| activesubstancename|      drugindication|drugadministrationroute|drugstartdate|drugenddate|      drugdosagetext|drugstructuredosage(mg)|drugtreatmentduration(days)|drug_reaction_after_readministration|
+--------------------+--------------------+--------------------+--------------------+-----------------------+-------------+-----------+--------------------+-----------------------+---------------------------+------------------------------------+
|070ec3d6-9b34-412...|            ENTRESTO|SACUBITRIL\VALSARTAN|                null|                   Oral|         null|       null|50 MG, BID (24/26...|                   50.0|                       null|                             Unknown|
|070ec3d6-9b34-4

                                                                                

# Reaction

### Load Reaction

In [53]:
# Expected layout of the reaction Parquet files: three nullable string columns.
# The read below does not apply this schema; it relies on Spark's schema inference.
reaction_schema = t.StructType(
    [
        t.StructField(field_name, t.StringType(), True)
        for field_name in ('patientid', 'reactionmeddrapt', 'reactionoutcome')
    ]
)

# Load every reaction Parquet part found two levels below <bucket>/reaction/.
reaction_path = bucket + 'reaction/*/*'
reaction = spark.read.parquet(reaction_path)

reaction.printSchema()

                                                                                

root
 |-- patientid: string (nullable = true)
 |-- reactionmeddrapt: string (nullable = true)
 |-- reactionoutcome: string (nullable = true)



In [56]:
# Preview the first rows of the reaction frame.
reaction.show()

[Stage 42:>                                                         (0 + 1) / 1]

+--------------------+--------------------+--------------------+
|           patientid|    reactionmeddrapt|     reactionoutcome|
+--------------------+--------------------+--------------------+
|10294a0f-73ae-4de...|    Facial paralysis|                null|
|9b65b922-f87b-4ec...|   Visual impairment|             Unknown|
|9b65b922-f87b-4ec...|    Drug ineffective|             Unknown|
|ef3225ee-9fbc-44d...|    Spinal operation|             Unknown|
|60b37dc4-3e42-48c...|        Pancreatitis|             Unknown|
|747908f1-b359-46d...|     Hospitalisation|             Unknown|
|56167985-ce49-42e...|Neuropathy periph...|Not recovered/not...|
|56167985-ce49-42e...|      Joint swelling|Not recovered/not...|
|805e5cc4-cba0-494...|               Shock|Not recovered/not...|
|805e5cc4-cba0-494...| Acute kidney injury|Not recovered/not...|
|805e5cc4-cba0-494...| Acute kidney injury|Not recovered/not...|
|805e5cc4-cba0-494...|Blood creatinine ...|Not recovered/not...|
|97ec028d-2271-4bd...|   

                                                                                

### Perform Transformation

In [58]:
# Decode the reaction-outcome code into its label.
# The saved preview of `reaction` above already shows labels such as "Unknown" in this column,
# and a plain code -> label mapping would turn those into null. Values that are already one of
# the known labels are therefore passed through, which makes this cell safe to re-run.
# Anything that is neither a known code nor a known label becomes null.
outcome_labels = {
    '1': "Recovered/resolved",
    '2': "Recovering/resolving",
    '3': "Not recovered/not resolved",
    '4': "Recovered/resolved with sequelae (consequent health issues)",
    '5': "Fatal",
    '6': "Unknown",
}

outcome_value = F.col("reactionoutcome")

# Start with the pass-through branch, then add one branch per code.
outcome_decoded = F.when(outcome_value.isin(*outcome_labels.values()), outcome_value)
for outcome_code, outcome_label in outcome_labels.items():
    outcome_decoded = outcome_decoded.when(outcome_value == outcome_code, outcome_label)

reaction = (
    reaction
    .withColumn(
        "reactionoutcome",
        outcome_decoded.otherwise(None).cast(t.StringType())
    )
)

In [59]:
# Confirm the reaction columns are still strings after decoding the outcome.
reaction.printSchema()

root
 |-- patientid: string (nullable = true)
 |-- reactionmeddrapt: string (nullable = true)
 |-- reactionoutcome: string (nullable = true)

