Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-9976][streaming] Remove unnecessary generic parameters #6437

Merged
merged 4 commits into from Jul 31, 2018

Conversation

zentol
Copy link
Contributor

@zentol zentol commented Jul 27, 2018

What is the purpose of the change

This PR removes unnecessary generic parameter from format builder methods of the StreamingFileSink.
These are inconsistent with other existing methods that use the already defined bucket ID type of the builder, and as of right now would always lead to ClassCastExceptions if one tried to use a different type.

@aljoscha
Copy link
Contributor

Isn't this meant for the case where you want to change the BucketID, so you actually need the reinterpretation?

@zentol
Copy link
Contributor Author

zentol commented Jul 27, 2018

but the bucket ID type is already defined in the constructor. you can't just change that.

@aljoscha
Copy link
Contributor

I think the Builders are done wrong. Instead of reusing the builder, the builder should be immutable and each call should return a new builder that has fields from the old builder and the new value that the user wants to set. This way we don't need to reinterpret. WDYT?

@zentol
Copy link
Contributor Author

zentol commented Jul 27, 2018

I think that would be a nicer approach. The fact that this compiles and runs without error is rather disgusting:

final RowFormatBuilder<String, String> original = forRowFormat(new Path("test"), (Encoder<String>) (element, stream) -> stream.write(0));
final RowFormatBuilder<String, Integer> casted = original
	.withBucketerAndPolicy(new Bucketer<String, Integer>() {
		@Override
		public Integer getBucketId(String element, Context context) {
			return 0;
		}

		@Override
		public SimpleVersionedSerializer<Integer> getSerializer() {
			return null;
		}
	}, new RollingPolicy<String, Integer>() {
		@Override
		public boolean shouldRollOnCheckpoint(PartFileInfo<Integer> partFileState) throws IOException {
			return false;
		}

		@Override
		public boolean shouldRollOnEvent(PartFileInfo<Integer> partFileState, String element) throws IOException {
			return false;
		}

		@Override
		public boolean shouldRollOnProcessingTime(PartFileInfo<Integer> partFileState, long currentTime) throws IOException {
			return false;
		}
	});

if (!original.equals(casted)) {
	throw new RuntimeException();
}

@zentol
Copy link
Contributor Author

zentol commented Jul 27, 2018

@aljoscha I've reworked the builders. This introduced a small gotcha for tests; if you set the bucketFactory, and afterwards change the type the factory is overridden with a default factory.

@aljoscha
Copy link
Contributor

👍 that's better now

@yanghua
Copy link
Contributor

yanghua commented Jul 27, 2018

Shall we also change the PR's title?

@kl0u
Copy link
Contributor

kl0u commented Jul 27, 2018

I agree that the code looks nicer now, but essentially the only thing changed is that in the:

if (!original.equals(casted)) {
	throw new RuntimeException();
}

now we throw the RuntimeException, right?

@@ -205,33 +205,40 @@ private StreamingFileSink(
this.rollingPolicy = DefaultRollingPolicy.create().build();
}

RowFormatBuilder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This constructor can be private. Actually also the other constructor could be, as it is called by the enclosing class, but if it were to move the builder to a different file, then it would have a problem. So I would recommend to make the new constructor private.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also call this constructor from the other one, just to have a single point of assignment of variables.

@@ -275,22 +282,30 @@ private StreamingFileSink(
this.bucketer = Preconditions.checkNotNull(bucketer);
}

BulkFormatBuilder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, this constructor can be private and call this constructor from the previous one.

@zentol
Copy link
Contributor Author

zentol commented Jul 27, 2018

yes, the functionality is the same, and in my example the test would now fail.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants