Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.yahoo.bullet</groupId>
<artifactId>bullet-core</artifactId>
<version>0.1.3-SNAPSHOT</version>
<version>0.2.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>bullet-core</name>

Expand Down
36 changes: 36 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Metadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.yahoo.bullet.pubsub;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;

@Getter @NoArgsConstructor @Setter @AllArgsConstructor
public class Metadata implements Serializable {
public enum Signal {
ACKNOWLEDGE,
COMPLETE
}
private Signal signal;
private Serializable content;

/**
* Check if Metadata has content.
*
* @return true if Metadata has content.
*/
public boolean hasContent() {
return content != null;
}

/**
* Check if Metadata has signal.
*
* @return true if message has {@link Metadata#signal}
*/
public boolean hasSignal() {
return signal != null;
}
}
89 changes: 89 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSub.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.yahoo.bullet.pubsub;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.List;

/**
* Notation: Partition is a unit of parallelism in the Pub/Sub queue.
*
* Implementations of PubSub should take in a {@link PubSubConfig} and use the information to wire up and return
* Publishers and Subscribers.
*/
public abstract class PubSub implements Serializable {
/**
* The context determines how the {@link Publisher} and {@link Subscriber} returned by PubSub behave. For example,
* If the Context is {@link Context#QUERY_SUBMISSION}:
* 1. Publishers write to request queue.
* 2. Subscribers read from the response queue.
* If the Context is {@link Context#QUERY_PROCESSING}:
* 1. Publishers write to response queue.
* 2. Subscribers read from the request queue.
*/
public enum Context {
QUERY_SUBMISSION,
QUERY_PROCESSING
}

protected Context context;

/**
* Instantiate a PubSub using parameters from {@link PubSubConfig}.
*
* @param config the {@link PubSubConfig} containing all required PubSub parameters.
*/
public PubSub(PubSubConfig config) {
context = Context.valueOf(config.get(PubSubConfig.CONTEXT_NAME).toString());
}

/**
* Get a {@link Publisher} instance wired to write to all allocated partitions in the appropriate queue (See
* {@link PubSub#context}).
*
* @return {@link Publisher} wired as required.
*/
public abstract Publisher getPublisher();

/**
* Get a list of n {@link Publisher} instances with the allocated partitions in the appropriate queue
* (See {@link PubSub#context}) split as evenly as possible among them.
*
* @param n The number of Publishers requested.
* @return List of n Publishers wired as required.
*/
public abstract List<Publisher> getPublishers(int n);

/**
* Get a {@link Subscriber} instance wired to read from all allocated partitions in the appropriate queue (See
* {@link PubSub#context}).
*
* @return {@link Subscriber} wired as required.
*/
public abstract Subscriber getSubscriber();

/**
* Get a list of n {@link Subscriber} instances with allocated partitions from the appropriate queue
* (See {@link PubSub#context}) split as evenly as possible among them.
*
* @param n The number of Subscribers requested.
* @return List of n Subscribers wired as required.
*/
public abstract List<Subscriber> getSubscribers(int n);

/**
* Create a PubSub instance using the class specified in the config file.
* @param config {@link PubSubConfig} containing the class name and PubSub settings.
* @return an instance of specified class initialized with settings from the input file and defaults.
* @throws PubSubException if PubSub creation fails.
*/
public static PubSub from(PubSubConfig config) throws PubSubException {
try {
String pubSubClassName = (String) config.get(PubSubConfig.PUBSUB_CLASS_NAME);
Class<? extends PubSub> pubSubClass = (Class<? extends PubSub>) Class.forName(pubSubClassName);
Constructor<? extends PubSub> constructor = pubSubClass.getConstructor(PubSubConfig.class);
return constructor.newInstance(config);
} catch (Exception e) {
throw new PubSubException("Cannot create PubSub instance. Error: " + e.toString());
}
}
}
21 changes: 21 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.BulletConfig;

import java.io.IOException;

public class PubSubConfig extends BulletConfig {
public static final String CONTEXT_NAME = "bullet.pubsub.context.name";
public static final String PUBSUB_CLASS_NAME = "bullet.pubsub.class.name";

/**
* Constructor that loads configuration parameters from a specific file augmented by defaults.
*
* @param file YAML file to load.
* @throws IOException if an error occurs when loading the file.
*/
public PubSubConfig(String file) throws IOException {
// Load settings and merge with bullet defaults.
super(file);
}
}
12 changes: 12 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.yahoo.bullet.pubsub;

public class PubSubException extends Exception {
/**
* Exception to be thrown if there is an error in {@link PubSub}, {@link Publisher} or {@link Subscriber}.
*
* @param message error message to be associated with the PubSubException.
*/
public PubSubException(String message) {
super(message);
}
}
86 changes: 86 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.yahoo.bullet.pubsub;

import java.io.Serializable;
import com.yahoo.bullet.pubsub.Metadata.Signal;
import lombok.Getter;

