Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import io.servicecomb.serviceregistry.client.IpPortManager;
import io.servicecomb.serviceregistry.client.ServiceRegistryClient;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientResponse;

public final class ServiceRegistryClientImpl implements ServiceRegistryClient {
Expand Down Expand Up @@ -122,6 +123,36 @@ private <T> Handler<RestResponse> syncHandler(CountDownLatch countDownLatch, Cla
};
}

static class ResponseWrapper {
HttpClientResponse response;

Buffer bodyBuffer;
}

// temporary copy from syncHandler
// we will use swagger invocation to replace RestUtils later.
private Handler<RestResponse> syncHandlerEx(CountDownLatch countDownLatch, Holder<ResponseWrapper> holder) {
return restResponse -> {
RequestContext requestContext = restResponse.getRequestContext();
HttpClientResponse response = restResponse.getResponse();
if (response == null) {
// invoke failed, call another SC instance
if (!retry(requestContext, syncHandlerEx(countDownLatch, holder))) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about the service center doesn't send the response back.
Do we have timeout mechanism for CountDownLatch?
BTW, is it enough if we just retry once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httpClientRequest.setTimeout to control timeout.
if no response, then timeout and callback with new RestResponse(requestContext, null),that means response is null
retry just copy from old code, infact maybe no need to retry, because registerSchema failed means registerMicroservice failed, and will auto retry.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't need the retry here, we can consider to remove this part of code.

countDownLatch.countDown();
}
return;
}

response.bodyHandler(bodyBuffer -> {
ResponseWrapper responseWrapper = new ResponseWrapper();
responseWrapper.response = response;
responseWrapper.bodyBuffer = bodyBuffer;
holder.value = responseWrapper;
countDownLatch.countDown();
});
};
}

