In [250]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType,IntegerType, DateType
import cassandra
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

In [275]:
spark = SparkSession\
    .builder\
    .appName("Healthcare Data Analysis Incremental Load in Cassandra Using PySpark")\
    .enableHiveSupport()\
    .getOrCreate()

data directory in hdfs
copy files into data directory,
read files,
create archive directory
move files

In [252]:
csv_path = ["health_data_20240707.csv", "health_data_20240708.csv", "health_data_20240709.csv", "health_data_20240710.csv", "health_data_20240711.csv"]
data = spark.read.format("csv")\
    .option("header","true")\
    .option("multiline", "true")\
    .option("inferSchema", "true")\
    .load(csv_path)

data.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- diagnosis_code: string (nullable = true)
 |-- diagnosis_description: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)



In [253]:
data.show(5)

+----------+---+------+--------------+---------------------+--------------+
|patient_id|age|gender|diagnosis_code|diagnosis_description|diagnosis_date|
+----------+---+------+--------------+---------------------+--------------+
|      P201| 35|  Male|          D567|             Dementia|    2024-07-08|
|      P202| 57|  Male|          D123|             Diabetes|    2024-07-08|
|      P203| 34|  Male|          H234|  High Blood Pressure|    2024-07-08|
|      P204| 48|Female|          S901|               Stroke|    2024-07-08|
|      P205| 55|Female|          D123|             Diabetes|    2024-07-08|
+----------+---+------+--------------+---------------------+--------------+
only showing top 5 rows



In [254]:
# check for null values
null_counts = data.select([sum(col(c).isNull().cast('int')).alias(c) for c in data.columns])
null_counts.show()

+----------+---+------+--------------+---------------------+--------------+
|patient_id|age|gender|diagnosis_code|diagnosis_description|diagnosis_date|
+----------+---+------+--------------+---------------------+--------------+
|         0|  0|     0|             0|                    0|             0|
+----------+---+------+--------------+---------------------+--------------+



In [255]:
# pip install cassandra-driver

In [256]:
print (cassandra.__version__)

3.29.1


In [257]:
# This secure connect bundle is autogenerated when you download your SCB,
# if yours is different update the file name below
cloud_config= {
  'secure_connect_bundle': 'secure-connect-db-healthcare.zip'
}

# This token JSON file is autogenerated when you download your token,
# if yours is different update the file name below
with open("db_healthcare-token.json") as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

row = session.execute("select release_version from system.local").one()
if row:
    print(f"row:{row}")
    print(f"row[0]:{row[0]}")
    print("Connection Successful!")
else:
    print("An error occurred.")

row:Row(release_version='4.0.0.6816')
row[0]:4.0.0.6816
Connection Successful!


In [258]:
try:
    query = """SELECT * FROM system_schema.keyspaces"""
    result = session.execute(query).all()
    if result:
        for row in result:
            print(row[0])
except Exception as err:
    print("Exception occurred:", err)

healthcare_keyspace
system_auth
system_schema
datastax_sla
system
system_traces
data_endpoint_auth


In [259]:
keyspace = "healthcare_keyspace"

In [260]:
# use keyspace
try:
    query = f"USE {keyspace}"
    session.execute(query)
    print("Inside healthcare_keyspace")
except Exception as err:
    print("Exception occurred", err)

Inside healthcare_keyspace


In [261]:
query = """DROP TABLE disease_gender_ratio
"""
session.execute(query)

<cassandra.cluster.ResultSet at 0x202b7ceda90>

In [262]:
try:
    table = "demo_tb2"
    query = f"""CREATE TABLE IF NOT EXISTS {keyspace}.{table}
    (id INT,
    Name VARCHAR,
    dept VARCHAR,
    email TEXT,
    age INT,
    PRIMARY KEY(id, dept)
    )"""
    session.execute(query)
    print("Table created successfully!")
except Exception as err:
    print("Exception occurred:", err)


Table created successfully!


In [263]:
try:
    query = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}'"
    result = session.execute(query)
    if result:
        for row in result:
            print(row[0])
    else:
        print("There are no tables")
except Exception as err:
    print("Exception Occurred:", err)