/**
* The class of messages that can be understood by the PubSub package. The id should be set to the query ID generated
* by the web service for Bullet queries. The sequence identifies individual segments if a multi-part response is
* emitted by Bullet.
*/
@Getter
public class PubSubMessage implements Serializable {
private String id;
private int sequence;
private String content;
private Metadata metadata;

/**
* Constructor for a message having only content.
*
* @param id is the ID associated with the message.
* @param content is the content of the message.
*/
public PubSubMessage(String id, String content) {
this(id, content, -1, new Metadata());
}

/**
* Constructor for a message having content and a sequence number.
*
* @param id is the ID associated with the message.
* @param content is the content of the message.
* @param sequence is the sequence number of the message.
*/
public PubSubMessage(String id, String content, int sequence) {
this(id, content, sequence, new Metadata());
}

/**
* Constructor for a PubSubMessage having content and Metadata.
*
* @param id is the ID associated with the message.
* @param content is the content of the message.
* @param metadata is the {@link Metadata} associated with the message.
*/
public PubSubMessage(String id, String content, Metadata metadata) {
this(id, content, -1, metadata);
}

/**
* Constructor for a message having content, a {@link Metadata.Signal} and a sequence number.
*
* @param id is the ID associated with the message.
* @param content is the content of the message.
* @param sequence is the sequence number of the message.
* @param signal is the Metadata.Signal of the message.
*/
public PubSubMessage(String id, String content, int sequence, Signal signal) {
this(id, content, sequence, new Metadata(signal, null));
}

/**
* Constructor for a PubSubMessage having content, Metadata and a sequence number.
*
* @param id is the ID associated with the message.
* @param content is the content of the message.
* @param sequence is the sequence number associated with the message.
* @param metadata is the {@link Metadata} associated with the message.
*/
public PubSubMessage(String id, String content, int sequence, Metadata metadata) {
this.id = id;
this.content = content;
this.sequence = sequence;
this.metadata = metadata;
}

/**
* Check if message has content.
*
* @return true if message has content.
*/
public boolean hasContent() {
return content != null;
}
}
16 changes: 16 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Publisher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.yahoo.bullet.pubsub;

public interface Publisher {
/**
* Sends a {@link PubSubMessage}. Messages with the same ID should be received in order.
*
* @param message the {@link PubSubMessage} to be sent.
* @throws PubSubException if the messaging system throws an error.
*/
void send(PubSubMessage message) throws PubSubException;

/**
* Close Publisher and delete all related context.
*/
void close();
}
55 changes: 55 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Subscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.yahoo.bullet.pubsub;

public interface Subscriber {
/**
* Gets a new {@link PubSubMessage} from the assigned partition/partitions (Here a partition is a unit of
* parallelism in the Pub/Sub queue, See {@link PubSub}).
*
* @return the received {@link PubSubMessage}.
* @throws PubSubException when a receive fails.
*/
PubSubMessage receive() throws PubSubException;

/**
* Close the Subscriber and delete all associated Context.
*/
void close();

/**
* Commits allow clients to implement at least once, at most once or exactly once semantics when processing messages.
*
* Common implementations might
* 1. Ack all received messages.
* 2. Commit current read offset to persistent/fault tolerant storage.
*
* @param id the ID of the message to be marked as committed.
* @param sequence the sequence number of the message to be committed.
*/
void commit(String id, int sequence);

/**
* Convenience method to commit a message that doesn't contain a sequence number.
*
* @param id is the ID of the message to be marked as committed.
*/
default void commit(String id) {
commit(id, -1);
}

/**
* Marks the processing of the {@link PubSubMessage} with the given id as failed.
*
* @param id the ID of the PubSubMessage to mark as a processing failure.
*/
void fail(String id, int sequence);

/**
* Convenience method to fail a message that doesn't contain a sequence number.
*
* @param id is the ID of the message to be marked as a processing failure.
*/
default void fail(String id) {
fail(id, -1);
}

}
36 changes: 36 additions & 0 deletions src/test/java/com/yahoo/bullet/pubsub/MetadataTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.yahoo.bullet.pubsub;

import org.testng.Assert;
import org.testng.annotations.Test;

public class MetadataTest {
@Test
public void testHasSignal() {
Metadata full = new Metadata(Metadata.Signal.ACKNOWLEDGE, 5);
Metadata empty = new Metadata();
Assert.assertTrue(full.hasSignal());
Assert.assertFalse(empty.hasSignal());
}

@Test
public void testHasContent() {
Metadata full = new Metadata(Metadata.Signal.ACKNOWLEDGE, 5);
Metadata empty = new Metadata();
Assert.assertTrue(full.hasContent());
Assert.assertFalse(empty.hasContent());
}

@Test
public void testSetContentWhenEmpty() {
Metadata empty = new Metadata();
empty.setContent(5);
Assert.assertEquals(empty.getContent(), 5);
}

@Test
public void testSetSignalWhenEmpty() {
Metadata empty = new Metadata();
empty.setSignal(Metadata.Signal.ACKNOWLEDGE);
Assert.assertEquals(empty.getSignal(), Metadata.Signal.ACKNOWLEDGE);
}
}
Loading