### **1. Upload 10 parquets**

#### Flow the following steps:
1. Install pkgs
pip install pandas pyarrow boto3 python-dotenv
2. .env
set all the credentials in that file
3. Create and upload parquets

In [None]:
#-- Load modules --#
import pandas as pd
import numpy as np
import boto3
import datetime
import os

In [3]:
#-- Create parquets --#
def s3_upload_parquet(initDate: str):
    #-- Create dataframe --#
    rngDate = pd.date_range(start=initDate.strftime('%Y-%m-%d'), periods=18, freq='1MS')
    nmember = 200
    file_name = f"Hydro-LongFc_{initDate:%Y-%m-%d}.parquet"
    basinName = ['QN-Mantaro', 'QN-Santa', 'QN-Rimac', 'QN-Vilcanota']
    initDates = [initDate]*len(rngDate)*nmember*len(basinName)
    #-- Dataframe --#
    sampleData = pd.DataFrame(
        {
            'name': [s for s in basinName for _ in range(len(rngDate)) for member in range(nmember)],
            'initDate': initDates,
            'date': np.tile(rngDate.tolist(),len(basinName)*nmember),
            'member': [i+1 for _ in range(len(rngDate)) for a in range(len(basinName)) for i in range(nmember)],
            'QNfc': 100+5*np.random.randn((len(rngDate)),len(basinName),nmember).flatten(),
            'QN': 100+5.5*np.random.randn((len(rngDate)),len(basinName),nmember).flatten(),
            'QNclim': 90+5.3*np.random.randn((len(rngDate)),len(basinName),nmember).flatten(),
        }
    )
    #-- Save locally --#
    local_file = f'forecast_{initDate:%Y-%m-%d}.parquet'
    sampleData.to_parquet(local_file, index=False)
    #-- Set credentials --#
    import boto3
    from dotenv import load_dotenv
    load_dotenv('.env')
    #-- Fix parameters --#
    bucket = 'cdh-hydrolongterm-514438'
    base = 'longterm-forecast'
    version = 'v1.0'
    #-- Upload to S3 --#
    s3 = boto3.client('s3')
    key = f"{base}/monthly/run_date={initDate.strftime('%Y-%m-%d')}/version={version}/ensemble/forecast.parquet"
    s3.upload_file(local_file, bucket, key)

#-- Run through dates --#
initDate = '2024-01-01'
endDate = '2024-12-01'
rngDateIndex = pd.date_range(start=initDate, end=endDate, freq='1MS')
for initDate in rngDateIndex:
    print(f"Uploading {initDate:%Y-%m-%d}...")
    s3_upload_parquet(initDate)
    print(f"Uploaded {initDate:%Y-%m-%d}!")

Uploading 2024-01-01...


S3UploadFailedError: Failed to upload forecast_2024-01-01.parquet to cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-01-01/version=v1.0/ensemble/forecast.parquet: An error occurred (ExpiredToken) when calling the PutObject operation: The provided token has expired.

### **2. Query Buckets**

In [4]:
#-- Connect to DuckDB and set credentials --#
import duckdb, os
con = duckdb.connect()
con.execute("SET s3_region=?", [os.getenv("AWS_DEFAULT_REGION")])
con.execute("SET s3_access_key_id=?", [os.getenv("AWS_ACCESS_KEY_ID")])
con.execute("SET s3_secret_access_key=?", [os.getenv("AWS_SECRET_ACCESS_KEY")])
con.execute("SET s3_session_token=?", [os.getenv("AWS_SESSION_TOKEN")])

<duckdb.duckdb.DuckDBPyConnection at 0x7c9967c889b0>

In [5]:
#-- Build the list of dates to request --#
bucket = 'cdh-hydrolongterm-514438'
base = 'longterm-forecast'
version = 'v1.0'
uris = [
    f"s3://{bucket}/{base}/monthly/run_date={initDate.strftime('%Y-%m-%d')}/version={version}/ensemble/forecast.parquet" 
    for initDate in rngDateIndex
]

In [6]:
uris

['s3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-01-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-02-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-03-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-04-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-05-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-06-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-07-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-08-01/version=v1.0/ensemble/forecast.parquet',
 's3://cdh-hydrolongterm-514438/longterm-forecas

In [7]:
's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-01-01/version=v1.0/ensemble/forecast.parquet'

's3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-01-01/version=v1.0/ensemble/forecast.parquet'

In [8]:
#-- Create a view merging all the parquets --#
sql_union = " UNION ALL ".join([f"SELECT * FROM read_parquet('{u}')" for u in uris])
con.execute(f"CREATE OR REPLACE VIEW raw_long AS {sql_union}")

<duckdb.duckdb.DuckDBPyConnection at 0x7c9967c889b0>

In [9]:
#-- How many rows per run --#
q1 = con.execute("""
    SELECT initDate AS run_date, COUNT(*) AS rows
    FROM raw_long
    GROUP BY 1 ORDER BY 1
""").df()
print(q1)

     run_date   rows
0  2024-01-01  14400
1  2024-02-01  14400
2  2024-03-01  14400
3  2024-04-01  14400
4  2024-05-01  14400
5  2024-06-01  14400
6  2024-07-01  14400
7  2024-08-01  14400
8  2024-09-01  14400
9  2024-10-01  14400
10 2024-11-01  14400
11 2024-12-01  14400


In [10]:
#-- Mean of ensemble --#
q2 = con.execute("""
    SELECT name, date AS targetDate, AVG(QNfc) AS QN_mean
    FROM raw_long
    GROUP BY name, targetDate
    ORDER BY name, targetDate
""").df()
print(q2.head())

         name targetDate     QN_mean
0  QN-Mantaro 2024-01-01   99.778802
1  QN-Mantaro 2024-02-01   99.736616
2  QN-Mantaro 2024-03-01  100.071649
3  QN-Mantaro 2024-04-01   99.972756
4  QN-Mantaro 2024-05-01  100.082035


In [11]:
#-- Anomalies --#
q3 = con.execute("""
    SELECT name, date AS targetDate, AVG(QNfc) - AVG(QNclim) AS anom
    FROM raw_long
    GROUP BY name, targetDate
    ORDER BY name, targetDate
""").df()
print(q3.head())

         name targetDate       anom
0  QN-Mantaro 2024-01-01   9.061734
1  QN-Mantaro 2024-02-01  10.287259
2  QN-Mantaro 2024-03-01  10.204871
3  QN-Mantaro 2024-04-01  10.013447
4  QN-Mantaro 2024-05-01   9.966547


In [12]:
q3

Unnamed: 0,name,targetDate,anom
0,QN-Mantaro,2024-01-01,9.061734
1,QN-Mantaro,2024-02-01,10.287259
2,QN-Mantaro,2024-03-01,10.204871
3,QN-Mantaro,2024-04-01,10.013447
4,QN-Mantaro,2024-05-01,9.966547
...,...,...,...
111,QN-Vilcanota,2026-01-01,10.211200
112,QN-Vilcanota,2026-02-01,10.261312
113,QN-Vilcanota,2026-03-01,9.675485
114,QN-Vilcanota,2026-04-01,10.130510


In [14]:
# --- Parámetros de TU dataset / región ---
DB            = "lake"
REGION        = "eu-west-1"
WORKGROUP     = "primary"  # cambia si usas otro
BUCKET        = "cdh-hydrolongterm-514438"
BASE          = "longterm-forecasts"
VERSION       = "v1.0"

# Athena necesita un staging S3 donde pueda escribir resultados de consulta:
# Usa una carpeta del MISMO bucket si tienes permiso de escritura; si no, usa otro bucket propio.
ATHENA_STAGING = f"s3://{BUCKET}/athena-staging/"   # ajústalo si te da AccessDenied

# --- Conexión Athena ---
from pyathena import connect
import pandas as pd
from datetime import date
from dateutil.relativedelta import relativedelta

conn = connect(
    s3_staging_dir=ATHENA_STAGING,
    region_name=REGION,
    work_group=WORKGROUP,
)
cur = conn.cursor()

# 1) DB (si no existe)
cur.execute(f"CREATE DATABASE IF NOT EXISTS {DB}")

# 2) Tabla externa sobre RAW (particionada por ruta)
# Ubicación física:
RAW_LOCATION = f"s3://{BUCKET}/{BASE}/raw/"

cur.execute(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {DB}.hydro_longterm_raw (
  name        string,
  initDate    date,
  date        date,
  member      int,
  QNfc        double,
  QN          double,
  QNclim      double
)
PARTITIONED BY (
  run_date date,
  version  string
)
STORED AS PARQUET
LOCATION '{RAW_LOCATION}'
""")

# 3) Agregar particiones explícitas (enero..octubre 2024, v1.0)
run_dates = [date(2024,1,1) + relativedelta(months=i) for i in range(10)]

for rd in run_dates:
    part_loc = (
        f"s3://{BUCKET}/{BASE}/raw/"
        f"run_date={rd:%Y-%m-%d}/version={VERSION}/"
    )
    cur.execute(f"""
    ALTER TABLE {DB}.hydro_longterm_raw
      ADD IF NOT EXISTS PARTITION (run_date=DATE '{rd:%Y-%m-%d}', version='{VERSION}')
      LOCATION '{part_loc}'
    """)

# 4) Consulta de prueba (mean del ensamble por cuenca y mes objetivo)
sql_mean = f"""
SELECT
  name,
  date AS targetDate,
  AVG(QNfc) AS QN_mean
FROM {DB}.hydro_longterm_raw
WHERE version = '{VERSION}'
  AND run_date BETWEEN DATE '2024-01-01' AND DATE '2024-10-01'
GROUP BY name, targetDate
ORDER BY name, targetDate
"""
df_mean = pd.read_sql(sql_mean, conn)
print(df_mean.head())

# 5) Crear CURATED con CTAS (Parquet + particionado + tu ubicación)
CURATED_MEAN_LOCATION = f"s3://{BUCKET}/{BASE}/curated/meanEnsemble/"

# Si la tabla ya existe, puedes DROP TABLE primero o hacer INSERT en una ya creada
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {DB}.longterm_mean
WITH (
  format = 'PARQUET',
  external_location = '{CURATED_MEAN_LOCATION}',
  partitioned_by = ARRAY['run_date','version']
) AS
SELECT
  name,
  date AS targetDate,
  AVG(QNfc) AS QN_mean,
  run_date,
  version
FROM {DB}.hydro_longterm_raw
WHERE version = '{VERSION}'
  AND run_date BETWEEN DATE '2024-01-01' AND DATE '2024-10-01'
GROUP BY name, date, run_date, version
""")

# (Opcional) Leer el curated recién creado:
df_curated = pd.read_sql(f"SELECT * FROM {DB}.longterm_mean ORDER BY name, targetDate LIMIT 20", conn)
print(df_curated.head())


  df_mean = pd.read_sql(sql_mean, conn)


DatabaseError: Execution failed on sql: 
SELECT
  name,
  date AS targetDate,
  AVG(QNfc) AS QN_mean
FROM lake.hydro_longterm_raw
WHERE version = 'v1.0'
  AND run_date BETWEEN DATE '2024-01-01' AND DATE '2024-10-01'
GROUP BY name, targetDate
ORDER BY name, targetDate

COLUMN_NOT_FOUND: line 8:16: Column 'targetdate' cannot be resolved or requester is not authorized to access requested resources
unable to rollback

## **2. Google Earth Engine (GEE)**

In [1]:
import ee
import os, json, base64
from google.oauth2 import service_account

In [None]:
class GEE_Client:
    """ Class based on authentication GEE """
    def __init__(self, key_path):
        self.key_path = key_path
        self._authenticate()
    def _authenticate(self):
        """ Authenticate to GEE """
        try:
            with open(self.key_path, 'r') as source:
                cred = json.load(source)
        except:
            cred = json.loads(base64.b64decode(self.key_path))
        scopes = [
            "https://www.googleapis.com/auth/earthengine",
            "https://www.googleapis.com/auth/devstorage.read_only",
        ]
        credentials = service_account.Credentials.from_service_account_info(cred, scopes=scopes)
        ee.Initialize(credentials)
        print('GEE authenticate with local json file')
class gfsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=27830, bands=('wwind10','t2m'), key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_above_ground',
            'v_component_of_wind_10m_above_ground',
            'temperature_2m_above_ground'
        ]
        self.renameBands = ['uwind10','vwind10','t2m']
        self.bands = list(bands)          # salida que quieres
        self.imgCollection = None

    def _compute_wind_speed(self, img: ee.Image) -> ee.Image:
        w10 = img.expression(
            'sqrt(u*u + v*v)',
            {
                'u': img.select('u_component_of_wind_10m_above_ground'),
                'v': img.select('v_component_of_wind_10m_above_ground'),
            }
        ).rename('wwind10')
        return img.select(self.raw_bands).rename(self.renameBands).addBands(w10)

    def getForecasts(self, *, start_date: str, end_date: str):
        coll = (ee.ImageCollection('NOAA/GFS0P25')
                .filter(ee.Filter.date(start_date, end_date))
                .select(self.raw_bands)
                .map(self._compute_wind_speed)
                .select(self.bands))

        def _reduce(img: ee.Image):
            t = self._safe_time(img)
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set('date', ee.Date(t).format()))
        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()
        initDate = pd.to_datetime(
            [str(x)[:10] for x in col.aggregate_array('system:index').getInfo()],
            format='%Y%m%d%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        out = {'initDate': initDate, 'date': fdate, 'name': col.aggregate_array('name').getInfo()}
        for b in self.bands:
            out[b] = col.aggregate_array(b).getInfo()
        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)

