# AWS Glue Studio Notebook - Vendas

##### Você está executando um Notebook do AWS Glue Studio. Para começar a utilizá-lo, você precisa iniciar um Sessão interativa do AWS Glue.

Este é o notebook utilizado no processo de ETL dos dados da Tabela Vendas da Zoop. Observe cada uma das etapas e, em caso de dúvidas, assista ao vídeo correspondente ao tratamento desta tabela.

Faça bom proveito deste conteúdo! &#x1F642;

#### Opcional: Execute esta célula se quiser observar os comandos deste notebook ("magics").

In [None]:
%help

#### Configurando e iniciando uma sessão interativa.

In [1]:
%idle_timeout 60
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.5 
Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 60
Session ID: 6f1f3ad4-ea98-4e27-ad7a-7079a1f73882
Applying the following default arguments:
--glue_kernel_version 1.0.5
--enable-glue-datacatalog true
Waiting for session 6f1f3ad4-ea98-4e27-ad7a-7079a1f73882 to get into ready status...
Session 6f1f3ad4-ea98-4e27-ad7a-7079a1f73882 has be

#### Criando um DynamicFrame de uma tabela no AWS Glue Data Catalog e mostrando seu schema

In [27]:
vendas_zoop_dyf = glueContext.create_dynamic_frame.from_catalog(database='db-glue-zoop', table_name='zoop-glue-vendas_zoop_bronze_parquet')
vendas_zoop_dyf.printSchema()

root
|-- ID_venda: long
|-- Dia: long
|-- Mês: long
|-- Ano: long
|-- Horario: string
|-- Canal_venda: string
|-- Origem_venda: string
|-- ID_produto: long
|-- Produto: string
|-- Categoria_produto: string
|-- Preco_unitario: double
|-- Quantidade: long
|-- Metodo_pagamento: string
|-- ID_cliente: long
|-- Nome_cliente: string
|-- Genero_cliente: string
|-- Idade_cliente: long
|-- Cidade_cliente: string
|-- UF_cliente: string
|-- Regiao_cliente: string
|-- Avaliacao: long


#### Criando um Spark DataFrame de um DynamicFrame

In [28]:
vendas_zoop_df = vendas_zoop_dyf.toDF()
vendas_zoop_df.show(5)

+--------+---+---+----+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+--------------+--------------+---------+
|ID_venda|Dia|Mês| Ano| Horario|Canal_venda|Origem_venda|ID_produto|           Produto|Categoria_produto|Preco_unitario|Quantidade| Metodo_pagamento|ID_cliente|       Nome_cliente|Genero_cliente|Idade_cliente|Cidade_cliente|    UF_cliente|Regiao_cliente|Avaliacao|
+--------+---+---+----+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+--------------+--------------+---------+
|    9052|  8|  5|2024|04:19:11| e-commerce|    Facebook|        10|    Câmera digital|      Eletrônicos|        1299.0|         2|              PIX|       393|      Luigi da Mota|     Masculino|          

#### Concatenando os dados de dia, mês e ano em uma coluna de data e excluindo colunas

In [29]:
from pyspark.sql.functions import col, concat_ws, to_date

# Transformar as colunas dia, mes e ano em uma única coluna de data
vendas_zoop_df = vendas_zoop_df.withColumn("Data", concat_ws("-", col("Ano"), col("Mês"), col("Dia")))
vendas_zoop_df = vendas_zoop_df.withColumn("Data", to_date(col("Data")))

# Remove as colunas dia, mês e ano
vendas_zoop_df = vendas_zoop_df.drop("Dia","Mês","Ano")

vendas_zoop_df.show(3)

+--------+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+----------+--------------+---------+----------+
|ID_venda| Horario|Canal_venda|Origem_venda|ID_produto|           Produto|Categoria_produto|Preco_unitario|Quantidade| Metodo_pagamento|ID_cliente|       Nome_cliente|Genero_cliente|Idade_cliente|Cidade_cliente|UF_cliente|Regiao_cliente|Avaliacao|      Data|
+--------+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+----------+--------------+---------+----------+
|    9052|04:19:11| e-commerce|    Facebook|        10|    Câmera digital|      Eletrônicos|        1299.0|         2|              PIX|       393|      Luigi da Mota|     Masculino|           32|       Goiânia|     Goiás| 

#### Criando uma UDF (Função definida pelo usuário) para mapear e converter nomes dos estados para suas respectivas siglas

In [30]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

estados_brasil = {
    "Acre": "AC", "Alagoas": "AL", "Amazonas": "AM", "Amapá": "AP", "Bahia": "BA",
    "Ceará": "CE", "Distrito Federal": "DF", "Espírito Santo": "ES", "Goiás": "GO", "Maranhão": "MA",
    "Minas Gerais": "MG", "Mato Grosso do Sul": "MS", "Mato Grosso": "MT", "Pará": "PA", "Paraíba": "PB",
    "Pernambuco": "PE", "Piauí": "PI", "Paraná": "PR", "Rio de Janeiro": "RJ", "Rio Grande do Norte": "RN",
    "Rondônia": "RO", "Roraima": "RR", "Rio Grande do Sul": "RS", "Santa Catarina": "SC", "Sergipe": "SE",
    "São Paulo": "SP", "Tocantins": "TO"
}

