In [2]:
# Pipeline ETL - COVID-19 Brasil → SQL Server
# 16/10/2025

import os
import sys

# Configurações Spark
python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path
os.environ['SPARK_LOCAL_HOSTNAME'] = 'localhost'

print("Configuração do ambiente:")
print(f"Python: {sys.version.split()[0]}")
print(f"Executável: {python_path}")

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import urllib.request
print("Bibliotecas importadas")

# ============================================

# Criando Spark Session
spark = SparkSession.builder \
    .appName("ETL_COVID_to_SQL") \
    .master("local[*]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.host", "localhost") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .getOrCreate()

print(f"Spark {spark.version} iniciado")
print(f"App: {spark.sparkContext.appName}")

# ============================================

# EXTRACT - Baixando dados públicos
print("\nFase 1: Extração")

url = "https://raw.githubusercontent.com/wcota/covid19br/master/cases-brazil-cities.csv"
os.makedirs("../data/raw", exist_ok=True)
raw_file = "../data/raw/covid19_brazil_raw.csv"

print(f"Baixando de: {url}")
try:
    urllib.request.urlretrieve(url, raw_file)
    file_size = os.path.getsize(raw_file) / (1024 * 1024)
    print(f"Arquivo baixado: {file_size:.2f} MB")
except Exception as e:
    print(f"Erro ao baixar: {e}")

# ============================================

# Lendo CSV
print("\nLendo CSV com Spark...")

df_raw = spark.read.csv(
    raw_file,
    header=True,
    inferSchema=True,
    sep=","
)

print(f"Registros: {df_raw.count():,}")
print(f"Colunas: {len(df_raw.columns)}")

print("\nPrimeiras linhas:")
df_raw.show(5, truncate=False)

# ============================================

# Análise exploratória
print("\nSchema dos dados:")
df_raw.printSchema()

print("\nColunas disponíveis:")
for i, col in enumerate(df_raw.columns, 1):
    print(f"{i}. {col}")

print("\nEstatísticas:")
df_raw.describe().show()

# Valores nulos
print("\nValores nulos por coluna:")
from pyspark.sql.functions import col, sum as spark_sum, when

null_counts = df_raw.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_raw.columns
])
null_counts.show()

# ============================================

# TRANSFORM - Limpeza e transformação
print("\nFase 2: Transformação")

# Selecionar colunas relevantes
colunas_interesse = [
    'date', 'state', 'city', 'totalCases', 
    'newCases', 'deaths', 'newDeaths'
]

colunas_disponiveis = [c for c in colunas_interesse if c in df_raw.columns]
print(f"Colunas selecionadas: {colunas_disponiveis}")

df_clean = df_raw.select(colunas_disponiveis)

# Remover nulos
print("\nRemovendo valores nulos...")
registros_antes = df_clean.count()
df_clean = df_clean.dropna(subset=['date', 'state', 'city'])
registros_depois = df_clean.count()
print(f"Removidos: {registros_antes - registros_depois:,}")
print(f"Restantes: {registros_depois:,}")

# Remover duplicatas
print("\nRemovendo duplicatas...")
registros_antes = df_clean.count()
df_clean = df_clean.dropDuplicates(['date', 'state', 'city'])
registros_depois = df_clean.count()
print(f"Duplicatas removidas: {registros_antes - registros_depois:,}")

# Converter tipos
print("\nConvertendo tipos de dados...")
df_clean = df_clean.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
df_clean = df_clean.withColumn('totalCases', col('totalCases').cast('integer'))
df_clean = df_clean.withColumn('newCases', col('newCases').cast('integer'))
df_clean = df_clean.withColumn('deaths', col('deaths').cast('integer'))
df_clean = df_clean.withColumn('newDeaths', col('newDeaths').cast('integer'))

# Adicionar colunas calculadas
print("\nAdicionando colunas calculadas...")
df_clean = df_clean.withColumn('year', year(col('date')))
df_clean = df_clean.withColumn('month', month(col('date')))
df_clean = df_clean.withColumn('data_carga', current_timestamp())

# Renomear para português
print("\nRenomeando colunas...")
df_clean = df_clean \
    .withColumnRenamed('date', 'data') \
    .withColumnRenamed('state', 'estado') \
    .withColumnRenamed('city', 'cidade') \
    .withColumnRenamed('totalCases', 'casos_totais') \
    .withColumnRenamed('newCases', 'casos_novos') \
    .withColumnRenamed('deaths', 'obitos_totais') \
    .withColumnRenamed('newDeaths', 'obitos_novos') \
    .withColumnRenamed('year', 'ano') \
    .withColumnRenamed('month', 'mes')

print("Transformação concluída")

# ============================================

# Visualizar dados limpos
print("\nSchema final:")
df_clean.printSchema()

print("\nAmostra dos dados:")
df_clean.show(10)

print("\nEstatísticas finais:")
df_clean.select('casos_totais', 'casos_novos', 'obitos_totais', 'obitos_novos').describe().show()

# ============================================

# Análises básicas
print("\nTop 10 Estados com mais casos:")
df_clean.groupBy('estado') \
    .agg(
        sum('casos_totais').alias('total_casos'),
        sum('obitos_totais').alias('total_obitos')
    ) \
    .orderBy(col('total_casos').desc()) \
    .limit(10) \
    .show()

print("\nCasos por ano:")
df_clean.groupBy('ano') \
    .agg(
        sum('casos_novos').alias('casos_ano'),
        sum('obitos_novos').alias('obitos_ano')
    ) \
    .orderBy('ano') \
    .show()