demo_tb
demo_tb2
disease_per_age_group_number_of_patients
is_senior_citizen
most_common_diseases
test_tb
weekly_disease_trend


# 1. Disease Gender Ratio
## Calculate the gender ratio for each disease. This will help in identifying if a particular disease is more prevalent in a particular gender. For example, for each diagnosis_code, you could calculate the ratio of male to female patients.

In [264]:
data.createOrReplaceTempView("DATA")

In [265]:
disease_gender_ratio_query = """WITH gender_pivot AS (
            SELECT patient_id, age, diagnosis_code, diagnosis_description, diagnosis_date,
            CASE WHEN gender = "Female" THEN gender END AS Female,
            CASE WHEN gender = "Male" THEN gender END AS Male
            FROM DATA
),
disease_per_gender AS(
SELECT diagnosis_code, diagnosis_description,COUNT(Female) female_count, COUNT(Male) male_count
FROM gender_pivot
GROUP BY 1,2
)

SELECT *, ROUND(male_count/female_count,2) AS male_to_female_ratio
FROM disease_per_gender
ORDER BY male_to_female_ratio DESC;

"""

disease_gender_ratio = spark.sql(disease_gender_ratio_query)
disease_gender_ratio.show(5)

+--------------+---------------------+------------+----------+--------------------+
|diagnosis_code|diagnosis_description|female_count|male_count|male_to_female_ratio|
+--------------+---------------------+------------+----------+--------------------+
|          R901| Rheumatoid Arthritis|          29|        38|                1.31|
|          E234|             Epilepsy|          30|        37|                1.23|
|          I456| Irritable Bowel S...|          29|        33|                1.14|
|          C345|               Cancer|          36|        40|                1.11|
|          H234|  High Blood Pressure|          34|        36|                1.06|
+--------------+---------------------+------------+----------+--------------------+
only showing top 5 rows



In [266]:
# INSERT RESULT INTO CASSANDRA
try:
    table = "disease_gender_ratio"

    try:
        # if the table exists truncate table, if not create table
        table_exists = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
        result = session.execute(table_exists).one()
        if result:
            print(f"Table {table} exists, NOW TRUNCATING!")
            truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
            session.execute(truncate_query)
        else:
            print(f"Table {table} does not exist in the keyspace {keyspace}, NOW CREATING!!")
            create_query = f"CREATE TABLE IF NOT EXISTS {keyspace}.{table}(diagnosis_code VARCHAR,diagnosis_description VARCHAR, female_count INT, male_count INT, male_to_female_ratio FLOAT, PRIMARY KEY(diagnosis_code, male_to_female_ratio)) WITH CLUSTERING ORDER BY (male_to_female_ratio DESC)"
            session.execute(create_query)
            print(f"Table {table} CREATED successfully")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        # convert spark dataframe to pandas dataframe
        disease_gender_ratio = disease_gender_ratio.toPandas()
        for index, row in disease_gender_ratio.iterrows():
            insert_query = f"""
            INSERT INTO {keyspace}.{table}
            (diagnosis_code, diagnosis_description, female_count, male_count, male_to_female_ratio)
            VALUES ('{row['diagnosis_code']}', '{row['diagnosis_description'].replace("'", "''")}',
            {row['female_count']}, {row['male_count']}, {row['male_to_female_ratio']}) """
            session.execute(insert_query)
        print(f"Records successfully INSERTED into Table {table} ")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        check_data_query = f"""SELECT diagnosis_code, diagnosis_description, female_count, male_count,male_to_female_ratio FROM {keyspace}.{table} LIMIT 5 """
        result = session.execute(check_data_query)
        if result:
            print("Now SHOWING the ratio of male to female patients!!")
            for row in result:
                print(row)
        else:
            print("No records to retrieve!")
    except Exception as err:
        print("Exception occurred:", err)

except Exception as err:
    print("Exception occurred:", err)


