Skip to content

Commit

Permalink
Merge pull request #1 from AO-StreetArt/dev
Browse files Browse the repository at this point in the history
Generalize methods in DvsManager
  • Loading branch information
AO-StreetArt committed Oct 6, 2017
2 parents 5b68f5d + 26302ef commit 56f9c26
Show file tree
Hide file tree
Showing 2 changed files with 196 additions and 66 deletions.
7 changes: 6 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ buildscript {
}
}

apply plugin: 'java'
plugins {
id 'java'
id 'net.ltgt.errorprone' version '0.0.11'
}

apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'
Expand Down Expand Up @@ -47,6 +51,7 @@ dependencies {
compile('com.fasterxml.jackson.core:jackson-databind')
compile('com.fasterxml.jackson.dataformat:jackson-dataformat-yaml')
compile('com.fasterxml.jackson.core:jackson-annotations')
errorprone 'com.google.errorprone:error_prone_core:2.1.1'
testCompile('org.springframework.boot:spring-boot-starter-test')
testCompile("junit:junit")
}
255 changes: 190 additions & 65 deletions src/main/java/adrestia/DvsManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@
*/
@Component
public class DvsManager {
// List of Blacklisted services
// Constant integers so that we can unify internal method calls for
// CLyman and CrazyIvan
private static int CLYMAN_TYPE = 0;
private static int IVAN_TYPE = 1;
// Consul Client for executing Service Discovery
@Autowired
org.springframework.cloud.client.discovery.DiscoveryClient consul_client;
Expand Down Expand Up @@ -91,7 +94,35 @@ public class DvsManager {
// DVS Manager Logger
private static final Logger logger = LogManager.getLogger("adrestia.DvsManager");

// Loading cache to hold blacklisted hosts
// Loading cache to hold blacklisted CLyman hosts
// Keys will expire after 5 minutes, at which point Consul should be able
// to determine if the service is active or inactive.
LoadingCache<String, Object> clyman_host_blacklist = CacheBuilder.newBuilder()
.expireAfterAccess(300, TimeUnit.SECONDS)
.maximumSize(50)
.weakKeys()
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return "new-value-loaded-" + key;
}
});

// Loading Cache to hold greylisted CLyman hosts
// Keys will expire after 30 seconds, if we report another failure in this
// time then the service will be blacklisted
LoadingCache<String, Object> clyman_host_greylist = CacheBuilder.newBuilder()
.expireAfterAccess(30, TimeUnit.SECONDS)
.maximumSize(30)
.weakKeys()
.build(new CacheLoader<String, Object>() {
@Override
public Object load(String key) throws Exception {
return "new-value-loaded-" + key;
}
});

// Loading cache to hold blacklisted CrazyIvan hosts
// Keys will expire after 5 minutes, at which point Consul should be able
// to determine if the service is active or inactive.
LoadingCache<String, Object> ivan_host_blacklist = CacheBuilder.newBuilder()
Expand All @@ -105,8 +136,9 @@ public Object load(String key) throws Exception {
}
});

// Loading Cache to hold greylisted hosts
// Keys will expire after 30 seconds, at which point we try again
// Loading Cache to hold greylisted CrazyIvan hosts
// Keys will expire after 30 seconds, if we report another failure in this
// time then the service will be blacklisted
LoadingCache<String, Object> ivan_host_greylist = CacheBuilder.newBuilder()
.expireAfterAccess(30, TimeUnit.SECONDS)
.maximumSize(30)
Expand All @@ -125,65 +157,119 @@ public DvsManager() {
super();
}