print("\nDados mais recentes:")
df_clean.orderBy(col('data').desc()).limit(5).show()

# ============================================

# Salvar dados limpos
print("\nSalvando dados processados...")

os.makedirs("../data/processed", exist_ok=True)

# CSV
processed_file = "../data/processed/covid19_brazil_clean.csv"
df_clean_pandas = df_clean.toPandas()
df_clean_pandas.to_csv(processed_file, index=False)
print(f"CSV salvo: {processed_file}")
print(f"Registros: {len(df_clean_pandas):,}")

# Parquet
parquet_path = "../data/processed/covid19_brazil_clean.parquet"
df_clean_pandas.to_parquet(parquet_path, index=False)
print(f"Parquet salvo: {parquet_path}")

# ============================================

# LOAD - Preparar para SQL Server
print("\nFase 3: Carregamento")

# Criar amostra menor
print("\nCriando amostra para SQL Server...")
df_sample = df_clean.orderBy(col('data').desc()).limit(1000)

print(f"Amostra: {df_sample.count():,} registros")
df_sample.show(5)

# ============================================

# Criar script SQL
print("\nGerando script SQL...")

sql_script = """-- Script de criação da tabela COVID-19
-- Gerado automaticamente pelo pipeline ETL
-- Data: 16/10/2025

USE datalake_local;
GO

-- Limpar tabela se existir
IF OBJECT_ID('dbo.covid19_brazil', 'U') IS NOT NULL
    DROP TABLE dbo.covid19_brazil;
GO

-- Criar tabela
CREATE TABLE dbo.covid19_brazil (
    data DATE NOT NULL,
    estado VARCHAR(2),
    cidade VARCHAR(200),
    casos_totais INT,
    casos_novos INT,
    obitos_totais INT,
    obitos_novos INT,
    ano INT,
    mes INT,
    data_carga DATETIME DEFAULT GETDATE()
);
GO

-- Índices para performance
CREATE INDEX idx_data ON dbo.covid19_brazil(data);
CREATE INDEX idx_estado ON dbo.covid19_brazil(estado);
CREATE INDEX idx_cidade ON dbo.covid19_brazil(cidade);
GO

PRINT 'Tabela covid19_brazil criada';
GO
"""

sql_file = "../sql/create_table_covid19.sql"
os.makedirs("../sql", exist_ok=True)
with open(sql_file, 'w', encoding='utf-8') as f:
    f.write(sql_script)

print(f"Script SQL salvo: {sql_file}")

# ============================================

# Gerar INSERT SQL
print("\nGerando comandos INSERT...")

df_insert = df_sample.limit(10).toPandas()

insert_statements = []
insert_statements.append("-- Inserir dados de exemplo\n")
insert_statements.append("USE datalake_local;\nGO\n\n")

for _, row in df_insert.iterrows():
    insert = f"""INSERT INTO dbo.covid19_brazil (data, estado, cidade, casos_totais, casos_novos, obitos_totais, obitos_novos, ano, mes) VALUES ('{row['data']}', '{row['estado']}', '{row['cidade']}', {row['casos_totais']}, {row['casos_novos']}, {row['obitos_totais']}, {row['obitos_novos']}, {row['ano']}, {row['mes']});"""
    insert_statements.append(insert + "\n")

insert_statements.append("\nGO\n")
insert_statements.append("\nPRINT 'Dados inseridos';\nGO")

insert_file = "../sql/insert_covid19_sample.sql"
with open(insert_file, 'w', encoding='utf-8') as f:
    f.writelines(insert_statements)

print(f"Script INSERT salvo: {insert_file}")
print(f"Total de INSERTs: {len(df_insert)}")

print("\nExemplo de INSERT:")
print(''.join(insert_statements[:5]))

# ============================================

# Resumo
print("\n" + "="*60)
print("RESUMO DO PIPELINE ETL")
print("="*60)

print("\nEXTRACT:")
print(f"  Fonte: COVID-19 Brasil (GitHub)")
print(f"  Registros originais: {df_raw.count():,}")

print("\nTRANSFORM:")
print(f"  Colunas: {len(df_clean.columns)}")
print(f"  Registros limpos: {df_clean.count():,}")

print("\nLOAD:")
print(f"  CSV: {processed_file}")
print(f"  Parquet: {parquet_path}")
print(f"  SQL: {sql_file}")
print(f"  Inserts: {insert_file}")

print("\n" + "="*60)
print("Pipeline concluído")
print("="*60)

# ============================================

print("\nSpark Session ativa para análises adicionais")

Configuração do ambiente:
Python: 3.10.0
Executável: C:\Users\Gabriel\Documents\ml-data-studying-projects\venv_spark\Scripts\python.exe
Bibliotecas importadas
Spark 4.0.1 iniciado
App: ETL_COVID_to_SQL

Fase 1: Extração
Baixando de: https://raw.githubusercontent.com/wcota/covid19br/master/cases-brazil-cities.csv
Arquivo baixado: 0.54 MB

Lendo CSV com Spark...
Registros: 5,596
Colunas: 14

Primeiras linhas:
+-------+-----+----------------------+-------+------+----------+---------------------------+-------------------------------+--------------------+-------+----------+--------+---------+--------------+
|country|state|city                  |ibgeID |deaths|totalCases|deaths_per_100k_inhabitants|totalCases_per_100k_inhabitants|deaths_by_totalCases|_source|date      |newCases|newDeaths|last_info_date|
+-------+-----+----------------------+-------+------+----------+---------------------------+-------------------------------+--------------------+-------+----------+--------+---------+--------