In [1]:
# Record the PySpark runtime version; the Redshift connector configured below targets Spark 3.5.
import pyspark

pyspark_version = pyspark.__version__
print(pyspark_version)

3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import boto3
from io import StringIO
from io import BytesIO
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from dotenv import load_dotenv
import os

In [3]:
# spark-redshift community connector, Maven coordinates "<group>:<artifact>_<scala>:<version>".
_REDSHIFT_ARTIFACT = "io.github.spark-redshift-community:spark-redshift_2.12"  # Scala 2.12 build
_REDSHIFT_VERSION = "6.2.0-spark_3.5"
redshift_package = f"{_REDSHIFT_ARTIFACT}:{_REDSHIFT_VERSION}"

In [4]:
# Start (or reuse) the Spark session. The Redshift connector is resolved from Maven via
# spark.jars.packages; the Redshift JDBC driver jar is given as a relative path
# (presumably resolved against the notebook's working directory — confirm on deploy).
spark = (
    SparkSession.builder
    .appName("Daily-Data-Processing")
    .config("spark.jars.packages", redshift_package)
    .config("spark.jars", "redshift-jdbc42-2.1.0.12.jar")
    .getOrCreate()
)

spark

In [None]:
# Load environment variables from the .env file (python-dotenv does not override
# variables that are already set in the process environment by default).
load_dotenv()

# AWS keys are optional here: when both are None, boto3 falls back to its default
# credential chain (environment, shared credentials file, instance role, ...).
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")

# Astra/Cassandra credentials have no fallback, so fail fast with a clear message
# instead of an opaque authentication error later when connecting.
client_id = os.getenv("CLIENT_ID")
secret = os.getenv("SECRET")
token = os.getenv("TOKEN")  # not referenced elsewhere in this notebook

_missing = [name for name, value in (("CLIENT_ID", client_id), ("SECRET", secret)) if not value]
if _missing:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing)} (check your .env file)"
    )

In [5]:
# Low-level S3 client built from the keys read out of .env (None values let boto3
# use its default credential resolution).
s3_client = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

s3_client

<botocore.client.S3 at 0x7f896d2cb110>

In [6]:
# S3 bucket details
bucket_name = "healthcare-data-analysis-apr"
data_directory = "input/"       # prefix holding the daily files waiting to be processed
archive_directory = "archive/"  # prefix processed files are moved to after loading


def read_csv_from_s3(bucket_name, file_key):
    """Download one S3 object and return its body decoded as UTF-8 text.

    Args:
        bucket_name: bucket that holds the object.
        file_key: full object key, e.g. ``"input/health_data_20240402.csv"``.

    Returns:
        The whole object body as a ``str`` (read fully into memory).
    """
    response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    return response["Body"].read().decode("utf-8")


