In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.types import *
# Reuse the running SparkSession if there is one; otherwise create a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [None]:
# Load one day of the raw Bovespa extract.
# - Forward slashes are portable. The previous raw string kept literal doubled
#   backslashes, which only resolve as separators on Windows.
# - Read the whole day partition instead of a single hash-named part file.
#   Part-file names change on every write, and a partition may hold several
#   files. Passing the leaf directory as the path means Spark does not add
#   year/month/day partition columns, so the schema is unchanged.
RAW_PARTITION_PATH = "data/tbl_raw_bovespa/year=2025/month=07/day=14"
df = spark.read.parquet(RAW_PARTITION_PATH)

In [7]:
# Inspect the raw schema. The page_* fields are longs, but every header_* and
# results_* field arrives as a string and has to be parsed downstream.
df.printSchema()

root
 |-- page_pageNumber: long (nullable = true)
 |-- page_pageSize: long (nullable = true)
 |-- page_totalRecords: long (nullable = true)
 |-- page_totalPages: long (nullable = true)
 |-- header_date: string (nullable = true)
 |-- header_text: string (nullable = true)
 |-- header_part: string (nullable = true)
 |-- header_partAcum: string (nullable = true)
 |-- header_textReductor: string (nullable = true)
 |-- header_reductor: string (nullable = true)
 |-- header_theoricalQty: string (nullable = true)
 |-- results_segment: string (nullable = true)
 |-- results_cod: string (nullable = true)
 |-- results_asset: string (nullable = true)
 |-- results_type: string (nullable = true)
 |-- results_part: string (nullable = true)
 |-- results_partAcum: string (nullable = true)
 |-- results_theoricalQty: string (nullable = true)



In [8]:
# Peek at the first five raw rows, with long cell values shown in full.
df.show(5, truncate=False)

+---------------+-------------+-----------------+---------------+-----------+------------------------+-----------+---------------+-------------------+-------------------+-------------------+---------------------------+-----------+-------------+------------+------------+----------------+--------------------+
|page_pageNumber|page_pageSize|page_totalRecords|page_totalPages|header_date|header_text             |header_part|header_partAcum|header_textReductor|header_reductor    |header_theoricalQty|results_segment            |results_cod|results_asset|results_type|results_part|results_partAcum|results_theoricalQty|
+---------------+-------------+-----------------+---------------+-----------+------------------------+-----------+---------------+-------------------+-------------------+-------------------+---------------------------+-----------+-------------+------------+------------+----------------+--------------------+
|1              |120          |84               |1              |14/07/25

In [9]:
# Clean the raw Bovespa extract:
# - keep the business columns and rename them to Portuguese snake_case;
# - trim the text fields;
# - parse percentages, quantities and the reference date;
# - derive year/month/day from the reference date.
# NOTE: this rebinds `df`; later cells work on the cleaned frame.

# Pagination fields and index-level header fields that are not needed
# downstream. header_date is kept and becomes the reference date.
RAW_DROP_COLS = [
    "page_pageNumber",
    "page_pageSize",
    "page_totalRecords",
    "page_totalPages",
    "header_text",
    "header_part",
    "header_partAcum",
    "header_textReductor",
    "header_reductor",
    "header_theoricalQty",
]

RAW_TO_CLEAN_NAMES = {
    "results_segment": "nom_setor",
    "results_asset": "nom_empresa",
    "results_cod": "cod_acao",
    "results_type": "des_tipo_acao",
    "results_theoricalQty": "qtd_teorica",
    "results_part": "perc_participacao_setor",
    "results_partAcum": "perc_participacao_setor_acumulada",
    "header_date": "data_ref",
}

# Free-text columns whose raw values carry surrounding whitespace.
TEXT_COLS = ["nom_setor", "nom_empresa", "cod_acao", "des_tipo_acao"]

CLEAN_COLS = [
    "nom_setor",
    "nom_empresa",
    "cod_acao",
    "des_tipo_acao",
    "qtd_teorica",
    "perc_participacao_setor",
    "perc_participacao_setor_acumulada",
    "data_ref",
    "year",
    "month",
    "day",
]

df = (
    df.drop(*RAW_DROP_COLS)
    .withColumnsRenamed(RAW_TO_CLEAN_NAMES)
    .withColumns({c: sf.trim(c) for c in TEXT_COLS})
    .withColumns({
        # Percentages use a decimal comma in the raw data.
        # NOTE(review): decimal(5,3) tops out at 99.999. Confirm that no
        # accumulated participation can reach 100.
        "perc_participacao_setor":
            sf.regexp_replace("perc_participacao_setor", ",", ".").cast("decimal(5,3)"),
        "perc_participacao_setor_acumulada":
            sf.regexp_replace("perc_participacao_setor_acumulada", ",", ".").cast("decimal(5,3)"),
        # Quantities may contain '.' thousands separators and/or spaces.
        "qtd_teorica": sf.regexp_replace("qtd_teorica", "[. ]", "").cast("bigint"),
        # Two-digit year in the raw header date.
        "data_ref": sf.to_date("data_ref", "dd/MM/yy"),
    })
    # Separate step: these must read the already-parsed data_ref.
    .withColumns({
        "year": sf.year("data_ref"),
        "month": sf.month("data_ref"),
        "day": sf.day("data_ref"),
    })
    .select(*CLEAN_COLS)
    # Deduplicate and drop incomplete rows only after normalisation:
    # - rows that differed only by surrounding whitespace now collapse;
    # - fields that end up null after parsing (e.g. blank numeric strings when
    #   ANSI mode is off) are removed instead of leaking nulls downstream.
    .drop_duplicates()
    .na.drop()
)

