In [0]:
# ======================================================================================
# ENVIRONMENT CONFIGURATION (AWS Academy Lab)
# Professional note: In production, we would use Databricks Secret Scopes
# to avoid exposing credentials, or CloudFormation / IaC so S3 data
# is automatically registered in the Databricks Data Catalog.
# ======================================================================================

from pyspark.sql.functions import current_timestamp, input_file_name

path_bronze_csv = "YOUR_PATH"

access_key = "YOUR_ACCESS_KEY"
secret_key = "YOUR_SECRET_KEY"
session_token = "YOUR_SESSION_TOKEN"

# 1. Read using the hidden _metadata column
# Unity Catalog requires explicitly selecting fields from the _metadata object
df_bronze = (
    spark.read
         .format("csv")
         .option("header", "true")
         .option("sep", ";")
         .option("inferSchema", "true")
         .option("fs.s3a.access.key", access_key)
         .option("fs.s3a.secret.key", secret_key)
         .option("fs.s3a.session.token", session_token)
         .option(
             "fs.s3a.aws.credentials.provider",
             "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"
         )
         .load(path_bronze_csv)
         # Select all CSV columns plus the source file path from metadata
         .select("*", "_metadata.file_path")
         .withColumnRenamed("file_path", "nm_arquivo_origem")
         .withColumn("dh_ingestao_bronze", current_timestamp())
)

# 2. Persist data into the Bronze layer
spark.sql("CREATE DATABASE IF NOT EXISTS cvm_p210")

# Tip: In the Bronze layer we use overwrite mode to ensure the S3 snapshot
# always reflects the current state, avoiding duplicated files that Spark
# may read multiple times due to wildcard (*) issues
df_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("cvm_p210.bronze_inf_diario")

print("Bronze table successfully created using Unity Catalog standards!")
display(spark.table("cvm_p210.bronze_inf_diario").limit(5))

print("Bronze layer updated with Unity Catalog metadata!")

Bronze criada com sucesso usando padrões do Unity Catalog!


TP_FUNDO_CLASSE,CNPJ_FUNDO_CLASSE,ID_SUBCLASSE,DT_COMPTC,VL_TOTAL,VL_QUOTA,VL_PATRIM_LIQ,CAPTC_DIA,RESG_DIA,NR_COTST,nm_arquivo_origem,dh_ingestao_bronze
CLASSES - FIF,00.017.024/0001-53,,2026-01-02,1067199.26,41.630978,1198104.76,0.0,0.0,1,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026-01-10T19:39:43.816Z
CLASSES - FIF,00.017.024/0001-53,,2026-01-05,1067797.26,41.649965,1198651.19,0.0,0.0,1,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026-01-10T19:39:43.816Z
CLASSES - FIF,00.017.024/0001-53,,2026-01-06,1068412.18,41.6696883,1199218.81,0.0,0.0,1,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026-01-10T19:39:43.816Z
CLASSES - FIF,00.017.024/0001-53,,2026-01-07,1069011.36,41.6884867,1199759.81,0.0,0.0,1,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026-01-10T19:39:43.816Z
CLASSES - FIF,00.017.024/0001-53,,2026-01-08,1069607.59,41.705779,1200257.47,0.0,0.0,1,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026-01-10T19:39:43.816Z


Bronze atualizada com metadados do Unity Catalog!


In [0]:
from pyspark.sql.functions import col, to_date, year, month, current_timestamp, coalesce
from delta.tables import DeltaTable

# 1. COLUMN HANDLING (CVM 175 vs Legacy Schema)
# Spark suggested the column 'CNPJ_FUNDO_CLASSE'.
# We use a dynamic approach to support both schemas without breaking the pipeline.
df_bronze = spark.table("cvm_p210.bronze_inf_diario")

# Detect which CNPJ column exists in the Bronze layer to avoid runtime errors
col_cnpj = "CNPJ_FUNDO_CLASSE" if "CNPJ_FUNDO_CLASSE" in df_bronze.columns else "CNPJ_FUNDO"

