# Prorrata ERNC
Este programa tiene por objetivo realizar el recálculo del curtailment del sistema, con base en la metodología propuesta por la NTCyO.

## LECTURA DE DATOS
Los datos deben ser extraídos desde el archivo .accdb en potencia neta (no por la potencia en sí, sino por los costos marginales no truncados). Los datos que se deben extraer son:
1. Generación de cada central.
2. Perfil de generación de cada central.
3. Barra asociada a cada central.
4. Costos marginales para cada barra.
5. Curtailment por central (quizás por barra sea suficiente).
6. Potencia máxima.
7. Generación disponible.
8. Estado de operación.

Para la lectura, y a modo de determinar la mejor query al sistema sin tener que lidiar con los problemas de MS Access, se cargan las tablas en DuckDB y se utiliza jupysql para probar el SQL.

In [None]:
import polars as pl
import duckdb as duck

from pathlib import Path
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import (
    engine,
    create_engine,
    inspect
)

# Copy every table of the PRG solution .accdb into a local DuckDB file, so the
# queries can be prototyped with DuckDB/jupysql instead of MS Access.
path_prg = Path(r"../data/Model PRGdia_Full_Definitivo Solution.accdb").absolute()

if not path_prg.exists():
    raise ValueError(f"Path: {path_prg} does not exists.")

connection_string = (
    r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};"
    rf"DBQ={path_prg.as_posix()};"
    r"ExtendedAnsiSQL=1;"
)
connection_url = engine.URL.create(
    "access+pyodbc",
    query={"odbc_connect": connection_string}
)

# Both handles start as None so the `finally` clause can clean up safely even
# when the failure happens before they were created (otherwise it would raise
# NameError and hide the original error).
prg_engine = None
conn = None

try:
    prg_engine = create_engine(connection_url)
    tables = inspect(prg_engine).get_table_names()

    conn = duck.connect("PCP.duckdb")

    for table in tables:
        print(f"trabajando en tabla: {table}...")
        # Quote identifiers ([..] for Access, ".." for DuckDB) so table names
        # with spaces or reserved words do not break the statements.
        df = pl.read_database(query=f"SELECT * FROM [{table}]", connection=prg_engine)
        # DuckDB resolves `df` to the local polars DataFrame (replacement scan).
        conn.execute(f'CREATE OR REPLACE TABLE "{table}" AS SELECT * FROM df')

except SQLAlchemyError as e:
    print(f"Error: {e}")

finally:
    if conn is not None:
        conn.close()
    if prg_engine is not None:
        prg_engine.dispose()

## REVISIÓN DUCKDB
Con los datos cargados en la base de datos, empezamos a ver cómo armar la mejor query.

In [None]:
# Esto es para carga la extensión y leer la base de datos.
import duckdb

conn_pcp = duckdb.connect("pcp.duckdb")

# load de la extensión para sql
%load_ext sql
%sql conn_pcp --alias duck

In [None]:
# Esto es para cerrar las conexiones, usarlo al terminar de revisar
%sql --close duck
conn.close()

Clásica query para sacar la generación, levemente modificada para obtener las otras propiedades de una sola vez; no es necesario tener una query por dato.

In [None]:
%%sql
-- Prototype (DuckDB copy of the solution tables): one row per generator,
-- property and period, for several properties at once.
-- NOTE(review): the trailing comma after the last SELECT column is accepted by
-- DuckDB but not by MS Access; drop it when porting this query to the .accdb.
SELECT 
    t_child.name AS generator,
    t_property.name AS property,
    t_period_0.datetime,
    t_data_0.key_id AS data_key,
    t_data_0.period_id AS data_period,
    t_data_0.value,
-- Parenthesised join chain kept in MS Access style.
-- t_parent is joined but not selected; as an INNER JOIN it still drops
-- memberships whose parent object is missing.
FROM ((((((((t_membership
INNER JOIN t_collection ON t_membership.collection_id = t_collection.collection_id)
INNER JOIN t_object AS t_parent ON t_membership.parent_object_id = t_parent.object_id)
INNER JOIN t_object AS t_child ON t_membership.child_object_id = t_child.object_id)
INNER JOIN t_property ON t_collection.collection_id = t_property.collection_id)
INNER JOIN t_key ON t_membership.membership_id = t_key.membership_id AND t_property.property_id = t_key.property_id)
INNER JOIN t_data_0 ON t_key.key_id = t_data_0.key_id)
INNER JOIN t_phase_3 ON t_data_0.period_id = t_phase_3.period_id)
INNER JOIN t_period_0 ON t_phase_3.interval_id = t_period_0.interval_id)
INNER JOIN t_category ON t_child.category_id = t_category.category_id
-- Hard-coded ids: collection 1, four property ids and four generator categories.
-- NOTE(review): presumably these map to the properties pivoted later
-- ("Units Generating", "Capacity Curtailed", ...) — confirm against t_property.
WHERE t_collection.collection_id = 1 AND t_property.property_id IN (1, 28, 200, 219) AND t_category.category_id IN (95, 96, 99, 100)

