Skip to content
Richard Hightower edited this page Jun 14, 2015 · 2 revisions

Tutorials on QBit Java Microservice lib

Tutorials are classified:

  • Rough Cut - little instruction assumes you know QBit basics are pick on the uptake
  • Quick Start - Code centric tutorial focusing more on code then explanation
  • Detailed Tutorial - A lot of theory, explanation and background with some hand holding.
  • Doc - A snippet of documentation with no full example per se

Rough cuts will one day be a Quick Start or even a Detailed Tutorial, but right now. It is documentation.

What is QBit again?

QBit is a queuing library for microservices. It is similar to many other projects like Akka, Spring Reactor, etc. QBit is just a library not a platform. QBit has libraries to put a service behind a queue. You can use QBit queues directly or you can create a service. QBit services can be exposed by WebSocket, HTTP, HTTP pipeline, and other types of remoting. A service in QBit is a Java class whose methods are executed behind service queues. QBit implements apartment model threading and is similar to the Actor model or a better description would be Active Objects. QBit does not use a disruptor. It uses regular Java Queues. QBit can do north of 100 million ping pong calls per second which is an amazing speed (seen as high as 200M). QBit also supports calling services via REST, and WebSocket. QBit is microservices in the pure Web sense: JSON, HTTP, WebSocket, etc.

QBit lingo

QBit is a Java microservice lib supporting REST, JSON and WebSocket. It is written in Java but I might one day write a version in Rust or Go or C# (but that would require a large payday).

Service POJO (plain old Java object) behind a queue that can receive method calls via proxy calls or events (May have one thread managing events, method calls, and responses or two one for method calls and events and the other for responses so response handlers do not block service. One is faster unless responses block). Services can use Spring MVC style REST annotations to expose themselves to the outside world via REST and WebSocket.

ServiceBundle Many POJOs behind one response queue and many receive queues. There may be one thread for all responses or not. They also can be one receive queue.

Queue A thread managing a queue. It supports batching. It has events for empty, reachedLimit, startedBatch, idle. You can listen to these events from services that sit behind a queue. You don't have to use Services. You can use Queue's direct.

ServiceServer ServiceBundle that is exposed to REST and WebSocket communication

EventBus EventBus is a way to send a lot of messages to services that may be loosely coupled

ClientProxy Way to invoke service through async interface, service can be inproc (same process) or remoted over WebSocket.

Non-blocking QBit is a non-blocking lib. You use CallBacks via Java 8 Lambdas. You can also send event messages and get replies. Messaging is built into the system so you can easily coordinate complex tasks.

Speed There is a lot of room for improvement with Speed. But already QBit is VERY fast. 200M+ TPS inproc ping pong, 10M-20M+ TPS event bus, 500K TPS RPC calls over WebSocket/JSON, etc. More work needs to be done to improve speed, but now it is fast enough where I am working more with usability.

Besides if I make it too fast, then no one will publish benchmarks, and then I can't publish follow up benchmarks that kick their ass. Where would be the fun? There is no story if their is no conflict.

##TODO web microservice server with QBit

-TodoItem.java Listing

package io.advantageous.qbit.examples;

import java.util.Date;


public class TodoItem {


    private final String description;
    private final String name;
    private final Date due;

    public TodoItem(final String description, final String name, final Date due) {
        this.description = description;
        this.name = name;
        this.due = due;
    }

    public String getDescription() {
        return description;
    }

    public String getName() {
        return name;
    }

    public Date getDue() {
        return due;
    }
}

-TodoService.java Listing

package io.advantageous.qbit.examples;


import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.RequestMethod;

import java.util.ArrayList;
import java.util.List;

@RequestMapping("/todo-service")
public class TodoService {

    List<TodoItem> items = new ArrayList<>();


    @RequestMapping("/todo/count")
    public int size() {

        return items.size();
    }

    @RequestMapping("/todo/")
    public List<TodoItem> list() {
        return items;
    }

