Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions src/main/java/com/yahoo/bullet/RandomPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet;

import java.util.List;
import java.util.Random;

public class RandomPool<T> {
private List<T> items;

private static final Random RANDOM = new Random();

/**
* Constructor for the RandomPool that takes a list of items.
* @param items A list of items to form the pool with.
*/
public RandomPool(List<T> items) {
this.items = items;
}

/**
* Get a random item from the pool.
*
* @return a randomly chosen item from the pool.
*/
public T get() {
if (items == null || items.isEmpty()) {
return null;
}
return items.get(RANDOM.nextInt(items.size()));
}

/**
* Clear the RandomPool. Gets now return null.
*/
public void clear() {
items = null;
}

@Override
public String toString() {
return items == null ? null : items.toString();
}

@Override
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (object == null) {
return false;
}
if (!(object instanceof RandomPool)) {
return false;
}
RandomPool asPool = (RandomPool) object;
return items == null ? asPool.items == null : items.equals(asPool.items);
}

@Override
public int hashCode() {
// Any number would do since we want RandomPools of null to be equal to each other.
return items == null ? 42 : items.hashCode();
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Metadata.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

import lombok.AllArgsConstructor;
Expand Down
49 changes: 41 additions & 8 deletions src/main/java/com/yahoo/bullet/pubsub/PubSub.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.BulletConfig;

import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.List;
import java.util.Objects;

/**
* Notation: Partition is a unit of parallelism in the Pub/Sub queue.
*
* Implementations of PubSub should take in a {@link BulletConfig} and use the information to wire up and return
* Publishers and Subscribers.
*/
public abstract class PubSub implements Serializable {
public abstract class PubSub {
/**
* The context determines how the {@link Publisher} and {@link Subscriber} returned by PubSub behave. For example,
* If the Context is {@link Context#QUERY_SUBMISSION}:
Expand All @@ -28,49 +33,60 @@ public enum Context {
}

protected Context context;
protected BulletConfig config;

/**
* Instantiate a PubSub using parameters from {@link BulletConfig}.
*
* @param config The {@link BulletConfig} containing all required PubSub parameters.
* @throws PubSubException if the context name is not present or cannot be parsed.
*/
public PubSub(BulletConfig config) {
context = Context.valueOf(config.get(BulletConfig.PUBSUB_CONTEXT_NAME).toString());
public PubSub(BulletConfig config) throws PubSubException {
this.config = config;
try {
this.context = Context.valueOf(getRequiredConfig(String.class, BulletConfig.PUBSUB_CONTEXT_NAME));
} catch (RuntimeException e) {
throw new PubSubException("Cannot create PubSub", e);
}
}

/**
* Get a {@link Publisher} instance wired to write to all allocated partitions in the appropriate queue (See
* {@link PubSub#context}).
*
* @return {@link Publisher} wired as required.
* @throws PubSubException if the Publisher could not be created.
*/
public abstract Publisher getPublisher();
public abstract Publisher getPublisher() throws PubSubException;

/**
* Get a list of n {@link Publisher} instances with the allocated partitions in the appropriate queue
* (See {@link PubSub#context}) split as evenly as possible among them.
*
* @param n The number of Publishers requested.
* @return The {@link List} of n Publishers wired as required.
* @throws PubSubException if Publishers could not be created.
*/
public abstract List<Publisher> getPublishers(int n);
public abstract List<Publisher> getPublishers(int n) throws PubSubException;

/**
* Get a {@link Subscriber} instance wired to read from all allocated partitions in the appropriate queue (See
* {@link PubSub#context}).
*
* @return {@link Subscriber} wired as required.
* @throws PubSubException if the Subscriber could not be created.
*/
public abstract Subscriber getSubscriber();
public abstract Subscriber getSubscriber() throws PubSubException;

/**
* Get a list of n {@link Subscriber} instances with allocated partitions from the appropriate queue
* (See {@link PubSub#context}) split as evenly as possible among them.
*
* @param n The number of Subscribers requested.
* @return The {@link List} of n Subscribers wired as required.
* @throws PubSubException if Subscribers could not be created.
*/
public abstract List<Subscriber> getSubscribers(int n);
public abstract List<Subscriber> getSubscribers(int n) throws PubSubException;

/**
* Create a PubSub instance using the class specified in the config file.
Expand All @@ -89,4 +105,21 @@ public static PubSub from(BulletConfig config) throws PubSubException {
throw new PubSubException("Cannot create PubSub instance.", e);
}
}

/**
* A method to get a required configuration of a particular type.
*
* @param name The name of the required configuration.
* @param tClass The class of the required configuration.
* @param <T> The type to cast the configuration to. Inferred from tClass.
* @return The extracted configuration of type T.
* @throws PubSubException if the configuration is missing or cannot be cast to type T.
*/
public <T> T getRequiredConfig(Class<T> tClass, String name) throws PubSubException {
try {
return (T) Objects.requireNonNull(config.get(name));
} catch (Exception e) {
throw PubSubException.forArgument(name, e);
}
}
}
17 changes: 17 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubException.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

/**
Expand All @@ -22,4 +27,16 @@ public PubSubException(String message) {
public PubSubException(String message, Throwable cause) {
super(message, cause);
}

/**
* Method to create a PubSubException when a required argument could not be read.
*
* @param name The name of the argument that could not be read.
* @param cause The optional {@link Throwable} that caused the exception.
* @return A PubSubException indicating failure to read a required argument.
*/
public static PubSubException forArgument(String name, Throwable cause) {
String message = "Could not read required argument: " + name;
return cause == null ? new PubSubException(message) : new PubSubException(message, cause);
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/PubSubMessage.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.pubsub.Metadata.Signal;
Expand All @@ -13,6 +18,8 @@
*/
@Getter
public class PubSubMessage implements Serializable {
private static final long serialVersionUID = 2407848310969237888L;

private String id;
private int sequence;
private String content;
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Publisher.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

public interface Publisher {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/yahoo/bullet/pubsub/Subscriber.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

public interface Subscriber {
Expand Down
7 changes: 7 additions & 0 deletions src/main/resources/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
<property name="severity" value="error"/>
</module>

<!--Checks that all files have a copyright header. -->
<module name="Header">
<property name="headerFile" value="src/main/resources/copyright.txt"/>
<property name="ignoreLines" value="2"/>
<property name="fileExtensions" value="java"/>
</module>

<!-- Check each Java file for violations. -->
<module name="TreeWalker">

Expand Down
5 changes: 5 additions & 0 deletions src/main/resources/copyright.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
83 changes: 83 additions & 0 deletions src/test/java/com/yahoo/bullet/RandomPoolTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2016, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet;

import org.testng.Assert;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RandomPoolTest {
@Test
public void testDefaultOverrides() {
RandomPool<String> poolA = new RandomPool<>(null);
RandomPool<String> poolB = new RandomPool<>(null);
Assert.assertTrue(poolA.equals(poolA));
Assert.assertEquals(poolA.hashCode(), poolA.hashCode());

Assert.assertFalse(poolA.equals(null));
Assert.assertFalse(poolA.equals("foo"));

Assert.assertTrue(poolA.equals(poolB));
Assert.assertEquals(poolA.hashCode(), poolB.hashCode());

poolA = new RandomPool<>(Collections.singletonList("foo"));
poolB = new RandomPool<>(Collections.singletonList("foo"));
Assert.assertTrue(poolA.equals(poolB));
Assert.assertEquals(poolA.hashCode(), poolB.hashCode());

poolB = new RandomPool<>(Collections.singletonList("bar"));
Assert.assertFalse(poolA.equals(poolB));

List<String> contents = Collections.singletonList("foo");
poolA = new RandomPool<>(contents);
poolB = new RandomPool<>(contents);
Assert.assertTrue(poolA.equals(poolB));
Assert.assertEquals(poolA.hashCode(), poolB.hashCode());
}

@Test
public void testToString() {
RandomPool<String> pool = new RandomPool<>(null);
Assert.assertNull(pool.toString());
pool = new RandomPool<>(Collections.singletonList("foo"));
Assert.assertEquals(pool.toString(), Collections.singletonList("foo").toString());
}

@Test
public void testEmptyCase() {
RandomPool<Integer> pool = new RandomPool<>(null);
Assert.assertNull(pool.get());
pool = new RandomPool<>(Collections.emptyList());
Assert.assertNull(pool.get());
}

@Test
public void testRandomGet() {
List<Integer> list = Arrays.asList(1, 3, 4);
Map<Integer, Integer> map = list.stream().collect(Collectors.toMap(Function.identity(), x -> 0));
RandomPool<Integer> pool = new RandomPool<>(list);
for (int i = 0; i < 1000; ++i) {
int item = pool.get();
map.put(item, map.get(item) + 1);
}
// That this is false is 1 - (2/3)^1000
Assert.assertTrue(map.values().stream().allMatch(v -> v > 0));
}

@Test
public void testGetReturnsNullAfterClear() {
List<Integer> list = Arrays.asList(1, 3, 4);
RandomPool<Integer> pool = new RandomPool<>(list);
pool.clear();
Assert.assertNull(pool.get());
}
}
5 changes: 5 additions & 0 deletions src/test/java/com/yahoo/bullet/pubsub/MetadataTest.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

import org.testng.Assert;
Expand Down
9 changes: 7 additions & 2 deletions src/test/java/com/yahoo/bullet/pubsub/MockPubSub.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright 2017, Yahoo Inc.
* Licensed under the terms of the Apache License, Version 2.0.
* See the LICENSE file associated with the project for terms.
*/
package com.yahoo.bullet.pubsub;

import com.yahoo.bullet.BulletConfig;
Expand All @@ -11,9 +16,9 @@ class MockPubSub extends PubSub {
public static final String MOCK_MESSAGE_NAME = "MOCK_MESSAGE";
private String mockMessage;

public MockPubSub(BulletConfig config) {
public MockPubSub(BulletConfig config) throws PubSubException {
super(config);
mockMessage = config.get(MOCK_MESSAGE_NAME).toString();
mockMessage = getRequiredConfig(String.class, MOCK_MESSAGE_NAME);
}

@Override
Expand Down
Loading