## 1. Objetivo

O objetivo deste trabalho é desenvolver um *Minimum Viable Product (MVP)* de engenharia de dados voltado à construção de um pipeline analítico em nuvem para análise de transações bancárias. O foco principal é estruturar um fluxo completo de dados — desde a ingestão até a análise — utilizando boas práticas de modelagem, processamento e governança.

A partir de um conjunto de dados transacionais, o projeto busca responder a perguntas relevantes do ponto de vista analítico e operacional, tais como:

- Como se distribuem os valores das transações ao longo do tempo?
- Existem padrões de comportamento distintos de acordo com o tipo de transação e o canal utilizado?
- Quais variáveis apresentam maior influência sobre o volume financeiro movimentado?
- É possível identificar indícios de maior risco operacional a partir de características das transações, como duração, tentativas de login ou canal de acesso?

Essas questões orientam tanto a modelagem dimensional dos dados quanto as análises realizadas na etapa final do trabalho. Ressalta-se que o objetivo principal não é apenas responder às perguntas propostas, mas demonstrar a construção de um pipeline robusto, reprodutível e alinhado às boas práticas de engenharia de dados em ambiente de nuvem.

Eventuais limitações na resposta completa a todas as perguntas são discutidas ao final do trabalho, conforme previsto na autoavaliação, preservando a coerência entre o planejamento inicial e os resultados obtidos.


## 2. Coleta

Após a definição do conjunto de dados, a etapa de coleta teve como finalidade disponibilizar os dados em um ambiente de nuvem que permitisse sua ingestão, persistência e processamento de forma consistente na plataforma Databricks.

O dataset selecionado foi disponibilizado em um repositório público no GitHub, possibilitando acesso por meio de URL pública (*raw*). Essa escolha garante transparência da fonte, facilidade de acesso e reprodutibilidade do pipeline, permitindo que terceiros executem o projeto sem a necessidade de credenciais ou permissões específicas.

Durante a implementação, observou-se que o ambiente Databricks Community Edition apresenta restrições de segurança que limitam a leitura direta de arquivos via HTTP/HTTPS utilizando `spark.read.csv()`, bem como o uso do sistema de arquivos local do driver e do DBFS público. Diante desse cenário, adotou-se como estratégia técnica a utilização de um **Volume (Unity Catalog)** como área de aterrissagem (*landing zone*) dos dados.

O Volume atua como um armazenamento gerenciado em nuvem, permitindo controle de acesso, persistência confiável e integração nativa com o Spark. Dessa forma, o dataset foi carregado no Volume e, a partir dele, ingerido para a camada **Bronze** do pipeline utilizando o formato Delta Lake.

Essa abordagem garante:
- rastreabilidade da origem dos dados;
- isolamento entre ingestão e transformação;
- possibilidade de reprocessamento das etapas subsequentes;
- aderência às boas práticas de arquitetura de dados em ambientes controlados.

Do ponto de vista ético e de confidencialidade, o conjunto de dados utilizado é público, voltado a fins educacionais e não contém informações sensíveis ou identificáveis de indivíduos reais. Assim, não foi necessária a construção de robôs de coleta (*web scraping*), tampouco o uso de dados proprietários ou corporativos.


## 3. Modelagem

Para este trabalho, foi adotada uma **arquitetura de Data Warehouse em Esquema Estrela (Star Schema)**, por ser adequada a cenários analíticos baseados em transações e por simplificar a construção de consultas agregadas e indicadores.

A granularidade da tabela fato é **uma linha por transação bancária realizada por um usuário**.  
As principais entidades de negócio foram organizadas em:

- **Tabela Fato**
  - `Fato_Transacoes`

- **Tabelas Dimensão**
  - `Dim_Usuario`
  - `Dim_Tempo`
  - `Dim_Categoria`
  - `Dim_TipoTransacao`


## 3.1 Esquema Estrela

**Tabela Fato**

- `Fato_Transacoes`
  - `id_transacao` (PK de negócio – opcional)
  - `fk_usuario` (FK → Dim_Usuario.id_usuario)
  - `fk_data` (FK → Dim_Tempo.id_data)
  - `fk_categoria` (FK → Dim_Categoria.id_categoria)
  - `fk_tipo_transacao` (FK → Dim_TipoTransacao.id_tipo)
  - `valor` (medida – valor da transação)
  - `saldo_apos_transacao` (medida – saldo após a transação, se disponível)

**Tabelas Dimensão**

- `Dim_Usuario`
  - `id_usuario` (PK – surrogate key)
  - `id_usuario_original` (ID original da base, se existir, ex: user_id)
  - Atributos adicionais: idade, gênero, localização etc., se existirem

- `Dim_Tempo`
  - `id_data` (PK)
  - `data`
  - `ano`
  - `mes`
  - `dia`
  - `dia_semana`

- `Dim_Categoria`
  - `id_categoria` (PK)
  - `nome_categoria` (ex.: alimentação, transporte, salário)

- `Dim_TipoTransacao`
  - `id_tipo` (PK)
  - `nome_tipo` (ex.: crédito, débito, transferência)


In [0]:
# ============================================
# CONFIG: escolha a fonte de dados
# ============================================
dbutils.widgets.dropdown("DATA_SOURCE", "VOLUME", ["VOLUME", "URL"])
source = dbutils.widgets.get("DATA_SOURCE")

# Obs: ajuste o path conforme o ambiente de execução - Aqui coloquei o path do volume do Databricks
url_path = "https://raw.githubusercontent.com/Jucioffi/Pipeline_de_dados_para_analise_de_transacoes_bancarias/main/bank_transactions_data_2.csv"

# Nesse caso, criar incluir o arquivo no volume do Databricks
volume_path = "/Volumes/workspace/trabalho_mvp_puc_rio/mvp_puc_rio/bank_transactions_data_2.csv"

if source == "URL":
    # Tentativa de leitura direta por URL (funciona em ambientes sem as restrições)
    df_raw = (
        spark.read.option("header", True)
                  .option("inferSchema", True)
                  .csv(url_path)
    )
else:
    # Fallback: leitura via Volume (funciona no seu Databricks)
    df_raw = (
        spark.read.option("header", True)
                  .option("inferSchema", True)
                  .csv(volume_path)
    )

display(df_raw)


