
### TensorFlow-IO

[TensorFlow-IO](https://github.com/tensorflow/io) es una extensión de TensorFlow que proporciona funciones y utilidades para leer y escribir datos desde y hacia diferentes fuentes de datos.

- **Interoperabilidad de formatos de datos**: TensorFlow-IO permite trabajar con una amplia variedad de formatos de datos, incluidos CSV, JSON, TFRecord, Parquet, y más. Esto es útil en tareas de preparación de datos para entrenar modelos de machine learning.

- **Soporte para sistemas de archivos**: Puede acceder a datos almacenados en sistemas de archivos locales o distribuidos, como HDFS (Hadoop Distributed File System) y Amazon S3.

- **Integración con TensorFlow**: Al ser una extensión de TensorFlow, TensorFlow-IO se integra sin problemas con el ecosistema de TensorFlow, lo que facilita la carga y preprocesamiento de datos para entrenar modelos de machine learning con TensorFlow.

- **Conectividad con bases de datos**: También ofrece conectividad con bases de datos populares, como MySQL, PostgreSQL y Apache Cassandra, lo que permite la lectura y escritura de datos directamente desde y hacia estas bases de datos.

### Kafka-Python

[kafka-python](https://github.com/dpkp/kafka-python) es una biblioteca de Python que proporciona funcionalidad para interactuar con [Apache Kafka](https://kafka.apache.org/), un sistema de mensajería distribuida ampliamente utilizado en el procesamiento de datos en tiempo real.

- **Productor y consumidor Kafka**: kafka-python permite crear productores y consumidores de Kafka en Python, lo que facilita la publicación y la suscripción a flujos de datos en tiempo real.

- **Fácil integración**: La biblioteca está diseñada para ser fácil de usar y ofrece una interfaz Pythonic para interactuar con Kafka. Esto hace que sea conveniente para desarrolladores de Python que deseen trabajar con Kafka.

- **Soporte para SSL y autenticación**: kafka-python admite la configuración de conexiones seguras a clústeres de Kafka mediante SSL y ofrece opciones para la autenticación, lo que es esencial en entornos de producción.

- **Alta disponibilidad y escalabilidad**: Apache Kafka es conocido por su capacidad de escalar horizontalmente y garantizar la alta disponibilidad de datos. Kafka-python facilita el acceso a estas características.



In [None]:
!pip install tensorflow-io
!pip install kafka-python

Collecting tensorflow-io
  Downloading tensorflow_io-0.37.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (14 kB)
Downloading tensorflow_io-0.37.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (49.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.6/49.6 MB[0m [31m14.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensorflow-io
Successfully installed tensorflow-io-0.37.1
Collecting kafka-python
  Downloading kafka_python-2.1.5-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading kafka_python-2.1.5-py2.py3-none-any.whl (285 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m285.4/285.4 kB[0m [31m9.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.1.5


```markdown
# Importación de paquetes necesarios
```

Esta línea indica que se están importando los paquetes y módulos necesarios para el código. Estos paquetes se utilizan a lo largo del código para diversas tareas.

```python
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
```

Los paquetes importados son:

- `os`: Proporciona funciones para interactuar con el sistema operativo, como la manipulación de rutas de archivos y directorios.
- `datetime`: Proporciona clases para trabajar con fechas y horas.
- `time`: Proporciona funciones relacionadas con el tiempo, como pausas y medidas de tiempo.
- `threading`: Permite trabajar con hilos (threads) para realizar tareas en paralelo.
- `json`: Ofrece funciones para trabajar con el formato JSON (JavaScript Object Notation).
- `KafkaProducer` y `KafkaError` de `kafka`: Estos módulos son parte de la biblioteca Kafka para la comunicación y procesamiento de datos a través de Kafka, una plataforma de transmisión de datos.
- `train_test_split` de `sklearn.model_selection`: Se utiliza para dividir conjuntos de datos en conjuntos de entrenamiento y prueba.
- `pandas` como `pd`: Es una biblioteca para la manipulación y análisis de datos en Python.
- `tensorflow` como `tf`: Un marco de trabajo de código abierto para aprendizaje automático y aprendizaje profundo.
- `tensorflow_io` como `tfio`: Una extensión de TensorFlow para entrada y salida de datos.

```python
# Validar las versiones de tensorflow y tensorflow-io
print("Versión de tensorflow-io: {}".format(tfio.__version__))
print("Versión de tensorflow: {}".format(tf.__version__))
```

Estas líneas imprimen en la consola las versiones de TensorFlow y TensorFlow-IO que están instaladas en el entorno. Esto es útil para asegurarse de que se están utilizando las versiones correctas de las bibliotecas.

```python
# Descargar y configurar instancias de Kafka y Zookeeper
```

Esta línea es un comentario que indica que a continuación se realizará la descarga y configuración de instancias de Kafka y Zookeeper. No es código ejecutable.

```bash
!curl -sSOL https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
```

Esta línea descarga el archivo binario de Kafka versión 3.5.1 utilizando el comando `curl`. El `!` al principio indica que se trata de un comando que se ejecutará en la línea de comandos.

```bash
!tar -xzf kafka_2.13-3.5.1.tgz
```

Esta línea descomprime el archivo tgz descargado utilizando el comando `tar`.

```bash
!./kafka_2.13-3.5.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.5.1/config/zookeeper.properties
```

Esta línea inicia un servidor Zookeeper en modo daemon utilizando el script `zookeeper-server-start.sh` y se le proporciona una configuración en `zookeeper.properties`.

```bash
!./kafka_2.13-3.5.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.5.1/config/server.properties
```

Esta línea inicia un servidor Kafka en modo daemon utilizando el script `kafka-server-start.sh` y se le proporciona una configuración en `server.properties`.

```bash
!echo "Esperando 10 segundos hasta que los servicios de kafka y zookeeper estén activos"
```

Este comando imprime un mensaje en la consola para indicar que se espera durante 10 segundos hasta que los servicios de Kafka y Zookeeper estén activos.

```bash
!sleep 10
```

Este comando pausa la ejecución del script durante 10 segundos utilizando el comando `sleep`.

```bash
!ps -ef | grep kafka
```

Este comando muestra información sobre los procesos que contienen la palabra "kafka" en su nombre o descripción utilizando el comando `ps` y `grep`.

In [None]:
# Importación de paquetes necesarios
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

# Validar las versiones de tensorflow y tensorflow-io
print("Versión de tensorflow-io: {}".format(tfio.__version__))
print("Versión de tensorflow: {}".format(tf.__version__))

# Descargar y configurar instancias de Kafka y Zookeeper
!curl -sSOL https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
!tar -xzf kafka_2.13-3.5.1.tgz
!./kafka_2.13-3.5.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.5.1/config/zookeeper.properties
!./kafka_2.13-3.5.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.5.1/config/server.properties
!echo "Esperando 10 segundos hasta que los servicios de kafka y zookeeper estén activos"
!sleep 10
!ps -ef | grep kafka




caused by: ['/usr/local/lib/python3.11/dist-packages/tensorflow_io/python/ops/libtensorflow_io_plugins.so: undefined symbol: _ZN3tsl8str_util8EndsWithESt17basic_string_viewIcSt11char_traitsIcEES4_']
caused by: ['/usr/local/lib/python3.11/dist-packages/tensorflow_io/python/ops/libtensorflow_io.so: undefined symbol: _ZN3tsl8str_util9LowercaseB5cxx11ESt17basic_string_viewIcSt11char_traitsIcEE']


Versión de tensorflow-io: 0.37.1
Versión de tensorflow: 2.18.0

gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
/bin/bash: line 1: ./kafka_2.13-3.5.1/bin/zookeeper-server-start.sh: No such file or directory
/bin/bash: line 1: ./kafka_2.13-3.5.1/bin/kafka-server-start.sh: No such file or directory
Esperando 10 segundos hasta que los servicios de kafka y zookeeper estén activos
root        1386    1082  0 15:20 ?        00:00:00 /bin/bash -c ps -ef | grep kafka
root        1388    1386  0 15:20 ?        00:00:00 grep kafka


1. `Versión de tensorflow-io: 0.34.0` y `Versión de tensorflow: 2.13.0`: Estas líneas indican las versiones de TensorFlow y TensorFlow-IO que se están utilizando en este entorno. TensorFlow es un marco de código abierto para aprendizaje automático y TensorFlow-IO es una extensión de TensorFlow que proporciona soporte para diferentes formatos de datos.

2. `Esperando 10 segundos hasta que los servicios de kafka y zookeeper estén activos`: Esta línea indica que se está esperando un período de 10 segundos antes de que los servicios de Kafka y Zookeeper estén completamente activos. Esto se hace para dar tiempo a que los servicios se inicialicen y estén listos para su uso antes de realizar cualquier operación con ellos.

3. A continuación, se enumeran dos procesos Java relacionados con Kafka:

   - `java -Xmx512M -Xms512M ... org.apache.zookeeper.server.quorum.QuorumPeerMain ./kafka_2.13-3.5.1/config/zookeeper.properties`: Este proceso corresponde a ZooKeeper, que es un servicio necesario para el funcionamiento de Kafka. ZooKeeper se utiliza para la gestión de clústeres y la coordinación de líderes en Kafka.

   - `java -Xmx1G -Xms1G ... kafka.Kafka ./kafka_2.13-3.5.1/config/server.properties`: Este proceso corresponde al servidor de Kafka en sí. Utiliza la configuración definida en `server.properties` para iniciar el servidor de Kafka.

4. Finalmente, hay dos procesos de búsqueda que utilizan el comando `ps -ef | grep kafka` para buscar cualquier proceso que contenga la palabra "kafka". Estos procesos son:

   - `/bin/bash -c ps -ef | grep kafka`: Es el proceso que ejecuta el comando `ps -ef | grep kafka` y busca procesos relacionados con Kafka.

   - `grep kafka`: Es el proceso de búsqueda en sí, que busca la cadena "kafka" en la lista de procesos generada por el comando `ps -ef`.

La salida muestra que se están ejecutando dos procesos Java relacionados con Kafka (el servidor de Kafka y ZooKeeper), y también muestra los procesos de búsqueda que buscan procesos relacionados con Kafka en la lista de procesos. Esto sugiere que Kafka y ZooKeeper están en funcionamiento en este entorno.

El conjunto de datos SUSY (SuperSymmetry) es un conjunto de datos utilizado en física de partículas para la identificación de partículas supersimétricas. Las columnas en este conjunto de datos representan diversas características o propiedades medidas en eventos de colisiones de partículas.

1. `class`: Esta columna generalmente contiene una etiqueta que indica la clase o tipo del evento. En el contexto de física de partículas, podría indicar si el evento es "señal" (representa la producción de partículas supersimétricas) o "fondo" (representa eventos de fondo no interesantes).

2. `lepton_1_pT`: La cantidad de momento transversal (pT) del primer leptón en el evento. El pT es una medida de la cantidad de momento de una partícula en la dirección perpendicular al haz de partículas.

3. `lepton_1_eta`: La pseudorrapidez del primer leptón en el evento. La pseudorrapidez es una medida relacionada con el ángulo de dispersión de una partícula.

4. `lepton_1_phi`: El ángulo azimutal del primer leptón en el evento. Indica la dirección en el plano transversal al haz.

5. `lepton_2_pT`: Similar a `lepton_1_pT`, pero para el segundo leptón en el evento.

6. `lepton_2_eta`: Similar a `lepton_1_eta`, pero para el segundo leptón.

7. `lepton_2_phi`: Similar a `lepton_1_phi`, pero para el segundo leptón.

8. `missing_energy_magnitude`: La magnitud de la energía faltante en el evento. Esto a menudo se debe a partículas invisibles o que no interactúan con el detector.

9. `missing_energy_phi`: El ángulo azimutal de la energía faltante en el evento.

10. `MET_rel`: Una medida relacionada con la energía transversal faltante en el evento.

11. `axial_MET`: Otra medida relacionada con la energía transversal faltante.

12. `M_R`: La masa transversal reducida.

13. `M_TR_2`: Otra medida relacionada con la masa transversal.

14. `R`: Una variable relacionada con la relación entre el momento transversal de dos objetos en el evento.

15. `MT2`: Una variable relacionada con la transferencia de masa transversal.

16. `S_R`: Una variable relacionada con la masa transversal reducida.

17. `M_Delta_R`: Una medida relacionada con la diferencia en la rapidez transversal entre dos objetos en el evento.

18. `dPhi_r_b`: El ángulo azimutal entre dos objetos en el evento.

19. `cos(theta_r1)`: El coseno del ángulo theta relacionado con el evento.

### Descargar el conjunto de datos SUSY

La primera parte del código es un comentario que indica que se está descargando el conjunto de datos SUSY de alguna fuente externa. Se utiliza el comando `curl` para descargar el archivo. El símbolo `!` al principio de la línea indica que se ejecutará un comando de shell desde el entorno de Python para descargar el archivo. El archivo se descarga con el nombre "SUSY.csv.gz".

```markdown
# Descargar el conjunto de datos SUSY
!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
```

### Definición de las columnas del conjunto de datos

A continuación, se define una lista llamada `COLUMNS` que contiene los nombres de las columnas del conjunto de datos SUSY. Estos nombres se utilizarán más adelante para especificar cómo se deben leer los datos del archivo CSV descargado.

```markdown
COLUMNS = [
    'class',
    'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi',
    'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi',
    'missing_energy_magnitude', 'missing_energy_phi',
    'MET_rel', 'axial_MET', 'M_R', 'M_TR_2',
    'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)'
]
```

### Cargar el conjunto de datos

En esta sección, se carga el conjunto de datos SUSY desde el archivo CSV descargado en una estructura de datos de pandas llamada `DataFrame`. Se utiliza `pd.read_csv` para leer el archivo CSV y se especifica que no hay una fila de encabezado en el archivo CSV original (usando `header=None`). Luego, se asignan los nombres de las columnas definidos anteriormente (`COLUMNS`) al conjunto de datos. Finalmente, el conjunto de datos se lee en chunks (trozos) de 100,000 filas a la vez.

```markdown
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
```

### Dividir el conjunto de datos en entrenamiento y prueba

En esta sección, el conjunto de datos se divide en conjuntos de entrenamiento y prueba. Se utiliza `train_test_split` de scikit-learn para realizar esta división. El conjunto de entrenamiento se toma como el 60% del conjunto de datos original (`test_size=0.4`), y se realiza un barajado aleatorio de los datos (`shuffle=True`). Luego, los conjuntos de características (atributos) se separan de las etiquetas (la columna "class") tanto para el conjunto de entrenamiento como para el conjunto de prueba.

```markdown
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]
```

### Convertir el conjunto de datos en listas para Kafka

En esta sección, los DataFrames de pandas se convierten en listas de Python para su posterior uso con Kafka. Esto se hace utilizando el método `to_csv` de pandas para convertir cada DataFrame en una cadena CSV y luego dividir esa cadena en líneas para obtener una lista de valores.

```markdown
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
```

### Función para almacenar datos en Kafka

En esta sección, se define una función llamada `write_to_kafka` que se utiliza para escribir datos en un servidor Kafka. Se utiliza la biblioteca `KafkaProducer` para interactuar con Kafka. La función toma el nombre del tema Kafka y una lista de tuplas `items`, donde cada tupla contiene un mensaje y una clave. La función itera a través de los elementos y los envía al tema Kafka.

```markdown
def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8'))
        count += 1
    producer.flush()
    print("Escribió {0} mensajes en el tema: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
```

Esta función se llama dos veces al final del código para escribir los conjuntos de datos de entrenamiento y prueba en los temas Kafka "susy-train" y "susy-test", respectivamente.

Para que este código funcione correctamente, se debe tener un servidor Kafka en funcionamiento en la dirección `127.0.0.1:9092`, y se deben instalar las bibliotecas pandas y kafka-python en el entorno de Python. Además, este código asume que ya tienes configurado Kafka y los temas "susy-train" y "susy-test" existen en tu servidor Kafka.

In [None]:
# Descargar el conjunto de datos SUSY
!curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz

# Definición de las columnas del conjunto de datos
COLUMNS = [
    'class',
    'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi',
    'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi',
    'missing_energy_magnitude', 'missing_energy_phi',
    'MET_rel', 'axial_MET', 'M_R', 'M_TR_2',
    'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)'
]

# Cargar el conjunto de datos
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)

# Dividir el conjunto de datos en entrenamiento y prueba
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]

# Convertir el conjunto de datos en listas para kafka
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns)

# Función para almacenar datos en kafka
def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8'))
        count += 1
    producer.flush()
    print("Escribió {0} mensajes en el tema: {1}".format(count, topic_name))

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))


ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-2, node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connect attempt returned error 111. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-2, node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-2, node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connect attempt returned error 111. Disconnecting.
ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-2, node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED
ERROR:kafka.conn:<BrokerConnection client_id=kafka-python-producer-2, node_id=bootstrap-0 host=127.0.0.1:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connect attempt retu

NoBrokersAvailable: NoBrokersAvailable



```
Escribió 60000 mensajes en el tema: susy-train
Escribió 40000 mensajes en el tema: susy-test
```

Esto indica que se escribieron un total de 60,000 mensajes en el tema llamado "susy-train" y 40,000 mensajes en el tema llamado "susy-test" utilizando la función `write_to_kafka` en el código anterior. Estos números representan la cantidad de mensajes escritos con éxito en cada uno de los dos temas de Kafka mencionados.

#`confluent-kafka`

**`confluent-kafka`**

La biblioteca `confluent-kafka` es una biblioteca de Python que proporciona una interfaz para interactuar con Apache Kafka, una plataforma de transmisión de datos de código abierto ampliamente utilizada para la gestión y el procesamiento de flujos de datos en tiempo real. `confluent-kafka` es una biblioteca popular para trabajar con Kafka debido a su eficiencia y capacidad para aprovechar las características avanzadas de Kafka.

- **Interacción con Kafka:** `confluent-kafka` permite a los desarrolladores interactuar con clústeres de Kafka para producir y consumir mensajes, gestionar grupos de consumidores, y realizar tareas administrativas, como la creación y configuración de temas.

- **Eficiencia:** Está diseñada para ser eficiente y de alto rendimiento, lo que la hace adecuada para aplicaciones de procesamiento de datos en tiempo real que requieren baja latencia.

- **Soporte para la API de Kafka:** La biblioteca proporciona soporte completo para las APIs de Kafka, incluyendo la API de productor y la API de consumidor. Esto permite a los desarrolladores escribir aplicaciones que publiquen y consuman mensajes en Kafka de manera eficiente.

- **Integración con Confluent Platform:** `confluent-kafka` está diseñada para funcionar bien con Confluent Platform, una plataforma de gestión y mejora de Kafka que agrega funcionalidades adicionales a Kafka, como la gestión de esquemas y la replicación multi-datos.

- **Configuración Flexible:** Ofrece una amplia gama de opciones de configuración para adaptarse a las necesidades específicas de tu aplicación y del clúster de Kafka en el que estás trabajando.

- **Documentación Rica:** La biblioteca cuenta con documentación detallada y ejemplos de uso, lo que facilita a los desarrolladores aprender a usarla y resolver problemas.

- **Comunidad Activa:** `confluent-kafka` es mantenido por la comunidad y tiene una base de usuarios activa, lo que significa que es probable que encuentres soporte y actualizaciones regulares.

In [None]:
pip install confluent-kafka


```python
import matplotlib.pyplot as plt
import seaborn as sns

# Configura el estilo de Seaborn para las visualizaciones
sns.set_style("whitegrid")
```

- En este bloque de código, se importan las bibliotecas `matplotlib.pyplot` y `seaborn` para realizar visualizaciones de datos. Luego, se configura el estilo de Seaborn para que las visualizaciones tengan un fondo blanco con líneas de cuadrícula.

```python
# 1. Distribución de la clase objetivo.
plt.figure(figsize=(7, 5))
sns.countplot(data=susy_df, x="class")
plt.title("Distribución de la clase objetivo")
plt.show()
```

1. Se crea una figura de Matplotlib con un tamaño de 7x5 pulgadas.
2. Se utiliza `sns.countplot()` de Seaborn para crear un gráfico de barras que muestra la distribución de la clase objetivo ("class") en el conjunto de datos `susy_df`.
3. Se agrega un título al gráfico.
4. Se muestra el gráfico utilizando `plt.show()`. Este gráfico muestra cuántas instancias pertenecen a cada clase en el conjunto de datos.

```python
# 2. Histogramas de algunas características.
# Elegimos visualizar las primeras 4 características, pero puedes cambiar este número como desees.
selected_features = COLUMNS[1:5]
for feature in selected_features:
    plt.figure(figsize=(7, 5))
    sns.histplot(data=susy_df, x=feature, bins=50, kde=True)
    plt.title(f"Distribución de {feature}")
    plt.show()
```

1. Se crea una lista llamada `selected_features` que contiene los nombres de las características (columnas) que se desean visualizar en histogramas. En este caso, se eligen las características del índice 1 al 4 del conjunto de datos, como se indica en `COLUMNS[1:5]`.
2. Se inicia un bucle `for` que recorre cada una de las características seleccionadas.
3. Para cada característica, se crea una figura de Matplotlib de tamaño 7x5 pulgadas.
4. Se utiliza `sns.histplot()` de Seaborn para crear un histograma de la característica actual. Se especifica `x=feature` para seleccionar la columna correspondiente en el DataFrame `susy_df`. Se utilizan 50 contenedores (bins) y se agrega una estimación de densidad kernel (kde) para visualizar la distribución de datos de manera suave.
5. Se agrega un título al histograma que muestra el nombre de la característica actual.
6. Se muestra el histograma utilizando `plt.show()`. Esto genera un conjunto de histogramas, uno para cada característica seleccionada, que representan la distribución de valores en esas características.

```python
# 3. Matriz de correlación entre características.
correlation_matrix = susy_df[selected_features].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", linewidths=.5)
plt.title("Matriz de Correlación")
plt.show()
```

1. Se calcula una matriz de correlación entre las características seleccionadas utilizando el método `.corr()` aplicado al subconjunto de datos `susy_df[selected_features]`. Esto mide la relación lineal entre pares de características.
2. Se crea una figura de Matplotlib de tamaño 10x8 pulgadas para la visualización de la matriz de correlación.
3. Se utiliza `sns.heatmap()` de Seaborn para mostrar la matriz de correlación como un mapa de calor. Los valores de correlación se muestran como anotaciones en las celdas, y se utiliza la paleta de colores "coolwarm" para representar los valores de correlación. También se agregan líneas de cuadrícula para facilitar la lectura.
4. Se agrega un título a la visualización que indica que es una "Matriz de Correlación".
5. Finalmente, se muestra la visualización con `plt.show()`. Esto muestra un mapa de calor que representa las relaciones de correlación entre las características seleccionadas.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Configura el estilo de Seaborn para las visualizaciones
sns.set_style("whitegrid")

# 1. Distribución de la clase objetivo.
plt.figure(figsize=(7, 5))
sns.countplot(data=susy_df, x="class")
plt.title("Distribución de la clase objetivo")
plt.show()

# 2. Histogramas de algunas características.
# Elegimos visualizar las primeras 4 características, pero puedes cambiar este número como desees.
selected_features = COLUMNS[1:5]
for feature in selected_features:
    plt.figure(figsize=(7, 5))
    sns.histplot(data=susy_df, x=feature, bins=50, kde=True)
    plt.title(f"Distribución de {feature}")
    plt.show()

# 3. Matriz de correlación entre características.
correlation_matrix = susy_df[selected_features].corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", linewidths=.5)
plt.title("Matriz de Correlación")
plt.show()


## Arquitectura de la Red Neuronal

Supongamos que tenemos una red neuronal con la siguiente estructura:

- **Capa de Entrada (Input Layer):** La capa de entrada consta de $n$ neuronas, una para cada característica de entrada. Las entradas se denotan como $x_1, x_2, \ldots, x_n$.

- **Capa Oculta (Hidden Layer):** En esta capa, tenemos $m$ neuronas. Las activaciones de la capa oculta se denotan como $a_1, a_2, \ldots, a_m$.

- **Capa de Salida (Output Layer):** Para clasificación binaria, generalmente se utiliza una única neurona de salida, cuya activación se denota como $y$.

## Fórmulas Matemáticas

### Paso 1: Cálculo de la Capa Oculta

1. **Combinación Lineal Ponderada en la Capa Oculta:**

   Para cada neurona oculta $j$, calculamos la combinación lineal ponderada de las entradas de la capa de entrada utilizando los pesos $w_{ij}$ y agregamos un sesgo $b_j$:

   $$
   z_j = \sum_{i=1}^{n} w_{ij} \cdot x_i + b_j, \quad \text{para } j = 1, 2, \ldots, m
   $$

2. **Función de Activación en la Capa Oculta:**

   Luego, aplicamos una función de activación no lineal $\sigma(\cdot)$, como la función sigmoide o ReLU, a la combinación lineal para obtener la activación de la neurona oculta $a_j$:

   $$
   a_j = \sigma(z_j), \quad \text{para } j = 1, 2, \ldots, m
   $$

### Paso 2: Cálculo de la Capa de Salida

1. **Combinación Lineal Ponderada en la Capa de Salida:**

   Para la neurona de salida única, calculamos la combinación lineal ponderada de las activaciones de la capa oculta utilizando el peso $v_j$ y el sesgo $c$:

   $$
   u = \sum_{j=1}^{m} v_j \cdot a_j + c
   $$

2. **Función de Activación en la Capa de Salida:**

   Para clasificación binaria, utilizamos la función de activación sigmoide $\sigma(\cdot)$ en la capa de salida para obtener la probabilidad de que la entrada pertenezca a la clase positiva (clase 1):

   $$
   y = \sigma(u)
   $$

### Paso 3: Función de Pérdida

La función de pérdida mide la discrepancia entre la salida de la red ($y$) y las etiquetas verdaderas ($y_{\text{verdadero}}$) en el conjunto de datos de entrenamiento. Para clasificación binaria, la función de pérdida comúnmente utilizada es la Entropía Cruzada Binaria (Binary Cross-Entropy):

$$
L(y_{\text{verdadero}}, y) = -\left[y_{\text{verdadero}} \cdot \log(y) + (1 - y_{\text{verdadero}}) \cdot \log(1 - y)\right]
$$

Claro, ahora matematicemos el proceso de optimización utilizando el algoritmo de optimización Adam (Adaptive Moment Estimation) en el contexto de una red neuronal artificial para tareas de clasificación binaria. Adam es un método de optimización ampliamente utilizado que se adapta dinámicamente las tasas de aprendizaje para cada parámetro. Aquí se muestra cómo se incorpora Adam en las ecuaciones matemáticas:

## Optimización con Adam

### Paso 4: Aprendizaje con Adam

El objetivo del aprendizaje es minimizar la función de pérdida $L$ ajustando los pesos ($w_{ij}$, $v_j$) y los sesgos ($b_j$, $c$) de la red. En lugar de actualizar directamente los parámetros utilizando el descenso de gradiente, utilizamos el algoritmo de optimización Adam, que mantiene dos promedios móviles exponenciales de los gradientes ($m_t$ y $v_t$) y ajusta la tasa de aprendizaje en función de estos promedios.

Supongamos que $t$ es el paso de tiempo actual, y $t$ varía de 1 hasta el número total de pasos de entrenamiento.

1. **Inicialización de Parámetros de Adam:**

   Inicializamos los promedios móviles $m_0$ y $v_0$ a cero y configuramos los hiperparámetros de Adam, como la tasa de aprendizaje $\alpha$, los coeficientes de decaimiento exponencial $\beta_1$ y $\beta_2$.

2. **Cálculo de Gradientes:**

   Calculamos los gradientes de la función de pérdida con respecto a los parámetros de la red:

   - Para los pesos y sesgos de la capa oculta:
     $$
     \nabla w_{ij}^t, \nabla b_j^t
     $$

   - Para los pesos y sesgos de la capa de salida:
     $$
     \nabla v_j^t, \nabla c^t
     $$

3. **Actualización de Promedios Móviles:**

   Actualizamos los promedios móviles exponenciales de los gradientes:

   - Para cada parámetro $p$ (donde $p$ representa un conjunto de pesos o sesgos):

     $$
     m_t^p = \beta_1 \cdot m_{t-1}^p + (1 - \beta_1) \cdot \nabla p^t
     v_t^p = \beta_2 \cdot v_{t-1}^p + (1 - \beta_2) \cdot (\nabla p^t)^2
     $$

4. **Corrección de Sesgo:**

   Corregimos los promedios móviles sesgados inicialmente:

   - Para cada parámetro $p$:

     $$
     \hat{m}_t^p = \frac{m_t^p}{1 - \beta_1^t}
     \hat{v}_t^p = \frac{v_t^p}{1 - \beta_2^t}
     $$

5. **Actualización de Parámetros:**

   Actualizamos los parámetros de la red utilizando los promedios móviles corregidos y la tasa de aprendizaje:

   - Para cada parámetro $p$:
   
     $$
     p^{t+1} = p^t - \alpha \cdot \frac{\hat{m}_t^p}{\sqrt{\hat{v}_t^p} + \epsilon}
     $$

   Donde $\epsilon$ es una pequeña constante para evitar la división por cero.

6. **Repetición:**

   Repetimos los pasos 2 a 5 durante un número predeterminado de pasos de entrenamiento.

# Modelo

Supongamos que tienes una entrada $X$ de dimensión $(batch\_size, input\_size)$, donde $batch\_size$ es el tamaño del lote (batch size) y $input\_size$ es la dimensión de las características de entrada.

1. **Primera capa densa (dense_16):**

   - Entrada: $X$
   - Pesos: $W^{(1)}$, donde $W^{(1)}$ es una matriz de pesos de dimensión $(input\_size, 128)$.
   - Sesgos: $b^{(1)}$, donde $b^{(1)}$ es un vector de sesgo de dimensión $(128,)$.
   - Función de activación: ReLU ($\sigma$).
   - Salida: $A^{(1)} = \sigma(X \cdot W^{(1)} + b^{(1)})$ de dimensión $(batch\_size, 128)$.

2. **Capa de Dropout (dropout_12):**

   - Aplicación de dropout con una tasa de dropout específica para desactivar aleatoriamente algunas unidades en $A^{(1)}$.

3. **Segunda capa densa (dense_17):**

   - Entrada: $A^{(1)}$ (la salida de la capa anterior).
   - Pesos: $W^{(2)}$, donde $W^{(2)}$ es una matriz de pesos de dimensión $(128, 256)$.
   - Sesgos: $b^{(2)}$, donde $b^{(2)}$ es un vector de sesgo de dimensión $(256,)$.
   - Función de activación: ReLU ($\sigma$).
   - Salida: $A^{(2)} = \sigma(A^{(1)} \cdot W^{(2)} + b^{(2)})$ de dimensión $(batch\_size, 256)$.

4. **Capa de Dropout (dropout_13):**

   - Aplicación de dropout con una tasa de dropout específica para desactivar aleatoriamente algunas unidades en $A^{(2)}$.

5. **Tercera capa densa (dense_18):**

   - Entrada: $A^{(2)}$ (la salida de la capa anterior).
   - Pesos: $W^{(3)}$, donde $W^{(3)}$ es una matriz de pesos de dimensión $(256, 128)$.
   - Sesgos: $b^{(3)}$, donde $b^{(3)}$ es un vector de sesgo de dimensión $(128,)$.
   - Función de activación: ReLU ($\sigma$).
   - Salida: $A^{(3)} = \sigma(A^{(2)} \cdot W^{(3)} + b^{(3)})$ de dimensión $(batch\_size, 128)$.

6. **Capa de Dropout (dropout_14):**

   - Aplicación de dropout con una tasa de dropout específica para desactivar aleatoriamente algunas unidades en $A^{(3)}$.

7. **Última capa densa (dense_19):**

   - Entrada: $A^{(3)}$ (la salida de la capa anterior).
   - Pesos: $W^{(4)}$, donde $W^{(4)}$ es una matriz de pesos de dimensión $(128, 1)$.
   - Sesgo: $b^{(4)}$, donde $b^{(4)}$ es un valor único que actúa como sesgo.
   - Función de activación: Sigmoide ($\sigma$).
   - Salida: $A^{(4)} = \sigma(A^{(3)} \cdot W^{(4)} + b^{(4)})$ de dimensión $(batch\_size, 1)$.

```python
import pandas as pd
from sklearn.model_selection import train_test_split
from kafka import KafkaProducer
import tensorflow as tf
import tensorflow_io as tfio
```

1. Se importan las bibliotecas necesarias: `pandas` para manejar datos tabulares, `train_test_split` de `sklearn.model_selection` para dividir el conjunto de datos, `KafkaProducer` para escribir datos en Kafka, `tensorflow` para crear y entrenar modelos de aprendizaje automático, y `tensorflow_io` para interactuar con Kafka.

```python
# Configuraciones
COLUMNS = [
    'class', 'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi',
    'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi',
    'missing_energy_magnitude', 'missing_energy_phi',
    'MET_rel', 'axial_MET', 'M_R', 'M_TR_2',
    'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)'
]
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64
OPTIMIZER = "adam"
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS = ['accuracy']
EPOCHS = 10
```

2. Se definen diversas configuraciones y constantes utilizadas en el código, como el nombre de las columnas del conjunto de datos, el tamaño del lote (batch), el tamaño del búfer de mezcla, el optimizador a utilizar en el modelo, la función de pérdida y las métricas a seguir durante el entrenamiento, y el número de épocas de entrenamiento.

```python
def download_and_load_data():
    # Descargar el conjunto de datos SUSY
    !curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
    susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
    return next(susy_iterator)
```

3. La función `download_and_load_data()` se encarga de descargar el conjunto de datos SUSY de una URL y cargarlo en un DataFrame de Pandas utilizando `pd.read_csv()`. Los datos se leen en bloques (chunks) de 100,000 filas y la primera parte se devuelve como resultado.

```python
def split_data(df):
    train_df, test_df = train_test_split(df, test_size=0.4, shuffle=True)
    x_train_df = train_df.drop(["class"], axis=1)
    y_train_df = train_df["class"]
    x_test_df = test_df.drop(["class"], axis=1)
    y_test_df = test_df["class"]
    return x_train_df, y_train_df, x_test_df, y_test_df
```

4. La función `split_data(df)` toma un DataFrame como entrada y lo divide en un conjunto de entrenamiento (`train_df`) y un conjunto de prueba (`test_df`) utilizando `train_test_split`. Luego, separa las características y las etiquetas en conjuntos separados.

```python
def convert_to_kafka_lists(df):
    return list(filter(None, df.to_csv(index=False).split("\n")[1:]))
```

5. La función `convert_to_kafka_lists(df)` toma un DataFrame, lo convierte en un archivo CSV y luego lo divide en una lista de líneas. Se omite la primera línea que contiene los encabezados y se filtran las líneas vacías.

```python
def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8'))
        count += 1
    producer.flush()
    print(f"Escribió {count} mensajes en el tema: {topic_name}")
```

6. La función `write_to_kafka(topic_name, items)` toma una lista de elementos (pares de mensaje y clave) y los envía al servidor Kafka especificado en `bootstrap_servers`. Luego, se imprime el número de mensajes escritos en el tema especificado.

```python
def decode_kafka_item(item, num_columns):
    message = tf.io.decode_csv(item.message, [[0.0] for _ in range(num_columns)])
    key = tf.strings.to_number(item.key)
    return (message, key)
```

7. La función `decode_kafka_item(item, num_columns)` toma un elemento Kafka (que contiene un mensaje y una clave) y decodifica el mensaje CSV utilizando TensorFlow. También convierte la clave en un número y devuelve una tupla que contiene el mensaje y la clave.

```python
def build_model(input_shape):
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=input_shape),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dropout(0.4),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.4),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
    return model
```

8. La función `build_model(input_shape)` crea un modelo de red neuronal utilizando TensorFlow. El modelo es secuencial y consta de capas densas con funciones de activación ReLU y capas de dropout para evitar el sobreajuste. El modelo se compila con el optimizador y la función de pérdida especificados.

```python
# Carga y preparación de datos
susy_df = download_and_load_data()
x_train_df, y_train_df, x_test_df, y_test_df = split_data(susy_df)
NUM_COLUMNS = len(x_train_df.columns)
x_train = convert_to_kafka_lists(x_train_df)
y_train = convert_to_kafka_lists(y_train_df)
x_test = convert_to_kafka_lists(x_test_df)
y_test = convert_to_kafka_lists(y_test_df)
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
```

9. Se descarga y carga el conjunto de datos SUSY utilizando la función `download_and_load_data()`. Luego, se divide en conjuntos de entrenamiento y prueba utilizando `split_data()`. Además, se preparan los datos para ser escritos en Kafka y se escriben en los temas "susy-train" y "susy-test" utilizando `write_to_kafka()`.

```python
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0).shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map

(lambda item: decode_kafka_item(item, NUM_COLUMNS)).batch(BATCH_SIZE)
```

10. Se crea un conjunto de datos TensorFlow `train_ds` utilizando `tfio.IODataset.from_kafka()`. Se especifica el tema "susy-train", la partición 0 y el offset 0. Luego, se aplica una mezcla aleatoria (`shuffle`) con el tamaño de búfer especificado y se mapea cada elemento utilizando la función `decode_kafka_item()`. Finalmente, los datos se agrupan en lotes con el tamaño especificado.

```python
# Construcción y entrenamiento del modelo
model = build_model((NUM_COLUMNS,))
model.fit(train_ds, epochs=EPOCHS)
```

11. Se construye el modelo de red neuronal utilizando `build_model()` y se le pasa la forma de entrada `(NUM_COLUMNS,)`. Luego, el modelo se entrena con el conjunto de datos `train_ds` durante el número de épocas especificado en `EPOCHS`.

In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from kafka import KafkaProducer
import tensorflow as tf
import tensorflow_io as tfio

# Configuraciones
COLUMNS = [
    'class',
    'lepton_1_pT', 'lepton_1_eta', 'lepton_1_phi',
    'lepton_2_pT', 'lepton_2_eta', 'lepton_2_phi',
    'missing_energy_magnitude', 'missing_energy_phi',
    'MET_rel', 'axial_MET', 'M_R', 'M_TR_2',
    'R', 'MT2', 'S_R', 'M_Delta_R', 'dPhi_r_b', 'cos(theta_r1)'
]
BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64
OPTIMIZER = "adam"
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=True)
METRICS = ['accuracy']
EPOCHS = 10

def download_and_load_data():
    # Descargar el conjunto de datos SUSY
    !curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
    susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
    return next(susy_iterator)

def split_data(df):
    train_df, test_df = train_test_split(df, test_size=0.4, shuffle=True)
    x_train_df = train_df.drop(["class"], axis=1)
    y_train_df = train_df["class"]
    x_test_df = test_df.drop(["class"], axis=1)
    y_test_df = test_df["class"]

    return x_train_df, y_train_df, x_test_df, y_test_df

def convert_to_kafka_lists(df):
    return list(filter(None, df.to_csv(index=False).split("\n")[1:]))

def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8'))
        count += 1
    producer.flush()
    print(f"Escribió {count} mensajes en el tema: {topic_name}")

def decode_kafka_item(item, num_columns):
    message = tf.io.decode_csv(item.message, [[0.0] for _ in range(num_columns)])
    key = tf.strings.to_number(item.key)
    return (message, key)

def build_model(input_shape):
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=input_shape),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dropout(0.4),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.4),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
    return model

