#Bienvenidos al Demo del Pipeline de Cálculo Automático de Balances de Energía EONTEC

##Introduccion

Los balances de Energía se calculan con un programa de despachos y la información real de despacho de los generadores, la primera se obtiene de XM el ente regulador del mercado de energía de Colombia y el segundo, de los reportes periódicos que saca el generador, generalmente son diarios.

Tambien se averigua la diferencia del plan vs lo real, y se puede obtener el valor de la energía generada real, vs la remanente que hace falta, y que hay que cubrir con contratos de energía. 

*"Si me sobra vendo la Energía, si me falta, la tengo que comprar"*

En este proceso la **ETL** se encargará de extraer la información de las fuentes mencionadas arriba, de forma remota, posteriormente, la transformará a un formato manejable, la cruzará y filtrará, y finalmente la depositará o la disponibilizará para descarga en un destino para que los analistas o el regulador la usen.

###Fuentes

- Descarga Archivo de despacho horario de las centrales de energia consolidado
- Consumo de API de despacho programado para la fecha del ente regulador.
- Consumo de API de Precio de la energia por kWh para el día asignado

###Transformaciones

- Cambio de formato de Excel a Spark Columnar
- Cambio de formato de JSON a Spark Columnar de datos de las API.
- Combinación de datos utilizando SQL.
- Agregación, cálculos de los balances por unidad de generación (por planta)
- Identificación de operación: Compra o Venta.

###Transferencias o Cargas
- Carga por web a Bucket o Carpeta en la nube del regulador


### Preparación de prerequisitos

En esta sección se instalan los paquetes, librerías y componentes que le permitirán al pipeline realizar sus procesos internos.

In [0]:
%sh
pip install openpyxl xlrd pydrive

You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-db88405a-f329-4f85-9ed2-1ce62635bdeb/bin/python -m pip install --upgrade pip' command.


## Declaracion de Librerías y Variables

En estas dos celdas importamos las librerías de nuestro lenguaje (Python) y algunos frameworks como Pyspark y Pandas y definimos las variables:

- Fecha de inicio del balance
- Fecha fin del balance
- Id del archivo publico de despacho de generacion


In [0]:
import requests
import json
from pyspark.sql.functions import flatten,col,explode
import pandas as pd
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import os

In [0]:
fecha_inicio="2024-06-09"
fecha_fin="2024-06-20"
id_archivo_balance_drive=""
api_key_1=""
api_key_2=""


# (E) Fuentes

### (*E*) Extraccion del API de Precios de la Energía

Aquí descargamos del API los datos en formato JSON, usamos una librería nativa de Python llamada Requests y con ella consumimos el api de la URL, y almacenamos el resultado en una estructura de datos denominada un dataframe, que servirá para procesamiento posterior.

In [0]:
url = f"https://www.simem.co/backend-files/api/PublicData?startDate={fecha_inicio}&endDate={fecha_fin}&datasetId=96D56E"

response = requests.get(url)
dfPreciosBolsa=spark.read.json(sc.parallelize([response.text]))
        

### (*E*) Extraccion del Plan de Despacho de Energía por Unidad y Central

Aqui descargamos del API los datos del despacho en formato JSON, usamos una librería nativa de Python llamada Requests y con ella consumimos el api de la URL, y almacenamos el resultado en una estructura de datos denominada un dataframe, que servirá para procesamiento posterior.

In [0]:
url = f"https://www.simem.co/backend-files/api/PublicData?startdate={fecha_inicio}&enddate={fecha_fin}&datasetId=ff027b"
response = requests.get(url)
dfDespachosUnidades=spark.read.json(sc.parallelize([response.text]))
    

### (*E*) Descarga del archivo con nuestros datos reales de generación

En estas 2 celdas, descargamos de Google Drive nuestro archivo de consolidado de generación, igual que antes, usamos Requests y con ella descargamos el archivo de la URL, en este caso tenemos que traer el archivo a una ubicación local, y posteriormente lo cargamos con Pandas, un framework de procesamiento de datos de python para analítica, y lo cargamos como un dataframe de Pandas.

In [0]:

download_url = f'https://drive.google.com/uc?export=download&id={id_archivo_balance_drive}'
local_file_path = '/tmp/balances3.xlsx'
response = requests.get(download_url)
with open(local_file_path, 'wb') as file:
    file.write(response.content)

In [0]:

dfArchivoCapacidad = pd.read_excel(local_file_path,engine='openpyxl')
display(dfArchivoCapacidad)

  Expected bytes, got a 'datetime.datetime' object
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


Unnamed: 0,Unnamed: 1,Unnamed: 2,Unnamed: 3,Unnamed: 4,Unnamed: 5
,Programa de Generación por Planta,,,,
,Empresa Generadora ACME S.A. E.S.P.,,,,
,Despacho Real 10/07/2024,,,,
,,,,,
,FECHA,PLANTA,GENERADOR,CAPACIDAD (Kwh),CODIGO
,"java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id=""Etc/UTC"",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2024,MONTH=6,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=10,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=0,HOUR_OF_DAY=0,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]",TERMOZIPA,2,33000,ZPA2
,"java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id=""Etc/UTC"",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2024,MONTH=6,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=10,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=1,HOUR_OF_DAY=1,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]",TERMOZIPA,2,33000,ZPA2
,"java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id=""Etc/UTC"",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2024,MONTH=6,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=10,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=2,HOUR_OF_DAY=2,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]",TERMOZIPA,2,33000,ZPA2
,"java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id=""Etc/UTC"",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2024,MONTH=6,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=10,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=3,HOUR_OF_DAY=3,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]",TERMOZIPA,2,33000,ZPA2
,"java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id=""Etc/UTC"",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2024,MONTH=6,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=10,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=4,HOUR_OF_DAY=4,MINUTE=0,SECOND=0,MILLISECOND=0,ZONE_OFFSET=?,DST_OFFSET=?]",TERMOZIPA,2,20834,ZPA2


# (T) Transformaciones


## Filtrado de Columnas

Nuestras fuentes, vienen en un estado crudo, o "raw", y hay datos, que no nos servirán para nuestro propósito, por lo cual, la primera transformación será quitar, o filtrar, los atributos que no se necesitan.

### (*T*) Filtrado de los atributos que traen el precio de bolsa


La documentación del API nos muestra que trae muchas columnas de información, denominados metadatos, necesitamos puntualmente los datos válidos para nuestro análisis. 

```
CodigoDuracion:string
CodigoPlanta:string
FechaHora:string
Valor:long
```

En estas 3 celdas, transformamos el formato utilizando funciones como aplanar o explotar, colocando alias y realizando un filtro o un select.


In [0]:
dfPreciosBolsaFiltrado=dfPreciosBolsa.select(explode(col("result.records")).alias("results")).select(col("results.*"))

Esta celda sirve para mostrar los resultados del dataframe del Precio de Bolsa Filtrado

In [0]:
display(dfPreciosBolsaFiltrado)

CodigoDuracion,CodigoVariable,Fecha,UnidadMedida,Valor,Version
P1D,PPBOGReal,2024-06-20,COP/kWh,308.2145,TX1
P1D,PPBOGReal,2024-06-20,COP/kWh,322.0811,TXF
P1D,PPBO,2024-06-20,COP/kWh,316.4945,TXF
P1D,PPBOGReal,2024-06-20,COP/kWh,322.0841,TXR
P1D,PPBO,2024-06-20,COP/kWh,316.4859,TXR
P1D,PPBOGReal,2024-06-20,COP/kWh,308.2125,TX2
P1D,PPBO,2024-06-20,COP/kWh,302.7103,TX2
P1D,PPBOGReal,2024-06-19,COP/kWh,304.6868,TX1
P1D,PPBO,2024-06-19,COP/kWh,310.2853,TXR
P1D,PPBOGReal,2024-06-19,COP/kWh,318.4008,TXR


Databricks visualization. Run in Databricks to view.

Esta celda registra el dataframe como una tabla o vista temporal **PrecioBolsa**, a la que podremos hacerle consultas SQL.

In [0]:
dfPreciosBolsaFiltrado.createOrReplaceTempView("PreciosBolsa")

