In [1]:
import os, sys
sys.path.insert(0, '..')

In [2]:
import pandas as pd
from sqlalchemy import inspect, MetaData, Table, Column, Integer, String, Float, DateTime, Boolean, text, Index
from sqlalchemy.engine import Engine
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import exists

In [3]:
from fbpyutils import xlsx as XL, db as DB, file as FU

In [4]:
from sqlalchemy import create_engine

In [5]:
CEI_DB_URL = 'postgresql+psycopg2://fbnet:fbnet@localhost:5432/MyMoney'
# CEI_DB_URL = 'sqlite:///file:{}?mode=memory&cache=shared&uri=true'.format('DB_CEI')

In [6]:
CEI_CONNECTION = create_engine(CEI_DB_URL)

In [7]:
CEI_FILE_PATH = FU.build_platform_path('C:', '/mnt/c', ['Users','fcjbispo','Meu Drive','Finanças','Extratos & Faturas','CEI','zz_output','tb_stg_posicao_acoes.csv'])
CEI_FILE_PATH

'/mnt/c/Users/fcjbispo/Meu Drive/Finanças/Extratos & Faturas/CEI/zz_output/tb_stg_posicao_acoes.csv'

In [8]:
CEI_DATAFRAME=pd.read_csv(CEI_FILE_PATH, sep='|')

In [10]:
CEI_DATAFRAME.head(3)

Unnamed: 0,codigo_produto,nome_produto,instituicao,codigo_isin,tipo_produto,escriturador,quantidade,quantidade_disponivel,quantidade_indisponivel,motivo,preco_unitario,valor_operacao,arquivo_origem,data_referencia
0,ABEV3,ABEV3 - AMBEV S.A.,CLEAR CORRETORA - GRUPO XP,BRABEVACNOR1,ON,BANCO BRADESCO S/A,118.0,118.0,,-,18.05,2129.9,posicao,2019-11-29 17:38:36
1,ALUP11,ALUP11 - ALUPAR INVESTIMENTO S/A,INTER DTVM LTDA,BRALUPCDAM15,UNIT,-,88.0,88.0,,-,26.0,2288.0,posicao,2019-11-29 17:38:36
2,BBSE3,BBSE3 - BB SEGURIDADE PARTICIPAÇÕES S.A.,CLEAR CORRETORA - GRUPO XP,BRBBSEACNOR5,ON,BANCO DO BRASIL S/A,61.0,61.0,,-,34.45,2101.45,posicao,2019-11-29 17:38:36


In [11]:
CEI_DATAFRAME.dtypes

codigo_produto              object
nome_produto                object
instituicao                 object
codigo_isin                 object
tipo_produto                object
escriturador                object
quantidade                 float64
quantidade_disponivel      float64
quantidade_indisponivel    float64
motivo                      object
preco_unitario             float64
valor_operacao             float64
arquivo_origem              object
data_referencia             object
dtype: object

In [14]:
schema, table_name, keys, connection, index, dataframe = 'cei', 'posicao_acoes', ['codigo_produto', 'instituicao', 'data_referencia'], CEI_CONNECTION, True, CEI_DATAFRAME

In [15]:
DB.create_table_from_dataframe(dataframe, 'posicao_acoes', connection, 'cei', keys, True)

In [50]:
conn = connection.connect()

In [16]:
# Get the table object
metadata = MetaData(schema)
table = Table(table_name, metadata, autoload_with=connection)

In [17]:
insert_count = 0
update_count = 0
failure_count = 0

In [18]:
operation = 'upsert'

In [64]:
for k in keys:
    values[f'k_{k}'] = values[k]

values

{'codigo_produto': 'BCRI12',
 'nome_produto': 'BCRI12 - BANESTES RECEBÍVEIS IMOBILIÁRIOS FDO INV IMOB FII',
 'instituicao': 'VITREO DTVM S.A.',
 'codigo_isin': 'BRBCRID07M18',
 'tipo_produto': 'Direito',
 'escriturador': 'BRL TRUST DTVM S.A',
 'quantidade': 1.0,
 'quantidade_disponivel': 1.0,
 'quantidade_indisponivel': nan,
 'motivo': '-',
 'preco_unitario': 0.0,
 'valor_operacao': 0.0,
 'arquivo_origem': 'posicao',
 'data_referencia': '2020-11-30 17:51:24',
 'k_codigo_produto': 'BCRI12',
 'k_instituicao': 'VITREO DTVM S.A.',
 'k_data_referencia': '2020-11-30 17:51:24'}

In [66]:
values

{'codigo_produto': 'ABEV3',
 'nome_produto': 'ABEV3 - AMBEV S.A.',
 'instituicao': 'CLEAR CORRETORA - GRUPO XP',
 'codigo_isin': 'BRABEVACNOR1',
 'tipo_produto': 'ON',
 'escriturador': 'BANCO BRADESCO S/A',
 'quantidade': 118.0,
 'quantidade_disponivel': 118.0,
 'quantidade_indisponivel': nan,
 'motivo': '-',
 'preco_unitario': 18.05,
 'valor_operacao': 2129.9,
 'arquivo_origem': 'posicao',
 'data_referencia': '2019-11-29 17:38:36'}

In [69]:
if operation == 'upsert':
    with connection.begin() as trans:
        for _, row in dataframe.iterrows():
            values = {col: row[col] for col in dataframe.columns}
            # Check if row exists in the table based on keys
            exists_query = table.select().where(
                exists(
                    table.select().where(
                        text(' AND '.join([f"{col} = :{col}" for col in keys]))
                    )
                )
            ).params(**values)
            if conn.execute(exists_query).fetchone():
                # Update params
                params = {
                    values[f'{k}']: values[k]
                    for k in keys
                }
                # Perform update
                update_stmt = table.update().where(
                    text(' AND '.join([f"{col} = :{col}" for col in keys]))
                ).params(**params)
                conn.execute(update_stmt.values(**values))
                update_count += 1
            else:
                # Perform insert
                try:
                    insert_stmt = table.insert().values(**values)
                    conn.execute(insert_stmt)
                    insert_count += 1
                except IntegrityError:
                    # Handle failure due to unique constraint violation
                    failure_count += 1
                    continue
        trans.commit()

NotImplementedError: params() is not supported for INSERT/UPDATE/DELETE statements. To set the values for an INSERT or UPDATE statement, use stmt.values(**parameters).

In [None]:
connection.engine.engine.exe