No description, website, or topics provided.
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
licenses
sbin
src
.arcconfig
.arclint
.gitignore
.travis.yml
LICENSE.md
README.md
pom.xml

README.md

Protobuf-ActiveMQ-RPC

![Build Status] (http://img.shields.io/travis/MediaMiser/protobuf-activemq-rpc/master.svg?style=flat-square) ![Coverage Status] (http://img.shields.io/coveralls/MediaMiser/protobuf-activemq-rpc/master.svg?style=flat-square) ![Maven Central] (https://maven-badges.herokuapp.com/maven-central/com.mediamiser/protobuf-activemq-rpc/badge.svg?style=flat-square)

This library provides the classes necessary to create RPC clients and servers defined as [Google Protocol Buffer Services] (https://developers.google.com/protocol-buffers/docs/proto#services) using Apache ActiveMQ to communicate and act as a fault-tolerant message broker so that clients and servers do not need to communicate directly. View the [API documentation] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/index.html) for more details.

This project was inspired/based in small part upon the protobuf-socket-rpc (MIT License) and protobuf-rpc-pro (Apache License 2.0) projects.

Requirements

To build this project, your development environment must have installed:

This project is designed to be used with:

Usage

To use this in your project, you should have an [Apache ActiveMQ] (http://activemq.apache.org/) cluster to connect to and add the following dependency to your project's pom.xml file:

<dependency>
  <groupId>com.mediamiser</groupId>
  <artifactId>protobuf-activemq-rpc</artifactId>
  <version>1.0.6</version>
</dependency>

RPCs and their input/output messages are defined in a language-agnostic manner using Google Protocol Buffers . An example can be found in tests as [src/test/proto/pings.proto] (src/test/proto/pings.proto).

package com.mediamiser.service;

option java_generic_services = true;
option java_package = "com.mediamiser.service";
option java_outer_classname = "PingsProtocol";

message Host {
  required string ip = 1;
  optional string hostname = 2;
}

message Pings {
  repeated uint32 time_ms = 1;
}

service PingsServiceV1 {
  rpc ping(Host) returns (Pings);
}

...

This proto file can be compiled using Maven and [maven-protoc-plugin] (http://sergei-ivanov.github.io/maven-protoc-plugin/index.html) (see pom.xml for example usage). These plugins require that the protoc compiler is executable from your shell and on your $PATH (on Ubuntu you can install it using [sudo install-protobuff.sh] (sbin/install-prodobuff.sh)). Running mvn compile will generate a PingsProtocol.java file providing messages and RPC stubs for PingsServiceV1.

Useful links

When writing your RPCs and messages, take into account the protobuf:

Client

A client that includes this library and has built the proto files for PingsServiceV1 can try to reach a server hosting this service using the code below. A full client is demonstrated in [ExampleClient.java] (src/test/java/com/mediamiser/service/ExampleClient.java).

Note that [ActiveMqChannel] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqChannel.html) is a heavyweight object and it should be recycled if possible e.g. by using an [object pool] (https://commons.apache.org/proper/commons-pool/).

// Connect to an ActiveMQ broker (in this case a local one on your development
// machine)
final Connection connection =
    new ActiveMQConnectionFactory("tcp://localhost:61616")
        .createConnection();

// Start the connection to allow it to be used to send RPC calls
connection.start();

// Specify which services are available through this ActiveMQ broker
final Set<ServiceDescriptor> availableServices =
    Sets.newHashSet(PingsProtocol.PingsServiceV1.getDescriptor());

// Specify the time to live for RPC calls in milliseconds.  ActiveMQ and the
// server will drop messages that expire, and clients that are blocking will
// block for this amount of time before reporting failure. This can be set to
// ActiveMqChannel.DISABLE_EXPIRY to disable message expiry for a specific
// client (call forwarders on remote machines all respect each channel's
// individual expiry time).
final long timeToLiveMs = 1000;

// Create a channel to the available services with a specific call expiry time
final ActiveMqChannel channel =
    new ActiveMqChannel(connection, availableServices, timeToLiveMs);

// Create a controller to get status and error information
final RpcController controller = new ActiveMqController();

// Define a request
final Host request = Host.newBuilder()
        .setIp("127.0.0.1")
        .setHostname("42")
        .build();

// Synchronously make 1 call and wait for its completion
final Pings responseTimes =
    PingsProtocol.PingsServiceV1.newBlockingStub(channel)
        .ping(controller, request);

// Clean up resources
channel.close();
connection.close();

Server

A server can create a provider for a service and extends the automatically generated PingsServiceV1 class. Providers are instances which implement remote procedures and they are executed by [ActiveMqCallForwarder] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqCallForwarder.html) instances (each of which can be thought of as a thread). Providers should be thread safe if you are going to share one provider instance with many forwarders.

// Copyright (c) 2014 MediaMiser Ltd. All rights reserved.
package com.mediamiser.service.providers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.mediamiser.service.PingsProtocol;
import com.mediamiser.service.PingsProtocol.Host;
import com.mediamiser.service.PingsProtocol.Pings;

/**
 * This provider functions properly, but returns the requested hostname as a
 * response time to help indicate that the correct response was delivered to
 * the correct caller.
 *
 * @author Chris Fournier <chris.fournier@mediamiser.com>
 */
public class PingsServiceProvider extends PingsProtocol.PingsServiceV1 {
    private static final Logger LOG =
        LoggerFactory.getLogger(PingsServiceProvider.class);

    @Override
    public void ping(final RpcController controller,
                     final Host request,
                     final RpcCallback<Pings> done) {
        LOG.trace("Got request to ping {} ({})",
            request.getIp(),
            request.getHostname());

        // Add fake times (in this case, the hostname, so that we can make sure
        // that the correct message got sent back to the correct caller during
        // tests)
        final Pings.Builder responseTimes = Pings.newBuilder();
        responseTimes.addTimeMs(Integer.parseInt(request.getHostname()));

        // Return the response times via the callback
        done.run(responseTimes.build());
    }

}

This provider alone will not serve requests; it must be connect to at least one [ActiveMqCallForwarder] (https://mediamiser.github.io/protobuf-activemq-rpc/apidocs/com/mediamiser/service/ActiveMqCallForwarder.html) which has a live connection to an ActiveMQ broker (as demonstrated superficially below and fully in [ExampleServer.java] (src/test/java/com/mediamiser/service/ExampleServer.java)).

// Connect to ActiveMQ
final Connection connection =
    new ActiveMQConnectionFactory("tcp://localhost:61616")
        .createConnection();

// Construct services to host
final Set<Service> availableServices =
    Sets.newHashSet(new PingsServiceProvider());

// Create a forwarder (akin to a thread) to host this service and store it
final ActiveMqCallForwarder forwarder =
    new ActiveMqCallForwarder(connection, availableServices);

// Begin processing calls
connection.start();

...

// Cleanup resources
connection.stop();
forwarder.close();
connection.close();

Development

Feel free to report an issue or submit a pull-request. Changes can be tested locally using:

mvn clean test

Licensing

Protobuf-ActiveMQ-RPC is licensed under the BSD 3-Clause license.