In [0]:
# Requirement
# A Health Care insurance company is facing challenges in enhancing its revenue and understanding the customers so it wants to take help from Big Data Ecosystem to analyse the Competitors company data received from varieties of sources, namely through scrapping and third-party sources. This analysis will help them to track the behaviour, condition of customers so that they can customise offers for them to buy insurance policies and also calculate royalties to those customers who buy policies in the past, this in turn will enhance their revenues.


In [0]:
# Creating ETL data pipelines for the Health Care insurance company which will make the company make appropriate business strategies to enhance their revenue by analysing customers behaviours and send offers and royalties to customers respectively.


In [0]:
#Import libraries
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, count, avg, when, year,datediff, current_date, avg, sum, count
from pyspark.sql.types import IntegerType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Health Insurance Analysis") \
    .getOrCreate()


In [0]:
# Accessing the AWS Bucket
access_key = 'xxxxxxxxxxxxxx'
secret_key = 'xxxxxxx+xIP9o/I'
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

# If you are using Auto Loader file notification mode to load files, provide the AWS Region ID.
aws_region = "us-east-2"
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")


In [0]:
# 1. DATASET CREATION
# Upload the given sample data on AWS s3 in a folder named input-data.\

In [0]:
# EXTRACT DATA FROM AWS S3
# Reading the data from AWS S3
claims_df = spark.read.json("s3a://health-insurance-capstone/raw-data/claims.json")
disease_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/disease.csv", header=True, inferSchema=True)
group_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/group.csv", header=True, inferSchema=True)
grpsubgrp_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/grpsubgrp.csv", header=True, inferSchema=True)
hospital_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/hospital.csv", header=True, inferSchema=True)
subgroup_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/subgroup.csv", header=True, inferSchema=True)
subscriber_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/subscriber.csv", header=True, inferSchema=True)
patient_records_df = spark.read.csv("s3a://health-insurance-capstone/raw-data/Patient_records.csv", header=True, inferSchema=True)

In [0]:
# Print schema and display data for each DataFrame
print("Schema of claims_df:")
claims_df.printSchema()
claims_df.show()

print("Schema of disease_df:")
disease_df.printSchema()
disease_df.show()

print("Schema of group_df:")
group_df.printSchema()
group_df.show()

print("Schema of grpsubgrp_df:")
grpsubgrp_df.printSchema()
grpsubgrp_df.show()

print("Schema of hospital_df:")
hospital_df.printSchema()
hospital_df.show()

print("Schema of subgroup_df:")
subgroup_df.printSchema()
subgroup_df.show()

print("Schema of subscriber_df:")
subscriber_df.printSchema()
subscriber_df.show()

print("Schema of patient_records_df:")
patient_records_df.printSchema()
patient_records_df.show()

Schema of claims_df:
root
 |-- Claim_Or_Rejected: string (nullable = true)
 |-- SUB_ID: string (nullable = true)
 |-- claim_amount: string (nullable = true)
 |-- claim_date: string (nullable = true)
 |-- claim_id: long (nullable = true)
 |-- claim_type: string (nullable = true)
 |-- disease_name: string (nullable = true)
 |-- patient_id: long (nullable = true)

+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|Claim_Or_Rejected|    SUB_ID|claim_amount|claim_date|claim_id|      claim_type|    disease_name|patient_id|
+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|                N| SUBID1000|       79874|1949-03-14|       0| claims of value|    Galactosemia|    187158|
|              NaN|SUBID10001|      151142|1970-03-16|       1|claims of policy|  Bladder cancer|    112766|
|              NaN|SUBID10002|       59924|2008-02-03|       2| claims of value|   Kidney c

In [0]:
# DATA CLEANING
# Data cleaning is the process of fixing or removing incorrect, corrupted, incorrectly formatted, duplicate, or incomplete data within a dataset. When combining multiple data sources, there are many opportunities for data to be duplicated or mislabeled. If data is incorrect, outcomes and algorithms are unreliable, even though they may look correct. There is no one absolute way to prescribe the exact steps in the data cleaning process because the processes will vary from dataset to dataset.
# Cleaning Activity
# First check if there are null values in dataset
# Count the total Null values for each column
# And then replace the null values for specific columns by NA 
# Check the If there are duplicates records
# If there are duplicates then drop duplicates


In [0]:
# TRANSFORM DATA