### (*T*) Filtrado de los atributos que traen el despacho de las plantas

Al igual que los precios de bolsa, el API de los despachos trae datos excesivos, que no necesitamos, vamos a extraer los datos de utilidad:


```
CodigoDuracion:string
CodigoPlanta:string
FechaHora:string
Valor:long
```
Al igual que en el caso anterior, en las próximas 3 celdas aplanamos estructuras, renombramos con un alias y filtramos con una selección.

In [0]:
dfResultsDespachosUnidades=dfDespachosUnidades.select(explode(col("result.records")).alias("results")).select(col("results.*"))

Celda para mostrar los resultados del Despacho de las Unidades de Generación

In [0]:
display(dfResultsDespachosUnidades)

CodigoDuracion,CodigoPlanta,FechaHora,Valor
PT1H,2SI1,2024-06-20 13:00:00,15000.0
PT1H,2SDR,2024-06-20 13:00:00,9800.0
PT1H,2S9Q,2024-06-20 13:00:00,0.0
PT1H,2S9L,2024-06-20 13:00:00,450.0
PT1H,2S8U,2024-06-20 13:00:00,1500.0
PT1H,2S8S,2024-06-20 13:00:00,250.0
PT1H,2S8N,2024-06-20 13:00:00,5000.0
PT1H,2S8I,2024-06-20 13:00:00,7500.0
PT1H,2S78,2024-06-20 13:00:00,290.0
PT1H,2S6U,2024-06-20 13:00:00,6500.0


Celda donde registramos el dataframe con los datos descargados de los despachos en la tabla **Despachos**.

In [0]:
dfResultsDespachosUnidades.createOrReplaceTempView("Despachos")

### (*T*) Filtrado de Plantas

El despacho trae todas las plantas, de nuevo, tenemos que filtrar, pero ahora solo debemos traer las que nos interesan, por ejemplo Zipaquirá, Guavio, Quimbo y Chivor, de la Empresa de Energía ACME. Lo haremos con SQL.

Luego guardaremos ese filtro en otro data frame, para delimitar el set de datos para calcular el balance con las plantas que nos importan.

Primero diseñamos el SQL para filtrar con un condicional, las plantas que nos interesan con una sentencia **IN**:

In [0]:
%sql
SELECT * FROM Despachos WHERE CodigoPlanta IN ('ZPA2','ZPA3','ZPA4','ZPA5','GVIO','QUI1','CHVR') order by Valor,FechaHora DESC

CodigoDuracion,CodigoPlanta,FechaHora,Valor
PT1H,CHVR,2024-06-20 23:00:00,0.0
PT1H,CHVR,2024-06-20 22:00:00,0.0
PT1H,CHVR,2024-06-20 21:00:00,0.0
PT1H,CHVR,2024-06-20 20:00:00,0.0
PT1H,CHVR,2024-06-20 19:00:00,0.0
PT1H,CHVR,2024-06-20 18:00:00,0.0
PT1H,CHVR,2024-06-20 17:00:00,0.0
PT1H,CHVR,2024-06-20 16:00:00,0.0
PT1H,CHVR,2024-06-20 15:00:00,0.0
PT1H,CHVR,2024-06-20 14:00:00,0.0


La query resultante, la agregamos a Spark, para poderla trabajar con los otros datos en un dataframe llamado **dfDespachosAcme**.

In [0]:
dfDespachosAcme=spark.sql("""
                          SELECT * FROM Despachos WHERE CodigoPlanta IN ('ZPA2','ZPA3','ZPA4','ZPA5','GVIO','QUI1','CHVR') order by Valor,FechaHora DESC
                          """)

Con esta celda desplegamos los resultados de los despachos filtrados por planta.

In [0]:
display(dfDespachosAcme)

CodigoDuracion,CodigoPlanta,FechaHora,Valor
PT1H,CHVR,2024-06-20 23:00:00,0.0
PT1H,CHVR,2024-06-20 22:00:00,0.0
PT1H,CHVR,2024-06-20 21:00:00,0.0
PT1H,CHVR,2024-06-20 20:00:00,0.0
PT1H,CHVR,2024-06-20 19:00:00,0.0
PT1H,CHVR,2024-06-20 18:00:00,0.0
PT1H,CHVR,2024-06-20 17:00:00,0.0
PT1H,CHVR,2024-06-20 16:00:00,0.0
PT1H,CHVR,2024-06-20 15:00:00,0.0
PT1H,CHVR,2024-06-20 14:00:00,0.0


