From 799df6c0aba94c20bef0eb2d975d3f5649b02f54 Mon Sep 17 00:00:00 2001 From: MalharJenkins Date: Thu, 19 Nov 2015 15:44:13 -0800 Subject: [PATCH] APEX-273 - Fix existing checkstyle violations in bufferserver module --- bufferserver/pom.xml | 3 +- .../bufferserver/auth/AuthManager.java | 2 +- .../bufferserver/client/AuthClient.java | 6 +- .../bufferserver/client/Controller.java | 1 - .../bufferserver/client/Subscriber.java | 15 +--- .../bufferserver/internal/DataList.java | 90 ++++++++++++------- .../bufferserver/internal/DataListener.java | 10 +-- .../bufferserver/internal/FastDataList.java | 12 +-- .../bufferserver/internal/LogicalNode.java | 29 +++--- .../bufferserver/internal/PhysicalNode.java | 3 +- .../bufferserver/packet/DataTuple.java | 8 +- .../bufferserver/packet/EmptyTuple.java | 10 +-- .../packet/GenericRequestTuple.java | 20 ++--- .../bufferserver/packet/MessageType.java | 17 +++- .../bufferserver/packet/PayloadTuple.java | 6 +- .../packet/PublishRequestTuple.java | 5 +- .../bufferserver/packet/RequestTuple.java | 6 +- .../packet/ResetRequestTuple.java | 5 +- .../bufferserver/packet/ResetWindowTuple.java | 6 +- .../packet/SubscribeRequestTuple.java | 59 +++++------- .../bufferserver/packet/Tuple.java | 15 ++-- .../bufferserver/packet/WindowIdTuple.java | 8 +- .../bufferserver/policy/AbstractPolicy.java | 12 +-- .../bufferserver/policy/GiveAll.java | 2 +- .../bufferserver/policy/LeastBusy.java | 14 +-- .../bufferserver/policy/Policy.java | 10 +-- .../bufferserver/policy/RandomOne.java | 12 +-- .../bufferserver/policy/RoundRobin.java | 18 ++-- .../bufferserver/server/Server.java | 44 +++++---- .../bufferserver/storage/DiskStorage.java | 73 ++++++--------- .../bufferserver/util/SerializedData.java | 4 +- .../datatorrent/bufferserver/util/System.java | 12 ++- .../datatorrent/bufferserver/util/VarInt.java | 9 +- .../bufferserver/client/SubscriberTest.java | 39 ++++---- .../packet/NoMessageTupleTest.java | 23 +---- .../packet/PublishRequestTupleTest.java | 10 +-- .../packet/ResetWindowTupleTest.java | 10 +-- .../packet/SubscribeRequestTupleTest.java | 20 +++-- .../bufferserver/server/ServerTest.java | 67 +++++++------- .../bufferserver/storage/DiskStorageTest.java | 11 +-- .../bufferserver/support/Subscriber.java | 9 +- .../bufferserver/util/CodecTest.java | 4 +- 42 files changed, 346 insertions(+), 393 deletions(-) diff --git a/bufferserver/pom.xml b/bufferserver/pom.xml index f6fc8b30aa..efc5b5e896 100644 --- a/bufferserver/pom.xml +++ b/bufferserver/pom.xml @@ -48,10 +48,9 @@ --> - org.apache.maven.plugins maven-checkstyle-plugin - 228 + true diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java index 453befaa18..942a896b1f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java @@ -27,7 +27,7 @@ */ public class AuthManager { - private final static int BUFFER_SERVER_TOKEN_LENGTH = 20; + private static final int BUFFER_SERVER_TOKEN_LENGTH = 20; private static SecureRandom generator = new SecureRandom(); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java index fc105b2202..4465143e59 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java @@ -45,7 +45,8 @@ public AuthClient(byte[] readbuffer, int position, int sendBufferSize) super(readbuffer, position, sendBufferSize); } - protected void sendAuthenticate() { + protected void sendAuthenticate() + { if (token != null) { write(token); } @@ -65,7 +66,8 @@ protected void authenticateMessage(byte[] buffer, int offset, int size) } } if (!authenticated) { - throw new AccessControlException("Buffer server security is enabled. Access is restricted without proper credentials."); + throw new AccessControlException("Buffer server security is enabled." + + " Access is restricted without proper credentials."); } } } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java index d2faf696da..0c3bac9dd0 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java @@ -26,7 +26,6 @@ import com.datatorrent.bufferserver.packet.ResetRequestTuple; import com.datatorrent.bufferserver.packet.Tuple; import com.datatorrent.bufferserver.util.Codec; -import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.util.Slice; /** diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java index df91f0b73c..2b18d04eec 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java @@ -23,7 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.bufferserver.packet.SubscribeRequestTuple; +import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest; /** * @@ -46,18 +46,11 @@ public Subscriber(String id) this.id = id; } - public void activate(String version, String type, String sourceId, int mask, Collection partitions, long windowId, int bufferSize) + public void activate(final String version, final String type, final String sourceId, final int mask, + final Collection partitions, final long windowId, final int bufferSize) { sendAuthenticate(); - write(SubscribeRequestTuple.getSerializedRequest( - version, - id, - type, - sourceId, - mask, - partitions, - windowId, - bufferSize)); + write(getSerializedRequest(version, id, type, sourceId, mask, partitions, windowId, bufferSize)); } @Override diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 1f6c27359f..fa20aa296d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -90,8 +90,8 @@ public DataList(final String identifier, final int blockSize, final int numberOf public DataList(String identifier) { /* - * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem. - * we will use default value of 8 block sizes to be cached in memory + * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block + * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory */ this(identifier, 64 * 1024 * 1024, 8); } @@ -104,7 +104,8 @@ public int getBlockSize() public void rewind(final int baseSeconds, final int windowId) throws IOException { final long longWindowId = (long)baseSeconds << 32 | windowId; - logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId)); + logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), + Codec.getStringWindowId(longWindowId)); int numberOfInMemBlockRewound = 0; synchronized (this) { @@ -139,13 +140,16 @@ public void rewind(final int baseSeconds, final int windowId) throws IOException } /* - TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last block. - */ + * TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last + * block. + */ final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound); - assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); - logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this); + logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after" + + " rewinding {}.", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this); } @@ -177,11 +181,13 @@ public void reset() public void purge(final int baseSeconds, final int windowId) { final long longWindowId = (long)baseSeconds << 32 | windowId; - logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId)); + logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), + Codec.getStringWindowId(longWindowId)); int numberOfInMemBlockPurged = 0; synchronized (this) { - for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) { + for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; + prev = temp, temp = temp.next) { if (temp.ending_window > longWindowId || temp == last) { if (prev != null) { first = temp; @@ -204,9 +210,11 @@ public void purge(final int baseSeconds, final int windowId) } final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged); - assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); - logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this); + logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", + numberOfInMemBlockPurged, numberOfInMemBlockPermits, this); } @@ -220,7 +228,8 @@ public String getIdentifier() public void flush(final int writeOffset) { - //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset); + //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, + // nextOffset.integer, writeOffset); flush: do { while (size == 0) { @@ -427,7 +436,8 @@ public boolean resumeSuspendedClients(final int numberOfInMemBlockPermits) suspendedClients.clear(); } } else { - logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners); + logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, + numberOfInMemBlockPermits, all_listeners); } return resumedSuspendedClients; } @@ -608,10 +618,11 @@ public long rewind(long windowId) try (DataListIterator dli = getIterator(this)) { done: while (dli.hasNext()) { - SerializedData sd = dli.next(); + final SerializedData sd = dli.next(); + final int length = sd.length - sd.dataOffset + sd.offset; switch (sd.buffer[sd.dataOffset]) { case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + final ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); bs = (long)rwt.getBaseSeconds() << 32; if (bs > windowId) { writingOffset = sd.offset; @@ -620,7 +631,7 @@ public long rewind(long windowId) break; case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); if ((bs | bwt.getWindowId()) >= windowId) { writingOffset = sd.offset; break done; @@ -649,8 +660,9 @@ public long rewind(long windowId) public void purge(long longWindowId) { -// logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}", -// new Object[] {VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId), VarInt.getStringWindowId(ending_window)}); + //logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}", + // VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId), + // VarInt.getStringWindowId(ending_window)); boolean found = false; long bs = starting_window & 0xffffffff00000000L; SerializedData lastReset = null; @@ -659,20 +671,22 @@ public void purge(long longWindowId) done: while (dli.hasNext()) { SerializedData sd = dli.next(); + final int length = sd.length - sd.dataOffset + sd.offset; switch (sd.buffer[sd.dataOffset]) { case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); bs = (long)rwt.getBaseSeconds() << 32; lastReset = sd; break; case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); if ((bs | bwt.getWindowId()) > longWindowId) { found = true; if (lastReset != null) { /* - * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple. + * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of + * the reset tuple. */ if (sd.offset >= lastReset.length) { sd.offset -= lastReset.length; @@ -702,7 +716,8 @@ public void purge(long longWindowId) * It helps with better utilization of the RAM. */ if (!found) { - //logger.debug("we could not find a tuple which is in a window later than the window to be purged, so this has to be the last window published so far"); + //logger.debug("we could not find a tuple which is in a window later than the window to be purged, " + + // "so this has to be the last window published so far"); if (lastReset != null && lastReset.offset != 0) { this.readingOffset = this.writingOffset - lastReset.length; System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length); @@ -797,7 +812,8 @@ protected void acquire(boolean wait) } } - private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset, final Storage storage) + private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset, + final Storage storage) { return new Runnable() { @@ -819,7 +835,8 @@ public void run() logger.debug("Keeping Block {} unchanged", Block.this); } } - assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); } } @@ -884,11 +901,14 @@ protected void discard(final boolean wait) @Override public String toString() { - return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) - + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset - + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window) - + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) - + ", future=" + (future == null ? "null" : future.isDone() ? "Done" : future.isCancelled() ? "Cancelled" : future) + '}'; + final String future = this.future == null ? "null" : this.future.isDone() ? "Done" : + this.future.isCancelled() ? "Cancelled" : this.future.toString(); + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + + ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset + + ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) + + ", ending_window=" + Codec.getStringWindowId(ending_window) + ", refCount=" + refCount.get() + + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) + + ", future=" + future + '}'; } } @@ -968,8 +988,10 @@ public boolean hasNext() if (nextOffset.integer + size <= da.writingOffset) { current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset); current.dataOffset = nextOffset.integer; - //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) { - // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset); + //final byte messageType = buffer[current.dataOffset]; + //if (messageType == MessageType.BEGIN_WINDOW_VALUE || messageType == MessageType.END_WINDOW_VALUE) { + // final int length = current.length - current.dataOffset + current.offset; + // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, length); // logger.debug("next t = {}", t); //} return true; @@ -993,9 +1015,9 @@ public SerializedData next() } /** - * Removes from the underlying collection the last element returned by the iterator (optional operation). This method can be called only once per call to - * next. The behavior of an iterator is unspecified if the underlying collection is modified while the iteration is in progress in any way other than by - * calling this method. + * Removes from the underlying collection the last element returned by the iterator (optional operation). This + * method can be called only once per call to next. The behavior of an iterator is unspecified if the underlying + * collection is modified while the iteration is in progress in any way other than by calling this method. */ @Override public void remove() diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java index 4add0080b4..a6a1fab093 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java @@ -18,10 +18,10 @@ */ package com.datatorrent.bufferserver.internal; -import com.datatorrent.bufferserver.util.BitVector; - import java.util.Collection; +import com.datatorrent.bufferserver.util.BitVector; + /** * * Waits for data to be added to the buffer server and then acts on it

@@ -32,17 +32,17 @@ */ public interface DataListener { - public static final BitVector NULL_PARTITION = new BitVector(0, 0); + BitVector NULL_PARTITION = new BitVector(0, 0); /** */ - public boolean addedData(); + boolean addedData(); /** * * @param partitions * @return int */ - public int getPartitions(Collection partitions); + int getPartitions(Collection partitions); } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index 6ba7b64cb3..af47f23c23 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -54,8 +54,7 @@ public void flush(final int writeOffset) size = last.data[processingOffset]; size |= (last.data[processingOffset + 1] << 8); // logger.debug("read item = {} of size = {} at offset = {}", item++, size, processingOffset); - } - else { + } else { if (writeOffset == last.data.length) { processingOffset = 0; size = 0; @@ -73,8 +72,7 @@ public void flush(final int writeOffset) if (last.starting_window == -1) { last.starting_window = baseSeconds | btw.getWindowId(); last.ending_window = last.starting_window; - } - else { + } else { last.ending_window = baseSeconds | btw.getWindowId(); } break; @@ -83,11 +81,13 @@ public void flush(final int writeOffset) Tuple rwt = Tuple.getTuple(last.data, processingOffset, size); baseSeconds = (long)rwt.getBaseSeconds() << 32; break; + + default: + break; } processingOffset += size; size = 0; - } - else { + } else { if (writeOffset == last.data.length) { processingOffset = 0; size = 0; diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index f867d694a5..9856829890 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -157,7 +157,8 @@ public boolean isReady() public void catchUp() { long lBaseSeconds = (long)iterator.getBaseSeconds() << 32; - logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds)); + logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), + Codec.getStringWindowId(lBaseSeconds)); if (lBaseSeconds > baseSeconds) { baseSeconds = lBaseSeconds; } @@ -193,13 +194,8 @@ public void catchUp() case MessageType.BEGIN_WINDOW_VALUE: tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); - logger.debug("{}->{} condition {} =? {}", - new Object[] { - upstream, - group, - Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), - Codec.getStringWindowId(skipWindowId) - }); + logger.debug("{}->{} condition {} =? {}", upstream, group, + Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), Codec.getStringWindowId(skipWindowId)); if ((baseSeconds | tuple.getWindowId()) > skipWindowId) { logger.debug("caught up {}->{} skipping {} payload tuples", upstream, group, skippedPayloadTuples); ready = GiveAll.getInstance().distribute(physicalNodes, data); @@ -212,10 +208,12 @@ public void catchUp() case MessageType.CODEC_STATE_VALUE: case MessageType.END_STREAM_VALUE: ready = GiveAll.getInstance().distribute(physicalNodes, data); - logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes); + logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), + physicalNodes); break; default: - logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes); + logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), + physicalNodes); } } } catch (InterruptedException ie) { @@ -252,7 +250,8 @@ public boolean addedData() break; case MessageType.RESET_WINDOW_VALUE: - Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); + final int length = data.length - data.dataOffset + data.offset; + Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, length); baseSeconds = (long)resetWindow.getBaseSeconds() << 32; ready = GiveAll.getInstance().distribute(physicalNodes, data); break; @@ -266,9 +265,10 @@ public boolean addedData() } else { while (ready && iterator.hasNext()) { SerializedData data = iterator.next(); + final int length = data.length - data.dataOffset + data.offset; switch (data.buffer[data.dataOffset]) { case MessageType.PAYLOAD_VALUE: - Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); + Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, length); int value = tuple.getPartition(); for (BitVector bv : partitions) { if (bv.matches(value)) { @@ -283,7 +283,7 @@ public boolean addedData() break; case MessageType.RESET_WINDOW_VALUE: - tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); + tuple = Tuple.getTuple(data.buffer, data.dataOffset, length); baseSeconds = (long)tuple.getBaseSeconds() << 32; ready = GiveAll.getInstance().distribute(physicalNodes, data); break; @@ -344,7 +344,8 @@ public void boot(EventLoop eventloop) @Override public String toString() { - return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}'; + return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + + ", iterator=" + iterator + '}'; } private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java index 880d444836..424a51a761 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java @@ -79,8 +79,7 @@ public boolean send(SerializedData d) if (client.write(d.buffer, d.offset, d.length)) { return true; } - } - else { + } else { if (client.send(d.buffer, d.offset, d.length)) { return true; } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java index 3e7f23f894..cb1ad5fe1c 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java @@ -35,13 +35,13 @@ public DataTuple(byte[] array, int offset, int index) @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override @@ -53,13 +53,13 @@ public Slice getData() @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public static byte[] getSerializedTuple(byte type, Slice f) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java index f034f04838..3c3f184f04 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java @@ -41,31 +41,31 @@ public MessageType getType() @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public static byte[] getSerializedTuple(byte value) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java index a8153341b0..ea49077146 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java @@ -25,8 +25,6 @@ import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.netlet.util.VarInt; -import static com.datatorrent.bufferserver.packet.Tuple.CLASSIC_VERSION; -import static com.datatorrent.bufferserver.packet.Tuple.writeString; /** *

GenericRequestTuple class.

@@ -70,12 +68,10 @@ public void parse() } version = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { version = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -87,12 +83,10 @@ else if (idlen == 0) { } identifier = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { identifier = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } @@ -105,8 +99,7 @@ else if (idlen == 0) { } valid = true; - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { logger.warn("Unparseable Tuple", nfe); } } @@ -166,7 +159,8 @@ public static byte[] getSerializedRequest(String version, String identifier, lon @Override public String toString() { - return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + '}'; + return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + + Codec.getStringWindowId((long)baseSeconds | windowId) + '}'; } private static final Logger logger = LoggerFactory.getLogger(GenericRequestTuple.class); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java index 02102da969..3c0ec2cb5b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java @@ -25,7 +25,20 @@ */ public enum MessageType { - NO_MESSAGE(0), PAYLOAD(1), RESET_WINDOW(2), BEGIN_WINDOW(3), END_WINDOW(4), END_STREAM(5), PUBLISHER_REQUEST(6), SUBSCRIBER_REQUEST(7), PURGE_REQUEST(8), RESET_REQUEST(9), CHECKPOINT(10), CODEC_STATE(11), NO_MESSAGE_ODD(127); + NO_MESSAGE(0), + PAYLOAD(1), + RESET_WINDOW(2), + BEGIN_WINDOW(3), + END_WINDOW(4), + END_STREAM(5), + PUBLISHER_REQUEST(6), + SUBSCRIBER_REQUEST(7), + PURGE_REQUEST(8), + RESET_REQUEST(9), + CHECKPOINT(10), + CODEC_STATE(11), + NO_MESSAGE_ODD(127); + public static final byte NO_MESSAGE_VALUE = 0; public static final byte PAYLOAD_VALUE = 1; public static final byte RESET_WINDOW_VALUE = 2; @@ -81,7 +94,7 @@ public static MessageType valueOf(byte value) private final int value; - private MessageType(int value) + MessageType(int value) { this.value = value; } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java index e757097520..256fc05c71 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java @@ -51,7 +51,7 @@ public int getPartition() @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override @@ -69,13 +69,13 @@ public String toString() @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public static byte[] getSerializedTuple(int partition, int size) diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java index bead7f3bee..6a9ba39e5d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java @@ -30,9 +30,10 @@ public PublishRequestTuple(byte[] array, int offset, int len) super(array, offset, len); } - public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId) + public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId) { - return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.PUBLISHER_REQUEST_VALUE); + return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, + MessageType.PUBLISHER_REQUEST_VALUE); } } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java index 9fe7859037..53505b4e4d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java @@ -43,19 +43,19 @@ public boolean isValid() @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public abstract void parse(); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java index 66fbfcd51c..17ca5857c6 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java @@ -30,9 +30,10 @@ public ResetRequestTuple(byte[] array, int offset, int length) super(array, offset, length); } - public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId) + public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId) { - return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.RESET_REQUEST_VALUE); + return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, + MessageType.RESET_REQUEST_VALUE); } } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java index abeceb3b36..60454161d7 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java @@ -43,19 +43,19 @@ public MessageType getType() @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java index 416cee9c1e..b63487b93d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java @@ -61,12 +61,10 @@ public void parse() } version = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { version = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -78,12 +76,10 @@ else if (idlen == 0) { } identifier = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { identifier = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } @@ -103,12 +99,10 @@ else if (idlen == 0) { } streamType = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { streamType = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -120,12 +114,10 @@ else if (idlen == 0) { } upstreamIdentifier = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { upstreamIdentifier = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -139,8 +131,7 @@ else if (idlen == 0) { if (mask > 0) { while (buffer[dataOffset++] < 0) { } - } - else { + } else { /* mask cannot be zero */ return; } @@ -149,8 +140,7 @@ else if (idlen == 0) { partitions[i] = readVarInt(dataOffset, limit); if (partitions[i] == -1) { return; - } - else { + } else { while (buffer[dataOffset++] < 0) { } } @@ -165,8 +155,7 @@ else if (idlen == 0) { } valid = true; - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { logger.warn("Unparseable Tuple", nfe); } } @@ -246,15 +235,9 @@ public int getBufferSize() return bufferSize; } - public static byte[] getSerializedRequest( - String version, - String id, - String down_type, - String upstream_id, - int mask, - Collection partitions, - long startingWindowId, - int bufferSize) + public static byte[] getSerializedRequest(final String version, final String id, final String down_type, + final String upstream_id, final int mask, final Collection partitions, final long startingWindowId, + final int bufferSize) { byte[] array = new byte[4096]; int offset = 0; @@ -263,10 +246,7 @@ public static byte[] getSerializedRequest( array[offset++] = MessageType.SUBSCRIBER_REQUEST_VALUE; /* write the version */ - if (version == null) { - version = CLASSIC_VERSION; - } - offset = Tuple.writeString(version, array, offset); + offset = Tuple.writeString(version == null ? CLASSIC_VERSION : version, array, offset); /* write the identifier */ offset = Tuple.writeString(id, array, offset); @@ -288,8 +268,7 @@ public static byte[] getSerializedRequest( /* write the partitions */ if (partitions == null || partitions.isEmpty()) { offset = VarInt.write(0, array, offset); - } - else { + } else { offset = VarInt.write(partitions.size(), array, offset); offset = VarInt.write(mask, array, offset); for (int i : partitions) { @@ -306,7 +285,11 @@ public static byte[] getSerializedRequest( @Override public String toString() { - return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType + ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + ", bufferSize=" + bufferSize + '}'; + return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier + + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType + + ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask + + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + + ", bufferSize=" + bufferSize + '}'; } private static final Logger logger = LoggerFactory.getLogger(SubscribeRequestTuple.class); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java index 1c45cb2a92..408e8a2ee8 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java @@ -126,33 +126,28 @@ public int readVarInt(int offset, int limit) byte tmp = buffer[offset++]; if (tmp >= 0) { return tmp; - } - else if (offset < limit) { + } else if (offset < limit) { int integer = tmp & 0x7f; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 7; - } - else if (offset < limit) { + } else if (offset < limit) { integer |= (tmp & 0x7f) << 7; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 14; - } - else if (offset < limit) { + } else if (offset < limit) { integer |= (tmp & 0x7f) << 14; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 21; - } - else if (offset < limit) { + } else if (offset < limit) { integer |= (tmp & 0x7f) << 21; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 28; - } - else { + } else { throw new NumberFormatException("Invalid varint at location " + offset + " => " + Arrays.toString(Arrays.copyOfRange(buffer, offset, limit))); } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java index 07c2f853a5..014827c7e1 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java @@ -49,25 +49,25 @@ public MessageType getType() @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java index 25d374201e..77aa56febd 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java @@ -25,21 +25,13 @@ /** * - * The base class for specifying partition policies, implements interface {@link com.datatorrent.bufferserver.policy.Policy}

- *
+ * The base class for specifying partition policies * * @since 0.3.2 */ public class AbstractPolicy implements Policy { - - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ - + @Override public boolean distribute(Set nodes, SerializedData data) throws InterruptedException { throw new UnsupportedOperationException("Not supported yet."); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java index 88825ce994..0c2819da30 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java @@ -33,7 +33,7 @@ */ public class GiveAll extends AbstractPolicy { - final static GiveAll instance = new GiveAll(); + private static final GiveAll instance = new GiveAll(); /** * diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java index 88145fc416..c4143bedef 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java @@ -25,9 +25,8 @@ /** * - * Implements load balancing by sending the tuple to the least busy partition

- *
- * Basic load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}
+ * Implements load balancing by sending the tuple to the least busy partition. + * Basic load balancing policy. Extends the base class {@link AbstractPolicy}
* * @since 0.3.2 */ @@ -51,20 +50,13 @@ private LeastBusy() { } - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ @Override public boolean distribute(Set nodes, SerializedData data) throws InterruptedException { PhysicalNode theOne = null; for (PhysicalNode node: nodes) { - if (theOne == null - || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) { + if (theOne == null || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) { theOne = node; } } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java index 1393667c47..0080ce0586 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java @@ -33,13 +33,13 @@ public interface Policy { /** + * Distributes {@code data} to the set of {@code nodes} * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send + * @param nodes Set of downstream {@link PhysicalNode} + * @param data Opaque {@link SerializedData} to be send * @throws InterruptedException + * @return {@code true} if successful, otherwise {@code false} */ - - public boolean distribute(Set nodes, SerializedData data) throws InterruptedException; + boolean distribute(Set nodes, SerializedData data) throws InterruptedException; } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java index aebe450164..4f700a66d5 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java @@ -25,9 +25,8 @@ /** * - * Randomly distributes tuples to downstream nodes. A random load balancing policy

- *
- * A generic random load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}
+ * Randomly distributes tuples to downstream nodes. A random load balancing policy. + * A generic random load balancing policy. Extends the base class {@link AbstractPolicy} * * @since 0.3.2 */ @@ -51,13 +50,6 @@ private RandomOne() { } - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ - @Override public boolean distribute(Set nodes, SerializedData data) throws InterruptedException { diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java index 11858158fa..7291b7d134 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java @@ -25,9 +25,10 @@ /** * - * Distributes to downstream nodes in a roundrobin fashion. A round robin load balancing policy

+ * Distributes to downstream nodes in a round robin fashion. A round robin load balancing policy *
- * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}
+ * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends + * the base class {@link AbstractPolicy}
*
* * @since 0.3.2 @@ -44,18 +45,15 @@ public RoundRobin() index = 0; } - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ - @Override public boolean distribute(Set nodes, SerializedData data) throws InterruptedException { int size = nodes.size(); - if (size > 0) { // why do i need to do this check? synchronization issues? because if there is no one interested, the logical group should not exist! + /* + * why do i need to do this check? synchronization issues? because if there is no one interested, + * the logical group should not exist! + */ + if (size > 0) { index %= size; int count = index++; /* diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c4cdf5b077..03d96ee9c2 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -91,7 +91,10 @@ public Server(int port, int blocksize, int numberOfCacheBlocks) this.blockSize = blocksize; this.numberOfCacheBlocks = numberOfCacheBlocks; serverHelperExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("ServerHelper")); - storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(numberOfCacheBlocks), new NameableThreadFactory("StorageHelper"), new ThreadPoolExecutor.CallerRunsPolicy()); + final ArrayBlockingQueue workQueue = new ArrayBlockingQueue<>(numberOfCacheBlocks); + final NameableThreadFactory threadFactory = new NameableThreadFactory("StorageHelper"); + storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, + new ThreadPoolExecutor.CallerRunsPolicy()); } public void setSpoolStorage(Storage storage) @@ -168,8 +171,8 @@ public String toString() private final HashMap publisherBuffers = new HashMap(); private final ConcurrentHashMap subscriberGroups = new ConcurrentHashMap(); - private final ConcurrentHashMap publisherChannels = new ConcurrentHashMap(); - private final ConcurrentHashMap subscriberChannels = new ConcurrentHashMap(); + private final ConcurrentHashMap publisherChannels = new ConcurrentHashMap<>(); + private final ConcurrentHashMap subscriberChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -251,7 +254,9 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, Abstra dl = publisherBuffers.get(upstream_identifier); //logger.debug("old list = {}", dl); } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? + new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : + new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(upstream_identifier, dl); //logger.debug("new list = {}", dl); } @@ -305,7 +310,9 @@ public DataList handlePublisherRequest(PublishRequestTuple request, AbstractLeng throw new RuntimeException(ie); } } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks); + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? + new FastDataList(identifier, blockSize, numberOfCacheBlocks) : + new DataList(identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(identifier, dl); } dl.setSecondaryStorage(storage, storageHelperExecutor); @@ -439,9 +446,11 @@ public int readSize() // bufferSize = 16 * 1024; // } if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize); + subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), + subscriberRequest.getPartitions(), bufferSize); } else { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize) + subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), + subscriberRequest.getPartitions(), bufferSize) { @Override public int readSize() @@ -515,7 +524,8 @@ class Subscriber extends AbstractLengthPrependerClient @Override public void onMessage(byte[] buffer, int offset, int size) { - logger.warn("Received data when no data is expected: {}", Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); + logger.warn("Received data when no data is expected: {}", + Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); } @Override @@ -535,7 +545,8 @@ public void handleException(Exception cce, EventLoop el) @Override public String toString() { - return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}'; + return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}'; } private volatile boolean torndown; @@ -600,10 +611,12 @@ public void onMessage(byte[] buffer, int offset, int size) } /** - * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} - * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)} - * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key - * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. + * Schedules a task to conditionally resume I/O channel read operations. + * No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} + * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. + * Otherwise, calls {@linkplain #read(int) read(0)} to process data + * left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} + * in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. * @return true */ @Override @@ -770,8 +783,9 @@ private void teardown() */ /** - * since the publisher server died, the queue which it was using would stop pumping the data unless a new publisher comes up with the same name. We leave - * it to the stream to decide when to bring up a new node with the same identifier as the one which just died. + * since the publisher server died, the queue which it was using would stop pumping the data unless + * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node + * with the same identifier as the one which just died. */ if (publisherChannels.containsValue(this)) { final Iterator> i = publisherChannels.entrySet().iterator(); diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java index 3ddc77f469..02ba340183 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java @@ -66,8 +66,7 @@ public static String normalizeFileName(String name) Character c = name.charAt(i); if (Character.isLetterOrDigit(c)) { sb.append(c); - } - else { + } else { sb.append('-'); } } @@ -90,30 +89,25 @@ public int store(String identifier, byte[] bytes, int startingOffset, int ending synchronized (this) { lUniqueIdentifier = ++this.uniqueIdentifier; } + } else { + throw new IllegalStateException("Collision in identifier name, please ensure that the slug for " + + "the identifiers is different"); } - else { - throw new IllegalStateException("Collission in identifier name, please ensure that the slug for the identifiers is differents"); - } - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new IllegalStateException("Identity file is hijacked!"); } - } - else { + } else { if (directory.mkdir()) { File identity = new File(directory, "identity"); try { Files.write(identifier.getBytes(), identity); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new RuntimeException("directory " + directory.getAbsolutePath() + " could not be created!"); } @@ -122,8 +116,7 @@ public int store(String identifier, byte[] bytes, int startingOffset, int ending try { return writeFile(bytes, startingOffset, endingOffset, directory, lUniqueIdentifier); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } } @@ -144,24 +137,20 @@ public void discard(String identifier, int uniqueIdentifier) if (!deletionFile.delete()) { throw new RuntimeException("File " + deletionFile.getPath() + " could not be deleted!"); } - } - else { + } else { throw new RuntimeException("File " + deletionFile.getPath() + " either is non existent or not a file!"); } + } else { + throw new RuntimeException("Collision in the identifier name, please ensure that the slugs for " + + "the identifiers are different"); } - else { - throw new RuntimeException("Collission in identifier name, please ensure that the slug for the identifiers is differents"); - } - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new RuntimeException(identityFile + " is not a file!"); } - } - else { + } else { throw new RuntimeException("directory " + directory.getPath() + " does not exist!"); } } @@ -180,37 +169,31 @@ public byte[] retrieve(String identifier, int uniqueIdentifier) File filename = new File(directory, String.valueOf(uniqueIdentifier)); if (filename.exists() && filename.isFile()) { return Files.toByteArray(filename); - } - else { + } else { throw new RuntimeException("File " + filename.getPath() + " either is non existent or not a file!"); } + } else { + throw new RuntimeException("Collision in the identifier name," + + " please ensure that the slugs for the identifiers [" + identifier + "], and [" + new String(stored) + + "] are different."); } - else { - throw new RuntimeException("Collision in identifier name, please ensure that the slugs for the identifiers [" + identifier + "], and [" + new String(stored) + "] are different."); - } - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new RuntimeException(identityFile + " is not a file!"); } - } - else { + } else { throw new RuntimeException("directory " + directory.getPath() + " does not exist!"); } } - protected int writeFile(byte[] bytes, int startingOffset, int endingOffset, File directory, final int number) throws IOException + protected int writeFile(final byte[] bytes, final int startingOffset, final int endingOffset, final File directory, + final int number) throws IOException { - FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number))); - try { + try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number)))) { stream.write(bytes, startingOffset, endingOffset - startingOffset); } - finally { - stream.close(); - } return number; } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java index 16f443f021..0bc065e591 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java @@ -21,8 +21,8 @@ import com.datatorrent.netlet.util.Slice; /** - * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying array.

- * + * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying + * array.

* * Note: Multibyte accessors all use big-endian order. * diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java index 0b2b67ae6b..124cc5fa65 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java @@ -18,12 +18,12 @@ */ package com.datatorrent.bufferserver.util; -import com.datatorrent.netlet.DefaultEventLoop; -import com.datatorrent.netlet.EventLoop; - import java.io.IOException; import java.util.HashMap; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.EventLoop; + /** *

System class.

* @@ -40,8 +40,7 @@ public static void startup(String identifier) if (el == null) { try { eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier)); - } - catch (IOException io) { + } catch (IOException io) { throw new RuntimeException(io); } } @@ -55,8 +54,7 @@ public static void shutdown(String identifier) DefaultEventLoop el = eventloops.get(identifier); if (el == null) { throw new RuntimeException("System with " + identifier + " not setup!"); - } - else { + } else { el.stop(); } } diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java index 6f12cc4992..d8583fb376 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java @@ -38,18 +38,15 @@ public static void read(SerializedData current) int result = tmp & 0x7f; if ((tmp = data[offset++]) >= 0) { result |= tmp << 7; - } - else { + } else { result |= (tmp & 0x7f) << 7; if ((tmp = data[offset++]) >= 0) { result |= tmp << 14; - } - else { + } else { result |= (tmp & 0x7f) << 14; if ((tmp = data[offset++]) >= 0) { result |= tmp << 21; - } - else { + } else { result |= (tmp & 0x7f) << 21; result |= (tmp = data[offset++]) << 28; if (tmp < 0) { diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java index ee56e4d12c..234fb120d9 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java @@ -25,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; + import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -40,6 +40,9 @@ import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.netlet.DefaultEventLoop; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + /** * */ @@ -57,8 +60,7 @@ public static void setupServerAndClients() throws Exception try { eventloopServer = DefaultEventLoop.createEventLoop("server"); eventloopClient = DefaultEventLoop.createEventLoop("client"); - } - catch (IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException(ioe); } eventloopServer.start(); @@ -66,7 +68,8 @@ public static void setupServerAndClients() throws Exception instance = new Server(0, 64, 2); address = instance.run(eventloopServer); - assert (address instanceof InetSocketAddress); + assertTrue(address instanceof InetSocketAddress); + assertFalse(address.isUnresolved()); } @AfterClass @@ -82,7 +85,7 @@ public static void teardownServerAndClients() public void test() throws InterruptedException { final Publisher bsp1 = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp1); + eventloopClient.connect(address, bsp1); final Subscriber bss1 = new Subscriber("MySubscriber") { @@ -104,7 +107,7 @@ public String toString() } }; - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss1); + eventloopClient.connect(address, bss1); final int baseWindow = 0x7afebabe; bsp1.activate(null, baseWindow, 0); @@ -131,12 +134,9 @@ public void run() windowId++; Thread.sleep(5); } - } - catch (InterruptedException ex) { - } - catch (CancelledKeyException cke) { - } - finally { + } catch (InterruptedException | CancelledKeyException e) { + logger.debug("{}", e); + } finally { logger.debug("publisher the middle of window = {}", Codec.getStringWindowId(windowId)); } } @@ -158,7 +158,7 @@ public void run() * subscribe from 8 onwards. What we should see is that subscriber gets the new data from 8 onwards. */ final Publisher bsp2 = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp2); + eventloopClient.connect(address, bsp2); bsp2.activate(null, 0x7afebabe, 5); final Subscriber bss2 = new Subscriber("MyPublisher") @@ -175,7 +175,7 @@ public void beginWindow(int windowId) } }; - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss2); + eventloopClient.connect(address, bss2); bss2.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0x7afebabe00000008L, 0); @@ -200,12 +200,9 @@ public void run() windowId++; Thread.sleep(5); } - } - catch (InterruptedException ex) { - } - catch (CancelledKeyException cke) { - } - finally { + } catch (InterruptedException | CancelledKeyException e) { + logger.debug("", e); + } finally { logger.debug("publisher in the middle of window = {}", Codec.getStringWindowId(windowId)); } } @@ -221,7 +218,7 @@ public void run() eventloopClient.disconnect(bsp2); eventloopClient.disconnect(bss2); - Assert.assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get()); + assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get()); } } diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java index 5dc581c0da..04767efdc4 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java @@ -18,34 +18,17 @@ */ package com.datatorrent.bufferserver.packet; -import junit.framework.TestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.testng.annotations.Test; /** * */ -public class NoMessageTupleTest extends TestCase +public class NoMessageTupleTest { - public NoMessageTupleTest(String testName) - { - super(testName); - } - - @Override - protected void setUp() throws Exception - { - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - } - @Test public void testSerDe() { @@ -54,7 +37,7 @@ public void testSerDe() byte[] serialized = NoMessageTuple.getSerializedTuple(); Tuple t = Tuple.getTuple(serialized, 0, serialized.length); - assert(t.getType() == MessageType.NO_MESSAGE); + assert t.getType() == MessageType.NO_MESSAGE; } private static final Logger logger = LoggerFactory.getLogger(NoMessageTupleTest.class); diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java index c84a127d93..0bfcf9b3d8 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/PublishRequestTupleTest.java @@ -18,11 +18,9 @@ */ package com.datatorrent.bufferserver.packet; -import org.testng.Assert; import org.testng.annotations.Test; -import com.datatorrent.bufferserver.packet.PublishRequestTuple; -import com.datatorrent.bufferserver.packet.Tuple; +import static org.testng.Assert.assertEquals; /** * @@ -41,8 +39,8 @@ public void testGetSerializedRequest() byte[] serial = PublishRequestTuple.getSerializedRequest(null, pubId, windowId); PublishRequestTuple request = (PublishRequestTuple)Tuple.getTuple(serial, 0, serial.length); - Assert.assertEquals(request.identifier, pubId, "Identifier"); - Assert.assertEquals(Long.toHexString((long)request.baseSeconds << 32 | request.windowId), Long.toHexString(windowId), "Window"); + assertEquals(request.identifier, pubId, "Identifier"); + assertEquals((long)request.baseSeconds << 32 | request.windowId, windowId, "Window"); } -} \ No newline at end of file +} diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java index f5176ec020..fcdf04b02a 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/ResetWindowTupleTest.java @@ -18,11 +18,9 @@ */ package com.datatorrent.bufferserver.packet; -import org.testng.Assert; import org.testng.annotations.Test; -import com.datatorrent.bufferserver.packet.ResetWindowTuple; -import com.datatorrent.bufferserver.packet.Tuple; +import static org.testng.Assert.assertEquals; /** * @@ -39,7 +37,7 @@ public void testGetSerializedTuple() byte[] serial = ResetWindowTuple.getSerializedTuple(0x7afebabe, 500); ResetWindowTuple tuple = (ResetWindowTuple)Tuple.getTuple(serial, 0, serial.length); - Assert.assertEquals(tuple.getBaseSeconds(), 0x7afebabe, "base seconds"); - Assert.assertEquals(tuple.getWindowWidth(), 500, "window width"); + assertEquals(tuple.getBaseSeconds(), 0x7afebabe, "base seconds"); + assertEquals(tuple.getWindowWidth(), 500, "window width"); } -} \ No newline at end of file +} diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java index df29222298..20c658a2f8 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/SubscribeRequestTupleTest.java @@ -19,9 +19,11 @@ package com.datatorrent.bufferserver.packet; import java.util.ArrayList; -import org.testng.Assert; import org.testng.annotations.Test; +import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; /** * @@ -42,17 +44,17 @@ public void testGetSerializedRequest() ArrayList partitions = new ArrayList(); partitions.add(5); long startingWindowId = 0xcafebabe00000078L; - byte[] serial = SubscribeRequestTuple.getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 0); + byte[] serial = getSerializedRequest(null, id, down_type, upstream_id, mask, partitions, startingWindowId, 0); SubscribeRequestTuple tuple = (SubscribeRequestTuple)Tuple.getTuple(serial, 0, serial.length); - Assert.assertEquals(tuple.getIdentifier(), id, "Identifier"); - Assert.assertEquals(tuple.getStreamType(), down_type, "UpstreamType"); - Assert.assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId"); - Assert.assertEquals(tuple.getMask(), mask, "Mask"); + assertEquals(tuple.getIdentifier(), id, "Identifier"); + assertEquals(tuple.getStreamType(), down_type, "UpstreamType"); + assertEquals(tuple.getUpstreamIdentifier(), upstream_id, "UpstreamId"); + assertEquals(tuple.getMask(), mask, "Mask"); int[] parts = tuple.getPartitions(); - Assert.assertTrue(parts != null && parts.length == 1 && parts[0] == 5); + assertTrue(parts != null && parts.length == 1 && parts[0] == 5); - Assert.assertEquals(Long.toHexString((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId()), Long.toHexString(startingWindowId), "Window"); + assertEquals((long)tuple.getBaseSeconds() << 32 | tuple.getWindowId(), startingWindowId, "Window"); } -} \ No newline at end of file +} diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java index 5ae3d6d4a0..b7d8de1e03 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/server/ServerTest.java @@ -24,7 +24,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -39,7 +38,9 @@ import com.datatorrent.netlet.DefaultEventLoop; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; /** * @@ -63,8 +64,7 @@ public static void setupServerAndClients() throws Exception try { eventloopServer = DefaultEventLoop.createEventLoop("server"); eventloopClient = DefaultEventLoop.createEventLoop("client"); - } - catch (IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException(ioe); } eventloopServer.start(); @@ -72,7 +72,8 @@ public static void setupServerAndClients() throws Exception instance = new Server(0, 4096,8); address = instance.run(eventloopServer); - assert (address instanceof InetSocketAddress); + assertTrue(address instanceof InetSocketAddress); + assertFalse(address.isUnresolved()); SecureRandom random = new SecureRandom(); authToken = new byte[20]; @@ -90,10 +91,10 @@ public static void teardownServerAndClients() public void testNoPublishNoSubscribe() throws InterruptedException { bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); @@ -113,10 +114,10 @@ public void testNoPublishNoSubscribe() throws InterruptedException public void test1Window() throws InterruptedException { bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); @@ -139,7 +140,7 @@ public void test1Window() throws InterruptedException eventloopClient.disconnect(bss); eventloopClient.disconnect(bsp); - Assert.assertFalse(bss.resetPayloads.isEmpty()); + assertFalse(bss.resetPayloads.isEmpty()); } @Test(dependsOnMethods = {"test1Window"}) @@ -147,7 +148,7 @@ public void test1Window() throws InterruptedException public void testLateSubscriber() throws InterruptedException { bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); @@ -162,7 +163,7 @@ public void testLateSubscriber() throws InterruptedException eventloopClient.disconnect(bss); assertEquals(bss.tupleCount.get(), 1); - Assert.assertFalse(bss.resetPayloads.isEmpty()); + assertFalse(bss.resetPayloads.isEmpty()); } @Test(dependsOnMethods = {"testLateSubscriber"}) @@ -170,11 +171,11 @@ public void testLateSubscriber() throws InterruptedException public void testATonOfData() throws InterruptedException { bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bsp.activate(null, 0x7afebabe, 0); long windowId = 0x7afebabe00000000L; @@ -221,7 +222,7 @@ public void testPurgeNonExistent() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.purge(null, "MyPublisher", 0); for (int i = 0; i < spinCount; i++) { @@ -235,7 +236,7 @@ public void testPurgeNonExistent() throws InterruptedException assertNotNull(bsc.data); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -253,7 +254,7 @@ public void testPurgeNonExistent() throws InterruptedException public void testPurgeSome() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.purge(null, "MyPublisher", 0x7afebabe00000000L); for (int i = 0; i < spinCount; i++) { @@ -267,7 +268,7 @@ public void testPurgeSome() throws InterruptedException assertNotNull(bsc.data); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -284,7 +285,7 @@ public void testPurgeSome() throws InterruptedException public void testPurgeAll() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.purge(null, "MyPublisher", 0x7afebabe00000001L); for (int i = 0; i < spinCount; i++) { @@ -298,7 +299,7 @@ public void testPurgeAll() throws InterruptedException assertNotNull(bsc.data); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { @@ -323,7 +324,7 @@ public void testRepublish() throws InterruptedException public void testRepublishLowerWindow() throws InterruptedException { bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bsp.activate(null, 10, 0); @@ -354,7 +355,7 @@ public void testRepublishLowerWindow() throws InterruptedException eventloopClient.disconnect(bsp); bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { @@ -375,7 +376,7 @@ public void testRepublishLowerWindow() throws InterruptedException public void testReset() throws InterruptedException { bsc = new Controller("MyController"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); bsc.reset(null, "MyPublisher", 0x7afebabe00000001L); for (int i = 0; i < spinCount * 2; i++) { @@ -389,7 +390,7 @@ public void testReset() throws InterruptedException assertNotNull(bsc.data); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); for (int i = 0; i < spinCount; i++) { @@ -421,13 +422,13 @@ public void testResetAgain() throws InterruptedException public void testEarlySubscriberForLaterWindow() throws InterruptedException { bss = new Subscriber("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 49L, 0); /* wait in a hope that the subscriber is able to reach the server */ Thread.sleep(100); bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bsp.activate(null, 0, 0); @@ -465,18 +466,18 @@ public void testAuth() throws InterruptedException bsp = new Publisher("MyPublisher"); bsp.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); bss.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); long resetInfo = 0x7afebabe000000faL; - bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500)); + bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int)(resetInfo >> 32), 500)); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -490,7 +491,7 @@ public void testAuth() throws InterruptedException eventloopClient.disconnect(bsp); assertEquals(bss.tupleCount.get(), 1); - Assert.assertFalse(bss.resetPayloads.isEmpty()); + assertFalse(bss.resetPayloads.isEmpty()); } @Test(dependsOnMethods = {"testAuth"}) @@ -501,18 +502,18 @@ public void testAuthFailure() throws InterruptedException bsp = new Publisher("MyPublisher"); bsp.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); bss.setToken(authToken); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsp.activate(null, 0L); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); long resetInfo = 0x7afebabe000000faL; - bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int) (resetInfo >> 32), 500)); + bsp.publishMessage(ResetWindowTuple.getSerializedTuple((int)(resetInfo >> 32), 500)); for (int i = 0; i < spinCount; i++) { Thread.sleep(10); @@ -526,7 +527,7 @@ public void testAuthFailure() throws InterruptedException eventloopClient.disconnect(bsp); assertEquals(bss.tupleCount.get(), 0); - Assert.assertTrue(bss.resetPayloads.isEmpty()); + assertTrue(bss.resetPayloads.isEmpty()); } private static final Logger logger = LoggerFactory.getLogger(ServerTest.class); diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java index 8a4297c275..755298a933 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/storage/DiskStorageTest.java @@ -35,6 +35,7 @@ import static java.lang.Thread.sleep; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; /** * @@ -63,16 +64,16 @@ public static void setupServerAndClients() throws Exception instance.setSpoolStorage(new DiskStorage()); address = instance.run(eventloopServer); - assert (address instanceof InetSocketAddress); + assertFalse(address.isUnresolved()); bsp = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp); + eventloopClient.connect(address, bsp); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bsc = new Controller("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsc); + eventloopClient.connect(address, bsc); } @AfterClass @@ -128,7 +129,7 @@ public void testStorage() throws InterruptedException assertEquals(bss.tupleCount.get(), 2004); bss = new Subscriber("MySubscriber"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss); + eventloopClient.connect(address, bss); bss.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0L, 0); diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java index 2e2ba18d95..3c0cb0e564 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/support/Subscriber.java @@ -34,7 +34,8 @@ public class Subscriber extends com.datatorrent.bufferserver.client.Subscriber { public final ArrayList resetPayloads = new ArrayList(); public AtomicInteger tupleCount = new AtomicInteger(0); - public WindowIdHolder firstPayload, lastPayload; + public WindowIdHolder firstPayload; + public WindowIdHolder lastPayload; public Subscriber(String id) { @@ -42,7 +43,8 @@ public Subscriber(String id) } @Override - public void activate(String version, String type, String sourceId, int mask, Collection partitions, long windowId, int bufferSize) + public void activate(final String version, final String type, final String sourceId, final int mask, + final Collection partitions, final long windowId, final int bufferSize) { tupleCount.set(0); firstPayload = lastPayload = null; @@ -67,6 +69,9 @@ public void onMessage(byte[] buffer, int offset, int size) case RESET_WINDOW: resetWindow(tuple.getBaseSeconds(), tuple.getWindowWidth()); break; + + default: + break; } } diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java index d3aebe78f5..0b2ab6d654 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/util/CodecTest.java @@ -22,8 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; /** * @@ -33,7 +33,7 @@ public class CodecTest @Test public void testSomeMethod() { - byte buffer[] = new byte[10]; + byte[] buffer = new byte[10]; int value = 127; VarInt.write(value, buffer, 0);