Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/com/yahoo/bullet/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
@Slf4j
public class Config implements Serializable {
private Map<String, Object> data;
public static final String DELIMITER = ".";

/**
* Constructor that loads a specific file and loads the settings in that file.
Expand Down Expand Up @@ -86,6 +87,24 @@ public Map<String, Object> getAll(Optional<Set<String>> keys) {
.collect(HashMap::new, (m, e) -> m.put(e.getKey(), e.getValue()), HashMap::putAll);
}

/**
* Get mappings for all keys with the specified prefix. If stripPrefix is set, the output keys do not contain the
* prefix.
*
* @param keys an {@link Optional} {@link Set} of mapping names.
* @param prefix The prefix that relevant keys must contain.
* @param stripPrefix If true, the output keys do not contain the prefix.
* @return mapping for keys (or all keys in data, if keys is empty) with the prefix.
*/
public Map<String, Object> getAllWithPrefix(Optional<Set<String>> keys, String prefix, boolean stripPrefix) {
Set<String> inclusions = keys.orElse(data.keySet());
int prefixLength = stripPrefix ? prefix.length() : 0;
return this.data.entrySet().stream()
.filter(e -> inclusions.contains(e.getKey()))
.filter(e -> e.getKey().startsWith(prefix))
.collect(HashMap::new, (m, e) -> m.put(e.getKey().substring(prefixLength), e.getValue()), HashMap::putAll);
}

/**
* Gets all mappings other than a set of keys. If no keys are specified, all mappings
* are returned.
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,27 @@ public PubSubMessage(String id, String content, Metadata metadata, int sequence)
public boolean hasContent() {
return content != null;
}

/**
* Check if message has {@link Metadata}.
*
* @return true if message has Metadata.
*/
public boolean hasMetadata() {
return metadata != null;
}

@Override
public int hashCode() {
return (id + sequence).hashCode();
}

@Override
public boolean equals(Object other) {
if (other == null || other.getClass() != PubSubMessage.class) {
return false;
}
PubSubMessage otherMessage = (PubSubMessage) other;
return id.equals(otherMessage.getId()) && sequence == otherMessage.getSequence();
}
}
6 changes: 6 additions & 0 deletions src/main/resources/bullet_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,9 @@ bullet.record.inject.timestamp.enable: true
# This is the key that is used to add the timestamp in milliseconds to the record, if record.inject.timestamp.enable is true. This is the timestamp when
# the record is projected. This is only applicable to RAW queries.
bullet.record.inject.timestamp.key: "bullet_project_timestamp"

## PubSub default settings
# This should point to a concrete implementation of PubSub.
bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.PubSub"
# The current context. This can be QUERY_PROCESSING or QUERY_SUBMISSION. The PubSub implementation should use this to generate appropriate Publishers and Subscribers.
bullet.pubsub.context.name: "QUERY_PROCESSING"
28 changes: 28 additions & 0 deletions src/test/java/com/yahoo/bullet/BulletConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,32 @@ public void testMerging() throws IOException {
Assert.assertEquals(config.get(BulletConfig.AGGREGATION_MAX_SIZE), 100L);
Assert.assertEquals(config.get("pi"), 3.14);
}

@Test
public void testPropertiesWithPrefix() throws IOException {
BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml");
String prefix = "bullet.pubsub";
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";

int configSize = config.getAllWithPrefix(Optional.empty(), prefix, false).size();
Assert.assertEquals(configSize, 2);

Map<String, Object> properties = config.getAllWithPrefix(Optional.empty(), prefix, false);
Assert.assertEquals(properties.get(BulletConfig.PUBSUB_CLASS_NAME), fieldValue);
}

@Test
public void testPropertiesStripPrefix() throws IOException {
BulletConfig config = new BulletConfig("src/test/resources/test_config.yaml");
String prefix = "bullet.pubsub.";
String fieldName = "class.name";
String fieldValue = "com.yahoo.bullet.pubsub.MockPubSub";

int configSize = config.getAllWithPrefix(Optional.empty(), prefix, true).size();
Assert.assertEquals(configSize, 2);

Map<String, Object> properties = config.getAllWithPrefix(Optional.empty(), prefix, true);
Assert.assertNull(properties.get(BulletConfig.PUBSUB_CLASS_NAME));
Assert.assertEquals(properties.get(fieldName), fieldValue);
}
}
79 changes: 61 additions & 18 deletions src/test/java/com/yahoo/bullet/pubsub/PubSubMessageTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,36 +13,36 @@ private String getRandomString() {

@Test
public void testNoMetadataCreation() {
String messageId = getRandomString();
String messageID = getRandomString();
String messageContent = getRandomString();

PubSubMessage message = new PubSubMessage(messageId, messageContent);
Assert.assertTrue(messageId.equals(message.getId()));
PubSubMessage message = new PubSubMessage(messageID, messageContent);
Assert.assertTrue(messageID.equals(message.getId()));
Assert.assertEquals(message.getSequence(), -1);
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertNull(message.getMetadata());
}

@Test
public void testWithSequenceCreation() {
String messageId = getRandomString();
String messageID = getRandomString();
String messageContent = getRandomString();

PubSubMessage message = new PubSubMessage(messageId, messageContent, 0);
Assert.assertTrue(messageId.equals(message.getId()));
PubSubMessage message = new PubSubMessage(messageID, messageContent, 0);
Assert.assertTrue(messageID.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), 0);
Assert.assertNull(message.getMetadata());
}

