# PicPay – Case Técnico: Receita Financeira

Este notebook apresenta a solução para o case técnico de receita financeira enviado pelo PicPay. O objetivo é aplicar regras de negócio sobre transações financeiras, com foco no cálculo de receitas provenientes de:

- P2P (pagamentos entre usuários via cartão de crédito)
- BILLS (pagamento de boletos, com ou sem parcelamento)


O pipeline organiza as etapas de ingestão, tratamento e geração de dados analíticos em camadas, utilizando recursos nativos da plataforma Azure.

- **Bronze**: ingestão dos dados brutos no formato original (CSV)
- **Silver**: aplicação de regras de negócio e normalização dos dados
- **Gold**: geração da tabela de parcelas detalhadas (Tabela Price) com juros compostos



## Autenticação com o Azure Data Lake Storage (ADLS Gen2)

A leitura e escrita dos dados neste notebook é feita diretamente no Azure Data Lake Storage Gen2.  
Para garantir segurança e evitar exposição de credenciais, foi utilizado um Secret Scope (`picpay_scope`) configurado no Azure Databricks.

A chave da Storage Account foi armazenada no Databricks por meio da CLI, e acessada de forma segura utilizando a função `dbutils.secrets.get()`.

In [0]:
spark.conf.set(
    "fs.azure.account.key.stcensodados.dfs.core.windows.net",
    dbutils.secrets.get(scope="picpay_scope", key="stcensodados_key")
)

## Leitura dos dados brutos (Camada Bronze)

Nesta etapa, os dados da camada Bronze são lidos diretamente do Data Lake.

O arquivo `transactions.csv` foi carregado via updload para a camada bronze (raw) no adls.

sep = ';'


In [0]:
bronze_path = "abfss://raw@stcensodados.dfs.core.windows.net/picpay_analytics_case_receitas/transactions.csv"

df_raw = spark.read.option("header", True).option("sep", ";").csv(bronze_path)

display(df_raw)

transaction_id,transaction_date,transaction_type,transaction_value,receiver_used_cc_limit,payment_method,installments,p2p_surcharge_rate,bills_surcharge_rate,installment_rate
1,03/01/2021,P2P,400,600,Credit card,12,1.99,2.99,3.49
2,14/09/2021,BILLS,650,300,Credit card,5,1.99,2.99,3.49
3,20/07/2021,BILLS,1200,0,Credit card,8,1.99,2.99,3.49
4,06/08/2021,P2P,350,800,Credit card,9,1.99,2.99,3.49
5,13/04/2021,P2P,3500,0,Credit card,10,1.99,2.99,3.49
6,24/05/2021,P2P,3420,0,Credit card,7,1.99,2.99,3.49
7,31/03/2021,BILLS,5000,0,Credit card,12,1.99,2.99,3.49
8,30/08/2021,P2P,2800,0,Credit card,12,1.99,2.99,3.49
9,28/08/2021,BILLS,6000,0,Credit card,4,1.99,2.99,3.49
10,27/09/2021,P2P,8000,0,Credit card,6,1.99,2.99,3.49


## Análise exploratória inicial

Antes da fase de transformação, foram inspecionados alguns pontos, tais como a estrutura do dataset, tipos de dados incorretos, campos nulos e avaliar a consistência dos valores.

Abaixo, são verificados:
- Schema do DataFrame
- Quantidade de linhas e colunas
- Amostras de valores únicos
- Estatísticas descritivas de colunas numéricas


In [0]:
# Ver estrutura do DataFrame
df_raw.printSchema()

# Contagem de linhas e colunas
print(f"Total de linhas: {df_raw.count()} | Total de colunas: {len(df_raw.columns)}")