Table disease_gender_ratio does not exist in the keyspace healthcare_keyspace, NOW CREATING!!
Table disease_gender_ratio CREATED successfully
Records successfully INSERTED into Table disease_gender_ratio 
Now SHOWING the ratio of male to female patients!!
Row(diagnosis_code='A456', diagnosis_description='Asthma', female_count=31, male_count=32, male_to_female_ratio=1.0299999713897705)
Row(diagnosis_code='C345', diagnosis_description='Cancer', female_count=36, male_count=40, male_to_female_ratio=1.1100000143051147)
Row(diagnosis_code='I456', diagnosis_description='Irritable Bowel Syndrome', female_count=29, male_count=33, male_to_female_ratio=1.1399999856948853)
Row(diagnosis_code='T789', diagnosis_description='Thyroid Disorder', female_count=33, male_count=32, male_to_female_ratio=0.9700000286102295)
Row(diagnosis_code='B234', diagnosis_description='Bronchitis', female_count=29, male_count=21, male_to_female_ratio=0.7200000286102295)


# 2. Most Common Diseases
## Find the top 3 most common diseases in the dataset. This will help in identifying the most prevalent diseases.

In [267]:
most_common_diseases_query = """ WITH disease_rank AS (
            SELECT diagnosis_code,
            diagnosis_description,
            COUNT(*) total_count
            FROM DATA
            GROUP BY 1,2 ),

ranks AS (SELECT diagnosis_code, diagnosis_description, total_count,
ROW_NUMBER() OVER(ORDER BY total_count DESC) ranks
FROM disease_rank)

SELECT diagnosis_code, diagnosis_description,
        total_count,ranks
        FROM ranks
        ORDER BY 4
        LIMIT 3
        ;

"""
most_common_diseases = spark.sql(most_common_diseases_query)
most_common_diseases.show(5)

+--------------+---------------------+-----------+-----+
|diagnosis_code|diagnosis_description|total_count|ranks|
+--------------+---------------------+-----------+-----+
|          C345|               Cancer|         76|    1|
|          P678|  Parkinson's Disease|         74|    2|
|          D123|             Diabetes|         72|    3|
+--------------+---------------------+-----------+-----+



In [268]:
# INSERT RESULT INTO CASSANDRA
try:
    table = "most_common_diseases"

    try:
        # if the table exists truncate table, if not create table
        table_exists = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
        result = session.execute(table_exists).one()
        if result:
            print(f"Table {table} exists, NOW TRUNCATING!")
            truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
            session.execute(truncate_query)
        else:
            print(f"Table {table} does not exist in the keyspace {keyspace}, NOW CREATING!!")
            create_query = f"CREATE TABLE IF NOT EXISTS {keyspace}.{table}(diagnosis_code VARCHAR,diagnosis_description VARCHAR,total_count INT, ranks INT, PRIMARY KEY(diagnosis_code))"
            session.execute(create_query)
            print(f"Table {table} CREATED successfully")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        # convert spark dataframe to pandas dataframe
        most_common_diseases = most_common_diseases.toPandas()
        for index, row in most_common_diseases.iterrows():
            insert_query = f"""
            INSERT INTO {keyspace}.{table}
            (diagnosis_code, diagnosis_description,total_count, ranks)
            VALUES ('{row['diagnosis_code']}', '{row['diagnosis_description'].replace("'", "''")}',
            {row['total_count']},{row['ranks']}) IF NOT EXISTS"""
            session.execute(insert_query)
        print(f"Records successfully INSERTED into Table {table} ")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        check_data_query = f"""SELECT * FROM {keyspace}.{table} """
        result = session.execute(check_data_query)
        if result:
            print("Now SHOWING the top 3 most common diseases!!")
            for row in result:
                print(row)
        else:
            print("No records to retrieve!")
    except Exception as err:
        print("Exception occurred:", err)

except Exception as err:
    print("Exception occurred:", err)


Table most_common_diseases exists, NOW TRUNCATING!
Records successfully INSERTED into Table most_common_diseases 
Now SHOWING the top 3 most common diseases!!
Row(diagnosis_code='C345', diagnosis_description='Cancer', ranks=1, total_count=76)
Row(diagnosis_code='D123', diagnosis_description='Diabetes', ranks=3, total_count=72)
Row(diagnosis_code='P678', diagnosis_description="Parkinson's Disease", ranks=2, total_count=74)


# 3. Age Category
## Create age categories. For example, you can divide age into groups like '30-40', '41-50', '51-60', '61-70' and so forth. Then, calculate the number of patients in each age category for each disease. This can help understand the age distribution of different diseases.

In [269]:
age_group_query = """ WITH age_groups AS (SELECT *,
CASE
WHEN age>= 30 AND age < 40 THEN "30-40"
WHEN age>= 40 AND age < 50 THEN "40-50"
WHEN age>= 50 AND age < 60 THEN "50-60"
WHEN age>= 60 AND age < 70 THEN "60-70"
ELSE age
END AS age_group
FROM DATA),

counts AS (SELECT diagnosis_code, diagnosis_description, age_group, COUNT(*) AS total_count
FROM age_groups
GROUP BY 1,2,3
ORDER BY 2, 3)

SELECT * FROM counts;
"""

age_group=spark.sql(age_group_query)
age_group.show(10)

+--------------+---------------------+---------+-----------+
|diagnosis_code|diagnosis_description|age_group|total_count|
+--------------+---------------------+---------+-----------+
|          A456|               Asthma|    30-40|         11|
|          A456|               Asthma|    40-50|         23|
|          A456|               Asthma|    50-60|         15|
|          A456|               Asthma|    60-70|         13|
|          A456|               Asthma|       70|          1|
|          B234|           Bronchitis|    30-40|         14|
|          B234|           Bronchitis|    40-50|         12|
|          B234|           Bronchitis|    50-60|         15|
|          B234|           Bronchitis|    60-70|          7|
|          B234|           Bronchitis|       70|          2|
+--------------+---------------------+---------+-----------+
only showing top 10 rows



In [270]:
# INSERT RESULT INTO CASSANDRA
try:
    table = "disease_per_age_group_number_of_patients"

    try:
        # if the table exists truncate table, if not create table
        table_exists = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
        result = session.execute(table_exists).one()
        if result:
            print(f"Table {table} exists, NOW TRUNCATING!")
            truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
            session.execute(truncate_query)
        else:
            print(f"Table {table} does not exist in the keyspace {keyspace}, NOW CREATING!!")
            create_query = f"CREATE TABLE IF NOT EXISTS {keyspace}.{table}(diagnosis_code VARCHAR,diagnosis_description VARCHAR, age_group VARCHAR, total_count INT, PRIMARY KEY(diagnosis_code))"
            session.execute(create_query)
            print(f"Table {table} CREATED successfully")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        # convert spark dataframe to pandas dataframe
        age_group = age_group.toPandas()
        for index, row in age_group.iterrows():
            insert_query = f"""
            INSERT INTO {keyspace}.{table}
            (diagnosis_code, diagnosis_description,age_group, total_count)
            VALUES ('{row['diagnosis_code']}', '{row['diagnosis_description'].replace("'", "''")}','{row['age_group']}', {row['total_count']}) IF NOT EXISTS"""
            session.execute(insert_query)
        print(f"Records successfully INSERTED into Table {table} ")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        check_data_query = f"""SELECT * FROM {keyspace}.{table} """
        result = session.execute(check_data_query)
        if result:
            print("Now SHOWING the number of patients in each age category for each disease!!")
            for row in result:
                print(row)
        else:
            print("No records to retrieve!")
    except Exception as err:
        print("Exception occurred:", err)

except Exception as err:
    print("Exception occurred:", err)