# Carga y preparación de datos
susy_df = download_and_load_data()
x_train_df, y_train_df, x_test_df, y_test_df = split_data(susy_df)
NUM_COLUMNS = len(x_train_df.columns)

x_train = convert_to_kafka_lists(x_train_df)
y_train = convert_to_kafka_lists(y_train_df)
x_test = convert_to_kafka_lists(x_test_df)
y_test = convert_to_kafka_lists(y_test_df)

write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))

train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0).shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(lambda item: decode_kafka_item(item, NUM_COLUMNS)).batch(BATCH_SIZE)

# Construcción y entrenamiento del modelo
model = build_model((NUM_COLUMNS,))
model.fit(train_ds, epochs=EPOCHS)

1. **Epoch 1/10**:
   - **loss**: 0.4600: En la primera época, la pérdida promedio del modelo es aproximadamente 0.4600. Esto significa que, en promedio, las predicciones del modelo están alejadas del valor real en 0.4600 unidades. La pérdida es una medida de cuán bien se ajusta el modelo a los datos de entrenamiento. Una pérdida más baja indica un mejor ajuste.
   - **accuracy**: 0.7851: La precisión promedio en la primera época es aproximadamente 0.7851. Esto significa que el modelo acertó alrededor del 78.51% de las predicciones en los datos de entrenamiento durante esta época.

