Saga framework / Supports redis-stream and both blocking and reactive paradigms.
About 6,000 TPS on a MacBook Air M2 with default options.
Netx is a Saga framework that provides the following features.
- Supports redis-stream.
- Supports synchronous API and asynchronous Reactor API.
- Supports both Orchestration and Choreography.
- Automatically reruns lost events.
- Automatically applies the Transactional messaging pattern.
- Supports backpressure to control the number of events that can be processed per node.
- Prevents multiple nodes in the same group from receiving duplicate events.
- Ensures message delivery using the At Least Once approach.
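At Least Once delivery means a handler may occasionally see the same event again, for example when an event is rerun after a lost acknowledgement. A common way to cope is to make handlers idempotent. The sketch below is a minimal illustration under that assumption; `IdempotentHandler` and its in-memory id set are hypothetical and not part of netx.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper, not part of netx: remembers which event ids were already applied.
class IdempotentHandler<E>(private val handle: (E) -> Unit) {
    private val processed = ConcurrentHashMap.newKeySet<String>()

    // Returns true if the event was handled now, false if it was a redelivery.
    fun onEvent(eventId: String, event: E): Boolean {
        // add() returns false when the id is already present
        if (!processed.add(eventId)) return false
        try {
            handle(event)
        } catch (e: Exception) {
            processed.remove(eventId) // let a later redelivery retry the event
            throw e
        }
        return true
    }
}
```

An in-memory set only protects a single process; a real service would more likely keep the processed ids in a durable store shared by the group.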
You can see the test results here.
dependencies {
    implementation "org.rooftopmsa:netx:${version}"
}
Netx can be used in Spring environments and is configured by adding the @EnableSaga annotation as follows:
@EnableSaga
@SpringBootApplication
class Application {

    companion object {
        @JvmStatic
        fun main(vararg args: String) {
            SpringApplication.run(Application::class.java, *args)
        }
    }
}
When configured automatically with the @EnableSaga annotation, netx uses the following properties to establish connections with event stream services:
KEY | EXAMPLE | DESCRIPTION | DEFAULT |
---|---|---|---|
netx.mode | redis | Specifies the mode implementation used for Saga management. Currently, only redis is available as an option. | |
netx.host | localhost | The host URL of the event stream used for Saga management. (e.g., redis host) | |
netx.password | 0000 | The password used to connect to the event stream used for Saga management. If not set, 0000 is mapped as the password. | 0000 |
netx.port | 6379 | The port of the message queue used for Saga management. | |
netx.group | pay-group | The group of distributed nodes. Saga events are sent to only one node within the same group. | |
netx.node-id | 1 | The identifier used for id generation. Each server must be assigned a different id, and ids can be set from 1 to 256. Ids are generated using the Twitter Snowflake algorithm to prevent duplicate id generation. | |
netx.node-name | pay-1 | The name of the server participating in the netx.group . There should be no duplicate names within the same group. | |
netx.recovery-milli | 1000 | Finds and reruns Sagas not processed for netx.orphan-milli milliseconds every netx.recovery-milli milliseconds. | 1000 |
netx.orphan-milli | 60000 | Finds events in the PENDING state that have not become ACK state even after netx.orphan-milli milliseconds and restarts them. | 60000 |
netx.backpressure | 40 | Adjusts the number of events that can be received at once. Setting this too high can cause server load, and setting it too low can reduce performance. This setting affects the amount of events received from other servers and the amount of events failed to be processed. Unreceived or dropped events are automatically put into the retry queue. | 40 |
netx.logging.level | info | Specifies the logging level. Possible values are: "info", "warn", "off" | "off" |
netx.pool-size | 40 | Used to adjust the maximum number of connections when connections need to be continuously established. | 10 |
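The interplay of netx.recovery-milli and netx.orphan-milli can be read as "scan periodically, rerun anything that stayed un-ACKed too long". The sketch below illustrates only the selection step. `PendingEvent` and `findOrphans` are hypothetical names; netx's internal types are not shown in this document.

```kotlin
// Hypothetical model of a delivered event, used only for this sketch.
data class PendingEvent(val id: String, val deliveredAtMilli: Long, val acked: Boolean)

// Selects events that are still un-ACKed once orphanMilli has elapsed since delivery.
// The default mirrors the documented default of netx.orphan-milli.
fun findOrphans(
    events: List<PendingEvent>,
    nowMilli: Long,
    orphanMilli: Long = 60_000,
): List<PendingEvent> =
    events.filter { !it.acked && nowMilli - it.deliveredAtMilli >= orphanMilli }
```

Under this reading, a scheduler would call such a function every netx.recovery-milli milliseconds and rerun whatever it returns.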
Tip
When using Orchestrator, the Transactional messaging pattern is automatically applied.
The retry unit for lost events is each operation (one function) of the Orchestrator. Either every operation in the chain succeeds, or rollback is called.
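The "all operations succeed or rollback is called" behaviour can be illustrated without any netx types. The sketch below is a simplified model: `Step` and `runChain` are hypothetical, every step shares one type `T` (the real Orchestrator lets each step change the type), and running compensations newest-first is an assumption, since this document only says they run sequentially.

```kotlin
// Hypothetical stand-in for one Orchestrator operation: an action plus an optional compensation.
class Step<T>(val orchestrate: (T) -> T, val rollback: ((T) -> Unit)? = null)

// Runs the steps in order. If one throws, the compensations of the steps that already
// completed are invoked with the request each step received, then the failure is rethrown.
fun <T> runChain(input: T, steps: List<Step<T>>): T {
    val completed = ArrayDeque<Pair<Step<T>, T>>()
    var current = input
    for (step in steps) {
        try {
            val next = step.orchestrate(current)
            completed.addFirst(step to current) // newest completed step first
            current = next
        } catch (e: Exception) {
            for ((done, request) in completed) done.rollback?.invoke(request)
            throw e
        }
    }
    return current
}
```

A step without a rollback function is simply skipped during compensation, which matches the note in the example that rollback operations can be omitted.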
// Use Orchestrator
@Service
class OrderService(private val orderOrchestrator: Orchestrator<Order, OrderResponse>) {

    fun order(orderRequest: Order): OrderResponse {
        val result = orderOrchestrator.sagaSync(orderRequest)

        // Returns the decoded result on success, otherwise throws the failure
        return result.decodeResultOrThrow(OrderResponse::class)
    }
}

// Register Orchestrator
@Configuration
class OrchestratorConfigurer(
    private val orchestratorFactory: OrchestratorFactory,
) {

    @Bean
    fun orderOrchestrator(): Orchestrator<Order, OrderResponse> { // <First Request, Last Response>
        return orchestratorFactory.create<Order>("orderOrchestrator")
            .start(
                orchestrate = { order -> // `order` is of type Order
                    // Do your business logic
                    // something like ... "Check valid seller"
                    return@start user // `user` is a placeholder for the value your logic produces
                },
                rollback = { order ->
                    // do rollback logic
                }
            )
            .joinReactive(
                orchestrate = { user -> // The previous operation's response (type "User") flows in here
                    // Webflux is supported; this operation must return a Mono.
                },
                // The rollback operation can be omitted
            )
            .joinWithContext(
                contextOrchestrate = { context, request ->
                    context.set("key", request) // save data on context
                    context.decode("foo", Foo::class) // Values set on the context by upstream operations can be retrieved.
                },
            )
            .commit(
                orchestrate = { request ->
                    // If an exception is thrown here, all the rollback functions above are executed sequentially.
                    throw IllegalArgumentException("Oops! Something went wrong..")
                }
            )
    }
}
When another distributed server (or the same server) starts a saga or changes its state through the sagaManager, the handler matching the new state is called.
By implementing these handlers, you can add logic for each saga state. If a handler throws an error, rollback is called automatically; when a handler terminates normally, the state set in the annotation's successWith is published automatically.
Warning
Saga handlers must accept exactly one Saga...Event parameter, the one that corresponds to the handler.
When using Event, you must apply the Transactional messaging pattern yourself.
You can easily apply it by moving all business logic into the @Saga...Listener as shown below.
@SagaHandler
class SagaHandler(
    private val sagaManager: SagaManager,
) {

    fun start() {
        val foo = Foo("...")
        sagaManager.syncStart(foo) // Publishes a saga start event; the matching start listener below is then called
    }

    @SagaStartListener(event = Foo::class, successWith = SuccessWith.PUBLISH_JOIN) // Receives the saga event when it can be mapped to Foo::class
    fun handleSagaStartEvent(event: SagaStartEvent) {
        val foo: Foo = event.decodeEvent(Foo::class) // Decode the event payload as Foo
        // ...
        event.setNextEvent(nextFoo) // `nextFoo` is a placeholder. When this handler terminates and the next state or a rollback is published, the event set here is published with it.
    }

    @SagaJoinListener(successWith = SuccessWith.PUBLISH_COMMIT) // Receives every saga event when no type is defined. When this function terminates, the commit state is published.
    fun handleSagaJoinEvent(event: SagaJoinEvent) {
        // ...
    }

    @SagaCommitListener(
        event = Foo::class,
        noRollbackFor = [IllegalArgumentException::class] // No rollback when IllegalArgumentException is thrown. *Still rolls back for Throwable or any other supertype of IllegalArgumentException*
    )
    fun handleSagaCommitEvent(event: SagaCommitEvent): Mono<String> { // With Webflux, a publisher must be returned.
        throw IllegalArgumentException("Ignore this exception")
        // ...
    }

    @SagaRollbackListener(Foo::class)
    fun handleSagaRollbackEvent(event: SagaRollbackEvent) { // With MVC, a publisher must not be returned.
        val undo: Foo = event.decodeUndo(Foo::class) // Decode the undo payload as Foo
    }
}
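The noRollbackFor rule in the commit listener above can be modelled as a plain instance check. The sketch below is illustrative only: `Outcome`, `shouldRollback` and `runHandler` are hypothetical names, and treating an ignored exception like a normal termination is an assumption, not documented netx behaviour.

```kotlin
import kotlin.reflect.KClass

// Hypothetical outcome type for this sketch; the real netx state names may differ.
enum class Outcome { PUBLISH_NEXT, ROLLBACK }

// An exception skips rollback only when it is an instance of a listed class
// (the class itself or one of its subclasses). Supertypes still trigger rollback.
fun shouldRollback(error: Throwable, noRollbackFor: List<KClass<out Throwable>>): Boolean =
    noRollbackFor.none { it.java.isInstance(error) }

// Runs a handler and decides what would be published afterwards.
fun runHandler(noRollbackFor: List<KClass<out Throwable>>, handler: () -> Unit): Outcome =
    try {
        handler()
        Outcome.PUBLISH_NEXT
    } catch (e: Throwable) {
        // Assumption: an ignored exception is treated like a normal termination.
        if (shouldRollback(e, noRollbackFor)) Outcome.ROLLBACK else Outcome.PUBLISH_NEXT
    }
```

With `noRollbackFor = [IllegalArgumentException::class]`, a `NumberFormatException` (a subclass) would be ignored, while a plain `RuntimeException` (a supertype) would still cause a rollback.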
// Sync
fun pay(param: Any): Any {
    val sagaId = sagaManager.syncStart(Pay(id = 1L, paid = 1000L)) // start saga

    return runCatching { // Kotlin's try/catch helper, not part of the netx API
        // Do your business logic
    }.fold(
        onSuccess = { sagaManager.syncCommit(sagaId) }, // commit saga
        onFailure = {
            sagaManager.syncRollback(
                sagaId,
                it.message
            )
        } // rollback saga
    )
}

// Async
fun pay(param: Any): Mono<Any> {
    return sagaManager.start(
        Pay(
            id = 1L,
            paid = 1000L
        )
    ) // Start a distributed saga and publish the saga start event
        .flatMap { sagaId ->
            service.pay(param)
                .doOnError { throwable ->
                    sagaManager.rollback(
                        sagaId,
                        throwable.message
                    ) // Publish the rollback event to every node that joined the saga
                }
        }.doOnSuccess { sagaId ->
            sagaManager.commit(sagaId) // Publish the commit event to every node that joined the saga
        }
}
// Sync
fun order(param: Any): Any {
    val sagaId = sagaManager.syncJoin(
        param.sagaId,
        Order(id = 1L, state = PENDING)
    ) // join saga

    return runCatching { // Kotlin's try/catch helper, not part of the netx API
        // Do your business logic
    }.fold(
        onSuccess = { sagaManager.syncCommit(sagaId) }, // commit saga
        onFailure = {
            sagaManager.syncRollback(
                sagaId,
                it.message
            )
        } // rollback saga
    )
}

// Async
fun order(param: Any): Mono<Any> {
    return sagaManager.join(
        param.sagaId,
        Order(id = 1L, state = PENDING)
    ) // Join an existing distributed saga and publish the saga join event
        .flatMap { sagaId ->
            service.order(param)
                .doOnError { throwable ->
                    sagaManager.rollback(sagaId, throwable.message)
                }
        }.doOnSuccess { sagaId ->
            sagaManager.commit(sagaId)
        }
}
// Sync
fun exists(param: Any): Any {
    return sagaManager.syncExists(param.sagaId)
}

// Async
fun exists(param: Any): Mono<Any> {
    return sagaManager.exists(param.sagaId) // Checks whether a saga with this id has ever been started
}
How to test?
For 333,333 tests, the sequence proceeds as follows: saga start -> saga join -> saga commit.
For 444,444 tests, the sequence proceeds as follows: saga start -> saga join -> saga commit -> saga rollback.
The combined test, consisting of both sequences, took a total of 2 minutes and 10 seconds.
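As a quick sanity check, the figures above can be turned into a throughput number. The sketch below only does that arithmetic; counting each test run as one saga is an assumption about how the headline TPS figure was derived.

```kotlin
// Figures taken from the test description above.
const val COMMIT_TESTS = 333_333
const val ROLLBACK_TESTS = 444_444
const val ELAPSED_SECONDS = 2 * 60 + 10 // 2 minutes 10 seconds

// Sagas completed per second, counting each test run as one saga.
fun sagasPerSecond(): Double =
    (COMMIT_TESTS + ROLLBACK_TESTS).toDouble() / ELAPSED_SECONDS
```

That is 777,777 runs in 130 seconds, or roughly 5,983 per second, which is in line with the figure of about 6,000 TPS quoted at the top.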
How to test?
Pending order -> Pending payment -> Successful payment -> Successful order -> Inventory deduction failure -> Order failure -> Payment failure