In [1]:
# Importar librerías necesarias
import pandas as pd
import numpy as np
import requests  # Para consumir APIs
import os        # Para gestionar archivos y rutas
from datetime import datetime  # Manejo de fechas y horas
import time
import pyarrow
import fastparquet

# Configuración inicial
print("Entorno configurado para pipeline de datos")

Entorno configurado para pipeline de datos


In [2]:
# Rutas del proyecto
DATA_DIR = "../data/"
API_DATA_DIR = os.path.join(DATA_DIR, "api_data/")
KAGGLE_DATA_DIR = os.path.join(DATA_DIR, "kaggle_data/")

# Crear directorios si no existen
os.makedirs(API_DATA_DIR, exist_ok=True)
os.makedirs(KAGGLE_DATA_DIR, exist_ok=True)

API_KEY = "YO3SULHD55PCVOEF" 
BASE_URL = "https://www.alphavantage.co/query"

print(f"Rutas creadas:\n- Datos de API: {API_DATA_DIR}\n- Datos de Kaggle: {KAGGLE_DATA_DIR}")

Rutas creadas:
- Datos de API: ../data/api_data/
- Datos de Kaggle: ../data/kaggle_data/


In [None]:
# Función genérica para obtener datos de Alpha Vantage
def get_alpha_data(function, params={}):
    params.update({"apikey": API_KEY, "function": function})
    response = requests.get(BASE_URL, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error al obtener {function}: {response.status_code}")
        return None

# Función para obtener tasas de interés
def get_interest_rates():
    data = get_alpha_data("FEDERAL_FUNDS_RATE")
    if data:
        return pd.DataFrame(data.get("data", []))
    return None

# Función para obtener datos de inflación
def get_inflation():
    data = get_alpha_data("INFLATION")
    if data:
        return pd.DataFrame(data.get("data", []))
    return None

# Función para obtener índices bursátiles (por ejemplo, S&P 500)
def get_index_data(index_symbol):
    params = {"symbol": index_symbol, "outputsize": "full"}
    data = get_alpha_data("TIME_SERIES_DAILY", params)
    if data and "Time Series (Daily)" in data:
        df = pd.DataFrame(data["Time Series (Daily)"]).T
        df = df.rename(columns={
            "1. open": "Open",
            "2. high": "High",
            "3. low": "Low",
            "4. close": "Close",
            "5. volume": "Volume"
        })
        df.index = pd.to_datetime(df.index)
        return df
    return None

# Función para obtener tipos de cambio
def get_exchange_rates(from_currency="USD", to_currency="EUR"):
    params = {"from_symbol": from_currency, "to_symbol": to_currency}
    data = get_alpha_data("FX_DAILY", params)
    if data and "Time Series FX (Daily)" in data:
        df = pd.DataFrame(data["Time Series FX (Daily)"]).T
        df.index = pd.to_datetime(df.index)
        return df
    return None


# Pipeline de ejecución
def execute_pipeline():

    # Ruta relativa a la carpeta de data
    external_path = "../data/api_data/"

    # Tasas de interés
    interest_rates = get_interest_rates()
    if interest_rates is not None:
        file_path = os.path.join(external_path, "tasas_interes.parquet")
        interest_rates.to_parquet(file_path, index=False)
        print("Tasas de interés guardadas.")

    # Inflación
    inflation = get_inflation()
    if inflation is not None:
        file_path = os.path.join(external_path, "inflacion.parquet")
        inflation.to_parquet(file_path, index=False)
        print("Inflación guardada.")

    # Tipos de cambio USD/EUR
    exchange_rates = get_exchange_rates()
    if exchange_rates is not None:
        file_path = os.path.join(external_path, "tipos_cambio_usd_eur.parquet")
        exchange_rates.to_parquet(file_path)
        print("Tipos de cambio guardados.")

    print("Pipeline ejecutado con éxito.")



# Ejecutar el pipeline
execute_pipeline()

Tasas de interés guardadas.
Inflación guardada.
Tipos de cambio guardados.
Pipeline ejecutado con éxito.
