In [1]:
%%capture
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv",
          "spark.app.name": "Acompanha o salda da conta corrente (Banco Bradesco) no intervalo dos arquivos(cnab240) recebidos"
         },
"kind":"pyspark"
"driverMemory": "8000M"
}

In [2]:

# importa as bibliotecas

from pyspark.sql.types import StringType
from pyspark.sql.functions import *

import boto3
import pandas as pd
from datetime import datetime, timedelta
import pytz
from slack_sdk.webhook import WebhookClient


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1220,application_1681166481024_153634,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:

# lista os arquivos que estão na pasta 

def lista_arquivo():
    
    # Lista os arquivos que estão na pasta e captura o mais recente
    

    hoje = datetime.now(pytz.timezone('America/Sao_Paulo'))
    
    
    # Verifica se é segunda-feira - primeiro arquivo vem no sabado
    # 0 é segunda-feira
    
    num = hoje.weekday()    
    if num == 0:
        td = timedelta(2)
    else:    
        td = timedelta(0)
        
    
    dia_atual = hoje - td
    
    data_atual_str = dia_atual.strftime('%Y-%m-%d')

    s3_client = boto3.client("s3")
    bucket_name = 'bucket_name'
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix='pasta_arquivos', Delimiter='/')
    files = response.get("Contents", [])

    arqs = pd.DataFrame(files)
    
    arqs = arqs[['Key', 'LastModified']]

    arqs = arqs.sort_values(by=['LastModified'], ascending=False)

    arqs['LastModified'] = arqs['LastModified'].dt.tz_convert('America/Sao_Paulo')

    arqs = arqs[arqs['LastModified'].dt.strftime('%Y-%m-%d') >= data_atual_str]
    
    return arqs



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:

#### SALDO DE ABERTURA

def primeiro_arquivo():
    
    # Captura o primeiro arquivo do dia
    arq_primeiro = lista_arquivo().iloc[len(lista_arquivo())-1][0]


    # Abre o arquivo na AWS- S3 e coloca em uma tabela
    arq_cnab_primeiro = spark.read.format('csv').option('header', 'false').load(f's3://bucket{arq_primeiro}')
    arq_cnab_primeiro.createOrReplaceTempView('cnab_bradesco_primeiro')

    # captura a data e hora do primeiro arquivo
    qyery_arq_1 = '''
        SELECT 
        substring(_c0, 8, 1) as Registro ,
        concat(to_date(substring(_c0, 144, 8), 'ddMMyyyy'), " ",substring(_c0, 152, 2), ":" , substring(_c0, 154, 2), ":", substring(_c0, 156, 2)) as Data_Hora_Arquivo_str_Abertura

        FROM cnab_bradesco_primeiro
        WHERE substring(_c0, 8, 1) = '0'
    '''
    consulta_cnab_abertura = spark.sql(qyery_arq_1)

    data_aberura = consulta_cnab_abertura.collect()[0][1]


    # Procura dentro da tabela acima o saldo de abertura da conta
    query_lcto = '''
                SELECT 
                    substring(_c0, 8, 1) as Registro , 
                    substring(_c0, 59, 12) as Conta , 
                    substring(_c0, 151, 18) / 100 as Valor ,
                    substring(_c0, 170, 3) as Categoria ,
                    substring(_c0, 169, 1) as Tipo,
                    to_date(substring(_c0, 143, 8), 'ddMMyyyy') as Data_lancamento

                    FROM cnab_bradesco_primeiro 
                    WHERE substring(_c0, 8, 1) = '5' 
                    AND substring(_c0, 59, 12) = 8230
                '''

    consulta_cnab = spark.sql(query_lcto)    

    saldo_conta_abertura = consulta_cnab.collect()[0][2]
    
    return saldo_conta_abertura, data_aberura, arq_primeiro


# Executa 1x a consulta, depois o valor fica fixo até o dia seguinte

saldo_conta_abertura_g = primeiro_arquivo()[0]
data_aberura_g = primeiro_arquivo()[1]


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:

#### SALDO e APORTES DO ULTIMO ARQUIVO 

import pyspark.sql.functions as F 