    @RequestMapping(value = "/todo", method = RequestMethod.POST)
    public void add(TodoItem todoItem) {
        items.add(todoItem);
    }

}

-TodoMain.java Listing

package io.advantageous.qbit.examples;

import io.advantageous.qbit.server.ServiceServer;
import io.advantageous.qbit.server.ServiceServerBuilder;

public class TodoMain {

    public static void main(String... args) {
        ServiceServer server = new ServiceServerBuilder().setHost("localhost").setPort(8080).build();
        server.initServices(new TodoService());
        server.start();
    }

}

##TODO web microservice client with QBit

-TodoItem.java Listing

package io.advantageous.qbit.examples.client;

import java.util.Date;


public class TodoItem {


    private String description;
    private String name;
    private Date due;

    public TodoItem(final String description, final String name, final Date due) {
        this.description = description;
        this.name = name;
        this.due = due;
    }

    public TodoItem(String description, String name) {

        this.description = description;
        this.name = name;
        due = new Date();
    }

    public String getDescription() {
        return description;
    }

    public String getName() {
        return name;
    }

    public Date getDue() {
        return due;
    }


    public void setDescription(String description) {
        this.description = description;
    }

    public void setName(String name) {
        this.name = name;
    }

    public void setDue(Date due) {
        this.due = due;
    }
}

-TodoServiceClientInterface.java Listing

package io.advantageous.qbit.examples.client;

import io.advantageous.qbit.service.Callback;

import java.util.List;

public interface TodoServiceClientInterface {

    void list(Callback<List<TodoItem>> handler);

    void add(TodoItem todoItem);


}

-TodoClientMain.java Listing

package io.advantageous.qbit.examples.client;


import io.advantageous.qbit.client.Client;
import io.advantageous.qbit.client.ClientBuilder;
import org.boon.core.Sys;

/**
 * Created by fadi on 1/9/15.
 */
public class TodoClientMain {

    public static void main(String... args) {

        String host = "localhost";

        int port = 8080;


        Client client = new ClientBuilder().setPort(port).setHost(host).setPollTime(10)
                .setAutoFlush(true).setFlushInterval(50).setRequestBatchSize(50)
                .setProtocolBatchSize(50).build();

        TodoServiceClientInterface todoService =
                client.createProxy(TodoServiceClientInterface.class, "todoService");

        client.start();


        todoService.add(new TodoItem("Greeting", "Hello World"));
        todoService.add(new TodoItem("Tutorial","I hope you got this to work"));

        client.flush();


        todoService.list(todoItems -> { //LAMBDA EXPRESSION Java 8

            for (TodoItem item : todoItems) {
                System.out.println("TODO ITEM " + item.getDescription() + " " + item.getName() + " " + item.getDue());
            }
        });

        client.flush();

        Sys.sleep(1000);



    }
}

Simple Rest web service server with QBit

-SimpleRestServer.java Listing

package io.advantageous.qbit.examples;

import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.queue.QueueBuilder;
import io.advantageous.qbit.server.ServiceServer;
import io.advantageous.qbit.server.ServiceServerBuilder;
import org.boon.Boon;

import java.util.Collections;
import java.util.List;

/**
 *

 $ ./wrk -c 200 -d 10s http://localhost:6060/services/myservice/ping -H "X_USER_ID: RICK"  --timeout 100000s -t 8
 Running 10s test @ http://localhost:6060/services/myservice/ping
 8 threads and 200 connections
 Thread Stats   Avg      Stdev     Max   +/- Stdev
 Latency     2.63ms  360.02us   4.41ms   68.14%
 Req/Sec    10.17k     1.32k   12.89k    55.97%
 766450 requests in 10.00s, 76.75MB read
 Requests/sec:  76651.84
 Transfer/sec:      7.68MB

 */
public class SimpleRestServer {


    public static class MyService {

        /*
        curl http://localhost:6060/services/myservice/ping -H "X_USER_ID: RICK"
         */
        @RequestMapping
        public List ping() {
            return Collections.singletonList("Hello: This REST server is working!");
        }
    }