// Destroy the Scene Controller
@PreDestroy
public void destroy() {
if (crazyIvanSocket != null) {
context.destroySocket(crazyIvanSocket);
// Check if a service is active
private static boolean is_socket_active(int service_type) {
if (service_type == IVAN_TYPE) {
if (crazyIvanSocket != null) {
return true;
}
return false;
}
if (clymanSocket != null) {
context.destroySocket(crazyIvanSocket);
else if (service_type == CLYMAN_TYPE) {
if (clymanSocket != null) {
return true;
}
return false;
}
context.destroy();
return false;
}

private boolean ivan_socket_active() {
if (crazyIvanSocket != null) {
return true;
// Get the active socket for a service
private static ZMQ.Socket get_socket(int service_type) {
if (service_type == IVAN_TYPE) {
return crazyIvanSocket;
}
return false;
else if (service_type == CLYMAN_TYPE) {
return clymanSocket;
}
return null;
}

// Destroy the active Crazy Ivan Socket
private void destroy_ivan_socket() {
if (crazyIvanSocket != null) {
poller.unregister(crazyIvanSocket);
context.destroySocket(crazyIvanSocket);
crazyIvanSocket = null;
// Destroy the active socket for a service
private void destroy_socket(int service_type) {
if (service_type == IVAN_TYPE) {
if (crazyIvanSocket != null) {
poller.unregister(crazyIvanSocket);
context.destroySocket(crazyIvanSocket);
crazyIvanSocket = null;
}
} else if (service_type == CLYMAN_TYPE) {
if (clymanSocket != null) {
poller.unregister(clymanSocket);
context.destroySocket(clymanSocket);
clymanSocket = null;
}
}
}

// Reset the active socket for a service
private void reset_socket(int service_type) {
// First, destroy the socket
destroy_socket(service_type);
// Then, create a new socket
if (service_type == IVAN_TYPE) {
crazyIvanSocket = context.createSocket(ZMQ.REQ);
poller.register(crazyIvanSocket, ZPoller.POLLIN);
} else if (service_type == CLYMAN_TYPE) {
clymanSocket = context.createSocket(ZMQ.REQ);
poller.register(clymanSocket, ZPoller.POLLIN);
}
}

// Reset the active Crazy Ivan Socket
private void reset_ivan_socket() {
destroy_ivan_socket();
crazyIvanSocket = context.createSocket(ZMQ.REQ);
poller.register(crazyIvanSocket, ZPoller.POLLIN);
// Destroy the DvsManager
@PreDestroy
public void destroy() {
destroy_socket(IVAN_TYPE);
destroy_socket(CLYMAN_TYPE);
context.destroy();
}

// Connect to the current Crazy Ivan Instance
private void connect_to_ivan_socket() {
String crazyIvanUriString = crazyIvanInstance.getUri().toString();
int port_seperator_index = crazyIvanUriString.lastIndexOf(":");
String crazyIvanHostName = crazyIvanUriString.substring(7, port_seperator_index);
String crazyIvanPortStr = crazyIvanUriString.substring(port_seperator_index + 1, crazyIvanUriString.length());
String zmq_addr = String.format("tcp://%s:%s", crazyIvanHostName, crazyIvanPortStr);
// Connect to the current socket for a service
private void connect_to_socket(int service_type) {
// Pull the URL String
String UriString = null;
if (service_type == IVAN_TYPE) {
UriString = crazyIvanInstance.getUri().toString();
} else if (service_type == CLYMAN_TYPE) {
UriString = clymanInstance.getUri().toString();
}
// Parse the URL String
int port_seperator_index = UriString.lastIndexOf(":");
String hostName = UriString.substring(7, port_seperator_index);
String portStr = UriString.substring(port_seperator_index + 1, UriString.length());
String zmq_addr = String.format("tcp://%s:%s", hostName, portStr);
logger.info("Connecting to server: " + zmq_addr);
crazyIvanSocket.connect(zmq_addr);
// Connect to the service
get_socket(service_type).connect(zmq_addr);
}

private void report_ivan_failure() {
// Report a failure of a service
private void report_failure(int service_type) {
// Is the current host already on the greylist?
Object cache_resp = ivan_host_greylist.getIfPresent(crazyIvanInstance.getUri().toString());
Object cache_resp = null;
if (service_type == CLYMAN_TYPE) {
cache_resp = clyman_host_greylist.getIfPresent(clymanInstance.getUri().toString());
} else if (service_type == IVAN_TYPE) {
cache_resp = ivan_host_greylist.getIfPresent(crazyIvanInstance.getUri().toString());
}
// Grab the mutex so we ensure we operate atomically on connections
try {
// Eliminate the socket
destroy_ivan_socket();
destroy_socket(service_type);
if (cache_resp != null) {
// We have found an entry in the greylist, add the host to the blacklist
ivan_host_blacklist.get(crazyIvanInstance.getUri().toString());
if (service_type == CLYMAN_TYPE) {
cache_resp = clyman_host_blacklist.get(clymanInstance.getUri().toString());
} else if (service_type == IVAN_TYPE) {
cache_resp = ivan_host_blacklist.get(crazyIvanInstance.getUri().toString());
}
} else {
// We have no entry in the greylist, add the hostname to the greylist
ivan_host_greylist.get(crazyIvanInstance.getUri().toString());
if (service_type == CLYMAN_TYPE) {
cache_resp = clyman_host_greylist.get(clymanInstance.getUri().toString());
} else if (service_type == IVAN_TYPE) {
cache_resp = ivan_host_greylist.get(crazyIvanInstance.getUri().toString());
}
}
} catch (Exception e) {
logger.error("Error Resetting Crazy Ivan connection");
Expand All @@ -192,59 +278,77 @@ private void report_ivan_failure() {
}

// Setup method to find and connect to an instance of Crazy Ivan
private void find_ivan() {
private void find_service(int service_type) {
logger.info("Finding a new Crazy Ivan instance");
// Find an instance of CrazyIvan
List<org.springframework.cloud.client.ServiceInstance> serviceInstances =
consul_client.getInstances("Ivan");
List<org.springframework.cloud.client.ServiceInstance> serviceInstances = null;
if (service_type == IVAN_TYPE) {
serviceInstances = consul_client.getInstances("Ivan");
} else if (service_type == CLYMAN_TYPE) {
serviceInstances = consul_client.getInstances("Clyman");
}
if (serviceInstances != null ) {
//Log if we find no service instances
if (serviceInstances.size() == 0) {
logger.error("No Crazy Ivan instances found");
logger.error("No Service instances found");
}
// Find a service Instance not on the blacklist
for (int i = 0; i < serviceInstances.size(); i++) {
crazyIvanInstance = serviceInstances.get(i);
logger.debug("Found Crazy Ivan Instance: " + crazyIvanInstance.getUri().toString());
Object cache_resp = ivan_host_blacklist.getIfPresent(crazyIvanInstance.getUri().toString());
Object cache_resp = null;
// Pull the service instance from the list, and the value from the greylist
if (service_type == IVAN_TYPE) {
crazyIvanInstance = serviceInstances.get(i);
logger.debug("Found Crazy Ivan Instance: " + crazyIvanInstance.getUri().toString());
cache_resp = ivan_host_blacklist.getIfPresent(crazyIvanInstance.getUri().toString());
} else if (service_type == CLYMAN_TYPE) {
clymanInstance = serviceInstances.get(i);
logger.debug("Found CLyman Instance: " + clymanInstance.getUri().toString());
cache_resp = clyman_host_blacklist.getIfPresent(clymanInstance.getUri().toString());
}
// We can go ahead and connect to the instance as long as it isn't
// on the blacklist
if (cache_resp == null) {
try {
// Crazy Ivan ZMQ Context & Socket
// Close any existing socket before creating a new one
reset_ivan_socket();
reset_socket(service_type);

// Connect to the new socket
// First we need to format the address from Consul. We also assume tcp
// Communications between this class and Crazy Ivan
connect_to_ivan_socket();
connect_to_socket(service_type);
} catch (Exception e) {
logger.error("Error connecting to Crazy Ivan instance");
logger.error(e.getMessage());
report_ivan_failure();
report_failure(service_type);
}
// Exit the loop
break;
} else {
logger.error("Returned host found in blacklist");
crazyIvanInstance = null;
if (service_type == IVAN_TYPE) {
crazyIvanInstance = null;
} else if (service_type == CLYMAN_TYPE) {
clymanInstance = null;
}
}
}
} else {
logger.error("Unable to find Crazy Ivan instance");
logger.error("Unable to find Service instance");
}
}

private String send_to_ivan_recursive(String msg, int timeout, int retries) {
private String send_msg_recursive(String msg, int timeout, int retries, int service_type) {
logger.info("Attempting to send message to Crazy Ivan");
// Find a Crazy Ivan instance, if necessary
if (crazyIvanSocket == null) {
find_ivan();
if (!is_socket_active(service_type)) {
find_service(service_type);
}
// Response Processing
int retriesLeft = retries;
while (retriesLeft > 0 && !Thread.currentThread().isInterrupted() && crazyIvanSocket != null) {
while (retriesLeft > 0 && !Thread.currentThread().isInterrupted() && is_socket_active(service_type)) {
// We send a request, then we work to get a reply
crazyIvanSocket.send(msg.getBytes(ZMQ.CHARSET), 0);
get_socket(service_type).send(msg.getBytes(ZMQ.CHARSET), 0);

// We are going to use a poller with a timeout to get the value
// Pattern from ZMQ Guide - Lazy Pirate Client
Expand All @@ -259,22 +363,22 @@ private String send_to_ivan_recursive(String msg, int timeout, int retries) {
// reply is valid. If we didn't get a reply we close the client
// socket and resend the request. We try a number of times
// before finally abandoning:
if (poller.isReadable(crazyIvanSocket)) {
if (poller.isReadable(get_socket(service_type))) {
// We got a reply from the server
return crazyIvanSocket.recvStr();
return get_socket(service_type).recvStr();
} else if (--retriesLeft == 0) {
logger.error("Reporting Crazy Ivan Failure");
report_ivan_failure();
report_failure(service_type);
// Keep trying to send the message until we succeed or run out of
// Crazy Ivan instances
return send_to_ivan_recursive(msg, timeout, retries);
return send_msg_recursive(msg, timeout, retries, service_type);
} else {
logger.warn("No response from server, retrying");
// Old socket is confused; close it and open a new one
reset_ivan_socket();
connect_to_ivan_socket();
reset_socket(service_type);
connect_to_socket(service_type);
// Send request again, on new socket
crazyIvanSocket.send(msg.getBytes(ZMQ.CHARSET), 0);
get_socket(service_type).send(msg.getBytes(ZMQ.CHARSET), 0);
}
if (rc < 0) {break;}
}
Expand All @@ -293,7 +397,7 @@ public String send_to_ivan(String msg, int timeout, int retries) {
}
// Actually try to send the message
try {
return send_to_ivan_recursive(msg, timeout, retries);
return send_msg_recursive(msg, timeout, retries, IVAN_TYPE);
} catch (Exception e) {
logger.error("Error Sending message to Crazy Ivan: ", e);
} finally {
Expand All @@ -303,4 +407,25 @@ public String send_to_ivan(String msg, int timeout, int retries) {
return null;
}

// Send a message to CLyman, return the response
public String send_to_clyman(String msg, int timeout, int retries) {
// Grab the mutex so we ensure we operate atomically on connections
try {
clymanMutex.acquire();
} catch (InterruptedException e) {
logger.error("Error Establishing Mutex Lock on ZMQ Socket");
logger.error(e.getMessage());
}
// Actually try to send the message
try {
return send_msg_recursive(msg, timeout, retries, CLYMAN_TYPE);
} catch (Exception e) {
logger.error("Error Sending message to CLyman: ", e);
} finally {
// Release the mutex
clymanMutex.release();
}
return null;
}

}

0 comments on commit 56f9c26

Please sign in to comment.