In [2]:
%additional_python_modules openpyxl


Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Additional python modules to be included:
openpyxl


In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
import boto3
import re
from datetime import datetime
import os
import time

Trying to create a Glue session for the kernel.
Session Type: glueetl
Session ID: fd3990e2-3942-4272-9770-acd90eb21522
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
--additional-python-modules openpyxl
Waiting for session fd3990e2-3942-4272-9770-acd90eb21522 to get into ready status...
Session fd3990e2-3942-4272-9770-acd90eb21522 has been created.



In [2]:
# Configuração do  bucket
bucket = "pipeline-opea-bucket"
arquivo_excel = "s3://pipeline-opea-bucket/origem/dados_entrada.xlsx"
data_hoje = datetime.now().strftime("%Y-%m-%d")

# Cria sessão Spark (já existe no Glue)
spark = SparkSession.builder.appName("Pipeline OPEA").getOrCreate()

print("Bucket:", bucket)
print("Data:", data_hoje)
print("Spark:", spark.version)

Bucket: pipeline-opea-bucket
Data: 2025-12-01
Spark: 3.3.0-amzn-1


## Parte 1: Camada Raw

### Imports

# Pipeline OPEA - Excel para S3

Pipeline de dados com arquitetura medallion (Raw → Stage → Analytics)

### Validações

In [3]:
# Lista para guardar erros
erros = []

def adicionar_erro(linha, campo, valor, motivo):
    erros.append({
        "linha": linha,
        "campo": campo,
        "valor": valor,
        "motivo": motivo
    })

print("OK")

OK


In [4]:
# Valida CPF
def validar_cpf(cpf):
    if pd.isna(cpf):
        return False
    if re.match(r'^\d{3}\.\d{3}\.\d{3}-\d{2}$', str(cpf)):
        return True
    return False

# Valida CEP
def validar_cep(cep):
    if pd.isna(cep):
        return False
    if re.match(r'^\d{5}-\d{3}$', str(cep)):
        return True
    return False

# Valida Email
def validar_email(email):
    if pd.isna(email):
        return False
    if '@' in str(email) and '.' in str(email):
        return True
    return False

# Valida Data
def validar_data(data):
    if pd.isna(data):
        return False
    try:
        datetime.strptime(str(data), "%Y-%m-%d")
        return True
    except:
        return False

# Valida Status
def validar_status(status):
    if pd.isna(status):
        return False
    if str(status).lower() in ["ativo", "inativo", "suspenso"]:
        return True
    return False

print("Funções criadas")

Funções criadas


### Leitura dos dados

In [5]:
# Lê as abas do Excel com pandas (pq Spark não lê Excel direto)
clientes_pd = pd.read_excel(arquivo_excel, sheet_name="clientes")
enderecos_pd = pd.read_excel(arquivo_excel, sheet_name="enderecos")

# Converte tudo para string antes de criar Spark DataFrame
clientes_pd = clientes_pd.astype(str)
enderecos_pd = enderecos_pd.astype(str)

# Converte para Spark DataFrame
clientes = spark.createDataFrame(clientes_pd)
enderecos = spark.createDataFrame(enderecos_pd)

print("Clientes:", clientes.count())
print("Endereços:", enderecos.count())

# Mostra os dados
clientes.show(5)
enderecos.show(5)

Clientes: 16
Endereços: 15
+----------+--------------------+--------------------+--------------+---------------+--------+-------------------+
|id_cliente|                nome|               email|           cpf|data_nascimento|  status|        data_evento|
+----------+--------------------+--------------------+--------------+---------------+--------+-------------------+
|         1|          João Silva|joao.silva@email.com|123.456.789-10|     1990-05-15|   ativo|2024-01-10 08:30:00|
|         1|João da Silva Santos|joao.silva@email.com|123.456.789-10|     1990-05-15|   ativo|2024-01-15 10:45:00|
|         2|        Maria Santos|maria.santos@emai...|234.567.890-21|     1985-03-22| inativo|2024-01-08 14:20:00|
|         3|      Pedro Oliveira|pedro.oliveira@em...|345.678.901-32|     1992-07-30|suspenso|2024-01-12 09:15:00|
|         4|           Ana Costa| ana.costa@email.com|456.789.012-43|     1988-11-18|   ativo|2024-01-14 16:30:00|
+----------+--------------------+--------------------