def ultimo_arquivo_saldos_aportes():
    
    # captura o ultimo arquivo
    arq_ultimo = lista_arquivo().iloc[0][0]

    # Abre o arquivo na AWS- S3 e coloca em uma tabela
    arq_cnab_ultimo = spark.read.format('csv').option('header', 'false').load(f'bucket/{arq_ultimo}')
    arq_cnab_ultimo.createOrReplaceTempView('cnab_bradesco')


    # Procura na tabela a linha '0' (zero) e traz a data e hora que gerou o arquivo
    #  a data inicial passa a ser essa data/hora
    qyery_arq = '''
        SELECT 
        substring(_c0, 8, 1) as Registro ,
        concat(to_date(substring(_c0, 144, 8), 'ddMMyyyy'), " ",substring(_c0, 152, 2), ":" , substring(_c0, 154, 2), ":", substring(_c0, 156, 2)) as Data_Hora_Arquivo_str

        FROM cnab_bradesco
        WHERE substring(_c0, 8, 1) = '0'
    '''
    consulta_cnab = spark.sql(qyery_arq)
    
    data_inicio = consulta_cnab.collect()[0][1]


    # Procura dentro da tabela acima o saldo da conta 8230
    #  atribui o saldo da conta na variavel valor
    query_lcto = '''
                SELECT 
                    substring(_c0, 8, 1) as Registro , 
                    substring(_c0, 59, 12) as Conta , 
                    substring(_c0, 151, 18) / 100 as Valor ,
                    substring(_c0, 170, 3) as Categoria ,
                    substring(_c0, 169, 1) as Tipo,
                    to_date(substring(_c0, 143, 8), 'ddMMyyyy') as Data_lancamento

                    FROM cnab_bradesco 
                    WHERE substring(_c0, 8, 1) = '5' 
                    AND substring(_c0, 59, 12) = 8230
                '''

    consulta_cnab = spark.sql(query_lcto)   

    saldo_conta = consulta_cnab.collect()[0][2]
    
    ## APORTES REALIZADOS
    
    query_lcto = '''
            SELECT 
                substring(_c0, 8, 1) as Registro , 
                substring(_c0, 59, 12) as Conta , 
                substring(_c0, 151, 18) / 100 as Valor ,
                substring(_c0, 169, 1) as Tipo,
                substring(_c0, 170, 3) as Cod_Lcto,
                to_date(substring(_c0, 143, 8), 'ddMMyyyy') as Data_lancamento

                FROM cnab_bradesco 
                WHERE substring(_c0, 8, 1) = '3' 
                AND substring(_c0, 59, 12) = 8230
                AND substring(_c0, 169, 1) = 'C'
                AND substring(_c0, 170, 3) = 209
            '''

    consulta_cnab_aporte = spark.sql(query_lcto)
    
    total_aporte = consulta_cnab_aporte.agg(F.sum("Valor")).collect()[0][0]    
    
    return saldo_conta, data_inicio, total_aporte

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:

# total liquidado no periodo entre o ult arquivo é a hora da msg

def total_liq_periodo():
    data_atual_str = datetime.now(pytz.timezone('America/Sao_Paulo')).strftime('%Y-%m-%d %H:%M:%S')        # pega a data/hora atual e ajusta o timezone
    data_hoje = datetime.now(pytz.timezone('America/Sao_Paulo')).strftime('%Y-%m-%d')

    data_inicio = ultimo_arquivo_saldos_aportes()[1]

    query = f'''
            SELECT para consultar o total liquidado no dia dentro do sistema'
         '''
    consulta = spark.sql(query)                              # executa a query

    consulta_dia = consulta.groupby(to_date("data")).sum('valor')

    total_dia = consulta_dia.collect()[0][1]

    consulta_periodo = consulta.filter((consulta.data > data_inicio) & (consulta.data < data_atual_str) )

    total_periodo_df = consulta_periodo.groupby(to_date("data")).sum('amount')

    total_periodo = total_periodo_df.collect()[0][1]
    
    return total_periodo, total_dia   


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:


def montar_mensgem():

    saldo_conta_abertura = saldo_conta_abertura_g
    data_aberura = data_aberura_g
    saldo_conta = ultimo_arquivo_saldos_aportes()[0]
    data_inicio = ultimo_arquivo_saldos_aportes()[1]
    total_aporte = ultimo_arquivo_saldos_aportes()[2]
    total_liq = total_liq_periodo()[1]
    total_liq_dia = total_liq_periodo()[0]
    
    saldo_atual = saldo_conta - total_liq

    mensagem = f'''
    Bradesco - conta 8230-9\n
    Saldo Abertura do dia : R$ {saldo_conta_abertura:,.2f} em {data_aberura}\n\n
    Total de aportes: R$ {total_aporte:,.2f}\n    
    Total liquidado: R$ - {total_liq:,.2f}\n\n
    Saldo Atual: R$ {saldo_atual:,.2f}  
    ###########################
    '''
    
    return mensagem


In [26]:

# Envia a mensagem pelo slack

def enviar_mensagem():
    
    url = "para enviar a mensagem no salck"
    webhook = WebhookClient(url)
    
    response = webhook.send(text=montar_mensgem())
    assert response.status_code == 200
    assert response.body == "ok"
    




FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
enviar_mensagem()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…