##Connecting to S3


##Patient_records Data

In [0]:
pat_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/patients_records/Patient_records.csv")
pat_df.show()

+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|    disease_name|                city|hospital_id|
+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|    187158|      Harbir|        Female|        1924-06-30|+91 0112009318|    Galactosemia|            Rourkela|      H1001|
|    112766|    Brahmdev|        Female|        1948-12-20|+91 1727749552|  Bladder cancer|        Tiruvottiyur|      H1016|
|    199252|     Ujjawal|          Male|        1980-04-16|+91 8547451606|   Kidney cancer|           Berhampur|      H1009|
|    133424|     Ballari|        Female|        1969-09-25|+91 0106026841|         Suicide|        Bihar Sharif|      H1017|
|    172579|     Devnath|        Female|        1946-05-01|+91 1868774631|    Food allergy|         Bidhannagar|      H1019|


###Checking and counting for Null values in patient table 

In [0]:
from pyspark.sql.functions import col, when, count
df2 = pat_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) |
                                (col(c).contains('NaN')) |
                                (col(c) == ' ') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in pat_df.columns])
df2.show()

+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone|disease_name|city|hospital_id|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|         0|          17|             0|                 0|            2|           0|   0|          0|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+



###Replacing null values 

In [0]:
from pyspark.sql.functions import col, when, desc

null_representations = ['None', 'null', ' ', 'NULL', 'NaN']

pat_cleaned = pat_df
for con in pat_df.columns:
    pat_cleaned = pat_cleaned.withColumn(con, when(col(con).isin(null_representations) | col(con).isNull(), "NA").otherwise(col(con)))
pat_cleaned.show()

+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date| patient_phone|    disease_name|                city|hospital_id|
+----------+------------+--------------+------------------+--------------+----------------+--------------------+-----------+
|    187158|      Harbir|        Female|        1924-06-30|+91 0112009318|    Galactosemia|            Rourkela|      H1001|
|    112766|    Brahmdev|        Female|        1948-12-20|+91 1727749552|  Bladder cancer|        Tiruvottiyur|      H1016|
|    199252|     Ujjawal|          Male|        1980-04-16|+91 8547451606|   Kidney cancer|           Berhampur|      H1009|
|    133424|     Ballari|        Female|        1969-09-25|+91 0106026841|         Suicide|        Bihar Sharif|      H1017|
|    172579|     Devnath|        Female|        1946-05-01|+91 1868774631|    Food allergy|         Bidhannagar|      H1019|


### Null Values before and after Replacement

In [0]:
import pyspark.sql.functions as F 

null_counts_before = pat_df.select([col(c).isNull().cast("int").alias(c) for c in pat_df.columns])\
    .agg(*[F.sum(c).alias(c) for c in pat_df.columns])

# Count null values after replacement
null_counts_after = pat_cleaned.select([col(c).isNull().cast("int").alias(c) for c in pat_cleaned.columns]).agg(*[F.sum(c).alias(c) for c in pat_cleaned.columns])

# Show the counts before and after
print("Null counts before replacement:")
null_counts_before.show()

print("Null counts after replacement:")
null_counts_after.show()

Null counts before replacement:
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone|disease_name|city|hospital_id|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|         0|          17|             0|                 0|            2|           0|   0|          0|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+

Null counts after replacement:
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|Patient_id|Patient_name|patient_gender|patient_birth_date|patient_phone|disease_name|city|hospital_id|
+----------+------------+--------------+------------------+-------------+------------+----+-----------+
|         0|           0|             0|                 0|            0|           0|   0|          0|


### Checking Duplicates & dropping them 

In [0]:
df3= pat_df.groupBy("Patient_Id").count()
df4 = df3.filter(col("count")> 1)
df4.show()
# there are no duplicate entries 

+----------+-----+
|Patient_Id|count|
+----------+-----+
+----------+-----+



## Subscribers Data

In [0]:
sub_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/subscriber/subscriber.csv")
sub_df.show()

+----------+----------+-----------+-----------------+----------+------+--------------+-------+--------------------+--------+---------+--------+----------+----------+
|   sub _id|first_name|  last_name|           Street|Birth_date|Gender|         Phone|Country|                City|Zip Code|Subgrp_id|Elig_ind|  eff_date| term_date|
+----------+----------+-----------+-----------------+----------+------+--------------+-------+--------------------+--------+---------+--------+----------+----------+
|SUBID10000|    Harbir|Vishwakarma|       Baria Marg|1924-06-30|Female|+91 0112009318|  India|            Rourkela|  767058|     S107|       Y|1944-06-30|1954-01-14|
|SUBID10001|  Brahmdev|     Sonkar|        Lala Marg|1948-12-20|Female|+91 1727749552|  India|        Tiruvottiyur|   34639|     S105|       Y|1968-12-20|1970-05-16|
|SUBID10002|   Ujjawal|       Devi|      Mammen Zila|1980-04-16|  Male|+91 8547451606|  India|           Berhampur|  914455|     S106|       N|2000-04-16|2008-05-04|
|SUB

###Checking and counting for Null values in Subscribers table

In [0]:
df5 = sub_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) | 
                                (col(c).contains('NaN')) |
                                (col(c) == ' ') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in sub_df.columns])
df5.show()

+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+
|sub _id|first_name|last_name|Street|Birth_date|Gender|Phone|Country|City|Zip Code|Subgrp_id|Elig_ind|eff_date|term_date|
+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+
|      0|        27|        0|     0|         0|     0|    3|      0|   0|       0|        2|       4|       0|        0|
+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+



###Replacing null values 

In [0]:
null_representations = ['None', 'null', ' ', 'isnull','NaN']

sub_cleaned = sub_df
for con in sub_df.columns:
    # Replacing representations of null values with "NA"
    sub_cleaned = sub_cleaned.withColumn(con, when(col(con).isin(null_representations) | col(con).isNull(), "NA").otherwise(col(con)))
sub_cleaned.show()

+----------+----------+-----------+-----------------+----------+------+--------------+-------+--------------------+--------+---------+--------+----------+----------+
|   sub _id|first_name|  last_name|           Street|Birth_date|Gender|         Phone|Country|                City|Zip Code|Subgrp_id|Elig_ind|  eff_date| term_date|
+----------+----------+-----------+-----------------+----------+------+--------------+-------+--------------------+--------+---------+--------+----------+----------+
|SUBID10000|    Harbir|Vishwakarma|       Baria Marg|1924-06-30|Female|+91 0112009318|  India|            Rourkela|  767058|     S107|       Y|1944-06-30|1954-01-14|
|SUBID10001|  Brahmdev|     Sonkar|        Lala Marg|1948-12-20|Female|+91 1727749552|  India|        Tiruvottiyur|   34639|     S105|       Y|1968-12-20|1970-05-16|
|SUBID10002|   Ujjawal|       Devi|      Mammen Zila|1980-04-16|  Male|+91 8547451606|  India|           Berhampur|  914455|     S106|       N|2000-04-16|2008-05-04|
|SUB

### Null Values before and after Replacement

In [0]:
null_counts_before = sub_df.select([col(c).isNull().cast("int").alias(c) for c in sub_df.columns])\
    .agg(*[F.sum(c).alias(c) for c in sub_df.columns])

# Count null values after replacement
null_counts_after = sub_cleaned.select([col(c).isNull().cast("int").alias(c) for c in sub_cleaned.columns])\
    .agg(*[F.sum(c).alias(c) for c in sub_cleaned.columns])

# Show the counts before and after
print("Null counts before replacement:")
null_counts_before.show()

print("Null counts after replacement:")
null_counts_after.show()

Null counts before replacement:
+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+
|sub _id|first_name|last_name|Street|Birth_date|Gender|Phone|Country|City|Zip Code|Subgrp_id|Elig_ind|eff_date|term_date|
+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+
|      0|        27|        0|     0|         0|     0|    3|      0|   0|       0|        2|       4|       0|        0|
+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+

Null counts after replacement:
+-------+----------+---------+------+----------+------+-----+-------+----+--------+---------+--------+--------+---------+
|sub _id|first_name|last_name|Street|Birth_date|Gender|Phone|Country|City|Zip Code|Subgrp_id|Elig_ind|eff_date|term_date|
+-------+----------+---------+------+----------+------+-----+-------+----+--------

### Checking Duplicates & dropping them 

In [0]:

df6 = sub_df.withColumnRenamed("sub _id", "subscriber_id")
df7 = df6.groupBy("subscriber_id").count().orderBy(desc("count"))
df8 = df7.filter(col("count")> 1)
df7.show()
df8.show()

+-------------+-----+
|subscriber_id|count|
+-------------+-----+
|   SUBID10023|    1|
|   SUBID10059|    1|
|   SUBID10078|    1|
|   SUBID10083|    1|
|   SUBID10046|    1|
|   SUBID10084|    1|
|   SUBID10090|    1|
|    SUBID1034|    1|
|   SUBID10018|    1|
|    SUBID1065|    1|
|   SUBID10017|    1|
|   SUBID10057|    1|
|   SUBID10007|    1|
|   SUBID10089|    1|
|   SUBID10066|    1|
|   SUBID10071|    1|
|   SUBID10070|    1|
|   SUBID10094|    1|
|   SUBID10051|    1|
|   SUBID10055|    1|
+-------------+-----+
only showing top 20 rows

+-------------+-----+
|subscriber_id|count|
+-------------+-----+
+-------------+-----+



## Claims Data

In [0]:
claims_df = spark.read.options(header= "True", inferSchema = "True").json("s3://takeobucket/capstone/claims/claims (1).json")
claims_df.show()

+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|Claim_Or_Rejected|    SUB_ID|claim_amount|claim_date|claim_id|      claim_type|    disease_name|patient_id|
+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|                N| SUBID1000|       79874|1949-03-14|       0| claims of value|    Galactosemia|    187158|
|              NaN|SUBID10001|      151142|1970-03-16|       1|claims of policy|  Bladder cancer|    112766|
|              NaN|SUBID10002|       59924|2008-02-03|       2| claims of value|   Kidney cancer|    199252|
|              NaN|SUBID10003|      143120|1995-02-08|       3|  claims of fact|         Suicide|    133424|
|                Y|SUBID10004|      168634|1967-05-23|       4| claims of value|    Food allergy|    172579|
|              NaN|SUBID10005|       64840|1991-10-04|       5|claims of policy|        Whiplash|    171320|
|                N|

###Checking and counting for Null values in claims table 

In [0]:

df5 = claims_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) | 
                                (col(c).contains('NaN')) |
                                (col(c) == '') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in claims_df.columns])
df5.show()

+-----------------+------+------------+----------+--------+----------+------------+----------+
|Claim_Or_Rejected|SUB_ID|claim_amount|claim_date|claim_id|claim_type|disease_name|patient_id|
+-----------------+------+------------+----------+--------+----------+------------+----------+
|               30|     0|           0|         0|       0|         0|           0|         0|
+-----------------+------+------------+----------+--------+----------+------------+----------+



### Replacing Null Values

In [0]:
null_representations = ['None', 'null', ' ', 'isnull','NaN']

claim_cleaned = claims_df
for con in claims_df.columns:
    # Replacing representations of null values with "NA"
    claim_cleaned = claim_cleaned.withColumn(con, when(col(con).isin(null_representations) | col(con).isNull(), "NA").otherwise(col(con)))
claim_cleaned.show()

+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|Claim_Or_Rejected|    SUB_ID|claim_amount|claim_date|claim_id|      claim_type|    disease_name|patient_id|
+-----------------+----------+------------+----------+--------+----------------+----------------+----------+
|                N| SUBID1000|       79874|1949-03-14|       0| claims of value|    Galactosemia|    187158|
|               NA|SUBID10001|      151142|1970-03-16|       1|claims of policy|  Bladder cancer|    112766|
|               NA|SUBID10002|       59924|2008-02-03|       2| claims of value|   Kidney cancer|    199252|
|               NA|SUBID10003|      143120|1995-02-08|       3|  claims of fact|         Suicide|    133424|
|                Y|SUBID10004|      168634|1967-05-23|       4| claims of value|    Food allergy|    172579|
|               NA|SUBID10005|       64840|1991-10-04|       5|claims of policy|        Whiplash|    171320|
|                N|

### NaN Values before and after Replacement

In [0]:

nan_counts_before = claims_df.select([col(c).isin('NaN').cast("int").alias(c) for c in claims_df.columns]) \
    .agg(*[F.sum(c).alias(c) for c in claims_df.columns])


nan_counts_after = claim_cleaned.select([col(c).isin('NaN').cast("int").alias(c) for c in claim_cleaned.columns]) \
    .agg(*[F.sum(c).alias(c) for c in claim_cleaned.columns])

print("'NaN' counts before replacement:")
nan_counts_before.show()

print("'NaN' counts after replacement:")
nan_counts_after.show()