    public static void main(String... args) throws Exception {


        final ServiceServer serviceServer = new ServiceServerBuilder().setPort(6060).setQueueBuilder(
                new QueueBuilder().setLinkTransferQueue().setTryTransfer(true).setBatchSize(10).setPollWait(10)
        ).setNumberOfOutstandingRequests(1000000).setTimeoutSeconds(40)
                .build();

        serviceServer.initServices(new MyService());
        serviceServer.start();

        Boon.gets();
    }
}

Simple Hello World app using QBit

-helloWorld.html Listing

<!DOCTYPE html>
<html>
<body>

<p>click the button:</p>


<button onclick="myFunction()">Try it</button>

<p id="demo"></p>

<script>
    function httpGet(theUrl)
    {
        var xmlHttp = null;
        xmlHttp = new XMLHttpRequest();
        xmlHttp.open( "GET", theUrl, false );
        xmlHttp.send( null );
        return xmlHttp.responseText;
    }
    function myFunction() {
        var json = httpGet("/services/helloservice/hello");
        var helloObject = JSON.parse(json);
        document.getElementById("demo").innerHTML = helloObject.hello;
    }
</script>

</body>
</html>

-HelloObject.java Listing

package io.advantageous.qbit.example.hello;

/**
 * Created by rhightower on 2/10/15.
 */


public class HelloObject {
    private final String hello;
    private final long time = System.currentTimeMillis();

    public HelloObject(String hello) {
        this.hello = hello;
    }
}

-HelloService.java Listing

package io.advantageous.qbit.example.hello;

import io.advantageous.qbit.annotation.RequestMapping;

/**
 * Created by rhightower on 2/10/15.
 */

@RequestMapping("/helloservice")
public class HelloService {


    @RequestMapping("/hello")
    public HelloObject hello() {
        return new HelloObject("Hello World!");
    }

}

-HelloWorldRestServer.java Listing

package io.advantageous.qbit.example.hello;

import io.advantageous.qbit.http.HttpServer;
import io.advantageous.qbit.server.ServiceServer;
import io.advantageous.qbit.system.QBitSystemManager;

import static io.advantageous.qbit.http.HttpServerBuilder.httpServerBuilder;
import static io.advantageous.qbit.server.ServiceServerBuilder.serviceServerBuilder;
import static org.boon.Boon.resource;

/**
 * Created by rhightower on 2/9/15.
 */
public class HelloWorldRestServer {


    public static final String HTML_HELLO_PAGE = "/ui/helloWorld.html";


    public static void main(String... args) {

        /* Create the system manager to manage the shutdown. */
        QBitSystemManager systemManager = new QBitSystemManager();

        HttpServer httpServer = httpServerBuilder()
                .setPort(9999).build();

        /* Register the Predicate using a Java 8 lambda expression. */
        httpServer.setShouldContinueHttpRequest(httpRequest -> {
            /* If not the page uri we want to then just continue by returning true. */
            if (!httpRequest.getUri().equals(HTML_HELLO_PAGE)) {
                return true;
            }
            /* read the page from the file system or classpath. */
            final String helloWorldWebPage = resource(HTML_HELLO_PAGE);
            /* Send the HTML file out to the browser. */
            httpRequest.getResponse().response(200, "text/html", helloWorldWebPage);
            return false;
        });


        /* Start the service. */
        final ServiceServer serviceServer = serviceServerBuilder().setSystemManager(systemManager)
                .setHttpServer(httpServer).build().initServices(new HelloService()).startServer();

        /* Wait for the service to shutdown. */
        systemManager.waitForShutdown();

    }


}

Working with event bus for QBit the microservice engine (the employee example)

-The employee example, offers 4 services:

-EmployeeHiringService.java
-BenefitsService.java
-VolunteerService.java

######- PayrollService.java

package io.advantageous.qbit.example.events;