#-- Run main script --#
GEE_Client(key_path="/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json")


GEE authenticate with local json file


<__main__.GeeClient at 0x7beed3fea320>

In [10]:
# -*- coding: utf-8 -*-
import os, json, base64
import ee
import pandas as pd
from google.oauth2 import service_account

# ========================
# Base: autenticación GEE
# ========================
class GEE_Client:
    """Class based on authentication GEE."""
    def __init__(self, key_path):
        """
        key_path: ruta al JSON de servicio
                  ó cadena Base64 del JSON (si no es ruta válida).
        """
        self.key_path = key_path
        self._authenticate()

    def _authenticate(self):
        """Authenticate to GEE."""
        try:
            with open(self.key_path, 'r', encoding='utf-8') as source:
                cred = json.load(source)
        except Exception:
            # Si no es archivo, asumimos que es Base64 del JSON
            cred = json.loads(base64.b64decode(self.key_path).decode('utf-8'))

        scopes = [
            "https://www.googleapis.com/auth/earthengine",
            "https://www.googleapis.com/auth/devstorage.read_only",
        ]
        credentials = service_account.Credentials.from_service_account_info(cred, scopes=scopes)
        ee.Initialize(credentials)
        print('✅ GEE authenticated.')

    @staticmethod
    def _safe_time(img: ee.Image):
        """Usa forecast_time si existe; si no, system:time_start."""
        return ee.Algorithms.If(
            img.propertyNames().contains('forecast_time'),
            img.get('forecast_time'),
            img.get('system:time_start')
        )

    @staticmethod
    def fc_from_geojson(path_geojson: str) -> ee.FeatureCollection:
        """Carga un FeatureCollection desde un GeoJSON local (cada feature con properties.name)."""
        with open(path_geojson, "r", encoding="utf-8") as f:
            gj = json.load(f)
        if "features" not in gj:
            raise ValueError("GeoJSON inválido: falta 'features'.")
        return ee.FeatureCollection(gj["features"])


# ========================
# GFS (NOAA/GFS0P25)
# ========================
class gfsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=27830, bands=('wwind10','t2m'), key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_above_ground',
            'v_component_of_wind_10m_above_ground',
            'temperature_2m_above_ground'
        ]
        self.renameBands = ['uwind10','vwind10','t2m']
        self.bands = list(bands)  # salida deseada
        self.imgCollection = None

    def _compute_wind_speed(self, img: ee.Image) -> ee.Image:
        w10 = img.expression(
            'sqrt(u*u + v*v)',
            {
                'u': img.select('u_component_of_wind_10m_above_ground'),
                'v': img.select('v_component_of_wind_10m_above_ground'),
            }
        ).rename('wwind10')
        return img.select(self.raw_bands).rename(self.renameBands).addBands(w10)

    def getForecasts(self, *, start_date: str, end_date: str):
        coll = (ee.ImageCollection('NOAA/GFS0P25')
                .filter(ee.Filter.date(start_date, end_date))
                .select(self.raw_bands)
                .map(self._compute_wind_speed)
                .select(self.bands))

        def _reduce(img: ee.Image):
            t = self._safe_time(img)
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set('date', ee.Date(t).format()))
        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()
        initDate = pd.to_datetime(
            [str(x)[:10] for x in col.aggregate_array('system:index').getInfo()],
            format='%Y%m%d%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo()
        }
        for b in self.bands:
            out[b] = col.aggregate_array(b).getInfo()
        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ========================
# IFS (ECMWF/NRT_FORECAST/IFS/OPER)
# ========================
class ifsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=28000, key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_sfc',
            'v_component_of_wind_10m_sfc',
            'u_component_of_wind_100m_sfc',
            'v_component_of_wind_100m_sfc',
            'temperature_2m_sfc'
        ]
        self.outBands = ['wwind10','wwind100','t2m']
        self.imgCollection = None

    @staticmethod
    def _ws(img: ee.Image, uVar: str, vVar: str, name: str) -> ee.Image:
        w = img.expression('sqrt(u*u + v*v)', {'u': img.select(uVar), 'v': img.select(vVar)}).rename(name)
        return img.addBands(w)

    def getForecasts(self, *, initDate):
        """
        initDate: 'YYYY-MM-DDTHH:mm:ss' o epoch ms (int/long).
        Filtra por propiedad 'creation_time'.
        """
        t_ms = ee.Date(initDate).millis() if isinstance(initDate, str) else ee.Number(initDate)

        coll = (ee.ImageCollection('ECMWF/NRT_FORECAST/IFS/OPER')
                .filter(ee.Filter.eq('creation_time', t_ms))
                .select(self.raw_bands))

        # Magnitudes 10 m y 100 m
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_10m_sfc',
                                             'v_component_of_wind_10m_sfc',
                                             'wwind10'))
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_100m_sfc',
                                             'v_component_of_wind_100m_sfc',
                                             'wwind100'))

        # Seleccionar salidas y renombrar t2m
        coll = coll.map(lambda img: img.select(['wwind10','wwind100','temperature_2m_sfc'],
                                               self.outBands))

        def _reduce(img: ee.Image):
            t = self._safe_time(img)
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set('date', ee.Date(t).format()))
        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()
        initDate = pd.to_datetime(
            [str(x)[:10] for x in col.aggregate_array('system:index').getInfo()],
            format='%Y%m%d%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo(),
            'wwind10': col.aggregate_array('wwind10').getInfo(),
            'wwind100': col.aggregate_array('wwind100').getInfo(),
            't2m': col.aggregate_array('t2m').getInfo(),
        }
        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ========================
# Ejemplo de uso (COMENTADO)
# ========================
if __name__ == "__main__":
    # 1) Autenticación rápida (opcional, ya se hace en cada clase hija):
    # GEE_Client(key_path="/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json")

    # 2) Construye tus puntos (o usa el helper fc_from_geojson)
    # fc = GEE_Client.fc_from_geojson("data/geo/wind_points.geojson")
    fc = ee.FeatureCollection([
        ee.Feature(ee.Geometry.Point([-75.18, -14.95]), {"name": "Punta Lomitas"}),
        ee.Feature(ee.Geometry.Point([-79.40,  -7.39]), {"name": "Cupisnique"}),
    ])

    KEY = "/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json"

    # 3) GFS
    gfs = gfsForecast(eFeaturesLocations=fc, key_path=KEY)
    gfs.getForecasts(start_date="2025-01-01", end_date="2025-01-02")
    df_gfs = gfs.getDataFrame()
    print("GFS:\n", df_gfs.head())

    # 4) IFS
    ifs = ifsForecast(eFeaturesLocations=fc, key_path=KEY)
    ifs.getForecasts(initDate="2024-11-13T12:00:00")  # o epoch ms
    df_ifs = ifs.getDataFrame()
    print("IFS:\n", df_ifs.head())


✅ GEE authenticated.
GFS:
     initDate                date        name   wwind10        t2m
0 2025-01-01 2025-01-01 00:00:00  Cupisnique  4.299598  22.553918
1 2025-01-01 2025-01-01 01:00:00  Cupisnique  3.966811  22.430261
2 2025-01-01 2025-01-01 02:00:00  Cupisnique  3.585778  22.387567
3 2025-01-01 2025-01-01 03:00:00  Cupisnique  3.532136  22.314294
4 2025-01-01 2025-01-01 04:00:00  Cupisnique  3.193365  22.199213
✅ GEE authenticated.
IFS:
              initDate                date        name   wwind10  wwind100  \
0 2024-11-13 12:00:00 2024-11-13 12:00:00  Cupisnique  0.545229  0.589975   
1 2024-11-13 12:00:00 2024-11-13 15:00:00  Cupisnique  2.414017  2.158632   
2 2024-11-13 12:00:00 2024-11-13 18:00:00  Cupisnique  3.574126  3.756272   
3 2024-11-13 12:00:00 2024-11-13 21:00:00  Cupisnique  4.009469  4.688546   
4 2024-11-13 12:00:00 2024-11-14 00:00:00  Cupisnique  2.459300  3.081749   

         t2m  
0  16.676599  
1  21.736810  
2  24.094598  
3  21.989191  
4  17.843378

In [14]:
# -*- coding: utf-8 -*-
import os, json, base64
import ee
import pandas as pd
from google.oauth2 import service_account

