### Env Config

In [5]:
import os
while not os.path.exists('.gitignore'):
	os.chdir(os.path.abspath(os.path.join('../')))
from common_imports import *

In [6]:
# Aproximação da quantidade de linhas esperadas no arquivo final 
TOTAL_ROW_COUNT = None
# Randomizar amostras coletadas de cada arquivo
DO_RANDOMIZE_SAMPLE_ROWS = True

# Exportar os arquivos tratados ou não
DO_EXPORT_FILES = False
# Tipo de arquivo a ser gerado
OUTPUT_FILE_TYPE = 'parquet' # 'csv' ou 'parquet'

# Rodar ou não a visualização dos recursos (Demora muito mais tempo)
RUN_RESOURCE_VISUALIZATION = False

In [7]:
def export_to_file(df: DataFrame, path: str):
	if DO_EXPORT_FILES:
		if not os.path.exists(path):
			os.makedirs(path)
		if OUTPUT_FILE_TYPE == 'csv':
			df.write.csv(f'{path}/df.csv', header=True, mode='overwrite')
		else:
			df.write.parquet(f'{path}/df.parquet', mode='overwrite')

### Criando dataframe

In [8]:
files = os.listdir('_data/raw/operacoes-credito')
files.sort()

conf = SparkConf() \
    .setAppName("Operacoes Creditarias") \
    .set("spark.driver.memory", "12g") \
    .set("spark.executor.memory", "12g") \
    .set("spark.executor.memoryOverhead", "12g")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

print(f'Dataset "Operacoes Creditarias" em PySpark {spark.version}')

df = None
for file in files:
	print(f'Processando o arquivo {file}')

	temp_df = spark.read.csv(f'_data/raw/operacoes-credito/{file}', sep=';', header=True, encoding='utf-8')

	if TOTAL_ROW_COUNT:
		rows_per_file = TOTAL_ROW_COUNT // len(files)
		fraction = rows_per_file / temp_df.count()
		temp_df = temp_df.sample(withReplacement=False, fraction=fraction, seed=random.randint(0, 10000))

	if df is None:
		df = temp_df
	else:
		df = df.union(temp_df)

24/10/04 17:24:46 WARN Utils: Your hostname, administrador-Inspiron-3501 resolves to a loopback address: 127.0.1.1; using 10.18.6.197 instead (on interface wlp0s20f3)
24/10/04 17:24:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/04 17:24:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Dataset "Operacoes Creditarias" em PySpark 3.5.3
Processando o arquivo planilha_201206.csv
Processando o arquivo planilha_201207.csv
Processando o arquivo planilha_201208.csv
Processando o arquivo planilha_201209.csv
Processando o arquivo planilha_201210.csv
Processando o arquivo planilha_201211.csv
Processando o arquivo planilha_201212.csv
Processando o arquivo planilha_201301.csv
Processando o arquivo planilha_201302.csv
Processando o arquivo planilha_201303.csv
Processando o arquivo planilha_201304.csv
Processando o arquivo planilha_201305.csv
Processando o arquivo planilha_201306.csv
Processando o arquivo planilha_201307.csv
Processando o arquivo planilha_201308.csv
Processando o arquivo planilha_201309.csv
Processando o arquivo planilha_201310.csv
Processando o arquivo planilha_201311.csv
Processando o arquivo planilha_201312.csv
Processando o arquivo planilha_201401.csv
Processando o arquivo planilha_201402.csv
Processando o arquivo planilha_201403.csv
Processando o arquivo plani

In [9]:
if RUN_RESOURCE_VISUALIZATION:
	df.count()

### Ajuste de tipagem dos dados

In [10]:
df.printSchema()

