In [0]:
# Create Spark session
spark = SparkSession.builder.appName("HealthCareAnalysis").getOrCreate()

# Load datasets from Filestore
claims = spark.read.json("/FileStore/tables/claims-2.json")
disease = spark.read.csv("/FileStore/tables/disease-2.csv", header=True, inferSchema=True)
group = spark.read.csv("/FileStore/tables/group-2.csv", header=True, inferSchema=True)
grpsubgrp = spark.read.csv("/FileStore/tables/grpsubgrp-2.csv", header=True, inferSchema=True)
hospital = spark.read.csv("/FileStore/tables/hospital-2.csv", header=True, inferSchema=True)
patient_records = spark.read.csv("/FileStore/tables/Patient_records-2.csv", header=True, inferSchema=True)
subgroup = spark.read.csv("/FileStore/tables/subgroup-2.csv", header=True, inferSchema=True)
subscriber = spark.read.csv("/FileStore/tables/subscriber-2.csv", header=True, inferSchema=True)

# Show schema of each DataFrame to verify column names
claims.printSchema()
subscriber.printSchema()
grpsubgrp.printSchema()
hospital.printSchema()
patient_records.printSchema()


root
 |-- Claim_Or_Rejected: string (nullable = true)
 |-- SUB_ID: string (nullable = true)
 |-- claim_amount: string (nullable = true)
 |-- claim_date: string (nullable = true)
 |-- claim_id: long (nullable = true)
 |-- claim_type: string (nullable = true)
 |-- disease_name: string (nullable = true)
 |-- patient_id: long (nullable = true)

