<a href="https://colab.research.google.com/github/Kouyoumdjian/Arquivos_Vinhos/blob/main/notebook-data-analizys.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Conexão banco de dados PostgreSQL

In [1]:
%pip install pyspark
%pip install psycopg2-binary



In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.7.3.jar

--2025-09-30 22:03:44--  https://jdbc.postgresql.org/download/postgresql-42.7.3.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1089312 (1.0M) [application/java-archive]
Saving to: ‘postgresql-42.7.3.jar.3’


2025-09-30 22:03:44 (10.9 MB/s) - ‘postgresql-42.7.3.jar.3’ saved [1089312/1089312]



In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PNAD Data Processing") \
    .config("spark.jars", "/content/postgresql-42.7.3.jar") \
    .getOrCreate()

In [4]:
# Credenciais do PostgreSQL
usuario_pg = "postgres"
senha_pg = "TechFiapTechFiap"
host_pg = "postgres-db.c3o4yi8i6o3f.us-east-1.rds.amazonaws.com"
porta_pg = "5432"
banco_pg = "postgres"

jdbc_url = f"jdbc:postgresql://{host_pg}:{porta_pg}/{banco_pg}"

jdbc_properties = {
    "user": usuario_pg,
    "password": senha_pg,
    "driver": "org.postgresql.Driver"
}

print(f"JDBC URL: {jdbc_url}")
print(f"JDBC Properties: {jdbc_properties}")

JDBC URL: jdbc:postgresql://postgres-db.c3o4yi8i6o3f.us-east-1.rds.amazonaws.com:5432/postgres
JDBC Properties: {'user': 'postgres', 'password': 'TechFiapTechFiap', 'driver': 'org.postgresql.Driver'}


In [5]:
query = "SELECT * FROM QUESTIONARIO"

df_spark = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", f"({query}) as custom_query") \
    .options(**jdbc_properties) \
    .load()

df_spark.printSchema()

