In [0]:
import pyspark
from pyspark.sql.functions import *

In [0]:
dbutils.secrets.listScopes()

[SecretScope(name='healthcarescope')]

In [0]:
dbutils.secrets.list(scope = 'healthcarescope')

[SecretMetadata(key='blobaccesskey')]

In [0]:

secret_name = dbutils.secrets.get(scope = "healthcarescope", key = "blobaccesskey")

In [0]:
print(secret_name)

[REDACTED]


In [0]:
spark.conf.set(
    "fs.azure.account.key.healthcareprojectblob.dfs.core.windows.net",
    dbutils.secrets.get(scope = "healthcarescope", key = "blobaccesskey"))


In [0]:
display(dbutils.fs.ls("abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net"))

path,name,size,modificationTime
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/Patient_records.csv,Patient_records.csv,5110,1725312200000
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/claims.csv,claims.csv,5766,1725461306000
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/disease.csv,disease.csv,1489,1725312200000
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/group.csv,group.csv,4390,1725312200000
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/hospital.csv,hospital.csv,1328,1725312200000
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/subgroup.csv,subgroup.csv,561,1725312200000
abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/subscriber.csv,subscriber.csv,12061,1725312201000


In [0]:
# # Mount the Azure Blob Storage container as a DBFS path
# dbutils.fs.mount(
#   source="wasbs://rawhealthdata@healthcareprojectblob.blob.core.windows.net",
#   mount_point="/mnt/rawhealthdata",
#   extra_configs={
#     "fs.azure.account.key.healthcareprojectblob.blob.core.windows.net": "0izVsgjt8yavZcTHY3jKLiEiVe0Nu7jqnlYlYXCi3eAF/WsbNH1eY0Cvvd59kRgOIpunsgWDknLx+ASteulWxw=="
#   }
# )

# # List the files in the mounted DBFS path
# dbutils.fs.ls("/mnt/rawhealthdata")

In [0]:
# Unmount the Azure Blob Storage container
# dbutils.fs.unmount("/mnt/rawhealthdata")

In [0]:
data = spark.read.csv("abfss://rawhealthdata@healthcareprojectblob.dfs.core.windows.net/claims.csv", header=True, inferSchema=True)

In [0]:
display(data)

claim_id,patient_id,disease_name,SUB_ID,Claim_Or_Rejected,claim_type,claim_amount,claim_date
0,187158,Galactosemia,SUBID1000,N,claims of value,79874,1949-03-14
1,112766,Bladder cancer,SUBID10001,,claims of policy,151142,1970-03-16
2,199252,Kidney cancer,SUBID10002,,claims of value,59924,2008-02-03
3,133424,Suicide,SUBID10003,,claims of fact,143120,1995-02-08
4,172579,Food allergy,SUBID10004,Y,claims of value,168634,1967-05-23
5,171320,Whiplash,SUBID10005,,claims of policy,64840,1991-10-04
6,107794,Sunbathing,SUBID1006,N,claims of fact,26800,1991-03-26
7,130339,Drug consumption,SUBID10007,,claims of value,177186,1946-09-05
8,110377,Dengue,SUBID10008,N,claims of fact,141123,1966-06-20
9,149367,Head banging,SUBID10009,N,claims of value,88540,1945-12-29


In [0]:
# to do - patient name, null values/check for duplicates.

In [0]:
data.columns

['claim_id',
 'patient_id',
 'disease_name',
 'SUB_ID',
 'Claim_Or_Rejected',
 'claim_type',
 'claim_amount',
 'claim_date']

In [0]:
data = data.replace("NaN", None)

In [0]:
data.groupby(['claim_id', 'patient_id', 'disease_name', 'SUB_ID', 'Claim_Or_Rejected', 'claim_type', 'claim_amount', 'claim_date']).count().where("count > 1").show(15, False)

+--------+----------+------------+------+-----------------+----------+------------+----------+-----+
|claim_id|patient_id|disease_name|SUB_ID|Claim_Or_Rejected|claim_type|claim_amount|claim_date|count|
+--------+----------+------------+------+-----------------+----------+------------+----------+-----+
+--------+----------+------------+------+-----------------+----------+------------+----------+-----+



In [0]:
# check whether the null values are removed or not.
null_counts = data.select(
    [count(when(isnull(col(c)) | col(c).isNull(), c)).alias(c) for c in data.columns]
)

# Display the DataFrame
display(null_counts)

claim_id,patient_id,disease_name,SUB_ID,Claim_Or_Rejected,claim_type,claim_amount,claim_date
0,0,0,0,30,0,0,0


In [0]:
data = data.fillna({"Claim_Or_Rejected" : "N"})

In [0]:
display(data)

claim_id,patient_id,disease_name,SUB_ID,Claim_Or_Rejected,claim_type,claim_amount,claim_date
0,187158,Galactosemia,SUBID1000,N,claims of value,79874,1949-03-14
1,112766,Bladder cancer,SUBID10001,N,claims of policy,151142,1970-03-16
2,199252,Kidney cancer,SUBID10002,N,claims of value,59924,2008-02-03
3,133424,Suicide,SUBID10003,N,claims of fact,143120,1995-02-08
4,172579,Food allergy,SUBID10004,Y,claims of value,168634,1967-05-23
5,171320,Whiplash,SUBID10005,N,claims of policy,64840,1991-10-04
6,107794,Sunbathing,SUBID1006,N,claims of fact,26800,1991-03-26
7,130339,Drug consumption,SUBID10007,N,claims of value,177186,1946-09-05
8,110377,Dengue,SUBID10008,N,claims of fact,141123,1966-06-20
9,149367,Head banging,SUBID10009,N,claims of value,88540,1945-12-29


In [0]:
data.select("*").filter(col("SUB_ID").isin(["SUBID10022", "SUBID10049"])).show() 

+--------+----------+--------------+----------+-----------------+---------------+------------+----------+
|claim_id|patient_id|  disease_name|    SUB_ID|Claim_Or_Rejected|     claim_type|claim_amount|claim_date|
+--------+----------+--------------+----------+-----------------+---------------+------------+----------+
|      22|    134184|           Flu|SUBID10022|                Y|claims of value|       34771|1948-05-23|
|      49|    121783|Bladder cancer|SUBID10049|                N| claims of fact|      159815|1983-06-20|
+--------+----------+--------------+----------+-----------------+---------------+------------+----------+



In [0]:
# # Now we need to write this data into stagging area for the next step.
# # boilerplate code (standard code for certain operation)

# Define the output staging path
output_stagging_path = "abfss://stagginglayerhealthdata@healthcareprojectblob.dfs.core.windows.net"

# Write the data to the staging area
data.coalesce(1).write.mode("append").format("com.databricks.spark.csv").option("header", "true").option("format", "csv").save(output_stagging_path)

# List all files in the output staging path
files = dbutils.fs.ls(output_stagging_path)

# Identify part files and non-part files
part_files = [x for x in files if x.name.startswith("part-")]
non_part_files = [x for x in files if x.name.startswith("_")]

# Move the part file to the desired location
if part_files:
    dbutils.fs.mv(part_files[0].path, f"{output_stagging_path}/claimsstagging.csv")

# Remove non-part files
for file in non_part_files:
    dbutils.fs.rm(file.path)