# Iniciar los servidores
cd kafka_2.11-2.1.0/

zookeeper-server-start.sh config/zookeeper.properties

kafka-server-start.sh config/server.properties

# Topic

## Opciones
kafka-topics.sh

## Errores al crear
kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create

<pre>WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Missing required argument "[partitions]"</pre>

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3

<pre>WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Missing required argument "[replication-factor]"</pre>

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --
replication-factor 2

<pre>Error while executing topic command : Replication factor: 2 larger than available brokers: 1. 
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.</pre>

## Crear

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --create --partitions 3 --
replication-factor 1

<pre>Created topic &quot;first_topic&quot;.</pre>

## Listar

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

<pre>first_topic</pre>

## Descripcion

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic first_topic --describe

<pre>Topic:first_topic	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: first_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: first_topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: first_topic	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
</pre>

## Crear un segundo topic

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --create --partitions 6 --replication-factor 1

## Listar los topics

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

<pre>first_topic
second_topic
</pre>

## Eliminar 

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic second_topic --delete

<pre>Topic second_topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
</pre>

## Mostrar

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

<pre>first_topic</pre>

# Producer

## Opciones 

kafka-console-producer.sh

## Broker 

Escribir en un topic

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

<pre>&gt;Hola mundo
&gt;aprendiendo kafka
&gt;mas texto para seguir aprendiendo
&gt;otro mensaje
</pre>

Ctrl+C para salir

## Broker confirmacion

espera la sincronización de las replicas para 
realizar la confirmación, lo cual garantiza que el registro no se pierda mientras al menos
una replica se mantenga levantado y en ejecución, esta (acks=all) es la garantía mas fuerte de disponibilidad. 

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic firsopic --producer-property acks=all

<pre>&gt;Un mensaje que es confirmado
&gt;otro mensaje mas
&gt;seguimos aprendiendo
</pre>

## Broker a un topic inexistente

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic new_topic

<pre>&gt;Mensaje en topic nuevo inexistente
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {firnew_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
&gt;otro mensaje
&gt;un ultimo mensaje</pre>

Al no existir ese topic, se crea uno nuevo automaticamente

## Listar

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

<pre>firnew_topic
first_topic
</pre>

## Descripcion

kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic firnew_topic --describe

<pre>Topic:firnew_topic	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: firnew_topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
</pre>

# Consumer

## Opciones 

kafka-console-consumer.sh

## leer
 
Iniciamos el consumer para que este a la escucha de ese topic (no aparece ningun registro)

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

Despues se inicia el broker para introducir los datos

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

- Mensaje de broker:
<pre>&gt;hola nuevo mensaje para el consumer
&gt;mas mensajes de prueba
&gt;
</pre>
- Mensaje desde consumer:
<pre>hola nuevo mensaje para el consumer
mas mensajes de prueba
</pre>
 
Cancelamos el consumer (utilizando Ctrl+C) y lo lanzamos de nuevo pero indicando que
lea los mensajes desde el principio del topic y así podremos ver todos los mensajes 
además de los que vayan llegando

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

<pre>aprendiendo kafka
otro mensaje mas
hola nuevo mensaje para el consumer
Hola mundo
otro mensaje
Un mensaje que es confirmado
mas texto para seguir aprendiendo
seguimos aprendiendo
mas mensajes de prueba

</pre>

## Group

Se inicia en un terminal un consumer

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

Se inicia en otro terminal otro consumer

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

Por ultimo, se inicia en otro terminal un broker y se escribe un mensaje

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic

- Mensaje del broker:
<pre>&gt;esto es un mensaje de prueba
&gt;para consumer-group
&gt;
</pre>

- Mensaje del primer consumer
<pre>para consumer-group
</pre>

- Mensaje del segundo consumer
<pre>esto es un mensaje de prueba</pre>

### Opciones
kafka-consumer-groups.sh

### Listar

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

<pre>my-first-application
my-second-application
</pre>

### Visualizar offsets / describe

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-second-application

<pre>TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
first_topic     0          8               8               0               consumer-1-ed..                                 /127.0.0.1      consumer-1
first_topic     1          7               7               0               consumer-1-ed..                                 /127.0.0.1      consumer-1
first_topic     2          7               7               0               consumer-1-ed..                                 /127.0.0.1      consumer-1
</pre>

### Comprobar los otros grupos 

Se lanza de nuevo el consumer para el grupo my-first-application ya
que puede que todavía haya mensajes que no hayamos leído, en este caso
al haber estado lanzando mensajes para ver como los
consumía el grupo my-second-application, arroja lo siguiente después de ejecutar el
comando:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

<pre>otro mensaje
otro mensaje
esto es una prueba con group 2

</pre>

### offsets del primer grupo

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-application

<pre>TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first_topic     0          8               9               1               -               -               -
first_topic     1          7               7               0               -               -               -
first_topic     2          7               8               1               -               -    </pre>

### Resetear los offsets

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic

<pre>TOPIC                          PARTITION  NEW-OFFSET     
first_topic                    0          0              
first_topic                    2          0              
first_topic                    1          0              
</pre>

### Lanzar de nuevo el consumer

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

<pre>aprendiendo kafka
otro mensaje mas
hola nuevo mensaje para el consumer
esto es un mensaje de prueba
con 3 consumer
ya se ha cerrado un consumer
otro consumer
otro mensaje
Hola mundo
otro mensaje
Un mensaje que es confirmado
esto es otra prueba
cerrando un consumer 
ahora se ha cancelado
otro mensaje
mas texto para seguir aprendiendo
seguimos aprendiendo
mas mensajes de prueba
para consumer-group
y ahora se va hacer otra prueba
y se sigue mandando mensajes
esto es una prueba con group 2

</pre>

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group my-first-application

<pre>TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first_topic     0          8               9               1               -               -               -
first_topic     1          7               7               0               -               -               -
first_topic     2          7               8               1               -               -    </pre>

###  shift-by

Cambia los offsets

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic

<pre>TOPIC                          PARTITION  NEW-OFFSET     
first_topic                    0          6              
first_topic                    2          5              
first_topic                    1          5              
</pre>

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic

<pre>TOPIC                          PARTITION  NEW-OFFSET     
first_topic                    0          4              
first_topic                    2          3              
first_topic                    1          3              
</pre>


kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group my-first-application

<pre>hola nuevo mensaje para el consumer
esto es un mensaje de prueba
con 3 consumer
ya se ha cerrado un consumer
otro consumer
otro mensaje
Un mensaje que es confirmado
esto es otra prueba
cerrando un consumer 
ahora se ha cancelado
otro mensaje
mas mensajes de prueba
para consumer-group
y ahora se va hacer otra prueba
y se sigue mandando mensajes
esto es una prueba con group 2

</pre>