Skip to content

Commit

Permalink
NodeStorage fix #19 : add sutom executor pool with size based on node…
Browse files Browse the repository at this point in the history
…s count
  • Loading branch information
wayerr committed Jan 30, 2017
1 parent 8139b0b commit d42e12b
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.codeabovelab.dm.common.mb.MessageBus;
import com.codeabovelab.dm.common.mb.Subscriptions;
import com.codeabovelab.dm.common.security.Action;
import com.codeabovelab.dm.common.utils.ExecutorUtils;
import com.codeabovelab.dm.common.validate.ValidityException;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
Expand All @@ -52,6 +53,7 @@
import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -69,16 +71,18 @@ public class NodeStorage implements NodeInfoProvider, NodeRegistry {
private final PersistentBusFactory persistentBusFactory;
private final ExecutorService executorService;
private final DockerEventsConfig dockerEventConfig;
private final NodeStorageConfig config;
private DockerServiceFactory dockerFactory;

@Autowired
public NodeStorage(KvMapperFactory kvmf,
public NodeStorage(NodeStorageConfig config,
KvMapperFactory kvmf,
@Qualifier(NodeEvent.BUS) MessageBus<NodeEvent> nodeEventBus,
@Qualifier(DockerServiceEvent.BUS) MessageBus<DockerServiceEvent> dockerBus,
@Qualifier(DockerLogEvent.BUS) MessageBus<DockerLogEvent> dockerLogBus,
DockerEventsConfig dockerEventConfig,
PersistentBusFactory persistentBusFactory,
ExecutorService executorService) {
PersistentBusFactory persistentBusFactory) {
this.config = config;
this.nodeEventBus = nodeEventBus;
this.persistentBusFactory = persistentBusFactory;
this.dockerEventConfig = dockerEventConfig;
Expand All @@ -97,7 +101,24 @@ public NodeStorage(KvMapperFactory kvmf,
.listener(this::onKVEvent)
.mapper(kvmf)
.build();
this.executorService = executorService;
log.info("{} initialized with config: {}", getClass().getSimpleName(), this.config);
this.executorService = ExecutorUtils.executorBuilder()
.name(getClass().getSimpleName())
.maxSize(this.config.getMaxNodes())
.rejectedHandler((runnable, executor) -> {
String hint = "";
try {
int nodes = this.nodes.list().size();
int maxNodes = this.config.getMaxNodes();
if(nodes > maxNodes) {
hint = "\nNote that 'config.maxNodes'=" + maxNodes + " but storage has 'nodes'=" + nodes;
}
} catch (Exception e) {
//supress
}
throw new RejectedExecutionException("Task " + runnable + " rejected from " + executor + hint);
})
.build();
dockerBus.subscribe(this::onDockerServiceEvent);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2017 Code Above Lab LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.codeabovelab.dm.cluman.ds.nodes;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
*/
@Data
@Component
@ConfigurationProperties("dm.nodeStorage")
public class NodeStorageConfig {
/**
* Maximal value of expected nodes count. <p/>
* It not limit, it used for allocate some resources, so you can
* exceed this value, but then performance may degrade.
*/
private int maxNodes = 10;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.codeabovelab.dm.common.utils;

import lombok.Data;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Common utilities for {@link java.util.concurrent.Executor }
Expand Down Expand Up @@ -62,4 +64,108 @@ public void flush() {
public static DeferredExecutor deferred() {
return new DeferredExecutor();
}

private static class ThreadFactoryImpl implements ThreadFactory {
private final ThreadGroup group;
private final AtomicInteger count = new AtomicInteger(1);
private final String prefix;
private final boolean daemon;

ThreadFactoryImpl(String name, boolean daemon) {
SecurityManager sm = System.getSecurityManager();
group = (sm != null)? sm.getThreadGroup():
Thread.currentThread().getThreadGroup();
this.prefix = name.endsWith("-")? name : (name + "-");
this.daemon = daemon;
}

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(group, r, prefix + count.getAndIncrement());
thread.setDaemon(daemon);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}

@Data
public static final class ExecutorBuilder {
private String name;
private boolean daemon = true;
private Thread.UncaughtExceptionHandler exceptionHandler = Throwables.uncaughtHandler();
private RejectedExecutionHandler rejectedHandler = new ThreadPoolExecutor.AbortPolicy();
private int maxSize = 5;
private int coreSize = 2;
private long keepAlive = 30;
private int queueSize = 10;

/**
* Name of thread, without thread number pattern.
* @param name name
* @return this
*/
public ExecutorBuilder name(String name) {
setName(name);
return this;
}

/**
* Daemon flag.
* @param daemon default true
* @return this
*/
public ExecutorBuilder daemon(boolean daemon) {
setDaemon(daemon);
return this;
}

/**
* Uncaught exception handler.
* @param exceptionHandler handler, default {@link Throwables#uncaughtHandler()}
* @return this
*/
public ExecutorBuilder exceptionHandler(Thread.UncaughtExceptionHandler exceptionHandler) {
setExceptionHandler(exceptionHandler);
return this;
}

/**
* Rejected execution handler.
* @param rejectedHandler rejected execution handler, default {@link ThreadPoolExecutor.AbortPolicy()}
* @return this
*/
public ExecutorBuilder rejectedHandler(RejectedExecutionHandler rejectedHandler) {
setRejectedHandler(rejectedHandler);
return this;
}

public ExecutorBuilder coreSize(int coreSize) {
setCoreSize(coreSize);
return this;
}

public ExecutorBuilder maxSize(int maxSize) {
setMaxSize(maxSize);
return this;
}

public ExecutorBuilder keepAlive(long keepAlive) {
setKeepAlive(keepAlive);
return this;
}

public ExecutorBuilder queueSize(int queueSize) {
setQueueSize(queueSize);
return this;
}

public ExecutorService build() {
ThreadFactory tf = new ThreadFactoryImpl(name, daemon);
return new ThreadPoolExecutor(coreSize, maxSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueSize), tf, rejectedHandler);
}
}

public static ExecutorBuilder executorBuilder() {
return new ExecutorBuilder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@

package com.codeabovelab.dm.common.utils;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.springframework.util.Assert;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.ConnectException;

/**
* tools for throwables
*/
@Slf4j
public class Throwables {
/**
* Print specified throwable to string. If throwable is null, then return null.
Expand Down Expand Up @@ -96,6 +97,10 @@ public static boolean has(Throwable e, Class<? extends Throwable> type) {
return false;
}

public static Thread.UncaughtExceptionHandler uncaughtHandler() {
return uncaughtHandler(log);
}

public static Thread.UncaughtExceptionHandler uncaughtHandler(Logger log) {
return uncaughtHandler(log, "Uncaught exception.");
}
Expand Down

0 comments on commit d42e12b

Please sign in to comment.