# ========================
# Base: autenticación GEE
# ========================
class GEE_Client:
    """Autenticación a GEE usando ruta al JSON o cadena Base64 del JSON."""
    def __init__(self, key_path):
        """
        key_path: ruta al JSON de servicio  O  cadena Base64 del JSON.
        """
        self.key_path = key_path
        self._authenticate()

    def _authenticate(self):
        """Authenticate to GEE."""
        try:
            with open(self.key_path, 'r', encoding='utf-8') as source:
                cred = json.load(source)
        except Exception:
            # Si no es archivo, asumimos que es Base64 del JSON
            cred = json.loads(base64.b64decode(self.key_path).decode('utf-8'))

        scopes = [
            "https://www.googleapis.com/auth/earthengine",
            "https://www.googleapis.com/auth/devstorage.read_only",
        ]
        credentials = service_account.Credentials.from_service_account_info(cred, scopes=scopes)
        ee.Initialize(credentials)
        print('✅ GEE authenticated.')

    @staticmethod
    def _safe_time(img: ee.Image):
        """Usa forecast_time si existe; si no, system:time_start."""
        return ee.Algorithms.If(
            img.propertyNames().contains('forecast_time'),
            img.get('forecast_time'),
            img.get('system:time_start')
        )

    @staticmethod
    def fc_from_geojson(path_geojson: str) -> ee.FeatureCollection:
        """Carga un FeatureCollection desde un GeoJSON local (cada feature con properties.name)."""
        with open(path_geojson, "r", encoding="utf-8") as f:
            gj = json.load(f)
        if "features" not in gj:
            raise ValueError("GeoJSON inválido: falta 'features'.")
        return ee.FeatureCollection(gj["features"])


# ========================
# GFS (NOAA/GFS0P25)
# ========================
class gfsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=27830, bands=('wwind10','t2m'), key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_above_ground',
            'v_component_of_wind_10m_above_ground',
            'temperature_2m_above_ground'
        ]
        self.renameBands = ['uwind10','vwind10','t2m']
        self.bands = list(bands)  # salida deseada
        self.imgCollection = None

    def _compute_wind_speed(self, img: ee.Image) -> ee.Image:
        w10 = img.expression(
            'sqrt(u*u + v*v)',
            {
                'u': img.select('u_component_of_wind_10m_above_ground'),
                'v': img.select('v_component_of_wind_10m_above_ground'),
            }
        ).rename('wwind10')
        return img.select(self.raw_bands).rename(self.renameBands).addBands(w10)

    def getForecasts(self, *, start_date: str, end_date: str):
        coll = (ee.ImageCollection('NOAA/GFS0P25')
                .filter(ee.Filter.date(start_date, end_date))
                .select(self.raw_bands)
                .map(self._compute_wind_speed)
                .select(self.bands))

        def _reduce(img: ee.Image):
            # Formatea directamente en hora de Perú dentro de GEE
            t = self._safe_time(img)
            date_pe = ee.Date(t).format("YYYY-MM-dd'T'HH", 'America/Lima')
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            # Guardamos la zona por trazabilidad (opcional)
            return fc.map(lambda f: f.set({'date': date_pe, 'tz': 'America/Lima'}))

        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()

        # initDate: intenta system:index; si no existe, cae a system:time_start (ms)
        idx = col.aggregate_array('system:index').getInfo()
        if idx:
            initDate = pd.to_datetime([str(x)[:10] for x in idx], format='%Y%m%d%H', errors='coerce')
        else:
            ts = col.aggregate_array('system:time_start').getInfo()
            initDate = pd.to_datetime(ts, unit='ms', errors='coerce')

        # date ya viene en America/Lima como string "YYYY-MM-DDTHH"
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )

        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo()
        }
        for b in self.bands:
            out[b] = col.aggregate_array(b).getInfo()

        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ========================
# IFS (ECMWF/NRT_FORECAST/IFS/OPER)
# ========================
class ifsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=28000, key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_sfc',
            'v_component_of_wind_10m_sfc',
            'u_component_of_wind_100m_sfc',
            'v_component_of_wind_100m_sfc',
            'temperature_2m_sfc'
        ]
        self.outBands = ['wwind10','wwind100','t2m']
        self.imgCollection = None

    @staticmethod
    def _ws(img: ee.Image, uVar: str, vVar: str, name: str) -> ee.Image:
        w = img.expression('sqrt(u*u + v*v)', {'u': img.select(uVar), 'v': img.select(vVar)}).rename(name)
        return img.addBands(w)

    def getForecasts(self, *, initDate):
        """
        initDate: 'YYYY-MM-DDTHH:mm:ss' o epoch ms (int/long).
        Filtra por propiedad 'creation_time'.
        """
        t_ms = ee.Date(initDate).millis() if isinstance(initDate, str) else ee.Number(initDate)

        coll = (ee.ImageCollection('ECMWF/NRT_FORECAST/IFS/OPER')
                .filter(ee.Filter.eq('creation_time', t_ms))
                .select(self.raw_bands))

        # Magnitudes 10 m y 100 m (FIX correcto para v a 100 m)
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_10m_sfc',
                                             'v_component_of_wind_10m_sfc',
                                             'wwind10'))
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_100m_sfc',
                                             'v_component_of_wind_100m_sfc',
                                             'wwind100'))

        # Seleccionar salidas y renombrar t2m
        coll = coll.map(lambda img: img.select(['wwind10','wwind100','temperature_2m_sfc'],
                                               self.outBands))

        def _reduce(img: ee.Image):
            # Formatea directamente en hora de Perú dentro de GEE
            t = self._safe_time(img)
            date_pe = ee.Date(t).format("YYYY-MM-dd'T'HH", 'America/Lima')
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set({'date': date_pe, 'tz': 'America/Lima'}))

        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()

        # initDate: intenta system:index; si no, cae a system:time_start (ms)
        idx = col.aggregate_array('system:index').getInfo()
        if idx:
            initDate = pd.to_datetime([str(x)[:10] for x in idx], format='%Y%m%d%H', errors='coerce')
        else:
            ts = col.aggregate_array('system:time_start').getInfo()
            initDate = pd.to_datetime(ts, unit='ms', errors='coerce')

        # date ya en America/Lima "YYYY-MM-DDTHH"
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )

        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo(),
            'wwind10': col.aggregate_array('wwind10').getInfo(),
            'wwind100': col.aggregate_array('wwind100').getInfo(),
            't2m': col.aggregate_array('t2m').getInfo(),
        }
        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ========================
# Ejemplo de uso (COMENTADO)
# ========================
if __name__ == "__main__":
    # Carga tus puntos (o usa fc_from_geojson)
    # fc = GEE_Client.fc_from_geojson("data/geo/wind_points.geojson")
    fc = ee.FeatureCollection([
        ee.Feature(ee.Geometry.Point([-75.18, -14.95]), {"name": "Punta Lomitas"}),
        ee.Feature(ee.Geometry.Point([-79.40,  -7.39]), {"name": "Cupisnique"}),
    ])

    KEY = "/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json"

    # GFS: rango de fechas
    gfs = gfsForecast(eFeaturesLocations=fc, key_path=KEY)
    gfs.getForecasts(start_date="2025-01-01", end_date="2025-01-02")
    df_gfs = gfs.getDataFrame()
    print(df_gfs.head())

    # IFS: init único (creation_time)
    ifs = ifsForecast(eFeaturesLocations=fc, key_path=KEY)
    ifs.getForecasts(initDate="2024-11-13T12:00:00")
    df_ifs = ifs.getDataFrame()
    print(df_ifs.head())

✅ GEE authenticated.
    initDate                date        name   wwind10        t2m
0 2025-01-01 2024-12-31 19:00:00  Cupisnique  4.299598  22.553918
1 2025-01-01 2024-12-31 20:00:00  Cupisnique  3.966811  22.430261
2 2025-01-01 2024-12-31 21:00:00  Cupisnique  3.585778  22.387567
3 2025-01-01 2024-12-31 22:00:00  Cupisnique  3.532136  22.314294
4 2025-01-01 2024-12-31 23:00:00  Cupisnique  3.193365  22.199213
✅ GEE authenticated.
             initDate                date        name   wwind10  wwind100  \
0 2024-11-13 12:00:00 2024-11-13 07:00:00  Cupisnique  0.545229  0.589975   
1 2024-11-13 12:00:00 2024-11-13 10:00:00  Cupisnique  2.414017  2.158632   
2 2024-11-13 12:00:00 2024-11-13 13:00:00  Cupisnique  3.574126  3.756272   
3 2024-11-13 12:00:00 2024-11-13 16:00:00  Cupisnique  4.009469  4.688546   
4 2024-11-13 12:00:00 2024-11-13 19:00:00  Cupisnique  2.459300  3.081749   

         t2m  
0  16.676599  
1  21.736810  
2  24.094598  
3  21.989191  
4  17.843378  


In [13]:
# -*- coding: utf-8 -*-
import os, json, base64
import ee
import pandas as pd
import datetime
from google.oauth2 import service_account

# ========================
# Base: autenticación GEE
# ========================
class GEE_Client:
    """Autenticación a GEE usando ruta al JSON o cadena Base64 del JSON."""
    _initialized = False
    def __init__(self, key_path):
        """
        key_path: ruta al JSON de servicio  O  cadena Base64 del JSON.
        """
        self.key_path = key_path
        self._authenticate()

    def _authenticate(self):
        """Authenticate to GEE."""
        if not GEE_Client._initialized:
            try:
                with open(self.key_path, 'r', encoding='utf-8') as source:
                    cred = json.load(source)
            except Exception:
                # Si no es archivo, asumimos que es Base64 del JSON
                cred = json.loads(base64.b64decode(self.key_path).decode('utf-8'))

            scopes = [
                "https://www.googleapis.com/auth/earthengine",
                "https://www.googleapis.com/auth/devstorage.read_only",
            ]
            credentials = service_account.Credentials.from_service_account_info(cred, scopes=scopes)
            ee.Initialize(credentials)
            GEE_Client._initialized = True
            print('GEE authenticated.')

    @staticmethod
    def _safe_time(img: ee.Image):
        """Use forecast_time is exits"""
        return ee.Algorithms.If(
            img.propertyNames().contains('forecast_time'),
            img.get('forecast_time'),
            img.get('system:time_start')
        )

    @staticmethod
    def fc_from_geojson(path_geojson: str) -> ee.FeatureCollection:
        """Load a FeatureCollection GeoJSON """
        with open(path_geojson, "r", encoding="utf-8") as f:
            gj = json.load(f)
        if "features" not in gj:
            raise ValueError("GeoJSON invalid: miss 'features'.")
        return ee.FeatureCollection(gj["features"])


# ========================
# GFS (NOAA/GFS0P25)
# ========================
class gfsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=27830, bands=('wwind10','t2m'), key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_above_ground',
            'v_component_of_wind_10m_above_ground',
            'temperature_2m_above_ground'
        ]
        self.renameBands = ['uwind10','vwind10','t2m']
        self.bands = list(bands)  
        self.imgCollection = None

    def _compute_wind_speed(self, img: ee.Image) -> ee.Image:
        w10 = img.expression(
            'sqrt(u*u + v*v)',
            {
                'u': img.select('u_component_of_wind_10m_above_ground'),
                'v': img.select('v_component_of_wind_10m_above_ground'),
            }
        ).rename('wwind10')
        return img.select(self.raw_bands).rename(self.renameBands).addBands(w10)

    def getForecasts(self, *, start_date: str, end_date: str):
        coll = (ee.ImageCollection('NOAA/GFS0P25')
                .filter(ee.Filter.date(start_date, end_date))
                .select(self.raw_bands)
                .map(self._compute_wind_speed)
                .select(self.bands))

        def _reduce(img: ee.Image):
            t_valid = self._safe_time(img)
            date_pe = ee.Date(t_valid).format("YYYY-MM-dd'T'HH", 'America/Lima')
            idx = ee.String(img.get('system:index')).slice(0, 10)  # 'YYYYMMddHH'
            init_utc = ee.Date.parse('YYYYMMddHH', idx)            # interpreta en UTC
            init_pe  = init_utc.format("YYYY-MM-dd'T'HH", 'America/Lima')

            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set({'date': date_pe,
                                           'initDate': init_pe,
                                           'tz': 'America/Lima'}))

        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()
        initDate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('initDate').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )

        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo()
        }
        for b in self.bands:
            out[b] = col.aggregate_array(b).getInfo()

        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ==================================
# IFS (ECMWF/NRT_FORECAST/IFS/OPER)
# ==================================
class ifsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=28000, key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_sfc',
            'v_component_of_wind_10m_sfc',
            'u_component_of_wind_100m_sfc',
            'v_component_of_wind_100m_sfc',
            'temperature_2m_sfc'
        ]
        self.outBands = ['wwind10','wwind100','t2m']
        self.imgCollection = None

    @staticmethod
    def _ws(img: ee.Image, uVar: str, vVar: str, name: str) -> ee.Image:
        w = img.expression('sqrt(u*u + v*v)', {'u': img.select(uVar), 'v': img.select(vVar)}).rename(name)
        return img.addBands(w)

    def getForecasts(self, *, initDate):
        """
        initDate in UTC (string 'YYYY-MM-DDTHH:mm:ss' o epoch ms).
        - Filter in UTC (creation_time).
        - Change 'initDate' for America/Lima.
        """
        t_ms = ee.Date(initDate).millis() if isinstance(initDate, str) else ee.Number(initDate)

        coll = (ee.ImageCollection('ECMWF/NRT_FORECAST/IFS/OPER')
                .filter(ee.Filter.eq('creation_time', t_ms))
                .select(self.raw_bands))

        #-- Magnitudes 10 m and 100 m  --#
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_10m_sfc',
                                             'v_component_of_wind_10m_sfc',
                                             'wwind10'))
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_100m_sfc',
                                             'v_component_of_wind_100m_sfc',
                                             'wwind100'))

        coll = coll.map(lambda img: img.select(['wwind10','wwind100','temperature_2m_sfc'],
                                               self.outBands))

        #-- Prepare initDate (consulta) en Lima --#
        init_pe = ee.Date(t_ms).format("YYYY-MM-dd'T'HH", 'America/Lima')

        def _reduce(img: ee.Image):
            t_valid = self._safe_time(img)
            date_pe = ee.Date(t_valid).format("YYYY-MM-dd'T'HH", 'America/Lima')
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set({'date': date_pe,
                                           'initDate': init_pe,
                                           'tz': 'America/Lima'}))

        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()

        #-- initDate (consulta) y date ya en Lima "YYYY-MM-DDTHH" --#
        initDate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('initDate').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )

        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo(),
            'wwind10': col.aggregate_array('wwind10').getInfo(),
            'wwind100': col.aggregate_array('wwind100').getInfo(),
            't2m': col.aggregate_array('t2m').getInfo(),
        }
        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ==============================
# Ejemplo de uso (COMENTADO)
# ========================
if __name__ == "__main__":
    KEY = "/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json"
    GEE_Client(key_path=KEY)
    #-- Load all windfarms --#
    fc = ee.FeatureCollection([
        ee.Feature(ee.Geometry.Point([-79.48,  -7.55]), {"name": "W.F. Cupisnique"}),
        ee.Feature(ee.Geometry.Point([-78.98,  -6.45]), {"name": "W.F. Duna"}),
        ee.Feature(ee.Geometry.Point([-78.98,  -6.45]), {"name": "W.F. Huambos"}),
        ee.Feature(ee.Geometry.Point([-75.07, -15.41]), {"name": "W.F. Marcona"}),
        ee.Feature(ee.Geometry.Point([-75.92, -14.64]), {"name": "W.F. Punta Lomitas"}),
        ee.Feature(ee.Geometry.Point([-75.14, -15.39]), {"name": "W.F. San Juan"}),
        ee.Feature(ee.Geometry.Point([-81.20,  -4.56]), {"name": "W.F. Talara"}),
        ee.Feature(ee.Geometry.Point([-75.06, -15.38]), {"name": "W.F. Tres Hermanas"}),
        ee.Feature(ee.Geometry.Point([-75.04, -15.06]), {"name": "W.F. Wayra Ext"}),
        ee.Feature(ee.Geometry.Point([-75.05, -15.04]), {"name": "W.F. Wayra I"})
        ])
    #-- Start/End Dates --#
    def rangeDateRetrieve(delta=24):
        endDate = datetime.datetime.now()
        startDate = endDate - datetime.timedelta(hours=delta)
        return startDate, endDate

    #-- GFS --#
    gfs = gfsForecast(eFeaturesLocations=fc, key_path=KEY)
    startDate, endDate = rangeDateRetrieve()
    gfs.getForecasts(start_date=startDate.strftime("%Y-%m-%d"), 
                     end_date=endDate.strftime("%Y-%m-%d"))
    df_gfs = gfs.getDataFrame()
    df_gfs['model'] = 'GFS'
    #-- Add wind at 100 m --#
    windHeightEq = lambda w, alpha: w*((90/10)**alpha)
    z = 90
    z0 = 10
    alpha = .14
    df_gfs['wwind100'] = df_gfs['wwind10']* (z / z0) ** alpha

    #-- IFS --#
    ifs = ifsForecast(eFeaturesLocations=fc, key_path=KEY)
    ifs.getForecasts(initDate=startDate.strftime("%Y-%m-%dT12:00:00"))
    df_ifs = ifs.getDataFrame()
    df_ifs['model'] = 'IFS'

    #-- Concat all forecast --#
    dataGfsIfs = pd.concat([df_gfs.query("date>=20240101"),df_ifs], ignore_index=True)
    dataGfsIfs['nleadHour'] = ((dataGfsIfs['date'] - dataGfsIfs['initDate']).dt.total_seconds() / 3600).astype(int)
    dataGfsIfs['nleadDays'] = (dataGfsIfs['nleadHour'] / 24).astype(int)
    #-- Choose the last forecast --#
    dataGfsIfs = dataGfsIfs.groupby('model', group_keys=True).apply(
        lambda x: x[x['initDate'] == x['initDate'].max()],
        include_groups=False,
        ).reset_index(level=0)
    #-- Save as parquet --#
    dataGfsIfs.to_parquet("../dataset/windSpeedFcs.parquet", index=False)

GEE authenticated.


In [1]:
# -*- coding: utf-8 -*-
import os, json, base64
import ee
import pandas as pd
import datetime
from google.oauth2 import service_account

# ========================
# Base: autenticación GEE
# ========================
class GEE_Client:
    """Autenticación a GEE usando ruta al JSON o cadena Base64 del JSON."""
    _initialized = False
    def __init__(self, key_path):
        """
        key_path: ruta al JSON de servicio  O  cadena Base64 del JSON.
        """
        self.key_path = key_path
        self._authenticate()

    def _authenticate(self):
        """Authenticate to GEE."""
        if not GEE_Client._initialized:
            try:
                with open(self.key_path, 'r', encoding='utf-8') as source:
                    cred = json.load(source)
            except Exception:
                # Si no es archivo, asumimos que es Base64 del JSON
                cred = json.loads(base64.b64decode(self.key_path).decode('utf-8'))

            scopes = [
                "https://www.googleapis.com/auth/earthengine",
                "https://www.googleapis.com/auth/devstorage.read_only",
            ]
            credentials = service_account.Credentials.from_service_account_info(cred, scopes=scopes)
            ee.Initialize(credentials)
            GEE_Client._initialized = True
            print('GEE authenticated.')

    @staticmethod
    def _safe_time(img: ee.Image):
        """Use forecast_time is exits"""
        return ee.Algorithms.If(
            img.propertyNames().contains('forecast_time'),
            img.get('forecast_time'),
            img.get('system:time_start')
        )

    @staticmethod
    def fc_from_geojson(path_geojson: str) -> ee.FeatureCollection:
        """Load a FeatureCollection GeoJSON """
        with open(path_geojson, "r", encoding="utf-8") as f:
            gj = json.load(f)
        if "features" not in gj:
            raise ValueError("GeoJSON invalid: miss 'features'.")
        return ee.FeatureCollection(gj["features"])


# ========================
# GFS (NOAA/GFS0P25)
# ========================
class gfsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=27830, bands=('wwind10','t2m'), key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_above_ground',
            'v_component_of_wind_10m_above_ground',
            'temperature_2m_above_ground'
        ]
        self.renameBands = ['uwind10','vwind10','t2m']
        self.bands = list(bands)  
        self.imgCollection = None

    def _compute_wind_speed(self, img: ee.Image) -> ee.Image:
        w10 = img.expression(
            'sqrt(u*u + v*v)',
            {
                'u': img.select('u_component_of_wind_10m_above_ground'),
                'v': img.select('v_component_of_wind_10m_above_ground'),
            }
        ).rename('wwind10')
        return img.select(self.raw_bands).rename(self.renameBands).addBands(w10)

    def getForecasts(self, *, start_date: str, end_date: str):
        coll = (ee.ImageCollection('NOAA/GFS0P25')
                .filter(ee.Filter.date(start_date, end_date))
                .select(self.raw_bands)
                .map(self._compute_wind_speed)
                .select(self.bands))

        def _reduce(img: ee.Image):
            t_valid = self._safe_time(img)
            date_pe = ee.Date(t_valid).format("YYYY-MM-dd'T'HH", 'America/Lima')
            idx = ee.String(img.get('system:index')).slice(0, 10)  # 'YYYYMMddHH'
            init_utc = ee.Date.parse('YYYYMMddHH', idx)            # interpreta en UTC
            init_pe  = init_utc.format("YYYY-MM-dd'T'HH", 'America/Lima')

            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set({'date': date_pe,
                                           'initDate': init_pe,
                                           'tz': 'America/Lima'}))

        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()
        initDate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('initDate').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )

        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo()
        }
        for b in self.bands:
            out[b] = col.aggregate_array(b).getInfo()

        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ==================================
