Skip to content

Commit

Permalink
review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
clohfink committed Jul 23, 2019
1 parent d504f31 commit f375b32
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 10 deletions.
14 changes: 13 additions & 1 deletion src/java/org/apache/cassandra/concurrent/DebuggableTask.java
Expand Up @@ -17,11 +17,23 @@
*/
package org.apache.cassandra.concurrent;

/**
* Interface to include on a Runnable or Callable submitted to the SharedExecutorPool to provide more
* detailed diagnostics.
*/
public interface DebuggableTask
{
public long startTimeNanos();

/**
* String describing the task, this can be general thing or something very specific like the query string
*/
public String debug();

/**
* ThreadedDebuggableTask is created by the SharedExecutorPool to include the thread name of any DebuggableTask
* running on a SEPWorker
*/
public static class ThreadedDebuggableTask implements DebuggableTask
{
final DebuggableTask task;
Expand Down Expand Up @@ -50,7 +62,7 @@ public long startTimeNanos()

public String debug()
{
return task == null ? "" : task.debug();
return task == null ? "[debug unavailable]" : task.debug();
}
}
}
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/concurrent/SEPWorker.java
Expand Up @@ -56,8 +56,12 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
thread.start();
}

/**
* If the FutureTask being run has a DebuggableTask Runnable or Callable it will return it here.
*/
public DebuggableTask debuggableTask()
{
// this.task can change after null check so go off local reference
AbstractLocalAwareExecutorService.FutureTask task = this.task;
return task == null? null : task.debuggableTask();
}
Expand Down
Expand Up @@ -81,8 +81,8 @@ public class SharedExecutorPool
final ConcurrentSkipListMap<Long, SEPWorker> spinning = new ConcurrentSkipListMap<>();
// the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();

final ConcurrentHashMap<Long, SEPWorker> allWorkers = new ConcurrentHashMap<>();
// All SEPWorkers that are currently running
private final ConcurrentHashMap<Long, SEPWorker> allWorkers = new ConcurrentHashMap<>();

volatile boolean shuttingDown = false;

Expand Down Expand Up @@ -114,6 +114,10 @@ void workerEnded(SEPWorker worker)
allWorkers.remove(worker.workerId);
}

/**
* Return all DebuggableTasks that have been submitted to the SharedExecutorPool, this will also attach the
* thread name of the SEPWorker that is running it as a ThreadedDebuggableTask
*/
public List<DebuggableTask.ThreadedDebuggableTask> runningTasks()
{
return allWorkers.values().stream()
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/Mutation.java
Expand Up @@ -194,7 +194,7 @@ public void apply(boolean durableWrites, boolean isDroppable)

public void apply(boolean durableWrites)
{
apply(durableWrites, true);
apply(durableWrites, true);
}

/*
Expand Down
27 changes: 25 additions & 2 deletions src/java/org/apache/cassandra/db/virtual/QueriesTable.java
Expand Up @@ -28,24 +28,45 @@
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.MonotonicClock;

/**
* Virtual table to list out the current running queries on the NTR (coordinator), Read and Mutation (local) stages
*
* Example:
* <pre>
* cqlsh> select * from system_views.queries;
*
* thread_id | duration_micros | task
* ------------------------------+-----------------+---------------------------------------------------------------------------------
* Native-Transport-Requests-17 | 6325 | QUERY select * from system_views.queries; [pageSize = 100]
* Native-Transport-Requests-4 | 14681 | EXECUTE f4115f91190d4acf09e452637f1f2444 with 0 values at consistency LOCAL_ONE
* Native-Transport-Requests-6 | 14678 | EXECUTE f4115f91190d4acf09e452637f1f2444 with 0 values at consistency LOCAL_ONE
* ReadStage-10 | 16535 | SELECT * FROM keyspace.table LIMIT 5000
* </pre>
*/
public class QueriesTable extends AbstractVirtualTable
{

private static final String TABLE_NAME = "queries";
private static final String ID = "thread_id";
private static final String DURATION = "duration_micros";
private static final String DESC = "task";

QueriesTable(String keyspace)
{
super(TableMetadata.builder(keyspace, "queries")
super(TableMetadata.builder(keyspace, TABLE_NAME)
.kind(TableMetadata.Kind.VIRTUAL)
.partitioner(new LocalPartitioner(UTF8Type.instance))
.addPartitionKeyColumn(ID, UTF8Type.instance) // strictly for uniqueness
// The thread name is unique since the id given to each SEPWorker is unique
.addPartitionKeyColumn(ID, UTF8Type.instance)
.addRegularColumn(DURATION, LongType.instance)
.addRegularColumn(DESC, UTF8Type.instance)
.build());
}

/**
* Walks the SharedExecutorPool.SHARED SEPWorkers for any DebuggableTasks's and returns them
* @see DebuggableTask
*/
@Override
public AbstractVirtualTable.DataSet data()
{
Expand All @@ -56,6 +77,8 @@ public AbstractVirtualTable.DataSet data()
if(!task.hasTask()) continue;
long micros = TimeUnit.NANOSECONDS.toMicros(task.startTimeNanos());
result.row(task.threadId())
// Since MonotonicClock is used for some but not all, we want to cap to make sure any drift between
// different clocks dont cause this to go negative which would just look silly
.column(DURATION, Math.max(1, now - micros))
.column(DESC, task.debug());
}
Expand Down
12 changes: 8 additions & 4 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Expand Up @@ -582,10 +582,12 @@ public void runMayThrow()
responseHandler.onFailure(FBUtilities.getBroadcastAddressAndPort(), RequestFailureReason.forException(ex));
}
}

public String debug()
{
return "Paxos" + message.payload.toString();
}

@Override
protected Verb verb()
{
Expand Down Expand Up @@ -1014,7 +1016,7 @@ private static void asyncRemoveFromBatchlog(ReplicaPlan.ForTokenWrite replicaPla
logger.trace("Sending batchlog remove request {} to {}", uuid, target);

if (target.isSelf())
performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid), "Batchlog remove "+ uuid);
performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid), "Batchlog remove");
else
MessagingService.instance().send(message, target.endpoint());
}
Expand Down Expand Up @@ -1254,7 +1256,7 @@ public static void sendToHintedReplicas(final Mutation mutation,
if (insertLocal)
{
Preconditions.checkNotNull(localReplica);
performLocally(stage, localReplica, mutation::apply, responseHandler, mutation.toString());
performLocally(stage, localReplica, mutation::apply, responseHandler, mutation);
}

if (localDc != null)
Expand Down Expand Up @@ -1346,7 +1348,7 @@ protected Verb verb()
}

private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable,
final RequestCallback<?> handler, String description)
final RequestCallback<?> handler, Object description)
{
StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
{
Expand All @@ -1367,7 +1369,9 @@ public void runMayThrow()

public String debug()
{
return description;
// description is an Object and toString() called so we do not have to evaluate the Mutation.toString()
// unless expliclitly checked
return description.toString();
}

@Override
Expand Down

0 comments on commit f375b32

Please sign in to comment.