Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topology stream-awareness to storm-redis #1760

Closed
wants to merge 2 commits into from

Conversation

mo-getter
Copy link

Allows users to control the streams to which the provided Bolts emit, via a StreamMapper interface, rather than emitting only to the "default" stream.

The existing constructors for the Bolts use a DefaultStreamMapper so that there are no breaking changes in behavior. Although, in the future, it might make more sense to use the InputSourceStreamMapper by default, which emits new tuples to the same stream the input tuple came in on (especially for the RedisFilterBolt).

Allows users to control the streams to which the provided Bolts emit,
via a StreamMapper interface, rather than emitting only to the default
stream.
private final RedisDataTypeDescription.RedisDataType dataType;
private final String additionalKey;

/**
* Constructor for single Redis environment (JedisPool)
* Constructor for single Redis environment (JedisPool).
* Tuples will be emitted to Storm's default streamId.
* @param config configuration for initializing JedisPool
* @param filterMapper mapper containing which datatype, query key that Bolt uses
*/
public RedisFilterBolt(JedisPoolConfig config, RedisFilterMapper filterMapper) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This now creates a lot of coupling between the filter mapper and the stream mapper. Simply because the Filter Mapper is the one that declares the output fields.

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        filterMapper.declareOutputFields(declarer);
    }

So either we need to embrace the coupling and have StreamMapper also be a FilterMapper. (which would require some documentation) or we find a way to fake out FilterMapper and have it declare multiple outputs for what the StreamMapper wants.

I prefer the first one, because it seems like it would be more flexible.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking a look at this. I see your point, and I agree it makes the most sense for both to be done in the same interface. TL;DR is at the bottom ;-)

The same problem also applies to RedisLookupMapper, which also defines declareOutputFields separately. I just came across STORM-1953.

About your first option, would it make more sense for a FilterMapper to be a StreamMapper, rather than a StreamMapper be a FilterMapper? I'm afraid that making StreamMapper a FilterMapper would introduce too much ambiguity in the lookup and filter bolts (if the constructors accepted both), since we'd have to rely on only the docs to define which object's declareOutputFields would be called. It also would make STORM-1953 worse. Or did you mean the bolts only accept FilterMapper, and have something like this in execute:

if (filterMapper instanceof StreamMapper) {
    String streamId = ((StreamMapper) filterMapper).getStreamId(input, value);
    collector.emit(streamId, input, value);
} else {
    collector.emit(input, value);
}

Either way, if you combine them, one downside is that the provided convenience StreamMapper implementations would have to be sacrificed. Making them abstract probably wouldn't be worth it for something like just specifying the stream.

In case you want to see what having FilterMapper and LookupMapper also extend StreamMapper looks like, I implemented that in a branch here. The flexibility to dynamically choose a stream is there, but the problem is that the trident-related classes also use LookupMapper, and have no need to declare a streamId, yet users will have to implement this method in their LookupMappers. Just returning null is one [not so good] option here, and is also an option when using LookupMapper for bolts (in which case, the existing behavior of emitting to the default stream is maintained).

TL;DR: I can't think of a great solution for what you mentioned, while maintaining user-friendliness of the API, without totally redoing the Mapper interfaces, i.e. STORM-1953. On the other hand, the above commit does maintain full backward compatibility and is probably most convenient for users!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to target this for master? Or do you also want this in 1.x? If it is just master we can play some games with a default method implementation in the FilterMapper interface.

If you want it in 1.x I would suggest that we leave FilterMapper untouched and create a LookupMapper that also has the same, or similar methods to FilterMapper, but is not a FilterMapper. Then you can have a wrapper class that is a LookupMapper, but takes a FilterMapper. The code could then wrap any FilterMapper passed in, and just use the LookupMapper interface.

I prefer the first one with the default methods because it reduce the number of classes and interfaces but is also binary compatible. If we are not on java 8 like 1.x then we cannot use default methods.

d2r pushed a commit to d2r/storm that referenced this pull request Oct 16, 2018
We are closing stale Pull Requests to make the list more manageable.

Please re-open any Pull Request that has been closed in error.