# Function to clean a DataFrame
def clean_data(df, df_name):
    # 1. Replace empty strings with None (null)
    df = df.replace('', None)
    
    # 2. Check for null values and count them for each column
    null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
    null_counts.show()
    
    # 3. Replace remaining null values with 'NA' for specific columns (customize as needed)
    columns_to_replace = df.columns
    df = df.fillna("NA", subset=columns_to_replace)
    
    # 4. Check for duplicates and drop duplicates
    initial_count = df.count()
    df = df.dropDuplicates()
    final_count = df.count()
    
    print(f"Number of duplicates dropped in {df_name}: {initial_count - final_count}")
    
    return df

In [0]:
# Clean data for at least for following datasets
# Patients
# Subscriber
# Claims
# Group_subgroup

In [0]:
# Clean each DataFrame using user defined function clean_data()
claims_df_cleaned = clean_data(claims_df, "claims_df")
disease_df_cleaned = clean_data(disease_df, "disease_df")
group_df_cleaned = clean_data(group_df, "group_df")
grpsubgrp_df_cleaned = clean_data(grpsubgrp_df, "grpsubgrp_df")
hospital_df_cleaned = clean_data(hospital_df, "hospital_df")
subgroup_df_cleaned = clean_data(subgroup_df, "subgroup_df")
subscriber_df_cleaned = clean_data(subscriber_df, "subscriber_df")
patient_records_df_cleaned = clean_data(patient_records_df, "patient_records_df")

+-----------------+------+------------+----------+--------+----------+------------+----------+
|Claim_Or_Rejected|SUB_ID|claim_amount|claim_date|claim_id|claim_type|disease_name|patient_id|
+-----------------+------+------------+----------+--------+----------+------------+----------+
|                0|     0|           0|         0|       0|         0|           0|         0|
+-----------------+------+------------+----------+--------+----------+------------+----------+

Number of duplicates dropped in claims_df: 0
+--------+-----------+------------+
|SubGrpID| Disease_ID|Disease_name|
+--------+-----------+------------+
|       0|          0|           0|
+--------+-----------+------------+

Number of duplicates dropped in disease_df: 0
+-------+---------------+-------+------+--------+--------+----+----+
|Country|premium_written|zipcode|Grp_Id|Grp_Name|Grp_Type|city|year|
+-------+---------------+-------+------+--------+--------+----+----+
|      0|              0|      0|     0|     

In [0]:
# Before Cleaning
patient_records_df.show()

+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|    disease_name|                city|hospital_id|
+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|    187158|      Harbir|        Female|        1924-06-30|+91 0112009318|    Galactosemia|            Rourkela|      H1001|
|    112766|    Brahmdev|        Female|        1948-12-20|+91 1727749552|  Bladder cancer|        Tiruvottiyur|      H1016|
|    199252|     Ujjawal|          Male|        1980-04-16|+91 8547451606|   Kidney cancer|           Berhampur|      H1009|
|    133424|     Ballari|        Female|        1969-09-25|+91 0106026841|         Suicide|        Bihar Sharif|      H1017|
|    172579|     Devnath|        Female|        1946-05-01|+91 1868774631|    Food allergy|         Bidhannagar|      H1019|


In [0]:
# After Cleaning
patient_records_df_cleaned.show()

+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|    disease_name|                city|hospital_id|
+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|    167340|          NA|        Female|        1981-01-25|+91 2960004518|    Galactosemia|Surendranagar Dud...|      H1003|
|    156168|          NA|          Male|        1976-02-03|+91 5586075345| Fanconi anaemia|              Rajkot|      H1004|
|    132748|    Brahmvir|          Male|        1991-11-11|+91 7316972612| Cystic fibrosis|              Ambala|      H1018|
|    172579|     Devnath|        Female|        1946-05-01|+91 1868774631|    Food allergy|         Bidhannagar|      H1019|
|    146382|  Dharmadaas|          Male|        1964-04-29|+91 6345482027|         Anthrax|Bhalswa Jahangir Pur|      H1019|


In [0]:
# WRITING DATA

In [0]:
# # Writing the cleaned data to AWS S3
claims_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/claims")
disease_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/disease")
group_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/group")
grpsubgrp_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/grpsubgrp")
hospital_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/hospital")
subgroup_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/subgroup")
subscriber_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/subscriber")
patient_records_df_cleaned.write.option("header", "true").mode("overwrite").csv("s3a://health-insurance-capstone/clean-data/patient")