In [None]:
# Spot-check five cleaned rows, with long cell values shown in full.
df.show(5, truncate=False)

+---------------------------------+------------+--------+-------------+-----------+-----------------------+---------------------------------+----------+----+-----+---+
|nom_setor                        |nom_empresa |cod_acao|des_tipo_acao|qtd_teorica|perc_participacao_setor|perc_participacao_setor_acumulada|data_ref  |year|month|day|
+---------------------------------+------------+--------+-------------+-----------+-----------------------+---------------------------------+----------+----+-----+---+
|Cons N Cíclico / Pr Pessoal Limp |NATURA      |NATU3   |ON      NM   |845713747  |0.394                  |0.394                            |2025-07-14|2025|7    |14 |
|Telecomunicação                  |TELEF BRASIL|VIVT3   |ON           |764884256  |1.168                  |1.980                            |2025-07-14|2025|7    |14 |
|Cons N Ciclico/Agropecuária      |SLC AGRICOLA|SLCE3   |ON      NM   |194261422  |0.168                  |0.168                            |2025-07-14|2025|7  

In [32]:
# Summarise the cleaned snapshot at three levels in one pass via grouping sets:
#   1. per sector + company,
#   2. per company (all sectors),
#   3. whole snapshot.
# A dimension rolled up by a grouping set comes back as null and is labelled
# "Total Geral". The cleaning step's na.drop() leaves no genuine nulls in
# these columns, so coalesce is unambiguous.
# The result is bound to `df_agg` so it can be inspected or persisted instead
# of being lost after show().
df_agg = (
    df.groupingSets(
        [
            ("nom_setor", "nom_empresa", "data_ref", "year", "month", "day"),
            ("nom_empresa", "data_ref", "year", "month", "day"),
            ("data_ref", "year", "month", "day"),
        ],
        "nom_setor", "nom_empresa", "data_ref", "year", "month", "day",
    )
    .agg(
        sf.count(sf.col("cod_acao")).alias("qtd_registros"),
        sf.count_distinct(sf.col("cod_acao")).alias("qtd_acao"),
        sf.count_distinct(sf.col("des_tipo_acao")).alias("qtd_tipos_acao"),
        sf.sum(sf.col("qtd_teorica")).alias("qtd_teorica_acumulada"),
        sf.max(sf.col("qtd_teorica")).alias("qtd_teorica_max"),
        sf.min(sf.col("qtd_teorica")).alias("qtd_teorica_min"),
        sf.avg(sf.col("perc_participacao_setor")).alias("avg_participacao_setor_total"),
        # NOTE(review): averaging an already-accumulated percentage may not be
        # meaningful. Confirm this metric is intended.
        sf.avg(sf.col("perc_participacao_setor_acumulada")).alias("avg_participacao_setor_acumulada_total"),
    )
    .withColumn("nom_setor", sf.coalesce("nom_setor", sf.lit("Total Geral")))
    .withColumn("nom_empresa", sf.coalesce("nom_empresa", sf.lit("Total Geral")))
    .withColumn("dth_etl_processamento", sf.current_timestamp())
    # date_diff(end, start) returns end - start in days. The delay is
    # processing date minus reference date (positive when processed after
    # data_ref). The arguments were previously inverted.
    .withColumn("qtd_dias_atraso", sf.date_diff("dth_etl_processamento", "data_ref"))
    .select(
        "nom_empresa",
        "qtd_registros",
        "qtd_acao",
        "qtd_tipos_acao",
        "qtd_teorica_acumulada",
        "qtd_teorica_max",
        "qtd_teorica_min",
        "avg_participacao_setor_total",
        "avg_participacao_setor_acumulada_total",
        "data_ref",
        "dth_etl_processamento",
        "qtd_dias_atraso",
        "year",
        "month",
        "day",
        "nom_setor",
    )
)
df_agg.show()

+------------+-------------+--------+--------------+---------------------+---------------+---------------+----------------------------+--------------------------------------+----------+---------------------+---------------+----+-----+---+--------------------+
| nom_empresa|qtd_registros|qtd_acao|qtd_tipos_acao|qtd_teorica_acumulada|qtd_teorica_max|qtd_teorica_min|avg_participacao_setor_total|avg_participacao_setor_acumulada_total|  data_ref|dth_etl_processamento|qtd_dias_atraso|year|month|day|           nom_setor|
+------------+-------------+--------+--------------+---------------------+---------------+---------------+----------------------------+--------------------------------------+----------+---------------------+---------------+----+-----+---+--------------------+
|  YDUQS PART|            1|       1|             1|            260249057|      260249057|      260249057|                   0.1720000|                             2.1650000|2025-07-14| 2025-07-12 16:16:...|             