# 🔌CONEXÃO
Estabelecer conexão com os bancos

In [1]:
import psycopg2
from psycopg2.extras import DictCursor
import fdb
from dotenv import load_dotenv
import os

load_dotenv()

# Conexões
cnx_dest = fdb.connect(
    user=os.getenv("FDB_USER"),
    password=os.getenv("FDB_PASS"),
    host=os.getenv("FDB_HOST"),
    port=int(os.getenv("FDB_PORT")),
    database=os.getenv("FDB_PATH"),
    charset="WIN1252"
)
cur_dest = cnx_dest.cursor()


cnx_orig = psycopg2.connect(
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASS"),
    host=os.getenv("PG_HOST"),
    database=os.getenv("PG_DB"),
    options="-c search_path={}".format(os.getenv("PG_SCHEMA"))
)
cnx_orig.autocommit = True
cur_orig = cnx_orig.cursor(cursor_factory=DictCursor)

def commit():
    cnx_dest.commit()

# 🛠️ FERRAMENTAS
Funções, variáveis cache, hashmaps

In [2]:
global cadest, empresa, exercicio
cadest = {}
empresa = cur_dest.execute("SELECT empresa FROM cadcli").fetchone()[0]
exercicio = '2025' #cur_dest.execute("SELECT mexer FROM cadcli").fetchone()[0]

def limpa_tabela(tabelas):
    for tabela in tabelas:
        cur_dest.execute(f"DELETE FROM {tabela}")
    commit()

def cria_coluna(tabela, coluna):
    try:
        cur_dest.execute(f"ALTER TABLE {tabela} ADD {coluna} VARCHAR(255)")
    except fdb.DatabaseError as e:
        print(f"Erro ao criar coluna {coluna} na tabela {tabela}: {e}")
    else:
        commit()

def cria_coluna(tabela, coluna):
    try:
        cur_dest.execute(f"ALTER TABLE {tabela} ADD {coluna} VARCHAR(255)")
    except fdb.DatabaseError as e:
        print(f"Erro ao criar coluna {coluna} na tabela {tabela}: {e}")
    else:
        commit()

def to_cp1252_safe(value, replace_with=''):
    """
    Converte uma string para cp1252 de forma segura.
    Remove ou substitui caracteres inválidos.

    :param value: valor a converter
    :param replace_with: string usada no lugar de caracteres inválidos (default = '')
    :return: valor limpo
    """
    if isinstance(value, str):
        try:
            # Tenta converter diretamente
            value.encode('cp1252')
            return value
        except UnicodeEncodeError:
            # Remove/substitui caracteres inválidos
            return value.encode('cp1252', errors='replace').decode('cp1252').replace('?', replace_with)
    return value

cur_dest.execute("SELECT codreduz, max(cadpro) FROM cadest group by 1")
if cur_dest.description is None:
    print("CADEST VAZIA!")
else:
    cadest = {k:v for k,v in cur_dest.execute("SELECT codreduz, max(cadpro) FROM cadest group by 1").fetchall()}


# 🏛️ PATRIMÔNIO
Extração, tratamento e carregamento dos dados referentes ao módulo patrimônio

## CADASTROS BASE

### TIPO DE MOVIMENTAÇÕES

In [3]:
limpa_tabela(("pt_tipomov",))

valores = {
    "A": "Aquisição",
    "B": "Baixa",
    "T": "Transferência",
    "R": "Procedimento Contábil",
    "P": "Transferência de Plano Contábil",
}

cur_dest.executemany("INSERT INTO PT_TIPOMOV (codigo_tmv, descricao_tmv) VALUES (?, ?)", valores.items())

commit()

### TIPO DE AJUSTE

In [4]:
limpa_tabela(("pt_cadajuste",))

cur_dest.execute(f"INSERT INTO PT_CADAJUSTE (CODIGO_AJU, EMPRESA_AJU, DESCRICAO_AJU) VALUES (1, {empresa}, 'REAVALIAÇÃO (ANTES DO CORTE)')")

commit()

### TIPOS DE BAIXA

In [5]:
limpa_tabela(("pt_cadbai",))

insert = cur_dest.prep("INSERT INTO PT_CADBAI (CODIGO_BAI, EMPRESA_BAI, DESCRICAO_BAI) VALUES (?, ?, ?)")

cur_orig.execute("select id, descricao from pinhalpm.sppmotivobaixa s")

for row in cur_orig:
    cur_dest.execute(insert, (row['id'], empresa, row['descricao']))
commit()

### CONSERVAÇÃO

In [6]:
limpa_tabela(("pt_cadsit",))

insert = cur_dest.prep("INSERT INTO PT_CADSIT (CODIGO_SIT, EMPRESA_SIT, DESCRICAO_SIT) VALUES (?, ?, ?)")

cur_orig.execute("select id, s.estadoconservacao  from sppconservaporcent s")

