# INE

La función 'leer_archivo_rango_periodo' lee un archivo CSV, crea una copia del DataFrame original, filtra las filas que contienen un periodo específico en la columna 'Periodo', reinicia los índices y devuelve el DataFrame resultante. Esto facilita la obtención de datos específicos para un periodo dado desde un archivo CSV:

In [15]:
#Leer CSV
def leer_archivo_rango_periodo(ruta_archivo, periodo):
    
    #leer archivo CSV
    ine_original = pd.read_csv(ruta_archivo, encoding='latin1',sep="\t")
    
    #crear copia del DF original
    ine_copia= ine_original.copy()
    
    #FIltrar el DataFrame para incluir solo las filas que contienen el periodo deseado
    ine_filtrado= ine_copia[ine_copia['Periodo'].str.contains(periodo)]
    
    ine_filtrado = ine_filtrado.reset_index(drop=True)
    
    return ine_filtrado

La función 'procesar_columnas' renombra algunas columnas específicas de un DataFrame, utilizando un diccionario para asignar nombres más claros y legibles. Luego, reinicia los índices del DataFrame y devuelve el resultado procesado. Esto facilita la limpieza y clarificación de los nombres de columnas:

In [16]:
def procesar_columnas(df):

    column_rename_dict = {
        'ï»¿Totales Territoriales': 'Totales_Territoriales',
        'Comunidades y Ciudades AutÃ³nomas': 'Comunidades_Ciudades_Autonomas',
        'Provincias': 'Provincia',
        'Viajeros y pernoctaciones': 'Viajeros_Pernoctaciones',
        'Residencia: Nivel 1': 'Residencia_Nivel_1',
        'Residencia: Nivel 2': 'Residencia_Nivel_2',
        'Periodo': 'Periodo',
        'Total': 'Total'
    }

    df.rename(columns=column_rename_dict, inplace=True)
    df.reset_index(drop=True, inplace=True)

    return df

La función 'actualizar_valor_en_fila' toma un DataFrame, el número de fila, el nombre de la columna y el valor a restar. Luego, actualiza el valor en la fila y columna especificadas con el resultado de la resta:

In [17]:
def actualizar_valor_en_fila(dataframe, fila, columna, resta_resultado):
    dataframe.loc[fila, columna] = resta_resultado

# AEMET

La función 'obtener_url_datos' simplifica la obtención de datos climáticos diarios para un periodo de 10 años al construir dinámicamente la URL con las fechas proporcionadas y gestionar la autenticación mediante la clave de API almacenada, mejorando la modularidad y legibilidad del código:

In [4]:
def obtener_url_datos(fecha_inicio, fecha_fin):
    base_url = 'https://opendata.aemet.es/opendata/api/valores/climatologicos/diarios/datos/'
    parametros = f'fechaini/{fecha_inicio}/fechafin/{fecha_fin}/todasestaciones'
    url = f'{base_url}{parametros}'
    
    with open('../api.txt', 'r') as file:
        api_key = file.read().strip()

    response = requests.get(url, headers={'api_key': api_key})
    data = response.json()

    if data['estado'] == 200:
        return data['datos']
    
    else:
        print(f"Error en la solicitud: {data['descripcion']}")
        return None

La siguiente función, se crea para realizar diversas transformaciones en las columnas especificadas y devuelve un nuevo DataFrame con los resultados procesados:

In [7]:
def procesar_mes(df_mes, columnas_procesar):

    for col in columnas_procesar:
        df_mes[col] = df_mes[col].str.replace(',', '.').replace('Acum', np.nan).replace('Ip', np.nan).astype(float)

    df_mes['fecha'] = pd.to_datetime(df_mes['fecha']).dt.to_period('M')
    
    df_media_mes = df_mes.groupby('provincia').agg({'fecha': 'first', 
                                                    **{col: 'mean' for col in columnas_procesar}}).reset_index()

    df_media_mes = df_media_mes.rename(columns={'fecha': 'periodo', 
                                                **{col: f'media_{col}' for col in columnas_procesar}})
    
    try:
        df_media_mes = df_media_mes[~df_media_mes['provincia'].isin(['CEUTA', 'MELILLA'])].reset_index(drop=True)
    except KeyError:
        pass  

    df_media_mes.columns = df_media_mes.columns.str.capitalize()

    df_media_mes['Provincia'] = df_media_mes['Provincia'].apply(lambda x: x.capitalize())

    return df_media_mes

La función 'procesar_datos_mes' toma una URL y una lista de columnas de interés como entrada, realiza una solicitud GET a esa URL para obtener datos de la API, y luego procesa esos datos utilizando la función 'procesar_mes':

In [10]:
def procesar_datos_mes(url, columnas_interes):
    
    response = requests.get(url)

    if response.status_code == 200:
        
        datos = response.json()
        df = pd.DataFrame(datos)
        df = df[columnas_interes]
        df_procesado = procesar_mes(df)

        return df_procesado

# CONEXIÓN MySQL

In [12]:
def crear_cadena_conexion(usuario, ruta_archivo_contraseña, host, puerto):
    with open(ruta_archivo_contraseña, 'r') as file:
        contraseña = file.read().strip()

    cadena_conexion = f'mysql+mysqlconnector://{usuario}:{contraseña}@{host}:{puerto}'

    return cadena_conexion

In [13]:
def crear_base_de_datos(cadena_conexion, nombre_bd):
    engine = create_engine(cadena_conexion)

    with engine.connect() as connection:
        connection.execute(f"CREATE DATABASE IF NOT EXISTS {nombre_bd}")

In [14]:
def cargar_csv_a_mysql(ruta_csv, cadena_conexion, nombre_tabla, nombre_bd):

    try:
        df = pd.read_csv(ruta_csv)
        
        engine = create_engine(cadena_conexion)
        
        with engine.connect() as connection:
            connection.execute(f"USE {nombre_bd}")
        
        df.to_sql(nombre_tabla, con=engine, if_exists='replace', index=False)

        print(f"Datos cargados en la tabla {nombre_tabla} con éxito.")

    except Exception as e:
        print(f"Error al cargar los datos en MySQL: {e}")