In [3]:
import random
import string
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from resources.parameters import params

class QueryProducer:
    """Publish Avro-encoded messages to the Kafka topic named in ``params``.

    The key and value schemas are loaded from ``schemas/keyschema.avsc`` and
    ``schemas/valueschema.avsc``. Every message is sent with a freshly
    generated random key of ``_KEY_LENGTH`` characters.

    Usage: assigning to ``send`` publishes the assigned value, e.g.
    ``producer.send = {"brand": "brandA"}``. The most recently sent value can
    be read back through ``msg`` (or ``send``); it is ``None`` until the first
    send.
    """

    # Characters a message key is drawn from, and how many of them make a key.
    _KEY_ALPHABET = string.ascii_uppercase + string.digits
    _KEY_LENGTH = 10

    def __init__(self):
        # Retained only so the attribute keeps existing; keys are drawn from
        # the OS entropy source, which cannot be seeded.
        self._seed = random.randint(1, 3650)
        self._params = params
        # Initialised up front so reading `msg` before the first send returns
        # None instead of raising AttributeError.
        self._msg = None
        self._schemas()
        self._initProducer()

    def _schemas(self):
        """Load the Avro key and value schemas from the ``schemas/`` directory."""
        self._keySchema = avro.load("schemas/keyschema.avsc")
        self._valueSchema = avro.load("schemas/valueschema.avsc")

    def _initProducer(self):
        """Create the underlying ``AvroProducer`` with the default schemas."""
        # NOTE(review): the producer is configured from the "consumerconfig"
        # entry, the same one QueryConsumer uses. Confirm this is intended and
        # that the entry holds only properties a producer accepts.
        self._producer = AvroProducer(
            config=self._params["consumerconfig"],
            default_key_schema=self._keySchema,
            default_value_schema=self._valueSchema,
        )

        # Nothing has been produced yet, so there should be nothing to wait for.
        self._producer.flush()

    def _produce_msg(self, value):
        """Send ``value`` to the configured topic under a random key.

        The key comes from ``random.SystemRandom``, so the module-level
        ``random`` generator is neither consulted nor reseeded here; other code
        relying on a seeded global generator is left undisturbed.
        """
        rng = random.SystemRandom()
        key = "".join(rng.choice(self._KEY_ALPHABET) for _ in range(self._KEY_LENGTH))

        self._producer.produce(topic=self._params["topic"], key=key, value=value)

    @property
    def msg(self):
        """The value most recently sent, or ``None`` if nothing was sent yet."""
        return self._msg

    # `@msg.setter` applied to a function named `send` creates a second
    # property, `send`, that shares `msg`'s getter and adds this setter.
    # `msg` itself stays read-only; `producer.send = value` publishes `value`.
    @msg.setter
    def send(self, msg):
        self._msg = msg
        self._produce_msg(self._msg)

In [4]:
import logging

from confluent_kafka.avro import AvroConsumer
from resources.parameters import params

class QueryConsumer:
    """Subscribe an ``AvroConsumer`` to the topic named in ``params`` and poll it.

    Polling is gated by ``self._switch``: ``_pollMsg`` keeps polling only while
    the switch is true, so another thread can stop it by setting the switch to
    ``False``.
    """

    # Seconds a single poll() call may block. A non-zero timeout keeps the
    # loop from spinning a CPU core while the topic is idle; it also bounds how
    # long the loop takes to notice that the switch was turned off.
    _POLL_TIMEOUT = 1.0

    def __init__(self):
        self._params = params
        self._switch = False
        self._initConsumer()

    def _initConsumer(self):
        """Create the consumer from ``params["consumerconfig"]`` and subscribe."""
        self._consumer = AvroConsumer(config=self._params["consumerconfig"])
        self._consumer.subscribe([self._params["topic"]])

    def _pollMsg(self):
        """Poll until one usable message arrives or the switch is turned off.

        Returns:
            A ``(key, value)`` tuple for the first message received without an
            error, or ``None`` if ``self._switch`` became false first.

        Failures raised by ``poll()`` and messages that carry an error are
        logged and skipped, so polling stays best-effort.
        """
        log = logging.getLogger(__name__)

        while self._switch:
            try:
                msg = self._consumer.poll(self._POLL_TIMEOUT)
            except Exception:
                # Best-effort: report the failure and try again. `continue`
                # also guarantees `msg` is never read while unbound.
                log.warning("Kafka poll failed; retrying", exc_info=True)
                continue

            if msg is None:
                # Nothing arrived within the timeout.
                continue

            if msg.error():
                # The client reports consumer-side errors as message objects
                # whose error() is set; those carry no payload to return.
                log.warning("Skipping Kafka message with error: %s", msg.error())
                continue

            return msg.key(), msg.value()

        return None

In [5]:
from threading import Thread

class Pipe(QueryConsumer):
    """Collect consumed messages into an in-memory list on a background thread.

    Evaluating the ``on`` property starts polling, evaluating ``off`` stops it,
    and ``queue()`` returns everything collected so far. Both properties return
    the pipe itself.
    """

    def __init__(self):
        super().__init__()
        self._queue = []
        # Initialised here so `_query` and `thread` can be read before the
        # pipe has ever been switched on.
        self._msg = None
        self.thread = None

    def queue(self):
        """Return the list of items collected so far (the live list, not a copy)."""
        return self._queue

    def _trigger(self):
        """Thread target: keep collecting polled messages while switched on."""
        # Loop so that every message is collected, not just the first one.
        while self._switch:
            polled = self._pollMsg()
            # _pollMsg() yields None once the switch is turned off; skipping it
            # keeps the last real message available through `_query`.
            if polled is not None:
                self._query = polled

    @property
    def on(self):
        """Switch polling on, starting the background thread if none is running."""
        self._switch = True
        # `on` is a property, so it may be evaluated more than once (repeated
        # cell runs, for example); never run two pollers on the same consumer.
        if self.thread is None or not self.thread.is_alive():
            # Daemon thread: a pipe left switched on must not keep the
            # interpreter from exiting.
            self.thread = Thread(target=self._trigger, daemon=True)
            self.thread.start()

        return self

    @property
    def off(self):
        """Switch polling off; the background loop stops at its next check."""
        self._switch = False

        return self

    @property
    def _query(self):
        """The most recently collected item, or ``None`` if there is none yet."""
        return self._msg

    @_query.setter
    def _query(self, msg):
        self._msg = msg
        if msg is not None:
            self._queue.append(msg)

In [6]:
# Build the producer: loads the Avro schemas and creates the underlying AvroProducer.
producer = QueryProducer()

KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "max.poll.interval""}

In [114]:
# Create the consumer-side pipe; construction subscribes to the topic but does not start polling.
pipe = Pipe()

In [124]:
# `on` is a property: evaluating it starts the background polling thread and returns the pipe.
pipe.on

<__main__.Pipe at 0x7fccfc295b38>

In [123]:
import json

# JSON text for one example query; parsed into a dict below before it is sent.
tosend = """
{
    "brand":"brandA"
    , "itemName":"tenisA"
    , "itemSpec":"seriaA"
    , "color":"white"
    , "size":"40"
}
"""

msg = json.loads(tosend)

# `send` is a property on QueryProducer: assigning to it publishes the value to the topic.
producer.send = msg

In [125]:
# Check whether the background polling thread is still running.
pipe.thread.is_alive()

True

In [126]:
# Show the (key, value) pairs the pipe has collected so far.
pipe.queue()

[]

In [127]:
# `off` is a property: evaluating it clears the polling switch so the background loop stops.
pipe.off

<__main__.Pipe at 0x7fccfc295b38>