Registramos los despachos filtrados por planta, en la tabla **dfCapacidadDespachosAcme**

In [0]:
dfDespachosAcme.createOrReplaceTempView("dfCapacidadDespachosAcme")

### (*T*) Cambio de Formato Interno

Ahora, **cambiaremos** el archivo de excel con nuestra capacidad, que fue cargado como una estructura de **Pandas, a una estructura de Spark**, para poder usar SQL con los datos descargados desde XM.

Para transformarlos, **aplicaremos un esquema, que aplicará unos tipos de dato, y les dará nombres a las columnas o atributos**.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schemaArchivo = StructType([
    StructField("useless", StringType(), True),
    StructField("fecha", StringType(), True),
    StructField("planta", StringType(), True),
    StructField("generador", StringType(), True),
    StructField("capacidad", StringType(), True),
    StructField("codigo", StringType(), True)
])
dfTransformadoCapacidad = spark.createDataFrame(dfArchivoCapacidad,schema=schemaArchivo)
dfTransformadoCapacidad.createOrReplaceTempView("CapacidadArchivo")

### (*T*) Vamos a extraer las fechas del archivo de capacidades y quitar con un filtro las celdas que no nos sirven

El archivo de Excel **trae muchos datos innecesarios**, debemos **quitar esas celdas**, y extraer las fechas para poder utilizar esos datos del archivo con los datos del ente regulador (xm), eso lo haremos con SQL con algunas técnicas:

- Expresiones regulares.
- Sustituciones de texto.
- Condicionales.

Y posteriormente lo volvemos a dejar en un dataframe listo para utilizar.

Primero, filtramos con SQL, las celdas que no son vacías o nulas, donde el generador no esté vacío, y omitimos otros caracteres, de paso extraemos através de expresiones regulares, el día, el mes, el año y la hora.

In [0]:
%sql
SELECT  b.anio,b.mes,b.dia,b.hora, b.codigo, b.capacidad  FROM 
(SELECT SUBSTR(A.FECHA, instr(A.fecha,"YEAR")+5,4) AS anio, 
regexp_replace(SUBSTR(A.FECHA, instr(A.fecha,"MONTH")+6,2),',','') AS mes,
regexp_replace(SUBSTR(A.FECHA, instr(A.fecha,"DAY_OF_MONTH")+13,2),',','') AS dia,
regexp_replace(SUBSTR(A.FECHA, instr(A.fecha,"HOUR_OF_DAY")+12,2),',','') AS hora,
* FROM CapacidadArchivo a WHERE generador IS NOT NULL AND generador != "NaN" AND generador != "GENERADOR") b

anio,mes,dia,hora,codigo,capacidad
2024,6,10,0,ZPA2,33000
2024,6,10,1,ZPA2,33000
2024,6,10,2,ZPA2,33000
2024,6,10,3,ZPA2,33000
2024,6,10,4,ZPA2,20834
2024,6,10,5,ZPA2,21626
2024,6,10,6,ZPA2,26030
2024,6,10,7,ZPA2,33000
2024,6,10,8,ZPA2,33000
2024,6,10,9,ZPA2,33000


Una vez validada nuestra query, usamos Spark para crear el dataframe **dfArchivoCapacidadLimpiado**

In [0]:
dfArchivoCapacidadLimpiado=spark.sql("""SELECT  b.anio,b.mes,b.dia,b.hora, b.codigo, b.capacidad  FROM 
(SELECT SUBSTR(A.FECHA, instr(A.fecha,"YEAR")+5,4) AS anio, 
regexp_replace(SUBSTR(A.FECHA, instr(A.fecha,"MONTH")+6,2),',','') AS mes,
regexp_replace(SUBSTR(A.FECHA, instr(A.fecha,"DAY_OF_MONTH")+13,2),',','') AS dia,
regexp_replace(SUBSTR(A.FECHA, instr(A.fecha,"HOUR_OF_DAY")+12,2),',','') AS hora,
* FROM CapacidadArchivo a WHERE generador IS NOT NULL AND generador != "NaN" AND generador != "GENERADOR") b""")