Similar a la anterior: una query para obtener los datos con costos marginales negativos; no se necesita el resto.

In [None]:
%%sql
-- Prototype: per node and period, the value of property 1233 in collection 245,
-- keeping only negative values (marginal cost < 0, per the alias below).
-- NOTE(review): trailing comma before FROM is DuckDB-only syntax.
SELECT 
    t_child.name AS node,
    t_period_0.datetime,
    t_data_0.key_id AS data_key,
    t_data_0.period_id AS data_period,
    t_data_0.value AS marginal_cost,
FROM ((((((((t_membership
INNER JOIN t_collection ON t_membership.collection_id = t_collection.collection_id)
INNER JOIN t_object AS t_parent ON t_membership.parent_object_id = t_parent.object_id)
INNER JOIN t_object AS t_child ON t_membership.child_object_id = t_child.object_id)
INNER JOIN t_property ON t_collection.collection_id = t_property.collection_id)
INNER JOIN t_key ON t_membership.membership_id = t_key.membership_id AND t_property.property_id = t_key.property_id)
INNER JOIN t_data_0 ON t_key.key_id = t_data_0.key_id)
INNER JOIN t_phase_3 ON t_data_0.period_id = t_phase_3.period_id)
INNER JOIN t_period_0 ON t_phase_3.interval_id = t_period_0.interval_id)
-- t_category is not used in SELECT/WHERE; as an INNER JOIN it only drops nodes
-- without a matching category.
INNER JOIN t_category ON t_child.category_id = t_category.category_id
-- NOTE(review): collection 245 / property 1233 presumably are the node price
-- (marginal cost) series — confirm against t_collection / t_property.
WHERE t_collection.collection_id = 245 AND t_property.property_id = 1233 AND t_data_0.value < 0

Query con CTE para extraer la relación entre barra y generador. Lamentablemente MS Access no soporta CTE, por lo que se reformula en el archivo .sql.

In [None]:
%%sql
-- Prototype: generator -> node (barra) mapping.
-- NOTE(review): trailing commas before FROM are DuckDB-only syntax.
-- node_obj: every object whose class is 'Node'.
WITH node_obj AS (
    SELECT 
        t_object.object_id AS node_id,
        t_object.name AS node,
    FROM t_object
    INNER JOIN t_class ON t_object.class_id = t_class.class_id
    WHERE t_class.name = 'Node'
-- gen_obj: 'Generator' objects restricted to the same four categories used in
-- the generator-properties query.
), gen_obj AS (
    SELECT 
        t_object.object_id AS gen_id,
        t_object.name AS generator,
    FROM t_object
    INNER JOIN t_class ON t_object.class_id = t_class.class_id
    WHERE t_class.name = 'Generator' AND t_object.category_id IN (95, 96, 99, 100)
)

-- Memberships of collection 12 with the generator as parent and the node as child.
-- NOTE(review): collection 12 is presumably Generator.Nodes — confirm in t_collection.
SELECT
    node_obj.node,
    gen_obj.generator,
FROM t_membership
INNER JOIN node_obj ON t_membership.child_object_id = node_obj.node_id
INNER JOIN gen_obj ON t_membership.parent_object_id = gen_obj.gen_id
WHERE t_membership.collection_id = 12

## Juntando la información
Con los SQL ya armados, ahora se pasa a usar solo polars para prescindir de otra librería (`DuckDB`), por mucho que me guste esta base de datos.

In [None]:
import polars as pl

from pathlib import Path
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import (
    engine,
    create_engine,
)

# Read the three query files and run them against the PRG solution .accdb,
# producing df_nodes (node <-> generator), df_gen (generation data) and
# df_cmg (nodes/periods with negative marginal cost).

# SQL: node <-> generator relation
path_sql_node = Path(r"../poc_prorrataerv/sql/gen_node.sql").absolute()
sql_node = path_sql_node.read_text()

# SQL: generation data
path_sql_gen = Path(r"../poc_prorrataerv/sql/gen_data.sql").absolute()
sql_gen = path_sql_gen.read_text()

# SQL: nodes with marginal cost below 0
path_sql_cmg = Path(r"../poc_prorrataerv/sql/cmg_data.sql").absolute()
sql_cmg = path_sql_cmg.read_text()

# Load the data into dataframes
path_prg = Path(r"../data/Model PRGdia_Full_Definitivo Solution.accdb").absolute()

if not path_prg.exists():
    raise ValueError(f"Path: {path_prg} does not exists.")

connection_string = (
    r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};"
    rf"DBQ={path_prg.as_posix()};"
    r"ExtendedAnsiSQL=1;"
)
connection_url = engine.URL.create(
    "access+pyodbc",
    query={"odbc_connect": connection_string}
)

# Created outside the try so the `finally` clause always has an engine to dispose.
prg_engine = create_engine(connection_url)
try:
    df_nodes = pl.read_database(query=sql_node, connection=prg_engine)
    df_gen = pl.read_database(query=sql_gen, connection=prg_engine)
    df_cmg = pl.read_database(query=sql_cmg, connection=prg_engine)

except SQLAlchemyError as e:
    print(f"Error: {e}")
    # Re-raise: every later cell needs these dataframes, so continuing would
    # only fail further down with a confusing NameError.
    raise

finally:
    prg_engine.dispose()


In [None]:
def _read_hoja1(source, new_columns):
    """Read sheet "Hoja1" of an Excel file, skipping empty lines and naming the columns.

    NOTE(review): `xlsx2csv_options` / `read_csv_options` belong to the pre-1.0
    polars `read_excel` API — confirm against the pinned polars version.
    """
    return pl.read_excel(
        source=source,
        sheet_name="Hoja1",
        xlsx2csv_options={"skip_empty_lines": True},
        read_csv_options={"new_columns": new_columns},
    )


# PMGD list (CDC name, generator name)
path_pmgd = Path(r"W:/41 Dpto Pronosticos/Vertimiento_ERNC/Lista_PMGDs.xlsx").absolute()
df_pmgd = _read_hoja1(path_pmgd, ["Nombre_CDC", "Centrales"])

# Banned ("vetadas") generators
path_vetados = Path(r"R:/Aplicaciones/Prorrateo_Vertimiento/Centrales_Vetadas.xlsx").absolute()
df_vetados = _read_hoja1(path_vetados, ["Centrales"])

In [None]:
# Sanity check: banned or PMGD generators that are online ("Units Generating" == 1)
# yet still report non-zero "Capacity Curtailed"; shows the total per generator.
excluded_generators = pl.concat(
    [df_vetados["Centrales"].unique(), df_pmgd["Centrales"].unique()]
)
(
    df_gen
    .filter(pl.col("generator").is_in(excluded_generators))
    .pivot(index=["generator", "datetime"], columns="property", values="value")
    .filter((pl.col("Units Generating") == 1) & (pl.col("Capacity Curtailed") != 0))
    .select(pl.exclude("Units Generating"))
    .group_by(pl.col("generator").alias("Generator"))
    .agg(pl.col("Capacity Curtailed").sum().alias("Total Capacity Curtailed"))
)

In [None]:
# Generation data used by the prorrata: drop banned and PMGD generators, turn
# each property into its own column (one row per generator and datetime), and
# keep only rows where the generator is online ("Units Generating" == 1).
vetados_names = df_vetados["Centrales"].unique()
pmgd_names = df_pmgd["Centrales"].unique()

df_gen_pivot = (
    df_gen
    .filter(
        ~pl.col("generator").is_in(vetados_names)
        & ~pl.col("generator").is_in(pmgd_names)
    )
    .pivot(index=["generator", "datetime"], columns="property", values="value")
    .filter(pl.col("Units Generating").eq(1))
    .drop("Units Generating")
)
df_gen_pivot

