Skip to content

Commit

Permalink
[SCB-2035]Use vertx Json and Pojo model to replace own serizalization…
Browse files Browse the repository at this point in the history
… mechanism
  • Loading branch information
jungan21 authored and liubao68 committed Jul 10, 2020
1 parent 44ad6c0 commit 08676b5
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 269 deletions.
Expand Up @@ -40,8 +40,6 @@ public class ZeroConfigRegistration implements Registration {

private static final Logger LOGGER = LoggerFactory.getLogger(ZeroConfigRegistration.class);

public static ZeroConfigRegistration INSTANCE = new ZeroConfigRegistration();

private static final String NAME = "zero-config registration";

private ZeroConfigClient zeroConfigClient = ZeroConfigClient.INSTANCE;
Expand All @@ -54,8 +52,8 @@ public boolean enabled() {
@Override
public void init() {
zeroConfigClient.init();
ServerUtil.init();
ClientUtil.init();
ServerUtil.INSTANCE.init();
ClientUtil.INSTANCE.init();
}

@Override
Expand Down
Expand Up @@ -43,23 +43,12 @@ public interface ZeroConfigRegistryConstants {
String HEARTBEAT_EVENT = "heartbeat";

// Microservice & Instance Attributes
String APP_ID = "appId";
String SERVICE_NAME = "serviceName";
String VERSION = "version";
String SERVICE_ID = "serviceId";
String STATUS = "status";
String SCHEMA_IDS = "schemas";
String INSTANCE_ID = "instanceId";
String ENDPOINTS = "endpoints";
String HOST_NAME = "hostName";
String INSTANCE_HEARTBEAT_RESPONSE_MESSAGE_OK = "OK";

// others
String MAP_STRING_LEFT = "{";
String MAP_STRING_RIGHT = "}";
String MAP_ELEMENT_SPILITER = ",";
String MAP_KV_SPILITER = "=";
String LIST_STRING_SPLITER = "$";
String UUID_SPLITER = "-";
String SERVICE_ID_SPLITER = "/";
String ENDPOINT_PREFIX_REST = "rest";
Expand Down
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.servicecomb.zeroconfig.client;

import io.vertx.core.json.Json;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstanceStatus;
Expand All @@ -28,11 +29,8 @@
import java.net.InetAddress;
import java.net.MulticastSocket;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.servicecomb.zeroconfig.ZeroConfigRegistryConstants.*;
Expand All @@ -41,86 +39,68 @@ public class ClientUtil {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientUtil.class);

private static ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
public static final ClientUtil INSTANCE = new ClientUtil();

private static Microservice microserviceSelf = new Microservice();
private ServerMicroserviceInstance serviceInstanceForHeartbeat;

private static Map<String, String> serviceInstanceMapForHeartbeat = null;
private MulticastSocket multicastSocket;
private InetAddress group;

private static MulticastSocket multicastSocket;

public static Microservice getMicroserviceSelf() {
return microserviceSelf;
public ServerMicroserviceInstance getServiceInstanceForHeartbeat() {
return serviceInstanceForHeartbeat;
}

public static void setMicroserviceSelf(Microservice microserviceSelf) {
ClientUtil.microserviceSelf = microserviceSelf;
public void setServiceInstanceForHeartbeat(
ServerMicroserviceInstance serviceInstanceForHeartbeat) {
this.serviceInstanceForHeartbeat = serviceInstanceForHeartbeat;
}

public static Map<String, String> getServiceInstanceMapForHeartbeat() {
return serviceInstanceMapForHeartbeat;
}
private ClientUtil(){}

public static void setServiceInstanceMapForHeartbeat(
Map<String, String> serviceInstanceMapForHeartbeat) {
ClientUtil.serviceInstanceMapForHeartbeat = serviceInstanceMapForHeartbeat;
}

public static synchronized void init() {
public synchronized void init() {
try {
group = InetAddress.getByName(GROUP);
multicastSocket = new MulticastSocket();
multicastSocket.setLoopbackMode(false);
multicastSocket.setTimeToLive(TIME_TO_LIVE);
} catch (IOException e) {
LOGGER.error("Failed to create MulticastSocket object", e);
}

Runnable heartbeatRunnable = new Runnable() {
@Override
public void run() {
if (serviceInstanceMapForHeartbeat != null && !serviceInstanceMapForHeartbeat.isEmpty()) {
// after first registration succeeds
try {
byte[] heartbeatEventDataBytes = serviceInstanceMapForHeartbeat.toString().getBytes();
DatagramPacket instanceDataPacket = new DatagramPacket(heartbeatEventDataBytes,
heartbeatEventDataBytes.length,
InetAddress.getByName(GROUP), PORT);

multicastSocket.send(instanceDataPacket);
} catch (Exception e) {
LOGGER.error("Failed to send heartbeat event for object: {}",
serviceInstanceMapForHeartbeat, e);
}
}
}
};
executor.scheduleAtFixedRate(heartbeatRunnable, CLIENT_DELAY, HEALTH_CHECK_INTERVAL,
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::runHeartbeatTask, CLIENT_DELAY, HEALTH_CHECK_INTERVAL,
TimeUnit.SECONDS);
}

public static Map<String, String> convertToRegisterDataModel(String serviceId,
String microserviceInstanceId,
private void runHeartbeatTask(){
if (serviceInstanceForHeartbeat != null) {
// after first registration succeeds
try {
byte[] heartbeatEventDataBytes = Json.encode(serviceInstanceForHeartbeat).getBytes();
DatagramPacket instanceDataPacket = new DatagramPacket(heartbeatEventDataBytes,
heartbeatEventDataBytes.length, group, PORT);

multicastSocket.send(instanceDataPacket);
} catch (Exception e) {
LOGGER.error("Failed to send heartbeat event for object: {}",
serviceInstanceForHeartbeat.toString(), e);
}
}
}

public static ServerMicroserviceInstance convertToRegisterDataModel(
MicroserviceInstance microserviceInstance, Microservice microservice) {
Map<String, String> serviceInstanceTextAttributesMap = new HashMap<>();

serviceInstanceTextAttributesMap.put(EVENT, REGISTER_EVENT);
serviceInstanceTextAttributesMap.put(VERSION, microservice.getVersion());
serviceInstanceTextAttributesMap.put(SERVICE_ID, serviceId);
serviceInstanceTextAttributesMap.put(INSTANCE_ID, microserviceInstanceId);
serviceInstanceTextAttributesMap.put(STATUS, microserviceInstance.getStatus().toString());
serviceInstanceTextAttributesMap.put(APP_ID, microservice.getAppId());
serviceInstanceTextAttributesMap.put(SERVICE_NAME, microservice.getServiceName());

String hostName = microserviceInstance.getHostName();
serviceInstanceTextAttributesMap.put(HOST_NAME, hostName);

// schema1$schema2
serviceInstanceTextAttributesMap
.put(ENDPOINTS, String.join(LIST_STRING_SPLITER, microserviceInstance.getEndpoints()));
serviceInstanceTextAttributesMap
.put(SCHEMA_IDS, String.join(LIST_STRING_SPLITER, microservice.getSchemas()));

return serviceInstanceTextAttributesMap;
ServerMicroserviceInstance instance = new ServerMicroserviceInstance();
instance.setEvent(REGISTER_EVENT);
instance.setVersion(microservice.getVersion());
instance.setServiceId(microservice.getServiceId());
instance.setInstanceId(microserviceInstance.getInstanceId());
instance.setStatus(microserviceInstance.getStatus().toString());
instance.setAppId(microservice.getAppId());
instance.setServiceName(microservice.getServiceName());
instance.setHostName(microserviceInstance.getHostName());
instance.setEndpoints(microserviceInstance.getEndpoints());
instance.setSchemas(microservice.getSchemas());
return instance;
}

public static MicroserviceInstance convertToClientMicroserviceInstance(
Expand Down
Expand Up @@ -17,6 +17,7 @@
package org.apache.servicecomb.zeroconfig.client;

import com.google.common.annotations.VisibleForTesting;
import io.vertx.core.json.Json;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
Expand Down Expand Up @@ -114,28 +115,27 @@ private static ZeroConfigClient buildZeroConfigClient() {
}

public boolean register() {
Map<String, String> data = prepareRegisterData();
String serviceInstanceId = doRegister(data);
String serviceInstanceId = doRegister(ClientUtil.convertToRegisterDataModel(selfMicroserviceInstance, selfMicroservice));
return StringUtils.isNotEmpty(serviceInstanceId);
}

private String doRegister(Map<String, String> serviceInstanceDataMap) {
private String doRegister(ServerMicroserviceInstance instance) {
try {
byte[] instanceData = serviceInstanceDataMap.toString().getBytes(ENCODE);
byte[] instanceData = Json.encode(instance).getBytes(ENCODE);
DatagramPacket instanceDataPacket = new DatagramPacket(instanceData, instanceData.length,
InetAddress.getByName(GROUP), PORT);
this.multicastSocket.send(instanceDataPacket);

// set this variable for heartbeat itself status
serviceInstanceDataMap.put(EVENT, HEARTBEAT_EVENT);
ClientUtil.setServiceInstanceMapForHeartbeat(serviceInstanceDataMap);
instance.setEvent(HEARTBEAT_EVENT);
ClientUtil.INSTANCE.setServiceInstanceForHeartbeat(instance);
} catch (IOException e) {
LOGGER.error(
"Failed to Multicast Microservice Instance Registration Event in Zero-Config mode. servcieId: {} instanceId:{}",
serviceInstanceDataMap.get(SERVICE_ID), serviceInstanceDataMap.get(INSTANCE_ID), e);
instance.getServiceId(), instance.getInstanceId(), e);
return null;
}
return serviceInstanceDataMap.get(INSTANCE_ID);
return instance.getInstanceId();
}

public boolean unregister() {
Expand Down Expand Up @@ -203,19 +203,6 @@ public String getSchema(String microserviceId, String schemaId) {
}
}

private String getEndpointForMicroservice(String microserviceId) {
ServerMicroserviceInstance serverMicroserviceInstance = zeroConfigRegistryService
.getMicroservice(microserviceId);
LOGGER.info("Retrieve endpoint for serve rMicroservice Instance: {}",
serverMicroserviceInstance);
if (serverMicroserviceInstance != null && !serverMicroserviceInstance.getEndpoints()
.isEmpty()) {
return serverMicroserviceInstance.getEndpoints().get(0)
.replace(ENDPOINT_PREFIX_REST, ENDPOINT_PREFIX_HTTP);
}
return null;
}

public MicroserviceInstance findMicroserviceInstance(String serviceId, String instanceId) {
ServerMicroserviceInstance instance = this.zeroConfigRegistryService
.findServiceInstance(serviceId, instanceId);
Expand Down Expand Up @@ -282,12 +269,6 @@ private ServerMicroserviceInstance findLatestVersionInstance(
return latestVersionInstance;
}

private Map<String, String> prepareRegisterData() {
// Convert to Multicast data format
return ClientUtil.convertToRegisterDataModel(selfMicroservice.getServiceId(),
selfMicroserviceInstance.getInstanceId(), selfMicroserviceInstance, selfMicroservice);
}

private ServerMicroserviceInstance preUnregisterCheck() {
ServerMicroserviceInstance instance = zeroConfigRegistryService
.findServiceInstance(selfMicroserviceInstance.getServiceId(),
Expand Down
Expand Up @@ -22,6 +22,8 @@

public class ServerMicroserviceInstance {

private String event;

private String appId;

private String serviceName;
Expand Down Expand Up @@ -125,10 +127,19 @@ public void setSchemas(List<String> schemas) {
this.schemas = schemas;
}

public String getEvent() {
return event;
}

public void setEvent(String event) {
this.event = event;
}

@Override
public String toString() {
return "ServerMicroserviceInstance{" +
"appId='" + appId + '\'' +
"event='" + event + '\'' +
", appId='" + appId + '\'' +
", serviceName='" + serviceName + '\'' +
", version='" + version + '\'' +
", instanceId='" + instanceId + '\'' +
Expand All @@ -140,4 +151,5 @@ public String toString() {
", lastHeartbeatTimeStamp=" + lastHeartbeatTimeStamp +
'}';
}

}

0 comments on commit 08676b5

Please sign in to comment.