# IFS (ECMWF/NRT_FORECAST/IFS/OPER)
# ==================================
class ifsForecast(GEE_Client):
    def __init__(self, eFeaturesLocations, scale=28000, key_path=None):
        super().__init__(key_path=key_path)
        self.scale = scale
        self.eFeaturesLocations = eFeaturesLocations
        self.raw_bands = [
            'u_component_of_wind_10m_sfc',
            'v_component_of_wind_10m_sfc',
            'u_component_of_wind_100m_sfc',
            'v_component_of_wind_100m_sfc',
            'temperature_2m_sfc'
        ]
        self.outBands = ['wwind10','wwind100','t2m']
        self.imgCollection = None

    @staticmethod
    def _ws(img: ee.Image, uVar: str, vVar: str, name: str) -> ee.Image:
        w = img.expression('sqrt(u*u + v*v)', {'u': img.select(uVar), 'v': img.select(vVar)}).rename(name)
        return img.addBands(w)

    def getForecasts(self, *, initDate):
        """
        initDate in UTC (string 'YYYY-MM-DDTHH:mm:ss' o epoch ms).
        - Filter in UTC (creation_time).
        - Change 'initDate' for America/Lima.
        """
        t_ms = ee.Date(initDate).millis() if isinstance(initDate, str) else ee.Number(initDate)

        coll = (ee.ImageCollection('ECMWF/NRT_FORECAST/IFS/OPER')
                .filter(ee.Filter.eq('creation_time', t_ms))
                .select(self.raw_bands))

        #-- Magnitudes 10 m and 100 m  --#
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_10m_sfc',
                                             'v_component_of_wind_10m_sfc',
                                             'wwind10'))
        coll = coll.map(lambda img: self._ws(img,
                                             'u_component_of_wind_100m_sfc',
                                             'v_component_of_wind_100m_sfc',
                                             'wwind100'))

        coll = coll.map(lambda img: img.select(['wwind10','wwind100','temperature_2m_sfc'],
                                               self.outBands))

        #-- Prepare initDate (consulta) en Lima --#
        init_pe = ee.Date(t_ms).format("YYYY-MM-dd'T'HH", 'America/Lima')

        def _reduce(img: ee.Image):
            t_valid = self._safe_time(img)
            date_pe = ee.Date(t_valid).format("YYYY-MM-dd'T'HH", 'America/Lima')
            fc = img.reduceRegions(self.eFeaturesLocations, ee.Reducer.mean(), self.scale)
            return fc.map(lambda f: f.set({'date': date_pe,
                                           'initDate': init_pe,
                                           'tz': 'America/Lima'}))

        self.imgCollection = coll.map(_reduce)

    def getDataFrame(self) -> pd.DataFrame:
        col = self.imgCollection.flatten()

        #-- initDate (consulta) y date ya en Lima "YYYY-MM-DDTHH" --#
        initDate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('initDate').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )
        fdate = pd.to_datetime(
            [str(x)[:13] for x in col.aggregate_array('date').getInfo()],
            format='%Y-%m-%dT%H', errors='coerce'
        )

        out = {
            'initDate': initDate,
            'date': fdate,
            'name': col.aggregate_array('name').getInfo(),
            'wwind10': col.aggregate_array('wwind10').getInfo(),
            'wwind100': col.aggregate_array('wwind100').getInfo(),
            't2m': col.aggregate_array('t2m').getInfo(),
        }
        return pd.DataFrame(out).sort_values(['name','initDate','date']).reset_index(drop=True)


# ==============================
# Ejemplo de uso (COMENTADO)
# ========================
if __name__ == "__main__":
    KEY = "/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json"
    GEE_Client(key_path=KEY)
    #-- Load all windfarms --#
    fc = ee.FeatureCollection([
        ee.Feature(ee.Geometry.Point([-79.48,  -7.55]), {"name": "W.F. Cupisnique"}),
        ee.Feature(ee.Geometry.Point([-78.98,  -6.45]), {"name": "W.F. Duna"}),
        ee.Feature(ee.Geometry.Point([-78.98,  -6.45]), {"name": "W.F. Huambos"}),
        ee.Feature(ee.Geometry.Point([-75.07, -15.41]), {"name": "W.F. Marcona"}),
        ee.Feature(ee.Geometry.Point([-75.92, -14.64]), {"name": "W.F. Punta Lomitas"}),
        ee.Feature(ee.Geometry.Point([-75.14, -15.39]), {"name": "W.F. San Juan"}),
        ee.Feature(ee.Geometry.Point([-81.20,  -4.56]), {"name": "W.F. Talara"}),
        ee.Feature(ee.Geometry.Point([-75.06, -15.38]), {"name": "W.F. Tres Hermanas"}),
        ee.Feature(ee.Geometry.Point([-75.04, -15.06]), {"name": "W.F. Wayra Ext"}),
        ee.Feature(ee.Geometry.Point([-75.05, -15.04]), {"name": "W.F. Wayra I"})
        ])
    #-- Start/End Dates --#
    def rangeDateRetrieve(delta=8):
        endDate = datetime.datetime.now()
        startDate = endDate - datetime.timedelta(days=delta)
        return startDate, endDate

    #-- GFS --#
    gfs = gfsForecast(eFeaturesLocations=fc, key_path=KEY)
    startDate, endDate = rangeDateRetrieve()
    gfs.getForecasts(start_date=startDate.strftime("%Y-%m-%d"), 
                     end_date=endDate.strftime("%Y-%m-%d"))
    df_gfs = gfs.getDataFrame()
    df_gfs['model'] = 'GFS'
    #-- Add wind at 100 m --#
    windHeightEq = lambda w, alpha: w*((90/10)**alpha)
    z = 90
    z0 = 10
    alpha = .14
    df_gfs['wwind100'] = df_gfs['wwind10']* (z / z0) ** alpha

    #-- IFS --#
    ifs = ifsForecast(eFeaturesLocations=fc, key_path=KEY)
    #-- Loop --#
    rngDateIfs = pd.date_range(start=startDate.strftime('%Y-%m-%d'),
                               end=endDate.strftime('%Y-%m-%d'), 
                               freq='12h')
    ifsContainer = []
    for sdt in rngDateIfs:
        ifs.getForecasts(initDate=sdt.strftime("%Y-%m-%dT%H:00:00"))
        df_ifs = ifs.getDataFrame()
        df_ifs['model'] = 'IFS'
        ifsContainer.append(df_ifs)
    df_ifs = pd.concat(ifsContainer, ignore_index=True)
    #-- Concat all forecast --#
    dataGfsIfs = pd.concat([df_gfs.query("date>=20240101"),df_ifs], ignore_index=True)
    dataGfsIfs['nleadHour'] = ((dataGfsIfs['date'] - dataGfsIfs['initDate']).dt.total_seconds() / 3600).astype(int)
    dataGfsIfs['nleadDays'] = (dataGfsIfs['nleadHour'] / 24).astype(int)
    #-- Choose the last forecast --#
    # dataGfsIfs = dataGfsIfs.groupby('model', group_keys=True).apply(
    #     lambda x: x[x['initDate'] == x['initDate'].max()],
    #     include_groups=False,
    #     ).reset_index(level=0)
    #-- Save as parquet --#
    dataGfsIfs.to_parquet("../dataset/windSpeedFcs.parquet", index=False)

*** Earth Engine *** Share your feedback by taking our Annual Developer Satisfaction Survey: https://google.qualtrics.com/jfe/form/SV_7TDKVSyKvBdmMqW?ref=4i2o6


GEE authenticated.


In [2]:
dataGfsIfs

Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100,nleadHour,nleadDays
0,2025-09-13 19:00:00,2025-09-13 19:00:00,W.F. Cupisnique,7.306956,16.388635,GFS,9.938720,0,0
1,2025-09-13 19:00:00,2025-09-13 20:00:00,W.F. Cupisnique,6.623333,16.282220,GFS,9.008875,1,0
2,2025-09-13 19:00:00,2025-09-13 21:00:00,W.F. Cupisnique,6.023108,16.324701,GFS,8.192466,2,0
3,2025-09-13 19:00:00,2025-09-13 22:00:00,W.F. Cupisnique,5.129387,16.198358,GFS,6.976851,3,0
4,2025-09-13 19:00:00,2025-09-13 23:00:00,W.F. Cupisnique,4.400807,16.006433,GFS,5.985856,4,0
...,...,...,...,...,...,...,...,...,...
81325,2025-09-21 19:00:00,2025-10-05 19:00:00,W.F. Wayra I,4.247760,18.357385,IFS,5.403200,336,14
81326,2025-09-21 19:00:00,2025-10-06 01:00:00,W.F. Wayra I,0.130853,15.821466,IFS,0.194518,342,14
81327,2025-09-21 19:00:00,2025-10-06 07:00:00,W.F. Wayra I,0.896410,15.954126,IFS,0.930132,348,14
81328,2025-09-21 19:00:00,2025-10-06 13:00:00,W.F. Wayra I,4.087511,30.063318,IFS,4.495761,354,14


In [10]:
rngDateIfs

DatetimeIndex(['2025-09-08 00:00:00', '2025-09-08 12:00:00',
               '2025-09-09 00:00:00', '2025-09-09 12:00:00',
               '2025-09-10 00:00:00', '2025-09-10 12:00:00',
               '2025-09-11 00:00:00', '2025-09-11 12:00:00',
               '2025-09-12 00:00:00', '2025-09-12 12:00:00',
               '2025-09-13 00:00:00', '2025-09-13 12:00:00',
               '2025-09-14 00:00:00', '2025-09-14 12:00:00',
               '2025-09-15 00:00:00', '2025-09-15 12:00:00',
               '2025-09-16 00:00:00'],
              dtype='datetime64[ns]', freq='12h')

In [11]:
dataGfsIfs.groupby(['name','model','initDate']).agg({'date':['min','max']})

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,date,date
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,min,max
name,model,initDate,Unnamed: 3_level_2,Unnamed: 4_level_2
W.F. Cupisnique,GFS,2025-09-07 19:00:00,2025-09-07 19:00:00,2025-09-23 19:00:00
W.F. Cupisnique,GFS,2025-09-08 01:00:00,2025-09-08 01:00:00,2025-09-24 01:00:00
W.F. Cupisnique,GFS,2025-09-08 07:00:00,2025-09-08 07:00:00,2025-09-24 07:00:00
W.F. Cupisnique,GFS,2025-09-08 13:00:00,2025-09-08 13:00:00,2025-09-24 13:00:00
W.F. Cupisnique,GFS,2025-09-08 19:00:00,2025-09-08 19:00:00,2025-09-24 19:00:00
...,...,...,...,...
W.F. Wayra I,IFS,2025-09-13 19:00:00,2025-09-13 19:00:00,2025-09-28 19:00:00
W.F. Wayra I,IFS,2025-09-14 07:00:00,2025-09-14 07:00:00,2025-09-29 07:00:00
W.F. Wayra I,IFS,2025-09-14 19:00:00,2025-09-14 19:00:00,2025-09-29 19:00:00
W.F. Wayra I,IFS,2025-09-15 07:00:00,2025-09-15 07:00:00,2025-09-30 07:00:00