def list_csv_keys(bucket_name, prefix):
    """Return the sorted keys of every ``.csv`` object under ``prefix``.

    Uses the ``list_objects_v2`` paginator because a single list call returns at most
    1000 keys, and tolerates an empty prefix, whose response carries no ``Contents``.
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = [
        obj["Key"]
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        for obj in page.get("Contents", [])
        if obj["Key"].endswith(".csv")
    ]
    return sorted(keys)


# Read CSV files from the specified S3 directory. Only the lexicographically first key is
# processed per run (with date-stamped file names that is the earliest day).
file_keys = list_csv_keys(bucket_name, data_directory)
if not file_keys:
    raise FileNotFoundError(f"No .csv files found under s3://{bucket_name}/{data_directory}")

csv_content = read_csv_from_s3(bucket_name, file_keys[0])

In [7]:
# Explicit schema, so column types are fixed rather than inferred from the data.
schema = StructType([
    StructField("patient_id", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
    StructField("gender", StringType(), nullable=True),
    StructField("diagnosis_code", StringType(), nullable=True),
    StructField("diagnosis_description", StringType(), nullable=True),
    StructField("diagnosis_date", DateType(), nullable=True)
])

# Save the CSV content to a local file Spark can read. Write UTF-8 explicitly (the default
# text encoding is platform dependent) and with newline="" so any "\r\n" line endings in
# the content are written unchanged instead of becoming "\r\r\n" on Windows.
# NOTE: Spark reads this file lazily, so it must stay on disk until the last action that
# depends on `df` has run.
with open("temp.csv", "w", encoding="utf-8", newline="") as file:
    file.write(csv_content)

# Read CSV content from the file into a DataFrame (first line is the header).
df = spark.read.option("header", True).csv("temp.csv", schema=schema)

# Show the DataFrame
df.show(truncate=False)

+----------+---+------+--------------+---------------------+--------------+
|patient_id|age|gender|diagnosis_code|diagnosis_description|diagnosis_date|
+----------+---+------+--------------+---------------------+--------------+
|P101      |39 |M     |H234          |High Blood Pressure  |2024-04-02    |
|P102      |53 |M     |H234          |High Blood Pressure  |2024-04-02    |
|P103      |44 |F     |D123          |Diabetes             |2024-04-02    |
|P104      |54 |F     |C345          |Cancer               |2024-04-02    |
|P105      |38 |M     |D123          |Diabetes             |2024-04-02    |
|P106      |55 |F     |D123          |Diabetes             |2024-04-02    |
|P107      |39 |F     |D123          |Diabetes             |2024-04-02    |
|P108      |57 |F     |D123          |Diabetes             |2024-04-02    |
|P109      |70 |F     |C345          |Cancer               |2024-04-02    |
|P110      |49 |F     |D123          |Diabetes             |2024-04-02    |
|P111      |

In [8]:
## print the schema
# Confirms the explicit schema (rather than inferred types) was applied to the loaded data.
df.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- diagnosis_code: string (nullable = true)
 |-- diagnosis_description: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)



In [9]:
# Count NULLs per column: when() yields 1 only for NULL cells, and count() skips the rest.
print("Null Counts:")
null_count_exprs = []
for column_name in df.columns:
    is_null = when(col(column_name).isNull(), 1)
    null_count_exprs.append(count(is_null).alias(f"{column_name}_null_counts"))

null_counts = df.select(null_count_exprs)
null_counts.show(truncate=False)

Null Counts:
+----------------------+---------------+------------------+--------------------------+---------------------------------+--------------------------+
|patient_id_null_counts|age_null_counts|gender_null_counts|diagnosis_code_null_counts|diagnosis_description_null_counts|diagnosis_date_null_counts|
+----------------------+---------------+------------------+--------------------------+---------------------------------+--------------------------+
|0                     |0              |0                 |0                         |0                                |0                         |
+----------------------+---------------+------------------+--------------------------+---------------------------------+--------------------------+



In [10]:
def cassandra_connection(secure_connect_bundle="secure-connect-healthcare-db.zip"):
    """Connect to the DataStax Astra (Cassandra) database and verify the link.

    Authenticates with the ``client_id`` / ``secret`` globals loaded from ``.env`` and
    runs ``select release_version from system.local`` as a health check.

    Args:
        secure_connect_bundle: path to the Astra secure-connect bundle zip.

    Returns:
        A ``(cluster, session)`` tuple. Call ``cluster.shutdown()`` when finished.

    Raises:
        RuntimeError: if the health-check query returns no row.
        Any driver exception raised while connecting or querying is re-raised after
        the cluster has been shut down.
    """
    cloud_config = {
        'secure_connect_bundle': secure_connect_bundle
    }

    auth_provider = PlainTextAuthProvider(client_id, secret)
    cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)

    try:
        session = cluster.connect()
        row = session.execute("select release_version from system.local").one()
    except Exception:
        # Don't leave the cluster's connections open when the connection attempt fails.
        cluster.shutdown()
        raise

    if row is None:
        cluster.shutdown()
        # Returning None here would make the caller's tuple unpacking fail with an
        # unrelated TypeError, so raise an explicit error instead.
        raise RuntimeError("An error occurred while connecting to Datastax Cassandra!")

    print("Successfully Connected to Datastax Cassandra!")
    return cluster, session


# Setup Cassandra connection
cluster, session = cassandra_connection()

Successfully Connected to Datastax Cassandra!


##### Disease Gender Distribution (% male / % female per diagnosis):

In [11]:
# Register the daily data as the `healthcare` temp view so the Spark SQL analyses below can query it.
df.createOrReplaceTempView("healthcare")

In [12]:
# Percentage of male and female patients for each diagnosis on each date.
# COUNT(gender) ignores NULL genders, so the denominator is patients with a recorded gender;
# a gender value other than 'M'/'F' counts towards the denominator only, in which case the
# two percentages do not add up to 100.
query = """
    SELECT
        diagnosis_date,
        diagnosis_description,
        diagnosis_code,
        ROUND(((SUM(CASE WHEN gender = 'M' THEN 1 ELSE 0 END)) / COUNT(gender))*100, 2) AS percentage_male,
        ROUND(((SUM(CASE WHEN gender = 'F' THEN 1 ELSE 0 END)) / COUNT(gender))*100, 2) AS percentage_female
    FROM healthcare
    GROUP BY diagnosis_date, diagnosis_description, diagnosis_code;
