gallenvara
Contributor

Sometimes users know the source data better than the system does, and they can supply a customized data distribution to make range partitioning more efficient.

@gallenvara gallenvara force-pushed the flink-2997 branch 2 times, most recently from a3cd265 to e91407d Compare March 9, 2016 04:30
@gallenvara
Contributor Author

@fhueske @ChengXiangLi Could you please help review this? The CI build failure is unrelated to this change.


String expected = "[(1), (4), (2), (3), (5), (12)]";
assertEquals(expected, out1.collect().toString());
}

Could you add a test that uses two fields as the partition key?

Contributor Author

OK, I will update the test code.

@ChengXiangLi

Just a minor comment; it mostly looks good to me. @fhueske, do you want to take a look at this PR?

this.pKeys = pKeys;
this.partitionLocationName = partitionLocationName;
this.customPartitioner = customPartitioner;
this.distribution = distribution;

Contributor

Please add a check that a distribution is only set if pMethod == PartitionMethod.RANGE

Contributor Author

Other partition methods such as hashPartition and customPartition don't need the distribution, so it keeps its default value of null. I think leaving out the check here also makes sense.

Contributor

The check would prevent an invalid parameter combination and would also show readers of the code that such a combination is not valid. Hence, I think the check should be added.
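A minimal sketch of the kind of check being requested, assuming simplified stand-ins rather than the real Flink classes: the `PartitionMethod` enum, the `checkDistribution` helper, and the error message below are hypothetical and only illustrate rejecting a distribution that is combined with a non-range partition method.

```java
// Hypothetical, simplified stand-ins; these are not the actual Flink classes.
final class DistributionArgumentCheck {

    enum PartitionMethod { REBALANCE, HASH, RANGE, CUSTOM }

    /**
     * Rejects a distribution that is combined with a non-range partition method.
     * The distribution is typed as Object because only its presence matters here.
     */
    static void checkDistribution(PartitionMethod method, Object distribution) {
        if (distribution != null && method != PartitionMethod.RANGE) {
            throw new IllegalArgumentException(
                "A data distribution may only be set for range partitioning, but the method is "
                    + method + ".");
        }
    }

    public static void main(String[] args) {
        Object distribution = new Object();
        checkDistribution(PartitionMethod.RANGE, distribution); // accepted
        checkDistribution(PartitionMethod.HASH, null);          // accepted, no distribution set
        try {
            checkDistribution(PartitionMethod.HASH, distribution);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Failing fast in the constructor documents the valid combinations in one place, which is the point made in the comment above.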

Contributor

We should check here that the key types of distribution.getKeyTypes() are equal to pKeys.getKeyFieldTypes().

@fhueske
Contributor

fhueske commented Mar 10, 2016

Thanks @gallenvara for opening the PR. Looks mostly good but the tests need to be improved, IMO.

It would also be good to extend the DataDistribution interface by adding a method TypeInformation[] getKeyTypes(). This will allow us to validate the key types of the DataDistribution and the specified keys in the PartitionOperator.

@gallenvara
Contributor Author

Hi @fhueske, I have updated the relevant code. I still use the generic class CustomDistribution for the tests because, IMO, a distribution with built-in data is not flexible enough. It also does not seem good to mix the distribution logic into the tests. Could you please take another look?

@fhueske
Contributor

fhueske commented Mar 15, 2016

Sorry @gallenvara, I was busy the last few days. I will look into this PR tomorrow.

prPlanNode.setParallelism(targetParallelism);
GlobalProperties globalProperties = new GlobalProperties();
globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING));
globalProperties.setRangePartitioned(new Ordering(0, null, Order.ASCENDING), channel.getDataDistribution());

Contributor

Can you pass null instead of channel.getDataDistribution() to make it clear that no user-provided data distribution is available here?

checkPartition = false;
}
}
out.collect(checkPartition);

Contributor

There is no need to emit anything if the assertion is done inside the function.

@fhueske
Contributor

fhueske commented Mar 18, 2016

I had a few comments, but we are getting closer :-)

@gallenvara
Contributor Author

@fhueske, thanks a lot for the review; the code has been updated based on your advice. In the second test I changed the range boundary from ((bucketNum+1)*7, bucketNum*2+3) to ((bucketNum+1)*(bucketNum+2)/2, bucketNum+1). The reason is that, with the old boundary, the record Tuple3(15, 5L, "Comment#9") would be sent to partition 2 according to its first field but to partition 1 according to its second field, so the test would fail.
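The arithmetic behind this change can be checked with a small sketch. It assumes that a value belongs to the first bucket whose upper boundary is greater than or equal to it, with zero-based bucket numbers; the class and method names are hypothetical and not part of the PR.

```java
// Hypothetical helper that replays the boundary arithmetic from the comment above.
final class BoundaryExample {

    /** Upper boundaries (first field, second field) under the old formula. */
    static int[] oldBoundary(int bucketNum) {
        return new int[] {(bucketNum + 1) * 7, bucketNum * 2 + 3};
    }

    /** Upper boundaries (first field, second field) under the new formula. */
    static int[] newBoundary(int bucketNum) {
        return new int[] {(bucketNum + 1) * (bucketNum + 2) / 2, bucketNum + 1};
    }

    /**
     * Assumption: a value falls into the first bucket whose upper boundary
     * for the given field is greater than or equal to the value.
     */
    static int bucketFor(int value, int field, boolean useNewFormula) {
        for (int bucket = 0; bucket < 1000; bucket++) {
            int[] boundary = useNewFormula ? newBoundary(bucket) : oldBoundary(bucket);
            if (value <= boundary[field]) {
                return bucket;
            }
        }
        throw new IllegalArgumentException("Value is outside the buckets considered: " + value);
    }

    public static void main(String[] args) {
        // The record Tuple3(15, 5L, "Comment#9") has first field 15 and second field 5.
        System.out.println("old, first field:  " + bucketFor(15, 0, false)); // 2
        System.out.println("old, second field: " + bucketFor(5, 1, false));  // 1
        System.out.println("new, first field:  " + bucketFor(15, 0, true));  // 4
        System.out.println("new, second field: " + bucketFor(5, 1, true));   // 4
    }
}
```

Under the old formula the two fields disagree (bucket 2 versus bucket 1), while under the new formula both fields map the record to the same bucket.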

@gallenvara gallenvara force-pushed the flink-2997 branch 4 times, most recently from 7094543 to ecdb437 Compare March 20, 2016 19:16
if (distribution != null) {
Preconditions.checkArgument(distribution.getNumberOfFields() == pKeys.getNumberOfKeyFields(), "The number of key fields in the distribution and range partitioner should be the same.");
for (int i = 0; i < pKeys.getNumberOfKeyFields(); i++) {
Preconditions.checkArgument(distribution.getKeyTypes()[i].equals(pKeys.getKeyFieldTypes()[i]), "The types of key from the distribution and range partitioner are not equal.");

Contributor

Can you fetch the key type arrays only once instead of once for every key?
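One way to read this request, sketched with a hypothetical stand-in type instead of the real Flink classes: both arrays are read into local variables before the loop, so each getter is called exactly once. `CountingKeyTypes` and its `calls` counter exist only to make that visible, and `Class<?>` stands in for Flink's `TypeInformation`.

```java
// Hypothetical stand-ins; Class<?> is used here in place of Flink's TypeInformation.
final class KeyTypeCheck {

    /** Exposes key types and counts how often they are requested. */
    static final class CountingKeyTypes {
        private final Class<?>[] types;
        int calls = 0;

        CountingKeyTypes(Class<?>... types) {
            this.types = types.clone();
        }

        Class<?>[] getKeyTypes() {
            calls++;
            return types.clone();
        }
    }

    /** Compares key types pairwise after fetching each array a single time. */
    static void checkKeyTypes(CountingKeyTypes distribution, CountingKeyTypes partitionKeys) {
        Class<?>[] distributionTypes = distribution.getKeyTypes(); // fetched once
        Class<?>[] keyTypes = partitionKeys.getKeyTypes();         // fetched once
        if (distributionTypes.length != keyTypes.length) {
            throw new IllegalArgumentException("The number of key fields differs.");
        }
        for (int i = 0; i < keyTypes.length; i++) {
            if (!distributionTypes[i].equals(keyTypes[i])) {
                throw new IllegalArgumentException("The key types at position " + i + " differ.");
            }
        }
    }

    public static void main(String[] args) {
        CountingKeyTypes distribution = new CountingKeyTypes(Integer.class, Long.class);
        CountingKeyTypes keys = new CountingKeyTypes(Integer.class, Long.class);
        checkKeyTypes(distribution, keys);
        System.out.println("getKeyTypes calls: " + distribution.calls + " and " + keys.calls);
    }
}
```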

Contributor Author

Code updated; I rebased the new commit together with the previous one. (You must have stayed up late last night :) )

/**
* The class is used to do the tests of range partition with customed data distribution.
*/
public static class TestDataDist implements DataDistribution {

Contributor

The two distributions defined by this class do not have much in common. How about we define two classes, one for each distribution?

@gallenvara
Contributor Author

@fhueske, the PR has been updated.

public static class TestDataDist2 implements DataDistribution {

public int rightBoundary[] = new int[]{6, 4, 9, 1, 2};
private int dim;

Contributor

dim should always be 2. Remove the field, the constructor, and update write() and read().

@fhueske
Contributor

fhueske commented Mar 23, 2016

Thanks for the quick updates @gallenvara! I think we are almost done. The test distribution and input data look good. Only the boundary checks need to be fixed.
Thanks, Fabian

@gallenvara
Contributor Author

@fhueske, the code has been updated :)

@fhueske
Contributor

fhueske commented Mar 23, 2016

Thanks for the fast update. The PR looks good. :-)
I will merge it.

@asfgit asfgit closed this in b8299bf Mar 23, 2016
fijolekProjects pushed a commit to fijolekProjects/flink that referenced this pull request May 1, 2016