In [2]:
# Instalación de librerías necesarias
!pip install kafka-python pymongo pyspark




In [46]:
import time
import json
from kafka import KafkaProducer
from kafka import KafkaConsumer
from pymongo import MongoClient
import requests
from datetime import datetime

# Configuración de Kafka
# Tópico de Kafka donde se publicarán los mensajes 
KAFKA_TOPIC = 'data-crypto'

# Servidor Bootstrap de Kafka
# Especifica la dirección del clúster de Kafka al que se conectará el productor
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# Productor de Kafka
# Configuración del productor responsable de enviar datos al tópico de Kafka
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,  # Dirección del clúster de Kafka
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # Serializa los datos en formato JSON antes de enviarlos
)

# Configuración de MongoDB
MONGO_URI = 'mongodb://localhost:27017'  # URL para conectarse a MongoDB
DATABASE_NAME = 'crypto_db'              # Nombre de la base de datos
COLLECTION_NAME = 'crypto_data'          # Nombre de la colección

# Conexión a MongoDB
client = MongoClient(MONGO_URI)          # Establecer la conexión con el servidor de MongoDB
db = client[DATABASE_NAME]               # Seleccionar la base de datos
collection = db[COLLECTION_NAME]         # Seleccionar la colección

print(f"Conexión a MongoDB establecida con la colección: {COLLECTION_NAME}")


# Configuración de la API de CoinGecko

# URL base de la API
# Endpoint que proporciona los precios actuales de las criptomonedas seleccionadas
COINGECKO_API_URL = "https://api.coingecko.com/api/v3/simple/price"

# Criptomonedas seleccionadas para el análisis
# Identificadores de CoinGecko para Bitcoin, Ethereum y Binance Coin
CRYPTOCURRENCIES = ["bitcoin", "ethereum", "binancecoin"]  

# Moneda de referencia para los precios
CURRENCY = "usd"  

# Función para obtener datos desde la API
def fetch_crypto_data():
     """
    Obtiene los precios actuales de las criptomonedas seleccionadas desde la API.

    Proceso:
    - Realiza una solicitud GET al endpoint de CoinGecko con los parámetros configurados.
    - Retorna los datos en formato JSON si la solicitud es exitosa.
    - Si ocurre un error, imprime un mensaje en la consola y retorna None.

    Retorno:
    - dict: Contiene los precios de las criptomonedas en USD si la solicitud es exitosa.
    - None: En caso de fallo en la conexión o error en la API.
    """
    try:
        # Realizar solicitud GET a la API
        response = requests.get(
            COINGECKO_API_URL, # URL base de la API
            params={
                "ids": ",".join(CRYPTOCURRENCIES), # Lista de criptomonedas separadas por comas
                "vs_currencies": CURRENCY # Moneda de referencia
            }
        )
        # Verificar si la solicitud fue exitosa
        if response.status_code == 200:
            return response.json() # Retornar datos en formato JSON
        else:
            # Imprimir mensaje de error si la solicitud falla
            print(f"Error en la API de CoinGecko: {response.status_code}")
            return None
    except Exception as e:
        # Manejar excepciones en caso de fallo en la conexión
        print(f"Error al conectar con la API: {e}")
        return None

# Función principal que integra la recolección de datos, transmisión a Kafka y almacenamiento en MongoDB
def collect_and_store_data():
    """
    Recolecta los datos de los precios de  las criptomonedas desde la API de CoinGecko, 
    los transmite en tiempo real al tópico de Kafka y los almacena en MongoDB.

    Proceso:
    1. Obtiene datos en formato JSON desde la API.
    2. Transforma los datos en registros estructurados.
    3. Envía los registros a Kafka.
    4. Inserta los registros en una base de datos MongoDB.

    Este proceso se repite cada 10 segundos hasta que se interrumpe manualmente.
    """
    try:
        while True:
            # Obtener datos de la API
            crypto_data = fetch_crypto_data()
            if crypto_data:
                for crypto, data in crypto_data.items():
                    # Crear el registro con la información recolectada
                    record = {
                        "currency": crypto.capitalize(),
                        "price": data["usd"],
                        "timestamp": datetime.utcnow().isoformat()
                    }
                    
                    # Enviar los datos a Kafka
                    producer.send(KAFKA_TOPIC, value=record)
                    print(f"Enviado a Kafka: {record}")
                    
                    # Almacenar los datos en MongoDB
                    collection.insert_one(record)
                    print(f"Guardado en MongoDB: {record}")
            
            # Intervalo entre recolecciones
            time.sleep(10)
    except KeyboardInterrupt:
        print("\nRecolección detenida por el usuario.")
    except Exception as e:
        print(f"Error durante la recolección: {e}")
    finally:
        # Cierre de conexiones
        producer.close()
        client.close()
        print("Conexiones cerradas.")

# Iniciar el proceso
if __name__ == "__main__":
    collect_and_store_data()


Conexión a MongoDB establecida con la colección: crypto_data_collect
Error durante la recolección: KafkaTimeoutError: Failed to update metadata after 60.0 secs.
Conexiones cerradas.
