Unable to start from beginning of topic using on_assign hook #224

Closed
xrl opened this issue Jul 31, 2017 · 2 comments · Fixed by #268
Comments

xrl commented Jul 31, 2017

I am unable to recreate the behavior of the kafka-console-consumer.sh script with my Python code. I am using broker-stored offsets (the new-style API).

I have a topic io which I am trying to read from the beginning with Python. This topic already has data in it (generated by a confluent_kafka Producer).

When I run the Python consumer, the group shows up but it does not have any partition information:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0.io:9092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

console-consumer-606
etl-3
console-consumer-66817
$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0.io:9092 --describe --group io-etl-3
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).


TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
$

The consumer code follows the reference examples:

$ cat kafka-to-postgres.py
import os
import requests
import time
import psycopg2
import json

from confluent_kafka import Consumer, KafkaError

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from models import Device

#### SQLALCHEMY CONNECTION ####
print "initializing db connection"
engine = create_engine( "postgresql+psycopg2://" +  os.environ['POSTGRES_USER'] + ":" + os.environ['POSTGRES_PW'] + "@" + os.environ['POSTGRES_HOST'] + ":" + os.environ["POSTGRES_SERVICE_PORT_PSQL"] + "/" + os.environ['POSTGRES_DB'] )

s = sessionmaker(bind=engine)
session = s()

#### KAFKA CONNECTION ####
print "initializing kafka connection"
c = Consumer({
    'bootstrap.servers': os.environ['KAFKA_URL'] ,
    'group.id':          os.environ['KAFKA_CONSUMER_GROUP'],
    'auto.offset.reset': 'smallest',
})

def on_assign (c, ps):
  print("on_assign!!!")
  for p in ps:
    import pdb; pdb.set_trace()
    print("on_assigning")
    p.offset=-10
    c.assign(ps)
c.subscribe(['io'], on_assign=on_assign)

print "entering run loop"
running = True
while running:
    print "going to poll"
    msg = c.poll(timeout=1)
    if msg and not msg.error():
        pass  # snip: message handling elided
    elif msg and msg.error().code() != KafkaError._PARTITION_EOF:
        print("error: %s"  % msg.error().str())
        running = False
    elif msg and msg.error():
        print("got error: %s" % ( msg.error().str() ) )
    else:
        print("got message %s" % (msg) )
c.close()
print "exiting"
exit(1)

With this output:

# KAFKA_CONSUMER_GROUP=io-etl-3 python kafka-to-postgres.py
initializing db connection
initializing kafka connection
entering run loop
going to poll
on_assign!!!
> /kafka-to-postgres.py(33)on_assign()
-> print("on_assigning")
(Pdb) l
 28
 29  	def on_assign (c, ps):
 30  	  print("on_assign!!!")
 31  	  for p in ps:
 32  	    import pdb; pdb.set_trace()
 33  ->	    print("on_assigning")
 34  	    p.offset=-10
 35  	    c.assign(ps)
 36  	c.subscribe(['io'], on_assign=on_assign)
 37
 38  	print "entering run loop"
(Pdb) p ps
[TopicPartition{topic=io,partition=0,offset=-1001,error=None}, TopicPartition{topic=io,partition=1,offset=-1001,error=None}]
(Pdb) p p
TopicPartition{topic=io,partition=0,offset=-1001,error=None}
(Pdb)
TopicPartition{topic=io,partition=0,offset=-1001,error=None}
(Pdb) p p.offset
-1001L
(Pdb) c
on_assigning
> /kafka-to-postgres.py(32)on_assign()
-> import pdb; pdb.set_trace()
(Pdb) c
on_assigning
got message None
going to poll
got message None
going to poll
got message None
going to poll

So no data comes through. I have tried setting the partition offset to 0 and to -10. The default value provided to the on_assign callback is an offset of -1001L, if that makes any difference.

Compare to the output from invoking the kafka console consumer:

$ ./bin/kafka-console-consumer.sh --bootstrap-server kafka-0.io:9092 --topic io --from-beginning --timeout-ms=60000
# DATA DELUGE

and the offset inspector showing:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server kafka-0:9092 --describe --group console-consumer-72507
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

Consumer group 'console-consumer-72507' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
io                             1          2393            2393            0          -                                                 -                              -
io                             0          2393            2393            0          -                                                 -                              -

Why isn't the on_assign hook making any difference? I should see some kind of data coming through.

xrl commented Aug 4, 2017

OK, this had nothing to do with the on_assign hook. I was missing the 'default.topic.config': {'auto.offset.reset': 'smallest'} setting on the Consumer. This value isn't required in the kafka-python library. It is also surprising that the consumer had no partition assignments whatsoever; would that be considered a bug?
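
For anyone landing here with the same symptom, a minimal sketch of the configuration that made the difference, using the bootstrap address, group id, and topic from this issue as placeholders:

from confluent_kafka import Consumer

# Sketch only: with this client version, auto.offset.reset has to be nested
# under default.topic.config for it to take effect.
c = Consumer({
    'bootstrap.servers': 'kafka-0.io:9092',   # placeholder
    'group.id': 'io-etl-3',                   # placeholder
    'default.topic.config': {'auto.offset.reset': 'smallest'},
})
c.subscribe(['io'])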

edenhill commented Aug 9, 2017

-10 is not a valid offset.
If you want to force the consumer to start at the earliest available offset regardless of previously committed offsets, you should set the .offset to confluent_kafka.OFFSET_BEGINNING before calling assign().

Also note that kafka-consumer-groups.sh is displaying the committed offsets.
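
A minimal sketch of that approach, reusing the consumer c and topic from the code above:

from confluent_kafka import OFFSET_BEGINNING

def on_assign(consumer, partitions):
    # Rewind every assigned partition to the earliest available offset,
    # overriding any committed offsets, then hand the list back to assign().
    for p in partitions:
        p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

c.subscribe(['io'], on_assign=on_assign)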

edenhill closed this as completed Sep 1, 2017
johnistan added a commit to johnistan/confluent-kafka-python that referenced this issue Oct 25, 2017
Expose offsets_for_times consumer method. closes confluentinc#224
johnistan added a commit to johnistan/confluent-kafka-python that referenced this issue Oct 26, 2017
Expose offsets_for_times consumer method. closes confluentinc#224
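
For context, offsets_for_times() translates timestamps into offsets, which gives another way to pick a starting point. A hedged sketch of how the method exposed by that commit might be used; the topic, partition list, and timestamp are placeholders:

from confluent_kafka import TopicPartition

# Pass the target timestamp (ms since epoch) in the offset field; the call
# returns TopicPartitions whose offsets point at the first messages at or
# after that time, which can then be handed to assign().
ts_ms = 1500000000000
query = [TopicPartition('io', p, ts_ms) for p in (0, 1)]
offsets = c.offsets_for_times(query, timeout=10.0)
c.assign(offsets)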