ControlFlow is an Android library for task sequencing, rollback, and error handling. It runs tasks in a defined order, reports errors in a structured way, and can roll back work when a task fails.
Explore the implementation of the ControlFlow library through the code samples available in the ControlFlowDemo repository.
- Task Execution Sequencing: Define and manage a sequence of tasks.
- Subtasks Execution Sequencing: Define and manage a sequence of subtasks for each primary task.
- Rollback Mechanism: Implement rollback functionalities for tasks that need to revert changes made during their execution.
- Error Handling: Handle errors occurring during task execution and initiate rollback processes if required.
- Automated Data Forwarding: By default, each task's output is automatically forwarded as the input of the next task.
To include ControlFlow in your Android project, add the following dependency to your app's build.gradle file:
implementation("io.github.codestarx:control-flow:1.0.0-alpha11")
repositories {
    //..
    //..
    mavenCentral()
}
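As a sketch of where these two snippets usually live in a Kotlin DSL build: the file names below are assumptions (a module named app, and a Gradle version that supports dependencyResolutionManagement in settings.gradle.kts). Older projects declare repositories in the root build file instead.

```kotlin
// settings.gradle.kts (assumed location for repository declarations)
dependencyResolutionManagement {
    repositories {
        google()
        mavenCentral() // the ControlFlow artifact is resolved from here
    }
}

// app/build.gradle.kts (assumed module name)
dependencies {
    implementation("io.github.codestarx:control-flow:1.0.0-alpha11")
}
```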
- Task Management: Create task instances that inherit from Dispatcher and implement TaskProcessor.
- ControlFlow Class: Use the ControlFlow class to manage task sequences and their execution flow.
- Start Execution: Begin executing tasks with the start() method.
Inherit from Dispatcher: Create tasks by inheriting from the Dispatcher class and implementing the TaskProcessor interface. For example:
class MyTask : Dispatcher(), TaskProcessor {
    override val info: TaskInfo
        get() = TaskInfo().apply {
            index = 0
            name = MyTask::class.java.name
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        // Define the action here
        return launchAwait(
            action = {
                // Perform the action
                // ...
            },
            transformer = {
                // The output generated by this function will serve as the input for the subsequent task
                // Return TransformData(data= ...)
            },
            actionCondition = {
                // Define conditions for continuation or breaking
                // Return ConditionData(status = ... , throwable = ... )
            }
        )
    }
}
Tasks that support rollback should implement the RollbackTaskProcessor interface and override its members to specify the rollback actions.
Example of a task with rollback:
class MyRollbackTask : Dispatcher(), RollbackTaskProcessor {
    override val info: TaskInfo
        get() = TaskInfo().apply {
            index = 0
            name = MyRollbackTask::class.java.name
            runIn = Dispatchers.IO
        }

    override val rollbackInfo: RollbackInfo
        get() = RollbackInfo().apply {
            index = 0
            name = MyRollbackTask::class.java.name
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        // Define the action for the task
        return launchAwait(
            action = {
                // Perform the action
                // ...
            },
            actionCondition = {
                // Define conditions for continuation or breaking
                // Return ConditionData(status = ... , throwable = ... )
            }
        )
    }

    override suspend fun doRollbackProcess(): Flow<TaskStatus> {
        // Define the rollback action here
        return launchAwait(
            action = {
                // Perform the rollback action
                // ...
            },
            actionCondition = {
                // Define conditions for rollback continuation or breaking
                // Return ConditionData(status = ... , throwable = ... )
            }
        )
    }
}
By default, each task's output is automatically forwarded as the input of the next task.
To change the data passed to the next task, use the transformer parameter of launchAwait.
Example:
class Task : Dispatcher(), TaskProcessor {
    override val info: TaskInfo
        get() = TaskInfo().apply {
            index = 0
            name = Task::class.java.name
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        // Define the action here
        return launchAwait(
            action = {
                // Perform the action
                // ...
            },
            transformer = {
                // The output generated by this function will serve as the input for the subsequent task
                // Return TransformData(data= ...)
            },
            actionCondition = {
                // Define conditions for continuation or breaking
                // Return ConditionData(status = ... , throwable = ... )
            }
        )
    }
}
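The forwarding rule can be illustrated with a standalone model in plain Kotlin. It does not use the ControlFlow library: Step and runPipeline are hypothetical names, and the sketch only assumes what the text above states, namely that a result is forwarded unchanged unless a transformer reshapes it.

```kotlin
// Standalone model of data forwarding; it does not use the ControlFlow library.
// Step and runPipeline are hypothetical names chosen for this illustration.
class Step(
    val name: String,
    val action: (Any?) -> Any?,
    // Optional hook that reshapes the action's result before it is forwarded.
    val transformer: ((Any?) -> Any?)? = null
)

fun runPipeline(steps: List<Step>, initial: Any? = null): Any? {
    var carried = initial
    for (step in steps) {
        val result = step.action(carried)
        val transform = step.transformer
        // Without a transformer the raw result is forwarded unchanged.
        carried = if (transform != null) transform(result) else result
    }
    return carried
}

fun main() {
    val steps = listOf(
        Step("fetch", action = { "42" }),
        Step(
            "parse",
            action = { input -> (input as String).toInt() },
            transformer = { parsed -> "value=$parsed" }
        ),
        Step("passThrough", action = { input -> input })
    )
    println(runPipeline(steps)) // prints "value=42"
}
```

The third step receives the transformed string rather than the parsed integer, which is the effect the transformer is meant to have.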
The attributes of each task are defined with the TaskInfo class.
The index, name, and runIn properties specify the task's identity and the coroutine dispatcher it runs on. By passing index or name to the startFrom method of ControlFlow, tasks can be rerun as needed.
Example:
class Task : Dispatcher(), TaskProcessor {
    override val info: TaskInfo
        get() = TaskInfo().apply {
            index = 0
            name = Task::class.java.name
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        ...
    }
}
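The start-point lookup can be modeled in a few lines of plain Kotlin. This is a standalone sketch, not the library's implementation: NamedStep and runFrom are hypothetical names, and it assumes that starting from a task means running that task and every task after it.

```kotlin
// Standalone model of "start from index or name"; not the ControlFlow API.
// NamedStep and runFrom are hypothetical names.
class NamedStep(val index: Int, val name: String, val action: () -> Unit)

// Runs the step with the given index and all steps that follow it.
fun runFrom(steps: List<NamedStep>, startIndex: Int) {
    val position = steps.indexOfFirst { it.index == startIndex }
    require(position >= 0) { "No step with index $startIndex" }
    steps.drop(position).forEach { it.action() }
}

// Same lookup, keyed by name instead of index.
fun runFrom(steps: List<NamedStep>, startName: String) {
    val position = steps.indexOfFirst { it.name == startName }
    require(position >= 0) { "No step named $startName" }
    steps.drop(position).forEach { it.action() }
}

fun main() {
    val ran = mutableListOf<String>()
    val steps = listOf("download", "unpack", "install")
        .mapIndexed { i, n -> NamedStep(i, n) { ran.add(n) } }
    runFrom(steps, startName = "unpack")
    println(ran) // prints "[unpack, install]"
}
```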
To enable the retry mechanism for a task, set count to the number of retries to attempt on failure.
Additionally, set causes to a set of exception types so that retries are triggered when those errors occur.
Set delay to the interval between retry attempts.
Example:
class Task : Dispatcher(), TaskProcessor {
    override val info: TaskInfo
        get() = TaskInfo().apply {
            index = 0
            name = Task::class.java.name
            retry = RetryStrategy().apply {
                count = 2
                causes = setOf(TimeoutException::class, AnotherException::class, ...)
                delay = 1000L
            }
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        ...
    }
}
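To make the interaction of count, causes, and delay concrete, here is a standalone model in plain Kotlin. It is not the library's RetryStrategy: RetryPolicy and runWithRetry are hypothetical names, the delay is assumed to be in milliseconds, and count is assumed to mean retries after the first attempt.

```kotlin
import java.util.concurrent.TimeoutException
import kotlin.reflect.KClass

// Standalone model of a count/causes/delay retry policy; not the library's implementation.
// RetryPolicy and runWithRetry are hypothetical names.
class RetryPolicy(
    val count: Int,                          // retries allowed after the first attempt (assumption)
    val causes: Set<KClass<out Throwable>>,  // only these error types trigger a retry
    val delayMillis: Long                    // pause between attempts, assumed milliseconds
)

fun <T> runWithRetry(policy: RetryPolicy, action: (attempt: Int) -> T): T {
    var lastError: Exception? = null
    for (attempt in 0..policy.count) {
        try {
            return action(attempt)
        } catch (e: Exception) {
            // Errors outside the configured causes are rethrown immediately.
            if (policy.causes.none { it.java.isInstance(e) }) throw e
            lastError = e
            if (attempt < policy.count) Thread.sleep(policy.delayMillis)
        }
    }
    // All attempts failed with a retryable error.
    throw lastError ?: IllegalStateException("retry count must not be negative")
}

fun main() {
    val policy = RetryPolicy(count = 2, causes = setOf(TimeoutException::class), delayMillis = 10L)
    val result = runWithRetry(policy) { attempt ->
        if (attempt < 2) throw TimeoutException("attempt $attempt timed out")
        "succeeded on attempt $attempt"
    }
    println(result) // prints "succeeded on attempt 2"
}
```

With count = 2 the action runs at most three times: the initial attempt plus two retries.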
The attributes of each rollback task are defined with the RollbackInfo class.
The index, name, and runIn properties specify the rollback task's identity and the coroutine dispatcher it runs on. By passing index or name to the startRollbackFrom method of ControlFlow, rollback tasks can be rerun as needed.
Example:
class Task : Dispatcher(), RollbackTaskProcessor {
    override val info: TaskInfo
        get() = TaskInfo().apply {
            index = 0
            name = Task::class.java.name
            runIn = Dispatchers.IO
        }

    override val rollbackInfo: RollbackInfo
        get() = RollbackInfo().apply {
            index = 0
            name = Task::class.java.name
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        ...
    }

    override suspend fun doRollbackProcess(): Flow<TaskStatus> {
        ...
    }
}
To enable the retry mechanism for a rollback task, set count to the number of retries to attempt on failure.
Additionally, set causes to a set of exception types so that retries are triggered when those errors occur.
Set delay to the interval between retry attempts.
Example:
class Task : Dispatcher(), RollbackTaskProcessor {
    override val info: TaskInfo
        get() = ...

    override val rollbackInfo: RollbackInfo
        get() = RollbackInfo().apply {
            index = 0
            name = Task::class.java.name
            retry = RetryStrategy().apply {
                count = 2
                causes = setOf(TimeoutException::class, AnotherException::class, ...)
                delay = 1000L
            }
            runIn = Dispatchers.IO
        }

    override suspend fun doProcess(param: Any?): Flow<TaskStatus> {
        ...
    }

    override suspend fun doRollbackProcess(): Flow<TaskStatus> {
        ...
    }
}
ControlFlow manages the execution sequence of tasks and their potential rollback actions.
It orchestrates the execution, rollback, completion, and error handling of a series of tasks and their rollbacks.
Example usage:
// Create a ControlFlow instance
val controlFlow = ControlFlow(object : WorkFlowTracker {
    // Implement workflow callback methods
})

// Define your tasks
controlFlow.startWith(MyTask())
controlFlow.then(AnotherTask())
controlFlow.then(AnotherTask())

// Set up TaskStatusTracker if needed
controlFlow.useTaskStatusTracker(object : TaskStatusTracker {
    // Implement callback methods
})

// Set up RollbackStatusTracker if needed
controlFlow.useRollbackStatusTracker(object : RollbackStatusTracker {
    // Implement callback methods
})

// Start executing tasks
controlFlow.start()
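The sequencing and rollback behavior can be pictured with a standalone model in plain Kotlin. It does not use the ControlFlow library: ModelTask and runSequence are hypothetical names, and the sketch assumes that a failure stops the sequence and that already completed tasks are rolled back newest first, which the library may or may not do in that order.

```kotlin
// Standalone model of sequencing with rollback; it does not use the ControlFlow library.
// ModelTask and runSequence are hypothetical names.
class ModelTask(
    val name: String,
    val action: () -> Unit,
    val undo: (() -> Unit)? = null // null models a task without rollback support
)

// Runs tasks in order; on the first failure, undoes completed tasks and returns false.
fun runSequence(tasks: List<ModelTask>, log: MutableList<String>): Boolean {
    val completed = mutableListOf<ModelTask>()
    for (task in tasks) {
        try {
            task.action()
            log.add("done:${task.name}")
            completed.add(task)
        } catch (e: Exception) {
            log.add("failed:${task.name}")
            // Assumption: completed tasks are undone newest first.
            for (done in completed.asReversed()) {
                val undo = done.undo ?: continue
                undo()
                log.add("rolled back:${done.name}")
            }
            return false
        }
    }
    return true
}

fun main() {
    val log = mutableListOf<String>()
    val ok = runSequence(
        listOf(
            ModelTask("reserve", action = {}, undo = {}),
            ModelTask("charge", action = {}, undo = {}),
            ModelTask("notify", action = { throw IllegalStateException("network down") })
        ),
        log
    )
    println(ok)  // prints "false"
    println(log) // prints "[done:reserve, done:charge, failed:notify, rolled back:charge, rolled back:reserve]"
}
```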
To add subtasks to a task, attach them to the task as outlined below:
Example usage:
// Create a ControlFlow instance
val controlFlow = ControlFlow(object : WorkFlowTracker {
    // Implement workflow callback methods
})

// Define your tasks
controlFlow.startWith(MyTask().apply {
    // Define your subtasks
    then(subtask = MySubtask())
    then(subtask = AnotherSubtask())
})
controlFlow.then(AnotherTask().apply {
    // Define your subtasks
    then(subtask = MySubtask())
    then(subtask = AnotherSubtask())
})

// Set up TaskStatusTracker if needed
controlFlow.useTaskStatusTracker(object : TaskStatusTracker {
    // Implement callback methods
})

// Set up RollbackStatusTracker if needed
controlFlow.useRollbackStatusTracker(object : RollbackStatusTracker {
    // Implement callback methods
})

// Start executing tasks
controlFlow.start()
/**
* Add the first task to the control flow sequence.
* @param first The first task to be added to the control flow sequence.
*/
startWith(first: TaskProcessor)
/**
* Adds the next task to the control flow sequence.
* @param next The subsequent task to be added to the control flow sequence.
*/
then(next: TaskProcessor)
/**
* Starts executing the tasks in the control flow sequence.
* @param runAutomaticallyRollback Set to true if you want tasks to automatically rollback on failure.
*/
start(runAutomaticallyRollback: Boolean = false)
/**
* Starts executing tasks from a specific task name in the sequence.
* @param taskName The name of the task from which to start the execution.
*/
startFrom(taskName: String)
/**
* Starts executing tasks from a specific task index in the sequence.
* @param taskIndex The index of the task from which to start the execution.
*/
startFrom(taskIndex: Int)
/**
* Restarts the control flow from the beginning.
*/
restart()
/**
* Starts executing the rollback tasks in the control flow sequence.
*/
startRollback()
/**
* Initiates the rollback process from a specific task name in the rollback sequence.
* @param taskName The name of the task from which to start the rollback process.
*/
startRollbackFrom(taskName: String)
/**
* Initiates the rollback process from a specific task index in the rollback sequence.
* @param taskIndex The index of the task from which to start the rollback process.
*/
startRollbackFrom(taskIndex: Int)
/**
* Restarts the rollback process.
*/
restartRollback()
/**
* Associates a callback for the main task execution.
* @param callBack The callback to be associated with the main task execution.
*/
useTaskStatusTracker(callBack: TaskStatusTracker)
/**
* Associates a callback for the rollback task execution.
* @param callBack The callback to be associated with the rollback task execution.
*/
useRollbackStatusTracker(callBack: RollbackStatusTracker)
/**
* Stops and cleans up the resources associated with the task processing system.
* This method sets various internal variables to null, effectively releasing
* the references to the task-related objects, job instances, and other components.
*/
fun stop()
The Dispatcher class is a utility for executing asynchronous actions, managing errors, and handling various scenarios with coroutines. It offers several methods that simplify asynchronous operations and error handling within tasks.
- Usage: Executes a provided coroutine block within the viewModelScope, managing potential exceptions via a CoroutineExceptionHandler.
- Example:
safeLauncher {
    // Coroutine block to execute
    // ...
}
- Usage: Executes an asynchronous action and emits the result or errors via a Flow, based on specified custom conditions.
- Example:
launchAwait(
    // Action to execute asynchronously with callbacks for continuation and failure
    { onContinuation, onFailure ->
        // Invoke the action and handle results
        // Call onContinuation with the result to continue or onFailure with an error to break
    },
    // Condition to check if continuation or breaking is required based on the result
    { result ->
        // Define conditions for rollback continuation or breaking
        // Return ConditionData(status = ... , throwable = ... )
    }
)
- Usage: Executes an asynchronous action that returns a Unit result and emits the status via a Flow.
- Example:
launchUnitAwait { onContinuation, onFailure ->
    // Invoke the action and handle results
    // Call onContinuation to indicate successful completion or onFailure with an error
}
- Usage: Executes an asynchronous action returning a Flow, evaluates conditions for success or breaking, and emits the status accordingly.
- Example:
launchFlow(
    // Action returning a Flow
    { flowAction() },
    // Condition to check if continuation or breaking is required based on the Flow's result
    { result ->
        // Define conditions for rollback continuation or breaking
        // Return ConditionData(status = ... , throwable = ... )
    }
)
- Usage: Executes a synchronous action, evaluates conditions for success or breaking, and emits the status via a Flow.
- Example:
launch(
    // Synchronous action to execute
    { syncAction() },
    // Condition to check if continuation or breaking is required based on the action's result
    { result ->
        // Define conditions for rollback continuation or breaking
        // Return ConditionData(status = ... , throwable = ... )
    }
)
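The pattern shared by these helpers, running an action and then letting a condition decide between continuing and breaking, can be sketched without coroutines. This is a standalone model, not the Dispatcher class: Outcome, Condition, and runWithCondition are hypothetical names that only loosely mirror TaskStatus and ConditionData, whose real definitions are not shown here.

```kotlin
// Standalone model of "action plus condition"; not the library's Dispatcher.
// Outcome, Condition, and runWithCondition are hypothetical names.
sealed class Outcome {
    data class Done(val data: Any?) : Outcome()
    data class Error(val cause: Throwable) : Outcome()
}

// Verdict returned by the condition: continue (passed) or break with an error.
class Condition(val passed: Boolean, val error: Throwable? = null)

fun <T> runWithCondition(action: () -> T, condition: (T) -> Condition): Outcome =
    try {
        val result = action()
        val verdict = condition(result)
        if (verdict.passed) {
            Outcome.Done(result)
        } else {
            Outcome.Error(verdict.error ?: IllegalStateException("condition rejected the result"))
        }
    } catch (e: Exception) {
        // An exception thrown by the action itself is also reported as an error.
        Outcome.Error(e)
    }

fun main() {
    val accepted = runWithCondition({ 200 }) { code -> Condition(code in 200..299) }
    val rejected = runWithCondition({ 503 }) { code ->
        Condition(code in 200..299, IllegalStateException("unexpected status $code"))
    }
    println(accepted is Outcome.Done)  // prints "true"
    println(rejected is Outcome.Error) // prints "true"
}
```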
For detailed information about classes, methods, and functionalities provided by ControlFlow, refer to the inline comments in the source code.
If you have a suggestion that would make this better, please fork the repo and create a pull request. You can also simply open an issue with the tag "enhancement". Don't forget to give the project a star! Thanks again!
- Fork the Project
- Create your Feature Branch (git checkout -b feature/YourFeatureName)
- Commit your Changes (git commit -m 'Add some YourFeatureName')
- Push to the Branch (git push origin feature/YourFeatureName)
- Open a Pull Request
The Apache Software License, Version 2.0