Overview

Goal: Send PIX data stored in Google Drive as .zip file to DWH
    
Steps:
1. Import libraries
2. Create dfs to be inserted into PostgreSQL:    
3. Insert df into PostgreSQL table
4. Move files to parent folder
5. Send message in brbank_recon_notify with daily PIX recon

# Import libraries

In [1]:
import pandas as pd              # create dfs
import numpy as np               # transform data
#import glob                      # union all files in one df
import psycopg2                  # connect to postgres
import psycopg2.extras as extras # insert df into postgres
import os                        # get path
import json                      # used to login in PostgreSQL
import sys                       # install libs
#import shutil                    # to move file from one folder to one level up after inserting data
from pydrive2.auth import GoogleAuth # authenticate google
from pydrive2.drive import GoogleDrive # use google drive methods
import datetime
import zipfile

from postgres_funcs import *
from pix_funcs import *
#from slackbot_funcs import *

%load_ext autoreload
%autoreload 2

# Create dfs to be inserted into PostgreSQL

## Append all data from files in a Google Drive folder in one single df

### Establish Google Drive authentication

In [2]:
gauth = GoogleAuth()
gauth.LocalWebserverAuth() # Creates local webserver and auto handles authentication.
#gauth.CommandLineAuth()
drive = GoogleDrive(gauth)


Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?client_id=131180798743-f0496t9sf5v6i6t7geeahu6fjvbua1u1.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8080%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive&access_type=online&response_type=code

Authentication successful.


### Set initial configs

In [3]:
## Set variables
team_drive_id = '0AMWVBx0Ntw8tUk9PVA'                      # team_drive_id to find files
pix_to_dwh_folder_id = '1PD2cMY4IsS_XIVe9k32N_PryYyy6l5Xr' # folder where PIX .zip files should be to be sent to DWH
df_list = []                                               # df list with data from files

### Create file list with csvs in to_dwh folder that will turn into df

In [4]:
## Create file list with csvs that should turn into df
# Note that specific folder "to_dwh" id is specified as pix_to_dwh_folder_id 
file_list = drive.ListFile({'q': "'%s' in parents and trashed=false" % pix_to_dwh_folder_id, 
                            'corpora': 'teamDrive', 
                            'teamDriveId': team_drive_id, 
                            'includeTeamDriveItems': True,
                            'supportsTeamDrives': True
                           }).GetList()

for file in file_list:
    print('file:',file['title'])

file: JDPI_RELATORIO_FINANCEIRO_07022022_065117.zip


In [5]:
# Get all folders in Google Drive
drive_folders = drive.ListFile({'q':"mimeType='application/vnd.google-apps.folder' and trashed=false", 
                                    'corpora': 'teamDrive', 
                                    'teamDriveId': team_drive_id, 
                                    'includeTeamDriveItems': True, 
                                    'supportsTeamDrives': True
                                 }).GetList()
print('Folders:')
for num, folder in enumerate(drive_folders, start=1):
    print(num,'', folder['title'])

Folders:
1  previous
2  23 Open Banking
3  Stone
4  Pags
5  Banco Inter
6  Mercado Pago
7  txt
8  csv
9  Opt In
10  3. Opt In 2021 (new flow)
11  2. Account Creation 2022
12  Provisões
13  2022-02
14  202202
15  to_dwh
16  202202
17  to_dwh
18  to_dwh
19  Pagamentos Confirmados
20  NDs
21  Devolução de Pagamento
22  202202
23  to_dwh
24  02.2022
25  to_dwh
26  02.2022
27  2022-02 incentive amendment
28  853
29  645
30  2. Fevereiro
31  JD
32  Extrato diário 645 e 853 - 2021
33  Epay Regarga
34  01 Recon
35  00 Mismatches
36  12 TED-In/TED-Out
37  11 Mobile Topup
38  10 Bill Payments
39  09 Card Activation
40  Jan.2022
41  Opt In
42  Pix Home
43  videos
44  Q1
45  2022
46  12-2021
47  853
48  645
49  1. Janeiro
50  2021
51  2022
52  Provisões
53  to_dwh
54  01.2022
55  to_dwh
56  01.2022
57  2022-01
58  202201
59  to_dwh
60  202201
61  to_dwh
62  to_dwh
63  Pagamentos confirmados
64  Devolução de pagamento
65  ND
66  202201
67  String - V2, V3, V4
68  Dec.2021
69  Pix - Limits Data
70  

1147  Happy 50
1148  simple_recon_report-master
1149  simple_recon_report-master
1150  Annual Report SumUp UK Traduzido 2017
1151  Annual Report SumUp UK Traduzido 2016
1152  Annual Report SumUp UK Traduzido 2018
1153  Application
1154  Assetz
1155  Hub NDs
1156  Hub CSVs
1157  Xsfera
1158  Homologation
1159  20 Studies
1160  Macrodiagrama 1.0
1161  LGPD
1162  3rd batch
1163  BTG
1164  Certificado
1165  Valid
1166  Santander
1167  _old
1168  old
1169  SCD
1170  Versão Roberta
1171  HUB Fintech
1172  Afiliação Visa
1173  License
1174  2nd batch
1175  1st batch
1176  Amex
1177  Elo
1178  Hiper
1179  financials
1180  Focus Group Acesso Merchants
1181  License
1182  Master
1183  Conductor
1184  CIP - Mastercard
1185  Pismo
1186  Matera
1187  CIP
1188  Manuais RTM SLC
1189  RTM
1190  Drawings
1191  Drawings
1192  Acesso
1193  Visa
1194  User Interviews
1195  Manuais Cards VISA
1196  User Interviews & Testing
1197  SumUp Docs
1198  BBNK
1199  Dock
1200  07 Banking Partners
1201  Tech
1202  C

In [6]:
## Overall:
## 1. Download .zip file, extract files in temp folder, delete .zipfile, move to parent folder in Google Drive
## 2. Go to temp folder and create a list of extracted files
## 3. Upload all files extracted to Google Drive

## 1. Download .zip file, extract files in temp folder, delete .zipfile, move to parent folder in Google Drive
for file in file_list: 
    
    # Create file in RAM
    fileDownloaded = drive.CreateFile({'id':file['id'],
                                   'corpora': 'teamDrive', 
                                    'teamDriveId': team_drive_id, 
                                    'includeItemsFromAllDrives': True,
                                    'supportsAllDrives': True
                                   })
    
    # Download file to current directory
    print('Downloading', file['title'])
    fileDownloaded.GetContentFile(file['title'])
    
    # Extract zip file to temp directory
    print('Extracting', file['title'])
    with zipfile.ZipFile(os.path.join(os.getcwd(),file['title']), 'r') as zip_ref:
        zip_ref.extractall(os.path.join(os.getcwd(), 'temp'))
    
    # Delete downloaded file
    print('Deleting', file['title'])
    os.remove(file['title'])
    
    # Get file folder_id
    folder_id = file['parents'][0]['id']
    
    for folder in drive_folders:

        # get the parent_folder_id 
        if (folder['id'] == folder_id):
            parent_folder_id = folder['parents'][0]['id']
            break

   # Change current folder to parent_folder
    file['parents'] = [{
                        'id': parent_folder_id,
                      }]
    
    # Upload to parent folder
    file.Upload(param={'supportsTeamDrives':True}) 
    print('File moved to parent folder')

Downloading JDPI_RELATORIO_FINANCEIRO_07022022_065117.zip
Extracting JDPI_RELATORIO_FINANCEIRO_07022022_065117.zip
Deleting JDPI_RELATORIO_FINANCEIRO_07022022_065117.zip
File moved to parent folder


### Create df, append to df_list and delete files in temp

In [7]:
# Go to temp folder and create a list of extracted files
os.chdir('temp')

# Create list of extracted files
extracted_file_list = []
extracted_file_list = [f for f in os.listdir(os.getcwd()) if os.path.isfile(os.path.join(os.getcwd(), f))]

# Create df from files that name starts with JDPI_RELATORIO_FINANCEIRO and delete all files extracted
for file in extracted_file_list:
    if file.startswith('JDPI_RELATORIO_FINANCEIRO'):
        df=pd.read_csv(file,index_col=None,header=0,sep=';')
        
        # Append to df_list
        df_list.append(df)
        print(file, 'appended to df_list')
    
    else:
        pass 
    
    # Delete file
    os.remove(file)
    print('file: ', file, 'removed')

# Go back to parent folder
os.chdir('..')

JDPI_RELATORIO_FINANCEIRO_07022022_065117.csv appended to df_list
file:  JDPI_RELATORIO_FINANCEIRO_07022022_065117.csv removed
file:  SUMARIO_JDPI_RELATORIO_FINANCEIRO_07022022_065117.csv removed


  exec(code_obj, self.user_global_ns, self.user_ns)


### Union all df and rename columns

In [8]:
## Union all dataframes
df_all_data = pd.concat(df_list, axis=0, ignore_index=True)

## Rename columns
df_all_data = df_all_data.rename(columns={"Data Hora": "date_day",
                                            "Tipo Operação": "operation",
                                            "IdFimAFim": "end_to_end_id",
                                            "IdFimAFimOriginal": "end_to_end_id_original",
                                            "NumCtrlSTR": "str_pix_control_number",
                                            "D/C": "direction",
                                            "Valor": "amount",
                                            "PSP Recebedor": "receiver_psp",
                                            "Agência Recebedora": "receiver_branch_number",
                                            "Conta Recebedora": "receiver_account_number",
                                            "CPF/CNPJ Recebedora": "receiver_national_document_id",
                                            "Nome Cliente Recebedor": "receiver_name",
                                            "PSP Pagador": "sender_psp",
                                            "Agência Pagadora": "sender_branch_number",
                                            "Conta Pagadora": "sender_account_number",
                                            "CPF/CNPJ Pagador": "sender_national_document_id",
                                            "Nome Cliente Pagador": "sender_name",
                                            "Situação": "status",
                                            "Tipo Iniciação": "sender_document_kind"
                                        }
                                )
## Print first rows
df_all_data.head()

Unnamed: 0,date_day,operation,end_to_end_id,end_to_end_id_original,str_pix_control_number,direction,amount,CNPJ Iniciador Pagamento,receiver_psp,receiver_branch_number,receiver_account_number,receiver_national_document_id,receiver_name,sender_psp,sender_branch_number,sender_account_number,sender_national_document_id,sender_name,status,sender_document_kind
0,06/02/2022 02:59:53.461,Geral,E3724123020220206025938540991656,,,Debito,16.0,,360305.0,3880.0,12880000009701316725,51077280000.0,LUCAS DE ALMEIDA CASSEMIRO,37241230.0,1.0,9485729000.0,51077280000.0,Lucas De Almeida Cassimiro,Efetivado,Chave
1,06/02/2022 02:59:52.289,Geral,E60746948202202060259A6500cVkWAo,,,Credito,10.0,,37241230.0,1.0,5432111499,82483880000.0,,60746948.0,65.0,395811.0,5744527000.0,BARBARA TEREZA JORGE,Efetivado,Chave
2,06/02/2022 02:59:52.089,Geral,E00360305202202052359c9ca9ac1b8a,,,Credito,200.0,,37241230.0,1.0,8332560120,3864985000.0,,360305.0,1911.0,7981726000.0,2770410000.0,CLEIDE FERNANDES DA SILVA,Efetivado,Chave
3,06/02/2022 02:59:51.140,Geral,E3724123020220206025937055153471,,,Debito,30.0,,0.0,982.0,928917,37911130000000.0,FLYTOP - GESTAO DE CONTRO,37241230.0,1.0,7108296000.0,30760890000.0,tania maria filgueiras da silva,Efetivado,QrCode Dinamico
4,06/02/2022 02:59:27.459,Geral,E3724123020220206025909237248787,,,Debito,10.0,,416968.0,1.0,141617870,37217050000000.0,JORGE VILLAS BOAS MARTINS 92095690753,37241230.0,1.0,7733346000.0,17516530000.0,lucas cordeiro,Efetivado,Chave


In [18]:
filtered_df = df_all_data[(df_all_data['amount'].isnull())]
filtered_df.head()
            

Unnamed: 0,date_day,operation,end_to_end_id,end_to_end_id_original,str_pix_control_number,direction,amount,CNPJ Iniciador Pagamento,receiver_psp,receiver_branch_number,receiver_account_number,receiver_national_document_id,receiver_name,sender_psp,sender_branch_number,sender_account_number,sender_national_document_id,sender_name,status,sender_document_kind
6866,NaT,Efetivado,Chave,,,,,,,,,,,,,,,,,


## Transform and create columns 

In [9]:
# Convert date_transaction from object to datetime
df_all_data['date_day'] = pd.to_datetime(df_all_data['date_day'], format='%d/%m/%Y %H:%M:%S.%f')

# Convert amount from reais to cents ($)
df_all_data['amount'] = df_all_data['amount']*100

# Convert from float to int
df_all_data['amount'] = df_all_data['amount'].round(0).astype(int)

# Create inserted_at column
df_all_data['inserted_at'] = pd.Timestamp('now')