Con el Dataframe, Creamos la tabla temporal **ArchivoCapacidadLimpiado**

In [0]:
dfArchivoCapacidadLimpiado.createOrReplaceTempView("ArchivoCapacidadLimpiado")

Paso siguiente, de la tabla con los despachos programados del ente regulador **dfCapacidadDespachosAcme**, vamos a extraer las columnas o atributos con los que vamos a cruzar la información: día, mes, año, hora y valor, junto al código de la planta.

In [0]:
%sql
SELECT  day(FechaHora) as dia_despacho, month(FechaHora) as mes_despacho, year(FechaHora) as anio_despacho,hour(FechaHora) as hora_despacho, Valor as capacidad, CodigoPlanta as codigo FROM dfCapacidadDespachosAcme

dia_despacho,mes_despacho,anio_despacho,hora_despacho,capacidad,codigo
20,6,2024,23,0.0,CHVR
20,6,2024,22,0.0,CHVR
20,6,2024,21,0.0,CHVR
20,6,2024,20,0.0,CHVR
20,6,2024,19,0.0,CHVR
20,6,2024,18,0.0,CHVR
20,6,2024,17,0.0,CHVR
20,6,2024,16,0.0,CHVR
20,6,2024,15,0.0,CHVR
20,6,2024,14,0.0,CHVR


Guardamos el SQL como un dataframe **dfDespachosAcmeTransformado**

In [0]:
dfDespachosAcmeTransformado=spark.sql("""SELECT  day(FechaHora) as dia_despacho, month(FechaHora) as mes_despacho, year(FechaHora) as anio_despacho,hour(FechaHora) as hora_despacho, Valor as capacidad, CodigoPlanta as codigo FROM dfCapacidadDespachosAcme""")

Y lo registramos como una tabla **DespachosAcmeTransformado**

In [0]:
dfDespachosAcmeTransformado.createOrReplaceTempView("DespachosAcmeTransformado")

### (*T*) Cálculo del balance de energía.

Ya tenemos dos dataframes, uno con los datos del despacho que vienen del ente regulador **DespachosAcmeTransformado** y otro con los datos limpios de nuestro archivo de excel **ArchivoCapacidadLimpiado**, en esta sección, usaremos SQL para restar el despacho programado, de nuestra capacidad por hora, y luego realizamos una sumatoria para sacar el **balance consolidado de energía por planta y unidad**.

Como es usual, primero diseñamos la query en SQL, dentro del parentesis grande la **subquery** une la tabla del despacho del regulador, con el archivo de capacidades, usando el **código, el año, el mes, el día y la hora**.

Posteriormente, convertimos la **capacidad** que viene como texto con **cast** y restamos el valor de la **capacidad reportada por el regulador**, de la **capacidad real de mi archivo**, y eso lo llamamos **balance_disponible_horario**.

En la query externa (fuera del parentesis), sacamos la **Sumatoria con SUM** y ese es el valor **consolidado del día de nuestra planta en Kilowatts Hora**.

In [0]:
%sql
SELECT c.anio, c.mes, c.dia, c.codigo, SUM(c.balance_disponible_horario) AS consolidado_planta
FROM
(
  SELECT a.anio, a.mes, a .dia, a.hora, a.codigo, cast(a.capacidad AS DECIMAL) - cast(b.capacidad as DECIMAL) as balance_disponible_horario FROM ArchivoCapacidadLimpiado a JOIN DespachosAcmeTransformado b
  ON
  a.codigo = b. codigo AND
  a.anio = b.anio_despacho AND
  a.mes = b.mes_despacho AND
  a.dia = b.dia_despacho AND
  a.hora = b.hora_despacho
) c 
GROUP BY c.anio, c.mes, c.dia, c.codigo 



