Skip to content
Browse files

Merge pull request #254 from Netflix/message-queue

Fix for #253
  • Loading branch information...
2 parents 6803700 + 1d8c7ff commit e240f6e3c6cd81f501ab1c3013dedd2d5b7a5b84 @elandau elandau committed Mar 27, 2013
View
4 ...assandra/src/main/java/com/netflix/astyanax/serializers/AnnotatedCompositeSerializer.java
@@ -128,8 +128,8 @@ public ByteBuffer toByteBuffer(T obj) {
}
if (cb.limit() + COMPONENT_OVERHEAD > bb.remaining()) {
- int exponent = (int) Math.ceil(Math.log((double) (cb.limit() + COMPONENT_OVERHEAD) / (double) bb.limit()) / Math.log(2));
- int newBufferSize = bb.limit() * (int) Math.pow(2, exponent);
+ int exponent = (int) Math.ceil(Math.log((double) (cb.limit() + COMPONENT_OVERHEAD + bb.limit())) / Math.log(2));
+ int newBufferSize = (int) Math.pow(2, exponent);
ByteBuffer temp = ByteBuffer.allocate(newBufferSize);
bb.flip();
temp.put(bb);
View
32 ...ndra/src/test/java/com/netflix/astyanax/serializers/AnnotatedCompositeSerializerTest.java
@@ -5,6 +5,7 @@
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.util.Date;
/**
* Created with IntelliJ IDEA.
@@ -22,15 +23,46 @@ public void testOverflow() {
Foo foo = new Foo();
foo.bar = Strings.repeat("b", 2000);
+ foo.bar1 = Strings.repeat("b", 2000);
+ foo.bar2 = Strings.repeat("b", 4192);
ByteBuffer byteBuffer = serializer.toByteBuffer(foo);
}
+ @Test
+ public void testOverflow2() {
+ AnnotatedCompositeSerializer<Foo2> serializer = new AnnotatedCompositeSerializer<Foo2>(
+ Foo2.class);
+
+ Foo2 foo = new Foo2();
+ foo.bar = Strings.repeat("b", 500);
+ foo.test = Strings.repeat("b", 12);
+
+ ByteBuffer byteBuffer = serializer.toByteBuffer(foo);
+ }
+
+ public static class Foo2 {
+ @Component(ordinal = 0)
+ private Date updateTimestamp;
+
+ @Component(ordinal = 1)
+ private String bar;
+
+ @Component(ordinal = 2)
+ private String test;
+ }
+
public static class Foo {
@Component(ordinal = 0)
private String bar;
+ @Component(ordinal = 0)
+ private String bar1;
+
+ @Component(ordinal = 0)
+ private String bar2;
+
}
}
View
3 astyanax-queue/src/main/java/com/netflix/astyanax/recipes/queue/MessageQueueEntry.java
@@ -148,7 +148,8 @@ public String toString() {
sb.append("MessageQueueEntry [");
sb.append( "type=" + MessageQueueEntryType.values()[type]);
sb.append(", priority=" + priority);
- sb.append(", timestamp=" + timestamp + "(" + TimeUUIDUtils.getMicrosTimeFromUUID(timestamp) + ")");
+ if (timestamp != null)
+ sb.append(", timestamp=" + timestamp + "(" + TimeUUIDUtils.getMicrosTimeFromUUID(timestamp) + ")");
sb.append(", random=" + random);
sb.append(", state=" + MessageQueueEntryState.values()[state]);
sb.append("]");
View
13 ...ueue/src/main/java/com/netflix/astyanax/recipes/queue/ShardedDistributedMessageQueue.java
@@ -1489,4 +1489,17 @@ private boolean hasMessages(String shardName) throws MessageQueueException {
public Map<String, MessageQueueShardStats> getShardStats() {
return shardReaderPolicy.getShardStats();
}
+
+ public ColumnFamily<String, MessageQueueEntry> getQueueColumnFamily() {
+ return this.queueColumnFamily;
+ }
+
+ public ColumnFamily<String, MessageMetadataEntry> getKeyIndexColumnFamily() {
+ return this.keyIndexColumnFamily;
+ }
+
+ public ColumnFamily<String, UUID> getHistoryColumnFamily() {
+ return this.historyColumnFamily;
+ }
+
}
View
7 astyanax-queue/src/test/java/com/netflix/astyanax/thrift/QueueTest.java
@@ -30,6 +30,7 @@
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.recipes.functions.TraceFunction;
import com.netflix.astyanax.recipes.queue.CountingQueueStats;
import com.netflix.astyanax.recipes.queue.KeyExistsException;
import com.netflix.astyanax.recipes.queue.Message;
@@ -43,6 +44,7 @@
import com.netflix.astyanax.recipes.queue.ShardedDistributedMessageQueue;
import com.netflix.astyanax.recipes.queue.triggers.RepeatingTrigger;
import com.netflix.astyanax.recipes.queue.triggers.RunOnceTrigger;
+import com.netflix.astyanax.recipes.reader.AllRowsReader;
import com.netflix.astyanax.util.SingletonEmbeddedCassandra;
public class QueueTest {
@@ -315,7 +317,7 @@ public void testQueue() throws Exception {
final Message m1rmd = scheduler.peekMessage(messageId);
Assert.assertNull(m1rmd);
}
-
+
{
// Send another message
final Message m = new Message().setUniqueKey(key);
@@ -365,6 +367,7 @@ public void testQueue() throws Exception {
{
final Message m = new Message()
+ .setKey("Key12345")
.setTrigger(new RepeatingTrigger.Builder()
.withInterval(3, TimeUnit.SECONDS)
.withRepeatCount(10)
@@ -375,7 +378,9 @@ public void testQueue() throws Exception {
Assert.assertNotNull(m3rm);
LOG.info(m3rm.toString());
Assert.assertEquals(1, scheduler.getMessageCount());
+
scheduler.deleteMessage(messageId3);
+
Assert.assertEquals(0, scheduler.getMessageCount());
}

0 comments on commit e240f6e

Please sign in to comment.
Something went wrong with that request. Please try again.