@Override
public List<Microservice> getAllMicroservices() {
Holder<GetAllServicesResponse> holder = new Holder<>();
Expand Down Expand Up @@ -196,7 +227,7 @@ public boolean isSchemaExist(String microserviceId, String schemaId) {

@Override
public boolean registerSchema(String microserviceId, String schemaId, String schemaContent) {
Holder<HttpClientResponse> holder = new Holder<>();
Holder<ResponseWrapper> holder = new Holder<>();
IpPort ipPort = ipPortManager.get();

try {
Expand All @@ -208,22 +239,30 @@ public boolean registerSchema(String microserviceId, String schemaId, String sch
RestUtils.put(ipPort,
MS_API_PATH + MICROSERVICE_PATH + "/" + microserviceId + SCHEMA_PATH + "/" + schemaId,
new RequestParam().setBody(body),
syncHandler(countDownLatch, HttpClientResponse.class, holder));
syncHandlerEx(countDownLatch, holder));
countDownLatch.await();

boolean result = false;
if (holder.value != null) {
result = holder.value.statusCode() == Status.OK.getStatusCode();
if (holder.value == null) {
LOGGER.error("Register schema {}/{} failed.", microserviceId, schemaId);
return false;
}

LOGGER.info("register schema {}/{}, result {}",
microserviceId,
schemaId,
result);
if (!Status.Family.SUCCESSFUL.equals(Status.Family.familyOf(holder.value.response.statusCode()))) {
LOGGER.error("Register schema {}/{} failed, statusCode: {}, statusMessage: {}, description: {}.",
microserviceId,
schemaId,
holder.value.response.statusCode(),
holder.value.response.statusMessage(),
holder.value.bodyBuffer.toString());
return false;
}

return result;
LOGGER.info("register schema {}/{} success.",
microserviceId,
schemaId);
return true;
} catch (Exception e) {
LOGGER.error("register schema {}/{} fail",
LOGGER.error("register schema {}/{} fail.",
microserviceId,
schemaId,
e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,31 @@

import static org.hamcrest.core.Is.is;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import javax.xml.ws.Holder;

import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import io.servicecomb.serviceregistry.api.registry.Microservice;
import io.servicecomb.serviceregistry.api.registry.MicroserviceFactory;
import io.servicecomb.serviceregistry.client.ClientException;
import io.servicecomb.serviceregistry.client.IpPortManager;
import io.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl.ResponseWrapper;
import io.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpVersion;
import mockit.Deencapsulation;
import mockit.Mock;
Expand All @@ -58,7 +69,6 @@ void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler
@Mock
public void await() throws InterruptedException {
}

};
}

Expand Down Expand Up @@ -117,4 +127,125 @@ public void testException() {

Assert.assertEquals("a", new ClientException("a").getMessage());
}

static abstract class RegisterSchemaTester {
void run() {
Logger rootLogger = Logger.getRootLogger();

List<LoggingEvent> events = new ArrayList<>();
Appender appender = new MockUp<Appender>() {
@Mock
public void doAppend(LoggingEvent event) {
events.add(event);
}
}.getMockInstance();
rootLogger.addAppender(appender);

doRun(events);

rootLogger.removeAppender(appender);
}

abstract void doRun(List<LoggingEvent> events);
}

@Test
public void testRegisterSchemaNoResponse() {
new RegisterSchemaTester() {
void doRun(java.util.List<LoggingEvent> events) {
oClient.registerSchema("msid", "schemaId", "content");
Assert.assertEquals("Register schema msid/schemaId failed.", events.get(0).getMessage());
};
}.run();
}

@Test
public void testRegisterSchemaException() {
InterruptedException e = new InterruptedException();
new MockUp<CountDownLatch>() {
@Mock
public void await() throws InterruptedException {
throw e;
}
};

new RegisterSchemaTester() {
void doRun(java.util.List<LoggingEvent> events) {
oClient.registerSchema("msid", "schemaId", "content");
Assert.assertEquals(
"register schema msid/schemaId fail.",
events.get(0).getMessage());
Assert.assertEquals(e, events.get(0).getThrowableInformation().getThrowable());
};
}.run();
}

@Test
public void testRegisterSchemaErrorResponse() {
new MockUp<ServiceRegistryClientImpl>() {
@Mock
Handler<RestResponse> syncHandlerEx(CountDownLatch countDownLatch, Holder<ResponseWrapper> holder) {
return restResponse -> {
HttpClientResponse response = Mockito.mock(HttpClientResponse.class);
Mockito.when(response.statusCode()).thenReturn(400);
Mockito.when(response.statusMessage()).thenReturn("client error");

Buffer bodyBuffer = Buffer.buffer();
bodyBuffer.appendString("too big");

ResponseWrapper responseWrapper = new ResponseWrapper();
responseWrapper.response = response;
responseWrapper.bodyBuffer = bodyBuffer;
holder.value = responseWrapper;
};
}
};
new MockUp<RestUtils>() {
@Mock
void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler) {
responseHandler.handle(null);
}
};

new RegisterSchemaTester() {
void doRun(java.util.List<LoggingEvent> events) {
oClient.registerSchema("msid", "schemaId", "content");
Assert.assertEquals(
"Register schema msid/schemaId failed, statusCode: 400, statusMessage: client error, description: too big.",
events.get(0).getMessage());
};
}.run();
}

@Test
public void testRegisterSchemaSuccess() {
new MockUp<ServiceRegistryClientImpl>() {
@Mock
Handler<RestResponse> syncHandlerEx(CountDownLatch countDownLatch, Holder<ResponseWrapper> holder) {
return restResponse -> {
HttpClientResponse response = Mockito.mock(HttpClientResponse.class);
Mockito.when(response.statusCode()).thenReturn(200);

ResponseWrapper responseWrapper = new ResponseWrapper();
responseWrapper.response = response;
holder.value = responseWrapper;
};
}
};
new MockUp<RestUtils>() {
@Mock
void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler) {
responseHandler.handle(null);
}
};

new RegisterSchemaTester() {
void doRun(java.util.List<LoggingEvent> events) {
oClient.registerSchema("msid", "schemaId", "content");
Assert.assertEquals(
"register schema msid/schemaId success.",
events.get(0).getMessage());
};
}.run();
}
}