df_silver_novo = (
    df_bronze
        .select(
            col(col_cnpj).alias("CNPJ_FUNDO"),  # Standardize column name in the Silver layer
            "DT_COMPTC",
            "VL_QUOTA",
            "VL_PATRIM_LIQ",
            "CAPTC_DIA",
            "RESG_DIA",
            "nm_arquivo_origem"
        )
        .withColumn("DT_COMPTC", to_date(col("DT_COMPTC"), "yyyy-MM-dd"))
        .withColumn("VL_PATRIM_LIQ", col("VL_PATRIM_LIQ").cast("double"))
        .withColumn("CAPTC_DIA", col("CAPTC_DIA").cast("double"))
        .withColumn("RESG_DIA", col("RESG_DIA").cast("double"))
        .withColumn("ano", year(col("DT_COMPTC")))
        .withColumn("mes", month(col("DT_COMPTC")))
        .withColumn("dh_processamento_silver", current_timestamp())
        .filter("VL_PATRIM_LIQ > 0")
)

# 2. MERGE LOGIC (Duplicate Prevention)
if spark.catalog.tableExists("cvm_p210.silver_inf_diario"):
    print(">>> Silver table detected. Starting MERGE operation...")
    target_table = DeltaTable.forName(spark, "cvm_p210.silver_inf_diario")

    (
        target_table.alias("hist")
            .merge(
                df_silver_novo.alias("novos"),
                "hist.CNPJ_FUNDO = novos.CNPJ_FUNDO AND hist.DT_COMPTC = novos.DT_COMPTC"
            )
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
    )
else:
    print(">>> Creating Silver table for the first time...")
    (
        df_silver_novo.write
            .format("delta")
            .mode("overwrite")
            .partitionBy("ano", "mes")
            .saveAsTable("cvm_p210.silver_inf_diario")
    )

# 3. PERFORMANCE OPTIMIZATION
spark.sql("OPTIMIZE cvm_p210.silver_inf_diario ZORDER BY (CNPJ_FUNDO)")

print("✅ Silver layer successfully completed!")
display(spark.table("cvm_p210.silver_inf_diario").limit(5))

>>> Criando Tabela Silver pela primeira vez...
✅ Camada Silver concluída com sucesso!


CNPJ_FUNDO,DT_COMPTC,VL_QUOTA,VL_PATRIM_LIQ,CAPTC_DIA,RESG_DIA,nm_arquivo_origem,ano,mes,dh_processamento_silver
00.017.024/0001-53,2026-01-02,41.630978,1198104.76,0.0,0.0,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026,1,2026-01-10T19:40:06.234Z
00.017.024/0001-53,2026-01-05,41.649965,1198651.19,0.0,0.0,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026,1,2026-01-10T19:40:06.234Z
00.017.024/0001-53,2026-01-06,41.6696883,1199218.81,0.0,0.0,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026,1,2026-01-10T19:40:06.234Z
00.017.024/0001-53,2026-01-07,41.6884867,1199759.81,0.0,0.0,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026,1,2026-01-10T19:40:06.234Z
00.017.024/0001-53,2026-01-08,41.705779,1200257.47,0.0,0.0,s3a://project-cvm-210-lab-471112952127/cvm-transactions-daily/ano=2026/mes=01/inf_diario_fi_202601.csv,2026,1,2026-01-10T19:40:06.234Z


In [0]:
# ======================================================================================
# TECHNICAL NOTE & BUSINESS ASSUMPTIONS (DATA GOVERNANCE):
# 1. Due to banking secrecy regulations, the CVM does not provide granular
#    portability data from Payment Institutions (IPs) and Banks.
# 2. PORTABILITY PROXY:
#    Since there is no direct CPF-level mapping, portability is inferred by
#    monitoring anomalous inflows and outflows (Subscriptions and Redemptions).
# 3. BUSINESS RULE:
#    Net movements greater than 5% of the fund's Net Asset Value (NAV) within a
#    single month are classified as 'Portability Events', as they deviate from
#    organic behavior.
# ======================================================================================

from pyspark.sql.functions import col, sum, when, abs, round, avg, max, min

