Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/main/java/com/yahoo/bullet/BulletConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;

@Slf4j
public class BulletConfig extends Config {
public static final String SPECIFICATION_DEFAULT_DURATION = "bullet.query.default.duration";
Expand Down Expand Up @@ -55,9 +53,15 @@ public class BulletConfig extends Config {
* Constructor that loads specific file augmented with defaults.
*
* @param file YAML file to load.
* @throws IOException if an error occurred with the file loading.
*/
public BulletConfig(String file) throws IOException {
public BulletConfig(String file) {
super(file, DEFAULT_CONFIGURATION_NAME);
}

/**
* Constructor that loads just the defaults.
*/
public BulletConfig() {
super(DEFAULT_CONFIGURATION_NAME);
}
}
68 changes: 56 additions & 12 deletions src/main/java/com/yahoo/bullet/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

Expand All @@ -29,26 +30,24 @@ public class Config implements Serializable {
* Constructor that loads a specific file and loads the settings in that file.
*
* @param file YAML file to load.
* @throws IOException if an error occurred with the file loading.
*/
public Config(String file) throws IOException {
public Config(String file) {
data = readYAML(file);
log.info("Configuration: {} ", data);
log.info("Final Configuration:\n{} ", data);
}

/**
* Constructor that loads specific file augmented with defaults and the name of the default configuration file.
*
* @param file YAML file to load.
* @param defaultConfigurationFile Default YAML file to load.
* @throws IOException if an error occurred with the file loading.
*/
public Config(String file, String defaultConfigurationFile) throws IOException {
this(defaultConfigurationFile);
public Config(String file, String defaultConfigurationFile) {
data = readYAML(defaultConfigurationFile);
// Override
Map<String, Object> specificConf = readYAML(file);
data.putAll(specificConf);
log.info("Final configuration: {} ", data);
log.info("Final Configuration with defaults:\n{} ", data);
}

/**
Expand All @@ -73,6 +72,47 @@ public Object getOrDefault(String key, Object defaultValue) {
return value != null ? value : defaultValue;
}

/**
* Get a value from the config as a particular type.
*
* @param key The name of the config.
* @param clazz The Class of the type.
* @param <T> The type of the config.
* @return The config as the particular type or null.
* @throws ClassCastException if the value of the config could not be casted to the type.
*/
public <T> T getAs(String key, Class<T> clazz) {
return clazz.cast(get(key));
}

/**
* Get a value from the config as a particular type or default to a provided value.
*
* @param key The name of the config.
* @param defaultValue A default of the same type to use if the config was not found.
* @param clazz The Class of the type.
* @param <T> The type of the config.
* @return The config or your default value as the particular type.
* @throws ClassCastException if the value of the config or default could not be casted to the type.
*/
public <T> T getOrDefaultAs(String key, T defaultValue, Class<T> clazz) {
return clazz.cast(getOrDefault(key, defaultValue));
}

/**
* Get a value from the config as a particular type or throw an exception with a message if not found.
*
* @param key The name of the config.
* @param clazz The Class of the type.
* @param <T> The type of the config.
* @return The config as the particular type.
* @throws ClassCastException if the value of the config could not be casted to the type.
* @throws NullPointerException if the config was not found.
*/
public <T> T getRequiredConfigAs(String key, Class<T> clazz) {
return Objects.requireNonNull(getAs(key, clazz), "Required value for " + key + " was missing");
}

/**
* Gets all mappings for a set of keys. If no keys are specified, all mappings
* are returned.
Expand Down Expand Up @@ -152,15 +192,19 @@ public void clear() {
*
* @param yamlFile The String name of the YAML resource file in classpath or the path to a YAML file containing the mappings.
* @return A {@link Map} of String names to Objects of the mappings in the YAML file.
* @throws IOException if the resource could not be read.
*/
protected Map<String, Object> readYAML(String yamlFile) throws IOException {
if (yamlFile != null && yamlFile.length() > 0) {
log.info("Loading configuration file: {}", yamlFile);
protected Map<String, Object> readYAML(String yamlFile) {
if (yamlFile == null || yamlFile.isEmpty()) {
return new HashMap<>();
}
log.info("Loading configuration file: {}", yamlFile);
try {
InputStream is = this.getClass().getResourceAsStream("/" + yamlFile);
Reader reader = (is != null ? new InputStreamReader(is) : new FileReader(yamlFile));
return (Map<String, Object>) YAML.load(reader);
} catch (IOException ioe) {
log.error("Error loading configuration", ioe);
return new HashMap<>();
}
return new HashMap<>();
}
}
56 changes: 52 additions & 4 deletions src/main/java/com/yahoo/bullet/pubsub/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,59 @@
*/
package com.yahoo.bullet.pubsub;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import java.io.Serializable;

@Getter @Setter @AllArgsConstructor @NoArgsConstructor
@NoArgsConstructor
public class Metadata implements Serializable {
public enum Signal {
ACKNOWLEDGE,
COMPLETE
COMPLETE,
FAIL
}

private static final long serialVersionUID = 4234800234857923112L;

@Getter @Setter
private Signal signal;
private Serializable content;

// This is a Serializable object enforced through the constructor, getter and setter. Storing it as an Object so
// GSON can reify an instance.
private Object content;

/**
* Allows you to create an instance with a {@link com.yahoo.bullet.pubsub.Metadata.Signal} and a
* {@link Serializable} object.
*
* @param signal The signal to set.
* @param object The object that is the metadata.
*/
public Metadata(Signal signal, Serializable object) {
this.signal = signal;
this.content = object;
}

/**
* Set a serializable content for this metadata.
*
* @param content The content for this metadata.
*/
public void setContent(Serializable content) {
this.content = content;
}

/**
* Returns the {@link Serializable} content in this metadata.
*
* @return The serializable content or null.
*/
public Serializable getContent() {
return (Serializable) content;

}

/**
* Check if Metadata has content.
Expand All @@ -38,4 +76,14 @@ public boolean hasContent() {
public boolean hasSignal() {
return signal != null;
}

/**
* Check if Metadata has the given signal.
*
* @param signal The signal to check against.
* @return true if message has {@link Metadata#signal}
*/
public boolean hasSignal(Signal signal) {
return hasSignal() && this.signal == signal;
}
}
3 changes: 1 addition & 2 deletions src/main/java/com/yahoo/bullet/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Objects;

/**
* Notation: Partition is a unit of parallelism in the Pub/Sub queue.
Expand Down Expand Up @@ -113,7 +112,7 @@ public static PubSub from(BulletConfig config) throws PubSubException {
*/
public <T> T getRequiredConfig(Class<T> clazz, String name) throws PubSubException {
try {
return clazz.cast(Objects.requireNonNull(config.get(name)));
return config.getRequiredConfigAs(name, clazz);
} catch (Exception e) {
throw PubSubException.forArgument(name, e);
}
Expand Down
53 changes: 50 additions & 3 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.pubsub.Metadata.Signal;
import com.yahoo.bullet.result.JSONFormatter;
import lombok.Getter;

import java.io.Serializable;
Expand All @@ -17,14 +18,21 @@
* emitted by Bullet.
*/
@Getter
public class PubSubMessage implements Serializable {
public class PubSubMessage implements Serializable, JSONFormatter {
private static final long serialVersionUID = 2407848310969237888L;

private String id;
private int sequence;
private String content;
private Metadata metadata;

/**
* Constructor for a message having no information. Used internally. Not recommended for use.
*/
public PubSubMessage() {
this("", null);
}

/**
* Constructor for a message having only content.
*
Expand Down Expand Up @@ -62,14 +70,14 @@ public PubSubMessage(String id, String content, Metadata metadata) {
*
* @param id The ID associated with the message.
* @param content The content of the message.
* @param signal The Metadata.Signal to be sent with the message.
* @param signal The Signal to be sent with the message.
*/
public PubSubMessage(String id, String content, Signal signal) {
this(id, content, signal, -1);
}

/**
* Constructor for a message having content, a {@link Metadata.Signal} and a sequence number.
* Constructor for a message having content, a {@link Signal} and a sequence number.
*
* @param id The ID associated with the message.
* @param content The content of the message.
Expand Down Expand Up @@ -113,6 +121,25 @@ public boolean hasMetadata() {
return metadata != null;
}

/**
* Check if message has a given {@link Signal}.
*
* @param signal The signal to check for.
* @return true if message has the given signal.
*/
public boolean hasSignal(Signal signal) {
return hasMetadata() && metadata.hasSignal(signal);
}

/**
* Check if the message has a {@link Signal}.
*
* @return true if message has a signal.
*/
public boolean hasSignal() {
return hasMetadata() && metadata.hasSignal();
}

@Override
public int hashCode() {
return (id + sequence).hashCode();
Expand All @@ -126,4 +153,24 @@ public boolean equals(Object other) {
PubSubMessage otherMessage = (PubSubMessage) other;
return id.equals(otherMessage.getId()) && sequence == otherMessage.getSequence();
}

@Override
public String toString() {
return asJSON();
}

@Override
public String asJSON() {
return JSONFormatter.asJSON(this);
}

/**
* Converts a json representation back to an instance.
*
* @param json The string representation of the JSON.
* @return An instance of this class.
*/
public static PubSubMessage fromJSON(String json) {
return JSONFormatter.fromJSON(json, PubSubMessage.class);
}
}
12 changes: 12 additions & 0 deletions src/main/java/com/yahoo/bullet/result/JSONFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ static String asJSON(Object object) {
return GSON.toJson(object);
}

/**
* Returns a deserialized object from JSON using {@link JSONFormatter#GSON}.
*
* @param json The String json that represents the object.
* @param clazz The class of the object.
* @param <T> The type of the object. It must implement {@link JSONFormatter}.
* @return An instance of the object deserialized from JSON.
*/
static <T extends JSONFormatter> T fromJSON(String json, Class<T> clazz) {
return GSON.fromJson(json, clazz);
}

/**
* Convert this object to a JSON string.
* @return The JSON representation of this.
Expand Down
Loading