Skip to content

Commit

Permalink
JAMES-2544 Reverse MailQueue usage
Browse files Browse the repository at this point in the history
	Instead of polling from a pool of threads we now return the
	Flux so it can be consumed in a more optimized way.
  • Loading branch information
mbaechler authored and chibenwa committed Mar 14, 2019
1 parent 9fdb43b commit 66ef29e
Show file tree
Hide file tree
Showing 25 changed files with 375 additions and 373 deletions.
Expand Up @@ -171,10 +171,11 @@ private HierarchicalConfiguration getProcessorConfiguration() {
}
}

private void configureJamesSpooler() throws ConfigurationException {
private void configureJamesSpooler() {
jamesMailSpooler.setMailProcessor(camelCompositeProcessor);
jamesMailSpooler.configure(getJamesSpoolerConfiguration());
jamesMailSpooler.init();
jamesMailSpooler.run();
}

private HierarchicalConfiguration getJamesSpoolerConfiguration() {
Expand Down
Expand Up @@ -19,15 +19,13 @@

package org.apache.james.mailetcontainer.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.HierarchicalConfiguration;
import org.apache.james.lifecycle.api.Configurable;
import org.apache.james.lifecycle.api.Disposable;
Expand All @@ -37,20 +35,24 @@
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.metrics.api.TimeMetric;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueue.MailQueueException;
import org.apache.james.queue.api.MailQueue.MailQueueItem;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.util.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/**
* Manages the mail spool. This class is responsible for retrieving messages
* from the spool, directing messages to the appropriate processor, and removing
* them from the spool when processing is complete.
*/
public class JamesMailSpooler implements Runnable, Disposable, Configurable, MailSpoolerMBean {
public class JamesMailSpooler implements Disposable, Configurable, MailSpoolerMBean {
private static final Logger LOGGER = LoggerFactory.getLogger(JamesMailSpooler.class);

public static final String SPOOL_PROCESSING = "spoolProcessing";
Expand All @@ -61,35 +63,18 @@ public class JamesMailSpooler implements Runnable, Disposable, Configurable, Mai
*/
private int numThreads;

/**
* Number of active threads
*/
private final AtomicInteger numActive = new AtomicInteger(0);

private final AtomicInteger processingActive = new AtomicInteger(0);

/**
* Spool threads are active
*/
private final AtomicBoolean active = new AtomicBoolean(false);

private final MetricFactory metricFactory;

/**
* Spool threads
*/
private ExecutorService dequeueService;

private ExecutorService workerService;

/**
* The mail processor
*/
private MailProcessor mailProcessor;

private MailQueueFactory<?> queueFactory;

private int numDequeueThreads;
private reactor.core.Disposable disposable;
private Scheduler spooler;

@Inject
public JamesMailSpooler(MetricFactory metricFactory) {
Expand All @@ -107,9 +92,7 @@ public void setMailProcessor(MailProcessor mailProcessor) {
}

@Override
public void configure(HierarchicalConfiguration config) throws ConfigurationException {
numDequeueThreads = config.getInt("dequeueThreads", 2);

public void configure(HierarchicalConfiguration config) {
numThreads = config.getInt("threads", 100);
}

Expand All @@ -118,81 +101,50 @@ public void configure(HierarchicalConfiguration config) throws ConfigurationExce
*/
@PostConstruct
public void init() {
LOGGER.info("{} init...", getClass().getName());

LOGGER.info("init...");
queue = queueFactory.createQueue(MailQueueFactory.SPOOL);
spooler = Schedulers.fromExecutor(Executors.newFixedThreadPool(numThreads, NamedThreadFactory.withName("spooler")));
LOGGER.info("uses {} Thread(s)", numThreads);
}

LOGGER.info("{} uses {} Thread(s)", getClass().getName(), numThreads);
public void run() {
LOGGER.info("Queue={}", queue);

active.set(true);
workerService = JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool", "spooler", numThreads);
dequeueService = JMXEnabledThreadPoolExecutor.newFixedThreadPool("org.apache.james:type=component,component=mailetcontainer,name=mailspooler,sub-type=threadpool", "dequeuer", numDequeueThreads);
disposable = Flux.from(queue.deQueue())
.publishOn(spooler)
.flatMap(this::handleOnQueueItem)
.onErrorContinue((throwable, item) -> LOGGER.error("Exception processing mail while spooling {}", item, throwable))
.subscribeOn(Schedulers.elastic())
.subscribe();
}

for (int i = 0; i < numDequeueThreads; i++) {
Thread reader = new Thread(this, "Dequeue Thread #" + i);
dequeueService.execute(reader);
private Mono<Void> handleOnQueueItem(MailQueueItem queueItem) {
TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
try {
processingActive.incrementAndGet();
return processMail(queueItem);
} catch (Throwable e) {
return Mono.error(e);
} finally {
processingActive.decrementAndGet();
timeMetric.stopAndPublish();
}
}

/**
* This routinely checks the message spool for messages, and processes them
* as necessary
*/
@Override
public void run() {
LOGGER.info("Run {}: {}", getClass().getName(), Thread.currentThread().getName());
LOGGER.info("Queue={}", queue);

while (active.get()) {

final MailQueueItem queueItem;
try {
queueItem = queue.deQueue();
workerService.execute(() -> {
TimeMetric timeMetric = metricFactory.timer(SPOOL_PROCESSING);
try {
numActive.incrementAndGet();

// increase count
processingActive.incrementAndGet();

Mail mail = queueItem.getMail();
LOGGER.debug("==== Begin processing mail {} ====", mail.getName());

try {
mailProcessor.service(mail);
queueItem.done(true);
} catch (Exception e) {
if (active.get()) {
LOGGER.error("Exception processing mail while spooling", e);
}
queueItem.done(false);

} finally {
LifecycleUtil.dispose(mail);
mail = null;
}
} catch (Throwable e) {
if (active.get()) {
LOGGER.error("Exception processing mail while spooling", e);

}
} finally {
processingActive.decrementAndGet();
numActive.decrementAndGet();
timeMetric.stopAndPublish();
}

});
} catch (MailQueueException e1) {
if (active.get()) {
LOGGER.error("Exception dequeue mail", e1);
}
} catch (InterruptedException interrupted) {
//MailSpooler is stopping
}
private Mono<Void> processMail(MailQueueItem queueItem) throws MailQueue.MailQueueException {
Mail mail = queueItem.getMail();
LOGGER.debug("==== Begin processing mail {} ====", mail.getName());
try {
mailProcessor.service(mail);
queueItem.done(true);
return Mono.empty();
} catch (Exception e) {
queueItem.done(false);
return Mono.error(e);
} finally {
LOGGER.debug("==== End processing mail {} ====", mail.getName());
LifecycleUtil.dispose(mail);
}
LOGGER.info("Stop {} : {}", getClass().getName(), Thread.currentThread().getName());
}

/**
Expand All @@ -206,22 +158,10 @@ public void run() {
@PreDestroy
@Override
public void dispose() {
LOGGER.info("{} dispose...", getClass().getName());
active.set(false); // shutdown the threads
dequeueService.shutdownNow();
workerService.shutdown();

long stop = System.currentTimeMillis() + 60000;
// give the spooler threads one minute to terminate gracefully
while (numActive.get() != 0 && stop > System.currentTimeMillis()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

LOGGER.info("{} thread shutdown completed.", getClass().getName());
LOGGER.info("start dispose() ...");
disposable.dispose();
spooler.dispose();
LOGGER.info("thread shutdown completed.");
}

@Override
Expand Down
Expand Up @@ -22,10 +22,6 @@
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.inject.Inject;
import javax.mail.MessagingException;
Expand All @@ -43,7 +39,6 @@
import org.apache.james.transport.mailets.remote.delivery.DeliveryRunnable;
import org.apache.james.transport.mailets.remote.delivery.RemoteDeliveryConfiguration;
import org.apache.james.transport.mailets.remote.delivery.RemoteDeliverySocketFactory;
import org.apache.james.util.concurrent.NamedThreadFactory;
import org.apache.mailet.Mail;
import org.apache.mailet.base.GenericMailet;
import org.slf4j.Logger;
Expand Down Expand Up @@ -123,6 +118,7 @@
*/
public class RemoteDelivery extends GenericMailet {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteDelivery.class);
private DeliveryRunnable deliveryRunnable;

public enum ThreadState {
START_THREADS,
Expand All @@ -135,12 +131,10 @@ public enum ThreadState {
private final DomainList domainList;
private final MailQueueFactory<?> queueFactory;
private final MetricFactory metricFactory;
private final AtomicBoolean isDestroyed;
private final ThreadState startThreads;

private MailQueue queue;
private RemoteDeliveryConfiguration configuration;
private ExecutorService executor;

@Inject
public RemoteDelivery(DNSService dnsServer, DomainList domainList, MailQueueFactory<?> queueFactory, MetricFactory metricFactory) {
Expand All @@ -152,7 +146,6 @@ public RemoteDelivery(DNSService dnsServer, DomainList domainList, MailQueueFact
this.domainList = domainList;
this.queueFactory = queueFactory;
this.metricFactory = metricFactory;
this.isDestroyed = new AtomicBoolean(false);
this.startThreads = startThreads;
}

Expand All @@ -167,23 +160,14 @@ public void init() throws MessagingException {
} catch (UnknownHostException e) {
LOGGER.error("Invalid bind setting ({}): ", configuration.getBindAddress(), e);
}
deliveryRunnable = new DeliveryRunnable(queue,
configuration,
dnsServer,
metricFactory,
getMailetContext(),
new Bouncer(configuration, getMailetContext()));
if (startThreads == ThreadState.START_THREADS) {
initDeliveryThreads();
}
}

private void initDeliveryThreads() {
ThreadFactory threadFactory = NamedThreadFactory.withClassName(getClass());
executor = Executors.newFixedThreadPool(configuration.getWorkersThreadCount(), threadFactory);
for (int a = 0; a < configuration.getWorkersThreadCount(); a++) {
executor.execute(
new DeliveryRunnable(queue,
configuration,
dnsServer,
metricFactory,
getMailetContext(),
new Bouncer(configuration, getMailetContext()),
isDestroyed));
deliveryRunnable.start();
}
}

Expand Down Expand Up @@ -261,9 +245,7 @@ private Map<Domain, Collection<MailAddress>> groupByServer(Collection<MailAddres
@Override
public synchronized void destroy() {
if (startThreads == ThreadState.START_THREADS) {
isDestroyed.set(true);
executor.shutdownNow();
notifyAll();
deliveryRunnable.dispose();
}
}

Expand Down

0 comments on commit 66ef29e

Please sign in to comment.