Skip to content

Commit

Permalink
[FLINK-1463] Fix stateful/stateless Serializers and Comparators
Browse files Browse the repository at this point in the history
Before, Serializers would announce whether they are stateful or not and
rely on RuntimeStatefulSerializerFactory to do the duplication.
Comparators, on the other hand, had a duplicate method that the user was
required to call.

This commit removes the statful/stateless property from Serializers but
instead introduces a duplicate() method, similar to Comparators, that
can return the same instance.

The two serializer factories are merged into one that always calls
duplicate() before returning a serializer.
  • Loading branch information
aljoscha committed Feb 9, 2015
1 parent 2f16ca2 commit 7bc78cb
Show file tree
Hide file tree
Showing 68 changed files with 182 additions and 494 deletions.
Expand Up @@ -43,8 +43,7 @@
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.CompilerPostPassException;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
Expand Down Expand Up @@ -278,12 +277,8 @@ private static <T> TypeInformation<T> getTypeInfoFromSource(SourcePlanNode node)

private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
TypeSerializer<T> serializer = typeInfo.createSerializer();

if (serializer.isStateful()) {
return new RuntimeStatefulSerializerFactory<T>(serializer, typeInfo.getTypeClass());
} else {
return new RuntimeStatelessSerializerFactory<T>(serializer, typeInfo.getTypeClass());
}

return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass());
}

@SuppressWarnings("unchecked")
Expand Down
Expand Up @@ -50,17 +50,15 @@ public abstract class TypeSerializer<T> implements Serializable {
*/
public abstract boolean isImmutableType();


/**
* Gets whether the serializer is stateful. Statefulness means in this context that some of the serializer's
* methods have objects with state and are thus not inherently thread-safe. A stateful serializer might be used by
* multiple threads concurrently. For a stateful one, different instances will be used by different threads.
*
* @return True, if the serializer is stateful, false if it is stateless;
* Creates a deep copy of this serializer if it is necessary, i.e. if it is stateful. This
* can return itself if the serializer is not stateful.
*
* We need this because Serializers might be used in several threads. Stateless serializers
* are inherently thread-safe while stateful serializers might not be thread-safe.
*/
public abstract boolean isStateful();


public abstract TypeSerializer<T> duplicate();

// --------------------------------------------------------------------------------------------
// Instantiation & Cloning
// --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -36,11 +36,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Boolean createInstance() {
return FALSE;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public BooleanValue createInstance() {
return new BooleanValue();
Expand Down
Expand Up @@ -38,11 +38,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Byte createInstance() {
return ZERO;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public ByteValue createInstance() {
return new ByteValue();
Expand Down
Expand Up @@ -38,11 +38,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Character createInstance() {
return ZERO;
Expand Down
Expand Up @@ -36,11 +36,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public CharValue createInstance() {
return new CharValue();
Expand Down
Expand Up @@ -36,11 +36,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Date createInstance() {
return new Date();
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Double createInstance() {
return ZERO;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public DoubleValue createInstance() {
return new DoubleValue();
Expand Down
Expand Up @@ -46,8 +46,8 @@ public boolean isImmutableType() {
}

@Override
public boolean isStateful() {
return false;
public EnumSerializer<T> duplicate() {
return this;
}

@Override
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Float createInstance() {
return ZERO;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public FloatValue createInstance() {
return new FloatValue();
Expand Down
Expand Up @@ -50,15 +50,21 @@ public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> compone
this.componentSerializer = componentSerializer;
this.EMPTY = create(0);
}

@Override
public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return this.componentSerializer.isStateful();
public GenericArraySerializer<C> duplicate() {
TypeSerializer<C> duplicateComponentSerializer = this.componentSerializer.duplicate();
if (duplicateComponentSerializer == this.componentSerializer) {
// is not stateful, return ourselves
return this;
} else {
return new GenericArraySerializer<C>(componentClass, duplicateComponentSerializer);
}
}


Expand Down
Expand Up @@ -38,11 +38,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Integer createInstance() {
return ZERO;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public IntValue createInstance() {
return new IntValue();
Expand Down
Expand Up @@ -38,11 +38,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Long createInstance() {
return ZERO;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public LongValue createInstance() {
return new LongValue();
Expand Down
Expand Up @@ -38,11 +38,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Short createInstance() {
return ZERO;
Expand Down
Expand Up @@ -37,11 +37,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public ShortValue createInstance() {
return new ShortValue();
Expand Down
Expand Up @@ -38,11 +38,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public String createInstance() {
return EMPTY;
Expand Down
Expand Up @@ -39,11 +39,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public StringValue createInstance() {
return new StringValue();
Expand Down
Expand Up @@ -25,7 +25,12 @@ public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
private static final long serialVersionUID = 8766687317209282373L;

// --------------------------------------------------------------------------------------------


@Override
public TypeSerializerSingleton<T> duplicate() {
return this;
}

@Override
public int hashCode() {
return super.hashCode();
Expand Down
Expand Up @@ -34,11 +34,6 @@ public boolean isImmutableType() {
return true;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public Void createInstance() {
return null;
Expand Down
Expand Up @@ -40,11 +40,6 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet
public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public boolean[] createInstance() {
Expand Down
Expand Up @@ -40,11 +40,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public byte[] createInstance() {
return EMPTY;
Expand Down
Expand Up @@ -41,11 +41,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public char[] createInstance() {
return EMPTY;
Expand Down
Expand Up @@ -41,11 +41,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public double[] createInstance() {
return EMPTY;
Expand Down
Expand Up @@ -41,11 +41,6 @@ public boolean isImmutableType() {
return false;
}

@Override
public boolean isStateful() {
return false;
}

@Override
public float[] createInstance() {
return EMPTY;
Expand Down

0 comments on commit 7bc78cb

Please sign in to comment.