Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/java/com/yahoo/bullet/BulletConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class BulletConfig extends Config {

public static final String RESULT_METADATA_METRICS_MAPPING = "bullet.result.metadata.metrics.mapping";

public static final String PUBSUB_CONTEXT_NAME = "bullet.pubsub.context.name";
public static final String PUBSUB_CLASS_NAME = "bullet.pubsub.class.name";

public static final String DEFAULT_CONFIGURATION_NAME = "bullet_defaults.yaml";

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/yahoo/bullet/pubsub/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import java.io.Serializable;

@Getter @NoArgsConstructor @Setter @AllArgsConstructor
@Getter @Setter @AllArgsConstructor @NoArgsConstructor
public class Metadata implements Serializable {
public enum Signal {
ACKNOWLEDGE,
Expand Down
24 changes: 13 additions & 11 deletions src/main/java/com/yahoo/bullet/pubsub/PubSub.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.BulletConfig;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.List;

/**
* Notation: Partition is a unit of parallelism in the Pub/Sub queue.
*
* Implementations of PubSub should take in a {@link PubSubConfig} and use the information to wire up and return
* Implementations of PubSub should take in a {@link BulletConfig} and use the information to wire up and return
* Publishers and Subscribers.
*/
public abstract class PubSub implements Serializable {
Expand All @@ -28,12 +30,12 @@ public enum Context {
protected Context context;

/**
* Instantiate a PubSub using parameters from {@link PubSubConfig}.
* Instantiate a PubSub using parameters from {@link BulletConfig}.
*
* @param config The {@link PubSubConfig} containing all required PubSub parameters.
* @param config The {@link BulletConfig} containing all required PubSub parameters.
*/
public PubSub(PubSubConfig config) {
context = Context.valueOf(config.get(PubSubConfig.CONTEXT_NAME).toString());
public PubSub(BulletConfig config) {
context = Context.valueOf(config.get(BulletConfig.PUBSUB_CONTEXT_NAME).toString());
}

/**
Expand All @@ -57,7 +59,7 @@ public PubSub(PubSubConfig config) {
* Get a {@link Subscriber} instance wired to read from all allocated partitions in the appropriate queue (See
* {@link PubSub#context}).
*
* @return The {@link Subscriber} wired as required.
* @return {@link Subscriber} wired as required.
*/
public abstract Subscriber getSubscriber();

Expand All @@ -73,18 +75,18 @@ public PubSub(PubSubConfig config) {
/**
* Create a PubSub instance using the class specified in the config file.
*
* @param config The {@link PubSubConfig} containing the class name and PubSub settings.
* @param config The {@link BulletConfig} containing the class name and PubSub settings.
* @return an instance of specified class initialized with settings from the input file and defaults.
* @throws PubSubException if PubSub creation fails.
*/
public static PubSub from(PubSubConfig config) throws PubSubException {
public static PubSub from(BulletConfig config) throws PubSubException {
try {
String pubSubClassName = (String) config.get(PubSubConfig.PUBSUB_CLASS_NAME);
String pubSubClassName = (String) config.get(BulletConfig.PUBSUB_CLASS_NAME);
Class<? extends PubSub> pubSubClass = (Class<? extends PubSub>) Class.forName(pubSubClassName);
Constructor<? extends PubSub> constructor = pubSubClass.getConstructor(PubSubConfig.class);
Constructor<? extends PubSub> constructor = pubSubClass.getConstructor(BulletConfig.class);
return constructor.newInstance(config);
} catch (Exception e) {
throw new PubSubException("Cannot create PubSub instance. Error: " + e.toString());
throw new PubSubException("Cannot create PubSub instance.", e);
}
}
}
21 changes: 0 additions & 21 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java

This file was deleted.

15 changes: 14 additions & 1 deletion src/main/java/com/yahoo/bullet/pubsub/PubSubException.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
package com.yahoo.bullet.pubsub;

/**
* Exception to be thrown if there is an error in {@link PubSub}, {@link Publisher} or {@link Subscriber}.
*/
public class PubSubException extends Exception {
/**
* Exception to be thrown if there is an error in {@link PubSub}, {@link Publisher} or {@link Subscriber}.
* Constructor to initialize PubSubException with a message.
*
* @param message The error message to be associated with the PubSubException.
*/
public PubSubException(String message) {
super(message);
}

/**
* Constructor to initialize PubSubException with a message and a {@link Throwable} cause.
*
* @param message The error message to be associated with the PubSubException.
* @param cause The reason for the PubSubException.
*/
public PubSubException(String message, Throwable cause) {
super(message, cause);
}
}
43 changes: 28 additions & 15 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.yahoo.bullet.pubsub;

import java.io.Serializable;
import com.yahoo.bullet.pubsub.Metadata.Signal;
import lombok.Getter;

import java.io.Serializable;
import java.util.Objects;

/**
* The class of messages that can be understood by the PubSub package. The id should be set to the query ID generated
* by the web service for Bullet queries. The sequence identifies individual segments if a multi-part response is
Expand All @@ -23,7 +25,7 @@ public class PubSubMessage implements Serializable {
* @param content The content of the message.
*/
public PubSubMessage(String id, String content) {
this(id, content, -1, new Metadata());
this(id, content, (Metadata) null, -1);
}

/**
Expand All @@ -34,45 +36,56 @@ public PubSubMessage(String id, String content) {
* @param sequence The sequence number of the message.
*/
public PubSubMessage(String id, String content, int sequence) {
this(id, content, sequence, new Metadata());
this(id, content, (Metadata) null, sequence);
}

/**
* Constructor for a PubSubMessage having content and Metadata.
* Constructor for a message having content and {@link Metadata}.
*
* @param id The ID associated with the message.
* @param content The content of the message.
* @param metadata The {@link Metadata} associated with the message.
* @param metadata The Metadata associated with the message.
*/
public PubSubMessage(String id, String content, Metadata metadata) {
this(id, content, -1, metadata);
this(id, content, metadata, -1);
}

/**
* Constructor for a message having content and a {@link Metadata.Signal}.
*
* @param id The ID associated with the message.
* @param content The content of the message.
* @param signal The Metadata.Signal to be sent with the message.
*/
public PubSubMessage(String id, String content, Signal signal) {
this(id, content, signal, -1);
}

/**
* Constructor for a message having content, a {@link Metadata.Signal} and a sequence number.
*
* @param id The ID associated with the message.
* @param content The content of the message.
* @param signal The Signal to be sent with the message.
* @param sequence The sequence number of the message.
* @param signal The Metadata.Signal of the message.
*/
public PubSubMessage(String id, String content, int sequence, Signal signal) {
this(id, content, sequence, new Metadata(signal, null));
public PubSubMessage(String id, String content, Signal signal, int sequence) {
this(id, content, new Metadata(signal, null), sequence);
}

/**
* Constructor for a PubSubMessage having content, Metadata and a sequence number.
* Constructor for a message having content, {@link Metadata} and a sequence number.
*
* @param id The ID associated with the message.
* @param content The content of the message.
* @param sequence The sequence number associated with the message.
* @param metadata The {@link Metadata} associated with the message.
* @param metadata The Metadata associated with the message.
* @param sequence The sequence number of the message.
*/
public PubSubMessage(String id, String content, int sequence, Metadata metadata) {
this.id = id;
public PubSubMessage(String id, String content, Metadata metadata, int sequence) {
this.id = Objects.requireNonNull(id, "ID cannot be null");
this.content = content;
this.sequence = sequence;
this.metadata = metadata;
this.sequence = sequence;
}

/**
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Publisher.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
package com.yahoo.bullet.pubsub;

public interface Publisher {
/**
* Send a message with an ID and content.
*
* @param id The ID associated with the message.
* @param content The content of the message.
* @throws PubSubException if the messaging system throws an error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use {@link PubSubException} in this and all other Javadocs in this file?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. You can't use this for throws. Thanks for the fix.

*/
default void send(String id, String content) throws PubSubException {
send(new PubSubMessage(id, content));
}

/**
* Sends a {@link PubSubMessage}. Messages with the same ID should be received in order.
*
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.BulletConfig;
import org.mockito.Mockito;

import java.util.List;
Expand All @@ -10,7 +11,7 @@ class MockPubSub extends PubSub {
public static final String MOCK_MESSAGE_NAME = "MOCK_MESSAGE";
private String mockMessage;

public MockPubSub(PubSubConfig config) {
public MockPubSub(BulletConfig config) {
super(config);
mockMessage = config.get(MOCK_MESSAGE_NAME).toString();
}
Expand Down
23 changes: 0 additions & 23 deletions src/test/java/com/yahoo/bullet/pubsub/PubSubConfigTest.java

This file was deleted.

21 changes: 11 additions & 10 deletions src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.pubsub.Metadata.Signal;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.UUID;
import com.yahoo.bullet.pubsub.Metadata.Signal;

public class PubSubMessageTest {
private String getRandomString() {
Expand All @@ -20,9 +20,7 @@ public void testNoMetadataCreation() {
Assert.assertTrue(messageId.equals(message.getId()));
Assert.assertEquals(message.getSequence(), -1);
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertNotNull(message.getMetadata());
Assert.assertNull(message.getMetadata().getSignal());
Assert.assertNull(message.getMetadata().getContent());
Assert.assertNull(message.getMetadata());
}

@Test
Expand All @@ -34,9 +32,7 @@ public void testWithSequenceCreation() {
Assert.assertTrue(messageId.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), 0);
Assert.assertNotNull(message.getMetadata());
Assert.assertNull(message.getMetadata().getSignal());
Assert.assertNull(message.getMetadata().getContent());
Assert.assertNull(message.getMetadata());
}

@Test
Expand All @@ -45,7 +41,7 @@ public void testWithSignalCreation() {
String messageContent = getRandomString();
Signal signal = Signal.ACKNOWLEDGE;

PubSubMessage message = new PubSubMessage(messageId, messageContent, 0, signal);
PubSubMessage message = new PubSubMessage(messageId, messageContent, signal, 0);
Assert.assertTrue(messageId.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), 0);
Expand Down Expand Up @@ -78,7 +74,7 @@ public void testWithMetadataAndSequenceCreation() {
String metadataContent = getRandomString();
Signal signal = Signal.ACKNOWLEDGE;
//Test creation with a sequence number.
PubSubMessage message = new PubSubMessage(messageId, messageContent, 0, new Metadata(signal, metadataContent));
PubSubMessage message = new PubSubMessage(messageId, messageContent, new Metadata(signal, metadataContent), 0);
Assert.assertTrue(messageId.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), 0);
Expand All @@ -96,7 +92,12 @@ public void testHasContent() {
message = new PubSubMessage(messageId, messageContent);
Assert.assertTrue(message.hasContent());

message = new PubSubMessage(messageId, null);
message = new PubSubMessage(messageId, null, Signal.COMPLETE);
Assert.assertFalse(message.hasContent());
}

@Test(expectedExceptions = NullPointerException.class)
public void testNoIDIllegalCreation() {
new PubSubMessage(null, "");
}
}
29 changes: 23 additions & 6 deletions src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.BulletConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.UUID;

public class PubSubTest {
@Test
public void testMockPubSubCreation() throws IOException, PubSubException {
PubSubConfig config = new PubSubConfig("src/test/resources/test_config.yaml");
public void testMockPubSubCreation() throws Exception {
BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml");
String mockMessage = UUID.randomUUID().toString();
config.set(MockPubSub.MOCK_MESSAGE_NAME, mockMessage);
PubSub testPubSub = PubSub.from(config);
Expand All @@ -19,8 +20,24 @@ public void testMockPubSubCreation() throws IOException, PubSubException {
}

@Test(expectedExceptions = PubSubException.class)
public void testIllegalPubSubCreation() throws IOException, PubSubException {
PubSubConfig config = new PubSubConfig(null);
PubSub testPubSub = PubSub.from(config);
public void testIllegalPubSubParameter() throws Exception {
BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml");
try {
PubSub.from(config);
} catch (Exception e) {
Assert.assertEquals(e.getCause().getClass(), InvocationTargetException.class);
throw e;
}
}

@Test(expectedExceptions = PubSubException.class)
public void testIllegalPubSubClassName() throws Exception {
BulletConfig config = new BulletConfig(null);
try {
PubSub.from(config);
} catch (Exception e) {
Assert.assertEquals(e.getCause().getClass(), NullPointerException.class);
throw e;
}
}
}
Loading