root
 |-- data_base: string (nullable = true)
 |-- uf: string (nullable = true)
 |-- tcb: string (nullable = true)
 |-- sr: string (nullable = true)
 |-- cliente: string (nullable = true)
 |-- ocupacao: string (nullable = true)
 |-- cnae_secao: string (nullable = true)
 |-- cnae_subclasse: string (nullable = true)
 |-- porte: string (nullable = true)
 |-- modalidade: string (nullable = true)
 |-- origem: string (nullable = true)
 |-- indexador: string (nullable = true)
 |-- numero_de_operacoes: string (nullable = true)
 |-- a_vencer_ate_90_dias: string (nullable = true)
 |-- a_vencer_de_91_ate_360_dias: string (nullable = true)
 |-- a_vencer_de_361_ate_1080_dias: string (nullable = true)
 |-- a_vencer_de_1081_ate_1800_dias: string (nullable = true)
 |-- a_vencer_de_1801_ate_5400_dias: string (nullable = true)
 |-- a_vencer_acima_de_5400_dias: string (nullable = true)
 |-- vencido_acima_de_15_dias: string (nullable = true)
 |-- carteira_ativa: string (nullable = true)
 |-- carteira_inad

In [11]:
MONETARY_COLS = ['a_vencer_ate_90_dias', 'a_vencer_de_91_ate_360_dias', 'a_vencer_de_361_ate_1080_dias', 'a_vencer_de_1081_ate_1800_dias', 'a_vencer_de_1801_ate_5400_dias', 'a_vencer_acima_de_5400_dias', 'vencido_acima_de_15_dias', 'carteira_ativa', 'carteira_inadimplida_arrastada', 'ativo_problematico']

CATEGORY_COLS = ['uf', 'tcb', 'sr', 'ocupacao', 'cnae_secao', 'cnae_subclasse', 'porte', 'modalidade', 'origem', 'indexador']

QUANTITY_COLS = ['numero_de_operacoes']

DATE_COLS = ['data_base']

for column in df.columns:
	if column in MONETARY_COLS:
		df = df.withColumn(column, regexp_replace(df[column], ',', '.').cast(DecimalType(20, 2)))
		df = df.withColumnRenamed(column, f'vl_{column}')
	elif column in CATEGORY_COLS:
		df = df.withColumnRenamed(column, f'ct_{column}')
	elif column in QUANTITY_COLS:
		df = df.withColumn(column, regexp_replace(df[column], '<= ', '').cast(IntegerType()))
		df = df.withColumnRenamed(column, f'qt_{column}')
	elif column in DATE_COLS:
		df = df.withColumn(column, to_date(df[column], 'yyyy-MM-dd'))
		df = df.withColumnRenamed(column, f'dt_{column}')

In [12]:
df.printSchema()

root
 |-- dt_data_base: date (nullable = true)
 |-- ct_uf: string (nullable = true)
 |-- ct_tcb: string (nullable = true)
 |-- ct_sr: string (nullable = true)
 |-- cliente: string (nullable = true)
 |-- ct_ocupacao: string (nullable = true)
 |-- ct_cnae_secao: string (nullable = true)
 |-- ct_cnae_subclasse: string (nullable = true)
 |-- ct_porte: string (nullable = true)
 |-- ct_modalidade: string (nullable = true)
 |-- ct_origem: string (nullable = true)
 |-- ct_indexador: string (nullable = true)
 |-- qt_numero_de_operacoes: integer (nullable = true)
 |-- vl_a_vencer_ate_90_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_91_ate_360_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_361_ate_1080_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_1081_ate_1800_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_1801_ate_5400_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_acima_de_5400_dias: decimal(20,2) (nullable = true)
 |-- vl_vencido_acima_de_

In [13]:
if RUN_RESOURCE_VISUALIZATION:
    df.groupBy('cliente').count().orderBy('count', ascending=False).show()

In [14]:
df = df.withColumnRenamed('cliente', 'ct_classificacao')

#### Assim percebemos que já existe uma coluna com a classificação do cliente, então vamos retirar essa informação das outras colunas

In [15]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('ct_porte').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_modalidade').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_ocupacao').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_cnae_secao').count().orderBy('count', ascending=False).show()
	df.groupBy('ct_cnae_subclasse').count().orderBy('count', ascending=False).show()

In [16]:
df = df.withColumn('ct_porte', trim(substring(col('ct_porte'), 6, 100)))
df = df.withColumn('ct_modalidade', trim(substring(col('ct_modalidade'), 6, 100)))
df = df.withColumn('ct_ocupacao', trim(substring(col('ct_ocupacao'), 6, 100)))
df = df.withColumn('ct_cnae_secao', trim(substring(col('ct_cnae_secao'), 6, 100)))
df = df.withColumn('ct_cnae_subclasse', trim(substring(col('ct_cnae_subclasse'), 6, 100)))

### Exportando para zona Trusted

In [17]:
export_to_file(df, '_data/trusted/operacoes-credito')

In [18]:
df = df.withColumn('vl_carteira_ativa_n_arrastada', col('vl_carteira_ativa') - col('vl_carteira_inadimplida_arrastada'))

In [19]:
if RUN_RESOURCE_VISUALIZATION:
	df.select('vl_carteira_ativa_n_arrastada').describe().show()

In [20]:
if RUN_RESOURCE_VISUALIZATION:
	df.agg(
		sum(when(col('vl_a_vencer_acima_de_5400_dias') != 0.00, 1).otherwise(0)).alias('count_acima_5400'),
		sum(when(col('vl_a_vencer_de_1801_ate_5400_dias') != 0.00, 1).otherwise(0)).alias('count_1801_5400'),
		sum(when(col('vl_a_vencer_de_361_ate_1080_dias') != 0.00, 1).otherwise(0)).alias('count_361_1080'),
		sum(when(col('vl_a_vencer_de_91_ate_360_dias') != 0.00, 1).otherwise(0)).alias('count_91_360'),
		sum(when(col('vl_a_vencer_de_1081_ate_1800_dias') != 0.00, 1).otherwise(0)).alias('count_1081_1800'),
		sum(when(col('vl_a_vencer_ate_90_dias') != 0.00, 1).otherwise(0)).alias('count_ate_90')
	).show()

In [21]:
df = df.withColumn(
    'ct_faixa_meses_ate_vencimento',
    when(col('vl_a_vencer_acima_de_5400_dias') != 0.00, '> 180')
     .when(col('vl_a_vencer_de_1801_ate_5400_dias') != 0.00, '36-180')
     .when(col('vl_a_vencer_de_1081_ate_1800_dias') != 0.00, '18-36')
     .when(col('vl_a_vencer_de_361_ate_1080_dias') != 0.00, '12-18')
     .when(col('vl_a_vencer_de_91_ate_360_dias') != 0.00, '3-12')
     .when(col('vl_a_vencer_ate_90_dias') != 0.00, '0-3')
     .otherwise(None)
)

In [22]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('ct_faixa_meses_ate_vencimento').count().orderBy('count', ascending=False).show()

In [23]:
df = df.withColumn('vl_media_carteira_ativa_por_operacao', col('vl_carteira_ativa') / col('qt_numero_de_operacoes'))

In [24]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('vl_media_carteira_ativa_por_operacao').count().orderBy('count', ascending=False).show()

In [25]:
df = df.withColumn('vl_media_carteira_inadimplida_por_operacao', col('vl_carteira_inadimplida_arrastada') / col('qt_numero_de_operacoes'))

In [26]:
if RUN_RESOURCE_VISUALIZATION:
	df.groupBy('vl_media_carteira_inadimplida_por_operacao').count().orderBy('count', ascending=False).show()

In [27]:
df.printSchema()

root
 |-- dt_data_base: date (nullable = true)
 |-- ct_uf: string (nullable = true)
 |-- ct_tcb: string (nullable = true)
 |-- ct_sr: string (nullable = true)
 |-- ct_classificacao: string (nullable = true)
 |-- ct_ocupacao: string (nullable = true)
 |-- ct_cnae_secao: string (nullable = true)
 |-- ct_cnae_subclasse: string (nullable = true)
 |-- ct_porte: string (nullable = true)
 |-- ct_modalidade: string (nullable = true)
 |-- ct_origem: string (nullable = true)
 |-- ct_indexador: string (nullable = true)
 |-- qt_numero_de_operacoes: integer (nullable = true)
 |-- vl_a_vencer_ate_90_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_91_ate_360_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_361_ate_1080_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_1081_ate_1800_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_de_1801_ate_5400_dias: decimal(20,2) (nullable = true)
 |-- vl_a_vencer_acima_de_5400_dias: decimal(20,2) (nullable = true)
 |-- vl_vencido_

### Exportando para zona Refined

In [28]:
export_to_file(df, '_data/refined/operacoes-credito')

In [29]:
spark.stop()