In [None]:
def calc_error(
    df: pl.LazyFrame,
    original_col: str = "Generation",
    prorrata_col: str = "Prorrata",
    over_col: str = "datetime",
    error_col: str = "Error",
) -> pl.LazyFrame:
    """Clip negative prorrata values to zero and record the clipped amount.

    Args:
        df: frame containing ``prorrata_col``.
        original_col: accepted for signature compatibility; not used.
        prorrata_col: column to clip (previously ignored: "Prorrata" was
            hard-coded; the default keeps that behaviour).
        over_col: accepted for signature compatibility; not used.
        error_col: output column receiving ``abs(prorrata)`` where the
            prorrata was negative and 0 elsewhere.

    Returns:
        ``df`` with ``error_col`` added/replaced and ``prorrata_col`` clipped
        at zero. Null prorrata values stay null and get an error of 0.
    """
    is_negative = pl.col(prorrata_col).lt(0)
    # with_columns evaluates both expressions against the *input* frame, so the
    # error is computed from the unclipped prorrata.
    return df.with_columns(
        pl.when(is_negative)
        .then(pl.col(prorrata_col).abs())
        .otherwise(0)
        .alias(error_col),
        pl.when(is_negative)
        .then(0)
        .otherwise(pl.col(prorrata_col))
        .alias(prorrata_col),
    )

def check_error(df: pl.LazyFrame, error_col: str = "Error", tol: float = 1e-3) -> bool:
    """Return whether any value of *error_col* is greater than or equal to *tol*."""
    above_tolerance = pl.col(error_col) >= tol
    summary = df.select(above_tolerance.any()).collect()
    return summary.item()

def show_total_error(df: pl.LazyFrame, error_col: str = "Error") -> float:
    """Return the sum of *error_col* over the whole frame."""
    total = pl.col(error_col).sum().alias("Total Error")
    summary = df.select(total).collect()
    return summary.item()

def calc_prorrata(
    df: pl.LazyFrame,
    target_col: str = "Prorrata",
    error_col: str = "Error",
    weight_col: str = "Max Capacity",
    over_col: str = "datetime",
    result_col: str = "Prorrata",
) -> pl.LazyFrame:
    """Subtract from *target_col* each row's weighted share of the group error.

    Within every ``over_col`` group the total of ``error_col`` is split among
    rows in proportion to ``weight_col``::

        result = target - sum(error) * weight / sum(weight)

    Args:
        df: input frame.
        target_col: value the share is subtracted from.
        error_col: amount to redistribute.
        weight_col: weights for the split.
        over_col: grouping column (one redistribution per group).
        result_col: output column name. It was previously hard-coded to
            "Prorrata"; the default keeps that behaviour.

    Returns:
        ``df`` with ``result_col`` added or replaced.

    NOTE(review): a group whose weights sum to 0 yields inf/NaN here.
    """
    group_error = pl.col(error_col).sum().over(over_col)
    group_weight = pl.col(weight_col).sum().over(over_col)
    return df.with_columns(
        (pl.col(target_col) - group_error * pl.col(weight_col) / group_weight).alias(result_col),
    )

In [None]:
def process_prorrata(
    df: pl.LazyFrame,
    target_col: str = "Prorrata",
    error_col: str = "Error",
    weight_col: str = "Max Capacity",
    over_col: str = "datetime",
    max_iter: int = 1000,
) -> pl.LazyFrame:
    """Repeat the redistribute/clip cycle until no significant error remains.

    Each pass calls ``calc_prorrata`` and then ``calc_error``:

    - ``calc_prorrata`` subtracts ``error_col`` from ``target_col`` in
      proportion to ``weight_col`` within each ``over_col`` group, writing
      "Prorrata".
    - ``calc_error`` clips negative "Prorrata" values to 0 and stores the
      clipped amount in "Error".

    After the first pass the cycle continues on the "Prorrata"/"Error" columns
    it produced. It stops as soon as ``check_error`` reports no error at or
    above its tolerance. Every pass prints that flag and the total error.

    Args:
        df: input frame.
        target_col: starting value column (first pass only).
        error_col: starting error column (first pass only).
        weight_col: redistribution weights; now forwarded (previously ignored).
        over_col: grouping column; now forwarded (previously ignored).
        max_iter: maximum number of passes (replaces the unbounded recursion).

    Returns:
        A LazyFrame over the materialised result of the last pass.

    Raises:
        RuntimeError: if errors remain above tolerance after ``max_iter`` passes.
    """
    current = df
    target, error = target_col, error_col
    for _ in range(max_iter):
        current = calc_prorrata(current, target, error, weight_col=weight_col, over_col=over_col)
        # Materialise each pass: otherwise the lazy plan grows every iteration
        # and every collect() below recomputes all previous passes.
        current = calc_error(current).collect().lazy()

        has_error = check_error(current)
        print(has_error)
        print(show_total_error(current))

        if not has_error:
            return current
        # Later passes redistribute the clipped amount on the prorrata itself.
        target, error = "Prorrata", "Error"

    raise RuntimeError(f"process_prorrata did not converge after {max_iter} iterations")