Table disease_per_age_group_number_of_patients exists, NOW TRUNCATING!
Records successfully INSERTED into Table disease_per_age_group_number_of_patients 
Now SHOWING the number of patients in each age category for each disease!!
Row(diagnosis_code='A456', age_group='30-40', diagnosis_description='Asthma', total_count=11)
Row(diagnosis_code='C345', age_group='30-40', diagnosis_description='Cancer', total_count=19)
Row(diagnosis_code='I456', age_group='30-40', diagnosis_description='Irritable Bowel Syndrome', total_count=13)
Row(diagnosis_code='T789', age_group='30-40', diagnosis_description='Thyroid Disorder', total_count=12)
Row(diagnosis_code='B234', age_group='30-40', diagnosis_description='Bronchitis', total_count=14)
Row(diagnosis_code='D123', age_group='30-40', diagnosis_description='Diabetes', total_count=15)
Row(diagnosis_code='S901', age_group='30-40', diagnosis_description='Stroke', total_count=10)
Row(diagnosis_code='R901', age_group='30-40', diagnosis_description='Rheumatoid

# 4. Flag for senior patients
## Flag patients who are senior citizens (typically, age >= 60 years). This might be useful information since senior citizens might require special healthcare attention or follow-up

In [271]:
senior_citizen_query = """SELECT patient_id,gender,
CASE WHEN age >= 60 THEN "Yes"
ELSE "No"
END AS is_senior_citizen
FROM DATA
"""

senior_citizen = spark.sql(senior_citizen_query)
senior_citizen.show(5)

+----------+------+-----------------+
|patient_id|gender|is_senior_citizen|
+----------+------+-----------------+
|      P201|  Male|               No|
|      P202|  Male|               No|
|      P203|  Male|               No|
|      P204|Female|               No|
|      P205|Female|               No|
+----------+------+-----------------+
only showing top 5 rows



In [272]:
# INSERT RESULT INTO CASSANDRA
try:
    table = "is_senior_citizen"

    try:
        # if the table exists truncate table, if not create table
        table_exists = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
        result = session.execute(table_exists).one()
        if result:
            print(f"Table {table} exists, NOW TRUNCATING!")
            truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
            session.execute(truncate_query)
        else:
            print(f"Table {table} does not exist in the keyspace {keyspace}, NOW CREATING!!")
            create_query = f"CREATE TABLE IF NOT EXISTS {keyspace}.{table}(patient_id VARCHAR, gender VARCHAR, is_senior_citizen VARCHAR, PRIMARY KEY(patient_id))"
            session.execute(create_query)
            print(f"Table {table} CREATED successfully")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        # convert spark dataframe to pandas dataframe
        senior_citizen = senior_citizen.toPandas()
        for index, row in senior_citizen.iterrows():
            insert_query = f"""
            INSERT INTO {keyspace}.{table}
            (patient_id, gender, is_senior_citizen)
            VALUES ('{row['patient_id']}', '{row['gender']}','{row['is_senior_citizen']}')"""
            session.execute(insert_query)
        print(f"Records successfully INSERTED into Table {table} ")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        check_data_query = f"""SELECT * FROM {keyspace}.{table} LIMIT 7"""
        result = session.execute(check_data_query)
        if result:
            print("Now FLAGGING for senior patients!!")
            for row in result:
                print(row)
        else:
            print("No records to retrieve!")
    except Exception as err:
        print("Exception occurred:", err)

except Exception as err:
    print("Exception occurred:", err)


Table is_senior_citizen exists, NOW TRUNCATING!
Records successfully INSERTED into Table is_senior_citizen 
Now FLAGGING for senior patients!!
Row(patient_id='P241', gender='Female', is_senior_citizen='No')
Row(patient_id='P314', gender='Female', is_senior_citizen='No')
Row(patient_id='P426', gender='Male', is_senior_citizen='No')
Row(patient_id='P842', gender='Female', is_senior_citizen='No')
Row(patient_id='P681', gender='Female', is_senior_citizen='No')
Row(patient_id='P246', gender='Male', is_senior_citizen='No')
Row(patient_id='P448', gender='Male', is_senior_citizen='No')


# 5. Disease trend over the week
## If you have more than week's data, you can calculate the number of cases of each disease for each day of the week to understand if there's a trend (more cases diagnosed on particular days).

In [273]:
weekly_disease_trend_query = """ WITH index_of_week AS ( SELECT DAYOFWEEK(diagnosis_date) AS day_of_week, patient_id, diagnosis_description
FROM DATA),
day_of_week AS (SELECT diagnosis_description, CASE
WHEN day_of_week = 1 THEN "Sunday"
WHEN day_of_week = 2 THEN "Monday"
WHEN day_of_week = 3 THEN "Tuesday"
WHEN day_of_week = 4 THEN "Wednesday"
WHEN day_of_week = 5 THEN "Thursday"
WHEN day_of_week = 6 THEN "Friday"
WHEN day_of_week = 1 THEN "Saturday"
END AS day_of_week
FROM index_of_week),

disease_per_day(SELECT diagnosis_description,day_of_week, COUNT(*) counts_per_day
FROM day_of_week
GROUP BY 1,2
ORDER BY 2),

ranks AS (SELECT *,
DENSE_RANK() OVER(PARTITION BY day_of_week ORDER BY counts_per_day DESC) AS ranks FROM disease_per_day)

SELECT diagnosis_description, day_of_week, counts_per_day
FROM ranks
WHERE ranks = 1
;
"""
weekly_disease_trend = spark.sql(weekly_disease_trend_query)
weekly_disease_trend.show( )

+---------------------+-----------+--------------+
|diagnosis_description|day_of_week|counts_per_day|
+---------------------+-----------+--------------+
|             Diabetes|     Monday|            23|
| Irritable Bowel S...|     Sunday|            21|
|               Asthma|   Thursday|            18|
|             Diabetes|    Tuesday|            22|
|        Heart Disease|  Wednesday|            21|
+---------------------+-----------+--------------+



In [274]:
# INSERT RESULT INTO CASSANDRA
try:
    table = "weekly_disease_trend"

    try:
        # if the table exists truncate table, if not create table
        table_exists = f"SELECT table_name FROM system_schema.tables WHERE keyspace_name = '{keyspace}' AND table_name = '{table}'"
        result = session.execute(table_exists).one()
        if result:
            print(f"Table {table} exists, NOW TRUNCATING!")
            truncate_query = f"TRUNCATE TABLE {keyspace}.{table}"
            session.execute(truncate_query)
        else:
            print(f"Table {table} does not exist in the keyspace {keyspace}, NOW CREATING!!")
            create_query = f"CREATE TABLE IF NOT EXISTS {keyspace}.{table}(diagnosis_description VARCHAR, day_of_week VARCHAR, counts_per_day INT, PRIMARY KEY(day_of_week))"
            session.execute(create_query)
            print(f"Table {table} CREATED successfully")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        # convert spark dataframe to pandas dataframe
        weekly_disease_trend = weekly_disease_trend.toPandas()
        for index, row in weekly_disease_trend.iterrows():
            insert_query = f"""
            INSERT INTO {keyspace}.{table}
            (diagnosis_description, day_of_week, counts_per_day)
            VALUES ('{row['diagnosis_description'].replace("'", "''")}', '{row['day_of_week']}', {row['counts_per_day']})"""
            session.execute(insert_query)
        print(f"Records successfully INSERTED into Table {table} ")
    except Exception as err:
        print("Exception occurred:", err)

    try:
        check_data_query = f"""SELECT * FROM {keyspace}.{table} LIMIT 7"""
        result = session.execute(check_data_query)
        if result:
            print("Now SHOWING disease trend over the week!!")
            for row in result:
                print(row)
        else:
            print("No records to retrieve!")
    except Exception as err:
        print("Exception occurred:", err)

except Exception as err:
    print("Exception occurred:", err)


Table weekly_disease_trend exists, NOW TRUNCATING!
Records successfully INSERTED into Table weekly_disease_trend 
Now SHOWING disease trend over the week!!
Row(day_of_week='Thursday', counts_per_day=18, diagnosis_description='Asthma')
Row(day_of_week='Wednesday', counts_per_day=21, diagnosis_description='Heart Disease')
Row(day_of_week='Sunday', counts_per_day=21, diagnosis_description='Irritable Bowel Syndrome')
Row(day_of_week='Monday', counts_per_day=23, diagnosis_description='Diabetes')
Row(day_of_week='Tuesday', counts_per_day=22, diagnosis_description='Diabetes')


In [274]:
# List and move files individually
file_list = dbutils.fs.ls(data_directory)
for file in file_list:
    if file.name.endswith(".csv"):
        print(f"{file} Moved in archive folder")
        dbutils.fs.mv(file.path, os.path.join(archive_directory, file.name))