2. **Epoch 2/10**:
   - **loss**: 0.4461: En la segunda época, la pérdida promedio disminuye a aproximadamente 0.4461. Esto indica que el modelo está mejorando su capacidad para ajustarse a los datos y está haciendo predicciones más cercanas a los valores reales.
   - **accuracy**: 0.7936: La precisión promedio en la segunda época es aproximadamente 0.7936, lo que sugiere un aumento en la precisión con respecto a la primera época.

3. Las mismas interpretaciones se aplican a las épocas subsiguientes (Epoch 3/10, Epoch 4/10, ...) donde se observa una tendencia general de disminución de la pérdida y aumento de la precisión. Esto indica que el modelo está convergiendo hacia un mejor ajuste de los datos a medida que avanza el entrenamiento.

4. **Epoch 10/10**:
   - **loss**: 0.4373: En la última época (décima), la pérdida promedio es aproximadamente 0.4373, lo que sugiere una mejora continua del modelo.
   - **accuracy**: 0.7980: La precisión promedio en la décima época es aproximadamente 0.7980, lo que indica que el modelo ha logrado una precisión del 79.80% en los datos de entrenamiento.

In [None]:
model.summary()


Aquí un descripción detallada de un modelo de red neuronal que consta de múltiples capas densas y capas de dropout. Está diseñado para tareas de clasificación binaria y tiene un total de 68,481 parámetros ajustables. La información proporcionada aquí es útil para comprender la arquitectura del modelo y la cantidad de parámetros que se deben entrenar.

1. **Modelo:** Este es el nombre del modelo, en este caso, se llama "sequential_4". La numeración (en este caso, "_4") se agrega automáticamente si ya hay otros modelos con el mismo nombre en el entorno.

2. **Capas:** A continuación, se enumeran todas las capas del modelo junto con su tipo. En este caso, todas las capas son del tipo "Dense", que son capas completamente conectadas. Esto significa que cada neurona en una capa está conectada a todas las neuronas en la capa anterior. Además, cada capa tiene un número que indica el orden en el que se agregaron al modelo.

   - **dense_16:** La primera capa densa (completamente conectada) tiene 128 neuronas. El número entre paréntesis "(None, 128)" indica la forma de la salida de esta capa. "None" significa que el tamaño del lote (batch size) puede variar y "128" es la dimensión de la salida de esta capa.

   - **dropout_12:** Esta es una capa de "Dropout" con 128 unidades, que ayuda a prevenir el sobreajuste al desactivar aleatoriamente algunas unidades durante el entrenamiento.

   - **dense_17:** La segunda capa densa tiene 256 neuronas. Al igual que antes, "(None, 256)" indica la forma de la salida.

   - **dropout_13:** Otra capa de "Dropout" con 256 unidades.

   - **dense_18:** La tercera capa densa tiene 128 neuronas.

   - **dropout_14:** Otra capa de "Dropout" con 128 unidades.

   - **dense_19:** La última capa densa tiene solo 1 neurona, lo que sugiere que este modelo está diseñado para tareas de clasificación binaria, como problemas de clasificación binaria.

