Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dcapwell committed Sep 17, 2021
1 parent 1ac4906 commit 2a94614
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 77 deletions.
82 changes: 5 additions & 77 deletions src/java/org/apache/cassandra/service/reads/ReadCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@
*/
package org.apache.cassandra.service.reads;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +42,7 @@
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.reads.trackwarnings.CoordinatorWarnings;
import org.apache.cassandra.service.reads.trackwarnings.WarningContext;
import org.apache.cassandra.service.reads.trackwarnings.WarningsSnapshot;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
Expand All @@ -58,50 +53,6 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<
{
protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );

private static class WarnAbortCounter
{
final Set<InetAddressAndPort> warnings = Collections.newSetFromMap(new ConcurrentHashMap<>());
// the highest number reported by a node's warning
final AtomicLong maxWarningValue = new AtomicLong();

final Set<InetAddressAndPort> aborts = Collections.newSetFromMap(new ConcurrentHashMap<>());
// the highest number reported by a node's rejection.
final AtomicLong maxAbortsValue = new AtomicLong();

void addWarning(InetAddressAndPort from, long value)
{
maxWarningValue.accumulateAndGet(value, Math::max);
// call add last so concurrent reads see empty even if values > 0; if done in different order then
// size=1 could have values == 0
warnings.add(from);
}

void addAbort(InetAddressAndPort from, long value)
{
maxAbortsValue.accumulateAndGet(value, Math::max);
// call add last so concurrent reads see empty even if values > 0; if done in different order then
// size=1 could have values == 0
aborts.add(from);
}

WarningsSnapshot.Warnings snapshot()
{
return WarningsSnapshot.Warnings.create(WarningsSnapshot.Counter.create(warnings, maxWarningValue), WarningsSnapshot.Counter.create(aborts, maxAbortsValue));
}
}

private static class WarningContext
{
final WarnAbortCounter tombstones = new WarnAbortCounter();
final WarnAbortCounter localReadSize = new WarnAbortCounter();
final WarnAbortCounter rowIndexTooLarge = new WarnAbortCounter();

private WarningsSnapshot snapshot()
{
return WarningsSnapshot.create(tombstones.snapshot(), localReadSize.snapshot(), rowIndexTooLarge.snapshot());
}
}

public final ResponseResolver<E, P> resolver;
final SimpleCondition condition = new SimpleCondition();
private final long queryStartNanoTime;
Expand All @@ -115,8 +66,8 @@ private WarningsSnapshot snapshot()
private volatile int failures = 0;
private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
private volatile WarningContext warningContext;
private static final AtomicReferenceFieldUpdater<ReadCallback, ReadCallback.WarningContext> warningsUpdater
= AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, ReadCallback.WarningContext.class, "warningContext");
private static final AtomicReferenceFieldUpdater<ReadCallback, WarningContext> warningsUpdater
= AtomicReferenceFieldUpdater.newUpdater(ReadCallback.class, WarningContext.class, "warningContext");

public ReadCallback(ResponseResolver<E, P> resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
{
Expand Down Expand Up @@ -203,38 +154,15 @@ public int blockFor()
return blockFor;
}

private RequestFailureReason updateCounters(Map<ParamType, Object> params,
InetAddressAndPort from,
ParamType abort, ParamType warn,
RequestFailureReason reason,
Function<WarningContext, WarnAbortCounter> fieldAccess)
{
// some checks use int32 others user int64; so rely on Number to handle both cases
if (params.containsKey(abort))
{
fieldAccess.apply(getWarningContext()).addAbort(from, ((Number) params.get(abort)).longValue());
return reason;
}
else if (params.containsKey(warn))
{
fieldAccess.apply(getWarningContext()).addWarning(from, ((Number) params.get(warn)).longValue());
}
return null;
}

@Override
public void onResponse(Message<ReadResponse> message)
{
assertWaitingFor(message.from());
Map<ParamType, Object> params = message.header.params();
InetAddressAndPort from = message.from();
for (Supplier<RequestFailureReason> fn : Arrays.<Supplier<RequestFailureReason>>asList(
() -> updateCounters(params, from, ParamType.TOMBSTONE_ABORT, ParamType.TOMBSTONE_WARNING, RequestFailureReason.READ_TOO_MANY_TOMBSTONES, ctx -> ctx.tombstones),
() -> updateCounters(params, from, ParamType.LOCAL_READ_SIZE_ABORT, ParamType.LOCAL_READ_SIZE_WARN, RequestFailureReason.READ_SIZE, ctx -> ctx.localReadSize),
() -> updateCounters(params, from, ParamType.ROW_INDEX_SIZE_ABORT, ParamType.ROW_INDEX_SIZE_WARN, RequestFailureReason.READ_SIZE, ctx -> ctx.rowIndexTooLarge)
))
if (WarningContext.isSupported(params.keySet()))
{
RequestFailureReason reason = fn.get();
RequestFailureReason reason = getWarningContext().updateCounters(params, from);
if (reason != null)
{
onFailure(message.from(), reason);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads.trackwarnings;

import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.cassandra.locator.InetAddressAndPort;

public class WarnAbortCounter
{
final Set<InetAddressAndPort> warnings = Collections.newSetFromMap(new ConcurrentHashMap<>());
// the highest number reported by a node's warning
final AtomicLong maxWarningValue = new AtomicLong();

final Set<InetAddressAndPort> aborts = Collections.newSetFromMap(new ConcurrentHashMap<>());
// the highest number reported by a node's rejection.
final AtomicLong maxAbortsValue = new AtomicLong();

void addWarning(InetAddressAndPort from, long value)
{
maxWarningValue.accumulateAndGet(value, Math::max);
// call add last so concurrent reads see empty even if values > 0; if done in different order then
// size=1 could have values == 0
warnings.add(from);
}

void addAbort(InetAddressAndPort from, long value)
{
maxAbortsValue.accumulateAndGet(value, Math::max);
// call add last so concurrent reads see empty even if values > 0; if done in different order then
// size=1 could have values == 0
aborts.add(from);
}

public WarningsSnapshot.Warnings snapshot()
{
return WarningsSnapshot.Warnings.create(WarningsSnapshot.Counter.create(warnings, maxWarningValue), WarningsSnapshot.Counter.create(aborts, maxAbortsValue));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service.reads.trackwarnings;

import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ParamType;

public class WarningContext
{
private static EnumSet<ParamType> SUPPORTED = EnumSet.of(ParamType.TOMBSTONE_WARNING, ParamType.TOMBSTONE_ABORT,
ParamType.LOCAL_READ_SIZE_WARN, ParamType.LOCAL_READ_SIZE_ABORT,
ParamType.ROW_INDEX_SIZE_WARN, ParamType.ROW_INDEX_SIZE_ABORT);

final WarnAbortCounter tombstones = new WarnAbortCounter();
final WarnAbortCounter localReadSize = new WarnAbortCounter();
final WarnAbortCounter rowIndexTooLarge = new WarnAbortCounter();

public static boolean isSupported(Set<ParamType> keys)
{
return !Collections.disjoint(keys, SUPPORTED);
}

public RequestFailureReason updateCounters(Map<ParamType, Object> params, InetAddressAndPort from)
{
for (Map.Entry<ParamType, Object> entry : params.entrySet())
{
WarnAbortCounter counter = null;
RequestFailureReason reason = null;
switch (entry.getKey())
{
case ROW_INDEX_SIZE_ABORT:
reason = RequestFailureReason.READ_SIZE;
case ROW_INDEX_SIZE_WARN:
counter = rowIndexTooLarge;
break;
case LOCAL_READ_SIZE_ABORT:
reason = RequestFailureReason.READ_SIZE;
case LOCAL_READ_SIZE_WARN:
counter = localReadSize;
break;
case TOMBSTONE_ABORT:
reason = RequestFailureReason.READ_TOO_MANY_TOMBSTONES;
case TOMBSTONE_WARNING:
counter = tombstones;
break;
}
if (reason != null)
{
counter.addAbort(from, ((Number) entry.getValue()).longValue());
return reason;
}
if (counter != null)
counter.addWarning(from, ((Number) entry.getValue()).longValue());
}
return null;
}

public WarningsSnapshot snapshot()
{
return WarningsSnapshot.create(tombstones.snapshot(), localReadSize.snapshot(), rowIndexTooLarge.snapshot());
}
}

0 comments on commit 2a94614

Please sign in to comment.