In [None]:
# Nodes with negative marginal cost -> their generators -> pivoted generation data.
cmg_with_generators = df_cmg.join(df_nodes, on="node", how="inner")
test_data = cmg_with_generators.join(df_gen_pivot, on=["generator", "datetime"], how="inner")

In [None]:
# First pass prorates "Available Capacity" by the reported "Capacity Curtailed".
test_prorrata_func = process_prorrata(
    test_data.lazy(),
    target_col="Available Capacity",
    error_col="Capacity Curtailed",
)

In [None]:
# Summary statistics of every column of the prorrata result.
test_prorrata_func.collect().describe()

In [None]:
# Per-datetime consistency check of the prorrata result: totals of generation,
# prorrata and error, plus the residual Generation - (Prorrata - Error).
(
    test_prorrata_func
    .group_by("datetime")
    .agg(
        pl.col("Generation").sum().alias("Total_Gen"),
        pl.col("Prorrata").sum().alias("Total_Gen_Prorrata"),
        pl.col("Error").sum().alias("Total_Error"),
        (pl.col("Prorrata") - pl.col("Error")).sum().alias("Sum_Prorrata_error"),
        (pl.col("Generation") - (pl.col("Prorrata") - pl.col("Error"))).sum().alias("Test_total"),
    )
    # group_by does not preserve input order, so the sort must come after the
    # aggregation (sorting before it had no effect on the output order).
    .sort("datetime")
).collect()

In [None]:
# Materialise and display the full prorrata result.
test_prorrata_func.collect()

In [None]:
# Display the raw (long-format) generation data.
df_gen

In [None]:
# Map each prorrata value back to the t_data_0 (key_id, period_id) of the
# generator's "Generation" series, so it can be written into the .accdb.
generation_keys = (
    df_gen.lazy()
    .filter(pl.col("property") == "Generation")
    .select("generator", "datetime", "data_key", "data_period")
)
data_to_update = (
    test_prorrata_func
    # Keep only what is needed from the prorrata result. It may also carry
    # data_key/data_period from the marginal-cost rows (the prototype cmg query
    # returns them). In that case the join would suffix the generation keys
    # with "_right" and the marginal-cost keys would be exported instead.
    .select("generator", "datetime", "Prorrata")
    .join(generation_keys, on=["generator", "datetime"], how="inner")
    .select(
        pl.col("data_key").alias("key_id"),
        pl.col("data_period").alias("period_id"),
        pl.col("Prorrata").alias("value"),
    )
    .sort(by=["key_id", "period_id"])
    .collect()
)
data_to_update

In [None]:
# Preview the first rows of the update payload as plain dicts, one per line.
test_dict = data_to_update.head().to_dicts()
for row in test_dict:
    print(row)

In [None]:
from sqlalchemy import (
    engine,
    create_engine,
    inspect,
)
from sqlalchemy.sql.expression import (
    update,
    table,
    column,
)

from pathlib import Path

import polars as pl

df_data = pl.read_csv("../t_data_0.csv")

# Minimal table construct: only the columns used by the UPDATE are declared.
t_data_0 = table("t_data_0", column("key_id"), column("period_id"), column("value"))

# Target database: the "_neto" copy of the PRG solution.
path_prg = Path(r"../data/Model PRGdia_Full_Definitivo Solution_neto.accdb").absolute()

if not path_prg.exists():
    raise ValueError(f"Path: {path_prg} does not exists.")

connection_string = (
    r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};"
    rf"DBQ={path_prg.as_posix()};"
    r"ExtendedAnsiSQL=1;"
)
connection_url = engine.URL.create(
    "access+pyodbc",
    query={"odbc_connect": connection_string}
)

prg_engine = create_engine(connection_url)
try:
    tables = inspect(prg_engine).get_table_names()

    # engine.begin() commits when the block exits normally and rolls back if an
    # exception escapes, so no explicit commit is needed.
    with prg_engine.begin() as conn:
        # Only the first rows (head) are written — apparently a trial run.
        for row in df_data.head().to_dicts():
            stmt = (
                t_data_0.update()
                .where(t_data_0.c.key_id == row["key_id"])
                .where(t_data_0.c.period_id == row["period_id"])
                .values(value=row["value"])
            )
            result = conn.execute(stmt)
            # Rows matched by this UPDATE (presumably 1 when (key_id, period_id)
            # identifies a single row — confirm).
            print(result.rowcount)
