Skip to content

Commit

Permalink
Merge 4a925f7 into 908e20f
Browse files Browse the repository at this point in the history
  • Loading branch information
yhs0092 committed Feb 28, 2020
2 parents 908e20f + 4a925f7 commit 2f3e710
Show file tree
Hide file tree
Showing 81 changed files with 4,243 additions and 714 deletions.
63 changes: 48 additions & 15 deletions core/src/main/java/org/apache/servicecomb/core/SCBEngine.java
Expand Up @@ -48,13 +48,16 @@
import org.apache.servicecomb.core.provider.producer.ProducerProviderManager;
import org.apache.servicecomb.core.transport.TransportManager;
import org.apache.servicecomb.foundation.common.VendorExtensions;
import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.foundation.common.event.EnableExceptionPropagation;
import org.apache.servicecomb.foundation.common.event.EventManager;
import org.apache.servicecomb.foundation.common.log.LogMarkerLeakFixUtils;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.foundation.vertx.VertxUtils;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.ServiceRegistry;
import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstanceStatus;
import org.apache.servicecomb.serviceregistry.consumer.MicroserviceVersions;
import org.apache.servicecomb.serviceregistry.definition.MicroserviceNameParser;
import org.apache.servicecomb.serviceregistry.swagger.SwaggerLoader;
Expand All @@ -78,6 +81,10 @@ public class SCBEngine {

static final long DEFAULT_WAIT_UP_TIMEOUT = 10_000;

static final String CFG_KEY_TURN_DOWN_STATUS_WAIT_SEC = "servicecomb.boot.turnDown.waitInSeconds";

static final long DEFAULT_TURN_DOWN_STATUS_WAIT_SEC = 0;

private static final Object initializationLock = new Object();

private volatile static SCBEngine INSTANCE;
Expand All @@ -103,8 +110,6 @@ public class SCBEngine {

private volatile SCBStatus status = SCBStatus.DOWN;

private ServiceRegistry serviceRegistry;

private EventBus eventBus;

private ExecutorManager executorManager = new ExecutorManager();
Expand All @@ -123,8 +128,7 @@ public class SCBEngine {
private Thread shutdownHook;

protected SCBEngine() {
serviceRegistry = RegistryUtils.getServiceRegistry();
eventBus = serviceRegistry.getEventBus();
eventBus = EventManager.getEventBus();

// see SCB-1266, fix Log4j2 leak marker problem
LogMarkerLeakFixUtils.fix();
Expand All @@ -142,7 +146,7 @@ public VendorExtensions getVendorExtensions() {
}

public String getAppId() {
return serviceRegistry.getAppId();
return RegistryUtils.getAppId();
}

public void setStatus(SCBStatus status) {
Expand All @@ -165,11 +169,11 @@ public static SCBEngine getInstance() {
}

public ServiceRegistry getServiceRegistry() {
return serviceRegistry;
return RegistryUtils.getServiceRegistry();
}

public SwaggerLoader getSwaggerLoader() {
return serviceRegistry.getSwaggerLoader();
return RegistryUtils.getSwaggerLoader();
}

public ConsumerHandlerManager getConsumerHandlerManager() {
Expand Down Expand Up @@ -318,19 +322,18 @@ public synchronized SCBEngine run() {
private void printServiceInfo() {
StringBuilder serviceInfo = new StringBuilder();
serviceInfo.append("Service information is shown below:\n");
for (int i = 0; i < bootUpInformationCollectors.size(); i++) {
serviceInfo.append(bootUpInformationCollectors.get(i).collect()).append('\n');
for (BootUpInformationCollector bootUpInformationCollector : bootUpInformationCollectors) {
serviceInfo.append(bootUpInformationCollector.collect()).append('\n');
}
LOGGER.info(serviceInfo.toString());
}


private void doRun() throws Exception {
status = SCBStatus.STARTING;

bootListeners.sort(Comparator.comparingInt(BootListener::getOrder));

AbstractEndpointsCache.init(serviceRegistry.getInstanceCacheManager(), transportManager);
AbstractEndpointsCache.init(RegistryUtils.getInstanceCacheManager(), transportManager);

triggerEvent(EventType.BEFORE_HANDLER);
HandlerConfigUtils.init(consumerHandlerManager, producerHandlerManager);
Expand All @@ -354,14 +357,14 @@ private void doRun() throws Exception {

triggerAfterRegistryEvent();

serviceRegistry.run();
RegistryUtils.run();

shutdownHook = new Thread(this::destroyForShutdownHook);
Runtime.getRuntime().addShutdownHook(shutdownHook);
}

private void createProducerMicroserviceMeta() {
String microserviceName = serviceRegistry.getMicroservice().getServiceName();
String microserviceName = RegistryUtils.getMicroservice().getServiceName();
List<Handler> consumerHandlerChain = consumerHandlerManager.getOrCreate(microserviceName);
List<Handler> producerHandlerChain = producerHandlerManager.getOrCreate(microserviceName);

Expand Down Expand Up @@ -391,6 +394,12 @@ private void doDestroy() {
if (shutdownHook != null) {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}

//Step 0: turn down the status of this instance in service center,
// so that the consumers can remove this instance record in advance
turnDownInstanceStatus();
blockShutDownOperationForConsumerRefresh();

//Step 1: notify all component stop invoke via BEFORE_CLOSE Event
safeTriggerEvent(EventType.BEFORE_CLOSE);

Expand All @@ -399,7 +408,7 @@ private void doDestroy() {

//Step 3: Unregister microservice instance from Service Center and close vertx
// Forbidden other consumers find me
serviceRegistry.destroy();
RegistryUtils.destroy();
VertxUtils.blockCloseVertxByName("registry");
serviceRegistryListener.destroy();

Expand All @@ -422,6 +431,30 @@ private void doDestroy() {
safeTriggerEvent(EventType.AFTER_CLOSE);
}

private void turnDownInstanceStatus() {
RegistryUtils.executeOnEachServiceRegistry(sr -> new SuppressedRunnableWrapper(() -> {
MicroserviceInstance selfInstance = sr.getMicroserviceInstance();
sr.getServiceRegistryClient().updateMicroserviceInstanceStatus(
selfInstance.getServiceId(),
selfInstance.getInstanceId(),
MicroserviceInstanceStatus.DOWN);
}).run());
}

private void blockShutDownOperationForConsumerRefresh() {
try {
long turnDownWaitSeconds = DynamicPropertyFactory.getInstance()
.getLongProperty(CFG_KEY_TURN_DOWN_STATUS_WAIT_SEC, DEFAULT_TURN_DOWN_STATUS_WAIT_SEC)
.get();
if (turnDownWaitSeconds <= 0) {
return;
}
Thread.sleep(TimeUnit.SECONDS.toMillis(turnDownWaitSeconds));
} catch (InterruptedException e) {
LOGGER.warn("failed to block the shutdown procedure", e);
}
}

private void validAllInvocationFinished() throws InterruptedException {
long start = System.currentTimeMillis();
while (true) {
Expand Down Expand Up @@ -464,7 +497,7 @@ public MicroserviceReferenceConfig createMicroserviceReferenceConfig(String micr
* @return
*/
public MicroserviceReferenceConfig createMicroserviceReferenceConfig(String microserviceName, String versionRule) {
MicroserviceVersions microserviceVersions = serviceRegistry.getAppManager()
MicroserviceVersions microserviceVersions = RegistryUtils.getAppManager()
.getOrCreateMicroserviceVersions(parseAppId(microserviceName), microserviceName);
ConsumerMicroserviceVersionsMeta microserviceVersionsMeta = CoreMetaUtils
.getMicroserviceVersionsMeta(microserviceVersions);
Expand Down
Expand Up @@ -22,7 +22,6 @@

import javax.ws.rs.core.Response.Status;

import org.apache.servicecomb.core.Const;
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.exception.ExceptionUtils;
Expand Down
Expand Up @@ -100,7 +100,7 @@ public ReferenceConfig createReferenceConfig(String transport, OperationMeta ope
}

private void mark3rdPartyService(OperationMeta operationMeta, ReferenceConfig referenceConfig) {
final MicroserviceVersions microserviceVersions = RegistryUtils.getServiceRegistry().getAppManager()
final MicroserviceVersions microserviceVersions = RegistryUtils.getAppManager()
.getOrCreateMicroserviceVersions(
operationMeta.getMicroserviceMeta().getAppId(),
operationMeta.getMicroserviceName());
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
import org.apache.servicecomb.foundation.common.utils.IOUtils;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.api.Const;
import org.apache.servicecomb.serviceregistry.api.registry.BasePath;
import org.apache.servicecomb.serviceregistry.api.registry.Microservice;
Expand Down Expand Up @@ -65,10 +66,14 @@ public void onAfterTransport(BootEvent event) {
microserviceMeta.getMicroserviceName(),
schemaMeta.getSchemaId(),
content);
microservice.addSchema(schemaMeta.getSchemaId(), content);
RegistryUtils.executeOnEachServiceRegistry(sr -> {
sr.getMicroservice().addSchema(schemaMeta.getSchemaId(), content);
});
}

saveBasePaths(microserviceMeta, microservice);
RegistryUtils.executeOnEachServiceRegistry(sr -> {
saveBasePaths(microserviceMeta, sr.getMicroservice());
});
}

// just compatible to old 3rd components, servicecomb not use it......
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.servicecomb.core.Transport;
import org.apache.servicecomb.foundation.common.exceptions.ServiceCombException;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.api.registry.Microservice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,8 +61,10 @@ public void init(SCBEngine scbEngine) throws Exception {
Endpoint endpoint = transport.getPublishEndpoint();
if (endpoint != null && endpoint.getEndpoint() != null) {
LOGGER.info("endpoint to publish: {}", endpoint.getEndpoint());
Microservice microservice = scbEngine.getServiceRegistry().getMicroservice();
microservice.getInstance().getEndpoints().add(endpoint.getEndpoint());
RegistryUtils.executeOnEachServiceRegistry(sr -> {
Microservice microservice = sr.getMicroservice();
microservice.getInstance().getEndpoints().add(endpoint.getEndpoint());
});
}
continue;
}
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.servicecomb.core.bootstrap.SCBBootstrap;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils;
import org.apache.servicecomb.serviceregistry.RegistryUtils;
import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance;
import org.apache.servicecomb.serviceregistry.discovery.DiscoveryFilter;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
Expand Down Expand Up @@ -67,9 +68,9 @@ public void setUp() throws Exception {
}
};

new Expectations(scbEngine.getServiceRegistry().getInstanceCacheManager()) {
new Expectations(RegistryUtils.getInstanceCacheManager()) {
{
scbEngine.getServiceRegistry().getInstanceCacheManager()
RegistryUtils.getInstanceCacheManager()
.getOrCreateVersionedCache(anyString, anyString, anyString);
result = instanceVersionedCache;
}
Expand Down
Expand Up @@ -264,7 +264,7 @@ protected void invoke(String appendUrl, int x, int y, List<ResultWithInstance> r

private URIEndpointObject prepareEdge(String prefix) {
Microservice microservice = RegistryUtils.getMicroservice();
MicroserviceInstance microserviceInstance = (MicroserviceInstance) RegistryUtils.getServiceRegistry()
MicroserviceInstance microserviceInstance = (MicroserviceInstance) RegistryUtils
.getAppManager()
.getOrCreateMicroserviceVersionRule(microservice.getAppId(), "edge", DefinitionConst.VERSION_RULE_ALL)
.getVersionedCache()
Expand Down
Expand Up @@ -78,7 +78,7 @@ public void testHighwayTransport() throws Exception {

private static void prepareServerDirectURL() {
Microservice microservice = RegistryUtils.getMicroservice();
MicroserviceInstance microserviceInstance = (MicroserviceInstance) RegistryUtils.getServiceRegistry()
MicroserviceInstance microserviceInstance = (MicroserviceInstance) RegistryUtils
.getAppManager()
.getOrCreateMicroserviceVersionRule(microservice.getAppId(), "jaxrs", DefinitionConst.VERSION_RULE_ALL)
.getVersionedCache()
Expand Down
Expand Up @@ -106,8 +106,8 @@ public void dispatchEvent(Object event) {
try {
dispatcher.accept(event);
} catch (Throwable e) {
LOGGER.error("event process should not throw error. ", e);
if (enableExceptionPropagation) {
LOGGER.error("event process should not throw error. ", e);
throw e;
}
}
Expand Down

0 comments on commit 2f3e710

Please sign in to comment.