Skip to content

Commit

Permalink
JAMES-2541 Abstract the Id required by the Store API
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Sep 10, 2018
1 parent 9d216b2 commit b853f6e
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 79 deletions.
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,30 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.blob.api;

import java.util.Map;

public interface BlobPartsId {
interface Factory<I extends BlobPartsId> {
I generate(Map<Store.BlobType, BlobId> map);
}

Map<Store.BlobType, BlobId> asMap();
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.apache.james.blob.api; package org.apache.james.blob.api;


import java.io.InputStream; import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream; import java.util.stream.Stream;
Expand All @@ -31,7 +29,12 @@


import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;


public interface Store<T> { public interface Store<T, I> {

CompletableFuture<I> save(T t);

CompletableFuture<T> read(I blobIds);

class BlobType { class BlobType {
private final String name; private final String name;


Expand Down Expand Up @@ -59,38 +62,36 @@ public final int hashCode() {
} }
} }


interface Encoder<T> { class Impl<T, I extends BlobPartsId> implements Store<T, I> {
Stream<Pair<BlobType, InputStream>> encode(T t);
}

interface Decoder<T> {
void validateInput(Collection<BlobType> input);


T decode(Stream<Pair<BlobType, byte[]>> streams); public interface Encoder<T> {
} Stream<Pair<BlobType, InputStream>> encode(T t);

}
CompletableFuture<Map<BlobType, BlobId>> save(T t);


CompletableFuture<T> read(Map<BlobType, BlobId> blobIds); public interface Decoder<T> {
T decode(Stream<Pair<BlobType, byte[]>> streams);
}


class Impl<T> implements Store<T> { private final BlobPartsId.Factory<I> idFactory;
private final Encoder<T> encoder; private final Encoder<T> encoder;
private final Decoder<T> decoder; private final Decoder<T> decoder;
private final BlobStore blobStore; private final BlobStore blobStore;


public Impl(Encoder<T> encoder, Decoder<T> decoder, BlobStore blobStore) { public Impl(BlobPartsId.Factory<I> idFactory, Encoder<T> encoder, Decoder<T> decoder, BlobStore blobStore) {
this.idFactory = idFactory;
this.encoder = encoder; this.encoder = encoder;
this.decoder = decoder; this.decoder = decoder;
this.blobStore = blobStore; this.blobStore = blobStore;
} }


@Override @Override
public CompletableFuture<Map<BlobType, BlobId>> save(T t) { public CompletableFuture<I> save(T t) {
return FluentFutureStream.of( return FluentFutureStream.of(
encoder.encode(t) encoder.encode(t)
.map(this::saveEntry)) .map(this::saveEntry))
.completableFuture() .completableFuture()
.thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue))); .thenApply(pairStream -> pairStream.collect(ImmutableMap.toImmutableMap(Pair::getKey, Pair::getValue)))
.thenApply(idFactory::generate);
} }


private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) { private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, InputStream> entry) {
Expand All @@ -99,10 +100,9 @@ private CompletableFuture<Pair<BlobType, BlobId>> saveEntry(Pair<BlobType, Input
} }


