# Extração de API Useall

## Tools and functions

In [1]:
import pandas as pd
import requests
import json
import urllib.parse
import time

# --- Configurações ---
BASE_URL = "https://extracao.useallcloud.com.br/api/v1/json/"
HEADERS = {
    "accept": "application/json",
    "use-relatorio-token": "eyJJZCI6ImNkMDNkMzhiLWZhNzUtNDg4Yi04NDA1LTg5OTU1MzBjNjFiMSIsIlN0cmluZ0NvbmV4YW8iOiJBeHh4IFh2a2VqST10bGRxZGZ4Rzk7c3FJMSBCbz1ScUl4WHlnYWVoO2dNcXp6djFvPXNxSXVleGd5SW5TSSQkOyIsIkNvZGlnb1VzdWFyaW8iOjczMzIsIkNvZGlnb1RlbmFudCI6MTQzfQ=="
}

def buscar_dados_api(identificacao, nome_arquivo, backend_filters=None, extra_params=None):
    """Busca dados na API UseAll e retorna um DataFrame (ou None em caso de erro/vazio)"""
    
    query_params = {"Identificacao": identificacao}
    
    if backend_filters:
        query_params["FiltrosSqlQuery"] = json.dumps(backend_filters, ensure_ascii=False)
        
    if extra_params:
        query_params.update(extra_params)

    print(f"[{time.strftime('%H:%M:%S')}] Iniciando extração: {nome_arquivo}...")
    
    while True:
        try:
            response = requests.get(BASE_URL, headers=HEADERS, params=query_params, timeout=500)
            
            if response.status_code == 429:
                print(f"[{time.strftime('%H:%M:%S')}] Erro 429 (Too Many Requests) em {nome_arquivo}. Aguardando 185 segundos...")
                time.sleep(185)
                continue
                
            response.raise_for_status()

            data = response.json()
            df = pd.DataFrame(data)
            return df

        except requests.exceptions.Timeout:
            print(f"[{time.strftime('%H:%M:%S')}] Timeout atingido para {nome_arquivo}. Aguardando 185 segundos...")
            time.sleep(185)
            continue
            
        except Exception as e:
            print(f"[{time.strftime('%H:%M:%S')}] Erro irrecuperável em {nome_arquivo}: {str(e)}")
            return None

def salvar_parquet(df, nome_arquivo):
    """Salva o DataFrame em arquivo parquet"""
    if df is not None and not df.empty:
        # Garante extensão .parquet
        if not nome_arquivo.endswith('.parquet'):
            nome_arquivo += '.parquet'
        
        try:
            df.to_parquet(nome_arquivo, index=False)
            print(f"[{time.strftime('%H:%M:%S')}] Sucesso ao salvar: {nome_arquivo} (Linhas: {len(df)})")
        except Exception as e:
            print(f"[{time.strftime('%H:%M:%S')}] Erro ao salvar {nome_arquivo}: {str(e)}")
    else:
        print(f"[{time.strftime('%H:%M:%S')}] Nada a salvar para {nome_arquivo} (DataFrame vazio ou None)")

def verificar_tipos_dados():
    print("\n" + "=" * 40)
    print(f"[{time.strftime('%H:%M:%S')}] VERIFICAÇÃO DE TIPOS DE DADOS")
    print("=" * 40)

    encontrou = False

    for nome, obj in globals().items():
        if isinstance(obj, pd.DataFrame):
            encontrou = True
            print(f"\nDataFrame: {nome}")
            if not obj.empty:
                print("-" * 30)
                print(obj.dtypes)
                print("-" * 30)
            else:
                print("  (DataFrame vazio)")

    if not encontrou:
        print("Nenhum DataFrame encontrado em memória.")

# --- Defines Auxiliares de Filtro ---
def filtro_simples(nome, valor):
    return {"Nome": nome, "Valor": valor, "Operador": None, "Descricao": None, "ValorFormatado": None}

def carregar_dfs_globais(tarefas):
    print(f"[{time.strftime('%H:%M:%S')}] --- INICIANDO CARGA EM MEMÓRIA ---")

    for t in tarefas:
        nome = t["nome"]
        df = buscar_dados_api(
            t["id"],
            nome,
            t.get("filtros"),
            t.get("extra_params")
        )

        if df is not None:
            globals()[nome] = df
        else:
            print(f"[{time.strftime('%H:%M:%S')}] Falha ao carregar {nome}")