# 1. Aggregation and Strategic KPI Calculation
df_gold_final = (
    spark.table("cvm_p210.silver_inf_diario")
        .groupBy("CNPJ_FUNDO", "ano", "mes")
        .agg(
            sum("CAPTC_DIA").alias("total_subscriptions"),
            sum("RESG_DIA").alias("total_redemptions"),
            avg("VL_PATRIM_LIQ").alias("avg_net_asset_value"),
            max("VL_QUOTA").alias("max_nav_per_share"),
            min("VL_QUOTA").alias("min_nav_per_share")
        )
        .withColumn(
            "net_flow",
            round(col("total_subscriptions") - col("total_redemptions"), 2)
        )
)

# 2. Portability Classification and Fund Health Assessment
df_gold_insights = (
    df_gold_final
        .withColumn(
            "turnover_rate",
            round(
                (abs(col("total_subscriptions")) + abs(col("total_redemptions"))) /
                col("avg_net_asset_value"),
                4
            )
        )
        .withColumn(
            "is_portability_event",
            when(
                abs(col("net_flow")) > (col("avg_net_asset_value") * 0.05),
                "Yes"
            ).otherwise("No")
        )
        .withColumn(
            "business_classification",
            when(
                (col("is_portability_event") == "Yes") & (col("net_flow") > 0),
                "Inbound Portability (Growth Opportunity)"
            )
            .when(
                (col("is_portability_event") == "Yes") & (col("net_flow") < 0),
                "Outbound Portability (Capital Loss)"
            )
            .when(
                col("net_flow") > 0,
                "Organic Growth"
            )
            .otherwise(
                "Retail Redemption"
            )
        )
        # Monthly NAV Volatility (key driver analysis for portability behavior)
        .withColumn(
            "monthly_nav_variation_pct",
            round(
                ((col("max_nav_per_share") - col("min_nav_per_share")) /
                 col("min_nav_per_share")) * 100,
                2
            )
        )
)

# 3. Final Write to Gold Layer
(
    df_gold_insights.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("cvm_p210.gold_cvm210_analytics")
)

print("✨ Gold table successfully created with business insights and governance context.")
display(spark.table("cvm_p210.gold_cvm210_analytics"))


✨ Tabela Gold Finalizada com Insights de Negócio e Metadados.


CNPJ_FUNDO,ano,mes,total_captacao,total_resgate,patrimonio_medio,cota_maxima,cota_minima,fluxo_liquido,taxa_rotatividade,is_portabilidade,classificacao_negocio,variacao_cota_mes
05.575.327/0001-05,2026,1,0.0,0.0,5071568.078,127.434788912316,127.109504562542,0.0,0.0,Não,Resgate de Varejo,0.26
06.288.668/0001-62,2026,1,0.0,3245286.23,1186754030.176,118.243365,115.745316,-3245286.23,0.0027,Não,Resgate de Varejo,2.16
07.667.580/0001-14,2026,1,0.0,0.0,74536538.47,8.5289033843,8.3514559095,0.0,0.0,Não,Resgate de Varejo,2.12
09.911.408/0001-90,2026,1,0.0,0.0,82903418.93200001,4.046541,4.015541,0.0,0.0,Não,Resgate de Varejo,0.77
12.053.727/0001-16,2026,1,58178.87,135471.46000000002,24979700.628,2.1208099,2.1091833,-77292.59,0.0078,Não,Resgate de Varejo,0.55
13.409.667/0001-94,2026,1,0.0,0.0,2640590.122,7.15822638752,7.101246837096,0.0,0.0,Não,Resgate de Varejo,0.8
15.674.503/0001-10,2026,1,0.0,28800.65,8411035.672,318.356418,317.240474,-28800.65,0.0034,Não,Resgate de Varejo,0.35
17.898.668/0001-09,2026,1,0.0,0.0,171269235.938,3.46645967,3.46090006,0.0,0.0,Não,Resgate de Varejo,0.16
19.689.941/0001-02,2026,1,1000.0,1035.94,28254975.076,2.474407438,2.406475425,-35.94,0.0001,Não,Resgate de Varejo,2.82
20.649.608/0001-59,2026,1,0.0,0.0,16784672.802,0.67303208,0.67080806,0.0,0.0,Não,Resgate de Varejo,0.33
