Skip to content

Commit

Permalink
JAMES-2922 MailQueueItemView should express next delivery as a ZonedD…
Browse files Browse the repository at this point in the history
…ateTime
  • Loading branch information
mbaechler authored and chibenwa committed Jan 26, 2018
1 parent 866d09e commit cec918f
Show file tree
Hide file tree
Showing 13 changed files with 81 additions and 71 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Expand Up @@ -2369,6 +2369,11 @@
<artifactId>testcontainers</artifactId>
<version>${testcontainers-version}</version>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>pl.pragmatists</groupId>
<artifactId>JUnitParams</artifactId>
Expand Down
Expand Up @@ -34,6 +34,7 @@ public class JsonTransformer implements ResponseTransformer {
public JsonTransformer() {
objectMapper = new ObjectMapper()
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule());
}
Expand Down
Expand Up @@ -19,8 +19,8 @@

package org.apache.james.webadmin.dto;

import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Optional;

Expand All @@ -34,8 +34,6 @@

public class MailQueueItemDTO {

private static final long NO_NEXT_DELIVERY = -1;

public static Builder builder() {
return new Builder();
}
Expand All @@ -45,24 +43,16 @@ public static MailQueueItemDTO from(ManageableMailQueue.MailQueueItemView mailQu
.name(mailQueueItemView.getMail().getName())
.sender(mailQueueItemView.getMail().getSender())
.recipients(mailQueueItemView.getMail().getRecipients())
.nextDelivery(nextDelivery(mailQueueItemView))
.nextDelivery(mailQueueItemView.getNextDelivery())
.build();
}

private static Optional<Date> nextDelivery(ManageableMailQueue.MailQueueItemView mailQueueItemView) {
long nextDelivery = mailQueueItemView.getNextDelivery();
if (nextDelivery == NO_NEXT_DELIVERY) {
return Optional.empty();
}
return Optional.of(new Date(nextDelivery));
}

public static class Builder {

private String name;
private String sender;
private List<String> recipients;
private Optional<Date> nextDelivery;
private Optional<ZonedDateTime> nextDelivery;

private Builder() {
}
Expand All @@ -84,7 +74,7 @@ public Builder recipients(Collection<MailAddress> recipients) {
return this;
}

public Builder nextDelivery(Optional<Date> nextDelivery) {
public Builder nextDelivery(Optional<ZonedDateTime> nextDelivery) {
this.nextDelivery = nextDelivery;
return this;
}
Expand All @@ -98,9 +88,9 @@ public MailQueueItemDTO build() {
private final String name;
private final String sender;
private final List<String> recipients;
private final Optional<Date> nextDelivery;
private final Optional<ZonedDateTime> nextDelivery;

public MailQueueItemDTO(String name, String sender, List<String> recipients, Optional<Date> nextDelivery) {
public MailQueueItemDTO(String name, String sender, List<String> recipients, Optional<ZonedDateTime> nextDelivery) {
this.name = name;
this.sender = sender;
this.recipients = recipients;
Expand All @@ -119,7 +109,7 @@ public List<String> getRecipients() {
return recipients;
}

public Optional<Date> getNextDelivery() {
public Optional<ZonedDateTime> getNextDelivery() {
return nextDelivery;
}
}
Expand Up @@ -21,6 +21,7 @@

import static org.apache.james.webadmin.Constants.SEPARATOR;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -248,7 +249,9 @@ private List<MailQueueItemDTO> listMails(ManageableMailQueue queue, Optional<Boo
}

private boolean filter(MailQueueItemDTO item, Optional<Boolean> isDelayed) {
return isDelayed.map(delayed -> delayed == item.getNextDelivery().isPresent())
boolean mailIsDelayed = item.getNextDelivery().map(date -> date.isAfter(ZonedDateTime.now())).orElse(false);
return isDelayed
.map(delayed -> delayed == mailIsDelayed)
.orElse(true);
}

Expand Down
Expand Up @@ -20,7 +20,7 @@

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Date;
import java.time.ZonedDateTime;
import java.util.List;

import org.apache.james.core.MailAddress;
Expand Down Expand Up @@ -53,7 +53,8 @@ public void buildShouldThrowWhenNameIsEmpty() {
@Test
public void fromShouldCreateTheRightObject() throws Exception {
FakeMail mail = Mails.defaultMail().build();
MailQueueItemView mailQueueItemView = new MailQueueItemView(mail, 4);
ZonedDateTime date = ZonedDateTime.parse("2018-01-02T11:22:02Z");
MailQueueItemView mailQueueItemView = new MailQueueItemView(mail, date);
MailQueueItemDTO mailQueueItemDTO = MailQueueItemDTO.from(mailQueueItemView);
List<String> expectedRecipients = mail.getRecipients().stream()
.map(MailAddress::asString)
Expand All @@ -62,6 +63,6 @@ public void fromShouldCreateTheRightObject() throws Exception {
softly.assertThat(mailQueueItemDTO.getName()).isEqualTo(mail.getName());
softly.assertThat(mailQueueItemDTO.getSender()).isEqualTo(mail.getSender().asString());
softly.assertThat(mailQueueItemDTO.getRecipients()).isEqualTo(expectedRecipients);
softly.assertThat(mailQueueItemDTO.getNextDelivery()).contains(new Date(4));
softly.assertThat(mailQueueItemDTO.getNextDelivery()).contains(date);
}
}
Expand Up @@ -47,7 +47,6 @@
import org.eclipse.jetty.http.HttpStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

import com.github.steveash.guavate.Guavate;
Expand Down Expand Up @@ -251,7 +250,6 @@ public void listMailsShouldReturnCurrentMailsWhenMailsAndAskForNotDelayed() thro
.body(".", hasSize(1));
}

@Ignore("MemoryMailQueueFactory doesn't support delay")
@Test
public void listMailsShouldReturnDelayedMailsWhenAskFor() throws Exception {
MemoryMailQueue queue = mailQueueFactory.createQueue(FIRST_QUEUE);
Expand Down
Expand Up @@ -18,7 +18,9 @@
****************************************************************/
package org.apache.james.queue.api;

import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.Optional;

import org.apache.mailet.Mail;

Expand Down Expand Up @@ -92,29 +94,26 @@ interface MailQueueIterator extends Iterator<MailQueueItemView> {
class MailQueueItemView {

private final Mail mail;
private final long nextDelivery;
private final Optional<ZonedDateTime> nextDelivery;

public MailQueueItemView(Mail mail, long nextDelivery) {
public MailQueueItemView(Mail mail) {
this(mail, Optional.empty());
}

public MailQueueItemView(Mail mail, ZonedDateTime nextDelivery) {
this(mail, Optional.of(nextDelivery));
}

public MailQueueItemView(Mail mail, Optional<ZonedDateTime> nextDelivery) {
this.mail = mail;
this.nextDelivery = nextDelivery;
}

/**
* Return the Mail
*
* @return mail
*/
public Mail getMail() {
return mail;
}

/**
* Return the timestamp when the mail will be ready for dequeuing or -1
* if there is no restriction set..
*
* @return nextDelivery
*/
public long getNextDelivery() {
public Optional<ZonedDateTime> getNextDelivery() {
return nextDelivery;
}
}
Expand Down
Expand Up @@ -26,9 +26,14 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.security.SecureRandom;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -112,11 +117,11 @@ private void init() throws IOException {

oin = new ObjectInputStream(new FileInputStream(item.getObjectFile()));
Mail mail = (Mail) oin.readObject();
Long next = getNextDelivery(mail);
Optional<ZonedDateTime> next = getNextDelivery(mail);

final String key = mail.getName();
keyMappings.put(key, item);
if (next <= System.currentTimeMillis()) {
if (!next.isPresent() || next.get().isBefore(ZonedDateTime.now())) {

try {
inmemoryQueue.put(key);
Expand All @@ -128,14 +133,15 @@ private void init() throws IOException {

// Schedule a task which will put the mail in the queue
// for processing after a given delay
long nextDeliveryDelay = ZonedDateTime.now().until(next.get(), ChronoUnit.MILLIS);
scheduler.schedule(() -> {
try {
inmemoryQueue.put(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Unable to init", e);
}
}, next - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}, nextDeliveryDelay, TimeUnit.MILLISECONDS);
}
} catch (ClassNotFoundException | IOException e) {
LOGGER.error("Unable to load Mail", e);
Expand All @@ -153,12 +159,12 @@ private void init() throws IOException {
}
}

private Long getNextDelivery(Mail mail) {
private Optional<ZonedDateTime> getNextDelivery(Mail mail) {
Long next = (Long) mail.getAttribute(NEXT_DELIVERY);
if (next == null) {
next = 0L;
return Optional.empty();
}
return next;
return Optional.of(Instant.ofEpochMilli(next).atZone(ZoneId.systemDefault()));
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions server/queue/queue-jms/pom.xml
Expand Up @@ -143,6 +143,10 @@
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.threeten</groupId>
<artifactId>threeten-extra</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Expand Up @@ -21,6 +21,9 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
Expand Down Expand Up @@ -61,6 +64,7 @@
import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.extra.Temporals;

import com.google.common.base.Throwables;

Expand Down Expand Up @@ -209,11 +213,7 @@ public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueExcept
TimeMetric timeMetric = metricFactory.timer("enqueueMailTime:" + queueName);
Session session = null;

long mydelay = 0;

if (delay > 0) {
mydelay = TimeUnit.MILLISECONDS.convert(delay, unit);
}
long nextDeliveryTimestamp = computeNextDeliveryTimestamp(delay, unit);

try {

Expand All @@ -225,7 +225,7 @@ public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueExcept
msgPrio = (Integer) prio;
}

Map<String, Object> props = getJMSProperties(mail, mydelay);
Map<String, Object> props = getJMSProperties(mail, nextDeliveryTimestamp);

produceMail(session, props, msgPrio, mail);

Expand All @@ -240,6 +240,16 @@ public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueExcept
}
}

public long computeNextDeliveryTimestamp(long delay, TimeUnit unit) {
if (delay > 0) {
return ZonedDateTime.now()
.plus(delay, Temporals.chronoUnit(unit))
.toInstant()
.toEpochMilli();
}
return NO_DELAY;
}

@Override
public void enQueue(Mail mail) throws MailQueueException {
enQueue(mail, NO_DELAY, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -282,21 +292,8 @@ protected void produceMail(Session session, Map<String, Object> props, int msgPr
}
}

/**
* Get JMS Message properties with values
*
* @param mail
* @param delayInMillis
* @throws JMSException
* @throws MessagingException
*/
protected Map<String, Object> getJMSProperties(Mail mail, long delayInMillis) throws MessagingException {
protected Map<String, Object> getJMSProperties(Mail mail, long nextDelivery) throws MessagingException {
Map<String, Object> props = new HashMap<>();
long nextDelivery = -1;
if (delayInMillis > 0) {
nextDelivery = System.currentTimeMillis() + delayInMillis;

}
props.put(JAMES_NEXT_DELIVERY, nextDelivery);
props.put(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
props.put(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
Expand Down Expand Up @@ -678,8 +675,7 @@ public MailQueueItemView next() {
while (hasNext()) {
try {
Message m = messages.nextElement();
return new MailQueueItemView(createMail(m),
m.getLongProperty(JAMES_NEXT_DELIVERY));
return new MailQueueItemView(createMail(m), nextDeliveryDate(m));
} catch (MessagingException | JMSException e) {
LOGGER.error("Unable to browse queue", e);
}
Expand All @@ -688,6 +684,11 @@ public MailQueueItemView next() {
throw new NoSuchElementException();
}

private ZonedDateTime nextDeliveryDate(Message m) throws JMSException {
long nextDeliveryTimestamp = m.getLongProperty(JAMES_NEXT_DELIVERY);
return Instant.ofEpochMilli(nextDeliveryTimestamp).atZone(ZoneId.systemDefault());
}

@Override
public boolean hasNext() {
return messages.hasMoreElements();
Expand Down

0 comments on commit cec918f

Please sign in to comment.