"""

# spark.sql only defines the aggregate; .show() runs it and prints the result.
disease_gender = spark.sql(query)
disease_gender.show()

+--------------+---------------------+--------------+---------------+-----------------+
|diagnosis_date|diagnosis_description|diagnosis_code|percentage_male|percentage_female|
+--------------+---------------------+--------------+---------------+-----------------+
|    2024-04-02|               Cancer|          C345|          48.57|            51.43|
|    2024-04-02|  High Blood Pressure|          H234|          42.42|            57.58|
|    2024-04-02|             Diabetes|          D123|           50.0|             50.0|
+--------------+---------------------+--------------+---------------+-----------------+



In [13]:
# One row per (diagnosis_date, diagnosis_code). Cassandra INSERTs are upserts, so
# re-processing the same day overwrites rows instead of duplicating them.
create_table_query = """
    CREATE TABLE IF NOT EXISTS healthcare_store.disease_gender_distn (
        diagnosis_date DATE,
        diagnosis_description TEXT,
        diagnosis_code TEXT,
        percentage_male DOUBLE,
        percentage_female DOUBLE,
        PRIMARY KEY (diagnosis_date, diagnosis_code)
    );
"""

session.execute(create_table_query)

# Prepared once, bound per row: the server parses the statement a single time and the
# driver checks bound values against the column types.
insert_statement = session.prepare("""
    INSERT INTO healthcare_store.disease_gender_distn (diagnosis_date, diagnosis_description, diagnosis_code, percentage_male, percentage_female)
    VALUES (?, ?, ?, ?, ?)
""")

# The aggregate is small (one row per diagnosis per date), so collecting it is cheap.
# Spark Rows carry native Python values (datetime.date, float, None for NULL),
# avoiding pandas dtype coercion.
rows_to_insert = disease_gender.select(
    "diagnosis_date", "diagnosis_description", "diagnosis_code", "percentage_male", "percentage_female"
).collect()

for record in rows_to_insert:
    session.execute(insert_statement, tuple(record))

print(f"Data inserted in cassandra!! ({len(rows_to_insert)} rows)")

Data inserted in cassandra!!
Data inserted in cassandra!!
Data inserted in cassandra!!


In [14]:
# checking: read the Cassandra table back to confirm the inserts landed.
# The table accumulates rows across daily runs, so earlier dates appear too.
q = "SELECT * FROM healthcare_store.disease_gender_distn"
result = session.execute(q)

# Print the results (DATE values print as Date(<days since 1970-01-01>))
for row in result:
    print(row)

Row(diagnosis_date=Date(19814), diagnosis_code='C345', diagnosis_description='Cancer', percentage_female=45.0, percentage_male=55.0)
Row(diagnosis_date=Date(19814), diagnosis_code='D123', diagnosis_description='Diabetes', percentage_female=56.1, percentage_male=43.9)
Row(diagnosis_date=Date(19814), diagnosis_code='H234', diagnosis_description='High Blood Pressure', percentage_female=56.41, percentage_male=43.59)
Row(diagnosis_date=Date(19815), diagnosis_code='C345', diagnosis_description='Cancer', percentage_female=51.43, percentage_male=48.57)
Row(diagnosis_date=Date(19815), diagnosis_code='D123', diagnosis_description='Diabetes', percentage_female=50.0, percentage_male=50.0)
Row(diagnosis_date=Date(19815), diagnosis_code='H234', diagnosis_description='High Blood Pressure', percentage_female=57.58, percentage_male=42.42)


##### Top 3 Most Common Diseases:

In [15]:
## Find the top-N most common diseases for each diagnosis date.
## - ROW_NUMBER is partitioned by date, so ranks restart every day. This matches the
##   (diagnosis_date, rank) key they are stored under.
## - Equal counts are broken alphabetically, so the ranking is deterministic.
## - Filtering on the rank yields at most top_n rows per date, even when counts tie.
top_n = 3
query = f"""
    SELECT
        rank,
        diagnosis_date,
        diagnosis_description
    FROM (
        SELECT
            ROW_NUMBER() OVER (
                PARTITION BY diagnosis_date
                ORDER BY COUNT(1) DESC, diagnosis_description
            ) AS rank,
            diagnosis_date,
            diagnosis_description
        FROM healthcare
        GROUP BY diagnosis_date, diagnosis_description
    ) AS ranked
    WHERE rank <= {int(top_n)}
    ORDER BY diagnosis_date, rank
