Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

AllRowsReader fix, Queue fixes #267

Merged
merged 2 commits into from

1 participant

@elandau
Owner

Add retry policy to AllRowsReader
Add IndefiniteRetry policy
Honor message queue reader polling interval
Remove random value from message queue metadata

@elandau elandau merged commit dfc5f65 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Apr 2, 2013
  1. @elandau
Commits on Apr 3, 2013
  1. @elandau

    Honor polling interval in message queue reader. Don't set random value

    elandau authored
    for MessageQueueEntry metadata.
This page is out of date. Refresh to see the latest.
View
36 astyanax-core/src/main/java/com/netflix/astyanax/retry/IndefiniteRetry.java
@@ -0,0 +1,36 @@
+package com.netflix.astyanax.retry;
+
+public class IndefiniteRetry implements RetryPolicy {
+
+ private int counter = 1;
+
+ @Override
+ public void begin() {
+ counter = 1;
+ }
+
+ @Override
+ public void success() {
+ }
+
+ @Override
+ public void failure(Exception e) {
+ counter++;
+ }
+
+ @Override
+ public boolean allowRetry() {
+ return true;
+ }
+
+ @Override
+ public int getAttemptCount() {
+ return counter;
+ }
+
+ @Override
+ public RetryPolicy duplicate() {
+ return new IndefiniteRetry();
+ }
+
+}
View
13 astyanax-queue/conf/log4j.xml
@@ -0,0 +1,13 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
+<log4j:configuration>
+ <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d{ABSOLUTE} %5p %c{1}:%L - %m%n"/>
+ </layout>
+ </appender>
+ <root>
+ <priority value="info"></priority>
+ <appender-ref ref="stdout"/>
+ </root>
+</log4j:configuration>
View
3  astyanax-queue/src/main/java/com/netflix/astyanax/recipes/queue/Message.java
@@ -232,8 +232,9 @@ public boolean isAutoCommitTrigger() {
return isAutoCommitTrigger;
}
- public void setAutoCommitTrigger(boolean isAutoCommitTrigger) {
+ public Message setAutoCommitTrigger(boolean isAutoCommitTrigger) {
this.isAutoCommitTrigger = isAutoCommitTrigger;
+ return this;
}
@Override
View
2  astyanax-queue/src/main/java/com/netflix/astyanax/recipes/queue/MessageQueueEntry.java
@@ -76,7 +76,7 @@ public static MessageQueueEntry newLockEntry(UUID timestamp, MessageQueueEntrySt
}
public static MessageQueueEntry newMetadataEntry() {
- return new MessageQueueEntry(MessageQueueEntryType.Metadata, (byte)0, null, null, MessageQueueEntryState.None);
+ return new MessageQueueEntry(MessageQueueEntryType.Metadata, (byte)0, null, TimeUUIDUtils.getMicrosTimeUUID(0), MessageQueueEntryState.None);
}
public static MessageQueueEntry newMessageEntry(byte priority, UUID timestamp, MessageQueueEntryState state) {
View
2  astyanax-queue/src/main/java/com/netflix/astyanax/recipes/queue/ShardedDistributedMessageQueue.java
@@ -781,6 +781,8 @@ public MessageConsumer createConsumer() {
if (timeoutTime != 0 && System.currentTimeMillis() > timeoutTime)
return Lists.newLinkedList();
+
+ Thread.sleep(settings.getPollInterval());
}
}
View
17 astyanax-recipes/src/main/java/com/netflix/astyanax/recipes/reader/AllRowsReader.java
@@ -47,6 +47,7 @@
import com.netflix.astyanax.query.CheckpointManager;
import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.shallows.EmptyCheckpointManager;
import com.netflix.astyanax.util.Callables;
import com.netflix.astyanax.model.ConsistencyLevel;
@@ -83,6 +84,7 @@
private final AtomicBoolean cancelling = new AtomicBoolean(false);
private final Partitioner partitioner;
private final ConsistencyLevel consistencyLevel;
+ private final RetryPolicy retryPolicy;
private AtomicReference<Exception> error = new AtomicReference<Exception>();
private String dc;
@@ -108,6 +110,7 @@
private String dc;
private String rack;
private ConsistencyLevel consistencyLevel = null;
+ private RetryPolicy retryPolicy;
public Builder(Keyspace ks, ColumnFamily<K, C> columnFamily) {
this.keyspace = ks;
@@ -308,6 +311,11 @@ public Builder(Keyspace ks, ColumnFamily<K, C> columnFamily) {
return this;
}
+ public Builder<K,C> withRetryPolicy(RetryPolicy policy) {
+ this.retryPolicy = policy;
+ return this;
+ }
+
public AllRowsReader<K,C> build() {
if (partitioner == null) {
try {
@@ -332,7 +340,8 @@ public Builder(Keyspace ks, ColumnFamily<K, C> columnFamily) {
partitioner,
dc,
rack,
- consistencyLevel);
+ consistencyLevel,
+ retryPolicy);
}
}
@@ -351,7 +360,8 @@ public AllRowsReader(Keyspace keyspace, ColumnFamily<K, C> columnFamily,
Partitioner partitioner,
String dc,
String rack,
- ConsistencyLevel consistencyLevel) {
+ ConsistencyLevel consistencyLevel,
+ RetryPolicy retryPolicy) {
super();
this.keyspace = keyspace;
this.columnFamily = columnFamily;
@@ -369,6 +379,7 @@ public AllRowsReader(Keyspace keyspace, ColumnFamily<K, C> columnFamily,
this.dc = dc;
this.rack = rack;
this.consistencyLevel = consistencyLevel;
+ this.retryPolicy = retryPolicy;
// Flag explicitly set
if (includeEmptyRows != null)
@@ -385,6 +396,8 @@ else if (columnSlice != null && columnSlice.getColumns() == null && columnSlice.
ColumnFamilyQuery<K, C> query = keyspace.prepareQuery(columnFamily);
if (consistencyLevel != null)
query.setConsistencyLevel(consistencyLevel);
+ if (retryPolicy != null)
+ query.withRetryPolicy(retryPolicy);
return query;
}
Something went wrong with that request. Please try again.