'NaN' counts before replacement:
+-----------------+------+------------+----------+--------+----------+------------+----------+
|Claim_Or_Rejected|SUB_ID|claim_amount|claim_date|claim_id|claim_type|disease_name|patient_id|
+-----------------+------+------------+----------+--------+----------+------------+----------+
|               30|     0|           0|         0|       0|         0|           0|         0|
+-----------------+------+------------+----------+--------+----------+------------+----------+

'NaN' counts after replacement:
+-----------------+------+------------+----------+--------+----------+------------+----------+
|Claim_Or_Rejected|SUB_ID|claim_amount|claim_date|claim_id|claim_type|disease_name|patient_id|
+-----------------+------+------------+----------+--------+----------+------------+----------+
|                0|     0|           0|         0|       0|         0|           0|         0|
+-----------------+------+------------+----------+--------+----------+---------

### Checking Duplicates & dropping them 

In [0]:
df6 = df5.groupBy("claim_id").count()
df7 = df6.filter(col("count")> 1)
df6.show()
#there are no duplicate entries

+--------+-----+
|claim_id|count|
+--------+-----+
|       0|    1|
+--------+-----+



## Grp_SubGrp Table

In [0]:
grp_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/grp_subgrp/grpsubgrp.csv")
grp_df.show()

+---------+------+
|SubGrp_ID|Grp_Id|
+---------+------+
|     S101|GRP101|
|     S101|GRP105|
|     S102|GRP110|
|     S102|GRP150|
|     S102|GRP136|
|     S103|GRP122|
|     S103|GRP108|
|     S103|GRP138|
|     S103|GRP148|
|     S104|GRP103|
|     S104|GRP113|
|     S104|GRP123|
|     S104|GRP133|
|     S104|GRP143|
|     S105|GRP153|
|     S105|GRP104|
|     S105|GRP114|
|     S105|GRP124|
|     S106|GRP117|
|     S106|GRP127|
+---------+------+
only showing top 20 rows



###Checking and counting for Null values in grp_subGrp table 

In [0]:

df5 = grp_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) |
                                 (col(c).contains('NaN')) | 
                                (col(c) == '') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in grp_df.columns])
df5.show()
# no null values 

+---------+------+
|SubGrp_ID|Grp_Id|
+---------+------+
|        0|     0|
+---------+------+



##Hospital data

In [0]:
hosp_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/hospital/hospital.csv")
hosp_df.show()

+-----------+--------------------+----------+-----------+-------+
|Hospital_id|       Hospital_name|      city|      state|country|
+-----------+--------------------+----------+-----------+-------+
|      H1000|All India Institu...| New Delhi|        NaN|  India|
|      H1001|Medanta The Medicity|   Gurgaon|    Haryana|  India|
|      H1002|The Christian Med...|   Vellore| Tamil Nadu|  India|
|      H1003|PGIMER - Postgrad...|Chandigarh|    Haryana|  India|
|      H1004|Apollo Hospital -...|   Chennai| Tamil Nadu|  India|
|      H1005|P. D. Hinduja Nat...|    Mumbai|Maharashtra|  India|
|      H1006|Breach Candy Hosp...|    Mumbai|Maharashtra|  India|
|      H1007|Fortis Flt. Lt. R...| New Delhi|        NaN|  India|
|      H1008|King Edward Memor...|    Mumbai|Maharashtra|  India|
|      H1009|Indraprastha Apol...|     Delhi|        NaN|  India|
|      H1010|Lilavati Hospital...|    Mumbai|Maharashtra|  India|
|      H1011|Sir Ganga Ram Hos...|     Delhi|        NaN|  India|
|      H10

###Checking and counting for Null values in grp_subGrp table 

In [0]:
from pyspark.sql.functions import col,when,count,desc
df5 = hosp_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) |
                                 (col(c).contains('NaN')) | 
                                (col(c) == '') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in hosp_df.columns])
df5.show()

+-----------+-------------+----+-----+-------+
|Hospital_id|Hospital_name|city|state|country|
+-----------+-------------+----+-----+-------+
|          0|            0|   0|    4|      0|
+-----------+-------------+----+-----+-------+



## Disease Data

In [0]:
dis_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/disease/disease.csv")
dis_df.show()

