<a href="https://colab.research.google.com/github/Marcelo-Ferraz-de-Oliveira/Controle-aplicacoes-financeiras/blob/main/Extrator_nota_corretagem.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#Este módulo extrai as operaçãoes das notas de corretagem
#Suporta atualmente as corretoras:
#Clear
#XP
#Genial

!pip install tabula-py
import tabula
import pandas as pd
import json



In [2]:
def str_to_br_currency(string):
  #Write description
  if type(string) != type('string'):
    raise ValueError('Não foi passada uma string para conversão em moeda!')
  string = string[::-1]
  string = string.replace('.','')
  string = string.replace(',','.',1)
  string = string.replace(',','')
  string = string[::-1]
  return float(string)

def us_currency_to_float(n):
  #Write description
  #Números com 2 ou mais pontos (pandas vai converter pra string)
  if type(n) == str:
    return float(n.replace('.',''))
  #Números com 1 ponto
  elif n == int(n):
    return float(n)
  else: 
    return float(n*1000)

def fix_sep_negocios(df):
  word_df = df.copy()
  word_df['Preço'] = word_df['Preço'].apply(str_to_br_currency)
  word_df['Valor Operação'] = word_df['Valor Operação'].apply(str_to_br_currency)
  word_df['Quantidade'] = word_df['Quantidade'].apply(us_currency_to_float)
  return word_df

def fix_sep_custos(df):
#Write description
  word_df = df.copy()
  word_df['Valor'] = word_df['Valor'].apply(str_to_br_currency)
  return word_df

In [3]:
def definir_corretora(file_to_open, passwd):
# Write description
# Verifica qual o nome da corretora no cabeçalho da nota
  area_header = [0,0,28,50]
  df_corretora = json.dumps(tabula.io.read_pdf(file_to_open, pages = 1, area=area_header, stream = True, relative_area= True, password = passwd, output_format='json'))
  if (df_corretora.find('genial') != -1):
    return 'genial'
  elif (df_corretora.find('xp') != -1):
    return 'xp'
  elif (df_corretora.find('clear') != -1):
    return 'clear'
  raise ValueError("Corretora não suportada!")



In [7]:
#Ativa as mensagens na console (dados da nota durante o processamento)
debug = False

def integrate(datas, negocios, custos):
# """
# Integra cada data com seus respectivos negócios e planilhas de custos
# """
  if not len(datas) == len(negocios) == len(custos):
    raise ValueError("Datas, negócios e custos em quantidades diferentes. Os dados não foram extraídos corretamente!")  
  var = []
  for n in range(0,len(datas)):
    if debug: display([datas[n],negocios[n], custos[n]])
    var.append([datas[n],negocios[n], custos[n]])
  return var

def extrair_data(file_to_open, passwd, formato, corretora, area_datas):
# """
# Write description
# """  

  #Extração do número da nota, folha e data
  df_datas = tabula.io.read_pdf(file_to_open, stream = True, area=area_datas, pages = 'all',relative_area= True, password = passwd)
  if debug: display(df_datas)
  #Correção e mudança de nome dos campos
  df_datas_c = []
  for content in df_datas:
    if 'Unnamed: 0' in content.columns:
      content = content[['Nr. nota','Unnamed: 0','Data pregão']]
    content.columns = ['Nr. nota','Folha','Data']
    content['Corretora'] = corretora
    content['id'] = content.index    
    if formato == 'json': content = content.to_dict(orient='records')
    df_datas_c.append(content)
  if debug: display(df_datas_c)
  return df_datas_c

def extrair_negocios(file_to_open, passwd, formato, corretora, area_negocios):
# """
# Write description
# """  
  #Extração dos negócios realizados
  df_negocios = tabula.io.read_pdf(file_to_open, stream = True, area=area_negocios, pages = 'all',relative_area= True, password = passwd)
  if debug: display(df_negocios)
  #Exclui associações incorretas (NANs) e colunas desnecessárias, renomeia colunas e corrige os separadores numéricos
  df_negocios_c = []
  for content in df_negocios:
    content = content.dropna(axis=1, how='any')
    if content['Tipo mercado'][0] == 'OPCAO DE COMPRA':
      content = content.drop(['Prazo','Unnamed: 0'], axis = 1)
      content.columns = ['Negociação','C/V','Tipo de mercado','Papel','Quantidade','Preço','Valor Operação','D/C']
    elif content['Tipo mercado'][0] == 'VISTA':
      content.columns = ['Negociação','C/V','Tipo de mercado','Papel','Quantidade','Preço','Valor Operação','D/C']
    else: continue
    content['id'] = content.index
    content = fix_sep_negocios(content)
    if formato == 'json': content = content.to_dict(orient='records')
    df_negocios_c.append(content)
  return df_negocios_c

def extrair_custos(file_to_open, passwd, formato, corretora, area_custos, columns_custos, campos):
# """
# Write description
# """  
    #Extração da planilha de custos da nota
    df_custos = tabula.io.read_pdf(file_to_open, stream = True, area=area_custos, pages = 'all',relative_area= True, password = passwd, columns=columns_custos)
    if debug: display(df_custos)
    #Seleção dos campos necessários, mudança de nome e inclusão de valores faltantes
    df_custos_c = []
    for content in df_custos:
      campos_custos= ((content[campos[0]:campos[1]],content[campos[2]:campos[3]],content[campos[4]:campos[5]]))
      content = pd.concat(campos_custos)
      content.columns=['Custo','Valor','C/D']
      content['Valor'] = content['Valor'].fillna('0')
      content['C/D'] = content['C/D'].fillna('N/A')
      content['id'] = content.index
      content = fix_sep_custos(content)
      if formato =='json': content = content.to_dict(orient='records')
      df_custos_c.append(content)
    return df_custos_c

