### Env Config

In [1]:
# Aproximação da quantidade de linhas esperadas no arquivo final 
TOTAL_ROW_COUNT = 2000000
# Randomizar amostras coletadas de cada arquivo
DO_RANDOMIZE_SAMPLE_ROWS = True
# Tipo de arquivo a ser gerado
OUTPUT_FILE_TYPE = 'parquet' # 'csv' ou 'parquet'
# Rodar ou não a visualização dos recursos (Demora muito mais tempo)
RUN_RESOURCE_VISUALIZATION = False

In [2]:
%%capture
%pip install requests pyspark

In [3]:
import zipfile, requests, os
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType
from pyspark.sql.functions import regexp_replace, to_date, col, trim, substring, when

In [4]:
def export_to_file(df: DataFrame, path: str):
	if not os.path.exists(path):
		os.makedirs(path)
	if OUTPUT_FILE_TYPE == 'csv':
		df.write.csv(f'{path}/df.csv', header=True, mode='overwrite')
	else:
		df.write.parquet(f'{path}/df.parquet', mode='overwrite')

### Extração Manual e manutenção da integridade da zona Staging

In [5]:
planilhas = [
	'planilha_2012',
	'planilha_2013',
	'planilha_2014',
	'planilha_2015',
	'planilha_2016',
	'planilha_2017',
	'planilha_2018',
	'planilha_2019',
	'planilha_2020',
	'planilha_2021',
	'planilha_2022',
	'planilha_2023',
	'planilha_2024',
]

for planilha in planilhas:
	if not os.path.exists(f'data/staging/{planilha}.zip'):
		url = f'https://www.bcb.gov.br/pda/desig/{planilha}.zip'

		print(f'Baixando arquivo "{planilha}.zip"...')
		response = requests.get(url)

		with open(f'data/staging/{planilha}.zip', 'wb') as f:
			f.write(response.content)

### Exportando para zona Raw

In [6]:
if not os.path.exists('data/raw'):
	os.makedirs('data/raw')

for planilha in planilhas:
    zip_obj = zipfile.ZipFile(f'data/staging/{planilha}.zip', 'r')
    extracted_files = zip_obj.namelist()

    for file in extracted_files:
        extracted_path = os.path.join(f'data/raw', file)
        
        if not os.path.exists(extracted_path):
            print(f'Extraindo {file}...')
            try:
                zip_obj.extract(file, 'data/raw')
            except zipfile.BadZipFile:
                print(f"Erro: O arquivo {planilha}.zip está corrompido.")
            except Exception as e:
                print(f"Erro ao extrair {file}: {e}")

    zip_obj.close()

### Criando dataframe

In [7]:
files = os.listdir('data/raw')
files.sort()

spark = SparkSession.builder \
    .appName("Data Processing") \
    .getOrCreate()

print(f'PySpark {spark.version}')

df = None
for file in files:
    print(f'Processando o arquivo {file}')

    temp_df = spark.read.csv(f'data/raw/{file}', sep=';', header=True, encoding='utf-8')

    if df is None:
        df = temp_df
    else:
        df = df.union(temp_df)

24/10/02 18:08:20 WARN Utils: Your hostname, administrador-Inspiron-3501 resolves to a loopback address: 127.0.1.1; using 10.18.6.197 instead (on interface wlp0s20f3)
24/10/02 18:08:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/02 18:08:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


PySpark 3.5.3
Processando o arquivo planilha_201206.csv
Processando o arquivo planilha_201207.csv
Processando o arquivo planilha_201208.csv
Processando o arquivo planilha_201209.csv
Processando o arquivo planilha_201210.csv
Processando o arquivo planilha_201211.csv
Processando o arquivo planilha_201212.csv
Processando o arquivo planilha_201301.csv
Processando o arquivo planilha_201302.csv
Processando o arquivo planilha_201303.csv
Processando o arquivo planilha_201304.csv
Processando o arquivo planilha_201305.csv
Processando o arquivo planilha_201306.csv
Processando o arquivo planilha_201307.csv
Processando o arquivo planilha_201308.csv
Processando o arquivo planilha_201309.csv
Processando o arquivo planilha_201310.csv
Processando o arquivo planilha_201311.csv
Processando o arquivo planilha_201312.csv
Processando o arquivo planilha_201401.csv
Processando o arquivo planilha_201402.csv
Processando o arquivo planilha_201403.csv
Processando o arquivo planilha_201404.csv
Processando o arquiv

### Ajuste de tipagem dos dados

In [8]:
df.dtypes

[('data_base', 'string'),
 ('uf', 'string'),
 ('tcb', 'string'),
 ('sr', 'string'),
 ('cliente', 'string'),
 ('ocupacao', 'string'),
 ('cnae_secao', 'string'),
 ('cnae_subclasse', 'string'),
 ('porte', 'string'),
 ('modalidade', 'string'),
 ('origem', 'string'),
 ('indexador', 'string'),
 ('numero_de_operacoes', 'string'),
 ('a_vencer_ate_90_dias', 'string'),
 ('a_vencer_de_91_ate_360_dias', 'string'),
 ('a_vencer_de_361_ate_1080_dias', 'string'),
 ('a_vencer_de_1081_ate_1800_dias', 'string'),
 ('a_vencer_de_1801_ate_5400_dias', 'string'),
 ('a_vencer_acima_de_5400_dias', 'string'),
 ('vencido_acima_de_15_dias', 'string'),
 ('carteira_ativa', 'string'),
 ('carteira_inadimplida_arrastada', 'string'),
 ('ativo_problematico', 'string')]

In [9]:
FLOAT_COLS = [
    'a_vencer_ate_90_dias',
    'a_vencer_de_91_ate_360_dias',
    'a_vencer_de_361_ate_1080_dias',
    'a_vencer_de_1081_ate_1800_dias',
    'a_vencer_de_1801_ate_5400_dias',
    'a_vencer_acima_de_5400_dias',
    'vencido_acima_de_15_dias',
    'carteira_ativa',
    'carteira_inadimplida_arrastada',
    'ativo_problematico'
]
CATEGORY_COLS = [
    'uf',
    'tcb',
    'sr',
    'ocupacao',
    'cnae_secao',
    'cnae_subclasse',
    'porte',
    'modalidade',
    'origem',
    'indexador'
]
INT_COLS = [
    'numero_de_operacoes'
]
DATE_COLS = [
	'data_base'
]

for column in df.columns:
	if column in FLOAT_COLS:
		df = df.withColumn(column, regexp_replace(df[column], ',', '.'))
		df = df.withColumn(column, df[column].cast(DoubleType()))
		df = df.withColumnRenamed(column, 'vl_' + column)
	elif column in CATEGORY_COLS:
		df = df.withColumnRenamed(column, 'ct_' + column)
	elif column in INT_COLS:
		df = df.withColumn(column, regexp_replace(df[column], '<= ', ''))
		df = df.withColumn(column, df[column].cast(IntegerType()))
		df = df.withColumnRenamed(column, 'nu_' + column)
	elif column in DATE_COLS:
		df = df.withColumn(column, to_date(df[column], 'yyyy-MM-dd'))
		df = df.withColumnRenamed(column, 'dt_' + column)

In [10]:
df.dtypes

