In [0]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *

In [0]:
# List the secret scopes that are available through dbutils.
dbutils.secrets.listScopes()

[SecretScope(name='databricksscope')]

In [0]:
# Show the metadata (key names only, never values) of the secrets in 'databricksscope'.
dbutils.secrets.list("databricksscope")

[SecretMetadata(key='adlskey')]

In [0]:
# Configure Spark to authenticate against the 'optumsadllssa' ADLS Gen2 account.
# The account key is fetched from the secret scope at call time, so it never
# appears in the notebook source.
spark.conf.set(
    "fs.azure.account.key.optumsadllssa.dfs.core.windows.net",
    dbutils.secrets.get(scope="databricksscope", key="adlskey"),
)


In [0]:
# Show the files sitting at the root of the 'optumrawdataset' container.
display(
    dbutils.fs.ls("abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net")
)

path,name,size,modificationTime
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/Claims.csv,Claims.csv,5766,1716728851000
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/Patient_records.csv,Patient_records.csv,5110,1716718820000
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/disease.csv,disease.csv,1489,1716718820000
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/group.csv,group.csv,4390,1716718820000
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/hospital.csv,hospital.csv,1528,1716728950000
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/subgroup.csv,subgroup.csv,561,1716718820000
abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/subscriber.csv,subscriber.csv,12061,1716718820000


In [0]:
# Read subgroup.csv from the raw container into the DataFrame `data`.
# The first line of the file is treated as the header; no schema is inferred,
# so every column is loaded as a string.
data = (
    spark.read
    .format("csv")
    .option("header", True)
    .load('abfss://optumrawdataset@optumsadllssa.dfs.core.windows.net/subgroup.csv')
)

In [0]:
# Preview the first 5 rows without truncating long cell values.
data.show(n=5, truncate=False)

+---------+-------------------+---------------+----------------------------------+
|subgrp_sk|subgrp_name        |monthly_premium|subgrp_id                         |
+---------+-------------------+---------------+----------------------------------+
|S101     |Deficiency Diseases|3000           |GRP101,GRP105                     |
|S102     |Accident           |1000           |GRP110,GRP150,GRP136              |
|S103     |Physiology         |2000           |GRP122,GRP108,GRP138,GRP148       |
|S104     |Therapy            |1500           |GRP103,GRP113,GRP123,GRP133,GRP143|
|S105     |Allergies          |2300           |GRP153,GRP104,GRP114,GRP124       |
+---------+-------------------+---------------+----------------------------------+
only showing top 5 rows



In [0]:
# Number of rows currently in `data` (triggers a Spark job).
data.count()

10

In [0]:
# Keep only one copy of each fully identical row.
data = data.distinct()

In [0]:
# Row count again, to compare against the count taken before de-duplication.
data.count()

10

In [0]:
# Turn the comma-separated subgrp_id string into an array of group ids.
# NOTE: this overwrites the column in place, so the cell must run exactly once
# per load of `data`.
data = data.withColumn("subgrp_id", split(col("subgrp_id"), ","))

In [0]:
# Preview the first 5 rows after the split step.
# NOTE(review): the recorded output below lists a single group id per row rather
# than arrays, which suggests this cell was last executed after the explode
# step; re-run the notebook top to bottom to refresh it.
data.show(n=5)

+---------+-----------+---------------+---------+
|subgrp_sk|subgrp_name|monthly_premium|subgrp_id|
+---------+-----------+---------------+---------+
|     S110|      Viral|           1000|   GRP143|
|     S110|      Viral|           1000|   GRP147|
|     S110|      Viral|           1000|   GRP126|
|     S107|     Cancer|           3200|   GRP151|
|     S107|     Cancer|           3200|   GRP131|
+---------+-----------+---------------+---------+
only showing top 5 rows



In [0]:
# Flatten the subgrp_id array: emit one output row per array element,
# repeating the other columns for each element.
data = data.withColumn("subgrp_id", explode(col("subgrp_id")))

In [0]:
# Preview the first 5 rows of the exploded data (one group id per row).
data.show(n=5)

+---------+-----------+---------------+---------+
|subgrp_sk|subgrp_name|monthly_premium|subgrp_id|
+---------+-----------+---------------+---------+
|     S110|      Viral|           1000|   GRP143|
|     S110|      Viral|           1000|   GRP147|
|     S110|      Viral|           1000|   GRP126|
|     S107|     Cancer|           3200|   GRP151|
|     S107|     Cancer|           3200|   GRP131|
+---------+-----------+---------------+---------+
only showing top 5 rows



In [0]:
# Count, for every column, how many values are NaN or NULL.
# `when(...)` without an otherwise-branch yields NULL for non-matching rows,
# and `count` skips NULLs, so each aggregate counts only the matching rows.
null_or_nan_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in data.columns
]
data.select(*null_or_nan_counts).show(n=5, truncate=False)

+---------+-----------+---------------+---------+
|subgrp_sk|subgrp_name|monthly_premium|subgrp_id|
+---------+-----------+---------------+---------+
|0        |0          |0              |0        |
+---------+-----------+---------------+---------+



In [0]:
# List rows that occur more than once: group on every column and keep only the
# groups whose size exceeds 1. An empty result means no duplicates remain.
(
    data.groupBy(*data.columns)
    .count()
    .filter(col("count") > 1)
    .show(n=5, truncate=False)
)

+---------+-----------+---------------+---------+-----+
|subgrp_sk|subgrp_name|monthly_premium|subgrp_id|count|
+---------+-----------+---------------+---------+-----+
+---------+-----------+---------------+---------+-----+



In [0]:
# Export the cleaned subgroup data as a single CSV file to the ADLS staging
# container.
#
# Spark always writes a directory of part files, so the export is done in three
# steps:
#   1. write `data` as one part file into an intermediate folder,
#   2. move that part file to the staging container under a stable file name,
#   3. delete the intermediate folder so no Spark bookkeeping files are left behind.
output_container_path = "abfss://optumstagingdata@optumsadllssa.dfs.core.windows.net"
# Relative path, i.e. resolved against the default file system of the cluster;
# it only serves as a scratch location for the Spark write.
output_blob_folder = "optumstagingdata"

# coalesce(1) forces a single partition and therefore a single part file.
(
    data.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .format("csv")
    .save(output_blob_folder)
)

files = dbutils.fs.ls(output_blob_folder)
output_file = [x for x in files if x.name.startswith("part-")]
if not output_file:
    # Fail with a clear message instead of an IndexError on output_file[0].
    raise FileNotFoundError(
        f"No part-* file was written to '{output_blob_folder}'; nothing to export."
    )

moved = dbutils.fs.mv(
    output_file[0].path,
    f"{output_container_path}/subgroupstagingdata.csv",
)

# Remove the scratch folder (recursively) now that the data file has been moved.
dbutils.fs.rm(output_blob_folder, True)

# Echo the result of the move as the cell output.
moved

True