3. **Total de parámetros:** Esto muestra la cantidad total de parámetros en el modelo, que incluye los pesos y los sesgos de todas las capas. En este caso, hay 68,481 parámetros en total. También se muestra el tamaño en kilobytes (KB) de estos parámetros.

4. **Parámetros entrenables:** Esta es la misma cantidad que el "Total de parámetros". Son los parámetros que se ajustan durante el entrenamiento del modelo.

5. **Parámetros no entrenables:** Esto muestra la cantidad de parámetros que no se ajustan durante el entrenamiento. En este caso, son 0.

El siguiente código se encarga de visualizar el historial de entrenamiento de un modelo de aprendizaje automático utilizando la biblioteca `matplotlib` y `seaborn`. El historial de entrenamiento contiene información sobre la pérdida (loss) y la precisión (accuracy) del modelo en cada época del entrenamiento. El código genera dos gráficos: uno para mostrar la evolución de la pérdida y otro para mostrar la evolución de la precisión a lo largo de las épocas.

```python
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, roc_curve, auc
```
Se importan las bibliotecas necesarias para realizar las visualizaciones y calcular métricas adicionales, como la matriz de confusión y la curva ROC.

```python
# Historial de entrenamiento
# Asumiendo que 'history' es el objeto devuelto por model.fit()
history = model.fit(train_ds, epochs=EPOCHS)
```
Aquí, se asume que ya se ha entrenado un modelo (`model`) utilizando el conjunto de datos de entrenamiento (`train_ds`) durante un número especificado de épocas (`EPOCHS`). El historial de entrenamiento se almacena en la variable `history`, y contiene información sobre las métricas de pérdida y precisión en cada época.