"""

common_diseases = spark.sql(query)
common_diseases.show()

+----+--------------+---------------------+
|rank|diagnosis_date|diagnosis_description|
+----+--------------+---------------------+
|   1|    2024-04-02|               Cancer|
|   2|    2024-04-02|  High Blood Pressure|
|   3|    2024-04-02|             Diabetes|
+----+--------------+---------------------+



In [16]:
# Daily top diseases keyed by (diagnosis_date, rank); re-running a day upserts its ranks.
create_table_query = """
    CREATE TABLE IF NOT EXISTS healthcare_store.common_diseases(
        rank INT,
        diagnosis_date DATE,
        diagnosis_description TEXT,
        PRIMARY KEY (diagnosis_date, rank)
    );
"""

## execute the create table command
session.execute(create_table_query)

# Prepared once, bound per row (parsed once server-side, values type-checked by the driver).
insert_statement = session.prepare("""
    INSERT INTO healthcare_store.common_diseases (rank, diagnosis_date, diagnosis_description)
    VALUES (?, ?, ?)
""")

# Only a handful of rows per date; Spark Rows hold native Python values for binding.
rows_to_insert = common_diseases.select("rank", "diagnosis_date", "diagnosis_description").collect()

for record in rows_to_insert:
    session.execute(insert_statement, tuple(record))

print(f"Data inserted in cassandra!! ({len(rows_to_insert)} rows)")

Data inserted in cassandra!!
Data inserted in cassandra!!
Data inserted in cassandra!!


In [17]:
# checking: read the Cassandra table back to confirm the inserts landed.
# Rows from every processed day are returned, grouped by diagnosis_date.
q = "SELECT * FROM healthcare_store.common_diseases"
result = session.execute(q)

# Print the results (DATE values print as Date(<days since 1970-01-01>))
for row in result:
    print(row)

Row(diagnosis_date=Date(19814), rank=1, diagnosis_description='Diabetes')
Row(diagnosis_date=Date(19814), rank=2, diagnosis_description='High Blood Pressure')
Row(diagnosis_date=Date(19814), rank=3, diagnosis_description='Cancer')
Row(diagnosis_date=Date(19815), rank=1, diagnosis_description='Cancer')
Row(diagnosis_date=Date(19815), rank=2, diagnosis_description='High Blood Pressure')
Row(diagnosis_date=Date(19815), rank=3, diagnosis_description='Diabetes')


##### Age Group Distribution per Diagnosis:

In [18]:
# Bucket every patient into an age group.
# - Ages below 30 get their own '<30' group.
# - Only ages above 70 are labelled '>70'. A catch-all ELSE would also put under-30s
#   and NULL ages there.
# - A NULL age yields a NULL age_group, so it is not counted in any bucket.
query1 = """
    SELECT
        *,
        CASE
            WHEN age < 30 THEN '<30'
            WHEN age BETWEEN 30 AND 40 THEN '30-40'
            WHEN age BETWEEN 41 AND 50 THEN '41-50'
            WHEN age BETWEEN 51 AND 60 THEN '51-60'
            WHEN age BETWEEN 61 AND 70 THEN '61-70'
            WHEN age > 70 THEN '>70'
            END AS age_group
    FROM healthcare;
