In [2]:
from config.config import java_home_location
from pyspark.sql import SparkSession

# Project helper from config.config. It presumably points JAVA_HOME at a local
# JDK so PySpark can launch its JVM — TODO confirm against config/config.py.
java_home_location()

# Reuse the active SparkSession if there is one; otherwise build one with defaults.
spark = SparkSession.builder.getOrCreate()

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/21 00:13:34 WARN Utils: Your hostname, Matheuss-MacBook-Air.local, resolves to a loopback address: 127.0.0.1; using 192.168.0.215 instead (on interface en0)
26/02/21 00:13:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/21 00:14:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
from functools import reduce
from pathlib import Path
from pyspark.sql import functions as F
from CVM import *

def consolidate_year(
    spark: SparkSession,
    base_folder: Path,
    entity: str,
    year: str,
    acronyms: list
):
    """
    Consolidate every acronym folder of one year into a single DataFrame.

    Each existing ``base_folder / entity / year / acronym`` location is read
    as semicolon-separated, ISO-8859-1 encoded CSV with a header row and an
    inferred schema. The result is tagged with literal ``YEAR``, ``ACRONYM``
    and ``ENTITY`` columns. All results are unioned by column name; columns
    missing from some inputs are filled with nulls.

    Args:
        spark: Active SparkSession used to read the CSV data.
        base_folder: Root folder holding the downloaded data.
        entity: Entity sub-folder name.
        year: Year sub-folder name. Ints are also accepted and converted
            with ``str``.
        acronyms: Acronym sub-folder names to read for this year.

    Returns:
        The unioned DataFrame, or None when no acronym location with
        readable data was found.
    """
    # Path's "/" operator rejects ints, so normalize; this also keeps YEAR a string column.
    year = str(year)

    dfs = []

    for acronym in acronyms:
        path = base_folder / entity / year / acronym

        if not path.exists():
            print(f"[WARNING] Path not found: {path}")
            continue

        # Spark ignores names starting with "." or "_" (e.g. .DS_Store) and
        # fails with "Unable to infer schema" when no visible file remains.
        if path.is_dir() and not any(
            not child.name.startswith((".", "_")) for child in path.iterdir()
        ):
            print(f"[WARNING] No data files in: {path}")
            continue

        df = (
            spark.read
                .format("csv")
                .option("header", "true")
                .option("sep", ";")
                .option("inferSchema", "true")
                .option("encoding", "ISO-8859-1")
                .load(str(path))
        )

        # add metadata columns
        df = (
            df
            .withColumn("YEAR", F.lit(year))
            .withColumn("ACRONYM", F.lit(acronym))
            .withColumn("ENTITY", F.lit(entity))
        )

        dfs.append(df)

    if not dfs:
        return None

    return reduce(
        lambda a, b: a.unionByName(b, allowMissingColumns=True),
        dfs
    )


Downloading: https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/DFP/DADOS/dfp_cia_aberta_2020.zip ...  ✅ 
Extraction completed! ✅ 
Moving dfp_cia_aberta_DRA_con_2020.csv → 2020/
Moving dfp_cia_aberta_DVA_con_2020.csv → 2020/
Moving dfp_cia_aberta_parecer_2020.csv → 2020/
Moving dfp_cia_aberta_DVA_ind_2020.csv → 2020/
Moving dfp_cia_aberta_DRA_ind_2020.csv → 2020/
Moving dfp_cia_aberta_BPP_con_2020.csv → 2020/
Moving dfp_cia_aberta_BPP_ind_2020.csv → 2020/
Moving dfp_cia_aberta_DMPL_con_2020.csv → 2020/
Moving dfp_cia_aberta_DFC_MI_ind_2020.csv → 2020/
Moving dfp_cia_aberta_DFC_MI_con_2020.csv → 2020/
Moving dfp_cia_aberta_DMPL_ind_2020.csv → 2020/
Moving dfp_cia_aberta_composicao_capital_2020.csv → 2020/
Moving dfp_cia_aberta_BPA_ind_2020.csv → 2020/
Moving dfp_cia_aberta_DRE_con_2020.csv → 2020/
Moving dfp_cia_aberta_DFC_MD_con_2020.csv → 2020/
Moving dfp_cia_aberta_2020.csv → 2020/
Moving dfp_cia_aberta_BPA_con_2020.csv → 2020/
Moving dfp_cia_aberta_DRE_ind_2020.csv → 2020/
Moving dfp_cia

In [12]:
def consolidate_all_years(
    spark: SparkSession,
    base_folder: Path,
    entity: str,
    years: list,
    acronyms: list
):
    """
    Consolidate several years (and all their acronyms) into one DataFrame.

    Each year is consolidated with ``consolidate_year``. The per-year results
    are then unioned by column name, filling columns missing from some years
    with nulls. Years for which ``consolidate_year`` returns None are skipped.

    Args:
        spark: Active SparkSession, passed through to ``consolidate_year``.
        base_folder: Root folder holding the downloaded data.
        entity: Entity sub-folder name.
        years: Iterable of years. Ints (e.g. ``range(2020, 2024)``) are
            converted with ``str`` before use.
        acronyms: Acronym sub-folder names to read for every year.

    Returns:
        The unioned DataFrame, or None when no year produced any data.
    """
    yearly_frames = []

    for year in years:
        # consolidate_year joins the year into a Path, and Path's "/" rejects ints.
        year_df = consolidate_year(
            spark, base_folder, entity, str(year), acronyms
        )

        if year_df is not None:
            yearly_frames.append(year_df)

    if not yearly_frames:
        return None

    return reduce(
        lambda a, b: a.unionByName(b, allowMissingColumns=True),
        yearly_frames
    )
