@@ -25,6 +25,7 @@
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;

@@ -40,10 +41,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -159,14 +162,8 @@ private void finishWriteAndResult() throws IOException {
}
dataStream.flush();
StreamStateHandle underlying = checkpointStream.closeAndGetHandle();
complete(
result.inputChannelStateHandles,
inputChannelOffsets,
(chan, offsets) -> new InputChannelStateHandle(chan, underlying, offsets));
complete(
result.resultSubpartitionStateHandles,
resultSubpartitionOffsets,
(chan, offsets) -> new ResultSubpartitionStateHandle(chan, underlying, offsets));
complete(underlying, result.inputChannelStateHandles, inputChannelOffsets, HandleFactory.INPUT_CHANNEL);
complete(underlying, result.resultSubpartitionStateHandles, resultSubpartitionOffsets, HandleFactory.RESULT_SUBPARTITION);
}

private void doComplete(boolean precondition, RunnableWithException complete, RunnableWithException... callbacks) throws Exception {
@@ -180,17 +177,34 @@ private void doComplete(boolean precondition, RunnableWithException complete, Ru
}

private <I, H extends AbstractChannelStateHandle<I>> void complete(
StreamStateHandle underlying,
CompletableFuture<Collection<H>> future,
Map<I, List<Long>> offsets,
BiFunction<I, List<Long>, H> buildHandle) {
HandleFactory<I, H> handleFactory) throws IOException {
final Collection<H> handles = new ArrayList<>();
for (Map.Entry<I, List<Long>> e : offsets.entrySet()) {
handles.add(buildHandle.apply(e.getKey(), e.getValue()));
handles.add(createHandle(handleFactory, underlying, e.getKey(), e.getValue()));
}
future.complete(handles);
LOG.debug("channel state write completed, checkpointId: {}, handles: {}", checkpointId, handles);
}

private <I, H extends AbstractChannelStateHandle<I>> H createHandle(
HandleFactory<I, H> handleFactory,
StreamStateHandle underlying,
I channelInfo,
List<Long> offsets) throws IOException {
Optional<byte[]> bytes = underlying.asBytesIfInMemory(); // todo: consider restructuring channel state and removing this method: https://issues.apache.org/jira/browse/FLINK-17972
if (bytes.isPresent()) {
return handleFactory.create(
channelInfo,
new ByteStreamStateHandle(randomUUID().toString(), serializer.extractAndMerge(bytes.get(), offsets)),
singletonList(serializer.getHeaderLength()));
} else {
return handleFactory.create(channelInfo, underlying, offsets);
}
}

private void runWithChecks(RunnableWithException r) throws Exception {
try {
checkState(!result.isDone(), "result is already completed", result);
@@ -206,4 +220,11 @@ public void fail(Throwable e) throws Exception {
checkpointStream.close();
}

private interface HandleFactory<I, H extends AbstractChannelStateHandle<I>> {
H create(I info, StreamStateHandle underlying, List<Long> offsets);

HandleFactory<InputChannelInfo, InputChannelStateHandle> INPUT_CHANNEL = InputChannelStateHandle::new;

HandleFactory<ResultSubpartitionInfo, ResultSubpartitionStateHandle> RESULT_SUBPARTITION = ResultSubpartitionStateHandle::new;
}
}
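
For orientation, a brief self-contained sketch of what the new in-memory branch of createHandle above achieves; the byte values and the small class below are illustrative assumptions, not part of this change. When the shared underlying stream handle already holds its contents in memory, each channel is given its own small ByteStreamStateHandle containing a header plus only that channel's merged data, and the per-channel offsets collapse to singletonList(getHeaderLength()).

import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

import java.util.Collections;
import java.util.List;
import java.util.UUID;

class PerChannelRewriteSketch {
    public static void main(String[] args) {
        // Assumed shape of the serializer.extractAndMerge(...) output:
        // 4-byte header, 4-byte length prefix, then this channel's payload {7, 8}.
        byte[] mergedChannelBytes = {0, 0, 0, 0, 0, 0, 0, 2, 7, 8};
        long headerLength = Integer.BYTES; // assumed value of serializer.getHeaderLength()

        // Each channel now carries its own small in-memory handle instead of
        // sharing one underlying handle with many offsets.
        ByteStreamStateHandle perChannelHandle =
            new ByteStreamStateHandle(UUID.randomUUID().toString(), mergedChannelBytes);
        List<Long> rewrittenOffsets = Collections.singletonList(headerLength);

        System.out.println(perChannelHandle.getStateSize() + " bytes, offsets " + rewrittenOffsets);
    }
}

The single remaining offset still points at a length prefix (the first byte after the header), so the existing read path that walks offsets and length prefixes should be able to consume the rewritten handle unchanged.
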
@@ -65,7 +65,7 @@ public ChannelStateReaderImpl(TaskStateSnapshot snapshot) {
this(snapshot, new ChannelStateSerializerImpl());
}

ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateDeserializer serializer) {
ChannelStateReaderImpl(TaskStateSnapshot snapshot, ChannelStateSerializer serializer) {
RefCountingFSDataInputStreamFactory streamFactory = new RefCountingFSDataInputStreamFactory(serializer);
final HashMap<InputChannelInfo, ChannelStateStreamReader> inputChannelHandleReadersTmp = new HashMap<>();
final HashMap<ResultSubpartitionInfo, ChannelStateStreamReader> resultSubpartitionHandleReadersTmp = new HashMap<>();
@@ -26,10 +26,13 @@

import javax.annotation.concurrent.NotThreadSafe;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;

import static java.lang.Math.addExact;
import static java.lang.Math.min;
@@ -39,15 +42,16 @@ interface ChannelStateSerializer {
void writeHeader(DataOutputStream dataStream) throws IOException;

void writeData(DataOutputStream stream, Buffer... flinkBuffers) throws IOException;
}

interface ChannelStateDeserializer {

void readHeader(InputStream stream) throws IOException;

int readLength(InputStream stream) throws IOException;

int readData(InputStream stream, ChannelStateByteBuffer buffer, int bytes) throws IOException;

byte[] extractAndMerge(byte[] bytes, List<Long> offsets) throws IOException;

long getHeaderLength();
}

/**
@@ -128,7 +132,7 @@ public int writeBytes(InputStream input, int bytesToRead) throws IOException {
}
}

class ChannelStateSerializerImpl implements ChannelStateSerializer, ChannelStateDeserializer {
class ChannelStateSerializerImpl implements ChannelStateSerializer {
private static final int SERIALIZATION_VERSION = 0;

@Override
@@ -174,4 +178,35 @@ public int readData(InputStream stream, ChannelStateByteBuffer buffer, int bytes
private static int readInt(InputStream stream) throws IOException {
return new DataInputStream(stream).readInt();
}

@Override
public byte[] extractAndMerge(byte[] bytes, List<Long> offsets) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(out);
byte[] merged = extractByOffsets(bytes, offsets);
writeHeader(dataOutputStream);
dataOutputStream.writeInt(merged.length);
dataOutputStream.write(merged, 0, merged.length);
dataOutputStream.close();
return out.toByteArray();
}

private byte[] extractByOffsets(byte[] data, List<Long> offsets) throws IOException {
DataInputStream lengthReadingStream = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
ByteArrayOutputStream out = new ByteArrayOutputStream();
long prevOffset = 0;
for (long offset : offsets) {
lengthReadingStream.skipBytes((int) (offset - prevOffset));
int dataWithLengthOffset = (int) offset + Integer.BYTES;
out.write(data, dataWithLengthOffset, lengthReadingStream.readInt());
prevOffset = dataWithLengthOffset;
}
return out.toByteArray();
}

@Override
public long getHeaderLength() {
return Integer.BYTES;
}

}
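
For reference, a hedged, self-contained illustration (plain Java, no Flink dependencies; the sample values are assumptions) of the layout that extractAndMerge and extractByOffsets above rely on: a 4-byte header followed by repeated [int length][payload] records, with each entry of offsets pointing at a record's length prefix.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

class OffsetMergeSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);

        out.writeInt(0);                       // header (serialization version)
        long offsetA = bos.size();             // offset of the first length prefix
        out.writeInt(3);
        out.write(new byte[]{1, 2, 3});
        long offsetB = bos.size();             // offset of the second length prefix
        out.writeInt(2);
        out.write(new byte[]{4, 5});

        byte[] data = bos.toByteArray();
        List<Long> offsets = Arrays.asList(offsetA, offsetB);

        // Same idea as extractByOffsets above: read each length prefix,
        // then copy only that record's payload bytes.
        ByteArrayOutputStream merged = new ByteArrayOutputStream();
        for (long offset : offsets) {
            int length = ByteBuffer.wrap(data, (int) offset, Integer.BYTES).getInt();
            merged.write(data, (int) offset + Integer.BYTES, length);
        }

        System.out.println(Arrays.toString(merged.toByteArray())); // [1, 2, 3, 4, 5]
    }
}

extractAndMerge then prepends a fresh header and a single length prefix to the merged payload, which is why the rewritten per-channel handle above needs only one offset.
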
@@ -45,7 +45,7 @@
class ChannelStateStreamReader implements Closeable {

private final RefCountingFSDataInputStream stream;
private final ChannelStateDeserializer serializer;
private final ChannelStateSerializer serializer;
private final Queue<Long> offsets;
private int remainingBytes = -1;
private boolean closed = false;
@@ -54,7 +54,7 @@ class ChannelStateStreamReader implements Closeable {
this(streamFactory.getOrCreate(handle), handle.getOffsets(), streamFactory.getSerializer());
}

private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List<Long> offsets, ChannelStateDeserializer serializer) {
private ChannelStateStreamReader(RefCountingFSDataInputStream stream, List<Long> offsets, ChannelStateSerializer serializer) {
this.stream = stream;
this.stream.incRef();
this.serializer = serializer;
@@ -39,13 +39,13 @@ private enum State {NEW, OPENED, CLOSED}

private final SupplierWithException<FSDataInputStream, IOException> streamSupplier;
private FSDataInputStream stream;
private final ChannelStateDeserializer serializer;
private final ChannelStateSerializer serializer;
private int refCount = 0;
private State state = State.NEW;

private RefCountingFSDataInputStream(
SupplierWithException<FSDataInputStream, IOException> streamSupplier,
ChannelStateDeserializer serializer) {
ChannelStateSerializer serializer) {
this.streamSupplier = checkNotNull(streamSupplier);
this.serializer = checkNotNull(serializer);
}
@@ -105,9 +105,9 @@ private void checkNotClosed() {
@NotThreadSafe
static class RefCountingFSDataInputStreamFactory {
private final Map<StreamStateHandle, RefCountingFSDataInputStream> streams = new HashMap<>(); // not clearing: expecting short life
private final ChannelStateDeserializer serializer;
private final ChannelStateSerializer serializer;

RefCountingFSDataInputStreamFactory(ChannelStateDeserializer serializer) {
RefCountingFSDataInputStreamFactory(ChannelStateSerializer serializer) {
this.serializer = checkNotNull(serializer);
}

@@ -121,7 +121,7 @@ <T> RefCountingFSDataInputStream getOrCreate(AbstractChannelStateHandle<T> handl
return stream;
}

ChannelStateDeserializer getSerializer() {
ChannelStateSerializer getSerializer() {
return serializer;
}
}
@@ -23,6 +23,7 @@
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Optional;

/**
* A handle to the partitioned stream operator state after it has been checkpointed. This state
@@ -117,6 +118,11 @@ public FSDataInputStream openInputStream() throws IOException {
return stateHandle.openInputStream();
}

@Override
public Optional<byte[]> asBytesIfInMemory() {
return stateHandle.asBytesIfInMemory();
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -23,6 +23,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

/**
* State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
@@ -66,6 +67,11 @@ public FSDataInputStream openInputStream() throws IOException {
return delegateStateHandle.openInputStream();
}

@Override
public Optional<byte[]> asBytesIfInMemory() {
return delegateStateHandle.asBytesIfInMemory();
}

@Override
public StreamStateHandle getDelegateStateHandle() {
return delegateStateHandle;
@@ -21,6 +21,8 @@
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;

import java.util.Optional;

/**
* A placeholder state handle for shared state that will be replaced by an original that was
* created in a previous checkpoint. So we don't have to send a state handle twice, e.g. in
@@ -40,6 +42,12 @@ public FSDataInputStream openInputStream() {
"This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
}

@Override
public Optional<byte[]> asBytesIfInMemory() {
throw new UnsupportedOperationException(
"This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
}

@Override
public void discardState() throws Exception {
// nothing to do.
@@ -27,6 +27,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Optional;

/**
* Wrapper around a {@link StreamStateHandle} to make the referenced state object retrievable through a simple get call.
@@ -64,6 +65,11 @@ public FSDataInputStream openInputStream() throws IOException {
return wrappedStreamStateHandle.openInputStream();
}

@Override
public Optional<byte[]> asBytesIfInMemory() {
return wrappedStreamStateHandle.asBytesIfInMemory();
}

@Override
public void discardState() throws Exception {
wrappedStreamStateHandle.discardState();
@@ -21,6 +21,7 @@
import org.apache.flink.core.fs.FSDataInputStream;

import java.io.IOException;
import java.util.Optional;

/**
* A {@link StateObject} that represents state that was written to a stream. The data can be read
@@ -33,4 +34,9 @@ public interface StreamStateHandle extends StateObject {
* was previously written to the stream.
*/
FSDataInputStream openInputStream() throws IOException;

/**
* @return Content of this handle as a byte array if it is already in memory.
*/
Optional<byte[]> asBytesIfInMemory();
}
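
A possible caller-side sketch of the new method; the helper class below is hypothetical and not part of this change. It prefers the in-memory contents when a handle exposes them and falls back to reading through the stream otherwise.

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.StreamStateHandle;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Optional;

class StateBytesReader {
    // Hypothetical utility for illustration only.
    static byte[] readAllBytes(StreamStateHandle handle) throws IOException {
        Optional<byte[]> inMemory = handle.asBytesIfInMemory();
        if (inMemory.isPresent()) {
            return inMemory.get(); // e.g. ByteStreamStateHandle
        }
        // e.g. FileStateHandle, which returns Optional.empty() for asBytesIfInMemory()
        try (FSDataInputStream in = handle.openInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                out.write(chunk, 0, read);
            }
            return out.toByteArray();
        }
    }
}
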
@@ -24,6 +24,7 @@
import org.apache.flink.runtime.state.StreamStateHandle;

import java.io.IOException;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -68,6 +69,11 @@ public FSDataInputStream openInputStream() throws IOException {
return getFileSystem().open(filePath);
}

@Override
public Optional<byte[]> asBytesIfInMemory() {
return Optional.empty();
}

/**
* Discard the state by deleting the file that stores the state. If the parent directory
* of the state is empty after deleting the state file, it is also deleted.
@@ -23,6 +23,7 @@
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Optional;

/**
* A state handle that contains stream state in a byte array.
@@ -55,6 +56,11 @@ public FSDataInputStream openInputStream() throws IOException {
return new ByteStateHandleInputStream(data);
}

@Override
public Optional<byte[]> asBytesIfInMemory() {
return Optional.of(getData());
}

public byte[] getData() {
return data;
}