Skip to content

Commit 2a40c57

Browse files
committed
Add remaining Schema support for AVRO records:
* Add support for SpecificRecord using ByteBuddy codegen. * Add helper methods for GenericRecord. * Fix uncovered bugs involving nullable support.
1 parent 17968a2 commit 2a40c57

21 files changed

+880
-129
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ rat {
121121
"**/.github/**/*",
122122

123123
"**/package-list",
124+
"**/test.avsc",
124125
"**/user.avsc",
125126
"**/test/resources/**/*.txt",
126127
"**/test/**/.placeholder",
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas;
19+
20+
import java.util.List;
21+
import org.apache.avro.specific.SpecificRecord;
22+
import org.apache.beam.sdk.schemas.utils.AvroUtils;
23+
24+
/** A {@link FieldValueGetterFactory} for AVRO-generated specific records. */
25+
public class AvroSpecificRecordGetterFactory implements FieldValueGetterFactory {
26+
@Override
27+
public List<FieldValueGetter> create(Class<?> targetClass, Schema schema) {
28+
return AvroUtils.getGetters((Class<? extends SpecificRecord>) targetClass, schema);
29+
}
30+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas;
19+
20+
import org.apache.avro.specific.SpecificRecord;
21+
import org.apache.beam.sdk.schemas.utils.AvroSpecificRecordTypeInformationFactory;
22+
import org.apache.beam.sdk.schemas.utils.AvroUtils;
23+
import org.apache.beam.sdk.values.TypeDescriptor;
24+
25+
/**
26+
* A {@link SchemaProvider} for AVRO generated SpecificRecords.
27+
*
28+
* <p>This provider infers a schema from generated SpecificRecord objects, and creates schemas and
29+
* rows that bind to the appropriate fields.
30+
*/
31+
public class AvroSpecificRecordSchema extends GetterBasedSchemaProvider {
32+
@Override
33+
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
34+
return AvroUtils.getSchema((Class<? extends SpecificRecord>) typeDescriptor.getRawType());
35+
}
36+
37+
@Override
38+
public FieldValueGetterFactory fieldValueGetterFactory() {
39+
return new AvroSpecificRecordGetterFactory();
40+
}
41+
42+
@Override
43+
public UserTypeCreatorFactory schemaTypeCreatorFactory() {
44+
return new AvroSpecificRecordUserTypeCreatorFactory();
45+
}
46+
47+
@Override
48+
public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
49+
return new AvroSpecificRecordTypeInformationFactory();
50+
}
51+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas;
19+
20+
import org.apache.avro.specific.SpecificRecord;
21+
import org.apache.beam.sdk.schemas.utils.AvroUtils;
22+
23+
/** A {@link UserTypeCreatorFactory} for AVRO-generated specific records. */
24+
public class AvroSpecificRecordUserTypeCreatorFactory implements UserTypeCreatorFactory {
25+
@Override
26+
public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
27+
return AvroUtils.getCreator((Class<? extends SpecificRecord>) clazz, schema);
28+
}
29+
}

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java

Lines changed: 4 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.schemas;
1919

20-
import java.lang.reflect.InvocationTargetException;
2120
import java.util.List;
21+
import javax.annotation.Nullable;
2222
import org.apache.beam.sdk.annotations.Experimental;
2323
import org.apache.beam.sdk.annotations.Experimental.Kind;
2424
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -34,45 +34,12 @@ public abstract class GetterBasedSchemaProvider implements SchemaProvider {
3434
/** Implementing class should override to return a getter factory. */
3535
abstract FieldValueGetterFactory fieldValueGetterFactory();
3636

37-
/** Implementing class should override to return a setter factory. */
38-
abstract FieldValueSetterFactory fieldValueSetterFactory();
39-
4037
/** Implementing class should override to return a type-information factory. */
4138
abstract FieldValueTypeInformationFactory fieldValueTypeInformationFactory();
4239

43-
/**
44-
* Implementing class should override to return a constructor factory.
45-
*
46-
* <p>The default factory uses the default constructor and the setters to construct an object.
47-
*/
48-
UserTypeCreatorFactory schemaTypeCreatorFactory() {
49-
Factory<List<FieldValueSetter>> setterFactory = new CachingFactory<>(fieldValueSetterFactory());
50-
return new UserTypeCreatorFactory() {
51-
@Override
52-
public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
53-
List<FieldValueSetter> setters = setterFactory.create(clazz, schema);
54-
return new SchemaUserTypeCreator() {
55-
@Override
56-
public Object create(Object... params) {
57-
Object object;
58-
try {
59-
object = clazz.getDeclaredConstructor().newInstance();
60-
} catch (NoSuchMethodException
61-
| IllegalAccessException
62-
| InvocationTargetException
63-
| InstantiationException e) {
64-
throw new RuntimeException("Failed to instantiate object ", e);
65-
}
66-
for (int i = 0; i < params.length; ++i) {
67-
FieldValueSetter setter = setters.get(i);
68-
setter.set(object, params[i]);
69-
}
70-
return object;
71-
}
72-
};
73-
}
74-
};
75-
}
40+
/** Implementing class should override to return a constructor factory. */
41+
@Nullable
42+
abstract UserTypeCreatorFactory schemaTypeCreatorFactory();
7643

7744
@Override
7845
public <T> SerializableFunction<T, Row> toRowFunction(TypeDescriptor<T> typeDescriptor) {

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaBeanSchema.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.beam.sdk.schemas.utils.JavaBeanSetterFactory;
2424
import org.apache.beam.sdk.schemas.utils.JavaBeanTypeInformationFactory;
2525
import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
26+
import org.apache.beam.sdk.transforms.SerializableFunctions;
2627
import org.apache.beam.sdk.values.TypeDescriptor;
2728

2829
/**
@@ -42,7 +43,8 @@
4243
public class JavaBeanSchema extends GetterBasedSchemaProvider {
4344
@Override
4445
public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
45-
return JavaBeanUtils.schemaFromJavaBeanClass(typeDescriptor.getRawType());
46+
return JavaBeanUtils.schemaFromJavaBeanClass(
47+
typeDescriptor.getRawType(), SerializableFunctions.identity());
4648
}
4749

4850
@Override
@@ -51,8 +53,8 @@ public FieldValueGetterFactory fieldValueGetterFactory() {
5153
}
5254

5355
@Override
54-
public FieldValueSetterFactory fieldValueSetterFactory() {
55-
return new JavaBeanSetterFactory();
56+
UserTypeCreatorFactory schemaTypeCreatorFactory() {
57+
return new SetterBasedCreatorFactory(new JavaBeanSetterFactory());
5658
}
5759

5860
@Override

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/JavaFieldSchema.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.beam.sdk.annotations.Experimental.Kind;
2222
import org.apache.beam.sdk.schemas.utils.POJOUtils;
2323
import org.apache.beam.sdk.schemas.utils.PojoValueGetterFactory;
24-
import org.apache.beam.sdk.schemas.utils.PojoValueSetterFactory;
2524
import org.apache.beam.sdk.schemas.utils.PojoValueTypeInformationFactory;
2625
import org.apache.beam.sdk.values.TypeDescriptor;
2726

@@ -50,11 +49,6 @@ public FieldValueGetterFactory fieldValueGetterFactory() {
5049
return new PojoValueGetterFactory();
5150
}
5251

53-
@Override
54-
public FieldValueSetterFactory fieldValueSetterFactory() {
55-
return new PojoValueSetterFactory();
56-
}
57-
5852
@Override
5953
public FieldValueTypeInformationFactory fieldValueTypeInformationFactory() {
6054
return new PojoValueTypeInformationFactory();

sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,14 @@ public FieldType withMetadata(String metadata) {
540540
return toBuilder().setMetadata(metadata.getBytes(StandardCharsets.UTF_8)).build();
541541
}
542542

543+
public String getMetadataString() {
544+
if (getMetadata() != null) {
545+
return new String(getMetadata(), StandardCharsets.UTF_8);
546+
} else {
547+
return "";
548+
}
549+
}
550+
543551
public FieldType withNullable(boolean nullable) {
544552
return toBuilder().setNullable(nullable).build();
545553
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.schemas;
19+
20+
import java.lang.reflect.InvocationTargetException;
21+
import java.util.List;
22+
23+
/**
24+
* A {@link UserTypeCreatorFactory} that uses a default constructor and a list of setters to
25+
* construct a class.
26+
*/
27+
class SetterBasedCreatorFactory implements UserTypeCreatorFactory {
28+
private final Factory<List<FieldValueSetter>> setterFactory;
29+
30+
public SetterBasedCreatorFactory(Factory<List<FieldValueSetter>> setterFactory) {
31+
this.setterFactory = new CachingFactory<>(setterFactory);
32+
}
33+
34+
@Override
35+
public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
36+
List<FieldValueSetter> setters = setterFactory.create(clazz, schema);
37+
return new SchemaUserTypeCreator() {
38+
@Override
39+
public Object create(Object... params) {
40+
Object object;
41+
try {
42+
object = clazz.getDeclaredConstructor().newInstance();
43+
} catch (NoSuchMethodException
44+
| IllegalAccessException
45+
| InvocationTargetException
46+
| InstantiationException e) {
47+
throw new RuntimeException("Failed to instantiate object ", e);
48+
}
49+
for (int i = 0; i < params.length; ++i) {
50+
FieldValueSetter setter = setters.get(i);
51+
setter.set(object, params[i]);
52+
}
53+
return object;
54+
}
55+
};
56+
}
57+
}

0 commit comments

Comments
 (0)