In [1]:
import requests, zipfile
from datetime import date, timedelta
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DecimalType, DateType, TimestampType, FloatType
from pyspark.sql.functions import udf, col, lit, when, trim, month, year
from pyspark import SparkFiles

#### call lib

In [2]:
# Execute the shared helper notebook inside this kernel's namespace.
# NOTE(review): later cells use `spark`, `LAKE_HOME` and `write_to_dw`, none of
# which are defined in this notebook -- presumably they come from lib.ipynb; verify.
%run ./work/lib.ipynb



:: loading settings :: url = jar:file:/usr/local/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-42f75bc7-7e07-4919-a31b-56bf12a94cf2;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 352ms :: artifacts dl 6ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runt

#### Parameters

In [3]:
# Number of fiscal years to load, counting the current calendar year as the last one.
number_exercises = 5

# The window ends at the current year and reaches back `number_exercises - 1` years.
end_year = date.today().year
start_year = end_year - (number_exercises - 1)

# Ascending list of every year in the window, both ends included.
year_list = [start_year + offset for offset in range(end_year - start_year + 1)]

#### Request CVM financial statements

In [4]:
def get_cvm_financial_statement(fs_type: str, year: int, spark=spark):
    """
    Download the CVM ITR archive for one year and return one of its statements
    as a Spark DataFrame.

    financial statements type (fs_type)
    -- BPA - Assets
    -- BPP - Liabilities
    -- DRE - Income Statement
    -- DFC_MI - Indirect Cash Flow
    -- DFC_MD - Direct Cash Flow

    The archive is saved in the current working directory as
    ``itr_cia_aberta_{year}.zip`` (overwritten on every call) and the member
    ``itr_cia_aberta_{fs_type}_con_{year}.csv`` is read from it.

    Parameters
    ----------
    fs_type : str
        Statement code used to build the CSV member name (see list above).
    year : int
        Year used in both the download URL and the CSV member name.
    spark : SparkSession
        Session used to build the resulting DataFrame; defaults to the
        notebook-level ``spark`` bound when this function was defined.

    Returns
    -------
    pyspark.sql.DataFrame
        The CSV content, with the schema inferred from the pandas DataFrame.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status (e.g. the archive for
        ``year`` is not published).
    KeyError
        If the archive does not contain the expected CSV member.
    """
    import pandas as pd

    url = f'https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/ITR/DADOS/itr_cia_aberta_{year}.zip'
    zip_path = f'itr_cia_aberta_{year}.zip'
    csv_name = f'itr_cia_aberta_{fs_type}_con_{year}.csv'

    # A timeout keeps the notebook from hanging forever on a stalled connection.
    response = requests.get(url, timeout=120)
    # Fail here on an HTTP error instead of saving an error page as a ".zip"
    # and hitting a confusing BadZipFile later.
    response.raise_for_status()

    with open(zip_path, 'wb') as zip_file:
        zip_file.write(response.content)

    # Context managers make sure both the archive and the member handle are
    # closed, even when parsing fails.
    with zipfile.ZipFile(zip_path) as archive, archive.open(name=csv_name) as csv_file:
        statement_pdf = pd.read_csv(csv_file, sep=';', encoding='ISO-8859-1')

    return spark.createDataFrame(statement_pdf)

In [5]:
# Stock list: read the semicolon-delimited, ISO-8859-1 encoded CSV (with a
# header row) from the lake's extract area and keep only the two columns used
# downstream -- the whitespace-trimmed CNPJ and the ticker.
stocks_df = (
    spark.read
    .format('csv')
    .option('header', 'True')
    .option('delimiter', ';')
    .option('encoding', 'iso-8859-1')
    .load('{}/extract/stock_list.csv'.format(LAKE_HOME))
    .select(
        trim(col('CNPJ')).alias('cnpj'),
        col('TICKER').alias('ticker'),
    )
)

                                                                                

In [6]:
# Accumulate the yearly 'BPA' (assets) and 'BPP' (liabilities) statements into
# one DataFrame each. Both stay None when year_list is empty.
asset_df = None
liabilities_df = None