import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.Service;
import io.advantageous.qbit.service.ServiceProxyUtils;
import org.boon.Lists;
import org.boon.core.Sys;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/4/15.
 */
public class EmployeeEventExample {

    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

    public class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }

    public class EmployeeHiringService {



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee

            //Sends events
            final EventManager eventManager =
                              serviceContext().eventManager();

            eventManager.send(NEW_HIRE_CHANNEL, employee);

            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL,
                                                employee, salary);


        }

    }



    public class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }



    public class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

    public static void main(String... args) {


        EmployeeHiringService employeeHiring = new EmployeeHiringService();
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();


        ServiceQueue employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();

        Sys.sleep(100);


        ServiceQueue payrollService = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();

        Sys.sleep(100);

        ServiceQueue employeeBenefitsService = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();


        ServiceQueue volunteeringService = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();

        EmployeeHiringServiceClient employeeHiringServiceClientProxy =

                   employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }
}

-Hello World Example Listing

package io.advantageous.qbit.events.impl;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.Listen;
import io.advantageous.qbit.client.ClientProxy;
import io.advantageous.qbit.events.EventConsumer;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventSubscriber;
import io.advantageous.qbit.message.Event;
import io.advantageous.qbit.service.Service;
import io.advantageous.qbit.service.ServiceBuilder;
import io.advantageous.qbit.service.ServiceContext;
import io.advantageous.qbit.service.ServiceProxyUtils;
import org.boon.core.Sys;
import org.junit.Before;
import org.junit.Test;

import static io.advantageous.qbit.events.EventUtils.callbackEventListener;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static org.boon.Boon.puts;
import static org.boon.Exceptions.die;

public class BoonEventManagerTest {

    EventManager eventManager;

    ClientProxy clientProxy;

    volatile int subscribeMessageCount = 0;

    volatile int consumerMessageCount = 0;

    boolean ok;

    @Before
    public void setup() {
        eventManager = QBit.factory().systemEventManager();
        clientProxy = (ClientProxy) eventManager;
        subscribeMessageCount = 0;
        consumerMessageCount = 0;

    }


    @Test
    public void test() throws Exception {


        String rick = "rick";

        MyEventListener myEventListener = new MyEventListener();

        eventManager.listen(myEventListener);
        clientProxy.clientProxyFlush();

        eventManager.register(rick, new EventConsumer<Object>() {
            @Override
            public void listen(Event<Object> event) {
                //puts(event);
                consumerMessageCount++;
            }
        });


        eventManager.register(rick, new EventSubscriber<Object>() {
            @Override
            public void listen(Event<Object> event) {
                //puts(event);
                subscribeMessageCount++;
            }
        });

        eventManager.register(rick, callbackEventListener(event -> {
            if (subscribeMessageCount < 1000) puts(event);
            subscribeMessageCount++;
        }));


        final MyServiceConsumer myServiceConsumer = new MyServiceConsumer();

        final MyService myService = new MyService();

        ServiceQueue consumerService = serviceBuilder()
                .setServiceObject(myServiceConsumer)
                .setInvokeDynamic(false).build();

        clientProxy.clientProxyFlush();

        Sys.sleep(100);


        eventManager.send(rick, "Hello Rick");
        clientProxy.clientProxyFlush();

        Sys.sleep(100);

        ok = subscribeMessageCount == 2 || die(subscribeMessageCount);

        ok = consumerMessageCount == 1 || die();


        ok = myEventListener.callCount == 1 || die();

        ok = myServiceConsumer.callCount() == 1 || die();


        Sys.sleep(100);
        ServiceQueue senderService = serviceBuilder()
                .setServiceObject(myService)
                .setInvokeDynamic(false).build();

        final MyServiceClient clientProxy = senderService.createProxy(MyServiceClient.class);

        clientProxy.sendHi("Hello");
        ServiceProxyUtils.flushServiceProxy(clientProxy);

        Sys.sleep(100);

        ok = subscribeMessageCount == 4 || die(subscribeMessageCount);

        ok = consumerMessageCount == 2 || die();


        ok = myEventListener.callCount == 2 || die();

        ok = myServiceConsumer.callCount() == 2 || die();

    }


