# Streaming - Kafka

**Deadline:10 mei om 23u59** 

In deze oefening wordt een streaming applicatie gemaakt door middel van het Kafka streaming platform.
Deze opgave bestaat uit drie delen:
* Ten eerste het installeren en opstarten van het Kafka platform
* Ten tweede het schrijven van een applicatie dat data stuurt naar het Kafka platform
* Ten derde een Spark applicatie dat de data verwerkt van het Kafka platform.

Indien je data moet/wil bewaren op het hdfs, doe dit dan in de **Oefeningen/Streaming** folder en maak deze folder ook leeg voor je begint.

Een kort overzicht van het werken met Kafka binnen python kan je [hier](https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1) vinden.

## Installatie en opstarten

Ga eerst op zoek naar hoe je Kafka kan gebruiken in python. 
Zoek hierbij dus de nodige stappen op om Kafka te installeren.
Beschrijf hieronder de te volgen stappen en extra applicaties die moeten geinstalleerd worden om kafka te kunnen installeren en starten.
Ga ten slotte op zoek naar een python package om te werken met kafka.
Zorg ervoor dat dit ook geinstalleerd is.
**Let op:** Het is reeds geinstalleerd in de kafka folder in je home-directory dus moeten deze commando's niet uitgevoerd worden.

Beschrijf hieronder de te volgen stappen:

De te volgen stappen zijn:
* **Stap 1:** Installeer benodigde Java Development Kit (JDK) (OpenJDK or Oracle JDK zijn twee voorbeelden)
* **Stap 2:** Download Kafka en ZooKeeper van: \
Kafka: https://kafka.apache.org/downloads \
ZooKeeper: https://zookeeper.apache.org/releases.html
* **Stap 3:** Uitpakken bestanden (vervang version door juiste versie die je gedownload hebt): \
Kafka: tar -xzf kafka_\<version>\.tgz \
ZooKeeper: tar -xzf zookeeper-\<version>\.tar.gz
* **Stap 4:** Configuratie: \
Kafka: in config/server.propperties \
ZooKeeper: zoo_sample.cfg is een template van de config. Rename naar zoo.cfg en edit naar juiste gegevens.

Indien er commando's moeten uitgevoerd worden in de terminal, plaats een kopie hiervan ook in de code cel hieronder en verwijs in commentaar naar de bijhorende stap.
* Ja, alles van de installatie moet in terminal gebeuren behalve de pip install.

* **Stap 5:** Installeer kafka-python package met pip.

In [None]:
# installatie commando's
pip install kafka-python

Aangezien kafka reeds geinstalleerd is moeten bovenstaande commando's niet uitgevoerd worden.
Echter moet kafka wel nog opgestart worden.
Schrijf in de code-cell hieronder de nodige commandline commando's om kafka goed op te starten.
**Voer deze commando's ook uit.**

* **Stap 6:** Opstarten van ZooKeeper en Kafka server.

Opstart commando's (in terminal)

* **Stap 7:** Open terminal in de juiste folder: \
(base) bigdata@bigdata-VirtualBox:~/kafka/bin$

* **Stap 8**: ZooKeeper server starten: \
./zookeeper-server-start.sh ../config/zookeeper.properties

* **Stap 9**: Kafka Server startem: \
./kafka-server-start.sh ../config/server.properties

Let zeker op voor de volgende zaken bij het gebruik van kafka:
* Het delete.topic.enable attribuut staat op true in de file config/server.properties. Dit laat toe om test-topics te verwijderen.
* Indien er bij het starten van de applicaties een foutmelding komt dat de poort reeds in gebruik is, kies dan een andere poort. Gebruik hiervoor de properties files in de config directory. Zorg er ook voor dat deze poort gekend is bij het starten van kafka door de configuratie aan te passen in de file **server.properties**. Deze poort heb je ook nodig bij het aanmaken van topics dus ga zeker op zoek naar welke ik geconfigureerd heb.

**Indien je de poort hebt aangepast, welke poort kan gebruikt worden om met kafka te connecteren:** ...

Indien je bij het opstarten van de applicaties foutmeldingen krijgt waar je niet direct het antwoord op weet, contacteer me en dan zoek ik samen met jou naar de oplossing.

Als kafka correct opgestart is dan kunnen we de installatie testen om dit te verifieren.
Voer daarvoor de volgende stappen uit:
* Stap 1: Maak een nieuw topic "test" aan.
* Stap 2: Print een lijst met alle beschikbare topics. (Plaats het nodige commando hieronder)
* Stap 3: Print een beschrijving van het net aangemaakte topic

Schrijf in de code-cellen hieronder de nodige commandline commando's om deze stappen uit te voeren.

In [2]:
from kafka import KafkaAdminClient, TopicPartition, KafkaConsumer
from kafka.admin import NewTopic
import json
from IPython.display import JSON
import os

In [3]:
# zookeeper server runnen
command = "cd ~/kafka/bin && ./zookeeper-server-start.sh ../config/zookeeper.properties"
os.system("gnome-terminal -- bash -c \"%s; exec bash\"" % command)

0

In [4]:
# kafka server runnen
command = "cd ~/kafka/bin && ./kafka-server-start.sh ../config/server.properties"
os.system("gnome-terminal -- bash -c \"%s; exec bash\"" % command)

0

In [5]:
bootstrap_servers = 'localhost:9092'
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

In [6]:
# alle topics verwijderen zodat er geen foutmelding komt van topic test already exists bij meerdere keren heel de
# de notebook te runnen

topic_names = admin_client.list_topics()
if topic_names:
    for topic_name in topic_names:
        try:
            admin_client.delete_topics(topics=[topic_name])
            print(f'Topic "{topic_name}" has been marked for deletion.')
        except TopicAlreadyMarkedForDeletionError:
            print(f'Topic "{topic_name}" has already been marked for deletion.')
else:
    print('No topics found in the Kafka cluster.')

No topics found in the Kafka cluster.


In [7]:
# aanmaken topic test
topic_name = 'test'

new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic])

CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='test', error_code=0, error_message=None)])

In [8]:
# printen lijst met alle beschikbare topics

consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
topics = consumer.topics()
print(topics)

{'test'}


In [9]:
# printen informatie over test topic

topic_metadata = admin_client.describe_topics()
json_string = json.dumps(topic_metadata, indent=2)
print(json_string)

[
  {
    "error_code": 0,
    "topic": "test",
    "is_internal": false,
    "partitions": [
      {
        "error_code": 0,
        "partition": 0,
        "leader": 0,
        "replicas": [
          0
        ],
        "isr": [
          0
        ],
        "offline_replicas": []
      }
    ]
  }
]


Voor er kan overgegaan worden naar het coderen van kafka applicaties, gaan we nog testen of er berichten kunnen verstuurd worden door de Kafka streaming server via de command terminal.
Start hiervoor een console-producer applicatie en verstuur drie berichten. 
**Zorg ervoor dat er in minstens 1 van de berichten je naam staat.**

Schrijf hieronder het gebruikte commando om de console producer op te starten.

Nadat de berichten verstuurd zijn, maak een consumer applicatie aan die alle berichten vanaf het begin van het topic uitleest. 
Maak een screenshot van de output met het commando erbij en sla dit op als **output_test.png**.

Wanneer dit gebeurd is verwijder het test topic. Schrijf het commando hiervoor in de cell hieronder. Verifeer het commando voor het verwijderen van het topic door opnieuw een lijst af te printen met alle mogelijke beschikbare topics.

![Screenshot Output Test](output_test.png)

In [None]:
consumer = KafkaConsumer(topic_name, bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', group_id=None)

for message in consumer:
    value = message.value.decode('utf-8')
    print(value)

In [11]:
# delete topic commando

admin_client.delete_topics(topics=[topic_name])

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='test', error_code=0)])

