## Tech Challenge

### O que é o projeto

Entender como foi o comportamento da população na época da pandemia da COVID-19 e quais indicadores seriam importantes para o planejamento, caso haja um novo surto da doença, utilizando o estudo do PNAD-COVID 19 do IBGE para termos respostas ao problema proposto, pois são dados confiáveis, porém, não será necessário utilizar todas as perguntas realizadas na pesquisa para enxergar todas as oportunidades ali postas, mas há dados triviais que precisam estar no projeto, pois auxiliam muito na análise dos dados:

• Características clínicas dos sintomas;  
• Características da população;  
• Características econômicas da sociedade.  

Dessa forma, acessar os dados do PNAD-COVID-19 do IBGE (https://covid19.ibge.gov.br/pnad-covid/) e organizar esta base para análise, utilizando Banco de Dados em Nuvem e trazendo as seguintes características:

a. Utilização de no máximo 20 questionamentos realizados na pesquisa;  
b. Utilizar 3 meses para construção da solução;  
c. Caracterização dos sintomas clínicos da população;  
d. Comportamento da população na época da COVID-19;  
e. Características econômicas da Sociedade;  

Com objetivo de trazer uma breve análise dessas informações, como foi a organização do banco, as perguntas selecionadas para a resposta do problema e quais seriam as principais ações que o hospital deverá tomar em caso de um novo surto de COVID-19

### Arquitetura de Dados

![Arquitetura](arquitetura.png)

1) Dados referente a Pesquisa Nacional por Amostra de Domicílios (PNAD COVID19) através do link [PNAD](https://www.ibge.gov.br/estatisticas/investigacoes-experimentais/estatisticas-experimentais/27946-divulgacao-semanal-pnadcovid1?t=microdados&utm_source=covid19&utm_medium=hotsite&utm_campaign=covid_19) *
2) Realizado o processo de ETL para extrair os dados (E), tratar os dados (T) e carregar os dados (L)
3) Dados armazenado em um banco dados PostgreSQL criado via AWS RDS 
4) Aplicar as regras de negócio como filtro das perguntas necessarias e dados dos ultimos 3 meses
5) Realizar o carregamento dos dados em um tabela especifica (data mart) esse disponiviel no PostgreSQL 
6) Construir a analise utilizando o Power BI conectado ao Data Mart


\* Os dados foram armazenados no GitHub para a construção desse trabalho

### Códigos

#### Importar bibliotecas

In [None]:
# Importar biblioteca completa
import requests
import zipfile
import io
import pandas as pd
import os
import findspark

# Importar algo especifico de uma biblioteca
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from pyspark.sql import SparkSession

#### Funções (DEF)

