In [0]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# List the secret scopes visible to this workspace; the returned list is shown
# as the cell output (scope names only, no secret values).
dbutils.secrets.listScopes()

[SecretScope(name='healthscope')]

In [0]:
# Show the metadata (key names) of the secrets stored in the "healthscope"
# scope; the result is displayed as the cell output.
dbutils.secrets.list(scope="healthscope")

[SecretMetadata(key='healthdata')]

In [0]:
# Register the storage-account access key with the Spark session so that
# abfss:// paths on the "healthdata" account can be read and written.
# The key is fetched from the secret scope at run time and passed straight to
# the session config rather than being written into the notebook.
account_key_setting = "fs.azure.account.key.healthdata.dfs.core.windows.net"
spark.conf.set(
    account_key_setting,
    dbutils.secrets.get(scope="healthscope", key="healthdata"),
)

In [0]:
# List the contents of the raw-data container; the listing is the cell output.
raw_container_uri = "abfss://rawdata@healthdata.dfs.core.windows.net"
dbutils.fs.ls(raw_container_uri)

[FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/Patient_records.csv', name='Patient_records.csv', size=5110, modificationTime=1710596297000),
 FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/claims.csv', name='claims.csv', size=11438, modificationTime=1710615336000),
 FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/disease.csv', name='disease.csv', size=1489, modificationTime=1710596297000),
 FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/group.csv', name='group.csv', size=4390, modificationTime=1710596297000),
 FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/hospital.csv', name='hospital.csv', size=1328, modificationTime=1710596297000),
 FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/subgroup.csv', name='subgroup.csv', size=561, modificationTime=1710596297000),
 FileInfo(path='abfss://rawdata@healthdata.dfs.core.windows.net/subscriber.csv', name='subscriber.csv', size=12061, modificationTime=1

In [0]:
# Load the raw subscriber CSV. The first row is treated as the header and the
# column types are inferred by Spark from the data.
subscriber_csv_uri = "abfss://rawdata@healthdata.dfs.core.windows.net/subscriber.csv"
subscriber_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(subscriber_csv_uri)
)

In [0]:
#subscriber_data.show(5)

In [0]:
# Check for null values and duplicate rows

In [0]:
#subscriber_data.select([count(when(isnan(c) | col(c).isNull(),c)).alias(c) for c in subscriber_data.columns]).show()

In [0]:
#subscriber_data.groupBy(["sub_id","first_name","last_name","Street","Birth_date","Gender","Phone","Country","City","Zip Code","Subgrp_id","Elig_ind","eff_date","term_date"]).count().where("count > 1").show()

In [0]:
# Remove the "Phone" column; the trimmed frame is rebound to the same name.
# NOTE(review): presumably Phone is not needed downstream — confirm.
subscriber_data = subscriber_data.drop("Phone")

In [0]:
# Keep a single copy of rows that are identical across every column.
subscriber_data = subscriber_data.distinct()

In [0]:
# Replace nulls with per-column defaults: "N" for Elig_ind and the placeholder
# "Guest/NA" for first_name. Other columns are left as they are.
null_defaults = {"Elig_ind": "N", "first_name": "Guest/NA"}
subscriber_data = subscriber_data.na.fill(null_defaults)

In [0]:
#find out why Subgrp_id is null
#subscriber_data.select("*").where(col("Subgrp_id").isNull()).show()

In [0]:
# Fill the missing Subgrp_id values with hard-coded subgroup IDs taken from another table

In [0]:
# Set subgrp_id for two specific subscribers; every other row keeps its
# current value. The two sub_id conditions are mutually exclusive, so a single
# chained when() gives the same result as applying them one after the other.
# NOTE(review): the notebook's notes say these IDs come from another table —
# confirm the source. The column is written here as lower-case "subgrp_id"
# while the inspection cells refer to "Subgrp_id"; presumably this relies on
# Spark's case-insensitive column resolution — confirm.
subscriber_data = subscriber_data.withColumn(
    "subgrp_id",
    when(col("sub_id") == "SUBID10022", "S110")
    .when(col("sub_id") == "SUBID10049", "S107")
    .otherwise(col("subgrp_id")),
)

In [0]:
# Derive the subscriber's age in whole years: months elapsed between today and
# Birth_date, divided by 12, with the fractional part dropped by the int cast.
# NOTE(review): "subcriber_age" looks like a misspelling of "subscriber_age";
# the name is kept because it becomes part of the staged file's header.
years_since_birth = months_between(current_date(), col("Birth_date")) / 12
subscriber_data = subscriber_data.withColumn("subcriber_age", years_since_birth.cast("int"))

In [0]:
# Print the first five rows without truncating long cell values.
# NOTE(review): the output contains names, birth dates and addresses — if this
# is real subscriber data, clear the cell output before sharing the notebook.
subscriber_data.show(n=5, truncate=False)

+----------+------------+---------+--------------+----------+------+-------+----------------+--------+---------+--------+----------+----------+-------------+
|sub_id    |first_name  |last_name|Street        |Birth_date|Gender|Country|City            |Zip Code|subgrp_id|Elig_ind|eff_date  |term_date |subcriber_age|
+----------+------------+---------+--------------+----------+------+-------+----------------+--------+---------+--------+----------+----------+-------------+
|SUBID10093|Chandavarman|Singh    |Sarkar Circle |1997-05-10|Others|India  |Navi Mumbai     |83240   |S110     |N       |2017-05-10|2022-08-27|26           |
|SUBID10068|Guest/NA    |Mishra   |Bath Nagar    |1927-02-26|Female|India  |Ambarnath       |766224  |S110     |N       |1947-02-26|1948-03-30|97           |
|SUBID1038 |Guest/NA    |Thakur   |Rastogi Street|1955-04-07|Female|India  |Vijayawada      |438940  |S104     |N       |1975-04-07|1982-01-25|68           |
|SUBID10095|Ekaaksh     |Rai      |Bansal Ganj   |19

In [0]:
# TODO: age was showing as null. Check in Databricks whether this is fixed; the likely cause is that the Birth_date column is not being read as a date type.

In [0]:
# Write the cleaned subscriber frame to the staging container as a single,
# predictably named CSV file (subscriberstagging.csv).
#
# Spark writes a directory of part files plus "_"-prefixed marker files, so the
# frame is first written to a scratch sub-directory and its one part file is
# then moved to the final name. The earlier version appended directly into the
# container root and took the first "part-" file from a listing of that root,
# which could pick up a stale part file from a previous run or from another
# dataset staged in the same container, and it failed on a re-run once the
# target file already existed.
stagging_path = "abfss://stagedata@healthdata.dfs.core.windows.net"
subscriber_tmp_dir = "%s/_tmp_subscriberstagging" % stagging_path
subscriber_target = "%s/subscriberstagging.csv" % stagging_path

# "overwrite" only affects the scratch directory, never the container root.
(
    subscriber_data.coalesce(1)
    .write.mode("overwrite")
    .option("header", "true")
    .format("csv")
    .save(subscriber_tmp_dir)
)

# coalesce(1) should yield exactly one part file; fail loudly otherwise
# instead of moving the wrong file or raising a bare IndexError.
part_files = [f for f in dbutils.fs.ls(subscriber_tmp_dir) if f.name.startswith("part-")]
if len(part_files) != 1:
    raise RuntimeError(
        "Expected exactly one part file in %s, found %d" % (subscriber_tmp_dir, len(part_files))
    )

# Drop any file left by a previous run so the cell can be re-executed.
dbutils.fs.rm(subscriber_target)
dbutils.fs.mv(part_files[0].path, subscriber_target)

# Remove the scratch directory together with Spark's "_" marker files.
dbutils.fs.rm(subscriber_tmp_dir, True)