# Criando uma UDF (User Defined Function) para fazer a conversão
def converter_estado(nome_estado):
    return estados_brasil.get(nome_estado)

# Registrando a UDF com o tipo de retorno String
converter_estado_udf = udf(converter_estado, StringType())

# Aplicando a UDF na coluna 'uf_cliente' para criar a nova coluna com as siglas
vendas_zoop_df = vendas_zoop_df.withColumn("UF_cliente", converter_estado_udf(col("UF_cliente")))
vendas_zoop_df.show(3)

+--------+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+----------+--------------+---------+----------+
|ID_venda| Horario|Canal_venda|Origem_venda|ID_produto|           Produto|Categoria_produto|Preco_unitario|Quantidade| Metodo_pagamento|ID_cliente|       Nome_cliente|Genero_cliente|Idade_cliente|Cidade_cliente|UF_cliente|Regiao_cliente|Avaliacao|      Data|
+--------+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+----------+--------------+---------+----------+
|    9052|04:19:11| e-commerce|    Facebook|        10|    Câmera digital|      Eletrônicos|        1299.0|         2|              PIX|       393|      Luigi da Mota|     Masculino|           32|       Goiânia|        GO| 

#### Observando se existem dados faltantes e os tipos dos dados

In [31]:
vendas_zoop_df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 19 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   ID_venda           10000 non-null  int64  
 1   Horario            10000 non-null  object 
 2   Canal_venda        10000 non-null  object 
 3   Origem_venda       10000 non-null  object 
 4   ID_produto         10000 non-null  int64  
 5   Produto            10000 non-null  object 
 6   Categoria_produto  9900 non-null   object 
 7   Preco_unitario     10000 non-null  float64
 8   Quantidade         10000 non-null  int64  
 9   Metodo_pagamento   10000 non-null  object 
 10  ID_cliente         10000 non-null  int64  
 11  Nome_cliente       10000 non-null  object 
 12  Genero_cliente     10000 non-null  object 
 13  Idade_cliente      10000 non-null  int64  
 14  Cidade_cliente     10000 non-null  object 
 15  UF_cliente         10000 non-null  object 
 16  Regiao_cliente     1000

#### Criando uma UDF para mapear e preencher as categorias faltantes de acordo com a coluna de produtos

In [32]:
produtos_categorias = {
    'Smart TV 55"': 'Eletrônicos', 'Frigobar': 'Eletrodomésticos', 'Ventilador de teto': 'Eletrodomésticos',
    'Cafeteira': 'Eletrodomésticos', 'Smartphone': 'Eletrônicos', 'Liquidificador': 'Eletrodomésticos',
    'Notebook': 'Eletrônicos', 'Tablet': 'Eletrônicos', 'Micro-ondas': 'Eletrodomésticos',
    'Aspirador de pó': 'Eletrodomésticos', 'Câmera digital': 'Eletrônicos', 'Chuveiro elétrico': 'Eletrodomésticos',
    'Fone de ouvido': 'Eletrônicos', 'Ventilador de mesa': 'Eletrodomésticos', 'Impressora': 'Eletrônicos',
    'Secador de cabelo': 'Eletrodomésticos', 'Relógio inteligente': 'Eletrônicos', 'Batedeira': 'Eletrodomésticos',
    'Máquina de lavar roupa': 'Eletrodomésticos', 'Ferro de passar roupa': 'Eletrodomésticos', 'Cafeteira expresso': 'Eletrodomésticos',
    'Aparelho de som': 'Eletrônicos', 'Geladeira': 'Eletrodomésticos', 'Forno elétrico': 'Eletrodomésticos',
    'TV Box': 'Eletrônicos', 'Panela elétrica': 'Eletrodomésticos', 'Ventilador de coluna': 'Eletrodomésticos',
    'Câmera de segurança': 'Eletrônicos', 'Fritadeira elétrica': 'Eletrodomésticos', 'Máquina de café': 'Eletrodomésticos'
}

# Passo 1: Definir uma UDF que usa o dicionário para preencher a categoria
def preencher_categoria(produto, categoria):
    if categoria is None:  # Se a categoria está faltando
        return produtos_categorias.get(produto)  # Retorna a categoria correspondente ou None se não encontrado
    else:
        return categoria  # Mantém a categoria existente

# Registrar a UDF
preencher_categoria_udf = udf(preencher_categoria, StringType())

# Passo 2: Aplicar a UDF na coluna 'categoria_produto'
vendas_zoop_df = vendas_zoop_df.withColumn("Categoria_produto", preencher_categoria_udf(col("Produto"), col("Categoria_produto"))
)

# Exibir o DataFrame atualizado
vendas_zoop_df.show(5)

