Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@
import org.slf4j.LoggerFactory;

import io.netty.buffer.Unpooled;
import io.servicecomb.common.rest.codec.RestCodec;
import io.servicecomb.common.rest.codec.produce.ProduceProcessor;
import io.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
import io.servicecomb.common.rest.definition.RestOperationMeta;
import io.servicecomb.common.rest.filter.HttpServerFilter;
import io.servicecomb.common.rest.locator.OperationLocator;
import io.servicecomb.common.rest.locator.ServicePathManager;
import io.servicecomb.core.Const;
import io.servicecomb.core.Invocation;
import io.servicecomb.core.definition.MicroserviceMeta;
import io.servicecomb.core.definition.OperationMeta;
import io.servicecomb.foundation.common.utils.JsonUtils;
import io.servicecomb.foundation.metrics.MetricsServoRegistry;
import io.servicecomb.foundation.metrics.performance.QueueMetrics;
import io.servicecomb.foundation.metrics.performance.QueueMetricsData;
import io.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import io.servicecomb.foundation.vertx.http.HttpServletResponseEx;
import io.servicecomb.foundation.vertx.stream.BufferOutputStream;
Expand All @@ -62,6 +70,18 @@ public void setHttpServerFilters(List<HttpServerFilter> httpServerFilters) {
this.httpServerFilters = httpServerFilters;
}

protected void findRestOperation(MicroserviceMeta microserviceMeta) {
ServicePathManager servicePathManager = ServicePathManager.getServicePathManager(microserviceMeta);
if (servicePathManager == null) {
LOGGER.error("No schema defined for {}:{}.", microserviceMeta.getAppId(), microserviceMeta.getName());
throw new InvocationException(Status.NOT_FOUND, Status.NOT_FOUND.getReasonPhrase());
}

OperationLocator locator = locateOperation(servicePathManager);
requestEx.setAttribute(RestConst.PATH_PARAMETERS, locator.getPathVarMap());
this.restOperationMeta = locator.getOperation();
}

protected void initProduceProcessor() {
produceProcessor = restOperationMeta.ensureFindProduceProcessor(requestEx);
if (produceProcessor == null) {
Expand All @@ -82,6 +102,44 @@ protected void setContext() throws Exception {
invocation.setContext(cseContext);
}

protected void scheduleInvocation() {
OperationMeta operationMeta = restOperationMeta.getOperationMeta();
QueueMetrics metricsData = initMetrics(operationMeta);
operationMeta.getExecutor().execute(() -> {
synchronized (this.requestEx) {
try {
if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
// already timeout
// in this time, request maybe recycled and reused by web container, do not use requestEx
LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
operationMeta.getHttpMethod(),
operationMeta.getMicroserviceQualifiedName());
return;
}

runOnExecutor(metricsData);
} catch (Throwable e) {
LOGGER.error("rest server onRequest error", e);
sendFailResponse(e);
}
}
});
}

protected void runOnExecutor(QueueMetrics metricsData) {
Object[] args = RestCodec.restToArgs(requestEx, restOperationMeta);
createInvocation(args);

this.invocation.setMetricsData(metricsData);
updateMetrics();

invoke();
}

protected abstract OperationLocator locateOperation(ServicePathManager servicePathManager);

protected abstract void createInvocation(Object[] args);