    //@Test This takes a long time to run. I only need it for perf tuning.
    public void testPerfMultiple() throws Exception {

        for (int index = 0; index < 5; index++) {
            testPerf();
            Sys.sleep(5_000);
        }
    }

    @Test
    public void testPerf() throws Exception {


        eventManager = QBit.factory().systemEventManager();
        consumerMessageCount = 0;
        Sys.sleep(100);
        subscribeMessageCount = 0;
        Sys.sleep(100);


        String rick = "rick";

        eventManager.register(rick, new EventConsumer<Object>() {
            @Override
            public void listen(Event<Object> event) {
                consumerMessageCount++;
            }
        });


        eventManager.register(rick, callbackEventListener(event -> {
            subscribeMessageCount++;
        }));

        clientProxy.clientProxyFlush();
        Sys.sleep(100);


        long start = System.currentTimeMillis();

        for (int index = 0; index < 1_000_000; index++) {
            eventManager.send(rick, "PERF");

        }


        clientProxy.clientProxyFlush();
        Sys.sleep(100);


        while (true) {

            Sys.sleep(10);


            if (consumerMessageCount >= 900_000) {
                break;
            }

            if (start - System.currentTimeMillis() > 3_000) {
                break;
            }
        }


        long stop = System.currentTimeMillis();
        Sys.sleep(100);


        long duration = (stop - start);

        if (duration > 10_000) {
            die("duration", duration);
        }


        if (consumerMessageCount < 900_000) {
            die("consumerMessageCount", consumerMessageCount);
        }

        puts("Duration to send messages", duration,
                "ms. \nconsume message count", consumerMessageCount,
                "\ntotal message count", consumerMessageCount + subscribeMessageCount);


    }


    public static class MyEventListener {

        volatile int callCount = 0;

        @Listen("rick")
        void listen(String message) {
            callCount++;
        }
    }


    public static interface MyServiceClient  {

        void sendHi(String hi);
    }

    public static class MyService {

        private void queueInit() {
            puts("QUEUE START MyService");
        }

        public void sendHi(String hi) {
            serviceContext().send("rick", "hello rick " + hi);
        }
    }

    public static class MyServiceConsumer {

        int callCount = 0;

        public MyServiceConsumer() {
            puts("MyService created");
        }



        @Listen("rick")
        private void listen(String message) {

            puts(message);
            callCount++;
        }


        private int callCount() {
            return callCount;
        }


    }


}

Using QBit microservice lib's REST support with URI Params

-Full Example Listing

-SimpleRestServerWithURIParamsMain.java
-AdderServiceClientInterface.java
/*
 * Copyright (c) 2015. Rick Hightower, Geoff Chandler
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
 */

package io.advantageous.qbit.example.servers;

import io.advantageous.qbit.annotation.PathVariable;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.client.Client;
import io.advantageous.qbit.http.client.HttpClient;
import io.advantageous.qbit.http.request.HttpResponse;
import io.advantageous.qbit.server.ServiceServer;
import io.advantageous.qbit.service.Callback;
import io.advantageous.qbit.system.QBitSystemManager;
import org.boon.core.Sys;

import static io.advantageous.qbit.client.ClientBuilder.clientBuilder;
import static io.advantageous.qbit.http.client.HttpClientBuilder.httpClientBuilder;
import static io.advantageous.qbit.server.ServiceServerBuilder.serviceServerBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * @author rhightower
 *         on 2/17/15.
 */
public class SimpleRestServerWithURIParamsMain {