root
 |-- sub _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Birth_date: date (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip Code: integer (nullable = true)
 |-- Subgrp_id: string (nullable = true)
 |-- Elig_ind: string (nullable = true)
 |-- eff_date: date (nullable = true)
 |-- term_date: date (nullable = true)

root
 |-- SubGrp_ID: string (nullable = true)
 |-- Grp_Id: string (nullable = true)

root
 |-- Hospita

In [0]:
# Query to find the disease with the maximum number of claims
disease_max_claims = claims.groupBy("disease_name").count().orderBy("count", ascending=False).limit(1)
disease_max_claims.show()


+------------+-----+
|disease_name|count|
+------------+-----+
| Pet allergy|    3|
+------------+-----+



In [0]:
# Find the disease with the maximum number of claims
disease_max_claims = claims.groupBy("disease_name").count().orderBy("count", ascending=False).first()
disease_max_claims_disease = disease_max_claims['disease_name']
disease_max_claims_count = disease_max_claims['count']
disease_max_claims_disease, disease_max_claims_count


Out[22]: ('Pet allergy', 3)

In [0]:
# Reimport necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, date_sub, current_date

# Create Spark session
spark = SparkSession.builder.appName("HealthCareAnalysis").getOrCreate()

# Load datasets from Filestore
claims = spark.read.json("/FileStore/tables/claims-2.json")
disease = spark.read.csv("/FileStore/tables/disease-2.csv", header=True, inferSchema=True)
group = spark.read.csv("/FileStore/tables/group-2.csv", header=True, inferSchema=True)
grpsubgrp = spark.read.csv("/FileStore/tables/grpsubgrp-2.csv", header=True, inferSchema=True)
hospital = spark.read.csv("/FileStore/tables/hospital-2.csv", header=True, inferSchema=True)
patient_records = spark.read.csv("/FileStore/tables/Patient_records-2.csv", header=True, inferSchema=True)
subgroup = spark.read.csv("/FileStore/tables/subgroup-2.csv", header=True, inferSchema=True)
subscriber = spark.read.csv("/FileStore/tables/subscriber-2.csv", header=True, inferSchema=True)

# Show schema of each DataFrame to verify column names
claims.printSchema()
subscriber.printSchema()
grpsubgrp.printSchema()
hospital.printSchema()
patient_records.printSchema()


root
 |-- Claim_Or_Rejected: string (nullable = true)
 |-- SUB_ID: string (nullable = true)
 |-- claim_amount: string (nullable = true)
 |-- claim_date: string (nullable = true)
 |-- claim_id: long (nullable = true)
 |-- claim_type: string (nullable = true)
 |-- disease_name: string (nullable = true)
 |-- patient_id: long (nullable = true)

root
 |-- sub _id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- Birth_date: date (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Phone: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zip Code: integer (nullable = true)
 |-- Subgrp_id: string (nullable = true)
 |-- Elig_ind: string (nullable = true)
 |-- eff_date: date (nullable = true)
 |-- term_date: date (nullable = true)

root
 |-- SubGrp_ID: string (nullable = true)
 |-- Grp_Id: string (nullable = true)

root
 |-- Hospita

In [0]:
# Find the disease with the maximum number of claims
disease_max_claims = claims.groupBy("disease_name").count().orderBy("count", ascending=False).first()
disease_max_claims_disease = disease_max_claims['disease_name']
disease_max_claims_count = disease_max_claims['count']
disease_max_claims_disease, disease_max_claims_count


Out[25]: ('Pet allergy', 3)

In [0]:
# Find subscribers who are aged less than 30 and have subscribed to any subgroup
from pyspark.sql.functions import current_date, datediff

# Calculate the age based on Birth_date and filter subscribers under 30
subscribers_under_30 = subscriber.join(claims, subscriber["sub _id"] == claims["SUB_ID"]) \
                                  .filter(datediff(current_date(), subscriber["Birth_date"]) / 365 < 30) \
                                  .select(subscriber["sub _id"]).distinct().count()
subscribers_under_30


Out[26]: 1

In [0]:
# Find the group with the maximum number of subgroups
group_max_subgroups = grpsubgrp.groupBy("Grp_Id").count().orderBy("count", ascending=False).first()
group_max_subgroups_group = group_max_subgroups['Grp_Id']
group_max_subgroups_count = group_max_subgroups['count']
group_max_subgroups_group, group_max_subgroups_count


Out[27]: ('GRP104', 2)

In [0]:
# Find the hospital that serves the most patients
hospital_max_patients = patient_records.groupBy("hospital_id").count().orderBy("count", ascending=False).first()
hospital_max_patients_hospital = hospital_max_patients['hospital_id']
hospital_max_patients_count = hospital_max_patients['count']
hospital_max_patients_hospital, hospital_max_patients_count


Out[28]: ('H1017', 9)

In [0]:
# Find the subgroups that are subscribed to the most
subgroup_max_subscriptions = grpsubgrp.groupBy("SubGrp_ID").count().orderBy("count", ascending=False).first()
subgroup_max_subscriptions_subgroup = subgroup_max_subscriptions['SubGrp_ID']
subgroup_max_subscriptions_count = subgroup_max_subscriptions['count']
subgroup_max_subscriptions_subgroup, subgroup_max_subscriptions_count


Out[29]: ('S106', 5)

In [0]:
# Find the total number of claims that were rejected
rejected_claims_count = claims.filter(claims["Claim_Or_Rejected"] == 'Y').count()
rejected_claims_count


Out[30]: 18

In [0]:
# Find the city from which the most claims are coming
city_max_claims = patient_records.groupBy("city").count().orderBy("count", ascending=False).first()
city_max_claims_city = city_max_claims['city']
city_max_claims_count = city_max_claims['count']
city_max_claims_city, city_max_claims_count


Out[32]: ('Mysore', 2)

In [0]:
# Find out which groups of policies subscribers subscribe to the most: Government or private
policy_type_max_subscriptions = subscriber.groupBy("Elig_ind").count().orderBy("count", ascending=False).first()
policy_type_max_subscriptions_type = policy_type_max_subscriptions['Elig_ind']
policy_type_max_subscriptions_count = policy_type_max_subscriptions['count']
policy_type_max_subscriptions_type, policy_type_max_subscriptions_count


Out[33]: ('N', 50)

In [0]:
# Calculate the average monthly premium
average_monthly_premium = subgroup.agg(avg("Monthly_Premium")).first()[0]
print(f"The average monthly premium subscribers pay to the insurance company is {average_monthly_premium}.")


The average monthly premium subscribers pay to the insurance company is 1870.0.


In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, trim

# Create Spark session
spark = SparkSession.builder.appName("HealthCareAnalysis").getOrCreate()

# Load datasets from Filestore
claims_df = spark.read.json("/FileStore/tables/claims-2.json")
grpsubgrp_df = spark.read.csv("/FileStore/tables/grpsubgrp-2.csv", header=True, inferSchema=True)

# Display a few records from each dataset to inspect SUB_ID and SubGrp_ID values
claims_df.select("SUB_ID", "claim_amount").show(10)
grpsubgrp_df.select("SubGrp_ID", "Grp_Id").show(10)

# Check the number of records in both datasets
claims_count = claims_df.count()
grpsubgrp_count = grpsubgrp_df.count()
print(f"Number of records in claims dataset: {claims_count}")
print(f"Number of records in grpsubgrp dataset: {grpsubgrp_count}")

# Check for null values in the join columns
null_sub_id_claims = claims_df.filter(col("SUB_ID").isNull()).count()
null_subgrp_id_grpsubgrp = grpsubgrp_df.filter(col("SubGrp_ID").isNull()).count()
print(f"Number of null SUB_IDs in claims dataset: {null_sub_id_claims}")
print(f"Number of null SubGrp_IDs in grpsubgrp dataset: {null_subgrp_id_grpsubgrp}")

# Trim whitespace from SUB_ID and SubGrp_ID columns
claims_df = claims_df.withColumn("SUB_ID", trim(col("SUB_ID")))
grpsubgrp_df = grpsubgrp_df.withColumn("SubGrp_ID", trim(col("SubGrp_ID")))

# Perform the join again and inspect
joined_df = claims_df.join(grpsubgrp_df, claims_df["SUB_ID"] == grpsubgrp_df["SubGrp_ID"], "inner")
joined_df.show(10)

# Count the number of matching records
joined_count = joined_df.count()
print(f"Number of matching records after join: {joined_count}")

# Find out which group is the most profitable based on the total claim amount
result = joined_df.groupBy("Grp_Id") \
                  .agg(_sum("claim_amount").alias("total_claim_amount")) \
                  .orderBy("total_claim_amount", ascending=False) \
                  .first()

if result:
    group_most_profitable_group = result['Grp_Id']
    group_most_profitable_amount = result['total_claim_amount']
    print(f"The most profitable group is {group_most_profitable_group} with a total claim amount of {group_most_profitable_amount}.")
else:
    print("No result found for the most profitable group.")


+----------+------------+
|    SUB_ID|claim_amount|
+----------+------------+
| SUBID1000|       79874|
|SUBID10001|      151142|
|SUBID10002|       59924|
|SUBID10003|      143120|
|SUBID10004|      168634|
|SUBID10005|       64840|
| SUBID1006|       26800|
|SUBID10007|      177186|
|SUBID10008|      141123|
|SUBID10009|       88540|
+----------+------------+
only showing top 10 rows

+---------+------+
|SubGrp_ID|Grp_Id|
+---------+------+
|     S101|GRP101|
|     S101|GRP105|
|     S102|GRP110|
|     S102|GRP150|
|     S102|GRP136|
|     S103|GRP122|
|     S103|GRP108|
|     S103|GRP138|
|     S103|GRP148|
|     S104|GRP103|
+---------+------+
only showing top 10 rows

Number of records in claims dataset: 70
Number of records in grpsubgrp dataset: 38
Number of null SUB_IDs in claims dataset: 0
Number of null SubGrp_IDs in grpsubgrp dataset: 0
+-----------------+------+------------+----------+--------+----------+------------+----------+---------+------+
|Claim_Or_Rejected|SUB_ID|cla