Permalink
Browse files

Improve write timeout exceptions

patch by slebresne; reviewed by jbellis for CASSANDRA-4723
  • Loading branch information...
1 parent 801d7d3 commit ee5aafe6a4cb409f16c48edcb3a1682c7d6b400e @pcmanus pcmanus committed Sep 25, 2012
View
@@ -524,14 +524,30 @@ Table of Contents
0x1003 Truncate_error: error during a truncation error.
0x1100 Write_timeout: Timeout exception during a write request. The rest
of the ERROR message body will be
- <cl><received><blockfor>
+ <cl><received><blockfor><writeType>
where:
<cl> is a [string] representing the consistency level of the
query having triggered the exception.
<received> is an [int] representing the number of nodes having
acknowledged the request.
<blockfor> is the number of replica whose acknowledgement is
required to achieve <cl>.
+ <writeType> is a [string] that describe the type of the write
+ that timeouted. The value of that string can be one
+ of:
+ - "SIMPLE": the write was a non-batched
+ non-counter write.
+ - "BATCH": the write was a (logged) batch write.
+ If this type is received, it means the batch log
+ has been successfully written (otherwise a
+ "BATCH_LOG" type would have been send instead).
+ - "UNLOGGED_BATCH": the write was an unlogged
+ batch. Not batch log write has been attempted.
+ - "COUNTER": the write was a counter write
+ (batched or not).
+ - "BATCH_LOG": the timeout occured during the
+ write to the batch log when a (logged) batch
+ write was requested.
0x1200 Read_timeout: Timeout exception during a read request. The rest
of the ERROR message body will be
<cl><received><blockfor><data_present>
@@ -377,7 +377,7 @@ public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWri
responseHandler.response(null);
StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter, consistency_level);
}
- }, null);
+ }, null, WriteType.SIMPLE);
// we don't wait for answers
}
@@ -126,7 +126,7 @@ public void run()
private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws WriteTimeoutException
{
- AbstractWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+ AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH);
MessagingService.instance().sendRR(message, endpoint, responseHandler);
responseHandler.get();
}
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public enum WriteType
+{
+ SIMPLE,
+ BATCH,
+ UNLOGGED_BATCH,
+ COUNTER,
+ BATCH_LOG;
+}
@@ -22,14 +22,15 @@
import java.nio.ByteBuffer;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
public class WriteTimeoutException extends RequestTimeoutException
{
- public final boolean writtenToBatchlog;
+ public final WriteType writeType;
- public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor, boolean writtenToBatchlog)
+ public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received, int blockFor)
{
super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor);
- this.writtenToBatchlog = writtenToBatchlog;
+ this.writeType = writeType;
}
}
@@ -27,6 +27,7 @@
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
@@ -114,18 +115,18 @@ public void clearEndpointCache()
*/
public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
- public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback)
+ public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType)
{
if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
{
// block for in this context will be localnodes block.
- return DatacenterWriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback);
+ return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
}
else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
{
- return DatacenterSyncWriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback);
+ return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
}
- return WriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback);
+ return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType);
}
/**
@@ -22,11 +22,11 @@
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.utils.SimpleCondition;
public abstract class AbstractWriteResponseHandler implements IAsyncCallback
@@ -37,18 +37,24 @@
protected final ConsistencyLevel consistencyLevel;
protected final Runnable callback;
protected final Collection<InetAddress> pendingEndpoints;
+ private final WriteType writeType;
/**
* @param pendingEndpoints
* @param callback A callback to be called when the write is successful.
*/
- protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable callback)
+ protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+ Collection<InetAddress> pendingEndpoints,
+ ConsistencyLevel consistencyLevel,
+ Runnable callback,
+ WriteType writeType)
{
this.pendingEndpoints = pendingEndpoints;
startTime = System.currentTimeMillis();
this.consistencyLevel = consistencyLevel;
this.naturalEndpoints = naturalEndpoints;
this.callback = callback;
+ this.writeType = writeType;
}
public void get() throws WriteTimeoutException
@@ -66,7 +72,7 @@ public void get() throws WriteTimeoutException
}
if (!success)
- throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor(), false);
+ throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), blockFor());
}
protected abstract int ackCount();
@@ -33,6 +33,7 @@
import org.apache.cassandra.locator.NetworkTopologyStrategy;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -52,10 +53,15 @@
private final NetworkTopologyStrategy strategy;
private final HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
- protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+ public DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+ Collection<InetAddress> pendingEndpoints,
+ ConsistencyLevel consistencyLevel,
+ String table,
+ Runnable callback,
+ WriteType writeType)
{
// Response is been managed by the map so make it 1 for the superclass.
- super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback);
+ super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
this.table = table;
@@ -68,11 +74,6 @@ protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndp
}
}
- public static AbstractWriteResponseHandler create(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
- {
- return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback);
- }
-
public void response(MessageIn message)
{
String dataCenter = message == null
@@ -28,6 +28,7 @@
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -43,17 +44,17 @@
localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
}
- protected DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+ public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+ Collection<InetAddress> pendingEndpoints,
+ ConsistencyLevel consistencyLevel,
+ String table,
+ Runnable callback,
+ WriteType writeType)
{
- super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback);
+ super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback, writeType);
assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
}
- public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
- {
- return new DatacenterWriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel, table, callback);
- }
-
@Override
public void response(MessageIn message)
{
Oops, something went wrong.

0 comments on commit ee5aafe

Please sign in to comment.