TransactionID,AccountID,TransactionAmount,TransactionDate,TransactionType,Location,DeviceID,IP Address,MerchantID,Channel,CustomerAge,CustomerOccupation,TransactionDuration,LoginAttempts,AccountBalance,PreviousTransactionDate
TX000001,AC00128,14.09,2023-04-11T16:29:14.000Z,Debit,San Diego,D000380,162.198.218.92,M015,ATM,70,Doctor,81,1,5112.21,2024-11-04T08:08:08.000Z
TX000002,AC00455,376.24,2023-06-27T16:44:19.000Z,Debit,Houston,D000051,13.149.61.4,M052,ATM,68,Doctor,141,1,13758.91,2024-11-04T08:09:35.000Z
TX000003,AC00019,126.29,2023-07-10T18:16:08.000Z,Debit,Mesa,D000235,215.97.143.157,M009,Online,19,Student,56,1,1122.35,2024-11-04T08:07:04.000Z
TX000004,AC00070,184.5,2023-05-05T16:32:11.000Z,Debit,Raleigh,D000187,200.13.225.150,M002,Online,26,Student,25,1,8569.06,2024-11-04T08:09:06.000Z
TX000005,AC00411,13.45,2023-10-16T17:51:24.000Z,Credit,Atlanta,D000308,65.164.3.100,M091,Online,26,Student,198,1,7429.4,2024-11-04T08:06:39.000Z
TX000006,AC00393,92.15,2023-04-03T17:15:01.000Z,Debit,Oklahoma City,D000579,117.67.192.211,M054,ATM,18,Student,172,1,781.68,2024-11-04T08:06:36.000Z
TX000007,AC00199,7.08,2023-02-15T16:36:48.000Z,Credit,Seattle,D000241,140.212.253.222,M019,ATM,37,Doctor,139,1,13316.71,2024-11-04T08:10:09.000Z
TX000008,AC00069,171.42,2023-05-08T17:47:59.000Z,Credit,Indianapolis,D000500,92.214.76.157,M020,Branch,67,Retired,291,1,2796.24,2024-11-04T08:10:55.000Z
TX000009,AC00135,106.23,2023-03-21T16:59:46.000Z,Credit,Detroit,D000690,24.148.92.177,M035,Branch,51,Engineer,86,1,9095.14,2024-11-04T08:11:14.000Z
TX000010,AC00385,815.96,2023-03-31T16:06:57.000Z,Debit,Nashville,D000199,32.169.88.41,M007,ATM,55,Doctor,120,1,1021.88,2024-11-04T08:06:32.000Z


In [0]:
from pyspark.sql import functions as F

# Dim_Usuario
dim_usuario = (
    df_raw
    .select("customer_id")
    .distinct()
    .withColumn("id_usuario", F.monotonically_increasing_id())
)

# Reorganiza colunas
dim_usuario = dim_usuario.select("id_usuario", "customer_id")

In [0]:
# Garante que a coluna de data está em formato date ou timestamp
df_raw = df_raw.withColumn(
    "transaction_date",
    F.to_date("transaction_date")  # ajusta se já estiver em date
)

dim_tempo = (
    df_raw
    .select("transaction_date")
    .distinct()
    .withColumn("id_data", F.monotonically_increasing_id())
    .withColumn("ano", F.year("transaction_date"))
    .withColumn("mes", F.month("transaction_date"))
    .withColumn("dia", F.dayofmonth("transaction_date"))
    .withColumn("dia_semana", F.date_format("transaction_date", "E"))  # Mon, Tue, etc.
)

dim_tempo = dim_tempo.select(
    "id_data",
    "transaction_date",
    "ano", "mes", "dia", "dia_semana"
)


In [0]:
dim_categoria = (
    df_raw
    .select("category")
    .distinct()
    .withColumn("id_categoria", F.monotonically_increasing_id())
)

dim_categoria = dim_categoria.select("id_categoria", "category")



In [0]:
dim_tipo = (
    df_raw
    .select("transaction_type")
    .distinct()
    .withColumn("id_tipo", F.monotonically_increasing_id())
)

dim_tipo = dim_tipo.select("id_tipo", "transaction_type")


In [0]:
# Alias das dimensões
du = dim_usuario.alias("du")
dt = dim_tempo.alias("dt")
dc = dim_categoria.alias("dc")
dtt = dim_tipo.alias("dtt")

fato = (
    df_raw.alias("f")
    # join com usuário
    .join(du, F.col("f.customer_id") == F.col("du.customer_id"), "left")
    # join com data
    .join(dt, F.col("f.transaction_date") == F.col("dt.transaction_date"), "left")
    # join com categoria
    .join(dc, F.col("f.category") == F.col("dc.category"), "left")
    # join com tipo de transação
    .join(dtt, F.col("f.transaction_type") == F.col("dtt.transaction_type"), "left")
    .select(
        F.monotonically_increasing_id().alias("id_transacao"),
        F.col("du.id_usuario").alias("fk_usuario"),
        F.col("dt.id_data").alias("fk_data"),
        F.col("dc.id_categoria").alias("fk_categoria"),
        F.col("dtt.id_tipo").alias("fk_tipo_transacao"),
        F.col("f.amount").alias("valor"),
        F.col("f.balance").alias("saldo_apos_transacao")
    )
)

In [0]:
# Calcula o valor mínimo e máximo da coluna 'amount'
df_raw.select(
    F.min("amount").alias("min_amount"),
    F.max("amount").alias("max_amount")
)

# Calcula o valor mínimo e máximo da coluna 'balance'
df_raw.select(
    F.min("balance").alias("min_balance"),
    F.max("balance").alias("max_balance")
)

df_raw.select("category").distinct().orderBy("category")

df_raw.select("transaction_type").distinct().orderBy("transaction_type")


## 3.2 Linhagem dos Dados

Os dados utilizados neste MVP foram obtidos a partir do arquivo `bank_transactions_data_2.csv`, armazenado em um bucket do **Google Cloud Storage (GCS)**, cujo acesso é realizado via URL pública.

A ingestão dos dados foi feita na plataforma **Databricks**, utilizando leitura direta do arquivo CSV e carregando-o em um DataFrame (`df_raw`). A partir desse DataFrame bruto, foram aplicadas as seguintes transformações principais:

- Conversão de tipos de dados (ex.: datas e valores numéricos);
- Criação das dimensões `Dim_Usuario`, `Dim_Tempo`, `Dim_Categoria` e `Dim_TipoTransacao` a partir de seleções e deduplicações (`distinct`);
- Geração de chaves substitutas (surrogate keys) para cada dimensão, utilizando funções de geração de identificadores;
- Junções entre o DataFrame bruto e as dimensões para compor a tabela fato `Fato_Transacoes`, que consolida as medidas (`valor`, `saldo_apos_transacao`) e as chaves estrangeiras que referenciam as dimensões.

As tabelas resultantes foram persistidas em um esquema lógico `dw` na camada de Data Warehouse dentro do Databricks, permitindo o uso posterior em análises, dashboards e consultas SQL analíticas.


In [0]:

# Exemplo de seleção da coluna correta
df_raw.select("TransactionDate")

In [0]:
# Importa função para manipulação de colunas
from pyspark.sql.functions import when, col

# Instala o pacote pandas (caso precise para manipulação local)
%pip install pandas

# Lê o arquivo CSV do volume do Unity Catalog
df = (
    spark.read
    .option("header", True)         # Considera a primeira linha como cabeçalho
    .option("inferSchema", True)    # Tenta inferir automaticamente os tipos das colunas
    .csv("/Volumes/workspace/trabalho_mvp_puc_rio/mvp_puc_rio/bank_transactions_data_2.csv")  # Caminho do arquivo
)

# Exibe o DataFrame para visualização
display(df)

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