def extrair_dados(file_to_open, passwd, formato = 'dataframe'):
# """
# Write description
# """ 
#Função principal que retorna uma lista de dataframes ou jsons com
#planilha de cabeçalho, negócios e custos de cada nota de corretagem 

#Verifica qual a corretora antes de continuar
  corretora = definir_corretora(file_to_open, passwd)
  if debug: print(corretora)

#Seleciona as áreas de análise do PDF para cada corretora
  if corretora in ('clear','xp'):
    area_datas = [0,70,8,100]
    area_negocios = [28,0,53,100]
    area_custos = [53,50,95,100]
    colunas_custos=[450,550,600]
    campos_custos = [2,4,6,9,12,18]
  elif corretora == 'genial':
    area_datas = [0,70,8,100]
    area_negocios = [25,0,53,100]
    area_custos = [53,50,85,100]
    colunas_custos=[450,560,800]
    campos_custos = [2,4,6,9,11,16]
  else:
    raise ValueError("Corretora não suportada"+corretora)


#Extração
  df_datas_c = extrair_data(file_to_open, passwd, formato, corretora, area_datas)
  df_negocios_c = extrair_negocios(file_to_open,passwd, formato, corretora, area_negocios)
  df_custos_c = extrair_custos(file_to_open,passwd, formato, corretora, area_custos, colunas_custos, campos_custos)
  
  return (integrate(df_datas_c, df_negocios_c,df_custos_c))


In [8]:
#TESTES UNITÁRIOS

import unittest


class TestExtrator_nota_corretagem(unittest.TestCase):
  def test_str_to_br_currency(self):
    self.assertEqual(str_to_br_currency('1.432,00'),1432.0)
    self.assertEqual(str_to_br_currency('600,00'),600.0)
    self.assertEqual(str_to_br_currency('500,32'),500.32)
    self.assertEqual(str_to_br_currency('1.432,32'),1432.32)
    with self.assertRaises(ValueError):
      str_to_br_currency(1432)

unittest.main(argv=[''], verbosity=3, exit=False)

#tests
# display(us_currency_to_float(100.0))
#display(us_currency_to_float(1.8))
# display(us_currency_to_float('1.100.0'))
# display(us_currency_to_float('1.000.000'))

#tests
#definir_corretora(file_to_open,passwd)


test_str_to_br_currency (__main__.TestExtrator_nota_corretagem) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.004s

OK


<unittest.main.TestProgram at 0x7fa7b73a1ad0>

In [9]:
#TESTE DE INTEGRAÇÃO

#Testa a extração com notas de cada uma das corretoras e uma nota com múltiplas páginas
files = ["/content/nota-de-corretagem-clear.pdf",
         '/content/nota-de-corretagem-xp.pdf',
         '/content/nota-de-corretagem-genial.pdf',
         '/content/nota-de-corretagem-clear-multiplas-paginas.pdf'
         ]
passwd = '007'

df_negocios = []
for file in files:
  df_negocio = extrair_dados(file, passwd, formato='json')
  df_negocios.append (df_negocio)

display(df_negocios)
with open('/content/resultado.json','w',encoding='utf-8') as f:
  json.dump(df_negocios, f)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


[[[[{'Corretora': 'clear',
     'Data': '25/01/2022',
     'Folha': 1,
     'Nr. nota': 899105,
     'id': 0}],
   [{'C/V': 'V',
     'D/C': 'C',
     'Negociação': '1-BOVESPA',
     'Papel': 'IRBRB330ON 3,30',
     'Preço': 0.13,
     'Quantidade': 2600.0,
     'Tipo de mercado': 'OPCAO DE COMPRA',
     'Valor Operação': 338.0,
     'id': 0}],
   [{'C/D': 'D', 'Custo': 'Taxa de liquidação', 'Valor': 0.09, 'id': 2},
    {'C/D': 'D', 'Custo': 'Taxa de Registro', 'Valor': 0.23, 'id': 3},
    {'C/D': 'D', 'Custo': 'Taxa de termo/opções', 'Valor': 0.0, 'id': 6},
    {'C/D': 'D', 'Custo': 'Taxa A.N.A.', 'Valor': 0.0, 'id': 7},
    {'C/D': 'D', 'Custo': 'Emolumentos', 'Valor': 0.12, 'id': 8},
    {'C/D': 'D', 'Custo': 'Taxa Operacional', 'Valor': 0.0, 'id': 12},
    {'C/D': 'N/A', 'Custo': 'Execução', 'Valor': 0.0, 'id': 13},
    {'C/D': 'N/A', 'Custo': 'Taxa de Custódia', 'Valor': 0.0, 'id': 14},
    {'C/D': 'N/A', 'Custo': 'Impostos', 'Valor': 0.0, 'id': 15},
    {'C/D': 'N/A',
     'Custo