In [0]:
# LIST OF REQUIREMENTS
# Which disease has a maximum number of claims.
# Find those Subscribers having age less than 30 and they subscribe any subgroup
# Find out which group has maximum subgroups.
# Find out hospital which serve most number of patients
# Find out which subgroups subscribe most number of times
# Find out total number of claims which were rejected
# From where most claims are coming (city)
# Which groups of policies subscriber subscribe mostly Government or private
# Average monthly premium subscriber pay to insurance company.
# Find out Which group is most profitable
# List all the patients below age of 18 who admit for cancer
# List patients who have cashless insurance and have total charges greater than or equal for Rs. 50,000.
# List female patients over the age of 40 that have undergone knee surgery in the past year


In [0]:
# 1. Disease with maximum number of claim

from pyspark.sql.functions import col

# Group by disease_name and count the occurrences, then get the one with the maximum count
max_claims_disease_df = claims_df_cleaned.groupBy("disease_name") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(1)

# Display the result
max_claims_disease_df.show()

# Saving as a permanent table
permanent_table_name = "max_claims_disease"
max_claims_disease_df.write.mode("overwrite").saveAsTable(permanent_table_name)


+------------+-----+
|disease_name|count|
+------------+-----+
| Pet allergy|    3|
+------------+-----+



In [0]:
# 2. Subscribers under age 30 subscribing to any subgroup
from pyspark.sql.functions import datediff, current_date

subscribers_under_30_df = subscriber_df.join(subgroup_df, subscriber_df["Subgrp_id"] == subgroup_df["SubGrp_id"]) \
    .filter(datediff(current_date(), col("Birth_date")) / 365 < 30) \
    .select(subscriber_df["*"])

subscribers_under_30_df.show()

#Saving as a permanent table
permanent_table_name = "subscribers_under_30_df"
subscribers_under_30_df.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)


+----------+------------+---------+-----------------+----------+------+--------------+-------+-----------+--------+---------+--------+----------+----------+
|   sub _id|  first_name|last_name|           Street|Birth_date|Gender|         Phone|Country|       City|Zip Code|Subgrp_id|Elig_ind|  eff_date| term_date|
+----------+------------+---------+-----------------+----------+------+--------------+-------+-----------+--------+---------+--------+----------+----------+
|SUBID10017|      Bandhu|     Seth|        Varughese|1996-10-15|  Male|+91 0695289163|  India|  Chinsurah|  136713|     S108|       N|2016-10-15|2018-06-08|
|SUBID10083|  Bhilangana|   Pandit|Ramachandran Path|1995-01-04|Female|+91 6653069630|  India|   Fatehpur|  359466|     S109|       Y|2015-01-04|2017-10-05|
|SUBID10093|Chandavarman|    Singh|    Sarkar Circle|1997-05-10|Others|+91 6559031791|  India|Navi Mumbai|   83240|     S110|       N|2017-05-10|2022-08-27|
+----------+------------+---------+-----------------+-----

In [0]:
# 3. Group with maximum subgroup
# Group by Grp_Id and count the subgroups, then get the one with the maximum count
max_subgroup_group_df = grpsubgrp_df.groupBy("Grp_Id") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(1)

# Display the result
max_subgroup_group_df.show()

# Saving as a permanent table
permanent_table_name = "max_subgroup_group"
max_subgroup_group_df.write.mode("overwrite").saveAsTable(permanent_table_name)


+------+-----+
|Grp_Id|count|
+------+-----+
|GRP104|    2|
+------+-----+



In [0]:
from pyspark.sql.functions import col

# Group by hospital_id and count the patients, then get the hospital with the maximum count
most_patients_hospital_df = patient_records_df.groupBy("hospital_id") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(1)

# Display the result
most_patients_hospital_df.show()

# Saving as a permanent table
permanent_table_name = "most_patients_hospital"
most_patients_hospital_df.write.mode("overwrite").saveAsTable(permanent_table_name)


+-----------+-----+
|hospital_id|count|
+-----------+-----+
|      H1017|    9|
+-----------+-----+



In [0]:
# 5. Subgroup subscribed most number of times
from pyspark.sql.functions import col

# Group by Subgrp_id and count the occurrences, then get the one with the maximum count
most_subscribed_subgroup_df = subscriber_df.groupBy("Subgrp_id") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(1)

# Display the result
most_subscribed_subgroup_df.show()

# Saving as a permanent table
permanent_table_name = "most_subscribed_subgroup"
most_subscribed_subgroup_df.write.mode("overwrite").saveAsTable(permanent_table_name)

+---------+-----+
|Subgrp_id|count|
+---------+-----+
|     S104|   13|
+---------+-----+



In [0]:
# 6. Total number of claims which were rejected
from pyspark.sql import Row

total_rejected_claims = claims_df.filter(col("Claim_Or_Rejected") == "Rejected") \
    .count()
#print("Total number of rejected claims:", total_rejected_claims)
# Convert the count to a DataFrame
total_rejected_claims_df = spark.createDataFrame([Row(total_rejected_claims=total_rejected_claims)])

# Display the result
total_rejected_claims_df.show()

# Saving as a permanent table
permanent_table_name = "total_rejected_claims"
total_rejected_claims_df.write.mode("overwrite").saveAsTable(permanent_table_name)


+---------------------+
|total_rejected_claims|
+---------------------+
|                    0|
+---------------------+



In [0]:
# 7. Join claims_df with subscriber_df to get city information for claims
from pyspark.sql.functions import count, avg, year, current_date, desc
claims_with_city_df = claims_df.join(subscriber_df, claims_df.SUB_ID == subscriber_df["sub _id"], "left")

# City with most claims
city_most_claims = claims_with_city_df.groupBy("City").count().orderBy(desc("count")).first()
print(f"City with most claims: {city_most_claims['City']}")

# Convert to DataFrame
city_most_claims_df = spark.createDataFrame([city_most_claims])

# Display the result
city_most_claims_df.show()

#Saving as a permanent table
permanent_table_name = "city_most_claims"
city_most_claims_df.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)

City with most claims: Mysore
+------+-----+
|  City|count|
+------+-----+
|Mysore|    2|
+------+-----+



In [0]:
# 8. Government or private policy subscriptions
policy_type_subscription = subscriber_df.groupBy("Elig_ind").count().orderBy(desc("count"))
policy_type_subscription.show()

#Saving as a permanent table
permanent_table_name = "policy_type_subscription"
policy_type_subscription.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)

+--------+-----+
|Elig_ind|count|
+--------+-----+
|       N|   50|
|       Y|   46|
|    null|    4|
+--------+-----+



In [0]:
# 9. Average monthly premium paid by subscribers
avg_monthly_premium = subgroup_df.select(avg("Monthly_Premium")).first()
print(f"Average monthly premium paid by subscribers: {avg_monthly_premium['avg(Monthly_Premium)']}")

# Convert to DataFrame
avg_monthly_premium_df = spark.createDataFrame([avg_monthly_premium])

# Display the result
avg_monthly_premium_df.show()

#Saving as a permanent table
permanent_table_name = "avg_monthly_premium"
avg_monthly_premium_df.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)


Average monthly premium paid by subscribers: 1870.0
+--------------------+
|avg(Monthly_Premium)|
+--------------------+
|              1870.0|
+--------------------+



In [0]:
# 10. Most profitable group

most_profitable_group = group_df.groupBy("Grp_Id") \
    .agg(sum("premium_written").alias("total_premium")) \
    .orderBy(col("total_premium").desc()) \
    .first()

print("Most profitable group:", most_profitable_group["Grp_Id"])

# Convert to DataFrame
most_profitable_group_df = spark.createDataFrame([most_profitable_group])

# Display the result
most_profitable_group_df.show()

#Saving as a permanent table
permanent_table_name = "most_profitable_group"
most_profitable_group_df.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)

Most profitable group: GRP131
+------+-------------+
|Grp_Id|total_premium|
+------+-------------+
|GRP131|        99000|
+------+-------------+



In [0]:
# 11. Patients below age of 18 admitted for cancer
from pyspark.sql.functions import sum as spark_sum

# Filter patients below 18 with cancer
patients_below_18_cancer = patient_records_df.filter(
    (col("disease_name").contains("cancer")) &
    (datediff(current_date(), col("patient_birth_date")) / 365 < 18)
)

# Display the result
patients_below_18_cancer.show()

#Saving as a permanent table
permanent_table_name = "patients_below_18_cancer"
patients_below_18_cancer.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)

+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone|disease_name|city|hospital_id|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
+----------+------------+--------------+------------------+-------------+------------+----+-----------+



In [0]:
claims_df_cleaned.show()
patient_records_df_cleaned.show()

+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|Claim_Or_Rejected|    SUB_ID|claim_amount|claim_date|claim_id|      claim_type|    disease_name|patient_id|
+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|                N|SUBID10016|       42860|1955-01-20|      16| claims of value|        Smallpox|    179662|
|                N| SUBID1010|       29150|1999-01-25|      10| claims of value| Fanconi anaemia|    156168|
|                N|SUBID10017|      161786|2017-06-01|      17|claims of policy|  Pollen allergy|    184479|
|                Y|SUBID10011|       40897|1975-02-08|      11| claims of value|   Breast cancer|    114241|
|                Y|SUBID10015|      100224|1986-08-02|      15| claims of value|          Dengue|    135184|
|              NaN|SUBID10018|       66129|1956-01-04|      18|  claims of fact|   Breast cancer|    156988|
|                Y|

