Skip to content

Commit

Permalink
[SCB-2244] change DiscoveryEndpoint to make mock multiple instances p…
Browse files Browse the repository at this point in the history
…ossible when test zero config performance (#2323)
  • Loading branch information
wujimin committed Mar 27, 2021
1 parent e608843 commit ad4aeb9
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 32 deletions.
Expand Up @@ -45,7 +45,7 @@
@Component
public class InvocationTimeoutBootListener implements BootListener {
private static final Logger LOGGER = LoggerFactory.getLogger(InvocationTimeoutBootListener.class);

public static final String ENABLE_TIMEOUT_CHECK = "servicecomb.invocation.enableTimeoutCheck";

public static boolean timeoutCheckEnabled() {
Expand All @@ -54,7 +54,7 @@ public static boolean timeoutCheckEnabled() {
}

@Override
public void onAfterRegistry(BootEvent event) {
public void onAfterTransport(BootEvent event) {
if (timeoutCheckEnabled()) {
EventManager.getEventBus().register(this);
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;

import org.apache.servicecomb.core.Endpoint;
Expand All @@ -37,37 +38,40 @@ public interface DiscoveryClient {
@Path("/info")
@GET
@ApiOperation(value = "", nickname = "getInfo")
CompletableFuture<MicroserviceInfo> getInfoAsync(Endpoint endpoint);
CompletableFuture<MicroserviceInfo> getInfoAsync(Endpoint endpoint, @QueryParam("service-id") String serviceId);

default MicroserviceInfo getInfo(Endpoint endpoint) {
return AsyncUtils.toSync(getInfoAsync(endpoint));

default MicroserviceInfo getInfo(Endpoint endpoint, String serviceId) {
return AsyncUtils.toSync(getInfoAsync(endpoint, serviceId));
}

@Path("/microservice")
@GET
@ApiOperation(value = "", nickname = "getMicroservice")
CompletableFuture<Microservice> getMicroserviceAsync(Endpoint endpoint);
CompletableFuture<Microservice> getMicroserviceAsync(Endpoint endpoint, @QueryParam("service-id") String serviceId);

default Microservice getMicroservice(Endpoint endpoint) {
return AsyncUtils.toSync(getMicroserviceAsync(endpoint));
default Microservice getMicroservice(Endpoint endpoint, String serviceId) {
return AsyncUtils.toSync(getMicroserviceAsync(endpoint, serviceId));
}

@Path("/instance")
@GET
@ApiOperation(value = "", nickname = "getInstance")
CompletableFuture<MicroserviceInstance> getInstanceAsync(Endpoint endpoint);
CompletableFuture<MicroserviceInstance> getInstanceAsync(Endpoint endpoint,
@QueryParam("service-id") String serviceId);

default MicroserviceInstance getInstance(Endpoint endpoint) {
return AsyncUtils.toSync(getInstanceAsync(endpoint));
default MicroserviceInstance getInstance(Endpoint endpoint, String serviceId) {
return AsyncUtils.toSync(getInstanceAsync(endpoint, serviceId));
}

@Path("/schemas/{schema-id}")
@GET
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "", nickname = "getSchema")
CompletableFuture<String> getSchemaAsync(Endpoint endpoint, @PathParam("schema-id") String schemaId);
CompletableFuture<String> getSchemaAsync(Endpoint endpoint, @QueryParam("service-id") String serviceId,
@PathParam("schema-id") String schemaId);

default String getSchema(Endpoint endpoint, String schemaId) {
return AsyncUtils.toSync(getSchemaAsync(endpoint, schemaId));
default String getSchema(Endpoint endpoint, String serviceId, String schemaId) {
return AsyncUtils.toSync(getSchemaAsync(endpoint, serviceId, schemaId));
}
}
Expand Up @@ -31,6 +31,9 @@
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceInstance;

import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;

@RestSchema(schemaId = SCHEMA_ID)
@Path("/v1/discovery")
public class DiscoveryEndpoint {
Expand All @@ -42,24 +45,48 @@ public DiscoveryEndpoint(Self self) {
this.self = self;
}

@ApiImplicitParams(
{
@ApiImplicitParam(name = "service-id", paramType = "query", dataType = "string",
value = "just make it possible to mock many instances by one real instance for performance test")
}
)
@Path("/info")
@GET
public CompletableFuture<MicroserviceInfo> getInfo() {
return CompletableFuture.completedFuture(self.getMicroserviceInfo());
}

@ApiImplicitParams(
{
@ApiImplicitParam(name = "service-id", paramType = "query", dataType = "string",
value = "just make it possible to mock many instances by one real instance for performance test")
}
)
@Path("/microservice")
@GET
public CompletableFuture<Microservice> getMicroservice() {
return CompletableFuture.completedFuture(self.getMicroservice());
}

@ApiImplicitParams(
{
@ApiImplicitParam(name = "service-id", paramType = "query", dataType = "string",
value = "just make it possible to mock many instances by one real instance for performance test")
}
)
@Path("/instance")
@GET
public CompletableFuture<MicroserviceInstance> getInstance() {
return CompletableFuture.completedFuture(self.getInstance());
}

@ApiImplicitParams(
{
@ApiImplicitParam(name = "service-id", paramType = "query", dataType = "string",
value = "just make it possible to mock many instances by one real instance for performance test")
}
)
@Path("/schemas/{schema-id}")
@GET
@Produces(MediaType.TEXT_PLAIN)
Expand Down
Expand Up @@ -93,12 +93,12 @@ private CompletableFuture<InstanceStore> addInstance(RegisterRequest request) {
}

return checkSchemaSummary(request, microserviceStore)
.thenCompose(v -> discoveryClient.getInstanceAsync(endpoint))
.thenCompose(v -> discoveryClient.getInstanceAsync(endpoint, request.getServiceId()))
.thenApply(instance -> addInstance(microserviceStore, instance));
}

private CompletableFuture<InstanceStore> addMicroserviceAndInstance(Endpoint endpoint, RegisterRequest request) {
return discoveryClient.getInfoAsync(endpoint)
return discoveryClient.getInfoAsync(endpoint, request.getServiceId())
.thenApply(info -> {
info.getMicroservice().getSchemaMap().putAll(info.getSchemasById());
MicroserviceStore microserviceStore = store
Expand Down
Expand Up @@ -114,6 +114,14 @@ public List<Microservice> getAllMicroservices() {
.collect(Collectors.toList());
}

public int getMicroserviceCount() {
return microservicesByName.size();
}

public int getInstanceCount() {
return instancesById.size();
}

public Stream<MicroserviceInstance> findDeadInstances(Duration timeout) {
long nanoNow = ticker.read();
long nanoTimeout = timeout.toNanos();
Expand Down
Expand Up @@ -21,9 +21,6 @@ servicecomb:
consumer:
policies:
scb-discovery: simple-load-balance, scb-consumer-transport
producer:
policies:
scb-discovery: scb-producer-transport, schedule, producer-operation
handler:
chain:
Consumer:
Expand Down
Expand Up @@ -51,9 +51,9 @@ class StoreServiceTest extends TestBase {

@BeforeEach
void setUp() {
Mockito.when(discoveryClient.getInfoAsync(any()))
Mockito.when(discoveryClient.getInfoAsync(any(), any()))
.thenReturn(CompletableFuture.completedFuture(self.getMicroserviceInfo()));
Mockito.when(discoveryClient.getInstanceAsync(any()))
Mockito.when(discoveryClient.getInstanceAsync(any(), any()))
.thenReturn(CompletableFuture.completedFuture(self.getInstance()));
}

Expand Down
Expand Up @@ -17,12 +17,12 @@

package org.apache.servicecomb.zeroconfig;

import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_ADDRESS;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_ENABLED;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_GROUP;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_HEARTBEAT_INTERVAL;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_HEARTBEAT_LOST_TIMES;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_MODE;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_MULTICAST_ADDRESS;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_MULTICAST_GROUP;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.CFG_PULL_INTERVAL;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_ADDRESS;
import static org.apache.servicecomb.zeroconfig.ZeroConfigConst.DEFAULT_GROUP;
Expand Down Expand Up @@ -66,13 +66,13 @@ public String getMode() {
return dynamicProperties.getStringProperty(CFG_MODE, MODE_MULTICAST);
}

public String getAddress() {
return dynamicProperties.getStringProperty(CFG_ADDRESS, DEFAULT_ADDRESS);
public String getMulticastAddress() {
return dynamicProperties.getStringProperty(CFG_MULTICAST_ADDRESS, DEFAULT_ADDRESS);
}

// (224.0.0.0, 239.255.255.255]
public String getGroup() {
return dynamicProperties.getStringProperty(CFG_GROUP, DEFAULT_GROUP);
public String getMulticastGroup() {
return dynamicProperties.getStringProperty(CFG_MULTICAST_GROUP, DEFAULT_GROUP);
}

public Duration getHeartbeatInterval() {
Expand Down
Expand Up @@ -26,9 +26,9 @@ public interface ZeroConfigConst {

String CFG_ENABLED = PREFIX + "enabled";

String CFG_GROUP = PREFIX + "multicast.group";
String CFG_MULTICAST_GROUP = PREFIX + "multicast.group";

String CFG_ADDRESS = PREFIX + "multicast.address";
String CFG_MULTICAST_ADDRESS = PREFIX + "multicast.address";

String CFG_HEARTBEAT_INTERVAL = PREFIX + "heartbeat.interval";

Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -68,15 +69,24 @@ public Multicast(Config config) throws IOException {
this.multicastSocket.setSoTimeout((int) TimeUnit.SECONDS.toMillis(5));
}

public Multicast setSendBufferSize(int size) throws SocketException {
multicastSocket.setSendBufferSize(size);
return this;
}

public Multicast setReceiveBufferSize(int size) throws SocketException {
multicastSocket.setReceiveBufferSize(size);
return this;
}

@SuppressWarnings("UnstableApiUsage")
private InetSocketAddress initBindAddress(Config config) {
HostAndPort hostAndPort = HostAndPort.fromString(config.getAddress());
HostAndPort hostAndPort = HostAndPort.fromString(config.getMulticastAddress());
return new InetSocketAddress(hostAndPort.getHost(), hostAndPort.getPort());
}

private InetAddress initGroup(Config config) throws UnknownHostException {
return InetAddress.getByName(config.getGroup());
return InetAddress.getByName(config.getMulticastGroup());
}

public <T> void send(MessageType type, T body) throws IOException {
Expand Down
Expand Up @@ -33,7 +33,7 @@
public class MulticastRegistration extends AbstractZeroConfigRegistration implements InitializingBean {
private static final String NAME = "zero-config-multicast";

private Multicast multicast;
protected Multicast multicast;

@Autowired
public MulticastRegistration setMulticast(Multicast multicast) {
Expand Down

0 comments on commit ad4aeb9

Please sign in to comment.