[('dt_data_base', 'date'),
 ('ct_uf', 'string'),
 ('ct_tcb', 'string'),
 ('ct_sr', 'string'),
 ('cliente', 'string'),
 ('ct_ocupacao', 'string'),
 ('ct_cnae_secao', 'string'),
 ('ct_cnae_subclasse', 'string'),
 ('ct_porte', 'string'),
 ('ct_modalidade', 'string'),
 ('ct_origem', 'string'),
 ('ct_indexador', 'string'),
 ('nu_numero_de_operacoes', 'int'),
 ('vl_a_vencer_ate_90_dias', 'double'),
 ('vl_a_vencer_de_91_ate_360_dias', 'double'),
 ('vl_a_vencer_de_361_ate_1080_dias', 'double'),
 ('vl_a_vencer_de_1081_ate_1800_dias', 'double'),
 ('vl_a_vencer_de_1801_ate_5400_dias', 'double'),
 ('vl_a_vencer_acima_de_5400_dias', 'double'),
 ('vl_vencido_acima_de_15_dias', 'double'),
 ('vl_carteira_ativa', 'double'),
 ('vl_carteira_inadimplida_arrastada', 'double'),
 ('vl_ativo_problematico', 'double')]

In [11]:
if RUN_RESOURCE_VISUALIZATION:
    df.groupBy('cliente').count().orderBy('count', ascending=False).show()

In [12]:
df = df.withColumnRenamed('cliente', 'ct_classificacao')

#### Assim percebemos que já existe uma coluna com a classificação do cliente, então vamos retirar essa informação das outras colunas

In [13]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('ct_porte').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_modalidade').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_ocupacao').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_cnae_secao').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_cnae_subclasse').count().orderBy('count', ascending=False).show()

In [14]:
df.withColumn('ct_porte', trim(substring(col('ct_porte'), 6, 100)))
df.withColumn('ct_modalidade', trim(substring(col('ct_modalidade'), 6, 100)))
df.withColumn('ct_ocupacao', trim(substring(col('ct_ocupacao'), 6, 100)))
df.withColumn('ct_cnae_secao', trim(substring(col('ct_cnae_secao'), 6, 100)))
df.withColumn('ct_cnae_subclasse', trim(substring(col('ct_cnae_subclasse'), 6, 100)))

DataFrame[dt_data_base: date, ct_uf: string, ct_tcb: string, ct_sr: string, ct_classificacao: string, ct_ocupacao: string, ct_cnae_secao: string, ct_cnae_subclasse: string, ct_porte: string, ct_modalidade: string, ct_origem: string, ct_indexador: string, nu_numero_de_operacoes: int, vl_a_vencer_ate_90_dias: double, vl_a_vencer_de_91_ate_360_dias: double, vl_a_vencer_de_361_ate_1080_dias: double, vl_a_vencer_de_1081_ate_1800_dias: double, vl_a_vencer_de_1801_ate_5400_dias: double, vl_a_vencer_acima_de_5400_dias: double, vl_vencido_acima_de_15_dias: double, vl_carteira_ativa: double, vl_carteira_inadimplida_arrastada: double, vl_ativo_problematico: double]

### Exportando para zona Trusted

In [15]:
export_to_file(df, 'data/trusted')

24/10/02 18:09:05 WARN DAGScheduler: Broadcasting large task binary with size 5.4 MiB
24/10/02 18:09:06 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:09:15 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:09:21 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:09:23 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:09:27 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:09:30 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00

In [16]:
df = df.withColumn('vl_carteira_ativa_n_arrastada', col('vl_carteira_ativa') - col('vl_carteira_inadimplida_arrastada'))

In [17]:
if RUN_RESOURCE_VISUALIZATION:
	df.select('vl_carteira_ativa_n_arrastada').describe().show()

In [18]:
if RUN_RESOURCE_VISUALIZATION:
	df.agg(
		sum(when(col('vl_a_vencer_acima_de_5400_dias') != 0.00, 1).otherwise(0)).alias('count_acima_5400'),
		sum(when(col('vl_a_vencer_de_1801_ate_5400_dias') != 0.00, 1).otherwise(0)).alias('count_1801_5400'),
		sum(when(col('vl_a_vencer_de_361_ate_1080_dias') != 0.00, 1).otherwise(0)).alias('count_361_1080'),
		sum(when(col('vl_a_vencer_de_91_ate_360_dias') != 0.00, 1).otherwise(0)).alias('count_91_360'),
		sum(when(col('vl_a_vencer_de_1081_ate_1800_dias') != 0.00, 1).otherwise(0)).alias('count_1081_1800'),
		sum(when(col('vl_a_vencer_ate_90_dias') != 0.00, 1).otherwise(0)).alias('count_ate_90')
	).show()