# The loop variable must not be called `year`: that name is imported from
# pyspark.sql.functions at the top of the notebook, and a module-level loop
# variable would rebind it to an int for the rest of the session.
for fiscal_year in year_list:

    yearly_assets = get_cvm_financial_statement('BPA', year=fiscal_year)
    # `is None` is the identity test; `== None` only works by accident of the
    # DataFrame class not overriding equality.
    if asset_df is None:
        asset_df = yearly_assets
    else:
        # unionAll matches columns by position, not by name.
        asset_df = asset_df.unionAll(yearly_assets)

    yearly_liabilities = get_cvm_financial_statement('BPP', year=fiscal_year)
    if liabilities_df is None:
        liabilities_df = yearly_liabilities
    else:
        liabilities_df = liabilities_df.unionAll(yearly_liabilities)

In [12]:
# Project the raw asset rows onto the column layout used for the balance fact:
# rename the source columns, trim the CNPJ and account-code keys, cast the
# date and value columns, and tag every row as type 'Assets' / period 'YTD'.
temp_asset_df = (
    asset_df
    .select(
        trim('CNPJ_CIA').alias('cnpj'),
        col('DT_REFER').cast('date').alias('base_date'),
        col('GRUPO_DFP').alias('financial_statement_type'),
        col('MOEDA').alias('currency'),
        col('ESCALA_MOEDA').alias('scale'),
        col('ORDEM_EXERC').alias('order'),
        # No start date is populated for these rows: emit an explicit typed
        # NULL. Casting the empty string '' to date also yields NULL under the
        # default settings, but raises when ANSI mode is enabled.
        lit(None).cast('date').alias('start_of_period'),
        col('DT_FIM_EXERC').cast('date').alias('end_of_period'),
        trim('CD_CONTA').alias('id_account'),
        col('DS_CONTA').alias('account_description'),
        col('VL_CONTA').cast('decimal(15, 2)').alias('value'),
        col('ST_CONTA_FIXA').alias('account_status'),
        lit('Assets').alias('type'),
        lit('YTD').alias('period_type'),
    )
)

In [13]:
# Project the raw liabilities rows onto the column layout used for the balance
# fact: rename the source columns, trim the CNPJ and account-code keys, cast
# the date and value columns, and tag every row as type 'Liabilities' / period 'YTD'.
temp_liabilities_df = (
    liabilities_df
    .select(
        trim('CNPJ_CIA').alias('cnpj'),
        col('DT_REFER').cast('date').alias('base_date'),
        col('GRUPO_DFP').alias('financial_statement_type'),
        col('MOEDA').alias('currency'),
        col('ESCALA_MOEDA').alias('scale'),
        col('ORDEM_EXERC').alias('order'),
        # No start date is populated for these rows: emit an explicit typed
        # NULL. Casting the empty string '' to date also yields NULL under the
        # default settings, but raises when ANSI mode is enabled.
        lit(None).cast('date').alias('start_of_period'),
        col('DT_FIM_EXERC').cast('date').alias('end_of_period'),
        trim('CD_CONTA').alias('id_account'),
        col('DS_CONTA').alias('account_description'),
        col('VL_CONTA').cast('decimal(15, 2)').alias('value'),
        col('ST_CONTA_FIXA').alias('account_status'),
        lit('Liabilities').alias('type'),
        lit('YTD').alias('period_type'),
    )
)

In [16]:
# Balance fact: stack the asset and liabilities projections (matched by column
# position), keep only companies present in the stock list (the inner join on
# cnpj also adds the ticker column), then keep the rows whose `order` is
# 'ÚLTIMO' (presumably the most recent exercise -- confirm against CVM docs).
fact_balance = (
    temp_asset_df
    .union(temp_liabilities_df)
    .join(stocks_df, 'cnpj', 'inner')
    .filter(col('order') == 'ÚLTIMO')
)

##### Write to DW

In [18]:
# Hand the finished balance fact to the warehouse writer under the name 'fact_balance'.
df = fact_balance

# NOTE(review): write_to_dw is not defined in this notebook -- presumably it
# comes from lib.ipynb; its write mode (append/overwrite) cannot be told from here.
write_to_dw(df, 'fact_balance')

                                                                                