In [47]:
dataGfsIfs.groupby('model', group_keys=True).apply(
        lambda x: x[x['initDate'] == x['initDate'].max()],
        include_groups=False,
        ).reset_index(level=0)

Unnamed: 0,model,initDate,date,name,wwind10,t2m,wwind100,nleadHour,nleadDays
627,GFS,2025-09-15 13:00:00,2025-09-15 13:00:00,W.F. Cupisnique,5.859033,20.327295,7.969296,0,0
628,GFS,2025-09-15 13:00:00,2025-09-15 14:00:00,W.F. Cupisnique,6.002487,20.628595,8.164418,1,0
629,GFS,2025-09-15 13:00:00,2025-09-15 15:00:00,W.F. Cupisnique,6.566195,20.314630,8.931157,2,0
630,GFS,2025-09-15 13:00:00,2025-09-15 16:00:00,W.F. Cupisnique,6.078243,19.427301,8.267458,3,0
631,GFS,2025-09-15 13:00:00,2025-09-15 17:00:00,W.F. Cupisnique,6.569983,18.964990,8.936310,4,0
...,...,...,...,...,...,...,...,...,...
9205,IFS,2025-09-15 07:00:00,2025-09-29 07:00:00,W.F. Wayra I,1.581058,13.514246,1.940975,336,14
9206,IFS,2025-09-15 07:00:00,2025-09-29 13:00:00,W.F. Wayra I,0.204385,27.026941,0.271043,342,14
9207,IFS,2025-09-15 07:00:00,2025-09-29 19:00:00,W.F. Wayra I,2.664290,20.915735,3.168192,348,14
9208,IFS,2025-09-15 07:00:00,2025-09-30 01:00:00,W.F. Wayra I,0.420670,16.321558,0.414938,354,14


In [52]:
pd.read_parquet('../dataset/windSpeedFcs.parquet')

Unnamed: 0,model,initDate,date,name,wwind10,t2m,wwind100,nleadHour,nleadDays
0,GFS,2025-09-15 13:00:00,2025-09-15 13:00:00,W.F. Cupisnique,5.859033,20.327295,7.969296,0,0
1,GFS,2025-09-15 13:00:00,2025-09-15 14:00:00,W.F. Cupisnique,6.002487,20.628595,8.164418,1,0
2,GFS,2025-09-15 13:00:00,2025-09-15 15:00:00,W.F. Cupisnique,6.566195,20.314630,8.931157,2,0
3,GFS,2025-09-15 13:00:00,2025-09-15 16:00:00,W.F. Cupisnique,6.078243,19.427301,8.267458,3,0
4,GFS,2025-09-15 13:00:00,2025-09-15 17:00:00,W.F. Cupisnique,6.569983,18.964990,8.936310,4,0
...,...,...,...,...,...,...,...,...,...
2935,IFS,2025-09-15 07:00:00,2025-09-29 07:00:00,W.F. Wayra I,1.581058,13.514246,1.940975,336,14
2936,IFS,2025-09-15 07:00:00,2025-09-29 13:00:00,W.F. Wayra I,0.204385,27.026941,0.271043,342,14
2937,IFS,2025-09-15 07:00:00,2025-09-29 19:00:00,W.F. Wayra I,2.664290,20.915735,3.168192,348,14
2938,IFS,2025-09-15 07:00:00,2025-09-30 01:00:00,W.F. Wayra I,0.420670,16.321558,0.414938,354,14


In [41]:
dataGfsIfs

Unnamed: 0,initDate,date,name,wwind10,t2m,wwind100,nleadHour,nleadDays
0,2025-09-15 13:00:00,2025-09-15 13:00:00,W.F. Cupisnique,5.859033,20.327295,7.969296,0,0
1,2025-09-15 13:00:00,2025-09-15 14:00:00,W.F. Cupisnique,6.002487,20.628595,8.164418,1,0
2,2025-09-15 13:00:00,2025-09-15 15:00:00,W.F. Cupisnique,6.566195,20.314630,8.931157,2,0
3,2025-09-15 13:00:00,2025-09-15 16:00:00,W.F. Cupisnique,6.078243,19.427301,8.267458,3,0
4,2025-09-15 13:00:00,2025-09-15 17:00:00,W.F. Cupisnique,6.569983,18.964990,8.936310,4,0
...,...,...,...,...,...,...,...,...
2935,2025-09-15 07:00:00,2025-09-29 07:00:00,W.F. Wayra I,1.581058,13.514246,1.940975,336,14
2936,2025-09-15 07:00:00,2025-09-29 13:00:00,W.F. Wayra I,0.204385,27.026941,0.271043,342,14
2937,2025-09-15 07:00:00,2025-09-29 19:00:00,W.F. Wayra I,2.664290,20.915735,3.168192,348,14
2938,2025-09-15 07:00:00,2025-09-30 01:00:00,W.F. Wayra I,0.420670,16.321558,0.414938,354,14


In [35]:
dataGfsIfs.groupby('model').apply(
    lambda x: x[x['initDate'] == x['initDate'].max()]
).reset_index(drop=True)

  dataGfsIfs.groupby('model').apply(


Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100,nleadHour,nleadDays
0,2025-09-15 13:00:00,2025-09-15 13:00:00,W.F. Cupisnique,5.859033,20.327295,GFS,7.969296,0,0
1,2025-09-15 13:00:00,2025-09-15 14:00:00,W.F. Cupisnique,6.002487,20.628595,GFS,8.164418,1,0
2,2025-09-15 13:00:00,2025-09-15 15:00:00,W.F. Cupisnique,6.566195,20.314630,GFS,8.931157,2,0
3,2025-09-15 13:00:00,2025-09-15 16:00:00,W.F. Cupisnique,6.078243,19.427301,GFS,8.267458,3,0
4,2025-09-15 13:00:00,2025-09-15 17:00:00,W.F. Cupisnique,6.569983,18.964990,GFS,8.936310,4,0
...,...,...,...,...,...,...,...,...,...
2935,2025-09-15 07:00:00,2025-09-29 07:00:00,W.F. Wayra I,1.581058,13.514246,IFS,1.940975,336,14
2936,2025-09-15 07:00:00,2025-09-29 13:00:00,W.F. Wayra I,0.204385,27.026941,IFS,0.271043,342,14
2937,2025-09-15 07:00:00,2025-09-29 19:00:00,W.F. Wayra I,2.664290,20.915735,IFS,3.168192,348,14
2938,2025-09-15 07:00:00,2025-09-30 01:00:00,W.F. Wayra I,0.420670,16.321558,IFS,0.414938,354,14


In [38]:
(1940/10)/2

97.0

In [33]:
dataGfsIfs

Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100,nleadHour,nleadDays
0,2025-09-14 19:00:00,2025-09-14 19:00:00,W.F. Cupisnique,6.829865,16.770135,GFS,9.289793,0,0
1,2025-09-14 19:00:00,2025-09-14 20:00:00,W.F. Cupisnique,6.395040,16.554102,GFS,8.698357,1,0
2,2025-09-14 19:00:00,2025-09-14 21:00:00,W.F. Cupisnique,5.220991,16.489496,GFS,7.101448,2,0
3,2025-09-14 19:00:00,2025-09-14 22:00:00,W.F. Cupisnique,4.514660,16.404260,GFS,6.140715,3,0
4,2025-09-14 19:00:00,2025-09-14 23:00:00,W.F. Cupisnique,4.312327,16.258051,GFS,5.865508,4,0
...,...,...,...,...,...,...,...,...,...
9205,2025-09-15 07:00:00,2025-09-29 07:00:00,W.F. Wayra I,1.581058,13.514246,IFS,1.940975,336,14
9206,2025-09-15 07:00:00,2025-09-29 13:00:00,W.F. Wayra I,0.204385,27.026941,IFS,0.271043,342,14
9207,2025-09-15 07:00:00,2025-09-29 19:00:00,W.F. Wayra I,2.664290,20.915735,IFS,3.168192,348,14
9208,2025-09-15 07:00:00,2025-09-30 01:00:00,W.F. Wayra I,0.420670,16.321558,IFS,0.414938,354,14


In [None]:
# Selecciona el grupo correspondiente al último initDate disponible (último forecast)
ultimo_initDate = dataGfsIfs['initDate'].max()
df_ultimo_forecast = dataGfsIfs[dataGfsIfs['initDate'] == ultimo_initDate]
df_ultimo_forecast

In [None]:
my dataframe has the co

In [34]:
dataGfsIfs.groupby(['name','model']).agg({'initDate':'last'})

Unnamed: 0_level_0,Unnamed: 1_level_0,initDate
name,model,Unnamed: 2_level_1
W.F. Cupisnique,GFS,2025-09-15 13:00:00
W.F. Cupisnique,IFS,2025-09-15 07:00:00
W.F. Duna,GFS,2025-09-15 13:00:00
W.F. Duna,IFS,2025-09-15 07:00:00
W.F. Huambos,GFS,2025-09-15 13:00:00
W.F. Huambos,IFS,2025-09-15 07:00:00
W.F. Marcona,GFS,2025-09-15 13:00:00
W.F. Marcona,IFS,2025-09-15 07:00:00
W.F. Punta Lomitas,GFS,2025-09-15 13:00:00
W.F. Punta Lomitas,IFS,2025-09-15 07:00:00


In [31]:
dataGfsIfs.groupby(['name','model','initDate']).tail(1)

Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100,nleadHour,nleadDays
208,2025-09-14 19:00:00,2025-09-30 19:00:00,W.F. Cupisnique,4.178983,18.627313,GFS,5.684137,384,16
417,2025-09-15 01:00:00,2025-10-01 01:00:00,W.F. Cupisnique,3.156311,16.825372,GFS,4.293127,384,16
626,2025-09-15 07:00:00,2025-10-01 07:00:00,W.F. Cupisnique,2.689424,16.430658,GFS,3.658081,384,16
835,2025-09-15 13:00:00,2025-10-01 13:00:00,W.F. Cupisnique,6.775535,21.15365,GFS,9.215895,384,16
1044,2025-09-14 19:00:00,2025-09-30 19:00:00,W.F. Duna,2.013218,13.527307,GFS,2.738323,384,16
1253,2025-09-15 01:00:00,2025-10-01 01:00:00,W.F. Duna,1.067021,12.025385,GFS,1.451333,384,16
1462,2025-09-15 07:00:00,2025-10-01 07:00:00,W.F. Duna,1.832488,14.230676,GFS,2.4925,384,16
1671,2025-09-15 13:00:00,2025-10-01 13:00:00,W.F. Duna,3.514501,18.15365,GFS,4.780327,384,16
1880,2025-09-14 19:00:00,2025-09-30 19:00:00,W.F. Huambos,2.013218,13.527307,GFS,2.738323,384,16
2089,2025-09-15 01:00:00,2025-10-01 01:00:00,W.F. Huambos,1.067021,12.025385,GFS,1.451333,384,16


In [28]:
dataGfsIfs

Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100,nleadHour,nleadDays
0,2025-09-14 19:00:00,2025-09-14 19:00:00,W.F. Cupisnique,6.829865,16.770135,GFS,9.289793,0,0
1,2025-09-14 19:00:00,2025-09-14 20:00:00,W.F. Cupisnique,6.395040,16.554102,GFS,8.698357,1,0
2,2025-09-14 19:00:00,2025-09-14 21:00:00,W.F. Cupisnique,5.220991,16.489496,GFS,7.101448,2,0
3,2025-09-14 19:00:00,2025-09-14 22:00:00,W.F. Cupisnique,4.514660,16.404260,GFS,6.140715,3,0
4,2025-09-14 19:00:00,2025-09-14 23:00:00,W.F. Cupisnique,4.312327,16.258051,GFS,5.865508,4,0
...,...,...,...,...,...,...,...,...,...
9205,2025-09-15 07:00:00,2025-09-29 07:00:00,W.F. Wayra I,1.581058,13.514246,IFS,1.940975,336,14
9206,2025-09-15 07:00:00,2025-09-29 13:00:00,W.F. Wayra I,0.204385,27.026941,IFS,0.271043,342,14
9207,2025-09-15 07:00:00,2025-09-29 19:00:00,W.F. Wayra I,2.664290,20.915735,IFS,3.168192,348,14
9208,2025-09-15 07:00:00,2025-09-30 01:00:00,W.F. Wayra I,0.420670,16.321558,IFS,0.414938,354,14


In [30]:
dataGfsIfs.groupby(['name','model','initDate']).agg({'date':['min','max']})

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,date,date
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,min,max
name,model,initDate,Unnamed: 3_level_2,Unnamed: 4_level_2
W.F. Cupisnique,GFS,2025-09-14 19:00:00,2025-09-14 19:00:00,2025-09-30 19:00:00
W.F. Cupisnique,GFS,2025-09-15 01:00:00,2025-09-15 01:00:00,2025-10-01 01:00:00
W.F. Cupisnique,GFS,2025-09-15 07:00:00,2025-09-15 07:00:00,2025-10-01 07:00:00
W.F. Cupisnique,GFS,2025-09-15 13:00:00,2025-09-15 13:00:00,2025-10-01 13:00:00
W.F. Cupisnique,IFS,2025-09-15 07:00:00,2025-09-15 07:00:00,2025-09-30 07:00:00
W.F. Duna,GFS,2025-09-14 19:00:00,2025-09-14 19:00:00,2025-09-30 19:00:00
W.F. Duna,GFS,2025-09-15 01:00:00,2025-09-15 01:00:00,2025-10-01 01:00:00
W.F. Duna,GFS,2025-09-15 07:00:00,2025-09-15 07:00:00,2025-10-01 07:00:00
W.F. Duna,GFS,2025-09-15 13:00:00,2025-09-15 13:00:00,2025-10-01 13:00:00
W.F. Duna,IFS,2025-09-15 07:00:00,2025-09-15 07:00:00,2025-09-30 07:00:00


In [19]:
startDate.strftime("%Y-%m-%d %H:00")

'2025-09-15 00:00'

In [13]:
startDate, endDate = rangeDateRetrieve()

In [15]:
startDate.strftime("%Y-%m-%dT:%H:00")

'2025-09-15T:18:00'

In [16]:
endDate.strftime("%Y-%m-%dT:%H:00")

'2025-09-16T:00:00'

In [11]:
datetime.datetime.now() - datetime.timedelta(hours=6)

datetime.datetime(2025, 9, 15, 18, 12, 7, 251955)

In [None]:
import datetime

dt_minus_6h = datetime.datetime.now() - datetime.timedelta(hours=6)
print(dt_minus_6h)

In [8]:
dataGfsIfs

Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100,nleadHour,nleadDays
0,2024-12-31 19:00:00,2024-12-31 19:00:00,W.F. Cupisnique,5.058703,21.543909,GFS,6.880708,0,0
1,2024-12-31 19:00:00,2024-12-31 20:00:00,W.F. Cupisnique,4.960283,21.400262,GFS,6.746840,1,0
2,2024-12-31 19:00:00,2024-12-31 21:00:00,W.F. Cupisnique,4.971414,21.327539,GFS,6.761979,2,0
3,2024-12-31 19:00:00,2024-12-31 22:00:00,W.F. Cupisnique,4.695304,21.264276,GFS,6.386423,3,0
4,2024-12-31 19:00:00,2024-12-31 23:00:00,W.F. Cupisnique,4.289184,21.089227,GFS,5.834029,4,0
...,...,...,...,...,...,...,...,...,...
9205,2024-11-13 07:00:00,2024-11-27 07:00:00,W.F. Wayra I,2.889272,17.702325,IFS,4.006023,336,14
9206,2024-11-13 07:00:00,2024-11-27 13:00:00,W.F. Wayra I,6.126665,28.574854,IFS,6.770071,342,14
9207,2024-11-13 07:00:00,2024-11-27 19:00:00,W.F. Wayra I,4.241632,19.710962,IFS,5.340551,348,14
9208,2024-11-13 07:00:00,2024-11-28 01:00:00,W.F. Wayra I,2.943751,15.951013,IFS,3.968647,354,14


In [6]:
df_ifs

Unnamed: 0,initDate,date,name,wwind10,wwind100,t2m,model
0,2024-11-13 07:00:00,2024-11-13 07:00:00,W.F. Cupisnique,3.412731,4.002034,17.739099,IFS
1,2024-11-13 07:00:00,2024-11-13 10:00:00,W.F. Cupisnique,3.829248,3.951125,21.268060,IFS
2,2024-11-13 07:00:00,2024-11-13 13:00:00,W.F. Cupisnique,6.157979,6.942782,21.782098,IFS
3,2024-11-13 07:00:00,2024-11-13 16:00:00,W.F. Cupisnique,7.785364,9.299273,21.145441,IFS
4,2024-11-13 07:00:00,2024-11-13 19:00:00,W.F. Cupisnique,6.373901,8.489606,18.968378,IFS
...,...,...,...,...,...,...,...
845,2024-11-13 07:00:00,2024-11-27 07:00:00,W.F. Wayra I,2.889272,4.006023,17.702325,IFS
846,2024-11-13 07:00:00,2024-11-27 13:00:00,W.F. Wayra I,6.126665,6.770071,28.574854,IFS
847,2024-11-13 07:00:00,2024-11-27 19:00:00,W.F. Wayra I,4.241632,5.340551,19.710962,IFS
848,2024-11-13 07:00:00,2024-11-28 01:00:00,W.F. Wayra I,2.943751,3.968647,15.951013,IFS


In [7]:
df_gfs

Unnamed: 0,initDate,date,name,wwind10,t2m,model,wwind100
0,2024-12-31 19:00:00,2024-12-31 19:00:00,W.F. Cupisnique,5.058703,21.543909,GFS,6.880708
1,2024-12-31 19:00:00,2024-12-31 20:00:00,W.F. Cupisnique,4.960283,21.400262,GFS,6.746840
2,2024-12-31 19:00:00,2024-12-31 21:00:00,W.F. Cupisnique,4.971414,21.327539,GFS,6.761979
3,2024-12-31 19:00:00,2024-12-31 22:00:00,W.F. Cupisnique,4.695304,21.264276,GFS,6.386423
4,2024-12-31 19:00:00,2024-12-31 23:00:00,W.F. Cupisnique,4.289184,21.089227,GFS,5.834029
...,...,...,...,...,...,...,...
8355,2025-01-01 13:00:00,2025-01-17 01:00:00,W.F. Wayra I,1.329683,18.827753,GFS,1.808598
8356,2025-01-01 13:00:00,2025-01-17 04:00:00,W.F. Wayra I,1.262610,18.402673,GFS,1.717367
8357,2025-01-01 13:00:00,2025-01-17 07:00:00,W.F. Wayra I,1.284092,19.190759,GFS,1.746587
8358,2025-01-01 13:00:00,2025-01-17 10:00:00,W.F. Wayra I,1.875136,23.415216,GFS,2.550508


In [8]:
class gee_GfsIfs:
    def __init__(self, key_path="/home/cenciso/Downloads/my-gee-dashboard-c87a3d557c85.json"):
        #-- Credentials --#
        self.key_path = key_path
        self.gee_auth()

    def gee_auth(self):
        #-- Cargar JSON desde archivo --#
        with open(self.key_path, "r") as f:
            key_dict = json.load(f)
        scopes = [
            "https://www.googleapis.com/auth/earthengine",
            "https://www.googleapis.com/auth/devstorage.read_only",
        ]
        credentials = service_account.Credentials.from_service_account_info(key_dict, scopes=scopes)
        ee.Initialize(credentials)
        print("Google Earth Engine authenticated with local JSON file.")

#-- Initialize --#
gee = gee_GfsIfs()


Google Earth Engine authenticated with local JSON file.


In [None]:
def gee_auth_from_b64_env(var_name="EE_SERVICE_ACCOUNT_JSON_B64"):
    key_b64 = os.environ.get(var_name)
    if not key_b64:
        raise RuntimeError(f"Falta la variable de entorno {var_name}")
    key_json = base64.b64decode(key_b64).decode("utf-8")
    info = json.loads(key_json)

    scopes = [
        "https://www.googleapis.com/auth/earthengine",
        "https://www.googleapis.com/auth/devstorage.read_only",
    ]
    creds = service_account.Credentials.from_service_account_info(info, scopes=scopes)
    ee.Initialize(creds)
    print("✅ Autenticado en Earth Engine (local con Base64)")

# --- USO ---
gee_auth_from_b64_env()


In [None]:
s3://cdh-hydrolongterm-514438/longterm-forecast/monthly/run_date=2024-01-01/version=v1.0/ensemble/forecast.parquet

360

In [55]:
s3_upload_parquet(datetime.datetime(2024, 1, 1))

Unnamed: 0,name
0,QN-Mantaro
1,QN-Mantaro
2,QN-Mantaro
3,QN-Mantaro
4,QN-Mantaro
...,...
355,QN-Vilcanota
356,QN-Vilcanota
357,QN-Vilcanota
358,QN-Vilcanota


In [None]:
np.random.randn(18, )

In [None]:
data = np.random.randn(3, 12)
merged_array = data.flatten()[:26]
print(merged_array)
print(merged_array.shape)  # (26,)

In [34]:
import numpy as np
np.random.randn(3, 12).flatten().shape

(36,)

In [None]:
rng_date = pd.date_range(start="2024-01-01", end="2024-01-10", freq='D')
for single_date in rng_date:
    # print(single_date.strftime("%Y-%m-%d"))
    print(f"InitDate_{single_date:%Y-%m-%d}")

InitDate_2024-01-01
InitDate_2024-01-02
InitDate_2024-01-03
InitDate_2024-01-04
InitDate_2024-01-05
InitDate_2024-01-06
InitDate_2024-01-07
InitDate_2024-01-08
InitDate_2024-01-09
InitDate_2024-01-10


In [None]:
https://cdh-hydrolongterm-514438.s3.eu-west-1.amazonaws.com/longterm-forecasts/raw/20250911_160434_input.parquet

In [1]:
from dotenv import load_dotenv
load_dotenv(".env")

True

In [2]:
import boto3
s3 = boto3.client("s3")
s3.head_object(
    Bucket="cdh-hydrolongterm-514438",
    Key="longterm-forecasts/raw/20250911_160434_input.parquet"
)

{'ResponseMetadata': {'RequestId': 'R8YQRC84SW39NQ3M',
  'HostId': 'UThAGF2Jx5y8+R+pH9VWk3pZqslIy7xgpT6uTpINUXeYQ3lvwysyWjgM/WphbbFBudvOf1BVwLE=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'UThAGF2Jx5y8+R+pH9VWk3pZqslIy7xgpT6uTpINUXeYQ3lvwysyWjgM/WphbbFBudvOf1BVwLE=',
   'x-amz-request-id': 'R8YQRC84SW39NQ3M',
   'date': 'Thu, 11 Sep 2025 16:35:32 GMT',
   'last-modified': 'Thu, 11 Sep 2025 16:04:38 GMT',
   'etag': '"54054e8ae83393e08e2503d0244b30aa"',
   'x-amz-server-side-encryption': 'aws:kms',
   'x-amz-server-side-encryption-aws-kms-key-id': 'arn:aws:kms:eu-west-1:228119973315:key/95a16e40-70e3-4520-baba-7a90da7bcc5a',
   'x-amz-server-side-encryption-bucket-key-enabled': 'true',
   'accept-ranges': 'bytes',
   'content-type': 'binary/octet-stream',
   'content-length': '18687',
   'server': 'AmazonS3'},
  'RetryAttempts': 1},
 'AcceptRanges': 'bytes',
 'LastModified': datetime.datetime(2025, 9, 11, 16, 4, 38, tzinfo=tzutc()),
 'ContentLength': 18687,
 'ETag': '"54

In [3]:
import pandas as pd

uri = "s3://cdh-hydrolongterm-514438/longterm-forecasts/raw/20250911_160434_input.parquet"
df = pd.read_parquet(uri)   # con s3fs/pyarrow y tus env vars, debería funcionar
df.head()

Unnamed: 0,date,NombreEmpresa,Tipoinfoabrev,power,name
0,2025-09-05 00:30:00,ENGIE,MW,107.083,W.F. Punta Lomitas
1,2025-09-05 01:00:00,ENGIE,MW,107.083,W.F. Punta Lomitas
2,2025-09-05 01:30:00,ENGIE,MW,102.6775,W.F. Punta Lomitas
3,2025-09-05 02:00:00,ENGIE,MW,102.6775,W.F. Punta Lomitas
4,2025-09-05 02:30:00,ENGIE,MW,99.3825,W.F. Punta Lomitas


### **Subir parquet**

In [7]:
import boto3

s3 = boto3.client("s3")

# Ruta exacta en tu bucket (ajusta run_date y version si quieres)
bucket = "cdh-hydrolongterm-514438"
key = "longterm-forecasts/raw/test_forecast.parquet"

# Subir archivo local al S3/CDH
s3.upload_file("../dataset/currentGen.parquet", bucket, key)

print("Archivo subido a:", f"s3://{bucket}/{key}")

Archivo subido a: s3://cdh-hydrolongterm-514438/longterm-forecasts/raw/test_forecast.parquet


In [8]:
# Leer directo desde S3 (usando pandas + s3fs)
uri = f"s3://{bucket}/{key}"
df_check = pd.read_parquet(uri)
print(df_check)

                    date       NombreEmpresa Tipoinfoabrev     power  \
0    2025-09-05 00:30:00               ENGIE            MW  107.0830   
1    2025-09-05 01:00:00               ENGIE            MW  107.0830   
2    2025-09-05 01:30:00               ENGIE            MW  102.6775   
3    2025-09-05 02:00:00               ENGIE            MW  102.6775   
4    2025-09-05 02:30:00               ENGIE            MW   99.3825   
...                  ...                 ...           ...       ...   
2875 2025-09-10 22:00:00  ORYGEN PERU S.A.A.            MW  115.1340   
2876 2025-09-10 22:30:00  ORYGEN PERU S.A.A.            MW  114.4970   
2877 2025-09-10 23:00:00  ORYGEN PERU S.A.A.            MW  114.4970   
2878 2025-09-10 23:30:00  ORYGEN PERU S.A.A.            MW  112.3075   
2879 2025-09-11 00:00:00  ORYGEN PERU S.A.A.            MW  112.3075   

                    name  
0     W.F. Punta Lomitas  
1     W.F. Punta Lomitas  
2     W.F. Punta Lomitas  
3     W.F. Punta Lomitas  


In [4]:
!pwd

/Users/carlosenciso/Documents/ENGIE/windShortTermForecast/Notebooks


In [None]:
import os
from dotenv import load_dotenv

# Cargar variables del archivo .env
load_dotenv(".env")

# Verificación rápida
print("Access Key:", os.getenv("AWS_ACCESS_KEY_ID")[:5], "...")
print("Region:", os.getenv("AWS_DEFAULT_REGION"))

In [None]:
import boto3

s3 = boto3.client("s3")
resp = s3.list_objects_v2(
    Bucket="cdh-dsdatalakecoesprod-514438",
    Prefix="projects/hydroForecast-Peru/datasources/Hydro-LongTerm/datasets/LongTerm-Forecasts/raw/"
)
for obj in resp.get("Contents", []):
    print(obj["Key"])

In [None]:
#-- Send to CDH --#
import boto3
import pandas as pd
import os
from dotenv import load_dotenv

In [None]:
#-- Load dotenv --#
load_dotenv()
#-- Parquet file --#
parquetFile = '../dataset/currentGen.parquet'
print(f'File to send: {parquetFile}')
#-- The file exits --#
if os.path.exists(parquetFile):
    size_mb = os.path.getsize(parquetFile) / (1024 * 1024)
    print(f'File size: {size_mb:.2f} MB')
else:
    print(f'File {parquetFile} does not exist.')

In [None]:
#-- Credentials --#
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_token = os.getenv('AWS_SESSION_TOKEN')
aws_region = os.getenv('AWS_REGION', 'us-east-1')
bucket_name = os.getenv('BUCKET_NAME', 'hydroforecast-peru-data')
print(f"🔑 Usando región: {aws_region}")
print(f"🪣 Bucket destino: {bucket_name}")

In [None]:
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    aws_session_token=aws_token,
    region_name=aws_region
)
print("✅ Cliente S3 creado")