In [19]:
df = df.withColumn(
    'ct_faixa_meses_ate_vencimento',
    when(col('vl_a_vencer_acima_de_5400_dias') != 0.00, '> 180')
     .when(col('vl_a_vencer_de_1801_ate_5400_dias') != 0.00, '36-180')
     .when(col('vl_a_vencer_de_1081_ate_1800_dias') != 0.00, '18-36')
     .when(col('vl_a_vencer_de_361_ate_1080_dias') != 0.00, '12-18')
     .when(col('vl_a_vencer_de_91_ate_360_dias') != 0.00, '3-12')
     .when(col('vl_a_vencer_ate_90_dias') != 0.00, '0-3')
     .otherwise(None)
)

In [20]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('ct_faixa_meses_ate_vencimento').count().orderBy('count', ascending=False).show()

In [21]:
df = df.withColumn('vl_media_carteira_ativa_por_operacao', col('vl_carteira_ativa') / col('nu_numero_de_operacoes'))

In [22]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('vl_media_carteira_ativa_por_operacao').count().orderBy('count', ascending=False).show()

In [23]:
df = df.withColumn('vl_media_carteira_inadimplida_por_operacao', col('vl_carteira_inadimplida_arrastada') / col('nu_numero_de_operacoes'))

In [24]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('vl_media_carteira_inadimplida_por_operacao').count().orderBy('count', ascending=False).show()

In [25]:
df.dtypes

[('dt_data_base', 'date'),
 ('ct_uf', 'string'),
 ('ct_tcb', 'string'),
 ('ct_sr', 'string'),
 ('ct_classificacao', 'string'),
 ('ct_ocupacao', 'string'),
 ('ct_cnae_secao', 'string'),
 ('ct_cnae_subclasse', 'string'),
 ('ct_porte', 'string'),
 ('ct_modalidade', 'string'),
 ('ct_origem', 'string'),
 ('ct_indexador', 'string'),
 ('nu_numero_de_operacoes', 'int'),
 ('vl_a_vencer_ate_90_dias', 'double'),
 ('vl_a_vencer_de_91_ate_360_dias', 'double'),
 ('vl_a_vencer_de_361_ate_1080_dias', 'double'),
 ('vl_a_vencer_de_1081_ate_1800_dias', 'double'),
 ('vl_a_vencer_de_1801_ate_5400_dias', 'double'),
 ('vl_a_vencer_acima_de_5400_dias', 'double'),
 ('vl_vencido_acima_de_15_dias', 'double'),
 ('vl_carteira_ativa', 'double'),
 ('vl_carteira_inadimplida_arrastada', 'double'),
 ('vl_ativo_problematico', 'double'),
 ('vl_carteira_ativa_n_arrastada', 'double'),
 ('ct_faixa_meses_ate_vencimento', 'string'),
 ('vl_media_carteira_ativa_por_operacao', 'double'),
 ('vl_media_carteira_inadimplida_por_oper

### Exportando para zona Refined

In [26]:
export_to_file(df, 'data/refined')

24/10/02 18:32:44 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
24/10/02 18:32:54 WARN DAGScheduler: Broadcasting large task binary with size 6.3 MiB
24/10/02 18:32:55 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
ERROR:root:KeyboardInterrupt while sending command.              (0 + 8) / 1168]
Traceback (most recent call last):
  File "/home/consultor/.local/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/consultor/.local/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
24/10/02 18:33:00

KeyboardInterrupt: 

24/10/02 18:33:02 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:33:03 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:33:04 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:33:08 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:33:10 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:33:11 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
24/10/02 18:33:15 WARN MemoryManager: Total allocation exceeds 95,00% 