# 🔌CONEXÃO
Estabelecer conexão com os bancos

In [14]:
import psycopg2
from psycopg2.extras import DictCursor
import fdb
from dotenv import load_dotenv
import os

load_dotenv()

# Conexões
cnx_dest = fdb.connect(
    user=os.getenv("FDB_USER"),
    password=os.getenv("FDB_PASS"),
    host=os.getenv("FDB_HOST"),
    port=int(os.getenv("FDB_PORT")),
    database=os.getenv("FDB_PATH"),
    charset="WIN1252"
)
cur_dest = cnx_dest.cursor()


cnx_orig = psycopg2.connect(
    user=os.getenv("PG_USER"),
    password=os.getenv("PG_PASS"),
    host=os.getenv("PG_HOST"),
    database=os.getenv("PG_DB"),
    options="-c search_path={}".format(os.getenv("PG_SCHEMA"))
)
cnx_orig.autocommit = True
cur_orig = cnx_orig.cursor(cursor_factory=DictCursor)

def commit():
    cnx_dest.commit()

# 🛠️ FERRAMENTAS
Funções, variáveis cache, hashmaps

In [15]:
global cadest, empresa, exercicio
cadest = {}
empresa = cur_dest.execute("SELECT empresa FROM cadcli").fetchone()[0]
exercicio = '2025' #cur_dest.execute("SELECT mexer FROM cadcli").fetchone()[0]

def limpa_tabela(tabelas):
    for tabela in tabelas:
        cur_dest.execute(f"DELETE FROM {tabela}")
    commit()

def cria_coluna(tabela, coluna):
    try:
        cur_dest.execute(f"ALTER TABLE {tabela} ADD {coluna} VARCHAR(255)")
    except fdb.DatabaseError as e:
        print(f"Erro ao criar coluna {coluna} na tabela {tabela}: {e}")
    else:
        commit()

def cria_coluna(tabela, coluna):
    try:
        cur_dest.execute(f"ALTER TABLE {tabela} ADD {coluna} VARCHAR(255)")
    except fdb.DatabaseError as e:
        print(f"Erro ao criar coluna {coluna} na tabela {tabela}: {e}")
    else:
        commit()

def to_cp1252_safe(value, replace_with=''):
    """
    Converte uma string para cp1252 de forma segura.
    Remove ou substitui caracteres inválidos.

    :param value: valor a converter
    :param replace_with: string usada no lugar de caracteres inválidos (default = '')
    :return: valor limpo
    """
    if isinstance(value, str):
        try:
            # Tenta converter diretamente
            value.encode('cp1252')
            return value
        except UnicodeEncodeError:
            # Remove/substitui caracteres inválidos
            return value.encode('cp1252', errors='replace').decode('cp1252').replace('?', replace_with)
    return value

cur_dest.execute("SELECT codreduz, max(cadpro) FROM cadest group by 1")
if cur_dest.description is None:
    print("CADEST VAZIA!")
else:
    cadest = {k:v for k,v in cur_dest.execute("SELECT codreduz, max(cadpro) FROM cadest group by 1").fetchall()}

sigla_codigo = {
    "0": (0, 'NÃO INFORMADO'),
    "B": (1, 'BOM'),
    "R": (2, 'REGULAR'),
    "I": (3, 'IRREGULAR'),
    "O": (4, 'ÓTIMO'),
    "P": (5, 'PÉSSIMO'),
}

depara_origens = {
    'CP':'C',
    'IN':'C',
    'DO':'D',
    'OU':'X',
    'CO':'O',
    'EM':'E'
}


# 🏛️ PATRIMÔNIO
Extração, tratamento e carregamento dos dados referentes ao módulo patrimônio

## CADASTROS BASE

### TIPO DE MOVIMENTAÇÕES

In [16]:
limpa_tabela(("pt_tipomov",))

valores = {
    "A": "Aquisição",
    "B": "Baixa",
    "T": "Transferência",
    "R": "Procedimento Contábil",
    "P": "Transferência de Plano Contábil",
}

cur_dest.executemany("INSERT INTO PT_TIPOMOV (codigo_tmv, descricao_tmv) VALUES (?, ?)", valores.items())

commit()

### TIPO DE AJUSTE

In [17]:
limpa_tabela(("pt_cadajuste",))

cur_dest.execute(f"INSERT INTO PT_CADAJUSTE (CODIGO_AJU, EMPRESA_AJU, DESCRICAO_AJU) VALUES (1, {empresa}, 'REAVALIAÇÃO (ANTES DO CORTE)')")

commit()

### TIPOS DE BAIXA