TransactionID,AccountID,TransactionAmount,TransactionDate,TransactionType,Location,DeviceID,IP Address,MerchantID,Channel,CustomerAge,CustomerOccupation,TransactionDuration,LoginAttempts,AccountBalance,PreviousTransactionDate
TX000001,AC00128,14.09,2023-04-11T16:29:14.000Z,Debit,San Diego,D000380,162.198.218.92,M015,ATM,70,Doctor,81,1,5112.21,2024-11-04T08:08:08.000Z
TX000002,AC00455,376.24,2023-06-27T16:44:19.000Z,Debit,Houston,D000051,13.149.61.4,M052,ATM,68,Doctor,141,1,13758.91,2024-11-04T08:09:35.000Z
TX000003,AC00019,126.29,2023-07-10T18:16:08.000Z,Debit,Mesa,D000235,215.97.143.157,M009,Online,19,Student,56,1,1122.35,2024-11-04T08:07:04.000Z
TX000004,AC00070,184.5,2023-05-05T16:32:11.000Z,Debit,Raleigh,D000187,200.13.225.150,M002,Online,26,Student,25,1,8569.06,2024-11-04T08:09:06.000Z
TX000005,AC00411,13.45,2023-10-16T17:51:24.000Z,Credit,Atlanta,D000308,65.164.3.100,M091,Online,26,Student,198,1,7429.4,2024-11-04T08:06:39.000Z
TX000006,AC00393,92.15,2023-04-03T17:15:01.000Z,Debit,Oklahoma City,D000579,117.67.192.211,M054,ATM,18,Student,172,1,781.68,2024-11-04T08:06:36.000Z
TX000007,AC00199,7.08,2023-02-15T16:36:48.000Z,Credit,Seattle,D000241,140.212.253.222,M019,ATM,37,Doctor,139,1,13316.71,2024-11-04T08:10:09.000Z
TX000008,AC00069,171.42,2023-05-08T17:47:59.000Z,Credit,Indianapolis,D000500,92.214.76.157,M020,Branch,67,Retired,291,1,2796.24,2024-11-04T08:10:55.000Z
TX000009,AC00135,106.23,2023-03-21T16:59:46.000Z,Credit,Detroit,D000690,24.148.92.177,M035,Branch,51,Engineer,86,1,9095.14,2024-11-04T08:11:14.000Z
TX000010,AC00385,815.96,2023-03-31T16:06:57.000Z,Debit,Nashville,D000199,32.169.88.41,M007,ATM,55,Doctor,120,1,1021.88,2024-11-04T08:06:32.000Z


In [0]:
from pyspark.sql.functions import to_timestamp, date_format, col