# Convert from float to int -> pandas version doesn't accept null values in int data type (allows as Int64 but then there is a problem when inserting data to PostgreSQL as null values comes as <NA> not identified as null value)
df_all_data['receiver_psp'] = df_all_data['receiver_psp'].round(0).fillna(0).astype(int)
df_all_data['receiver_branch_number'] = df_all_data['receiver_branch_number'].round(0).fillna(0).astype(int)
df_all_data['receiver_account_number'] = pd.to_numeric(df_all_data['receiver_account_number'],errors='coerce').round(0).fillna(0).astype(int)
format_national_document_id(df=df_all_data, col_name='receiver_national_document_id')
df_all_data['sender_psp'] = df_all_data['sender_psp'].round(0).fillna(0).astype(int)
df_all_data['sender_branch_number'] = df_all_data['sender_branch_number'].round(0).fillna(0).astype(int)
df_all_data['sender_account_number'] = pd.to_numeric(df_all_data['sender_account_number'],errors='coerce').round(0).fillna(0).astype(int)
format_national_document_id(df=df_all_data, col_name='sender_national_document_id')


# Insert positive and negative in amount column
df_all_data['amount'] = df_all_data.apply(convert_to_sided_amount,axis=1)

# Removing status = 'Pendente'
df_all_data = df_all_data[df_all_data['status'] != 'Pendente']

# Create pix_key column
df_all_data['pix_key'] = df_all_data.apply(create_pix_key,axis=1)

# Drop duplicated values
#df_all_data = df_all_data.drop_duplicates(keep='last')

# Defining columns in case JD add/remove columns and make sure only DWH cols are inserted
pix_cols = ['date_day',
           'operation',
           'end_to_end_id',
           'end_to_end_id_original',
           'str_pix_control_number',
           'direction',
            'amount',
            'receiver_psp',
            'receiver_branch_number',
            'receiver_account_number',
            'receiver_national_document_id',
            'receiver_name',
            'sender_psp',
            'sender_branch_number',
            'sender_account_number',
            'sender_national_document_id',
            'sender_name',
            'status',
            'sender_document_kind',
            'inserted_at',
            'pix_key'
           ]

df_all_data = df_all_data[pix_cols]

## Printing first rows
df_all_data.head()

#df_all_data['colE'] = df_all_data['colB'].astype(str) + '-' + df_all_data['colD']

IntCastingNaNError: Cannot convert non-finite values (NA or inf) to integer

# Insert df into PostgreSQL table

## Connect in sumup_dwh postgres database

In [None]:
# Import login credentials file
with open('postgres_login.json') as data_file:
    data = json.load(data_file)

user = data['user']
password =  data['password']

# Parameters to connect to postgres database
postgres_param_dic = {
    "host"      : "dwh.internal.sumup.com",
    "database"  : "sumup_dwh",
    "user"      : user,
    "password"  : password
}

## Insert data into recon_jd_transactions

In [None]:
%%time
# Connecting to the database
conn = connect_to_postgresql(params_dic=postgres_param_dic)

# Inserting jd data into recon_jd_transactions
insert_df_to_table(conn=conn, df=df_all_data, table='br_bank.recon_jd_transactions_raw')

# Close the connection
conn.close()

# Slackbot send daily recon

## Import slack configs

In [None]:
from slack import WebClient
from slack.errors import SlackApiError

In [None]:
# Import login credentials file
with open('slackbot_token.json') as data_file:
    slackbot_config = json.load(data_file)

bot_token = slackbot_config['token']

In [None]:
# Define yesterday date
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)

# Connect slackbot
client = WebClient(bot_token)

In [None]:
def send_slack_message(channel, text_message):
    try:
        response = client.chat_postMessage(
                                            channel = channel,
                                            text= text_message
                                            )
    except:
        pass

## Consolidate data

In [None]:
# Connect to PostgreSQL, create df and close connection
with connect_to_postgresql(params_dic=postgres_param_dic) as conn:
    df_slack = pd.read_sql_query(consolidated_pix_query, conn)

# Create date column
df_slack['date'] = df_slack['date_day'].apply(lambda x: x.date())

df_slack.head()

## Send message to brbank_recon_notify

In [None]:
# Send message: if it is Monday then send recon from Friday to Sunday, else send yesterday only
if today.weekday() == weekday_dic['Monday']:
    for count_day in range(3,0,-1):
        send_slack_message(channel='#brbank_recon_notify',                   
                           text_message = get_daily_pix_mismatch(df=df_slack,
                                                                 day=today-datetime.timedelta(days=count_day)
                                                                )
                          )
        
else:
    send_slack_message(channel='#brbank_recon_notify',
                       text_message = get_daily_pix_mismatch(df=df_slack,day=yesterday)
                      )

In [None]:
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib