Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STORM-2554: Trident Kafka Spout Refactoring to Include Manual Partition Assignment #2174

Closed
wants to merge 1 commit into from

Conversation

hmcl
Copy link
Contributor

@hmcl hmcl commented Jun 24, 2017

  • Support manual partition assignment with changes in STORM-2541
  • Improve rebalance pause/resume logic

…on Assignment

  - Support manual partition assignment with changes in STORM-2541
  - Improve rebalance pause/resume logic
@kristopherkane
Copy link

I had a parallel effort which arrived at most of the same conclusions before discovering this JIRA.

There is a bug with activate/deactivate where the spout continues to call nextTuple() but never emits new messages. I'm not sure if it is scoped to the manual partition work or is a general bug. Would you consider a fix here for that or a new JIRA?

@srdo
Copy link
Contributor

srdo commented Jul 18, 2017

@hmcl Sorry for not looking at this earlier, I thought it made sense to finish up 2541 first. Could you fix the conflicts? Then I'd be happy to help review :)

@srdo
Copy link
Contributor

srdo commented Aug 10, 2017

I think we should consider if we can improve the way this spout implements the Trident API.

The Trident API is expecting that the Coordinator figures out which partitions exist for a batch. The partitions are passed to the spout executors ("coordinatorMeta"), and it is expected that the Emitter filters that list to get the partitions assigned to itself. Please see

if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
.

Deciding which partitions exist is the responsibility of the Coordinator, but this implementation puts that responsibility in the Emitter, which causes us to have to hack around with e.g. the enum instance or having to drop an emit because the partition is no longer assigned to this task. It will break if the scheduler happens to put the Coordinator in a different worker from any of the Emitter tasks. The Emitter code ends up being confusing, e.g. refreshPartitions does nothing, but logs based on the coordinatorMeta, which may be different from what the spout is actually assigned. I also think we make it easier to maintain OpaquePartitionedTridentSpoutExecutor if we don't have to keep in mind that the Kafka spout doesn't implement the API in the expected way.

We already have the clean separation we need to implement the API as specified, but they're conflated in the Subscription interface. The Coordinator should use PartitionFilter and its own KafkaConsumer instance (which we should add) to get the list of batch partitions instead of asking the KafkaTridentSpoutManager, so we get a nice decoupling from the Emitter. We should put the refresh subscription timer in the Coordinator as well. The Emitter should receive these partitions in getPartitionsForTask, use ManualPartitioner to decide which partitions are assigned to the task, and assign them on the consumer.

We'd need to change either KafkaSpoutConfig or Subscription a bit so we can get at the partitioner and filter classes, but I think it should be doable.

Somewhat related: I noticed that OpaquePartitionedSpoutExecutor was changed in #1995 to fix this spout. It might be good to deprecate the getOrderedPartitions/refreshPartitions methods in 1.x so we can remove them from master. Right now the functionality seems duplicated on the Emitter interface, since getPartitionsForTask has the same purpose.

@hmcl
Copy link
Contributor Author

hmcl commented Aug 10, 2017

@srdo thanks for your comments. Let me think about it. This change builds on the existing code and fixes some of the problems. I was also planning on revisiting this implementation. The way to go about it is either:

  1. Submit this fix and then an improvement with the new design (if we agree it's the right thing to do)
  2. Submit the new design (if we agree it's the right thing to do) without this fix, i.e. on top of the existing code.

@srdo
Copy link
Contributor

srdo commented Aug 12, 2017

@hmcl Sure, take your time.

Assuming we want to make the changes I suggested, then I'd be in favor of just doing those if we can avoid too much API breakage. Otherwise we could make this change on 1.x and then break the API for 2.0?

@hmcl
Copy link
Contributor Author

hmcl commented Aug 14, 2017

@srdo I am evaluating if we can do the change without breaking the API. If so we can go ahead with it. Otherwise, as you suggested, we can go with this change for 1.x-branch and then refactor for Storm 2.0.

@srdo
Copy link
Contributor

srdo commented Aug 30, 2017

@hmcl I went ahead and implemented the changes I suggested here #2300. As you noted, it isn't possible to do without breaking the API. Please take a look when you get the chance.

@hmcl
Copy link
Contributor Author

hmcl commented Aug 30, 2017

@srdo I think that you should have spoken with me before implementing this. This task is assigned to me, and I have an implementation that is pending addressing a few changes. There was no timeframe for it, and although I have been busy with other things, I had all the intention to do it. Furthermore, there have also been other instances that I have bounced some ideas in pull request discussions, and you went ahead and implemented them without asking me if I was planing on working on them. I would appreciate that this does not keep happening. Furthermore, I would like to ask what motivated you to provide this implementation without first speaking with me.

@srdo
Copy link
Contributor

srdo commented Aug 30, 2017

@hmcl Neither https://issues.apache.org/jira/browse/STORM-2691 nor https://issues.apache.org/jira/browse/STORM-2473 is assigned to you, and https://issues.apache.org/jira/browse/STORM-2554 doesn't list any of the problems pointed out in either of those issues. 2554's description says it will "do some refactoring to internal state partition management to make it cleaner and more properly handle partitions reassignment", and given the contents of this PR I assumed you were intending for this to make roughly these changes. I was reinforced in that belief by your comments here #2174 (comment), which indicated that we were discussing either merging this to 1.x and then another solution to master, or providing a different solution for both branches and closing this PR.

I did speak with you, three weeks ago when we discussed how to fix the Trident spout #2174 (comment). Your last comment was #2174 (comment), which I took to mean that you basically agreed that we should make the changes, but that you were considering whether we should keep this PR for 1.x. I had no way to know that you were working on implementing fixes for the 2691/2473 issues as part of 2554. I feel like I communicated pretty clearly that I was beginning work on an implementation, both by notifying you here that I thought we should consider a broad rewrite that would possibly make this PR redundant, and by assigning https://issues.apache.org/jira/browse/STORM-2691 to myself (this is visible to everyone subscribed to the storm-issues mailing list). I'd be happy to explicitly mention you in an issue comment before I begin work if we get into a situation like this again.

Could you elaborate on the other instances you're talking about? The only one I can recall is #2147, and we discussed the alternative fix a bunch on the mailing list before I touched the code, and you gave no indication that you were intending to work on manual partition assignment.

I'm sorry about the misunderstanding, and I'm happy to close #2300 if you have a better solution. I'm not trying to step on your toes or "steal" your issues.

d2r pushed a commit to d2r/storm that referenced this pull request Oct 16, 2018
We are closing stale Pull Requests to make the list more manageable.

Please re-open any Pull Request that has been closed in error.

Closes apache#608
Closes apache#639
Closes apache#640
Closes apache#648
Closes apache#662
Closes apache#668
Closes apache#692
Closes apache#705
Closes apache#724
Closes apache#728
Closes apache#730
Closes apache#753
Closes apache#803
Closes apache#854
Closes apache#922
Closes apache#986
Closes apache#992
Closes apache#1019
Closes apache#1040
Closes apache#1041
Closes apache#1043
Closes apache#1046
Closes apache#1051
Closes apache#1078
Closes apache#1146
Closes apache#1164
Closes apache#1165
Closes apache#1178
Closes apache#1213
Closes apache#1225
Closes apache#1258
Closes apache#1259
Closes apache#1268
Closes apache#1272
Closes apache#1277
Closes apache#1278
Closes apache#1288
Closes apache#1296
Closes apache#1328
Closes apache#1342
Closes apache#1353
Closes apache#1370
Closes apache#1376
Closes apache#1391
Closes apache#1395
Closes apache#1399
Closes apache#1406
Closes apache#1410
Closes apache#1422
Closes apache#1427
Closes apache#1443
Closes apache#1462
Closes apache#1468
Closes apache#1483
Closes apache#1506
Closes apache#1509
Closes apache#1515
Closes apache#1520
Closes apache#1521
Closes apache#1525
Closes apache#1527
Closes apache#1544
Closes apache#1550
Closes apache#1566
Closes apache#1569
Closes apache#1570
Closes apache#1575
Closes apache#1580
Closes apache#1584
Closes apache#1591
Closes apache#1600
Closes apache#1611
Closes apache#1613
Closes apache#1639
Closes apache#1703
Closes apache#1711
Closes apache#1719
Closes apache#1737
Closes apache#1760
Closes apache#1767
Closes apache#1768
Closes apache#1785
Closes apache#1799
Closes apache#1822
Closes apache#1824
Closes apache#1844
Closes apache#1874
Closes apache#1918
Closes apache#1928
Closes apache#1937
Closes apache#1942
Closes apache#1951
Closes apache#1957
Closes apache#1963
Closes apache#1964
Closes apache#1965
Closes apache#1967
Closes apache#1968
Closes apache#1971
Closes apache#1985
Closes apache#1986
Closes apache#1998
Closes apache#2031
Closes apache#2032
Closes apache#2071
Closes apache#2076
Closes apache#2108
Closes apache#2119
Closes apache#2128
Closes apache#2142
Closes apache#2174
Closes apache#2206
Closes apache#2297
Closes apache#2322
Closes apache#2332
Closes apache#2341
Closes apache#2377
Closes apache#2414
Closes apache#2469
d2r pushed a commit to d2r/storm that referenced this pull request Oct 16, 2018
We are closing stale Pull Requests to make the list more manageable.

Please re-open any Pull Request that has been closed in error.

Closes apache#608
Closes apache#639
Closes apache#640
Closes apache#648
Closes apache#662
Closes apache#668
Closes apache#692
Closes apache#705
Closes apache#724
Closes apache#728
Closes apache#730
Closes apache#753
Closes apache#803
Closes apache#854
Closes apache#922
Closes apache#986
Closes apache#992
Closes apache#1019
Closes apache#1040
Closes apache#1041
Closes apache#1043
Closes apache#1046
Closes apache#1051
Closes apache#1078
Closes apache#1146
Closes apache#1164
Closes apache#1165
Closes apache#1178
Closes apache#1213
Closes apache#1225
Closes apache#1258
Closes apache#1259
Closes apache#1268
Closes apache#1272
Closes apache#1277
Closes apache#1278
Closes apache#1288
Closes apache#1296
Closes apache#1328
Closes apache#1342
Closes apache#1353
Closes apache#1370
Closes apache#1376
Closes apache#1391
Closes apache#1395
Closes apache#1399
Closes apache#1406
Closes apache#1410
Closes apache#1422
Closes apache#1427
Closes apache#1443
Closes apache#1462
Closes apache#1468
Closes apache#1483
Closes apache#1506
Closes apache#1509
Closes apache#1515
Closes apache#1520
Closes apache#1521
Closes apache#1525
Closes apache#1527
Closes apache#1544
Closes apache#1550
Closes apache#1566
Closes apache#1569
Closes apache#1570
Closes apache#1575
Closes apache#1580
Closes apache#1584
Closes apache#1591
Closes apache#1600
Closes apache#1611
Closes apache#1613
Closes apache#1639
Closes apache#1703
Closes apache#1711
Closes apache#1719
Closes apache#1737
Closes apache#1760
Closes apache#1767
Closes apache#1768
Closes apache#1785
Closes apache#1799
Closes apache#1822
Closes apache#1824
Closes apache#1844
Closes apache#1874
Closes apache#1918
Closes apache#1928
Closes apache#1937
Closes apache#1942
Closes apache#1951
Closes apache#1957
Closes apache#1963
Closes apache#1964
Closes apache#1965
Closes apache#1967
Closes apache#1968
Closes apache#1971
Closes apache#1985
Closes apache#1986
Closes apache#1998
Closes apache#2031
Closes apache#2032
Closes apache#2071
Closes apache#2076
Closes apache#2108
Closes apache#2119
Closes apache#2128
Closes apache#2142
Closes apache#2174
Closes apache#2206
Closes apache#2297
Closes apache#2322
Closes apache#2332
Closes apache#2341
Closes apache#2377
Closes apache#2414
Closes apache#2469
@asfgit asfgit closed this in #2880 Oct 22, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants