A modern iteration API for Apache Flink 2.0 that replaces the deprecated DataSet and DataStream iteration APIs. The implementation follows the unified iteration design specified in FLIP-176.
The project provides a clean, non-deprecated way to express bounded and unbounded iterations, aimed at machine learning and other iterative algorithms.
Key features:
- Unified Iteration Model: Support for both bounded and unbounded iterations
- Synchronous/Asynchronous Execution: Flexible execution modes for different algorithm requirements
- Per-round/All-round Operators: Configurable operator lifecycle management
- Epoch Tracking: Built-in epoch/round tracking for iteration progress
- Termination Control: Flexible termination conditions including max rounds and convergence criteria
- Checkpoint Support: Integration with Flink's checkpointing mechanism
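The termination control described above combines two criteria: a hard cap on the number of rounds and a convergence test. The following plain-Java sketch shows how the two interact by stopping at whichever is reached first. It does not use this project's API. `TerminationSketch`, `Outcome` and the `epsilon` threshold are hypothetical names chosen for illustration.

```java
import java.util.function.DoubleUnaryOperator;

// Hypothetical sketch, not this project's API: one scalar "variable" is updated
// each round until it converges (change < epsilon) or maxRounds is reached.
final class TerminationSketch {
    /** Final value plus the number of rounds actually executed. */
    record Outcome(double value, int rounds) {}

    static Outcome iterate(double init, DoubleUnaryOperator step, int maxRounds, double epsilon) {
        double current = init;
        for (int round = 0; round < maxRounds; round++) {
            double next = step.applyAsDouble(current);
            // Convergence criterion: the update moved the value by less than epsilon.
            if (Math.abs(next - current) < epsilon) {
                return new Outcome(next, round + 1);
            }
            current = next;
        }
        // Max-rounds criterion: stop without having converged.
        return new Outcome(current, maxRounds);
    }
}
```

For example, `TerminationSketch.iterate(1.0, x -> x / 2, 100, 1e-3)` stops through convergence after 10 rounds. With `maxRounds = 3` the same call is cut off by the round limit at `0.125`.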
The iteration API consists of several key components:
- IterationBody: Interface for defining iteration computation logic
- IterationListener: Callbacks for epoch watermarks and termination events
- Iterations: Main entry point for creating iterations
- HeadOperator/TailOperator: Internal operators managing the feedback loop
- DataStreamList: Helper for managing multiple typed streams
- IterationConfig: Configuration for iteration behavior
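The head and tail operators close the feedback loop. The head injects the initial variables, and the tail sends each result back with its round (epoch) incremented. The single-threaded sketch below models only that data flow with an in-memory queue. `FeedbackLoopSketch`, `Tagged` and `maxEpochs` are hypothetical names, and the real operators are distributed and handle much more, such as epoch watermarks and checkpointing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, single-threaded model of a head/tail feedback loop.
final class FeedbackLoopSketch {
    /** A value tagged with the epoch (round) it belongs to. */
    record Tagged<T>(int epoch, T value) {}

    static <T> List<Tagged<T>> run(List<T> initial, UnaryOperator<T> body, int maxEpochs) {
        Deque<Tagged<T>> channel = new ArrayDeque<>();
        // Head: inject the initial variables at epoch 0.
        for (T v : initial) {
            channel.add(new Tagged<>(0, v));
        }
        List<Tagged<T>> processed = new ArrayList<>();
        while (!channel.isEmpty()) {
            Tagged<T> record = channel.poll();
            T result = body.apply(record.value());
            processed.add(new Tagged<>(record.epoch(), result));
            // Tail: feed the result back with the epoch incremented, up to the limit.
            int nextEpoch = record.epoch() + 1;
            if (nextEpoch < maxEpochs) {
                channel.add(new Tagged<>(nextEpoch, result));
            }
        }
        return processed;
    }
}
```

Running `FeedbackLoopSketch.run(List.of(0), x -> x + 1, 3)` processes one record per epoch and yields the values 1, 2 and 3 tagged with epochs 0, 1 and 2.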
Requirements:
- Java 17 or higher
- Apache Flink 2.0+
- Maven 3.6 or higher
Build the project with:
mvn clean package
Basic unbounded iteration:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Initial value of the iteration variable, plus input data that enters the loop once.
DataStream<Integer> initValues = env.fromElements(0);
DataStream<Integer> data = env.fromElements(1, 2, 3, 4, 5);

DataStreamList result = Iterations.iterateUnboundedStreams(
    DataStreamList.of(initValues),
    DataStreamList.of(data),
    (variableStreams, dataStreams) -> {
        DataStream<Integer> variable = variableStreams.get(0);
        DataStream<Integer> input = dataStreams.get(0);
        // Merge the current variable with the input and increment every value.
        DataStream<Integer> updated = variable
            .union(input)
            .map(value -> value + 1);
        return new IterationBodyResult(
            DataStreamList.of(updated), // feedback: the next round's variable stream
            DataStreamList.of(updated)  // output: emitted from the iteration
        );
    }
);
// Every record is fed back, so this loop never terminates on its own.
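To see what the unbounded example computes, the plain-Java trace below groups its records by round. It assumes the FLIP-176 semantics, under which only the feedback stream circulates and the data stream is consumed once, in the first round. The trace stops after a fixed number of rounds because the real iteration never terminates. In the actual job, records from different rounds can interleave. `UnboundedExampleTrace` is a hypothetical helper, not part of the project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical round-by-round trace of the unbounded example, cut off after `rounds`.
final class UnboundedExampleTrace {
    static List<List<Integer>> trace(List<Integer> initValues, List<Integer> data, int rounds) {
        List<List<Integer>> outputsPerRound = new ArrayList<>();
        List<Integer> variable = new ArrayList<>(initValues);
        for (int round = 0; round < rounds; round++) {
            List<Integer> updated = new ArrayList<>();
            // variable.union(input).map(value -> value + 1)
            for (int v : variable) {
                updated.add(v + 1);
            }
            // Assumption: data-stream records only enter the loop in the first round.
            if (round == 0) {
                for (int d : data) {
                    updated.add(d + 1);
                }
            }
            outputsPerRound.add(updated);
            variable = updated; // the feedback edge
        }
        return outputsPerRound;
    }
}
```

Under these assumptions, round 0 emits 1 through 6 and each later round shifts every value up by one.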
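Bounded iteration with an explicit configuration:
IterationConfig config = IterationConfig.newBuilder()
    .setMaxRounds(10) // stop after at most 10 rounds
    .setOperatorLifeCycle(IterationConfig.OperatorLifeCycle.ALL_ROUND) // keep operators across rounds
    .build();

// initParameters, dataset and iterationBody are assumed to be defined elsewhere.
DataStreamList result = Iterations.iterateBoundedStreamsUntilTermination(
    DataStreamList.of(initParameters),
    ReplayableDataStreamList.notReplay(dataset), // dataset is not replayed in later rounds
    config,
    iterationBody
);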
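The operator lifecycle setting determines whether operator state survives between rounds. The toy model below shows the difference. With `ALL_ROUND`, a single operator instance lives for the whole iteration. With `PER_ROUND`, a fresh instance is created for each round, so its state starts empty every time. `LifecycleSketch` and `CountingOperator` are hypothetical names, and the model does not reflect how the project actually instantiates operators.

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model of ALL_ROUND vs PER_ROUND operator lifecycles.
final class LifecycleSketch {
    enum LifeCycle { ALL_ROUND, PER_ROUND }

    /** A toy stateful operator that counts how many records it has seen. */
    static final class CountingOperator {
        private int seen;

        int process(int record) {
            return ++seen;
        }
    }

    /** Returns the operator's count at the end of each round. */
    static List<Integer> run(LifeCycle lifeCycle, int rounds, int recordsPerRound) {
        CountingOperator operator = new CountingOperator();
        List<Integer> countAtEndOfRound = new ArrayList<>();
        for (int round = 0; round < rounds; round++) {
            if (lifeCycle == LifeCycle.PER_ROUND) {
                operator = new CountingOperator(); // discard the previous round's state
            }
            int count = 0;
            for (int i = 0; i < recordsPerRound; i++) {
                count = operator.process(i);
            }
            countAtEndOfRound.add(count);
        }
        return countAtEndOfRound;
    }
}
```

With two records per round over three rounds, `ALL_ROUND` reports 2, 4, 6 and `PER_ROUND` reports 2, 2, 2.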
See src/main/java/org/apache/flink/iteration/examples/LinearRegressionExample.java
for a complete example of implementing linear regression with SGD using the iteration API.
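As a point of reference independent of Flink, the per-round computation of a linear-regression iteration can be written as an ordinary loop. The model parameters play the role of the variable stream, the training data is reused every round, and the updated parameters are fed back. The sketch substitutes full-batch gradient descent for SGD to keep results deterministic. It fits a one-dimensional model `y = w * x + b`. `LinearRegressionSketch` is hypothetical and is not the bundled example.

```java
// Hypothetical plain-Java reference: full-batch gradient descent on y = w * x + b,
// minimizing mean squared error; each loop pass corresponds to one iteration round.
final class LinearRegressionSketch {
    /** Returns {w, b} after maxRounds rounds. */
    static double[] train(double[] xs, double[] ys, double learningRate, int maxRounds) {
        double w = 0.0; // initial parameters (the "variable stream")
        double b = 0.0;
        int n = xs.length;
        for (int round = 0; round < maxRounds; round++) {
            double gradW = 0.0;
            double gradB = 0.0;
            // The training data is read again every round to accumulate the gradient.
            for (int i = 0; i < n; i++) {
                double error = (w * xs[i] + b) - ys[i];
                gradW += error * xs[i];
                gradB += error;
            }
            // Gradient of the MSE is (2 / n) * sum; the new parameters are "fed back".
            w -= learningRate * (2.0 / n) * gradW;
            b -= learningRate * (2.0 / n) * gradB;
        }
        return new double[] {w, b};
    }
}
```

Training on the points (0, 1), (1, 3), (2, 5), (3, 7) with a learning rate of 0.05 for 2000 rounds recovers `w = 2` and `b = 1` to within floating-point noise.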
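IterationBody is the core interface for defining iteration logic:
public interface IterationBody {
    // Receives the variable streams and the data streams; returns the feedback
    // streams for the next round together with the output streams.
    IterationBodyResult process(
        DataStreamList variableStreams,
        DataStreamList dataStreams
    );
}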
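The contract of an iteration body (variables and data in, feedback and output out) can be illustrated on in-memory lists. The sketch below drives such a body round by round. It assumes that a bounded iteration ends once a round feeds nothing back or a round limit is reached, in line with the FLIP-176 design. `IterationBodySketch`, `Body` and `Result` are hypothetical and stand in for the real stream-based types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical list-based analogue of IterationBody and its driver.
final class IterationBodySketch {
    record Result<T>(List<T> feedback, List<T> output) {}

    @FunctionalInterface
    interface Body<T> {
        Result<T> process(List<T> variables, List<T> data);
    }

    static <T> List<T> iterate(List<T> initVariables, List<T> data, Body<T> body, int maxRounds) {
        List<T> variables = initVariables;
        List<T> allOutput = new ArrayList<>();
        // Assumption: stop when the previous round fed nothing back, or at maxRounds.
        for (int round = 0; round < maxRounds && !variables.isEmpty(); round++) {
            Result<T> result = body.process(variables, data);
            allOutput.addAll(result.output());
            variables = result.feedback(); // next round's variable input
        }
        return allOutput;
    }
}
```

Suppose a body halves each variable, outputs the result, and feeds it back only while it is positive. Started from `[8]`, it outputs 4, 2, 1, 0 and then stops because the feedback is empty.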
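IterationListener is implemented by operators that need epoch notifications:
public interface IterationListener<T> {
    // Invoked when the epoch watermark advances, i.e. all records with
    // epoch <= epochWatermark have been received.
    void onEpochWatermarkIncremented(
        int epochWatermark,
        Context context,
        Collector<T> collector
    );

    // Invoked once, after the iteration has terminated.
    void onIterationTerminated(
        Context context,
        Collector<T> collector
    );
}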
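The usual pattern for a listener is to buffer state for the current epoch and emit it when the epoch watermark advances, then emit a final result on termination. The sketch below calls the callbacks by hand to make the ordering explicit. `EpochListenerSketch` is hypothetical: it neither implements the real `IterationListener` nor uses Flink's `Context` or `Collector`, and it collects emitted strings into a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener-style operator: per-epoch sums plus a final total.
final class EpochListenerSketch {
    private long epochSum;
    private long total;
    final List<String> emitted = new ArrayList<>();

    void processElement(long value) {
        epochSum += value;
        total += value;
    }

    /** Called once all records with epoch <= epochWatermark have been seen. */
    void onEpochWatermarkIncremented(int epochWatermark) {
        emitted.add("epoch " + epochWatermark + " sum=" + epochSum);
        epochSum = 0; // start accumulating the next epoch
    }

    void onIterationTerminated() {
        emitted.add("total=" + total);
    }
}
```

Feeding 1 and 2 in epoch 0 and 3 in epoch 1, then terminating, yields `epoch 0 sum=3`, `epoch 1 sum=3` and `total=6`.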
DataStreamList is a type-safe container for multiple streams:
DataStreamList streams = DataStreamList.of(stream1, stream2, stream3);
DataStream<MyType> first = streams.get(0); // element type taken from the assignment target
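A container like this holds values of different types behind one generic accessor, and the element type is inferred from the assignment target. The minimal sketch below shows that pattern in isolation. `TypedListSketch` is hypothetical, and in this sketch the cast inside `get` is unchecked, so a mismatched target type fails at the call site with a `ClassCastException`. This is not necessarily how the project's `DataStreamList` is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical heterogeneous list with a generic accessor.
final class TypedListSketch {
    private final List<Object> items;

    private TypedListSketch(List<Object> items) {
        this.items = items;
    }

    static TypedListSketch of(Object... items) {
        return new TypedListSketch(new ArrayList<>(Arrays.asList(items)));
    }

    // Unchecked: the caller's declared type must match the stored element.
    @SuppressWarnings("unchecked")
    <T> T get(int index) {
        return (T) items.get(index);
    }

    int size() {
        return items.size();
    }
}
```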
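ReplayableDataStreamList specifies which streams are replayed in each round of a bounded iteration:
ReplayableDataStreamList.replay(stream1, stream2)
    .andNotReplay(stream3); // stream1 and stream2 are replayed every round; stream3 is not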
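The distinction between replayed and non-replayed inputs comes down to which records the body sees in each round. The toy model below assumes that replayed inputs are delivered again every round and non-replayed inputs only in the first round. `ReplaySketch` is a hypothetical helper and does not reflect how the project buffers or replays data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of replay semantics in a bounded iteration.
final class ReplaySketch {
    /** Returns, for each round, the input records visible to the iteration body. */
    static List<List<Integer>> visiblePerRound(List<Integer> replayed, List<Integer> notReplayed, int rounds) {
        List<List<Integer>> perRound = new ArrayList<>();
        for (int round = 0; round < rounds; round++) {
            // Replayed inputs are delivered again in every round.
            List<Integer> visible = new ArrayList<>(replayed);
            // Non-replayed inputs are delivered only once, in the first round.
            if (round == 0) {
                visible.addAll(notReplayed);
            }
            perRound.add(visible);
        }
        return perRound;
    }
}
```

With `[1, 2]` replayed and `[3]` not replayed, the body sees `[1, 2, 3]` in round 0 and only `[1, 2]` afterwards.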
Run tests with:
mvn test
Examples:
- Linear Regression: Demonstrates synchronous bounded iteration for ML training
- More examples coming soon...
Licensed under the Apache License, Version 2.0. See LICENSE for details.
Contributions are welcome! Please ensure:
- All tests pass
- Code follows Flink coding conventions
- New features include tests
- Documentation is updated
Planned future work:
- Support for nested iterations
- Advanced termination criteria
- More efficient serialization of IterationRecord and related types
- Performance optimizations for large-scale iterations
- Additional ML algorithm examples
- Asynchronous execution
- Integration with Flink ML library
For questions and support, please open an issue on GitHub.