```python
plt.figure(figsize=(12, 4))
```
Se crea una figura de Matplotlib con un tamaño de 12x4 pulgadas para acomodar dos subgráficos uno al lado del otro.

```python
plt.subplot(1, 2, 1)
```
Se crea el primer subgráfico en una cuadrícula de 1 fila y 2 columnas, y se selecciona el primer subgráfico para realizar la visualización de la pérdida.

```python
plt.plot(history.history['loss'], label='Train Loss')
```
Se traza la evolución de la pérdida en el conjunto de datos de entrenamiento. Se utiliza `history.history['loss']` para obtener los valores de pérdida en cada época y se etiqueta la curva como "Train Loss".

```python
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
```
Se establece el título del subgráfico como "Training Loss" (Pérdida de Entrenamiento) y se etiquetan los ejes x e y con "Epochs" (Épocas) y "Loss" (Pérdida), respectivamente. También se agrega una leyenda para identificar la curva.

```python
plt.subplot(1, 2, 2)
```
Se crea el segundo subgráfico en la misma cuadrícula de 1 fila y 2 columnas, y se selecciona para realizar la visualización de la precisión.

```python
plt.plot(history.history['accuracy'], label='Train Accuracy')
```
Se traza la evolución de la precisión en el conjunto de datos de entrenamiento. Se utiliza `history.history['accuracy']` para obtener los valores de precisión en cada época y se etiqueta la curva como "Train Accuracy" (Precisión de Entrenamiento).