### Processamento de clientes

In [6]:
# Valida clientes usando Spark
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Funções UDF para validação
valida_cpf_udf = udf(validar_cpf, BooleanType())
valida_email_udf = udf(validar_email, BooleanType())
valida_data_udf = udf(validar_data, BooleanType())
valida_status_udf = udf(validar_status, BooleanType())

# Aplica validações
clientes_com_validacao = clientes \
    .withColumn("cpf_valido", valida_cpf_udf(col("cpf"))) \
    .withColumn("email_valido", valida_email_udf(col("email"))) \
    .withColumn("data_nasc_valido", valida_data_udf(col("data_nascimento"))) \
    .withColumn("data_evento_valido", valida_data_udf(col("data_evento"))) \
    .withColumn("status_valido", valida_status_udf(col("status")))

# Filtra só os válidos
clientes_ok = clientes_com_validacao.filter(
    (col("cpf_valido") == True) & 
    (col("email_valido") == True) & 
    (col("data_nasc_valido") == True) & 
    (col("data_evento_valido") == True) & 
    (col("status_valido") == True) &
    (col("id_cliente").isNotNull()) &
    (col("nome").isNotNull())
)

# Remove colunas de validação
clientes_ok = clientes_ok.drop("cpf_valido", "email_valido", "data_nasc_valido", "data_evento_valido", "status_valido")

# Pega os inválidos para o log
clientes_invalidos = clientes_com_validacao.filter(
    (col("cpf_valido") == False) | 
    (col("email_valido") == False) | 
    (col("data_nasc_valido") == False) | 
    (col("data_evento_valido") == False) | 
    (col("status_valido") == False) |
    (col("id_cliente").isNull()) |
    (col("nome").isNull())
)

print("Clientes OK:", clientes_ok.count())
print("Clientes com erro:", clientes_invalidos.count())

Clientes OK: 1
Clientes com erro: 15


### Processamento de endereços

In [7]:
# Valida endereços
valida_cep_udf = udf(validar_cep, BooleanType())

# Pega lista de IDs de clientes válidos
ids_clientes_ok = [row.id_cliente for row in clientes_ok.select("id_cliente").collect()]

# Aplica validações
enderecos_com_validacao = enderecos \
    .withColumn("cep_valido", valida_cep_udf(col("cep"))) \
    .withColumn("data_evento_valido", valida_data_udf(col("data_evento"))) \
    .withColumn("cliente_existe", col("id_cliente").isin(ids_clientes_ok))

# Filtra só os válidos
enderecos_ok = enderecos_com_validacao.filter(
    (col("cep_valido") == True) & 
    (col("data_evento_valido") == True) & 
    (col("cliente_existe") == True) &
    (col("id_endereco").isNotNull())
)

# Remove colunas de validação
enderecos_ok = enderecos_ok.drop("cep_valido", "data_evento_valido", "cliente_existe")

# Pega os inválidos
enderecos_invalidos = enderecos_com_validacao.filter(
    (col("cep_valido") == False) | 
    (col("data_evento_valido") == False) | 
    (col("cliente_existe") == False) |
    (col("id_endereco").isNull())
)

print("Endereços OK:", enderecos_ok.count())
print("Endereços com erro:", enderecos_invalidos.count())

Endereços OK: 0
Endereços com erro: 15


### Salvar erros

In [8]:
# Salva os erros em CSV
if clientes_invalidos.count() > 0 or enderecos_invalidos.count() > 0:
    # Junta os erros
    erros_cli = clientes_invalidos.toPandas()
    erros_end = enderecos_invalidos.toPandas()
    
    # Salva como CSV
    if len(erros_cli) > 0:
        erros_cli.to_csv(f"erros_clientes_{data_hoje}.csv", index=False)
        print(f"Erros de clientes: {len(erros_cli)}")
    
    if len(erros_end) > 0:
        erros_end.to_csv(f"erros_enderecos_{data_hoje}.csv", index=False)
        print(f"Erros de endereços: {len(erros_end)}")
else:
    print("Sem erros!")

Erros de clientes: 15
Erros de endereços: 15


### Salvar na camada Raw

