# Database Schema

In [10]:
from IPython.display import display, Image

url = 'https://github.com/bhishanpdl/HELTHCARE-SYSTEM/blob/main/RDBMS%20Schema/Rdbms%20Schema.jpg?raw=true'
display(Image(url=url))

# Create database
- https://github.com/tejasjbansal/HELTHCARE-SYSTEM
- https://github.com/bhishanpdl/HELTHCARE-SYSTEM/tree/main

# Create pandas dataframe from RAW csv from github

In [1]:
import pandas as pd
import numpy as np
import requests

In [2]:
url = "https://raw.githubusercontent.com/bhishanpdl/HELTHCARE-SYSTEM/refs/heads/main/Processed%20Data/disease.csv"
df = pd.read_csv(url)

df.head()

Unnamed: 0,S101,110001,Beriberi
0,S101,110002,Scurvy
1,S101,110003,Goitre
2,S101,110004,Osteoporosis
3,S101,110005,Rickets
4,S101,110006,Anaemia


# Create pandas dataframe from multiple csv and json files from github

In [9]:
import pandas as pd
import requests
from io import StringIO

# Base URL for the GitHub raw files
base_url = "https://raw.githubusercontent.com/bhishanpdl/HELTHCARE-SYSTEM/refs/heads/main/Processed%20Data/"

# List of files to read
files = {
    "df_disease": "disease.csv",
    "df_group": "group.csv",
    "df_grpsubgrp": "grpsubgrp.csv",
    "df_hospital": "hospital.csv",
    "df_patient": "patient.csv",
    "df_subgroup": "subgroup.csv",
    "df_subscriber": "subscriber.csv",
    "df_claims": "claims.json",  # JSON file
}

# Dictionary to store DataFrames
dataframes = {}
list_dfs = []

# Loop through files and read them
run = True
if run:
  for df_name, file_name in files.items():
      url = base_url + file_name
      response = requests.get(url)

      if response.status_code == 200:
          if file_name.endswith(".csv"):
              df = pd.read_csv(StringIO(response.text),header=None)
          elif file_name.endswith(".json"):
              df = pd.read_json(url)

          dataframes[df_name] = df  # Store DataFrame in dictionary
          list_dfs.append(df)  # Add to list
          print(f"Loaded {df_name} with shape {df.shape} and columns {list(df.columns)}")
      else:
          print(f"Failed to fetch {file_name}, status code: {response.status_code}")

  # Now you can access individual DataFrames, e.g.:
  # dataframes["df_disease"], dataframes["df_patient"], etc.

  # List of all DataFrames
  print(f"Total DataFrames loaded: {len(list_dfs)}")


Loaded df_disease with shape (60, 3) and columns [0, 1, 2]
Loaded df_group with shape (58, 8) and columns [0, 1, 2, 3, 4, 5, 6, 7]
Loaded df_grpsubgrp with shape (38, 2) and columns [0, 1]
Loaded df_hospital with shape (20, 5) and columns [0, 1, 2, 3, 4]
Loaded df_patient with shape (70, 8) and columns [0, 1, 2, 3, 4, 5, 6, 7]
Loaded df_subgroup with shape (10, 4) and columns [0, 1, 2, 3]
Loaded df_subscriber with shape (100, 15) and columns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Loaded df_claims with shape (70, 8) and columns ['claim_id', 'patient_id', 'disease_name', 'SUB_ID', 'Claim_Or_Rejected', 'claim_type', 'claim_amount', 'claim_date']
Total DataFrames loaded: 8


# Give column names

In [12]:
len(dataframes)

8

In [28]:
column_names = {
    "df_disease": ["DISEASE_ID", "SUBGRP_ID", "DISEASE_NAME"],

    "df_group": ["Country", "Premium_Written", "GRP_SK", "GRP_ID", "GRP_NAME", "State", "City", "Zip Code"],

    "df_grpsubgrp": ["G_ID", "S_ID"],

    "df_hospital": ["Hospital_id", "hospital_name", "city", "state", "country"],

    "df_patient": ["Patient_id", "Patient_name", "P_gender", "P_age", "P_phone", "disease_name", "City", "hospital_id"],

    "df_subgroup": ["SUBGRP_SK", "SUBGRP_ID", "SUBGRP_NAME", "MONTHLY_PREMIUM"],

    "df_subscriber": [
        "S_KEY", "SUB_ID", "FIRST_NAME", "LAST_NAME", "BIRTH_DATE", "Street", "gender",
        "phone", "Country", "city", "zip_code", "SUBGRP_ID", "ELIG_IND", "E_DATE", "T_DATE"
    ],

    "df_claims": ["claim_id", "patient_id", "disease_name", "SUB_ID", "claim_date", "claim_type",
                  "claim_amount", "ClaimOrRejected"]
}