anio,mes,dia,codigo,consolidado_planta
2024,6,10,ZPA2,655706
2024,6,10,ZPA3,-392658
2024,6,10,ZPA4,528688
2024,6,10,GVIO,2070950
2024,6,10,QUI1,3779724
2024,6,10,ZPA5,-483396


Si la query anterior funciona, la guardamos con Spark, en el dataframe **dfBalanceConsolidado** y creamos la tabla **BalanceConsolidado**

In [0]:
dfBalanceConsolidado = spark.sql("""SELECT c.anio, c.mes, c.dia, c.codigo, SUM(c.balance_disponible_horario) AS consolidado_planta
FROM
(
  SELECT a.anio, a.mes, a .dia, a.hora, a.codigo, cast(a.capacidad AS DECIMAL) - cast(b.capacidad as DECIMAL) as balance_disponible_horario FROM ArchivoCapacidadLimpiado a JOIN DespachosAcmeTransformado b
  ON
  a.codigo = b. codigo AND
  a.anio = b.anio_despacho AND
  a.mes = b.mes_despacho AND
  a.dia = b.dia_despacho AND
  a.hora = b.hora_despacho
) c 
GROUP BY c.anio, c.mes, c.dia, c.codigo 
""")
dfBalanceConsolidado.createOrReplaceTempView("BalanceConsolidado")


Ahora con el precio de la bolsa que está en la tabla **Precios Bolsa** vamos a **multiplicar** de la capacidad en el **Balance Consolidado** que acabamos de calcular y eso nos dará el valor de la energía en miles de millones de pesos.

In [0]:
%sql
SELECT c.anio, c.mes, c.dia, c.codigo, c.consolidado_planta, (c.consolidado_planta * b.valor)/1000 as Compromisos_MCOP
FROM BalanceConsolidado c JOIN 
(
SELECT DAY(a.Fecha) as dia,Month(a.Fecha) as mes,Year(a.Fecha) as anio, a.Valor as valor 
FROM PreciosBolsa a 
WHERE a.CodigoVariable ="PPBOGReal" AND Version="TXR"
) b 
ON c.dia = b.dia AND
c.mes = b.mes AND
c.anio = b.anio


anio,mes,dia,codigo,consolidado_planta,Compromisos_MCOP
2024,6,10,ZPA5,-483396,-148353.8940228
2024,6,10,QUI1,3779724,1159994.6497932
2024,6,10,GVIO,2070950,635573.105335
2024,6,10,ZPA4,528688,162253.97711839998
2024,6,10,ZPA3,-392658,-120506.4653394
2024,6,10,ZPA2,655706,201235.7124058


Aqui guardamos lo que hicimos en SQL en el dataframe **BalancesComprasVentasEnergía**

In [0]:
dfBalancesComprasVentasEnergia=spark.sql("""
          SELECT c.anio, c.mes, c.dia, c.codigo, c.consolidado_planta, (c.consolidado_planta * b.valor)/1000 as Compromisos_MCOP
FROM BalanceConsolidado c JOIN 
(
SELECT DAY(a.Fecha) as dia,Month(a.Fecha) as mes,Year(a.Fecha) as anio, a.Valor as valor 
FROM PreciosBolsa a 
WHERE a.CodigoVariable ="PPBOGReal" AND Version="TXR"
) b 
ON c.dia = b.dia AND
c.mes = b.mes AND
c.anio = b.anio
          """)
dfBalancesComprasVentasEnergia.createOrReplaceTempView("BalancesComprasVentasEnergia")

Finalmente, para crear un reporte que vamos a enviar al regulador, vamos a agregar una columna **Operación**, que consistirá en evaluar, si el **compromiso es negativo**, debemos comprar energía (la planta no genera lo planeado y hay que ir a bolsa), si el **compromiso es positivo**, tenemos un excedente, podemos vender esa energía en bolsa.

In [0]:
%sql
SELECT a.*, CASE WHEN a.Compromisos_MCOP < 0 THEN "Comprar" ELSE "Vender" END as Operacion FROM BalancesComprasVentasEnergia a