"""

df_age_cat = spark.sql(query1)
df_age_cat.show()

## create a temp view
df_age_cat.createOrReplaceTempView("healthcare_age_categories")

# Patients per age group for each diagnosis and date (a pivot written as conditional sums).
# cnt_less_than_30 is an added column; consumers that pick columns by name are unaffected.
query2 = """
    SELECT
        diagnosis_date,
        diagnosis_description,
        diagnosis_code,
        SUM(CASE WHEN age_group = '<30' THEN 1 ELSE 0 END) AS cnt_less_than_30,
        SUM(CASE WHEN age_group = '30-40' THEN 1 ELSE 0 END) AS cnt_30_40,
        SUM(CASE WHEN age_group = '41-50' THEN 1 ELSE 0 END) AS cnt_41_50,
        SUM(CASE WHEN age_group = '51-60' THEN 1 ELSE 0 END) AS cnt_51_60,
        SUM(CASE WHEN age_group = '61-70' THEN 1 ELSE 0 END) AS cnt_61_70,
        SUM(CASE WHEN age_group = '>70' THEN 1 ELSE 0 END) AS cnt_more_than_70
    FROM healthcare_age_categories
    GROUP BY diagnosis_date, diagnosis_description, diagnosis_code;
"""

print("Age Group Distribution:")
age_cat_distn = spark.sql(query2)
age_cat_distn.show(truncate=False)

+----------+---+------+--------------+---------------------+--------------+---------+
|patient_id|age|gender|diagnosis_code|diagnosis_description|diagnosis_date|age_group|
+----------+---+------+--------------+---------------------+--------------+---------+
|      P101| 39|     M|          H234|  High Blood Pressure|    2024-04-02|    30-40|
|      P102| 53|     M|          H234|  High Blood Pressure|    2024-04-02|    51-60|
|      P103| 44|     F|          D123|             Diabetes|    2024-04-02|    41-50|
|      P104| 54|     F|          C345|               Cancer|    2024-04-02|    51-60|
|      P105| 38|     M|          D123|             Diabetes|    2024-04-02|    30-40|
|      P106| 55|     F|          D123|             Diabetes|    2024-04-02|    51-60|
|      P107| 39|     F|          D123|             Diabetes|    2024-04-02|    30-40|
|      P108| 57|     F|          D123|             Diabetes|    2024-04-02|    51-60|
|      P109| 70|     F|          C345|               C

In [19]:
# Age-group counts keyed by (diagnosis_date, diagnosis_code); re-running a day upserts.
create_table_query = """
    CREATE TABLE IF NOT EXISTS healthcare_store.age_group_distn(
        diagnosis_date DATE,
        diagnosis_description TEXT,
        diagnosis_code TEXT,
        cnt_30_40 INT,
        cnt_41_50 INT,
        cnt_51_60 INT,
        cnt_61_70 INT,
        cnt_more_than_70 INT,
        PRIMARY KEY (diagnosis_date, diagnosis_code)
    );
"""

## execute the create table command
session.execute(create_table_query)

# Prepared once, bound per row (parsed once server-side, values type-checked by the driver).
insert_statement = session.prepare("""
    INSERT INTO healthcare_store.age_group_distn (diagnosis_date, diagnosis_description, diagnosis_code, cnt_30_40, cnt_41_50,
                                                       cnt_51_60, cnt_61_70, cnt_more_than_70)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")

# Columns are picked by name so they line up with the placeholders above; the aggregate is
# one row per diagnosis per date, so collecting it is cheap.
rows_to_insert = age_cat_distn.select(
    "diagnosis_date", "diagnosis_description", "diagnosis_code",
    "cnt_30_40", "cnt_41_50", "cnt_51_60", "cnt_61_70", "cnt_more_than_70",
).collect()

for record in rows_to_insert:
    session.execute(insert_statement, tuple(record))

print(f"Data inserted in cassandra!! ({len(rows_to_insert)} rows)")

Data inserted in cassandra!!
Data inserted in cassandra!!
Data inserted in cassandra!!