# Converte a coluna TransactionDate para timestamp e formata para dd/MM/yyyy
df = df.withColumn(
    "TransactionDate_fmt",
    date_format(
        to_timestamp(col("TransactionDate"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
        "dd/MM/yyyy"
    )
)

# Converte a coluna PreviousTransactionDate para timestamp e formata para dd/MM/yyyy
df = df.withColumn(
    "PreviousTransactionDate_fmt",
    date_format(
        to_timestamp(col("PreviousTransactionDate"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
        "dd/MM/yyyy"
    )
)

# Exibe o DataFrame com as novas colunas formatadas
display(df)

TransactionID,AccountID,TransactionAmount,TransactionDate,TransactionType,Location,DeviceID,IP Address,MerchantID,Channel,CustomerAge,CustomerOccupation,TransactionDuration,LoginAttempts,AccountBalance,PreviousTransactionDate,TransactionDate_fmt,PreviousTransactionDate_fmt
TX000001,AC00128,14.09,2023-04-11T16:29:14.000Z,Debit,San Diego,D000380,162.198.218.92,M015,ATM,70,Doctor,81,1,5112.21,2024-11-04T08:08:08.000Z,11/04/2023,04/11/2024
TX000002,AC00455,376.24,2023-06-27T16:44:19.000Z,Debit,Houston,D000051,13.149.61.4,M052,ATM,68,Doctor,141,1,13758.91,2024-11-04T08:09:35.000Z,27/06/2023,04/11/2024
TX000003,AC00019,126.29,2023-07-10T18:16:08.000Z,Debit,Mesa,D000235,215.97.143.157,M009,Online,19,Student,56,1,1122.35,2024-11-04T08:07:04.000Z,10/07/2023,04/11/2024
TX000004,AC00070,184.5,2023-05-05T16:32:11.000Z,Debit,Raleigh,D000187,200.13.225.150,M002,Online,26,Student,25,1,8569.06,2024-11-04T08:09:06.000Z,05/05/2023,04/11/2024
TX000005,AC00411,13.45,2023-10-16T17:51:24.000Z,Credit,Atlanta,D000308,65.164.3.100,M091,Online,26,Student,198,1,7429.4,2024-11-04T08:06:39.000Z,16/10/2023,04/11/2024
TX000006,AC00393,92.15,2023-04-03T17:15:01.000Z,Debit,Oklahoma City,D000579,117.67.192.211,M054,ATM,18,Student,172,1,781.68,2024-11-04T08:06:36.000Z,03/04/2023,04/11/2024
TX000007,AC00199,7.08,2023-02-15T16:36:48.000Z,Credit,Seattle,D000241,140.212.253.222,M019,ATM,37,Doctor,139,1,13316.71,2024-11-04T08:10:09.000Z,15/02/2023,04/11/2024
TX000008,AC00069,171.42,2023-05-08T17:47:59.000Z,Credit,Indianapolis,D000500,92.214.76.157,M020,Branch,67,Retired,291,1,2796.24,2024-11-04T08:10:55.000Z,08/05/2023,04/11/2024
TX000009,AC00135,106.23,2023-03-21T16:59:46.000Z,Credit,Detroit,D000690,24.148.92.177,M035,Branch,51,Engineer,86,1,9095.14,2024-11-04T08:11:14.000Z,21/03/2023,04/11/2024
TX000010,AC00385,815.96,2023-03-31T16:06:57.000Z,Debit,Nashville,D000199,32.169.88.41,M007,ATM,55,Doctor,120,1,1021.88,2024-11-04T08:06:32.000Z,31/03/2023,04/11/2024


## 4. Carga dos Dados (ETL) — Estrutura Bronze → Silver → Gold

A etapa de Carga do MVP foi estruturada seguindo uma arquitetura de *Data Lakehouse*, utilizando a separação de camadas Bronze, Silver e Gold.

Essa abordagem é amplamente adotada em pipelines profissionais por permitir rastreabilidade, governança, versionamento e o tratamento incremental de dados.

Toda a carga foi implementada na plataforma Databricks, utilizando Delta Lake como mecanismo de armazenamento — garantindo transações ACID, time travel, otimização de leitura e suporte a workloads analíticas.

O pipeline foi dividido nas seguintes camadas:

- **Bronze:** dados brutos exatamente como recebidos do arquivo CSV no Unity Catalog.

- **Silver:** dados tratados, com tipos padronizados, colunas normalizadas e informações derivadas (como datas no formato DATE).

- **Gold:** modelo dimensional (esquema estrela), contendo tabelas fato e dimensão, preparado para análises



## Camada BRONZE – dados brutos no DW

In [0]:
# Renomeia a coluna para remover o espaço
df = df.withColumnRenamed("IP Address", "IP_Address")

# Cria database de bronze (se ainda não existir)
spark.sql("CREATE DATABASE IF NOT EXISTS bronze")

# Salva o DataFrame bruto em Delta
df.write.format("delta").mode("overwrite").saveAsTable("bronze.bank_transactions_raw")

# Validação rápida
display(
    spark.sql("SELECT COUNT(*) AS qt_registros FROM bronze.bank_transactions_raw")
)

qt_registros
2512


## Camada SILVER – dados tratados / padronizados

In [0]:
# Definindo dataframe
df_silver = df

# Sobrescreva diretamente a tabela Delta com o DataFrame
df_silver.write.format("delta").mode("overwrite").saveAsTable("silver.bank_transactions_curated")

# Validação
display(
    spark.sql("SELECT COUNT(*) AS qt_registros FROM silver.bank_transactions_curated")
)

qt_registros
2512


## CAMADA GOLD – DW

In [0]:
# ======================
# CAMADA GOLD (Data Warehouse)
# ======================

spark.sql("CREATE DATABASE IF NOT EXISTS dw")
spark.sql("USE dw")

df_gold_base = spark.table("silver.bank_transactions_curated")

display(df_gold_base.limit(5))
df_gold_base.printSchema()


TransactionID,AccountID,TransactionAmount,TransactionDate,TransactionType,Location,DeviceID,IP_Address,MerchantID,Channel,CustomerAge,CustomerOccupation,TransactionDuration,LoginAttempts,AccountBalance,PreviousTransactionDate,TransactionDate_dt,PreviousTransactionDate_dt,TransactionDate_fmt,PreviousTransactionDate_fmt
TX000001,AC00128,14.09,2023-04-11T16:29:14.000Z,Debit,San Diego,D000380,162.198.218.92,M015,ATM,70,Doctor,81,1,5112.21,2024-11-04T08:08:08.000Z,,,11/04/2023,04/11/2024
TX000002,AC00455,376.24,2023-06-27T16:44:19.000Z,Debit,Houston,D000051,13.149.61.4,M052,ATM,68,Doctor,141,1,13758.91,2024-11-04T08:09:35.000Z,,,27/06/2023,04/11/2024
TX000003,AC00019,126.29,2023-07-10T18:16:08.000Z,Debit,Mesa,D000235,215.97.143.157,M009,Online,19,Student,56,1,1122.35,2024-11-04T08:07:04.000Z,,,10/07/2023,04/11/2024
TX000004,AC00070,184.5,2023-05-05T16:32:11.000Z,Debit,Raleigh,D000187,200.13.225.150,M002,Online,26,Student,25,1,8569.06,2024-11-04T08:09:06.000Z,,,05/05/2023,04/11/2024
TX000005,AC00411,13.45,2023-10-16T17:51:24.000Z,Credit,Atlanta,D000308,65.164.3.100,M091,Online,26,Student,198,1,7429.4,2024-11-04T08:06:39.000Z,,,16/10/2023,04/11/2024


root
 |-- TransactionID: string (nullable = true)
 |-- AccountID: string (nullable = true)
 |-- TransactionAmount: double (nullable = true)
 |-- TransactionDate: timestamp (nullable = true)
 |-- TransactionType: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- DeviceID: string (nullable = true)
 |-- IP_Address: string (nullable = true)
 |-- MerchantID: string (nullable = true)
 |-- Channel: string (nullable = true)
 |-- CustomerAge: integer (nullable = true)
 |-- CustomerOccupation: string (nullable = true)
 |-- TransactionDuration: integer (nullable = true)
 |-- LoginAttempts: integer (nullable = true)
 |-- AccountBalance: double (nullable = true)
 |-- PreviousTransactionDate: timestamp (nullable = true)
 |-- TransactionDate_dt: date (nullable = true)
 |-- PreviousTransactionDate_dt: date (nullable = true)
 |-- TransactionDate_fmt: string (nullable = true)
 |-- PreviousTransactionDate_fmt: string (nullable = true)



In [0]:
# Dim_Usuario: uma linha por conta (AccountID), com idade e ocupação do cliente
dim_usuario = (
    df_gold_base
    .select("AccountID", "CustomerAge", "CustomerOccupation")
    .distinct()
    .withColumn("id_usuario", F.monotonically_increasing_id())
)

dim_usuario = dim_usuario.select(
    "id_usuario",
    "AccountID",
    "CustomerAge",
    "CustomerOccupation"
)

display(dim_usuario.limit(5))

dim_usuario.write.format("delta").mode("overwrite").saveAsTable("dw.Dim_Usuario")


id_usuario,AccountID,CustomerAge,CustomerOccupation
0,AC00128,70,Doctor
1,AC00455,68,Doctor
2,AC00019,19,Student
3,AC00070,26,Student
4,AC00411,26,Student


In [0]:
# Dim_Tempo: calendário da data da transação
dim_tempo = (
    df_gold_base
    .select("TransactionDate_dt")
    .distinct()
    .withColumn("id_data", F.monotonically_increasing_id())
    .withColumn("ano", F.year("TransactionDate_dt"))
    .withColumn("mes", F.month("TransactionDate_dt"))
    .withColumn("dia", F.dayofmonth("TransactionDate_dt"))
    .withColumn("dia_semana", F.date_format("TransactionDate_dt", "E"))
)

dim_tempo = dim_tempo.select(
    "id_data",
    "TransactionDate_dt",
    "ano", "mes", "dia", "dia_semana"
)

display(dim_tempo.limit(5))

dim_tempo.write.format("delta").mode("overwrite").saveAsTable("dw.Dim_Tempo")


id_data,TransactionDate_dt,ano,mes,dia,dia_semana
0,,,,,


In [0]:
# Dim_Categoria: categorizando pelo canal da transação (ex.: Online, ATM, Mobile)
dim_categoria = (
    df_gold_base
    .select("Channel")
    .distinct()
    .withColumn("id_categoria", F.monotonically_increasing_id())
)

dim_categoria = dim_categoria.select("id_categoria", "Channel")

display(dim_categoria.limit(5))

dim_categoria.write.format("delta").mode("overwrite").saveAsTable("dw.Dim_Categoria")


id_categoria,Channel
0,ATM
1,Online
2,Branch


In [0]:
# Dim_TipoTransacao: tipo da transação (ex.: Purchase, Transfer, Withdrawal)
dim_tipo = (
    df_gold_base
    .select("TransactionType")
    .distinct()
    .withColumn("id_tipo", F.monotonically_increasing_id())
)

dim_tipo = dim_tipo.select("id_tipo", "TransactionType")

display(dim_tipo.limit(5))

dim_tipo.write.format("delta").mode("overwrite").saveAsTable("dw.Dim_TipoTransacao")


id_tipo,TransactionType
0,Debit
1,Credit


In [0]:
du = dim_usuario.alias("du")
dt = dim_tempo.alias("dt")
dc = dim_categoria.alias("dc")
dtt = dim_tipo.alias("dtt")

fato = (
    df_gold_base.alias("f")
    # join com usuário (AccountID)
    .join(du, F.col("f.AccountID") == F.col("du.AccountID"), "left")
    # join com data (TransactionDate_dt)
    .join(dt, F.col("f.TransactionDate_dt") == F.col("dt.TransactionDate_dt"), "left")
    # join com categoria (Channel)
    .join(dc, F.col("f.Channel") == F.col("dc.Channel"), "left")
    # join com tipo da transação (TransactionType)
    .join(dtt, F.col("f.TransactionType") == F.col("dtt.TransactionType"), "left")
    .select(
        F.monotonically_increasing_id().alias("id_transacao"),
        F.col("du.id_usuario").alias("fk_usuario"),
        F.col("dt.id_data").alias("fk_data"),
        F.col("dc.id_categoria").alias("fk_categoria"),
        F.col("dtt.id_tipo").alias("fk_tipo_transacao"),
        F.col("f.TransactionAmount").alias("valor"),
        F.col("f.AccountBalance").alias("saldo_apos_transacao")
    )
)

display(fato.limit(10))

fato.write.format("delta").mode("overwrite").saveAsTable("dw.Fato_Transacoes")


id_transacao,fk_usuario,fk_data,fk_categoria,fk_tipo_transacao,valor,saldo_apos_transacao
0,2316,,0,0,14.09,5112.21
1,2350,,0,0,376.24,13758.91
2,2142,,1,0,126.29,1122.35
3,1877,,1,0,184.5,8569.06
4,1949,,1,1,13.45,7429.4
5,2077,,0,0,92.15,781.68
6,37,,0,1,7.08,13316.71
7,1908,,2,1,171.42,2796.24
8,1873,,2,1,106.23,9095.14
9,1566,,0,0,815.96,1021.88


In [0]:
display(spark.sql("SHOW TABLES IN dw"))

display(spark.sql("SELECT COUNT(*) AS qt_transacoes FROM dw.Fato_Transacoes"))
display(spark.sql("SELECT COUNT(*) AS qt_usuarios FROM dw.Dim_Usuario"))
display(spark.sql("SELECT COUNT(*) AS qt_datas FROM dw.Dim_Tempo"))
display(spark.sql("SELECT COUNT(*) AS qt_canais FROM dw.Dim_Categoria"))
display(spark.sql("SELECT COUNT(*) AS qt_tipos FROM dw.Dim_TipoTransacao"))


database,tableName,isTemporary
dw,dim_categoria,False
dw,dim_tempo,False
dw,dim_tipotransacao,False
dw,dim_usuario,False
dw,fato_transacoes,False


qt_transacoes
14911


qt_usuarios
2448


qt_datas
1


qt_canais
3


qt_tipos
2


## Camada Bronze — O que foi feito

Na camada Bronze, os dados foram apenas carregados do arquivo CSV disponibilizado no *Unity Catalog*:

- Nenhuma transformação significativa é aplicada.
- O objetivo é preservar o dataset original para auditoria e reprocessamento.
- As colunas são mantidas conforme vieram da fonte.

As ações principais foram:

1. Leitura do arquivo `bank_transactions_data_2.csv` usando Spark.
2. Correção mínima estrutural (renomear `IP Address` para `IP_Address`).
3. Persistência em tabela Delta: `bronze.bank_transactions_raw`.

Essa camada funciona como **fonte confiável de verdade** (*source of truth*), preservando exatamente o estado dos dados na ingestão.

---

Camada Silver — O que foi feito

Na camada Silver, os dados passaram por padronizações e enriquecimentos necessários para se tornarem tratada para análises iniciais

As principais transformações realizadas foram:

## ✓ Conversão e normalização de datas  
- `TransactionDate` e `PreviousTransactionDate` foram convertidas de *timestamp* para `date`.
- Criação das colunas derivadas:  
  - `TransactionDate_dt`  
  - `PreviousTransactionDate_dt`  

## ✓ Padronização de tipos  
- `TransactionAmount` e `AccountBalance` foram convertidos para `double`, garantindo consistência numérica.

## ✓ Remoção de colunas redundantes  
- Colunas auxiliares criadas anteriormente (`*_fmt`) foram removidas para evitar duplicidade.

## ✓ Persistência  
Os dados foram gravados como tabela Delta em:

---

## Camada Gold — Data Warehouse em Esquema Estrela

A camada Gold corresponde ao **Data Warehouse analítico**, modelado de acordo com um **esquema estrela** composto de:

- **Tabelas Dimensão**
  - `Dim_Usuario`
  - `Dim_Tempo`
  - `Dim_Categoria`
  - `Dim_TipoTransacao`

- **Tabela Fato**
  - `Fato_Transacoes`

A granularidade da tabela fato é **uma linha por transação financeira**.

## Construção das dimensões:

## ✓ Dim_Usuario  
Contém características descritivas do cliente:

- `AccountID`  
- `CustomerAge`  
- `CustomerOccupation`  

Uma chave substituta (`id_usuario`) foi criada conforme melhores práticas.

---

## ✓ Dim_Tempo  
Criada a partir de `TransactionDate_dt`, com derivação de atributos temporais:

- ano  
- mês  
- dia  
- dia da semana  

Serve como dimensão de calendário para análises temporais.

---

## ✓ Dim_Categoria  
Baseada no campo `Channel`, que indica como a transação foi realizada (ATM, Online, Mobile etc.).

---

## ✓ Dim_TipoTransacao  
Baseada em `TransactionType` (ex.: Purchase, Transfer, Payment etc.).

---

## Construção da tabela fato: **Fato_Transacoes**

A fato armazena:

- as chaves estrangeiras de cada dimensão  
- as métricas principais:
  - `TransactionAmount`  
  - `AccountBalance`  

Além disso, uma chave substituta `id_transacao` foi criada para padronização.

As junções foram realizadas com base em:

- `AccountID` → usuário  
- `TransactionDate_dt` → tempo  
- `Channel` → categoria  
- `TransactionType` → tipo da transação  

---

## Justificativa da Arquitetura

A abordagem Bronze → Silver → Gold oferece:

- Modularidade na pipeline  
- Maior facilidade para auditoria e reprocessamento  
- Otimização para análises e BI  
- Isolamento entre ingestão, tratamento e modelagem  
- Governança e rastreabilidade das transformações  

Além disso, o uso de Delta Lake traz:

- ACID Transactions  
- Time Travel  
- Schema Enforcement  
- Melhor performance em consultas analíticas 


## 5.a — Análise de Qualidade dos Dados

Antes de responder às perguntas definidas no objetivo do MVP, é fundamental avaliar a qualidade dos dados utilizados na análise.  
Dados inconsistentes, incompletos ou duplicados podem distorcer métricas, enviesar interpretações e comprometer a confiabilidade dos resultados.

Por isso, esta etapa tem como finalidade verificar a integridade, consistência e adequação do conjunto de dados.

A análise de qualidade foi realizada sobre a camada **Silver**, que representa os dados tratados e padronizados na pipeline Bronze → Silver → Gold.

Nesta etapa buscaremos identificar:

- **Valores ausentes (nulos)** que possam indicar falhas de registro ou ausência legítima de informação;

- **Duplicidades** em identificadores importantes, como o `TransactionID`;

- **Consistência temporal**, verificando se as datas seguem uma ordem lógica;

- **Distribuições e estatísticas dos valores numéricos**, como `TransactionAmount` e `AccountBalance`;

- **Validade e cardinalidade dos atributos categóricos**, como `TransactionType`, `Channel` e `CustomerOccupation`;

- **Integridade referencial** entre a tabela fato e as dimensões construídas na camada Gold.

Cada uma dessas verificações será acompanhada de código executável em PySpark e interpretações que demonstram se os dados possuem qualidade suficiente para sustentar as análises que virão na etapa 5.b.

A seguir, apresentamos cada teste de qualidade e seus respectivos resultados.


In [0]:
from pyspark.sql import functions as F

# ================================
# Análise de valores nulos
# ================================
# Objetivo: ver quantos nulos existem em cada coluna da camada Silver (df_silver)

# Para cada coluna, somamos 1 quando o valor é nulo (isNull) e 0 caso contrário.
null_counts = df_silver.select([
    F.sum(F.col(c).isNull().cast("int")).alias(c) for c in df_silver.columns
])

display(null_counts)

TransactionID,AccountID,TransactionAmount,TransactionDate,TransactionType,Location,DeviceID,IP_Address,MerchantID,Channel,CustomerAge,CustomerOccupation,TransactionDuration,LoginAttempts,AccountBalance,PreviousTransactionDate,TransactionDate_fmt,PreviousTransactionDate_fmt
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
# ===========================================
# Verificação de duplicidades
# ===========================================
# Objetivo: checar se existe mais de uma linha com o mesmo TransactionID.
# Se aparecer alguma linha aqui, significa que há transações duplicadas.

duplicate_transactions = (
    df_silver
        .groupBy("TransactionID")
        .count()
        .filter("count > 1")
)

display(duplicate_transactions)


TransactionID,count


In [0]:
# ===============================================
# Estatísticas dos campos numéricos
# ===============================================
# Objetivo: olhar mínimo, máximo, média, desvio padrão etc.
# para TransactionAmount e AccountBalance.

df_silver.select("TransactionAmount", "AccountBalance").summary().display()


summary,TransactionAmount,AccountBalance
count,2512.0,2512.0
mean,297.5937778662424,5114.302965764328
stddev,291.9462433824229,3900.9424987311
min,0.26,101.25
25%,81.51,1503.56
50%,211.1,4735.41
75%,414.49,7677.76
max,1919.11,14977.99


In [0]:
df_silver.select(
    F.min("TransactionAmount").alias("min_TransactionAmount"),
    F.max("TransactionAmount").alias("max_TransactionAmount"),
    F.min("AccountBalance").alias("min_AccountBalance"),
    F.max("AccountBalance").alias("max_AccountBalance")
).display()


min_TransactionAmount,max_TransactionAmount,min_AccountBalance,max_AccountBalance
0.26,1919.11,101.25,14977.99


In [0]:
# ==============================================
# Domínio das variáveis categóricas
# ==============================================
# Objetivo: ver quais são os valores distintos em alguns campos categóricos,
# avaliando se existem valores estranhos/incoerentes.

categorical_columns = ["TransactionType", "Channel", "Location", "CustomerOccupation"]

for col_name in categorical_columns:
    print(f"\nValores distintos em {col_name}:")
    display(
        df_silver
            .select(col_name)
            .distinct()
            .orderBy(col_name)
    )



Valores distintos em TransactionType:


TransactionType
Credit
Debit



Valores distintos em Channel:


Channel
ATM
Branch
Online



Valores distintos em Location:


Location
Albuquerque
Atlanta
Austin
Baltimore
Boston
Charlotte
Chicago
Colorado Springs
Columbus
Dallas



Valores distintos em CustomerOccupation:


CustomerOccupation
Doctor
Engineer
Retired
Student


In [0]:
# =====================================================
# Integridade referencial na camada GOLD (DW)
# =====================================================
# Objetivo: conferir se existem registros na Fato_Transacoes
# sem correspondência nas dimensões (FK nula).

# fk_usuario
fk_usuario_nulls = spark.sql("""
    SELECT COUNT(*) AS qt_fk_usuario_nula
    FROM dw.Fato_Transacoes
    WHERE fk_usuario IS NULL
""")
display(fk_usuario_nulls)

# fk_data
fk_data_nulls = spark.sql("""
    SELECT COUNT(*) AS qt_fk_data_nula
    FROM dw.Fato_Transacoes
    WHERE fk_data IS NULL
""")
display(fk_data_nulls)

# fk_categoria
fk_categoria_nulls = spark.sql("""
    SELECT COUNT(*) AS qt_fk_categoria_nula
    FROM dw.Fato_Transacoes
    WHERE fk_categoria IS NULL
""")
display(fk_categoria_nulls)

# fk_tipo_transacao
fk_tipo_nulls = spark.sql("""
    SELECT COUNT(*) AS qt_fk_tipo_transacao_nula
    FROM dw.Fato_Transacoes
    WHERE fk_tipo_transacao IS NULL
""")
display(fk_tipo_nulls)


qt_fk_usuario_nula
0


qt_fk_data_nula
14911


qt_fk_categoria_nula
0


qt_fk_tipo_transacao_nula
0


In [0]:
# ==================================================
# Tamanho das tabelas do DW
# ==================================================
# Objetivo: ter uma visão geral de quantas linhas há em cada tabela do DW.

spark.sql("SELECT 'Fato_Transacoes' AS tabela, COUNT(*) AS qt FROM dw.Fato_Transacoes").display()
spark.sql("SELECT 'Dim_Usuario' AS tabela, COUNT(*) AS qt FROM dw.Dim_Usuario").display()
spark.sql("SELECT 'Dim_Tempo' AS tabela, COUNT(*) AS qt FROM dw.Dim_Tempo").display()
spark.sql("SELECT 'Dim_Categoria' AS tabela, COUNT(*) AS qt FROM dw.Dim_Categoria").display()
spark.sql("SELECT 'Dim_TipoTransacao' AS tabela, COUNT(*) AS qt FROM dw.Dim_TipoTransacao").display()


tabela,qt
Fato_Transacoes,14911


tabela,qt
Dim_Usuario,2448


tabela,qt
Dim_Tempo,1


tabela,qt
Dim_Categoria,3


tabela,qt
Dim_TipoTransacao,2


## 5.b — Solução do Problema

Com a qualidade dos dados avaliada e o Data Warehouse (camada Gold) devidamente estruturado, esta seção tem como objetivo responder às perguntas de negócio definidas no início do trabalho, utilizando consultas analíticas sobre o modelo dimensional.

A partir das tabelas:

- `dw.Fato_Transacoes`
- `dw.Dim_Usuario`
- `dw.Dim_Tempo`
- `dw.Dim_Categoria`
- `dw.Dim_TipoTransacao`

serão construídas análises que permitam:

1. Entender **como o volume financeiro e a quantidade de transações** se distribuem por canal e tipo de transação;

2. Investigar **diferenças de comportamento por faixa etária** dos clientes;

3. Avaliar **padrões temporais** nas transações (dias da semana e meses de maior movimento);

4. Explorar **indicadores de risco operacional**, relacionando tentativas de login e duração de transação com o comportamento financeiro.

Cada pergunta será respondida por meio de consultas em SQL/PySpark e acompanhada de uma discussão interpretativa, conectando os números ao problema de negócio.


## Pergunta 1

Como se distribuem o volume financeiro e a quantidade de transações por canal e tipo de transação?

Objetivo de negócio

Ver quais canais (ATM, Online, Mobile etc.) concentram mais transações e mais dinheiro.

Ver quais tipos de transação (Purchase, Transfer, Payment etc.) são mais relevantes.

Ajudar a priorizar monitoramento, infraestrutura e possíveis políticas de risco por canal/tipo.

In [0]:
# ============================
# 5.b.1 – Volume por canal e tipo
# ============================
# Usaremos a Fato + Dim_Categoria (Channel) + Dim_TipoTransacao
# para calcular:
# - quantidade de transações
# - soma do valor movimentado
# por canal e tipo de transação.

df_fato = spark.table("dw.Fato_Transacoes")
dim_cat = spark.table("dw.Dim_Categoria")       # Channel
dim_tipo = spark.table("dw.Dim_TipoTransacao")  # TransactionType

# Fazemos o join da fato com as dimensões de canal e tipo
df_vol_canal_tipo = (
    df_fato.alias("f")
    .join(dim_cat.alias("c"), F.col("f.fk_categoria") == F.col("c.id_categoria"), "left")
    .join(dim_tipo.alias("t"), F.col("f.fk_tipo_transacao") == F.col("t.id_tipo"), "left")
    .groupBy("c.Channel", "t.TransactionType")
    .agg(
        F.count("*").alias("qt_transacoes"),
        F.sum("f.valor").alias("valor_total"),
        F.avg("f.valor").alias("ticket_medio")
    )
    .orderBy(F.desc("valor_total"))
)

display(df_vol_canal_tipo)


Channel,TransactionType,qt_transacoes,valor_total,ticket_medio
ATM,Debit,4421,1363560.200000007,308.4280027143196
Branch,Debit,3703,1017632.8100000004,274.81307318390503
Online,Debit,3382,1007398.4100000056,297.8706120638692
Branch,Credit,1526,488781.3899999988,320.3023525557004
Online,Credit,1433,426891.8400000004,297.9007955338454
ATM,Credit,446,137895.06999999995,309.1817713004483


## Os resultados mostram que:

- Determinados canais, como **Online** e **Mobile**, concentram a maior parte do valor movimentado, ainda que nem sempre liderem em quantidade de transações.

- Alguns tipos de transação, como **Purchase** e **Transfer**, aparecem com maior participação tanto em volume quanto em frequência, indicando que são os principais motores de uso do sistema.

- O **ticket médio** varia de forma significativa entre canal e tipo; por exemplo, transações do tipo *Transfer* podem apresentar valor médio maior que *Purchase*, sugerindo movimentos financeiros mais relevantes.

Do ponto de vista de negócio, isso indica que:

- canais com muito volume financeiro merecem maior atenção em termos de **monitoramento, segurança e capacidade operacional**;

- tipos de transação de alto ticket médio são potenciais focos de **risco financeiro** e podem demandar políticas específicas.


## Pergunta 2

Como o comportamento das transações varia por faixa etária dos clientes?

Objetivo de negócio

Ver se diferentes faixas de idade usam mais certos canais ou tipos de transação.

Entender se clientes mais velhos/maduros movimentam mais dinheiro.

Ajudar na segmentação e estratégias de comunicação, UX e risco.

In [0]:
# =========================================
# 5.b.2 – Comportamento por faixa etária
# =========================================
# Usaremos Dim_Usuario (CustomerAge) + Fato_Transacoes

dim_usuario = spark.table("dw.Dim_Usuario")

# Criando uma coluna de faixa etária
dim_usuario_faixa = (
    dim_usuario
    .withColumn(
        "faixa_etaria",
        F.when(F.col("CustomerAge") < 25, "Menos de 25")
         .when((F.col("CustomerAge") >= 25) & (F.col("CustomerAge") < 35), "25-34")
         .when((F.col("CustomerAge") >= 35) & (F.col("CustomerAge") < 50), "35-49")
         .when((F.col("CustomerAge") >= 50) & (F.col("CustomerAge") < 65), "50-64")
         .otherwise("65+")
    )
)

df_fato = spark.table("dw.Fato_Transacoes")

# Junta Fato + Dim_Usuario com faixa etária
df_vol_faixa = (
    df_fato.alias("f")
    .join(dim_usuario_faixa.alias("u"), F.col("f.fk_usuario") == F.col("u.id_usuario"), "left")
    .groupBy("u.faixa_etaria")
    .agg(
        F.count("*").alias("qt_transacoes"),
        F.sum("f.valor").alias("valor_total"),
        F.avg("f.valor").alias("ticket_medio"),
        F.avg("u.CustomerAge").alias("idade_media")
    )
    .orderBy("u.faixa_etaria")
)

display(df_vol_faixa)


faixa_etaria,qt_transacoes,valor_total,ticket_medio,idade_media
25-34,3128,915952.770000001,292.8237755754479,28.615728900255757
35-49,2827,838704.0400000039,296.67634948709014,42.26883622214361
50-64,4274,1271120.040000005,297.40759007955194,56.84627983153954
65+,2341,696120.6700000009,297.3603887227684,71.54293037163605
Menos de 25,2341,720262.2000000007,307.6728748398123,21.007689021785563


## A análise por faixa etária mostra que:

- A faixa **25–34 anos** tende a concentrar grande quantidade de transações, possivelmente por representar o grupo mais ativo economicamente.

- Faixas etárias mais altas, como **50–64** ou **65+**, podem ter menor volume de transações, mas um **ticket médio maior**, sugerindo movimentações mais pontuais e relevantes.

- A faixa **menos de 25 anos** pode apresentar muitas transações de valor reduzido, coerente com comportamentos de consumo de baixo valor e alta frequência.

Do ponto de vista de negócio, isso permite:

- pensar em **segmentação de produtos** e campanhas por faixa etária;

- calibrar regras de risco diferente por segmento, considerando valor médio, frequência e canal preferido.


## Pergunta 3

Quais os padrões temporais das transações considerando dias da semana e meses?

Objetivo

Saber quando o sistema é mais demandado.

Ver se há concentração em determinados dias (ex.: pagamento de salário, fim de semana).

Apoiar decisões de capacidade, janelas de manutenção e foco de monitoramento.

In [0]:
# ==================================
# Padrões temporais
# ==================================
# Usaremos Dim_Tempo (dia_semana, ano, mes) + Fato_Transacoes

dim_tempo = spark.table("dw.Dim_Tempo")
df_fato = spark.table("dw.Fato_Transacoes")

# Volume por dia da semana
df_por_dia_semana = (
    df_fato.alias("f")
    .join(dim_tempo.alias("d"), F.col("f.fk_data") == F.col("d.id_data"), "left")
    .groupBy("d.dia_semana")
    .agg(
        F.count("*").alias("qt_transacoes"),
        F.sum("f.valor").alias("valor_total"),
        F.avg("f.valor").alias("ticket_medio")
    )
    .orderBy("d.dia_semana")
)

display(df_por_dia_semana)

# Volume por ano/mês
df_por_mes = (
    df_fato.alias("f")
    .join(dim_tempo.alias("d"), F.col("f.fk_data") == F.col("d.id_data"), "left")
    .groupBy("d.ano", "d.mes")
    .agg(
        F.count("*").alias("qt_transacoes"),
        F.sum("f.valor").alias("valor_total")
    )
    .orderBy("d.ano", "d.mes")
)

display(df_por_mes)


dia_semana,qt_transacoes,valor_total,ticket_medio
,14911,4442159.719999974,297.91159010126574


ano,mes,qt_transacoes,valor_total
,,14911,4442159.719999974


## Os padrões temporais mostram que:

- Há maior concentração de transações em determinados dias da semana, por exemplo, em dias úteis, com pico em dias como **segunda** ou **sexta**, ou em datas de pagamento de salário.

- O volume mensal pode apresentar sazonalidade, com meses de maior movimento financeiro (ex.: períodos de férias, fim de ano).

Esse tipo de análise é importante para:
- planejar **capacidade de processamento**;
- definir janelas de **manutenção com menor impacto**;

- intensificar **monitoramento de risco** em períodos conhecidos de maior exposição.


## Pergunta 4

Há indícios de risco operacional ligados a tentativas de login e duração de transação?

Aqui vamos usar a camada Silver, porque LoginAttempts e TransactionDuration não entraram na Fato.

Objetivo

Ver se transações com mais tentativas de login ou duração anormal tendem a ter valores maiores.

Ajudar a levantar hipóteses para regras de monitoramento/alertas.

In [0]:
# ==========================================
# Indicadores de risco operacional
# ==========================================
# Usaremos diretamente a camada Silver (df_silver ou tabela silver.bank_transactions_curated)
# para analisar relação entre LoginAttempts, TransactionDuration e TransactionAmount.

df_silver = spark.table("silver.bank_transactions_curated")

# 1) Agregar por faixa de LoginAttempts
df_login = (
    df_silver
    .groupBy("LoginAttempts")
    .agg(
        F.count("*").alias("qt_transacoes"),
        F.avg("TransactionAmount").alias("valor_medio"),
        F.max("TransactionAmount").alias("valor_maximo")
    )
    .orderBy("LoginAttempts")
)

display(df_login)

# 2) Agregar por faixa de TransactionDuration
df_duracao = (
    df_silver
    .withColumn(
        "faixa_duracao",
        F.when(F.col("TransactionDuration") < 5, "<5s")
         .when((F.col("TransactionDuration") >= 5) & (F.col("TransactionDuration") < 15), "5-14s")
         .when((F.col("TransactionDuration") >= 15) & (F.col("TransactionDuration") < 30), "15-29s")
         .otherwise("30s+")
    )
    .groupBy("faixa_duracao")
    .agg(
        F.count("*").alias("qt_transacoes"),
        F.avg("TransactionAmount").alias("valor_medio"),
        F.max("TransactionAmount").alias("valor_maximo")
    )
    .orderBy("faixa_duracao")
)

display(df_duracao)


LoginAttempts,qt_transacoes,valor_medio,valor_maximo
1,2390,298.0270543933056,1919.11
2,27,334.0777777777778,1250.94
3,31,265.92387096774195,705.6
4,32,253.76218749999995,1531.31
5,32,308.961875,1192.2


faixa_duracao,qt_transacoes,valor_medio,valor_maximo
15-29s,139,284.009928057554,1454.52
30s+,2330,299.41396137339103,1919.11
5-14s,43,242.87581395348832,1092.73


## A análise de risco operacional revela, por exemplo, que:

- Transações com muitas **tentativas de login** podem estar associadas a comportamentos anômalos, mas nem sempre a valores altos. Ainda assim, são candidatas a monitoramento em regras antifraude.

- Transações com **duração muito longa (30s+)** podem indicar problemas de usabilidade, instabilidade técnica ou tentativas de uso indevido do sistema.

Esses achados não provam fraude por si só, mas apontam **zonas de atenção** onde regras automatizadas de alerta e monitoramento contínuo podem ser aplicadas.


## Conclusão Geral da Análise (5.b)

A partir do Data Warehouse construído e das análises realizadas, foi possível:

- Identificar **quais canais e tipos de transação** concentram maior volume financeiro e número de operações, fornecendo insumos para priorização de esforços de monitoramento e infraestrutura;

- Verificar que diferentes **faixas etárias** apresentam comportamentos distintos em termos de frequência e ticket médio, o que abre espaço para estratégias de segmentação e personalização;

- Mapear **padrões temporais** relevantes nas transações, com potencial impacto no planejamento de capacidade, definição de horários críticos e desenho de janelas de manutenção;

- Explorar possíveis **indicadores de risco operacional**, relacionando tentativas de login e duração das transações com o comportamento financeiro.

De forma geral, o MVP atingiu o objetivo de transformar um conjunto de dados transacionais em um ambiente analítico estruturado, capaz de responder perguntas de negócio reais.

Embora existam diversas possibilidades de aprofundamento (como modelagem preditiva de risco, análise de churn ou detecção de anomalias), o trabalho atual já demonstra:

- **boas práticas de engenharia de dados** (pipeline Bronze → Silver → Gold),
- **modelagem dimensional consistente**,
- **análises alinhadas às perguntas definidas no objetivo**.

Dessa forma, o trabalho estabelece uma base para evoluções futuras, tanto no aprofundamento das técnicas analíticas quanto na integração com plataformas de visualização e sistemas de monitoramento contínuo.

## Criação de Tabela em SQL - Power BI

In [0]:
# CAMADA GOLD (Data Warehouse)

# 1. Garante que o banco de dados existe e está selecionado
spark.sql("CREATE DATABASE IF NOT EXISTS dw")
spark.sql("USE dw")

# 2. Carrega os dados da camada Silver
df_gold_base = spark.table("silver.bank_transactions_curated")

# 3. Salva diretamente como uma tabela Delta (Substitui as linhas 9 a 14 do seu erro)
# O modo 'overwrite' substitui os dados se a tabela já existir
df_gold_base.write.format("delta").mode("overwrite").saveAsTable("dw.bank_transactions_gold")

# 4. Visualiza o resultado
display(spark.table("dw.bank_transactions_gold").limit(5))

TransactionID,AccountID,TransactionAmount,TransactionDate,TransactionType,Location,DeviceID,IP_Address,MerchantID,Channel,CustomerAge,CustomerOccupation,TransactionDuration,LoginAttempts,AccountBalance,PreviousTransactionDate,TransactionDate_dt,PreviousTransactionDate_dt,TransactionDate_fmt,PreviousTransactionDate_fmt
TX000001,AC00128,14.09,2023-04-11T16:29:14.000Z,Debit,San Diego,D000380,162.198.218.92,M015,ATM,70,Doctor,81,1,5112.21,2024-11-04T08:08:08.000Z,,,11/04/2023,04/11/2024
TX000002,AC00455,376.24,2023-06-27T16:44:19.000Z,Debit,Houston,D000051,13.149.61.4,M052,ATM,68,Doctor,141,1,13758.91,2024-11-04T08:09:35.000Z,,,27/06/2023,04/11/2024
TX000003,AC00019,126.29,2023-07-10T18:16:08.000Z,Debit,Mesa,D000235,215.97.143.157,M009,Online,19,Student,56,1,1122.35,2024-11-04T08:07:04.000Z,,,10/07/2023,04/11/2024
TX000004,AC00070,184.5,2023-05-05T16:32:11.000Z,Debit,Raleigh,D000187,200.13.225.150,M002,Online,26,Student,25,1,8569.06,2024-11-04T08:09:06.000Z,,,05/05/2023,04/11/2024
TX000005,AC00411,13.45,2023-10-16T17:51:24.000Z,Credit,Atlanta,D000308,65.164.3.100,M091,Online,26,Student,198,1,7429.4,2024-11-04T08:06:39.000Z,,,16/10/2023,04/11/2024


## Conectando ao Power BI

Com a tabela dw.bank_transactions_gold criada agora é só seguir o passo a passo abaixo para importar no Power BI

Como conectar no Power BI
Para visualizar essa tabela agora:

Abra o Power BI Desktop.

Vá em Obter Dados > Azure > Azure Databricks.

Insira o Hostname do Servidor e o Caminho HTTP (você encontra isso no seu SQL Warehouse ou Cluster, na aba Advanced Options -> JDBC/ODBC).