anio,mes,dia,codigo,consolidado_planta,Compromisos_MCOP,Operacion
2024,6,10,ZPA5,-483396,-148353.8940228,Comprar
2024,6,10,QUI1,3779724,1159994.6497932,Vender
2024,6,10,GVIO,2070950,635573.105335,Vender
2024,6,10,ZPA4,528688,162253.97711839998,Vender
2024,6,10,ZPA3,-392658,-120506.4653394,Comprar
2024,6,10,ZPA2,655706,201235.7124058,Vender


Preparamos el Data Frame de Salida **dfReporteCompraVentaEnergiaAcme** para poderlo enviar en nuestra etapa **L** al destino

In [0]:
dfReporteCompraVentaEnergiaAcme=spark.sql("""SELECT a.*, CASE WHEN a.Compromisos_MCOP < 0 THEN "Comprar" ELSE "Vender" END as Operacion FROM BalancesComprasVentasEnergia a """)

In [0]:
display(dfReporteCompraVentaEnergiaAcme)

anio,mes,dia,codigo,consolidado_planta,Compromisos_MCOP,Operacion
2024,6,10,ZPA5,-483396,-148353.8940228,Comprar
2024,6,10,QUI1,3779724,1159994.6497932,Vender
2024,6,10,GVIO,2070950,635573.105335,Vender
2024,6,10,ZPA4,528688,162253.97711839998,Vender
2024,6,10,ZPA3,-392658,-120506.4653394,Comprar
2024,6,10,ZPA2,655706,201235.7124058,Vender


##(*L*) Transferencia de Archivo de Resultados a Destino

Ahora tomamos el resultado de las compras y ventas de energía y lo transferimos al destino para que lo cargue el usuario final.

Vamos a simular la carga del archivo a un servicio de transferencia de archivos llamado https://fastupload.io 

Vamos a **obtener el CSV**, para ello vamos a usar python para **validar si nuestro archivo es un CSV y si es, lo copiamos** a una ruta que podamos acceder.

In [0]:
local_csv_path = "dbfs:/reporteSalidaRegulador/"
dfReporteCompraVentaEnergiaAcme.coalesce(1).write.option("header", "true").mode("overwrite").csv(local_csv_path)
filepath=""
for archivo in dbutils.fs.ls("/reporteSalidaRegulador"):
 if '.csv' in archivo.name:
    filepath=archivo.path
print(filepath)

dbutils.fs.cp(filepath, f"file:/tmp/dfReporteCompraVentaEnergiaAcme.csv")

## *(L)* Autenticación en el sitio remoto del ente regulador

Para poder **cargar archivos**, hay que **iniciar sesión** en la plataforma, para ello se utiliza un API, https://fastupload.io/api#authorize , en el api, la plataforma iniciará sesión y obtendrá un token (secuencia de texto y números) que nos permitirá subir el archivo.


In [0]:
file_upload_access_token = ""
file_upload_account_id = ""
params = {'key1': api_key_1, 'key2': api_key_2}
response=requests.get("https://fastupload.io/api/v2/authorize", params)
json_response = json.loads(response.text)
try:
  file_upload_access_token = json_response["data"]["access_token"]
  file_upload_account_id = json_response["data"]["account_id"]
except:
  print("Error autenticando y autorizando en el servicio remoto de carga de archivos.")


Así se ve un token de un API 

In [0]:
print(file_upload_access_token)

## *(L)* Ahora vamos a cargar el Archivo finalmente con el token

Para ello usamos otra API https://fastupload.io/api#file-upload, la cual nos pide algunos parámetros, como el nombre del archivo, el token, la cuenta y el folder, al final, al imprimir la respuesta, veremos que el arhivo fue cargado "File Uploaded".

In [0]:
upload_folder_id = ""
json_response=""
with open("/tmp/dfReporteCompraVentaEnergiaAcme.csv", "rb") as archivo:
  files = {"upload_file": (archivo.name, archivo)}  # Create a dictionary for the file upload

  data = {
    "access_token": file_upload_access_token,
    "account_id": file_upload_account_id,
    "folder_id": upload_folder_id  # Include folder_id if provided
  }

  response = requests.post("https://fastupload.io/api/v2/file/upload", files=files, data=data)
  
print(response.text)