In [18]:
limpa_tabela(("pt_cadbai",))

insert = cur_dest.prep("INSERT INTO PT_CADBAI (CODIGO_BAI, EMPRESA_BAI, DESCRICAO_BAI) VALUES (?, ?, ?)")
cur_dest.execute(insert, (1, empresa, 'BAIXA'))
commit()

DatabaseError: ('Error while executing SQL statement:\n- SQLCODE: -530\n- violation of FOREIGN KEY constraint "CFK_PT_CADPAT_3" on table "PT_CADPAT"\n- Foreign key references are present for the record\n- Problematic key value is ("PK_PT_CADBAI" = 25000000000000001)', -530, 335544466)

### CONSERVAÇÃO

In [None]:
limpa_tabela(("pt_cadsit",))

insert = cur_dest.prep("INSERT INTO PT_CADSIT (CODIGO_SIT, EMPRESA_SIT, DESCRICAO_SIT) VALUES (?, ?, ?)")

cur_orig.execute("""select coalesce(nullif(trim("ST_BEM"),''),'0') st_bem from "SCH"."PATRIMON" p group by 1""")

for row in cur_orig:
    info = sigla_codigo[row['st_bem']]
    cur_dest.execute(insert, (info[0], empresa, info[1]))
commit()

### TIPOS DE BEM

In [None]:
limpa_tabela(("pt_cadtip",))
cria_coluna("pt_cadtip", "ANOS_VIDA_UTIL")
cria_coluna("pt_cadtip", "VALOR_RESIDUAL")
cria_coluna("pt_cadtip", "PC_DEPREC")

insert = cur_dest.prep("INSERT INTO PT_CADTIP (CODIGO_TIP, EMPRESA_TIP, DESCRICAO_TIP, ANOS_VIDA_UTIL, VALOR_RESIDUAL, PC_DEPREC) VALUES (?, ?, ?, ?, ?, ?)")

cur_orig.execute("""select "CD_CLASSEPR", "DS_CLASSEPR", "ANOS_VIDA_UTIL", "VALOR_RESIDUAL", "PC_DEPREC" from "SCH"."CLASSEPR" order by 1""")

for row in cur_orig:
    cur_dest.execute(insert, (row['CD_CLASSEPR'], empresa, row['DS_CLASSEPR'][:60], row['ANOS_VIDA_UTIL'], row['VALOR_RESIDUAL'], row['PC_DEPREC']))
commit()

### UNIDADE E SUBUNIDADE

In [None]:
limpa_tabela(("pt_cadpats", "pt_cadpatd"))
cria_coluna("pt_cadpats", "codant")
cria_coluna("pt_cadpats", "idetcmgo")

insert = cur_dest.prep("INSERT INTO PT_CADPATD (EMPRESA_DES, CODIGO_DES, NAUNI_DES, OCULTAR_DES) values (?, ?, ?, 'N')")
insert_cadpats = cur_dest.prep(f"INSERT INTO PT_CADPATS (codigo_set, empresa_set, codigo_des_set, noset_set, ocultar_set, codant, idetcmgo) VALUES (?, {empresa}, ?, ?, ?, ?, ?)")

cur_orig.execute("""
select
	a."RECNUM" codigo_set,
	coalesce(a."ID_UNIDADE", '0') codigo_des_set,
	a."DESCRICAO" noset_set,
	case
		when a."ATIVO" = 1 then 'N'
		else 'S'
	end ocultar,
	a."ID_CADSET" codant,
	"ID_IDETCMGO" id_idetcmgo
from
	"SCH"."CADSET" a
union all
select
	'0' codigo_set,
	a."CODIGO" codigo_des_set,
	a."NOME",
	'N',
	null,
	null
from
	"SCH"."CADUNI" a
order by 1
""")

cur_dest.execute(insert, (empresa, 0, 'CONVERSÃO', 'N'))
commit()

cur_dest.execute(insert_cadpats, (0, 0, 'NÃO INFORMADO', 'N', None, None, None))
commit()
    
for row in cur_orig:
    if row['codigo_set'] == 0:
        cur_dest.execute(insert, (empresa, row['codigo_des_set'], row['noset_set'][:60], row['ocultar']))
        commit()
    else:
        cur_dest.execute(insert_cadpats, (row['codigo_set'], row['codigo_des_set'], row['noset_set'][:80], row['ocultar'], row['codant'], row['id_idetcmgo']))
commit()

### GRUPO

In [None]:
limpa_tabela(("pt_cadpatg",))

