[Detailed Tutorial] Using event channels and strongly typed event bus with QBit (The employee example)
##Overview
An event channel is both the interface a producer uses to send an event and the interface a listener implements to receive it. An advantage of this strongly typed approach is that you can use the IDE to quickly find what implements a channel method (the event listeners) and what calls it (the event producers); note that the call itself is asynchronous. QBit event channels are part of QBit's event bus system, which uses a microservice approach for communication, Consul for clustering, and JSON and WebSocket for high-speed replication that is reader tolerant.
This wiki page walks you through a simple "employee example" that demonstrates how to use QBit's event channels.
You will build an example with four services. When a new employee is hired, the hiring service announces it, and the other three services add the employee to the payroll system, enroll the employee in the benefits system, and invite the employee to our community outreach program.
This example is very similar to [[Quick Start] Using QBit's Event Bus System (The Employee example)](https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Using-QBit's-Event-Bus-System-(The-Employee-example)), but this time we use event channels to send events instead of the plain event bus system. When you run this example you will get the following:
Hired employee Employee{firstName='Rick', employeeId=1}
Employee will be invited to the community outreach program Rick 1
DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll Rick 1 100
Employee enrolled into benefits system employee Rick 1
To complete this example successfully you will need the following installed on your machine:
- Gradle; if you need help installing it, visit Installing Gradle.
- Your favorite IDE or text editor (we recommend the latest version of [IntelliJ IDEA](https://www.jetbrains.com/idea/)).
- [JDK](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) 1.8 or later.
- QBit, built and installed on your machine; see [Building QBit](https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Building-QBit-the-microservice-lib-for-Java) for instructions.
Now that your machine is ready, let's get started:
- [Download](https://github.com/fadihub/event-channels-qbit/archive/master.zip) and unzip the source repository for this guide, or clone it using Git:
https://github.com/fadihub/event-channels-qbit.git
Once this is done you can test the service. First, let's walk through how it works:
This example has four services: EmployeeHiringService, BenefitsService, VolunteerService, and PayrollService:
public static class EmployeeHiringService {
final EmployeeEventManager eventManager;
final SalaryChangedChannel salaryChangedChannel;
public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
final SalaryChangedChannel salaryChangedChannel) {
this.eventManager = employeeEventManager;
this.salaryChangedChannel = salaryChangedChannel;
}
@QueueCallback(QueueCallbackType.EMPTY)
private void noMoreRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
@QueueCallback(QueueCallbackType.LIMIT)
private void hitLimitOfRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
public void hireEmployee(final Employee employee) {
int salary = 100;
System.out.printf("Hired employee %s\n", employee);
//Does stuff to hire employee
eventManager.sendNewEmployee(employee);
salaryChangedChannel.salaryChanged(employee, salary);
}
}
public static class BenefitsService {
@OnEvent(NEW_HIRE_CHANNEL)
public void enroll(final Employee employee) {
System.out.printf("Employee enrolled into benefits system employee %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class VolunteerService {
@OnEvent(NEW_HIRE_CHANNEL)
public void invite(final Employee employee) {
System.out.printf("Employee will be invited to the community outreach program %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class PayrollService implements SalaryChangedChannel{
@Override
public void salaryChanged(Employee employee, int newSalary) {
System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll %s %d %d\n",
employee.getFirstName(), employee.getEmployeeId(), newSalary);
}
}
As you can see, every service deals with an employee in some way. The Employee constructor looks like this:
public Employee(String firstName, int employeeId) {
this.firstName = firstName;
this.employeeId = employeeId;
}
Here are the employee's getters and its toString method:
public String getFirstName() {
return firstName;
}
public int getEmployeeId() {
return employeeId;
}
@Override
public String toString() {
return "Employee{" +
"firstName='" + firstName + '\'' +
", employeeId=" + employeeId +
'}';
}
This example also has three interfaces: EmployeeHiringServiceClient, SalaryChangedChannel, and EmployeeEventManager:
interface EmployeeHiringServiceClient {
void hireEmployee(final Employee employee);
}
@EventChannel
interface SalaryChangedChannel {
void salaryChanged(Employee employee, int newSalary);
}
interface EmployeeEventManager {
@EventChannel(NEW_HIRE_CHANNEL)
void sendNewEmployee(Employee employee);
}
SalaryChangedChannel and EmployeeEventManager are the event channel interfaces; EmployeeHiringServiceClient is the client interface used to call the hiring service through its service queue. Each interface declares one method: hireEmployee, salaryChanged, and sendNewEmployee. For a channel, the method name is the thing that happened; it is the event. The @EventChannel annotation can be passed a channel name, but if it is not passed a name then the name becomes the fully qualified class name of the interface.
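The naming rule can be illustrated with a small plain-Java sketch. It is not QBit code: the `Channel` annotation, the `channelNameOf` helper, and the channel name `com.example.employee.new` are hypothetical stand-ins, and the sketch only looks at the interface-level annotation. It shows the fallback described above, where an unnamed channel takes the fully qualified name of its interface.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ChannelNameSketch {

    /** Hypothetical stand-in for QBit's @EventChannel; not the real annotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface Channel {
        String value() default "";
    }

    /** No explicit name: the channel name falls back to the interface name. */
    @Channel
    public interface SalaryChangedChannel {
        void salaryChanged(String employeeName, int newSalary);
    }

    /** Explicit name: the annotation value is used as the channel name. */
    @Channel("com.example.employee.new")
    public interface NewHireChannel {
        void sendNewEmployee(String employeeName);
    }

    /** Returns the explicit name if one was given, else the fully qualified interface name. */
    public static String channelNameOf(Class<?> channelInterface) {
        Channel channel = channelInterface.getAnnotation(Channel.class);
        if (channel == null) {
            throw new IllegalArgumentException(channelInterface + " is not annotated with @Channel");
        }
        return channel.value().isEmpty() ? channelInterface.getName() : channel.value();
    }

    public static void main(String... args) {
        System.out.println(channelNameOf(SalaryChangedChannel.class));
        System.out.println(channelNameOf(NewHireChannel.class));
    }
}
```

Relying on the default name keeps producer and listener in sync automatically, while an explicit name is useful when listeners subscribe by string, as the @OnEvent listeners in this example do.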
The EmployeeHiringService is the service that actually fires off the events to the other services, and it uses the channels to send them. The other three services are listening, as you will see later. We will invoke a method on the sender service, and it will send messages over the bus that the three other services are listening to.
public static class EmployeeHiringService {
final EmployeeEventManager eventManager;
final SalaryChangedChannel salaryChangedChannel;
public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
final SalaryChangedChannel salaryChangedChannel) {
this.eventManager = employeeEventManager;
this.salaryChangedChannel = salaryChangedChannel;
}
@QueueCallback(QueueCallbackType.EMPTY)
private void noMoreRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
@QueueCallback(QueueCallbackType.LIMIT)
private void hitLimitOfRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
public void hireEmployee(final Employee employee) {
int salary = 100;
System.out.printf("Hired employee %s\n", employee);
//Does stuff to hire employee
eventManager.sendNewEmployee(employee);
salaryChangedChannel.salaryChanged(employee, salary);
}
}
Above, hireEmployee hires a new employee and sets the employee's salary, sending one event on each of two channels: eventManager and salaryChangedChannel.
Next we have two services, BenefitsService and VolunteerService, that listen via the @OnEvent(NEW_HIRE_CHANNEL) annotation:
public static class BenefitsService {
@OnEvent(NEW_HIRE_CHANNEL)
public void enroll(final Employee employee) {
System.out.printf("Employee enrolled into benefits system employee %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class VolunteerService {
@OnEvent(NEW_HIRE_CHANNEL)
public void invite(final Employee employee) {
System.out.printf("Employee will be invited to the community outreach program %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
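To make annotation-based listening more concrete, here is a minimal plain-Java sketch of how a bus could discover and call annotated listener methods. It is not QBit's implementation: the `Listen` annotation and the `ToyBus` class are hypothetical stand-ins for @OnEvent and the event bus, the employee is reduced to a name, and delivery is synchronous rather than queued.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class OnEventSketch {

    public static final String NEW_HIRE_CHANNEL = "com.example.employee.new";

    /** Hypothetical stand-in for QBit's @OnEvent. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Listen {
        String value();
    }

    /** Toy synchronous bus: maps a channel name to the listeners subscribed to it. */
    public static class ToyBus {
        private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

        /** Scans the service for @Listen methods and subscribes each one. */
        public void join(Object service) {
            for (Method method : service.getClass().getDeclaredMethods()) {
                Listen listen = method.getAnnotation(Listen.class);
                if (listen == null) {
                    continue;
                }
                method.setAccessible(true);
                listeners.computeIfAbsent(listen.value(), key -> new ArrayList<>())
                        .add(event -> invoke(service, method, event));
            }
        }

        /** Delivers the event to every listener on the channel; other channels are untouched. */
        public void send(String channel, Object event) {
            for (Consumer<Object> listener : listeners.getOrDefault(channel, Collections.emptyList())) {
                listener.accept(event);
            }
        }

        private static void invoke(Object target, Method method, Object event) {
            try {
                method.invoke(target, event);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    public static class BenefitsService {
        public final List<String> enrolled = new ArrayList<>();

        @Listen(NEW_HIRE_CHANNEL)
        public void enroll(String employeeName) {
            enrolled.add(employeeName);
        }
    }

    public static void main(String... args) {
        ToyBus bus = new ToyBus();
        BenefitsService benefits = new BenefitsService();
        bus.join(benefits);
        bus.send(NEW_HIRE_CHANNEL, "Rick");
        System.out.println(benefits.enrolled);
    }
}
```

The listener only shares a channel name with the producer, so the compiler cannot check that the method signature matches what the producer sends. That is the gap the strongly typed channel interface closes.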
The third service, PayrollService, listens by implementing the channel interface as follows:
public static class PayrollService implements SalaryChangedChannel{
@Override
public void salaryChanged(Employee employee, int newSalary) {
System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll %s %d %d\n",
employee.getFirstName(), employee.getEmployeeId(), newSalary);
}
}
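The idea that one interface serves both the sender and the receiver can be sketched in plain Java with a dynamic proxy. This is only an illustration under simplifying assumptions, not how QBit is implemented: `createChannel` is a hypothetical helper, the employee is reduced to a name, and the call is forwarded synchronously instead of going over the event bus.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TypedChannelSketch {

    /** The channel: producers call it, listeners implement it. */
    public interface SalaryChangedChannel {
        void salaryChanged(String employeeName, int newSalary);
    }

    /** Listener side: implements the channel interface, like PayrollService. */
    public static class PayrollService implements SalaryChangedChannel {
        public final List<String> payroll = new ArrayList<>();

        @Override
        public void salaryChanged(String employeeName, int newSalary) {
            payroll.add(employeeName + " " + newSalary);
        }
    }

    /** Producer side: a proxy with the same interface that forwards each call to every listener. */
    public static <T> T createChannel(Class<T> channelInterface, List<? extends T> listeners) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString, equals and hashCode are answered by the listener list itself.
                return method.invoke(listeners, args);
            }
            for (T listener : listeners) {
                method.invoke(listener, args);
            }
            return null; // channel methods return void
        };
        return channelInterface.cast(Proxy.newProxyInstance(
                channelInterface.getClassLoader(), new Class<?>[]{channelInterface}, handler));
    }

    public static void main(String... args) {
        PayrollService payroll = new PayrollService();
        SalaryChangedChannel channel =
                createChannel(SalaryChangedChannel.class, Collections.singletonList(payroll));
        channel.salaryChanged("Rick", 100);
        System.out.println(payroll.payroll);
    }
}
```

Because producer and listener share the `salaryChanged` signature, a change to the method is caught at compile time on both sides, and the IDE can list callers and implementers of the same method.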
Now, to test this example, we need to wire up the services. This is done as follows:
- Create the private event bus and the channels:
/* Create your own private event bus. */
EventManager privateEventBus =
EventManagerBuilder.eventManagerBuilder()
.setName("foo").build();
final EventBusProxyCreator eventBusProxyCreator =
QBit.factory().eventBusProxyCreator();
final EmployeeEventManager employeeEventManager =
eventBusProxyCreator.createProxy(privateEventBus, EmployeeEventManager.class);
final SalaryChangedChannel salaryChangedChannel = eventBusProxyCreator.createProxy(privateEventBus, SalaryChangedChannel.class);
- Create an EmployeeHiringService and pass it the two channel proxies, which send over the private event bus:
EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManager, salaryChangedChannel);
- Create the other service POJOs; note they have no compile time dependencies on QBit:
PayrollService payroll = new PayrollService();
BenefitsService benefits = new BenefitsService();
VolunteerService volunteering = new VolunteerService();
- Turn the services into QBit services by wrapping each one in a service queue:
/* Create a service queue for this event bus. */
ServiceQueue privateEventBusServiceQueue = serviceBuilder()
.setServiceObject(privateEventBus)
.setInvokeDynamic(false).build();
/** Employee hiring service. */
ServiceQueue employeeHiringServiceQueue = serviceBuilder()
.setServiceObject(employeeHiring)
.setInvokeDynamic(false).build();
/** Payroll service */
ServiceQueue payrollServiceQueue = serviceBuilder()
.setServiceObject(payroll)
.setInvokeDynamic(false).build();
/** Employee Benefits service. */
ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
.setServiceObject(benefits)
.setInvokeDynamic(false).build();
/* Community outreach program. */
ServiceQueue volunteeringServiceQueue = serviceBuilder()
.setServiceObject(volunteering)
.setInvokeDynamic(false).build();
- Wire the event bus to the services so it can fire off events to the service queues:
/* Now wire in the event bus so it can fire events into the service queues. */
privateEventBus.joinService(payrollServiceQueue);
privateEventBus.joinService(employeeBenefitsServiceQueue);
privateEventBus.joinService(volunteeringServiceQueue);
- Start the QBit services:
privateEventBusServiceQueue.start();
employeeHiringServiceQueue.start();
volunteeringServiceQueue.start();
payrollServiceQueue.start();
employeeBenefitsServiceQueue.start();
- Create the EmployeeHiringServiceClient proxy so we can hire a new employee, which fires off the events to the other services:
/** Now create the service proxy like before. */
EmployeeHiringServiceClient employeeHiringServiceClientProxy =
employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);
- And hire the new employee using the proxy:
/** Call the hireEmployee method which triggers the other events. */
employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));
- And finally flush the proxy:
flushServiceProxy(employeeHiringServiceClientProxy);
Calls made on the client proxy are batched, so every so often we have to flush them.
The client proxy flushes calls automatically every time the queue batch size is met. So if the queue batch size were set to 5, it would flush every five calls. Either way, when you are done making calls you should flush the proxy so that any calls still waiting in a partial batch are sent.
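The batching behaviour described here can be sketched in plain Java. This is a toy model under stated assumptions, not QBit's proxy: `BatchingSender` is a hypothetical class, calls are represented as strings, and the receiver is a simple callback. It shows why a final flush is needed when the number of calls is not a multiple of the batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingProxySketch {

    /** Buffers calls and hands them to the receiver in batches. */
    public static class BatchingSender<T> {
        private final int batchSize;
        private final Consumer<List<T>> receiver;
        private List<T> pending = new ArrayList<>();

        public BatchingSender(int batchSize, Consumer<List<T>> receiver) {
            this.batchSize = batchSize;
            this.receiver = receiver;
        }

        /** Queues one call; flushes automatically once the batch size is reached. */
        public void send(T call) {
            pending.add(call);
            if (pending.size() >= batchSize) {
                flush();
            }
        }

        /** Delivers whatever is pending, even if the batch is not full. */
        public void flush() {
            if (pending.isEmpty()) {
                return;
            }
            List<T> batch = pending;
            pending = new ArrayList<>();
            receiver.accept(batch);
        }
    }

    public static void main(String... args) {
        List<List<String>> delivered = new ArrayList<>();
        BatchingSender<String> sender = new BatchingSender<>(5, delivered::add);
        for (int i = 1; i <= 7; i++) {
            sender.send("call-" + i);
        }
        // Five calls went out as one full batch; two are still waiting.
        System.out.println("batches before flush: " + delivered.size());
        sender.flush();
        System.out.println("batches after flush: " + delivered.size());
    }
}
```

In this sketch the last two calls would never reach the receiver without the explicit `flush()`, which mirrors why the tutorial calls flushServiceProxy after hireEmployee.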
##The full code Listing
src/main/java/io.advantageous.qbit.example/EmployeeEventExampleUsingChannelsToSendEvents
/*
* Copyright (c) 2015. Rick Hightower, Geoff Chandler
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
*/
package io.advantageous.qbit.example;
import io.advantageous.qbit.QBit;
import io.advantageous.qbit.annotation.EventChannel;
import io.advantageous.qbit.annotation.OnEvent;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.events.EventBusProxyCreator;
import io.advantageous.qbit.events.EventManager;
import io.advantageous.qbit.events.EventManagerBuilder;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.boon.core.Sys;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
/**
* EmployeeEventExampleUsingChannelsToSendEvents
* Created by rhightower on 2/11/15.
*/
public class EmployeeEventExampleUsingChannelsToSendEvents {
public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
public static void main(String... args) {
/* Create your own private event bus. */
EventManager privateEventBus =
EventManagerBuilder
.eventManagerBuilder().setName("foo")
.build();
/* Create a service queue for this event bus. */
ServiceQueue privateEventBusServiceQueue = serviceBuilder()
.setServiceObject(privateEventBus)
.setInvokeDynamic(false).build();
final EventBusProxyCreator eventBusProxyCreator =
QBit.factory().eventBusProxyCreator();
final EmployeeEventManager employeeEventManager =
eventBusProxyCreator.createProxy(privateEventBus, EmployeeEventManager.class);
final SalaryChangedChannel salaryChangedChannel = eventBusProxyCreator.createProxy(privateEventBus, SalaryChangedChannel.class);
/*
Create your EmployeeHiringService but this time pass the private event bus.
Note you could easily use Spring or Guice for this wiring.
*/
EmployeeHiringService employeeHiring = new EmployeeHiringService(employeeEventManager, salaryChangedChannel);
/* Now create your other service POJOs which have no compile time dependencies on QBit. */
PayrollService payroll = new PayrollService();
BenefitsService benefits = new BenefitsService();
VolunteerService volunteering = new VolunteerService();
/** Employee hiring service. */
ServiceQueue employeeHiringServiceQueue = serviceBuilder()
.setServiceObject(employeeHiring)
.setInvokeDynamic(false).build();
/** Payroll service */
ServiceQueue payrollServiceQueue = serviceBuilder()
.setServiceObject(payroll)
.setInvokeDynamic(false).build();
/** Employee Benefits service. */
ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
.setServiceObject(benefits)
.setInvokeDynamic(false).build();
/* Community outreach program. */
ServiceQueue volunteeringServiceQueue = serviceBuilder()
.setServiceObject(volunteering)
.setInvokeDynamic(false).build();
/* Now wire in the event bus so it can fire events into the service queues. */
privateEventBus.joinService(payrollServiceQueue);
privateEventBus.joinService(employeeBenefitsServiceQueue);
privateEventBus.joinService(volunteeringServiceQueue);
privateEventBusServiceQueue.start();
employeeHiringServiceQueue.start();
volunteeringServiceQueue.start();
payrollServiceQueue.start();
employeeBenefitsServiceQueue.start();
/** Now create the service proxy like before. */
EmployeeHiringServiceClient employeeHiringServiceClientProxy =
employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);
/** Call the hireEmployee method which triggers the other events. */
employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));
flushServiceProxy(employeeHiringServiceClientProxy);
Sys.sleep(5_000);
}
interface EmployeeHiringServiceClient {
void hireEmployee(final Employee employee);
}
@EventChannel
interface SalaryChangedChannel {
void salaryChanged(Employee employee, int newSalary);
}
interface EmployeeEventManager {
@EventChannel(NEW_HIRE_CHANNEL)
void sendNewEmployee(Employee employee);
}
public static class Employee {
final String firstName;
final int employeeId;
public Employee(String firstName, int employeeId) {
this.firstName = firstName;
this.employeeId = employeeId;
}
public String getFirstName() {
return firstName;
}
public int getEmployeeId() {
return employeeId;
}
@Override
public String toString() {
return "Employee{" +
"firstName='" + firstName + '\'' +
", employeeId=" + employeeId +
'}';
}
}
public static class EmployeeHiringService {
final EmployeeEventManager eventManager;
final SalaryChangedChannel salaryChangedChannel;
public EmployeeHiringService(final EmployeeEventManager employeeEventManager,
final SalaryChangedChannel salaryChangedChannel) {
this.eventManager = employeeEventManager;
this.salaryChangedChannel = salaryChangedChannel;
}
@QueueCallback(QueueCallbackType.EMPTY)
private void noMoreRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
@QueueCallback(QueueCallbackType.LIMIT)
private void hitLimitOfRequests() {
flushServiceProxy(salaryChangedChannel);
flushServiceProxy(eventManager);
}
public void hireEmployee(final Employee employee) {
int salary = 100;
System.out.printf("Hired employee %s\n", employee);
//Does stuff to hire employee
eventManager.sendNewEmployee(employee);
salaryChangedChannel.salaryChanged(employee, salary);
}
}
public static class BenefitsService {
@OnEvent(NEW_HIRE_CHANNEL)
public void enroll(final Employee employee) {
System.out.printf("Employee enrolled into benefits system employee %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class VolunteerService {
@OnEvent(NEW_HIRE_CHANNEL)
public void invite(final Employee employee) {
System.out.printf("Employee will be invited to the community outreach program %s %d\n",
employee.getFirstName(), employee.getEmployeeId());
}
}
public static class PayrollService implements SalaryChangedChannel{
@Override
public void salaryChanged(Employee employee, int newSalary) {
System.out.printf("DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll %s %d %d\n",
employee.getFirstName(), employee.getEmployeeId(), newSalary);
}
}
}
In your terminal, change into the project directory, then build and run the example:
cd event-channels-qbit
gradle clean build
gradle run
You should get the following:
Hired employee Employee{firstName='Rick', employeeId=1}
Employee will be invited to the community outreach program Rick 1
DIRECT FROM CHANNEL SalaryChangedChannel Employee added to payroll Rick 1 100
Employee enrolled into benefits system employee Rick 1
Again, the advantage of this strongly typed channel approach is that you can use the IDE to quickly find what implements the channel methods (the event listeners) and what calls them (the event producers). You have built and tested "The Employee example" and learned about QBit's event channels and strongly typed event bus. See you in the next tutorial!