<h2 style="color:darkcyan;">1. Elaboración de una visión amplia del problema y selección de variables: </h2>

* Para este problema, se ha planteado obtener datos de informes diarios del precio del bitcoin y su historial y realizar predicciones con modelos de regresión y redes neuronales.

* Para ello los datos son obtenidos en tiempo real de un productor, almacenados en una estructura de Bases de  Datos y posteriormente procesados para acabar siendo usados para entrenar modelos de prediccion del precio.

En los siguientes apartados se explicará el proceso de almacenamiento y tratado de datos.


<h2 style="color:darkcyan;">2. Obtención y almacenamiento de los datos</h2>

Una vez tengamos el productor de datos en kafka, necesitaremos obtener los datos con un consumidor

Para ello tendremos que leer los datos del productor y almacenarlos posteriormente en BBDD

<h3 style="color:darkcyan;">2.1. Estructura de la obtención y almacenamiento de los datos</h3>

A continuación podemos ver un esquema de como quedaria el resultado de nuestra estructura de procesamiento de datos y ELT
<br><br><br>
[![esquema.png](https://i.postimg.cc/BbfXwQt1/esquema.png)](https://postimg.cc/9R1mD2dW)
<br><br><br>
* 1. Productor kafka, el cual produce nuestros datos e informes diarios del bitcoin en tiempo real
* 2. Consumidor kafka, el cual consume los datos del productor y mediante python inserta los datos estructurados y semiestructurados en PostgreSQL y MongoDB
* 3. PostgreSQL para almacenar los datos estructurados como timestamp, precio_btc, cambio, clima, etc.
* 4. MongoDB para almacenar los datos semiestructurados como los factores
* 5. Apache NIFI para aplicar ELT a los datos obtenidos de las BBDD y obtener un archivo .csv resultado


<h3 style="color:darkcyan;">2.2. Productor kafka</h3>

A continuacion explicamos la estructura programada para el productor kafka , el cual nos genera datos aleatorios diarios a corde con el precio del bitcoin.

Para empezar importamos las librerias necesarias

```python
import random
import time
import json
from datetime import datetime, timedelta
from kafka import KafkaProducer
```

Importamos:
 - tandom, para generar variacion
 - time, para pausar el proceso de produccion
 - json, para serializar los mensajes
 - datetime, para simular el avance de los dias
 - kafka, para enviar mensajes al productor

```python
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
```

KafkaProducer se conecta a nuestra maquina local de kafka y envia los datos en JSON

<h4 style="color:cyan;">2.2.1. Parametros y variables locales</h4>

```python
bitcoin_price = 30000
fecha_inicio = datetime(2009, 1, 1)
disipacion_diaria = 0.07
precio_historial = []
precio_historial_90d = []
dias_buffer = 3
ventana_max_min = 90
prev_minimo = None
dias_en_euforia = 0
max_historico = bitcoin_price
dias_rebote_fuerte = 0
```

Con estas variables:
 - Simulamos una evolucion del precio del bitcoin desde 2009.
 - Almacenamos precios para:
    - Detectar subidas/bajadas bruscas (`dias_buffer`).
    - Identificar maximos y minimos en los ultimos 90 dias (`ventana_max_min`).
 - Disipacion para que los factores pierdan peso con el tiempo.
 - Activamos emociones de mercado en funcion de eventos relacionados con el precio (`dias_en_euforia` y `dias_rebote_fuerte`).


<h4 style="color:cyan;">2.2.2. Factores que afectan al precio</h4>

```python
factors = {
    "tipos_interes": {"valor": 0.0, "peso": 2.0},
    "inflacion": {"valor": 0.0, "peso": 1.5},
    "adopcion_institucional": {"valor": 0.0, "peso": 3.0},
    "noticias": {"valor": 0.0, "peso": 2.5},
    "regulacion": {"valor": 0.0, "peso": 3.0},
    "halving": {"valor": 0.0, "peso": 2.5},
    "seguridad_tecnologica": {"valor": 0.0, "peso": 1.0},
    "oferta_demanda": {"valor": 0.0, "peso": 2.0},
    "sentimiento_mercado": {"valor": 0.0, "peso": 2.5}
}
```

#### Que es?
 - Diccionario de factores economicos y sociales.
 - Cada uno tiene:
    - `valor`: entre -1 y 1 (negativo, neutro o positivo).
    - `peso`: cuanto influye ese factor en el clima.

#### Intencion
Recrear la complejidad del mercado, simulamos variables economicas y sociales que afectan al precio. Asi evitamos un comportamiento completamente aleatorio

<h4 style="color:cyan;">2.2.3. Funcion cambiar_factor()</h4>

```python
def cambiar_factor():
    return random.choice([-1.0, 1.0])
```

Devuelve un nuevo estado aleatorio del factor entre `-1` y `1`, simulando cambios aleatorios en las condiciones del mercado que marcaran fuertemente la subida o bajada del precio del `BitCoin`.

<h4 style="color:cyan;">2.2.4. Funcion disipar(valor)</h4>

```python
def disipar(valor):
    if abs(valor) <= disipacion_diaria:
        return 0.0
    return round(valor - disipacion_diaria, 2) if valor > 0 else round(valor + disipacion_diaria, 2)
```

Reduce progresivamente la influencia de un factor que no ha cambiado. Simula como el impacto de un evento pierde fuerza con el tiempo.

<h4 style="color:cyan;">2.2.5. Funcion calcular_clima()</h4>

```python
def calcular_clima():
    return round(sum(f["valor"] * f["peso"] for f in factors.values()), 2)
```

#### Que hace?
Suma todos los factores por su valor para obtener un valor de `clima`.
#### Intencion principal:
Determinar si el ambiente economico global es positivo o negativo para ver si afecta positiva o negativamente al precio del `BitCoin`

<h4 style="color:cyan;">2.2.6. Funcion `evaluar_sentimiento_mercado()`</h4>

```python
def evaluar_sentimiento_mercado():
    global prev_minimo, dias_en_euforia, dias_rebote_fuerte
    sentimiento = 0.0

    # FOMO/FUD por cambios bruscos
    if len(precio_historial) >= dias_buffer:
        p_ini = precio_historial[-dias_buffer]
        p_actual = precio_historial[-1]
        variacion = ((p_actual - p_ini) / p_ini) * 100
        if variacion >= 10:
            sentimiento += 0.6
        elif variacion <= -10:
            sentimiento -= 0.6

    # Reacciones por maximos y minimos
    if len(precio_historial_90d) >= ventana_max_min:
        p_actual = precio_historial_90d[-1]
        max_90d = max(precio_historial_90d)
        min_90d = min(precio_historial_90d)

        if p_actual == max_90d and dias_en_euforia == 0:
            duracion_euforia = 3
            dias_en_euforia = duracion_euforia
            print(f"[INFO] Precio en maximo de 3 meses. Se activa euforia de venta ({duracion_euforia} dias)")

        if prev_minimo is None:
            prev_minimo = min_90d
        distancia_min = abs(p_actual - min_90d) / min_90d
        if distancia_min <= 0.01 and min_90d < prev_minimo:
            sentimiento -= 0.7
            prev_minimo = min_90d

    # Durante dias en euforia
    if dias_en_euforia > 0:
        dias_en_euforia -= 1
        if random.random() < 0.6:
            sentimiento -= 0.3
            print(f"[INFO] Dia de venta por euforia. Quedan {dias_en_euforia} dias.")
        else:
            sentimiento += 0.2
            print(f"[INFO] Dia de hype por maximos. Quedan {dias_en_euforia} dias.")

    # Rebote por estar infravalorado
    if dias_rebote_fuerte > 0:
        dias_rebote_fuerte -= 1
        sentimiento += 0.6
        print(f"[INFO] Dia de confianza por infravaloracion. Quedan {dias_rebote_fuerte} dias.")

    sentimiento = max(min(sentimiento, 1.0), -1.0)
    return round(sentimiento, 2)
```
#### Que hace?
Calcula el sentimiento de mercado segun:
 - Cambios bruscos (FOMO/FUD).
 - Euforia al alcanzar maximos (activando `hype` o `venta`).
 - Rebote por minimos historicos (inversores confian a largo plazo)

#### Intencion principal
Simular el factor emocional del mercado, asi creamos zonas de sobrecompra/venta, correciones y rebotes.

<h4 style="color:cyan;">2.2.7. Funcion `simular-evento(dia_num)`</h4>

```python
def simular_evento(dia_num):
    global bitcoin_price, max_historico, dias_rebote_fuerte

    fecha = (fecha_inicio + timedelta(days=dia_num)).strftime('%Y-%m-%d')

    # Sentimiento global
    sentimiento_actual = evaluar_sentimiento_mercado()
    if sentimiento_actual != 0.0:
        factors["sentimiento_mercado"]["valor"] = sentimiento_actual

    # Actualizacion de factores
    for f in factors:
        if f == "sentimiento_mercado" and sentimiento_actual != 0.0:
            continue
        if random.random() < 0.1:
            factors[f]["valor"] = cambiar_factor()
        else:
            factors[f]["valor"] = disipar(factors[f]["valor"])

    clima = calcular_clima()
    influencia = clima / (1 + abs(clima))

    base_cambio = random.randint(100, 1000)
    ruido = random.randint(-50, 50)
    tendencia_largo_plazo = random.randint(2, 5)

    cambio = int(base_cambio * influencia) + ruido + tendencia_largo_plazo
    bitcoin_price = max(100, bitcoin_price + cambio)

    # Guardar en historial
    precio_historial.append(bitcoin_price)
    precio_historial_90d.append(bitcoin_price)
    if len(precio_historial_90d) > ventana_max_min:
        precio_historial_90d.pop(0)

    # Actualizar maximo historico
    if bitcoin_price > max_historico:
        max_historico = bitcoin_price

    # Activar rebote si cae por debajo del 33% del maximo historico
    if bitcoin_price < 0.33 * max_historico and dias_rebote_fuerte == 0:
        dias_rebote_fuerte = 5
        print(f"[INFO] Precio extremadamente bajo (${bitcoin_price:.2f}). Se activa rebote por infravaloracion ({dias_rebote_fuerte} dias).")

    mensaje = {
    "timestamp": fecha,
    "precio_btc": round(bitcoin_price, 2),
    "cambio": cambio,
    "clima": clima,
    "tendencia_largo_plazo": tendencia_largo_plazo,
    "sentimiento_detectado": sentimiento_actual,
    "max_historico": round(max_historico, 2),
    "relacion_con_max": round(bitcoin_price / max_historico, 4) if max_historico > 0 else 0,
    "dias_en_euforia": dias_en_euforia,
    "dias_rebote_fuerte": dias_rebote_fuerte,
    "factores": {
        f: round(data["valor"], 2) for f, data in factors.items()
        }
    }


    return mensaje
```
Simula el comportamiento diario del `BitCoin`:
 1. Avanza la fecha.
 2. Evalua el sentimiento emocional del mercado.
 3. Actualiza factores.
 4. Calcula el cambio del dia (clima + ruido + tendencia).
 5. Guarda historial.
 6. Detecta maximos y minimos.
 7. Genera el mensaje JSON con la informacion.

#### Intencion principal
Crear un evento diario realista del mercado que pueda ser consumido por sistemas de `Big Data`. Refleja tanto logica economica como emocional.

<h4 style="color:cyan;">2.2.8. Contenido del mensaje JSON enviado a Kafka</h4>

```python
mensaje = {
    "timestamp": fecha,
    "precio_btc": round(bitcoin_price, 2),
    "cambio": cambio,
    "clima": clima,
    "tendencia_largo_plazo": tendencia_largo_plazo,
    "sentimiento_detectado": sentimiento_actual,
    "max_historico": round(max_historico, 2),
    "relacion_con_max": round(bitcoin_price / max_historico, 4),
    "dias_en_euforia": dias_en_euforia,
    "dias_rebote_fuerte": dias_rebote_fuerte,
    "factores": {
        f: round(data["valor"], 2) for f, data in factors.items()
    }
}
```

Con esto emitimos el estado completo del mercado en un dia.

<h4 style="color:cyan;">2.2.9. Bucle principal</h4>

```python
dia = 0
try:
    while True:
        mensaje = simular_evento(dia)
        producer.send("btc_topic", value=mensaje)
        print(f"Dia {dia} | {mensaje}")
        dia += 1
        time.sleep(0.3)
```
Simula un nuevo dia cada 0.3s y lo envia a Kafka.

### Por que este metodo?
Hemos optado por simular un mercado que siga una logica economica y emocional coherente, en lugar de generar datos completamente aleatorios. Asi creamos un entorno de datos mas realista donde los precios reaccionan a factores externos y donde la IA pueda comprender el comportamiento de nuestro mercado simulado, creemos que asi las predicciones seran mas eficientes.

<h3 style="color:darkcyan;">2.3. Consumidor kafka</h3>

Ahora mostraremos el funcionamiento del consumidor kafka, el cual obtiene los datos del productor y los almacena en las bases de datos, pero antes debemos crear previamente las bases de datos donde almacenar los datos, para ello hemos preparado este docker.yml:

```yml
version: '3.8'
services:
  postgres:
    image: postgres:15
    container_name: btc_postgres
    restart: always
    environment:
      POSTGRES_USER: admin
      POSTGRES_PASSWORD: admin
      POSTGRES_DB: bitcoin
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  mongo:
    image: mongo:6
    container_name: btc_mongo
    restart: always
    ports:
      - "27017:27017"
    volumes:
      - mongodata:/data/db

volumes:
  pgdata:
  mongodata:
```
<h4 style="color:cyan;">2.3.1. Configuracion consumidor</h4>

```python
from kafka import KafkaConsumer
import json
import psycopg2
from pymongo import MongoClient

# Configuración Kafka
consumer = KafkaConsumer(
    'btc_topic',
    bootstrap_servers='185.228.67.7:9092',
    auto_offset_reset='latest',
    group_id='btc_remoto',
    value_deserializer=lambda m: m.decode('utf-8')
)
# Configuramos los parametros para conectarnos al productor
```


<h4 style="color:cyan;">2.3.2. Conexiones a bases de datos</h4>

```python
# Conexión PostgreSQL
pg_conn = psycopg2.connect(dbname="bitcoin", user="admin", password="admin", host="localhost")
pg_cursor = pg_conn.cursor()

# Conexión MongoDB
mongo_client = MongoClient('mongodb://localhost:27017/')
mongo_db = mongo_client['bitcoin']
mongo_collection = mongo_db['factores']
```

<h4 style="color:cyan;">2.3.3. Insertado de registro en las tablas</h4>

```python
for message in consumer:
    data = json.loads(message.value)
    
    try:
        # Insertar en PostgreSQL
        pg_cursor.execute("""
            INSERT INTO btc_data (timestamp, precio_btc, cambio, clima, tendencia_largo_plazo,
                                  sentimiento_detectado, max_historico, relacion_con_max,
                                  dias_en_euforia, dias_rebote_fuerte)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (data['timestamp'], data['precio_btc'], data['cambio'], data['clima'],
             data['tendencia_largo_plazo'], data['sentimiento_detectado'], data['max_historico'],
             data['relacion_con_max'], data['dias_en_euforia'], data['dias_rebote_fuerte'])
        )
        pg_conn.commit()
        print(f"Registro insertado en PostgreSQL: {data['timestamp']}")
    except Exception as e:
        print(f"Error al insertar en PostgreSQL: {e}")
    
    try:
        # Insertar en MongoDB el campo "factores"
        mongo_collection.insert_one({
            "timestamp": data['timestamp'],
            "factores": data['factores']
        })
        print(f"Registro insertado en MongoDB: {data['timestamp']}")
    except Exception as e:
        print(f"Error al insertar en MongoDB: {e}")
```

<h4 style="color:cyan;">2.3.4. EStructura de PostgreSQL y MongoDB</h4>

A continuacion monstramos la estructura de PostgreSQL para los datos estructurados:

```sql
CREATE TABLE btc_data (
    id SERIAL PRIMARY KEY,
    timestamp DATE NOT NULL,
    precio_btc INTEGER NOT NULL,
    cambio INTEGER NOT NULL,
    clima REAL NOT NULL,
    tendencia_largo_plazo INTEGER NOT NULL,
    sentimiento_detectado REAL NOT NULL,
    max_historico INTEGER NOT NULL,
    relacion_con_max REAL NOT NULL,
    dias_en_euforia INTEGER NOT NULL,
    dias_rebote_fuerte INTEGER NOT NULL
);
```
Y aqui tenemos la estructura de datos en MOngoDB
```json
{
  "timestamp": "2067-06-12",
  "factores": {
    "tipos_interes": -0.93,
    "inflacion": -0.93,
    "adopcion_institucional": 0.0,
    "noticias": 1.0,
    "regulacion": -0.51,
    "halving": 0.72,
    "seguridad_tecnologica": 0.86,
    "oferta_demanda": -0.44,
    "sentimiento_mercado": 0.0
  }
}
```

<h4 style="color:cyan;">2.3.5. Visualización de datos en las BBDD</h4>

Una vez estructuradas las bases de datos y ejecutado el código de productor y consumidor, los datos son almacenados en sus respectivas BBDD.

Para cada una de ellas , han sido almacenadas en un contenedor docker, podemos visualizar los datos previamente:

* Estructura en PostgreSQL
<br><br><br>
[![Captura-desde-2025-05-21-14-39-04.png](https://i.postimg.cc/8z1dkDNS/Captura-desde-2025-05-21-14-39-04.png)](https://postimg.cc/zVcHpsyt)
<br><br><br>
* Estructura en MongoDB
<br><br><br>
[![Captura-desde-2025-05-21-14-39-54.png](https://i.postimg.cc/8zPWQHmk/Captura-desde-2025-05-21-14-39-54.png)](https://postimg.cc/qzSg22FW)
<br><br><br>

<h3 style="color:darkcyan;">2.4. Apache nifi</h3>

Una vez tengamos los datos almacenados, es hora de aplicar ELT, transformarlos y aplanarlos para poder obtener un archivo y poder entrenar nuestro algoritmo

<h4 style="color:cyan;">2.4.1. Conexiones a BBDD</h4>

* Para empezar, creamos un nuevo proceso en nifi y configuramos las conexiones a las BBDD como tenemos en la imagen inferior, en nuestro caso configuramos con los datos e acceso que hemos creado en nuestras imagenes de docker.

* Tambien configuramos los distintos procesos que enseñaremos posteriormente en la estructura que hemos usado

[![image.png](https://i.postimg.cc/DzqK0dMY/image.png)](https://postimg.cc/VSLhHM1j)

<h4 style="color:cyan;">2.4.2. Estructura de proceso</h4>

Para nuestra estructura, necesitamos que los datos sean obtenidos de las 2 bases de datos (PostgreSQL para datos estructurados y MongoDB para factores que puedan variar) y posteriormente juntarlos y obtener un archivo único con para la elaboración de los modelos ML.

El orden de proceso que hemos usado son los siguientes:

* GetMongo

 Extrae los registros desde la colección de MongoDB.

 * ForkEnrichment

 Divide los datos en dos rutas: Una se va a JoinEnrichment y otra se va a la rama de enriquecimiento SQL (splits → SplitRecord).

* ExecuteSQLRecord

Ejecuta una consulta SQL para obtener los factores del bitcoin para posteriormente unirlos a los datos de MongoDB.

* SplitRecord

Divide el resultado del SQL en registros individuales para que puedan ser procesados por separado.

* JoinEnrichment

Une los datos originales de MongoDB con los factores obtenidos de PostgreSQL a través de la fecha.

* MergeRecord

Agrupa todos los registros enriquecidos en una sola estructura Json.

* PutFile

Guarda el resultado final en un archivo local en formato .cvs (posibilidad de modificar formato).

[![image2.png](https://i.postimg.cc/KjYX9Xvd/image2.png)](https://postimg.cc/0MhHQBSf)

Una vez realizado el proceso, obtendremos un archivo .csv con todos los registros listos para tratar, analizar y ser usado para ML de forma local.