In [20]:
# checking: read the Cassandra table back to confirm the inserts landed.
# Rows from every processed day are returned, grouped by diagnosis_date.
q = "SELECT * FROM healthcare_store.age_group_distn;"
result = session.execute(q)

# Print the results (DATE values print as Date(<days since 1970-01-01>))
for row in result:
    print(row)

Row(diagnosis_date=Date(19814), diagnosis_code='C345', cnt_30_40=5, cnt_41_50=3, cnt_51_60=4, cnt_61_70=8, cnt_more_than_70=0, diagnosis_description='Cancer')
Row(diagnosis_date=Date(19814), diagnosis_code='D123', cnt_30_40=13, cnt_41_50=7, cnt_51_60=11, cnt_61_70=10, cnt_more_than_70=0, diagnosis_description='Diabetes')
Row(diagnosis_date=Date(19814), diagnosis_code='H234', cnt_30_40=6, cnt_41_50=9, cnt_51_60=12, cnt_61_70=12, cnt_more_than_70=0, diagnosis_description='High Blood Pressure')
Row(diagnosis_date=Date(19815), diagnosis_code='C345', cnt_30_40=11, cnt_41_50=5, cnt_51_60=12, cnt_61_70=7, cnt_more_than_70=0, diagnosis_description='Cancer')
Row(diagnosis_date=Date(19815), diagnosis_code='D123', cnt_30_40=10, cnt_41_50=6, cnt_51_60=8, cnt_61_70=8, cnt_more_than_70=0, diagnosis_description='Diabetes')
Row(diagnosis_date=Date(19815), diagnosis_code='H234', cnt_30_40=10, cnt_41_50=7, cnt_51_60=10, cnt_61_70=6, cnt_more_than_70=0, diagnosis_description='High Blood Pressure')


##### Flag for senior patients (Age >= 60 Years): 

In [21]:
# Tag each patient as senior (flag = true) when age >= 60; every column of `healthcare`
# is kept. A NULL age does not satisfy the WHEN branch, so it falls to ELSE and is flagged false.
query = """
    SELECT 
        *,
        CASE
            WHEN age >= 60 THEN True
            ELSE False
            END AS flag
    FROM healthcare;
"""

senior_patients = spark.sql(query)
senior_patients.show()

+----------+---+------+--------------+---------------------+--------------+-----+
|patient_id|age|gender|diagnosis_code|diagnosis_description|diagnosis_date| flag|
+----------+---+------+--------------+---------------------+--------------+-----+
|      P101| 39|     M|          H234|  High Blood Pressure|    2024-04-02|false|
|      P102| 53|     M|          H234|  High Blood Pressure|    2024-04-02|false|
|      P103| 44|     F|          D123|             Diabetes|    2024-04-02|false|
|      P104| 54|     F|          C345|               Cancer|    2024-04-02|false|
|      P105| 38|     M|          D123|             Diabetes|    2024-04-02|false|
|      P106| 55|     F|          D123|             Diabetes|    2024-04-02|false|
|      P107| 39|     F|          D123|             Diabetes|    2024-04-02|false|
|      P108| 57|     F|          D123|             Diabetes|    2024-04-02|false|
|      P109| 70|     F|          C345|               Cancer|    2024-04-02| true|
|      P110| 49|

In [22]:
# `flag` is reported as a non-nullable boolean because the CASE expression always yields True/False.
senior_patients.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- diagnosis_code: string (nullable = true)
 |-- diagnosis_description: string (nullable = true)
 |-- diagnosis_date: date (nullable = true)
 |-- flag: boolean (nullable = false)



In [23]:
# Row-level fact table keyed by (diagnosis_date, patient_id); re-running a day upserts.
create_table_query = """
    CREATE TABLE IF NOT EXISTS healthcare_store.healthcare_fact (
        patient_id TEXT,
        age INT,
        gender TEXT,
        diagnosis_code TEXT,
        diagnosis_description TEXT,
        diagnosis_date DATE,
        flag BOOLEAN,
        PRIMARY KEY (diagnosis_date, patient_id)
    );
"""

## execute the create table command
session.execute(create_table_query)

