Skip to content

Commit

Permalink
[FLINK-5163] Port the StatefulSequenceSource to the new state abstrac…
Browse files Browse the repository at this point in the history
…tions.
  • Loading branch information
kl0u committed Nov 28, 2016
1 parent 1840680 commit dde5d3a
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -18,25 +18,44 @@
package org.apache.flink.streaming.api.functions.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Preconditions;

import java.util.ArrayDeque;
import java.util.Deque;

/**
* A stateful streaming source that emits each number from a given interval exactly once,
* possibly in parallel.
* <p>
* For the source to be re-scalable, the first time the job is run, we precompute all the elements
* that each of the tasks should emit and upon checkpointing, each element constitutes its own
* partition. When rescaling, these partitions will be randomly re-assigned to the new tasks.
* <p>
* This strategy guarantees that each element will be emitted exactly-once, but elements will not
* necessarily be emitted in ascending order, even for the same tasks.
*/
@PublicEvolving
public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
public class StatefulSequenceSource extends RichParallelSourceFunction<Long> implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private final long start;
private final long end;

private long collected;

private volatile boolean isRunning = true;

private transient Deque<Long> valuesToEmit;

private static final String STATEFUL_SOURCE_STATE = "stateful-source-state";

private transient ListState<Long> checkpointedState;

/**
* Creates a source that emits all numbers from the given interval exactly once.
*
Expand All @@ -49,24 +68,47 @@ public StatefulSequenceSource(long start, long end) {
}

@Override
public void run(SourceContext<Long> ctx) throws Exception {
final Object checkpointLock = ctx.getCheckpointLock();
public void initializeState(FunctionInitializationContext context) throws Exception {

Preconditions.checkState(this.checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");

this.checkpointedState = context.getOperatorStateStore().getOperatorState(
new ListStateDescriptor<>(
STATEFUL_SOURCE_STATE,
LongSerializer.INSTANCE
)
);

this.valuesToEmit = new ArrayDeque<>();
if (context.isRestored()) {
// upon restoring

RuntimeContext context = getRuntimeContext();
for (Long v : this.checkpointedState.get()) {
this.valuesToEmit.add(v);
}
} else {
// the first time the job is executed

final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
final long congruence = start + taskIdx;

final long stepSize = context.getNumberOfParallelSubtasks();
final long congruence = start + context.getIndexOfThisSubtask();
long totalNoOfElements = Math.abs(end - start + 1);
final int baseSize = safeDivide(totalNoOfElements, stepSize);
final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;

final long toCollect =
((end - start + 1) % stepSize > (congruence - start)) ?
((end - start + 1) / stepSize + 1) :
((end - start + 1) / stepSize);
for (long collected = 0; collected < toCollect; collected++) {
this.valuesToEmit.add(collected * stepSize + congruence);
}
}
}

while (isRunning && collected < toCollect) {
synchronized (checkpointLock) {
ctx.collect(collected * stepSize + congruence);
collected++;
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while (isRunning && !this.valuesToEmit.isEmpty()) {
synchronized (ctx.getCheckpointLock()) {
ctx.collect(this.valuesToEmit.poll());
}
}
}
Expand All @@ -77,12 +119,20 @@ public void cancel() {
}

@Override
public Long snapshotState(long checkpointId, long checkpointTimestamp) {
return collected;
public void snapshotState(FunctionSnapshotContext context) throws Exception {
Preconditions.checkState(this.checkpointedState != null,
"The " + getClass().getSimpleName() + " state has not been properly initialized.");

this.checkpointedState.clear();
for (Long v : this.valuesToEmit) {
this.checkpointedState.add(v);
}
}

@Override
public void restoreState(Long state) {
collected = state;
private static int safeDivide(long left, long right) {
Preconditions.checkArgument(right > 0);
Preconditions.checkArgument(left >= 0);
Preconditions.checkArgument(left <= Integer.MAX_VALUE * right);
return (int) (left / right);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,4 @@ public void fromCollectionTest() throws Exception {
Arrays.asList(1, 2, 3))));
assertEquals(expectedList, actualList);
}

@Test
public void generateSequenceTest() throws Exception {
List<Long> expectedList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L);
List<Long> actualList = SourceFunctionUtil.runSourceFunction(new StatefulSequenceSource(1,
7));
assertEquals(expectedList, actualList);
}
}

0 comments on commit dde5d3a

Please sign in to comment.