Skip to content

Commit

Permalink
Backend streaming thread model improvement (#2247)
Browse files Browse the repository at this point in the history
[Performance Improvement]Backend streaming thread model improvement
  • Loading branch information
wu-sheng committed Feb 18, 2019
1 parent 19614a1 commit 2f3ccf3
Show file tree
Hide file tree
Showing 23 changed files with 455 additions and 26 deletions.
Empty file modified .mvn/wrapper/MavenWrapperDownloader.java 100755 → 100644
Empty file.
Empty file modified .mvn/wrapper/maven-wrapper.properties 100755 → 100644
Empty file.
Expand Up @@ -18,8 +18,7 @@

package org.apache.skywalking.apm.commons.datacarrier.buffer;

import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;

Expand Down Expand Up @@ -80,6 +79,10 @@ public int getBufferSize() {
return buffer.length;
}

public LinkedList<T> obtain() {
return this.obtain(0, buffer.length);
}

public LinkedList<T> obtain(int start, int end) {
LinkedList<T> result = new LinkedList<T>();
for (int i = start; i < end; i++) {
Expand Down
Expand Up @@ -81,6 +81,14 @@ public int getChannelSize() {
return this.bufferChannels.length;
}

public int getBufferSize() {
return bufferChannels[0].getBufferSize();
}

public long size() {
return (long)getChannelSize() * getBufferSize();
}

public Buffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
Expand Down
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.commons.datacarrier.consumer;

import java.util.*;
import java.util.concurrent.Callable;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;

/**
* BulkConsumePool works for consuming data from multiple channels(DataCarrier instances), with multiple {@link
* MultipleChannelsConsumer}s.
*
* In typical case, the number of {@link MultipleChannelsConsumer} should be less than the number of channels.
*
* @author wusheng
*/
public class BulkConsumePool implements ConsumerPool {
private List<MultipleChannelsConsumer> allConsumers;
private volatile boolean isStarted = false;

public BulkConsumePool(String name, int size, long consumeCycle) {
allConsumers = new ArrayList<MultipleChannelsConsumer>(size);
String threadNum = System.getenv(name + "_THREAD");
if (threadNum != null) {
try {
size = Integer.parseInt(threadNum);
} catch (NumberFormatException e) {

}
}
for (int i = 0; i < size; i++) {
MultipleChannelsConsumer multipleChannelsConsumer = new MultipleChannelsConsumer("DataCarrier." + name + ".BulkConsumePool." + i + ".Thread", consumeCycle);
multipleChannelsConsumer.setDaemon(true);
allConsumers.add(multipleChannelsConsumer);
}
}

@Override synchronized public void add(String name, Channels channels, IConsumer consumer) {
MultipleChannelsConsumer multipleChannelsConsumer = getLowestPayload();
multipleChannelsConsumer.addNewTarget(channels, consumer);
}

/**
* Get the lowest payload consumer thread based on current allocate status.
*
* @return the lowest consumer.
*/
private MultipleChannelsConsumer getLowestPayload() {
MultipleChannelsConsumer winner = allConsumers.get(0);
for (int i = 1; i < allConsumers.size(); i++) {
MultipleChannelsConsumer option = allConsumers.get(i);
if (option.size() < winner.size()) {
return option;
}
}
return winner;
}

/**
* @param channels
* @return
*/
@Override public boolean isRunning(Channels channels) {
return isStarted;
}

@Override public void close(Channels channels) {
for (MultipleChannelsConsumer consumer : allConsumers) {
consumer.shutdown();
}
}

@Override public void begin(Channels channels) {
if (isStarted) {
return;
}
for (MultipleChannelsConsumer consumer : allConsumers) {
consumer.start();
}
isStarted = true;
}

/**
* The creator for {@link BulkConsumePool}.
*/
public static class Creator implements Callable<ConsumerPool> {
private String name;
private int size;
private long consumeCycle;

public Creator(String name, int poolSize, long consumeCycle) {
this.name = name;
this.size = poolSize;
this.consumeCycle = consumeCycle;
}

@Override public ConsumerPool call() {
return new BulkConsumePool(name, size, consumeCycle);
}

public static int recommendMaxSize() {
int processorNum = Runtime.getRuntime().availableProcessors();
if (processorNum > 1) {
processorNum -= 1;
}
return processorNum;
}
}
}
Expand Up @@ -63,9 +63,13 @@ public ConsumerPool get(String poolName) {
}
}

/**
* Always return true.
* @param channels
* @return
*/
@Override public boolean isRunning(Channels channels) {
ConsumeDriver driver = allDrivers.get(channels);
return driver == null ? false : driver.isRunning(channels);
return true;
}

@Override public void close(Channels channels) {
Expand Down
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.commons.datacarrier.consumer;

import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;

/**
* MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
* IConsumer}s
*
* @author wusheng
*/
public class MultipleChannelsConsumer extends Thread {
private volatile boolean running;
private volatile ArrayList<Group> consumeTargets;
private volatile long size;
private final long consumeCycle;

public MultipleChannelsConsumer(String threadName, long consumeCycle) {
super(threadName);
this.consumeTargets = new ArrayList<Group>();
this.consumeCycle = consumeCycle;
}

@Override
public void run() {
running = true;

while (running) {
boolean hasData = false;
for (Group target : consumeTargets) {
hasData = hasData || consume(target);
}

if (!hasData) {
try {
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
}
}

}

// consumer thread is going to stop
// consume the last time
for (Group target : consumeTargets) {
consume(target);

target.consumer.onExit();
}
}

private boolean consume(Group target) {
boolean hasData;
LinkedList consumeList = new LinkedList();
for (int i = 0; i < target.channels.getChannelSize(); i++) {
Buffer buffer = target.channels.getBuffer(i);
consumeList.addAll(buffer.obtain());
}
hasData = consumeList.size() > 0;

if (consumeList.size() > 0) {
try {
target.consumer.consume(consumeList);
} catch (Throwable t) {
target.consumer.onError(consumeList, t);
}
}
return hasData;
}

/**
* Add a new target channels.
*
* @param channels
* @param consumer
*/
public void addNewTarget(Channels channels, IConsumer consumer) {
Group group = new Group(channels, consumer);
// Recreate the new list to avoid change list while the list is used in consuming.
ArrayList<Group> newList = new ArrayList<Group>();
for (Group target : consumeTargets) {
newList.add(target);
}
newList.add(group);
consumeTargets = newList;
size += channels.size();
}

public long size() {
return size;
}

void shutdown() {
running = false;
}

private class Group {
private Channels channels;
private IConsumer consumer;

public Group(Channels channels, IConsumer consumer) {
this.channels = channels;
this.consumer = consumer;
}
}
}
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.commons.datacarrier.consumer;

import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
import org.junit.*;

/**
* @author wusheng
*/
public class BulkConsumePoolTest {
@Test
public void testOneThreadPool() throws InterruptedException {
BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
final ArrayList<Object> result1 = new ArrayList();
Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
pool.add("test", c1,
new IConsumer() {
@Override public void init() {

}

@Override public void consume(List data) {
for (Object datum : data) {
result1.add(datum);
}
}

@Override public void onError(List data, Throwable t) {

}

@Override public void onExit() {

}
});
pool.begin(c1);
final ArrayList<Object> result2 = new ArrayList();
Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
pool.add("test2", c2,
new IConsumer() {
@Override public void init() {

}

@Override public void consume(List data) {
for (Object datum : data) {
result2.add(datum);
}
}

@Override public void onError(List data, Throwable t) {

}

@Override public void onExit() {

}
});
pool.begin(c2);
c1.save(new Object());
c1.save(new Object());
c1.save(new Object());
c1.save(new Object());
c1.save(new Object());
c2.save(new Object());
c2.save(new Object());
Thread.sleep(2000);

Assert.assertEquals(5, result1.size());
Assert.assertEquals(2, result2.size());
}
}

0 comments on commit 2f3ccf3

Please sign in to comment.