# Test Kafka stream

This notebook simply tests whether we can produce and consume data with PyKafka.


## Setup

  - Installed the 64-bit JDK on Windows and set the `JAVA_HOME` environment variable to the install directory: https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
  - Installed Apache ZooKeeper on Windows: https://zookeeper.apache.org/releases.html
  - Installed Apache Kafka by following the official quickstart: https://kafka.apache.org/quickstart
 
To use Apache Kafka, download `kafka_2.11-2.1.0.tgz` from the Kafka quickstart page and extract its contents to a convenient location, e.g. `C:\kafka`.
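
One way to unpack a `.tgz` on Windows without extra tools is Python's standard `tarfile` module. The following is a minimal sketch, assuming the archive is the trusted official download; `extract_archive` is a hypothetical helper name and the paths in the usage note are placeholders.

```python
import tarfile
from pathlib import Path


def extract_archive(archive_path, dest_dir):
    """Unpack a gzip-compressed tar archive into dest_dir.

    Returns the sorted top-level entries found in the archive.
    """
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    with tarfile.open(archive_path, "r:gz") as archive:
        # Collect the first path component of every member, skipping "." entries
        top_level = sorted(
            {Path(name).parts[0] for name in archive.getnames() if Path(name).parts}
        )
        # Only do this for archives you trust (e.g. the official Kafka download)
        archive.extractall(dest)
    return top_level
```

For example, `extract_archive(r"C:\Downloads\kafka_2.11-2.1.0.tgz", r"C:\kafka")` would unpack the download into `C:\kafka`, assuming the file was saved under that (hypothetical) downloads path.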
 
### How to run

Go to the directory where Kafka was installed. Start ZooKeeper first, then start Kafka, each in its own terminal.

```
> bin\windows\zookeeper-server-start.bat config\zookeeper.properties
```

```
> bin\windows\kafka-server-start.bat config\server.properties
```
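
To confirm that both services came up, a quick check is to see whether their ports accept TCP connections. The following is a minimal sketch, assuming the default ports used elsewhere in this document (2181 for ZooKeeper, 9092 for Kafka); `port_is_open` and `check_services` are hypothetical helper names.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts
        return False


def check_services(services, host="localhost"):
    """Map each service name to whether its port is reachable on host."""
    return {name: port_is_open(host, port) for name, port in services.items()}


# Assumed default ports; adjust if the .properties files were changed
SERVICES = {"zookeeper": 2181, "kafka": 9092}
```

Calling `check_services(SERVICES)` returns a dictionary such as `{"zookeeper": True, "kafka": False}`, which shows at a glance which process is not yet listening. An open port only shows that something is listening, not that the broker is healthy.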

### Setup producer/consumer

To produce and consume data we first need a topic. Let's create one called `test` by running the following in a new terminal:

```
> bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

Then we start a console producer. It runs interactively in the terminal, and each line we type is sent as a message.

```
> bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
This is a message
This is another message
```
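
To check from Python that these messages reached the topic, they can be read back with a PyKafka simple consumer. The following is a minimal sketch, assuming the broker at `localhost:9092` and the `test` topic from above; `read_messages` and `open_test_consumer` are hypothetical helper names, and the 5-second timeout is an arbitrary choice.

```python
from itertools import islice


def read_messages(consumer, limit=10):
    """Return up to `limit` decoded message values from an iterable consumer."""
    values = []
    for message in islice(consumer, limit):
        # Skip empty slots in case the consumer yields None
        if message is not None:
            values.append(message.value.decode("utf-8"))
    return values


def open_test_consumer(hosts="localhost:9092", topic_name=b"test"):
    """Create a PyKafka consumer that reads the topic from the beginning."""
    # Imported lazily so read_messages can be tried without PyKafka installed
    from pykafka import KafkaClient
    from pykafka.common import OffsetType

    client = KafkaClient(hosts=hosts)
    topic = client.topics[topic_name]
    return topic.get_simple_consumer(
        auto_offset_reset=OffsetType.EARLIEST,
        reset_offset_on_start=True,
        consumer_timeout_ms=5000,  # stop iterating after 5 s without messages
    )
```

With ZooKeeper and Kafka running, `read_messages(open_test_consumer())` should return the lines typed into the console producer. Call `stop()` on the consumer once it is no longer needed.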



In [5]:
msg_count = 1000000
msg_size = 100
msg_payload = ('kafkatest' * 20).encode()[:msg_size]
print(msg_payload)
print(len(msg_payload))


b'kafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestkafkatestk'
100


In [6]:

bootstrap_servers = 'localhost:9092'  # change if your brokers live elsewhere

In [7]:
import time

producer_timings = {}
consumer_timings = {}

In [8]:
def calculate_throughput(timing, n_messages=1000000, msg_size=100):
    # Report elapsed time, data rate (1 MB = 1024*1024 bytes here) and message rate
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format((msg_size * n_messages) / timing / (1024*1024)))
    print("{0:.2f} Msgs/s".format(n_messages / timing))

In [9]:
from pykafka import KafkaClient

def pykafka_producer_performance(use_rdkafka=False):

    # Set up the client and get a producer for the 'test' topic
    client = KafkaClient(hosts=bootstrap_servers)
    topic = client.topics[b'test']
    producer = topic.get_producer(use_rdkafka=use_rdkafka)

    produce_start = time.time()
    for _ in range(msg_count):
        # Queue the message; the default producer sends asynchronously in the background
        producer.produce(msg_payload)

    producer.stop()  # Will flush background queue

    return time.time() - produce_start

In [10]:
producer_timings['pykafka_producer'] = pykafka_producer_performance()
calculate_throughput(producer_timings['pykafka_producer'])


Processed 1000000 messages in 21.73 seconds
4.39 MB/s
46012.58 Msgs/s
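
As a sanity check, the reported figures can be recomputed by hand. The following is a minimal sketch: `throughput` is a hypothetical helper that mirrors the formulas in the throughput function above, and the unrounded elapsed time is backed out from the reported message rate, since 21.73 s is rounded. Because the byte count is divided by 1024*1024, the "MB/s" figure is strictly speaking MiB/s.

```python
def throughput(n_messages, msg_size, seconds):
    """Return (MiB per second, messages per second) for one timed run."""
    mib_per_s = (n_messages * msg_size) / seconds / (1024 * 1024)
    msgs_per_s = n_messages / seconds
    return mib_per_s, msgs_per_s


# Values taken from the run above: 1,000,000 messages of 100 bytes each
n_messages = 1_000_000
msg_size = 100

# Recover the unrounded elapsed time from the reported 46012.58 Msgs/s
elapsed = n_messages / 46012.58

mib_per_s, msgs_per_s = throughput(n_messages, msg_size, elapsed)
print("{0:.2f} s, {1:.2f} MB/s, {2:.2f} Msgs/s".format(elapsed, mib_per_s, msgs_per_s))
```

The three printed values should agree with the recorded output, which confirms that the time, data rate and message rate are mutually consistent.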
