In [1]:
import base64
import json
import os
import socket
import sys
import time
import traceback

import cv2
import numpy as np
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError, kafka_errors
# NOTE: the wildcard import supplies `pa` and `get_field_and_data` used by InputQueue.
from bigdl.serving.schema import *

from utils import preprocess

In [2]:
class InputQueue:
    """Publish serving requests to a Kafka topic.

    Each request is a set of named inputs that is packed into a single Arrow
    record batch, base64-encoded, and sent as a JSON message through a
    ``KafkaProducer``.

    Args:
        frontend_url: Accepted for interface compatibility; not used here.
        **kwargs: ``host`` (default ``"localhost"``), ``port`` (default
            ``"9092"``; str or int) and ``topic_name`` (default
            ``"serving_stream"``) are consumed by this class. Every other
            keyword is forwarded unchanged to ``KafkaProducer``.
    """

    def __init__(self, frontend_url=None, **kwargs):
        # `or` keeps the original rule: a missing *or falsy* value falls back
        # to the default. pop() also removes the key so it is not forwarded
        # to KafkaProducer, which would reject unknown options.
        host = kwargs.pop("host", None) or "localhost"
        port = kwargs.pop("port", None) or "9092"
        self.topic_name = kwargs.pop("topic_name", None) or "serving_stream"
        self.interval_if_error = 1
        # format() instead of `+` so an integer port works as well as a string.
        self.db = KafkaProducer(bootstrap_servers="{}:{}".format(host, port),
                                key_serializer=self._json_bytes,
                                value_serializer=self._json_bytes,
                                **kwargs)

    @staticmethod
    def _json_bytes(obj):
        """Serialize ``obj`` to UTF-8 encoded JSON bytes."""
        return json.dumps(obj).encode('utf-8')

    def enqueue(self, uri, **data):
        """Encode the keyword inputs and send them to Kafka keyed by ``uri``.

        Args:
            uri: Identifier of the request; used as the message key and
                repeated inside the message value.
            **data: Named inputs, passed on to :meth:`data_to_b64`.
        """
        b64str = self.data_to_b64(**data)
        record = {"key": uri, "value": {"uri": uri, "data": b64str}}
        self.__enqueue_data(record)

    def data_to_b64(self, **data):
        """Pack the keyword inputs into one Arrow record batch, base64-encoded.

        ``pa`` and ``get_field_and_data`` come from the wildcard import of
        ``bigdl.serving.schema`` (``pa`` is presumably pyarrow - confirm).

        Returns:
            str: Base64 text of the Arrow stream holding a single record batch.
        """
        sink = pa.BufferOutputStream()
        field_list = []
        array_list = []
        # Distinct loop names: the original rebound `data` inside its own loop.
        for key, value in data.items():
            field, array = get_field_and_data(key, value)
            field_list.append(field)
            array_list.append(array)

        schema = pa.schema(field_list)
        batch = pa.RecordBatch.from_arrays(array_list, schema)

        writer = pa.RecordBatchStreamWriter(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()
        payload = sink.getvalue().to_pybytes()
        return self.base64_encode_image(payload)

    def __enqueue_data(self, data):
        """Send one ``{"key": ..., "value": ...}`` record and wait for the result.

        A Kafka failure is reported with a traceback and swallowed
        (best-effort delivery); the success message is printed only when the
        send was acknowledged.
        """
        future = self.db.send(self.topic_name, **data)
        try:
            future.get(timeout=10)  # block until the broker answers or we time out
        except KafkaError:
            # `kafka_errors` is a lookup table of error classes, not an
            # exception type, so it cannot be used in an except clause.
            traceback.print_exc()
            return
        print("Write to Kafka successful")

    @staticmethod
    def base64_encode_image(img):
        """Return the base64 text (UTF-8 str) for a bytes-like object."""
        return base64.b64encode(img).decode("utf-8")

    def close(self):
        """Close the underlying Kafka producer."""
        self.db.close()

In [3]:
def load_imagedata(img_dir):
    """Read and preprocess every ``*jpg`` file found directly in ``img_dir``.

    Args:
        img_dir: Directory to scan (not recursive).

    Returns:
        dict: File name -> image returned by ``preprocess`` for a 640x640
        target size. Files that OpenCV cannot decode are skipped with a
        printed warning.
    """
    images = {}
    # sorted(): os.listdir order is arbitrary; a fixed order makes runs reproducible.
    for image_name in sorted(os.listdir(img_dir)):
        if not image_name.endswith("jpg"):
            continue
        img_path = os.path.join(img_dir, image_name)
        origin_img = cv2.imread(img_path)
        if origin_img is None:
            # cv2.imread signals an unreadable/corrupt file by returning None
            # rather than raising.
            print("Skipping unreadable image: {}".format(img_path))
            continue
        # preprocess also returns a scale ratio, which is not needed here.
        img, _ratio = preprocess(origin_img, (640, 640))
        images[image_name] = img
    return images

In [4]:
def push_data(batch_datas, epoch=2, interval=0.1):
    """Send every item of ``batch_datas`` to Kafka, repeated ``epoch`` times.

    The producer connects to port 9092 on this machine's hostname. Each
    message id has the form ``id-<epoch index, 3 digits>-<image name>``.

    Args:
        batch_datas: Mapping of image name -> image data, passed to
            ``InputQueue.enqueue`` as the ``image`` input.
        epoch: Number of passes over ``batch_datas`` (default 2).
        interval: Seconds to sleep after each enqueue (default 0.1).
    """
    host_name = socket.gethostname()
    port = "9092"
    sent = 0
    # Allow a single request of up to 10 MB (10485760 bytes).
    input_queue = InputQueue(host=host_name, port=port, max_request_size=10485760)
    try:
        for i in range(epoch):
            for image_name, data in batch_datas.items():
                m_id = "id-{:03d}-{}".format(i, image_name)
                input_queue.enqueue(m_id, image=data)
                time.sleep(interval)
                sent += 1
    finally:
        # Release the producer even when an enqueue raises part-way through.
        input_queue.close()
    # Report the number of enqueue calls actually made, not the planned total.
    print("The total number of images written to Kafka is {}.".format(sent))

In [5]:
# Load and preprocess every jpg under ./images. `images` is the dict returned by
# load_imagedata (file name -> preprocessed image), not a base64 string; the
# base64 encoding happens later, inside InputQueue.data_to_b64.
images = load_imagedata("./images")
# Publish the images to Kafka (push_data connects to <this hostname>:9092).
push_data(images)



Write to Kafka successful
Write to Kafka successful
The total number of images written to Kafka is 2.