```python
plt.title('Training Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
```
Se establece el título del subgráfico como "Training Accuracy" (Precisión de Entrenamiento) y se etiquetan los ejes x e y con "Epochs" (Épocas) y "Accuracy" (Precisión), respectivamente. También se agrega una leyenda para identificar la curva.

```python
plt.tight_layout()
```
Se utiliza `plt.tight_layout()` para asegurarse de que los subgráficos estén bien ajustados en la figura y no haya superposiciones.

```python
plt.show()
```
Se muestra la figura que contiene los dos subgráficos que representan la evolución de la pérdida y la precisión durante el entrenamiento del modelo. Esto proporciona una visualización útil para evaluar el rendimiento del modelo a lo largo de las épocas.

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, roc_curve, auc

# Historial de entrenamiento
# Asumiendo que 'history' es el objeto devuelto por model.fit()
history = model.fit(train_ds, epochs=EPOCHS)

plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.title('Training Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()

El siguiente código se utiliza para entrenar y evaluar un modelo en línea utilizando datos de Kafka. Permite la ingestión continua de datos de entrenamiento y evaluación a medida que llegan a través de Kafka, lo que es útil para aplicaciones en tiempo real.

```python
def decode_kafka_item(raw_message, raw_key, num_columns):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(num_columns)])
    key = tf.strings.to_number(raw_key)
    return (message, key)
```

- `decode_kafka_item`: Esta función toma dos argumentos, `raw_message` y `raw_key`, que representan un mensaje y una clave de Kafka. Su objetivo es decodificar el mensaje CSV y la clave.
  - `raw_message`: El mensaje en formato CSV que se va a decodificar.
  - `raw_key`: La clave asociada al mensaje.
  - `num_columns`: El número de columnas en el mensaje CSV.

  La función utiliza `tf.io.decode_csv` para decodificar el mensaje CSV en una lista de tensores. Luego, convierte la clave en un número y devuelve una tupla que contiene el mensaje decodificado y la clave numérica.

```python
def get_kafka_test_dataset(topic_name, num_columns, batch_size):
    test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
        topics=[topic_name],
        group_id="testcg",
        servers="127.0.0.1:9092",
        stream_timeout=10000,
        configuration=[
            "session.timeout.ms=7000",
            "max.poll.interval.ms=8000",
            "auto.offset.reset=earliest"
        ],
    )
    test_ds = test_ds.map(lambda raw_message, raw_key: decode_kafka_item(raw_message, raw_key, num_columns))
    return test_ds.batch(batch_size)
```

- `get_kafka_test_dataset`: Esta función se utiliza para obtener un conjunto de datos de prueba desde Kafka. Toma tres argumentos:
  - `topic_name`: El nombre del tema de Kafka del que se obtendrán los datos de prueba.
  - `num_columns`: El número de columnas en los datos.
  - `batch_size`: El tamaño del lote para el conjunto de datos de prueba.

  La función crea un conjunto de datos de Kafka utilizando `tfio.experimental.streaming.KafkaGroupIODataset`. Se configuran varios parámetros, como el nombre del tema, el grupo de consumidores, los servidores de Kafka y la configuración de Kafka. Luego, se mapea cada elemento del conjunto de datos utilizando `decode_kafka_item` para decodificar los mensajes CSV y se agrupa en lotes de tamaño `batch_size`.

```python
def get_kafka_online_dataset(topic_name, num_columns):
    online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
        topics=[topic_name],
        group_id="cgonline",
        servers="127.0.0.1:9092",
        stream_timeout=10000,
        configuration=[
            "session.timeout.ms=7000",
            "max.poll.interval.ms=8000",
            "auto.offset.reset=earliest"
        ],
    )
    return online_train_ds
```

- `get_kafka_online_dataset`: Esta función se utiliza para obtener un conjunto de datos de entrenamiento en línea desde Kafka. Toma dos argumentos:
  - `topic_name`: El nombre del tema de Kafka del que se obtendrán los datos de entrenamiento en línea.
  - `num_columns`: El número de columnas en los datos.

  Al igual que en la función anterior, se crea un conjunto de datos de Kafka utilizando `tfio.experimental.streaming.KafkaBatchIODataset` con la configuración adecuada. Este conjunto de datos se utilizará para entrenar el modelo en línea.

```python
def train_online(model, online_train_ds, num_columns):
    for mini_ds in online_train_ds:
        mini_ds = mini_ds.shuffle(buffer_size=32)
        mini_ds = mini_ds.map(lambda raw_message, raw_key: decode_kafka_item(raw_message, raw_key, num_columns))
        mini_ds = mini_ds.batch(32)
        if len(mini_ds) > 0:
            model.fit(mini_ds, epochs=3)
```

- `train_online`: Esta función se encarga de entrenar el modelo en línea utilizando los datos de entrenamiento en línea de Kafka. Toma tres argumentos:
  - `model`: El modelo de TensorFlow que se va a entrenar en línea.
  - `online_train_ds`: El conjunto de datos de entrenamiento en línea de Kafka.
  - `num_columns`: El número de columnas en los datos.

  La función itera a través de lotes de datos (`mini_ds`) obtenidos del conjunto de datos de entrenamiento en línea. Cada lote se baraja y se decodifica utilizando `decode_kafka_item`. Luego, se agrupa en lotes de tamaño 32 (puedes ajustar este tamaño según sea necesario). Finalmente, el modelo se entrena en estos lotes durante 3 épocas.

```python
# Evaluación
test_ds = get_kafka_test_dataset("susy-test", NUM_COLUMNS, BATCH_SIZE)
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
```

- Evaluación del modelo: En esta sección, se obtiene el conjunto de datos de prueba de Kafka utilizando `get_kafka_test_dataset`. Luego, se utiliza el modelo previamente entrenado para evaluar su rendimiento en el conjunto de datos de prueba. La pérdida y la precisión se almacenan en `res` y se imprimen en la consola.

In [None]:
def decode_kafka_item(raw_message, raw_key, num_columns):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(num_columns)])
    key = tf.strings.to_number(raw_key)
    return (message, key)


