In [0]:
# verifica se o arquivo existe na volume
file_path = 'dbfs:/Volumes/bcb/pix/pix/Transações Pix por Município.csv'

try:
    files = dbutils.fs.ls('dbfs:/Volumes/bcb/pix/pix/')
    if any(f.name == 'Transações Pix por Município.csv' for f in files):
        print("Arquivo existe !")
    else:
        print("Arquivo não encontrado !")
except Exception as e:
    print("Erro:", e)

In [0]:
from pyspark.sql.functions import col, regexp_replace, concat_ws, lit, to_date
from pyspark.sql.types import DecimalType

# Lê o CSV
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(file_path)
)

# --- Tratamento da coluna AnoMes ---
df = (
    df.withColumn("AnoMes", col("AnoMes").cast("string"))  # garante string
      .withColumn("AnoMes", regexp_replace(col("AnoMes"), r"(\d{4})(\d{2})", "$1-$2"))  # 202401 -> 2024-01
      .withColumn("AnoMes", to_date(concat_ws("-", col("AnoMes"), lit("01")), "yyyy-MM-dd"))  # adiciona dia 01
)

# --- Conversão de valores monetários ---
colunas_valor = ["VL_PagadorPF", "VL_PagadorPJ", "VL_RecebedorPF", "VL_RecebedorPJ"]

for c in colunas_valor:
    # Substitui vírgula decimal por ponto
    df = df.withColumn(c, regexp_replace(col(c).cast("string"), ",", "."))
    # Converte para decimal (18,2) — seguro para valores grandes
    df = df.withColumn(c, col(c).cast(DecimalType(18, 2)))

# Exibe o resultado
display(df)


In [0]:
# valor total pago por pessoas fisicas por mes ano e municipio

import pyspark.sql.functions as F

df_per_year_pf_pago = (
    df.groupBy(
        F.year(df.AnoMes).alias("Ano"),
        df.Municipio_Ibge,
        df.Municipio
    ).agg(
        F.sum(df.VL_PagadorPF).alias("Valor_total_pago_PF")
    )
)

display(df_per_year_pf_pago)

In [0]:
# valor total pago por pessoas juridicas por mes ano e municipio

import pyspark.sql.functions as F

df_per_year_pj_pago = (
    df.groupBy(
        F.year(df.AnoMes).alias("Ano"),
        df.Municipio_Ibge,
        df.Municipio
    ).agg(
        F.sum(df.VL_PagadorPJ).alias("Valor_total_pago_PJ")
    )
)

display(df_per_year_pj_pago)


In [0]:
import pyspark.sql.functions as F

# total por estado
df_per_year_estado = (
    df.groupBy(
        df.Estado,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.VL_PagadorPF+df.VL_PagadorPJ).alias("Total Pago Estado")
    )
)

display(df_per_year_estado)

In [0]:
import pyspark.sql.functions as F

# total por regiao
df_per_year_regiao = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.VL_PagadorPF+df.VL_PagadorPJ).alias("Total Pago Região")
    )
)

display(df_per_year_regiao)

In [0]:
import pyspark.sql.functions as F

# total recebido por regiao
df_per_year_regiao_recebedor = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.VL_RecebedorPF+df.VL_RecebedorPJ).alias("Total Recebido Região")
    )
)

display(df_per_year_regiao_recebedor)

In [0]:
import pyspark.sql.functions as F

# total recebido por regiao
df_per_year_estado_recebedor = (
    df.groupBy(
        df.Estado,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.VL_RecebedorPF+df.VL_RecebedorPJ).alias("Total Recebido Estado")
    )
)

display(df_per_year_estado_recebedor)

In [0]:
import pyspark.sql.functions as F

# Total recebido e pago por Estado e Ano
df_estado_ano = (
    df.groupBy(
        df.Estado,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.VL_RecebedorPF + df.VL_RecebedorPJ).alias("Total_Recebido"),
        F.sum(df.VL_PagadorPF + df.VL_PagadorPJ).alias("Total_Pago")
    )
    .orderBy("Estado", "Ano")
)

display(df_estado_ano)

In [0]:
import pyspark.sql.functions as F

# Total recebido e pago por Região e Ano
df_regiao_ano = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.VL_RecebedorPF + df.VL_RecebedorPJ).alias("Total_Recebido"),
        F.sum(df.VL_PagadorPF + df.VL_PagadorPJ).alias("Total_Pago")
    )
    .orderBy("Regiao", "Ano")
)

display(df_regiao_ano)

In [0]:
import pyspark.sql.functions as F


df_pf_ano = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.QT_PES_PagadorPF).alias("Quantidade de Pessoas Físicas Pagadoras"),
    )
    .orderBy("Regiao", "Ano")
)

display(df_pf_ano)

In [0]:
import pyspark.sql.functions as F


df_pj_ano = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.QT_PES_PagadorPJ ).alias("Quantidade de Pessoas Jurídicas Pagadoras"),
    )
    .orderBy("Regiao", "Ano")
)

display(df_pj_ano)

In [0]:
import pyspark.sql.functions as F


df_recebedor_pj_ano = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.QT_PES_RecebedorPJ ).alias("Quantidade de Pessoas Jurídicas Recebedoras"),
    )
    .orderBy("Regiao", "Ano")
)

display(df_recebedor_pj_ano)

In [0]:
import pyspark.sql.functions as F


df_recebedor_pf_ano = (
    df.groupBy(
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        F.sum(df.QT_PES_RecebedorPF ).alias("Quantidade de Pessoas Físicas Recebedoras"),
    )
    .orderBy("Regiao", "Ano")
)

display(df_recebedor_pf_ano)

In [0]:
import pyspark.sql.functions as F

df_consolidado_municipio = (
    df.groupBy(
        df.Municipio_Ibge,
        df.Municipio,
        df.Estado,
        df.Regiao,
        F.year(df.AnoMes).alias("Ano")
    ).agg(
        # Valores pagos
        F.sum(df.VL_PagadorPF).alias("Total_Pago_PF"),
        F.sum(df.VL_PagadorPJ).alias("Total_Pago_PJ"),
        F.sum(df.VL_PagadorPF + df.VL_PagadorPJ).alias("Total_Pago"),
        
        # Valores recebidos
        F.sum(df.VL_RecebedorPF).alias("Total_Recebido_PF"),
        F.sum(df.VL_RecebedorPJ).alias("Total_Recebido_PJ"),
        F.sum(df.VL_RecebedorPF + df.VL_RecebedorPJ).alias("Total_Recebido"),
        
        # Quantidades
        F.sum(df.QT_PES_PagadorPF).alias("Qtd_Pagadores_PF"),
        F.sum(df.QT_PES_PagadorPJ).alias("Qtd_Pagadores_PJ"),
        F.sum(df.QT_PES_RecebedorPF).alias("Qtd_Recebedores_PF"),
        F.sum(df.QT_PES_RecebedorPJ).alias("Qtd_Recebedores_PJ"),
        
        # Saldo
        (F.sum(df.VL_RecebedorPF + df.VL_RecebedorPJ) - 
         F.sum(df.VL_PagadorPF + df.VL_PagadorPJ)).alias("Saldo")
    )
    .orderBy("Estado", "Municipio", "Ano")
)

display(df_consolidado_municipio)

df_consolidado_municipio.write.mode("overwrite").saveAsTable('pix_consolidado')



In [0]:
spark.sql("DESCRIBE DETAIL pix_consolidado").show(100)