In [4]:
import os
import requests
import pandas as pd
import sqlite3
from datetime import datetime, timedelta

class CryptoDataExtractor:
    """
    Script para extraer datos de criptomonedas de la API CoinAPI.

    Argumentos:
        api_key (str): Clave de API para acceder a los datos de CoinAPI.

    Atributos:
        base_url (str): URL base de la API de CoinAPI.
        api_key (str): Clave de API para acceder a los datos de CoinAPI.
        data_directory (str): Directorio donde se guardarán los datos extraídos.

    Metodos:
        fetch_data_from_api(endpoint):
            Obtiene datos de la API de CoinAPI para un endpoint dado.
        convert_to_dataframe(data):
            Convierte los datos obtenidos de la API en un DataFrame de Pandas.
        save_to_parquet(df, directory, filename):
            Guarda los datos en formato Parquet en el directorio especificado.
        create_table_in_database(df, table_name):
            Crea una tabla en la base de datos SQLite con los datos proporcionados.
        extract_full_data():
            Extrae y guarda los datos completos de criptomonedas.
        extract_incremental_data():
            Extrae y guarda los datos incrementales de criptomonedas desde el día anterior.
    """
    def __init__(self, api_key):
        self.base_url = 'https://rest.coinapi.io/v1'
        self.api_key = api_key
        self.data_directory = 'crypto_data_lake'

    def fetch_data_from_api(self, endpoint):
        url = self.base_url + endpoint
        headers = {'X-CoinAPI-Key': self.api_key}
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            return data
        else:
            print(f"Error al obtener datos del endpoint {endpoint}")
            return None

    def convert_to_dataframe(self, data):
        if data:
            df = pd.DataFrame(data)
            return df
        else:
            return None

    def save_to_parquet(self, df, directory, filename):
        if not os.path.exists(directory):
            os.makedirs(directory)
        file_path = os.path.join(directory, filename)
        if df is not None:
            df.to_parquet(file_path, engine='pyarrow')
            print(f"Datos guardados correctamente como archivo Parquet en {file_path}.")
        else:
            print("No hay datos para guardar.")

    def create_table_in_database(self, df, table_name):
        conn = sqlite3.connect('crypto_data.db')
        df.to_sql(table_name, conn, if_exists='replace', index=False)
        conn.close()
        print(f"Tabla '{table_name}' creada en la base de datos 'crypto_data.db'.")

    def _create_date_directory(self, date):
        year = str(date.year)
        month = str(date.month).zfill(2)
        day = str(date.day).zfill(2)
        hour = str(date.hour).zfill(2)
        directory = os.path.join(self.data_directory, 'crypto_incremental_data', year, month, day, hour)
        if not os.path.exists(directory):
            os.makedirs(directory)
        return directory

    def extract_full_data(self):
        print("Iniciando extracción completa de datos de criptomonedas...")
        metadata = self.fetch_data_from_api('/exchanges')
        metadata_df = self.convert_to_dataframe(metadata)
        self.save_to_parquet(metadata_df, os.path.join(self.data_directory, 'crypto_metadata'), 'crypto_metadata.parquet')

        historical_data = self.fetch_data_from_api('/ohlcv/BITSTAMP_SPOT_BTC_USD/latest?period_id=1DAY')
        historical_df = self.convert_to_dataframe(historical_data)
        self.save_to_parquet(historical_df, os.path.join(self.data_directory, 'crypto_historical_data'), 'crypto_historical_data.parquet')

        # Crear tabla en la base de datos con los datos históricos
        self.create_table_in_database(historical_df, 'crypto_historical_data')
        print("Extracción completa de datos finalizada.")

    def extract_incremental_data(self):
        print("Iniciando extracción incremental de datos de criptomonedas...")
        # Supongamos que la API proporciona datos diarios actualizados una vez al día
        yesterday_date = datetime.now() - timedelta(days=1)
        formatted_date = yesterday_date.strftime('%Y-%m-%d')
        updated_data = self.fetch_data_from_api(f'/ohlcv/BITSTAMP_SPOT_BTC_USD/history?time_start={formatted_date}T00:00:00')
        updated_df = self.convert_to_dataframe(updated_data)
        if updated_df is not None and not updated_df.empty:  # Verifica si hay datos y si no está vacío
            directory = self._create_date_directory(yesterday_date)
            self.save_to_parquet(updated_df, directory, 'crypto_incremental_data.parquet')
            # Crear tabla en la base de datos con los datos actualizados
            self.create_table_in_database(updated_df, f'crypto_daily_data_{formatted_date.replace("-", "_")}')
            print("Extracción incremental de datos finalizada.")
        else:
            print("No se encontraron nuevos datos para extraer.")

def main():
    api_key = '4ADF70B7-F58E-4089-AE0D-958AE7D10440'
    extractor = CryptoDataExtractor(api_key)
    extractor.extract_full_data()  # Extracción completa
    extractor.extract_incremental_data()  # Extracción incremental

if __name__ == "__main__":
    main()



Iniciando extracción completa de datos de criptomonedas...
Datos guardados correctamente como archivo Parquet en crypto_data_lake\crypto_metadata\crypto_metadata.parquet.
Datos guardados correctamente como archivo Parquet en crypto_data_lake\crypto_historical_data\crypto_historical_data.parquet.
Tabla 'crypto_historical_data' creada en la base de datos 'crypto_data.db'.
Extracción completa de datos finalizada.
Iniciando extracción incremental de datos de criptomonedas...
Error al obtener datos del endpoint /ohlcv/BITSTAMP_SPOT_BTC_USD/history?time_start=2024-03-13T00:00:00
No se encontraron nuevos datos para extraer.
