In [0]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
dbutils.secrets.listScopes()

[SecretScope(name='optumscope'), SecretScope(name='optumscope1')]

In [0]:
dbutils.secrets.list(scope = "optumscope1")

[SecretMetadata(key='optumkeysstore')]

In [0]:
spark.conf.set(
    "fs.azure.account.key.ksrdatadlsa.dfs.core.windows.net",
    dbutils.secrets.get(scope="optumscope1", key="optumkeysstore"))

In [0]:
display(dbutils.fs.ls("abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData"))
                      

path,name,size,modificationTime
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/Claims.csv,Claims.csv,5766,1715061116000
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/Hospital.csv,Hospital.csv,1528,1715061121000
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/Patient_records.csv,Patient_records.csv,5110,1715018546000
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/disease.csv,disease.csv,1489,1715018546000
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/group.csv,group.csv,4390,1715018546000
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/subgroup.csv,subgroup.csv,561,1715018546000
abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/subscriber.csv,subscriber.csv,12061,1715018546000


In [0]:
Patient_Records = spark.read.csv("abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/RawData/Patient_records.csv",header=True, inferSchema=True,escape='"')

In [0]:
Patient_Records.show(5,False)

+----------+------------+--------------+------------------+--------------+--------------+------------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone |disease_name  |city        |hospital_id|
+----------+------------+--------------+------------------+--------------+--------------+------------+-----------+
|187158    |Harbir      |Female        |1924-06-30        |+91 0112009318|Galactosemia  |Rourkela    |H1001      |
|112766    |Brahmdev    |Female        |1948-12-20        |+91 1727749552|Bladder cancer|Tiruvottiyur|H1016      |
|199252    |Ujjawal     |Male          |1980-04-16        |+91 8547451606|Kidney cancer |Berhampur   |H1009      |
|133424    |Ballari     |Female        |1969-09-25        |+91 0106026841|Suicide       |Bihar Sharif|H1017      |
|172579    |Devnath     |Female        |1946-05-01        |+91 1868774631|Food allergy  |Bidhannagar |H1019      |
+----------+------------+--------------+------------------+--------------+------

In [0]:
# to find the Data types columns present in the table
Patient_Records.printSchema()

root
 |-- Patient_id: integer (nullable = true)
 |-- Patient_name: string (nullable = true)
 |-- patient_gender: string (nullable = true)
 |-- patient_birth_date: date (nullable = true)
 |-- patient_phone: string (nullable = true)
 |-- disease_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- hospital_id: string (nullable = true)



In [0]:
# to find Rows count and Cloumn Count
Patient_Records.count(),len(Patient_Records.columns)

(70, 8)

In [0]:
# drop duplicates
Patient_Records = Patient_Records.dropDuplicates()

In [0]:
# to verify Rows count and Cloumn Count
Patient_Records.count(),len(Patient_Records.columns)

(70, 8)

In [0]:
# To find null values
from pyspark.sql.functions import count, col, isnan, when
Patient_Records.select([count(when(col(c).isNull(), c)).alias(c) for c in Patient_Records.columns]).show()



+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone|disease_name|city|hospital_id|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|         0|          17|             0|                 0|            2|           0|   0|          0|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+



In [0]:
Patient_Records = Patient_Records.fillna("Guest",subset=["Patient_name"])

In [0]:
# drop phone number column
Patient_Records = Patient_Records.fillna("NA", subset="patient_phone")


In [0]:
Patient_Records.select("*").show()

+----------+------------+--------------+------------------+--------------+-----------------+-----------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|     disease_name|       city|hospital_id|
+----------+------------+--------------+------------------+--------------+-----------------+-----------+-----------+
|    189996|       Ekant|          Male|        1943-08-13|+91 7686951174|          Measles| Berhampore|      H1003|
|    109251|   Anjushree|          Male|        1976-07-04|+91 5322869455|          Choking|  Ghaziabad|      H1001|
|    121783|     Paridhi|        Female|        1959-03-27|+91 2139280879|   Bladder cancer|   Jabalpur|      H1013|
|    194166|       Guest|          Male|        1946-10-17|+91 9887324437|Colorectal cancer|  Baranagar|      H1015|
|    156434|      Pushti|        Female|        1935-10-15|+91 7093722203|              Flu|      Morbi|      H1019|
|    140394|      Jitesh|          Male|        1983-02-03|+91 6

In [0]:
from pyspark.sql.functions import year, current_date, to_date
# Convert the 'patient_birth_date' column to a DateType
Patient_Records = Patient_Records.withColumn("patient_birth_date", to_date("patient_birth_date"))
# Calculate the age based on the birth date
Patient_Records = Patient_Records.withColumn("age", year(current_date()) - year("patient_birth_date"))

In [0]:
Patient_Records.select("*").show()

