### Creating a Spark Session

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col

### Creating a Schema

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Schema for the JSON payload of a single safety report.
# Every leaf is a nullable string; nested objects become structs and the
# repeated 'reaction' / 'drug' entries become arrays of structs.

def _string_field(name):
    """Return a nullable string StructField called `name`."""
    return StructField(name, StringType(), True)


def _struct_of_strings(*names):
    """Return a StructType whose fields are nullable strings, in the given order."""
    return StructType([_string_field(name) for name in names])


# Fields nested under the top-level "patient" object.
_patient_schema = StructType([
    _string_field("patientonsetage"),
    _string_field("patientonsetageunit"),
    _string_field("patientsex"),
    StructField(
        "patientdeath",
        _struct_of_strings("patientdeathdateformat", "patientdeathdate"),
        True,
    ),
    StructField("reaction", ArrayType(_struct_of_strings("reactionmeddrapt")), True),
    StructField(
        "drug",
        ArrayType(_struct_of_strings(
            "drugcharacterization",
            "medicinalproduct",
            "drugauthorizationnumb",
            "drugadministrationroute",
            "drugindication",
        )),
        True,
    ),
])

# Report-level scalar fields, listed in the order they appear in the schema.
_report_scalar_names = (
    "safetyreportid",
    "transmissiondateformat",
    "transmissiondate",
    "serious",
    "seriousnessdeath",
    "receivedateformat",
    "receivedate",
    "receiptdateformat",
    "receiptdate",
    "fulfillexpeditecriteria",
    "companynumb",
)

schema = StructType(
    [_string_field(name) for name in _report_scalar_names]
    + [
        StructField(
            "primarysource",
            _struct_of_strings("reportercountry", "qualification"),
            True,
        ),
        StructField("sender", _struct_of_strings("senderorganization"), True),
        _string_field("receiver"),
        StructField("patient", _patient_schema, True),
    ]
)


### Setting up event hub configuration and loading the data

In [0]:


import json

from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_json

# Starting position handed to the Event Hubs connector (serialised to JSON below).
# The original note says this configuration is meant to read the oldest data
# available in batch mode.
pos = dict(offset=-1, seqNo=-1, enqueuedTime=None, isInclusive=True)

# Connection string is passed through the connector's encrypt helper before
# it is placed in the reader configuration.
connstring = "<EVENT_HUB_CONNECTION_STRING"
eventHubConf = {
    'eventhubs.connectionString':
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connstring),
}

# Step 1: Batch-read the events from Event Hub (spark.read, not readStream).
df = (
    spark.read.format("eventhubs")
    .options(**eventHubConf)
    .option("eventhubs.startingPosition", json.dumps(pos))
    .load()
)

# Step 2: The 'body' column is cast to string so it can be parsed as JSON text.
df_decoded = df.withColumn("decoded_body", col("body").cast("string"))

# Step 3: Parse the JSON text into a struct column using the schema defined above.
df_with_json = df_decoded.withColumn(
    "parsed_body", from_json(col("decoded_body"), schema)
)

# Step 4: Preview a handful of parsed fields to confirm the schema lines up.
preview_columns = [
    "parsed_body.safetyreportid",
    "parsed_body.transmissiondate",
    "parsed_body.patient.patientonsetage",
    "parsed_body.patient.reaction",
]
df_with_json.select(*preview_columns).show(truncate=False)

# Alternative preview: show the whole parsed struct instead of selected fields.
# df_with_json.select("parsed_body").show(truncate=False)

+--------------+----------------+---------------+---------------------------------------------------+
|safetyreportid|transmissiondate|patientonsetage|reaction                                           |
+--------------+----------------+---------------+---------------------------------------------------+
|5801206-7     |20090109        |26             |[{DRUG ADMINISTRATION ERROR}, {OVERDOSE}]          |
|10003300      |20141002        |77             |[{Vomiting}, {Diarrhoea}, {Arthralgia}, {Headache}]|
|10003301      |20141002        |NULL           |[{Dyspepsia}, {Renal impairment}]                  |
|10003302      |20141002        |NULL           |[{Drug ineffective}]                               |
|10003304      |20141212        |NULL           |[{Drug hypersensitivity}]                          |
+--------------+----------------+---------------+---------------------------------------------------+



### Performing some transformations and returning the following data

Only reports flagged as serious (`serious == "1"`) are kept, and each report produces one row per reaction/drug combination.

- safetyreportid: The ID of the safety report.
- transmissiondate_converted: The transmissiondate converted from `yyyyMMdd` text into a proper date.
- patientonsetage: The age of the patient at the onset of symptoms.
- patientsex: The sex code of the patient as reported.
- reactionmeddrapt: The type of reaction.
- is_overdose: A flag indicating whether the reaction was an overdose ("Yes" or "No").
- drug_name: The name of the drug administered.
- drug_characterization: The characterization of the drug (e.g., whether it's a primary or secondary drug).
- drug_indication: The indication for the drug (e.g., the condition it was prescribed for).

In [0]:
from pyspark.sql.functions import col, explode, to_date, upper, when
from pyspark.sql.types import StringType

# Step 1: Pull the report-level fields and the nested reaction/drug arrays
# out of the parsed JSON struct.
df_transformed = df_with_json.select(
    "parsed_body.safetyreportid",
    "parsed_body.transmissiondate",
    "parsed_body.serious",
    "parsed_body.patient.patientonsetage",
    "parsed_body.patient.patientsex",
    "parsed_body.patient.reaction",
    "parsed_body.patient.drug",  # Added drug information
    "parsed_body.receivedate"
)

# Step 2: Flatten the 'reaction' and 'drug' arrays.
# Exploding both arrays yields one row per (reaction, drug) pair of a report.
# Note: explode() drops reports whose array is null or empty.
df_flat_reaction = df_transformed.withColumn("reaction", explode(col("reaction")))
df_flat_drug = df_flat_reaction.withColumn("drug", explode(col("drug")))

# Step 3: Filter for serious cases only
df_serious_cases = df_flat_drug.filter(col("serious") == "1")

# Step 4: Derive the output columns.
df_enriched = (
    df_serious_cases
    # 'transmissiondate' arrives as yyyyMMdd text; convert it to a date.
    .withColumn(
        "transmissiondate_converted", to_date(col("transmissiondate"), "yyyyMMdd")
    )
    # Reaction terms arrive in mixed case (e.g. "OVERDOSE" in some reports,
    # "Vomiting" in others), so compare on the upper-cased term; a
    # case-sensitive match would flag "Overdose" as "No".
    .withColumn(
        "is_overdose",
        when(upper(col("reaction.reactionmeddrapt")) == "OVERDOSE", "Yes").otherwise("No"),
    )
    # Promote the drug struct members to top-level columns.
    .withColumn("drug_name", col("drug.medicinalproduct"))
    .withColumn("drug_characterization", col("drug.drugcharacterization"))
    .withColumn("drug_indication", col("drug.drugindication"))
)

# Step 5: Finalize the DataFrame with the required columns
df_final = df_enriched.select(
    "safetyreportid",
    "transmissiondate_converted",
    "patientonsetage",
    "patientsex",
    "reaction.reactionmeddrapt",
    "drug_name",
    "drug_characterization",
    "drug_indication",
    "is_overdose"
)

# Step 6: Show the transformed data (this step can be modified as needed)
df_final.show(truncate=False)

+--------------+--------------------------+---------------+----------+-------------------------+-------------+---------------------+-----------------------------------+-----------+
|safetyreportid|transmissiondate_converted|patientonsetage|patientsex|reactionmeddrapt         |drug_name    |drug_characterization|drug_indication                    |is_overdose|
+--------------+--------------------------+---------------+----------+-------------------------+-------------+---------------------+-----------------------------------+-----------+
|5801206-7     |2009-01-09                |26             |1         |DRUG ADMINISTRATION ERROR|DURAGESIC-100|1                    |DRUG ABUSE                         |No         |
|5801206-7     |2009-01-09                |26             |1         |OVERDOSE                 |DURAGESIC-100|1                    |DRUG ABUSE                         |Yes        |
|10003300      |2014-10-02                |77             |2         |Vomiting                 

### Uploading data to the SQL Database

In [0]:
## Persist the transformed report rows to the SQL database

# JDBC endpoint of the target Azure SQL Database
jdbc_url = "jdbc:sqlserver://az-dev-server.database.windows.net:1433;database=az-dev-hub"

# Table that receives the transformed rows
target_table = "PatientAdverseEventReport"

# Login and driver settings handed to the JDBC writer.
# NOTE(review): "USERNAME" / "PASSWORD" look like placeholders — supply the real
# values from a secret store rather than committing them to the notebook.
connection_properties = dict(
    user="USERNAME",
    password="PASSWORD",
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# Write df_final over JDBC; save mode "overwrite" replaces existing table contents.
df_final.write.mode("overwrite").jdbc(
    url=jdbc_url,
    table=target_table,
    properties=connection_properties,
)
