Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTag.StateBinder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.ContextSensitiveCoder.Context;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ContextSensitiveCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.state.BagState;
Expand Down Expand Up @@ -184,7 +185,7 @@ protected T readValue() {
// TODO: reuse input
Input input = new Input(buf);
try {
return coder.decode(input, Context.OUTER);
return ContextSensitiveCoder.decode(coder, input, Context.OUTER);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -195,7 +196,7 @@ protected T readValue() {
public void writeValue(T input) {
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
coder.encode(input, output, Context.OUTER);
ContextSensitiveCoder.encode(coder, input, output, Context.OUTER);
stateTable.put(namespace.stringKey(), address.getId(), output.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,27 @@ protected ApexStreamTupleCoder(Coder<T> valueCoder) {
}

@Override
public void encode(ApexStreamTuple<T> value, OutputStream outStream, Context context)
public void encode(ApexStreamTuple<T> value, OutputStream outStream)
throws CoderException, IOException {
if (value instanceof WatermarkTuple) {
outStream.write(1);
new DataOutputStream(outStream).writeLong(((WatermarkTuple<?>) value).getTimestamp());
} else {
outStream.write(0);
outStream.write(((DataTuple<?>) value).unionTag);
valueCoder.encode(value.getValue(), outStream, context);
valueCoder.encode(value.getValue(), outStream);
}
}

@Override
public ApexStreamTuple<T> decode(InputStream inStream, Context context)
public ApexStreamTuple<T> decode(InputStream inStream)
throws CoderException, IOException {
int b = inStream.read();
if (b == 1) {
return new WatermarkTuple<>(new DataInputStream(inStream).readLong());
} else {
int unionTag = inStream.read();
return new DataTuple<>(valueCoder.decode(inStream, context), unionTag);
return new DataTuple<>(valueCoder.decode(inStream), unionTag);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import java.io.IOException;
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.ContextSensitiveCoder.Context;
import org.apache.beam.sdk.coders.ContextSensitiveCoder;

/**
* The Apex {@link StreamCodec} adapter for using Beam {@link Coder}.
Expand All @@ -42,7 +43,7 @@ public Object fromByteArray(Slice fragment) {
ByteArrayInputStream bis = new ByteArrayInputStream(fragment.buffer, fragment.offset,
fragment.length);
try {
return coder.decode(bis, Context.OUTER);
return ContextSensitiveCoder.decode(coder, bis, Context.OUTER);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -52,7 +53,7 @@ public Object fromByteArray(Slice fragment) {
public Slice toByteArray(Object wv) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
coder.encode(wv, bos, Context.OUTER);
ContextSensitiveCoder.encode(coder, wv, bos, Context.OUTER);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import java.io.IOException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.ContextSensitiveCoder.Context;
import org.apache.beam.sdk.coders.ContextSensitiveCoder;


/**
Expand Down Expand Up @@ -54,7 +55,7 @@ public void write(Kryo kryo, Output output) {
try {
kryo.writeClass(output, coder.getClass());
kryo.writeObject(output, coder, JAVA_SERIALIZER);
coder.encode(value, output, Context.OUTER);
ContextSensitiveCoder.encode(coder, value, output, Context.OUTER);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -66,7 +67,7 @@ public void read(Kryo kryo, Input input) {
@SuppressWarnings("unchecked")
Class<Coder<T>> type = kryo.readClass(input).getType();
coder = kryo.readObject(input, type, JAVA_SERIALIZER);
value = coder.decode(input, Context.OUTER);
value = ContextSensitiveCoder.decode(coder, input, Context.OUTER);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.coders.ContextSensitiveCoder.Context;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
Expand All @@ -45,7 +45,7 @@ public ValuesSource(Iterable<T> values, Coder<T> coder) {
this.iterableCoder = IterableCoder.of(coder);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
iterableCoder.encode(values, bos, Context.OUTER);
iterableCoder.encode(values, bos);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
Expand All @@ -63,7 +63,7 @@ public UnboundedReader<T> createReader(PipelineOptions options,
@Nullable CheckpointMark checkpointMark) {
ByteArrayInputStream bis = new ByteArrayInputStream(codedValues);
try {
Iterable<T> values = this.iterableCoder.decode(bis, Context.OUTER);
Iterable<T> values = this.iterableCoder.decode(bis);
return new ValuesReader<>(values, this);
} catch (IOException ex) {
throw new RuntimeException(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,19 @@ static class CheckpointCoder<T> extends CustomCoder<Checkpoint<T>> {
}

@Override
public void encode(Checkpoint<T> value, OutputStream outStream, Context context)
public void encode(Checkpoint<T> value, OutputStream outStream)
throws CoderException, IOException {
elemsCoder.encode(value.residualElements, outStream, context.nested());
sourceCoder.encode(value.residualSource, outStream, context);
elemsCoder.encode(value.residualElements, outStream);
sourceCoder.encode(value.residualSource, outStream);
}

@SuppressWarnings("unchecked")
@Override
public Checkpoint<T> decode(InputStream inStream, Context context)
public Checkpoint<T> decode(InputStream inStream)
throws CoderException, IOException {
return new Checkpoint<>(
elemsCoder.decode(inStream, context.nested()),
sourceCoder.decode(inStream, context));
elemsCoder.decode(inStream),
sourceCoder.decode(inStream));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ static class Record implements Serializable {}

private static class RecordCoder extends CustomCoder<Record> {
@Override
public void encode(Record value, OutputStream outStream, Context context)
public void encode(Record value, OutputStream outStream)
throws CoderException, IOException {}

@Override
public Record decode(InputStream inStream, Context context)
public Record decode(InputStream inStream)
throws CoderException, IOException {
return new Record();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,20 @@ private ElementAndRestrictionCoder(

@Override
public void encode(
ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context)
ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream)
throws IOException {
if (value == null) {
throw new CoderException("cannot encode a null ElementAndRestriction");
}
elementCoder.encode(value.element(), outStream, context.nested());
restrictionCoder.encode(value.restriction(), outStream, context);
elementCoder.encode(value.element(), outStream);
restrictionCoder.encode(value.restriction(), outStream);
}

@Override
public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream)
throws IOException {
ElementT key = elementCoder.decode(inStream, context.nested());
RestrictionT value = restrictionCoder.decode(inStream, context);
ElementT key = elementCoder.decode(inStream);
RestrictionT value = restrictionCoder.decode(inStream);
return ElementAndRestriction.of(key, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,19 @@ public Coder<ElemT> getElementCoder() {
}

@Override
public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream)
throws CoderException, IOException {
Coder.Context nestedContext = context.nested();
keyCoder.encode(value.key(), outStream, nestedContext);
timersCoder.encode(value.timersIterable(), outStream, nestedContext);
elemsCoder.encode(value.elementsIterable(), outStream, context);
keyCoder.encode(value.key(), outStream);
timersCoder.encode(value.timersIterable(), outStream);
elemsCoder.encode(value.elementsIterable(), outStream);
}

@Override
public KeyedWorkItem<K, ElemT> decode(InputStream inStream, Coder.Context context)
public KeyedWorkItem<K, ElemT> decode(InputStream inStream)
throws CoderException, IOException {
Coder.Context nestedContext = context.nested();
K key = keyCoder.decode(inStream, nestedContext);
Iterable<TimerData> timers = timersCoder.decode(inStream, nestedContext);
Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream, context);
K key = keyCoder.decode(inStream);
Iterable<TimerData> timers = timersCoder.decode(inStream);
Iterable<WindowedValue<ElemT>> elems = elemsCoder.decode(inStream);
return KeyedWorkItems.workItem(key, timers, elems);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,24 +238,22 @@ private TimerDataCoder(Coder<? extends BoundedWindow> windowCoder) {
}

@Override
public void encode(TimerData timer, OutputStream outStream, Context context)
public void encode(TimerData timer, OutputStream outStream)
throws CoderException, IOException {
Context nestedContext = context.nested();
STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext);
INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
STRING_CODER.encode(timer.getDomain().name(), outStream, context);
STRING_CODER.encode(timer.getTimerId(), outStream);
STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
INSTANT_CODER.encode(timer.getTimestamp(), outStream);
STRING_CODER.encode(timer.getDomain().name(), outStream);
}

@Override
public TimerData decode(InputStream inStream, Context context)
public TimerData decode(InputStream inStream)
throws CoderException, IOException {
Context nestedContext = context.nested();
String timerId = STRING_CODER.decode(inStream, nestedContext);
String timerId = STRING_CODER.decode(inStream);
StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, context));
StateNamespaces.fromString(STRING_CODER.decode(inStream), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
return TimerData.of(timerId, namespace, timestamp, domain);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ static class RecordNoEncodeCoder extends CustomCoder<Record> {
public void encode(
Record value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws IOException {
throw new CoderException("Encode not allowed");
}

@Override
public Record decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
InputStream inStream, org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws IOException {
return null;
}
Expand All @@ -197,12 +197,12 @@ static class RecordNoDecodeCoder extends CustomCoder<Record> {
public void encode(
Record value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws IOException {}

@Override
public Record decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
InputStream inStream, org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws IOException {
throw new CoderException("Decode not allowed");
}
Expand All @@ -213,12 +213,12 @@ private static class RecordStructuralValueCoder extends CustomCoder<Record> {
public void encode(
Record value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws CoderException, IOException {}

@Override
public Record decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
InputStream inStream, org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws CoderException, IOException {
return new Record() {
@Override
Expand All @@ -245,12 +245,12 @@ private static class RecordNotConsistentWithEqualsStructuralValueCoder
public void encode(
Record value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws CoderException, IOException {}

@Override
public Record decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
InputStream inStream, org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws CoderException, IOException {
return new Record() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,14 @@ public static class Coder extends CustomCoder<TestCheckpointMark> {
public void encode(
TestCheckpointMark value,
OutputStream outStream,
org.apache.beam.sdk.coders.Coder.Context context)
org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws IOException {
VarInt.encode(value.index, outStream);
}

@Override
public TestCheckpointMark decode(
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
InputStream inStream, org.apache.beam.sdk.coders.ContextSensitiveCoder.Context context)
throws IOException {
TestCheckpointMark decoded = new TestCheckpointMark(VarInt.decodeInt(inStream));
decoded.decoded = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public int getLength() {
@Override
public void serialize(T t, DataOutputView dataOutputView) throws IOException {
DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
coder.encode(t, outputWrapper, Coder.Context.NESTED);
coder.encode(t, outputWrapper);
}

@Override
public T deserialize(DataInputView dataInputView) throws IOException {
try {
DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
return coder.decode(inputWrapper, Coder.Context.NESTED);
return coder.decode(inputWrapper);
} catch (CoderException e) {
Throwable cause = e.getCause();
if (cause instanceof EOFException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,17 @@ public Coder<ElemT> getElementCoder() {

@Override
public void encode(SingletonKeyedWorkItem<K, ElemT> value,
OutputStream outStream,
Context context)
OutputStream outStream)
throws CoderException, IOException {
keyCoder.encode(value.key(), outStream, context.nested());
valueCoder.encode(value.value, outStream, context);
keyCoder.encode(value.key(), outStream);
valueCoder.encode(value.value, outStream);
}

@Override
public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream)
throws CoderException, IOException {
K key = keyCoder.decode(inStream, context.nested());
WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
K key = keyCoder.decode(inStream);
WindowedValue<ElemT> value = valueCoder.decode(inStream);
return new SingletonKeyedWorkItem<>(key, value);
}

Expand Down
Loading