From 1c3c2d4e9235ae13ddad0d8524544b191e541c19 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Mon, 12 Jan 2015 21:11:09 +0100 Subject: [PATCH 1/4] [FLINK-1391] Add support for using Avro-POJOs and Avro types with Kryo Conflicts: flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java Conflicts: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java --- docs/example_connectors.md | 27 + docs/programming_guide.md | 2 +- flink-addons/flink-avro/pom.xml | 30 +- .../flink/api/avro/EncoderDecoderTest.java | 9 +- .../org/apache/flink/api/io/avro/.gitignore | 1 + .../flink/api/io/avro/AvroPojoTest.java | 157 ++++ .../io/avro/AvroRecordInputFormatTest.java | 144 +++- .../flink/api/io/avro/generated/Colors.java | 32 - .../flink/api/io/avro/generated/User.java | 755 ------------------ .../src/test/resources/avro/user.avsc | 10 +- .../common/typeutils/ComparatorTestBase.java | 4 +- flink-java/pom.xml | 9 +- .../flink/api/java/ExecutionEnvironment.java | 2 +- .../api/java/typeutils/AvroTypeInfo.java | 72 ++ .../api/java/typeutils/TypeExtractor.java | 8 +- .../typeutils/runtime/KryoSerializer.java | 110 ++- .../typeutils/runtime/PojoComparator.java | 7 +- pom.xml | 4 +- tools/maven/suppressions.xml | 1 + 19 files changed, 551 insertions(+), 833 deletions(-) create mode 100644 flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore create mode 100644 flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java delete mode 100644 flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java delete mode 100644 flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java create mode 100644 flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java diff --git a/docs/example_connectors.md b/docs/example_connectors.md 
index 28f35bb78e074..fcd65afe60c98 100644 --- a/docs/example_connectors.md +++ b/docs/example_connectors.md @@ -69,6 +69,33 @@ users to use all existing Hadoop input formats with Flink. This section shows some examples for connecting Flink to other systems. [Read more about Hadoop compatibility in Flink](hadoop_compatibility.html). +## Avro support in Flink + +Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/). This allows you to easily read from Avro files with Flink. +Also, the serialization framework of Flink is able to handle classes generated from Avro schemas. + +In order to read data from an Avro file, you have to specify an `AvroInputFormat`. + +**Example**: + +~~~java
AvroInputFormat users = new AvroInputFormat(in, User.class); +DataSet usersDS = env.createInput(users); +~~~ + +Note that `User` is a POJO generated by Avro. Flink also allows you to perform string-based key selection of these POJOs. For example: + +~~~java +usersDS.groupBy("name") +~~~ + + +Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, it is very data-intensive and thus probably slow to use. + +Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you cannot use the field as a join or grouping key. +Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine; however, specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
+ + ### Access Microsoft Azure Table Storage diff --git a/docs/programming_guide.md b/docs/programming_guide.md index 7a2211df9d4b2..f41fc97f1c7db 100644 --- a/docs/programming_guide.md +++ b/docs/programming_guide.md @@ -982,7 +982,7 @@ These are valid expressions for the example POJO above: - `complex.word.f2`: Selects the last field in the Tuple3. - `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key. -Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`). +Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Kryo](https://github.com/EsotericSoftware/kryo) to serialize arbitrary objects (such as `java.util.Collection`). ### Define key using a Key Selector Function {:.no_toc} diff --git a/flink-addons/flink-avro/pom.xml b/flink-addons/flink-avro/pom.xml index a0d8d9d880ab9..dfc9105ccd7fb 100644 --- a/flink-addons/flink-avro/pom.xml +++ b/flink-addons/flink-avro/pom.xml @@ -41,7 +41,8 @@ under the License. flink-java ${project.version} - + + org.apache.flink flink-clients @@ -53,6 +54,14 @@ under the License. avro + + + org.apache.flink + flink-core + ${project.version} + test-jar + test + org.apache.flink @@ -116,7 +125,24 @@ under the License. 
- + + + org.apache.avro + avro-maven-plugin + 1.7.7 + + + generate-sources + + schema + + + ${project.basedir}/src/test/resources/avro + ${project.basedir}/src/test/java/ + + + + diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java index 07244576f7bc9..8f14cb3248f78 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java @@ -29,9 +29,11 @@ import java.util.Map; import java.util.Random; +import org.apache.avro.generic.GenericData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.flink.api.io.avro.generated.Colors; +import org.apache.flink.api.io.avro.generated.Fixed16; import org.apache.flink.api.io.avro.generated.User; import org.apache.flink.util.StringUtils; import org.junit.Test; @@ -232,8 +234,11 @@ public void testGeneratedObjectWithNullableFields() { map.put("1", 1L); map.put("2", 2L); map.put("3", 3L); - - User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map); + + byte[] b = new byte[16]; + new Random().nextBytes(b); + Fixed16 f = new Fixed16(b); + User user = new User("Freudenreich", 1337, "macintosh gray", 1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map, f, new Boolean(true)); testObjectSerialization(user); } diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore new file mode 100644 index 0000000000000..dc9b2375c7ad7 --- /dev/null +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore @@ -0,0 +1 @@ +generated \ No newline at end of file diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java new file mode 100644 index 0000000000000..6ff483627d0ae --- /dev/null +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.io.avro; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.io.avro.generated.User; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.fs.Path; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.util.Collector; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; + +@RunWith(Parameterized.class) +public class AvroPojoTest extends MultipleProgramsTestBase { + public AvroPojoTest(ExecutionMode mode) { + super(mode); + } + + private File inFile; + private String resultPath; + private String expected; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + inFile = tempFolder.newFile(); + AvroRecordInputFormatTest.writeTestFile(inFile); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expected, resultPath); + } + + @Test + public void testSimpleAvroRead() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat users = new AvroInputFormat(in, User.class); + DataSet usersDS = env.createInput(users) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction() { + @Override + public User map(User value) throws Exception { + value.setTypeMap(null); + return value; + } + 
}); + + usersDS.writeAsText(resultPath); + + env.execute("Simple Avro read job"); + + + expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + } + + @Test + public void testKeySelection() throws Exception { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat users = new AvroInputFormat(in, User.class); + DataSet usersDS = env.createInput(users); + + DataSet> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction>() { + @Override + public void reduce(Iterable values, Collector> out) throws Exception { + for(User u : values) { + out.collect(new Tuple2(u.getName().toString(), 1)); + } + } + }); + + res.writeAsText(resultPath); + env.execute("Avro Key selection"); + + + expected = "(Alyssa,1)\n(Charlie,1)\n"; + } + + /** + * Test some known fields for grouping on + */ + @Test + public void testAllFields() throws Exception { + for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { + testField(fieldName); + } + } + + private void testField(final String fieldName) throws Exception { + before(); + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new 
Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat users = new AvroInputFormat(in, User.class); + DataSet usersDS = env.createInput(users); + + DataSet res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction() { + @Override + public void reduce(Iterable values, Collector out) throws Exception { + for(User u : values) { + out.collect(u.get(fieldName)); + } + } + }); + res.writeAsText(resultPath); + env.execute("Simple Avro read job"); + if(fieldName.equals("name")) { + expected = "Alyssa\nCharlie"; + } else if(fieldName.equals("type_enum")) { + expected = "GREEN\nRED\n"; + } else if(fieldName.equals("type_double_test")) { + expected = "123.45\n1.337\n"; + } else { + Assert.fail("Unknown field"); + } + + after(); + } +} diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java index d8d8b46a029fd..1ec4a8a4bdd5f 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java @@ -27,13 +27,24 @@ import java.util.List; import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.file.FileReader; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.util.Utf8; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.io.avro.generated.Colors; 
import org.apache.flink.api.io.avro.generated.User; import org.apache.flink.api.java.io.AvroInputFormat; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; @@ -48,7 +59,7 @@ */ public class AvroRecordInputFormatTest { - private File testFile; + public File testFile; final static String TEST_NAME = "Alyssa"; @@ -65,24 +76,25 @@ public class AvroRecordInputFormatTest { final static String TEST_MAP_KEY2 = "KEY 2"; final static long TEST_MAP_VALUE2 = 17554L; - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroInputFormatTest", null); - + private Schema userSchema = new User().getSchema(); + + + public static void writeTestFile(File testFile) throws IOException { ArrayList stringArray = new ArrayList(); stringArray.add(TEST_ARRAY_STRING_1); stringArray.add(TEST_ARRAY_STRING_2); - + ArrayList booleanArray = new ArrayList(); booleanArray.add(TEST_ARRAY_BOOLEAN_1); booleanArray.add(TEST_ARRAY_BOOLEAN_2); - + HashMap longMap = new HashMap(); longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - + + User user1 = new User(); + user1.setName(TEST_NAME); user1.setFavoriteNumber(256); user1.setTypeDoubleTest(123.45d); @@ -91,22 +103,24 @@ public void createFiles() throws IOException { user1.setTypeArrayBoolean(booleanArray); user1.setTypeEnum(TEST_ENUM_COLOR); user1.setTypeMap(longMap); - + // Construct via builder User user2 = User.newBuilder() - .setName("Charlie") - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList()) - .setTypeArrayBoolean(new ArrayList()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap()) - .build(); + .setName("Charlie") + .setFavoriteColor("blue") + 
.setFavoriteNumber(null) + .setTypeBoolTest(false) + .setTypeDoubleTest(1.337d) + .setTypeNullTest(null) + .setTypeLongTest(1337L) + .setTypeArrayString(new ArrayList()) + .setTypeArrayBoolean(new ArrayList()) + .setTypeNullableArray(null) + .setTypeEnum(Colors.RED) + .setTypeMap(new HashMap()) + .setTypeFixed(null) + .setTypeUnion(null) + .build(); DatumWriter userDatumWriter = new SpecificDatumWriter(User.class); DataFileWriter dataFileWriter = new DataFileWriter(userDatumWriter); dataFileWriter.create(user1.getSchema(), testFile); @@ -114,7 +128,17 @@ public void createFiles() throws IOException { dataFileWriter.append(user2); dataFileWriter.close(); } - + @Before + public void createFiles() throws IOException { + testFile = File.createTempFile("AvroInputFormatTest", null); + writeTestFile(testFile); + } + + + /** + * Test if the AvroInputFormat is able to properly read data from an avro file. + * @throws IOException + */ @Test public void testDeserialisation() throws IOException { Configuration parameters = new Configuration(); @@ -159,9 +183,79 @@ public void testDeserialisation() throws IOException { format.close(); } - + + /** + * Test if the Flink serialization is able to properly process GenericData.Record types. + * Usually users of Avro generate classes (POJOs) from Avro schemas. + * However, if generated classes are not available, one can also use GenericData.Record. + * It is an untyped key-value record which is using a schema to validate the correctness of the data. + * + * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. 
+ */ + @Test + public void testDeserializeToGenericType() throws IOException { + DatumReader datumReader = new GenericDatumReader(userSchema); + + FileReader dataFileReader = DataFileReader.openReader(testFile, datumReader); + // initialize Record by reading it from disk (that's easier than creating it by hand) + GenericData.Record rec = new GenericData.Record(userSchema); + dataFileReader.next(rec); + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + assertEquals(null, rec.get("type_long_test")); // it is null for the first record. + + // now serialize it with our framework: + TypeInformation te = (TypeInformation) TypeExtractor.createTypeInfo(GenericData.Record.class); + TypeSerializer tser = te.createSerializer(); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + GenericData.Record newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); + assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() ); + assertEquals(null, newRec.get("type_long_test")); + + } + + /** + * This test validates proper serialization with specific (generated POJO) types. 
+ */ + @Test + public void testDeserializeToSpecificType() throws IOException { + + DatumReader datumReader = new SpecificDatumReader(userSchema); + + FileReader dataFileReader = DataFileReader.openReader(testFile, datumReader); + User rec = dataFileReader.next(); + + // check if record has been read correctly + assertNotNull(rec); + assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); + + // now serialize it with our framework: + TypeInformation te = (TypeInformation) TypeExtractor.createTypeInfo(User.class); + TypeSerializer tser = te.createSerializer(); + ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); + tser.serialize(rec, target); + + User newRec = tser.deserialize(target.getInputView()); + + // check if it is still the same + assertNotNull(newRec); + assertEquals("name not equal", TEST_NAME, newRec.getName().toString() ); + assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() ); + } + + @After public void deleteFiles() { testFile.delete(); } + } diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java deleted file mode 100644 index 58e1f5cb89d0f..0000000000000 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.flink.api.io.avro.generated; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public enum Colors { - RED, GREEN, BLUE ; - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } -} diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java deleted file mode 100644 index 505857ead2677..0000000000000 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java +++ /dev/null @@ -1,755 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -/** - * Autogenerated by Avro - * - * DO NOT EDIT DIRECTLY - */ -package org.apache.flink.api.io.avro.generated; -@SuppressWarnings("all") -@org.apache.avro.specific.AvroGenerated -public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}"); - public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - @Deprecated public java.lang.CharSequence name; - @Deprecated public java.lang.Integer favorite_number; - @Deprecated public 
java.lang.CharSequence favorite_color; - @Deprecated public java.lang.Long type_long_test; - @Deprecated public java.lang.Object type_double_test; - @Deprecated public java.lang.Object type_null_test; - @Deprecated public java.lang.Object type_bool_test; - @Deprecated public java.util.List type_array_string; - @Deprecated public java.util.List type_array_boolean; - @Deprecated public java.util.List type_nullable_array; - @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum; - @Deprecated public java.util.Map type_map; - - /** - * Default constructor. Note that this does not initialize fields - * to their default values from the schema. If that is desired then - * one should use {@link \#newBuilder()}. - */ - public User() {} - - /** - * All-args constructor. - */ - public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List type_array_string, java.util.List type_array_boolean, java.util.List type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map type_map) { - this.name = name; - this.favorite_number = favorite_number; - this.favorite_color = favorite_color; - this.type_long_test = type_long_test; - this.type_double_test = type_double_test; - this.type_null_test = type_null_test; - this.type_bool_test = type_bool_test; - this.type_array_string = type_array_string; - this.type_array_boolean = type_array_boolean; - this.type_nullable_array = type_nullable_array; - this.type_enum = type_enum; - this.type_map = type_map; - } - - public org.apache.avro.Schema getSchema() { return SCHEMA$; } - // Used by DatumWriter. Applications should not call. 
- public java.lang.Object get(int field$) { - switch (field$) { - case 0: return name; - case 1: return favorite_number; - case 2: return favorite_color; - case 3: return type_long_test; - case 4: return type_double_test; - case 5: return type_null_test; - case 6: return type_bool_test; - case 7: return type_array_string; - case 8: return type_array_boolean; - case 9: return type_nullable_array; - case 10: return type_enum; - case 11: return type_map; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - // Used by DatumReader. Applications should not call. - @SuppressWarnings(value="unchecked") - public void put(int field$, java.lang.Object value$) { - switch (field$) { - case 0: name = (java.lang.CharSequence)value$; break; - case 1: favorite_number = (java.lang.Integer)value$; break; - case 2: favorite_color = (java.lang.CharSequence)value$; break; - case 3: type_long_test = (java.lang.Long)value$; break; - case 4: type_double_test = (java.lang.Object)value$; break; - case 5: type_null_test = (java.lang.Object)value$; break; - case 6: type_bool_test = (java.lang.Object)value$; break; - case 7: type_array_string = (java.util.List)value$; break; - case 8: type_array_boolean = (java.util.List)value$; break; - case 9: type_nullable_array = (java.util.List)value$; break; - case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break; - case 11: type_map = (java.util.Map)value$; break; - default: throw new org.apache.avro.AvroRuntimeException("Bad index"); - } - } - - /** - * Gets the value of the 'name' field. - */ - public java.lang.CharSequence getName() { - return name; - } - - /** - * Sets the value of the 'name' field. - * @param value the value to set. - */ - public void setName(java.lang.CharSequence value) { - this.name = value; - } - - /** - * Gets the value of the 'favorite_number' field. 
- */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** - * Sets the value of the 'favorite_number' field. - * @param value the value to set. - */ - public void setFavoriteNumber(java.lang.Integer value) { - this.favorite_number = value; - } - - /** - * Gets the value of the 'favorite_color' field. - */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** - * Sets the value of the 'favorite_color' field. - * @param value the value to set. - */ - public void setFavoriteColor(java.lang.CharSequence value) { - this.favorite_color = value; - } - - /** - * Gets the value of the 'type_long_test' field. - */ - public java.lang.Long getTypeLongTest() { - return type_long_test; - } - - /** - * Sets the value of the 'type_long_test' field. - * @param value the value to set. - */ - public void setTypeLongTest(java.lang.Long value) { - this.type_long_test = value; - } - - /** - * Gets the value of the 'type_double_test' field. - */ - public java.lang.Object getTypeDoubleTest() { - return type_double_test; - } - - /** - * Sets the value of the 'type_double_test' field. - * @param value the value to set. - */ - public void setTypeDoubleTest(java.lang.Object value) { - this.type_double_test = value; - } - - /** - * Gets the value of the 'type_null_test' field. - */ - public java.lang.Object getTypeNullTest() { - return type_null_test; - } - - /** - * Sets the value of the 'type_null_test' field. - * @param value the value to set. - */ - public void setTypeNullTest(java.lang.Object value) { - this.type_null_test = value; - } - - /** - * Gets the value of the 'type_bool_test' field. - */ - public java.lang.Object getTypeBoolTest() { - return type_bool_test; - } - - /** - * Sets the value of the 'type_bool_test' field. - * @param value the value to set. - */ - public void setTypeBoolTest(java.lang.Object value) { - this.type_bool_test = value; - } - - /** - * Gets the value of the 'type_array_string' field. 
- */ - public java.util.List getTypeArrayString() { - return type_array_string; - } - - /** - * Sets the value of the 'type_array_string' field. - * @param value the value to set. - */ - public void setTypeArrayString(java.util.List value) { - this.type_array_string = value; - } - - /** - * Gets the value of the 'type_array_boolean' field. - */ - public java.util.List getTypeArrayBoolean() { - return type_array_boolean; - } - - /** - * Sets the value of the 'type_array_boolean' field. - * @param value the value to set. - */ - public void setTypeArrayBoolean(java.util.List value) { - this.type_array_boolean = value; - } - - /** - * Gets the value of the 'type_nullable_array' field. - */ - public java.util.List getTypeNullableArray() { - return type_nullable_array; - } - - /** - * Sets the value of the 'type_nullable_array' field. - * @param value the value to set. - */ - public void setTypeNullableArray(java.util.List value) { - this.type_nullable_array = value; - } - - /** - * Gets the value of the 'type_enum' field. - */ - public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() { - return type_enum; - } - - /** - * Sets the value of the 'type_enum' field. - * @param value the value to set. - */ - public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) { - this.type_enum = value; - } - - /** - * Gets the value of the 'type_map' field. - */ - public java.util.Map getTypeMap() { - return type_map; - } - - /** - * Sets the value of the 'type_map' field. - * @param value the value to set. 
- */ - public void setTypeMap(java.util.Map value) { - this.type_map = value; - } - - /** Creates a new User RecordBuilder */ - public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder() { - return new org.apache.flink.api.io.avro.generated.User.Builder(); - } - - /** Creates a new User RecordBuilder by copying an existing Builder */ - public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) { - return new org.apache.flink.api.io.avro.generated.User.Builder(other); - } - - /** Creates a new User RecordBuilder by copying an existing User instance */ - public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User other) { - return new org.apache.flink.api.io.avro.generated.User.Builder(other); - } - - /** - * RecordBuilder for User instances. - */ - public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase - implements org.apache.avro.data.RecordBuilder { - - private java.lang.CharSequence name; - private java.lang.Integer favorite_number; - private java.lang.CharSequence favorite_color; - private java.lang.Long type_long_test; - private java.lang.Object type_double_test; - private java.lang.Object type_null_test; - private java.lang.Object type_bool_test; - private java.util.List type_array_string; - private java.util.List type_array_boolean; - private java.util.List type_nullable_array; - private org.apache.flink.api.io.avro.generated.Colors type_enum; - private java.util.Map type_map; - - /** Creates a new Builder */ - private Builder() { - super(org.apache.flink.api.io.avro.generated.User.SCHEMA$); - } - - /** Creates a Builder by copying an existing Builder */ - private Builder(org.apache.flink.api.io.avro.generated.User.Builder other) { - super(other); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - 
fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - if (isValidValue(fields()[3], other.type_long_test)) { - this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test); - fieldSetFlags()[3] = true; - } - if (isValidValue(fields()[4], other.type_double_test)) { - this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test); - fieldSetFlags()[4] = true; - } - if (isValidValue(fields()[5], other.type_null_test)) { - this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test); - fieldSetFlags()[5] = true; - } - if (isValidValue(fields()[6], other.type_bool_test)) { - this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test); - fieldSetFlags()[6] = true; - } - if (isValidValue(fields()[7], other.type_array_string)) { - this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string); - fieldSetFlags()[7] = true; - } - if (isValidValue(fields()[8], other.type_array_boolean)) { - this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean); - fieldSetFlags()[8] = true; - } - if (isValidValue(fields()[9], other.type_nullable_array)) { - this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array); - fieldSetFlags()[9] = true; - } - if (isValidValue(fields()[10], other.type_enum)) { - this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum); - fieldSetFlags()[10] = true; - } - if (isValidValue(fields()[11], other.type_map)) { - this.type_map = data().deepCopy(fields()[11].schema(), other.type_map); - fieldSetFlags()[11] = true; - } - } - - /** Creates a 
Builder by copying an existing User instance */ - private Builder(org.apache.flink.api.io.avro.generated.User other) { - super(org.apache.flink.api.io.avro.generated.User.SCHEMA$); - if (isValidValue(fields()[0], other.name)) { - this.name = data().deepCopy(fields()[0].schema(), other.name); - fieldSetFlags()[0] = true; - } - if (isValidValue(fields()[1], other.favorite_number)) { - this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number); - fieldSetFlags()[1] = true; - } - if (isValidValue(fields()[2], other.favorite_color)) { - this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color); - fieldSetFlags()[2] = true; - } - if (isValidValue(fields()[3], other.type_long_test)) { - this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test); - fieldSetFlags()[3] = true; - } - if (isValidValue(fields()[4], other.type_double_test)) { - this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test); - fieldSetFlags()[4] = true; - } - if (isValidValue(fields()[5], other.type_null_test)) { - this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test); - fieldSetFlags()[5] = true; - } - if (isValidValue(fields()[6], other.type_bool_test)) { - this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test); - fieldSetFlags()[6] = true; - } - if (isValidValue(fields()[7], other.type_array_string)) { - this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string); - fieldSetFlags()[7] = true; - } - if (isValidValue(fields()[8], other.type_array_boolean)) { - this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean); - fieldSetFlags()[8] = true; - } - if (isValidValue(fields()[9], other.type_nullable_array)) { - this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array); - fieldSetFlags()[9] = true; - } - if (isValidValue(fields()[10], other.type_enum)) { 
- this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum); - fieldSetFlags()[10] = true; - } - if (isValidValue(fields()[11], other.type_map)) { - this.type_map = data().deepCopy(fields()[11].schema(), other.type_map); - fieldSetFlags()[11] = true; - } - } - - /** Gets the value of the 'name' field */ - public java.lang.CharSequence getName() { - return name; - } - - /** Sets the value of the 'name' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setName(java.lang.CharSequence value) { - validate(fields()[0], value); - this.name = value; - fieldSetFlags()[0] = true; - return this; - } - - /** Checks whether the 'name' field has been set */ - public boolean hasName() { - return fieldSetFlags()[0]; - } - - /** Clears the value of the 'name' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearName() { - name = null; - fieldSetFlags()[0] = false; - return this; - } - - /** Gets the value of the 'favorite_number' field */ - public java.lang.Integer getFavoriteNumber() { - return favorite_number; - } - - /** Sets the value of the 'favorite_number' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) { - validate(fields()[1], value); - this.favorite_number = value; - fieldSetFlags()[1] = true; - return this; - } - - /** Checks whether the 'favorite_number' field has been set */ - public boolean hasFavoriteNumber() { - return fieldSetFlags()[1]; - } - - /** Clears the value of the 'favorite_number' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteNumber() { - favorite_number = null; - fieldSetFlags()[1] = false; - return this; - } - - /** Gets the value of the 'favorite_color' field */ - public java.lang.CharSequence getFavoriteColor() { - return favorite_color; - } - - /** Sets the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence 
value) { - validate(fields()[2], value); - this.favorite_color = value; - fieldSetFlags()[2] = true; - return this; - } - - /** Checks whether the 'favorite_color' field has been set */ - public boolean hasFavoriteColor() { - return fieldSetFlags()[2]; - } - - /** Clears the value of the 'favorite_color' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteColor() { - favorite_color = null; - fieldSetFlags()[2] = false; - return this; - } - - /** Gets the value of the 'type_long_test' field */ - public java.lang.Long getTypeLongTest() { - return type_long_test; - } - - /** Sets the value of the 'type_long_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) { - validate(fields()[3], value); - this.type_long_test = value; - fieldSetFlags()[3] = true; - return this; - } - - /** Checks whether the 'type_long_test' field has been set */ - public boolean hasTypeLongTest() { - return fieldSetFlags()[3]; - } - - /** Clears the value of the 'type_long_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeLongTest() { - type_long_test = null; - fieldSetFlags()[3] = false; - return this; - } - - /** Gets the value of the 'type_double_test' field */ - public java.lang.Object getTypeDoubleTest() { - return type_double_test; - } - - /** Sets the value of the 'type_double_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) { - validate(fields()[4], value); - this.type_double_test = value; - fieldSetFlags()[4] = true; - return this; - } - - /** Checks whether the 'type_double_test' field has been set */ - public boolean hasTypeDoubleTest() { - return fieldSetFlags()[4]; - } - - /** Clears the value of the 'type_double_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeDoubleTest() { - type_double_test = null; - fieldSetFlags()[4] = false; - return this; - } - - 
/** Gets the value of the 'type_null_test' field */ - public java.lang.Object getTypeNullTest() { - return type_null_test; - } - - /** Sets the value of the 'type_null_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) { - validate(fields()[5], value); - this.type_null_test = value; - fieldSetFlags()[5] = true; - return this; - } - - /** Checks whether the 'type_null_test' field has been set */ - public boolean hasTypeNullTest() { - return fieldSetFlags()[5]; - } - - /** Clears the value of the 'type_null_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullTest() { - type_null_test = null; - fieldSetFlags()[5] = false; - return this; - } - - /** Gets the value of the 'type_bool_test' field */ - public java.lang.Object getTypeBoolTest() { - return type_bool_test; - } - - /** Sets the value of the 'type_bool_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) { - validate(fields()[6], value); - this.type_bool_test = value; - fieldSetFlags()[6] = true; - return this; - } - - /** Checks whether the 'type_bool_test' field has been set */ - public boolean hasTypeBoolTest() { - return fieldSetFlags()[6]; - } - - /** Clears the value of the 'type_bool_test' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeBoolTest() { - type_bool_test = null; - fieldSetFlags()[6] = false; - return this; - } - - /** Gets the value of the 'type_array_string' field */ - public java.util.List getTypeArrayString() { - return type_array_string; - } - - /** Sets the value of the 'type_array_string' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayString(java.util.List value) { - validate(fields()[7], value); - this.type_array_string = value; - fieldSetFlags()[7] = true; - return this; - } - - /** Checks whether the 'type_array_string' field has been set */ - public 
boolean hasTypeArrayString() { - return fieldSetFlags()[7]; - } - - /** Clears the value of the 'type_array_string' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayString() { - type_array_string = null; - fieldSetFlags()[7] = false; - return this; - } - - /** Gets the value of the 'type_array_boolean' field */ - public java.util.List getTypeArrayBoolean() { - return type_array_boolean; - } - - /** Sets the value of the 'type_array_boolean' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List value) { - validate(fields()[8], value); - this.type_array_boolean = value; - fieldSetFlags()[8] = true; - return this; - } - - /** Checks whether the 'type_array_boolean' field has been set */ - public boolean hasTypeArrayBoolean() { - return fieldSetFlags()[8]; - } - - /** Clears the value of the 'type_array_boolean' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayBoolean() { - type_array_boolean = null; - fieldSetFlags()[8] = false; - return this; - } - - /** Gets the value of the 'type_nullable_array' field */ - public java.util.List getTypeNullableArray() { - return type_nullable_array; - } - - /** Sets the value of the 'type_nullable_array' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullableArray(java.util.List value) { - validate(fields()[9], value); - this.type_nullable_array = value; - fieldSetFlags()[9] = true; - return this; - } - - /** Checks whether the 'type_nullable_array' field has been set */ - public boolean hasTypeNullableArray() { - return fieldSetFlags()[9]; - } - - /** Clears the value of the 'type_nullable_array' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullableArray() { - type_nullable_array = null; - fieldSetFlags()[9] = false; - return this; - } - - /** Gets the value of the 'type_enum' field */ - public org.apache.flink.api.io.avro.generated.Colors 
getTypeEnum() { - return type_enum; - } - - /** Sets the value of the 'type_enum' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) { - validate(fields()[10], value); - this.type_enum = value; - fieldSetFlags()[10] = true; - return this; - } - - /** Checks whether the 'type_enum' field has been set */ - public boolean hasTypeEnum() { - return fieldSetFlags()[10]; - } - - /** Clears the value of the 'type_enum' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum() { - type_enum = null; - fieldSetFlags()[10] = false; - return this; - } - - /** Gets the value of the 'type_map' field */ - public java.util.Map getTypeMap() { - return type_map; - } - - /** Sets the value of the 'type_map' field */ - public org.apache.flink.api.io.avro.generated.User.Builder setTypeMap(java.util.Map value) { - validate(fields()[11], value); - this.type_map = value; - fieldSetFlags()[11] = true; - return this; - } - - /** Checks whether the 'type_map' field has been set */ - public boolean hasTypeMap() { - return fieldSetFlags()[11]; - } - - /** Clears the value of the 'type_map' field */ - public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() { - type_map = null; - fieldSetFlags()[11] = false; - return this; - } - - @Override - public User build() { - try { - User record = new User(); - record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]); - record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]); - record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]); - record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]); - record.type_double_test = fieldSetFlags()[4] ? 
this.type_double_test : (java.lang.Object) defaultValue(fields()[4]); - record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]); - record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]); - record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List) defaultValue(fields()[7]); - record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List) defaultValue(fields()[8]); - record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List) defaultValue(fields()[9]); - record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]); - record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map) defaultValue(fields()[11]); - return record; - } catch (Exception e) { - throw new org.apache.avro.AvroRuntimeException(e); - } - } - } -} diff --git a/flink-addons/flink-avro/src/test/resources/avro/user.avsc b/flink-addons/flink-avro/src/test/resources/avro/user.avsc index af3cb75bc49d2..6801b100d63a6 100644 --- a/flink-addons/flink-avro/src/test/resources/avro/user.avsc +++ b/flink-addons/flink-avro/src/test/resources/avro/user.avsc @@ -1,5 +1,5 @@ -{"namespace": "org.apache.flink.api.java.record.io.avro.generated", +{"namespace": "org.apache.flink.api.io.avro.generated", "type": "record", "name": "User", "fields": [ @@ -7,13 +7,17 @@ {"name": "favorite_number", "type": ["int", "null"]}, {"name": "favorite_color", "type": ["string", "null"]}, {"name": "type_long_test", "type": ["long", "null"]}, - {"name": "type_double_test", "type": ["double"]}, + {"name": "type_double_test", "type": "double"}, {"name": "type_null_test", "type": ["null"]}, {"name": "type_bool_test", "type": ["boolean"]}, {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, {"name": "type_array_boolean", "type" : 
{"type" : "array", "items" : "boolean"}}, {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, - {"name": "type_map", "type": {"type": "map", "values": "long"}} + {"name": "type_map", "type": {"type": "map", "values": "long"}}, + {"name": "type_fixed", + "size": 16, + "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, + {"name": "type_union", "type": ["null", "boolean", "long", "double"]} ] } \ No newline at end of file diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java index e6a8cb6148c9a..bc5c6b6dd31a5 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java @@ -448,7 +448,7 @@ protected void writeSortedData(T value, TestOutputView out) throws IOException { } // -------------------------------------------------------------------------------------------- - protected static final class TestOutputView extends DataOutputStream implements DataOutputView { + public static final class TestOutputView extends DataOutputStream implements DataOutputView { public TestOutputView() { super(new ByteArrayOutputStream(4096)); @@ -474,7 +474,7 @@ public void write(DataInputView source, int numBytes) throws IOException { } } - protected static final class TestInputView extends DataInputStream implements DataInputView { + public static final class TestInputView extends DataInputStream implements DataInputView { public TestInputView(byte[] data) { super(new ByteArrayInputStream(data)); diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 1469dac4aafa4..4a4bbfacebd04 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -63,6 +63,13 
@@ under the License. 0.5.1 + + com.twitter + chill-avro_2.10 + 0.5.1 + + + com.google.guava @@ -80,7 +87,7 @@ under the License. - + diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index c19e9aa7c9a1d..563787f607de8 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -371,7 +371,7 @@ public DataSource createInput(InputFormat inputFormat) { } catch (Exception e) { throw new InvalidProgramException("The type returned by the input format could not be automatically determined. " + - "Please specify the TypeInformation of the produced type explicitly."); + "Please specify the TypeInformation of the produced type explicitly.", e); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java new file mode 100644 index 0000000000000..ccdf7f722d82c --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.api.java.typeutils; + +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * Special type information to generate a POJO type info from an avro schema. + * + * Proceeding: It uses a regular pojo type analysis and replaces all GenericType + * with a GenericType + * @param + */ +public class AvroTypeInfo extends PojoTypeInfo { + public AvroTypeInfo(Class typeClass) { + super(typeClass, generateFieldsFromAvroSchema(typeClass)); + } + + private static List generateFieldsFromAvroSchema(Class typeClass) { + PojoTypeExtractor pte = new PojoTypeExtractor(); + TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList(), null); + + if(!(ti instanceof PojoTypeInfo)) { + throw new IllegalStateException("Expecting type to be a PojoTypeInfo"); + } + PojoTypeInfo pti = (PojoTypeInfo) ti; + List newFields = new ArrayList(pti.getTotalFields()); + + for(int i = 0; i < pti.getTotalFields(); i++) { + PojoField f = pti.getPojoFieldAt(i); + TypeInformation newType = f.type; + // check if type is a CharSequence + if(newType instanceof GenericTypeInfo) { + if((newType).getTypeClass().equals(CharSequence.class)) { + // replace the type by a org.apache.avro.util.Utf8 + newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class); + } + } + PojoField newField = new PojoField(f.field, newType); + newFields.add(newField); + } + return newFields; + } + + private static class PojoTypeExtractor extends TypeExtractor { + private PojoTypeExtractor() { + super(); + } + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index b528d00d95a2e..2e90c8a6133ad 100644 --- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Set; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.commons.lang3.Validate; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.CrossFunction; @@ -66,7 +67,7 @@ public class TypeExtractor { // in an endless recursion private Set> alreadySeen; - private TypeExtractor() { + protected TypeExtractor() { alreadySeen = new HashSet>(); } @@ -936,6 +937,11 @@ private TypeInformation privateGetForClass(Class clazz, ArrayList) new EnumTypeInfo(clazz); } + // special case for POJOs generated by Avro. + if(SpecificRecordBase.class.isAssignableFrom(clazz)) { + return (TypeInformation) new AvroTypeInfo(clazz); + } + if (alreadySeen.contains(clazz)) { return new GenericTypeInfo(clazz); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 3a5153493fdd7..36fcf27ca5f2b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -20,21 +20,39 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.serializers.CollectionSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.twitter.chill.ScalaKryoInstantiator; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import 
org.apache.avro.specific.SpecificRecordBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import scala.reflect.ClassTag; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import java.lang.reflect.Modifier; +/** + * A type serializer that serializes its type using the Kryo serialization + * framework (https://github.com/EsotericSoftware/kryo). + * + * This serializer is intended as a fallback serializer for the cases that are + * not covered by the basic types, tuples, and POJOs. + * + * @param The type to be serialized. + */ public class KryoSerializer extends TypeSerializer { private static final long serialVersionUID = 2L; @@ -48,6 +66,8 @@ public class KryoSerializer extends TypeSerializer { private transient Input input; private transient Output output; + + // ------------------------------------------------------------------------ public KryoSerializer(Class type){ if(type == null){ @@ -56,6 +76,7 @@ public KryoSerializer(Class type){ this.type = type; } + // ------------------------------------------------------------------------ @Override public boolean isImmutableType() { @@ -191,15 +212,92 @@ public boolean equals(Object obj) { private void checkKryoInitialized() { if (this.kryo == null) { this.kryo = new ScalaKryoInstantiator().newKryo(); - this.kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); - this.kryo.setRegistrationRequired(false); - this.kryo.register(type); - this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + + // Throwable and all subclasses should be serialized via java serialization + kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); + + // If the type we have to serialize as a GenricType is implementing SpecificRecordBase, + // we 
have to register the avro serializer + // This rule only applies if users explicitly use the GenericTypeInformation for the avro types + // usually, we are able to handle Avro POJOs with the POJO serializer. + if(SpecificRecordBase.class.isAssignableFrom(type)) { + ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(type); + this.kryo.register(type, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag)); + + } + // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type + // because Kryo is not able to serialize them properly, we use this serializer for them + this.kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializer(ArrayList.class)); + // We register this serializer for users who want to use untyped Avro records (GenericData.Record). + // Kryo is able to serialize everything in there, except for the Schema. + // This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea. + // we add the serializer as a default serializer because Avro is using a private sub-type at runtime. + this.kryo.addDefaultSerializer(Schema.class, new AvroSchemaSerializer()); + + + // register the type of our class + kryo.register(type); + + // register given types. we do this first so that any registration of a + // more specific serializer overrides this + for (Class type : registeredTypes) { + kryo.register(type); + } + + + kryo.setRegistrationRequired(false); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + } + } + + // -------------------------------------------------------------------------------------------- + // Custom Serializers + // -------------------------------------------------------------------------------------------- + + /** + * Special serializer for Java collections enforcing certain instance types. + * Avro is serializing collections with an "GenericData.Array" type. Kryo is not able to handle + * this type, so we use ArrayLists. 
+ */ + public static class SpecificInstanceCollectionSerializer extends CollectionSerializer { + Class type; + public SpecificInstanceCollectionSerializer(Class type) { + this.type = type; + } + + @Override + protected Collection create(Kryo kryo, Input input, Class type) { + return kryo.newInstance(this.type); + } + + @Override + protected Collection createCopy(Kryo kryo, Collection original) { + return kryo.newInstance(this.type); + } + } + + /** + * Slow serialization approach for Avro schemas. + * This is only used with {{@link org.apache.avro.generic.GenericData.Record}} types. + * Having this serializer, we are able to handle avro Records. + */ + public static class AvroSchemaSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, Schema object) { + String schemaAsString = object.toString(false); + output.writeString(schemaAsString); + } + + @Override + public Schema read(Kryo kryo, Input input, Class type) { + String schemaAsString = input.readString(); + // the parser seems to be stateful, to we need a new one for every type. 
+ Schema.Parser sParser = new Schema.Parser(); + return sParser.parse(schemaAsString); } } - // -------------------------------------------------------------------------------------------- - // for testing + // For testing // -------------------------------------------------------------------------------------------- Kryo getKryo() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java index ae4a806506030..c0c7797249643 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java @@ -210,7 +210,12 @@ public int hash(T value) { int code = 0; for (; i < this.keyFields.length; i++) { code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; - code += this.comparators[i].hash(accessField(keyFields[i], value)); + try { + code += this.comparators[i].hash(accessField(keyFields[i], value)); + }catch(NullPointerException npe) { + throw new RuntimeException("A NullPointerException occured while accessing a key field in a POJO. " + + "Most likely, the value grouped/joined on is null. Field name: "+keyFields[i].getName(), npe); + } } return code; diff --git a/pom.xml b/pom.xml index b729e174b08f7..e59f3405683fc 100644 --- a/pom.xml +++ b/pom.xml @@ -923,7 +923,9 @@ under the License. 
flink-runtime/resources/web-docs-infoserver/js/timeline.js flink-tests/src/test/resources/testdata/terainput.txt - flink-addons/flink-avro/src/test/resources/avro/user.avsc + flink-addons/flink-avro/src/test/resources/avro/*.avsc + + flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java **/flink-bin/conf/slaves diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml index 377cbfddc0a31..b17dbcefb9351 100644 --- a/tools/maven/suppressions.xml +++ b/tools/maven/suppressions.xml @@ -24,4 +24,5 @@ under the License. + \ No newline at end of file From c233685e5959eb20d585338961c4328cd7d5238f Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 13 Jan 2015 10:21:29 +0100 Subject: [PATCH 2/4] [FLINK-1392] Add Kryo serializer for Protobuf Conflicts: flink-java/pom.xml flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java Conflicts: flink-shaded/pom.xml pom.xml --- flink-java/pom.xml | 33 +++++++++++++++++++ .../typeutils/runtime/KryoSerializer.java | 14 +++++--- flink-shaded/pom.xml | 5 ++- pom.xml | 6 +++- 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/flink-java/pom.xml b/flink-java/pom.xml index 4a4bbfacebd04..cc5664b6d4da8 100644 --- a/flink-java/pom.xml +++ b/flink-java/pom.xml @@ -69,6 +69,39 @@ under the License. 
0.5.1 + + com.twitter + chill-protobuf + 0.5.1 + + + + com.google.protobuf + protobuf-java + 2.5.0 + + + + com.twitter + chill-thrift + 0.5.1 + + + + org.apache.thrift + libthrift + 0.6.1 + + + javax.servlet + servlet-api + + + org.apache.httpcomponents + httpclient + + + diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 36fcf27ca5f2b..0f3c31c4f1f88 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -26,15 +26,15 @@ import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.protobuf.Message; import com.twitter.chill.ScalaKryoInstantiator; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.specific.SpecificRecordBase; +import com.twitter.chill.protobuf.ProtobufSerializer; +import com.twitter.chill.thrift.TBaseSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import scala.reflect.ClassTag; +import org.apache.thrift.TBase; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -216,6 +216,12 @@ private void checkKryoInitialized() { // Throwable and all subclasses should be serialized via java serialization kryo.addDefaultSerializer(Throwable.class, new JavaSerializer()); + // add serializers for popular other serialization frameworks + // Google Protobuf (FLINK-1392) + this.kryo.addDefaultSerializer(Message.class, ProtobufSerializer.class); + // thrift + this.kryo.addDefaultSerializer(TBase.class, TBaseSerializer.class); + // If the type we have to 
serialize as a GenericType is implementing SpecificRecordBase, // we have to register the avro serializer // This rule only applies if users explicitly use the GenericTypeInformation for the avro types diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml index 4dbae5d62f7cc..51191c70bd252 100644 --- a/flink-shaded/pom.xml +++ b/flink-shaded/pom.xml @@ -25,7 +25,7 @@ under the License. org.apache.flink flink-parent - 0.8-SNAPSHOT + 0.9-SNAPSHOT .. @@ -71,6 +71,9 @@ under the License. com.google org.apache.flink.shaded.com.google + + com.google.protobuf.** + diff --git a/pom.xml b/pom.xml index e59f3405683fc..84690b271f4a5 100644 --- a/pom.xml +++ b/pom.xml @@ -277,7 +277,8 @@ under the License. - + + org.apache.hadoop hadoop-common @@ -853,6 +854,9 @@ under the License. com.google org.apache.flink.shaded.com.google + + com.google.protobuf.** + From fa5996758b6962fec763f4fd731a075405a50839 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 15 Jan 2015 11:46:53 +0100 Subject: [PATCH 3/4] [hotfix] Also use java closure cleaner on grouped operations --- .../org/apache/flink/api/java/operators/SortedGrouping.java | 2 +- .../org/apache/flink/api/java/operators/UnsortedGrouping.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java index 63e5a190b86d1..57ee50cc7c873 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/SortedGrouping.java @@ -128,7 +128,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc } TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType()); - return new GroupReduceOperator(this, resultType, reducer, Utils.getCallLocationName() ); + return new GroupReduceOperator(this, resultType, 
dataSet.clean(reducer), Utils.getCallLocationName() ); } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java index d323eae3631f3..b8049f62811dd 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java @@ -134,7 +134,7 @@ public ReduceOperator reduce(ReduceFunction reducer) { if (reducer == null) { throw new NullPointerException("Reduce function must not be null."); } - return new ReduceOperator(this, reducer, Utils.getCallLocationName()); + return new ReduceOperator(this, dataSet.clean(reducer), Utils.getCallLocationName()); } /** @@ -156,7 +156,7 @@ public GroupReduceOperator reduceGroup(GroupReduceFunction reduc } TypeInformation resultType = TypeExtractor.getGroupReduceReturnTypes(reducer, this.getDataSet().getType()); - return new GroupReduceOperator(this, resultType, reducer, Utils.getCallLocationName()); + return new GroupReduceOperator(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName()); } /** From c87ab555ee9bfa8b9887a3919bda3dcbc27e5b40 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 5 Feb 2015 14:07:48 +0100 Subject: [PATCH 4/4] [backports] Cleanup and port changes to 0.8 branch. 
--- .../flink/api/io/avro/AvroPojoTest.java | 178 +++++++++++------- .../api/java/typeutils/TypeExtractor.java | 4 +- .../typeutils/runtime/KryoSerializer.java | 14 +- flink-shaded/pom.xml | 2 +- 4 files changed, 114 insertions(+), 84 deletions(-) diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java index 6ff483627d0ae..b91a3d8210973 100644 --- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java @@ -24,29 +24,33 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.AvroInputFormat; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.test.util.JavaProgramTestBase; import org.apache.flink.util.Collector; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; -import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; @RunWith(Parameterized.class) -public class AvroPojoTest extends MultipleProgramsTestBase { - public AvroPojoTest(ExecutionMode mode) { - super(mode); +public class AvroPojoTest extends JavaProgramTestBase { + + public AvroPojoTest(Configuration config) { + super(config); } private File inFile; - private String resultPath; private String expected; @Rule @@ -59,99 +63,129 @@ public void before() throws Exception{ AvroRecordInputFormatTest.writeTestFile(inFile); } - @After - public void after() throws 
Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - @Test - public void testSimpleAvroRead() throws Exception { + private String testField(final String fieldName) throws Exception { + before(); + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Path in = new Path(inFile.getAbsoluteFile().toURI()); AvroInputFormat users = new AvroInputFormat(in, User.class); - DataSet usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction() { + DataSet usersDS = env.createInput(users); + + DataSet res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction() { @Override - public User map(User value) throws Exception { - value.setTypeMap(null); - return value; + public void reduce(Iterable values, Collector out) throws Exception { + for(User u : values) { + out.collect(u.get(fieldName)); + } } }); + res.writeAsText(resultPath); + env.execute("Simple Avro read job"); + if(fieldName.equals("name")) { + return "Alyssa\nCharlie"; + } else if(fieldName.equals("type_enum")) { + return "GREEN\nRED\n"; + } else if(fieldName.equals("type_double_test")) { + return "123.45\n1.337\n"; + } else { + Assert.fail("Unknown field"); + } - usersDS.writeAsText(resultPath); + postSubmit(); + return ""; + } - env.execute("Simple Avro read job"); + private static int NUM_PROGRAMS = 3; + private int curProgId = config.getInteger("ProgramId", -1); + private String resultPath; + private String expectedResult; - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, 
\"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + @Override + protected void preSubmit() throws Exception { + resultPath = getTempDirPath("result"); } - @Test - public void testKeySelection() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat users = new AvroInputFormat(in, User.class); - DataSet usersDS = env.createInput(users); + @Override + protected void testProgram() throws Exception { + expectedResult = runProgram(curProgId); + } - DataSet> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction>() { - @Override - public void reduce(Iterable values, Collector> out) throws Exception { - for(User u : values) { - out.collect(new Tuple2(u.getName().toString(), 1)); + private String runProgram(int curProgId) throws Exception { + switch(curProgId) { + case 1: + for (String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { + return testField(fieldName); } - } - }); + break; + case 2: + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Path in = new Path(inFile.getAbsoluteFile().toURI()); - res.writeAsText(resultPath); - env.execute("Avro Key selection"); + AvroInputFormat users = new AvroInputFormat(in, User.class); + DataSet usersDS = env.createInput(users); + DataSet> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction>() { + @Override + public void reduce(Iterable values, Collector> out) throws Exception { + for (User u : values) { + out.collect(new Tuple2(u.getName().toString(), 1)); + } + } + }); - expected = "(Alyssa,1)\n(Charlie,1)\n"; - } + res.writeAsText(resultPath); + env.execute("Avro Key 
selection"); + + + return "(Alyssa,1)\n(Charlie,1)\n"; + case 3: + env = ExecutionEnvironment.getExecutionEnvironment(); + in = new Path(inFile.getAbsoluteFile().toURI()); + + AvroInputFormat users1 = new AvroInputFormat(in, User.class); + DataSet usersDS1 = env.createInput(users1) + // null map type because the order changes in different JVMs (hard to test) + .map(new MapFunction() { + @Override + public User map(User value) throws Exception { + value.setTypeMap(null); + return value; + } + }); + + usersDS1.writeAsText(resultPath); + + env.execute("Simple Avro read job"); - /** - * Test some know fields for grouping on - */ - @Test - public void testAllFields() throws Exception { - for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { - testField(fieldName); + + return "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" + + "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n"; + + default: + throw new RuntimeException("Unknown test"); } + return ""; } - private void testField(final String fieldName) throws Exception { - before(); + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(expectedResult, resultPath); + } - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new 
Path(inFile.getAbsoluteFile().toURI()); + @Parameterized.Parameters + public static Collection getConfigurations() throws FileNotFoundException, IOException { - AvroInputFormat users = new AvroInputFormat(in, User.class); - DataSet usersDS = env.createInput(users); + LinkedList tConfigs = new LinkedList(); - DataSet res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction() { - @Override - public void reduce(Iterable values, Collector out) throws Exception { - for(User u : values) { - out.collect(u.get(fieldName)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Simple Avro read job"); - if(fieldName.equals("name")) { - expected = "Alyssa\nCharlie"; - } else if(fieldName.equals("type_enum")) { - expected = "GREEN\nRED\n"; - } else if(fieldName.equals("type_double_test")) { - expected = "123.45\n1.337\n"; - } else { - Assert.fail("Unknown field"); + for(int i=1; i <= NUM_PROGRAMS; i++) { + Configuration config = new Configuration(); + config.setInteger("ProgramId", i); + tConfigs.add(config); } - after(); + return toParameterList(tConfigs); } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 2e90c8a6133ad..acf47b61b03cc 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -939,7 +939,7 @@ private TypeInformation privateGetForClass(Class clazz, ArrayList) new AvroTypeInfo(clazz); + return (TypeInformation) new AvroTypeInfo(clazz); } if (alreadySeen.contains(clazz)) { @@ -1034,7 +1034,7 @@ private boolean isValidPojoField(Field f, Class clazz, ArrayList typeHi } } - private TypeInformation analyzePojo(Class clazz, ArrayList typeHierarchy, ParameterizedType clazzTypeHint) { + protected TypeInformation analyzePojo(Class clazz, ArrayList typeHierarchy, ParameterizedType clazzTypeHint) { // 
try to create Type hierarchy, if the incoming only contains the most bottom one or none. if(typeHierarchy.size() <= 1) { getTypeHierarchy(typeHierarchy, clazz, Object.class); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index 0f3c31c4f1f88..6064ce9d2a722 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -21,7 +21,6 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.esotericsoftware.kryo.serializers.CollectionSerializer; @@ -31,10 +30,14 @@ import com.twitter.chill.protobuf.ProtobufSerializer; import com.twitter.chill.thrift.TBaseSerializer; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.thrift.TBase; +import scala.reflect.ClassTag; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -243,14 +246,7 @@ private void checkKryoInitialized() { // register the type of our class kryo.register(type); - - // register given types. 
we do this first so that any registration of a - // more specific serializer overrides this - for (Class type : registeredTypes) { - kryo.register(type); - } - - + kryo.setRegistrationRequired(false); kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } diff --git a/flink-shaded/pom.xml b/flink-shaded/pom.xml index 51191c70bd252..1f00b62aac599 100644 --- a/flink-shaded/pom.xml +++ b/flink-shaded/pom.xml @@ -25,7 +25,7 @@ under the License. org.apache.flink flink-parent - 0.9-SNAPSHOT + 0.8-SNAPSHOT ..