for row in cur_orig:
    cur_dest.execute(insert, (row['id'], empresa, row['estadoconservacao']))
commit()

### TIPOS DE BEM

In [7]:
limpa_tabela(("pt_cadtip",))

insert = cur_dest.prep("INSERT INTO PT_CADTIP (CODIGO_TIP, EMPRESA_TIP, DESCRICAO_TIP) VALUES (?, ?, ?)")

cur_orig.execute("select id, substr(nome,1,60) nome from pinhalpm.gcpespecificacao as g order by 1")

for row in cur_orig:
    cur_dest.execute(insert, (row['id'], empresa, row['nome']))
commit()

### UNIDADE E SUBUNIDADE

In [8]:
limpa_tabela(("pt_cadpats", "pt_cadpatd"))

insert = cur_dest.prep("INSERT INTO PT_CADPATD (EMPRESA_DES, CODIGO_DES, NAUNI_DES, OCULTAR_DES) values (?, ?, ?, 'N')")
insert_cadpats = cur_dest.prep(f"INSERT INTO PT_CADPATS (codigo_set, empresa_set, codigo_des_set, noset_set, ocultar_set) VALUES (?, {empresa}, ?, ?, ?)")

cur_orig.execute("select id, nome, pai, case when baixa is not null then 'S' else 'N' end ocultar from cadlocalfisico c order by 3 desc, 1 asc")
    
for row in cur_orig:
    if row['pai'] is None:
        cur_dest.execute(insert, (empresa, row['id'], row['nome'], row['ocultar']))
        commit()
    else:
        cur_dest.execute(insert_cadpats, (row['id'], row['pai'], row['nome'], row['ocultar']))
commit()

### GRUPO

In [9]:
limpa_tabela(("pt_cadpatg",))

cur_dest.execute(f"INSERT INTO PT_CADPATG (CODIGO_GRU, EMPRESA_GRU, NOGRU_GRU) VALUES (1, {empresa}, 'MÓVEIS')")
cur_dest.execute(f"INSERT INTO PT_CADPATG (CODIGO_GRU, EMPRESA_GRU, NOGRU_GRU) VALUES (2, {empresa}, 'IMÓVEIS')")

commit()

## BENS

In [10]:
limpa_tabela(("pt_cadpat",))

dict_conservacoes = {k:v for k, v in cur_dest.execute("SELECT descricao_sit, codigo_sit FROM pt_cadsit").fetchall()}
dict_codigos_cpl = {k:v for k, v in cur_dest.execute("SELECT titco, balco FROM CONPLA_TCE WHERE balco STARTING '123'").fetchall()}