    public static void main(String... args) throws Exception {

        QBitSystemManager systemManager = new QBitSystemManager();

       /* Start Service server. */
        final ServiceServer server = serviceServerBuilder()
                .setSystemManager(systemManager)
                .setPort(7000).build();

        server.initServices(new AdderService());
        server.start();

       /* Start QBit client for WebSocket calls. */
        final Client client = clientBuilder().setPort(7000).setRequestBatchSize(1).build();


       /* Create a proxy to the service. */
        final AdderServiceClientInterface adderService =
                client.createProxy(AdderServiceClientInterface.class, "adder-service");

        client.start();



       /* Call the service */
        adderService.add(System.out::println, 1, 2);



        HttpClient httpClient = httpClientBuilder()
                .setHost("localhost")
                .setPort(7000).build();

        httpClient.start();
        String results = httpClient
                .get("/services/adder-service/add/2/2")
                .body();
        System.out.println(results);


        Sys.sleep(100);

        client.stop();
        systemManager.shutDown();


    }

    interface AdderServiceClientInterface {

        void add(Callback<Integer> callback, int a, int b);
    }

    @RequestMapping("/adder-service")
    public static class AdderService {


        @RequestMapping("/add/{0}/{1}")
        public int add(@PathVariable int a, @PathVariable int b) {

            return a + b;
        }
    }


}

Working with private event bus for inproc microservices

-Full Example Listing

-EmployeeEventExampleUsingStandaloneEventBus.java
-EmployeeHiringServiceClient.java
-EventManagerClient.java
-EmployeeHiringService.java
-BenefitsService.java
-VolunteerService.java
-PayrollService.java
package io.advantageous.qbit.example.events;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.Service;
import org.boon.core.Sys;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/5/15.
 */
public class EmployeeEventExampleUsingStandaloneEventBus {