# Prepared once, bound per row (parsed once server-side, values type-checked by the driver).
insert_statement = session.prepare("""
    INSERT INTO healthcare_store.healthcare_fact (patient_id, age, gender, diagnosis_code, diagnosis_description, diagnosis_date, flag)
    VALUES (?, ?, ?, ?, ?, ?, ?)
""")

fact_columns = ["patient_id", "age", "gender", "diagnosis_code", "diagnosis_description", "diagnosis_date", "flag"]

# Stream the raw rows one partition at a time instead of materialising the whole file on
# the driver. Spark Rows bind NULL ages as None; a pandas round-trip would turn a nullable
# int column into float NaN, which an INT column rejects.
inserted_count = 0
for record in senior_patients.select(*fact_columns).toLocalIterator():
    session.execute(insert_statement, tuple(record))
    inserted_count += 1

print(f"All Data inserted in cassandra!! ({inserted_count} rows)")

All Data inserted in cassandra!!


In [24]:
load_dotenv()

# Configure Redshift connection properties
jdbc_url = os.getenv("JDBC_URL")
table_name = "healthcare.healthcare_fact"
redshift_username = os.getenv("REDSHIFT_USERNAME")
redshift_password = os.getenv("REDSHIFT_PASSWORD")

# Fail fast: a missing value would otherwise surface as an opaque JDBC/Py4J error.
_missing = [
    name
    for name, value in (
        ("JDBC_URL", jdbc_url),
        ("REDSHIFT_USERNAME", redshift_username),
        ("REDSHIFT_PASSWORD", redshift_password),
    )
    if not value
]
if _missing:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing)} (check your .env file)"
    )

## temp s3 directory
# NOTE(review): `tempdir` is an option of the spark-redshift connector. This cell writes
# through Spark's generic JDBC source, which does not stage data in S3, so the entry is
# presumably passed to the driver and ignored.
s3_temp_dir = "s3://s3-redshift-bucket-feb27/temp/"

# Set connection properties
properties = {
    "user": redshift_username,
    "password": redshift_password,
    "tempdir": s3_temp_dir,
    "driver": "com.amazon.redshift.jdbc.Driver"
}

# Write data to Redshift. mode="append" is not idempotent: re-running this cell for the
# same input file inserts the same rows again.
senior_patients.write.jdbc(url=jdbc_url, table=table_name, mode="append", properties=properties)
print("Write Successful into the Redshift Table!")

Write Successful into the Redshift Table!


In [25]:
# The S3 key of the file processed in this run (first CSV key under the input prefix).
file_keys[0]

'input/health_data_20240402.csv'

In [26]:
# Destination key for the processed file: same file name, under the configured archive prefix
# (reuses `archive_directory` instead of repeating the "archive/" literal).
archive_key = f"{archive_directory}{file_keys[0].rsplit('/', 1)[-1]}"
archive_key

'archive/health_data_20240402.csv'

In [27]:
# Move the processed file from the input prefix to the archive prefix. `bucket_name` and
# `archive_directory` come from the S3 details cell, so they are not redefined here.
# S3 has no rename, so the move is a copy followed by a delete. boto3 raises a ClientError
# if the copy fails, so the input object is only deleted once the archive copy exists.
# NOTE: copy_object handles objects up to 5 GB; larger files would need s3_client.copy.
input_key = file_keys[0]
archive_key = f"{archive_directory}{input_key.rsplit('/', 1)[-1]}"

s3_client.copy_object(
    Bucket=bucket_name,
    CopySource={"Bucket": bucket_name, "Key": input_key},
    Key=archive_key,
)

# Delete object from input folder
s3_client.delete_object(Bucket=bucket_name, Key=input_key)
print("Object moved from 'input' folder to 'archive' folder successfully!")

Object moved from 'input' folder to 'archive' folder successfully!


In [28]:
# Remove the temporary local CSV written while loading (`os` is imported in the imports cell).
# A missing file is tolerated so the cell can be re-run safely.
try:
    os.remove("temp.csv")
except FileNotFoundError:
    pass

In [29]:
# Release external resources: close the Cassandra driver's cluster connections, then stop Spark.
cluster.shutdown()
spark.stop()