In [None]:
filename = os.path.basename(parquetFile) 
s3_key = f"hydroForecast-Peru/data/{filename}"
print(f"📍 Se guardará en: s3://{bucket_name}/{s3_key}")

In [None]:
try:
    print("📤 Subiendo archivo...")
    s3_client.upload_file(
        parquetFile,    
        bucket_name,    
        s3_key          
    )
    print("🎉 ¡SUBIDA EXITOSA!")
    print(f"🌐 Tu archivo está en: s3://{bucket_name}/{s3_key}")
except Exception as error:
    print(f"❌ Error: {str(error)}")
print("🏁 Proceso terminado")

In [None]:
#-- Send to CDH --#
import boto3
import pandas as pd
import os
from dotenv import load_dotenv

#-- Load dotenv --#
load_dotenv()
#-- Parquet file --#
parquetFile = '../dataset/currentGen.parquet'
print(f'File to send: {parquetFile}')
#-- The file exits --#
if os.path.exists(parquetFile):
    size_mb = os.path.getsize(parquetFile) / (1024 * 1024)
    print(f'File size: {size_mb:.2f} MB')
else:
    print(f'File {parquetFile} does not exist.')
    exit()

#-- Credentials --#
aws_access_key = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_token = os.getenv('AWS_SESSION_TOKEN')
aws_region = os.getenv('AWS_REGION', 'us-east-1')
bucket_name = os.getenv('BUCKET_NAME', 'hydroforecast-peru-data')
print(f"🔑 Usando región: {aws_region}")
print(f"🪣 Bucket destino: {bucket_name}")

s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    aws_session_token=aws_token,
    region_name=aws_region
)
print("✅ Cliente S3 creado")

# Función para verificar y crear el bucket si no existe
def check_and_create_bucket(bucket_name, region):
    try:
        # Verificar si el bucket existe
        s3_client.head_bucket(Bucket=bucket_name)
        print(f"✅ Bucket '{bucket_name}' existe")
        return True
    except s3_client.exceptions.ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code == '404':
            print(f"❌ Bucket '{bucket_name}' no existe, intentando crearlo...")
            try:
                if region == 'us-east-1':
                    # us-east-1 tiene una sintaxis especial
                    s3_client.create_bucket(Bucket=bucket_name)
                else:
                    s3_client.create_bucket(
                        Bucket=bucket_name,
                        CreateBucketConfiguration={'LocationConstraint': region}
                    )
                print(f"✅ Bucket '{bucket_name}' creado exitosamente")
                return True
            except Exception as create_error:
                print(f"❌ Error creando bucket: {create_error}")
                return False
        else:
            print(f"❌ Error accediendo al bucket: {e}")
            return False

# Verificar y crear el bucket si es necesario
if not check_and_create_bucket(bucket_name, aws_region):
    print("No se pudo acceder al bucket, terminando ejecución.")
    exit()

filename = os.path.basename(parquetFile) 
s3_key = f"hydroForecast-Peru/data/{filename}"
print(f"📍 Se guardará en: s3://{bucket_name}/{s3_key}")

try:
    print("📤 Subiendo archivo...")
    s3_client.upload_file(
        parquetFile,    
        bucket_name,    
        s3_key          
    )
    print("🎉 ¡SUBIDA EXITOSA!")
    print(f"🌐 Tu archivo está en: s3://{bucket_name}/{s3_key}")
except Exception as error:
    print(f"❌ Error: {str(error)}")
print("🏁 Proceso terminado")

In [None]:
arn:aws:iam::228119973315:role/cdh_hydroforecastperu_78495

In [None]:
#-- Read dataset from Athena S3 --#
import boto3
import pandas as pd
#-- Main code --#
s3 = boto3.client('s3')
"""
bucket = 'your-bucket-name'

"""

In [None]:
import boto3

In [None]:
import boto3

# Initialize S3 client (requires AWS credentials configured in ~/.aws/credentials or env vars)
s3 = boto3.client('s3')

bucket = "cdh-dsdatalakecoesprod-514438"
prefix = "central_generadora/"

# List objects
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in response.get('Contents', []):
    print(obj['Key'])

# Read one file (example CSV/JSON/Parquet)
obj = s3.get_object(Bucket=bucket, Key="central_generadora/example.csv")
data = obj['Body'].read().decode('utf-8')

print(data[:500])  # preview first 500 chars
