From 89a6d164be77f759da16ac3fce8839f6eb5ad7b1 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 31 May 2016 17:42:06 -0700 Subject: [PATCH 1/3] AvroCoder,Source: workaround AVRO-607 See https://issues.apache.org/jira/browse/AVRO-607 --- .../java/org/apache/beam/sdk/coders/AvroCoder.java | 11 ++++++++++- .../main/java/org/apache/beam/sdk/io/AvroIO.java | 6 +++--- .../java/org/apache/beam/sdk/io/AvroSource.java | 5 ++--- .../java/org/apache/beam/sdk/util/AvroUtils.java | 14 ++++++++++++++ .../org/apache/beam/sdk/coders/AvroCoderTest.java | 4 ++-- .../org/apache/beam/sdk/io/AvroSourceTest.java | 7 ++++--- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index 3b93ec37f3960..f000de85f885b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.util.Structs.addString; +import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.values.TypeDescriptor; @@ -121,7 +122,9 @@ public static AvroCoder of(TypeDescriptor type) { * @param the element type */ public static AvroCoder of(Class clazz) { - return new AvroCoder<>(clazz, ReflectData.get().getSchema(clazz)); + // Workaround AVRO-607, a race in Avro's getSchema function. + // https://issues.apache.org/jira/browse/AVRO-607 + return new AvroCoder<>(clazz, AvroUtils.getSchemaThreadsafe(clazz)); } /** @@ -312,6 +315,9 @@ public DatumReader createDatumReader() { if (type.equals(GenericRecord.class)) { return new GenericDatumReader<>(schema); } else { + // Workaround AVRO-607, a race in Avro's getSchema function. + // https://issues.apache.org/jira/browse/AVRO-607 + AvroUtils.getSchemaThreadsafe(type); return new ReflectDatumReader<>(schema); } } @@ -327,6 +333,9 @@ public DatumWriter createDatumWriter() { if (type.equals(GenericRecord.class)) { return new GenericDatumWriter<>(schema); } else { + // Workaround AVRO-607, a race in Avro's getSchema function. + // https://issues.apache.org/jira/browse/AVRO-607 + AvroUtils.getSchemaThreadsafe(type); return new ReflectDatumWriter<>(schema); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java index 4b40c01eedfc8..8def07aeaa00c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; import org.apache.beam.sdk.values.PCollection; @@ -39,7 +40,6 @@ import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.reflect.ReflectData; import java.io.IOException; import java.nio.channels.Channels; @@ -254,7 +254,7 @@ public Bound from(String filepattern) { * the resulting PCollection */ public Bound withSchema(Class type) { - return new Bound<>(name, filepattern, type, ReflectData.get().getSchema(type), validate); + return new Bound<>(name, filepattern, type, AvroUtils.getSchemaThreadsafe(type), validate); } /** @@ -623,7 +623,7 @@ public Bound withSchema(Class type) { numShards, shardTemplate, type, - ReflectData.get().getSchema(type), + AvroUtils.getSchemaThreadsafe(type), validate); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java index 9cc0b9825a3c3..78edfdd695335 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java @@ -37,7 +37,6 @@ import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.ReflectDatumReader; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream; @@ -167,7 +166,7 @@ public class AvroSource extends BlockBasedSource { */ public static Read.Bounded readFromFileWithClass(String filePattern, Class clazz) { return Read.from(new AvroSource(filePattern, DEFAULT_MIN_BUNDLE_SIZE, - ReflectData.get().getSchema(clazz).toString(), clazz, null, null)); + AvroUtils.getSchemaThreadsafe(clazz).toString(), clazz, null, null)); } /** @@ -210,7 +209,7 @@ public AvroSource withSchema(Schema schema) { */ public AvroSource withSchema(Class clazz) { return new AvroSource(getFileOrPatternSpec(), getMinBundleSize(), - ReflectData.get().getSchema(clazz).toString(), clazz, codec, syncMarker); + AvroUtils.getSchemaThreadsafe(clazz).toString(), clazz, codec, syncMarker); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java index df2308d720829..88fb14b2c2233 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java @@ -34,6 +34,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificData; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -316,6 +317,19 @@ private static Object convertRequiredField( } } + /** + * As a workaround for AVRO-607, whenever invoking {@link SpecificData#getSchema} (or as + * inherited {@link org.apache.avro.reflect.ReflectData#getSchema}) a user can call this + * method instead or first to populate the {@link SpecificData} cache. + * + * @see https://issues.apache.org/jira/browse/AVRO-607 + */ + public static Schema getSchemaThreadsafe(java.lang.reflect.Type type) { + synchronized (AvroUtils.class) { + return SpecificData.get().getSchema(type); + } + } + @Nullable private static Object convertNullableField( Schema avroSchema, TableFieldSchema fieldSchema, Object v) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java index 8f28cc40d322f..3bec4e08b922d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.values.PCollection; @@ -45,7 +46,6 @@ import org.apache.avro.reflect.AvroName; import org.apache.avro.reflect.AvroSchema; import org.apache.avro.reflect.Nullable; -import org.apache.avro.reflect.ReflectData; import org.apache.avro.reflect.Stringable; import org.apache.avro.reflect.Union; import org.apache.avro.specific.SpecificData; @@ -353,7 +353,7 @@ public void testAvroProhibitsShadowing() { // the same name. This is important for our error reporting, and also how // we lookup a field. try { - ReflectData.get().getSchema(SubclassHidingParent.class); + AvroUtils.getSchemaThreadsafe(SubclassHidingParent.class); fail("Expected AvroTypeException"); } catch (AvroTypeException e) { assertThat(e.getMessage(), containsString("mapField")); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java index 13f8e7f596b0a..c434a7f54efe0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -34,6 +35,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.AvroUtils; import com.google.common.base.MoreObjects; @@ -45,7 +47,6 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.AvroDefault; import org.apache.avro.reflect.Nullable; -import org.apache.avro.reflect.ReflectData; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -385,13 +386,13 @@ public void testCreationWithSchema() throws Exception { AvroCoder.of(Bird.class), DataFileConstants.NULL_CODEC); // Create a source with a schema object - Schema schema = ReflectData.get().getSchema(Bird.class); + Schema schema = AvroUtils.getSchemaThreadsafe(Bird.class); AvroSource source = AvroSource.from(filename).withSchema(schema); List records = SourceTestUtils.readFromSource(source, null); assertEqualsWithGeneric(expected, records); // Create a source with a JSON schema - String schemaString = ReflectData.get().getSchema(Bird.class).toString(); + String schemaString = AvroUtils.getSchemaThreadsafe(Bird.class).toString(); source = AvroSource.from(filename).withSchema(schemaString); records = SourceTestUtils.readFromSource(source, null); assertEqualsWithGeneric(expected, records); From 3ff3c8570bbaae9cb6664a2c024ead935906fac7 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Tue, 31 May 2016 19:29:18 -0700 Subject: [PATCH 2/3] temp state --- .../org/apache/beam/sdk/coders/AvroCoder.java | 4 +- .../org/apache/beam/sdk/util/AvroUtils.java | 8 +-- .../apache/beam/sdk/util/AvroUtilsTest.java | 60 +++++++++++++++++++ 3 files changed, 66 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java index f000de85f885b..4d2edce86bd60 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java @@ -317,7 +317,7 @@ public DatumReader createDatumReader() { } else { // Workaround AVRO-607, a race in Avro's getSchema function. // https://issues.apache.org/jira/browse/AVRO-607 - AvroUtils.getSchemaThreadsafe(type); +// AvroUtils.getSchemaThreadsafe(type); return new ReflectDatumReader<>(schema); } } @@ -335,7 +335,7 @@ public DatumWriter createDatumWriter() { } else { // Workaround AVRO-607, a race in Avro's getSchema function. // https://issues.apache.org/jira/browse/AVRO-607 - AvroUtils.getSchemaThreadsafe(type); +// AvroUtils.getSchemaThreadsafe(type); return new ReflectDatumWriter<>(schema); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java index 88fb14b2c2233..a1248a35ca261 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AvroUtils.java @@ -34,6 +34,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.specific.SpecificData; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; @@ -318,15 +319,14 @@ private static Object convertRequiredField( } /** - * As a workaround for AVRO-607, whenever invoking {@link SpecificData#getSchema} (or as - * inherited {@link org.apache.avro.reflect.ReflectData#getSchema}) a user can call this - * method instead or first to populate the {@link SpecificData} cache. + * As a workaround for AVRO-607, whenever invoking {@link ReflectData#getSchema} a user can call + * this method instead or first to populate the cache. * * @see https://issues.apache.org/jira/browse/AVRO-607 */ public static Schema getSchemaThreadsafe(java.lang.reflect.Type type) { synchronized (AvroUtils.class) { - return SpecificData.get().getSchema(type); + return ReflectData.get().getSchema(type); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java index 38e921a1d5ac0..7dad24c22b85f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.DefaultCoder; @@ -27,6 +28,10 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; @@ -46,8 +51,13 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; /** * Tests for AvroUtils. @@ -224,4 +234,54 @@ private static List createRandomRecords(long n) { } return records; } + + @DefaultCoder(AvroCoder.class) + public static class MyClass { + public MyClass() {} + public MyClass(@Nullable Long value, @Nullable MyClass inner) { + this.value = value; + this.inner = inner; + } + @Nullable MyClass inner; + @Nullable Long value; + @Nullable Bird inner2; + } + + @Test + public void getMyClassSchema() + throws ClassNotFoundException, ExecutionException, InterruptedException { + fail(AvroCoder.of(MyClass.class).getSchema().toString()); + } + + + @Test + public void reproThreadAvro607Issue() + throws ClassNotFoundException, ExecutionException, InterruptedException { + final String schemaStr = "{\"type\":\"record\",\"name\":\"MyClass\",\"namespace\":\"org.apache.beam.sdk.util.AvroUtilsTest$\",\"fields\":[{\"name\":\"inner\",\"type\":[\"null\",\"MyClass\"],\"default\":null},{\"name\":\"value\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"inner2\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bird\",\"fields\":[{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"species\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"quality\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"quantity\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"birthday\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"flighted\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"scion\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubBird\",\"namespace\":\"org.apache.beam.sdk.util.AvroUtilsTest$Bird$\",\"fields\":[{\"name\":\"species\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"associates\",\"type\":{\"type\":\"array\",\"items\":\"org.apache.beam.sdk.util.AvroUtilsTest$Bird$.SubBird\",\"java-class\":\"[Lorg.apache.beam.sdk.util.AvroUtilsTest$Bird$SubBird;\"}}]}],\"default\":null}]}"; + final Schema schema = new Schema.Parser().parse(schemaStr); + final AvroCoder coder = AvroCoder.of(MyClass.class, schema); + final int numThreads = 128; + final CountDownLatch latch = new CountDownLatch(numThreads); + ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); + List> ret = new LinkedList<>(); + for (int i = 0; i < numThreads; ++i) { + ret.add(executorService.submit(new Callable() { + @Override + public byte[] call() throws Exception { + latch.countDown(); + latch.await(); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + return CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + } + })); + } + Futures.allAsList(ret).get(); + executorService.shutdown(); + } } From d8453e3e8eac67f1368d59498d85b30a3d20a993 Mon Sep 17 00:00:00 2001 From: Dan Halperin Date: Wed, 1 Jun 2016 17:51:32 -0700 Subject: [PATCH 3/3] checkpoint --- .../apache/beam/sdk/util/AvroUtilsTest.java | 52 ++++++++++++------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java index 7dad24c22b85f..cd3a7390ed0f0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java @@ -41,6 +41,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.reflect.Nullable; +import org.apache.avro.reflect.ReflectData; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -51,9 +52,12 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -237,14 +241,26 @@ private static List createRandomRecords(long n) { @DefaultCoder(AvroCoder.class) public static class MyClass { - public MyClass() {} - public MyClass(@Nullable Long value, @Nullable MyClass inner) { - this.value = value; - this.inner = inner; + public MyClass() { + str = "abadsdsa"; + bird = new Bird(); + birds = new LinkedList(); + map = new HashMap<>(); + map2 = new HashMap<>(); + map2.put("aninnermap", new HashMap()); } - @Nullable MyClass inner; - @Nullable Long value; - @Nullable Bird inner2; + @Nullable String str; + @Nullable Bird bird; + @Nullable List birds; + @Nullable Map> map; + @Nullable Map> map2; + } + + @DefaultCoder(AvroCoder.class) + public static class MyClass2 { + public MyClass2() {} + long value; + Bird bird; } @Test @@ -257,27 +273,27 @@ public void getMyClassSchema() @Test public void reproThreadAvro607Issue() throws ClassNotFoundException, ExecutionException, InterruptedException { - final String schemaStr = "{\"type\":\"record\",\"name\":\"MyClass\",\"namespace\":\"org.apache.beam.sdk.util.AvroUtilsTest$\",\"fields\":[{\"name\":\"inner\",\"type\":[\"null\",\"MyClass\"],\"default\":null},{\"name\":\"value\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"inner2\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bird\",\"fields\":[{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"species\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"quality\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"quantity\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"birthday\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"flighted\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"scion\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubBird\",\"namespace\":\"org.apache.beam.sdk.util.AvroUtilsTest$Bird$\",\"fields\":[{\"name\":\"species\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"associates\",\"type\":{\"type\":\"array\",\"items\":\"org.apache.beam.sdk.util.AvroUtilsTest$Bird$.SubBird\",\"java-class\":\"[Lorg.apache.beam.sdk.util.AvroUtilsTest$Bird$SubBird;\"}}]}],\"default\":null}]}"; + final String schemaStr = "{\"type\":\"record\",\"name\":\"MyClass\",\"namespace\":\"org.apache.beam.sdk.util.AvroUtilsTest$\",\"fields\":[{\"name\":\"str\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"bird\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bird\",\"fields\":[{\"name\":\"number\",\"type\":\"long\"},{\"name\":\"species\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"quality\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"quantity\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"birthday\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"flighted\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"scion\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubBird\",\"namespace\":\"org.apache.beam.sdk.util.AvroUtilsTest$Bird$\",\"fields\":[{\"name\":\"species\",\"type\":[\"null\",\"string\"],\"default\":null}]}],\"default\":null},{\"name\":\"associates\",\"type\":{\"type\":\"array\",\"items\":\"org.apache.beam.sdk.util.AvroUtilsTest$Bird$.SubBird\",\"java-class\":\"[Lorg.apache.beam.sdk.util.AvroUtilsTest$Bird$SubBird;\"}}]}],\"default\":null},{\"name\":\"birds\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"Bird\",\"java-class\":\"java.util.List\"}],\"default\":null},{\"name\":\"map\",\"type\":[\"null\",{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"MyClass\",\"java-class\":\"java.util.Set\"}}],\"default\":null},{\"name\":\"map2\",\"type\":[\"null\",{\"type\":\"map\",\"values\":{\"type\":\"map\",\"values\":\"MyClass\"}}],\"default\":null}]}"; final Schema schema = new Schema.Parser().parse(schemaStr); - final AvroCoder coder = AvroCoder.of(MyClass.class, schema); - final int numThreads = 128; + final int numThreads = 1024; final CountDownLatch latch = new CountDownLatch(numThreads); ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads)); List> ret = new LinkedList<>(); for (int i = 0; i < numThreads; ++i) { + final int current = i; ret.add(executorService.submit(new Callable() { @Override public byte[] call() throws Exception { latch.countDown(); latch.await(); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); - return CoderUtils.encodeToByteArray(coder, new MyClass(5L, new MyClass(6L, new MyClass(7L, null)))); + if (current % 3 == 0) { + ReflectData.get().getSchema(MyClass.class); + } else if (current % 3 == 1) { + ReflectData.get().getSchema(MyClass2.class); + } else { + ReflectData.get().getSchema(Bird.class); + } + return null; } })); }