# Práctica: Apache Kafka & PySpark

El objetivo de esta práctica es construir una aplicación que permite la ingesta de tweets usando Apache Kafka y puedan ser procesados como flujos de datos estructurados [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html).

![my_test_image](https://turing.iimas.unam.mx/~blancavg/images/app_twitter.png)

### ¿Qué es Apache Kafka?

Kafka es una plataforma distribuida de transmisión de datos que permite publicar, almacenar, y procesar en tiempo real. Kafka es un proyecto de código abierto desarrollado por LinkedIn y donado a la Apache Software Foundation escrito en Java y Scala.

Ventajas que ofrece Kafka en comparación a otras opciones para el manejo de datos más tradicionales:

- Garantiza un procesamiento rápido y fluido de datos.
- Escalabilidad
- Tolerancia a fallos
- Pueden manejar escritura y lectura de datos de alta frecuencia
- Unifica los canales de datos entre microservicios (contenedores).

#### Conceptos

- Topico: Un tópico es un flujo de datos, los cuales se guardan en forma formato (llave, valor). Cada evento (dato) que llega al sistema debe ser parte de un tópico. 

- Productores: Son las apps o microservicios que  publican datos en el sistema de Kafka. Publican datos en los tópicos de su eleccion.

- Consumidores: Son las apps o microservicios que usan los datos publicados por los productores. Para consumir datos, un consumidor debe suscribirse a un tópico.

- Broker: es el conjunto de servidores (clúster) donde se ejecutar Kafka. Los datos de un tópico son replicados y particionados en varios brokers, los cual permite a los consumidores leer datos en paralelo y tolerar fallos.

### Instalación de Apache Kafka

En esta sección se describe un conjunto de pasos necesarios para la instalación y funcionamiento de Apache Kafka.

In [0]:
%sh hostname

1008-204032-lfazxauf-10-172-210-210


In [0]:
%sh cat /etc/os-release

NAME="Ubuntu"
VERSION="20.04.6 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.6 LTS"
VERSION_ID="20.04"
HOME_URL="https://www.ubuntu.com/"
SUPPORT_URL="https://help.ubuntu.com/"
BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/"
PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy"
VERSION_CODENAME=focal
UBUNTU_CODENAME=focal


In [0]:
%sh java -version

openjdk version "1.8.0_382"
OpenJDK Runtime Environment (Zulu 8.72.0.17-CA-linux64) (build 1.8.0_382-b05)
OpenJDK 64-Bit Server VM (Zulu 8.72.0.17-CA-linux64) (build 25.382-b05, mixed mode)


In [0]:
%sh whoami

root


##### Descarga de Apache Kafka

In [0]:
%sh wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz

--2024-10-08 21:51:12--  https://dlcdn.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 120900366 (115M) [application/x-gzip]
Saving to: ‘kafka_2.12-3.8.0.tgz’

     0K .......... .......... .......... .......... ..........  0% 6.11M 19s
    50K .......... .......... .......... .......... ..........  0% 7.78M 17s
   100K .......... .......... .......... .......... ..........  0% 6.85M 17s
   150K .......... .......... .......... .......... ..........  0% 12.9M 15s
   200K .......... .......... .......... .......... ..........  0% 16.0M 13s
   250K .......... .......... .......... .......... ..........  0% 76.8M 11s
   300K .......... .......... .......... .......... ..........  0% 39.1M 10s
   350K .......... .......... .......... .......... ..........  0% 12.4M 10s
   400K ...

In [0]:
%sh ls -ltr ./

total 119380
-rw-r--r-- 1 root root 120900366 Jul 26 17:06 kafka_2.12-3.8.0.tgz
drwxr-xr-x 2 root root      4096 Oct  8 16:30 conf
-r-xr-xr-x 1 root root      2755 Oct  8 16:30 hadoop_accessed_config.lst
drwxr-xr-x 2 root root      4096 Oct  8 16:30 azure
-r-xr-xr-x 1 root root   1306936 Oct  8 16:30 preload_class.lst
drwxr-xr-x 3 root root      4096 Oct  8 20:41 eventlogs
drwxr-xr-x 5 root root      4096 Oct  8 20:45 metastore_db
drwxr-xr-x 2 root root      4096 Oct  8 21:01 logs
-rw-r--r-- 1 root root      2254 Oct  8 21:01 craft-popular-urls
drwxr-xr-x 2 root root      4096 Oct  8 21:45 ganglia


##### Extracción de Apache Kafka

In [0]:
%sh tar -xzf kafka_2.12-3.8.0.tgz

In [0]:
%sh ls -ltr ./

total 119384
drwxr-xr-x 7 root root      4096 Jul 23 08:07 kafka_2.12-3.8.0
-rw-r--r-- 1 root root 120900366 Jul 26 17:06 kafka_2.12-3.8.0.tgz
drwxr-xr-x 2 root root      4096 Oct  8 16:30 conf
-r-xr-xr-x 1 root root      2755 Oct  8 16:30 hadoop_accessed_config.lst
drwxr-xr-x 2 root root      4096 Oct  8 16:30 azure
-r-xr-xr-x 1 root root   1306936 Oct  8 16:30 preload_class.lst
drwxr-xr-x 3 root root      4096 Oct  8 20:41 eventlogs
drwxr-xr-x 5 root root      4096 Oct  8 20:45 metastore_db
drwxr-xr-x 2 root root      4096 Oct  8 21:01 logs
-rw-r--r-- 1 root root      2254 Oct  8 21:01 craft-popular-urls
drwxr-xr-x 2 root root      4096 Oct  8 21:45 ganglia


##### Inicio del servicio de ZooKeeper

ZooKeeper es una solución de consistencia de datos distribuidos, dedicada a proporcionar un servicio de coordinación distribuida con alto rendimiento, alta disponibilidad y estrictas capacidades de control de acceso secuencial para aplicaciones distribuidas.

ZooKeeper es un servicio que emplea Kafka para administrar los brokers.

In [0]:
%sh cd kafka_2.12-3.8.0
ls -ltr ./
bin/zookeeper-server-start.sh config/zookeeper.properties

total 72
-rw-r--r-- 1 root root 28359 Jul 23 08:04 NOTICE
-rw-r--r-- 1 root root 15295 Jul 23 08:04 LICENSE
drwxr-xr-x 2 root root  4096 Jul 23 08:07 licenses
drwxr-xr-x 3 root root  4096 Jul 23 08:07 config
drwxr-xr-x 3 root root  4096 Jul 23 08:07 bin
drwxr-xr-x 2 root root  4096 Jul 23 08:07 site-docs
drwxr-xr-x 2 root root 12288 Oct  8 21:52 libs
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/databricks/jars/----ws_3_3--mvn--hadoop3--org.apache.logging.log4j--log4j-slf4j-impl--org.apache.logging.log4j__log4j-slf4j-impl__2.18.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/databricks/driver/kafka_2.12-3.8.0/libs/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2024-10-08 21:52:49,096] INFO Reading configuration from: config/zookeepe