+----------+------------+--------------+------------------+--------------+-----------------+-----------+-----------+---+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|     disease_name|       city|hospital_id|age|
+----------+------------+--------------+------------------+--------------+-----------------+-----------+-----------+---+
|    189996|       Ekant|          Male|        1943-08-13|+91 7686951174|          Measles| Berhampore|      H1003| 81|
|    109251|   Anjushree|          Male|        1976-07-04|+91 5322869455|          Choking|  Ghaziabad|      H1001| 48|
|    121783|     Paridhi|        Female|        1959-03-27|+91 2139280879|   Bladder cancer|   Jabalpur|      H1013| 65|
|    194166|       Guest|          Male|        1946-10-17|+91 9887324437|Colorectal cancer|  Baranagar|      H1015| 78|
|    156434|      Pushti|        Female|        1935-10-15|+91 7093722203|              Flu|      Morbi|      H1019| 89|
|    140394|      Jitesh|       

In [0]:
Patient_Records = Patient_Records.drop(col("patient_birth_date"))

In [0]:
Patient_Records.select("*").show()

+----------+------------+--------------+--------------+-----------------+-----------+-----------+---+
|Patient_id|Patient_name|patient_gender| patient_phone|     disease_name|       city|hospital_id|age|
+----------+------------+--------------+--------------+-----------------+-----------+-----------+---+
|    189996|       Ekant|          Male|+91 7686951174|          Measles| Berhampore|      H1003| 81|
|    109251|   Anjushree|          Male|+91 5322869455|          Choking|  Ghaziabad|      H1001| 48|
|    121783|     Paridhi|        Female|+91 2139280879|   Bladder cancer|   Jabalpur|      H1013| 65|
|    194166|       Guest|          Male|+91 9887324437|Colorectal cancer|  Baranagar|      H1015| 78|
|    156434|      Pushti|        Female|+91 7093722203|              Flu|      Morbi|      H1019| 89|
|    140394|      Jitesh|          Male|+91 6515468035|          Anthrax| Karimnagar|      H1010| 41|
|    114241|       Guest|        Female|+91 4146391938|    Breast cancer|  Ghaziab

In [0]:
from pyspark.sql.functions import col, asc

def analyze_columns(df):
    columns = df.columns
    for column in columns:
        distinct_count = df.select(column).distinct().count()
        null_count = df.filter(col(column).isNull()).count()
        numeric_count = df.filter(col(column).rlike("^[0-9]")).count()
        text_count = df.filter(col(column).rlike("^[A-Za-z]")).count()
        special_char_count = df.filter(~col(column).rlike("^[A-Za-z0-9]")).count()
        contains_string = df.filter(col(column).rlike("[A-Za-z]")).count()
        contains_number = df.filter(col(column).rlike("[0-9]")).count()
        contains_spl_char = df.filter(col(column).rlike("[^A-Za-z0-9]")).count()

        print("Distinct Count of {} column: {}".format(column, distinct_count))
        print("Null values Count of {} column: {}\n".format(column, null_count))
        print("{} Column Start With Number: {}".format(column, numeric_count))
        print("{} Column Start With String: {}".format(column, text_count))
        print("{} Column Start With Spl.Cha: {}".format(column, special_char_count))
        print("{} Column contains string values: {}".format(column, contains_string))
        print("{} Column contains number values: {}".format(column, contains_number))
        print("{} Column contains Spl.Char values: {}\n".format(column, contains_spl_char))
        
# Example usage:
analyze_columns(Patient_Records)


Distinct Count of Patient_id column: 70
Null values Count of Patient_id column: 0

Patient_id Column Start With Number: 70
Patient_id Column Start With String: 0
Patient_id Column Start With Spl.Cha: 0
Patient_id Column contains string values: 0
Patient_id Column contains number values: 70
Patient_id Column contains Spl.Char values: 0

Distinct Count of Patient_name column: 53
Null values Count of Patient_name column: 0

Patient_name Column Start With Number: 0
Patient_name Column Start With String: 70
Patient_name Column Start With Spl.Cha: 0
Patient_name Column contains string values: 70
Patient_name Column contains number values: 0
Patient_name Column contains Spl.Char values: 0

Distinct Count of patient_gender column: 2
Null values Count of patient_gender column: 0

patient_gender Column Start With Number: 0
patient_gender Column Start With String: 70
patient_gender Column Start With Spl.Cha: 0
patient_gender Column contains string values: 70
patient_gender Column contains number 

In [0]:
# Define the output container path
output_container_path = "abfss://optumdata@ksrdatadlsa.dfs.core.windows.net/StatgingData"

# Write the DataFrame to the output container path
Patient_Records.coalesce(1).write.mode("overwrite").option("header", "true").format("com.databricks.spark.csv").save(output_container_path)

# List files in the output container path
files = dbutils.fs.ls(output_container_path)

# Filter out the output file
output_file = [x for x in files if x.name.startswith("part-")]

# Move the output file to a specific location
dbutils.fs.mv(output_file[0].path, "%s/stg_Patient_Records.csv" % output_container_path)