@Override @Override
public CompletableFuture<T> read(Map<BlobType, BlobId> blobIds) { public CompletableFuture<T> read(I blobIds) {
decoder.validateInput(blobIds.keySet()); CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.asMap()

.entrySet()
CompletableFuture<Stream<Pair<BlobType, byte[]>>> binaries = FluentFutureStream.of(blobIds.entrySet()
.stream() .stream()
.map(entry -> blobStore.readBytes(entry.getValue()) .map(entry -> blobStore.readBytes(entry.getValue())
.thenApply(bytes -> Pair.of(entry.getKey(), bytes)))) .thenApply(bytes -> Pair.of(entry.getKey(), bytes))))
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,102 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.blob.mail;

import java.util.Map;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobPartsId;
import org.apache.james.blob.api.Store;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

public class MimeMessagePartsId implements BlobPartsId {
@FunctionalInterface
public interface RequireHeaderBlobId {
RequireBodyBlobId headerBlobId(BlobId headerBlobId);
}

@FunctionalInterface
public interface RequireBodyBlobId {
Builder bodyBlobId(BlobId bodyBlobId);
}

public static class Builder {
private final BlobId headerBlobId;
private final BlobId bodyBlobId;

private Builder(BlobId headerBlobId, BlobId bodyBlobId) {
Preconditions.checkNotNull(headerBlobId, "'headerBlobId' should not be null");
Preconditions.checkNotNull(bodyBlobId, "'bodyBlobId' should not be null");

this.headerBlobId = headerBlobId;
this.bodyBlobId = bodyBlobId;
}

public MimeMessagePartsId build() {
return new MimeMessagePartsId(headerBlobId, bodyBlobId);
}
}

public static RequireHeaderBlobId builder() {
return headerBlobId -> bodyBlobId -> new Builder(headerBlobId, bodyBlobId);
}

public static class Factory implements BlobPartsId.Factory<MimeMessagePartsId> {
@Override
public MimeMessagePartsId generate(Map<Store.BlobType, BlobId> map) {
Preconditions.checkArgument(map.keySet().contains(HEADER_BLOB_TYPE), "Expecting 'mailHeader' blobId to be specified");
Preconditions.checkArgument(map.keySet().contains(BODY_BLOB_TYPE), "Expecting 'mailBody' blobId to be specified");
Preconditions.checkArgument(map.size() == 2, "blobId other than 'mailHeader' or 'mailBody' are not supported");

return builder()
.headerBlobId(map.get(HEADER_BLOB_TYPE))
.bodyBlobId(map.get(BODY_BLOB_TYPE))
.build();
}
}

static final Store.BlobType HEADER_BLOB_TYPE = new Store.BlobType("mailHeader");
static final Store.BlobType BODY_BLOB_TYPE = new Store.BlobType("mailBody");

private final BlobId headerBlobId;
private final BlobId bodyBlobId;

private MimeMessagePartsId(BlobId headerBlobId, BlobId bodyBlobId) {
this.headerBlobId = headerBlobId;
this.bodyBlobId = bodyBlobId;
}

@Override
public Map<Store.BlobType, BlobId> asMap() {
return ImmutableMap.of(
HEADER_BLOB_TYPE, headerBlobId,
BODY_BLOB_TYPE, bodyBlobId);
}

public BlobId getHeaderBlobId() {
return headerBlobId;
}

public BlobId getBodyBlobId() {
return bodyBlobId;
}
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
package org.apache.james.blob.mail; package org.apache.james.blob.mail;


import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
import static org.apache.james.blob.mail.MimeMessagePartsId.BODY_BLOB_TYPE;
import static org.apache.james.blob.mail.MimeMessagePartsId.HEADER_BLOB_TYPE;


import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.SequenceInputStream; import java.io.SequenceInputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.stream.Stream; import java.util.stream.Stream;
Expand All @@ -48,8 +49,6 @@
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;


public class MimeMessageStore { public class MimeMessageStore {
public static final Store.BlobType HEADER_BLOB_TYPE = new Store.BlobType("mailHeader");
public static final Store.BlobType BODY_BLOB_TYPE = new Store.BlobType("mailBody");


public static class Factory { public static class Factory {
private final BlobStore blobStore; private final BlobStore blobStore;
Expand All @@ -59,15 +58,16 @@ public Factory(BlobStore blobStore) {
this.blobStore = blobStore; this.blobStore = blobStore;
} }


public Store<MimeMessage> mimeMessageStore() { public Store<MimeMessage, MimeMessagePartsId> mimeMessageStore() {
return new Store.Impl<>( return new Store.Impl<>(
new MimeMessagePartsId.Factory(),
new MailEncoder(), new MailEncoder(),
new MailDecoder(), new MailDecoder(),
blobStore); blobStore);
} }
} }


static class MailEncoder implements Store.Encoder<MimeMessage> { static class MailEncoder implements Store.Impl.Encoder<MimeMessage> {
@Override @Override
public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) { public Stream<Pair<BlobType, InputStream>> encode(MimeMessage message) {
try { try {
Expand Down Expand Up @@ -124,14 +124,7 @@ private static void consume(InputStream in) throws IOException {
} }
} }


static class MailDecoder implements Store.Decoder<MimeMessage> { static class MailDecoder implements Store.Impl.Decoder<MimeMessage> {
@Override
public void validateInput(Collection<BlobType> input) {
Preconditions.checkArgument(input.contains(HEADER_BLOB_TYPE), "Expecting 'mailHeader' blobId to be specified");
Preconditions.checkArgument(input.contains(BODY_BLOB_TYPE), "Expecting 'mailBody' blobId to be specified");
Preconditions.checkArgument(input.size() == 2, "blobId other than 'mailHeader' or 'mailBody' are not supported");
}

@Override @Override
public MimeMessage decode(Stream<Pair<BlobType, byte[]>> streams) { public MimeMessage decode(Stream<Pair<BlobType, byte[]>> streams) {
Preconditions.checkNotNull(streams); Preconditions.checkNotNull(streams);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThatThrownBy;


import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;


import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMessage;


Expand All @@ -37,12 +36,11 @@
import org.assertj.core.api.SoftAssertions; import org.assertj.core.api.SoftAssertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;


class MimeMessageStoreTest { class MimeMessageStoreTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();


private Store<MimeMessage> testee; private Store<MimeMessage, MimeMessagePartsId> testee;
private BlobStore blobStore; private BlobStore blobStore;


@BeforeEach @BeforeEach
Expand All @@ -63,29 +61,6 @@ void readShouldThrowWhenNull() {
.isInstanceOf(NullPointerException.class); .isInstanceOf(NullPointerException.class);
} }


@Test
void readShouldThrowWhenMissingHeaderBlobs() {
assertThatThrownBy(() -> testee.read(ImmutableMap.of(
MimeMessageStore.HEADER_BLOB_TYPE, BLOB_ID_FACTORY.randomId())))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void readShouldThrowWhenMissingBodyBlobs() {
assertThatThrownBy(() -> testee.read(ImmutableMap.of(
MimeMessageStore.BODY_BLOB_TYPE, BLOB_ID_FACTORY.randomId())))
.isInstanceOf(IllegalArgumentException.class);
}

@Test
void readShouldThrowWhenExtraBodyBlobs() {
assertThatThrownBy(() -> testee.read(ImmutableMap.of(
MimeMessageStore.BODY_BLOB_TYPE, BLOB_ID_FACTORY.randomId(),
MimeMessageStore.HEADER_BLOB_TYPE, BLOB_ID_FACTORY.randomId(),
new Store.BlobType("Unknown"), BLOB_ID_FACTORY.randomId())))
.isInstanceOf(IllegalArgumentException.class);
}

@Test @Test
void mailStoreShouldPreserveContent() throws Exception { void mailStoreShouldPreserveContent() throws Exception {
MimeMessage message = MimeMessageBuilder.mimeMessageBuilder() MimeMessage message = MimeMessageBuilder.mimeMessageBuilder()
Expand All @@ -95,7 +70,7 @@ void mailStoreShouldPreserveContent() throws Exception {
.setText("Important mail content") .setText("Important mail content")
.build(); .build();


Map<Store.BlobType, BlobId> parts = testee.save(message).join(); MimeMessagePartsId parts = testee.save(message).join();


MimeMessage retrievedMessage = testee.read(parts).join(); MimeMessage retrievedMessage = testee.read(parts).join();


Expand All @@ -111,7 +86,7 @@ void mailStoreShouldPreserveMailWithoutBody() throws Exception {
.setSubject("Important Mail") .setSubject("Important Mail")
.build(); .build();


Map<Store.BlobType, BlobId> parts = testee.save(message).join(); MimeMessagePartsId parts = testee.save(message).join();


MimeMessage retrievedMessage = testee.read(parts).join(); MimeMessage retrievedMessage = testee.read(parts).join();


Expand All @@ -130,14 +105,12 @@ void saveShouldSeparateHeadersAndBodyInDifferentBlobs() throws Exception {
.setText("Important mail content") .setText("Important mail content")
.build(); .build();


Map<Store.BlobType, BlobId> parts = testee.save(message).join(); MimeMessagePartsId parts = testee.save(message).join();


SoftAssertions.assertSoftly( SoftAssertions.assertSoftly(
softly -> { softly -> {
softly.assertThat(parts).containsKeys(MimeMessageStore.HEADER_BLOB_TYPE, MimeMessageStore.BODY_BLOB_TYPE); BlobId headerBlobId = parts.getHeaderBlobId();

BlobId bodyBlobId = parts.getBodyBlobId();
BlobId headerBlobId = parts.get(MimeMessageStore.HEADER_BLOB_TYPE);
BlobId bodyBlobId = parts.get(MimeMessageStore.BODY_BLOB_TYPE);


softly.assertThat(new String(blobStore.readBytes(headerBlobId).join(), StandardCharsets.UTF_8)) softly.assertThat(new String(blobStore.readBytes(headerBlobId).join(), StandardCharsets.UTF_8))
.isEqualTo("Date: Thu, 6 Sep 2018 13:29:13 +0700 (ICT)\r\n" + .isEqualTo("Date: Thu, 6 Sep 2018 13:29:13 +0700 (ICT)\r\n" +
Expand Down
Loading

0 comments on commit b853f6e

Please sign in to comment.