def carregar_tarefa_complexa(tarefa):
    nome = tarefa["nome"]

    df = buscar_dados_api(
        tarefa["id"],
        nome,
        tarefa.get("filtros"),
        tarefa.get("extra_params")
    )

    if df is not None:
        globals()[nome] = df
    else:
        print(f"[{time.strftime('%H:%M:%S')}] Falha ao carregar {nome}")


pipeline_start = time.time()
print(f"--- Pipeline iniciada em {time.strftime('%d/%m/%Y %H:%M:%S')} ---")


--- Pipeline iniciada em 13/01/2026 11:11:15 ---


## Variaveis de filtros

### SIMPLES

In [2]:
params_fixos = {"pagina": 1, "qtderegistros": 1}

tarefas_simples = [
    {
        "nome": "dfuseallitens",
        "id": "m2_estoque_item",
        "filtros": [
            filtro_simples("DATAHORAALTERACAOINI", "01/01/1900"),
            filtro_simples("DATAHORAALTERACAOFIM", "01/01/2027")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallunidades",
        "id": "m2_estoque_unidade",
        "filtros": [
            filtro_simples("DATAHORAALTERACAOINI", "01/01/1900"),
            filtro_simples("DATAHORAALTERACAOFIM", "01/01/2027")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallsegmentos",
        "id": "m2_vendas_segmento",
        "filtros": [
            filtro_simples("DATAHORAALTERACAOINI", "01/01/1900"),
            filtro_simples("DATAHORAALTERACAOFIM", "01/01/2027")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallcidades",
        "id": "m2_geral_cidades",
        "filtros": [
            filtro_simples("DATAHORAALTERACAOINI", "01/01/1900"),
            filtro_simples("DATAHORAALTERACAOFIM", "01/01/2027")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallsolcompra",
        "id": "m2_compras_m2_compras_solicitacao_de_compras__extra",
        "filtros": [
            filtro_simples("DATAINI", "01/01/1900"),
            filtro_simples("DataFim", "01/01/2500")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallfiliais",
        "id": "m2_geral_filiais",
        "filtros": [
            filtro_simples("DATAHORAALTINI", "01/01/1900, 11:00:00"),
            filtro_simples("DATAHORAALTFIM", "01/01/2500, 14:00:00")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallempresas",
        "id": "m2_geral_empresas",
        "filtros": [
            filtro_simples("DATAHORAALTINI", "01/01/2022, 11:00:00"),
            filtro_simples("DATAHORAALTFIM", "01/01/2027, 14:00:00")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallexpedição",
        "id": "m2_vendas_extracao_de_dados__saida_expedicao",
        "filtros": [
            filtro_simples("data1", "01/01/1900"),
            filtro_simples("data2", "01/01/2500")
        ],
        "extra_params": params_fixos
    },
    {
        "nome": "dfuseallclientesfornecedore",
        "id": "m2_geral_clientes__fornecedores",
        "filtros": [
            filtro_simples("DATAHORAALTERACAOINI", "01/01/1900"),
            filtro_simples("DATAHORAALTERACAOFIM", "01/01/2027")
        ],
        "extra_params": params_fixos
    }
]


### COMPLEXAS

In [None]:

filtros_req = [
    {"Nome": "IDFILIAL", "Valor": [333,334,335,336,387,404,520,558,578,339,340,342,343,341,344,345,346,381,389,390], "Operador": 1},
    {"Nome": "DATA", "Valor": "01/01/2010,01/01/2027", "Operador": 8, "TipoPeriodoData": 5},
    {"Nome": "DATAPREVATEND", "Valor": "01/01/2010,01/01/2027", "Operador": 8, "TipoPeriodoData": 8},
    {"Nome": "CLASSGRUPOITEM", "Valor": ""},
    {"Nome": "CLASSCONTACDC", "Valor": ""},
    {"Nome": "quebra", "Valor": 1},
    {"Nome": "FILTROSWHERE", "Valor": " AND IDEMPRESA = 211"}
]

filtros_estoque = [
    {"Nome": "ADDATA", "Valor": "08/01/2010"},
    {
        "Nome": "FILTROSWHERE",
        "Valor": " AND EXISTS (SELECT 1 FROM USE_USUARIOS_FILIAIS UFILIAIS "
                "WHERE UFILIAIS.IDEMPRESA = T.IDEMPRESA "
                "AND UFILIAIS.IDFILIAL = T.IDFILIAL "
                "AND UFILIAIS.IDUSUARIO = 7332) "
                "AND T.IDFILIAL IN (333,334,336,404,335,387,520,558,578)"
    },
    {"Nome": "ANQUEBRA", "Valor": 0}
]

filtros_atend = [
    {
        "Nome": "FILTROSWHERE",
        "Valor": (
            "WHERE IDEMPRESA = 211 "
            "AND IDFILIAL IN (333,334,335,336,387,404,520,558,339,578,340,342,343,341,344,345,346,381,389,390) "
            "AND DATA_REQ BETWEEN '01/01/1900' AND '01/01/2900' "
            "AND DATA_ATEND BETWEEN '01/01/1900' AND '01/01/2900'"
        )
    }
]

params_atend = {
    "NomeOrganizacao": "SETUP SERVICOS ESPECIALIZADOS LTDA",
    "Parametros": json.dumps([
        {"Nome": "usecellmerging", "Valor": True},
        {"Nome": "quebra", "Valor": 0}
    ])
}

# ===============================
# BLOCO 1 — REQUISIÇÕES
# ===============================

tarefa_requisicoes = {
    "nome": "dfuseallrequisicoes",
    "id": "m2_estoque_requisicao_de_materiais",
    "filtros": filtros_req,
    "extra_params": None
}


# ===============================
# BLOCO 2 — ESTOQUE
# ===============================

tarefa_estoque = {
    "nome": "dfuseallestoque",
    "id": "09249662000174_m2_estoque_saldo_de_estoque__setup",
    "filtros": filtros_estoque,
    "extra_params": None
}

# ===============================
# BLOCO 3 — ATENDIMENTO DE REQUISIÇÕES
# ===============================

tarefa_atendimento = {
    "nome": "dfuseallatendimentodereq",
    "id": "m2_estoque_atendimentos_de_requisicao",
    "filtros": filtros_atend,
    "extra_params": params_atend
}

## Criando DataFrames

### Usando funções

In [4]:
carregar_dfs_globais(tarefas_simples)

[11:11:15] --- INICIANDO CARGA EM MEMÓRIA ---
[11:11:15] Iniciando extração: dfuseallitens...
[11:11:16] Erro 429 (Too Many Requests) em dfuseallitens. Aguardando 185 segundos...
[11:14:23] Iniciando extração: dfuseallunidades...
[11:14:24] Iniciando extração: dfuseallsegmentos...
[11:14:25] Iniciando extração: dfuseallcidades...
[11:14:26] Iniciando extração: dfuseallsolcompra...
[11:14:28] Iniciando extração: dfuseallfiliais...
[11:14:29] Iniciando extração: dfuseallempresas...
[11:14:30] Iniciando extração: dfuseallexpedição...
[11:14:44] Iniciando extração: dfuseallclientesfornecedore...


In [5]:
carregar_tarefa_complexa(tarefa_requisicoes)

[11:14:52] Iniciando extração: dfuseallrequisicoes...


In [6]:
carregar_tarefa_complexa(tarefa_estoque)

[11:15:02] Iniciando extração: dfuseallestoque...


In [7]:
carregar_tarefa_complexa(tarefa_atendimento)

[11:15:44] Iniciando extração: dfuseallatendimentodereq...


### Verificando Tipos

In [8]:
# --- 3. Verificação de Tipos ---
print(f"[{time.strftime('%H:%M:%S')}] --- INICIANDO VERIFICAÇÃO DE TIPOS ---")
verificar_tipos_dados()

[11:21:10] --- INICIANDO VERIFICAÇÃO DE TIPOS ---

[11:21:10] VERIFICAÇÃO DE TIPOS DE DADOS

DataFrame: dfuseallitens
------------------------------
IDITEM                 int64
DATAHORAALTERACAO     object
IDENTIFICACAO         object
DESCRICAO             object
IDCLASSFISCAL          int64
IDUN                   int64
IDUNVENDA            float64
QUANTEMBVENDA        float64
PESOBRUTO            float64
PESOLIQ              float64
OBS                   object
ATIVO                  int64
COMPLEMENTO           object
IDGRUPOITEM          float64
IDTIPOITEM             int64
QUANTMINVENDA        float64
CODBARRA              object
LIBERADO              object
FORMATOCODBAR        float64
dtype: object
------------------------------

DataFrame: dfuseallunidades
------------------------------
IDUN                  int64
IDUNIDADETENANT       int64
DESCRICAO            object
SIGLA                object
SIGLAFCI             object
DATAHORAALTERACAO    object
STATUS               object

## Configurações Banco de Dados 

In [9]:
import io
import time
import psycopg2
from sqlalchemy import create_engine, text

# ---------------- CONFIG ----------------

DB_URL = "postgresql+psycopg2://postgres:4102@localhost:5432/SETUP"

PG_CONN_INFO = {
    "dbname": "SETUP",
    "user": "postgres",
    "password": "4102",
    "host": "localhost",
    "port": 5432,
}

SCHEMA = "useall"

# ---------------------------------------

engine = create_engine(DB_URL)

# garante schema
with engine.connect() as conn:
    conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}"))
    conn.commit()


## Staging - Bronze - Dados Brutos tipos indefinidos

In [25]:
ordem_staging = [
    # simples iniciais
    "dfuseallitens",
    "dfuseallunidades",
    "dfuseallsegmentos",
    "dfuseallcidades",

    # complexas no meio
    "dfuseallrequisicoes",
    "dfuseallestoque",
    "dfuseallatendimentodereq",

    # simples finais
    "dfuseallsolcompra",
    "dfuseallfiliais",
    "dfuseallempresas",
    "dfuseallexpedição",
    "dfuseallclientesfornecedore",
]


In [36]:
def log(msg: str):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}")


def copy_df_to_postgres(df, schema: str, table: str):
    import psycopg2
    import io

    buffer = io.StringIO()
    df.to_csv(
        buffer,
        index=False,
        header=False,
        sep="\t",
        na_rep="\\N"
    )
    buffer.seek(0)

    conn = psycopg2.connect(**PG_CONN_INFO)
    cur = conn.cursor()

    sql = f"""
        COPY {schema}.{table}
        FROM STDIN
        WITH (FORMAT CSV, DELIMITER E'\t', NULL '\\N')
    """

    cur.copy_expert(sql, buffer)

    conn.commit()
    cur.close()
    conn.close()

tabelas_criadas = 0
dfs_nao_encontrados = []

log("INICIANDO CARGA STAGING (COPY FROM)")

for df_nome in ordem_staging:
    df = globals().get(df_nome)

    if df is None or df.empty:
        dfs_nao_encontrados.append(df_nome)
        continue

    tabela = "staging_" + df_nome.replace("dfuseall", "")
    tabela = tabela.lower()

    log(f"Preparando tabela {SCHEMA}.{tabela} | Linhas: {len(df)}")

    # 1️⃣ cria estrutura (DDL leve)
    with engine.connect() as conn:
        df.head(0).to_sql(
            name=tabela,
            con=conn,
            schema=SCHEMA,
            if_exists="replace",
            index=False
        )
        conn.commit()

    log(f"Iniciando COPY para {SCHEMA}.{tabela}")

    # 2️⃣ carga pesada via COPY
    copy_df_to_postgres(df, SCHEMA, tabela)

    log(f"[OK] Tabela {SCHEMA}.{tabela} carregada com sucesso")

    tabelas_criadas += 1


# ---------------- FINAL ----------------

log("--------------------------------------------------")

if tabelas_criadas == 0:
    log("Nenhuma tabela staging foi criada.")
    log("DataFrames não encontrados:")
    for nome in dfs_nao_encontrados:
        log(f" - {nome}")
else:
    log(f"{tabelas_criadas} tabelas staging criadas com sucesso.")

log("PROCESSO FINALIZADO")


[13:35:03] INICIANDO CARGA STAGING (COPY FROM)
[13:35:03] Preparando tabela useall.staging_itens | Linhas: 15846
[13:35:03] Iniciando COPY para useall.staging_itens
[13:35:04] [OK] Tabela useall.staging_itens carregada com sucesso
[13:35:04] Preparando tabela useall.staging_unidades | Linhas: 59
[13:35:04] Iniciando COPY para useall.staging_unidades
[13:35:04] [OK] Tabela useall.staging_unidades carregada com sucesso
[13:35:04] Preparando tabela useall.staging_cidades | Linhas: 6141
[13:35:04] Iniciando COPY para useall.staging_cidades
[13:35:04] [OK] Tabela useall.staging_cidades carregada com sucesso
[13:35:04] Preparando tabela useall.staging_requisicoes | Linhas: 6297
[13:35:04] Iniciando COPY para useall.staging_requisicoes
[13:35:04] [OK] Tabela useall.staging_requisicoes carregada com sucesso
[13:35:04] Preparando tabela useall.staging_estoque | Linhas: 132365
[13:35:04] Iniciando COPY para useall.staging_estoque
[13:35:06] [OK] Tabela useall.staging_estoque carregada com sucess

## Silver definindo tipos automaticamente

In [47]:
SCHEMA = "useall"
SAMPLE_LIMIT = 50000

import pandas as pd
import re

# Detecta formato de data
def is_date_series(s: pd.Series):
    sample = s.dropna().astype(str).head(50)
    formats = [
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%dT%H:%M:%S",
        "%d/%m/%Y",
        "%d/%m/%Y %H:%M:%S",
    ]
    for fmt in formats:
        try:
            pd.to_datetime(sample, format=fmt)
            return fmt
        except:
            continue
    return None

# Inferência de tipo
def infer_column_type_final(series: pd.Series) -> dict:
    s = series.dropna()
    if s.empty:
        return {"type": "text"}

    # BOOLEAN lógico
    if s.astype(str).isin(["0","1","true","false","True","False"]).all():
        return {"type": "boolean"}

    # DATE / TIMESTAMP
    date_fmt = is_date_series(s)
    if date_fmt:
        return {"type": "timestamp", "format": date_fmt}

    # INTEGER
    if s.astype(str).str.fullmatch(r"-?\d+").all():
        return {"type": "bigint"}

    # DECIMAL
    if s.astype(str).str.fullmatch(r"-?\d+(\.\d+)?").all():
        return {"type": "numeric(18,4)"}

    return {"type": "text"}

from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = :schema
          AND table_type = 'BASE TABLE'
          AND table_name LIKE 'staging_%'
    """), {"schema": SCHEMA})

    staging_tables = [row[0] for row in result.fetchall()]

# Função nome silver
def silver_table_name(staging_table: str) -> str:
    return staging_table.replace("staging_", "silver_")

# Monta dicionário de metadata
schema_silver = {}

for staging_table in staging_tables:
    if not staging_table.startswith("staging_"):
        continue
    silver_table = silver_table_name(staging_table)
    log(f"Profiling {SCHEMA}.{staging_table} -> {SCHEMA}.{silver_table}")

    df_sample = pd.read_sql(
    f'SELECT * FROM {SCHEMA}."{staging_table}" LIMIT {SAMPLE_LIMIT}',
    engine
    )

    schema_silver[silver_table] = {
        "staging_table": staging_table,
        "columns": {
            col.lower(): {
                **infer_column_type_final(df_sample[col]),
                "source_col": col  # nome REAL na staging
            }
            for col in df_sample.columns
        }
    }

# Cria cast SQL
def generate_cast_sql(col_dest, meta):
    col_src = meta["source_col"]

    col_sql = f'"{col_src}"'
    col_txt = f'{col_sql}::text'

    if meta["type"] == "boolean":
        return f"""
        CASE
            WHEN lower({col_txt}) IN ('1','true','sim','s','y','yes') THEN true
            WHEN lower({col_txt}) IN ('0','false','nao','n','no') THEN false
            ELSE NULL
        END AS "{col_dest}"
        """

    if meta["type"] == "timestamp":
        fmt = meta.get("format")
        if fmt:
            return f"""
            CASE
                WHEN {col_txt} = '' THEN NULL
                ELSE {col_txt}::{ 'timestamp' if 'H' in fmt else 'date' }
            END AS "{col_dest}"
            """
        else:
            return f"""
            CASE
                WHEN {col_sql} IS NULL OR {col_txt} = '' THEN NULL
                ELSE {col_sql}::timestamp
            END AS "{col_dest}"
            """

    if meta["type"] in ("bigint","numeric(18,4)"):
        return f"""
        CASE
            WHEN {col_txt} ~ '^-?\\d+(\\.\\d+)?$' THEN {col_txt}::{meta["type"]}
            ELSE NULL
        END AS "{col_dest}"
        """

    return f'{col_sql}::text AS "{col_dest}"'

# Gera CREATE TABLE
def generate_create_table(schema, table, columns: dict):
    cols = ",\n  ".join(f'"{col_dest}" {meta["type"]}' for col_dest, meta in columns.items())
    return f"""
    DROP TABLE IF EXISTS {schema}."{table}";
    CREATE TABLE {schema}."{table}" (
      {cols}
    );
    """

# Cria tabelas silver
for silver_table, meta in schema_silver.items():
    log(f"Criando tabela silver {SCHEMA}.{silver_table}")
    ddl = generate_create_table(SCHEMA, silver_table, meta["columns"])
    with engine.begin() as conn:
        conn.execute(text(ddl))

# Gera INSERT
def generate_insert_cast(staging_schema, SCHEMA, staging_table, silver_table, columns):
    selects = ",\n".join(generate_cast_sql(col_dest, meta) for col_dest, meta in columns.items())
    return f"""
    INSERT INTO {SCHEMA}."{silver_table}"
    SELECT
      {selects}
    FROM {staging_schema}."{staging_table}";
    """

# Carrega dados
for silver_table, meta in schema_silver.items():
    staging_table = meta["staging_table"]
    columns = meta["columns"]
    log(f"Carregando dados em {SCHEMA}.{silver_table}")
    sql = generate_insert_cast(SCHEMA, SCHEMA, staging_table, silver_table, columns)
    try:
        with engine.begin() as conn:
            conn.execute(text(sql))
        log(f"[OK] {SCHEMA}.{silver_table} carregada")
    except Exception as e:
        log(f"[ERRO] {SCHEMA}.{silver_table} -> {e}")

log("--------------------------------------------------")
log("PROCESSO FINALIZADO")


[14:02:14] Profiling useall.staging_atendimentodereq -> useall.silver_atendimentodereq
[14:02:18] Profiling useall.staging_cidades -> useall.silver_cidades
[14:02:18] Profiling useall.staging_clientesfornecedore -> useall.silver_clientesfornecedore
[14:02:21] Profiling useall.staging_empresas -> useall.silver_empresas
[14:02:21] Profiling useall.staging_estoque -> useall.silver_estoque
[14:02:23] Profiling useall.staging_expedição -> useall.silver_expedição
[14:02:26] Profiling useall.staging_filiais -> useall.silver_filiais
[14:02:26] Profiling useall.staging_itens -> useall.silver_itens
[14:02:26] Profiling useall.staging_requisicoes -> useall.silver_requisicoes
[14:02:27] Profiling useall.staging_solcompra -> useall.silver_solcompra
[14:02:27] Profiling useall.staging_unidades -> useall.silver_unidades
[14:02:27] Criando tabela silver useall.silver_atendimentodereq
[14:02:27] Criando tabela silver useall.silver_cidades
[14:02:27] Criando tabela silver useall.silver_clientesfornecedo

## Gold - Adicionando novas colunas e agregando valor

In [46]:
from sqlalchemy import create_engine, text

engine = create_engine(DB_URL)

sql = """
DO $$
DECLARE
    r RECORD;
    gold_table TEXT;
BEGIN
    FOR r IN
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'useall'
          AND table_name LIKE 'silver_%'
    LOOP

        gold_table := replace(r.table_name, 'silver_', 'gold_');

        -- CASO ESPECIAL: SILVER_REQUISICOES
        IF r.table_name = 'silver_requisicoes' THEN

            -- cria se não existir
            EXECUTE format(
                'CREATE TABLE IF NOT EXISTS useall.%I AS
                 SELECT
                     *,
                     CASE status::int
                         WHEN 0  THEN ''Digitado''
                         WHEN 1  THEN ''Aberto''
                         WHEN 3  THEN ''Cancelado''
                         WHEN 10 THEN ''Parcial''
                         WHEN 11 THEN ''Atendido''
                         ELSE ''Desconhecido''
                     END AS py_desc_status
                 FROM useall.silver_requisicoes
                 WHERE false;',
                gold_table
            );

            -- limpa e reinsere
            EXECUTE format('TRUNCATE TABLE useall.%I;', gold_table);

            EXECUTE format(
                'INSERT INTO useall.%I
                 SELECT
                     *,
                     CASE status::int
                         WHEN 0  THEN ''Digitado''
                         WHEN 1  THEN ''Aberto''
                         WHEN 3  THEN ''Cancelado''
                         WHEN 10 THEN ''Parcial''
                         WHEN 11 THEN ''Atendido''
                         ELSE ''Desconhecido''
                     END AS py_desc_status
                 FROM useall.silver_requisicoes;',
                gold_table
            );

        -- DEMAIS TABELAS
        ELSE

            -- cria se não existir
            EXECUTE format(
                'CREATE TABLE IF NOT EXISTS useall.%I AS
                 SELECT * FROM useall.%I WHERE false;',
                gold_table,
                r.table_name
            );

            -- limpa e reinsere
            EXECUTE format('TRUNCATE TABLE useall.%I;', gold_table);

            EXECUTE format(
                'INSERT INTO useall.%I
                 SELECT * FROM useall.%I;',
                gold_table,
                r.table_name
            );

        END IF;

    END LOOP;
END $$;
"""

with engine.begin() as conn:
    conn.execute(text(sql))
