Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Last two messages are not written to the Topic #994

Open
ijbo opened this issue Feb 13, 2020 · 0 comments
Open

Last two messages are not written to the Topic #994

ijbo opened this issue Feb 13, 2020 · 0 comments

Comments

@ijbo
Copy link

ijbo commented Feb 13, 2020

Below is my code :
Problem : Producer not writing the last two messages to the Topic.

`import os
import asyncio
import websockets
from pykafka import KafkaClient
from websockets.extensions import permessage_deflate


class Server:
    client = None
    kafka_client = None
    topic_name = "tpa_19"
    topic = None
    producer = None

    def get_port(self):
        return os.getenv('WS_PORT', '10015')

    def connect_kafka_client(self):
        client = KafkaClient(hosts="localhost:9092", use_greenlets=True)
        self.client = client
        self.set_topic()
        print("Connection Done with Kafka")

    def set_topic(self):
        self.topic = self.client.topics[self.topic_name]
        self.producer = self.topic.get_producer(min_queued_messages=1,max_queued_messages=0,
                                                linger_ms=500)
 
    def get_host(self):
        return os.getenv('WS_HOST', 'localhost')

    def start(self):
        return websockets.serve(self.handler, self.get_host(), self.get_port(), ping_interval=None, max_size=None,
                                max_queue=None,close_timeout=None,extensions=[
        permessage_deflate.ServerPerMessageDeflateFactory(
            server_max_window_bits=11,
            client_max_window_bits=11,
            compress_settings={'memLevel': 4},
        ),
    ])

    async def send_message_to_kafka(self, producer, row):
        try:
            # print(row)
            producer.produce(row.encode())
        except Exception as ex:
            print(ex)

    async def handler(self, websocket, path):
        async for row in websocket:
             await self.send_message_to_kafka(self.producer, row)
 

if __name__ == '__main__':
    ws = Server()
    ws.connect_kafka_client()
    asyncio.get_event_loop().run_until_complete(ws.start())
    asyncio.get_event_loop().run_forever()
`
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant