Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,13 @@ public abstract class AbstractTransport implements Transport {
private static final long DEFAULT_TIMEOUT_MILLIS = 30000;

// 所有transport使用同一个vertx实例,避免创建太多的线程
public static TransportVertxFactory transportVertxFactory = new TransportVertxFactory();
private static TransportVertxFactory transportVertxFactory = new TransportVertxFactory();

protected Vertx transportVertx = transportVertxFactory.getTransportVertx();
public static TransportVertxFactory getTransportVertxFactory() {
return transportVertxFactory;
}

protected Vertx transportVertx = getTransportVertxFactory().getTransportVertx();

protected Endpoint endpoint;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,6 @@ public static void run() {

// TestMgr.check(true, metrics.get("jvm(name=heapUsed,statistic=gauge)") != 0);
TestMgr.check(true, metrics.size() > 0);
TestMgr.check(true,
metrics.get(
"servicecomb.invocation(operation=springmvc.codeFirst.saySomething,role=PRODUCER,stage=total,statistic=count,status=200,transport=highway)")
>= 0);

//prometheus integration test
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.Part;
import javax.ws.rs.QueryParam;
Expand All @@ -44,6 +45,7 @@
import org.apache.servicecomb.demo.jaxbbean.JAXBPerson;
import org.apache.servicecomb.demo.server.User;
import org.apache.servicecomb.demo.springmvc.decoderesponse.DecodeTestResponse;
import org.apache.servicecomb.metrics.core.MetricsBootListener;
import org.apache.servicecomb.provider.rest.common.RestSchema;
import org.apache.servicecomb.swagger.extend.annotations.RawJsonRequestBody;
import org.apache.servicecomb.swagger.extend.annotations.ResponseHeaders;
Expand Down Expand Up @@ -361,9 +363,15 @@ public String testform(HttpServletRequest request) {
return form1 + form2;
}

@Inject
MetricsBootListener metricsBootListener;

//Only for Prometheus integration test
@RequestMapping(path = "/prometheusForTest", method = RequestMethod.GET)
public String prometheusForTest() {
// just for test, this makes client always can got latest metrics
metricsBootListener.getMetricsBootstrap().pollMeters();

RestTemplate defaultRestTemplate = new RestTemplate();
return defaultRestTemplate.getForObject("http://localhost:9696/metrics", String.class);
}
Expand Down
6 changes: 4 additions & 2 deletions demo/perf/src/main/resources/microservice.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ servicecomb:
client:
thread-count: 8
references:
transport: rest
transport: highway
metrics:
window_time: 1000
publisher.defaultLog.enabled: true
publisher.defaultLog:
enabled: true
endpoints.client.detail.enabled: true

sync-count: 10
async-count: 20
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*
*/
public class EventManager {
public static EventBus eventBus = new EventBus();
public static EventBus eventBus = new SimpleEventBus();

public static EventBus getEventBus() {
return eventBus;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.foundation.common.event;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

/**
* for register/unregister rarely
*/
public class SimpleEventBus extends EventBus {
private final Map<Object, List<SimpleSubscriber>> subscribersMap = new ConcurrentHashMapEx<>();

// key is event class
private Map<Class<?>, List<SimpleSubscriber>> subscribersCache = new ConcurrentHashMapEx<>();

private List<SimpleSubscriber> collectSubscribers(Object instance) {
List<SimpleSubscriber> subscribers = new ArrayList<>();
Method[] methods = MethodUtils.getMethodsWithAnnotation(instance.getClass(), Subscribe.class, true, true);
for (Method method : methods) {
SimpleSubscriber subscriber = new SimpleSubscriber(instance, method);
subscribers.add(subscriber);
}
return subscribers;
}

@Override
public void register(Object instance) {
subscribersMap.computeIfAbsent(instance, this::collectSubscribers);
// even ignored cause of duplicate register
// still reset cache
// this makes logic simpler
subscribersCache = new ConcurrentHashMapEx<>();
}

@Override
public void unregister(Object instance) {
if (subscribersMap.remove(instance) != null) {
subscribersCache = new ConcurrentHashMapEx<>();
}
}

public void post(Object event) {
// cache always reset after register/unregister
// so cache always match latest subscribersMap at last
// te worst scenes is invoke collectSubscriberForEvent multiple times, no problem
List<SimpleSubscriber> subscribers = subscribersCache
.computeIfAbsent(event.getClass(), this::collectSubscriberForEvent);
for (SimpleSubscriber subscriber : subscribers) {
subscriber.dispatchEvent(event);
}
}

/**
* subscribersMap almost stable<br>
* so we not care for performance of collectSubscriberForEvent
* @param eventClass
*/
private List<SimpleSubscriber> collectSubscriberForEvent(Class<?> eventClass) {
List<SimpleSubscriber> subscribersForEvent = new ArrayList<>();
for (List<SimpleSubscriber> subscribers : subscribersMap.values()) {
for (SimpleSubscriber subscriber : subscribers) {
if (subscriber.getMethod().getParameterTypes()[0].isAssignableFrom(eventClass)) {
subscribersForEvent.add(subscriber);
}
}
}
return subscribersForEvent;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicecomb.foundation.common.event;

import java.lang.reflect.Method;
import java.util.function.Consumer;

import org.apache.servicecomb.foundation.common.utils.LambdaMetafactoryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.eventbus.AllowConcurrentEvents;

public class SimpleSubscriber {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSubscriber.class);

private Object instance;

private Method method;

// generated from method
private Consumer<Object> lambda;

private Consumer<Object> dispatcher;

public SimpleSubscriber(Object instance, Method method) {
this.instance = instance;
this.method = method;

method.setAccessible(true);
try {
lambda = LambdaMetafactoryUtils.createLambda(instance, method, Consumer.class);
} catch (Throwable throwable) {
LOGGER.warn("Failed to create lambda for method: {}, fallback to reflect.", method);
lambda = event -> {
try {
method.invoke(instance, event);
} catch (Throwable e) {
throw new IllegalStateException(e);
}
};
}

dispatcher = this::syncDispatch;
if (method.getAnnotation(AllowConcurrentEvents.class) != null) {
dispatcher = this::concurrentDispatch;
}
}

public Object getInstance() {
return instance;
}

public Method getMethod() {
return method;
}

public void dispatchEvent(Object event) {
dispatcher.accept(event);
}

private void syncDispatch(Object event) {
synchronized (this) {
lambda.accept(event);
}
}

private void concurrentDispatch(Object event) {
lambda.accept(event);
}
}
Loading