# Assigning correct column names to DataFrames
for df_name, df in dataframes.items():
    if df_name in column_names:
        df.columns = column_names[df_name]


# QC: Make sure table columns are correct ( group and patient has wrong columns in schema)

In [29]:
for df_name, df in dataframes.items():
  print('Table name:', df_name)
  # print(df.head())
  display(df.head())
  print()

Table name: df_disease


Unnamed: 0,DISEASE_ID,SUBGRP_ID,DISEASE_NAME
0,S101,110001,Beriberi
1,S101,110002,Scurvy
2,S101,110003,Goitre
3,S101,110004,Osteoporosis
4,S101,110005,Rickets



Table name: df_group


Unnamed: 0,Country,Premium_Written,GRP_SK,GRP_ID,GRP_NAME,State,City,Zip Code
0,India,72000,482018,GRP101,Life Insurance Corporation of India,Govt.,Mumbai,1956
1,India,45000,482049,GRP102,HDFC Standard Life Insurance Co. Ltd.,Private,Mumbai,2000
2,India,64000,482030,GRP103,Max Life Insurance Co. Ltd.,Private,Delhi,2000
3,India,59000,482028,GRP104,ICICI Prudential Life Insurance Co. Ltd.,Private,Mumbai,2000
4,India,37000,482014,GRP105,Kotak Mahindra Life Insurance Co. Ltd.,Private,Mumbai,2001



Table name: df_grpsubgrp


Unnamed: 0,G_ID,S_ID
0,S101,GRP101
1,S101,GRP105
2,S102,GRP110
3,S102,GRP150
4,S102,GRP136



Table name: df_hospital


Unnamed: 0,Hospital_id,hospital_name,city,state,country
0,H1000,All India Institute of Medical Sciences,New Delhi,Na,India
1,H1001,Medanta The Medicity,Gurgaon,Haryana,India
2,H1002,The Christian Medical College,Vellore,Tamil Nadu,India
3,H1003,PGIMER - Postgraduate Institute of Medical Edu...,Chandigarh,Haryana,India
4,H1004,Apollo Hospital - Chennai,Chennai,Tamil Nadu,India



Table name: df_patient


Unnamed: 0,Patient_id,Patient_name,P_gender,P_age,P_phone,disease_name,City,hospital_id
0,187158,Harbir,Female,1960-02-24,+91 0112009318,Galactosemia,Rourkela,H1001
1,112766,Brahmdev,Female,1955-05-30,+91 1727749552,Bladder cancer,Tiruvottiyur,H1016
2,199252,Ujjawal,Male,1965-12-31,+91 8547451606,Kidney cancer,Berhampur,H1009
3,133424,Ballari,Female,1979-06-11,+91 0106026841,Suicide,Bihar Sharif,H1017
4,172579,Devnath,Female,1982-02-22,+91 1868774631,Food allergy,Bidhannagar,H1019



Table name: df_subgroup


Unnamed: 0,SUBGRP_SK,SUBGRP_ID,SUBGRP_NAME,MONTHLY_PREMIUM
0,S101,Deficiency Diseases,3000,"GRP101,GRP105"
1,S102,Accident,1000,"GRP110,GRP150,GRP136"
2,S103,Physiology,2000,"GRP122,GRP108,GRP138,GRP148"
3,S104,Therapy,1500,"GRP103,GRP113,GRP123,GRP133,GRP143"
4,S105,Allergies,2300,"GRP153,GRP104,GRP114,GRP124"



Table name: df_subscriber


Unnamed: 0,S_KEY,SUB_ID,FIRST_NAME,LAST_NAME,BIRTH_DATE,Street,gender,phone,Country,city,zip_code,SUBGRP_ID,ELIG_IND,E_DATE,T_DATE
0,0,SUBID10000,Harbir,Vishwakarma,Baria Marg,1924-06-30,Female,+91 0112009318,India,Rourkela,767058,S107,Y,1944-06-30,1954-01-14
1,1,SUBID10001,Brahmdev,Sonkar,Lala Marg,1948-12-20,Female,+91 1727749552,India,Tiruvottiyur,34639,S105,Y,1968-12-20,1970-05-16
2,2,SUBID10002,Ujjawal,Devi,Mammen Zila,1980-04-16,Male,+91 8547451606,India,Berhampur,914455,S106,N,2000-04-16,2008-05-04
3,3,SUBID10003,Ballari,Mishra,Sahni Zila,1969-09-25,Female,+91 0106026841,India,Bihar Sharif,91481,S104,N,1989-09-25,1995-06-05
4,4,SUBID10004,Devnath,Srivastav,Magar Zila,1946-05-01,Female,+91 1868774631,India,Bidhannagar,531742,S110,N,1966-05-01,1970-12-09