grupos = [
    (1, empresa, 'CONSUMO'),
    (2, empresa, 'SERVIÇOS'),
    (3, empresa, 'MÓVEIS'),
    (4, empresa, 'IMÓVEIS'),
    (5, empresa, 'NATUREZA INDUSTRIAL'),
    (6, empresa, 'DOMÍNIO PÚBLICO'),
    (7, empresa, 'TÍTULOS E VALORES')
]

cur_dest.executemany("INSERT INTO PT_CADPATG (CODIGO_GRU, EMPRESA_GRU, NOGRU_GRU) VALUES (?, ?, ?)", grupos)

commit()

## BENS

### CADASTRO

In [19]:
limpa_tabela(("pt_cadpat",))

insert = cur_dest.prep(f"""
insert into pt_cadpat (
    codigo_pat,
    empresa_pat,
    codigo_gru_pat,
    chapa_pat,
    codigo_cpl_pat,
    codigo_set_pat,
    codigo_set_atu_pat,
    orig_pat,
    codigo_tip_pat,
    codigo_sit_pat,
    discr_pat,
    datae_pat,
    dtlan_pat,
    valaqu_pat,
    valatu_pat,
    codigo_for_pat,
    percenqtd_pat,
    dae_pat,
    valres_pat,
    percentemp_pat,
    nempg_pat,
    anoemp_pat,
    dtpag_pat,
    hash_sinc,
    codigo_bai_pat,
    obs_pat)
VALUES(?,{empresa},?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")

cur_orig.execute("""
select
	a."NR_TOMBAMENTO" codigo_pat,
	a."DS_TIPO_PROD" codigo_gru_pat,
	to_char(row_number() over (partition by a."DS_TIPO_PROD" order by a."NR_TOMBAMENTO"), 'fm000000') chapa_pat,
	substring("CODIGO_CADPLC",2,9) codigo_cpl_pat,
	coalesce(codigo_set_pat,0) codigo_set_pat,
	coalesce(e."RECNUM",0) codigo_set_atu_pat,
	coalesce(a."TP_AQUISICAO",'CP') orig_pat,
	a."CD_CLASSEPR" codigo_tip_pat,
	coalesce(nullif(trim("ST_BEM"),''),'0') codigo_sit_pat,
	b."DS_PRODUTO" discr_pat,
	a."DT_TOMBAMENTO" datae_pat,
	a."DT_TOMBAMENTO" dtlan_pat,
	a."VL_ATUAL_BEM" valaqu_pat,
	a."VL_BEM" valatu_pat,
	c."RECNUM" codigo_for_pat,
	coalesce(nullif(a."ANOS_VIDA_UTIL"*12,0),d."ANOS_VIDA_UTIL"*12) percenqtd_pat,
	a."VL_ATUAL_BEM" * (coalesce(nullif(a."VALOR_RESIDUAL",0),d."VALOR_RESIDUAL")/100) valres_pat,
	'M' percentemp_pat,
	a."DT_BAIXA" dtpag_pat,
    case when a."DT_BAIXA" is not null then 1 else null end codigo_bai_pat,
    "DS_DETALHE" obs,
    case when coalesce(nullif(a."ANOS_VIDA_UTIL"*12,0),d."ANOS_VIDA_UTIL"*12) <> 0 then 'V' else 'N' end dae_pat
from
	"SCH"."PATRIMON" a
join "SCH"."PRODUTOS" b using ("CD_PRODUTO")
left join "SCH"."FORNEC" c on a."CD_FORNEC" = c."NR_CGC_CPF" 
join "SCH"."CLASSEPR" d on a."CD_CLASSEPR" = d."CD_CLASSEPR" 
left join "SCH"."CADSET" e on a."ID_CADSET" = e."ID_CADSET"
left join (SELECT DISTINCT ON ("NR_TOMBAMENTO")
       "NR_TOMBAMENTO",
       b."RECNUM" codigo_set_pat
FROM "SCH"."PATRIAND" a
join "SCH"."CADSET" b on a."ID_CADSET" = b."ID_CADSET"
ORDER BY "NR_TOMBAMENTO", a."ID_CADSET") f on f."NR_TOMBAMENTO" = a."NR_TOMBAMENTO" """)

for row in cur_orig:
    codsit = sigla_codigo.get(row['codigo_sit_pat'], 0)[0]
    orig = depara_origens.get(row['orig_pat'], 'X')

    cur_dest.execute(insert, (
        row['codigo_pat'],
        row['codigo_gru_pat'],
        row['chapa_pat'],
        row['codigo_cpl_pat'],
        row['codigo_set_pat'],
        row['codigo_set_atu_pat'],
        orig,
        row['codigo_tip_pat'],
        codsit,
        row['discr_pat'],
        row['datae_pat'],
        row['dtlan_pat'],
        row['valaqu_pat'],
        row['valatu_pat'],
        row['codigo_for_pat'],
        row['percenqtd_pat'],
        row['dae_pat'],
        row['valres_pat'],
        row['percentemp_pat'],
        None,
        None,
        row['dtpag_pat'],
        row['codigo_pat'],
        row['codigo_bai_pat'],
        row['obs']
    ))
commit()

### MOVIMENTAÇÃO

In [None]:
limpa_tabela(("pt_movbem",))
cria_coluna("pt_movbem", "codigo_set_ant_mov")
cria_coluna("pt_movbem", "descrmov")

insert = cur_dest.prep("""
INSERT
	INTO
		pt_movbem (codigo_mov,
		empresa_mov,
		codigo_pat_mov,
		data_mov,
		tipo_mov,
		codigo_set_mov,
		historico_mov,
		codigo_cpl_mov,
		codigo_bai_mov,
		valor_mov,
		depreciacao_mov,
		codigo_set_ant_mov,
		dt_contabil,
		lote_mov,
		hash_sinc,
		descrmov)
	VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")

### AQUISIÇÕES

In [None]:
limpa_tabela(("pt_movbem where tipo_mov = 'A'",))

insert = cur_dest.prep("""
INSERT
	INTO
		pt_movbem (codigo_mov,
		empresa_mov,
		codigo_pat_mov,
		data_mov,
		tipo_mov,
		codigo_set_mov,
		historico_mov,
		codigo_cpl_mov,
		codigo_bai_mov,
		valor_mov,
		depreciacao_mov,
		codigo_set_ant_mov,
		dt_contabil,
		lote_mov,
		hash_sinc,
		descrmov)
	VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""")

cur_orig.execute("""
select distinct
	a."NR_TOMBAMENTO" codigo_pat_mov,
	a."DT_DOCUMENTO" data_mov,
	a."DT_DOCUMENTO_INVD",
	case 
		when a."CD_TRANSPAT" in (1,3,4,7,13) then 'A'
		when a."CD_TRANSPAT" in (2,8) then 'B'		
		when a."CD_TRANSPAT" in (9,10,12,14) then 'R'		
		when a."CD_TRANSPAT" in (11) then 'T'		
	end tipo_mov,
	coalesce(f.codigo_set_pat,'0') codigo_set_mov,
	a."HIST_TRANSACAO" historico_mov,
	case when a."CD_TRANSPAT" in (2,8,9,10,12) then "VL_DOCUMENTO"*-1 when a."CD_TRANSPAT" = 11 then 0 else "VL_DOCUMENTO" end valor_mov,
	case when a."CD_TRANSPAT" in (9,10) then 'S' else 'N' end depreciacao_mov,
	case when a."CD_TRANSPAT" in (2,8) then 1 else null end codigo_bai,
	c."CD_TRANSPAT"||' - '||c."DS_TRANSPAT" descrmov,
	0 codigo_set_ant_mov
from
	"SCH"."HISTPAT" a
left join (with CTE as
(select "NR_TOMBAMENTO", b."RECNUM" codigo_set_pat, row_number() over (partition by "NR_TOMBAMENTO" order by a."RECNUM" asc) rn FROM "SCH"."PATRIAND" a
join "SCH"."CADSET" b on a."ID_CADSET" = b."ID_CADSET")
select * from CTE where rn = 1) f on f."NR_TOMBAMENTO" = a."NR_TOMBAMENTO"
join "SCH"."TRANSPAT" c on a."CD_TRANSPAT" = c."CD_TRANSPAT"
where (a."CD_TRANSPAT" in  (1,3,4,7,13))
""")

for i, row in enumerate(cur_orig):
	i += 1
	cur_dest.execute(insert, (
		i,
		empresa,
		row['codigo_pat_mov'],
		row['data_mov'],
		row['tipo_mov'],
		row['codigo_set_mov'],
		to_cp1252_safe(row['historico_mov']),
		None,
		row['codigo_bai'],
		row['valor_mov'],
		row['depreciacao_mov'],
		row['codigo_set_ant_mov'],
		row['data_mov'],
		None,
		i,
		row['descrmov']
	))
commit()

cur_dest.execute("""
INSERT
	INTO
		pt_movbem (codigo_mov,
		empresa_mov,
		codigo_pat_mov,
		data_mov,
		tipo_mov,
		codigo_set_mov,
		valor_mov,
		dt_contabil)
SELECT
	gen_id(GEN_PT_MOVBEM_ID, 1),
	3,
	codigo_pat,
	a.DATAE_PAT,
	'A',
	codigo_set_pat,
	valaqu_pat,
	datae_pat
FROM
	pt_cadpat a
WHERE
	NOT EXISTS (
	SELECT
		1
	FROM
		pt_movbem x
	WHERE
		a.codigo_pat = x.codigo_pat_mov
		AND tipo_mov = 'A')
""")
commit()

cur_dest.execute("""
delete FROM pt_movbem WHERE FK_PT_CADPAT IS null
""")
commit()

cur_dest.execute("""
MERGE INTO pt_cadpat a USING (SELECT valor_mov, codigo_set_mov, codigo_pat_mov FROM pt_movbem WHERE tipo_mov = 'A') b
ON a.codigo_pat = b.codigo_pat_mov
WHEN MATCHED THEN UPDATE SET a.valaqu_pat = b.valor_mov, a.codigo_set_pat = b.codigo_set_mov
""")
commit()

### DEPRECIAÇÕES

In [28]:
limpa_tabela(("pt_movbem where tipo_mov <> 'A'",))

cur_orig.execute("""
select
	a."NR_TOMBAMENTO" codigo_pat_mov,
	a."DT_DOCUMENTO" data_mov,
	case 
		when a."CD_TRANSPAT" in (1,3,4,7,13) then 'A'
		when a."CD_TRANSPAT" in (2,8) then 'B'		
		when a."CD_TRANSPAT" in (9,10,12,14) then 'R'		
		when a."CD_TRANSPAT" in (11) then 'T'		
	end tipo_mov,
	coalesce(coalesce(c."RECNUM",c2."RECNUM"),'0') codigo_set_mov,
	a."HIST_TRANSACAO" historico_mov,
	case when a."CD_TRANSPAT" in (2,8,9,10,12) then "VL_DOCUMENTO"*-1 when a."CD_TRANSPAT" = 11 then 0 else "VL_DOCUMENTO" end valor_mov,
	case when a."CD_TRANSPAT" in (9,10) then 'S' else 'N' end depreciacao_mov,
	coalesce(c3."RECNUM", 0) codigo_set_ant_mov,
	case when a."CD_TRANSPAT" in (2,8) then 1 else null end codigo_bai,
	d."CD_TRANSPAT"||' - '||d."DS_TRANSPAT" descrmov
from
	"SCH"."HISTPAT" a
left join "SCH"."PATRIAND" b on a."ID_PATRIAND" = b."RECNUM"
left join "SCH"."CADSET" c on a."ID_CADSET" = c."ID_CADSET"
left join "SCH"."CADSET" c2 on b."ID_CADSET" = c2."ID_CADSET"
left join "SCH"."CADSET" c3 on b."ID_CADSET_ORIGEM" = c3."ID_CADSET"
join "SCH"."TRANSPAT" d on a."CD_TRANSPAT" = d."CD_TRANSPAT"
where (a."CD_TRANSPAT" not in  (10,11,1,3,4,7,13)) and "VL_DOCUMENTO" <> 0
union all
select codigo_pat_mov, data_mov, tipo_mov, codigo_set_mov, historico_mov, valor_mov, depreciacao_mov, codigo_set_ant_mov, codigo_bai::int, Null from (
select
	"NR_TOMBAMENTO" codigo_pat_mov,
	"DT_ANDAMENTO" data_mov,
	'T' tipo_mov,
	coalesce(nullif(trim(b."RECNUM"::text),''),'0')::int codigo_set_mov,
	"OBS" historico_mov,
	0 valor_mov,
	'N' depreciacao_mov,
	c."RECNUM" codigo_set_ant_mov,
	null codigo_bai,
	row_number() over (partition by a."NR_TOMBAMENTO" order by a."RECNUM" asc) rn
from
	"SCH"."PATRIAND" a
left join "SCH"."CADSET" b on a."ID_CADSET" = b."ID_CADSET"
left join "SCH"."CADSET" c on a."ID_CADSET_ORIGEM" = c."ID_CADSET")
 CTE where rn > 1
 order by 1, 2
""")

i = cur_dest.execute("select max(codigo_mov) from pt_movbem").fetchone()[0]

for row in cur_orig:
	i += 1
	cur_dest.execute(insert, (
		i,
		empresa,
		row['codigo_pat_mov'],
		row['data_mov'],
		row['tipo_mov'],
		row['codigo_set_mov'],
		to_cp1252_safe(row['historico_mov']),
		None,
		row['codigo_bai'],
		row['valor_mov'],
		row['depreciacao_mov'],
		row['codigo_set_ant_mov'],
		row['data_mov'],
		None,
		i,
		row['descrmov']
	))
commit()