In [2]:
!pip install confluent_kafka

Collecting confluent_kafka
  Downloading confluent_kafka-1.9.0-cp39-cp39-manylinux2010_x86_64.whl (3.1 MB)
[K     |████████████████████████████████| 3.1 MB 787 kB/s eta 0:00:01
[?25hInstalling collected packages: confluent-kafka
Successfully installed confluent-kafka-1.9.0


In [13]:
from confluent_kafka import Producer


p = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for data in ['a', 'b', 'x']:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

Message delivered to mytopic [0]
Message delivered to mytopic [0]
Message delivered to mytopic [0]


0

In [14]:
from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

Received message: a
Received message: b
Received message: x
Received message: a
Received message: b
Received message: x


KeyboardInterrupt: 

# AVRO

In [2]:
!pip install avro-python3

Collecting avro-python3
  Downloading avro-python3-1.10.2.tar.gz (38 kB)
Building wheels for collected packages: avro-python3
  Building wheel for avro-python3 (setup.py) ... [?25ldone
[?25h  Created wheel for avro-python3: filename=avro_python3-1.10.2-py3-none-any.whl size=44010 sha256=369b6dcb04c5854da285b561fc59da4d890bd8e06d4cc846915ddd2f0d1206ce
  Stored in directory: /home/galastra/.cache/pip/wheels/5a/29/4d/510c0e098c49c5e49519f430481a5425e60b8752682d7b1e55
Successfully built avro-python3
Installing collected packages: avro-python3
Successfully installed avro-python3-1.10.2


In [4]:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


value_schema_str = """
{
   "namespace": "my.test",
   "name": "value",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

key_schema_str = """
{
   "namespace": "my.test",
   "name": "key",
   "type": "record",
   "fields" : [
     {
       "name" : "name",
       "type" : "string"
     }
   ]
}
"""

value_schema = avro.loads(value_schema_str)
key_schema = avro.loads(key_schema_str)
value = {"name": "Meyrom2"}
key = {"name": "Key"}


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


avroProducer = AvroProducer({
    'bootstrap.servers': 'localhost:9092',
    'on_delivery': delivery_report,
    'schema.registry.url': 'http://localhost:8081'
    }, default_key_schema=key_schema, default_value_schema=value_schema)

avroProducer.produce(topic='my_avro', value=value, key=key)
avroProducer.flush()

Message delivered to my_avro [0]


0

In [14]:


from confluent_kafka import SerializingProducer, KafkaError, Message
from confluent_kafka.serialization import StringSerializer, SerializationContext
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

import dataclasses
import random
import time
import uuid

@dataclasses.dataclass
class Transaction(object):
    customrer_id: int
    item_id: int
    item_count: int
    shopper_id: int
    current_time: int # Python doesnt have long type :| 
    transaction_id: str


def transaction_to_dict(transaction: Transaction, ctx: SerializationContext):
    """
    Returns a dict representation of a Transaction instance for serialization.
    Args:
        transaction (Transaction): transaction instance.
        ctx (SerializationContext): Metadata pertaining to the serialization
            operation.
    Returns:
        dict: Dict populated with transaction attributes to be serialized.
    """
    return dict(
        customrer_id=transaction.customrer_id,
        item_id=transaction.item_id,
        item_count=transaction.item_count,
        shopper_id=transaction.shopper_id,
        current_time=transaction.current_time,
        transaction_id=transaction.transaction_id
    )


def delivery_report(err: KafkaError, msg: Message):
    """
    Reports the failure or success of a message delivery.
    Args:
        err (KafkaError): The error that occurred on None on success.
        msg (Message): The message that was produced or failed.
    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.
    """
    if err:
        print(f'Delivery failed for Transaction record {msg.key()}: {err}')
        return
    print(f'Message delivered to topic {msg.topic()} partition [{msg.partition()}] @ offset {msg.offset()}')


def main():

    topic='gal-topic'

    value_schema = """
        {
            "namespace": "confluent.io.examples.serialization.avro",
            "name": "Transaction",
            "type": "record",
            "fields": [
                {"name": "name", "type": "string"}
            ]
        }
    """


    schema_registry_conf = {'url': 'http://localhost:8081'}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
                                     schema_str=value_schema)

    producer_conf = {'bootstrap.servers': 'localhost:9092',
                     'key.serializer': StringSerializer('utf_8'),
                     'value.serializer': avro_serializer}

    producer = SerializingProducer(producer_conf)

    print(f'Producing transaction records to topic {topic}. ^C to exit.')
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        # SIGINT can't be handled when polling, limit timeout to 1 second.
        producer.poll(1.0)
        
        try:
            
            transaction = Transaction(
                customrer_id = random.randrange(1, 11),
                item_id = random.randrange(11, 21),
                item_count = random.randrange(1, 20),
                shopper_id = random.randrange(21, 30),
                current_time = round(time.time() * 1000),
                transaction_id = str(uuid.uuid4())
            )
            producer.produce(topic=topic, key=str(transaction.customrer_id), value={'name' : 'tom'},
                             on_delivery=delivery_report)

            time.sleep(1)
        except KeyboardInterrupt:
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue

        

    print("Flushing records...")
    producer.flush()


if __name__ == '__main__':
    main()

Producing transaction records to topic gal-topic. ^C to exit.
Message delivered to topic gal-topic partition [23] @ offset 9
Message delivered to topic gal-topic partition [26] @ offset 12
Message delivered to topic gal-topic partition [1] @ offset 6
Message delivered to topic gal-topic partition [27] @ offset 10
Message delivered to topic gal-topic partition [26] @ offset 13
Message delivered to topic gal-topic partition [31] @ offset 20
Message delivered to topic gal-topic partition [26] @ offset 14
Message delivered to topic gal-topic partition [1] @ offset 7
Message delivered to topic gal-topic partition [4] @ offset 9
Message delivered to topic gal-topic partition [31] @ offset 21
Message delivered to topic gal-topic partition [31] @ offset 22
Message delivered to topic gal-topic partition [31] @ offset 23
Message delivered to topic gal-topic partition [0] @ offset 8
Message delivered to topic gal-topic partition [23] @ offset 10
Message delivered to topic gal-topic partition [31]

Message delivered to topic gal-topic partition [0] @ offset 20
Message delivered to topic gal-topic partition [23] @ offset 29
Message delivered to topic gal-topic partition [23] @ offset 30
Message delivered to topic gal-topic partition [13] @ offset 20
Message delivered to topic gal-topic partition [0] @ offset 21
Message delivered to topic gal-topic partition [31] @ offset 48
Message delivered to topic gal-topic partition [31] @ offset 49
Message delivered to topic gal-topic partition [0] @ offset 22
Message delivered to topic gal-topic partition [31] @ offset 50
Message delivered to topic gal-topic partition [31] @ offset 51
Message delivered to topic gal-topic partition [1] @ offset 20
Message delivered to topic gal-topic partition [9] @ offset 19
Message delivered to topic gal-topic partition [31] @ offset 52
Message delivered to topic gal-topic partition [4] @ offset 20
Message delivered to topic gal-topic partition [4] @ offset 21
Message delivered to topic gal-topic partition 

Message delivered to topic gal-topic partition [13] @ offset 29
Message delivered to topic gal-topic partition [13] @ offset 30
Message delivered to topic gal-topic partition [23] @ offset 45
Message delivered to topic gal-topic partition [1] @ offset 33
Message delivered to topic gal-topic partition [27] @ offset 33
Message delivered to topic gal-topic partition [4] @ offset 37
Message delivered to topic gal-topic partition [31] @ offset 74
Message delivered to topic gal-topic partition [27] @ offset 34
Message delivered to topic gal-topic partition [26] @ offset 36
Message delivered to topic gal-topic partition [1] @ offset 34
Message delivered to topic gal-topic partition [31] @ offset 75
Message delivered to topic gal-topic partition [27] @ offset 35
Message delivered to topic gal-topic partition [31] @ offset 76
Message delivered to topic gal-topic partition [27] @ offset 36
Message delivered to topic gal-topic partition [31] @ offset 77


%6|1656378112.856|FAIL|rdkafka#producer-10| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 273718ms in state UP)
%6|1656378112.858|FAIL|rdkafka#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 322187ms in state UP)
%6|1656378112.864|FAIL|rdkafka#producer-6| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected (after 346989ms in state UP)
%6|1656378112.878|FAIL|rdkafka#producer-6| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
%6|1656378112.898|FAIL|rdkafka#producer-8| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.vers

Message delivered to topic gal-topic partition [9] @ offset 33


%6|1656378113.683|FAIL|rdkafka#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
%6|1656378113.760|FAIL|rdkafka#producer-7| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY, 1 identical error(s) suppressed)
%3|1656378123.467|FAIL|rdkafka#producer-8| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv6#[::1]:9092 failed: Connection refused (after 0ms in state CONNECT)
%3|1656378123.898|FAIL|rdkafka#producer-8| [thrd:localhost:9092/bootstrap]: localhost:9092/1: Connect to ipv4#127.0.0.1:9092 failed: Connection refu

KeyboardInterrupt: 