# Health Insurance

#### Project Summary
This project gathers data on health insurance plans and analyzes it from several angles. It aims to answer questions such as:
- Which 5 networks have the largest number of organizations?
- Which plans have the highest individual rate for each age in 2023?
- How did the average individual rate change from 2016 to 2023, grouped by age?

And many more analytical questions.

The project follows these steps:
* Step 1: Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Data Analysis
* Step 5: Performance Optimization

## Step 1: Gather Data







In [1]:
import requests
import zipfile
from io import BytesIO
import os


parent_folder = 'health-insurance-raw-data'
domain = "https://download.cms.gov/marketplace-puf/"



In [2]:
def get_zip_urls (url):
  zip_urls = []
  for i in range(2014,2024):
    zip_urls.append(domain+str(i)+url)

  return zip_urls


def download_data(urls,folder_name):
  # Iterate over the URLs and extract each zip file into <parent_folder>/<folder_name>/<year>
  for url in urls:
      response = requests.get(url)
      # Fail with a clear HTTP error instead of a confusing zip error when a file is missing
      response.raise_for_status()
      year = url.split("/")[-2]
      target_dir = os.path.join(parent_folder, folder_name, year)
      os.makedirs(target_dir, exist_ok=True)

      with zipfile.ZipFile(BytesIO(response.content)) as z:
          for filename in z.namelist():
              with open(os.path.join(target_dir, filename), "wb") as f:
                  f.write(z.read(filename))




In [3]:
folders = ['benefits-and-cost-sharing',"rate","plan-attributes","business-rules","service-area","network"]
remaining_urls = ["/benefits-and-cost-sharing-puf.zip","/rate-puf.zip","/plan-attributes-puf.zip","/business-rules-puf.zip","/service-area-puf.zip","/network-puf.zip"]
for i in range(len(folders)):
  urls = get_zip_urls(remaining_urls[i])
  download_data(urls,folders[i])
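
The URL construction and extraction steps above can also be written as small helpers that are easy to check without network access. This is a minimal sketch, not the notebook's actual code: `build_zip_url` and `extract_zip_bytes` are hypothetical names, and the sketch assumes the archive has already been fetched into memory (for example as `response.content`).

```python
import zipfile
from io import BytesIO
from pathlib import Path


def build_zip_url(domain: str, year: int, suffix: str) -> str:
    """Join the base domain, a year, and a suffix such as '/rate-puf.zip'."""
    return f"{domain.rstrip('/')}/{year}{suffix}"


def extract_zip_bytes(payload: bytes, target_dir: Path) -> list:
    """Extract an in-memory zip archive into target_dir and return its member names."""
    # Create the year folder once, before any file is written
    target_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(BytesIO(payload)) as archive:
        archive.extractall(target_dir)
        return archive.namelist()
```

Keeping the extraction separate from the HTTP request makes it possible to test the file layout with a small archive built in memory.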

## Step 2: Explore and Assess the Data

In [4]:
!pip install pyspark



In [5]:
from pyspark.sql import SparkSession

In [6]:
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import count,when,col
from pyspark.sql.functions import desc


In [7]:
spark = SparkSession.builder \
    .appName("HealthInsurance") \
    .getOrCreate()

### 1- Plan Attributes Raw Data

In [8]:
df_plan_raw = spark.read.option("header", "true").option("inferSchema", "true").csv("/content/health-insurance-raw-data/plan-attributes/*/*.csv")


df_plan_raw.show(5)
# df_plan_raw.filter(df_plan_raw.BusinessYear == "2023").show()



In [9]:
df_plan_raw.count()

227036

In [10]:
# df_plan_raw.printSchema()

In [11]:
df_plan_raw.select(countDistinct("StandardComponentId")).show()


+-----------------------------------+
|count(DISTINCT StandardComponentId)|
+-----------------------------------+
|                              17814|
+-----------------------------------+



In [12]:
df_plan_raw.filter(df_plan_raw.IssuerMarketPlaceMarketingName == "HIOS").show(5)



In [13]:
df_plan_raw.filter(df_plan_raw.IssuerId == "21989").show(50)



In [14]:
df_plan_raw.filter(df_plan_raw.IssuerId == "38344").show(5)




From the three cells above we conclude that some of the data is outdated: organization names were recorded as HIOS at first and were replaced with real names in more recent years. We don't need the rows with the outdated name, so we can drop all rows whose ``` IssuerMarketPlaceMarketingName ``` value is ```HIOS```.



In [15]:
df_plan_raw = df_plan_raw.filter(df_plan_raw.IssuerMarketPlaceMarketingName != "HIOS")


In [16]:
df_plan_raw.select(countDistinct("StandardComponentId")).show()


+-----------------------------------+
|count(DISTINCT StandardComponentId)|
+-----------------------------------+
|                              14503|
+-----------------------------------+



In [17]:
df_plan_raw.count()

143932

In [18]:
df_plan_raw = df_plan_raw.dropna(how = "any", subset = ["PlanId"])


In [19]:
df_plan_raw.count()

132225

In [20]:
from pyspark.sql.functions import length

df_plan_raw = df_plan_raw.filter(length(df_plan_raw.PlanId) == 17)


In [21]:
df_plan_raw.count()

33031
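
The length filter above keeps only 17-character `PlanId` values. A plausible reading, stated here as an assumption rather than something the notebook confirms, is that such an ID is the 14-character `StandardComponentId` followed by a hyphen and a two-character variant suffix. The sketch below shows that split in plain Python; `split_plan_id` is a hypothetical helper and the `-00` suffix in the example is made up.

```python
def split_plan_id(plan_id: str):
    """Split a 17-character plan ID into (standard component ID, variant suffix).

    Returns None when the value does not have the assumed
    'XXXXXXXXXXXXXX-YY' shape (14 characters, a hyphen, 2 characters).
    """
    if len(plan_id) != 17 or plan_id[14] != "-":
        return None
    return plan_id[:14], plan_id[15:]


# The component ID appears in the notebook; the '-00' suffix is illustrative only
print(split_plan_id("21989AK0030001-00"))
# A bare 14-character component ID does not match the assumed shape
print(split_plan_id("21989AK0030001"))
```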

In [22]:
df_plan_raw.createOrReplaceTempView("plan_staging")


### 2- Network Raw Data

In [23]:
df_network_raw = spark.read.option("header", "true").option("inferSchema", "true").csv("/content/health-insurance-raw-data/network/*/*.csv")


df_network_raw.show()


+------------+---------+--------+----------+----------+-------------------+---------+----------+--------------------+---------+--------------------+---------+--------------+----------+
|BusinessYear|StateCode|IssuerId|SourceName|VersionNum|         ImportDate|IssuerId2|StateCode2|         NetworkName|NetworkId|          NetworkURL|RowNumber|MarketCoverage|DentalOnly|
+------------+---------+--------+----------+----------+-------------------+---------+----------+--------------------+---------+--------------------+---------+--------------+----------+
|        2015|       GA|   89942|      HIOS|         7|2015-02-19 06:21:02|    89942|        GA|Kaiser Permanente...|   GAN001|   kp.org/gaprovider|       13|          NULL|      NULL|
|        2015|       GA|   93332|      HIOS|        10|2014-09-29 21:43:29|    93332|        GA|        Atlanta HMOx|   GAN001|https://www.human...|       13|          NULL|      NULL|
|        2015|       GA|   93332|      HIOS|        10|2014-09-29 21:43:29|

In [24]:
df_network_raw.createOrReplaceTempView("network_staging")


### 3- Rate Raw Data

In [25]:
df_rate_raw = spark.read.option("header", "true").option("inferSchema", "true").csv("/content/health-insurance-raw-data/rate/*/*.csv")


df_rate_raw.show()
# drop na from age !!

+------------+---------+--------+----------+----------+-------------------+---------+----------+-----------------+------------------+--------------+-------------+-------------+-------------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+---------+
|BusinessYear|StateCode|IssuerId|SourceName|VersionNum|         ImportDate|IssuerId2|FederalTIN|RateEffectiveDate|RateExpirationDate|        PlanId| RatingAreaId|      Tobacco|          Age|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|RowNumber|
+------------+---------+--------+----------+----------+-------------------+---------+----------+-----------------+------------------+-------

In [26]:
# Comparing a column with `== None` never matches in Spark; isNull() finds the missing PlanIds
df_rate_raw.filter(df_rate_raw.PlanId.isNull()).show()

+------------+---------+--------+----------+----------+----------+---------+----------+-----------------+------------------+------+------------+-------+---+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+---------+
|BusinessYear|StateCode|IssuerId|SourceName|VersionNum|ImportDate|IssuerId2|FederalTIN|RateEffectiveDate|RateExpirationDate|PlanId|RatingAreaId|Tobacco|Age|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|RowNumber|
+------------+---------+--------+----------+----------+----------+---------+----------+-----------------+------------------+------+------------+-------+---+--------------+---------------------+------+--------

In [27]:
df_rate_raw.createOrReplaceTempView("rate_staging")


### 4- Benefit Raw Data

In [28]:
df_benifit_cost_raw = spark.read.option("header", "true").option("inferSchema", "true").csv("/content/health-insurance-raw-data/benefits-and-cost-sharing/*/*.csv")

df_benifit_cost_raw.show()


+------------+---------+--------+----------+----------+-------------------+---------+----------+-------------------+-----------------+--------------------+-------------+-------------+-------------+-------------+-------------+-------------+-----+--------------+---------+---------------+--------+--------------------+-----------+----------+--------------------+--------------------+----------------+----------------+-----------------+-----------------+---------+
|BusinessYear|StateCode|IssuerId|SourceName|VersionNum|         ImportDate|IssuerId2|StateCode2|StandardComponentId|           PlanId|         BenefitName|CopayInnTier1|CopayInnTier2|CopayOutofNet|CoinsInnTier1|CoinsInnTier2|CoinsOutofNet|IsEHB|IsStateMandate|IsCovered|QuantLimitOnSvc|LimitQty|           LimitUnit|MinimumStay|Exclusions|         Explanation|        EHBVarReason|IsSubjToDedTier1|IsSubjToDedTier2|IsExclFromInnMOOP|IsExclFromOonMOOP|RowNumber|
+------------+---------+--------+----------+----------+-------------------+-

In [29]:
df_benifit_cost_raw.select(countDistinct("BenefitName")).show()


+---------------------------+
|count(DISTINCT BenefitName)|
+---------------------------+
|                       1329|
+---------------------------+



In [30]:
df_benifit_cost_raw.createOrReplaceTempView("benifit_staging")


## Step 3: Define the Data Model


### Organization Table

In [31]:
organization_table = spark.sql("""
                            SELECT  DISTINCT p.IssuerId AS OrganizationId,
                                    p.IssuerMarketPlaceMarketingName AS Name,
                                    p.StateCode AS StateCode
                            FROM plan_staging p

""")

organization_table.createOrReplaceTempView('organization_table')

organization_table.show()


+--------------+--------------------+---------+
|OrganizationId|                Name|StateCode|
+--------------+--------------------+---------+
|         23435|         BannerAetna|       AZ|
|         34210| Renaissance Dental |       WI|
|         27833|Ambetter of Illinois|       IL|
|         60075|TRUASSURE INSURAN...|       AL|
|         15833|            Guardian|       FL|
|         91908|Oscar Insurance C...|       OK|
|         24601|           BEST Life|       TN|
|         47638|Retailers Insuran...|       MI|
|         66759|   Dominion National|       NC|
|         99969|           MedMutual|       OH|
|         24832|Renaissance Life ...|       VA|
|         85533|Imperial Insuranc...|       AZ|
|         34968|             MetLife|       OH|
|         17414|AmeriHealth Carit...|       NC|
|         68398|    UnitedHealthcare|       FL|
|         97560|    UnitedHealthcare|       MS|
|         52664|           SummaCare|       OH|
|         11904|      DentaQuest- LA|   

In [32]:
organization_table.count()

464

In [33]:
organization_table.select(countDistinct("OrganizationId")).show()


+------------------------------+
|count(DISTINCT OrganizationId)|
+------------------------------+
|                           462|
+------------------------------+



In [34]:
organization_table = organization_table.drop_duplicates(['OrganizationId','Name','StateCode'])
organization_table.count()

464

In [35]:
from pyspark.sql.functions import desc

most_common_value = organization_table.groupBy("OrganizationId").count().orderBy(desc("count")).first()[0]
most_common_value

'14630'

In [36]:
organization_table.filter(organization_table.OrganizationId =="36096").show()

+--------------+--------------------+---------+
|OrganizationId|                Name|StateCode|
+--------------+--------------------+---------+
|         36096|Blue Cross and Bl...|       IL|
+--------------+--------------------+---------+



In [37]:
organization_table.filter(organization_table.Name =="OPM").show(5)

+--------------+----+---------+
|OrganizationId|Name|StateCode|
+--------------+----+---------+
+--------------+----+---------+



From the exploration above we conclude that the issuer ID is not unique.
Each organization can exist in different states, so an organization is identified by its name together with the state code and the issuer ID (which represents the ID of this organization in that state).
On closer inspection, the ID has duplicates across the data, so we need to add a surrogate primary key.
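
The reasoning above can be illustrated on a toy example. The following plain-Python sketch uses made-up rows (not the CMS data) to find issuer IDs that occur more than once and then attaches a surrogate key to each row. It uses `enumerate`, which yields consecutive integers; Spark's `monotonically_increasing_id` guarantees unique, increasing IDs but not consecutive ones.

```python
from collections import Counter

# Hypothetical rows: (OrganizationId, Name, StateCode)
rows = [
    ("11111", "Example Health", "AK"),
    ("11111", "Example Health Plan", "AK"),  # same issuer ID under a different name
    ("22222", "Sample Dental", "WI"),
]

# Count how often each issuer ID occurs and keep the ones seen more than once
id_counts = Counter(org_id for org_id, _, _ in rows)
duplicated_ids = sorted(org_id for org_id, n in id_counts.items() if n > 1)

# Attach a surrogate key so that every row has a unique identifier
keyed_rows = [(surrogate_id, *row) for surrogate_id, row in enumerate(rows)]

print(duplicated_ids)
print(keyed_rows)
```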

In [38]:
organization_table = organization_table.withColumn("Id", monotonically_increasing_id())
organization_table.show(5)

+--------------+--------------------+---------+---+
|OrganizationId|                Name|StateCode| Id|
+--------------+--------------------+---------+---+
|         23435|         BannerAetna|       AZ|  0|
|         34210| Renaissance Dental |       WI|  1|
|         27833|Ambetter of Illinois|       IL|  2|
|         60075|TRUASSURE INSURAN...|       AL|  3|
|         15833|            Guardian|       FL|  4|
+--------------+--------------------+---------+---+
only showing top 5 rows



In [39]:
# from pyspark.sql.functions import col

# df_filtered = organization_table.groupBy(col("Id")).count().filter(col("count") > 1)
# df_filtered.show()


### Network Table

In [40]:
network_table = spark.sql("""
                            SELECT  DISTINCT n.NetworkId AS NetworkId,
                                    n.IssuerId AS OrganizationId,
                                    n.NetworkName AS NetworkName,
                                    n.StateCode AS StateCode,
                                    n.BusinessYear AS Year

                            FROM network_staging n

""")
network_table = network_table.withColumn("Id", monotonically_increasing_id())

network_table.createOrReplaceTempView('network_table')

network_table.show(5)


+---------+--------------+--------------------+---------+----+---+
|NetworkId|OrganizationId|         NetworkName|StateCode|Year| Id|
+---------+--------------+--------------------+---------+----+---+
|   INN001|         17575|   Pathway X HMO/POS|       IN|2015|  0|
|   INN001|         20855|ADVANTAGE Health ...|       IN|2015|  1|
|   ARN001|         67635|Lincoln DentalCon...|       AR|2015|  2|
|   ORN001|         60257|  Renaissance Dental|       OR|2015|  3|
|   ALN001|         28899|Ameritas PPO Dent...|       AL|2016|  4|
+---------+--------------+--------------------+---------+----+---+
only showing top 5 rows



In [41]:
network_table.filter(network_table.NetworkName == "Molina Marketplace").show()

+---------+--------------+------------------+---------+----+----+
|NetworkId|OrganizationId|       NetworkName|StateCode|Year|  Id|
+---------+--------------+------------------+---------+----+----+
|   FLN001|         54172|Molina Marketplace|       FL|2015| 166|
|   MIN001|         40047|Molina Marketplace|       MI|2014| 599|
|   FLN001|         54172|Molina Marketplace|       FL|2016| 943|
|   MIN001|         40047|Molina Marketplace|       MI|2016|1010|
|   MIN001|         40047|Molina Marketplace|       MI|2015|1339|
|   NMN001|         19722|Molina Marketplace|       NM|2015|3780|
|   FLN001|         54172|Molina Marketplace|       FL|2014|4152|
+---------+--------------+------------------+---------+----+----+



In [42]:
network_table = network_table.drop_duplicates(['NetworkId','OrganizationId','NetworkName',"StateCode"])


In [43]:
network_table.filter(network_table.NetworkName == "Molina Marketplace").show()

+---------+--------------+------------------+---------+----+----+
|NetworkId|OrganizationId|       NetworkName|StateCode|Year|  Id|
+---------+--------------+------------------+---------+----+----+
|   FLN001|         54172|Molina Marketplace|       FL|2015| 166|
|   MIN001|         40047|Molina Marketplace|       MI|2014| 599|
|   NMN001|         19722|Molina Marketplace|       NM|2015|3780|
+---------+--------------+------------------+---------+----+----+



In [44]:
network_table.filter(network_table.NetworkName == "Phoenix HMOx").show()

+---------+--------------+------------+---------+----+----+
|NetworkId|OrganizationId| NetworkName|StateCode|Year|  Id|
+---------+--------------+------------+---------+----+----+
|   AZN001|         23307|Phoenix HMOx|       AZ|2016|3258|
+---------+--------------+------------+---------+----+----+



In [45]:
network_table.filter(network_table.OrganizationId == "19722").show()

+---------+--------------+--------------------+---------+----+----+
|NetworkId|OrganizationId|         NetworkName|StateCode|Year|  Id|
+---------+--------------+--------------------+---------+----+----+
|   NMN001|         19722|Molina Healthcare...|       NM|2016|2429|
|   NMN001|         19722|  Molina Marketplace|       NM|2015|3780|
|   NMN001|         19722|Molina New Mexico...|       NM|2014|1197|
|       No|         19722|          Individual|       NM|2017|3816|
+---------+--------------+--------------------+---------+----+----+



In [46]:
network_table.filter(network_table.OrganizationId == "40047").show()

+---------+--------------+------------------+---------+----+----+
|NetworkId|OrganizationId|       NetworkName|StateCode|Year|  Id|
+---------+--------------+------------------+---------+----+----+
|   MIN001|         40047|Molina Marketplace|       MI|2014| 599|
|       No|         40047|        Individual|       MI|2017|2508|
+---------+--------------+------------------+---------+----+----+



In [47]:
# unique_network_ids = network_table.select("NetworkId").distinct().collect()
# unique_network_ids

#### Clean Network table

In [48]:
network_table = network_table.filter(~col("NetworkId").isin("No","Yes","Individual") & col("NetworkId").isNotNull())
network_table.show(5)


+---------+--------------+--------------------+---------+----+----+
|NetworkId|OrganizationId|         NetworkName|StateCode|Year|  Id|
+---------+--------------+--------------------+---------+----+----+
|   AKN001|         21989|Delta Dental Premier|       AK|2016|3467|
|   AKN001|         21989|         ODS Premier|       AK|2014|1794|
|   AKN001|         38344|        HeritagePlus|       AK|2014|1372|
|   AKN001|         38344|HeritagePlus and ...|       AK|2016|1064|
|   AKN001|         38536|Lincoln Dental Co...|       AK|2014|1971|
+---------+--------------+--------------------+---------+----+----+
only showing top 5 rows



So a network can exist in different states but it exists only for one organization in that state.
An organization can have only one network in the state.

In [49]:
network_table.filter(network_table.NetworkId == "AKN001").show()

+---------+--------------+--------------------+---------+----+----+
|NetworkId|OrganizationId|         NetworkName|StateCode|Year|  Id|
+---------+--------------+--------------------+---------+----+----+
|   AKN001|         21989|Delta Dental Premier|       AK|2016|3467|
|   AKN001|         21989|         ODS Premier|       AK|2014|1794|
|   AKN001|         38344|        HeritagePlus|       AK|2014|1372|
|   AKN001|         38344|HeritagePlus and ...|       AK|2016|1064|
|   AKN001|         38536|Lincoln Dental Co...|       AK|2014|1971|
|   AKN001|         38536|Lincoln DentalCon...|       AK|2015|1436|
|   AKN001|         42507|DentalGuard Prefe...|       AK|2014|3591|
|   AKN001|         45858|Ameritas PPO Dent...|       AK|2016| 224|
|   AKN001|         47904|  Renaissance Dental|       AK|2016| 584|
|   AKN001|         58670|           Indemnity|       AK|2016|1315|
|   AKN001|         58670|           indemnity|       AK|2015|4223|
|   AKN001|         73836|     Endeavor Select| 

In [50]:
network_table.createOrReplaceTempView('network_table')


### Plan Table

In [51]:
plan_table = spark.sql("""
                            SELECT  DISTINCT p.StandardComponentId AS Id,
                                    n.Id AS NetworkId

                            FROM plan_staging p
                            JOIN network_table n
                            ON n.NetworkId == p.NetworkId
                            AND n.OrganizationId == p.IssuerId
                            AND n.StateCode == p.StateCode

""")
# plan_table = plan_table.withColumn("plan_id", monotonically_increasing_id())

plan_table.createOrReplaceTempView('plan_table')

plan_table.show(5)


+--------------+---------+
|            Id|NetworkId|
+--------------+---------+
|28725AL0120002|     2854|
|16842FL0120076|      909|
|18628FL0160035|     1342|
|48129FL0080004|     1272|
|43802GA0040027|     4014|
+--------------+---------+
only showing top 5 rows



In [52]:
plan_table.count()

7963

In [53]:
plan_table.select(countDistinct("Id")).show()


+------------------+
|count(DISTINCT Id)|
+------------------+
|              4922|
+------------------+



In [54]:
plan_table.filter(plan_table.Id == "21989AK0030001").show()

+--------------+---------+
|            Id|NetworkId|
+--------------+---------+
|21989AK0030001|     3467|
|21989AK0030001|     1794|
+--------------+---------+



In [55]:
network_table.filter(network_table.Id == 3467).show()

+---------+--------------+--------------------+---------+----+----+
|NetworkId|OrganizationId|         NetworkName|StateCode|Year|  Id|
+---------+--------------+--------------------+---------+----+----+
|   AKN001|         21989|Delta Dental Premier|       AK|2016|3467|
+---------+--------------+--------------------+---------+----+----+



In [56]:
network_table.filter(network_table.Id == 1794).show()

+---------+--------------+-----------+---------+----+----+
|NetworkId|OrganizationId|NetworkName|StateCode|Year|  Id|
+---------+--------------+-----------+---------+----+----+
|   AKN001|         21989|ODS Premier|       AK|2014|1794|
+---------+--------------+-----------+---------+----+----+



In [57]:
df_network_raw.filter((df_network_raw.NetworkId =="AKN001" )& (df_network_raw.IssuerId =="21989" ) & (df_network_raw.StateCode =="AK")).show()

+------------+---------+--------+----------+----------+-------------------+---------+----------+--------------------+---------+--------------------+---------+--------------+----------+
|BusinessYear|StateCode|IssuerId|SourceName|VersionNum|         ImportDate|IssuerId2|StateCode2|         NetworkName|NetworkId|          NetworkURL|RowNumber|MarketCoverage|DentalOnly|
+------------+---------+--------+----------+----------+-------------------+---------+----------+--------------------+---------+--------------------+---------+--------------+----------+
|        2015|       AK|   21989|      HIOS|         4|2014-08-08 08:53:29|    21989|        AK|Delta Dental Premier|   AKN001|https://www.modah...|       14|          NULL|      NULL|
|        2016|       AK|   21989|      HIOS|         4|2015-08-22 15:09:32|    21989|        AK|Delta Dental Premier|   AKN001|https://www.modah...|       13|          NULL|      NULL|
|        2014|       AK|   21989|      HIOS|         6|2014-03-19 07:06:49|

This means that a network's name may change over the years, which is why there are duplicates in this table. The primary key is therefore the combination of both fields (`Id` and `NetworkId`).
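
One possible alternative, sketched here as an assumption and not something the notebook does, is to keep only the most recent name for each combination of `NetworkId`, `OrganizationId`, and `StateCode`, so that a plan maps to a single network row. The rows below are taken from the outputs shown earlier; the dictionary-based approach is a plain-Python illustration of the idea.

```python
network_rows = [
    {"NetworkId": "AKN001", "OrganizationId": "21989", "NetworkName": "ODS Premier", "StateCode": "AK", "Year": 2014},
    {"NetworkId": "AKN001", "OrganizationId": "21989", "NetworkName": "Delta Dental Premier", "StateCode": "AK", "Year": 2016},
    {"NetworkId": "AKN001", "OrganizationId": "38344", "NetworkName": "HeritagePlus", "StateCode": "AK", "Year": 2014},
]

# Keep the row with the latest year for each (NetworkId, OrganizationId, StateCode)
latest = {}
for row in network_rows:
    key = (row["NetworkId"], row["OrganizationId"], row["StateCode"])
    if key not in latest or row["Year"] > latest[key]["Year"]:
        latest[key] = row

for key, row in latest.items():
    print(key, row["NetworkName"], row["Year"])
```

Whether the latest name or the full name history is wanted depends on the analysis, so this is a design choice, not a correction.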

### Plan Variant Table

In [58]:
plan_variant_table = spark.sql("""
                            SELECT  DISTINCT p.PlanId AS Id,
                                    p.StandardComponentId AS PlanId,
                                    p.PlanMarketingName AS Name,
                                    p.MarketCoverage AS MarketCoverage,
                                    p.BusinessYear AS Year,
                                    p.IsNewPlan AS IsNewPlan,
                                    p.PlanType AS PlanType,
                                    r.Age AS Age,
                                    r.IndividualRate AS IndividualRate,
                                    r.IndividualTobaccoRate AS IndividualTobaccoRate,
                                    r.Couple AS Couple,
                                    r.PrimarySubscriberAndOneDependent AS PrimarySubscriberAndOneDependent,
                                    r.PrimarySubscriberAndTwoDependents AS PrimarySubscriberAndTwoDependents,
                                    r.PrimarySubscriberAndThreeOrMoreDependents AS PrimarySubscriberAndThreeOrMoreDependents,
                                    r.CoupleAndOneDependent AS CoupleAndOneDependent,
                                    r.CoupleAndTwoDependents AS CoupleAndTwoDependents,
                                    r.CoupleAndThreeOrMoreDependents AS CoupleAndThreeOrMoreDependents

                            FROM plan_staging p
                            INNER JOIN rate_staging r
                            ON p.StandardComponentId == r.PlanId

""")
# plan_table = plan_table.withColumn("plan_id", monotonically_increasing_id())


plan_variant_table.show(5)


+-----------------+--------------+--------------------+------------------+----+---------+--------+----+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+
|               Id|        PlanId|                Name|    MarketCoverage|Year|IsNewPlan|PlanType| Age|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|
+-----------------+--------------+--------------------+------------------+----+---------+--------+----+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+
|

In [59]:
plan_variant_table = plan_variant_table.drop_duplicates(['Id','Age','Year'])


In [60]:
plan_variant_table.filter(plan_variant_table.PlanId =="11083MI0100001").show(60)

+-----------------+--------------+--------------------+------------------+----+---------+--------+-----------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------------------------+
|               Id|        PlanId|                Name|    MarketCoverage|Year|IsNewPlan|PlanType|        Age|IndividualRate|IndividualTobaccoRate|Couple|PrimarySubscriberAndOneDependent|PrimarySubscriberAndTwoDependents|PrimarySubscriberAndThreeOrMoreDependents|CoupleAndOneDependent|CoupleAndTwoDependents|CoupleAndThreeOrMoreDependents|
+-----------------+--------------+--------------------+------------------+----+---------+--------+-----------+--------------+---------------------+------+--------------------------------+---------------------------------+-----------------------------------------+---------------------+----------------------+------------

In [63]:
plan_variant_table.createOrReplaceTempView('plan_variant_table')

## Step 4: Data Analysis


Show the Top 5 Networks with the greatest number of organizations

In [64]:

network_counts = network_table.groupBy("NetworkId").count()

result = network_counts.join(network_table, "NetworkId")

network_name_counts = result.groupBy("NetworkName").count()

top_networks = network_name_counts.orderBy(desc("count")).limit(5).select("NetworkName", "count")

top_networks.show()



+--------------------+-----+
|         NetworkName|count|
+--------------------+-----+
|Ameritas PPO Dent...|   96|
|            DenteMax|   54|
|        Delta Dental|   38|
| Dentegra Dental PPO|   36|
|  Renaissance Dental|   34|
+--------------------+-----+
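
If the goal is to count organizations per network, one option is to count the distinct `OrganizationId` values for each `NetworkName`. The sketch below does this in plain Python on a few (name, organization) pairs copied from the network outputs earlier in the notebook. `top_networks_by_organizations` is a hypothetical helper; in PySpark the same idea would roughly correspond to a `groupBy` on `NetworkName` with `countDistinct("OrganizationId")`.

```python
from collections import defaultdict


def top_networks_by_organizations(rows, limit=5):
    """Rank network names by how many distinct organizations use them."""
    organizations = defaultdict(set)
    for network_name, organization_id in rows:
        organizations[network_name].add(organization_id)
    # Sort by descending organization count, then by name for a stable order
    ranked = sorted(organizations.items(), key=lambda item: (-len(item[1]), item[0]))
    return [(name, len(ids)) for name, ids in ranked[:limit]]


rows = [
    ("Molina Marketplace", "54172"),
    ("Molina Marketplace", "40047"),
    ("Molina Marketplace", "19722"),
    ("Molina Marketplace", "54172"),  # same organization in another year, counted once
    ("Phoenix HMOx", "23307"),
]

print(top_networks_by_organizations(rows))
```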



The Plans with the highest Individual Rate for Each Age for Year 2023

---



In [65]:
from pyspark.sql.functions import desc

# Restrict both the aggregation and the lookup to 2023 so rows from other years cannot match
plans_2023 = plan_variant_table.filter(plan_variant_table.Year == 2023)

plans_2023 \
  .groupBy("Age") \
  .agg({"IndividualRate": "max"}) \
  .withColumnRenamed("max(IndividualRate)", "IndividualRate") \
  .join(plans_2023, ["Age", "IndividualRate"], "inner") \
  .select("PlanId", "Name", "Age", "IndividualRate") \
  .show()



+--------------+--------------------+----+--------------+
|        PlanId|                Name| Age|IndividualRate|
+--------------+--------------------+----+--------------+
|38345WI0080029|Dean Focus Networ...|0-20|         99.55|
|40411NC0030001|Cigna Dental Pedi...|  21|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  24|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  26|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  27|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  28|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  30|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  31|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  32|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  36|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  41|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  42|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  48|       9999.00|
|40411NC0030001|Cigna Dental Pedi...|  50|       9999.00|
|40411NC003000
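
The query for this question is a max-per-group lookup: find the maximum rate for each age, then return the plans that reach it. The plain-Python sketch below illustrates the idea on made-up rows. `highest_rate_plans` is a hypothetical helper, and both the maximum and the matching plans are taken from the same year.

```python
def highest_rate_plans(rows, year):
    """Return, for each age, the plans whose rate equals that age's maximum in the given year."""
    in_year = [row for row in rows if row["Year"] == year]

    # First pass: maximum rate per age within the selected year
    max_by_age = {}
    for row in in_year:
        age, rate = row["Age"], row["IndividualRate"]
        max_by_age[age] = max(max_by_age.get(age, rate), rate)

    # Second pass: keep only the rows that reach the maximum for their age
    return [row for row in in_year if row["IndividualRate"] == max_by_age[row["Age"]]]


# Made-up rows for illustration only
rows = [
    {"PlanId": "A", "Age": "21", "Year": 2023, "IndividualRate": 300.0},
    {"PlanId": "B", "Age": "21", "Year": 2023, "IndividualRate": 250.0},
    {"PlanId": "C", "Age": "21", "Year": 2022, "IndividualRate": 300.0},  # other year, ignored
    {"PlanId": "D", "Age": "22", "Year": 2023, "IndividualRate": 310.0},
]

print([row["PlanId"] for row in highest_rate_plans(rows, 2023)])
```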

Individual Rate changes over years 2016 to 2023 grouped by Age

In [66]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

w = Window.partitionBy("Age").orderBy("Year")

plan_variant_table.filter((plan_variant_table.Year >= 2016) & (plan_variant_table.Year <= 2023)) \
  .select("Age", "IndividualRate", "Year") \
  .withColumn("IndividualRate_2016", lag("IndividualRate", 1).over(w)) \
  .withColumn("IndividualRate_2023", col("IndividualRate")) \
  .groupBy("Age") \
  .agg({"IndividualRate_2016": "avg", "IndividualRate_2023": "avg"}) \
  .select("Age", "avg(IndividualRate_2016)", "avg(IndividualRate_2023)", \
          ((col("avg(IndividualRate_2023)") - col("avg(IndividualRate_2016)")) / col("avg(IndividualRate_2016)") * 100).alias("Change")) \
  .show()


+----+------------------------+------------------------+--------------------+
| Age|avg(IndividualRate_2016)|avg(IndividualRate_2023)|              Change|
+----+------------------------+------------------------+--------------------+
|0-20|      115.72161681487495|      115.68493333333357|-0.03169976582686199|
|  21|      214.77521827000766|      214.69925252525212| -0.0353698836241126|
|  22|       214.3922554567504|       214.3177414141416|-0.03475593950446075|
|  23|      215.03921584478567|      214.96050505050496|-0.03660299539853...|
|  24|      214.73429264349284|      214.65570505050556|-0.03659759790568196|
|  25|       216.1183346806786|      216.03959999999952|-0.03643128233215817|
|  26|      219.29485852869882|      219.21773737373775|-0.03516778983260299|
|  27|       223.8271948261927|      223.74493333333365|-0.03675223331237257|
|  28|       229.7308730800333|       229.6462262626272|-0.03684607831381574|
|  29|      235.55830234438167|       235.4713010101011|-0.03693
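
In the cell above, `lag("IndividualRate", 1)` returns the previous row's rate within each age partition, so the two averaged columns cover almost the same rows. This may explain why every reported change is close to zero. One alternative, sketched here in plain Python on made-up rows, is to average the rates of the first and last year separately for each age and then compute the percentage change. `average_rate_change` is a hypothetical helper, not part of the notebook.

```python
from collections import defaultdict


def average_rate_change(rows, start_year=2016, end_year=2023):
    """Map each age to (start average, end average, percentage change).

    rows is an iterable of (age, year, rate) tuples. Ages without data in
    both years, or with a zero start average, are skipped.
    """
    totals = defaultdict(lambda: defaultdict(lambda: [0.0, 0]))  # age -> year -> [sum, count]
    for age, year, rate in rows:
        if year in (start_year, end_year):
            totals[age][year][0] += rate
            totals[age][year][1] += 1

    result = {}
    for age, by_year in totals.items():
        start_sum, start_n = by_year[start_year]
        end_sum, end_n = by_year[end_year]
        if start_n == 0 or end_n == 0:
            continue
        start_avg, end_avg = start_sum / start_n, end_sum / end_n
        if start_avg == 0:
            continue
        result[age] = (start_avg, end_avg, (end_avg - start_avg) / start_avg * 100)
    return result


# Made-up rows for illustration only: (age, year, rate)
rows = [
    ("21", 2016, 100.0),
    ("21", 2016, 200.0),
    ("21", 2023, 300.0),
    ("21", 2020, 999.0),  # intermediate year, ignored
    ("30", 2023, 50.0),   # no 2016 data, skipped
]

print(average_rate_change(rows))
```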

## Step 5: Performance Optimization

In this PySpark application we used DataFrames instead of RDDs. This is an optimization because RDDs have no built-in query optimization, while DataFrames benefit from Spark's Catalyst optimizer and Tungsten execution engine. These enable DataFrames to offer better performance and resource efficiency than RDDs.