Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class Constants {
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";
public static final String ZMANAGER_ASSIGNMENTS = ZMANAGERS + "/assignments";
public static final String ZMANAGER_FATE_ASSIGNMENTS = ZMANAGER_ASSIGNMENTS + "/fate";

public static final String ZGC = "/gc";
public static final String ZGC_LOCK = ZGC + "/lock";
Expand All @@ -74,6 +76,8 @@ public class Constants {
public static final String ZDEAD = "/dead";
public static final String ZDEADTSERVERS = ZDEAD + "/tservers";

public static final String ZSHUTTING_DOWN_TSERVERS = "/shutting-down-tservers";

public static final String ZFATE = "/fate";

public static final String ZNEXT_FILE = "/next_file";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -1328,15 +1327,12 @@ public void clearTabletLocationCache() {
}

private static Set<String> createPersistentWatcherPaths() {
Set<String> pathsToWatch = new HashSet<>();
for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
return Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
pathsToWatch.add(path);
}
return pathsToWatch;
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS,
Constants.ZMANAGER_ASSIGNMENTS);
}

}
43 changes: 40 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/FateKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
Expand All @@ -41,28 +42,40 @@ public class FateKey {
private final FateKeyType type;
private final Optional<KeyExtent> keyExtent;
private final Optional<ExternalCompactionId> compactionId;
private final Optional<TServerInstance> tServerInstance;
private final byte[] serialized;

private FateKey(FateKeyType type, KeyExtent keyExtent) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.of(keyExtent);
this.compactionId = Optional.empty();
this.tServerInstance = Optional.empty();
this.serialized = serialize(type, keyExtent);
}

private FateKey(FateKeyType type, ExternalCompactionId compactionId) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.empty();
this.compactionId = Optional.of(compactionId);
this.tServerInstance = Optional.empty();
this.serialized = serialize(type, compactionId);
}

private FateKey(FateKeyType type, TServerInstance tServerInstance) {
this.type = Objects.requireNonNull(type);
this.keyExtent = Optional.empty();
this.compactionId = Optional.empty();
this.tServerInstance = Optional.of(tServerInstance);
this.serialized = serialize(type, tServerInstance);
}

private FateKey(byte[] serialized) {
try (DataInputBuffer buffer = new DataInputBuffer()) {
buffer.reset(serialized, serialized.length);
this.type = FateKeyType.valueOf(buffer.readUTF());
this.keyExtent = deserializeKeyExtent(type, buffer);
this.compactionId = deserializeCompactionId(type, buffer);
this.tServerInstance = deserializeTserverId(type, buffer);
this.serialized = serialized;
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down Expand Up @@ -127,8 +140,12 @@ public static FateKey forMerge(KeyExtent extent) {
return new FateKey(FateKeyType.MERGE, extent);
}

public static FateKey forShutdown(TServerInstance tServerInstance) {
return new FateKey(FateKeyType.TSERVER_SHUTDOWN, tServerInstance);
}

public enum FateKeyType {
SPLIT, COMPACTION_COMMIT, MERGE
SPLIT, COMPACTION_COMMIT, MERGE, TSERVER_SHUTDOWN
}

private static byte[] serialize(FateKeyType type, KeyExtent ke) {
Expand All @@ -155,22 +172,42 @@ private static byte[] serialize(FateKeyType type, ExternalCompactionId compactio
}
}

private static byte[] serialize(FateKeyType type, TServerInstance tServerInstance) {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos)) {
dos.writeUTF(type.toString());
dos.writeUTF(tServerInstance.getHostPortSession());
dos.close();
return baos.toByteArray();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Optional<KeyExtent> deserializeKeyExtent(FateKeyType type, DataInputBuffer buffer)
throws IOException {
return switch (type) {
case SPLIT, MERGE -> Optional.of(KeyExtent.readFrom(buffer));
case COMPACTION_COMMIT -> Optional.empty();
case COMPACTION_COMMIT, TSERVER_SHUTDOWN -> Optional.empty();
};
}

private static Optional<ExternalCompactionId> deserializeCompactionId(FateKeyType type,
DataInputBuffer buffer) throws IOException {
return switch (type) {
case SPLIT, MERGE -> Optional.empty();
case SPLIT, MERGE, TSERVER_SHUTDOWN -> Optional.empty();
case COMPACTION_COMMIT -> Optional.of(ExternalCompactionId.of(buffer.readUTF()));
};
}

private static Optional<TServerInstance> deserializeTserverId(FateKeyType type,
DataInputBuffer buffer) throws IOException {
return switch (type) {
case SPLIT, MERGE, COMPACTION_COMMIT -> Optional.empty();
case TSERVER_SHUTDOWN -> Optional.of(new TServerInstance(buffer.readUTF()));
};
}

@Override
public String toString() {
var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ public boolean contains(FateId fateId) {
}

}

public FateInstanceType getType() {
return start.getType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.accumulo.server.conf.store.ResourceGroupPropKey;
import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.manager.FateLocations;
import org.apache.accumulo.server.metadata.RootGcCandidates;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -178,6 +179,12 @@ void initialize(final ServerContext context, final String rootTabletDirName,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZMANAGER_ASSISTANT_LOCK, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZSHUTTING_DOWN_TSERVERS, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
zrwChroot.putPersistentData(Constants.ZMANAGER_ASSIGNMENTS, EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
FateLocations.storeLocations(zrwChroot, Map.of(), ZooUtil.NodeExistsPolicy.FAIL);

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.manager;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FatePartition;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;

import com.google.common.base.Preconditions;
import com.google.common.net.HostAndPort;
import com.google.common.reflect.TypeToken;

public class FateLocations {

private final ServerContext context;

private long lastUpdateCount;
private Map<HostAndPort,Set<FatePartition>> lastLocations = null;

public FateLocations(ServerContext context) {
this.context = context;
}

public synchronized Map<HostAndPort,Set<FatePartition>> getLocations() {

var zooCache = context.getZooCache();

if (lastLocations == null || lastUpdateCount != zooCache.getUpdateCount()) {
lastUpdateCount = zooCache.getUpdateCount();
var json = new String(context.getZooCache().get(Constants.ZMANAGER_FATE_ASSIGNMENTS), UTF_8);
var type = new TypeToken<Map<String,List<List<String>>>>() {}.getType();
Map<String,List<List<String>>> stringMap = GSON.get().fromJson(json, type);
Map<HostAndPort,Set<FatePartition>> locations = new HashMap<>();
stringMap.forEach((hp, parts) -> {
var partsSet = parts.stream().peek(part -> Preconditions.checkArgument(part.size() == 2))
.map(part -> new FatePartition(FateId.from(part.get(0)), FateId.from(part.get(1))))
.collect(toUnmodifiableSet());
locations.put(HostAndPort.fromString(hp), partsSet);
});
lastLocations = Map.copyOf(locations);
}

return lastLocations;
}

private static byte[] serialize(Map<HostAndPort,Set<FatePartition>> assignments) {
Map<String,List<List<String>>> jsonMap = new HashMap<>();
assignments.forEach((hp, parts) -> {
var listParts = parts.stream()
.map(part -> List.of(part.start().canonical(), part.end().canonical())).toList();
jsonMap.put(hp.toString(), listParts);
});

var json = GSON.get().toJson(jsonMap);
return json.getBytes(UTF_8);
}

public static void storeLocations(ZooReaderWriter zoo,
Map<HostAndPort,Set<FatePartition>> assignments, NodeExistsPolicy nodeExistsPolicy) {
try {
zoo.putPersistentData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments),
nodeExistsPolicy);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Unable to set fate locations in zookeeper", e);
}
}

public static void storeLocations(ServerContext context,
Map<HostAndPort,Set<FatePartition>> assignments) {
try {
context.getZooSession().setData(Constants.ZMANAGER_FATE_ASSIGNMENTS, serialize(assignments),
-1);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Unable to set fate locations in zookeeper", e);
}
}

}
Loading