@Test
public void testWithSignalCreation() {
String messageId = getRandomString();
String messageID = getRandomString();
String messageContent = getRandomString();
Signal signal = Signal.ACKNOWLEDGE;

PubSubMessage message = new PubSubMessage(messageId, messageContent, signal, 0);
Assert.assertTrue(messageId.equals(message.getId()));
PubSubMessage message = new PubSubMessage(messageID, messageContent, signal, 0);
Assert.assertTrue(messageID.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), 0);
Assert.assertNotNull(message.getMetadata());
Expand All @@ -53,13 +53,13 @@ public void testWithSignalCreation() {

@Test
public void testWithMetadataCreation() {
String messageId = getRandomString();
String messageID = getRandomString();
String messageContent = getRandomString();
String metadataContent = getRandomString();
Signal signal = Signal.ACKNOWLEDGE;
//Test creation without a sequence number.
PubSubMessage message = new PubSubMessage(messageId, messageContent, new Metadata(signal, metadataContent));
Assert.assertTrue(messageId.equals(message.getId()));
PubSubMessage message = new PubSubMessage(messageID, messageContent, new Metadata(signal, metadataContent));
Assert.assertTrue(messageID.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), -1);
Assert.assertNotNull(message.getMetadata());
Expand All @@ -69,13 +69,13 @@ public void testWithMetadataCreation() {

@Test
public void testWithMetadataAndSequenceCreation() {
String messageId = getRandomString();
String messageID = getRandomString();
String messageContent = getRandomString();
String metadataContent = getRandomString();
Signal signal = Signal.ACKNOWLEDGE;
//Test creation with a sequence number.
PubSubMessage message = new PubSubMessage(messageId, messageContent, new Metadata(signal, metadataContent), 0);
Assert.assertTrue(messageId.equals(message.getId()));
PubSubMessage message = new PubSubMessage(messageID, messageContent, new Metadata(signal, metadataContent), 0);
Assert.assertTrue(messageID.equals(message.getId()));
Assert.assertTrue(messageContent.equals(message.getContent()));
Assert.assertEquals(message.getSequence(), 0);
Assert.assertNotNull(message.getMetadata());
Expand All @@ -85,19 +85,62 @@ public void testWithMetadataAndSequenceCreation() {

@Test
public void testHasContent() {
String messageId = getRandomString();
String messageID = getRandomString();
String messageContent = getRandomString();

PubSubMessage message;
message = new PubSubMessage(messageId, messageContent);
message = new PubSubMessage(messageID, messageContent);
Assert.assertTrue(message.hasContent());

message = new PubSubMessage(messageId, null, Signal.COMPLETE);
message = new PubSubMessage(messageID, null, Signal.COMPLETE);
Assert.assertFalse(message.hasContent());
}

@Test(expectedExceptions = NullPointerException.class)
public void testNoIDIllegalCreation() {
new PubSubMessage(null, "");
}

@Test
public void testEquals() {
String messageID = getRandomString();
String messageContent = getRandomString();
Metadata randomMetadata = new Metadata(Signal.ACKNOWLEDGE, getRandomString());
PubSubMessage message1 = new PubSubMessage(messageID, messageContent, randomMetadata);
PubSubMessage message2 = new PubSubMessage(messageID, messageContent, new Metadata(Signal.ACKNOWLEDGE, getRandomString()));
PubSubMessage message3 = new PubSubMessage(getRandomString(), messageContent, randomMetadata);
PubSubMessage message4 = new PubSubMessage(messageID, messageContent, randomMetadata, 2);

Assert.assertEquals(message1, message2);
Assert.assertNotEquals(message1, message3);
Assert.assertNotEquals(message1, message4);
Assert.assertFalse(message1.equals(null));
Assert.assertFalse(message1.equals(new PubSubException("Dummy")));
}

@Test
public void testHashCode() {
String messageID = getRandomString();
String messageContent = getRandomString();
Metadata randomMetadata = new Metadata(Signal.ACKNOWLEDGE, getRandomString());
PubSubMessage message1 = new PubSubMessage(messageID, messageContent, randomMetadata);
PubSubMessage message2 = new PubSubMessage(messageID, messageContent, new Metadata(Signal.ACKNOWLEDGE, getRandomString()));
Assert.assertEquals(message1.hashCode(), message2.hashCode());
}

@Test
public void testHasMetadata() {
String messageID = getRandomString();
String messageContent = getRandomString();

PubSubMessage message;
message = new PubSubMessage(messageID, messageContent);
Assert.assertFalse(message.hasMetadata());

message = new PubSubMessage(messageID, null, Signal.COMPLETE);
Assert.assertTrue(message.hasMetadata());

message = new PubSubMessage(messageID, null, new Metadata());
Assert.assertTrue(message.hasMetadata());
}
}
1 change: 1 addition & 0 deletions src/test/java/com/yahoo/bullet/pubsub/PubSubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public void testIllegalPubSubParameter() throws Exception {
@Test(expectedExceptions = PubSubException.class)
public void testIllegalPubSubClassName() throws Exception {
BulletConfig config = new BulletConfig(null);
config.set(BulletConfig.PUBSUB_CLASS_NAME, null);
try {
PubSub.from(config);
} catch (Exception e) {
Expand Down
1 change: 0 additions & 1 deletion src/test/resources/test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ bullet.query.max.duration: 10000
bullet.query.aggregation.max.size: 100
# Some random fake setting
fake.setting: null

bullet.pubsub.context.name: "QUERY_PROCESSING"
bullet.pubsub.class.name: "com.yahoo.bullet.pubsub.MockPubSub"