Table name: df_claims


Unnamed: 0,claim_id,patient_id,disease_name,SUB_ID,claim_date,claim_type,claim_amount,ClaimOrRejected
0,0,187158,Galactosemia,SUBID10000,N,claims of value,79874,1949-03-14
1,1,112766,Bladder cancer,SUBID10001,N,claims of policy,151142,1970-03-16
2,2,199252,Kidney cancer,SUBID10002,N,claims of value,59924,2008-02-03
3,3,133424,Suicide,SUBID10003,N,claims of fact,143120,1995-02-08
4,4,172579,Food allergy,SUBID10004,Y,claims of value,168634,1967-05-23





# Create hive database from the tables

In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import pandas as pd
import requests
from io import StringIO

# Create a Spark session
spark = SparkSession.builder \
    .appName("HealthcareData") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.driver.memory", "4g") \
    .enableHiveSupport() \
    .getOrCreate()

# Define database name
database_name = "healthcare_system"

# Drop the database if it exists (removes all tables and data)
spark.sql(f"DROP DATABASE IF EXISTS {database_name} CASCADE")

# Create a new clean database
spark.sql(f"CREATE DATABASE {database_name}")
spark.sql(f"USE {database_name}")

# Base URL for the GitHub raw files
base_url = "https://raw.githubusercontent.com/bhishanpdl/HELTHCARE-SYSTEM/refs/heads/main/Processed%20Data/"

# List of files to read
files = {
    "disease": "disease.csv",
    "group": "group.csv",
    "grpsubgrp": "grpsubgrp.csv",
    "hospital": "hospital.csv",
    "patient": "patient.csv",
    "subgroup": "subgroup.csv",
    "subscriber": "subscriber.csv",
    "claims": "claims.json",  # JSON file
}

# Corrected column names mapping
column_names = {
    "disease": ["DISEASE_ID", "SUBGRP_ID", "DISEASE_NAME"],
    "group": ["Country", "Premium_Written", "GRP_SK", "GRP_ID", "GRP_NAME", "State", "City", "Zip Code"],
    "grpsubgrp": ["G_ID", "S_ID"],
    "hospital": ["Hospital_id", "hospital_name", "city", "state", "country"],
    "patient": ["Patient_id", "Patient_name", "P_gender", "P_age", "P_phone", "disease_name", "City", "hospital_id"],
    "subgroup": ["SUBGRP_SK", "SUBGRP_ID", "SUBGRP_NAME", "MONTHLY_PREMIUM"],
    "subscriber": [
        "S_KEY", "SUB_ID", "FIRST_NAME", "LAST_NAME", "BIRTH_DATE", "Street", "gender",
        "phone", "Country", "city", "zip_code", "SUBGRP_ID", "ELIG_IND", "E_DATE", "T_DATE"
    ],
    "claims": ["claim_id", "patient_id", "disease_name", "SUB_ID", "claim_date", "claim_type",
               "claim_amount", "ClaimOrRejected"]
}

# Dictionary to store PySpark DataFrames
spark_dfs = {}

# Load CSV and JSON files into PySpark DataFrames with correct column names
for table_name, file_name in files.items():
    url = base_url + file_name
    response = requests.get(url)

    if response.status_code == 200:
        if file_name.endswith(".csv"):
            pandas_df = pd.read_csv(StringIO(response.text), header=None)
        elif file_name.endswith(".json"):
            pandas_df = pd.read_json(url)

        # Assign correct column names
        if table_name in column_names:
            pandas_df.columns = column_names[table_name]

        # Convert Pandas DataFrame to PySpark DataFrame
        spark_df = spark.createDataFrame(pandas_df)
        spark_dfs[table_name] = spark_df

        print(f"Loaded {table_name} with {spark_df.count()} records and columns {spark_df.columns}.")

# Store DataFrames as SQL Tables
for table_name, spark_df in spark_dfs.items():
    spark_df.write.mode("overwrite").saveAsTable(f"{database_name}.{table_name}")
    print(f"Table {table_name} created successfully.")

# List available tables
print("Available tables in Spark SQL:")
spark.sql(f"SHOW TABLES IN {database_name}").show()