root
 |-- ano: long (nullable = true)
 |-- v1013: long (nullable = true)
 |-- v1012: long (nullable = true)
 |-- uf: long (nullable = true)
 |-- capital: double (nullable = true)
 |-- rm_ride: double (nullable = true)
 |-- uf_nome: string (nullable = true)
 |-- sigla: string (nullable = true)
 |-- regiao: string (nullable = true)
 |-- a002: long (nullable = true)
 |-- a003: long (nullable = true)
 |-- a004: long (nullable = true)
 |-- b008: double (nullable = true)
 |-- b009a: double (nullable = true)
 |-- b009c: double (nullable = true)
 |-- b009e: double (nullable = true)
 |-- a005: long (nullable = true)
 |-- a006: double (nullable = true)
 |-- b0011: long (nullable = true)
 |-- b0012: long (nullable = true)
 |-- b0013: long (nullable = true)
 |-- b0014: long (nullable = true)
 |-- b0015: long (nullable = true)
 |-- b0016: long (nullable = true)
 |-- b0017: long (nullable = true)
 |-- b0018: long (nullable = true)
 |-- b0019: long (nullable = true)
 |-- b00110: long (nullable = true

# DataViz

In [6]:
from pyspark.sql.functions import col, when, mean, count, asc, input_file_name, lit, create_map
from itertools import chain

# 1. Selecione as colunas relevantes
cols_keep = [
    "uf","v2007","v2009","v2010",     # Demográficas
    "b002","c001",                    # Comportamento/teste
    "d0013",                          # Econômico
    "v1013",
    "b0011","b0012","b0013","b0014" # Sintomas
]
# Ensure columns exist in the dataframe
cols_keep = [c.lower() for c in cols_keep if c.lower() in df_spark.columns]
df_clean = df_spark.select(*cols_keep)
print(f"🧮 Colunas selecionadas ({len(cols_keep)}): {cols_keep}")

# 2. Crie dicionários de mapeamento
map_sim_nao = {1: "Sim", 2: "Não", 9: "Não sabe"}
map_ficou   = {1: "Sempre", 2: "Quase sempre", 3: "Às vezes", 4: "Raramente", 5: "Nunca", 9: "Não sabe"}
def dict_to_map(d): return create_map([lit(x) for x in chain(*d.items())])
m_simnao, m_ficou = dict_to_map(map_sim_nao), dict_to_map(map_ficou)

# 3. Aplique mapeamentos e crie novas colunas
sint_map = {"b0011":"FEBRE","b0012":"TOSSE","b0013":"DOR_GARGANTA","b0014":"DIFICULDADE_RESPIRAR"}
for k,v in sint_map.items():
    if k in df_clean.columns:
        df_clean = df_clean.withColumn(v, m_simnao.getItem(col(k).cast("int")))
if "c001" in df_clean.columns:
    df_clean = df_clean.withColumn("FICOU_EM_CASA", m_ficou.getItem(col("c001").cast("int")))
if "b002" in df_clean.columns:
    df_clean = df_clean.withColumn("TESTE_COVID", m_simnao.getItem(col("b002").cast("int")))
if "d0013" in df_clean.columns:
    df_clean = df_clean.withColumn("PERDA_RENDA", m_simnao.getItem(col("d0013").cast("int")))

print("✅ Limpeza + mapeamentos concluídos.")
print("👀 Amostra após mapeamentos (5 linhas):")
df_clean.limit(5).show(truncate=False)


# 4. Identifique os meses disponíveis
disp_rows = df_clean.select("v1013").distinct().orderBy(asc("v1013")).collect()
MESES_DISPONIVEIS = [r["v1013"] for r in disp_rows if r["v1013"] is not None]
print("🗓️ Meses detectados:", MESES_DISPONIVEIS)

# 5. Selecione até 3 meses para análise (substitua MESES_DESEJADOS conforme necessário ou deixe vazio para seleção automática)
MESES_DESEJADOS = [] # Example: [5, 7, 10]

def escolher_3_meses(desejados, disponiveis):
    desejados = [m for m in desejados if m]
    disp = [m for m in disponiveis if m]
    usados = []
    for m in desejados:
        if m in disp and m not in usados:
            usados.append(m)
    def prox(target, pool): return sorted(pool, key=lambda x: abs(int(x)-int(target)))
    for m in desejados:
        if len(usados)>=3: break
        if m not in usados and disp:
            cand = [x for x in disp if x not in usados]
            if cand: usados.append(prox(m, cand)[0])
    for x in disp:
        if len(usados)>=3: break
        if x not in usados: usados.append(x)
    return sorted(usados[:3]) # Sort for consistent output

MESES_USO = escolher_3_meses(MESES_DESEJADOS, MESES_DISPONIVEIS)
print(f"🎯 Meses desejados: {MESES_DESEJADOS} | Meses usados: {MESES_USO}")

# 6. Filtre o DataFrame pelos meses selecionados
df_3m = df_clean.filter(col("v1013").isin(MESES_USO))
print("📈 Contagem por MES (usados):")
df_3m.groupBy("v1013").agg(count("*").alias("qtde")).orderBy("v1013").show()
print("🔢 Total:", df_3m.count())


# 7. Calcule a prevalência de sintomas por UF
sym_cols = [c for c in ["FEBRE","TOSSE","DOR_GARGANTA","DIFICULDADE_RESPIRAR"] if c in df_3m.columns]
exprs = [mean(when(col(c)=="Sim",1).otherwise(0)).alias(f"prop_{c}") for c in sym_cols]
prev_uf_3m = df_3m.groupBy("uf").agg(*exprs)

# 9. Exiba a prevalência de sintomas por UF
print("📊 Prevalência por UF:")
prev_uf_3m.show(30, truncate=False)


🧮 Colunas selecionadas (6): ['uf', 'v1013', 'b0011', 'b0012', 'b0013', 'b0014']




✅ Limpeza + mapeamentos concluídos.
👀 Amostra após mapeamentos (5 linhas):
+---+-----+-----+-----+-----+-----+-----+-----+------------+--------------------+
|uf |v1013|b0011|b0012|b0013|b0014|FEBRE|TOSSE|DOR_GARGANTA|DIFICULDADE_RESPIRAR|
+---+-----+-----+-----+-----+-----+-----+-----+------------+--------------------+
|11 |5    |1    |1    |2    |2    |Sim  |Sim  |Não         |Não                 |
|11 |5    |1    |1    |2    |2    |Sim  |Sim  |Não         |Não                 |
|11 |5    |2    |2    |2    |2    |Não  |Não  |Não         |Não                 |
|11 |5    |2    |2    |2    |2    |Não  |Não  |Não         |Não                 |
|11 |5    |2    |2    |2    |2    |Não  |Não  |Não         |Não                 |
+---+-----+-----+-----+-----+-----+-----+-----+------------+--------------------+

🗓️ Meses detectados: [5, 7, 10]
🎯 Meses desejados: [] | Meses usados: [5, 7, 10]
📈 Contagem por MES (usados):
+-----+------+
|v1013|  qtde|
+-----+------+
|    5|349306|
|    7|384166|
|

In [8]:
# ================================================
# MEU TRECHO – Medalhão + padrões + Postgres (SILVER/GOLD)
# Requisitos prévios: spark, jdbc_url, jdbc_properties, df_clean (do bloco anterior)
# Usa df_3m/prev_uf_3mes se já existirem; senão constrói a partir do df_clean
# ================================================
from pyspark.sql.functions import (
    col, upper, trim, when, lit, current_timestamp,
    avg as f_avg, count as f_count
)
from pyspark.sql.types import IntegerType, StringType

# ---- Pré-checagem de requisitos ----
try:
    spark
    jdbc_url
    jdbc_properties
    df_clean
except NameError as e:
    raise RuntimeError(
        "Pré-requisito ausente: garanta que o bloco da colega rodou (spark/jdbc_url/jdbc_properties/df_clean)."
    ) from e

# ---- helpers ----
def exists(df, c):
    return c in df.columns

def cast_if_exists(df, colname, dtype, normalize_str=False):
    if exists(df, colname):
        expr = col(colname).cast(dtype)
        if normalize_str:
            expr = upper(trim(expr))
        return df.withColumn(colname, expr)
    return df

# ------------------------------------------------
# 1) SILVER: padronização (≤20 questionamentos) + derivadas legíveis
# ------------------------------------------------
quest_cols = [c for c in [
    "uf","v2007","v2009","v2010","v1013",      # demográficas + mês
    "b0011","b0012","b0013","b0014",          # sintomas (códigos)
    "b002","c001","d0013"                     # teste, comportamento, econômico
] if exists(df_clean, c)]

pretty_cols = [c for c in [
    "FEBRE","TOSSE","DOR_GARGANTA","DIFICULDADE_RESPIRAR",
    "FICOU_EM_CASA","TESTE_COVID","PERDA_RENDA"
] if exists(df_clean, c)]

silver = df_clean.select(*(quest_cols + pretty_cols))

# renomear p/ padrão (mais legível)
renames = {
    "uf":"UF", "v2007":"SEXO", "v2009":"IDADE", "v2010":"RACA_COR", "v1013":"MES",
    "b0011":"B0011","b0012":"B0012","b0013":"B0013","b0014":"B0014",
    "b002":"B002","c001":"C001","d0013":"D0013"
}
for a,b in renames.items():
    if exists(silver, a) and a != b:
        silver = silver.withColumnRenamed(a,b)

# tipos/normalização
silver = cast_if_exists(silver, "UF", StringType(), normalize_str=True)
silver = cast_if_exists(silver, "SEXO", IntegerType())
silver = cast_if_exists(silver, "IDADE", IntegerType())
silver = cast_if_exists(silver, "RACA_COR", IntegerType())
silver = cast_if_exists(silver, "MES", IntegerType())

# metadados
silver = silver.withColumn("ORIGEM", lit("postgres_rds.QUESTIONARIO"))
silver = silver.withColumn("CARGA_TS", current_timestamp())

print("✅ SILVER pronto (amostra):")
silver.limit(5).show(truncate=False)
print("📚 Colunas do SILVER:", silver.columns)

# ------------------------------------------------
# 2) GOLD – fato (3 meses) + agregado por UF
#    • usa df_3m/prev_uf_3m se já existirem
#    • senão, constrói a partir do SILVER
# ------------------------------------------------
# fato (linhas dos 3 meses)
try:
    df3m = df_3m
except NameError:
    if not exists(silver, "MES"):
        # tentar renomear se veio como v1013
        if exists(silver, "v1013"):
            silver = silver.withColumnRenamed("v1013", "MES").withColumn("MES", col("MES").cast(IntegerType()))
        else:
            raise RuntimeError("Não encontrei 'MES'/'v1013' para montar o GOLD fato.")
    # top-3 meses mais frequentes
    meses_usados = [r["MES"] for r in silver.groupBy("MES").agg(f_count("*").alias("qt")) \
                                   .orderBy(col("qt").desc()).limit(3).collect()]
    meses_usados = [m for m in meses_usados if m is not None]
    df3m = silver.filter(col("MES").isin(meses_usados))

# normalização garantida em df3m
if exists(df3m, "v1013") and not exists(df3m, "MES"):
    df3m = df3m.withColumnRenamed("v1013", "MES")
df3m = cast_if_exists(df3m, "MES", IntegerType())
if exists(df3m, "uf") and not exists(df3m, "UF"):
    df3m = df3m.withColumnRenamed("uf","UF")
df3m = cast_if_exists(df3m, "UF", StringType(), normalize_str=True)

print("\n🎯 GOLD fato – contagem por MES:")
if exists(df3m, "MES"):
    df3m.groupBy("MES").agg(f_count("*").alias("qtde")).orderBy("MES").show()
else:
    print("  (sem coluna MES)")

# agregado por UF
try:
    prev_uf = prev_uf_3m
    if exists(prev_uf, "uf") and not exists(prev_uf, "UF"):
        prev_uf = prev_uf.withColumnRenamed("uf","UF")
except NameError:
    symptom_cols = [c for c in ["FEBRE","TOSSE","DOR_GARGANTA","DIFICULDADE_RESPIRAR"] if exists(df3m, c)]
    agg_exprs = [f_avg(when(col(c)=="Sim",1).otherwise(0)).alias(f"prop_{c}") for c in symptom_cols] \
                if symptom_cols else [f_count("*").alias("qtde")]
    prev_uf = df3m.groupBy("UF").agg(*agg_exprs)

print("📊 GOLD agregado por UF (amostra):")
prev_uf.show(20, truncate=False)

# ------------------------------------------------
# 3) Escrita no Postgres (overwrite)
# ------------------------------------------------
t_silver = "public.pnad_silver"
t_fato   = "public.pnad_curado_3meses"
t_agg    = "public.prev_sintomas_uf_3meses"

for (name, dframe) in [(t_silver, silver), (t_fato, df3m), (t_agg, prev_uf)]:
    (
        dframe.write
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", name)
        .options(**jdbc_properties)
        .mode("overwrite")
        .save()
    )
    print(f"✅ Gravado no Postgres: {name}")

# ------------------------------------------------
# 4) Checks rápidos + análises pra apresentação
# ------------------------------------------------
print("\n🧪 Nulos por coluna (SILVER):")
nulls = silver.select([f_count(when(col(c).isNull(), c)).alias(c) for c in silver.columns])
nulls.show(truncate=False)

if exists(df3m, "FEBRE"):
    print("\n🔥 Top 10 UFs por prevalência de FEBRE:")
    top_febre = df3m.groupBy("UF") \
                    .agg(f_avg(when(col("FEBRE")=="Sim",1).otherwise(0)).alias("prop_febre")) \
                    .orderBy(col("prop_febre").desc())
    top_febre.show(10, truncate=False)

if exists(df3m, "FICOU_EM_CASA"):
    print("\n🏠 Distribuição do isolamento (FICOU_EM_CASA):")
    dist_iso = df3m.groupBy("FICOU_EM_CASA").agg(f_count("*").alias("qtd")).orderBy(col("qtd").desc())
    dist_iso.show(truncate=False)

if exists(df3m, "PERDA_RENDA"):
    print("\n💸 Proporção de PERDA_RENDA:")
    pr = df3m.agg(f_avg(when(col("PERDA_RENDA")=="Sim",1).otherwise(0)).alias("prop_perda_renda"))
    pr.show(truncate=False)

print("\n✅ FIM — Medalhão concluído (SILVER/GOLD) e tabelas gravadas no Postgres.")


✅ SILVER pronto (amostra):
+---+---+-----+-----+-----+-----+-----+-----+------------+--------------------+-------------------------+--------------------------+
|UF |MES|B0011|B0012|B0013|B0014|FEBRE|TOSSE|DOR_GARGANTA|DIFICULDADE_RESPIRAR|ORIGEM                   |CARGA_TS                  |
+---+---+-----+-----+-----+-----+-----+-----+------------+--------------------+-------------------------+--------------------------+
|11 |5  |1    |1    |2    |2    |Sim  |Sim  |Não         |Não                 |postgres_rds.QUESTIONARIO|2025-09-30 22:14:13.308078|
|11 |5  |1    |1    |2    |2    |Sim  |Sim  |Não         |Não                 |postgres_rds.QUESTIONARIO|2025-09-30 22:14:13.308078|
|11 |5  |2    |2    |2    |2    |Não  |Não  |Não         |Não                 |postgres_rds.QUESTIONARIO|2025-09-30 22:14:13.308078|
|11 |5  |2    |2    |2    |2    |Não  |Não  |Não         |Não                 |postgres_rds.QUESTIONARIO|2025-09-30 22:14:13.308078|
|11 |5  |2    |2    |2    |2    |Não  |Não