QBit Boon New Wave of JSON HTTP and Websocket

Richard Hightower edited this page Feb 21, 2015 · 3 revisions

Service handling with QBit.

Services in QBit are available via JSON and a binary encoding format called BNSF (Binary Notation for Streaming and Framing). All services are available via REST and WebSocket. BNSF is still under development.

QBit Boon also lets you stream calls from front-end servers (currently over WebSocket), which handle thousands or even millions of TPS, to back-end servers that handle very few connections but very large batches of requests. This keeps CPU-intensive, in-memory services running at millions of TPS and avoids the usual expense of cloud scale-out. In essence, it is cost-effective cloud scale-out. On a recent project we achieved 2M TPS for a content rules engine with ten servers, while one of the client's competitors uses 150 servers for a similar project and gets only around 200K TPS. It works.

Currently QBit uses plain JSON and normal REST parameters to expose services to the web. Front-end QBit services can communicate with the back end: QBit can batch requests and forward them to a back-end server, which lets you keep more objects in memory on fewer in-memory services. The full vision of QBit is beyond the scope of this document. Suffice it to say that the principles behind QBit have been deployed on high-traffic websites to great effect. (It went really well.)

You write QBit services much like you would write a Spring MVC REST service or just a plain Java class.

    package com.example;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class EmployeeService {
        Map<Integer, Employee> map = new ConcurrentHashMap<>();

        public boolean addEmployee(Employee employee) {
            map.put(employee.id, employee);
            return true;
        }


        public boolean promoteEmployee(Employee employee, int level) {

            employee.level = level;

            final Employee employee1 = map.get(employee.id);

            employee1.level = level;


            map.put(employee.id, employee1);
            return true;
        }

        public Employee readEmployee(int id) {
            return map.get(id);
        }


        public boolean removeEmployee(int id) {
            map.remove(id);
            return true;
        }
    }

The above would be a QBit service. Notice that it has nothing special about it at all. QBit uses apartment-model threading and is similar in concept to the Actor model. QBit services are like strongly typed Actors in Akka.
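To make the threading idea concrete, here is a minimal sketch using only the JDK. It is not QBit's implementation: `Counter` and `ApartmentSketch` are hypothetical names, and a single-thread executor stands in for the service's queue. The point it illustrates is that the service object itself needs no locks, because every call is queued and run by the one thread that owns the object.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Plain, unsynchronized class: safe only because one thread ever touches it. */
class Counter {
    int count;

    int increment() {
        return ++count;
    }
}

/** Hypothetical wrapper: every call is queued and run on the service's own thread. */
class ApartmentSketch {
    private final Counter counter = new Counter();
    private final ExecutorService apartment = Executors.newSingleThreadExecutor();

    /** Enqueue a call; callers get a Future instead of touching the object. */
    Future<Integer> increment() {
        Callable<Integer> call = counter::increment;
        return apartment.submit(call);
    }

    /** Queue a final read behind all earlier calls, then stop the thread. */
    int stopAndGetCount() {
        Callable<Integer> read = () -> counter.count;
        try {
            return apartment.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            apartment.shutdown();
        }
    }

    /** Fire calls from several caller threads and return the final count. */
    static int demo(int callers, int callsPerCaller) {
        ApartmentSketch service = new ApartmentSketch();
        Thread[] threads = new Thread[callers];
        for (int t = 0; t < callers; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < callsPerCaller; i++) {
                    service.increment();
                }
            });
            threads[t].start();
        }
        try {
            for (Thread thread : threads) {
                thread.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return service.stopAndGetCount();
    }
}
```

Calling `ApartmentSketch.demo(4, 250)` queues 1000 increments from four threads, and no update is lost even though `Counter` has no synchronization.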

The above service would be available instantly to WebSocket and HTTP POST/GET traffic. No more code would be needed at all.

If you desire to create a truly REST app and not just a RESTful app, then you have the following constructs:

    package com.example;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    @RequestMapping("/employeeRest/")
    public static class EmployeeService {
        Map<Integer, Employee> map = new ConcurrentHashMap<>();


        @RequestMapping("/employee/add")
        public boolean addEmployee(Employee employee) {
            map.put(employee.id, employee);
            return true;
        }


        @RequestMapping("/employee/search/")
        public Employee findEmployee(Employee employee) {
            return employee;
        }


        @RequestMapping("/employee/promote/{1}/{0}")
        public boolean promoteEmployee(int id, int level) {

            final Employee employee = map.get(id);

            employee.level = level;


            map.put(employee.id, employee);
            return true;
        }


        @RequestMapping("/employee/{0}")
        public Employee readEmployee(int id) {
            return map.get(id);
        }


        @RequestMapping("/employeeRead")
        public Employee readEmployeeWithParamBindings(
                @RequestParam(value="idOfEmployee") int id) {
            return map.get(id);
        }


        @RequestMapping("/addEmployeeWithParams")
        public boolean addEmployeeWithParams(
                @RequestParam(required =true, value="idOfEmployee") int id, Employee employee) {

            puts("addEmployeeWithParams CALLED", id, employee);
            map.put(id, employee);
            return true;

        }

        @RequestMapping("/employee/remove/")
        public boolean removeEmployee(int id) {
            map.remove(id);
            return true;
        }


        @RequestMapping("/async/")
        public void async(Handler<String> handler) {
            handler.handle("hi mom");
        }

        @RequestMapping("/asyncHelloWorld/")
        public void asyncHelloWorld(Handler<String> handler, String arg) {
            handler.handle("Hello " + arg);
        }

    }

QBit Boon supports

@Name 
@PathVariable
@RequestMapping
@RequestParam
@Service
@ServiceMethod

These annotations work in a very similar way to their Spring MVC counterparts. QBit Boon runs inside a reactive environment like Vertx. All QBit Boon services are more like Akka Actors than typical Spring services.
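The `{0}` and `{1}` placeholders in the mappings above look positional. The sketch below shows one way such a template could be bound, under the assumption (not confirmed by this page) that `{n}` means "this URI segment is argument n of the method". `PathTemplateSketch` is a hypothetical helper, not part of QBit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PathTemplateSketch {

    /**
     * Matches a URI against a template such as "/employee/promote/{1}/{0}".
     * Returns the captured segments ordered by argument index,
     * or null when the URI does not match the template.
     */
    static List<String> bind(String template, String uri) {
        String[] templateParts = template.split("/");
        String[] uriParts = uri.split("/");
        if (templateParts.length != uriParts.length) {
            return null;
        }
        // TreeMap keeps captured values sorted by their argument index.
        Map<Integer, String> byIndex = new TreeMap<>();
        for (int i = 0; i < templateParts.length; i++) {
            String part = templateParts[i];
            if (part.startsWith("{") && part.endsWith("}")) {
                int index = Integer.parseInt(part.substring(1, part.length() - 1));
                byIndex.put(index, uriParts[i]);
            } else if (!part.equals(uriParts[i])) {
                return null; // literal segment differs
            }
        }
        return new ArrayList<>(byIndex.values());
    }
}
```

Under that assumption, `bind("/employee/promote/{1}/{0}", "/employee/promote/5/42")` yields `["42", "5"]`, that is, `id = 42` and `level = 5` for `promoteEmployee(int id, int level)`.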

To make an async call, you can use Boon's Handler<?> as follows:

        @RequestMapping("/expensiveOperation/")
        public void expensiveOperation(final Handler<SomeObject> handler, final String param, final int i, int x) {

            //Do some stuff
            callOtherService(new Handler<SomeOtherObject>() {
                    public void handle(SomeOtherObject soo) {
                             ...
                             handler.handle(new SomeObject(...));
                    }
            });
        }

If you have to call a slower system, you typically do so with handlers so you do not block.
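As a rough, JDK-only illustration of why the handler style avoids blocking: the method below returns immediately, and the callback fires later on another thread. `Callback` is a local stand-in for a handler interface (it is not Boon's `Handler`), and the executor with a short sleep is a made-up placeholder for the slower system.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Local stand-in for a callback interface; not Boon's actual Handler type. */
interface Callback<T> {
    void handle(T value);
}

class AsyncHandlerSketch {
    // Stands in for a slow downstream system.
    private final ExecutorService slowSystem = Executors.newSingleThreadExecutor();

    /** Returns immediately; the callback fires when the slow work completes. */
    void expensiveOperation(Callback<String> callback, String param) {
        slowSystem.submit(() -> {
            try {
                Thread.sleep(50); // simulated latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            callback.handle("done:" + param);
        });
    }

    void stop() {
        slowSystem.shutdown();
    }

    /** Issues one call and waits (only for the demo) until the callback has run. */
    static String demo(String param) {
        AsyncHandlerSketch service = new AsyncHandlerSketch();
        AtomicReference<String> result = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        service.expensiveOperation(value -> {
            result.set(value);
            latch.countDown();
        }, param);
        // The calling thread is free to do other work at this point.
        try {
            latch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.stop();
        }
        return result.get();
    }
}
```

The latch exists only so the demo can observe the result; a real service would simply return and let the handler deliver the response.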

Some key concepts in QBit

Services are queues:

/**
 * Created by Richard on 7/21/14.
 */
public interface ServiceQueue {

    SendQueue<MethodCall<Object>> requests();
    ReceiveQueue<Response<Object>> responses();

    ReceiveQueue<Event> events();

    String name();


    void stop();

    Collection<String> addresses(String address);
}

Everything inside QBit uses very fast Queues (like 50 M to 100 M TPS ping / pong queues).

A Queue in QBit looks like this:

public interface Queue <T> {
    ReceiveQueue<T> receiveQueue();
    SendQueue<T> sendQueue();

    void startListener(ReceiveQueueListener<T> listener);

    void stop();
}

Everything uses Queues.

The Receive side of the queue is separated from the send side of the queue so each direction can deal with batching.

public interface ReceiveQueue<T> extends Input {

    /** Gets the next item. If the item is null,
     * means the timeout has been reached. */
    T pollWait();


    /** Gets the next item. If the item is null the queue currently has no items. */
    T poll();

    /** Wait for the next item. */
    T take();

    /** Read in a batch of items. */
    Iterable<T> readBatch(int max);


    /** Read in a batch of items. */
    Iterable<T> readBatch();
}

public interface SendQueue<T> extends Output {
    void send(T item);
    void sendAndFlush(T item);

    void sendMany(T... item);
    void sendBatch(Collection<T> item);
    void sendBatch(Iterable<T> item);
    boolean shouldBatch();
    void flushSends();


 }
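The reason for splitting the two sides can be pictured with a small sketch. This is not how QBit's queues are implemented; `BatchingQueueSketch` is a hypothetical class built on a JDK `BlockingQueue`. The sender buffers items locally and hands over a whole list at once, either when the batch size is reached or when `flushSends()` is called, so the shared queue is touched once per batch instead of once per item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchingQueueSketch<T> {
    // The shared structure carries whole batches, not single items.
    private final BlockingQueue<List<T>> transfer = new LinkedBlockingQueue<>();
    private final int batchSize;
    // Owned by the sending thread only, so it needs no locking.
    private List<T> pending = new ArrayList<>();

    BatchingQueueSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Send side: buffer locally; hand off automatically when the batch is full. */
    void send(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flushSends();
        }
    }

    /** Send side: hand off whatever has been buffered so far. */
    void flushSends() {
        if (pending.isEmpty()) {
            return;
        }
        transfer.add(pending);
        pending = new ArrayList<>();
    }

    /** Receive side: one queue operation yields a whole batch (empty if none is ready). */
    List<T> readBatch() {
        List<T> batch = transfer.poll();
        return batch == null ? new ArrayList<T>() : batch;
    }

    /** Number of batches handed off but not yet read. */
    int handOffsWaiting() {
        return transfer.size();
    }
}
```

With a batch size of 3, sending four items and then flushing produces two hand-offs: one automatic (three items) and one explicit (one item).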

You can write queue handlers which will listen to queue events and act accordingly.

/**
 * Created by Richard on 7/18/14.
 * Simplifies queue handler loop code by abstracting queue operations.
 * @author Richard Hightower
 */
public interface ReceiveQueueListener<T> extends Input {

    /** Notifies a queue listener that an item has been received */
    void receive(T item);

    /** Notifies the queue listener that currently the queue is empty.
     * This is good for batch operations. This could mean the queue is empty or we reached our max batch size limit.
     *
     * */
    void empty();


    /** Notifies the queue listener that we processed up to batch size.
     * This is good for batch operations. This could mean the queue is empty or we reached our max batch size limit.
     *
     * */
    void limit();

    /** Notifies the queue listener that currently the queue is closed for business. */
    void shutdown();

    /** This means we did not find an item. We waited for an item as well and there was still not an item in the queue
     * This would be a good time to do some clean up.
     */
    void idle();
}
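To show when each callback might fire, here is a sketch of a loop that could drive such a listener. It is a guess based only on the Javadoc comments above, not QBit's actual loop. `Listener` is a local stand-in with the same five callbacks, and `ListenerLoopSketch` and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Local stand-in with the same five callbacks as ReceiveQueueListener. */
interface Listener<T> {
    void receive(T item);
    void empty();
    void limit();
    void idle();
    void shutdown();
}

class ListenerLoopSketch {

    /** Drives the listener until maxIdleRounds timed waits come back empty. */
    static <T> void run(BlockingQueue<T> queue, Listener<T> listener,
                        int batchSize, long waitMillis, int maxIdleRounds) {
        int idleRounds = 0;
        int processed = 0;
        try {
            while (idleRounds < maxIdleRounds) {
                T item = queue.poll();               // cheap, non-blocking check
                if (item == null) {
                    listener.empty();                // drained: good time to flush a batch
                    processed = 0;
                    item = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
                    if (item == null) {
                        listener.idle();             // waited and still nothing
                        idleRounds++;
                        continue;
                    }
                }
                listener.receive(item);
                if (++processed == batchSize) {
                    listener.limit();                // batch size reached
                    processed = 0;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();      // treat interruption as a stop request
        }
        listener.shutdown();
    }

    /** Records the callback order for three queued items and a batch size of two. */
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b", "c"));
        final List<String> events = new ArrayList<>();
        run(queue, new Listener<String>() {
            public void receive(String item) { events.add("receive:" + item); }
            public void empty() { events.add("empty"); }
            public void limit() { events.add("limit"); }
            public void idle() { events.add("idle"); }
            public void shutdown() { events.add("shutdown"); }
        }, 2, 10, 1);
        return events;
    }
}
```

In this sketch `demo()` records `receive:a`, `receive:b`, `limit`, `receive:c`, `empty`, `idle`, `shutdown`, in that order.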

Queues are easy to work with:

       BasicQueue<String> queue = new BasicQueue<>("test", 1000, TimeUnit.MILLISECONDS, 10);

       final int []counter = new int[1];

       queue.startListener(new ReceiveQueueListener<String>() {
           @Override
           public void receive(String item) {
               puts (item);
               synchronized (counter) {
                   counter[0]++;
               }
           }

           @Override
           public void empty() {
                puts ("Queue is empty");

           }

           @Override
           public void limit() {

               puts ("Batch size limit is reached");
           }

           @Override
           public void shutdown() {

               puts("Queue is shut down");
           }

           @Override
           public void idle() {

               puts("Queue is idle");

           }
       });

        final SendQueue<String> sendQueue = queue.sendQueue();
        for (int index = 0; index < 10; index++) {
            sendQueue.send("item" + index);
       }


        sendQueue.flushSends();

        sleep(100);
        synchronized (counter) {
            puts("1", counter[0]);
        }


       for (int index = 0; index < 100; index++) {
            sendQueue.send("item2nd" + index);
       }

        sendQueue.flushSends();


        sleep(100);
        synchronized (counter) {
            puts("2", counter[0]);
        }

        for (int index = 0; index < 5; index++) {
            sleep(100);
            sendQueue.send("item3rd" + index);
       }
        sendQueue.flushSends();

        sleep(100);
        synchronized (counter) {
            puts("3", counter[0]);
        }


       sendQueue.sendMany("hello", "how", "are", "you");


        sleep(100);
        synchronized (counter) {
            puts("4", counter[0]);
        }

       List<String> list = Lists.linkedList("Good", "Thanks");

       sendQueue.sendBatch(list);


        sleep(100);
        synchronized (counter) {
            puts("5", counter[0]);
        }



       sleep(100);
       synchronized (counter) {
           ok = counter[0] == 121 || die("Crap not 121", counter[0]);
       }


      queue.stop();

One very special queue is a service queue. You can programmatically create service queues and use them as you see fit:

Our advanced service

    public static class Adder {
        int all;
        int add(int a, int b) {
            int total;

            total = a + b;
            all += total;
            return total;
        }
    }

Expose Adder as a service

        Adder adder = new Adder();
        ServiceQueue service = Services.jsonService("test", adder, 1000, TimeUnit.MILLISECONDS, 100);

        ReceiveQueue<Response<Object>> responses = service.responses();
        SendQueue<MethodCall<Object>> requests = service.requests();

Send methods to the service:

        requests.send(MethodCallImpl.method("add", "[1,2]"));

        requests.send(MethodCallImpl.method("add", "[4,5]"));
        requests.flushSends();
        Response<Object> response = responses.take();


        Object o = fromJson(response.body().toString());

        ok = o.equals(Integer.valueOf(3)) || die(response);

Easy right?
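If you are wondering how a method name plus a list of arguments can turn into a call on a plain object, reflection is one way to do it. The sketch below is only an illustration with the JDK's `java.lang.reflect` API. `ReflectiveDispatchSketch` is a hypothetical class, and nothing here claims to be how QBit dispatches internally.

```java
import java.lang.reflect.Method;
import java.util.List;

class ReflectiveDispatchSketch {

    /** Same shape as the Adder service above. */
    static class Adder {
        int all;

        int add(int a, int b) {
            int total = a + b;
            all += total;
            return total;
        }
    }

    /** Finds a method by name and argument count, then invokes it with the given arguments. */
    static Object invoke(Object service, String methodName, List<?> args) {
        for (Method method : service.getClass().getDeclaredMethods()) {
            if (method.getName().equals(methodName)
                    && method.getParameterCount() == args.size()) {
                try {
                    method.setAccessible(true); // add(...) is not public
                    // Reflection unboxes Integer arguments for int parameters.
                    return method.invoke(service, args.toArray());
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("call failed: " + methodName, e);
                }
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }
}
```

For example, `invoke(adder, "add", Arrays.asList(1, 2))` returns `3` and updates `adder.all`, much like the `add` calls sent over the request queue above.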

You can also send many requests at a time:

        requests.sendMany(
                MethodCallImpl.method("add", "[1,2]"),
                MethodCallImpl.method("add", "[4,5]"));



        Response<Object> response = responses.take();

        Object o = fromJson(response.body().toString());

        ok = o.equals(Integer.valueOf(3)) || die(response);

Here is another example of sending a batch of methods

        Adder adder = new Adder();

        ServiceQueue service = Services.jsonService("test", adder, 1000, TimeUnit.MILLISECONDS, 100);

        ReceiveQueue<Response<Object>> responses = service.responses();
        SendQueue<MethodCall<Object>> requests = service.requests();

        List<MethodCall<Object>> methods = Lists.list(
                MethodCallImpl.method("add", "[1,2]"),
                MethodCallImpl.method("add", "[4,5]"));

        requests.sendBatch(methods);

Now we get the responses:

        Response<Object> response = responses.take();

        ok = response != null || die(response);


        Object o = fromJson(response.body().toString());

        ok = o.equals(Integer.valueOf(3)) || die(response);

        response = responses.take();

        ok = response != null || die(response);

        o = fromJson(response.body().toString());

        ok = o.equals(Integer.valueOf(9)) || die(response);

You don't have to use JSON at all. Here is an example that uses just Java and lists to send method calls.

package org.qbit.service;

import org.boon.Lists;
import org.boon.core.Sys;
import org.junit.Test;
import org.qbit.message.MethodCall;
import org.qbit.queue.ReceiveQueue;
import org.qbit.queue.SendQueue;
import org.qbit.service.method.impl.MethodCallImpl;
import org.qbit.message.Response;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.boon.Boon.puts;
import static org.boon.Exceptions.die;

/**
 * Created by Richard on 8/26/14.
 */
public class RegularCalls {

    public static class Adder {
        int all;
        int add(int a, int b) {
            int total;

            total = a + b;
            all += total;
            return total;
        }

        void queueIdle() {
            puts("Queue Idle");
        }


        void queueEmpty() {
            puts("Queue Empty");
        }


        void queueShutdown() {
            puts("Queue Shutdown");
        }


        void queueLimit() {
            puts("Queue Limit");
        }
    }

    boolean ok;

    @Test
    public void test() {

        Adder adder = new Adder();
        ServiceQueue service = Services.regularService("test", adder, 1000, TimeUnit.MILLISECONDS, 10);
        SendQueue<MethodCall<Object>> requests = service.requests();
        ReceiveQueue<Response<Object>> responses = service.responses();

        requests.send(MethodCallImpl.method("add", Lists.list(1, 2)));

        requests.sendAndFlush(MethodCallImpl.methodWithArgs("add", 4, 5));


        Response<Object> response = responses.take();


        Object o = response.body();

        ok = o.equals(Integer.valueOf(3)) || die(response);

        response = responses.take();

        ok = response != null || die(response);

        o = response.body();

        ok = o.equals(Integer.valueOf(9)) || die(response);





        synchronized (adder) {
            ok = adder.all == 12 || die(adder.all);
        }


        List<MethodCall<Object>> methods = new ArrayList<>();

        for (int index = 0; index < 1000; index++) {
            methods.add(MethodCallImpl.method("add", Lists.list(1, 2)));
        }

        requests.sendBatch(methods);

        Sys.sleep(3000);

        synchronized (adder) {
            ok = adder.all == 3012 || die(adder.all);
        }

        service.stop();

        Sys.sleep(100);

    }


    @Test
    public void testMany() {

        Adder adder = new Adder();


        ServiceQueue service = Services.regularService("test", adder, 1000, TimeUnit.MILLISECONDS, 10);
        SendQueue<MethodCall<Object>> requests = service.requests();
        ReceiveQueue<Response<Object>> responses = service.responses();

        requests.sendMany(MethodCallImpl.method("add",
                            Lists.list(1, 2)),
                        MethodCallImpl.method("add",
                                Lists.list(4, 5)));



        Response<Object> response = responses.take();


        Object o = response.body();

        ok = o.equals(Integer.valueOf(3)) || die(response);

        response = responses.take();

        ok = response != null || die(response);

        o = response.body();

        ok = o.equals(Integer.valueOf(9)) || die(response);




        synchronized (adder) {
            ok = adder.all == 12 || die(adder.all);
        }
    }



    @Test
    public void testBatch() {

        Adder adder = new Adder();
        ServiceQueue service = Services.regularService("test", adder, 1000, TimeUnit.MILLISECONDS, 10);
        SendQueue<MethodCall<Object>> requests = service.requests();
        ReceiveQueue<Response<Object>> responses = service.responses();

        List<MethodCall<Object>> methods = Lists.list(
                MethodCallImpl.method("add", Lists.list(1, 2)),
                MethodCallImpl.method("add", Lists.list(4, 5)));


        requests.sendBatch(methods);




        Response<Object> response = responses.take();


        Object o = response.body();

        ok = o.equals(Integer.valueOf(3)) || die(response);

        response = responses.take();

        ok = response != null || die(response);

        o = response.body();

        ok = o.equals(Integer.valueOf(9)) || die(response);





        synchronized (adder) {
            ok = adder.all == 12 || die(adder.all);
        }
    }

}

You can also programmatically call REST-style services, which is convenient for testing and prototyping.

    public class Employee {
        String firstName;
        String lastName;
        BigDecimal salary;
        boolean active;
        int id;
        int level;
     ...
    }

Now the Employee service yet again.

    @RequestMapping("/employeeRest/")
    public static class EmployeeService {
        Map<Integer, Employee> map = new ConcurrentHashMap<>();


        @RequestMapping("/employee/add")
        public boolean addEmployee(Employee employee) {
            map.put(employee.id, employee);
            return true;
        }


        @RequestMapping("/employee/search/")
        public Employee findEmployee(Employee employee) {
            return employee;
        }


        @RequestMapping("/employee/promote/{1}/{0}")
        public boolean promoteEmployee(int id, int level) {

            final Employee employee = map.get(id);

            employee.level = level;


            map.put(employee.id, employee);
            return true;
        }


        @RequestMapping("/employee/{0}")
        public Employee readEmployee(int id) {
            return map.get(id);
        }


        @RequestMapping("/employeeRead")
        public Employee readEmployeeWithParamBindings(
                @RequestParam(value="idOfEmployee") int id) {
            return map.get(id);
        }


        @RequestMapping("/addEmployeeWithParams")
        public boolean addEmployeeWithParams(
                @RequestParam(required =true, value="idOfEmployee") int id, Employee employee) {

            puts("addEmployeeWithParams CALLED", id, employee);
            map.put(id, employee);
            return true;

        }

        @RequestMapping("/employee/remove/")
        public boolean removeEmployee(int id) {
            map.remove(id);
            return true;
        }


        @RequestMapping("/employee/error/")
        public boolean throwAnExceptionNoMatterWhat() {
            die("YOU ARE NOT THE BOSS OF ME JAVA!");
            return true;
        }

        @RequestMapping("/async/")
        public void async(Handler<String> handler) {
            handler.handle("hi mom");
        }

        @RequestMapping("/asyncHelloWorld/")
        public void asyncHelloWorld(Handler<String> handler, String arg) {
            handler.handle("Hello " + arg);
        }

    }

Now let's create a new Employee using our REST-like interface. First we create a service bundle:

        final ServiceBundle serviceBundle = QBit.factory().createBundle("/root");

        responseReceiveQueue = serviceBundle.responses(); //get your responses here

A service bundle is just a collection of services. You add services to the bundle as follows:

        employeeService = new EmployeeService(); //our employee service

        /* Create employee service */
        serviceBundle.addService(employeeService);

Yep. That is it. :)

Now we can call!

        String addressToMethodCall = "/root/employeeRest/employee/add";

        call = factory.createMethodCallByAddress(addressToMethodCall,
                returnAddress, rick, params );

        serviceBundle.call(call);
        serviceBundle.flushSends();

You can make many calls to the serviceBundle at once, and then flush all of the sends in one go. There is also a callAndFlush() method. When to batch and when to send is beyond the scope of this first document, but basically, if you can batch a few items at a time, you can save some of the expense of thread hand-off and IO overhead.

The serviceBundle, and a service for that matter, will flush automatically once you get beyond a certain limit. If you have a service that gets hit all of the time, you may never need to call flushSends. :)
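The address used above, `/root/employeeRest/employee/add`, reads like the bundle root, the class-level mapping, and the method-level mapping joined together. The sketch below models that idea with a plain map, and also shows calls being queued until a flush. `BundleSketch` is a hypothetical class written for illustration; it is not the `ServiceBundle` API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BundleSketch {
    private final String root;
    private final Map<String, Function<Object, Object>> routes = new HashMap<>();
    private final List<Object[]> pending = new ArrayList<>();   // {address, argument}
    private final List<Object> responses = new ArrayList<>();

    BundleSketch(String root) {
        this.root = root;
    }

    /** Registers a method under root + service mapping + method mapping. */
    void addRoute(String servicePath, String methodPath, Function<Object, Object> method) {
        routes.put(join(root, servicePath, methodPath), method);
    }

    /** Queues a call; nothing runs until flushSends(). */
    void call(String address, Object argument) {
        pending.add(new Object[] {address, argument});
    }

    /** Dispatches every queued call in order and collects the responses. */
    void flushSends() {
        for (Object[] call : pending) {
            Function<Object, Object> method = routes.get(call[0]);
            responses.add(method == null ? "no route: " + call[0] : method.apply(call[1]));
        }
        pending.clear();
    }

    List<Object> responses() {
        return responses;
    }

    /** Joins path pieces with single slashes, ignoring empty segments. */
    static String join(String... parts) {
        StringBuilder address = new StringBuilder();
        for (String part : parts) {
            for (String segment : part.split("/")) {
                if (!segment.isEmpty()) {
                    address.append('/').append(segment);
                }
            }
        }
        return address.toString();
    }
}
```

Here `join("/root", "/employeeRest/", "/employee/add")` produces `/root/employeeRest/employee/add`, and a queued call only produces a response once `flushSends()` runs.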

QBit has an encoder for turning groups of Messages, Requests, MethodCalls, etc. into something that is very efficient to stream over websocket.

public interface ProtocolEncoder {



    String encodeAsString(Response<Object> response);

    String encodeAsString(MethodCall<Object> methodCall);


    String encodeAsString(List<Message<Object>> methodCalls);

}

Thus you have your front-end requests, which can be very REST-like, and your back-end services, to which you can forward front-end requests directly with little or no transformation.

public interface ProtocolParser {

    boolean supports(Object object, MultiMap<String, String> params);

    MethodCall<Object> parseMethodCall(Object body);


    List<Message<Object>> parse(Object body);


    List<MethodCall<Object>> parseMethods(Object body);



    Response<Object> parseResponse(Object body);
}
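To give a feel for what encoding a group of calls into one string and parsing it back involves, here is a toy round trip. The wire format is invented for this example (fields joined by the ASCII unit separator, records by the record separator) and is not QBit's protocol. `CallRecord` and `ProtocolSketch` are hypothetical names, and the sketch assumes the separator characters never appear inside a field.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-in for a method call: where it goes, where to reply, and a body. */
class CallRecord {
    final String address;
    final String returnAddress;
    final String body;

    CallRecord(String address, String returnAddress, String body) {
        this.address = address;
        this.returnAddress = returnAddress;
        this.body = body;
    }
}

class ProtocolSketch {
    // Invented separators: ASCII unit separator and record separator.
    private static final String FIELD = String.valueOf((char) 0x1f);
    private static final String RECORD = String.valueOf((char) 0x1e);

    /** Packs many calls into one string so they can travel as a single frame. */
    static String encodeAsString(List<CallRecord> calls) {
        StringBuilder out = new StringBuilder();
        for (CallRecord call : calls) {
            if (out.length() > 0) {
                out.append(RECORD);
            }
            out.append(call.address).append(FIELD)
               .append(call.returnAddress).append(FIELD)
               .append(call.body);
        }
        return out.toString();
    }

    /** Reverses encodeAsString. */
    static List<CallRecord> parse(String text) {
        List<CallRecord> calls = new ArrayList<>();
        if (text.isEmpty()) {
            return calls;
        }
        for (String record : text.split(RECORD, -1)) {
            String[] fields = record.split(FIELD, -1); // -1 keeps empty trailing fields
            calls.add(new CallRecord(fields[0], fields[1], fields[2]));
        }
        return calls;
    }
}
```

A back end that receives such a frame can parse it once and feed the whole batch to a service, which is the kind of pass-through forwarding described above.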


Messages in QBit are like this:

public interface Message <T> {
    long id();

    T body(); //Body could be a Map for parameters for forms or JSON or bytes[] or String

    boolean isSingleton();

}

public interface CompositeMessage<M extends Message<T>, T> extends Message<T>, Iterable<M> {


}

Requests which extend Message:

public interface Request <T> extends Message<T> {


    String address();
    String returnAddress();
    MultiMap<String, String> params();

    MultiMap<String, String> headers();
    long timestamp();

}

Events.


public interface Event<T> extends Message<T> {
}

Requests have Responses.

public interface Response <T> extends Message<T> {

    boolean wasErrors();

    void body(T body);

    String returnAddress();

    String address();


    long timestamp();
}

Then you also have a special kind of Request called a MethodCall. :)

public interface MethodCall<T> extends Request<T> {

    String name();

    long timestamp();

    String objectName();
}

None of this is tied to HTTP per se. You could use the same concepts on top of RabbitMQ, or JMS. QBit supports HTTP and Websocket via Vertx.

Hmmmm...

You can intercept method calls over queues as follows:

public interface BeforeMethodCall {

    boolean before(MethodCall call);
}

And

public interface AfterMethodCall {


    boolean after(MethodCall call, Response response);
}

You can intercept calls before or after encoding.
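As an illustration of the before/after idea, the sketch below wraps a call with two hooks. It assumes that a `false` return means "stop processing", which this page does not actually specify. `BeforeCall`, `AfterCall`, and `InterceptorSketch` are local, hypothetical types with simplified signatures, not QBit's interfaces.

```java
import java.util.function.Function;

/** Local stand-in; returns false to veto the call (an assumption). */
interface BeforeCall {
    boolean before(String methodName, Object argument);
}

/** Local stand-in; returns false to suppress the response (an assumption). */
interface AfterCall {
    boolean after(String methodName, Object argument, Object result);
}

class InterceptorSketch {

    /**
     * Runs the before hook, then the target, then the after hook.
     * Returns null when either hook says no.
     */
    static Object call(String methodName, Object argument,
                       Function<Object, Object> target,
                       BeforeCall before, AfterCall after) {
        if (!before.before(methodName, argument)) {
            return null; // vetoed: the target never runs
        }
        Object result = target.apply(argument);
        return after.after(methodName, argument, result) ? result : null;
    }
}
```

A before hook like this is a natural place for checks such as authentication, and an after hook for logging or metrics.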

Why are you still reading this? Go download it and play around. :)