Loaded disease with 60 records and columns ['DISEASE_ID', 'SUBGRP_ID', 'DISEASE_NAME'].
Loaded group with 58 records and columns ['Country', 'Premium_Written', 'GRP_SK', 'GRP_ID', 'GRP_NAME', 'State', 'City', 'Zip Code'].
Loaded grpsubgrp with 38 records and columns ['G_ID', 'S_ID'].
Loaded hospital with 20 records and columns ['Hospital_id', 'hospital_name', 'city', 'state', 'country'].
Loaded patient with 70 records and columns ['Patient_id', 'Patient_name', 'P_gender', 'P_age', 'P_phone', 'disease_name', 'City', 'hospital_id'].
Loaded subgroup with 10 records and columns ['SUBGRP_SK', 'SUBGRP_ID', 'SUBGRP_NAME', 'MONTHLY_PREMIUM'].
Loaded subscriber with 100 records and columns ['S_KEY', 'SUB_ID', 'FIRST_NAME', 'LAST_NAME', 'BIRTH_DATE', 'Street', 'gender', 'phone', 'Country', 'city', 'zip_code', 'SUBGRP_ID', 'ELIG_IND', 'E_DATE', 'T_DATE'].
Loaded claims with 70 records and columns ['claim_id', 'patient_id', 'disease_name', 'SUB_ID', 'claim_date', 'claim_type', 'claim_amount', 'Cla





```sql
-- NOTE: these sql does not work for spark sql
-- Disease and Subgroup
ALTER TABLE healthcare_db.disease
ADD CONSTRAINT fk_disease_subgroup FOREIGN KEY (SUBGRP_ID) REFERENCES healthcare_db.subgroup(SUBGRP_ID);

-- Patient and Disease
ALTER TABLE healthcare_db.patient
ADD CONSTRAINT fk_patient_disease FOREIGN KEY (disease_name) REFERENCES healthcare_db.disease(DISEASE_NAME);

-- Patient and Hospital
ALTER TABLE healthcare_db.patient
ADD CONSTRAINT fk_patient_hospital FOREIGN KEY (hospital_id) REFERENCES healthcare_db.hospital(hospital_id);

-- Claims and Patient
ALTER TABLE healthcare_db.claims
ADD CONSTRAINT fk_claims_patient FOREIGN KEY (patient_id) REFERENCES healthcare_db.patient(patient_id);

-- Claims and Disease
ALTER TABLE healthcare_db.claims
ADD CONSTRAINT fk_claims_disease FOREIGN KEY (disease_name) REFERENCES healthcare_db.disease(DISEASE_NAME);

-- Claims and Subscriber
ALTER TABLE healthcare_db.claims
ADD CONSTRAINT fk_claims_subscriber FOREIGN KEY (SUB_ID) REFERENCES healthcare_db.subscriber(SUB_ID);

-- Subscriber and Subgroup
ALTER TABLE healthcare_db.subscriber
ADD CONSTRAINT fk_subscriber_subgroup FOREIGN KEY (SUBGRP_ID) REFERENCES healthcare_db.subgroup(SUBGRP_ID);

-- Group and Group_Subgroup
ALTER TABLE healthcare_db.group_subgroup
ADD CONSTRAINT fk_group_subgroup_group FOREIGN KEY (G_ID) REFERENCES healthcare_db.groups(GRP_ID);

ALTER TABLE healthcare_db.group_subgroup
ADD CONSTRAINT fk_group_subgroup_subgroup FOREIGN KEY (S_ID) REFERENCES healthcare_db.subgroup(SUBGRP_ID);

```



In [33]:
# Define relationships using SQL commands
relations_sql = [
    "ALTER TABLE disease ADD CONSTRAINT fk_disease_subgroup FOREIGN KEY (SUBGRP_ID) REFERENCES subgroup(SUBGRP_ID)",

    "ALTER TABLE patient ADD CONSTRAINT fk_patient_disease FOREIGN KEY (disease_name) REFERENCES disease(DISEASE_NAME)",

    "ALTER TABLE patient ADD CONSTRAINT fk_patient_hospital FOREIGN KEY (hospital_id) REFERENCES hospital(hospital_id)",

    "ALTER TABLE claims ADD CONSTRAINT fk_claims_patient FOREIGN KEY (patient_id) REFERENCES patient(patient_id)",

    "ALTER TABLE claims ADD CONSTRAINT fk_claims_disease FOREIGN KEY (disease_name) REFERENCES disease(DISEASE_NAME)",

    "ALTER TABLE claims ADD CONSTRAINT fk_claims_subscriber FOREIGN KEY (SUB_ID) REFERENCES subscriber(SUB_ID)",

    "ALTER TABLE subscriber ADD CONSTRAINT fk_subscriber_subgroup FOREIGN KEY (SUBGRP_ID) REFERENCES subgroup(SUBGRP_ID)",

    "ALTER TABLE group_subgroup ADD CONSTRAINT fk_group_subgroup_group FOREIGN KEY (G_ID) REFERENCES groups(GRP_ID)",

    "ALTER TABLE group_subgroup ADD CONSTRAINT fk_group_subgroup_subgroup FOREIGN KEY (S_ID) REFERENCES subgroup(SUBGRP_ID)"
]

# Run each query
for sql_query in relations_sql:
    try:
        spark.sql(sql_query)
        print(f"Executed: {sql_query}")
    except Exception as e:
        print(f"Error executing {sql_query}: {e}")
        break  # Stop if any query fails

# spark sql does not support foreign keys.
# for other databases we can use these codes


Error executing ALTER TABLE disease ADD CONSTRAINT fk_disease_subgroup FOREIGN KEY (SUBGRP_ID) REFERENCES subgroup(SUBGRP_ID): 
[PARSE_SYNTAX_ERROR] Syntax error at or near 'CONSTRAINT'.(line 1, pos 24)

== SQL ==
ALTER TABLE disease ADD CONSTRAINT fk_disease_subgroup FOREIGN KEY (SUBGRP_ID) REFERENCES subgroup(SUBGRP_ID)
------------------------^^^



# Get Schema of the tables

In [34]:
# Function to get schema of Spark tables
def get_table_schemas(database_name):
    # Get list of tables in the database
    tables = spark.sql(f"SHOW TABLES IN {database_name}").collect()

    for table in tables:
        table_name = table["tableName"]
        print(f"Schema for table: {table_name}")
        spark.sql(f"DESCRIBE {database_name}.{table_name}").show(truncate=False)
        print("\n" + "="*50 + "\n")

# Use the function for your database
get_table_schemas("healthcare_system")


Schema for table: claims
+---------------+---------+-------+
|col_name       |data_type|comment|
+---------------+---------+-------+
|claim_id       |bigint   |NULL   |
|patient_id     |bigint   |NULL   |
|disease_name   |string   |NULL   |
|SUB_ID         |string   |NULL   |
|claim_date     |string   |NULL   |
|claim_type     |string   |NULL   |
|claim_amount   |bigint   |NULL   |
|ClaimOrRejected|string   |NULL   |
+---------------+---------+-------+



Schema for table: disease
+------------+---------+-------+
|col_name    |data_type|comment|
+------------+---------+-------+
|DISEASE_ID  |string   |NULL   |
|SUBGRP_ID   |bigint   |NULL   |
|DISEASE_NAME|string   |NULL   |
+------------+---------+-------+



Schema for table: group
+---------------+---------+-------+
|col_name       |data_type|comment|
+---------------+---------+-------+
|Country        |string   |NULL   |
|Premium_Written|bigint   |NULL   |
|GRP_SK         |bigint   |NULL   |
|GRP_ID         |string   |NULL   |
|GRP

# Get one spark dataframe from database table

In [35]:
# Load the disease table into a PySpark DataFrame
sdf = spark.sql("SELECT * FROM healthcare_system.disease")

# Show first few rows
sdf.show(5)


+----------+---------+-------------------+
|DISEASE_ID|SUBGRP_ID|       DISEASE_NAME|
+----------+---------+-------------------+
|      S106|   110031|            Suicide|
|      S106|   110032|            Smoking|
|      S106|   110033|         Sunbathing|
|      S106|   110034|Alcohol consumption|
|      S106|   110035|       Head banging|
+----------+---------+-------------------+
only showing top 5 rows



In [36]:
sdf.printSchema()

root
 |-- DISEASE_ID: string (nullable = true)
 |-- SUBGRP_ID: long (nullable = true)
 |-- DISEASE_NAME: string (nullable = true)



In [37]:
sdf.limit(5).show()

+----------+---------+-------------------+
|DISEASE_ID|SUBGRP_ID|       DISEASE_NAME|
+----------+---------+-------------------+
|      S106|   110031|            Suicide|
|      S106|   110032|            Smoking|
|      S106|   110033|         Sunbathing|
|      S106|   110034|Alcohol consumption|
|      S106|   110035|       Head banging|
+----------+---------+-------------------+



# Pyspark Basics

In [38]:
spark.sql("show databases").show()

+-----------------+
|        namespace|
+-----------------+
|          default|
|healthcare_system|
+-----------------+



In [39]:
spark.sql("use healthcare_system")

++
||
++
++

