This notebook publishes messages to a Kafka topic, for consumption by others, and then reads them back in. It only works in Python 3.

To start Kafka, run 
`nohup ~/kafka/bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &`

Check its status with 
`tail ~/kafka/kafka.log`

To start a command-line Kafka listener, run

`~/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic_1 --from-beginning &`

To publish a message from the command line, run

`echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic_1 > /dev/null`

(This assumes Kafka was installed according to https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04, but using Kafka 0.9.0.1 instead of the 0.8.2 release used in that link.)

In [1]:
import sys
ver=sys.version_info
assert ver[0]>=3

from rx import Observable, Observer
from kafka import KafkaProducer, KafkaConsumer
import io

# install Avro using pip install avro-python3 NOT pip install avro
import avro.schema 
import avro.io
import json

# import local stuff
sys.path.append('../rx')
from KafkaObserver import KafkaObserver
from KafkaObservable import KafkaObservable
topic='test_topic_1'
avro_topic='avro_topic'

In [2]:
# Send one record
kprod=KafkaProducer()
kprod.send(topic,bytes('test','utf-8'))

# Send a sequence using the Observer wrapper
kobs=KafkaObserver(kprod,topic)
obs=Observable.from_iterable(range(6,20,2)) 
obs.map(lambda x: bytes(str(x), 'utf-8')).subscribe(kobs)

Sequence completed
<KafkaObserver.KafkaObserver object at 0x7f6c8420f6d8>


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x7f6c8420fb38>
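
The local `KafkaObserver` module is not shown in this notebook. As a rough sketch of what such a wrapper might look like, the class below forwards every `on_next` value to `producer.send(topic, value)`. The names `SketchKafkaObserver` and `FakeProducer` are hypothetical: the fake producer stands in for `KafkaProducer` so the example runs without a broker or the `rx` package, and the real class may well differ.

```python
class FakeProducer:
    """Hypothetical stand-in for KafkaProducer: records sends instead of publishing."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass  # nothing is buffered in the fake


class SketchKafkaObserver:
    """Hypothetical observer that publishes each observed value to one topic."""

    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic
        self.error = None
        self.completed = False

    def on_next(self, value):
        # One observed value becomes one Kafka message
        self.producer.send(self.topic, value)

    def on_error(self, error):
        # Keep the error for inspection instead of publishing anything
        self.error = error

    def on_completed(self):
        # Push out anything still buffered once the sequence ends
        self.producer.flush()
        self.completed = True


# Drive the observer by hand with the same values as the cell above
producer = FakeProducer()
observer = SketchKafkaObserver(producer, 'test_topic_1')
for x in range(6, 20, 2):
    observer.on_next(bytes(str(x), 'utf-8'))
observer.on_completed()
```

Passing the producer in, rather than creating it inside the class, is what lets a stand-in replace `KafkaProducer` here.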

In [4]:
# Read one record
consumer1 = KafkaConsumer(topic, auto_offset_reset='earliest', consumer_timeout_ms=100) 
next(consumer1)

ConsumerRecord(topic='test_topic_1', partition=0, offset=0, timestamp=None, timestamp_type=None, key=None, value=b'Hello, World', checksum=-1623774016, serialized_key_size=-1, serialized_value_size=12)

In [5]:
# Now use Avro

schema = avro.schema.Parse(json.dumps({
    "namespace": "example.avro",
    "type": "record",
    "name": "Test",
    "fields": [{"name": "value", "type": ["int", "null"]}]
}))
#schema = avro.schema.Parse(json.dumps(test_schema))

def avroEncode(record, schema):
    # Serialize one dict to Avro binary using the given schema
    # (parameter renamed from `dict` to avoid shadowing the builtin)
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(record, encoder)
    return bytes_writer.getvalue()

def avroDecode(msg, schema):
    # Deserialize Avro binary bytes back into a dict
    bytes_reader = io.BytesIO(msg)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    return reader.read(decoder)
                         
    
x=avroEncode({'value':1}, schema)
avroDecode(x, schema)

{'value': 1}
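
To see what the encoded bytes contain, here is a minimal sketch of Avro's binary encoding for this one schema, written by hand with the standard library only. It is not the `avro` package and handles nothing beyond a record with a single `["int","null"]` field. The helper names (`zigzag_varint`, `read_zigzag_varint`, `encode_test_record`, `decode_test_record`) are made up for illustration. In Avro binary, a union is written as the index of the chosen branch followed by the value, and integers are zigzag-encoded varints.

```python
def zigzag_varint(n):
    """Encode a signed integer as a zigzag, base-128 variable-length value."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small negatives become small positives
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def read_zigzag_varint(buf, pos=0):
    """Decode one zigzag varint from buf at pos; return (value, next_pos)."""
    shift = 0
    z = 0
    while True:
        byte = buf[pos]
        pos += 1
        z |= (byte & 0x7F) << shift
        shift += 7
        if not byte & 0x80:
            break
    return (z >> 1) ^ -(z & 1), pos


def encode_test_record(record):
    """Encode {'value': int or None} for the union ["int", "null"]."""
    value = record['value']
    if value is None:
        return zigzag_varint(1)  # branch 1 is "null", which has no payload
    return zigzag_varint(0) + zigzag_varint(value)  # branch 0 is "int"


def decode_test_record(msg):
    """Inverse of encode_test_record."""
    branch, pos = read_zigzag_varint(msg)
    if branch == 1:
        return {'value': None}
    value, _ = read_zigzag_varint(msg, pos)
    return {'value': value}


encoded = encode_test_record({'value': 1})
decoded = decode_test_record(encoded)
```

Under these rules `{'value': 1}` becomes two bytes: the branch index 0, then the zigzag form of 1.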

In [6]:
# Dump a bunch of records to Kafka. Next iteration should use timestamps so that things remain ordered
kobs=KafkaObserver(KafkaProducer(),avro_topic)
obs=Observable.from_iterable(range(6,20,2)).map(lambda x: {'value':x})
obs.map( lambda x: avroEncode(x,schema) ).subscribe(kobs)


Sequence completed
<KafkaObserver.KafkaObserver object at 0x7f6c8412e668>


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x7f6c8412e908>
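
The comment in the cell above suggests adding timestamps so that records stay ordered. One possible approach, sketched here under assumptions, is to stamp each record before encoding and sort by that stamp on the consuming side. The `timestamp_ms` field, the `TimestampedTest` schema, and the helpers `with_timestamp` and `restore_order` are all hypothetical and not part of the notebook. The demo uses a fake clock so that every record gets a distinct stamp.

```python
import itertools
import time

# Hypothetical extension of the notebook's schema with a timestamp field.
# It could be passed through json.dumps and avro.schema.Parse like the original.
timestamped_schema_dict = {
    "namespace": "example.avro",
    "type": "record",
    "name": "TimestampedTest",
    "fields": [
        {"name": "value", "type": ["int", "null"]},
        {"name": "timestamp_ms", "type": "long"},
    ],
}


def with_timestamp(value, clock=time.time):
    """Wrap a value in a record stamped with the clock reading in milliseconds."""
    return {"value": value, "timestamp_ms": int(clock() * 1000)}


def restore_order(records):
    """Sort records by timestamp; equal stamps keep their arrival order."""
    return sorted(records, key=lambda r: r["timestamp_ms"])


# Deterministic fake clock for the demo: 1 s, 2 s, 3 s, ...
ticks = itertools.count(1)
stamped = [with_timestamp(v, clock=lambda: next(ticks)) for v in range(6, 20, 2)]

# Simulate out-of-order arrival, then recover the original order
arrived = list(reversed(stamped))
ordered_values = [r["value"] for r in restore_order(arrived)]
```

With a real clock, records produced within the same millisecond would share a stamp, so a sequence number might be needed as a tie-breaker.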

In [7]:
# create a Kafka Observable the pedestrian way
consumer1 = KafkaConsumer(
    avro_topic,
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=100,
    value_deserializer=lambda x: avroDecode(x, schema),
)
kafkaObservable=Observable.from_iterable(consumer1)
kafkaObservable.subscribe(lambda x: print(x.value))

# Appears to run, but slowly, and prints nothing now, although identical code printed records earlier.


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x7f6c84143320>

In [8]:
# try to run my Observable wrapper
from KafkaObservable import KafkaObservable
myKafkaObservable=KafkaObservable(avro_topic,value_deserializer=lambda x: avroDecode(x,schema))
myKafkaObservable.subscribe(lambda x: print(x.value))

# Appears to run, but slowly, and prints nothing now, although identical code printed records earlier.


<rx.disposables.anonymousdisposable.AnonymousDisposable at 0x7f6c840d06d8>