insert = cur_dest.prep(f"""
insert into pt_cadpat (
    codigo_pat,
    empresa_pat,
    codigo_gru_pat,
    chapa_pat,
    codigo_cpl_pat,
    codigo_set_pat,
    codigo_set_atu_pat,
    orig_pat,
    codigo_tip_pat,
    codigo_sit_pat,
    discr_pat,
    datae_pat,
    dtlan_pat,
    valaqu_pat,
    valatu_pat,
    codigo_for_pat,
    percenqtd_pat,
    dae_pat,
    valres_pat,
    percentemp_pat,
    nempg_pat,
    anoemp_pat,
    dtpag_pat,
    hash_sinc,
    codigo_bai_pat)
VALUES(?,{empresa},1,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")

cur_orig.execute("""
select
	a.id codigo_pat,
	case when b.codigocompleto like '002%' then 2 else 1 end codigo_gru_pat,
	to_char(a.codigoreduzido, 'fm000000') chapa_pat,
	b.nome nome_cpl,
	c.codigo_set_pat,
	a.localfisicoatual codigo_set_atu_pat,
	orig_pat,
	a.especificacao codigo_tip_pat,	
	a.estadoincorporacao nome_sit,
	substr(a.descricao, 1, 255) discr_pat,
	a.tombamento datae_pat,
	a.inicioatividade dtlan_pat,
	a.valorincorporacao valaqu_pat,
	a.valoratual valatu_pat,
	a.fornecedor codigo_for_pat,
	a.vidautil,
	case when a.vidautil is not null then 'V' else 'N' end dae_pat,
	a.valorincorporacao*(a.percvlrresidual/100) valres_pat,
	'M' percentemp_pat,
	e.datamovimentacao dtpag_pat,
	e.motivo_baixa codigo_bai_pat,
    f.empenho,
	f.anoempenho
from
	sppbempatrimonial a
left join gcpespecificacao b on a.especificacao = b.id
left join (with cte as (select svt.bempatrimonial, coalesce(localorigem, localdestino) codigo_set_pat, row_number() over (partition by bempatrimonial order by id asc) rn from spp_vw_transf svt
	) select * from cte where rn = 1) c on c.bempatrimonial = a.id
left join (select
		bempatrimonial,
		case when substring(tipo_incorporacao,1,1) = 'L' then 'I'
		when substring(tipo_incorporacao,1,1) = 'A' then 'C'
		else substring(tipo_incorporacao,1,1) end orig_pat
	from
		spp_vw_incorporacoes) d on d.bempatrimonial = a.id
left join (select datamovimentacao, motivo_baixa, bempatrimonial from spp_vw_baixas) e
on e.bempatrimonial = a.id
left join (select bempatrimonial, empenho, anoempenho from spp_vw_bemorigem) f on
a.id = f.bempatrimonial
order by 3
""")

for row in cur_orig:
    codsit = dict_conservacoes.get(row['nome_sit'], 1)
    codigo_cpl_pat = dict_codigos_cpl.get(row['codigo_pat'], None)

    cur_dest.execute(insert, (
        row['codigo_pat'],
        row['chapa_pat'],
        codigo_cpl_pat,
        row['codigo_set_pat'],
        row['codigo_set_atu_pat'],
        row['orig_pat'],
        row['codigo_tip_pat'],
        codsit,
        row['discr_pat'],
        row['datae_pat'],
        row['dtlan_pat'],
        row['valaqu_pat'],
        row['valatu_pat'],
        row['codigo_for_pat'],
        row['vidautil'],
        row['dae_pat'],
        row['valres_pat'],
        row['percentemp_pat'],
        row['empenho'],
        row['anoempenho'],
        row['dtpag_pat'],
        row['codigo_pat'],
        row['codigo_bai_pat']
    ))
commit()

### MOVIMENTAÇÃO

In [11]:
limpa_tabela(("pt_movbem",))
cria_coluna("pt_movbem", "codigo_set_ant_mov")

insert = cur_dest.prep("""
INSERT
	INTO
		pt_movbem (codigo_mov,
		empresa_mov,
		codigo_pat_mov,
		data_mov,
		tipo_mov,
		codigo_set_mov,
		historico_mov,
		codigo_cpl_mov,
		codigo_bai_mov,
		valor_mov,
		depreciacao_mov,
		codigo_set_ant_mov,
		dt_contabil,
		lote_mov)
	VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")

cur_orig.execute("""
select
	id_movbem codigo_mov,
	a.bempatrimonial codigo_pat_mov,
	datamovimentacao::date data_mov,
	'A' tipo_mov,
	b.codigo_set_pat codigo_set_mov,
	historico,
	null codigo_cpl,
	null codigo_bai,
	valor valor_mov,
	'N' depreciacao_mov,
	null codigo_set_ant_mov,
	datamovimentacao::date dt_contabil,
	id lote_mov
from
	spp_vw_incorporacoes a
left join (with cte as (select svt.bempatrimonial, coalesce(localorigem, localdestino) codigo_set_pat, row_number() over (partition by bempatrimonial order by id asc) rn from spp_vw_transf svt
	) select * from cte where rn = 1) b on b.bempatrimonial = a.bempatrimonial
union all
select
	id_movbem,
	bempatrimonial,
	datamovimentacao::date,
	'T',
	svt.localdestino,
	historico,
	null,
	null,
	0,
	'N',
	svt.localorigem,
	datamovimentacao::date,
	id
from
	spp_vw_transf svt
union all
select
	id_movbem,
	bempatrimonial,
	datamovimentacao::date,
	'R',
	0,
	coalesce(historico, tipo_alteracao),
	null,
	null,
	valorfinal-valorinicial,
	'N',
	null,
	datamovimentacao::date,
	id
from
	spp_vw_alteracaovalores a
union all
select
	id_movbem,
	a.bempatrimonial,
	datamovimentacao::date,
	'B',
	codigo_set_pat,
	historico,
	null,
	null,
	-valorbaixa,
	'N',
	null,
	datamovimentacao::date,
	id
from
	spp_vw_baixas a 
left join (with cte as (select svt.bempatrimonial, coalesce(localorigem, localdestino) codigo_set_pat, row_number() over (partition by bempatrimonial order by id asc) rn from spp_vw_transf svt
	) select * from cte where rn = 1) b on b.bempatrimonial = a.bempatrimonial
order by 1
""")

batch = []
batch_size = 10000
for row in cur_orig:
	batch.append((
		row['codigo_mov'],
		empresa,
		row['codigo_pat_mov'],
		row['data_mov'],
		row['tipo_mov'],
		row['codigo_set_mov'],
		to_cp1252_safe(row['historico']),
		row['codigo_cpl'],
		row['codigo_bai'],
		row['valor_mov'],
		row['depreciacao_mov'],
		row['codigo_set_ant_mov'],
		row['dt_contabil'],
		row['lote_mov']
	))
	if len(batch) >= batch_size:
		cur_dest.executemany(insert, batch)
		batch = []
		commit()
if batch:
	cur_dest.executemany(insert, batch)
commit()