Skip to content

Commit

Permalink
Merge branch 'elasticity' into elasticity-feature-4131
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinrr888 committed Jun 25, 2024
2 parents be1ac10 + df60974 commit bc388d5
Show file tree
Hide file tree
Showing 142 changed files with 5,262 additions and 5,378 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ enum Status {
*/
VIOLATED,
/**
* error occurred after mutation was sent to server, its unknown if the mutation was written.
* Error occurred after mutation was sent to server, its unknown if the mutation was written.
* Although the status of the mutation is unknown, Accumulo guarantees the mutation will not be
* written at a later point in time.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -85,12 +84,8 @@
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.rpc.SaslConnectionParams;
import org.apache.accumulo.core.rpc.SslConnectionParams;
import org.apache.accumulo.core.security.Authorizations;
Expand Down Expand Up @@ -475,38 +470,6 @@ public synchronized TCredentials rpcCreds() {
return rpcCreds;
}

/**
* Returns the location of the tablet server that is serving the root tablet.
*
* @return location in "hostname:port" form
*/
public String getRootTabletLocation() {
ensureOpen();

OpTimer timer = null;

if (log.isTraceEnabled()) {
log.trace("tid={} Looking up root tablet location in zookeeper.",
Thread.currentThread().getId());
timer = new OpTimer().start();
}

Location loc =
getAmple().readTablet(RootTable.EXTENT, ReadConsistency.EVENTUAL, LOCATION).getLocation();

if (timer != null) {
timer.stop();
log.trace("tid={} Found root tablet at {} in {}", Thread.currentThread().getId(), loc,
String.format("%.3f secs", timer.scale(SECONDS)));
}

if (loc == null || loc.getType() != LocationType.CURRENT) {
return null;
}

return loc.getHostPort();
}

/**
* Returns the location(s) of the accumulo manager and any redundant servers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,15 @@
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.net.HostAndPort;

public class ConditionalWriterImpl implements ConditionalWriter {

private static final Logger log = LoggerFactory.getLogger(ConditionalWriterImpl.class);

private static final int MAX_SLEEP = 30000;

private Authorizations auths;
Expand Down Expand Up @@ -685,19 +689,24 @@ private void invalidateSession(SessionID sessionId, HostAndPort location)
// invalidation prevents future attempts to contact the
// tserver even its gone zombie and is still running w/o a lock
locator.invalidateCache(context, location.toString());
log.trace("tablet server {} {} is dead, so no need to invalidate {}", location,
sessionId.lockId, sessionId.sessionID);
return;
}

try {
// if the mutation is currently processing, this method will block until its done or times
// out
log.trace("Attempting to invalidate {} at {}", sessionId.sessionID, location);
invalidateSession(sessionId.sessionID, location);

log.trace("Invalidated {} at {}", sessionId.sessionID, location);
return;
} catch (TApplicationException tae) {
throw new AccumuloServerException(location.toString(), tae);
} catch (TException e) {
locator.invalidateCache(context, location.toString());
log.trace("Failed to invalidate {} at {} {}", sessionId.sessionID, location,
e.getMessage());
}

if ((System.currentTimeMillis() - startTime) + sleepTime > timeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.accumulo.core.clientImpl;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -30,22 +30,34 @@
import java.util.Optional;
import java.util.function.BiConsumer;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.clientImpl.ClientTabletCacheImpl.TabletServerLockChecker;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample.ReadConsistency;
import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.util.OpTimer;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Provides ability to get the location of the root tablet from a zoocache. This cache
* implementation does not actually do any caching on its own and soley relies on zoocache. One nice
* feature of using zoo cache is that if the location changes in zookeeper, it will eventually be
* updated by zookeeper watchers in zoocache. Therefore, the invalidation functions are
* intentionally no-ops and rely on the zookeeper watcher to keep things up to date.
*
* <p>
* This code is relying on the assumption that if two client objects are created for the same
* accumulo instance in the same process that both will have the same zoocache. This assumption
* means there is only a single zookeeper watch per process per accumulo instance. This assumptions
* leads to efficiencies at the cluster level by reduce the total number of zookeeper watches.
* </p>
*/
public class RootClientTabletCache extends ClientTabletCache {

private final TabletServerLockChecker lockChecker;
Expand Down Expand Up @@ -87,20 +99,24 @@ public List<Range> findTablets(ClientContext context, List<Range> ranges,
}

@Override
public void invalidateCache(KeyExtent failedExtent) {}
public void invalidateCache(KeyExtent failedExtent) {
// no-op see class level javadoc
}

@Override
public void invalidateCache(Collection<KeyExtent> keySet) {}
public void invalidateCache(Collection<KeyExtent> keySet) {
// no-op see class level javadoc
}

@Override
public void invalidateCache(ClientContext context, String server) {
ZooCache zooCache = context.getZooCache();
String root = context.getZooKeeperRoot() + Constants.ZTSERVERS;
zooCache.clear(root + "/" + server);
// no-op see class level javadoc
}

@Override
public void invalidateCache() {}
public void invalidateCache() {
// no-op see class level javadoc
}

protected CachedTablet getRootTabletLocation(ClientContext context) {
Logger log = LoggerFactory.getLogger(this.getClass());
Expand All @@ -113,8 +129,10 @@ protected CachedTablet getRootTabletLocation(ClientContext context) {
timer = new OpTimer().start();
}

Location loc = context.getAmple()
.readTablet(RootTable.EXTENT, ReadConsistency.EVENTUAL, LOCATION).getLocation();
var zpath = RootTabletMetadata.zooPath(context);
var zooCache = context.getZooCache();
Location loc = new RootTabletMetadata(new String(zooCache.get(zpath), UTF_8)).toTabletMetadata()
.getLocation();

if (timer != null) {
timer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.accumulo.core.data;

import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.hadoop.io.Text;

/**
Expand Down Expand Up @@ -82,6 +84,50 @@ public List<Condition> getConditions() {
return Collections.unmodifiableList(conditions);
}

private String toString(ByteSequence bs) {
if (bs == null) {
return null;
}
return new String(bs.toArray(), UTF_8);
}

@Override
public String prettyPrint() {
StringBuilder sb = new StringBuilder(super.prettyPrint());
for (Condition c : conditions) {
sb.append(" condition: ");
sb.append(toString(c.getFamily()));
sb.append(":");
sb.append(toString(c.getQualifier()));
if (c.getValue() != null && !toString(c.getValue()).isBlank()) {
sb.append(" value: ");
sb.append(toString(c.getValue()));
}
if (c.getVisibility() != null && !toString(c.getVisibility()).isBlank()) {
sb.append(" visibility: '");
sb.append(toString(c.getVisibility()));
sb.append("'");
}
if (c.getTimestamp() != null) {
sb.append(" timestamp: ");
sb.append("'");
sb.append(c.getTimestamp());
sb.append("'");
}
if (c.getIterators().length != 0) {
sb.append(" iterator: ");
IteratorSetting[] iterators = c.getIterators();
for (IteratorSetting its : iterators) {
sb.append("'");
sb.append(its.toString());
sb.append("' ");
}
}
sb.append("\n");
}
return sb.toString();
}

@Override
public boolean equals(Object o) {
if (o == this) {
Expand Down
28 changes: 19 additions & 9 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,7 @@ private class ExecutionState {
private void execute(final FateTxStore<T> txStore, final ExecutionState state)
throws Exception {
while (state.op != null && state.deferTime == 0) {
var startTime = NanoTime.now();
state.deferTime = state.op.isReady(txStore.getID(), environment);
log.debug("Running {}.isReady() {} took {} ms and returned {}", state.op.getName(),
txStore.getID(), startTime.elapsed().toMillis(), state.deferTime);
state.deferTime = executeIsReady(txStore.getID(), state.op);

if (state.deferTime == 0) {
if (state.status == SUBMITTED) {
Expand All @@ -224,11 +221,7 @@ private void execute(final FateTxStore<T> txStore, final ExecutionState state)
}

state.prevOp = state.op;
startTime = NanoTime.now();
state.op = state.op.call(txStore.getID(), environment);
log.debug("Running {}.call() {} took {} ms and returned {}", state.prevOp.getName(),
txStore.getID(), startTime.elapsed().toMillis(),
state.op == null ? "null" : state.op.getName());
state.op = executeCall(txStore.getID(), state.op);

if (state.op != null) {
// persist the completion of this step before starting to run the next so in the case of
Expand Down Expand Up @@ -317,6 +310,23 @@ private void undo(FateId fateId, Repo<T> op) {

}

protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
var startTime = NanoTime.now();
var deferTime = op.isReady(fateId, environment);
log.debug("Running {}.isReady() {} took {} ms and returned {}", op.getName(), fateId,
startTime.elapsed().toMillis(), deferTime);
return deferTime;
}

protected Repo<T> executeCall(FateId fateId, Repo<T> op) throws Exception {
var startTime = NanoTime.now();
var next = op.call(fateId, environment);
log.debug("Running {}.call() {} took {} ms and returned {}", op.getName(), fateId,
startTime.elapsed().toMillis(), next == null ? "null" : next.getName());

return next;
}

/**
* A thread that finds reservations held by dead processes and unreserves them. Only one thread
* runs per store type across all Fate instances (one to clean up dead reservations for
Expand Down
Loading

0 comments on commit bc388d5

Please sign in to comment.