+--------+-----------+--------------------+
|SubGrpID| Disease_ID|        Disease_name|
+--------+-----------+--------------------+
|    S101|     110001|            Beriberi|
|    S101|     110002|              Scurvy|
|    S101|     110003|              Goitre|
|    S101|     110004|        Osteoporosis|
|    S101|     110005|             Rickets|
|    S101|     110006|             Anaemia|
|    S102|     110007|           Fractures|
|    S102|     110008|        Heart Attack|
|    S102|     110009|               Burns|
|    S102|     110010|             Choking|
|    S102|     110011|              Stroke|
|    S102|     110012|      Food Poisoning|
|    S103|     110013|              Asthma|
|    S103|     110014|            Glaucoma|
|    S103|     110015|            Diabetes|
|    S103|     110016|             Amnesia|
|    S103|     110017|         Parasomnias|
|    S103|     110018|Neurocognitive di...|
|    S104|     110019|             Vertigo|
|    S104|     110020|          

###Checking and counting for Null values in grp_subGrp table 

In [0]:

df5 = dis_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) |
                                 (col(c).contains('NaN')) | 
                                (col(c) == '') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in dis_df.columns])
df5.show()

+--------+-----------+------------+
|SubGrpID| Disease_ID|Disease_name|
+--------+-----------+------------+
|       0|          0|           0|
+--------+-----------+------------+



## SubGroup Data

In [0]:
sbg_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/subgroup/subgroup.csv")
sbg_df.show()

+---------+-------------------+---------------+
|SubGrp_id|        SubGrp_Name|Monthly_Premium|
+---------+-------------------+---------------+
|     S101|Deficiency Diseases|           3000|
|     S102|           Accident|           1000|
|     S103|         Physiology|           2000|
|     S104|            Therapy|           1500|
|     S105|          Allergies|           2300|
|     S106|     Self inflicted|           1200|
|     S107|             Cancer|           3200|
|     S108| Infectious disease|           1500|
|     S109|         Hereditary|           2000|
|     S110|              Viral|           1000|
+---------+-------------------+---------------+



###Checking and counting for Null values in grp_subGrp table 

In [0]:

df5 = sbg_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) |
                                 (col(c).contains('NaN')) | 
                                (col(c) == '') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in sbg_df.columns])
df5.show()

+---------+-----------+---------------+
|SubGrp_id|SubGrp_Name|Monthly_Premium|
+---------+-----------+---------------+
|        0|          0|              0|
+---------+-----------+---------------+



## Group data

In [0]:
gp_df = spark.read.options(header= "True", inferSchema = "True").csv("s3://takeobucket/capstone/group/group.csv")
gp_df.show()

+-------+---------------+-------+------+--------------------+--------+---------+----+
|Country|premium_written|zipcode|Grp_Id|            Grp_Name|Grp_Type|     city|year|
+-------+---------------+-------+------+--------------------+--------+---------+----+
|  India|          72000| 482018|GRP101|Life Insurance Co...|   Govt.|   Mumbai|1956|
|  India|          45000| 482049|GRP102|HDFC Standard Lif...| Private|   Mumbai|2000|
|  India|          64000| 482030|GRP103|Max Life Insuranc...| Private|    Delhi|2000|
|  India|          59000| 482028|GRP104|ICICI Prudential ...| Private|   Mumbai|2000|
|  India|          37000| 482014|GRP105|Kotak Mahindra Li...| Private|   Mumbai|2001|
|  India|          89000| 482011|GRP106|Aditya Birla Sun ...| Private|   Mumbai|2000|
|  India|          70000| 482006|GRP107|TATA AIG Life Ins...| Private|   Mumbai|2001|
|  India|          52000| 482034|GRP108|SBI Life Insuranc...| Private|   Mumbai|2001|
|  India|          78000| 482032|GRP109|Exide Life Ins

###Checking and counting for Null values in grp_subGrp table 

In [0]:

df5 = gp_df.select([count(when((col(c).contains('None')) | 
                                (col(c).contains('NULL')) |
                                 (col(c).contains('NaN')) | 
                                (col(c) == '') | 
                                col(c).isNull(), 
                               True)
                            ).alias(c)
                    for c in gp_df.columns])
df5.show()

+-------+---------------+-------+------+--------+--------+----+----+
|Country|premium_written|zipcode|Grp_Id|Grp_Name|Grp_Type|city|year|
+-------+---------------+-------+------+--------+--------+----+----+
|      0|              0|      0|     0|       0|       0|   0|   0|
+-------+---------------+-------+------+--------+--------+----+----+

