diff --git a/src/main/java/com/yahoo/bullet/BulletConfig.java b/src/main/java/com/yahoo/bullet/BulletConfig.java index a2cf0ae2..512884f1 100644 --- a/src/main/java/com/yahoo/bullet/BulletConfig.java +++ b/src/main/java/com/yahoo/bullet/BulletConfig.java @@ -46,6 +46,9 @@ public class BulletConfig extends Config { public static final String RESULT_METADATA_METRICS_MAPPING = "bullet.result.metadata.metrics.mapping"; + public static final String PUBSUB_CONTEXT_NAME = "bullet.pubsub.context.name"; + public static final String PUBSUB_CLASS_NAME = "bullet.pubsub.class.name"; + public static final String DEFAULT_CONFIGURATION_NAME = "bullet_defaults.yaml"; /** diff --git a/src/main/java/com/yahoo/bullet/pubsub/Metadata.java b/src/main/java/com/yahoo/bullet/pubsub/Metadata.java index 6baf3cbd..e6342f15 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/Metadata.java +++ b/src/main/java/com/yahoo/bullet/pubsub/Metadata.java @@ -7,7 +7,7 @@ import java.io.Serializable; -@Getter @NoArgsConstructor @Setter @AllArgsConstructor +@Getter @Setter @AllArgsConstructor @NoArgsConstructor public class Metadata implements Serializable { public enum Signal { ACKNOWLEDGE, diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSub.java b/src/main/java/com/yahoo/bullet/pubsub/PubSub.java index b2c46c21..8d42e463 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/PubSub.java +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSub.java @@ -1,5 +1,7 @@ package com.yahoo.bullet.pubsub; +import com.yahoo.bullet.BulletConfig; + import java.io.Serializable; import java.lang.reflect.Constructor; import java.util.List; @@ -7,7 +9,7 @@ /** * Notation: Partition is a unit of parallelism in the Pub/Sub queue. * - * Implementations of PubSub should take in a {@link PubSubConfig} and use the information to wire up and return + * Implementations of PubSub should take in a {@link BulletConfig} and use the information to wire up and return * Publishers and Subscribers. */ public abstract class PubSub implements Serializable { @@ -28,12 +30,12 @@ public enum Context { protected Context context; /** - * Instantiate a PubSub using parameters from {@link PubSubConfig}. + * Instantiate a PubSub using parameters from {@link BulletConfig}. * - * @param config The {@link PubSubConfig} containing all required PubSub parameters. + * @param config The {@link BulletConfig} containing all required PubSub parameters. */ - public PubSub(PubSubConfig config) { - context = Context.valueOf(config.get(PubSubConfig.CONTEXT_NAME).toString()); + public PubSub(BulletConfig config) { + context = Context.valueOf(config.get(BulletConfig.PUBSUB_CONTEXT_NAME).toString()); } /** @@ -57,7 +59,7 @@ public PubSub(PubSubConfig config) { * Get a {@link Subscriber} instance wired to read from all allocated partitions in the appropriate queue (See * {@link PubSub#context}). * - * @return The {@link Subscriber} wired as required. + * @return {@link Subscriber} wired as required. */ public abstract Subscriber getSubscriber(); @@ -73,18 +75,18 @@ public PubSub(PubSubConfig config) { /** * Create a PubSub instance using the class specified in the config file. * - * @param config The {@link PubSubConfig} containing the class name and PubSub settings. + * @param config The {@link BulletConfig} containing the class name and PubSub settings. * @return an instance of specified class initialized with settings from the input file and defaults. * @throws PubSubException if PubSub creation fails. */ - public static PubSub from(PubSubConfig config) throws PubSubException { + public static PubSub from(BulletConfig config) throws PubSubException { try { - String pubSubClassName = (String) config.get(PubSubConfig.PUBSUB_CLASS_NAME); + String pubSubClassName = (String) config.get(BulletConfig.PUBSUB_CLASS_NAME); Class pubSubClass = (Class) Class.forName(pubSubClassName); - Constructor constructor = pubSubClass.getConstructor(PubSubConfig.class); + Constructor constructor = pubSubClass.getConstructor(BulletConfig.class); return constructor.newInstance(config); } catch (Exception e) { - throw new PubSubException("Cannot create PubSub instance. Error: " + e.toString()); + throw new PubSubException("Cannot create PubSub instance.", e); } } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java b/src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java deleted file mode 100644 index a167d378..00000000 --- a/src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.yahoo.bullet.pubsub; - -import com.yahoo.bullet.BulletConfig; - -import java.io.IOException; - -public class PubSubConfig extends BulletConfig { - public static final String CONTEXT_NAME = "bullet.pubsub.context.name"; - public static final String PUBSUB_CLASS_NAME = "bullet.pubsub.class.name"; - - /** - * Constructor that loads configuration parameters from a specific file augmented by defaults. - * - * @param file YAML file to load. - * @throws IOException if an error occurs when loading the file. - */ - public PubSubConfig(String file) throws IOException { - // Load settings and merge with bullet defaults. - super(file); - } -} diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java b/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java index d3fdcc30..240bb131 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSubException.java @@ -1,12 +1,25 @@ package com.yahoo.bullet.pubsub; +/** + * Exception to be thrown if there is an error in {@link PubSub}, {@link Publisher} or {@link Subscriber}. + */ public class PubSubException extends Exception { /** - * Exception to be thrown if there is an error in {@link PubSub}, {@link Publisher} or {@link Subscriber}. + * Constructor to initialize PubSubException with a message. * * @param message The error message to be associated with the PubSubException. */ public PubSubException(String message) { super(message); } + + /** + * Constructor to initialize PubSubException with a message and a {@link Throwable} cause. + * + * @param message The error message to be associated with the PubSubException. + * @param cause The reason for the PubSubException. + */ + public PubSubException(String message, Throwable cause) { + super(message, cause); + } } diff --git a/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java b/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java index 1002f5bd..44e9c475 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java +++ b/src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java @@ -1,9 +1,11 @@ package com.yahoo.bullet.pubsub; -import java.io.Serializable; import com.yahoo.bullet.pubsub.Metadata.Signal; import lombok.Getter; +import java.io.Serializable; +import java.util.Objects; + /** * The class of messages that can be understood by the PubSub package. The id should be set to the query ID generated * by the web service for Bullet queries. The sequence identifies individual segments if a multi-part response is @@ -23,7 +25,7 @@ public class PubSubMessage implements Serializable { * @param content The content of the message. */ public PubSubMessage(String id, String content) { - this(id, content, -1, new Metadata()); + this(id, content, (Metadata) null, -1); } /** @@ -34,18 +36,29 @@ public PubSubMessage(String id, String content) { * @param sequence The sequence number of the message. */ public PubSubMessage(String id, String content, int sequence) { - this(id, content, sequence, new Metadata()); + this(id, content, (Metadata) null, sequence); } /** - * Constructor for a PubSubMessage having content and Metadata. + * Constructor for a message having content and {@link Metadata}. * * @param id The ID associated with the message. * @param content The content of the message. - * @param metadata The {@link Metadata} associated with the message. + * @param metadata The Metadata associated with the message. */ public PubSubMessage(String id, String content, Metadata metadata) { - this(id, content, -1, metadata); + this(id, content, metadata, -1); + } + + /** + * Constructor for a message having content and a {@link Metadata.Signal}. + * + * @param id The ID associated with the message. + * @param content The content of the message. + * @param signal The Metadata.Signal to be sent with the message. + */ + public PubSubMessage(String id, String content, Signal signal) { + this(id, content, signal, -1); } /** @@ -53,26 +66,26 @@ public PubSubMessage(String id, String content, Metadata metadata) { * * @param id The ID associated with the message. * @param content The content of the message. + * @param signal The Signal to be sent with the message. * @param sequence The sequence number of the message. - * @param signal The Metadata.Signal of the message. */ - public PubSubMessage(String id, String content, int sequence, Signal signal) { - this(id, content, sequence, new Metadata(signal, null)); + public PubSubMessage(String id, String content, Signal signal, int sequence) { + this(id, content, new Metadata(signal, null), sequence); } /** - * Constructor for a PubSubMessage having content, Metadata and a sequence number. + * Constructor for a message having content, {@link Metadata} and a sequence number. * * @param id The ID associated with the message. * @param content The content of the message. - * @param sequence The sequence number associated with the message. - * @param metadata The {@link Metadata} associated with the message. + * @param metadata The Metadata associated with the message. + * @param sequence The sequence number of the message. */ - public PubSubMessage(String id, String content, int sequence, Metadata metadata) { - this.id = id; + public PubSubMessage(String id, String content, Metadata metadata, int sequence) { + this.id = Objects.requireNonNull(id, "ID cannot be null"); this.content = content; - this.sequence = sequence; this.metadata = metadata; + this.sequence = sequence; } /** diff --git a/src/main/java/com/yahoo/bullet/pubsub/Publisher.java b/src/main/java/com/yahoo/bullet/pubsub/Publisher.java index 6bed546b..916668ab 100644 --- a/src/main/java/com/yahoo/bullet/pubsub/Publisher.java +++ b/src/main/java/com/yahoo/bullet/pubsub/Publisher.java @@ -1,6 +1,17 @@ package com.yahoo.bullet.pubsub; public interface Publisher { + /** + * Send a message with an ID and content. + * + * @param id The ID associated with the message. + * @param content The content of the message. + * @throws PubSubException if the messaging system throws an error. + */ + default void send(String id, String content) throws PubSubException { + send(new PubSubMessage(id, content)); + } + /** * Sends a {@link PubSubMessage}. Messages with the same ID should be received in order. * diff --git a/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java b/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java index 21f67712..a22fe805 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java +++ b/src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java @@ -1,5 +1,6 @@ package com.yahoo.bullet.pubsub; +import com.yahoo.bullet.BulletConfig; import org.mockito.Mockito; import java.util.List; @@ -10,7 +11,7 @@ class MockPubSub extends PubSub { public static final String MOCK_MESSAGE_NAME = "MOCK_MESSAGE"; private String mockMessage; - public MockPubSub(PubSubConfig config) { + public MockPubSub(BulletConfig config) { super(config); mockMessage = config.get(MOCK_MESSAGE_NAME).toString(); } diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java deleted file mode 100644 index 283f6cbf..00000000 --- a/src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.yahoo.bullet.pubsub; - -import com.yahoo.bullet.BulletConfig; -import org.testng.Assert; -import org.testng.annotations.Test; - -import java.io.IOException; - -public class PubSubConfigTest { - public static final String CONTEXT_PROCESSING = PubSub.Context.QUERY_PROCESSING.toString(); - public static final String RECORD_INJECT_TIMESTAMP_KEY_VALUE = "bullet_project_timestamp"; - - @Test - public void testCustomFileCreate() throws IOException { - PubSubConfig config = new PubSubConfig("src/test/resources/test_config.yaml"); - //Test custom properties - Assert.assertEquals((long) config.get(BulletConfig.AGGREGATION_MAX_SIZE), 100); - Assert.assertEquals((long) config.get(BulletConfig.SPECIFICATION_MAX_DURATION), 10000); - Assert.assertTrue(config.get(PubSubConfig.CONTEXT_NAME).toString().equals(CONTEXT_PROCESSING)); - //Test default bullet properties - Assert.assertTrue(config.get(BulletConfig.RECORD_INJECT_TIMESTAMP_KEY).toString().equals(RECORD_INJECT_TIMESTAMP_KEY_VALUE)); - } -} diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java index 7f776498..d965d1ed 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java @@ -1,10 +1,10 @@ package com.yahoo.bullet.pubsub; +import com.yahoo.bullet.pubsub.Metadata.Signal; import org.testng.Assert; import org.testng.annotations.Test; import java.util.UUID; -import com.yahoo.bullet.pubsub.Metadata.Signal; public class PubSubMessageTest { private String getRandomString() { @@ -20,9 +20,7 @@ public void testNoMetadataCreation() { Assert.assertTrue(messageId.equals(message.getId())); Assert.assertEquals(message.getSequence(), -1); Assert.assertTrue(messageContent.equals(message.getContent())); - Assert.assertNotNull(message.getMetadata()); - Assert.assertNull(message.getMetadata().getSignal()); - Assert.assertNull(message.getMetadata().getContent()); + Assert.assertNull(message.getMetadata()); } @Test @@ -34,9 +32,7 @@ public void testWithSequenceCreation() { Assert.assertTrue(messageId.equals(message.getId())); Assert.assertTrue(messageContent.equals(message.getContent())); Assert.assertEquals(message.getSequence(), 0); - Assert.assertNotNull(message.getMetadata()); - Assert.assertNull(message.getMetadata().getSignal()); - Assert.assertNull(message.getMetadata().getContent()); + Assert.assertNull(message.getMetadata()); } @Test @@ -45,7 +41,7 @@ public void testWithSignalCreation() { String messageContent = getRandomString(); Signal signal = Signal.ACKNOWLEDGE; - PubSubMessage message = new PubSubMessage(messageId, messageContent, 0, signal); + PubSubMessage message = new PubSubMessage(messageId, messageContent, signal, 0); Assert.assertTrue(messageId.equals(message.getId())); Assert.assertTrue(messageContent.equals(message.getContent())); Assert.assertEquals(message.getSequence(), 0); @@ -78,7 +74,7 @@ public void testWithMetadataAndSequenceCreation() { String metadataContent = getRandomString(); Signal signal = Signal.ACKNOWLEDGE; //Test creation with a sequence number. - PubSubMessage message = new PubSubMessage(messageId, messageContent, 0, new Metadata(signal, metadataContent)); + PubSubMessage message = new PubSubMessage(messageId, messageContent, new Metadata(signal, metadataContent), 0); Assert.assertTrue(messageId.equals(message.getId())); Assert.assertTrue(messageContent.equals(message.getContent())); Assert.assertEquals(message.getSequence(), 0); @@ -96,7 +92,12 @@ public void testHasContent() { message = new PubSubMessage(messageId, messageContent); Assert.assertTrue(message.hasContent()); - message = new PubSubMessage(messageId, null); + message = new PubSubMessage(messageId, null, Signal.COMPLETE); Assert.assertFalse(message.hasContent()); } + + @Test(expectedExceptions = NullPointerException.class) + public void testNoIDIllegalCreation() { + new PubSubMessage(null, ""); + } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java b/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java index a72d0608..2fe02662 100644 --- a/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java +++ b/src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java @@ -1,15 +1,16 @@ package com.yahoo.bullet.pubsub; +import com.yahoo.bullet.BulletConfig; import org.testng.Assert; import org.testng.annotations.Test; -import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.UUID; public class PubSubTest { @Test - public void testMockPubSubCreation() throws IOException, PubSubException { - PubSubConfig config = new PubSubConfig("src/test/resources/test_config.yaml"); + public void testMockPubSubCreation() throws Exception { + BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml"); String mockMessage = UUID.randomUUID().toString(); config.set(MockPubSub.MOCK_MESSAGE_NAME, mockMessage); PubSub testPubSub = PubSub.from(config); @@ -19,8 +20,24 @@ public void testMockPubSubCreation() throws IOException, PubSubException { } @Test(expectedExceptions = PubSubException.class) - public void testIllegalPubSubCreation() throws IOException, PubSubException { - PubSubConfig config = new PubSubConfig(null); - PubSub testPubSub = PubSub.from(config); + public void testIllegalPubSubParameter() throws Exception { + BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml"); + try { + PubSub.from(config); + } catch (Exception e) { + Assert.assertEquals(e.getCause().getClass(), InvocationTargetException.class); + throw e; + } + } + + @Test(expectedExceptions = PubSubException.class) + public void testIllegalPubSubClassName() throws Exception { + BulletConfig config = new BulletConfig(null); + try { + PubSub.from(config); + } catch (Exception e) { + Assert.assertEquals(e.getCause().getClass(), NullPointerException.class); + throw e; + } } } diff --git a/src/test/java/com/yahoo/bullet/pubsub/PublisherTest.java b/src/test/java/com/yahoo/bullet/pubsub/PublisherTest.java new file mode 100644 index 00000000..17ec4a54 --- /dev/null +++ b/src/test/java/com/yahoo/bullet/pubsub/PublisherTest.java @@ -0,0 +1,33 @@ +package com.yahoo.bullet.pubsub; + +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.UUID; + +public class PublisherTest { + private class MockPublisher implements Publisher { + PubSubMessage sentMessage; + + @Override + public void send(PubSubMessage message) { + sentMessage = message; + } + + @Override + public void close() { + throw new UnsupportedOperationException(); + } + } + + @Test + public void testDefaultSend() throws PubSubException { + String randomId = UUID.randomUUID().toString(); + String randomMessage = UUID.randomUUID().toString(); + MockPublisher mockPublisher = new MockPublisher(); + mockPublisher.send(randomId, randomMessage); + + Assert.assertTrue(mockPublisher.sentMessage.getContent().equals(randomMessage)); + Assert.assertTrue(mockPublisher.sentMessage.getId().equals(randomId)); + } +}