public void invoke() {
try {
Response response = prepareInvoke();
Expand Down Expand Up @@ -113,7 +171,12 @@ protected Response prepareInvoke() throws Throwable {
return null;
}

protected abstract void doInvoke() throws Throwable;
protected void doInvoke() throws Throwable {
invocation.next(resp -> {
sendResponseQuietly(resp);
endMetrics();
});
}

public void sendFailResponse(Throwable throwable) {
if (produceProcessor == null) {
Expand Down Expand Up @@ -166,4 +229,52 @@ protected void sendResponse(Response response) throws Exception {
responseEx.flushBuffer();
}
}

/**
* Init the metrics. Note down the queue count and start time.
* @param operationMeta Operation data
* @return QueueMetrics
*/
private QueueMetrics initMetrics(OperationMeta operationMeta) {
QueueMetrics metricsData = new QueueMetrics();
metricsData.setQueueStartTime(System.currentTimeMillis());
metricsData.setOperQualifiedName(operationMeta.getMicroserviceQualifiedName());
QueueMetricsData reqQueue = MetricsServoRegistry.getOrCreateLocalMetrics()
.getOrCreateQueueMetrics(operationMeta.getMicroserviceQualifiedName());
reqQueue.incrementCountInQueue();
return metricsData;
}

/**
* Update the queue metrics.
*/
private void updateMetrics() {
QueueMetrics metricsData = (QueueMetrics) this.invocation.getMetricsData();
if (null != metricsData) {
metricsData.setQueueEndTime(System.currentTimeMillis());
QueueMetricsData reqQueue = MetricsServoRegistry.getOrCreateLocalMetrics()
.getOrCreateQueueMetrics(restOperationMeta.getOperationMeta().getMicroserviceQualifiedName());
reqQueue.incrementTotalCount();
Long timeInQueue = metricsData.getQueueEndTime() - metricsData.getQueueStartTime();
reqQueue.setTotalTime(reqQueue.getTotalTime() + timeInQueue);
reqQueue.setMinLifeTimeInQueue(timeInQueue);
reqQueue.setMaxLifeTimeInQueue(timeInQueue);
reqQueue.decrementCountInQueue();
}
}

/**
* Prepare the end time of queue metrics.
*/
private void endMetrics() {
QueueMetrics metricsData = (QueueMetrics) this.invocation.getMetricsData();
if (null != metricsData) {
metricsData.setEndOperTime(System.currentTimeMillis());
QueueMetricsData reqQueue = MetricsServoRegistry.getOrCreateLocalMetrics()
.getOrCreateQueueMetrics(restOperationMeta.getOperationMeta().getMicroserviceQualifiedName());
reqQueue.incrementTotalServExecutionCount();
reqQueue.setTotalServExecutionTime(
reqQueue.getTotalServExecutionTime() + (metricsData.getEndOperTime() - metricsData.getQueueEndTime()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,20 @@

import java.util.List;

import javax.ws.rs.core.Response.Status;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.servicecomb.common.rest.codec.RestCodec;
import io.servicecomb.common.rest.definition.RestOperationMeta;
import io.servicecomb.common.rest.filter.HttpServerFilter;
import io.servicecomb.common.rest.locator.OperationLocator;
import io.servicecomb.common.rest.locator.ServicePathManager;
import io.servicecomb.core.Const;
import io.servicecomb.core.CseContext;
import io.servicecomb.core.Transport;
import io.servicecomb.core.definition.MicroserviceMeta;
import io.servicecomb.core.definition.OperationMeta;
import io.servicecomb.core.invocation.InvocationFactory;
import io.servicecomb.foundation.metrics.MetricsServoRegistry;
import io.servicecomb.foundation.metrics.performance.QueueMetrics;
import io.servicecomb.foundation.metrics.performance.QueueMetricsData;
import io.servicecomb.foundation.vertx.http.HttpServletRequestEx;
import io.servicecomb.foundation.vertx.http.HttpServletResponseEx;
import io.servicecomb.serviceregistry.RegistryUtils;
import io.servicecomb.swagger.invocation.exception.InvocationException;

public class RestProducerInvocation extends AbstractRestInvocation {
private static final Logger LOGGER = LoggerFactory.getLogger(RestProducerInvocation.class);

protected Transport transport;

public void invoke(Transport transport, HttpServletRequestEx requestEx, HttpServletResponseEx responseEx,
Expand All @@ -56,7 +43,7 @@ public void invoke(Transport transport, HttpServletRequestEx requestEx, HttpServ
requestEx.setAttribute(RestConst.REST_REQUEST, requestEx);

try {
this.restOperationMeta = findRestOperation();
findRestOperation();
} catch (InvocationException e) {
sendFailResponse(e);
return;
Expand All @@ -65,115 +52,26 @@ public void invoke(Transport transport, HttpServletRequestEx requestEx, HttpServ
scheduleInvocation();
}

protected void scheduleInvocation() {
OperationMeta operationMeta = restOperationMeta.getOperationMeta();
QueueMetrics metricsData = initMetrics(operationMeta);
operationMeta.getExecutor().execute(() -> {
synchronized (this.requestEx) {
try {
if (requestEx.getAttribute(RestConst.REST_REQUEST) != requestEx) {
// already timeout
// in this time, request maybe recycled and reused by web container, do not use requestEx
LOGGER.error("Rest request already timeout, abandon execute, method {}, operation {}.",
operationMeta.getHttpMethod(),
operationMeta.getMicroserviceQualifiedName());
return;
}

runOnExecutor(metricsData);
} catch (Throwable e) {
LOGGER.error("rest server onRequest error", e);
sendFailResponse(e);
}
}
});
}

protected void runOnExecutor(QueueMetrics metricsData) {
Object[] args = RestCodec.restToArgs(requestEx, restOperationMeta);
this.invocation = InvocationFactory.forProvider(transport.getEndpoint(),
restOperationMeta.getOperationMeta(),
args);
this.invocation.setMetricsData(metricsData);
updateMetrics();
invoke();
}

protected RestOperationMeta findRestOperation() {
protected void findRestOperation() {
String targetMicroserviceName = requestEx.getHeader(Const.TARGET_MICROSERVICE);
if (targetMicroserviceName == null) {
// for compatible
targetMicroserviceName = RegistryUtils.getMicroservice().getServiceName();
}
MicroserviceMeta selfMicroserviceMeta =
CseContext.getInstance().getMicroserviceMetaManager().ensureFindValue(targetMicroserviceName);
ServicePathManager servicePathManager = ServicePathManager.getServicePathManager(selfMicroserviceMeta);
if (servicePathManager == null) {
LOGGER.error("No schema in microservice");
throw new InvocationException(Status.NOT_FOUND, Status.NOT_FOUND.getReasonPhrase());
}

OperationLocator locator =
servicePathManager.producerLocateOperation(requestEx.getRequestURI(), requestEx.getMethod());
requestEx.setAttribute(RestConst.PATH_PARAMETERS, locator.getPathVarMap());

return locator.getOperation();
findRestOperation(selfMicroserviceMeta);
}

@Override
protected void doInvoke() throws Throwable {
invocation.next(resp -> {
sendResponseQuietly(resp);
endMetrics();

});
}

/**
* Init the metrics. Note down the queue count and start time.
* @param operationMeta Operation data
* @return QueueMetrics
*/
private QueueMetrics initMetrics(OperationMeta operationMeta) {
QueueMetrics metricsData = new QueueMetrics();
metricsData.setQueueStartTime(System.currentTimeMillis());
metricsData.setOperQualifiedName(operationMeta.getMicroserviceQualifiedName());
QueueMetricsData reqQueue = MetricsServoRegistry.getOrCreateLocalMetrics()
.getOrCreateQueueMetrics(operationMeta.getMicroserviceQualifiedName());
reqQueue.incrementCountInQueue();
return metricsData;
}

/**
* Update the queue metrics.
*/
private void updateMetrics() {
QueueMetrics metricsData = (QueueMetrics) this.invocation.getMetricsData();
if (null != metricsData) {
metricsData.setQueueEndTime(System.currentTimeMillis());
QueueMetricsData reqQueue = MetricsServoRegistry.getOrCreateLocalMetrics()
.getOrCreateQueueMetrics(restOperationMeta.getOperationMeta().getMicroserviceQualifiedName());
reqQueue.incrementTotalCount();
Long timeInQueue = metricsData.getQueueEndTime() - metricsData.getQueueStartTime();
reqQueue.setTotalTime(reqQueue.getTotalTime() + timeInQueue);
reqQueue.setMinLifeTimeInQueue(timeInQueue);
reqQueue.setMaxLifeTimeInQueue(timeInQueue);
reqQueue.decrementCountInQueue();
}
protected OperationLocator locateOperation(ServicePathManager servicePathManager) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if we cannot find the OperationLocator?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if locate failed, throw 404 or 405 exception.

return servicePathManager.producerLocateOperation(requestEx.getRequestURI(), requestEx.getMethod());
}

/**
* Prepare the end time of queue metrics.
*/
private void endMetrics() {
QueueMetrics metricsData = (QueueMetrics) this.invocation.getMetricsData();
if (null != metricsData) {
metricsData.setEndOperTime(System.currentTimeMillis());
QueueMetricsData reqQueue = MetricsServoRegistry.getOrCreateLocalMetrics()
.getOrCreateQueueMetrics(restOperationMeta.getOperationMeta().getMicroserviceQualifiedName());
reqQueue.incrementTotalServExecutionCount();
reqQueue.setTotalServExecutionTime(
reqQueue.getTotalServExecutionTime() + (metricsData.getEndOperTime() - metricsData.getQueueEndTime()));
}
@Override
protected void createInvocation(Object[] args) {
this.invocation = InvocationFactory.forProvider(transport.getEndpoint(),
restOperationMeta.getOperationMeta(),
args);
}
}
Loading