In [1]:
from kafka import KafkaProducer
from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner
from kafka import TopicPartition

In [2]:
def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(key, encoding = 'utf-8')
        value_bytes = bytes(value, encoding = 'utf-8')
        x = producer_instance.send(topic_name, value = value_bytes)
        producer_instance.flush()
        return x
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))


def connect_kafka_producer():
    print('connecting to kafka')
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers = ['kafka:9092'], api_version = (0, 10),
            partitioner = RoundRobinPartitioner()
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    finally:
        print('successfully connected to kafka')
        return _producer

In [3]:
kafka_producer = connect_kafka_producer()

connecting to kafka
successfully connected to kafka


In [4]:
kafka_producer.partitions_for('polarities')

{0, 1, 2}

In [5]:
from kafka import KafkaConsumer

In [23]:
publish_message(kafka_producer, 'polarities', 'polarity', 'testing10').get(timeout=10)

RecordMetadata(topic='polarities', partition=1, topic_partition=TopicPartition(topic='polarities', partition=1), offset=9, timestamp=1550384250389, checksum=1138447739, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1)

In [22]:
publish_message(kafka_producer, 'polarities', 'polarity', 'testing9').get(timeout=10)

RecordMetadata(topic='polarities', partition=0, topic_partition=TopicPartition(topic='polarities', partition=0), offset=5, timestamp=1550384245998, checksum=3836992277, serialized_key_size=-1, serialized_value_size=8, serialized_header_size=-1)

In [17]:
consumer = KafkaConsumer(
    auto_offset_reset = 'earliest',
    bootstrap_servers = ['kafka:9092'],
    partition_assignment_strategy = ['RoundRobinPartitionAssignor'],
    api_version = (0, 10),
    consumer_timeout_ms = 1000,
)
consumer.assign([TopicPartition(topic='polarities',partition=0)])

In [18]:
consumer2 = KafkaConsumer(
    auto_offset_reset = 'earliest',
    bootstrap_servers = ['kafka:9092'],
    partition_assignment_strategy = ['RoundRobinPartitionAssignor'],
    api_version = (0, 10),
    consumer_timeout_ms = 1000,
)
consumer2.assign([TopicPartition(topic='polarities',partition=1)])

In [24]:
print([msg.value.decode('utf-8') for msg in consumer])

['testing9']


In [25]:
print([msg.value.decode('utf-8') for msg in consumer2])

['testing10']


In [12]:
consumer2.assign([TopicPartition(topic='polarities',partition=0)])

In [13]:
consumer.assign([TopicPartition(topic='polarities',partition=1)])

In [14]:
consumer2.assignment()

{TopicPartition(topic='polarities', partition=0)}

In [15]:
consumer.assignment()

{TopicPartition(topic='polarities', partition=1)}

In [16]:
consumer2.poll()

{}