Closes apache#608
Closes apache#639
Closes apache#640
Closes apache#648
Closes apache#662
Closes apache#668
Closes apache#692
Closes apache#705
Closes apache#724
Closes apache#728
Closes apache#730
Closes apache#753
Closes apache#803
Closes apache#854
Closes apache#922
Closes apache#986
Closes apache#992
Closes apache#1019
Closes apache#1040
Closes apache#1041
Closes apache#1043
Closes apache#1046
Closes apache#1051
Closes apache#1078
Closes apache#1146
Closes apache#1164
Closes apache#1165
Closes apache#1178
Closes apache#1213
Closes apache#1225
Closes apache#1258
Closes apache#1259
Closes apache#1268
Closes apache#1272
Closes apache#1277
Closes apache#1278
Closes apache#1288
Closes apache#1296
Closes apache#1328
Closes apache#1342
Closes apache#1353
Closes apache#1370
Closes apache#1376
Closes apache#1391
Closes apache#1395
Closes apache#1399
Closes apache#1406
Closes apache#1410
Closes apache#1422
Closes apache#1427
Closes apache#1443
Closes apache#1462
Closes apache#1468
Closes apache#1483
Closes apache#1506
Closes apache#1509
Closes apache#1515
Closes apache#1520
Closes apache#1521
Closes apache#1525
Closes apache#1527
Closes apache#1544
Closes apache#1550
Closes apache#1566
Closes apache#1569
Closes apache#1570
Closes apache#1575
Closes apache#1580
Closes apache#1584
Closes apache#1591
Closes apache#1600
Closes apache#1611
Closes apache#1613
Closes apache#1639
Closes apache#1703
Closes apache#1711
Closes apache#1719
Closes apache#1737
Closes apache#1760
Closes apache#1767
Closes apache#1768
Closes apache#1785
Closes apache#1799
Closes apache#1822
Closes apache#1824
Closes apache#1844
Closes apache#1874
Closes apache#1918
Closes apache#1928
Closes apache#1937
Closes apache#1942
Closes apache#1951
Closes apache#1957
Closes apache#1963
Closes apache#1964
Closes apache#1965
Closes apache#1967
Closes apache#1968
Closes apache#1971
Closes apache#1985
Closes apache#1986
Closes apache#1998
Closes apache#2031
Closes apache#2032
Closes apache#2071
Closes apache#2076
Closes apache#2108
Closes apache#2119
Closes apache#2128
Closes apache#2142
Closes apache#2174
Closes apache#2206
Closes apache#2297
Closes apache#2322
Closes apache#2332
Closes apache#2341
Closes apache#2377
Closes apache#2414
Closes apache#2469
d2r pushed a commit to d2r/storm that referenced this pull request Oct 16, 2018
We are closing stale Pull Requests to make the list more manageable.

Please re-open any Pull Request that has been closed in error.

Closes apache#608
Closes apache#639
Closes apache#640
Closes apache#648
Closes apache#662
Closes apache#668
Closes apache#692
Closes apache#705
Closes apache#724
Closes apache#728
Closes apache#730
Closes apache#753
Closes apache#803
Closes apache#854
Closes apache#922
Closes apache#986
Closes apache#992
Closes apache#1019
Closes apache#1040
Closes apache#1041
Closes apache#1043
Closes apache#1046
Closes apache#1051
Closes apache#1078
Closes apache#1146
Closes apache#1164
Closes apache#1165
Closes apache#1178
Closes apache#1213
Closes apache#1225
Closes apache#1258
Closes apache#1259
Closes apache#1268
Closes apache#1272
Closes apache#1277
Closes apache#1278
Closes apache#1288
Closes apache#1296
Closes apache#1328
Closes apache#1342
Closes apache#1353
Closes apache#1370
Closes apache#1376
Closes apache#1391
Closes apache#1395
Closes apache#1399
Closes apache#1406
Closes apache#1410
Closes apache#1422
Closes apache#1427
Closes apache#1443
Closes apache#1462
Closes apache#1468
Closes apache#1483
Closes apache#1506
Closes apache#1509
Closes apache#1515
Closes apache#1520
Closes apache#1521
Closes apache#1525
Closes apache#1527
Closes apache#1544
Closes apache#1550
Closes apache#1566
Closes apache#1569
Closes apache#1570
Closes apache#1575
Closes apache#1580
Closes apache#1584
Closes apache#1591
Closes apache#1600
Closes apache#1611
Closes apache#1613
Closes apache#1639
Closes apache#1703
Closes apache#1711
Closes apache#1719
Closes apache#1737
Closes apache#1760
Closes apache#1767
Closes apache#1768
Closes apache#1785
Closes apache#1799
Closes apache#1822
Closes apache#1824
Closes apache#1844
Closes apache#1874
Closes apache#1918
Closes apache#1928
Closes apache#1937
Closes apache#1942
Closes apache#1951
Closes apache#1957
Closes apache#1963
Closes apache#1964
Closes apache#1965
Closes apache#1967
Closes apache#1968
Closes apache#1971
Closes apache#1985
Closes apache#1986
Closes apache#1998
Closes apache#2031
Closes apache#2032
Closes apache#2071
Closes apache#2076
Closes apache#2108
Closes apache#2119
Closes apache#2128
Closes apache#2142
Closes apache#2174
Closes apache#2206
Closes apache#2297
Closes apache#2322
Closes apache#2332
Closes apache#2341
Closes apache#2377
Closes apache#2414
Closes apache#2469
@asfgit asfgit closed this in #2880 Oct 22, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants