# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, trim, avg, sum, when, count, countDistinct
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 0a13d671-8f7b-48a9-8df9-eadfd32e5f56
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
Waiting for session 0a13d671-8f7b-48a9-8df9-eadfd32e5f56 to get into ready status...
Session 0a13d671-8f7b-48a9-8df9-eadfd32e5f56 ha

In [2]:
# Caminhos
rate_path = "s3://health-insurance-trusted/Rate.csv/"
benefit_path = "s3://health-insurance-trusted/BenefitsCostSharing.csv/"
plan_attr_path = "s3://health-insurance-trusted/PlanAttributes.csv/"
network_path = "s3://health-insurance-trusted/Network.csv/"
delivery_output = "s3://health-insurance-delivery/"




In [3]:
# ============================================
# Leitura das tabelas necessárias
# ============================================

# rate
df_rate_raw = spark.read.option("header", "true").csv(rate_path)
df_rate_filtered = df_rate_raw.select("BusinessYear", "StateCode", "Age", "IndividualRate", "PlanId") \
                              .dropna(subset=["IndividualRate", "Age", "PlanId"]) \
                              .withColumn("IndividualRate", col("IndividualRate").cast("double")) \
                              .withColumn("Age", col("Age").cast("int"))

# benefit
df_benefit_raw = spark.read.option("header", "true").csv(benefit_path)
df_benefit_filtered = df_benefit_raw.select("PlanId", "BenefitName", "IsCovered", "StateCode") \
                                    .dropna(subset=["PlanId"])

# plan attribute
df_plan_attr_raw = spark.read.option("header", "true").csv(plan_attr_path).dropna(subset=["PlanId"])
df_plan_attr_filtered = df_plan_attr_raw.filter(
    col("PlanId").isNotNull() & col("StandardComponentId").isNotNull())

# network
df_network = spark.read.option("header", True).csv(network_path)
df_network_filtered = df_network.select("IssuerId", "StateCode", "NetworkName") \
                                .filter(col("NetworkName").isNotNull()) \
                                .dropDuplicates()




In [4]:
# mapeamento PlanId e StandardComponentId
df_plan_attr_idmap = df_plan_attr_filtered.select("PlanId", "StandardComponentId").distinct()
df_plan_attr_idmap.show(5)

+-----------------+-------------------+
|           PlanId|StandardComponentId|
+-----------------+-------------------+
|50816IN0130036-00|     50816IN0130036|
|50816IN0130052-01|     50816IN0130052|
|56340IN0010001-00|     56340IN0010001|
|85320IN0010005-02|     85320IN0010005|
|69051IN0110001-00|     69051IN0110001|
+-----------------+-------------------+
only showing top 5 rows


In [5]:
# ============================================
# 1. Média de preço por estado e idade
# ============================================
"""
Objetivo: Utilizar o Rate.csv para calcular a média dos valores por estado e idade,
identificando quais regiões apresentam os planos mais caros ou mais acessíveis para
diferentes faixas etárias
"""

df_rate_summary = df_rate_filtered.groupBy("StateCode", "Age") \
                                  .agg(avg("IndividualRate").alias("AvgRate"))
df_rate_summary.show(5)

+---------+---+------------------+
|StateCode|Age|           AvgRate|
+---------+---+------------------+
|       MT| 36| 5175.295229244114|
|       MT| 53|  5314.09300495663|
|       SC| 26|5695.6274378043545|
|       SC| 27| 5700.366357680406|
|       TN| 33|  9773.87356910328|
+---------+---+------------------+
only showing top 5 rows


In [6]:
# ============================================
# 2. Quantidade de benefícios por plano
# ============================================
"""
Objetivo: Cruzar os dados de BenefitsCostSharing.csv com Rate.csv para verificar se
planos com maior cobertura de benefícios tendem a custar mais, gerando gráficos de
dispersão e agrupamentos por número de benefícios
"""

