Skip to content

Commit

Permalink
Merge pull request #12 from AO-StreetArt/ZmqMessageDesign
Browse files Browse the repository at this point in the history
Zmq message design
  • Loading branch information
AO-StreetArt committed Oct 25, 2017
2 parents 5b2e7b9 + 3b7bb76 commit ba242f8
Show file tree
Hide file tree
Showing 10 changed files with 736 additions and 174 deletions.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
:caption: Contents:

pages/quickstart.rst
pages/retryDesign.rst

.. include:: ../README.rst

Expand Down
39 changes: 39 additions & 0 deletions docs/pages/retryDesign.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
.. _retryDesign:

Messaging Retry Design
======================

:ref:`Go Home <index>`

Terms
-----

Greylist - Short-term store for services marked as potentially failed
Redlist - Very short-term store to prevent the same failed service from being used immediately after
Blacklist - Long-term store for failed services. Entries expire slowly

Phase 1 - Find a Service Instance
---------------------------------

1. Check the socket pool for open, unused sockets. If we have one, return it
2. Get a List of Service Instances from the Service Manager
3. Get a random number (seed value = time of program startup)
4. For each Service in the list (random number from step 2 as start index):
a. If the service is not in use, not in the blacklist, and not in the redlist return it
b. Else, return null

Phase 2 - Sending Message
-------------------------

1. Send message
2. On success we continue, on failure we move to phase 3

Phase 3 - After Failure
-----------------------

1. Re-try on same socket (timeout and # of retries configurable)
2. If all attempts are unsuccessful, then report failure
a. If instance not in greylist or blacklist, add to greylist and redlist
b. If instance in greylist and not blacklist, add to blacklist
c. Remove failed socket from socket pool
3. Return to Phase 1
222 changes: 222 additions & 0 deletions src/main/java/adrestia/dao/ServiceManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
Apache2 License Notice
Copyright 2017 Alex Barry
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package adrestia;

import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.annotation.PreDestroy;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import org.zeromq.ZContext;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMQ;
import org.zeromq.ZPoller;

/**
* Uses the Consul Discovery Client to find Service Instances.
*/
@Component
public class ServiceManager implements ServiceManagerInterface {

// How long should entries stay in the redlist
@Value("${server.zmq.redlist.duration}")
private int redlistDuration;
// How long should entries stay in the greylist
@Value("${server.zmq.greylist.duration}")
private int greylistDuration;
// How long should entries stay in the blacklist
@Value("${server.zmq.blacklist.duration}")
private int blacklistDuration;

// Consul Client for executing Service Discovery
@Autowired
DiscoveryClient consulClient;

// Utility Provider, providing us with basic utility methods
@Autowired
UtilityProviderInterface utils;

// Service Manager Logger
private final Logger logger = LogManager.getLogger("adrestia.ServiceManager");

// Loading cache to hold blacklisted CLyman hosts
// Keys will expire after 5 minutes, at which point Consul should be able
// to determine if the service is active or inactive.
LoadingCache<String, Object> blacklist = CacheBuilder.newBuilder()
.expireAfterAccess(blacklistDuration, TimeUnit.SECONDS)
.maximumSize(60)
.weakKeys()
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return key;
}
});

// Loading Cache to hold greylisted CLyman hosts
// Keys will expire after 30 seconds, if we report another failure in this
// time then the service will be blacklisted
LoadingCache<String, Object> greylist = CacheBuilder.newBuilder()
.expireAfterAccess(greylistDuration, TimeUnit.SECONDS)
.maximumSize(50)
.weakKeys()
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return key;
}
});

// Loading Cache to hold redlisted CLyman hosts
// Keys will expire after 5 seconds, if we report another failure in this
// time then the service will be blacklisted
LoadingCache<String, Object> redlist = CacheBuilder.newBuilder()
.expireAfterAccess(redlistDuration, TimeUnit.SECONDS)
.maximumSize(40)
.weakKeys()
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return key;
}
});

/**
* Default empty ServiceManager constructor.
*/
public ServiceManager() {
super();
}

// Setup method to find and connect to an instance of a specified service name
private ServiceInstance findService(String serviceName) {
ServiceInstance returnService = null;
logger.info("Finding a new Service instance");
// Find an instance of CrazyIvan
List<ServiceInstance> serviceInstances =
consulClient.getInstances(serviceName);
if (serviceInstances != null) {
//Log if we find no service instances
if (serviceInstances.size() == 0) {
logger.error("No Service instances found");
}
// Find a service Instance
// Start by getting a random start index
int startIndex = utils.getRandomInt(serviceInstances.size());
for (int i = 0; i < serviceInstances.size(); i++) {
// Correct our compare index for the start index
int currentIndex = i + startIndex;
if (currentIndex >= serviceInstances.size()) {
currentIndex = currentIndex - serviceInstances.size();
}
// Pull the service instance, and the value from the blacklist
returnService = serviceInstances.get(currentIndex);
logger.debug("Found Service Instance: "
+ returnService.getUri().toString());
Object blacklistResp =
blacklist.getIfPresent(returnService.getUri().toString());
Object redlistResp =
redlist.getIfPresent(returnService.getUri().toString());
// We can go ahead and connect to the instance as long as it isn't
// on the blacklist
if (blacklistResp == null && redlistResp == null) {
return returnService;
} else {
logger.error("Invalid host found");
returnService = null;
}
}
} else {
logger.error("Unable to find Service instance");
}
return returnService;
}

/**
* Report a Service Failure.
* @param connectedInstance A ServiceInstance object with failed instance info
*/
@Override
public void reportFailure(ServiceInstance connectedInstance) {
// Is the current host already on the greylist?
Object cacheResp =
greylist.getIfPresent(connectedInstance.getUri().toString());
try {
if (cacheResp != null) {
// We have found an entry in the greylist, add the host to the blacklist
cacheResp = blacklist.get(connectedInstance.getUri().toString());
} else {
// We have no entry in the greylist, add the hostname to the greylist and redlist
cacheResp = greylist.get(connectedInstance.getUri().toString());
cacheResp = redlist.get(connectedInstance.getUri().toString());
}
} catch (Exception e) {
logger.error("Error reporting service failure");
logger.error(e.getMessage());
}
}

/**
* Find an instance of Crazy Ivan.
* @return A ServiceInstance object with the instance details found
*/
@Override
public ServiceInstance findCrazyIvan() {
// Actually try to send the message
try {
return findService("Ivan");
} catch (Exception e) {
logger.error("Error retrieving service: ", e);
}
return null;
}

/**
* Find an instance of CLyman.
* @return A ServiceInstance object with the instance details found
*/
@Override
public ServiceInstance findClyman() {
// Actually try to send the message
try {
return findService("clyman");
} catch (Exception e) {
logger.error("Error retrieving service: ", e);
}
return null;
}
}
43 changes: 43 additions & 0 deletions src/main/java/adrestia/dao/ServiceManagerInterface.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
Apache2 License Notice
Copyright 2017 Alex Barry
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package adrestia;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.stereotype.Service;

/**
* Finds Service Instances.
*/
@Service
public interface ServiceManagerInterface {

/**
* Find an instance of Crazy Ivan.
* @return A ServiceInstance object with the instance details found
*/
public ServiceInstance findCrazyIvan();

/**
* Find an instance of CLyman.
* @return A ServiceInstance object with the instance details found
*/
public ServiceInstance findClyman();

/**
* Report a Service Failure.
* @param connectedInstance A ServiceInstance object with failed instance info
*/
public void reportFailure(ServiceInstance connectedInstance);
}

0 comments on commit ba242f8

Please sign in to comment.