In [1]:
import polars as pl
import pandas as pd

In [2]:
import cx_Oracle  # o usa cx_Oracle si tu sistema lo requiere
from sqlalchemy import create_engine

In [3]:
import configparser

config = configparser.ConfigParser()
config.read('config.ini')
# leer variables
host: str = config['oracle']['host']
port: str = config['oracle']['port']
sid: str = config['oracle']['service_name']
user: str = config['oracle']['user']
password: str = config['oracle']['password']

In [4]:
dsn: str = cx_Oracle.makedsn(host, port, sid)
engine = create_engine(f'oracle+cx_oracle://{user}:{password}@{dsn}')

In [5]:
query = """
-- Descripcion: Script para analizar la tabla de utilizaciones por servicio
SELECT
	TO_CHAR(TO_DATE(UT."Fecha de Radicacion", 'YYYY-MM-DD'), 'YYYYMM') AS PERIODO_RADICACION,
	TO_CHAR(TO_DATE(UT."Fecha_prestacion" , 'YYYY-MM-DD'), 'YYYYMM') AS PERIODO_PRESTACION,
	UT."Tipo de Cuenta" AS TIPO_CUENTA,
	UT."Estado_Factura" AS ESTADO_FACTURA,
	UT."Lugar" AS LUGAR,
	TR."Nombre_Ciudad_DANE" AS MUNICIPIO_PRESTADOR,
	TR."Departamento" AS DEPARTAMENTO_PRESTADOR,
	TR."Regional" AS REGIONAL_PRESTADOR,
	UT."Descripcion OSI" AS DESCRIPCION_OSI,
	UT."Grupo_Principal" AS GRUPO_PRINCIPAL,
	UT."Nombre del prestador" AS NOMBRE_PRESTADOR,
	UT."IDENTIFICACIÓN_DEL_PRESTADOR" AS NIT_PRESTADOR,
	UT."SUCURSAL_IPS" AS NOMBRE_SUCURSAL,
	COUNT(1) AS REGISTROS,
	SUM(UT."Cantidad Procedimiento") AS CANTIDAD_PROCEDIMIENTO,
	ROUND(SUM(REPLACE(UT."Valor_Pagado", '.',','))) AS VALOR_PAGADO,
	ROUND(SUM(REPLACE(UT."VrGlosado Procedimiento", '.',','))) AS VALOR_GLOSADO,
	ROUND(SUM(REPLACE(UT."VrFacturado Procedimiento", '.',','))) AS VALOR_FACTURADO
FROM
	TBL_OPE_UTILIZACIONES_2023 UT
LEFT JOIN "Tb_Regiones" TR
    ON
	TR."Region" = UT."DANE_Prestador"
GROUP BY
	TO_CHAR(TO_DATE("Fecha de Radicacion", 'YYYY-MM-DD'), 'YYYYMM'),
	TO_CHAR(TO_DATE(UT."Fecha_prestacion" , 'YYYY-MM-DD'), 'YYYYMM'),
	UT."Tipo de Cuenta",
	UT."Estado_Factura",
	UT."Lugar",
	TR."Nombre_Ciudad_DANE",
	TR."Departamento",
	TR."Regional",
	UT."Descripcion OSI",
	UT."Grupo_Principal",
    UT."Nombre del prestador",
    UT."IDENTIFICACIÓN_DEL_PRESTADOR",
    UT."SUCURSAL_IPS"
ORDER BY
	VALOR_PAGADO DESC
"""


In [6]:
# Parámetros de lectura en chunks
iter_batches = True
batch_size = 100000  # Número de filas por chunk

# Lista para almacenar los chunks
chunks = []
registros = 0

try:
    # Lee los datos en chunks usando iter_batches
    for df_chunk in pl.read_database(
        query=query,
        connection=engine,
        iter_batches=iter_batches,
        batch_size=batch_size
    ):
        # Agrega cada chunk a la lista
        chunks.append(df_chunk)
        
        registros += df_chunk.shape[0]
        
        print(f"Registros leídos: {registros}")
        
    # Consolida todos los chunks en un solo DataFrame
    df_consolidado = pl.concat(chunks)

except Exception as e:
    print("Error al leer la base de datos:", e)

Registros leídos: 100000
Registros leídos: 200000
Registros leídos: 300000
Registros leídos: 400000
Registros leídos: 500000
Registros leídos: 600000
Registros leídos: 700000
Registros leídos: 800000
Registros leídos: 900000
Registros leídos: 1000000
Registros leídos: 1100000
Registros leídos: 1200000
Registros leídos: 1300000
Registros leídos: 1400000
Registros leídos: 1500000
Registros leídos: 1600000
Registros leídos: 1700000
Registros leídos: 1800000
Registros leídos: 1900000
Registros leídos: 2000000
Registros leídos: 2100000
Registros leídos: 2200000
Registros leídos: 2300000
Registros leídos: 2400000
Registros leídos: 2500000
Registros leídos: 2600000
Registros leídos: 2700000
Registros leídos: 2800000
Registros leídos: 2900000
Registros leídos: 3000000
Registros leídos: 3100000
Registros leídos: 3200000
Registros leídos: 3300000
Registros leídos: 3400000
Registros leídos: 3500000
Registros leídos: 3600000
Registros leídos: 3700000
Registros leídos: 3800000
Registros leídos: 390

In [7]:
from google.cloud import bigquery

In [8]:
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "credenciales_api_Google.json"

In [9]:
client = bigquery.Client()

In [10]:
df_consolidado.columns = [col.upper() for col in df_consolidado.columns]

In [11]:
# transformar nulos a vacíos
df_consolidado = df_consolidado.fill_null('')
# si es numero nulo reemplazar por 0
df_consolidado = df_consolidado.fill_null(0)

In [12]:
# Definir el esquema de la tabla (opcional pero recomendado)
schema = [
    bigquery.SchemaField("PERIODO_RADICACION", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("PERIODO_PRESTACION", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("TIPO_CUENTA", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("ESTADO_FACTURA", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("LUGAR", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("MUNICIPIO_PRESTADOR", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("DEPARTAMENTO_PRESTADOR", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("REGIONAL_PRESTADOR", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("DESCRIPCION_OSI", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("GRUPO_PRINCIPAL", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("NOMBRE_PRESTADOR", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("NIT_PRESTADOR", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("NOMBRE_SUCURSAL", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("REGISTROS", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("CANTIDAD_PROCEDIMIENTO", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("VALOR_PAGADO", "FLOAT", mode="REQUIRED"),
    bigquery.SchemaField("VALOR_GLOSADO", "FLOAT", mode="REQUIRED"),
    bigquery.SchemaField("VALOR_FACTURADO", "FLOAT", mode="REQUIRED")
]


In [13]:
# Configurar el trabajo de carga
job_config = bigquery.LoadJobConfig(
    schema=schema,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # Opciones: WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,  # Crea la tabla si no existe
)

In [15]:

# Cargar el DataFrame a BigQuery
job = client.load_table_from_dataframe(
    df_consolidado.to_pandas(), 
    'pruebas-406413.utilizaciones.resumen_servicios_2023', 
    job_config=job_config
)