Skip to content

Commit

Permalink
JAMES-1894 Rely on MessageID Capability to index MessageId
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Jan 9, 2017
1 parent 0606d2e commit 941a703
Show file tree
Hide file tree
Showing 25 changed files with 274 additions and 68 deletions.
Expand Up @@ -99,6 +99,7 @@ public List<MessageId> search(MailboxSession session, MultimailboxesSearchQuery
Preconditions.checkArgument(session != null, "'session' is mandatory"); Preconditions.checkArgument(session != null, "'session' is mandatory");
return searcher.search(ImmutableList.of(session.getUser()), searchQuery, Optional.of(limit)) return searcher.search(ImmutableList.of(session.getUser()), searchQuery, Optional.of(limit))
.map(SearchResult::getMessageId) .map(SearchResult::getMessageId)
.map(com.google.common.base.Optional::get)
.collect(Guavate.toImmutableList()); .collect(Guavate.toImmutableList());
} }


Expand Down
Expand Up @@ -29,11 +29,9 @@
import java.util.stream.Stream; import java.util.stream.Stream;


import org.apache.james.mailbox.MailboxSession.User; import org.apache.james.mailbox.MailboxSession.User;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.elasticsearch.IndexAttachments; import org.apache.james.mailbox.elasticsearch.IndexAttachments;
import org.apache.james.mailbox.elasticsearch.query.DateResolutionFormater; import org.apache.james.mailbox.elasticsearch.query.DateResolutionFormater;
import org.apache.james.mailbox.extractor.TextExtractor; import org.apache.james.mailbox.extractor.TextExtractor;
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.Property; import org.apache.james.mailbox.store.mail.model.Property;
import org.apache.james.mime4j.MimeException; import org.apache.james.mime4j.MimeException;
Expand All @@ -48,8 +46,8 @@


public class IndexableMessage { public class IndexableMessage {


public static IndexableMessage from(MailboxMessage message, List<User> users, TextExtractor textExtractor, public static IndexableMessage from(MailboxMessage message, List<User> users, TextExtractor textExtractor,
ZoneId zoneId, IndexAttachments indexAttachments) { ZoneId zoneId, IndexAttachments indexAttachments) {


Preconditions.checkNotNull(message.getMailboxId()); Preconditions.checkNotNull(message.getMailboxId());
Preconditions.checkArgument(!users.isEmpty()); Preconditions.checkArgument(!users.isEmpty());
Expand Down Expand Up @@ -90,8 +88,7 @@ private void copyHeaderFields(HeaderCollection headerCollection, ZonedDateTime i
} }


private void copyMessageFields(MailboxMessage message, ZoneId zoneId) { private void copyMessageFields(MailboxMessage message, ZoneId zoneId) {
this.messageId = message.getMessageId(); this.uid = message.getUid().asLong();
this.uid = message.getUid();
this.mailboxId = message.getMailboxId().serialize(); this.mailboxId = message.getMailboxId().serialize();
this.modSeq = message.getModSeq(); this.modSeq = message.getModSeq();
this.size = message.getFullContentOctets(); this.size = message.getFullContentOctets();
Expand Down Expand Up @@ -129,8 +126,7 @@ private void generateText() {
.collect(Collectors.joining(" ")); .collect(Collectors.joining(" "));
} }


private MessageId messageId; private long uid;
private MessageUid uid;
private String mailboxId; private String mailboxId;
private List<String> users; private List<String> users;
private long modSeq; private long modSeq;
Expand Down Expand Up @@ -159,14 +155,47 @@ private void generateText() {
private Optional<String> bodyHtml; private Optional<String> bodyHtml;
private String text; private String text;


@JsonProperty(JsonMessageConstants.MESSAGE_ID) public IndexableMessage(long uid, String mailboxId, List<String> users, long modSeq, long size, String date, String mediaType,
public String getId() { String subType, boolean isUnRead, boolean isRecent, boolean isFlagged, boolean isDeleted, boolean isDraft,
return messageId.serialize(); boolean isAnswered, String[] userFlags, Multimap<String, String> headers, EMailers from, EMailers to,
EMailers cc, EMailers bcc, EMailers replyTo, Subjects subjects, String sentDate, List<Property> properties,
List<MimePart> attachments, Optional<String> bodyText, Optional<String> bodyHtml, String text) {
this.uid = uid;
this.mailboxId = mailboxId;
this.users = users;
this.modSeq = modSeq;
this.size = size;
this.date = date;
this.mediaType = mediaType;
this.subType = subType;
this.isUnRead = isUnRead;
this.isRecent = isRecent;
this.isFlagged = isFlagged;
this.isDeleted = isDeleted;
this.isDraft = isDraft;
this.isAnswered = isAnswered;
this.userFlags = userFlags;
this.headers = headers;
this.from = from;
this.to = to;
this.cc = cc;
this.bcc = bcc;
this.replyTo = replyTo;
this.subjects = subjects;
this.sentDate = sentDate;
this.properties = properties;
this.attachments = attachments;
this.bodyText = bodyText;
this.bodyHtml = bodyHtml;
this.text = text;
}

public IndexableMessage() {
} }


@JsonProperty(JsonMessageConstants.UID) @JsonProperty(JsonMessageConstants.UID)
public Long getUid() { public Long getUid() {
return uid.asLong(); return uid;
} }


@JsonProperty(JsonMessageConstants.MAILBOX_ID) @JsonProperty(JsonMessageConstants.MAILBOX_ID)
Expand Down
@@ -0,0 +1,72 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.elasticsearch.json;

import java.time.ZoneId;
import java.util.List;
import java.util.Optional;

import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.elasticsearch.IndexAttachments;
import org.apache.james.mailbox.extractor.TextExtractor;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.Property;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Multimap;

public class IndexableMessageWithMessageId extends IndexableMessage {

public static IndexableMessage from(MailboxMessage message, List<MailboxSession.User> users, TextExtractor textExtractor,
ZoneId zoneId, IndexAttachments indexAttachments) {
IndexableMessage indexableMessage = IndexableMessage.from(message, users, textExtractor, zoneId, indexAttachments);
return new IndexableMessageWithMessageId(indexableMessage.getUid(), indexableMessage.getMailboxId(), indexableMessage.getUsers(),
indexableMessage.getModSeq(), indexableMessage.getSize(), indexableMessage.getDate(), indexableMessage.getMediaType(),
indexableMessage.getSubType(), indexableMessage.isUnRead(), indexableMessage.isRecent(), indexableMessage.isFlagged(),
indexableMessage.isDeleted(), indexableMessage.isDraft(), indexableMessage.isAnswered(), indexableMessage.getUserFlags(),
indexableMessage.getHeaders(), indexableMessage.getFrom(), indexableMessage.getTo(), indexableMessage.getCc(), indexableMessage.getBcc(),
indexableMessage.getReplyTo(), indexableMessage.getSubjects(), indexableMessage.getSentDate(), indexableMessage.getProperties(),
indexableMessage.getAttachments(), indexableMessage.getBodyText(), indexableMessage.getBodyHtml(), indexableMessage.getText(),
message.getMessageId().serialize());
}

private String messageId;

public IndexableMessageWithMessageId(long uid, String mailboxId, List<String> users, long modSeq, long size, String date,
String mediaType, String subType, boolean isUnRead, boolean isRecent, boolean isFlagged,
boolean isDeleted, boolean isDraft, boolean isAnswered, String[] userFlags, Multimap<String, String> headers,
EMailers from, EMailers to, EMailers cc, EMailers bcc, EMailers replyTo, Subjects subjects,
String sentDate, List<Property> properties, List<MimePart> attachments, Optional<String> bodyText,
Optional<String> bodyHtml, String text, String messageId) {
super(uid, mailboxId, users, modSeq, size, date, mediaType, subType, isUnRead, isRecent, isFlagged, isDeleted,
isDraft, isAnswered, userFlags, headers, from, to, cc, bcc, replyTo, subjects, sentDate, properties, attachments,
bodyText, bodyHtml, text);
this.messageId = messageId;
}

public IndexableMessageWithMessageId(String messageId) {
this.messageId = messageId;
}

@JsonProperty(JsonMessageConstants.MESSAGE_ID)
public String getMessageId() {
return messageId;
}
}
Expand Up @@ -25,10 +25,13 @@
import javax.inject.Inject; import javax.inject.Inject;
import javax.mail.Flags; import javax.mail.Flags;


import org.apache.commons.lang.NotImplementedException;
import org.apache.james.mailbox.MailboxManager;
import org.apache.james.mailbox.MailboxSession.User; import org.apache.james.mailbox.MailboxSession.User;
import org.apache.james.mailbox.elasticsearch.IndexAttachments; import org.apache.james.mailbox.elasticsearch.IndexAttachments;
import org.apache.james.mailbox.extractor.TextExtractor; import org.apache.james.mailbox.extractor.TextExtractor;
import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.search.MessageSearchIndex;


import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -42,24 +45,40 @@ public class MessageToElasticSearchJson {
private final TextExtractor textExtractor; private final TextExtractor textExtractor;
private final ZoneId zoneId; private final ZoneId zoneId;
private final IndexAttachments indexAttachments; private final IndexAttachments indexAttachments;
private final MessageSearchIndex.IndexMessageId indexMessageId;


public MessageToElasticSearchJson(TextExtractor textExtractor, ZoneId zoneId, IndexAttachments indexAttachments) { public MessageToElasticSearchJson(TextExtractor textExtractor, ZoneId zoneId, IndexAttachments indexAttachments, MessageSearchIndex.IndexMessageId indexMessageId) {
this.textExtractor = textExtractor; this.textExtractor = textExtractor;
this.zoneId = zoneId; this.zoneId = zoneId;
this.indexAttachments = indexAttachments; this.indexAttachments = indexAttachments;
this.mapper = new ObjectMapper(); this.mapper = new ObjectMapper();
this.mapper.registerModule(new GuavaModule()); this.mapper.registerModule(new GuavaModule());
this.mapper.registerModule(new Jdk8Module()); this.mapper.registerModule(new Jdk8Module());
this.indexMessageId = indexMessageId;
} }


@Inject @Inject
public MessageToElasticSearchJson(TextExtractor textExtractor, IndexAttachments indexAttachments) { public MessageToElasticSearchJson(TextExtractor textExtractor, IndexAttachments indexAttachments, MailboxManager mailboxManager) {
this(textExtractor, ZoneId.systemDefault(), indexAttachments); this(textExtractor, ZoneId.systemDefault(), indexAttachments, indexMessageId(mailboxManager));
}

private static MessageSearchIndex.IndexMessageId indexMessageId(MailboxManager mailboxManager) {
if (mailboxManager.getSupportedMessageCapabilities().contains(MailboxManager.MessageCapabilities.UniqueID)) {
return MessageSearchIndex.IndexMessageId.Required;
}
return MessageSearchIndex.IndexMessageId.Optional;
} }


public String convertToJson(MailboxMessage message, List<User> users) throws JsonProcessingException { public String convertToJson(MailboxMessage message, List<User> users) throws JsonProcessingException {
Preconditions.checkNotNull(message); Preconditions.checkNotNull(message);
return mapper.writeValueAsString(IndexableMessage.from(message, users, textExtractor, zoneId, indexAttachments)); switch (indexMessageId) {
case Required:
return mapper.writeValueAsString(IndexableMessageWithMessageId.from(message, users, textExtractor, zoneId, indexAttachments));
case Optional:
return mapper.writeValueAsString(IndexableMessage.from(message, users, textExtractor, zoneId, indexAttachments));
default:
throw new NotImplementedException();
}
} }


public String getUpdatedJsonMessagePart(Flags flags, long modSeq) throws JsonProcessingException { public String getUpdatedJsonMessagePart(Flags flags, long modSeq) throws JsonProcessingException {
Expand Down
Expand Up @@ -34,11 +34,9 @@
import org.apache.james.mailbox.elasticsearch.query.SortConverter; import org.apache.james.mailbox.elasticsearch.query.SortConverter;
import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxId.Factory;
import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.MultimailboxesSearchQuery; import org.apache.james.mailbox.model.MultimailboxesSearchQuery;
import org.apache.james.mailbox.store.search.MessageSearchIndex; import org.apache.james.mailbox.store.search.MessageSearchIndex;
import org.apache.james.mailbox.store.search.SimpleMessageSearchIndex;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
Expand All @@ -57,15 +55,15 @@ public class ElasticSearchSearcher {
private final Client client; private final Client client;
private final QueryConverter queryConverter; private final QueryConverter queryConverter;
private final int size; private final int size;
private final Factory mailboxIdFactory; private final MailboxId.Factory mailboxIdFactory;
private final MessageId.Factory messageIdFactory; private final MessageId.Factory messageIdFactory;


@Inject @Inject
public ElasticSearchSearcher(Client client, QueryConverter queryConverter, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory) { public ElasticSearchSearcher(Client client, QueryConverter queryConverter, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory) {
this(client, queryConverter, DEFAULT_SIZE, mailboxIdFactory, messageIdFactory); this(client, queryConverter, DEFAULT_SIZE, mailboxIdFactory, messageIdFactory);
} }


public ElasticSearchSearcher(Client client, QueryConverter queryConverter, int size, Factory mailboxIdFactory, MessageId.Factory messageIdFactory) { public ElasticSearchSearcher(Client client, QueryConverter queryConverter, int size, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory) {
this.client = client; this.client = client;
this.queryConverter = queryConverter; this.queryConverter = queryConverter;
this.size = size; this.size = size;
Expand Down Expand Up @@ -109,11 +107,11 @@ private Stream<MessageSearchIndex.SearchResult> transformResponseToUidStream(Sea
private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) { private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) {
SearchHitField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID); SearchHitField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID);
SearchHitField uid = hit.field(JsonMessageConstants.UID); SearchHitField uid = hit.field(JsonMessageConstants.UID);
SearchHitField id = hit.field(JsonMessageConstants.ID); Optional<SearchHitField> id = retrieveMessageIdField(hit);
if (mailboxId != null && uid != null && id != null) { if (mailboxId != null && uid != null) {
Number uidAsNumber = uid.getValue(); Number uidAsNumber = uid.getValue();
return Optional.of( return Optional.of(
new MessageSearchIndex.SearchResult(messageIdFactory.fromString(id.getValue()), new MessageSearchIndex.SearchResult(toGuava(id.map(field -> messageIdFactory.fromString(field.getValue()))),
mailboxIdFactory.fromString(mailboxId.getValue()), mailboxIdFactory.fromString(mailboxId.getValue()),
MessageUid.of(uidAsNumber.longValue()))); MessageUid.of(uidAsNumber.longValue())));
} else { } else {
Expand All @@ -122,4 +120,16 @@ private Optional<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHi
} }
} }


private Optional<SearchHitField> retrieveMessageIdField(SearchHit hit) {
if (hit.fields().keySet().contains(JsonMessageConstants.MESSAGE_ID)) {
return Optional.ofNullable(hit.field(JsonMessageConstants.MESSAGE_ID));
} else {
return Optional.empty();
}
}

private <T> com.google.common.base.Optional<T> toGuava(Optional<T> optional) {
return com.google.common.base.Optional.fromNullable(optional.orElse(null));
}

} }
Expand Up @@ -40,6 +40,7 @@
import org.apache.james.mailbox.store.extractor.DefaultTextExtractor; import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.model.impl.MessageParser; import org.apache.james.mailbox.store.mail.model.impl.MessageParser;
import org.apache.james.mailbox.store.search.AbstractMessageSearchIndexTest; import org.apache.james.mailbox.store.search.AbstractMessageSearchIndexTest;
import org.apache.james.mailbox.store.search.MessageSearchIndex;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.junit.Rule; import org.junit.Rule;
import org.junit.rules.RuleChain; import org.junit.rules.RuleChain;
Expand Down Expand Up @@ -71,7 +72,7 @@ protected void initializeMailboxManager() throws Exception {
messageSearchIndex = new ElasticSearchListeningMessageSearchIndex(mapperFactory, messageSearchIndex = new ElasticSearchListeningMessageSearchIndex(mapperFactory,
new ElasticSearchIndexer(client, new DeleteByQueryPerformer(client, Executors.newSingleThreadExecutor(), BATCH_SIZE)), new ElasticSearchIndexer(client, new DeleteByQueryPerformer(client, Executors.newSingleThreadExecutor(), BATCH_SIZE)),
new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, new InMemoryId.Factory(), messageIdFactory), new ElasticSearchSearcher(client, new QueryConverter(new CriterionConverter()), SEARCH_SIZE, new InMemoryId.Factory(), messageIdFactory),
new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES)); new MessageToElasticSearchJson(new DefaultTextExtractor(), ZoneId.of("Europe/Paris"), IndexAttachments.YES, MessageSearchIndex.IndexMessageId.Required));
storeMailboxManager = new InMemoryMailboxManager( storeMailboxManager = new InMemoryMailboxManager(
mapperFactory, mapperFactory,
new FakeAuthenticator(), new FakeAuthenticator(),
Expand Down

0 comments on commit 941a703

Please sign in to comment.