root
 |-- transaction_id: string (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- transaction_value: string (nullable = true)
 |-- receiver_used_cc_limit: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- installments: string (nullable = true)
 |-- p2p_surcharge_rate: string (nullable = true)
 |-- bills_surcharge_rate: string (nullable = true)
 |-- installment_rate: string (nullable = true)

Total de linhas: 10 | Total de colunas: 10


In [0]:
# Verificar valores únicos de colunas categóricas
df_raw.select("transaction_type").distinct().show()
df_raw.select("payment_method").distinct().show()


+----------------+
|transaction_type|
+----------------+
|             P2P|
|           BILLS|
+----------------+

+--------------+
|payment_method|
+--------------+
|   Credit card|
+--------------+



In [0]:
# Estatísticas básicas de colunas numéricas
df_raw.describe(["transaction_value", "receiver_used_cc_limit"]).show()


+-------+-----------------+----------------------+
|summary|transaction_value|receiver_used_cc_limit|
+-------+-----------------+----------------------+
|  count|               10|                    10|
|   mean|           3132.0|                 170.0|
| stddev|2599.939315531123|      298.328677803526|
|    min|             1200|                     0|
|    max|             8000|                   800|
+-------+-----------------+----------------------+



## Camada Silver: Conversão e Normalização dos Dados

Nesta etapa, os dados da camada Bronze são padronizados para facilitar a aplicação das regras de negócio e análise posterior.

Essas foram as transformações aplicadas:
- Conversão de colunas numéricas e de data para tipos apropriados
- Normalização de colunas categóricas (remoção de espaços e valores nulos)
- Conversão de taxas percentuais para valores decimais
- Remoção de linhas com `transaction_type` vazio

In [0]:
from pyspark.sql.functions import col, to_date, trim

df_silver = df_raw \
    .filter(col("transaction_type") != "") \
    .withColumnRenamed("transaction_id", "id_transacao") \
    .withColumnRenamed("transaction_date", "data_transacao") \
    .withColumnRenamed("transaction_type", "tipo_transacao") \
    .withColumnRenamed("transaction_value", "valor_transacao") \
    .withColumnRenamed("receiver_used_cc_limit", "valor_utilizado_limite") \
    .withColumnRenamed("payment_method", "forma_pagamento") \
    .withColumnRenamed("installments", "parcelas") \
    .withColumnRenamed("p2p_surcharge_rate", "taxa_p2p") \
    .withColumnRenamed("bills_surcharge_rate", "taxa_boleto") \
    .withColumnRenamed("installment_rate", "taxa_parcelamento") \
    .withColumn("data_transacao", to_date("data_transacao", "dd/MM/yyyy")) \
    .withColumn("valor_transacao", col("valor_transacao").cast("double")) \
    .withColumn("valor_utilizado_limite", col("valor_utilizado_limite").cast("double")) \
    .withColumn("parcelas", col("parcelas").cast("int")) \
    .withColumn("taxa_p2p", col("taxa_p2p").cast("double") / 100) \
    .withColumn("taxa_boleto", col("taxa_boleto").cast("double") / 100) \
    .withColumn("taxa_parcelamento", col("taxa_parcelamento").cast("double") / 100) \
    .withColumn("tipo_transacao", trim(col("tipo_transacao"))) \
    .withColumn("forma_pagamento", trim(col("forma_pagamento")))

display(df_silver)

id_transacao,data_transacao,tipo_transacao,valor_transacao,valor_utilizado_limite,forma_pagamento,parcelas,taxa_p2p,taxa_boleto,taxa_parcelamento
1,2021-01-03,P2P,400.0,600.0,Credit card,12,0.0199,0.0299,0.0349
2,2021-09-14,BILLS,650.0,300.0,Credit card,5,0.0199,0.0299,0.0349
3,2021-07-20,BILLS,1200.0,0.0,Credit card,8,0.0199,0.0299,0.0349
4,2021-08-06,P2P,350.0,800.0,Credit card,9,0.0199,0.0299,0.0349
5,2021-04-13,P2P,3500.0,0.0,Credit card,10,0.0199,0.0299,0.0349
6,2021-05-24,P2P,3420.0,0.0,Credit card,7,0.0199,0.0299,0.0349
7,2021-03-31,BILLS,5000.0,0.0,Credit card,12,0.0199,0.0299,0.0349
8,2021-08-30,P2P,2800.0,0.0,Credit card,12,0.0199,0.0299,0.0349
9,2021-08-28,BILLS,6000.0,0.0,Credit card,4,0.0199,0.0299,0.0349
10,2021-09-27,P2P,8000.0,0.0,Credit card,6,0.0199,0.0299,0.0349


In [0]:
silver_path = "abfss://processed@stcensodados.dfs.core.windows.net/picpay_analytics_case_receitas/transactions_silver"

df_silver.write.format("delta").mode("overwrite").save(silver_path)


## Cálculo da taxa sobre transações P2P

Esta etapa aplica a regra de negócio definida para transações do tipo P2P, onde é cobrada uma taxa de 1,99% sobre o valor excedente a R$800, por recebedor, considerando o total de recebimentos via cartão de crédito no mês.

O resultado será armazenado em uma nova coluna chamada `valor_taxa_p2p`.


In [0]:
from pyspark.sql.functions import month, year, sum as _sum, col, when

# Adiciona colunas de ano e mês
df_p2p = df_silver \
    .withColumn("ano", year("data_transacao")) \
    .withColumn("mes", month("data_transacao"))

# Filtra somente transações P2P com pagamento via cartão de crédito
df_p2p_filtrado = df_p2p.filter(
    (col("tipo_transacao") == "P2P") &
    (col("forma_pagamento") == "Credit card")
)

# Agrega por mês/ano e ID (substituir por recebedor se houver)
df_p2p_agrupado = df_p2p_filtrado.groupBy("ano", "mes", "id_transacao") \
    .agg(_sum("valor_transacao").alias("valor_total_mes"))

# Calcula o excedente e aplica taxa de 1,99%
df_p2p_taxado = df_p2p_agrupado.withColumn(
    "valor_taxa_p2p",
    when(col("valor_total_mes") > 800, (col("valor_total_mes") - 800) * 0.0199).otherwise(0)
)


## Cálculo da taxa de boleto (BILLS)

Para transações do tipo `BILLS` pagas com cartão de crédito, é aplicada uma taxa fixa de 2,99% sobre o valor da transação.

Caso o pagamento tenha sido parcelado (coluna `parcelas` > 1), é aplicado também juros compostos mensais de 3,49%, com base no sistema de amortização francês (Tabela Price).

O valor da transação é ajustado com a taxa fixa antes do cálculo dos juros.


In [0]:
from pyspark.sql.functions import when

# Aplica a taxa de 2,99% se for BILLS com cartão de crédito
df_silver = df_silver.withColumn(
    "valor_com_taxa",
    when(
        (col("tipo_transacao") == "BILLS") &
        (col("forma_pagamento") == "Credit card"),
        col("valor_transacao") * (1 + col("taxa_boleto"))
    ).otherwise(col("valor_transacao"))
)


In [0]:
from pyspark.sql.functions import pow

juros_mensal = 0.0349

df_silver = df_silver.withColumn(
    "valor_parcela",
    when(
        (col("tipo_transacao") == "BILLS") &
        (col("forma_pagamento") == "Credit card") &
        (col("parcelas") > 1),
        col("valor_com_taxa") * (juros_mensal / (1 - pow(1 + juros_mensal, -col("parcelas"))))
    )
)

df_silver = df_silver.withColumn(
    "valor_total_a_pagar",
    when(col("valor_parcela").isNotNull(), col("valor_parcela") * col("parcelas"))
)

df_silver = df_silver.withColumn(
    "valor_juros_total",
    col("valor_total_a_pagar") - col("valor_com_taxa")
)


## Geração da tabela de parcelas (transactions_installments)

Para cada transação do tipo BILLS com parcelamento, são geradas as respectivas parcelas utilizando o sistema de amortização francês (Tabela Price), com juros compostos mensais.

Cada linha representa uma parcela com os seguintes dados:

- Número da parcela
- Valor da parcela (fixo)
- Juros mensal da parcela
- Amortização da parcela
- Saldo restante após a amortização


In [0]:
from pyspark.sql.functions import expr, col, sequence, explode, lit
from pyspark.sql.types import DoubleType

# Apenas transações parceladas do tipo BILLS com cartão
df_parceladas = df_silver.filter(
    (col("tipo_transacao") == "BILLS") &
    (col("forma_pagamento") == "Credit card") &
    (col("parcelas") > 1)
)

# Adiciona coluna com sequência de número de parcelas: [1, 2, ..., n]
df_parceladas = df_parceladas.withColumn("numero_parcela", explode(sequence(lit(1), col("parcelas"))))


## Geração da tabela de parcelas (transactions_installments)

Nesta etapa, para cada transação parcelada do tipo BILLS paga com cartão de crédito será detalhado o num mensal de parcelas conforme a Tabela Price (amortização francesa).

Cada linha da tabela representa uma parcela, contendo:

- ID da transação
- Número da parcela
- Valor total da parcela
- Juros do período
- Amortização (parte que reduz o saldo devedor)
- Saldo devedor restante após a parcela

A lógica é aplicada por meio de um Pandas UDF.


In [0]:
from pyspark.sql.functions import col, explode, sequence, lit, pow, round

# Constante da taxa de juros mensal (3,49%)
juros_mensal = 0.0349

# Filtra BILLS parceladas com cartão
df_parceladas = df_silver.filter(
    (col("tipo_transacao") == "BILLS") &
    (col("forma_pagamento") == "Credit card") &
    (col("parcelas") > 1)
)

# Gera sequência de parcelas
df_exp = df_parceladas.withColumn("numero_parcela", explode(sequence(lit(1), col("parcelas"))))

# Calcula saldo anterior a cada parcela (approx simplificada pela Tabela Price)
df_installments = df_exp.withColumn(
    "valor_juros", round((col("valor_com_taxa") * pow(1 + juros_mensal, col("parcelas") - col("numero_parcela"))) * juros_mensal, 2)
).withColumn(
    "valor_parcela", round(col("valor_parcela"), 2)
).withColumn(
    "valor_amortizacao", round(col("valor_parcela") - col("valor_juros"), 2)
).withColumn(
    "saldo_restante", round(
        col("valor_com_taxa") - (
            (col("numero_parcela") * col("valor_parcela")) - 
            (col("numero_parcela") * col("valor_juros"))
        ), 2
    )
).select(
    "id_transacao", "numero_parcela", "valor_parcela",
    "valor_juros", "valor_amortizacao", "saldo_restante"
)


In [0]:
df_installments.write.format("delta").mode("overwrite").save(
    "abfss://processed@stcensodados.dfs.core.windows.net/picpay_analytics_case_receitas/transactions_installments"
)
display(df_installments)

id_transacao,numero_parcela,valor_parcela,valor_juros,valor_amortizacao,saldo_restante
2,1,148.23,26.8,121.43,548.01
2,2,148.23,25.9,122.33,424.78
2,3,148.23,25.02,123.21,299.81
2,4,148.23,24.18,124.05,173.24
2,5,148.23,23.36,124.87,45.09
3,1,179.72,54.84,124.88,1111.0
3,2,179.72,52.99,126.73,982.42
3,3,179.72,51.2,128.52,850.32
3,4,179.72,49.48,130.24,714.92
3,5,179.72,47.81,131.91,576.33


In [0]:
df_installments.write.option("header", True).mode("overwrite").csv(
    "abfss://analytics@stcensodados.dfs.core.windows.net/picpay_analytics_case_receitas/entrega_csv/installments"
)


## Conclusão

A solução implementada entrega um pipeline analítico completo para cálculo de receitas financeiras com base em transações do tipo P2P e BILLS.

- Os dados foram organizados em camadas (Bronze, Silver, Gold) no Data Lake (ADLS Gen2).
- Utilizamos Delta Lake para garantir confiabilidade e performance.
- A tabela `transactions_installments` foi gerada com base na Tabela Price, pronta para análise financeira.
- Todos os dados foram armazenados na camada Gold e exportados em formato `.csv`.