finally:
    prg_engine.dispose()


In [None]:
import pyodbc
import polars as pl
from pathlib import Path

df_data = pl.read_csv("../t_data_0.csv")
to_update = df_data.head().to_dicts()

path_prg = Path(r"../data/Model PRGdia_Full_Definitivo Solution_neto.accdb").absolute()

if not path_prg.exists():
    raise ValueError(f"Path: {path_prg} does not exists.")

connection_string = (
    r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};"
    rf"DBQ={path_prg.as_posix()};"
    r"ExtendedAnsiSQL=1;"
)

update_sql = "UPDATE t_data_0 SET t_data_0.value = ? WHERE t_data_0.key_id = ? AND t_data_0.period_id = ?;"
# Tuple order must match the placeholders: value, key_id, period_id.
params = [(data['value'], data['key_id'], data['period_id']) for data in to_update]

cnxn = pyodbc.connect(connection_string)
crsr = cnxn.cursor()

try:
    cnxn.autocommit = False
    # executemany rejects an empty parameter list, so skip when nothing to update.
    if params:
        crsr.executemany(update_sql, params)
except pyodbc.DatabaseError as err:
    cnxn.rollback()
    # Report and re-raise instead of silently swallowing the failure.
    print(f"Error: {err}")
    raise
else:
    cnxn.commit()
finally:
    crsr.close()
    cnxn.close()

In [None]:
# Preview the parameter tuples in the order the UPDATE placeholders expect:
# (value, key_id, period_id).
[(data['value'],data['key_id'],data['period_id']) for data in to_update]

In [None]:
# key_id values of the first rows of the CSV.
df_data.head().to_dict(as_series=False)['key_id']

In [None]:
from pathlib import Path
# Build (and display) the ODBC connection string for the "_neto" .accdb copy.
path_prg = Path(r"../data/Model PRGdia_Full_Definitivo Solution_neto.accdb").absolute()

_odbc_parts = (
    r"DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};",
    rf"DBQ={path_prg.as_posix()};",
    r"ExtendedAnsiSQL=1;",
)
connection_string = "".join(_odbc_parts)
connection_string

## Cambio de criterio
Ajustando el código para cambiar cómo se calcula.

In [None]:
from pathlib import Path
from prorrata.extract import DataExtractor


# Inputs: the PRG case folder, the .accdb path inside it, and the two
# exclusion lists (PMGD and banned generators).
path_prg = r"C:\Users\felipe.bastidas\PyProyectos\Test_data\PRG20240214"
PATH_PMGD_EXCLUDE = r"C:\Users\felipe.bastidas\PyProyectos\Test_data\Otros\Lista_PMGDs.xlsx"
PATH_BANNED_GENERATORS = r"C:\Users\felipe.bastidas\PyProyectos\Test_data\Otros\Centrales_Vetadas.xlsx"
PATH_ACCDB_INPUT = r"Datos/Model PRGdia_Full_Definitivo Solution/Model PRGdia_Full_Definitivo Solution.accdb"

accdb_path = Path(path_prg) / PATH_ACCDB_INPUT
data = DataExtractor(
    accdb_path,
    Path(PATH_PMGD_EXCLUDE),
    Path(PATH_BANNED_GENERATORS),
)
data.extract_data()

In [None]:
import polars as pl
from prorrata.transform_cdc import DataProcessor as CDC
from prorrata.transform import DataProcessor as ORI

# Run both prorrata implementations on the same extracted data so they can be
# compared: CDC = prorrata.transform_cdc, ORI = prorrata.transform.
procesor_cdc = CDC.from_extractor(data)
procesor_cdc.process_prorrata()

procesor_ori = ORI(data)
procesor_ori.process_prorrata()

In [None]:
# Side-by-side prorrata of both implementations per node, datetime and generator.
cdc_columns = ["node", "datetime", "generator", "Generation", "Max Capacity", "Available Capacity", "Prorrata"]
compare_data = (
    procesor_cdc.data
    .select(cdc_columns)
    .rename({"Prorrata": "Prorrata_cdc"})
    .join(
        procesor_ori.data.select("node", "datetime", "generator", pl.col("Prorrata").alias("Prorrata_ori")),
        on=["node", "datetime", "generator"],
        how="inner",
    )
    .collect()
)

