Skip to content

Commit

Permalink
fixup! [FLINK-21386][datastream] Postpone FromElementsFunction serial…
Browse files Browse the repository at this point in the history
…ization to respect later type customization

Changes after reviewing:
* Add `testFromElementsDeducedType` and `testFromElementsPostConstructionType` in `StreamExecutionEnvironmentTest`.
* More meaningful exception message for calling `setOutputType` after serialization/deserialization.
* Use `junit.rules.ExpectedException` instead of try-catch.
* Merge `testSetOutputTypeWithExistingBrokenSerializer` and `testSetOutputTypeWithDifferentSerializer` to one.
  • Loading branch information
kezhuw committed Apr 7, 2021
1 parent c5adc24 commit 493e760
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 55 deletions.
Expand Up @@ -143,7 +143,9 @@ private void serializeElements() throws IOException {
*/
@Override
public void setOutputType(TypeInformation<T> outTypeInfo, ExecutionConfig executionConfig) {
Preconditions.checkState(elements != null, "elements lost during serialization");
Preconditions.checkState(
elements != null,
"The output type should've been specified before shipping the graph to the cluster");
checkIterable(elements, outTypeInfo.getTypeClass());
TypeSerializer<T> newSerializer = outTypeInfo.createSerializer(executionConfig);
if (Objects.equals(serializer, newSerializer)) {
Expand Down
Expand Up @@ -49,6 +49,7 @@
import java.util.NoSuchElementException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -67,6 +68,35 @@ public void fromElementsWithBaseTypeTest2() {
env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello"));
}

@Test
public void testFromElementsDeducedType() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements("a", "b");

FromElementsFunction<String> elementsFunction =
(FromElementsFunction<String>) getFunctionFromDataSource(source);
assertEquals(
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(env.getConfig()),
elementsFunction.getSerializer());
}

@Test
public void testFromElementsPostConstructionType() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements("a", "b");
TypeInformation<String> customType = new GenericTypeInfo<>(String.class);

source.returns(customType);

FromElementsFunction<String> elementsFunction =
(FromElementsFunction<String>) getFunctionFromDataSource(source);
assertNotEquals(
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(env.getConfig()),
elementsFunction.getSerializer());
assertEquals(
customType.createSerializer(env.getConfig()), elementsFunction.getSerializer());
}

@Test
@SuppressWarnings("unchecked")
public void testFromCollectionParallelism() {
Expand Down
Expand Up @@ -37,14 +37,15 @@
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
Expand All @@ -61,6 +62,8 @@ public class FromElementsFunctionTest {
private static final String[] STRING_ARRAY_DATA = {"Oh", "boy", "what", "a", "show", "!"};
private static final List<String> STRING_LIST_DATA = Arrays.asList(STRING_ARRAY_DATA);

@Rule public final ExpectedException thrown = ExpectedException.none();

private static <T> List<T> runSource(FromElementsFunction<T> source) throws Exception {
List<T> result = new ArrayList<>();
FromElementsFunction<T> clonedSource = InstantiationUtil.clone(source);
Expand Down Expand Up @@ -90,13 +93,10 @@ public void testStrings() {

@Test
public void testNullElement() throws Exception {
try {
new FromElementsFunction<>("a", null, "b");
fail("expect exception");
} catch (Exception ex) {
assertThat(ex, instanceOf(IllegalArgumentException.class));
assertThat(ex.getMessage(), containsString("contains a null element"));
}
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("contains a null element");

new FromElementsFunction<>("a", null, "b");
}

@Test
Expand Down Expand Up @@ -140,36 +140,11 @@ public void testSetOutputTypeWithSameSerializer() throws Exception {
@Test
@SuppressWarnings({"unchecked", "rawtypes"})
public void testSetOutputTypeWithIncompatibleType() throws Exception {
FromElementsFunction<String> source = new FromElementsFunction<>(STRING_LIST_DATA);

try {
source.setOutputType(
(TypeInformation) BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
fail("expect exception");
} catch (Exception ex) {
assertThat(ex, instanceOf(IllegalArgumentException.class));
assertThat(ex.getMessage(), containsString("not all subclasses of java.lang.Integer"));
}
}

@Test
public void testSetOutputTypeWithDifferentSerializer() throws Exception {
FromElementsFunction<String> source =
new FromElementsFunction<>(
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
STRING_LIST_DATA);

TypeSerializer<String> existingSerializer = source.getSerializer();

source.setOutputType(new GenericTypeInfo<>(String.class), new ExecutionConfig());

TypeSerializer<String> newSerializer = source.getSerializer();

assertNotEquals(existingSerializer, newSerializer);

List<String> result = runSource(source);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("not all subclasses of java.lang.Integer");

assertEquals(STRING_LIST_DATA, result);
FromElementsFunction<String> source = new FromElementsFunction<>(STRING_LIST_DATA);
source.setOutputType((TypeInformation) BasicTypeInfo.INT_TYPE_INFO, new ExecutionConfig());
}

@Test
Expand All @@ -181,9 +156,15 @@ public void testSetOutputTypeWithExistingBrokenSerializer() throws Exception {
new FromElementsFunction<>(
info.createSerializer(new ExecutionConfig()), new DeserializeTooMuchType());

TypeSerializer<DeserializeTooMuchType> existingSerializer = source.getSerializer();

source.setOutputType(
new GenericTypeInfo<>(DeserializeTooMuchType.class), new ExecutionConfig());

TypeSerializer<DeserializeTooMuchType> newSerializer = source.getSerializer();

assertNotEquals(existingSerializer, newSerializer);

List<DeserializeTooMuchType> result = runSource(source);

assertThat(result, hasSize(1));
Expand All @@ -192,29 +173,22 @@ public void testSetOutputTypeWithExistingBrokenSerializer() throws Exception {

@Test
public void testSetOutputTypeAfterTransferred() throws Exception {
try {
FromElementsFunction<String> source =
InstantiationUtil.clone(new FromElementsFunction<>(STRING_LIST_DATA));
thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"The output type should've been specified before shipping the graph to the cluster");

source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
fail("expect exception");
} catch (Exception ex) {
assertThat(ex, instanceOf(IllegalStateException.class));
assertThat(ex.getMessage(), containsString("elements lost during serialization"));
}
FromElementsFunction<String> source =
InstantiationUtil.clone(new FromElementsFunction<>(STRING_LIST_DATA));
source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new ExecutionConfig());
}

@Test
public void testNoSerializer() throws Exception {
try {
FromElementsFunction<String> source = new FromElementsFunction<>(STRING_LIST_DATA);
thrown.expect(IllegalStateException.class);
thrown.expectMessage("serializer not configured");

runSource(source);
fail("expect exception");
} catch (Exception ex) {
assertThat(ex, instanceOf(IllegalStateException.class));
assertThat(ex.getMessage(), containsString("serializer not configured"));
}
FromElementsFunction<String> source = new FromElementsFunction<>(STRING_LIST_DATA);
runSource(source);
}

@Test
Expand Down

0 comments on commit 493e760

Please sign in to comment.