#### Create Silver Layer (Medallion Architecture) Tables from raw unzipped JSON files for Human Drug Adverse Event dataset

- This Notebook is Step 2: it creates flattened tables (Silver Layer) from the raw JSON files downloaded using the first Notebook in the sample.
- On successful execution of the Notebook you will have three Delta Parquet tables available in the Lakehouse: fda_drug_event, fda_drug_event_patient_drug and fda_drug_event_patient_reaction
- The JSON in the original dataset is deeply nested and not very conducive to analytics, so this Notebook flattens it by a few levels. A few columns still contain nested JSON; the idea is to build out Gold Layer tables from them later, based on use cases
- Delta Parquet format is highly compressed so size of files for the three tables is going to be in the range of 15-20 GB
- Once the tables are created, you can delete the raw files if you prefer to save costs




In [None]:
%%sql

-- Drop the three Silver Layer tables if they already exist; each of them is
-- written again by a later cell in this notebook
DROP TABLE IF EXISTS fda_drug_event;
DROP TABLE IF EXISTS fda_drug_event_patient_drug;
DROP TABLE IF EXISTS fda_drug_event_patient_reaction;

In [None]:
from pyspark.sql.functions import col, explode_outer
from pyspark.sql.types import ArrayType, StructType

#define the function to flatten json columns in dataframe
def flatten_dataframe(df):
    """Return *df* with every struct column expanded into top-level columns.

    Struct columns are expanded one at a time, first one in schema order
    first. Each expansion keeps the remaining columns in their current order
    and appends the struct's children at the end, named ``parent_child``.
    The process repeats until the schema has no struct column left, so
    structs nested inside structs are expanded as well. Array columns are
    left untouched. If there is nothing to expand, *df* itself is returned.
    """
    flat_df = df
    while True:
        schema_fields = flat_df.schema.fields

        # Pick the first struct column still present, if any.
        struct_field = next(
            (f for f in schema_fields if isinstance(f.dataType, StructType)),
            None,
        )
        if struct_field is None:
            return flat_df

        parent = struct_field.name
        kept_paths = [f.name for f in schema_fields if f.name != parent]
        child_paths = [
            f"{parent}.{child}" for child in struct_field.dataType.fieldNames()
        ]

        # "parent.child" selects the nested value; the alias swaps the dot
        # for an underscore so the result is an ordinary column name.
        flat_df = flat_df.select(
            *[
                col(path).alias(path.replace(".", "_"))
                for path in kept_paths + child_paths
            ]
        )

In [None]:
# Read every unzipped JSON file under Files/fda_ds_unzipped into one DataFrame.
# The "multiline" option is enabled so a JSON record may span several lines.
df = spark.read.option("multiline", "true").json("Files/fda_ds_unzipped/*.json")
df.printSchema()

In [None]:
from pyspark.sql.functions import explode

# "results" holds an array, so explode it into one row per element;
# the "meta" column is dropped in the same step
df = df.withColumn("results", explode("results")).drop("meta")
df.printSchema()

In [None]:
#function to help with renaming of columns
def renameColumnPrefix(df, prefix, replaceValue = ""):
    """Return *df* with a leading ``prefix`` in column names swapped for ``replaceValue``.

    Only columns whose name starts with ``prefix`` are renamed, and only that
    leading occurrence is replaced; the same text appearing later in a name
    is kept as is. All other columns pass through unchanged, and the column
    order is preserved.

    Args:
        df: DataFrame whose columns should be renamed. Only ``df.columns``,
            ``df[name]`` and ``df.select`` are used.
        prefix: Text to look for at the start of each column name.
        replaceValue: Text put in place of the prefix. The default empty
            string simply removes the prefix.

    Returns:
        The DataFrame produced by ``df.select`` with the aliased columns.
    """
    def renamed(name):
        # Slice instead of str.replace so that only the leading prefix is
        # touched, never a later occurrence inside the name.
        if name.startswith(prefix):
            return replaceValue + name[len(prefix):]
        return name

    return df.select([df[name].alias(renamed(name)) for name in df.columns])

In [None]:
# Flatten the struct columns into top-level columns named parent_child
df = flatten_dataframe(df)
df.printSchema()

In [None]:
# Rename the columns to remove the "results_" prefix left over from flattening
df = renameColumnPrefix(df, "results_")
df.printSchema()

In [None]:
# Build the patient reaction DataFrame: keep the report id, produce one row
# per element of the patient_reaction array (explode_outer keeps reports whose
# array is null or empty), flatten the resulting struct and strip the
# "patient_reaction_" prefix from the flattened column names
patient_reaction_df = renameColumnPrefix(
    flatten_dataframe(
        df.select("safetyreportid",  "patient_reaction").withColumn(
            "patient_reaction", explode_outer("patient_reaction")
        )
    ),
    'patient_reaction_',
)
#display(patient_reaction_df)

In [None]:
# Save the patient reaction table in Delta format, overwriting any existing data
patient_reaction_df.write.mode("overwrite").format("delta").save("Tables/fda_drug_event_patient_reaction")

In [None]:
# Remove the patient_reaction column from the main DataFrame; its contents
# have already been written to the fda_drug_event_patient_reaction table
df = df.drop("patient_reaction")
df.printSchema()

In [None]:
# Build the patient drug DataFrame: keep the report id, produce one row per
# element of the patient_drug array (explode_outer keeps reports whose array
# is null or empty), flatten the resulting struct and strip the
# "patient_drug_" prefix from the flattened column names
patient_drug_df = renameColumnPrefix(
    flatten_dataframe(
        df.select("safetyreportid",  "patient_drug").withColumn(
            "patient_drug", explode_outer("patient_drug")
        )
    ),
    'patient_drug_',
)
#display(patient_drug_df)

In [None]:
# Save the patient drug table in Delta format, overwriting any existing data
patient_drug_df.write.mode("overwrite").format("delta").save("Tables/fda_drug_event_patient_drug")

In [None]:
# Drop the patient_drug column from the main DataFrame; its contents have
# already been written to the fda_drug_event_patient_drug table
df = df.drop("patient_drug")
df.printSchema()

In [None]:
# Save the fda_drug_event table in Delta format, overwriting any existing data
df.write.mode("overwrite").format("delta").save("Tables/fda_drug_event")

In [None]:
%%sql

-- Row counts for each of the three tables written by this notebook
select count(*) from fda_drug_event;
select count(*) from fda_drug_event_patient_drug;
select count(*) from fda_drug_event_patient_reaction;

In [None]:
%%sql

-- Number of rows in fda_drug_event_patient_drug per medicinalproduct, largest first.
-- NOTE(review): count(*) counts drug rows, so a report that lists the same
-- product more than once is counted each time -- confirm this is intended.
select medicinalproduct, count(*) as num_adverse_events 
from fda_drug_event_patient_drug 
group by medicinalproduct
order by num_adverse_events desc

In [None]:
%%sql

-- Browse all columns of the flattened patient drug table
select * from fda_drug_event_patient_drug;