In [83]:
import json
from PIL import Image
import numpy as np
from pykafka import KafkaClient
from pykafka.common import OffsetType

def gen_client(hosts = "127.0.0.1:9092", topic_name = 'test'):
    """Open a Kafka client and look up one of its topics.

    Args:
        hosts: Broker address string passed to ``KafkaClient(hosts=...)``.
        topic_name: Key used to index ``client.topics``.

    Returns:
        A ``(client, topic)`` tuple.
    """
    kafka_client = KafkaClient(hosts=hosts)
    return kafka_client, kafka_client.topics[topic_name]

def load_image_as_bytes(infilename) :
    """Read an image file and serialise its pixel data as UTF-8 encoded JSON.

    Args:
        infilename: Value handed to ``PIL.Image.open`` (typically a file path).

    Returns:
        ``bytes`` containing the JSON text of the pixel array, expressed as
        nested lists of integers.
    """
    # Open through a context manager so the underlying file handle is released
    # deterministically, including when decoding raises or when the image is
    # multi-frame (in which case load() alone leaves the file open).
    with Image.open( infilename ) as img:
        img.load()
        # The dtype conversion happens while the image is still open; the
        # resulting array does not depend on the image object afterwards.
        pixels = np.asarray( img, dtype="int32" )
    return json.dumps(pixels.tolist()).encode('utf-8')

def produce(topic, image_as_jsonified_bytes):
    """Publish a single payload to ``topic`` through a synchronous producer.

    Args:
        topic: Object exposing ``get_sync_producer()``; the returned producer
            is used as a context manager.
        image_as_jsonified_bytes: Payload forwarded unchanged to
            ``producer.produce``.

    Returns:
        None.
    """
    # A new producer is obtained for every call and left via the ``with`` block.
    with topic.get_sync_producer() as sync_producer:
        sync_producer.produce(image_as_jsonified_bytes)
        
def consume_latest(topic, LAST_N_MESSAGES = 1, consumer_timeout_ms = -1):
    """Rewind a fresh consumer by ``LAST_N_MESSAGES`` and return one message value.

    A simple consumer is created positioned at the latest offsets, every
    partition is rewound by ``LAST_N_MESSAGES``, and the first message
    consumed after the rewind is returned.

    Args:
        topic: pykafka topic to read from.
        LAST_N_MESSAGES: How far each partition is rewound from its next offset.
        consumer_timeout_ms: Forwarded to ``get_simple_consumer``. The default
            of ``-1`` keeps the previous behaviour of waiting indefinitely;
            pass a positive value to get ``None`` back when nothing arrives
            in time.

    Returns:
        The ``value`` of the consumed message, or ``None`` if ``consume()``
        returned no message.
    """
    consumer = topic.get_simple_consumer(
        auto_offset_reset=OffsetType.LATEST,
        reset_offset_on_start=True,
        consumer_timeout_ms=consumer_timeout_ms,
    )
    try:
        rewound = []
        # NOTE: relies on the private ``_partitions`` mapping of the consumer,
        # exactly as the original implementation did.
        for partition, owned in consumer._partitions.items():
            target = owned.next_offset - LAST_N_MESSAGES - 1
            if target < 0:
                # Negative values are offset sentinels, not positions: -1 would
                # be read as LATEST and anything below -2 is invalid. Fall back
                # to the earliest available offset whenever the partition holds
                # no more than LAST_N_MESSAGES messages.
                target = OffsetType.EARLIEST
            rewound.append((partition, target))
        consumer.reset_offsets(rewound)

        message = consumer.consume()
        # consume() yields None on timeout; do not dereference it.
        return None if message is None else message.value
    finally:
        # Release the consumer's background workers; otherwise every call
        # (e.g. each re-run of a notebook cell) leaks a running consumer.
        consumer.stop()

def decode(incoming):
    """Turn UTF-8 encoded JSON bytes into a NumPy array.

    Args:
        incoming: ``bytes`` containing UTF-8 encoded JSON text.

    Returns:
        ``numpy.ndarray`` built from the parsed JSON value.
    """
    json_text = incoming.decode('utf-8')
    return np.array(json.loads(json_text))

In [None]:
# Producer side: encode the local file 'download.jpeg' as JSON bytes, connect
# using gen_client's default arguments, and publish the payload as one message.
b = load_image_as_bytes('download.jpeg')
# `topic` is also read by the later cell that calls consume_latest(topic), so
# this cell has to be executed before that one.
client, topic = gen_client()
produce(topic, b)

In [84]:
# Consumer side: fetch a message value from `topic`, then decode and print it,
# or report that nothing came back.
incoming = consume_latest(topic)
if incoming is None:
    print('No message received!')
else:
    received_image = decode(incoming)
    print(received_image)

[[[230 134 118]
  [230 134 118]
  [229 133 117]
  ...
  [220 133 116]
  [232 145 128]
  [209 121 107]]

 [[229 133 117]
  [229 133 117]
  [229 133 117]
  ...
  [227 140 130]
  [234 147 138]
  [216 129 120]]

 [[229 133 117]
  [229 133 117]
  [228 132 116]
  ...
  [194 108 109]
  [177  92  95]
  [154  69  74]]

 ...

 [[ 84  22  59]
  [ 92  28  65]
  [ 98  30  67]
  ...
  [127  43  67]
  [141  53  75]
  [149  60  78]]

 [[ 81  21  59]
  [ 87  25  62]
  [ 95  27  64]
  ...
  [140  55  78]
  [154  65  85]
  [163  71  86]]

 [[ 77  19  57]
  [ 86  24  63]
  [ 92  26  64]
  ...
  [150  62  84]
  [163  72  89]
  [170  76  90]]]
