Skip to content

Commit

Permalink
[FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanRRichter committed Aug 24, 2017
1 parent 5456cf9 commit 6642768
Show file tree
Hide file tree
Showing 126 changed files with 318 additions and 12,512 deletions.
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -132,7 +131,7 @@
@Deprecated @Deprecated
public class RollingSink<T> extends RichSinkFunction<T> public class RollingSink<T> extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, implements InputTypeConfigurable, CheckpointedFunction,
CheckpointListener, CheckpointedRestoring<RollingSink.BucketState> { CheckpointListener {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;


Expand Down Expand Up @@ -758,25 +757,6 @@ private void handleRestoredBucketState(BucketState bucketState) {
} }
} }


// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------

@Override
public void restoreState(BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);

try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the RollingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the RollingSink.", e);
}

handleRestoredBucketState(state);
}

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Setters for User configuration values // Setters for User configuration values
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.Clock;
Expand Down Expand Up @@ -154,8 +153,7 @@
*/ */
public class BucketingSink<T> public class BucketingSink<T>
extends RichSinkFunction<T> extends RichSinkFunction<T>
implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback {
CheckpointedRestoring<RollingSink.BucketState>, ProcessingTimeCallback {


private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;


Expand Down Expand Up @@ -872,25 +870,6 @@ private void handlePendingFilesForPreviousCheckpoints(Map<Long, List<String>> pe
} }
} }


// --------------------------------------------------------------------------------------------
// Backwards compatibility with Flink 1.1
// --------------------------------------------------------------------------------------------

@Override
public void restoreState(RollingSink.BucketState state) throws Exception {
LOG.info("{} (taskIdx={}) restored bucket state from the RollingSink an older Flink version: {}",
getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), state);

try {
initFileSystem();
} catch (IOException e) {
LOG.error("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
throw new RuntimeException("Error while creating FileSystem when restoring the state of the BucketingSink.", e);
}

handleRestoredRollingSinkState(state);
}

// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
// Setters for User configuration values // Setters for User configuration values
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
Expand Down

This file was deleted.

0 comments on commit 6642768

Please sign in to comment.