    @Target({ ElementType.METHOD, ElementType.FIELD })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface OnEvent {


    /* The channel you want to listen to. */;
        String value() default "";

        /* The consume is the last object listening to this event.
           An event channel can have many subscribers but only one consume.
         */
        boolean consume() default false;


    }


    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }

    interface EventManagerClient {

        <T> void send(String channel, T event);

        <T> void sendArray(String channel, T... event);

        void clientProxyFlush();
    }


    public static class EmployeeHiringService {

        final EventManagerClient eventManager;

        public EmployeeHiringService (final EventManagerClient eventManager) {
            this.eventManager = eventManager;
        }

        void queueEmpty() {
            eventManager.clientProxyFlush();
        }

        void queueLimit() {
            eventManager.clientProxyFlush();
        }



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee

            //Sends events
            eventManager.send(NEW_HIRE_CHANNEL, employee);
            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);


        }

    }



    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }



    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

    public static void main(String... args) {


        /* Create you own private event bus. */
        EventManager privateEventBus =
                       EventManagerBuilder.eventManagerBuilder()
                       .setName("foo").build();

        /* Create a service queue for this event bus. */
        ServiceQueue privateEventBusService = serviceBuilder()
                .setServiceObject(privateEventBus)
                .setInvokeDynamic(false).build();

        /*
         Create a proxy client for the queued event bus service.
         */
        EventManagerClient eventBus = privateEventBusService.createProxy(EventManagerClient.class);


        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(eventBus);



        /* Now create your other service POJOs which have no compile time dependencies on QBit. */
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();



        /** Employee hiring service. */
        ServiceQueue employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();
        /** Payroll service */
        ServiceQueue payrollService = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();
        /** Employee Benefits service. */
        ServiceQueue employeeBenefitsService = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();
        /* Community outreach program. */
        ServiceQueue volunteeringService = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();


        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinService(payrollService);
        privateEventBus.joinService(employeeBenefitsService);
        privateEventBus.joinService(volunteeringService);

        /** Now create the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }
}

Working with strongly typed event bus proxies for QBit Java Microservice lib

-Full example Listing

-EmployeeEventExampleUsingEventProxyToSendEvents.java
-EmployeeHiringServiceClient.java
-EmployeeEventManager.java
-EmployeeHiringService.java
-BenefitsService.java
-VolunteerService.java
-PayrollService.java
package io.advantageous.qbit.example.events;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.Service;
import io.advantageous.qbit.service.ServiceProxyUtils;
import org.boon.core.Sys;


import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceContext.serviceContext;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

/**
 * Created by rhightower on 2/11/15.
 */
public class EmployeeEventExampleUsingEventProxyToSendEvents {



    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }


    interface EmployeeEventManager {

        @EventChannel(NEW_HIRE_CHANNEL)
        void sendNewEmployee(Employee employee);


        @EventChannel(PAYROLL_ADJUSTMENT_CHANNEL)
        void sendSalaryChangeEvent(Employee employee, int newSalary);

    }

    public static class EmployeeHiringService {

        final EmployeeEventManager eventManager;

        public EmployeeHiringService (final EmployeeEventManager employeeEventManager) {
            this.eventManager = employeeEventManager;
        }



        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {
            flushServiceProxy(eventManager);
        }


        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {
            flushServiceProxy(eventManager);
        }



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee


            eventManager.sendNewEmployee( employee);
            eventManager.sendSalaryChangeEvent( employee, salary );


        }

    }



    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }



    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

    public static void main(String... args) {


        /* Create you own private event bus. */
        EventManager privateEventBus =
         EventManagerBuilder.eventManagerBuilder()
          .setName("foo").build();();

        /* Create a service queue for this event bus. */
        ServiceQueue privateEventBusService = serviceBuilder()
                .setServiceObject(privateEventBus)
                .setInvokeDynamic(false).build();


        final EventBusProxyCreator eventBusProxyCreator =
                           QBit.factory().eventBusProxyCreator();

        final EmployeeEventManager employeeEventManager =
                       eventBusProxyCreator.createProxy(privateEventBus, EmployeeEventManager.class);

        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManager);



        /* Now createWithWorkers your other service POJOs which have no compile time dependencies on QBit. */
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();



        /** Employee hiring service. */
        ServiceQueue employeeHiringService = serviceBuilder()
                .setServiceObject(employeeHiring)
                .setInvokeDynamic(false).build();
        /** Payroll service */
        ServiceQueue payrollService = serviceBuilder()
                .setServiceObject(payroll)
                .setInvokeDynamic(false).build();
        /** Employee Benefits service. */
        ServiceQueue employeeBenefitsService = serviceBuilder()
                .setServiceObject(benefits)
                .setInvokeDynamic(false).build();
        /* Community outreach program. */
        ServiceQueue volunteeringService = serviceBuilder()
                .setServiceObject(volunteering)
                .setInvokeDynamic(false).build();


        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinService(payrollService);
        privateEventBus.joinService(employeeBenefitsService);
        privateEventBus.joinService(volunteeringService);


        privateEventBusService.start();
        employeeHiringService.start();
        volunteeringService.start();
        payrollService.start();
        employeeBenefitsService.start();


        /** Now createWithWorkers the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

    }
}

Working with System Manager for QBit Mircoservice lib

-Full Example Listing

-UsingShutDown.java
-EmployeeHiringServiceClient.java
-EmployeeEventManager.java
-EmployeeHiringService.java
-BenefitsService.java
-VolunteerService.java
-PayrollService.java
package io.advantageous.qbit.example.events;

import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.service.Service;
import io.advantageous.qbit.service.ServiceBuilder;
import io.advantageous.qbit.system.QBitSystemManager;
import org.boon.core.Sys;

import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
import static org.boon.Boon.puts;

/**
 * Created by rhightower on 2/11/15.
 */
public class UsingShutDown {

    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

    public static class Employee {
        final String firstName;
        final int employeeId;

        public Employee(String firstName, int employeeId) {
            this.firstName = firstName;
            this.employeeId = employeeId;
        }

        public String getFirstName() {
            return firstName;
        }

        public int getEmployeeId() {
            return employeeId;
        }

