[FLINK-11706] Add the SumFunction to support KeyedStream.sum with field which is a vector #7786


wangpeibin713
Contributor

What is the purpose of the change

Brief change log

  • add the class:
    • add ArraySum in SumFunction to support KeyedStream.sum on a field that is an array
  • add the following unit tests:
    • add unit tests in AggregationFunctionTest and DataStreamTest

Verifying this change

This change added tests and can be verified as follows:

  • Added integration tests for end-to-end deployment with small data collection

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not documented

@flinkbot
Collaborator

flinkbot commented Feb 21, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot approve description to approve the 1st aspect (similarly, it also supports the consensus, architecture and quality keywords)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval

@wangpeibin713
Contributor Author

wangpeibin713 commented Feb 21, 2019

My personal Travis build passed (https://travis-ci.org/wangpeibin713/flink/builds/496369161).
How can I restart the Travis build for this PR without pushing a new commit?

@klion26
Member

klion26 commented Feb 21, 2019

@wangpeibin713 thank you for your contribution, but I don't think this is needed. I think flatMap could solve the problem you described in the issue.

@wangpeibin713
Contributor Author

@wangpeibin713 thank you for your contribution, but I don't think this is needed. I think flatMap could solve the problem you described in the issue.

@klion26 thanks for your review. I think this PR makes it more convenient for users to sum such values.

With flatMap, users have to convert the tuple (for example a Tuple2<Long, Integer[]> where the Integer[] has length 3) to a flat tuple (Tuple4<Long, Integer, Integer, Integer>) and then sum over fields (1, 2, 3).

In my opinion, that is too heavy for the user. And when the length of the Integer[] changes, do we have to change the Tuple4 to a TupleX in the Java code?

If I have misunderstood your review, please let me know. Thanks very much.

@klion26
Member

klion26 commented Feb 21, 2019

How about the code below?

src.flatMap(new FlatMapFunction<Tuple2<Long, Integer[]>, Tuple2<Long, Integer>>() {
    @Override
    public void flatMap(Tuple2<Long, Integer[]> value, Collector<Tuple2<Long, Integer>> out) throws Exception {
        for (Integer v : value.f1) {
            out.collect(new Tuple2<>(value.f0, v));
        }
    }
}).keyBy(0)
    .sum(1)
    .print();

@wangpeibin713
Contributor Author

wangpeibin713 commented Feb 21, 2019

By the way, a sum aggregator over multiple fields on a KeyedStream is not supported at the moment either.
I have another PR for that feature: #7638

@wangpeibin713
Contributor Author

wangpeibin713 commented Feb 21, 2019

DataStream<Tuple2<Long, Integer[]>> src = env.fromCollection(Arrays.asList(
	new Tuple2<>(1L, new Integer[]{2, 4}),
	new Tuple2<>(1L, new Integer[]{3, 6}),
	new Tuple2<>(1L, new Integer[]{4, 8})
));

With the flatMap function from your code, the output for the DataStream shown above is:

(1,2)
(1,6)
(1,9)
(1,15)
(1,19)
(1,27)

The expected output is:

(1,[2, 4])
(1,[5, 10])
(1,[9, 18])
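
The element-wise semantics expected above can be sketched in plain Java, without any Flink dependency. The class `VectorSumSketch` and its methods are hypothetical names used only for illustration and are not the code in this PR. In a Flink job, the same `add` logic could presumably serve as the body of a `ReduceFunction` applied after `keyBy`, but that wiring is an assumption and is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the PR: illustrates an element-wise rolling sum.
public class VectorSumSketch {

    // Adds two vectors position by position; lengths must match.
    static Integer[] add(Integer[] a, Integer[] b) {
        if (a.length != b.length) {
            throw new IllegalArgumentException("vector lengths differ");
        }
        Integer[] result = new Integer[a.length];
        for (int i = 0; i < a.length; i++) {
            result[i] = a[i] + b[i];
        }
        return result;
    }

    // Emits the running sum after every input, like a rolling aggregation on a single key.
    static List<Integer[]> rollingSum(List<Integer[]> inputs) {
        List<Integer[]> out = new ArrayList<>();
        Integer[] acc = null;
        for (Integer[] v : inputs) {
            acc = (acc == null) ? v.clone() : add(acc, v);
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer[]> inputs = Arrays.asList(
            new Integer[]{2, 4}, new Integer[]{3, 6}, new Integer[]{4, 8});
        // Prints (1,[2, 4]), (1,[5, 10]), (1,[9, 18]) for the single key 1.
        for (Integer[] v : rollingSum(inputs)) {
            System.out.println("(1," + Arrays.toString(v) + ")");
        }
    }
}
```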


@klion26
Member

klion26 commented Feb 21, 2019

How about something like the code below?

int index = 0;
for (Integer v : value.f1) {
    // key each element by "<original key>_<array index>" so that sums stay per position
    out.collect(new Tuple2<>(String.format("%d_%d", value.f0, index++), v));
}

@wangpeibin713
Contributor Author

wangpeibin713 commented Feb 21, 2019

After this transformation, another step (maybe flatMap + keyBy + reduce) has to be called to combine the results back into an Integer[]. That is inconvenient for the developer.

So I want to introduce this feature, which makes this transparent to the developer.
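
To make the extra recombination step concrete, here is a rough plain-Java sketch of the composite-key approach: first sum per "key_index" slot, then parse the keys and rebuild one array per original key. `CompositeKeySketch`, `sumBySlot` and `regroup` are hypothetical names, no Flink API is used, and the sketch computes final totals rather than rolling results.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink code: sums per "key_index" slot, then regroups per key.
public class CompositeKeySketch {

    // Step 1: sum each array slot under a composite key such as "1_0" or "1_1".
    static Map<String, Integer> sumBySlot(long[] keys, Integer[][] vectors) {
        Map<String, Integer> sums = new TreeMap<>();
        for (int r = 0; r < keys.length; r++) {
            for (int i = 0; i < vectors[r].length; i++) {
                sums.merge(keys[r] + "_" + i, vectors[r][i], Integer::sum);
            }
        }
        return sums;
    }

    // Step 2: the additional work: parse the composite keys and rebuild the arrays.
    // The vector length has to be known up front in this sketch.
    static Map<Long, Integer[]> regroup(Map<String, Integer> sums, int length) {
        Map<Long, Integer[]> out = new TreeMap<>();
        for (Map.Entry<String, Integer> e : sums.entrySet()) {
            String[] parts = e.getKey().split("_");
            Integer[] vec = out.computeIfAbsent(Long.parseLong(parts[0]), k -> new Integer[length]);
            vec[Integer.parseInt(parts[1])] = e.getValue();
        }
        return out;
    }

    public static void main(String[] args) {
        long[] keys = {1L, 1L, 1L};
        Integer[][] vectors = {{2, 4}, {3, 6}, {4, 8}};
        Map<Long, Integer[]> totals = regroup(sumBySlot(keys, vectors), 2);
        System.out.println(Arrays.toString(totals.get(1L)));
    }
}
```

The second method is the part a user would have to write by hand on top of the flatMap suggestion.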


@dawidwys
Contributor

Personally, I don't like these semantics. A sum operation on arrays is not well defined: some users could expect the result of "plus" on two arrays to be their concatenation. Therefore I am against introducing this feature.
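
The ambiguity can be illustrated with a small plain-Java sketch that shows the two plausible readings of "plus" on arrays. `ArrayPlusSketch` and its method names are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;

// Hypothetical illustration of two readings of "array + array".
public class ArrayPlusSketch {

    // Reading 1: vector addition, position by position (assumes equal lengths).
    static Integer[] elementWise(Integer[] a, Integer[] b) {
        Integer[] result = new Integer[a.length];
        for (int i = 0; i < a.length; i++) {
            result[i] = a[i] + b[i];
        }
        return result;
    }

    // Reading 2: concatenation, similar to what "+" does for strings.
    static Integer[] concat(Integer[] a, Integer[] b) {
        Integer[] result = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }

    public static void main(String[] args) {
        Integer[] a = {2, 4};
        Integer[] b = {3, 6};
        System.out.println(Arrays.toString(elementWise(a, b))); // [5, 10]
        System.out.println(Arrays.toString(concat(a, b)));      // [2, 4, 3, 6]
    }
}
```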

@wangpeibin713 wangpeibin713 changed the title [FLINK-11706] Add the SumFunction to support KeyedStream.sum [FLINK-11706] Add the SumFunction to support KeyedStream.sum with Array Feb 22, 2019
@wangpeibin713 wangpeibin713 changed the title [FLINK-11706] Add the SumFunction to support KeyedStream.sum with Array [FLINK-11706] Add the SumFunction to support KeyedStream.sum with field which is array like vector sum Feb 22, 2019
@wangpeibin713 wangpeibin713 changed the title [FLINK-11706] Add the SumFunction to support KeyedStream.sum with field which is array like vector sum [FLINK-11706] Add the SumFunction to support KeyedStream.sum with field which is a vector Feb 22, 2019
@dawidwys
Contributor

Closing this for now. If you still think it should go in, please reach consensus in the JIRA first.
