# Configurações iniciais

## Aplica configurações ao kernel

In [1]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.app.name": "Extração - Relatório Conta Remunerada"
         },
 "kind":"pyspark"
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6795,application_1664549316508_3333,pyspark,idle,Link,Link,,
6797,application_1664549316508_3340,pyspark,idle,Link,Link,,
6798,application_1664549316508_3342,pyspark,idle,Link,Link,,
6801,application_1664549316508_3345,pyspark,idle,Link,Link,,
6802,application_1664549316508_3346,pyspark,idle,Link,Link,,
6804,application_1664549316508_3348,pyspark,idle,Link,Link,,
6805,application_1664549316508_3349,pyspark,idle,Link,Link,,
6810,application_1664549316508_3361,pyspark,idle,Link,Link,,
6811,application_1664549316508_3363,pyspark,idle,Link,Link,,
6812,application_1664549316508_3364,pyspark,busy,Link,Link,,


## Importa bibliotecas e aplica configurações ao spark

In [2]:
from pyspark.sql.window import *
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
from pyspark.storagelevel import StorageLevel
from datetime import date, timedelta, datetime
import math
import datetime
import json
import pandas as pd
import boto3
import os
import pytz
import io
import zipfile
from dateutil.relativedelta import relativedelta

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.orc.enabled","true")
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.conf.set("spark.databricks.io.cache.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6818,application_1664549316508_3386,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Funções

Esta função faz o download de arquivos .csv disponibilizados em um Bucket e gera um dataframe a ser utilizado para atualização de tabelas auxiliares do modelo. Esses arquivos são de responsabilidade do time de Business.

In [3]:
def atualiza_bases_auxiliares(bucket, diretorio, filename):
    
    s3_resource.Bucket(bucket).download_file(diretorio + filename,filename)
    
    base_auxiliar = spark.createDataFrame(pd.read_csv(filename,sep = ';',encoding = 'utf-8').astype(str)).dropDuplicates()
    
    return base_auxiliar

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Esta função cria tabelas sandbox em um Bucket do S3 para auxiliares no processamento de grandes massas de dados. Se não houver um schema para seu modelo pode-se criar um dentro do Jupyterlab através do código:

```python
spark.sql('''
create database sandbox_{nome do seu projeto}
location 's3://{bucket do seu projeto, normalmente será: dock-datalake-dev}/{diretório/path de onde os dados serão disponibilizados}/'
''')
```

Exemplo de utilização do código:

```python
spark.sql('''
create database sandbox_conta_remunerada
location 's3://dock-datalake-dev/home/regulatory/conta_remunerada/'
''')
```

In [4]:
def s3_custom_persistence(df, database, table):
    
    (df.write
     .mode('overwrite')
     .format('parquet')
     .saveAsTable(f'{database}.{table}', path = f's3://[seu caminho do S3 aqui]/{database}/{table}')
    )
    print(f'{table} salvo no s3.')
    return spark.table(f'{database}.{table}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Esta função cria os arquivos .csv no Bucket fazendo uma repartição pela variável npart.

In [5]:
def s3_arquivos_csv(npart, df, bucket, diretorio):
    
    df.repartition(npart).write.mode('overwrite').csv(f's3://{bucket}/{diretorio}',header='True',sep = ';',encoding = 'ISO-8859-1')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Esta função renomeia os arquivos .csv extraíndo a parte (0000) gerada ao criar os arquivos .csv para identificação da ordem dos arquivos.

In [6]:
def s3_rename_files(bucket, diretorio, filename, data_arquivo, key_issuer, decendio_filename):
    
    s3_client = boto3.client("s3")
    response = s3_client.list_objects_v2(Bucket=bucket,Prefix=f'{diretorio}')
    files = response.get("Contents")
    
    for file in files:
        if file.get('Size') > 0:
            if file["Key"].startswith(f'{diretorio}') and file["Key"].endswith(('.csv', '.xlsx')):
                
                part = file.get('Key').split(f'{diretorio}/')[1].split('-')[1]
            
                old_key = file.get('Key')
                issuer_id = key_issuer.split('_')[0]
                issuer_name = key_issuer.split('_')[1]
                new_key = f'{diretorio}/ACCOUNTING_IR_REMUNERATION_{part}_{filename}_{decendio_filename}_{issuer_id}_{issuer_name}_{data_arquivo}.csv'
    
                s3_client.copy_object(Bucket=bucket, CopySource=f"{bucket}/{old_key}", Key=new_key)
                s3_client.delete_object(Bucket=bucket, Key=old_key)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Esta função cria um arquivo .zip com todos os arquivos de tamanho maior que 0 e deleta os arquivos .csv do Bucket, mantendo uma única cópia desses dados.

In [7]:
def s3_arquivo_zip(bucket, diretorio, filename, data_arquivo, key_issuer, decendio_filename):
    
    s3_client = boto3.client("s3")
    response = s3_client.list_objects_v2(Bucket=bucket,Prefix=f'{diretorio}')
    files = response.get("Contents")
    zip_buffer = io.BytesIO()

    with zipfile.ZipFile(zip_buffer, 'a', compression=zipfile.ZIP_DEFLATED, compresslevel = 9) as arquivoZip:
        for file in files:
            if file.get('Size') > 0:
                if file["Key"].startswith(f'{diretorio}') and file["Key"].endswith(('.csv', '.xlsx')):

                    csv_obj = s3_client.get_object(Bucket=bucket, Key=file.get('Key'))
                    body = csv_obj['Body']
                    infile_content = body.read().decode('ISO-8859-1')

                    arquivo = file.get('Key').split(f'{diretorio}/')[1]

                    arquivoZip.writestr(arquivo, infile_content)
                    
                    s3_client.delete_object(Bucket=bucket, Key=file.get('Key')) # Deleta o arquivo do Bucket depois de ter anexado ao zip
                    
    issuer_id = key_issuer.split('_')[0]
    issuer_name = key_issuer.split('_')[1]

    s3_client.put_object(Bucket=bucket, Key=f'{diretorio}/ACCOUNTING_IR_REMUNERATION_0000_{filename}{decendio_filename}_{issuer_id}_{issuer_name}_{data_arquivo}.zip', Body=zip_buffer.getvalue())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Esta função transfere dados entre Buckets usando o lambda.

In [8]:
def call_lambda(lambda_name: str, payload: dict):
    client = boto3.client('lambda', 'us-east-1')
    response = client.invoke(
        FunctionName=lambda_name,
        Payload=json.dumps(payload)
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Variáveis

## Variáveis constantes

In [9]:
data_processamento = datetime.datetime.now(pytz.timezone("America/Sao_Paulo")).strftime("%Y-%m-%d") # data que será utilizada para o diretório do Caronte
data_arquivo = datetime.datetime.now(pytz.timezone("America/Sao_Paulo")).strftime("%Y%m%d_%H%M%S") # data que será utilizada no nome do arquivo
limite_linhas_excel = 1048575 # setando a quantidade máxima de linhas que uma aba do Excel aceita

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Criando uma variável para informar o nome do bucket e o caminho do diretório que será utilizado no processo

In [10]:
nomeBucket = '' # bucket de destino para os dados gerados
nomeBucket_arquivos = '' # bucket onde estão os arquivos .csv que irão gerar as tabelas auxiliares
nomeDiretorioExtracao_diario = '' # parte padrão do path que irá armazenar os dados do relatório de conta remunerada por dia
nomeDiretorioExtracao_sumarizado = '' # parte padrão do path que irá armazenar os dados do relatório de conta remunerada sumarizado
nomeDiretorio_arquivos = ''  # path aonde estão disponibilizados os arquivos .csv que irão gerar as tabelas auxiliares

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Criando uma representação da interface de recursos do S3

In [11]:
s3_resource = boto3.resource('s3')
bucket = s3_resource.Bucket(nomeBucket)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Criando variáveis de nome dos arquivos

In [12]:
filename_diario = f'REMUNERATIONPERDAY' # parte padrão do nome do arquivo que irá conter os dados do relatório de conta remunerada por dia
filename_sumarizado = f'REMUNERATION' # parte padrão do nome do arquivo que irá conter os dados do relatório de conta remunerada sumarizado

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Atualização de tabelas auxiliares

In [13]:
conta_remunerada_decendios = atualiza_bases_auxiliares(nomeBucket_arquivos,nomeDiretorio_arquivos,'conta_remunerada_decendios.csv') # roda a função que gera o dataframe para a tabela auxiliar
df_decendios = s3_custom_persistence(conta_remunerada_decendios,'sandbox_conta_remunerada','stg_decendios') # roda a função que atualiza a tabela auxiliar no S3

os.remove('conta_remunerada_decendios.csv') # Removendo os arquivos CSVs do diretório local

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

stg_decendios salvo no s3.

In [14]:
conta_remunerada_issuers = atualiza_bases_auxiliares(nomeBucket_arquivos,nomeDiretorio_arquivos,'conta_remunerada_issuers.csv') # roda a função que gera o dataframe para a tabela auxiliar
df_issuers = s3_custom_persistence(conta_remunerada_issuers,'sandbox_conta_remunerada','stg_issuers') # roda a função que atualiza a tabela auxiliar no S3

os.remove('conta_remunerada_issuers.csv') # Removendo os arquivos CSVs do diretório local

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

stg_issuers salvo no s3.

# Definição de variáveis que se alteram ao longo do tempo

In [15]:
# data_execucao = '2022-09-30'
data_execucao = datetime.datetime.now(pytz.timezone("America/Sao_Paulo")).strftime("%Y-%m-%d")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Definindo as variáveis do decêndio a ser processado com base na data do processamento e a tabela auxiliar.

In [16]:
decendio = df_decendios.filter(f"extract(day from cast('{data_execucao}' as date)) >= start_day and extract(day from cast('{data_execucao}' as date)) <= end_day").first()['decendio']

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
decendio_filename = df_decendios.filter(f"extract(day from cast('{data_execucao}' as date)) >= start_day and extract(day from cast('{data_execucao}' as date)) <= end_day").first()['filename']

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Definindo a variável do ano_mês a ser processado com base na data do processamento e a tabela auxiliar.

In [18]:
date_diff = (
    df_decendios
    .filter('extract(day from current_date) >= start_day and extract(day from current_date) <= end_day')
    .withColumn('date_diff',col('date_diff').cast('int'))
    .first()['date_diff']
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
ano_mes = (datetime.datetime.now(pytz.timezone("America/Sao_Paulo")).replace(day=1) + relativedelta(months=date_diff)).strftime("%Y%m")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Definindo a variável de issuers a serem processados com base na data do processamento e a tabela auxiliar.

In [20]:
aux_issuers = df_issuers.select(concat_ws(",", collect_set(col('filename'))).alias("filename")).withColumn('filename_array',split(col('filename'),','))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
issuers = aux_issuers.first()['filename']

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
issuer_array = aux_issuers.first()['filename_array']

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Criação de tabelas sandbox para extração de dados

## Relatório Diário

### Emissores

Cria a sandbox com dados dos emissores que precisarão ser processados para o relatório de contas remuneradas por dia.

In [23]:
query_diario = f'''
select
    base.issuer_id as id_emissor,
    base.issuer_name as emissor,
    base.account_id as id_conta,
    base.bearer_document as cpf_cnpj,
    base.bearer_name as nome,
    base.yearmonth as anomes,
    date_format(base.transaction_date, 'dd/MM/yyyy') as data_lancamento,
    cast(cast(base.quantity_transactions as integer) as string) as quantidade_transacoes,
    cast(cast(base.amount_transactions as decimal(38,2)) as string) as valor_transacoes,
    cast(cast(base.retention_basis as decimal(38,2)) as string)as base_retencao,
    cast(cast(base.due_income_tax as decimal(38,2)) as string) as ir_devido
from
    app_regulatory.daily_bearing_account base
inner join
    sandbox_conta_remunerada.stg_issuers issuers
    on cast(base.issuer_id as int) = cast(issuers.issuer_id as int)
        and issuers.filename <> '000_DOCK'
where
    1=1
    and base.yearmonth = '{ano_mes}'
    and base.decendio = '{decendio}'
order by
    base.bearer_document,
    base.bearer_name,
    base.transaction_date,
    base.decendio
'''

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
df_diario = (spark
             .sql(query_diario)
             .withColumn('quantidade_transacoes',regexp_replace(col('quantidade_transacoes'),"\.",""))
             .withColumn('valor_transacoes',regexp_replace(col('valor_transacoes'),"\.","\,"))
             .withColumn('base_retencao',regexp_replace(col('base_retencao'),"\.","\,"))
             .withColumn('ir_devido',regexp_replace(col('ir_devido'),"\.","\,"))
            )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
df_diario_persisted = s3_custom_persistence(
    df_diario
    ,'sandbox_conta_remunerada'
    ,f'stg_issuer_daily_bearing_account'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

stg_issuer_daily_bearing_account salvo no s3.

### Dock

Cria a sandbox com dados da Dock que precisarão ser processados para o relatório de contas remuneradas por dia.

In [26]:
query_diario_dock = f'''
select
    base.issuer_id as id_emissor,
    base.issuer_name as emissor,
    base.account_id as id_conta,
    base.bearer_document as cpf_cnpj,
    base.bearer_name as nome,
    base.yearmonth as anomes,
    date_format(base.transaction_date, 'dd/MM/yyyy') as data_lancamento,
    cast(cast(base.quantity_transactions as integer) as string) as quantidade_transacoes,
    cast(cast(base.amount_transactions as decimal(38,2)) as string) as valor_transacoes,
    cast(cast(base.retention_basis as decimal(38,2)) as string)as base_retencao,
    cast(cast(base.due_income_tax as decimal(38,2)) as string) as ir_devido
from
    app_regulatory.daily_bearing_account base
inner join
    sandbox_conta_remunerada.stg_issuers issuers
    on cast(base.issuer_id as int) = cast(issuers.issuer_id as int)
        and issuers.filename = '000_DOCK'
where
    1=1
    and yearmonth = '{ano_mes}'
    and decendio = '{decendio}'
order by
    base.bearer_document,
    base.bearer_name,
    base.transaction_date,
    base.decendio
'''

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
df_diario_dock = (spark
                  .sql(query_diario_dock)
                  .withColumn('quantidade_transacoes',regexp_replace(col('quantidade_transacoes'),"\.",""))
                  .withColumn('valor_transacoes',regexp_replace(col('valor_transacoes'),"\.","\,"))
                  .withColumn('base_retencao',regexp_replace(col('base_retencao'),"\.","\,"))
                  .withColumn('ir_devido',regexp_replace(col('ir_devido'),"\.","\,"))
                 )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
df_diario_dock_persisted = s3_custom_persistence(
    df_diario_dock
    ,'sandbox_conta_remunerada'
    ,'stg_dock_daily_bearing_account'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

stg_dock_daily_bearing_account salvo no s3.

## Relatório Sumarizado

### Emissores

Cria a sandbox com dados dos emissores que precisarão ser processados para o relatório de contas remuneradas sumarizado.

In [29]:
query_sumarizado = f'''
select
    base.issuer_id as id_emissor,
    base.issuer_name as emissor,
    base.account_id as id_conta,
    base.bearer_document as cpf_cnpj,
    base.bearer_name as nome,
    base.yearmonth as anomes,
    base.decendio,
    cast(cast(base.quantity_transactions as integer) as string) as quantidade_transacoes,
    cast(cast(base.amount_transactions as decimal(38,2)) as string) as valor_transacoes,
    cast(cast(base.retention_basis as decimal(38,2)) as string) as base_retencao,
    cast(cast(base.due_income_tax as decimal(38,2)) as string) as ir_devido
from
    app_regulatory.summarized_bearing_account base
inner join
    sandbox_conta_remunerada.stg_issuers issuers
    on cast(base.issuer_id as int) = cast(issuers.issuer_id as int)
        and issuers.filename <> '000_DOCK'
where
    1=1
    and base.yearmonth = '{ano_mes}'
    and base.decendio = '{decendio}'
order by
    base.bearer_document,
    base.bearer_name,
    base.decendio
'''

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
df_sumarizado = (spark
                 .sql(query_sumarizado)
                 .withColumn('quantidade_transacoes',regexp_replace(col('quantidade_transacoes'),"\.",""))
                 .withColumn('valor_transacoes',regexp_replace(col('valor_transacoes'),"\.","\,"))
                 .withColumn('base_retencao',regexp_replace(col('base_retencao'),"\.","\,"))
                 .withColumn('ir_devido',regexp_replace(col('ir_devido'),"\.","\,"))
                )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
df_sumarizado_persisted = s3_custom_persistence(
    df_sumarizado
    ,'sandbox_conta_remunerada'
    ,f'stg_issuer_summarized_bearing_account'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

stg_issuer_summarized_bearing_account salvo no s3.

### Dock

Cria a sandbox com dados da Dock que precisarão ser processados para o relatório de contas remuneradas sumarizado.

In [32]:
query_sumarizado_dock = f'''
select
    base.issuer_id as id_emissor,
    base.issuer_name as emissor,
    base.account_id as id_conta,
    base.bearer_document as cpf_cnpj,
    base.bearer_name as nome,
    base.yearmonth as anomes,
    base.decendio,
    cast(cast(base.quantity_transactions as integer) as string) as quantidade_transacoes,
    cast(cast(base.amount_transactions as decimal(38,2)) as string) as valor_transacoes,
    cast(cast(base.retention_basis as decimal(38,2)) as string) as base_retencao,
    cast(cast(base.due_income_tax as decimal(38,2)) as string) as ir_devido
from
    app_regulatory.summarized_bearing_account base
inner join
    sandbox_conta_remunerada.stg_issuers issuers
    on cast(base.issuer_id as int) = cast(issuers.issuer_id as int)
        and issuers.filename = '000_DOCK'
where
    1=1
    and yearmonth = '{ano_mes}'
    and decendio = '{decendio}'
order by
    base.bearer_document,
    base.bearer_name,
    base.decendio
'''

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
df_sumarizado_dock = (spark
                      .sql(query_sumarizado_dock)
                      .withColumn('quantidade_transacoes',regexp_replace(col('quantidade_transacoes'),"\.",""))
                      .withColumn('valor_transacoes',regexp_replace(col('valor_transacoes'),"\.","\,"))
                      .withColumn('base_retencao',regexp_replace(col('base_retencao'),"\.","\,"))
                      .withColumn('ir_devido',regexp_replace(col('ir_devido'),"\.","\,"))
                     )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
df_sumarizado_dock_persisted = s3_custom_persistence(
    df_sumarizado_dock
    ,'sandbox_conta_remunerada'
    ,'stg_dock_summarized_bearing_account'
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

stg_dock_summarized_bearing_account salvo no s3.

# Extração dos dados

## Criando arquivos csv dentro do Bucket

Percorre em loop todos os dados que precisam ser processados e gera os arquivos .csv no Bucket.

In [35]:
for i in issuer_array:
    
    issuer_id = i.split('_')[0]
    key_issuer = i
    
    if i == '000_DOCK':
        
        df_relatorio_por_dia = df_diario_dock_persisted
        df_relatorio_sumarizado = df_sumarizado_dock_persisted
        
    else:
        
        df_relatorio_por_dia = df_diario_persisted.filter(f"cast(id_emissor as integer) = cast({issuer_id} as integer)")
        df_relatorio_sumarizado = df_sumarizado_persisted.filter(f"cast(id_emissor as integer) = cast({issuer_id} as integer)")
        
    npart_diario = math.ceil(df_relatorio_por_dia.count() / limite_linhas_excel) # number of partitions when storing in disk
    diretorio_relatorio_por_dia = f'{nomeDiretorioExtracao_diario}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
    s3_arquivos_csv(npart_diario,df_relatorio_por_dia,nomeBucket,diretorio_relatorio_por_dia)

    print(f"Criados arquivos csv do relatório por dia do issuer {str(i)} com {str(npart_diario)} parte(s).")

    npart_sumarizado = math.ceil(df_relatorio_sumarizado.count() / limite_linhas_excel) # number of partitions when storing in disk
    diretorio_sumarizado = f'{nomeDiretorioExtracao_sumarizado}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
    s3_arquivos_csv(npart_sumarizado,df_relatorio_sumarizado,nomeBucket,diretorio_sumarizado)

    print(f"Criados arquivos csv do relatório sumarizado do issuer {str(i)} com {str(npart_sumarizado)} parte(s).")
    

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Criados arquivos csv do relatório por dia do issuer 222_BRADESCO com 4 parte(s).
Criados arquivos csv do relatório sumarizado do issuer 222_BRADESCO com 2 parte(s).
Criados arquivos csv do relatório por dia do issuer 144_DOTZ com 1 parte(s).
Criados arquivos csv do relatório sumarizado do issuer 144_DOTZ com 1 parte(s).
Criados arquivos csv do relatório por dia do issuer 000_DOCK com 1 parte(s).
Criados arquivos csv do relatório sumarizado do issuer 000_DOCK com 1 parte(s).

## Renomeando arquivos csv

Percorre em loop todos os dados que precisam ser processados e renomeia os arquivos .csv no Bucket.

In [36]:
for i in issuer_array:
    
    issuer_id = i.split('_')[0]
    key_issuer = i
    
    diretorio_relatorio_por_dia = f'{nomeDiretorioExtracao_diario}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
    s3_rename_files(nomeBucket, diretorio_relatorio_por_dia, filename_diario, data_arquivo, key_issuer, decendio_filename)
    
    print(f"Renomeados arquivos csv do relatório por dia do issuer {str(issuer_id)}.")
    
    diretorio_sumarizado = f'{nomeDiretorioExtracao_sumarizado}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
    s3_rename_files(nomeBucket, diretorio_sumarizado, filename_sumarizado, data_arquivo, key_issuer, decendio_filename)
    
    print(f"Renomeados arquivos csv do relatório sumarizado do issuer {str(issuer_id)}.")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Renomeados arquivos csv do relatório por dia do issuer 222.
Renomeados arquivos csv do relatório sumarizado do issuer 222.
Renomeados arquivos csv do relatório por dia do issuer 144.
Renomeados arquivos csv do relatório sumarizado do issuer 144.
Renomeados arquivos csv do relatório por dia do issuer 000.
Renomeados arquivos csv do relatório sumarizado do issuer 000.

## Escrever dados no Caronte

Percorre em loop todos os dados que precisam ser processados e gera uma cópia dos arquivos .csv do Bucket para o Caronte.

In [37]:
LAMBDA_FUNCTION_NAME = "data-engineering-sftp"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
nomeBucket_Caronte = 'dock-services-prd'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
diretorio_Caronte = 'finance/accounting/ir/remuneration'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# for row in df_issuers.collect():
    
#     if str(row['filename']) != '000_DOCK':
        
#         issuer_id = str(row["issuer_id"])
#         issuer_name = str(row['issuer_name']).lower()
#         key_issuer = str(row['filename'])
        
#         diretorio_relatorio_por_dia = f'{nomeDiretorioExtracao_diario}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
        
#         PAYLOAD_EVENT_PER_DAY = {
#                 "sftp":[
#                         { "emissor": f"{issuer_name}-{issuer_id}",
#                             "source_bucket": f"{nomeBucket}",
#                             "source_prefix": f"{diretorio_relatorio_por_dia}",
#                             "target_bucket": f"{nomeBucket_Caronte}",
#                             "target_prefix": f"{issuer_name}-{issuer_id}/{diretorio_Caronte}/{data_processamento}/",
#                             "file_name": "",
#                             "processo": f"Relatório de Conta Remunerada por dia {key_issuer}"
#                         }
#                         ]
#                 }
         
#         call_lambda(LAMBDA_FUNCTION_NAME, PAYLOAD_EVENT_PER_DAY)
        
#         print(f'Escritos no diretório do Caronte os arquivos do relatório de contas remuneradas por dia do issuer {key_issuer}')
        
#         diretorio_sumarizado = f'{nomeDiretorioExtracao_sumarizado}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
        
#         PAYLOAD_EVENT_SUMARIZADO = {
#                 "sftp":[
#                         { "emissor": f"{issuer_name}-{issuer_id}",
#                             "source_bucket": f"{nomeBucket}",
#                             "source_prefix": f"{diretorio_sumarizado}",
#                             "target_bucket": f"{nomeBucket_Caronte}",
#                             "target_prefix": f"{issuer_name}-{issuer_id}/{diretorio_Caronte}/{data_processamento}/",
#                             "file_name": "",
#                             "processo": f"Relatório de Conta Remunerada Sumarizado {key_issuer}"
#                         }
#                         ]
#                 }
         
#         call_lambda(LAMBDA_FUNCTION_NAME, PAYLOAD_EVENT_SUMARIZADO)
        
#         print(f'Escritos no diretório do Caronte os arquivos do relatório de contas remuneradas sumarizado do issuer {key_issuer}')
        

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Criando o arquivo Zip

Percorre em loop todos os dados que precisam ser processados, cria um arquivo .zip com os arquivos .csv no Bucket e remoeve os arquivos .csv do Bucket.

In [41]:
for i in issuer_array:
    
    issuer_id = i.split('_')[0]
    key_issuer = i
    
    diretorio_relatorio_por_dia = f'{nomeDiretorioExtracao_diario}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
    s3_arquivo_zip(nomeBucket, diretorio_relatorio_por_dia, filename_diario, data_arquivo, key_issuer, decendio_filename)
    
    print(f"Criado arquivo zip do relatório por dia do issuer {str(issuer_id)}.")
    
    diretorio_sumarizado = f'{nomeDiretorioExtracao_sumarizado}/data_processamento={data_arquivo}/key_issuer={key_issuer}/anomes={ano_mes}/decendio={decendio}'
    s3_arquivo_zip(nomeBucket, diretorio_sumarizado, filename_sumarizado, data_arquivo, key_issuer, decendio_filename)
    
    print(f"Criado arquivo zip do relatório sumarizado do issuer {str(issuer_id)}.")
    

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Criado arquivo zip do relatório por dia do issuer 222.
Criado arquivo zip do relatório sumarizado do issuer 222.
Criado arquivo zip do relatório por dia do issuer 144.
Criado arquivo zip do relatório sumarizado do issuer 144.
Criado arquivo zip do relatório por dia do issuer 000.
Criado arquivo zip do relatório sumarizado do issuer 000.

In [42]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…