In [9]:
# Adiciona data de processamento em todosos dados brutos
clientes = clientes.withColumn("data_processamento", lit(data_hoje))
enderecos = enderecos.withColumn("data_processamento", lit(data_hoje))

print("OK")

OK


### Função de salvar

In [10]:
def salvar_s3(df, nome):
    caminho_s3 = f"s3://{bucket}/raw/{nome}/data_processamento={data_hoje}/"
    
    try:
        df.write.mode("overwrite").parquet(caminho_s3, compression="snappy")
        print(f"Salvo: {caminho_s3}")
        return True
    except Exception as e:
        print(f"Erro: {e}")
        return False

print("OK")

OK


In [11]:
# Salva TODOS os dados brutos (sem filtro)
salvar_s3(clientes, "clientes")
salvar_s3(enderecos, "enderecos")

Salvo: s3://pipeline-opea-bucket/raw/clientes/data_processamento=2025-12-01/
Salvo: s3://pipeline-opea-bucket/raw/enderecos/data_processamento=2025-12-01/
True


## Parte 2: Camada Stage

In [12]:
caminho_raw_clientes = f"s3://{bucket}/raw/clientes/"
caminho_raw_enderecos = f"s3://{bucket}/raw/enderecos/"

df_raw_clientes = spark.read.parquet(caminho_raw_clientes)
df_raw_enderecos = spark.read.parquet(caminho_raw_enderecos)

print("Clientes Raw:", df_raw_clientes.count())
print("Endereços Raw:", df_raw_enderecos.count())

# aplica as validações na Stage
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

valida_cpf_udf = udf(validar_cpf, BooleanType())
valida_email_udf = udf(validar_email, BooleanType())
valida_data_udf = udf(validar_data, BooleanType())
valida_status_udf = udf(validar_status, BooleanType())
valida_cep_udf = udf(validar_cep, BooleanType())

# Valida clientes
clientes_validados = df_raw_clientes \
    .withColumn("cpf_valido", valida_cpf_udf(col("cpf"))) \
    .withColumn("email_valido", valida_email_udf(col("email"))) \
    .withColumn("data_nasc_valido", valida_data_udf(col("data_nascimento"))) \
    .withColumn("data_evento_valido", valida_data_udf(col("data_evento"))) \
    .withColumn("status_valido", valida_status_udf(col("status")))

# Filtra apenas válidos
df_raw_clientes = clientes_validados.filter(
    (col("cpf_valido") == True) & 
    (col("email_valido") == True) & 
    (col("data_nasc_valido") == True) & 
    (col("data_evento_valido") == True) & 
    (col("status_valido") == True) &
    (col("id_cliente").isNotNull()) &
    (col("nome").isNotNull())
).drop("cpf_valido", "email_valido", "data_nasc_valido", "data_evento_valido", "status_valido")

# Valida endereços
ids_clientes_validos = [row.id_cliente for row in df_raw_clientes.select("id_cliente").collect()]

enderecos_validados = df_raw_enderecos \
    .withColumn("cep_valido", valida_cep_udf(col("cep"))) \
    .withColumn("data_evento_valido", valida_data_udf(col("data_evento"))) \
    .withColumn("cliente_existe", col("id_cliente").isin(ids_clientes_validos))

df_raw_enderecos = enderecos_validados.filter(
    (col("cep_valido") == True) & 
    (col("data_evento_valido") == True) & 
    (col("cliente_existe") == True) &
    (col("id_endereco").isNotNull())
).drop("cep_valido", "data_evento_valido", "cliente_existe")

print("Clientes válidos:", df_raw_clientes.count())
print("Endereços válidos:", df_raw_enderecos.count())

Clientes Raw: 16
Endereços Raw: 15
Clientes válidos: 1
Endereços válidos: 0


In [13]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, current_timestamp

janela_clientes = Window.partitionBy("id_cliente").orderBy(col("data_evento").desc())

clientes_stage = df_raw_clientes \
    .withColumn("rn", row_number().over(janela_clientes)) \
    .filter(col("rn") == 1) \
    .drop("rn") \
    .withColumn("data_atualizacao", current_timestamp())

print("Clientes Stage:", clientes_stage.count())

Clientes Stage: 1


In [14]:
janela_enderecos = Window.partitionBy("id_endereco").orderBy(col("data_evento").desc())

