Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -486,16 +486,11 @@ public void watch(String selfMicroserviceId, AsyncResultCallback<MicroserviceIns
ipPort.getHostOrIp(),
ipPort.getPort());
}, c -> {
LOGGER.warn(
"watching microservice {} connection is closed accidentally",
selfMicroserviceId);
watchErrorHandler(new ClientException("connection is closed accidentally"),
selfMicroserviceId,
callback);

onClose.success(null);
}, bodyBuffer -> {

MicroserviceInstanceChangedEvent response = null;
try {
response = JsonUtils.readValue(bodyBuffer.getBytes(),
Expand All @@ -514,20 +509,9 @@ public void watch(String selfMicroserviceId, AsyncResultCallback<MicroserviceIns
e);
}
}, e -> {
LOGGER.error(
"watcher read microservice {} message from service center failed,"
+ " {}",
selfMicroserviceId,
e.getMessage());
watchErrorHandler(e, selfMicroserviceId, callback);
onClose.success(null);
}, f -> {

if (!watchServices.containsKey(selfMicroserviceId)) {
return;
}
LOGGER.error(
"watcher connect to service center server failed, microservice {}, {}",
selfMicroserviceId,
f.getMessage());
watchErrorHandler(f, selfMicroserviceId, callback);
});
}
Expand Down Expand Up @@ -575,6 +559,10 @@ public List<MicroserviceInstance> findServiceInstance(String consumerId, String

private void watchErrorHandler(Throwable e, String selfMicroserviceId,
AsyncResultCallback<MicroserviceInstanceChangedEvent> callback) {
LOGGER.error(
"watcher connect to service center server failed, microservice {}, {}",
selfMicroserviceId,
e.getMessage());
callback.fail(e);
watchServices.remove(selfMicroserviceId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public HttpClientOptions createHttpClientOptions() {
HttpClientOptions httpClientOptions = new HttpClientOptions();
httpClientOptions.setProtocolVersion(ver);
httpClientOptions.setConnectTimeout(ServiceRegistryConfig.INSTANCE.getConnectionTimeout());
// 不能设置IDLE后,会自动断开ws连接
// httpClientOptions.setIdleTimeout(ServiceRegistryConfig.INSTANCE.getIdleConnectionTimeout());
// idl timeout in seconds. we add 30 seconds for websocket idle timeout.
httpClientOptions.setIdleTimeout(ServiceRegistryConfig.INSTANCE.getIdleConnectionTimeout() + 30);
if (ver == HttpVersion.HTTP_2) {
LOGGER.debug("service center ws client protocol version is HTTP/2");
httpClientOptions.setHttp2ClearTextUpgrade(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

import io.servicecomb.foundation.common.net.IpPort;
import io.servicecomb.foundation.vertx.client.http.HttpClientWithContext;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Created by on 2017/4/28.
*/
public final class WebsocketUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketUtils.class);

private WebsocketUtils() {
}

Expand All @@ -41,13 +44,20 @@ public static void open(IpPort ipPort, String url, Handler<Void> onOpen, Handler
ws -> {
onOpen.handle(null);

ws.exceptionHandler(onException);

ws.exceptionHandler(v -> {
onException.handle(v);
try {
ws.close();
} catch (Exception err) {
LOGGER.error("ws close error.", err);
}
});
ws.closeHandler(v -> {
onClose.handle(v);
try {
ws.close();
} catch (Exception err) {
LOGGER.error("ws close error.", err);
}
});
ws.handler(onMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public final class ServiceRegistryConfig {

private static final int DEFAULT_TIMEOUT_IN_MS = 30000;

private static final int DEFAULT_TIMEOUT_IN_SECONDS = 30;

private static final int DEFAULT_REQUEST_TIMEOUT_IN_MS = 30000;

private static final int DEFAULT_CHECK_INTERVAL_IN_S = 30;
Expand Down Expand Up @@ -126,9 +128,9 @@ public int getConnectionTimeout() {
public int getIdleConnectionTimeout() {
DynamicIntProperty property =
DynamicPropertyFactory.getInstance()
.getIntProperty("cse.service.registry.client.timeout.idle", DEFAULT_TIMEOUT_IN_MS);
.getIntProperty("cse.service.registry.client.timeout.idle", DEFAULT_TIMEOUT_IN_SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed from millisecond to second
make sure that nobody used this config.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this config is not documented yet, we will add it to wiki when neccesary(now no need to give this. because 30 seconds is the best and not neccessary to change it .

int timeout = property.get();
return timeout < TimeUnit.SECONDS.toMillis(1) ? DEFAULT_TIMEOUT_IN_MS : timeout;
return timeout < 1 ? DEFAULT_TIMEOUT_IN_SECONDS : timeout;
}

public int getRequestTimeout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.servicecomb.serviceregistry.task;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

import io.servicecomb.serviceregistry.api.registry.Microservice;
import io.servicecomb.serviceregistry.client.ServiceRegistryClient;

Expand All @@ -28,25 +26,20 @@ public AbstractRegisterTask(EventBus eventBus, ServiceRegistryClient srClient, M
super(eventBus, srClient, microservice);
}

@Subscribe
public void onHeartbeatEvent(MicroserviceInstanceHeartbeatTask task) {
if (task.isNeedRegisterInstance() && isSameMicroservice(task.getMicroservice())) {
this.registered = false;
}
}

public boolean isRegistered() {
return registered;
}

@Override
public void run() {
public void doRun() {
if (registered) {
return;
}

registered = doRegister();
eventBus.post(this);
if(doRegister()) {
registered = true;
taskStatus = TaskStatus.FINISHED;
}
}

protected abstract boolean doRegister();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import io.servicecomb.serviceregistry.client.ServiceRegistryClient;

public abstract class AbstractTask implements Runnable {
protected TaskStatus taskStatus = TaskStatus.INIT;

protected EventBus eventBus;

protected ServiceRegistryClient srClient;
Expand All @@ -43,6 +45,17 @@ public Microservice getMicroservice() {
return microservice;
}

@Override
public void run() {
if(taskStatus == TaskStatus.READY) {
// if this task is actually run, we send a notification
doRun();
eventBus.post(this);
}
}

abstract protected void doRun();

protected boolean isSameMicroservice(Microservice otherMicroservice) {
return microservice.getServiceName().equals(otherMicroservice.getServiceName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,14 @@
*/
package io.servicecomb.serviceregistry.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

import io.servicecomb.serviceregistry.api.registry.Microservice;
import io.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
import io.servicecomb.serviceregistry.api.response.HeartbeatResponse;
import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MicroserviceInstanceHeartbeatTask extends AbstractTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceInstanceHeartbeatTask.class);
Expand All @@ -43,9 +41,9 @@ public MicroserviceInstanceHeartbeatTask(EventBus eventBus, ServiceRegistryClien
}

@Subscribe
public void onRegisterInstance(MicroserviceInstanceRegisterTask task) {
if (task.isRegistered() && isSameMicroservice(task.getMicroservice())) {
instanceRegistered = true;
public void onMicroserviceWatchTask(MicroserviceWatchTask task) {
if (task.taskStatus == TaskStatus.READY && isSameMicroservice(task.getMicroservice())) {
this.taskStatus = TaskStatus.READY;
}
}

Expand All @@ -59,24 +57,18 @@ public boolean isNeedRegisterInstance() {
}

@Override
public void run() {
if (!instanceRegistered) {
return;
}

public void doRun() {
// will always run heartbeat when it is ready
heartbeatResult = heartbeat();
eventBus.post(this);
}

private HeartbeatResult heartbeat() {
HeartbeatResponse response =
srClient.heartbeat(microserviceInstance.getServiceId(), microserviceInstance.getInstanceId());
if (response == null) {
// if (!needToWatch()) {
// // TODO:不需要watch,为何要抛异常?
// exception(new ClientException("could not connect to service center"));
// }

LOGGER.error("Disconnected from service center and heartbeat failed for microservice instance={}/{}",
microserviceInstance.getServiceId(),
microserviceInstance.getInstanceId());
return HeartbeatResult.DISCONNECTED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
*/
package io.servicecomb.serviceregistry.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import com.google.common.eventbus.EventBus;

import com.google.common.eventbus.Subscribe;
import io.servicecomb.serviceregistry.RegistryUtils;
import io.servicecomb.serviceregistry.api.registry.Microservice;
import io.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
import io.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import javafx.concurrent.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

public class MicroserviceInstanceRegisterTask extends AbstractRegisterTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceInstanceRegisterTask.class);
Expand All @@ -43,18 +43,19 @@ public MicroserviceInstanceRegisterTask(EventBus eventBus, ServiceRegistryConfig
this.microserviceInstance = microservice.getIntance();
}

@Override
public void run() {
if (StringUtils.isEmpty(microservice.getServiceId())) {
// can not register instance now.
return;
@Subscribe
public void onMicroserviceRegisterTask(MicroserviceRegisterTask task) {
if (task.taskStatus == TaskStatus.FINISHED && isSameMicroservice(task.getMicroservice())) {
this.taskStatus = TaskStatus.READY;
this.registered = false;
} else {
this.taskStatus = TaskStatus.INIT;
}

super.run();
}

@Override
protected boolean doRegister() {
LOGGER.info("running microservice instance register task.");
String hostName = "";
if (serviceRegistryConfig.isPreferIpAddress()) {
hostName = RegistryUtils.getPublishAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.eventbus.Subscribe;
import javafx.concurrent.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
Expand All @@ -35,14 +37,23 @@ public class MicroserviceRegisterTask extends AbstractRegisterTask {

public MicroserviceRegisterTask(EventBus eventBus, ServiceRegistryClient srClient, Microservice microservice) {
super(eventBus, srClient, microservice);
this.taskStatus = TaskStatus.READY;
}

public boolean isSchemaIdSetMatch() {
return schemaIdSetMatch;
}

@Subscribe
public void onMicroserviceInstanceHeartbeatTask(MicroserviceInstanceHeartbeatTask task) {
if (task.getHeartbeatResult() != HeartbeatResult.SUCCESS && isSameMicroservice(task.getMicroservice())) {
LOGGER.info("read MicroserviceInstanceHeartbeatTask status is {}", task.taskStatus);
this.taskStatus = TaskStatus.READY;
this.registered = false;
}
}
@Override
protected boolean doRegister() {
LOGGER.info("running microservice register task.");
String serviceId = srClient.getMicroserviceId(microservice.getAppId(),
microservice.getServiceName(),
microservice.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
*/
package io.servicecomb.serviceregistry.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import com.google.common.eventbus.EventBus;

import com.google.common.eventbus.Subscribe;
import io.servicecomb.serviceregistry.api.Const;
import io.servicecomb.serviceregistry.api.registry.Microservice;
import io.servicecomb.serviceregistry.api.response.MicroserviceInstanceChangedEvent;
import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
import io.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import io.servicecomb.serviceregistry.task.event.ExceptionEvent;
import io.servicecomb.serviceregistry.task.event.RecoveryEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MicroserviceWatchTask extends AbstractTask {
private static final Logger LOGGER = LoggerFactory.getLogger(MicroserviceWatchTask.class);
Expand All @@ -40,13 +38,17 @@ public MicroserviceWatchTask(EventBus eventBus, ServiceRegistryConfig serviceReg
this.serviceRegistryConfig = serviceRegistryConfig;
}

public void run() {
if (!needToWatch()) {
return;
@Subscribe
public void onMicroserviceInstanceRegisterTask(MicroserviceInstanceRegisterTask task) {
if (task.taskStatus == TaskStatus.FINISHED && isSameMicroservice(task.getMicroservice())) {
this.taskStatus = TaskStatus.READY;
}
}

if (StringUtils.isEmpty(microservice.getServiceId())) {
// can not watch now.
@Override
public void doRun() {
// will always run watch when it is ready
if (!needToWatch()) {
return;
}

Expand Down
Loading