diff --git a/pom.xml b/pom.xml index d483e993..67526840 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.yahoo.bullet bullet-core - 0.1.3-SNAPSHOT + 0.2.0-SNAPSHOT jar bullet-core diff --git a/src/main/java/com/yahoo/bullet/pubsub/Metadata.java b/src/main/java/com/yahoo/bullet/pubsub/Metadata.java new file mode 100644 index 00000000..6baf3cbd --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/Metadata.java @@ -0,0 +1,36 @@ +package com.yahoo.bullet.pubsub; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; + +@Getter @NoArgsConstructor @Setter @AllArgsConstructor +public class Metadata implements Serializable { + public enum Signal { + ACKNOWLEDGE, + COMPLETE + } + private Signal signal; + private Serializable content; + + /** + * Check if Metadata has content. + * + * @return true if Metadata has content. + */ + public boolean hasContent() { + return content != null; + } + + /** + * Check if Metadata has signal. + * + * @return true if message has {@link Metadata#signal} + */ + public boolean hasSignal() { + return signal != null; + } +} diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSub.java b/src/main/java/com/yahoo/bullet/pubsub/PubSub.java new file mode 100644 index 00000000..eff632d0 --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSub.java @@ -0,0 +1,89 @@ +package com.yahoo.bullet.pubsub; + +import java.io.Serializable; +import java.lang.reflect.Constructor; +import java.util.List; + +/** + * Notation: Partition is a unit of parallelism in the Pub/Sub queue. + * + * Implementations of PubSub should take in a {@link PubSubConfig} and use the information to wire up and return + * Publishers and Subscribers. + */ +public abstract class PubSub implements Serializable { + /** + * The context determines how the {@link Publisher} and {@link Subscriber} returned by PubSub behave. For example, + * If the Context is {@link Context#QUERY_SUBMISSION}: + * 1. Publishers write to request queue. + * 2. Subscribers read from the response queue. + * If the Context is {@link Context#QUERY_PROCESSING}: + * 1. Publishers write to response queue. + * 2. Subscribers read from the request queue. + */ + public enum Context { + QUERY_SUBMISSION, + QUERY_PROCESSING + } + + protected Context context; + + /** + * Instantiate a PubSub using parameters from {@link PubSubConfig}. + * + * @param config the {@link PubSubConfig} containing all required PubSub parameters. + */ + public PubSub(PubSubConfig config) { + context = Context.valueOf(config.get(PubSubConfig.CONTEXT_NAME).toString()); + } + + /** + * Get a {@link Publisher} instance wired to write to all allocated partitions in the appropriate queue (See + * {@link PubSub#context}). + * + * @return {@link Publisher} wired as required. + */ + public abstract Publisher getPublisher(); + + /** + * Get a list of n {@link Publisher} instances with the allocated partitions in the appropriate queue + * (See {@link PubSub#context}) split as evenly as possible among them. + * + * @param n The number of Publishers requested. + * @return List of n Publishers wired as required. + */ + public abstract List getPublishers(int n); + + /** + * Get a {@link Subscriber} instance wired to read from all allocated partitions in the appropriate queue (See + * {@link PubSub#context}). + * + * @return {@link Subscriber} wired as required. + */ + public abstract Subscriber getSubscriber(); + + /** + * Get a list of n {@link Subscriber} instances with allocated partitions from the appropriate queue + * (See {@link PubSub#context}) split as evenly as possible among them. + * + * @param n The number of Subscribers requested. + * @return List of n Subscribers wired as required. + */ + public abstract List getSubscribers(int n); + + /** + * Create a PubSub instance using the class specified in the config file. + * @param config {@link PubSubConfig} containing the class name and PubSub settings. + * @return an instance of specified class initialized with settings from the input file and defaults. + * @throws PubSubException if PubSub creation fails. + */ + public static PubSub from(PubSubConfig config) throws PubSubException { + try { + String pubSubClassName = (String) config.get(PubSubConfig.PUBSUB_CLASS_NAME); + Class pubSubClass = (Class) Class.forName(pubSubClassName); + Constructor constructor = pubSubClass.getConstructor(PubSubConfig.class); + return constructor.newInstance(config); + } catch (Exception e) { + throw new PubSubException("Cannot create PubSub instance. Error: " + e.toString()); + } + } +} diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java b/src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java new file mode 100644 index 00000000..a167d378 --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java @@ -0,0 +1,21 @@ +package com.yahoo.bullet.pubsub; + +import com.yahoo.bullet.BulletConfig; + +import java.io.IOException; + +public class PubSubConfig extends BulletConfig { + public static final String CONTEXT_NAME = "bullet.pubsub.context.name"; + public static final String PUBSUB_CLASS_NAME = "bullet.pubsub.class.name"; + + /** + * Constructor that loads configuration parameters from a specific file augmented by defaults. + * + * @param file YAML file to load. + * @throws IOException if an error occurs when loading the file. + */ + public PubSubConfig(String file) throws IOException { + // Load settings and merge with bullet defaults. + super(file); + } +} diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java b/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java new file mode 100644 index 00000000..659f3405 --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java @@ -0,0 +1,12 @@ +package com.yahoo.bullet.pubsub; + +public class PubSubException extends Exception { + /** + * Exception to be thrown if there is an error in {@link PubSub}, {@link Publisher} or {@link Subscriber}. + * + * @param message error message to be associated with the PubSubException. + */ + public PubSubException(String message) { + super(message); + } +} diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java b/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java new file mode 100644 index 00000000..b8a54296 --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java @@ -0,0 +1,86 @@ +package com.yahoo.bullet.pubsub; + +import java.io.Serializable; +import com.yahoo.bullet.pubsub.Metadata.Signal; +import lombok.Getter; + +/** + * The class of messages that can be understood by the PubSub package. The id should be set to the query ID generated + * by the web service for Bullet queries. The sequence identifies individual segments if a multi-part response is + * emitted by Bullet. + */ +@Getter +public class PubSubMessage implements Serializable { + private String id; + private int sequence; + private String content; + private Metadata metadata; + + /** + * Constructor for a message having only content. + * + * @param id is the ID associated with the message. + * @param content is the content of the message. + */ + public PubSubMessage(String id, String content) { + this(id, content, -1, new Metadata()); + } + + /** + * Constructor for a message having content and a sequence number. + * + * @param id is the ID associated with the message. + * @param content is the content of the message. + * @param sequence is the sequence number of the message. + */ + public PubSubMessage(String id, String content, int sequence) { + this(id, content, sequence, new Metadata()); + } + + /** + * Constructor for a PubSubMessage having content and Metadata. + * + * @param id is the ID associated with the message. + * @param content is the content of the message. + * @param metadata is the {@link Metadata} associated with the message. + */ + public PubSubMessage(String id, String content, Metadata metadata) { + this(id, content, -1, metadata); + } + + /** + * Constructor for a message having content, a {@link Metadata.Signal} and a sequence number. + * + * @param id is the ID associated with the message. + * @param content is the content of the message. + * @param sequence is the sequence number of the message. + * @param signal is the Metadata.Signal of the message. + */ + public PubSubMessage(String id, String content, int sequence, Signal signal) { + this(id, content, sequence, new Metadata(signal, null)); + } + + /** + * Constructor for a PubSubMessage having content, Metadata and a sequence number. + * + * @param id is the ID associated with the message. + * @param content is the content of the message. + * @param sequence is the sequence number associated with the message. + * @param metadata is the {@link Metadata} associated with the message. + */ + public PubSubMessage(String id, String content, int sequence, Metadata metadata) { + this.id = id; + this.content = content; + this.sequence = sequence; + this.metadata = metadata; + } + + /** + * Check if message has content. + * + * @return true if message has content. + */ + public boolean hasContent() { + return content != null; + } +} diff --git a/src/main/java/com/yahoo/bullet/pubsub/Publisher.java b/src/main/java/com/yahoo/bullet/pubsub/Publisher.java new file mode 100644 index 00000000..db7cee9c --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/Publisher.java @@ -0,0 +1,16 @@ +package com.yahoo.bullet.pubsub; + +public interface Publisher { + /** + * Sends a {@link PubSubMessage}. Messages with the same ID should be received in order. + * + * @param message the {@link PubSubMessage} to be sent. + * @throws PubSubException if the messaging system throws an error. + */ + void send(PubSubMessage message) throws PubSubException; + + /** + * Close Publisher and delete all related context. + */ + void close(); +} diff --git a/src/main/java/com/yahoo/bullet/pubsub/Subscriber.java b/src/main/java/com/yahoo/bullet/pubsub/Subscriber.java new file mode 100644 index 00000000..54aa5d47 --- /dev/null +++ b/src/main/java/com/yahoo/bullet/pubsub/Subscriber.java @@ -0,0 +1,55 @@ +package com.yahoo.bullet.pubsub; + +public interface Subscriber { + /** + * Gets a new {@link PubSubMessage} from the assigned partition/partitions (Here a partition is a unit of + * parallelism in the Pub/Sub queue, See {@link PubSub}). + * + * @return the received {@link PubSubMessage}. + * @throws PubSubException when a receive fails. + */ + PubSubMessage receive() throws PubSubException; + + /** + * Close the Subscriber and delete all associated Context. + */ + void close(); + + /** + * Commits allow clients to implement at least once, at most once or exactly once semantics when processing messages. + * + * Common implementations might + * 1. Ack all received messages. + * 2. Commit current read offset to persistent/fault tolerant storage. + * + * @param id the ID of the message to be marked as committed. + * @param sequence the sequence number of the message to be committed. + */ + void commit(String id, int sequence); + + /** + * Convenience method to commit a message that doesn't contain a sequence number. + * + * @param id is the ID of the message to be marked as committed. + */ + default void commit(String id) { + commit(id, -1); + } + + /** + * Marks the processing of the {@link PubSubMessage} with the given id as failed. + * + * @param id the ID of the PubSubMessage to mark as a processing failure. + */ + void fail(String id, int sequence); + + /** + * Convenience method to fail a message that doesn't contain a sequence number. + * + * @param id is the ID of the message to be marked as a processing failure. + */ + default void fail(String id) { + fail(id, -1); + } + +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/MetadataTest.java b/src/test/java/com/yahoo/bullet/pubsub/MetadataTest.java new file mode 100644 index 00000000..98de0774 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/MetadataTest.java @@ -0,0 +1,36 @@ +package com.yahoo.bullet.pubsub; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class MetadataTest { + @Test + public void testHasSignal() { + Metadata full = new Metadata(Metadata.Signal.ACKNOWLEDGE, 5); + Metadata empty = new Metadata(); + Assert.assertTrue(full.hasSignal()); + Assert.assertFalse(empty.hasSignal()); + } + + @Test + public void testHasContent() { + Metadata full = new Metadata(Metadata.Signal.ACKNOWLEDGE, 5); + Metadata empty = new Metadata(); + Assert.assertTrue(full.hasContent()); + Assert.assertFalse(empty.hasContent()); + } + + @Test + public void testSetContentWhenEmpty() { + Metadata empty = new Metadata(); + empty.setContent(5); + Assert.assertEquals(empty.getContent(), 5); + } + + @Test + public void testSetSignalWhenEmpty() { + Metadata empty = new Metadata(); + empty.setSignal(Metadata.Signal.ACKNOWLEDGE); + Assert.assertEquals(empty.getSignal(), Metadata.Signal.ACKNOWLEDGE); + } +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java b/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java new file mode 100644 index 00000000..21f67712 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java @@ -0,0 +1,43 @@ +package com.yahoo.bullet.pubsub; + +import org.mockito.Mockito; + +import java.util.List; + +import static org.mockito.Mockito.doReturn; + +class MockPubSub extends PubSub { + public static final String MOCK_MESSAGE_NAME = "MOCK_MESSAGE"; + private String mockMessage; + + public MockPubSub(PubSubConfig config) { + super(config); + mockMessage = config.get(MOCK_MESSAGE_NAME).toString(); + } + + @Override + public Subscriber getSubscriber() { + Subscriber mockSubscriber = Mockito.mock(Subscriber.class); + try { + doReturn(new PubSubMessage("", mockMessage)).when(mockSubscriber).receive(); + } catch (Exception e) { + mockSubscriber = null; + } + return mockSubscriber; + } + + @Override + public Publisher getPublisher() { + throw new UnsupportedOperationException(); + } + + @Override + public List getSubscribers(int n) { + throw new UnsupportedOperationException(); + } + + @Override + public List getPublishers(int n) { + throw new UnsupportedOperationException(); + } +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java new file mode 100644 index 00000000..283f6cbf --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java @@ -0,0 +1,23 @@ +package com.yahoo.bullet.pubsub; + +import com.yahoo.bullet.BulletConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; + +public class PubSubConfigTest { + public static final String CONTEXT_PROCESSING = PubSub.Context.QUERY_PROCESSING.toString(); + public static final String RECORD_INJECT_TIMESTAMP_KEY_VALUE = "bullet_project_timestamp"; + + @Test + public void testCustomFileCreate() throws IOException { + PubSubConfig config = new PubSubConfig("src/test/resources/test_config.yaml"); + //Test custom properties + Assert.assertEquals((long) config.get(BulletConfig.AGGREGATION_MAX_SIZE), 100); + Assert.assertEquals((long) config.get(BulletConfig.SPECIFICATION_MAX_DURATION), 10000); + Assert.assertTrue(config.get(PubSubConfig.CONTEXT_NAME).toString().equals(CONTEXT_PROCESSING)); + //Test default bullet properties + Assert.assertTrue(config.get(BulletConfig.RECORD_INJECT_TIMESTAMP_KEY).toString().equals(RECORD_INJECT_TIMESTAMP_KEY_VALUE)); + } +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubExceptionTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubExceptionTest.java new file mode 100644 index 00000000..a2ce4166 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubExceptionTest.java @@ -0,0 +1,15 @@ +package com.yahoo.bullet.pubsub; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.UUID; + +public class PubSubExceptionTest { + @Test + public void testGetMessage() { + String randomMessage = UUID.randomUUID().toString(); + PubSubException ex = new PubSubException(randomMessage); + Assert.assertTrue(ex.getMessage().equals(randomMessage)); + } +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java new file mode 100644 index 00000000..7f776498 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java @@ -0,0 +1,102 @@ +package com.yahoo.bullet.pubsub; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.UUID; +import com.yahoo.bullet.pubsub.Metadata.Signal; + +public class PubSubMessageTest { + private String getRandomString() { + return UUID.randomUUID().toString(); + } + + @Test + public void testNoMetadataCreation() { + String messageId = getRandomString(); + String messageContent = getRandomString(); + + PubSubMessage message = new PubSubMessage(messageId, messageContent); + Assert.assertTrue(messageId.equals(message.getId())); + Assert.assertEquals(message.getSequence(), -1); + Assert.assertTrue(messageContent.equals(message.getContent())); + Assert.assertNotNull(message.getMetadata()); + Assert.assertNull(message.getMetadata().getSignal()); + Assert.assertNull(message.getMetadata().getContent()); + } + + @Test + public void testWithSequenceCreation() { + String messageId = getRandomString(); + String messageContent = getRandomString(); + + PubSubMessage message = new PubSubMessage(messageId, messageContent, 0); + Assert.assertTrue(messageId.equals(message.getId())); + Assert.assertTrue(messageContent.equals(message.getContent())); + Assert.assertEquals(message.getSequence(), 0); + Assert.assertNotNull(message.getMetadata()); + Assert.assertNull(message.getMetadata().getSignal()); + Assert.assertNull(message.getMetadata().getContent()); + } + + @Test + public void testWithSignalCreation() { + String messageId = getRandomString(); + String messageContent = getRandomString(); + Signal signal = Signal.ACKNOWLEDGE; + + PubSubMessage message = new PubSubMessage(messageId, messageContent, 0, signal); + Assert.assertTrue(messageId.equals(message.getId())); + Assert.assertTrue(messageContent.equals(message.getContent())); + Assert.assertEquals(message.getSequence(), 0); + Assert.assertNotNull(message.getMetadata()); + Assert.assertEquals(message.getMetadata().getSignal(), signal); + Assert.assertNull(message.getMetadata().getContent()); + Assert.assertNull(message.getMetadata().getContent()); + } + + @Test + public void testWithMetadataCreation() { + String messageId = getRandomString(); + String messageContent = getRandomString(); + String metadataContent = getRandomString(); + Signal signal = Signal.ACKNOWLEDGE; + //Test creation without a sequence number. + PubSubMessage message = new PubSubMessage(messageId, messageContent, new Metadata(signal, metadataContent)); + Assert.assertTrue(messageId.equals(message.getId())); + Assert.assertTrue(messageContent.equals(message.getContent())); + Assert.assertEquals(message.getSequence(), -1); + Assert.assertNotNull(message.getMetadata()); + Assert.assertEquals(message.getMetadata().getSignal(), signal); + Assert.assertTrue(message.getMetadata().getContent().toString().equals(metadataContent)); + } + + @Test + public void testWithMetadataAndSequenceCreation() { + String messageId = getRandomString(); + String messageContent = getRandomString(); + String metadataContent = getRandomString(); + Signal signal = Signal.ACKNOWLEDGE; + //Test creation with a sequence number. + PubSubMessage message = new PubSubMessage(messageId, messageContent, 0, new Metadata(signal, metadataContent)); + Assert.assertTrue(messageId.equals(message.getId())); + Assert.assertTrue(messageContent.equals(message.getContent())); + Assert.assertEquals(message.getSequence(), 0); + Assert.assertNotNull(message.getMetadata()); + Assert.assertEquals(message.getMetadata().getSignal(), signal); + Assert.assertTrue(message.getMetadata().getContent().toString().equals(metadataContent)); + } + + @Test + public void testHasContent() { + String messageId = getRandomString(); + String messageContent = getRandomString(); + + PubSubMessage message; + message = new PubSubMessage(messageId, messageContent); + Assert.assertTrue(message.hasContent()); + + message = new PubSubMessage(messageId, null); + Assert.assertFalse(message.hasContent()); + } +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java new file mode 100644 index 00000000..a72d0608 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java @@ -0,0 +1,26 @@ +package com.yahoo.bullet.pubsub; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.UUID; + +public class PubSubTest { + @Test + public void testMockPubSubCreation() throws IOException, PubSubException { + PubSubConfig config = new PubSubConfig("src/test/resources/test_config.yaml"); + String mockMessage = UUID.randomUUID().toString(); + config.set(MockPubSub.MOCK_MESSAGE_NAME, mockMessage); + PubSub testPubSub = PubSub.from(config); + + Assert.assertEquals(testPubSub.getClass(), MockPubSub.class); + Assert.assertTrue(testPubSub.getSubscriber().receive().getContent().equals(mockMessage)); + } + + @Test(expectedExceptions = PubSubException.class) + public void testIllegalPubSubCreation() throws IOException, PubSubException { + PubSubConfig config = new PubSubConfig(null); + PubSub testPubSub = PubSub.from(config); + } +} diff --git a/src/test/java/com/yahoo/bullet/pubsub/SubscriberTest.java b/src/test/java/com/yahoo/bullet/pubsub/SubscriberTest.java new file mode 100644 index 00000000..4fd22a31 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/SubscriberTest.java @@ -0,0 +1,54 @@ +package com.yahoo.bullet.pubsub; + +import lombok.NoArgsConstructor; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.UUID; + +public class SubscriberTest { + @NoArgsConstructor + private class MockSubscriber implements Subscriber { + String commitID; + String failID; + + int commitSequence; + int failSequence; + + public PubSubMessage receive() { + throw new UnsupportedOperationException(); + } + + public void commit(String id, int sequence) { + commitID = id; + commitSequence = sequence; + } + + public void fail(String id, int sequence) { + failID = id; + failSequence = sequence; + } + + public void close() { + throw new UnsupportedOperationException(); + } + } + + @Test + public void testCommitWithNoSequenceNumber() { + String randomID = UUID.randomUUID().toString(); + MockSubscriber subscriber = new MockSubscriber(); + subscriber.commit(randomID); + Assert.assertEquals(subscriber.commitSequence, -1); + Assert.assertTrue(subscriber.commitID.equals(randomID)); + } + + @Test + public void testFailWithNoSequenceNumber() { + String randomID = UUID.randomUUID().toString(); + MockSubscriber subscriber = new MockSubscriber(); + subscriber.fail(randomID); + Assert.assertEquals(subscriber.failSequence, -1); + Assert.assertTrue(subscriber.failID.equals(randomID)); + } +} diff --git a/src/test/resources/test_config.yaml b/src/test/resources/test_config.yaml index 6d3c2f8a..68af95be 100644 --- a/src/test/resources/test_config.yaml +++ b/src/test/resources/test_config.yaml @@ -2,3 +2,6 @@ bullet.query.max.duration: 10000 bullet.query.aggregation.max.size: 100 # Some random fake setting fake.setting: null + +bullet.pubsub.context.name: "QUERY_PROCESSING" +bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.MockPubSub"