In [0]:
#12 List patients who have cashless insurance and have total charges greater than or equal for Rs. 50,000.
# Joining patients and claims DataFrames
joined_df = patient_records_df_cleaned.join(claims_df_cleaned, patient_records_df_cleaned["Patient_id"] == claims_df_cleaned["patient_id"])
# Applying filters
filtered_df = joined_df.filter(
    (claims_df_cleaned["claim_type"] == "claims of value") &
    (claims_df_cleaned["claim_amount"].cast("int") >= 50000)
)
# Selecting required columns
patients_cashless = filtered_df.select(
    patient_records_df_cleaned["Patient_id"],
    patient_records_df_cleaned["Patient_name"],
    patient_records_df_cleaned["hospital_id"],
    claims_df_cleaned["claim_amount"],
    claims_df_cleaned["claim_type"]
)
# Showing the result
patients_cashless.show()
# Saving as a permanent table
permanent_table_name = "patients_cashless"
patients_cashless.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)

+----------+------------+-----------+------------+---------------+
|Patient_id|Patient_name|hospital_id|claim_amount|     claim_type|
+----------+------------+-----------+------------+---------------+
|    189996|       Ekant|      H1003|      192381|claims of value|
|    109251|   Anjushree|      H1001|      116937|claims of value|
|    167340|          NA|      H1003|      118628|claims of value|
|    132947|       Saroj|      H1016|      186502|claims of value|
|    172579|     Devnath|      H1019|      168634|claims of value|
|    109342|  Chitranjan|      H1011|      125727|claims of value|
|    146382|  Dharmadaas|      H1019|       75983|claims of value|
|    196369|          NA|      H1017|      164159|claims of value|
|    148137|       Umang|      H1002|      105982|claims of value|
|    198182|       Lalit|      H1017|       71703|claims of value|
|    187158|      Harbir|      H1001|       79874|claims of value|
|    199252|     Ujjawal|      H1009|       59924|claims of va

In [0]:
patient_records_df.show()
disease_df_cleaned.show()

+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|    disease_name|                city|hospital_id|
+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|    187158|      Harbir|        Female|        1924-06-30|+91 0112009318|    Galactosemia|            Rourkela|      H1001|
|    112766|    Brahmdev|        Female|        1948-12-20|+91 1727749552|  Bladder cancer|        Tiruvottiyur|      H1016|
|    199252|     Ujjawal|          Male|        1980-04-16|+91 8547451606|   Kidney cancer|           Berhampur|      H1009|
|    133424|     Ballari|        Female|        1969-09-25|+91 0106026841|         Suicide|        Bihar Sharif|      H1017|
|    172579|     Devnath|        Female|        1946-05-01|+91 1868774631|    Food allergy|         Bidhannagar|      H1019|


In [0]:
# 13. Female patients over age 40 who underwent knee surgery in the past year
# Filter for female patients over the age of 40
female_patients_over_40 = patient_records_df.filter(
    (col("patient_gender") == "Female") &
    (datediff(current_date(), col("patient_birth_date")) / 365 > 40)
)
#female_patients_over_40.show()
# Filter for knee surgery in the past year
knee_surgery_patients = female_patients_over_40.filter(
    col("disease_name").contains("Knee Surgery"))

# Show the results
knee_surgery_patients.show()

# #Saving as a permanent table
permanent_table_name = "knee_surgery_patients"
knee_surgery_patients.write.format("csv").mode("overwrite").saveAsTable(permanent_table_name)

+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone|disease_name|city|hospital_id|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
+----------+------------+--------------+------------------+-------------+------------+----+-----------+



In [0]:
# Assuming your DataFrames are already cleaned and ready
claims_df_cleaned.createOrReplaceTempView("claims")
disease_df_cleaned.createOrReplaceTempView("disease")
group_df_cleaned.createOrReplaceTempView("group")
grpsubgrp_df_cleaned.createOrReplaceTempView("grpsubgrp")
hospital_df_cleaned.createOrReplaceTempView("hospital")
subgroup_df_cleaned.createOrReplaceTempView("subgroup")
subscriber_df_cleaned.createOrReplaceTempView("subscriber")
patient_records_df_cleaned.createOrReplaceTempView("patient_records")