#df_covered_benefits = df_benefit_filtered.filter(col("IsCovered") == "Yes")
df_covered_benefits = df_benefit_filtered.filter(col("IsCovered") == "Covered")

df_benefit_count = df_covered_benefits.groupBy("PlanId") \
                                      .agg(count("BenefitName").alias("BenefitCount"))

# adicionando a coluna de mapeamento
df_benefit_std = df_benefit_count.join(df_plan_attr_idmap, on="PlanId", how="left")


df_rate_avg = df_rate_filtered.groupBy("PlanId") \
                              .agg(avg("IndividualRate").alias("AvgRate"))

df_benefit_vs_price = df_benefit_std.join(
    df_rate_avg.withColumnRenamed("PlanId", "StandardComponentId"),
    on="StandardComponentId",
    how="inner"
)

#df_benefit_vs_price = df_benefit_count.join(df_rate_avg, on="PlanId", how="inner")
df_benefit_vs_price.show(5)

+-------------------+-----------------+------------+------------------+
|StandardComponentId|           PlanId|BenefitCount|           AvgRate|
+-------------------+-----------------+------------+------------------+
|     89281SC0020005|89281SC0020005-01|          21| 33.99239130434756|
|     89281SC0020005|89281SC0020005-00|          21| 33.99239130434756|
|     89281SC0010007|89281SC0010007-00|          14| 35.39487948960249|
|     89281SC0010007|89281SC0010007-01|          14| 35.39487948960249|
|     89281SC0010009|89281SC0010009-01|          14|31.515439508505708|
+-------------------+-----------------+------------+------------------+
only showing top 5 rows


In [8]:
# ============================================
# 3. Frequência de benefícios por estado
# ============================================
"""
Objetivo: Agrupar os dados por StateCode e BenefitName, verificando a frequência de
benefícios como cuidados preventivos, odontológicos e maternidade por região
"""

df_benefit_freq = df_covered_benefits.groupBy("StateCode", "BenefitName") \
                                     .agg(count("*").alias("Frequency"))
df_benefit_freq.show(10)

+---------+--------------------+---------+
|StateCode|         BenefitName|Frequency|
+---------+--------------------+---------+
|       OH|          Transplant|     4483|
|       OH|                Wigs|      305|
|       OH|        Chemotherapy|     4483|
|       OH|Laboratory Outpat...|     4483|
|       OH|Off Label Prescri...|     4483|
|       OH|Preferred Brand D...|     4483|
|       OH|Preventive Care/S...|     4483|
|       OH|Dental Check-Up f...|     1819|
|       AK|Basic Dental Care...|      664|
|       AK|            Sealants|        4|
+---------+--------------------+---------+
only showing top 10 rows


In [36]:
##### não executar #####
# ============================================
# 4. Planos com maior cobertura e custo médio
# ============================================
"""
Objetivo: Identificar os planos com maior número de benefícios distintos e calcular a
média dos valores cobrados para esses planos, destacando aqueles com melhor custo
benefício
"""
#df_top_coverage = df_benefit_count.join(df_rate_avg, on="PlanId", how="inner") \
#                                  .orderBy(col("BenefitCount").desc())

#df_benefit_std

# criação top_coverage raw
df_top_coverage_raw = df_benefit_std.join(df_rate_avg.withColumnRenamed("PlanId", "StandardComponentId"),
                                        on="StandardComponentId", how="inner") \
                                        .orderBy(col("BenefitCount").desc())

df_top_coverage_raw2 = df_top_coverage_raw.withColumn(
    "CoverageCategory",
    when(col("BenefitCount") >= 100, "Alta")
    .when(col("BenefitCount") >= 50, "Média")
    .otherwise("Baixa")
)

# criação df_newtwork
df_plan_attr_idmap = df_plan_attr_filtered.select("StandardComponentId", "PlanId", "IssuerId", "StateCode") \
                                 .filter(col("PlanId").isNotNull()) \
                                 .dropDuplicates()
df_plan_networks = df_plan_attr_idmap.join(
    df_network_filtered,
    on=["IssuerId", "StateCode"],
    how="left"
)

df_network_count = df_plan_networks.groupBy("PlanId") \
                                   .agg(countDistinct("NetworkName").alias("NetworkCount"))

df_top_coverage = df_top_coverage_raw2.join(df_network_count, on="PlanId", how="left")

"""
df_network_type = df_network_filtered.select("PlanId", "NetworkName", "NetworkTier")

df_top_coverage = df_top_coverage.join(
    df_network_type,
    on="PlanId",
    how="left"
)
"""

df_top_coverage.show(5)

+-----------------+-------------------+------------+------------------+----------------+------------+
|           PlanId|StandardComponentId|BenefitCount|           AvgRate|CoverageCategory|NetworkCount|
+-----------------+-------------------+------------+------------------+----------------+------------+
|18558KS0360019-01|     18558KS0360019|         174|403.94305383022794|            Alta|           5|
|18558KS0360019-00|     18558KS0360019|         116|403.94305383022794|            Alta|           5|
|18558KS0360019-02|     18558KS0360019|         174|403.94305383022794|            Alta|           5|
|18558KS0360019-03|     18558KS0360019|         174|403.94305383022794|            Alta|           5|
|18558KS0370001-00|     18558KS0370001|         119| 390.0586128364391|            Alta|           5|
+-----------------+-------------------+------------+------------------+----------------+------------+
only showing top 5 rows


In [9]:
# ============================================
# 4. Planos com maior cobertura e custo médio
# ============================================
"""
Objetivo: Identificar os planos com maior número de benefícios distintos e calcular a
categoria de cobertura
"""
# criação top_coverage raw
df_top_coverage_raw = df_benefit_std.join(df_rate_avg.withColumnRenamed("PlanId", "StandardComponentId"),
                                        on="StandardComponentId", how="inner") \
                                        .orderBy(col("BenefitCount").desc())

df_top_coverage_raw2 = df_top_coverage_raw.withColumn(
    "CoverageCategory",
    when(col("BenefitCount") >= 100, "Alta")
    .when(col("BenefitCount") >= 50, "Média")
    .otherwise("Baixa")
)

df_top_coverage = df_top_coverage_raw2.drop("AvgRate")
df_top_coverage.show(5)

+-------------------+-----------------+------------+----------------+
|StandardComponentId|           PlanId|BenefitCount|CoverageCategory|
+-------------------+-----------------+------------+----------------+
|     31274WV0310001|31274WV0310001-03|         243|            Alta|
|     31274WV0310002|31274WV0310002-06|         243|            Alta|
|     31274WV0310001|31274WV0310001-02|         243|            Alta|
|     31274WV0310001|31274WV0310001-01|         243|            Alta|
|     31274WV0310002|31274WV0310002-03|         243|            Alta|
+-------------------+-----------------+------------+----------------+
only showing top 5 rows


In [10]:
# ============================================
# 5. Dedutíveis e coparticipações médias por plano e estado
# ============================================
"""
Objetivo: análises de custo além do valor mensal
"""
"""
df_ded_moop = df_plan_attr_raw.select("PlanId", "StateCode", 
                                      col("DEHBDedCombInnOonIndividual").cast("double").alias("Deductible"),
                                      col("DEHBOutOfNetIndividualMOOP").cast("double").alias("MOOP")) \
                              .dropna()
"""
from pyspark.sql.functions import col, when, regexp_replace

df_ded_summary = df_plan_attr_raw.select(
    "PlanId", "StateCode",
    # Substituir o símbolo de dólar por vazio
    regexp_replace(col("DEHBDedCombInnOonIndividual"), "[\\$,]", "").alias("Deductible")
).dropna(subset=["Deductible"])

df_ded_moop = df_ded_summary.groupBy("StateCode").agg(
    # Calcular a média do valor de Deductible, ignorando 'Not Applicable'
    avg(when(col("Deductible") != "Not Applicable", col("Deductible").cast("double")).otherwise(None)).alias("Avg_Deductible")
)

df_ded_moop.show(5)

+---------+------------------+
|StateCode|    Avg_Deductible|
+---------+------------------+
|       IL| 21.12676056338028|
|       OK|130.63063063063063|
|       WV| 6.648936170212766|
|       WI|52.839506172839506|
|       MT|               0.0|
+---------+------------------+
only showing top 5 rows


In [11]:
# ============================================
# 6. Distribuição dos tipos de plano por estado (PlanType)
# ============================================
"""
Objetivo: Entender quais tipos de plano (HMO, PPO, POS, etc.) são mais comuns em cada estado
"""

df_plan_type_dist = df_plan_attr_raw.select("StateCode", "PlanType") \
                                    .groupBy("StateCode", "PlanType") \
                                    .count()

df_plan_type_dist.show(5)

+---------+---------+-----+
|StateCode| PlanType|count|
+---------+---------+-----+
|       NC|Indemnity|   17|
|       GA|      HMO| 1541|
|       MO|      EPO|  224|
|       ND|      PPO|  661|
|       NC|      PPO|  630|
+---------+---------+-----+
only showing top 5 rows


In [12]:
# ============================================
# 7. Distribuição por Metal Level (Bronze, Silver, Gold, Platinum)
# ============================================
"""
Objetivo: Avaliar se certos estados oferecem planos mais “caros” ou “baratos”, de acordo com o nível de cobertura
"""
df_metal_dist = df_plan_attr_raw.select("StateCode", "MetalLevel") \
                                .groupBy("StateCode", "MetalLevel") \
                                .count()

df_metal_dist.show(10)

+---------+----------+-----+
|StateCode|MetalLevel|count|
+---------+----------+-----+
|       WI|      Gold| 1164|
|       PA|      Gold|  878|
|       MT|      Gold|  157|
|       VA|      Gold|  374|
|       SC|    Silver|  840|
|       TN|    Silver|  962|
|       TN|      High|  130|
|       WY|      High|  104|
|       PA|    Silver| 1818|
|       LA|    Silver|  454|
+---------+----------+-----+
only showing top 10 rows


In [13]:
# ============================================
# 8. Comparativo de preço médio por MetalLevel
# ============================================
"""
Objetivo: Confirmar se há relação entre o nível do plano (Bronze, Silver, etc.) e o valor médio mensal cobrado
"""
# renomeando a coluna para efetuar o join corretamente
df_rate_by_std = df_rate_filtered.select(col("PlanId").alias("StandardComponentId"), "IndividualRate")

# join com a tabela df_plan_attr_idmap, que mapeia PlanId em StandardComponentId
df_plan_attr_with_rates = df_plan_attr_idmap.join(df_rate_by_std, on="StandardComponentId", how="inner")

# selecionando as colunas de interesse em df_plan_attr_filtered
df_plan_attr_with_metal = df_plan_attr_filtered.select("PlanId", "MetalLevel")

# join dos dataframes que possuem o valor dos planos e a categoria dos planos, respectivamente
df_price_by_metal = df_plan_attr_with_rates.join(df_plan_attr_with_metal, on="PlanId", how="left")

# agregação de df_price_by_metal para calcular o valor médio de cada plano
df_rate_metal_avg = df_price_by_metal.groupBy("MetalLevel") \
                                        .agg(avg("IndividualRate").alias("AvgRate")) \
                                        .orderBy("AvgRate", ascending=False)

df_rate_metal_avg.show()

+------------+------------------+
|  MetalLevel|           AvgRate|
+------------+------------------+
|        High| 9932.566237576582|
|         Low| 9051.245244250505|
|    Platinum| 600.7985745064326|
|        Gold| 530.1987336997677|
|      Silver| 440.1652698442609|
|      Bronze| 364.9217999604044|
|Catastrophic|283.60358587774465|
+------------+------------------+


In [14]:
# ============================================
# 9. Quantos planos por estado incluem cobertura fora do país / fora da área
# ============================================
"""
Objetivo: Avaliar quais estados têm planos com maior cobertura geográfica (campos: OutOfCountryCoverage, OutOfServiceAreaCoverage)
"""

df_coverage_scope = df_plan_attr_raw.select("StateCode", "OutOfCountryCoverage", "OutOfServiceAreaCoverage") \
    .withColumn("OutOfCountry", when(col("OutOfCountryCoverage") == "Yes", 1).otherwise(0)) \
    .withColumn("OutOfServiceArea", when(col("OutOfServiceAreaCoverage") == "Yes", 1).otherwise(0))

df_coverage_scope_grouped = df_coverage_scope.groupBy("StateCode") \
    .agg(
        sum("OutOfCountry").alias("NumPlansWithIntlCoverage"),
        sum("OutOfServiceArea").alias("NumPlansWithOutOfAreaCoverage")
    )
df_coverage_scope_grouped.show(5)

+---------+------------------------+-----------------------------+
|StateCode|NumPlansWithIntlCoverage|NumPlansWithOutOfAreaCoverage|
+---------+------------------------+-----------------------------+
|       IN|                    1015|                         1227|
|       ND|                     831|                          963|
|       MS|                     377|                          448|
|       LA|                    1023|                         1335|
|       GA|                    1385|                         1940|
+---------+------------------------+-----------------------------+
only showing top 5 rows


In [15]:
# ============================================
# 10. Quantos planos por estado oferecem programas de bem-estar ou gestão de doenças
# ============================================
"""
Objetivo: Ver se há relação entre suporte preventivo e estado/região.
"""
df_wellness = df_plan_attr_raw.select("StateCode", "WellnessProgramOffered", "DiseaseManagementProgramsOffered") \
    .withColumn("HasWellness", when(col("WellnessProgramOffered") == "Yes", 1).otherwise(0)) \
    .withColumn("HasDiseaseMgmt", when(col("DiseaseManagementProgramsOffered") == "Yes", 1).otherwise(0))

df_wellness_grouped = df_wellness.groupBy("StateCode") \
    .agg(
        sum("HasWellness").alias("PlansWithWellnessPrograms"),
        sum("HasDiseaseMgmt").alias("PlansWithDiseaseManagement")
    )
df_wellness_grouped.show(5)

+---------+-------------------------+--------------------------+
|StateCode|PlansWithWellnessPrograms|PlansWithDiseaseManagement|
+---------+-------------------------+--------------------------+
|       IL|                     1397|                         0|
|       WY|                       24|                         0|
|       IN|                      950|                         0|
|       OK|                      682|                         0|
|       WV|                      423|                         0|
+---------+-------------------------+--------------------------+
only showing top 5 rows


In [16]:
# ============================================
# 11. Quantidade de planos por faixa etária
# ============================================
"""
Objetivo: Ver se há mais opções de planos para algumas faixas etárias (útil para o público jovem, idoso etc.).
"""

df_rate_age_grouped = df_rate_filtered.groupBy("Age") \
                                      .count()
df_rate_age_grouped.show(5)

+----+------+
| Age| count|
+----+------+
|NULL|591497|
|  38|275067|
|  39|275067|
|  40|275067|
|  41|275067|
+----+------+
only showing top 5 rows


#### Example: Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [None]:
dyf = glueContext.create_dynamic_frame.from_catalog(database='database_name', table_name='table_name')
dyf.printSchema()

#### Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data


In [None]:
df = dyf.toDF()
df.show()

#### Example: Visualize data with matplotlib


In [None]:
import matplotlib.pyplot as plt

# Set X-axis and Y-axis values
x = [5, 2, 8, 4, 9]
y = [10, 4, 8, 5, 2]
  
# Create a bar chart 
plt.bar(x, y)
  
# Show the plot
%matplot plt

#### Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)