Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4453] Stop subclassing user POJOs. #7234

Merged
merged 2 commits into from Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,14 +17,12 @@
*/
package org.apache.beam.sdk.schemas;

import java.lang.reflect.Constructor;
import org.apache.beam.sdk.schemas.utils.POJOUtils;

/**
 * Vends {@link SchemaUserTypeCreator} instances for POJOs.
 *
 * <p>NOTE(review): this block appears to come from a diff view whose +/- markers were lost, so
 * the body of {@code create} seems to show both sides of the change. Confirm against the pull
 * request before relying on it.
 */
class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory {
/**
 * Returns the creator used to build instances of {@code clazz} for the given schema.
 *
 * @param clazz the POJO class to create instances of
 * @param schema the schema passed, together with {@code clazz}, to {@link POJOUtils}
 * @return a {@link SchemaUserTypeCreator} for {@code clazz}
 */
@Override
public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
// NOTE(review): presumably the pre-change side. It obtains a constructor from POJOUtils and
// wraps it in a SchemaUserTypeConstructorCreator. Confirm.
Constructor<?> constructor = POJOUtils.getConstructor(clazz, schema);
return new SchemaUserTypeConstructorCreator(clazz, constructor);
// NOTE(review): presumably the post-change side. It delegates directly to
// POJOUtils.getCreator. Confirm.
return POJOUtils.getCreator(clazz, schema);
}
}
Expand Up @@ -18,28 +18,30 @@
package org.apache.beam.sdk.schemas.utils;

import com.google.common.collect.Maps;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.dynamic.scaffold.InstrumentedType;
import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.Implementation;
import net.bytebuddy.implementation.MethodCall;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
import net.bytebuddy.implementation.bytecode.Duplication;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.TypeCreation;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
import net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
import net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.implementation.bytecode.member.MethodReturn;
import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
import net.bytebuddy.matcher.ElementMatchers;
Expand All @@ -49,6 +51,7 @@
import org.apache.beam.sdk.schemas.FieldValueSetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
Expand Down Expand Up @@ -119,15 +122,15 @@ public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) {

// The creator for each class and schema pair is cached, so we only generate the creator class the
// first time getCreator is called for that pair.
public static final Map<ClassWithSchema, Constructor> CACHED_CONSTRUCTORS =
public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
Maps.newConcurrentMap();

public static <T> Constructor<? extends T> getConstructor(Class<T> clazz, Schema schema) {
return CACHED_CONSTRUCTORS.computeIfAbsent(
new ClassWithSchema(clazz, schema), c -> createConstructor(clazz, schema));
public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) {
return CACHED_CREATORS.computeIfAbsent(
new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
}

private static <T> Constructor<? extends T> createConstructor(Class<T> clazz, Schema schema) {
private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) {
// Get the list of class fields ordered by schema.
Map<String, Field> fieldMap =
ReflectUtils.getFields(clazz)
Expand All @@ -140,34 +143,25 @@ private static <T> Constructor<? extends T> createConstructor(Class<T> clazz, Sc
.map(f -> fieldMap.get(f.getName()))
.collect(Collectors.toList());

List<Type> types =
fields
.stream()
.map(Field::getType)
.map(TypeDescriptor::of)
// We need raw types back so we can setup the list of constructor params.
.map(new ConvertType(true)::convert)
.collect(Collectors.toList());

try {
DynamicType.Builder<? extends T> builder =
DynamicType.Builder<SchemaUserTypeCreator> builder =
BYTE_BUDDY
.subclass(clazz, Default.NO_CONSTRUCTORS)
.defineConstructor(Visibility.PUBLIC)
.withParameters(types)
.intercept(
MethodCall.invoke(clazz.getDeclaredConstructor())
.andThen(new ConstructInstruction(fields)));

Class[] typeArray = types.toArray(new Class[types.size()]);
.subclass(SchemaUserTypeCreator.class)
.method(ElementMatchers.named("create"))
.intercept(new CreateInstruction(fields, clazz));

return builder
.make()
.load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
.getLoaded()
.getDeclaredConstructor(typeArray);
} catch (NoSuchMethodException e) {
.getDeclaredConstructor()
.newInstance();
} catch (InstantiationException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
throw new RuntimeException(
"Unable to generate a getter for class " + clazz + " with schema " + schema);
"Unable to generate a creator for class " + clazz + " with schema " + schema);
}
}

Expand Down Expand Up @@ -368,11 +362,13 @@ public ByteCodeAppender appender(final Target implementationTarget) {
}

// Implements a method to construct an object.
static class ConstructInstruction implements Implementation {
static class CreateInstruction implements Implementation {
private final List<Field> fields;
private final Class pojoClass;

ConstructInstruction(List<Field> fields) {
CreateInstruction(List<Field> fields, Class pojoClass) {
this.fields = fields;
this.pojoClass = pojoClass;
}

@Override
Expand All @@ -386,32 +382,49 @@ public ByteCodeAppender appender(final Target implementationTarget) {
// this + method parameters.
int numLocals = 1 + instrumentedMethod.getParameters().size();

// Generate code to initialize all member variables.
StackManipulation stackManipulation = null;
// Create the POJO class.
ForLoadedType loadedType = new ForLoadedType(pojoClass);
StackManipulation stackManipulation =
new StackManipulation.Compound(
TypeCreation.of(loadedType),
Duplication.SINGLE,
MethodInvocation.invoke(
loadedType
.getDeclaredMethods()
.filter(
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
.getOnly()));

// The types in the POJO might be the types returned by Beam's Row class,
// so we have to convert the types used by Beam's Row class.
ConvertType convertType = new ConvertType(true);
for (int i = 0; i < fields.size(); ++i) {
Field field = fields.get(i);
// The instruction to read the field.
StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(i + 1);

// Read the object onto the stack.
ForLoadedType convertedType =
new ForLoadedType((Class) convertType.convert(TypeDescriptor.of(field.getType())));

// The instruction to read the parameter.
StackManipulation readParameter =
new StackManipulation.Compound(
MethodVariableAccess.REFERENCE.loadFrom(1),
IntegerConstant.forValue(i),
ArrayAccess.REFERENCE.load(),
TypeCasting.to(convertedType));

StackManipulation updateField =
new StackManipulation.Compound(
// This param is offset 0.
MethodVariableAccess.REFERENCE.loadFrom(0),
// Duplicate object reference.
Duplication.SINGLE,
// Do any conversions necessary.
new ByteBuddyUtils.ConvertValueForSetter(readField)
new ByteBuddyUtils.ConvertValueForSetter(readParameter)
.convert(TypeDescriptor.of(field.getType())),
// Now update the field and return void.
// Now update the field.
FieldAccess.forField(new ForLoadedField(field)).write());
stackManipulation =
(stackManipulation == null)
? updateField
: new StackManipulation.Compound(stackManipulation, updateField);
stackManipulation = new StackManipulation.Compound(stackManipulation, updateField);
}
stackManipulation =
(stackManipulation == null)
? MethodReturn.VOID
: new StackManipulation.Compound(stackManipulation, MethodReturn.VOID);
new StackManipulation.Compound(stackManipulation, MethodReturn.REFERENCE);

StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
return new Size(size.getMaximalSize(), numLocals);
Expand Down