+--------+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+----------+--------------+---------+----------+
|ID_venda| Horario|Canal_venda|Origem_venda|ID_produto|           Produto|Categoria_produto|Preco_unitario|Quantidade| Metodo_pagamento|ID_cliente|       Nome_cliente|Genero_cliente|Idade_cliente|Cidade_cliente|UF_cliente|Regiao_cliente|Avaliacao|      Data|
+--------+--------+-----------+------------+----------+------------------+-----------------+--------------+----------+-----------------+----------+-------------------+--------------+-------------+--------------+----------+--------------+---------+----------+
|    9052|04:19:11| e-commerce|    Facebook|        10|    Câmera digital|      Eletrônicos|        1299.0|         2|              PIX|       393|      Luigi da Mota|     Masculino|           32|       Goiânia|        GO| 

#### Observando se a operação de preenchimento dos dados faltantes foi bem sucedida

In [20]:
vendas_zoop_df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 19 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   ID_venda           10000 non-null  int64  
 1   Horario            10000 non-null  object 
 2   Canal_venda        10000 non-null  object 
 3   Origem_venda       10000 non-null  object 
 4   ID_produto         10000 non-null  int64  
 5   Produto            10000 non-null  object 
 6   Categoria_produto  10000 non-null  object 
 7   Preco_unitario     10000 non-null  float64
 8   Quantidade         10000 non-null  int64  
 9   Metodo_pagamento   10000 non-null  object 
 10  ID_cliente         10000 non-null  int64  
 11  Nome_cliente       10000 non-null  object 
 12  Genero_cliente     10000 non-null  object 
 13  Idade_cliente      10000 non-null  int64  
 14  Cidade_cliente     10000 non-null  object 
 15  UF_cliente         10000 non-null  object 
 16  Regiao_cliente     1000

#### Convertendo um Spark Data Frame em um DynamicFrame

In [34]:
from awsglue.dynamicframe import DynamicFrame

# Converter o DataFrame Spark em DynamicFrame
vendas_zoop_dyf = DynamicFrame.fromDF(vendas_zoop_df, glueContext, "glue_etl")




#### Observando o schema do DynamicFrame transformado

In [35]:
vendas_zoop_dyf.printSchema()

root
|-- ID_venda: long
|-- Horario: string
|-- Canal_venda: string
|-- Origem_venda: string
|-- ID_produto: long
|-- Produto: string
|-- Categoria_produto: string
|-- Preco_unitario: double
|-- Quantidade: long
|-- Metodo_pagamento: string
|-- ID_cliente: long
|-- Nome_cliente: string
|-- Genero_cliente: string
|-- Idade_cliente: long
|-- Cidade_cliente: string
|-- UF_cliente: string
|-- Regiao_cliente: string
|-- Avaliacao: long
|-- Data: date


#### Ajustando os tipos dos dados e mapeando as colunas desejadas

In [37]:
vendas_zoop_dyf_mapeado = vendas_zoop_dyf.apply_mapping(
    mappings=[
        ("ID_venda", "long", "id_venda", "long"),
        ("Data", "date", "data", "date"),
        ("Horario", "string", "horario", "timestamp"),
        ("Canal_venda", "string", "canal_venda", "string"),
        ("Origem_venda", "string", "origem_venda", "string"),
        ("ID_produto", "long", "id_produto", "long"),
        ("Produto", "string", "produto", "string"),
        ("Categoria_produto", "string", "categoria_produto", "string"),
        ("Preco_unitario", "double", "preco_unitario", "double"),
        ("Quantidade", "long", "quantidade", "int"),
        ("Metodo_pagamento", "string", "metodo_pagamento", "string"),
        ("ID_cliente", "long", "id_cliente", "long"),
        ("Nome_cliente", "string", "nome_cliente", "string"),
        ("Genero_cliente", "string", "genero_cliente", "string"),
        ("Idade_cliente", "long", "idade_cliente", "int"),
        ("Cidade_cliente", "string", "cidade_cliente", "string"),
        ("UF_cliente", "string", "uf_cliente", "string"),
        ("Regiao_cliente", "string", "regiao_cliente", "string"),
        ("Avaliacao", "long", "avaliacao", "int")
    ]
)

vendas_zoop_dyf_mapeado.printSchema()

root
|-- id_venda: long
|-- data: date
|-- horario: timestamp
|-- canal_venda: string
|-- origem_venda: string
|-- id_produto: long
|-- produto: string
|-- categoria_produto: string
|-- preco_unitario: double
|-- quantidade: int
|-- metodo_pagamento: string
|-- id_cliente: long
|-- nome_cliente: string
|-- genero_cliente: string
|-- idade_cliente: int
|-- cidade_cliente: string
|-- uf_cliente: string
|-- regiao_cliente: string
|-- avaliacao: int


#### Escrevendo os dados do DynamicFrame em um objeto do Bucket S3 na camada silver e em uma tabela no AWS Glue Data Catalog


In [38]:
s3output = glueContext.getSink(
  path="s3://dados-zoop/silver/vendas",  # adicione o nome do seu bucket no path
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="db-glue-zoop", catalogTableName="zoop-glue-vendas_zoop_silver"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(vendas_zoop_dyf_mapeado)

<awsglue.dynamicframe.DynamicFrame object at 0x7ffb7693ff70>