enderecos_stage = df_raw_enderecos \
    .withColumn("rn", row_number().over(janela_enderecos)) \
    .filter(col("rn") == 1) \
    .drop("rn") \
    .withColumn("data_atualizacao", current_timestamp())

print("Endereços Stage:", enderecos_stage.count())

Endereços Stage: 0


In [15]:
caminho_stage_clientes = f"s3://{bucket}/stage/clientes/"
caminho_stage_enderecos = f"s3://{bucket}/stage/enderecos/"

clientes_stage.write.mode("overwrite").parquet(caminho_stage_clientes, compression="snappy")
enderecos_stage.write.mode("overwrite").parquet(caminho_stage_enderecos, compression="snappy")

print("Dados salvos na camada Stage!")

Dados salvos na camada Stage!


## Parte 3: Camada Analytics

In [16]:
# Lê os dados da camada Stage
df_stage_clientes = spark.read.parquet(caminho_stage_clientes)
df_stage_enderecos = spark.read.parquet(caminho_stage_enderecos)

print("Clientes Stage:", df_stage_clientes.count())
print("Endereços Stage:", df_stage_enderecos.count())

Clientes Stage: 1
Endereços Stage: 0


In [17]:
# Filtra só clientes ativos
clientes_ativos = df_stage_clientes.filter(col("status") == "ativo")

print("Clientes ativos:", clientes_ativos.count())
clientes_ativos.show(5)

Clientes ativos: 1
+----------+--------------------+--------------------+--------------+---------------+------+-----------+------------------+--------------------+
|id_cliente|                nome|               email|           cpf|data_nascimento|status|data_evento|data_processamento|    data_atualizacao|
+----------+--------------------+--------------------+--------------+---------------+------+-----------+------------------+--------------------+
|       104|Cliente Data Even...|data.evento.inval...|123.456.789-14|     1990-01-01| ativo| 2024-01-10|        2025-12-01|2025-12-01 18:04:...|
+----------+--------------------+--------------------+--------------+---------------+------+-----------+------------------+--------------------+


In [18]:
from pyspark.sql.functions import to_date, datediff, floor

clientes_com_idade = clientes_ativos.withColumn(
    "idade",
    floor(datediff(lit(data_hoje), to_date(col("data_nascimento"), "yyyy-MM-dd")) / 365.25)
)

print("Idade calculada!")
clientes_com_idade.select("nome", "data_nascimento", "idade").show(5)

Idade calculada!
+--------------------+---------------+-----+
|                nome|data_nascimento|idade|
+--------------------+---------------+-----+
|Cliente Data Even...|     1990-01-01|   35|
+--------------------+---------------+-----+


In [19]:
enderecos_renamed = df_stage_enderecos \
    .withColumnRenamed("data_evento", "endereco_data_evento") \
    .withColumnRenamed("data_processamento", "endereco_data_processamento") \
    .withColumnRenamed("data_atualizacao", "endereco_data_atualizacao")

analytics = clientes_com_idade.join(enderecos_renamed, on="id_cliente", how="left")

print("Total de registros:", analytics.count())
analytics.show(5)

Total de registros: 1
+----------+--------------------+--------------------+--------------+---------------+------+-----------+------------------+--------------------+-----+-----------+----+----------+------+-----------+------+------+------+--------------------+---------------------------+-------------------------+
|id_cliente|                nome|               email|           cpf|data_nascimento|status|data_evento|data_processamento|    data_atualizacao|idade|id_endereco| cep|logradouro|numero|complemento|bairro|cidade|estado|endereco_data_evento|endereco_data_processamento|endereco_data_atualizacao|
+----------+--------------------+--------------------+--------------+---------------+------+-----------+------------------+--------------------+-----+-----------+----+----------+------+-----------+------+------+------+--------------------+---------------------------+-------------------------+
|       104|Cliente Data Even...|data.evento.inval...|123.456.789-14|     1990-01-01| ativo| 202

In [20]:
caminho_analytics = f"s3://{bucket}/analytics/clientes/"

analytics.orderBy("id_cliente").coalesce(1) \
    .write.mode("overwrite").parquet(caminho_analytics, compression="snappy")

print(f"Dados salvos: {caminho_analytics}")

Dados salvos: s3://pipeline-opea-bucket/analytics/clientes/


## Parte 4: Catálogo e Athena

In [21]:
glue_client = boto3.client('glue', region_name='us-east-1')
database_name = 'clientes'

try:
    glue_client.create_database(
        DatabaseInput={
            'Name': database_name,
            'Description': 'Banco de dados para analytics de clientes'
        }
    )
    print(f"Banco '{database_name}' criado!")
except glue_client.exceptions.AlreadyExistsException:
    print(f"Banco '{database_name}' já existe")

role_arn = "arn:aws:iam::816648956948:role/service-role/AWSGlueServiceRole-brendon"
print(f"Role ARN: {role_arn}")

Banco 'clientes' já existe
Role ARN: arn:aws:iam::816648956948:role/service-role/AWSGlueServiceRole-brendon


In [22]:
crawler_name = 'crawler-analytics-clientes'
s3_path = f"s3://{bucket}/analytics/clientes/"

try:
    glue_client.create_crawler(
        Name=crawler_name,
        Role=role_arn,
        DatabaseName=database_name,
        Targets={'S3Targets': [{'Path': s3_path}]},
        Schedule='cron(0 */6 * * ? *)',
        SchemaChangePolicy={'UpdateBehavior': 'LOG', 'DeleteBehavior': 'LOG'},
        RecrawlPolicy={'RecrawlBehavior': 'CRAWL_NEW_FOLDERS_ONLY'}
    )
    print(f"Crawler '{crawler_name}' criado!")
except glue_client.exceptions.AlreadyExistsException:
    print(f"Crawler '{crawler_name}' já existe")

Crawler 'crawler-analytics-clientes' já existe


In [23]:
try:
    glue_client.start_crawler(Name=crawler_name)
    print(f"Crawler '{crawler_name}' iniciado!")
except glue_client.exceptions.CrawlerRunningException:
    print("Crawler já está executando")

Crawler já está executando


In [24]:
for i in range(10):
    response = glue_client.get_crawler(Name=crawler_name)
    state = response['Crawler']['State']
    
    print(f"Status: {state}")
    
    if state == 'READY':
        print("Crawler terminou!")
        last_crawl = response['Crawler'].get('LastCrawl', {})
        if last_crawl:
            print(f"Status: {last_crawl.get('Status')}")
            print(f"Tabelas criadas/atualizadas: {last_crawl.get('TablesCreated', 0) + last_crawl.get('TablesUpdated', 0)}")
        break
    
    time.sleep(10)

Status: STOPPING
Status: STOPPING
Status: STOPPING
Status: READY
Crawler terminou!
Status: SUCCEEDED
Tabelas criadas/atualizadas: 0


In [25]:
response = glue_client.get_tables(DatabaseName=database_name)

print(f"Tabelas no banco '{database_name}':")
for table in response['TableList']:
    print(f"  {table['Name']}: {len(table['StorageDescriptor']['Columns'])} colunas")

Tabelas no banco 'clientes':
  clientes: 21 colunas


In [26]:
athena_client = boto3.client('athena', region_name='us-east-1')
output_location = f"s3://{bucket}/athena-results/"

print(f"Configure o Athena em: https://console.aws.amazon.com/athena")
print(f"Settings > Query result location: {output_location}")

Configure o Athena em: https://console.aws.amazon.com/athena
Settings > Query result location: s3://pipeline-opea-bucket/athena-results/


In [27]:
query = f"SELECT COUNT(*) as total FROM {database_name}.clientes"

response = athena_client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': database_name},
    ResultConfiguration={'OutputLocation': output_location}
)

query_id = response['QueryExecutionId']
print(f"Query executada! ID: {query_id}")

for i in range(30):
    status = athena_client.get_query_execution(QueryExecutionId=query_id)
    state = status['QueryExecution']['Status']['State']
    
    if state == 'SUCCEEDED':
        results = athena_client.get_query_results(QueryExecutionId=query_id)
        for row in results['ResultSet']['Rows']:
            values = [col.get('VarCharValue', 'NULL') for col in row['Data']]
            print(f"  {' | '.join(values)}")
        break
    elif state == 'FAILED':
        print(f"Falha: {status['QueryExecution']['Status']['StateChangeReason']}")
        break
    
    time.sleep(2)

Query executada! ID: df4e2927-1026-48ba-8468-3081ee17018b
  total
  1
