[Z Dev Notes] Reactive programming for QBit Java microservices library

Richard Hightower edited this page Mar 23, 2015 · 5 revisions

I get a lot of questions about RxJava and QBit. The first thing I always say is that you can use RxJava with QBit.

I feel that RxJava is good at handling many calls to services, and coordinating the results.

The best description of RxJava I have heard goes like this: suppose you want to call ServiceA and ServiceB, take what ServiceB returns and use it to call ServiceC, and then return the results from ServiceA and ServiceC to a client. This is a great example of the power of RxJava.
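
To make that scenario concrete, here is a minimal sketch of the same call shape. It uses neither RxJava nor QBit; it swaps in `CompletableFuture` from the Java 8 standard library purely for illustration. The `call` helper and the class name are hypothetical stand-ins for real service calls.

```java
import java.util.concurrent.CompletableFuture;

final class ComposeSketch {

    /** Hypothetical stand-in for a remote service call; completes on another thread. */
    static CompletableFuture<String> call(String serviceName, String message) {
        return CompletableFuture.supplyAsync(() -> serviceName + "::" + message);
    }

    /** Starts A and B, feeds B's result into C, then combines the results of A and C. */
    static CompletableFuture<String> coordinate() {
        CompletableFuture<String> fromA = call("SERVICE A", "from main");
        CompletableFuture<String> fromC = call("SERVICE B", "from main")
                .thenCompose(b -> call("SERVICE C", "from " + b));
        return fromA.thenCombine(fromC, (a, c) -> a + " | " + c);
    }

    public static void main(String[] args) {
        // prints "SERVICE A::from main | SERVICE C::from SERVICE B::from main"
        System.out.println(coordinate().join());
    }
}
```

The point of the sketch is only the shape of the problem: one result is needed as-is, the other has to pass through a second call first, and the client gets both.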

QBit is async from the ground up, and has been from the get go. What QBit does not have is a lot of utilities to coordinate complex calls to and from local and out-of-process services. QBit has a callback. The rest is up to you, whether you roll your own stuff or use a lib like RxJava. Until now....

Before we begin: QBit is a microservices Java lib. It has a fast queue (100M to 200M messages a second) and a fast event bus that can be replicated to other nodes and can integrate with Consul for clustering. It also has a fast JSON parser and support for REST (JSON over HTTP) using Spring MVC style annotations, as well as WebSockets.

I have been on projects where I had to coordinate calls to more than one service. I wanted a good way to do this in QBit. I have always rolled one-off solutions. RxJava is a great library, but I also wanted a QBit way to do things.

Since QBit relies on Java 8, I can use what comes with Java 8; backwards compatibility with Java 6 and Java 7 is not an issue.

Let's show an example. Please note that this is from working code, but it is still in an early phase.

It will become part of QBit in short order. You can roll your own, as I have been doing with Runnables and such (there are examples of this in the wiki). Or you can use RxJava. Or you can use this new feature once I finish adding it to QBit; this is a preview.

My pretend service

    public static class PretendService {


        private final String name;

        PretendService(final String name) {

            this.name = name;
        }

        public void serviceCall(final Callback<String> callback, final int seconds, String message) {

            Thread thread = new Thread(() -> {
                Sys.sleep(seconds * 1000);
                callback.accept(name + "::" + message);
            });
            thread.start();
        }
    }

The pretend service is nice in that it just waits as long as I tell it to, and then passes its name plus the message to the callback. It is a great service for demonstrating the concept of Callback coordination.
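
For readers who want to run the pretend service outside QBit, here is a self-contained sketch. It is an approximation, not the QBit code: `java.util.function.Consumer` stands in for QBit's `Callback`, `Thread.sleep` stands in for `Sys.sleep`, the delay is in milliseconds to keep the demo quick, and `callAndWait` is a hypothetical helper that blocks only so that `main` can print the result.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class PretendServiceSketch {

    private final String name;

    PretendServiceSketch(String name) {
        this.name = name;
    }

    /** Waits delayMillis on a new thread, then passes "name::message" to the callback. */
    void serviceCall(Consumer<String> callback, long delayMillis, String message) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            callback.accept(name + "::" + message);
        });
        thread.start();
    }

    /** Hypothetical helper: blocks until the callback fires; returns null after five seconds. */
    static String callAndWait(PretendServiceSketch service, String message) {
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        service.serviceCall(value -> {
            result.set(value);
            done.countDown();
        }, 50, message);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result.get();
    }

    public static void main(String[] args) {
        // prints "SERVICE A:: from main"
        System.out.println(callAndWait(new PretendServiceSketch("SERVICE A"), " from main"));
    }
}
```

Blocking in `callAndWait` is only for the demo. The rest of this page is about avoiding exactly that kind of waiting.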

Callback coordinator for a Java 8 Lambda reactor system

        final PretendService serviceA = new PretendService("SERVICE A");
        final PretendService serviceB = new PretendService("SERVICE B");
        final PretendService serviceC = new PretendService("SERVICE C");


        //Inside Z Service method
        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();


        /* Create a callbackWithTimeout for service A
            to show that it can be cancelled. */
        final AsyncFutureCallback<String> serviceACallback =
                reactor.callback(String.class, serviceAReturn::set);




        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinateWithTimeout(() -> {

            /* If service A and service C are done, then we are done.
            * Let the client know.
            */
            if (serviceACallback.isDone() && serviceCReturn.get() != null) {
                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                return true;  //true means we are done
            }

            return false;

        }, Timer.timer().now(), 5, TimeUnit.SECONDS, this::sendTimeoutBackToClient);




        /* Call service A using the A callbackWithTimeout. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callbackWithTimeout. */
        final AsyncFutureCallback<String> serviceCCallback = reactor.callback(String.class,
                returnValueFromC -> {
                    serviceCReturn.set(returnValueFromC);
                    handleReturnFromC(serviceAReturn, coordinator);
                }
        );


        /* Call service B and register a callback
             that calls service C when service B completes. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                        serviceC.serviceCall(serviceCCallback, 1, " from " + returnValue)
                ), 1, " from main");



...
...


    private void sendTimeoutBackToClient() {

        System.out.println("You have timed out");
    }

    private void sendResponseBackToClient(String a, String ab) {

        System.out.println(a + "::" + ab);

    }


    public void handleReturnFromC(AtomicReference<String> serviceAReturn,
                                         CallbackCoordinator coordinator) {

        if (serviceAReturn.get()!=null) {
            coordinator.finished();
        }

    }

By letting the coordinator decide how to handle things, and by keeping the state of the coordination in local variables that the lambda captures, we can handle the coordination in the context of one method call. This simplifies the code, since we use regular imperative Java to do the coordination instead of tons of small callbacks.
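
The idea can be sketched without QBit at all. The `SimpleCoordinator` class below is hypothetical and is not QBit's `CallbackCoordinator`; it only illustrates a check lambda that reads captured local state, is polled by its owner, and gives up after a deadline. The clock values are passed in explicitly so the outcome is deterministic.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

final class CoordinatorSketch {

    /** Hypothetical coordinator: polls a check until it passes or a deadline expires. */
    static final class SimpleCoordinator {
        private final BooleanSupplier check;
        private final long deadlineMillis;
        private final Runnable onTimeout;
        private boolean done;

        SimpleCoordinator(BooleanSupplier check, long startMillis,
                          long timeoutMillis, Runnable onTimeout) {
            this.check = check;
            this.deadlineMillis = startMillis + timeoutMillis;
            this.onTimeout = onTimeout;
        }

        /** Called periodically by the owner; returns true once the coordination has ended. */
        boolean process(long nowMillis) {
            if (done) {
                return true;
            }
            if (check.getAsBoolean()) {
                done = true;
            } else if (nowMillis >= deadlineMillis) {
                done = true;
                onTimeout.run();
            }
            return done;
        }
    }

    /** Runs the A-and-C scenario; all coordination state is local to this method. */
    static String run(boolean cArrives) {
        AtomicReference<String> a = new AtomicReference<>();
        AtomicReference<String> c = new AtomicReference<>();
        AtomicReference<String> response = new AtomicReference<>();

        SimpleCoordinator coordinator = new SimpleCoordinator(() -> {
            if (a.get() != null && c.get() != null) {
                response.set(a.get() + "::" + c.get());
                return true;
            }
            return false;
        }, 0, 5000, () -> response.set("timed out"));

        a.set("A");
        coordinator.process(1000);   // only A has returned: still pending
        if (cArrives) {
            c.set("C");
        }
        coordinator.process(2000);   // finishes here if C has returned
        coordinator.process(6000);   // past the deadline: times out if still pending
        return response.get();
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // prints "A::C"
        System.out.println(run(false));  // prints "timed out"
    }
}
```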

One more version. This one uses an async latch, and it seems to be the cleanest yet: the last countdown triggers the runnable, which completes the job.

        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();





        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinateWithTimeout(() -> {

            /* If service A and service C are done, then we are done.
            * Let the client know.
            */
            if (serviceAReturn.get()!=null && serviceCReturn.get() != null) {
                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                return true;  //true means we are done
            }

            return false;

        }, Timer.timer().now(), 5, TimeUnit.SECONDS, this::sendTimeoutBackToClient);




        final CountDownAsyncLatch latch = countDownLatch(2,
                () -> {

                    System.out.println("From Latch");
                    coordinator.finished();

                }

        );


        /* Create a callbackWithTimeout for service A
            to show that it can be cancelled. */
        final AsyncFutureCallback<String> serviceACallback =
                reactor.callback(String.class, (t) -> {
                    serviceAReturn.set(t);
                    latch.countDown();
                });


        /* Call service A using the A callbackWithTimeout. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callbackWithTimeout. */
        final AsyncFutureCallback<String> serviceCCallback = reactor.callback(String.class,
                returnValueFromC -> {
                    serviceCReturn.set(returnValueFromC);
                    latch.countDown();
                }
        );


        /* Call service B and register a callback
             that calls service C when service B completes. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                        serviceC.serviceCall(serviceCCallback, 1,
                                " from " + returnValue)
                ),
                1, " from main");
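
The latch itself is a small idea, and a rough standalone sketch may help. `SimpleAsyncLatch` below is hypothetical and is not QBit's `CountDownAsyncLatch`. Unlike `java.util.concurrent.CountDownLatch`, nothing blocks on it: the runnable simply runs on whichever thread performs the final count down.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncLatchSketch {

    /** Hypothetical async latch: runs a task when the count reaches zero; nobody waits. */
    static final class SimpleAsyncLatch {
        private final AtomicInteger remaining;
        private final Runnable onZero;

        SimpleAsyncLatch(int count, Runnable onZero) {
            this.remaining = new AtomicInteger(count);
            this.onZero = onZero;
        }

        /** Decrements the count; the call that reaches exactly zero runs the task. */
        void countDown() {
            if (remaining.decrementAndGet() == 0) {
                onZero.run();
            }
        }
    }

    /** Simulates two service returns and records the order of events. */
    static String demo() {
        List<String> events = new ArrayList<>();
        SimpleAsyncLatch latch = new SimpleAsyncLatch(2, () -> events.add("all done"));

        events.add("A returned");
        latch.countDown();
        events.add("C returned");
        latch.countDown();           // the count reaches zero here, so the task runs
        latch.countDown();           // extra call: the count goes negative, nothing fires

        return String.join(", ", events);
    }

    public static void main(String[] args) {
        // prints "A returned, C returned, all done"
        System.out.println(demo());
    }
}
```

Because the counter is an `AtomicInteger`, only one caller can ever observe zero, so the task runs at most once even if the count downs come from different threads.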

I tried a few more things and added builders so that it is a little clearer what we are working with.

        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();


        final CallbackBuilder callbackBuilder = reactor.callbackBuilder();

        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinatorBuilder()
                .setCoordinator(
                        () -> {

                                /* If service A and service C are done, then we are done.
                                * Let the client know.
                                */
                            if (serviceAReturn.get() != null && serviceCReturn.get() != null) {
                                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                                return true;  //true means we are done
                            }

                            return false;
                        })
                .setTimeoutDuration(5)
                .setTimeoutTimeUnit(TimeUnit.SECONDS)
                .setTimeOutHandler(() -> {
                    System.out.println("Coordinator timed out");
                    sendTimeoutBackToClient();
                })
                .build();

        final CountDownAsyncLatch latch = countDownLatch(2,
                () -> {
                    System.out.println("From Latch");
                    coordinator.finished();
                }
        );


        /* Create a callback with a timeout for service A.
            If it times out, it cancels the coordinator. */
        final Callback<String> serviceACallback =
                callbackBuilder
                        .setCallback(String.class, returnValueFromA -> {

                            serviceAReturn.set(returnValueFromA);
                            latch.countDown();
                        })
                        .setOnTimeout(() -> {
                                System.out.println("Service A timed out");
                                sendTimeoutBackToClient();
                                coordinator.cancel();
                            })
                        .setTimeoutDuration(4).setTimeoutTimeUnit(TimeUnit.SECONDS)
                        .build();

        /* Call service A using the serviceACallback. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callback. */
        final Callback<String> serviceCCallback =
                callbackBuilder
                    .setCallback(String.class, returnValueFromC -> {

                        serviceCReturn.set(returnValueFromC);
                        latch.countDown();
                    })
                    .setOnTimeout(() -> {
                        System.out.println("Service C timed out");
                        sendTimeoutBackToClient();
                        coordinator.cancel();
                    })
                     .setTimeoutDuration(4).setTimeoutTimeUnit(TimeUnit.SECONDS)
                     .build();



        /* Call service B and register a callback
             that calls service C when service B completes. */
        serviceB.serviceCall(
                reactor.callback(returnValue ->
                                serviceC.serviceCall(serviceCCallback, 1,
                                        " from " + returnValue)
                ),
                1, " from main");

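The builders have reasonable defaults, so this version leaves out the coordinator's timeout duration and time unit, and the callbacks' timeout time unit.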

        final AtomicReference<String> serviceAReturn = new AtomicReference<>();
        final AtomicReference<String> serviceCReturn = new AtomicReference<>();


        final CallbackBuilder callbackBuilder = reactor.callbackBuilder();

        /* Register a coordinator that checks for return values from service A and C */
        final CallbackCoordinator coordinator = reactor.coordinatorBuilder()
                .setCoordinator(
                        () -> {

                                /* If service A and service C are done, then we are done.
                                * Let the client know.
                                */
                            if (serviceAReturn.get() != null && serviceCReturn.get() != null) {
                                sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
                                return true;  //true means we are done
                            }

                            return false;
                        })
                .setTimeOutHandler(() -> {
                    System.out.println("Coordinator timed out");
                    sendTimeoutBackToClient();
                })
                .build();

        final CountDownAsyncLatch latch = countDownLatch(2,
                () -> {
                    System.out.println("From Latch");
                    coordinator.finished();
                }
        );


        /* Create a callback with a timeout for service A.
            If it times out, it cancels the coordinator. */
        final Callback<String> serviceACallback =
                callbackBuilder
                        .setCallback(String.class, returnValueFromA -> {

                            serviceAReturn.set(returnValueFromA);
                            latch.countDown();
                        })
                        .setOnTimeout(() -> {
                                System.out.println("Service A timed out");
                                sendTimeoutBackToClient();
                                coordinator.cancel();
                            })
                        .setTimeoutDuration(4)
                        .build();

        /* Call service A using the serviceACallback. */
        serviceA.serviceCall(serviceACallback, 1, " from main");

        /* Create service C callback. */
        final Callback<String> serviceCCallback =
                callbackBuilder
                    .setCallback(String.class, returnValueFromC -> {

                        serviceCReturn.set(returnValueFromC);
                        latch.countDown();
                    })
                    .setOnTimeout(() -> {
                        System.out.println("Service C timed out");
                        sendTimeoutBackToClient();
                        coordinator.cancel();
                    })
                   .setTimeoutDuration(4)
                   .build();



        /* Call service B and register a callback
             that calls service C when service B completes. */
        serviceB.serviceCall(
                reactor.callbackBuilder()
                        .setCallback(
                            returnValue ->
                                serviceC.serviceCall(serviceCCallback, 1,
                                        " from " + returnValue)
                        ).build(),
                1, " from main");
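
As a small illustration of what builder defaults buy you, here is a hypothetical `TimeoutBuilder`. It is not QBit's `CallbackBuilder`, and the default values in it (30 seconds) are assumptions made up for this sketch; QBit's real defaults may differ.

```java
import java.util.concurrent.TimeUnit;

final class BuilderDefaultsSketch {

    /** Hypothetical builder; the default values are assumptions for this sketch only. */
    static final class TimeoutBuilder {
        private long duration = 30;                 // assumed default, not QBit's
        private TimeUnit unit = TimeUnit.SECONDS;   // assumed default, not QBit's

        TimeoutBuilder setTimeoutDuration(long duration) {
            this.duration = duration;
            return this;
        }

        TimeoutBuilder setTimeoutTimeUnit(TimeUnit unit) {
            this.unit = unit;
            return this;
        }

        /** Builds the configured timeout, expressed in milliseconds. */
        long buildMillis() {
            return unit.toMillis(duration);
        }
    }

    public static void main(String[] args) {
        // Nothing set: both defaults apply. Prints 30000.
        System.out.println(new TimeoutBuilder().buildMillis());

        // Only the duration set: the default unit applies. Prints 4000.
        System.out.println(new TimeoutBuilder().setTimeoutDuration(4).buildMillis());

        // Both set explicitly. Prints 500.
        System.out.println(new TimeoutBuilder()
                .setTimeoutDuration(500)
                .setTimeoutTimeUnit(TimeUnit.MILLISECONDS)
                .buildMillis());
    }
}
```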

More will be done. The early work seems to work really well.