In [12]:
# controleer of het commando goed verwijderd is
topics = consumer.topics()
print(topics)

set()


## Producer

Download een zelfgekozen boek in txt formaat, bijvoorbeeld aan de hand van [deze link](https://www.gutenberg.org/).
Schrijf nu een python producer applicatie in de code cell hieronder dat lijn per lijn dit boek wegschrijft als bytestring (door gebruik te maken van de .encode() functie) naar een nieuw topic met de naam "BookStream".
Zorg ervoor dat er een seconde gewacht wordt tussen het versturen van opeenvolgende lijnen. 
(Bij het testen kan je de code vroegtuidig stoppen door de kernel te onderbreken).
Door de file tag bovenaan wordt deze applicatie weggeschreven naar een python file.

**Let op:** Je wilt niet dat bij het testen meerdere keren de start van het boek weggeschreven wordt naar het topic. Voorzie dus ook de nodige python code om het topic te verwijderen en opnieuw aan te maken bij het starten van de applicatie. 
Indien je bij het aanmaken en verwijderen van topics foutmeldingen krijgt kan het helpen om in de terminal te verifieren welke topics reeds bestaan en eventueel een delay toe te voegen voor/na de verschillende stappen.

In [13]:
%%file kafka_producer.py
# Kafka producer streaming book line by line

import time
from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic

bookstream_topicname = 'BookStream'
bootstrap_servers = 'localhost:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)

# probeer te deleten, als dat failed bestaat hij niet dus is ok, anders delete hij sws het topic
topic_names = admin_client.list_topics()
if topic_names:
    for topic_name in topic_names:
        if topic_name == bookstream_topicname:
            print('Found an instance of BookStream topic. deleting...')
            admin_client.delete_topics(topics=[bookstream_topicname])
            
# bugfix met een delay
time.sleep(3)

# maak daarna opnieuw een aan
new_topic = NewTopic(bookstream_topicname, num_partitions=1, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic])
print(f'BookStream topic made.')

# bugfix met een delay
time.sleep(3)

print(f'Beginning streaming line per line.')

# lijn per lijn met 1 seconde delay versturen
file_path = './bookstream.txt'
with open(file_path, "r") as file:
    lines = file.readlines()

for line in lines:
    line = line.strip()
    if line:
        print(bookstream_topicname, line.encode())
        producer.send(bookstream_topicname, line.encode())
        time.sleep(1)

producer.close()

Overwriting kafka_producer.py


Schrijf hieronder het nodige commando om de python applicatie te starten in de file kafka_producer.py
Dit voer je echter best uit in een aparte terminal om deze file niet te blokkeren.

In [14]:
command = "python kafka_producer.py"
os.system("gnome-terminal -- bash -c \"%s; exec bash\"" % command)

0

Voor we de weggeschreven data gaan gebruiken in een consumer kan je ook controleren of je producer het gewenste resultaat geeft door alle data in een bepaald topic uit te printen.
Schrijf hieronder het terminal commando om dit uit te voeren. 

In [15]:
# controleer of de producer goed werkt
consumer = KafkaConsumer('BookStream', bootstrap_servers=bootstrap_servers, auto_offset_reset='earliest', group_id=None)

for message in consumer:
    value = message.value.decode('utf-8')
    print(value)

