Skip to content

Commit

Permalink
Stop subclassing user pojos.
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Dec 8, 2018
1 parent 95d0ac5 commit b8e34a8
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 52 deletions.
Expand Up @@ -17,14 +17,12 @@
*/
package org.apache.beam.sdk.schemas;

import java.lang.reflect.Constructor;
import org.apache.beam.sdk.schemas.utils.POJOUtils;

/** Vends constructors for POJOs. */
class PojoTypeUserTypeCreatorFactory implements UserTypeCreatorFactory {
@Override
public SchemaUserTypeCreator create(Class<?> clazz, Schema schema) {
Constructor<?> constructor = POJOUtils.getConstructor(clazz, schema);
return new SchemaUserTypeConstructorCreator(clazz, constructor);
return POJOUtils.getCreator(clazz, schema);
}
}
Expand Up @@ -18,28 +18,30 @@
package org.apache.beam.sdk.schemas.utils;

import com.google.common.collect.Maps;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.field.FieldDescription.ForLoadedField;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.description.type.TypeDescription.ForLoadedType;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.dynamic.scaffold.InstrumentedType;
import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy.Default;
import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.Implementation;
import net.bytebuddy.implementation.MethodCall;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
import net.bytebuddy.implementation.bytecode.ByteCodeAppender.Size;
import net.bytebuddy.implementation.bytecode.Duplication;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.TypeCreation;
import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
import net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
import net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.implementation.bytecode.member.MethodReturn;
import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
import net.bytebuddy.matcher.ElementMatchers;
Expand All @@ -49,6 +51,7 @@
import org.apache.beam.sdk.schemas.FieldValueSetter;
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForGetter;
import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
Expand Down Expand Up @@ -119,15 +122,15 @@ public static List<FieldValueGetter> getGetters(Class<?> clazz, Schema schema) {

// The list of constructors for a class is cached, so we only create the classes the first time
// getConstructor is called.
public static final Map<ClassWithSchema, Constructor> CACHED_CONSTRUCTORS =
public static final Map<ClassWithSchema, SchemaUserTypeCreator> CACHED_CREATORS =
Maps.newConcurrentMap();

public static <T> Constructor<? extends T> getConstructor(Class<T> clazz, Schema schema) {
return CACHED_CONSTRUCTORS.computeIfAbsent(
new ClassWithSchema(clazz, schema), c -> createConstructor(clazz, schema));
public static <T> SchemaUserTypeCreator getCreator(Class<T> clazz, Schema schema) {
return CACHED_CREATORS.computeIfAbsent(
new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
}

private static <T> Constructor<? extends T> createConstructor(Class<T> clazz, Schema schema) {
private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) {
// Get the list of class fields ordered by schema.
Map<String, Field> fieldMap =
ReflectUtils.getFields(clazz)
Expand All @@ -140,34 +143,25 @@ private static <T> Constructor<? extends T> createConstructor(Class<T> clazz, Sc
.map(f -> fieldMap.get(f.getName()))
.collect(Collectors.toList());

List<Type> types =
fields
.stream()
.map(Field::getType)
.map(TypeDescriptor::of)
// We need raw types back so we can setup the list of constructor params.
.map(new ConvertType(true)::convert)
.collect(Collectors.toList());

try {
DynamicType.Builder<? extends T> builder =
DynamicType.Builder<SchemaUserTypeCreator> builder =
BYTE_BUDDY
.subclass(clazz, Default.NO_CONSTRUCTORS)
.defineConstructor(Visibility.PUBLIC)
.withParameters(types)
.intercept(
MethodCall.invoke(clazz.getDeclaredConstructor())
.andThen(new ConstructInstruction(fields)));

Class typeArray[] = types.toArray(new Class[types.size()]);
.subclass(SchemaUserTypeCreator.class)
.method(ElementMatchers.named("create"))
.intercept(new CreateInstruction(fields, clazz));

return builder
.make()
.load(ReflectHelpers.findClassLoader(), ClassLoadingStrategy.Default.INJECTION)
.getLoaded()
.getDeclaredConstructor(typeArray);
} catch (NoSuchMethodException e) {
.getDeclaredConstructor()
.newInstance();
} catch (InstantiationException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
throw new RuntimeException(
"Unable to generate a getter for class " + clazz + " with schema " + schema);
"Unable to generate a creator for class " + clazz + " with schema " + schema);
}
}

Expand Down Expand Up @@ -368,11 +362,13 @@ public ByteCodeAppender appender(final Target implementationTarget) {
}

// Implements a method to construct an object.
static class ConstructInstruction implements Implementation {
static class CreateInstruction implements Implementation {
private final List<Field> fields;
private final Class pojoClass;

ConstructInstruction(List<Field> fields) {
CreateInstruction(List<Field> fields, Class pojoClass) {
this.fields = fields;
this.pojoClass = pojoClass;
}

@Override
Expand All @@ -386,32 +382,49 @@ public ByteCodeAppender appender(final Target implementationTarget) {
// this + method parameters.
int numLocals = 1 + instrumentedMethod.getParameters().size();

// Generate code to initialize all member variables.
StackManipulation stackManipulation = null;
// Create the POJO class.
ForLoadedType loadedType = new ForLoadedType(pojoClass);
StackManipulation stackManipulation =
new StackManipulation.Compound(
TypeCreation.of(loadedType),
Duplication.SINGLE,
MethodInvocation.invoke(
loadedType
.getDeclaredMethods()
.filter(
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
.getOnly()));

// The types in the POJO might be the types returned by Beam's Row class,
// so we have to convert the types used by Beam's Row class.
ConvertType convertType = new ConvertType(true);
for (int i = 0; i < fields.size(); ++i) {
Field field = fields.get(i);
// The instruction to read the field.
StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(i + 1);

// Read the object onto the stack.
ForLoadedType convertedType =
new ForLoadedType((Class) convertType.convert(TypeDescriptor.of(field.getType())));

// The instruction to read the parameter.
StackManipulation readParameter =
new StackManipulation.Compound(
MethodVariableAccess.REFERENCE.loadFrom(1),
IntegerConstant.forValue(i),
ArrayAccess.REFERENCE.load(),
TypeCasting.to(convertedType));

StackManipulation updateField =
new StackManipulation.Compound(
// This param is offset 0.
MethodVariableAccess.REFERENCE.loadFrom(0),
// Duplicate object reference.
Duplication.SINGLE,
// Do any conversions necessary.
new ByteBuddyUtils.ConvertValueForSetter(readField)
new ByteBuddyUtils.ConvertValueForSetter(readParameter)
.convert(TypeDescriptor.of(field.getType())),
// Now update the field and return void.
// Now update the field.
FieldAccess.forField(new ForLoadedField(field)).write());
stackManipulation =
(stackManipulation == null)
? updateField
: new StackManipulation.Compound(stackManipulation, updateField);
stackManipulation = new StackManipulation.Compound(stackManipulation, updateField);
}
stackManipulation =
(stackManipulation == null)
? MethodReturn.VOID
: new StackManipulation.Compound(stackManipulation, MethodReturn.VOID);
new StackManipulation.Compound(stackManipulation, MethodReturn.REFERENCE);

StackManipulation.Size size = stackManipulation.apply(methodVisitor, implementationContext);
return new Size(size.getMaximalSize(), numLocals);
Expand Down

0 comments on commit b8e34a8

Please sign in to comment.