In [None]:
# Testar a conexão ao banco de dados
def test_connection(engine):

    try:
        with engine.connect() as connection:
            
            # Testar a versão do PostgreSQL
            result = connection.execute(text("SELECT version();"))
            versao = result.fetchone()
            print("✅ Conectado com sucesso:", versao[0])

            # Listar as tabelas no schema público
            result = connection.execute(text("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public';
            """))
            tabelas = result.fetchall()
            print("📄 Tabelas no banco:")
            for tabela in tabelas:
                print("  -", tabela[0])

    except Exception as e:
        print("❌ Erro ao executar comandos:", e)


#### Realizar o processo de ETL

In [None]:
# URL da API do GitHub para o conteúdo do diretório
api_url = 'https://api.github.com/repos/RicardViana/fiap-Big-Data/contents/PNAD-COVID/Microdados'

print(f"Buscando lista de arquivos em: {api_url}")

# Faz a requisição para a API
response = requests.get(api_url)
response.raise_for_status()

# Gerar a lista de arquivos em formato JSON
files = response.json()

# Lista para armazenar os DataFrames individuais
lista_de_dataframes = []

# Itera sobre cada item no diretório
for file_info in files:
   
    if file_info['name'].endswith('.zip'):  # --> Verificar se o arquivo termina com .zip
        zip_name = file_info['name']
        zip_url = file_info['download_url']
        
        print(f"\nProcessando: {zip_name}")
        
        try:
            r_zip = requests.get(zip_url) # Baixar o conteúdo do ZIP
            r_zip.raise_for_status()
            zip_content = io.BytesIO(r_zip.content)  # Processar o ZIP em memória

            with zipfile.ZipFile(zip_content) as z:
                csv_filename = [f for f in z.namelist() if f.endswith('.csv')][0] # Achar o primeiro arquivo CSV dentro do ZIP
                print(f"Lendo o arquivo: {csv_filename}")
                
                with z.open(csv_filename) as csv_file:
                    df_temp = pd.read_csv(csv_file, sep=',', encoding='utf-8')
                    df_temp['nome_arquivo_origem'] = zip_name # Criar uma coluna com o nome do arquivo ZIP de origem
                    lista_de_dataframes.append(df_temp) # Adicionar o Data Frame a lista
                    print(f"Sucesso! DataFrame de '{zip_name}' adicionado à lista")
        
        except requests.exceptions.RequestException as e:
            print(f"ERRO: Falha ao baixar {zip_name}. Erro: {e}")

        except IndexError:
            print(f"AVISO: Nenhum arquivo .csv encontrado dentro de {zip_name}")

        except Exception as e:
            print(f"ERRO: Ocorreu um erro inesperado ao processar {zip_name}. Erro: {e}")

# Consolidar os Data Frame 
if lista_de_dataframes:
    df_consolidado = pd.concat(lista_de_dataframes, ignore_index=True)

    # Pegr lista de todas as colunas exceto a que será movimentada
    lista_colunas = [col for col in df_consolidado.columns if col != 'nome_arquivo_origem']
    df_consolidado = df_consolidado[lista_colunas + ['nome_arquivo_origem']]
    
    print("\nConsolidação Concluída")
    print(f"O DataFrame final possui {df_consolidado.shape[0]} linhas e {df_consolidado.shape[1]} colunas")
    
    # Mostrar shape + primeiros e ultimos dados
    print("\nQuantidade de linhas e colunas do Data Frame")
    print(f"Linhas: {df_consolidado.shape[0]}")
    print(f"Colunas: {df_consolidado.shape[1]}")

    print("\nAmostra do DataFrame Consolidado (Primeiras Linhas)")
    display(df_consolidado.head())
    
    print("\nAmostra do DataFrame Consolidado (Últimas Linhas)")
    display(df_consolidado.tail())
    
    # Verificar os valores únicos na nova coluna para confirmar a consolidação
    print("\nArquivos que foram consolidados")
    print(df_consolidado['nome_arquivo_origem'].unique())

else:
    print("\nNenhum DataFrame foi processado. O DataFrame final não pôde ser criado")

In [None]:
# Gerar o Data Frame com os códigos do UF da IBGE
link_codigo_uf = "https://raw.githubusercontent.com/RicardViana/tabela-uf-ibge/refs/heads/main/codigo_uf.csv"
df_uf = pd.read_csv(link_codigo_uf, sep=",")

df_uf.head()

In [None]:
# Realacionar os dados data frame 
df_final = pd.merge(
    df_consolidado,
    df_uf,
    how='left',
    left_on='UF', 
    right_on='Código'
)

# Remover colunas não necessarias
df_final = df_final.drop(columns=["Código"])

# Renomear nome 
df_final = df_final.rename(columns={"UF_x":"UF", "UF_y":"UF_Nome"})

# Visualizar as primeiras linhas do resultado
df_final.tail()


In [None]:
# Criar a engine para conexão ao banco de dados usando
load_dotenv()

usuario = os.getenv("POSTGRES_USER_PNAD")
senha = os.getenv("POSTGRES_PASSWORD_PNAD")
host = os.getenv("POSTGRES_HOST_PNAD")
porta = os.getenv("POSTGRES_PORT_PNAD")
banco = os.getenv("POSTGRES_DB_PNAD")

engine = create_engine(f"postgresql+psycopg2://{usuario}:{senha}@{host}:{porta}/{banco}")

# Testar a conexão
test_connection(engine)

In [None]:
# Criar a tabela
nome_tabela = 'questionario_pnad_covid'

print("Iniciando a criação do esquema da tabela")

try:
    df_schema = df_final.head(0)
    df_schema.to_sql(
        nome_tabela, 
        con=engine, 
        if_exists='replace', 
        index=False)
    
    print(f"✅ Esquema da tabela '{nome_tabela}' criado com sucesso no PostgreSQL!")

except Exception as e:
    print(f"❌ Erro na Parte 1: {e}")
    exit() 



In [None]:
caminho_driver_jdbc = "postgresql-42.7.7.jar"

findspark.init()

spark = SparkSession.builder \
    .appName("CargaPNADparaPostgreSQL") \
    .config("spark.driver.extraClassPath", caminho_driver_jdbc) \
    .getOrCreate()

print("\nSessão Spark iniciada com sucesso!")
print(f"Versão do Spark: {spark.version}")

spark_df = None

try:

    print("\nConvertendo DataFrame Pandas para o formato Spark")
    spark_df = spark.createDataFrame(df_final)
    print("Conversão concluída. Schema do Spark DataFrame:")
    spark_df.printSchema()

    load_dotenv()
    usuario = os.getenv("POSTGRES_USER_PNAD")
    senha = os.getenv("POSTGRES_PASSWORD_PNAD")
    host = os.getenv("POSTGRES_HOST_PNAD")
    porta = os.getenv("POSTGRES_PORT_PNAD")
    banco = os.getenv("POSTGRES_DB_PNAD")

    nome_tabela = 'questionario_pnad_covid'
    
    jdbc_url = f"jdbc:postgresql://{host}:{porta}/{banco}"
    connection_properties = {
        "user": usuario,
        "password": senha,
        "driver": "org.postgresql.Driver"
    }

    print(f"\nIniciando a carga de {spark_df.count()} linhas para a tabela '{nome_tabela}' via PySpark...")
    spark_df.write.jdbc(
        url=jdbc_url,
        table=nome_tabela,
        mode="overwrite",
        properties=connection_properties
    )

    print(f"\n✅ Sucesso! Dados carregados na tabela '{nome_tabela}' utilizando PySpark.")

except Exception as e:
    print(f"\n❌ Ocorreu um erro durante o processo do PySpark: {e}")

finally:
    if spark:
        print("\nEncerrando a sessão Spark")
        spark.stop()