
Using Consul from QBit, the Java, JSON, WebSocket and REST microservice library, for health monitoring, clustering and service discovery

QBit is a microservice framework focused on making it easy to write and develop services in Java that can be exposed via WebSocket/JSON and REST/JSON interfaces.

Consul is a service discovery system that provides a microservice style interface to services, service topology and service health.

Early steps have been taken to integrate QBit with Consul.

Consul

Consul provides service discovery, health monitoring and config services.

With service discovery you can look up services, which are organized in the topology of your datacenters. Consul uses client agents and Raft to provide a consistent view of services, and it provides a consistent view of configuration as well, also using Raft. Consul provides a microservice interface to a replicated view of your service topology and its configuration, and it can monitor and change the service topology based on the health of individual nodes.

Consul provides scalable, distributed health checks. Consul does only minimal datacenter-to-datacenter communication, so each datacenter has its own Consul cluster. Consul provides a domain model for managing the topology of datacenters, server nodes, and the services running on those nodes, along with their configuration and current health status.

Consul is like combining the features of a DNS server, a consistent key/value store like etcd, the service discovery features of ZooKeeper, and health monitoring like Nagios, all rolled up into one consistent system. Essentially, Consul is all the bits you need to have a coherent domain service model that provides service discovery, health monitoring, replicated config, service topology and health status. Consul also provides a nice REST interface and web UI to see your service topology and distributed service config.

Consul organizes your services in a Catalog called the Service Catalog and then provides a DNS and REST/HTTP/JSON interface to it.

To use Consul you start up an agent process. The Consul agent is a long-running daemon that runs on every member of the Consul cluster, and it can run in server mode or client mode. Agents in client mode run on every physical server or virtual machine that hosts services. The clients use gossip and RPC calls to stay in sync with Consul.

A client, a Consul agent running in client mode, forwards requests to a server, a Consul agent running in server mode. Clients are mostly stateless. The client uses LAN gossip with the server nodes to communicate changes.

A server, a Consul agent running in server mode, is like a client agent but with more responsibilities. The Consul servers use the Raft quorum mechanism to determine who is the leader, and they maintain cluster state such as the Service Catalog. The leader manages a consistent view of config key/value pairs, service health and topology. Consul servers also handle WAN gossip to other datacenters, forward queries to the leader, and forward queries to other datacenters.

A datacenter is fairly obvious: it is anything that allows for fast communication between nodes, with few or no hops, little or no routing, and in short, high-speed communication. This could be an Amazon EC2 availability zone, a networking environment like a subnet, or any private, low latency, high bandwidth network. You can decide exactly what datacenter means for your organization.

Consul server nodes use consensus to agree on who is the leader. They use a transactional finite state machine (FSM) to make sure all server nodes stay in lock step on critical tasks. The building blocks are a replicated log, the FSM, a peer set (who receives the replicated log), a quorum (a majority of peers agree), committed entries, and a leader. The end result is a consistent view of configuration, live services and their topology.

Consul is built on top of Serf. Serf provides a full gossip protocol along with membership, failure detection and event broadcast mechanisms, and it provides the clustering for the server nodes. Consul uses LAN gossip for clients and servers in the same local network or datacenter, and WAN gossip between servers only.

With Consul you run three to five servers per datacenter; with five servers, a quorum of three lets the cluster tolerate two server failures. WAN gossip is used to communicate over the Internet or other wide area networks. Consul also provides an RPC (remote procedure call) mechanism, a request/response mechanism that allows a client to ask a server to, for example, join a cluster or leave a cluster.

Consul client agents maintain their own set of service and check registrations and health information. Clients also update health checks and local state.

Local agent state (on a client node) is different from catalog state (on the server nodes). The local agent notifies the catalog of changes, and updates are synced right away from the client agent to the Service Catalog, which is stored in the cluster of Consul servers.

The local agents (agents running in client mode) check that their view of the world matches the catalog, and if not, the catalog is updated. For example, when a client registers a new service check with a client agent, the client agent notifies the Consul servers to update the catalog so that it knows this service check exists. When a check is removed from the local client agent, it is removed from the catalog held by the cluster of Consul servers.

If an agent runs health checks and a status changes, an update is sent to the catalog. The agent is the authority for its node (client or server) and for the services that exist on that node; the agent's scope is the server or virtual machine running the agent. The catalog's scope is a single datacenter, local area network or EC2 availability zone. Changes are synchronized when they are made and also periodically, every minute to every ten minutes depending on the size of the cluster.

Consul provides the following REST endpoints for interacting with Consul:

  • kv - key/value store
  • agent - API for dealing with the agent
  • catalog - dealing with the datacenter catalog
  • health - show health checks
  • sessions - group operations and manage a consistent view
  • events - fire fast gossip-based events
  • acl - set up access control lists and security for Consul
  • status - check status

Each endpoint provides operations that take JSON or request params and deliver JSON responses. In addition to configuring Consul programmatically via REST, you can use JSON config files stored locally to bootstrap config and service topology.
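As a quick, minimal sketch of the REST interface (raw HTTP, not part of QBit), the following assumes a Consul agent running locally on the default port 8500 and calls the catalog endpoint's /v1/catalog/services operation, which returns a JSON map of service names to their tags.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class CatalogServicesSketch {

        public static void main(String[] args) throws Exception {
            // Ask the local agent's catalog endpoint which services it knows about.
            URL url = new URL("http://localhost:8500/v1/catalog/services");
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");

            // Print the raw JSON response; normally you would hand this to a JSON parser.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);   // e.g. {"consul":[],"eventBus":["someTag"]}
                }
            }
        }
    }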

Consul provides several kinds of health checks. The most common is the dead man switch, or time to live (TTL) health check: the service has to dial in with a status update every X time period. Another is the HTTP check, which pings an endpoint every N time period; an HTTP code of 200-299 means PASS, HTTP code 429 means WARN, and any other HTTP status code means FAIL. Lastly, you can run a script periodically with the client agent (or server agent): if the process returns 0 the check passes, a return value of 1 is considered a warning, and any other return value is considered a FAIL.
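To make the dead man switch concrete, here is a minimal sketch (again raw HTTP rather than the QBit client) that registers a TTL check with a local agent on the default port 8500 and then checks in before the TTL expires. The check id and TTL value are made up for the example; /v1/agent/check/register and /v1/agent/check/pass are Consul's agent endpoints, though the HTTP method they expect has varied across Consul versions.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class TtlCheckSketch {

        public static void main(String[] args) throws Exception {
            // 1. Register a TTL check: if we do not check in within 15s, the check fails.
            String checkJson = "{\"ID\":\"my-service-check\",\"Name\":\"my-service\",\"TTL\":\"15s\"}";
            HttpURLConnection register = (HttpURLConnection)
                    new URL("http://localhost:8500/v1/agent/check/register").openConnection();
            register.setRequestMethod("PUT");
            register.setDoOutput(true);
            try (OutputStream out = register.getOutputStream()) {
                out.write(checkJson.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("register: HTTP " + register.getResponseCode());

            // 2. Dial in before the TTL expires so the check stays in the passing state.
            HttpURLConnection pass = (HttpURLConnection)
                    new URL("http://localhost:8500/v1/agent/check/pass/my-service-check").openConnection();
            pass.setRequestMethod("PUT");
            pass.setDoOutput(true);
            pass.getOutputStream().close();   // empty body; the check-in itself is what matters
            System.out.println("pass: HTTP " + pass.getResponseCode());
        }
    }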

QBit Microservice Library for Java using Consul

QBit is an actor/reactive-style Java microservice engine that provides an event bus, REST and WebSocket support using JSON.

QBit added the ability to cluster servers running QBit with Consul for its event bus.

At first we started looking at Consul Client from Orbitz. This seemed to be the most complete Java client API for Consul and the one most like the kind we would write. After some further investigation and trial and error, we decided that adding 13+ extra jar files for JSON parsing and HTTP calls was a bit much for a framework like QBit, which already does JSON parsing, marshaling and HTTP calls. Therefore we forked the code, ripped out the HTTP lib and the JSON parser it was using (and a few other random things) and used the QBit equivalents. There was nothing wrong with the libs the Consul Client lib was using; we just did not want an extra 13 jar files for something so intrinsic to QBit. You can find the QBit version of Consul Client as a subproject of QBit, the microservice, JSON, WebSocket, REST library for Java.

QBit already has an event bus. It can be remote via WebSocket and REST, and it can be replicated via WebSocket. But the QBit event bus mostly requires programmatic configuration. You could, in theory, have a config file and configure nodes based on it. This is basically what we are doing, but we are using the Consul Client lib for it.

Prior to the integration with Consul, QBit had an EventRemoteReplicatorService and an EventBusReplicationClientBuilder, which configured a remote EventConnector that talks to the EventRemoteReplicatorService. The EventRemoteReplicatorService is a WebSocket endpoint for method invocations and is itself an EventConnector. The EventConnector interface defines a method for passing events between event buses. In the core of QBit there is the EventConnectorHub, which takes a collection of EventConnectors and provides a single EventConnector interface for the entire collection. An EventManager manages an EventBus, and it takes an EventConnector, which can be an EventConnectorHub.
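As a rough picture of that relationship, here is a simplified, hypothetical sketch; the names SimpleEventConnector and SimpleEventConnectorHub are made up for illustration and are not QBit's actual API, but they capture the idea of a hub that looks like a single connector while fanning events out to a collection of connectors.

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    // Hypothetical stand-in for an EventConnector: forwards an event to some other bus.
    interface SimpleEventConnector {
        void forwardEvent(String channel, Object body);
    }

    // Hypothetical stand-in for an EventConnectorHub: a collection of connectors that
    // still presents the single-connector interface.
    class SimpleEventConnectorHub implements SimpleEventConnector {

        private final List<SimpleEventConnector> connectors = new CopyOnWriteArrayList<>();

        void add(SimpleEventConnector connector)    { connectors.add(connector); }
        void remove(SimpleEventConnector connector) { connectors.remove(connector); }

        @Override
        public void forwardEvent(String channel, Object body) {
            // Replicate the event to every member of the hub.
            for (SimpleEventConnector connector : connectors) {
                connector.forwardEvent(channel, body);
            }
        }
    }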

To this end we added an EventBusRing, which talks to Consul and periodically adds or removes members from an EventConnectorHub based on changes in the service topology reported by Consul.

EventBusRingBuilder is used to construct an EventBusRing. You can pass the EventBusRingBuilder an existing EventManager, or it can create a new one. With the EventBusRingBuilder you can specify where Consul is located, how often we should poll Consul, how long the long poll to Consul for topology changes should be, and how often we should ping Consul to say that this event bus node is still alive, as well as set the name of the event bus.

EventBusRing

public class EventBusRing implements Startable, Stoppable {

    …
    private AtomicReference<Consul> consul = new AtomicReference<>();
    
    private AtomicInteger lastIndex = new AtomicInteger();


    public EventBusRing(…) {
        this.consul.set(Consul.consul(consulHost, consulPort));

In the start method, we start the local service queue and we start Consul, which starts the QBit HttpClient class that talks REST/JSON to Consul. We also start the replicator service by calling startServerReplicator, which exposes a WebSocket endpoint that other instances running EventBusRing can talk to, i.e., it starts an EventRemoteReplicatorService microservice that speaks WebSocket and the QBit/JSON wire protocol. Then we register the local event bus with Consul by calling registerLocalBusInConsul.

EventBusRing start

    public void start() {

        consul.get().start();

        if (eventServiceQueue != null) {
            eventServiceQueue.start();
        }

        startServerReplicator();

        registerLocalBusInConsul();

        healthyNodeMonitor = periodicScheduler.repeat(this::healthyNodeMonitor, peerCheckTimeInterval, peerCheckTimeUnit);

        if (replicationServerCheckInIntervalInSeconds > 2) {
            consulCheckInMonitor  = periodicScheduler.repeat(this::checkInWithConsul, replicationServerCheckInIntervalInSeconds / 2, TimeUnit.SECONDS);
        } else {
            consulCheckInMonitor  = periodicScheduler.repeat(this::checkInWithConsul, 100, TimeUnit.MILLISECONDS);
        }
    }

Notice that we start up two sets of scheduled tasks using the QBit periodicScheduler, which is a timer for scheduling tasks: we periodically call healthyNodeMonitor and checkInWithConsul. The healthyNodeMonitor method does a long poll against Consul to check for changes to the list of healthy services. The checkInWithConsul method uses Consul's dead man switch (time to live health check) to notify Consul that this service, this event bus, is still alive.

The registerLocalBusInConsul method calls the Consul agent endpoint to register this service with Consul.

EventBusRing registerLocalBusInConsul

    private void registerLocalBusInConsul() {
        consul.get().agent().registerService(replicationPortLocal, 
                replicationServerCheckInIntervalInSeconds, eventBusName, localEventBusId, tag);
    }

The default eventBusName is “eventBus”. The default id (localEventBusId) is “eventBus-” plus a UUID. The tag is settable via the builder, as is the event bus name. The replicationServerCheckInIntervalInSeconds is settable via the builder (EventBusRingBuilder) and specifies how many seconds can pass without a check-in before Consul considers this eventBus service dead.
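For illustration only (the exact construction inside QBit may differ), the default id described above amounts to something like the following.

    import java.util.UUID;

    public class EventBusIdSketch {
        public static void main(String[] args) {
            String eventBusName = "eventBus";                                 // default bus name
            String localEventBusId = eventBusName + "-" + UUID.randomUUID();  // "eventBus-" + UUID
            System.out.println(localEventBusId);                              // e.g. eventBus-3f2a...
        }
    }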

The healthyNodeMonitor is used to periodically get a list of healthy event bus nodes from Consul.

EventBusRing healthyNodeMonitor

    private void healthyNodeMonitor() {

        try {
            rebuildHub(getHealthyServices());
        } catch (Exception ex) {
            logger.error("unable to contact consul or problems rebuilding event hub", ex);
            Consul oldConsul = consul.get();
            consul.compareAndSet(oldConsul, startNewConsul(oldConsul));
        }

    }

The healthyNodeMonitor calls getHealthyServices and passes the results of getHealthyServices to rebuildHub.

The method getHealthyServices uses the Consul REST health endpoint to get a list of healthy service nodes from Consul.

EventBusRing getHealthyServices

    private List<ServiceHealth> getHealthyServices() {
        final ConsulResponse<List<ServiceHealth>> consulResponse = consul.get().health()
                .getHealthyServices(eventBusName, datacenter, tag, requestOptions);
        this.lastIndex.set(consulResponse.getIndex());

        final List<ServiceHealth> healthyServices = consulResponse.getResponse();

        if (debug) {
            showHealthyServices(healthyServices);
        }
        buildRequestOptions();
        return healthyServices;
    }

We also have to periodically check in our current node with Consul so it is not marked unhealthy. As you will recall, this is done by the periodic task backed by the checkInWithConsul method.

EventBusRing checkInWithConsul

    private void checkInWithConsul() {
        try {
            consul.get().agent().pass(localEventBusId, "still running");
        } catch (NotRegisteredException ex) {
            registerLocalBusInConsul();
        } catch (Exception ex) {
            Consul oldConsul = consul.get();
            consul.compareAndSet(oldConsul, startNewConsul(oldConsul));
        }
    }

The checkInWithConsul method calls the pass operation of the Consul agent REST endpoint to notify Consul that this service node is still passing. If Consul does not know the service exists, then we re-register it. If we get any other exception, then we drop our Consul instance and create a new one.

To test this, I run a bunch of these:

EventBusRingBuilderTest

    EventBusRing eventBusRing;

    int port = 0;

    public void setup() {
        //grabs the first free local port in this range
        port = useOneOfThePortsInThisRange(9000, 9900);
        EventBusRingBuilder eventBusRingBuilder = eventBusRingBuilder();
        eventBusRingBuilder.setConsulHost("localhost");
        eventBusRingBuilder.setReplicationPortLocal(port);

        eventBusRing = eventBusRingBuilder.build();
        eventBusRing.start();
    }



    public void test() {

        eventBusRing.eventManager().register("mom", new EventConsumer<Object>() {
            @Override
            public void listen(Event<Object> event) {
                puts(event.channel(), event.body());
            }
        });

        for (int index = 0; index < 100; index++) {
            Sys.sleep(1000);
            eventBusRing.eventManager().send("mom", "hi mom " + port);


        }

    }


    public static void main(String... args) {

        EventBusRingBuilderTest test = new EventBusRingBuilderTest();
        test.setup();
        test.test();

    }

Then I start dropping them. I can also open up the Consul UI and take a peek at what is healthy and what is not.

This is just the start. We plan on integrating with other similar frameworks, and perhaps even abstracting some of the clustering details of Consul behind a common clustering interface. The etcd project is on our list and was in fact our first attempt, but Consul seemed closer to what we were trying to build on top of etcd, so it seemed like a natural first pick. Expect more integration between QBit and Consul.
