In [85]:
import sqlite3
import pandas as pd
import xarray as xr
import cdsapi
import os
import zipfile
import sqlite3
from datetime import datetime, timedelta
from calendar import monthrange
import requests
import zipfile
import io
from Func_extr import extract_infos

##### EXTRACCIÓN DE COPERMICUS

def preprocesarDataFrame(df):
    df = df[['valid_time', 'latitude', 'longitude', 'tp', 'skt', 'e', 'ro', 'sf',
         'swvl1', 'swvl2', 'swvl3', 'swvl4', 'cvh', 'cvl', 'tvh', 'tvl']]
    df = df.rename(columns={
        'valid_time': 'date',
        'tp': 'total_precipitation',
        'skt': 'skin_temperature',
        'e': 'evaporation',
        'ro': 'runoff',
        'sf': 'snowfall',
        'swvl1': 'soil_water_l1',
        'swvl2': 'soil_water_l2',
        'swvl3': 'soil_water_l3',
        'swvl4': 'soil_water_l4',
        'cvh': 'high_vegetation_cover',
        'cvl': 'low_vegetation_cover',
        'tvh': 'type_high_vegetation',
        'tvl': 'type_low_vegetation'
    })
    # Convertir la columna 'date' a formato de fecha
    df['date'] = pd.to_datetime(df['date']).dt.date
    # Realizar las agregaciones
    agg_funcs = {
        'total_precipitation': 'sum',
        'skin_temperature': 'mean',
        'evaporation': 'sum',
        'runoff': 'sum',
        'snowfall': 'sum',
        'soil_water_l1': 'sum',
        'soil_water_l2': 'sum',
        'soil_water_l3': 'sum',
        'soil_water_l4': 'sum',
        'high_vegetation_cover': 'mean',
        'low_vegetation_cover': 'mean',
        'type_high_vegetation': lambda x: x.mode()[0] if not x.mode().empty else np.nan,
        'type_low_vegetation': lambda x: x.mode()[0] if not x.mode().empty else np.nan
    }
    df = df.groupby(['latitude', 'longitude', 'date']).agg(agg_funcs).reset_index()
    return df


def downloadMesCopernicus (days,year, month):
    dataset = "reanalysis-era5-single-levels"
    request = {
    'product_type': ['ensemble_mean'],
    'variable': ['total_precipitation', 'skin_temperature', 'evaporation', 'runoff', 'snowfall', 'volumetric_soil_water_layer_1', 'volumetric_soil_water_layer_2', 'volumetric_soil_water_layer_3', 'volumetric_soil_water_layer_4', 'high_vegetation_cover', 'low_vegetation_cover', 'type_of_high_vegetation', 'type_of_low_vegetation'],
    'year': [str(year)],
    'month': [str(month)],
    'day': days,
    'time': ['00:00', '03:00', '06:00', '09:00', '12:00', '15:00', '18:00', '21:00'],
    'data_format': 'netcdf',
    'download_format': 'unarchived',
    'area': [40.5425, -2.255, 38.1739, 0.5665] ##Cuenca hidrográfica del Jucar +/-
    }
    client = cdsapi.Client()
    file_name = client.retrieve(dataset, request).download()

    return file_name
    
def procesarZip(zip_file_name):
    dataFrames = []
    with zipfile.ZipFile(zip_file_name, 'r') as zip_ref:
        # Listar los archivos en el ZIP
        archivos = zip_ref.namelist()
        for archivo_nc in archivos: 
            # Extraer el archivo 'instant'
            zip_ref.extract(archivo_nc)
            print(f"Archivo '{archivo_nc}' extraído.")
            ds = xr.open_dataset(archivo_nc)
            df = ds.to_dataframe().reset_index()
            dataFrames.append (df)
            ds.close()
            os.remove(archivo_nc)
    df1 = dataFrames[1]
    df1 = df1.drop(['latitude', 'longitude', 'valid_time'], axis = 1)
    df = pd.concat([dataFrames[0],df1], axis = 1)
    df = preprocesarDataFrame(df)
    fechaMin = df['date'].min()
    fechaMax = df['date'].max()
    print(f'Datos de coeprnicus con fechas de {fechaMin} a {fechaMax} descargadas correctamente')
    os.remove(zip_file_name)
    return df

def extraccionCopernicus (days,year, month):
    
    zip_file_name = downloadMesCopernicus(days,year, month)
    print(f"Archivo descargado: {zip_file_name}")

    df = procesarZip(zip_file_name)
    return df

def extraerUltimasFechasCopernicus():
    conn = sqlite3.connect('aguaCHJucar.db')
    
    cursor = conn.cursor()
    
    query = f'''
        SELECT 
        c.date_id, d.date
        FROM df_copernicus c JOIN df_date d ON  c.date_id = d.date_id;
    '''
    # Ejecutar la consulta
    cursor.execute(query)
    df_date = pd.read_sql_query(query, conn)
    df_date['date'] = pd.to_datetime(df_date['date']).dt.date
    ultima_fecha = df_date['date'].max()
    return ultima_fecha
    
def generar_diferencias_mes_a_mes(fecha1 ,fecha2):
    """
    Genera una lista de días, meses y años entre dos fechas mes a mes,
    considerando la fecha inicial desde su día específico y el día final.

    Parámetros:
    - fecha_actual: Fecha inicial en formato datetime.date.
    - fecha_futura: Fecha final en formato datetime.date.

    Retorno:
    - Lista de tuplas (días, año, mes), donde:
      - días: lista de strings con los días del mes ('15', '16', ..., '30')
      - año: año correspondiente
      - mes: mes correspondiente
    """
    diferencias = []
    fecha1 = fecha1  + timedelta(days=1)
    fecha_inicio = fecha1  

    while fecha_inicio <= fecha2:
        # Obtener año y mes actuales
        year = fecha_inicio.year
        month = fecha_inicio.month

        # Si estamos en el primer mes, iniciar desde el día específico de la fecha actual
        if fecha_inicio == fecha1:
            start_day = fecha_inicio.day
        else:
            start_day = 1
        
        # Para el último mes (fecha_futura), restringir al día final
        if fecha_inicio.month == fecha2.month:
            end_day = fecha2.day
        else:
            # Si no es el último mes, el mes completo
            end_day = monthrange(year, month)[1]

        # Calcular los días del mes actual, considerando start_day y end_day
        days = [f"{day:02d}" for day in range(start_day, end_day + 1)]
        
        # Agregar a la lista de diferencias
        diferencias.append((days, year, month))

        # Avanzar al siguiente mes
        if month == 12:
            fecha_inicio = fecha_inicio.replace(year=year + 1, month=1, day=1)
        else:
            fecha_inicio = fecha_inicio.replace(month=month + 1, day=1)
    
    return diferencias

def fechasActualizarCopernicus():
    fecha1 = extraerUltimasFechasCopernicus()
    fecha2 = fecha_actual = datetime.now().date()
    fechas_new = generar_diferencias_mes_a_mes(fecha1 ,fecha2)
    return fechas_new

def actualizarTablaCopernicus():
    fechas_new = fechasActualizarCopernicus()
    dataFrames = []
    for i in fechas_new:
        days = i[0]
        month = i[2]
        year = i[1]
        df = extraccionCopernicus (days,year, month)
        dataFrames.append(df)
    df = pd.concat(dataFrames)
    return df

##### EXTRACCIÓN DE LA CUENCA DEL JUCAR
def extraccionRiosCHJ():
    url = "https://aps.chj.es/down/CSV/F2796_Rios_y_Canales_ROEA.zip"
    fecha_inicial = pd.to_datetime(extraerUltimasFechasCopernicus())
    response = requests.get(url)
    response.raise_for_status()  # Verificar si la descarga fue exitosa
    
    # Paso 2: Cargar el contenido del ZIP en memoria
    zip_file = zipfile.ZipFile(io.BytesIO(response.content))
    target_file = "F2796_D2_Serie día.csv"
    if target_file in zip_file.namelist():
        with zip_file.open(target_file) as file:
            # Leer el archivo CSV directamente como DataFrame
            df_rios_canales = pd.read_csv(file, sep=";", encoding="latin1")  # Ajusta el separador y la codificación si es necesario
    
    else:
        print(f"El archivo '{target_file}' no se encuentra en el ZIP.")
    df_rios_canales = df_rios_canales.rename(columns = {'Cód. CHJ' : 'id_station','Fecha' : 'date','Cantidad (hm³)' : 'quantity_hm3'})
    df_rios_canales = df_rios_canales[['id_station', 'date','quantity_hm3']]
    df_rios_canales['date'] = pd.to_datetime(df_rios_canales['date'], format='%d-%m-%Y %H:%M:%S')
    df_rios_canales = df_rios_canales.dropna()
    df_rios_canales['quantity_hm3'] = df_rios_canales['quantity_hm3'].str.replace(',','.').astype('float')
    id_stations_list = []
    for pixel in range(176,301):
        id_stations = extract_infos (pixel)
        id_stations['location_id'] = pixel
        id_stations_list.append(id_stations)     
    id_stations_df = pd.concat(id_stations_list)
    df_rios_canales = df_rios_canales[df_rios_canales['date'] > fecha_inicial]
    id_stations_df = id_stations_df[['id_station_rios_canales','pixel']].drop_duplicates()
    id_stations_df= id_stations_df.rename(columns = {'id_station_rios_canales' : 'id_station'})
    df_rios_canales = pd.merge(df_rios_canales, id_stations_df, on = 'id_station')
    return df_rios_canales