﻿The Project Gutenberg eBook of The Benson Murder Case, by S. S. Van
Dine
This eBook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever. You may copy it, give it away or re-use it under the terms
of the Project Gutenberg License included with this eBook or online at
www.gutenberg.org. If you are not located in the United States, you
will have to check the laws of the country where you are located before
using this eBook.
Title: The Benson Murder Case
Author: S. S. Van Dine
Release Date: April 24, 2023 [eBook #70634]
Language: English
Produced by: Transcribed and produced by Brian Raiter
*** START OF THE PROJECT GUTENBERG EBOOK THE BENSON MURDER CASE ***
The Benson Murder Case
by S. S. Van Dine
[Frontispiece: Philo Vance, from a drawing by Herbert Stoops. A sketch
of a seated gentleman in evening dress and wearing an monocle.]
Contents
I. Philo Vance at Home
II. At the Scene of the Crime
III. A L

KeyboardInterrupt: 

# Consumer

Nadat je erin geslaagd bent om lijnen uit de boek naar een kafka-topic te schrijven, kan je verdergaan naar de volgende stap.
Hierbij ga je een python-applicatie schrijven die data binnenkrijgt via een kafka stream en deze verwerkt door middel van spark structured streaming.
Meer informatie over structured streaming kan je [hier](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) vinden.
Deze applicatie voert de volgende stapen uit:
* Print het schema van de dataframes aangeleverd door Kafka
* Zet de aangeleverde dataframes om naar een dataframe met de volgende kolommen:
 * Aantal karakters per lijn
 * Aantal keer dat elke letter voorkomt (enkel letters) en zorg ervoor dat je zowel hoofd- als kleine letters meetelt.
 * Aantal woorden in de lijn
* Zorg ervoor dat dit dataframe na het verwerken van elke batch uitgeprint wordt in een console.
* Zorg ervoor dat enkel nieuwe berichten in het topic behandeld worden.

Opmerkingen:
* Default wordt er steeds verder gelezen in het topic. Om elke keer dat je de consumer start, opnieuw te beginnen vanaf de start van het boek kan je de optie startingOffsets op earliest zetten.
* Vergeet niet de vragen onder de code te beantwoorden!

In [23]:
%%file kafka_consumer.py
# Kafka consumer met pyspark om een charcount uit te voeren

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, size, split, trim, length, regexp_replace, expr
from pyspark.sql.types import StringType

spark = SparkSession.builder \
    .appName("KafkaStreamReader") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()

kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", "BookStream") \
    .option("startingOffsets", "earliest") \
    .load()

print("Schema of incoming dataframe:")
kafka_df.printSchema()

print("Contents of incoming dataframe:")

print("New dataframe with questions answered:")
new_df = kafka_df.select("value")
new_df = new_df.withColumn("value", col("value").cast("binary").cast("string"))
new_df = new_df.withColumn("word_count", size(split(trim(col("value")), " ")))
new_df = new_df.withColumn("char_count", length(col("value")))

for letter in 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ':
    new_df = new_df.withColumn(f"{letter.lower()}_lower_count", length(trim(expr(f"regexp_replace(value, '[^{letter.lower()}]', '')"))))

    new_df = new_df.withColumn(f"{letter.upper()}_upper_count", length(trim(expr(f"regexp_replace(value, '[^{letter.upper()}]', '')"))))

query = new_df.writeStream \
    .option("truncate", "false") \
    .option("path", "~/bigdata/oefening-streaming-QuintenStr") \
    .format("console") \
    .trigger(processingTime="1 second") \
    .outputMode("append") \
    .start()

query.awaitTermination()

Overwriting kafka_consumer.py


De bovenstaande applicatie kan gestart worden door het volgende commando uit te voeren. Let op het --packages argument om kafka en pyspark te combineren. Let op dat dit een streaming applicatie is die actief blijft dus onderbreek de kernel om te hertesten of voer dit uit in een aparte terminal. 
Maak een screenshot van een deel van de output (na initialisatie) en sla het op onder de naam **Streaming_output_1.png**.

In [24]:
# commando om consumer te starten
command = "python kafka_consumer.py"
os.system("gnome-terminal -- bash -c \"%s; exec bash\"" % command)

0

![Screenshot Output Stream 2](Streaming_output_1.png)

Screenshot is in kladblok op windows zo kan is het leesbaarder want in de terminal is het helemaal door elkaar geklutst :).

**Vragen:**
* Wat is het verschil tussen de append en complete methode om de data weg te schrijven? Toon hier een voorbeeld van op basis van de uitgewerkte oefening.
* Stel dat we het totaal aantal keer dat elke letter voorkomt in de laatst geziene 20 seconden willen optellen en tonen. Wat moet er hiervoor aangepast worden in bovenstaande applicatie? Voer deze aanpassingen aan in de code cell hieronder.
* We willen nu ook in de gegroepeerde data bijhouden hoeveel rijen er samengeteld werden. Voeg ook de aanpassing hiervoor toe in de code-cell hieronder. Wat moet er hiervoor aangepast worden in bovenstaande applicatie?

Indien de aanpassingen niet lukken. Noteer hieronder zeker je ideeën om de aanpassingen door te voeren. 
Sla na de aanpassingen opnieuw een screenshot op van een deel van de output (na initialisatie) op onder de naam **Streaming_output_2.png**

**Antwoorden:**
* Vraag 1: Het verschil tussen de "append" en "complete" output modes in Spark Structured Streaming is dat "append" alleen nieuwe data naar de uitvoerbestemming schrijft, terwijl "complete" de volledige resultaatset, inclusief historische gegevens, naar de uitvoerbestemming schrijft. "Append" is handig als je alleen nieuwe data wilt opslaan, terwijl "complete" geschikt is als je een volledig historisch overzicht van alle gegevens wilt bijhouden. Ik heb geen aggreation in mijn code dus ik kan wel geen gebruik maken van complete.
* Vraag 2:     .trigger(processingTime="20 seconds")
* Vraag 3: Omdat ik niet met gegroepeerde data werk kan ik simpelweg gewoon het aantal rijen in het dataframe uitprinten. Deze zijn soms meer of minder dan 20 door delays/cpu niet snel genoeg/eerste batch is alles omdat je met starting offset earliest zit.

In [18]:
%%file kafka_consumer20sec.py
# Kafka consumer met pyspark om een charcount uit te voeren

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, size, split, trim, length, regexp_replace, expr
from pyspark.sql.types import StringType

spark = SparkSession.builder \
    .appName("KafkaStreamReader") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()

kafka_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", "BookStream") \
    .option("startingOffsets", "earliest") \
    .load()

def process_batch(df, batch_id):
    print("Schema of incoming dataframe:")
    df.printSchema()

    print("Contents of incoming dataframe:")
    df.show(truncate=False)

    print("New dataframe with questions answered:")
    new_df = df.select("value")
    new_df = new_df.withColumn("value", col("value").cast("binary").cast("string"))
    new_df = new_df.withColumn("word_count", size(split(trim(col("value")), " ")))
    new_df = new_df.withColumn("char_count", length(col("value")))

    for letter in 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ':
        new_df = new_df.withColumn(f"{letter.lower()}_lower_count", length(trim(expr(f"regexp_replace(value, '[^{letter.lower()}]', '')"))))

        new_df = new_df.withColumn(f"{letter.upper()}_upper_count", length(trim(expr(f"regexp_replace(value, '[^{letter.upper()}]', '')"))))
    
    new_df.show(truncate=False)
    
    print("Aantal rijen opgeteld:")
    print(new_df.count())


#new_df.show(truncate=False)

query = kafka_df.writeStream \
    .trigger(processingTime="20 seconds") \
    .outputMode("append") \
    .foreachBatch(process_batch) \
    .start()

query.awaitTermination()

Overwriting kafka_consumer20sec.py


In [19]:
# commando om consumer te starten
command = "python kafka_consumer20sec.py"
os.system("gnome-terminal -- bash -c \"%s; exec bash\"" % command)

0

![Screenshot Output Stream 2](Streaming_output_2.png)

Screenshot is in kladblok op windows zo kan is het leesbaarder want in de terminal is het helemaal door elkaar geklutst :).