Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Allow marking of earlier offsets #197

Merged
merged 6 commits into from
Jan 4, 2018
Merged

Conversation

jerrygb
Copy link

@jerrygb jerrygb commented Dec 8, 2017

This commit is in alignment with the sarama library commits to reset offsets
IBM/sarama@b966238#diff-3ca6d659defd100fe2de43adf2b8f41e
IBM/sarama@96fa1c8#diff-3ca6d659defd100fe2de43adf2b8f41e

@jerrygb
Copy link
Author

jerrygb commented Dec 8, 2017

I believe it might be a good idea to combine these with changes from @aravindvs #179

@jerrygb
Copy link
Author

jerrygb commented Dec 11, 2017

@dim Build failure is due to kafka mirror being incorrect. I have inserted a fix @ #198

consumer.go Outdated
//
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
c.subs.Fetch(msg.Topic, msg.Partition).MarkOffset(msg.Offset+1, metadata)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't that call .ResetOffset(...)

Copy link
Member

@dim dim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also hugely appreciate if you could add a test for this. It should not be too difficult.

@jerrygb
Copy link
Author

jerrygb commented Dec 27, 2017

@dim I have fixed as per the comment. You were right.

Additionally, added some tests. Thank you.

@jerrygb jerrygb closed this Dec 28, 2017
@jerrygb
Copy link
Author

jerrygb commented Dec 28, 2017

@dim reopening for travis rebuild. Last build failed, due to seemingly okay race test.

https://travis-ci.org/bsm/sarama-cluster/jobs/322296975

  • Go: 1.8.x
  • SCALA_VERSION: 2.12
  • KAFKA_VERSION: 0.11.0.1

@jerrygb jerrygb reopened this Dec 28, 2017
cluster_test.go Outdated
@@ -21,6 +21,7 @@ var (
testKafkaRoot = "kafka_2.12-1.0.0"
testKafkaAddrs = []string{"127.0.0.1:29092"}
testTopics = []string{"topic-a", "topic-b"}
testTopicsReset = "topic-c"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pls use gofmt or goimports

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, no real need to declare this as global, it's only used once

Copy link
Author

@jerrygb jerrygb Jan 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True and formatted all the files. Sorry about that.

Removed globals assuming we dont need another fuzzing test.

cluster_test.go Outdated
@@ -73,6 +74,10 @@ var _ = BeforeSuite(func() {
"-name", "kafkaServer", "kafka.Kafka",
testDataDir("server.properties"),
)
if _, err := os.Stat(testKafkaData); err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for that clause, instead just the below will do:

Expect(os.RemoveAll(testKafkaData)).To(Succeed())

consumer_test.go Outdated
@@ -315,6 +340,94 @@ var _ = Describe("Consumer", func() {
Expect(uniques).To(HaveLen(15000))
})

It("should consume/commit/reset/resume", func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so I am not really sure we need another fuzzing test for this. I don't see the added value. what you have done in https://github.com/bsm/sarama-cluster/pull/197/files#diff-e85d03976d9766c4d00ac176511d4f06R188 seems enough, please remove that test.

Copy link
Author

@jerrygb jerrygb Jan 3, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the test. This was meant to check that we do actually rewind and, whether the first offset consumed is actually the one we expected. The earlier fuzzing test actually doesnt deal specifically with start offsets, right?

Rather it deals with consuming atleast fixed number of unique entries (partition/offset pairs)?

I have removed it as you advised. But, let me know if we need check the start offsets when we start consumption.

@jerrygb
Copy link
Author

jerrygb commented Jan 3, 2018

Thank you @dim. Fixed as per comments.

Also removed the additional fuzzing test. This was meant to check whether we do actually rewind properly and check for the the first offset consumed. I suppose the earlier fuzzing test actually deal specifically with start offsets, right?

For now, I have removed it as you advised. But, let me know if you need to me add those back in.

Copy link
Member

@dim dim left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jerrygb thanks so much for the effort, much appreciated!

@dim dim merged commit baf05b3 into bsm:master Jan 4, 2018
@jerrygb jerrygb deleted the issue-reset-offsets branch January 4, 2018 14:45
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants