In [0]:
## Mounting the storage account
# Variables
storage_account_name = "nationalhospitaldatalake"
container_name = "nationalhospital"
# The account key is fetched from a Databricks secret scope so it never appears
# in the notebook source.
storage_account_key = dbutils.secrets.get(
    scope="datalakekey",
    key="datalakekey"
)

# Mount Point
mount_point = f"/mnt/{container_name}"

# Mount the storage account.
# dbutils.fs.mount raises if the directory is already mounted (for example when
# this cell is re-run, or a previous run stopped before the unmount cell), so
# look at the existing mounts first and only mount when needed.
existing_mount_points = {mount.mountPoint for mount in dbutils.fs.mounts()}

if mount_point in existing_mount_points:
    print(f"{mount_point} is already mounted; skipping mount")
else:
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
        mount_point=mount_point,
        extra_configs={
            f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": storage_account_key
        }
    )
    print(f"Mounted {mount_point}")

In [0]:
## Data Reading
# All raw extracts are read with the same CSV options: first row is the header,
# column types are inferred, '"' is both the quote and the escape character,
# and a single record is allowed to span several lines.
_raw_csv_options = {
    "header": True,
    "inferSchema": True,
    "quote": '"',
    "escape": '"',
    "multiLine": True,
}


def _read_raw_csv(file_name):
    """Read one CSV file from the mounted container's raw/ folder into a DataFrame."""
    return spark.read.csv(f"{mount_point}/raw/{file_name}", **_raw_csv_options)


patients_df = _read_raw_csv("patients_data.csv")
imaging_df = _read_raw_csv("imaging_results_data.csv")
lab_df = _read_raw_csv("lab_results_data.csv")
med_records_df = _read_raw_csv("medical_records_data.csv")
trials_df = _read_raw_csv("clinical_trials_data.csv")
participants_df = _read_raw_csv("trial_participants_data.csv")



In [0]:
## Cleaning and Transformation
from pyspark.sql.functions import *


def _clean_and_save(raw_df, output_name):
    """Clean one raw DataFrame, write it as CSV, and return the cleaned DataFrame.

    Cleaning removes exact duplicate rows, drops every row that contains a null
    in any column, and adds a ``loaded_date`` column holding the current date.
    The result is written with a header row to
    ``{mount_point}/processed_data/{output_name}``, overwriting any existing
    output at that path.

    Args:
        raw_df: The raw Spark DataFrame to clean.
        output_name: Folder name under ``processed_data`` to write to.

    Returns:
        The cleaned DataFrame. ``DataFrameWriter.csv`` itself returns ``None``,
        so the DataFrame is built first and returned separately from the write.
    """
    cleaned_df = (
        raw_df.dropDuplicates()
        .na.drop()
        .withColumn("loaded_date", current_date())
    )
    cleaned_df.write.mode("overwrite").csv(
        f"{mount_point}/processed_data/{output_name}",
        header=True,
    )
    return cleaned_df


##patients data
processed_patients_df = _clean_and_save(patients_df, "patients_data_processed")

##medical record data
processed_med_records_df = _clean_and_save(med_records_df, "med_records_data_processed")

##imaging data
processed_imaging_df = _clean_and_save(imaging_df, "imaging_data_processed")

##lab data
processed_lab_df = _clean_and_save(lab_df, "lab_data_processed")

##clinical trials data
processed_trials_df = _clean_and_save(trials_df, "trials_data_processed")

##participants data
processed_participants_df = _clean_and_save(participants_df, "participants_data_processed")







In [0]:
# Unmount the storage.
# dbutils.fs.unmount raises if the directory is not currently mounted, so only
# unmount when the mount point is present; this keeps the cell safe to re-run.
if mount_point in {mount.mountPoint for mount in dbutils.fs.mounts()}:
    dbutils.fs.unmount(mount_point)
else:
    print(f"{mount_point} is not mounted; nothing to unmount")

## This was a fantastic project with Databricks.