def extraccionEmbalsesCHJ():
    url = "https://aps.chj.es/down/CSV/F2797_Embalses_ROEA.zip"
    fecha_inicial = pd.to_datetime(extraerUltimasFechasCopernicus())
    response = requests.get(url)
    response.raise_for_status()  # Verificar si la descarga fue exitosa
    
    # Paso 2: Cargar el contenido del ZIP en memoria
    zip_file = zipfile.ZipFile(io.BytesIO(response.content))
    target_file = "F2797_D1_Serie mes.xlsx"
    if target_file in zip_file.namelist():
        with zip_file.open(target_file) as file:
            # Leer el archivo CSV directamente como DataFrame
            df_embalses = pd.read_excel(file)  # Ajusta el separador y la codificación si es necesario
    
    else:
        print(f"El archivo '{target_file}' no se encuentra en el ZIP.")
    df_embalses = df_embalses.rename(columns = {'Cód. Embalse' : 'id_station','Fecha' : 'date','Volumen (hm³)' : 'quantity_hm3'})
    df_embalses = df_embalses[['id_station', 'date','quantity_hm3']]
    df_embalses['date'] = pd.to_datetime(df_rios_canales['date'], format='%d-%m-%Y %H:%M:%S')
    df_embalses = df_rios_canales.dropna()
    df_embalses['quantity_hm3'] = df_embalses['quantity_hm3'].str.replace(',','.').astype('float')
    id_stations_list = []
    for pixel in range(176,301):
        id_stations = extract_infos (pixel)
        id_stations['location_id'] = pixel
        id_stations_list.append(id_stations)     
    id_stations_df = pd.concat(id_stations_list)
    df_embalses = df_embalses[df_embalses['date'] > fecha_inicial]
    id_stations_df = id_stations_df[['id_station_rios_canales','pixel']].drop_duplicates()
    id_stations_df= id_stations_df.rename(columns = {'id_station_embalse' : 'id_station'})
    df_embalses = pd.merge(df_embalses, id_stations_df, on = 'id_station')
    return df_embalses

def preparacionIngesta(df_copernicus, df_rios):
    #tabla fechas
    df_copernicus['date'] = pd.to_datetime(df_copernicus['date']).drop_duplicates().reset_index(drop = True)
    dates1 = df_copernicus['date'].dropna()
    df_date1 = pd.DataFrame({'date': dates1})
    df_date1['date_id'] = df_date1['date'].dt.strftime('%Y%m%d').astype(int)
    
    df_rios['date'] = pd.to_datetime(df_rios['date']).drop_duplicates().reset_index(drop = True)
    dates2 = df_rios['date'].dropna()
    df_date2 = pd.DataFrame({'date': dates2})
    df_date2['date_id'] = df_date2['date'].dt.strftime('%Y%m%d').astype(int)
    df_date = pd.concat([df_date1, df_date2])
    df_date = df_date.drop_duplicates().reset_index(drop = True)
    
    #tabla df_copernicus
    conn = sqlite3.connect('aguaCHJucar.db')
    
    cursor = conn.cursor()
    query = f'''
        SELECT * FROM locations_id WHERE Type == 'Copernicus'
            ;
    '''
    
    df_loc = pd.read_sql_query(query, conn)
    cursor.execute(query)
    conn.close()
    
    df_copernicus = pd.merge(df_copernicus, df_loc[['latitude', 'longitude','location_id']], on = ['latitude', 'longitude'], how = 'inner')
    df_copernicus = pd.merge(df_copernicus, df_date, on = ['date'], how = 'inner')
    df_copernicus = df_copernicus.drop(['latitude', 'longitude', 'date'], axis = 1)
    
    #tabla ríos
    df_rios = pd.merge(df_date,df_rios, on = 'date', how = 'left')
    df_rios = df_rios[['quantity_hm3','location_id','date_id']]
    return df_copernicus, df_rios, df_date

def ingesta(df_copernicus, df_rios, df_date):
    # Conexión a la base de datos
    connection = sqlite3.connect("aguaCHJucar.db")
    cursor = connection.cursor()
    
    try:
        # Inserciones para df_copernicus
        for row in df_rios.itertuples(index=False):
            cursor.execute("""
            INSERT OR IGNORE INTO df_copernicus (
                location_id, date_id, total_precipitation,
                skin_temperature, evaporation, runoff, snowfall,
                soil_water_l1, soil_water_l2, soil_water_l3, soil_water_l4,
                high_vegetation_cover, low_vegetation_cover,
                type_high_vegetation, type_low_vegetation
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, row)
    
        # Inserciones para df_rios_canales
        for row in df_rios.itertuples(index=False):
            cursor.execute("""
            INSERT OR IGNORE INTO df_rios_canales (
                quantity_hm3, location_id, date_id
            ) VALUES (?, ?, ?)
            """, row)
    
        # Inserciones para df_date
        for row in df_date.itertuples(index=False):
            cursor.execute("""
            INSERT OR IGNORE INTO df_date (
                date, date_id
            ) VALUES (?, ?)
            """, row)
    
        # Confirmar todas las inserciones
        connection.commit()
    
    except sqlite3.Error as e:
        # Manejo de errores con rollback en caso de fallo
        connection.rollback()
        print(f"Error al insertar registros: {e}")
    
    finally:
        # Cerrar la conexión
        connection.close()

In [None]:
df_copernicus = actualizarTablaCopernicus()
df_rios = extraccionRiosCHJ()
df_copernicus, df_rios, df_date = preparacionIngesta(df_copernicus, df_rios)