def get_kafka_test_dataset(topic_name, num_columns, batch_size):
    test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
        topics=[topic_name],
        group_id="testcg",
        servers="127.0.0.1:9092",
        stream_timeout=10000,
        configuration=[
            "session.timeout.ms=7000",
            "max.poll.interval.ms=8000",
            "auto.offset.reset=earliest"
        ],
    )
    test_ds = test_ds.map(lambda raw_message, raw_key: decode_kafka_item(raw_message, raw_key, num_columns))
    return test_ds.batch(batch_size)



def get_kafka_online_dataset(topic_name, num_columns):
    online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
        topics=[topic_name],
        group_id="cgonline",
        servers="127.0.0.1:9092",
        stream_timeout=10000,
        configuration=[
            "session.timeout.ms=7000",
            "max.poll.interval.ms=8000",
            "auto.offset.reset=earliest"
        ],
    )
    return online_train_ds

def train_online(model, online_train_ds, num_columns):
    for mini_ds in online_train_ds:
        mini_ds = mini_ds.shuffle(buffer_size=32)
        mini_ds = mini_ds.map(lambda raw_message, raw_key: decode_kafka_item(raw_message, raw_key, num_columns))
        mini_ds = mini_ds.batch(32)
        if len(mini_ds) > 0:
            model.fit(mini_ds, epochs=3)

# Evaluación
test_ds = get_kafka_test_dataset("susy-test", NUM_COLUMNS, BATCH_SIZE)
res = model.evaluate(test_ds)
print("test loss, test acc:", res)




La precisión del 80% significa que el modelo clasifica correctamente el 80% de las muestras en el conjunto de datos de prueba. Dado que el dataset SUSY es un conjunto de datos de clasificación binaria relacionado con la física de partículas, una precisión del 80% es un buen comienzo.