# **10. Using Kafka Streams in CapyMOA**

This notebook shows how the **KafkaStream** class can be used to read instances published by Kafka producers.

**Apache Kafka** is an open source distributed streaming system used for stream processing, real-time data pipelines, and data integration at scale. Continuously received data can be flexibly processed using Kafka.

---

*More information about CapyMOA can be found in* https://www.capymoa.org

**Last updated on 08/11/2024**

## 1. Setup

### For Windows and for localhost servers

- Skip this section if the server already exists

### 1.1 Install Kafka for Windows

- Head to the [**Apache Kafka** Website](https://kafka.apache.org/downloads) and download a binary `.tgz` release; the same archive is used on **Windows**
- Extract the downloaded `.tgz` file
- **Recommended**: To set up the environment variables easily, move the extracted folder to the `C:\` drive and rename it to `kafka`.
- Open `kafka\config\server.properties` and change `log.dirs=/tmp/kafka-logs` to `log.dirs=c:/kafka/kafka-logs`, adjusting the path to match the location and name of your **kafka** folder. Save the change.
- Open `kafka\config\zookeeper.properties` and change `dataDir=/tmp/zookeeper` to `dataDir=c:/kafka/zookeeper-data`, adjusting the path in the same way. Save the change.
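The property edits above can also be scripted. The following is a minimal sketch, assuming the default `log.dirs` line quoted above; `set_property` is a hypothetical helper name (not part of Kafka), and the example works on an in-memory string rather than on the real configuration files.

```python
def set_property(text: str, key: str, value: str) -> str:
    """Return `text` with the line `key=...` replaced by `key=value`.

    Comment lines (starting with '#') and other keys are left untouched.
    If the key is absent, a new `key=value` line is appended.
    """
    lines = text.splitlines()
    found = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        name, sep, _ = stripped.partition("=")
        if sep and name.strip() == key:
            lines[i] = f"{key}={value}"
            found = True
    if not found:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


# Example on an in-memory snippet shaped like server.properties
sample = "# A comma separated list of directories\nlog.dirs=/tmp/kafka-logs\nnum.partitions=1\n"
updated = set_property(sample, "log.dirs", "c:/kafka/kafka-logs")
print(updated)
```

To apply it to a real file, read the file's text, pass it through the helper, and write the result back; keeping a backup copy first is a sensible precaution.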

### 1.2 Setup Zookeeper

- Open a new command line and go to the directory where **kafka** was installed, e.g., `c:/kafka`
- Run the command:

`.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties`

### 1.3 Setup Local Server

- Open a new command line and go to the directory where **kafka** was installed, e.g., `c:/kafka`
- Run the command:

`.\bin\windows\kafka-server-start.bat .\config\server.properties`

### 1.4 Create a Kafka Topic

- Open a new command line and go to the `bin\windows` folder inside the directory where **kafka** was installed, e.g., `c:\kafka\bin\windows`
- Run the command:

`.\kafka-topics.bat --create --topic <topic_name> --bootstrap-server <server_name>`

Note: For local servers, the server name is usually `localhost:9092`.
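Before creating topics or producers, it can help to confirm that something is listening at the server address. The sketch below uses only the Python standard library; `broker_reachable` is a hypothetical helper name, and it checks only that the TCP port accepts connections, not that the process behind it is actually a Kafka broker.

```python
import socket


def broker_reachable(server: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to `host:port` can be opened."""
    host, _, port = server.rpartition(":")
    try:
        # create_connection raises OSError on refusal, timeout, or DNS failure
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


# False if no server is listening on the usual local address
print(broker_reachable("localhost:9092"))
```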

### 1.5 Run a Kafka Producer

- In the same command line and location where the topic was created (`c:\kafka\bin\windows`), run the command:

`.\kafka-console-producer.bat --broker-list <server_name> --topic <topic_name>`

### 1.6 Add Instances to the Kafka Producer

Add JSON-formatted instances to the Kafka producer by typing them after the prompt `>`.

Note: Each instance sent to the Kafka producer must be a JSON object on a single line. For example:

`{"feature1": 0.123456, "feature2": 0.828380, "feature3": 0.141512, "feature4": 0.314159, "feature5": 0.422915, "feature6": 0.414912, "target/class": 1}`
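Typing instances by hand is error-prone, so the lines can be generated instead. The following is a minimal sketch that prints one JSON object per line in the same shape as the example above; the feature values are random placeholders rather than real data, and `make_instance_line` is a hypothetical helper name.

```python
import json
import random


def make_instance_line(rng: random.Random, n_features: int = 6) -> str:
    """Build one JSON line with `n_features` random features and a binary target."""
    record = {f"feature{i}": round(rng.random(), 6) for i in range(1, n_features + 1)}
    record["target/class"] = rng.randint(0, 1)
    # json.dumps emits no newlines by default, so each record stays on one line.
    return json.dumps(record)


rng = random.Random(42)  # fixed seed so the sketch is repeatable
for _ in range(3):
    print(make_instance_line(rng))
```

Each printed line can be pasted at the producer prompt as a single instance.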

## 2. Create a **KafkaStream** instance

### 2.1 Required Imports

In [None]:
from capymoa.stream._stream import KafkaStream
import numpy as np

### 2.2 Instantiate the KafkaStream class according to the known configuration

Configuration options:
- Classification or regression (set `target_type` to `'categorical'` or `'numeric'`)
- Schema or no schema
- Known or unknown data types

Format for data types: `[('column1', np.float64), ('column2', np.int32), ('column3', np.float64), ('column4', str)]`
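To make the data types format concrete, the sketch below shows what a list of `(name, type)` pairs expresses: each named column is cast to its declared type. This is an illustration only and not how **KafkaStream** handles `dtypes` internally; the column names are hypothetical, and built-in Python types stand in for the NumPy ones (`np.float64` and `np.int32` can be called on a value in the same way).

```python
import json

# Hypothetical column spec in the same (name, type) shape as the format above.
# Built-in types are used here so the sketch needs no third-party packages.
dtypes = [("feature1", float), ("feature2", int), ("label", str)]


def cast_record(line: str, dtypes):
    """Parse one JSON line and cast each listed column to its declared type."""
    record = json.loads(line)
    return [(name, kind(record[name])) for name, kind in dtypes]


row = cast_record('{"feature1": "0.5", "feature2": 3, "label": 1}', dtypes)
print(row)  # [('feature1', 0.5), ('feature2', 3), ('label', '1')]
```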


In [None]:
server = "localhost:9092"
topic = "my_test_topic_1"
group_id = "group1"


# The four examples below are alternatives: each one reassigns `my_kafka_stream`,
# so keep only the one that matches your setup.

# Classification with known datatypes
my_kafka_stream = KafkaStream(
    dtypes=[
        ('period', np.float64),
        ('nswprice', np.float64),
        ('nswdemand', np.float64),
        ('vicprice', np.float64),
        ('vicdemand', np.float64),
        ('transfer', np.float64),
    ],
    target_index=-1,
    class_labels=['0', '1'],  # For classification tasks
    server=server,
    topic=topic,
    group_id=group_id,
    buffer_size=3,
    schema=None,
)


# Classification with unknown datatypes
my_kafka_stream = KafkaStream(
    dtypes=None,
    target_type='categorical',  # 'numeric' for regression, 'categorical' for classification
    target_index=-1,
    class_labels=['0', '1'],  # For classification tasks
    server=server,
    topic=topic,
    group_id=group_id,
    buffer_size=3,
    schema=None,
)


# Regression with known datatypes
my_kafka_stream = KafkaStream(
    dtypes=[
        ('period', np.float64),
        ('nswprice', np.float64),
        ('nswdemand', np.float64),
        ('vicprice', np.float64),
        ('vicdemand', np.float64),
        ('transfer', np.float64),
    ],
    target_type='numeric',  # 'numeric' for regression, 'categorical' for classification
    target_index=-1,
    server=server,
    topic=topic,
    group_id=group_id,
    buffer_size=3,
    schema=None,
)


# Regression with unknown datatypes
my_kafka_stream = KafkaStream(
    dtypes=None,
    target_type='numeric',  # 'numeric' for regression, 'categorical' for classification
    target_index=-1,
    server=server,
    topic=topic,
    group_id=group_id,
    buffer_size=3,
    schema=None,
)


## 3. Operating the **KafkaStream** instance

### 3.1 Poll Instances from Kafka Producer

The **KafkaStream** class works on a buffer whose size is set in the constructor with `buffer_size`. The `poll_kafka()` method loads instances into this buffer, and `next_instance()` can be called only after polling.

In [None]:
my_kafka_stream.poll_kafka()

### 3.2 Get the next instance from the buffer

This method only works if the buffer is not empty; it raises an error otherwise. The returned instance can be stored in a local variable for processing.

In [None]:
my_kafka_stream.next_instance()

### 3.3 Check if there are more instances in the current buffer

This method checks whether the current buffer still holds unprocessed instances. If `False` is returned, poll again to fetch any new instances.

In [None]:
my_kafka_stream.has_more_instances()

### 3.4 Return the number of instances processed so far by the stream

An instance counts as processed once it has been returned by `next_instance()`.

In [None]:
my_kafka_stream.get_processed_instances()
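The methods covered so far are typically combined into a poll-then-drain loop. The sketch below shows that pattern with `StandInStream`, a hypothetical in-memory stand-in that only mimics the method names used in this section; it is not CapyMOA's implementation, and the real `poll_kafka()` may behave differently (for example, when no new messages are available).

```python
from collections import deque


class StandInStream:
    """Hypothetical stand-in that mimics the method names used in this section."""

    def __init__(self, source, buffer_size=3):
        self._source = iter(source)  # plays the role of the Kafka topic
        self._buffer = deque()
        self._buffer_size = buffer_size
        self._processed = 0

    def poll_kafka(self):
        # Move up to buffer_size pending items into the buffer.
        while len(self._buffer) < self._buffer_size:
            try:
                self._buffer.append(next(self._source))
            except StopIteration:
                break

    def has_more_instances(self):
        return len(self._buffer) > 0

    def next_instance(self):
        if not self._buffer:
            raise RuntimeError("buffer is empty; poll first")
        self._processed += 1
        return self._buffer.popleft()

    def get_processed_instances(self):
        return self._processed


stream = StandInStream(source=range(7), buffer_size=3)
seen = []
while True:
    stream.poll_kafka()
    if not stream.has_more_instances():
        break  # nothing new arrived; a live consumer might wait and retry instead
    while stream.has_more_instances():
        seen.append(stream.next_instance())

print(seen, stream.get_processed_instances())  # [0, 1, 2, 3, 4, 5, 6] 7
```

With a real stream, the inner loop is where each instance would be passed to a learner or evaluator.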

### 3.5 Getter Methods

- Schema
- Buffer Size

In [None]:
my_kafka_stream.get_schema() # Schema

In [None]:
my_kafka_stream.get_buffer_size() # Buffer Size

### 3.6 Close the Kafka Consumer

In [None]:
my_kafka_stream.close()
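If processing fails partway through, `close()` may never be reached. One way to guard against that is `contextlib.closing` from the standard library, which calls `close()` on any object when the `with` block exits. The sketch below uses `DummyStream`, a hypothetical placeholder exposing only `close()`; whether **KafkaStream** can be used in a `with` statement directly is not stated in this notebook.

```python
from contextlib import closing


class DummyStream:
    """Hypothetical placeholder exposing only close(), for illustration."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


dummy = DummyStream()
try:
    with closing(dummy) as stream:
        raise ValueError("simulated failure while processing")
except ValueError:
    pass

print(dummy.closed)  # True
```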