In [None]:
from datetime import datetime
# Export compare_data with each quantity as a fraction of "Max Capacity".
# "Test" is True where the original variant's share exceeds the CDC one
# (null comparisons count as False).
ori_share_above_cdc = pl.col("Porc. ori").gt(pl.col("Porc. cdc")).fill_null(False)
(
    compare_data
    .with_columns(
        (pl.col("Available Capacity") / pl.col("Max Capacity")).alias("Porc. Available"),
        (pl.col("Prorrata_ori") / pl.col("Max Capacity")).alias("Porc. ori"),
        (pl.col("Prorrata_cdc") / pl.col("Max Capacity")).alias("Porc. cdc"),
    )
    .with_columns(ori_share_above_cdc.alias("Test"))
    .write_csv("compare_data.csv", datetime_format="%Y-%m-%d %T")
)

In [None]:
# Hourly totals for the CDC variant; "Error" = total prorrata - total generation.
(
    procesor_cdc.data
    .group_by("datetime")
    .agg(
        pl.sum("Generation").alias("Sum_Gen"),
        pl.sum("Available Capacity").alias("Sum_Available"),
        pl.sum("Prorrata").alias("Sum_Prorrata"),
        pl.sum("Error").alias("Sum_Error"),
        (pl.sum("Prorrata") - pl.sum("Generation")).alias("Error"),
    )
    .sort("datetime")
    .collect()
)

In [None]:
# Hourly totals for the original variant; "Error" = total prorrata - total generation.
(
    procesor_ori.data
    .group_by("datetime")
    .agg(
        pl.sum("Generation").alias("Sum_Gen"),
        pl.sum("Available Capacity").alias("Sum_Available"),
        pl.sum("Prorrata").alias("Sum_Prorrata"),
        pl.sum("Error").alias("Sum_Error"),
        (pl.sum("Prorrata") - pl.sum("Generation")).alias("Error"),
    )
    .sort("datetime")
    .collect()
)

In [None]:
# Generators whose "Available Capacity" exceeds "Max Capacity" in at least one row.
available_above_max = pl.col("Available Capacity") > pl.col("Max Capacity")
(
    procesor_ori.data
    .filter(available_above_max)
    .select(pl.col("generator").unique())
    .collect()
)

# PRUEBA DE CASOS ERROR

¿Qué pasa si no hay costos marginales menores a 0, o no hay curtailment? Armar un caso de prueba para esto.

In [1]:
from pathlib import Path
from prorrata.extract import DataExtractor


# Same PRG case as before, re-extracted to probe the error cases below.
path_prg = r"C:\Users\felipe.bastidas\PyProyectos\Test_data\PRG20240214"
PATH_PMGD_EXCLUDE = r"C:\Users\felipe.bastidas\PyProyectos\Test_data\Otros\Lista_PMGDs.xlsx"
PATH_BANNED_GENERATORS = r"C:\Users\felipe.bastidas\PyProyectos\Test_data\Otros\Centrales_Vetadas.xlsx"
PATH_ACCDB_INPUT = r"Datos/Model PRGdia_Full_Definitivo Solution/Model PRGdia_Full_Definitivo Solution.accdb"

accdb_path = Path(path_prg) / PATH_ACCDB_INPUT
data = DataExtractor(
    accdb_path,
    Path(PATH_PMGD_EXCLUDE),
    Path(PATH_BANNED_GENERATORS),
)
data.extract_data()

In [2]:
# NOTE(review): presumably reports whether the extracted case has any
# curtailment — confirm in prorrata.extract.DataExtractor.
data.check_curtailment()

True

In [None]:
import polars as pl

# Total "Capacity Curtailed" over all extracted generators and periods.
curtailed_rows = data.gen.filter(pl.col("property") == "Capacity Curtailed")
curtailed_rows.select(pl.sum("value"))

In [None]:
from prorrata.transform import DataProcessor

# Abort early when there are no marginal-cost rows to prorate.
# The row count is read via shape[0] instead of `row, _ = ...`: in a notebook,
# assigning `_` shadows IPython's last-output variable, which then stops updating.
row = data.cmg.shape[0]

if row == 0:
    raise ValueError("No hay curtailment")

data_processor = DataProcessor(data)
data_processor.process_prorrata()