# ![Digital Futures](https://github.com/digital-futures-academy/DataScienceMasterResources/blob/main/Resources/datascience-notebook-header.png?raw=true)

# Extracting Data from a Stream Provided by Kafka

This notebook demonstrates how to extract data from a Kafka stream. The stream consists of random numbers between 0 and 100, emitted every 0.5 s. The objective is to tap into the stream, collect the next 100 values, and put them into a ***Pandas*** **DataFrame**.
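To experiment with the later steps without a running broker, the stream described above can be imitated in plain Python. This is a minimal sketch, not the actual producer behind the `random_numbers` topic. The generator name `simulated_stream`, the inclusive 0 to 100 range, and the encoding of each value as UTF-8 decimal text are all assumptions.

```python
import random
import time
from itertools import islice
from typing import Iterator


def simulated_stream(interval_s: float = 0.5, low: int = 0, high: int = 100) -> Iterator[bytes]:
    """Hypothetical stand-in for the Kafka stream: yields random integers as UTF-8 bytes.

    Assumes each value is sent as its decimal text representation, e.g. b'42'.
    """
    while True:
        # random.randint includes both endpoints, so values fall in [low, high]
        yield str(random.randint(low, high)).encode("utf-8")
        time.sleep(interval_s)


# Take the next 5 values without waiting between them, then decode them as the consumer does
sample = [raw.decode("utf-8") for raw in islice(simulated_stream(interval_s=0), 5)]
print(sample)
```

Because it yields bytes, the output can be decoded exactly like `msg.value()` from a real Kafka message.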

---

## Install and Import Required Libraries

Install and import the necessary libraries: `confluent-kafka` (which provides the `Consumer` class) and `pandas`.

### Environment Setup Scripts

Make sure that you have the correct environment running, including the correct version of Python and all the necessary libraries. If running in VS Code, you may need to create and activate a virtual environment before running the notebook's code cells.

If you have just cloned this project and have not yet set up your environment, run the following commands in your terminal.

> **NOTE:**  These commands need to be executed in the terminal.  
>
> Open a terminal at the root of your project before executing these commands.
> 
> Until your environment is set up, Jupyter may not be able to run this notebook's code cells (including **shell** commands) in the project's environment.

```sh
# Create a virtual environment
python3 -m venv .venv
# Note: depending on your system, this command may be `python -m venv .venv`
# (python3 and python are often symlinks to the Python version installed on your system)

# Activate the virtual environment (on Windows: .venv\Scripts\activate)
source .venv/bin/activate

# Install the Jupyter kernel and the libraries used in this notebook
pip install ipykernel confluent-kafka pandas        ## OR 
pip install -r requirements.txt     ## IF the project already has a requirements.txt that includes ipykernel, confluent-kafka and pandas
```

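`ipykernel` provides the IPython kernel that lets Jupyter run the notebook's Python code (and `!`-prefixed shell commands) with the interpreter and packages from your virtual environment.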
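One way to confirm that the notebook's kernel really is the one from `.venv` is to inspect the interpreter from a code cell. A minimal sketch using only the standard library; the helper name `running_in_virtualenv` is hypothetical, and the check applies to environments created with `venv`, as in the commands above.

```python
import sys


def running_in_virtualenv() -> bool:
    """Return True when this interpreter runs inside a venv-style virtual environment."""
    # venv sets sys.prefix to the environment directory, while sys.base_prefix
    # still points at the base Python installation
    return sys.prefix != sys.base_prefix


print("Interpreter:", sys.executable)
print("Inside a virtual environment:", running_in_virtualenv())
```

If the printed interpreter path does not point inside `.venv`, select the `.venv` kernel in Jupyter or VS Code before continuing.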

### Import Libraries

In [None]:
# Import the necessary libraries
from confluent_kafka import Consumer, KafkaError, KafkaException
import pandas as pd
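
If the import cell fails with `ModuleNotFoundError`, the packages may have been installed into a different environment than the one the kernel uses. A small standard-library sketch that reports which modules are missing without importing them; `REQUIRED_MODULES` and `missing_modules` are illustrative names.

```python
import importlib.util
from typing import List

# Import names can differ from PyPI names: confluent-kafka is imported as confluent_kafka
REQUIRED_MODULES = ["confluent_kafka", "pandas", "ipykernel"]


def missing_modules(names: List[str]) -> List[str]:
    """Return the module names that cannot be found in the current environment."""
    # find_spec locates a top-level module without importing it and returns None if it is absent
    return [name for name in names if importlib.util.find_spec(name) is None]


missing = missing_modules(REQUIRED_MODULES)
if missing:
    print("Missing modules, install them into the active environment:", missing)
else:
    print("All required modules are available.")
```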

---

## Configure Kafka Consumer

Set up the Kafka consumer configuration (bootstrap servers, consumer group, and starting offset), then create the consumer and subscribe it to the topic.

In [None]:
# Configure Kafka Consumer
bootstrap_server = 'localhost:9092'
consumer_conf = {
    'bootstrap.servers': bootstrap_server,  # Replace with your Kafka bootstrap servers
    'group.id': 'my_group',  # Replace with your consumer group id
    'auto.offset.reset': 'earliest'  # With no committed offset, start from the oldest retained message ('latest' reads only new ones)
}

print("Kafka Consumer Configuration:", consumer_conf)

# Create Kafka Consumer
consumer = Consumer(consumer_conf)

# Subscribe to the topic
topic = 'random_numbers'  # Replace with your Kafka topic
consumer.subscribe([topic])
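
The collection loop in the next section checks for `KafkaError._PARTITION_EOF`, but the consumer only reports that event when `enable.partition.eof` is set to `True` (it defaults to `false` in librdkafka 1.0 and later). Also note that `auto.offset.reset` only applies while the consumer group has no committed offset. A minimal sketch of a hypothetical helper, `build_consumer_conf`, that makes these choices explicit; it only builds the dictionary and does not contact Kafka.

```python
def build_consumer_conf(
    bootstrap_servers: str,
    group_id: str,
    offset_reset: str = "earliest",
    partition_eof: bool = False,
) -> dict:
    """Hypothetical helper that assembles a confluent_kafka Consumer configuration."""
    # Only the two common values are accepted here; librdkafka itself accepts a few aliases too
    if offset_reset not in ("earliest", "latest"):
        raise ValueError(f"offset_reset must be 'earliest' or 'latest', got {offset_reset!r}")
    return {
        "bootstrap.servers": bootstrap_servers,  # comma-separated host:port list
        "group.id": group_id,                    # consumers sharing this id split the partitions
        "auto.offset.reset": offset_reset,       # used only when the group has no committed offset
        "enable.partition.eof": partition_eof,   # True makes poll() report _PARTITION_EOF events
    }


conf = build_consumer_conf("localhost:9092", "my_group", offset_reset="latest")
print(conf)
```

The resulting dictionary can be passed to `Consumer(conf)` in place of the literal `consumer_conf` above.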


---

## Collect Data from Kafka Stream

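Consume messages from the Kafka stream until 100 values have been collected. At one value every 0.5 s, collecting 100 new values takes roughly 50 seconds; it can be faster if `'auto.offset.reset': 'earliest'` lets the consumer start with messages already stored in the topic.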

In [None]:
# Collect Data from Kafka Stream
messages = []
num_messages_to_collect = 100

try:
    while len(messages) < num_messages_to_collect:
        msg = consumer.poll(timeout=1.0)  # Poll for new messages
        if msg is None:
            print("No message")
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event (only reported when 'enable.partition.eof' is True)
                continue
            else:
                # Error
                raise KafkaException(msg.error())
        else:
            # Message is valid
            messages.append(msg.value().decode('utf-8'))
finally:
    # Close the consumer to commit final offsets
    consumer.close()
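
The loop above blocks indefinitely if the producer stops, because `poll()` simply returns `None` after each timeout. A minimal sketch of the same loop wrapped in a function with a hypothetical `max_idle_polls` limit. `collect_values`, `FakeMessage` and `FakeConsumer` are illustrative names; the fakes only mimic the parts of `confluent_kafka.Consumer` and `Message` that the loop uses (`poll()`, `error()`, `value()`). It assumes `enable.partition.eof` is left at its default of `False`, so any reported error is treated as fatal.

```python
from typing import Any, List


def collect_values(consumer: Any, n: int = 100, timeout: float = 1.0, max_idle_polls: int = 30) -> List[str]:
    """Poll `consumer` until `n` decoded values are collected or too many polls come back empty.

    `consumer` is anything with a confluent_kafka-style poll(timeout=...) method.
    """
    values: List[str] = []
    idle_polls = 0
    while len(values) < n:
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            idle_polls += 1
            if idle_polls >= max_idle_polls:
                break  # give up instead of waiting forever for a stopped producer
            continue
        idle_polls = 0  # reset once data flows again
        if msg.error():
            # The notebook raises KafkaException here; a generic error keeps this sketch dependency-free
            raise RuntimeError(msg.error())
        values.append(msg.value().decode("utf-8"))
    return values


class FakeMessage:
    """Minimal stand-in for confluent_kafka.Message, used to exercise the loop."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def error(self):
        return None

    def value(self) -> bytes:
        return self._payload


class FakeConsumer:
    """Returns queued messages, then None, as poll() does when nothing arrives in time."""

    def __init__(self, payloads: List[bytes]):
        self._queue = [FakeMessage(p) for p in payloads]

    def poll(self, timeout: float = None):
        return self._queue.pop(0) if self._queue else None


print(collect_values(FakeConsumer([b"3", b"14", b"15"]), n=5, max_idle_polls=2))
# ['3', '14', '15']
```

With the real consumer, `messages = collect_values(consumer, n=num_messages_to_collect)` would replace the `while` loop inside the original `try`/`finally`, so `consumer.close()` still runs.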


---

## Store Data in Pandas DataFrame

Store the collected values into a pandas DataFrame.

In [None]:
# Store the collected values into a pandas DataFrame.

# Create a pandas DataFrame from the collected messages
df = pd.DataFrame(messages, columns=['message'])

# Display the first few and last few rows of the DataFrame
df
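
The cell above stores each value as a string in an `object`-dtype column, so numeric operations such as `mean()` are not reliable on it. Assuming each message is a plain decimal number (the exact payload format of the stream is not shown here), `pd.to_numeric` can convert the column. A sketch with made-up sample payloads; `sample_messages` and `sample_df` are illustrative names chosen so the real `messages` and `df` are not overwritten.

```python
import pandas as pd

# Hypothetical decoded payloads standing in for the `messages` list collected from Kafka
sample_messages = ["42", "7", "99", "13"]

sample_df = pd.DataFrame(sample_messages, columns=["message"])

# Parse the text payloads as numbers; errors="coerce" turns anything unparseable into NaN
sample_df["value"] = pd.to_numeric(sample_df["message"], errors="coerce")

print(sample_df.dtypes)
print(sample_df["value"].describe())
```

Applied to the real data, the same line would be `df['value'] = pd.to_numeric(df['message'], errors='coerce')`.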