        @Override
        public String toString() {
            return "Employee{" +
                    "firstName='" + firstName + '\'' +
                    ", employeeId=" + employeeId +
                    '}';
        }
    }

    interface EmployeeHiringServiceClient {
        void hireEmployee(final Employee employee);

    }


    interface EmployeeEventManager {

        @EventChannel(NEW_HIRE_CHANNEL)
        void sendNewEmployee(Employee employee);


        @EventChannel(PAYROLL_ADJUSTMENT_CHANNEL)
        void sendSalaryChangeEvent(Employee employee, int newSalary);

    }

    public static class EmployeeHiringService {

        final EmployeeEventManager eventManager;

        public EmployeeHiringService (final EmployeeEventManager employeeEventManager) {
            this.eventManager = employeeEventManager;
        }



        @QueueCallback(QueueCallbackType.EMPTY)
        private void noMoreRequests() {
            flushServiceProxy(eventManager);
        }


        @QueueCallback(QueueCallbackType.LIMIT)
        private void hitLimitOfRequests() {
            flushServiceProxy(eventManager);
        }



        public void hireEmployee(final Employee employee) {

            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            //Does stuff to hire employee


            eventManager.sendNewEmployee( employee);
            eventManager.sendSalaryChangeEvent( employee, salary );


        }

    }



    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {

            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {

            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());

        }

    }



    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {

            System.out.printf("Employee added to payroll  %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);

        }

    }

    public static void main(String... args) {


        /* Create you own private event bus. */
        EventManager privateEventBus =
                      EventManagerBuilder
                          .eventManagerBuilder()
                          .setName("foo")
                          .build();


        final EventBusProxyCreator eventBusProxyCreator =
                QBit.factory().eventBusProxyCreator();

        final EmployeeEventManager employeeEventManager =
                eventBusProxyCreator.createProxy(privateEventBus, EmployeeEventManager.class);

        /*
        Create your EmployeeHiringService but this time pass the private event bus.
        Note you could easily use Spring or Guice for this wiring.
         */
        EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManager);



        /* Now create your other service POJOs which have no compile time dependencies on QBit. */
        PayrollService payroll = new PayrollService();
        BenefitsService benefits = new BenefitsService();
        VolunteerService volunteering = new VolunteerService();

        final QBitSystemManager systemManager = new QBitSystemManager();
        ServiceBuilder serviceBuilder = serviceBuilder()
                .setSystemManager(systemManager)
                .setInvokeDynamic(false);



        /* Create a service queue for this event bus. */
        ServiceQueue privateEventBusService = serviceBuilder.build(privateEventBus);

        /** Employee hiring service. */
        ServiceQueue employeeHiringService = serviceBuilder.build(employeeHiring);

        /** Payroll service */
        ServiceQueue payrollService = serviceBuilder.build(payroll);

        /** Employee Benefits service. */
        ServiceQueue employeeBenefitsService = serviceBuilder.build(benefits);

        /* Community outreach program. */
        ServiceQueue volunteeringService = serviceBuilder.build(volunteering);


        /* Now wire in the event bus so it can fire events into the service queues. */
        privateEventBus.joinServices(
                payrollService,
                employeeBenefitsService,
                volunteeringService);

        systemManager.startAll();



        /** Now createWithWorkers the service proxy like before. */
        EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                employeeHiringService.createProxy(EmployeeHiringServiceClient.class);

        /** Call the hireEmployee method which triggers the other events. */
        employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

        flushServiceProxy(employeeHiringServiceClientProxy);

        Sys.sleep(5_000);

        Thread thread = new Thread(systemManager::waitForShutdown);
        thread.start();

        Sys.sleep(1_000);

        systemManager.shutDown();

        puts("Shutdown complete from my sample");







    }
}

Tutorials

__

Docs

Getting Started

Basics

Concepts

REST

Callbacks and Reactor

Event Bus

Advanced

Integration

QBit case studies

QBit 2 Roadmap

-- Related Projects

Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting

Clone this wiki locally