Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm.ha;

import jakarta.annotation.Nonnull;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.hdds.utils.db.CodecException;
import org.apache.hadoop.hdds.utils.db.StringCodec;

/**
 * Represents the sequence ID types managed by
 * {@code org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator}.
 * The enum constant names are kept exactly as their persisted RocksDB keys:
 * every constant is encoded from its {@link #name()}, so renaming a constant
 * changes the bytes it is encoded to.
 *
 * <p>Use {@link #getCodec()} to convert between constants and their encoded
 * form.
 */
public enum SequenceIdType {

  localId,
  delTxnId,
  containerId,

  /**
   * Certificate ID for all services, including root certificates.
   */
  CertificateId,

  /**
   * @deprecated Use {@link #CertificateId} instead.
   */
  @Deprecated
  rootCertificateId;

  /**
   * Codec translating between a {@link SequenceIdType} and the encoded bytes
   * of its name. Decoding looks up a candidate by the first byte and then
   * verifies the complete content; anything that does not match exactly
   * (including empty input) is rejected with a {@link CodecException}.
   */
  private static final Codec<SequenceIdType> INSTANCE = new Codec<SequenceIdType>() {
    @Override
    public Class<SequenceIdType> getTypeClass() {
      return SequenceIdType.class;
    }

    @Override
    public boolean supportCodecBuffer() {
      return true;
    }

    @Override
    public byte[] toPersistedFormat(SequenceIdType type) {
      // Hand out a copy so that callers cannot modify the cached bytes.
      return type.getByteArray();
    }

    @Override
    public SequenceIdType fromPersistedFormat(byte[] bytes) throws CodecException {
      if (bytes.length == 0) {
        // There is no first byte to look up; report a decode failure instead
        // of letting bytes[0] raise ArrayIndexOutOfBoundsException.
        throw new CodecException("Failed to decode an empty byte array");
      }
      final SequenceIdType type = SEQUENCE_ID_TYPES.get(bytes[0]);
      // The first byte only selects a candidate; the whole key must match.
      // The cached array is only read here, so no defensive copy is needed.
      if (type != null && Arrays.equals(type.byteArray, bytes)) {
        return type;
      }
      throw new CodecException("Failed to decode " + StringUtils.bytes2Hex(ByteBuffer.wrap(bytes), 20));
    }

    @Override
    public CodecBuffer toCodecBuffer(@Nonnull SequenceIdType object, CodecBuffer.Allocator allocator) {
      // getByteBuffer() returns a duplicate, so consuming it below does not
      // move the position of the cached buffer.
      final ByteBuffer buffer = object.getByteBuffer();
      final CodecBuffer cb = allocator.apply(buffer.remaining());
      cb.put(buffer);
      return cb;
    }

    @Override
    public SequenceIdType fromCodecBuffer(@Nonnull CodecBuffer bytes) throws CodecException {
      final ByteBuffer buffer = bytes.asReadOnlyByteBuffer();
      if (!buffer.hasRemaining()) {
        // An absolute get on an empty buffer would throw
        // IndexOutOfBoundsException; report a decode failure instead.
        throw new CodecException("Failed to decode an empty buffer");
      }
      // Absolute get: peeks at the first byte without advancing the position.
      final SequenceIdType type = SEQUENCE_ID_TYPES.get(buffer.get(buffer.position()));
      // ByteBuffer.equals compares the remaining content of both buffers.
      if (type != null && type.getByteBuffer().equals(buffer)) {
        return type;
      }
      throw new CodecException("Failed to decode " + StringUtils.bytes2Hex(buffer, 20));
    }

    @Override
    public SequenceIdType copyObject(SequenceIdType object) {
      // Enum constants are singletons; there is nothing to copy.
      return object;
    }
  };

  /** Only use the first byte in the name since they are all distinct. */
  private static final Map<Byte, SequenceIdType> SEQUENCE_ID_TYPES;

  /** Encoded form of {@link #name()}; never exposed without copying. */
  private final byte[] byteArray;
  /** Read-only view over {@link #byteArray}; only duplicates are handed out. */
  private final ByteBuffer byteBuffer;

  /**
   * Encodes the constant name once and caches the result.
   *
   * @throws IllegalStateException if the name cannot be encoded
   */
  SequenceIdType() {
    try {
      this.byteArray = StringCodec.getCodecNoFallback().toPersistedFormat(name());
    } catch (CodecException e) {
      throw new IllegalStateException("Failed to construct " + this, e);
    }

    this.byteBuffer = ByteBuffer.wrap(byteArray).asReadOnlyBuffer();
  }

  /**
   * @return a fresh copy of the encoded name of this constant
   */
  public byte[] getByteArray() {
    return byteArray.clone();
  }

  /**
   * @return a read-only buffer over the encoded name of this constant, with
   *         its own independent position and limit
   */
  public ByteBuffer getByteBuffer() {
    return byteBuffer.duplicate();
  }

  static {
    // Build the first-byte index and fail fast if two constants would collide,
    // because the codec relies on the first byte being unique.
    final Map<Byte, SequenceIdType> map = new HashMap<>();
    for (SequenceIdType type : SequenceIdType.values()) {
      final byte first = type.byteArray[0];
      final SequenceIdType previous = map.put(first, type);
      if (previous != null) {
        throw new IllegalStateException("Duplicated first byte: " + type + " and " + previous);
      }
    }
    SEQUENCE_ID_TYPES = Collections.unmodifiableMap(map);
  }

  /**
   * @return the shared codec for {@link SequenceIdType} keys
   */
  public static Codec<SequenceIdType> getCodec() {
    return INSTANCE;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.ha.SequenceIdType;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.DBStoreHAManager;
Expand Down Expand Up @@ -102,7 +103,7 @@ public interface SCMMetadataStore extends DBStoreHAManager {
/**
* Table that maintains sequence id information.
*/
Table<String, Long> getSequenceIdTable();
Table<SequenceIdType, Long> getSequenceIdTable();

/**
* Table that maintains move information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class SequenceIdGenerator {
* @param sequenceIdTable : sequenceIdTable
*/
public SequenceIdGenerator(ConfigurationSource conf,
SCMHAManager scmhaManager, Table<String, Long> sequenceIdTable) {
SCMHAManager scmhaManager, Table<SequenceIdType, Long> sequenceIdTable) {
this.sequenceIdToBatchMap = newSequenceIdToBatchMap();
this.lock = new ReentrantLock();
this.batchSize = conf.getInt(OZONE_SCM_SEQUENCE_ID_BATCH_SIZE,
Expand All @@ -96,7 +96,7 @@ static Map<SequenceIdType, Batch> newSequenceIdToBatchMap() {
}

public StateManager createStateManager(SCMHAManager scmhaManager,
Table<String, Long> sequenceIdTable) {
Table<SequenceIdType, Long> sequenceIdTable) {
Objects.requireNonNull(scmhaManager, "scmhaManager == null");
return new StateManagerImpl.Builder()
.setRatisServer(scmhaManager.getRatisServer())
Expand Down Expand Up @@ -168,7 +168,7 @@ private void invalidateBatchInternal() {
* Reinitialize the SequenceIdGenerator with the latest sequenceIdTable
* during SCM reload.
*/
public void reinitialize(Table<String, Long> sequenceIdTable)
public void reinitialize(Table<SequenceIdType, Long> sequenceIdTable)
throws IOException {
LOG.info("reinitialize SequenceIdGenerator.");
lock.lock();
Expand Down Expand Up @@ -208,7 +208,7 @@ Boolean allocateBatch(String sequenceIdName,
* Reinitialize the SequenceIdGenerator with the latest sequenceIdTable
* during SCM reload.
*/
void reinitialize(Table<String, Long> sequenceIdTable) throws IOException;
void reinitialize(Table<SequenceIdType, Long> sequenceIdTable) throws IOException;

@Override
default RequestType getType() {
Expand All @@ -221,11 +221,11 @@ default RequestType getType() {
* DBTransactionBuffer until a snapshot is taken.
*/
static final class StateManagerImpl implements StateManager {
private Table<String, Long> sequenceIdTable;
private Table<SequenceIdType, Long> sequenceIdTable;
private final DBTransactionBuffer transactionBuffer;
private final Map<SequenceIdType, Long> sequenceIdToLastIdMap;

private StateManagerImpl(Table<String, Long> sequenceIdTable,
private StateManagerImpl(Table<SequenceIdType, Long> sequenceIdTable,
DBTransactionBuffer trxBuffer) {
this.sequenceIdTable = sequenceIdTable;
this.transactionBuffer = trxBuffer;
Expand All @@ -240,7 +240,7 @@ public Boolean allocateBatch(String sequenceIdName,
Long lastId = sequenceIdToLastIdMap.computeIfAbsent(idType,
key -> {
try {
Long idInDb = this.sequenceIdTable.get(key.name());
Long idInDb = this.sequenceIdTable.get(key);
return idInDb != null ? idInDb : INVALID_SEQUENCE_ID;
} catch (IOException ioe) {
throw new RuntimeException("Failed to get lastId from db", ioe);
Expand All @@ -255,7 +255,7 @@ public Boolean allocateBatch(String sequenceIdName,

try {
transactionBuffer
.addToBuffer(sequenceIdTable, idType.name(), newLastId);
.addToBuffer(sequenceIdTable, idType, newLastId);
} catch (IOException ioe) {
throw new RuntimeException("Failed to put lastId to Batch", ioe);
}
Expand All @@ -270,25 +270,25 @@ public Long getLastId(SequenceIdType idType) {
}

@Override
public void reinitialize(Table<String, Long> seqIdTable)
public void reinitialize(Table<SequenceIdType, Long> seqIdTable)
throws IOException {
this.sequenceIdTable = seqIdTable;
this.sequenceIdToLastIdMap.clear();
initialize();
}

private void initialize() throws IOException {
try (Table.KeyValueIterator<String, Long> iterator = sequenceIdTable.iterator()) {
try (Table.KeyValueIterator<SequenceIdType, Long> iterator = sequenceIdTable.iterator()) {

while (iterator.hasNext()) {
Table.KeyValue<String, Long> kv = iterator.next();
final String sequenceIdName = kv.getKey();
Table.KeyValue<SequenceIdType, Long> kv = iterator.next();
final SequenceIdType idType = kv.getKey();
final Long lastId = kv.getValue();
Objects.requireNonNull(sequenceIdName,
"sequenceIdName should not be null");
Objects.requireNonNull(idType,
"idType should not be null");
Objects.requireNonNull(lastId,
"lastId should not be null");
sequenceIdToLastIdMap.put(SequenceIdType.valueOf(sequenceIdName), lastId);
sequenceIdToLastIdMap.put(idType, lastId);
}
}
}
Expand All @@ -297,7 +297,7 @@ private void initialize() throws IOException {
* Builder for Ratis based StateManager.
*/
public static class Builder {
private Table<String, Long> table;
private Table<SequenceIdType, Long> table;
private DBTransactionBuffer buffer;
private SCMRatisServer ratisServer;

Expand All @@ -307,7 +307,7 @@ public Builder setRatisServer(final SCMRatisServer scmRatisServer) {
}

public Builder setSequenceIdTable(
final Table<String, Long> sequenceIdTable) {
final Table<SequenceIdType, Long> sequenceIdTable) {
table = sequenceIdTable;
return this;
}
Expand Down Expand Up @@ -337,37 +337,37 @@ public StateManager build() {
*/
public static void upgradeToSequenceId(SCMMetadataStore scmMetadataStore)
throws IOException {
Table<String, Long> sequenceIdTable = scmMetadataStore.getSequenceIdTable();
Table<SequenceIdType, Long> sequenceIdTable = scmMetadataStore.getSequenceIdTable();

// upgrade localId
// Short-term solution: when setting up multiple SCMs from scratch, they need
// to reach an agreement on the initial value of LOCAL_ID.
// Long-term solution: the bootstrapped SCM will explicitly download
// scm.db from leader SCM, and drop its own scm.db. Thus the upgrade
// operations can take effect exactly once in a SCM HA cluster.
if (sequenceIdTable.get(SequenceIdType.localId.name()) == null) {
if (sequenceIdTable.get(SequenceIdType.localId) == null) {
long millisSinceEpoch = TimeUnit.DAYS.toMillis(
LocalDate.of(LocalDate.now().getYear() + 1, 1, 1).toEpochDay());

long localId = millisSinceEpoch << Short.SIZE;
Preconditions.checkArgument(localId > UniqueId.next());

sequenceIdTable.put(SequenceIdType.localId.name(), localId);
LOG.info("upgrade {} to {}", SequenceIdType.localId, sequenceIdTable.get(SequenceIdType.localId.name()));
sequenceIdTable.put(SequenceIdType.localId, localId);
LOG.info("upgrade {} to {}", SequenceIdType.localId, sequenceIdTable.get(SequenceIdType.localId));
}

// upgrade delTxnId
if (sequenceIdTable.get(SequenceIdType.delTxnId.name()) == null) {
if (sequenceIdTable.get(SequenceIdType.delTxnId) == null) {
// fetch delTxnId from DeletedBlocksTXTable
// check HDDS-4477 for details.
DeletedBlocksTransaction txn
= scmMetadataStore.getDeletedBlocksTXTable().get(0L);
sequenceIdTable.put(SequenceIdType.delTxnId.name(), txn != null ? txn.getTxID() : 0L);
LOG.info("upgrade {} to {}", SequenceIdType.delTxnId, sequenceIdTable.get(SequenceIdType.delTxnId.name()));
sequenceIdTable.put(SequenceIdType.delTxnId, txn != null ? txn.getTxID() : 0L);
LOG.info("upgrade {} to {}", SequenceIdType.delTxnId, sequenceIdTable.get(SequenceIdType.delTxnId));
}

// upgrade containerId
if (sequenceIdTable.get(SequenceIdType.containerId.name()) == null) {
if (sequenceIdTable.get(SequenceIdType.containerId) == null) {
long largestContainerId = 0;
try (TableIterator<ContainerID, ContainerInfo> iterator
= scmMetadataStore.getContainerTable().valueIterator()) {
Expand All @@ -378,20 +378,20 @@ public static void upgradeToSequenceId(SCMMetadataStore scmMetadataStore)
}
}

sequenceIdTable.put(SequenceIdType.containerId.name(), largestContainerId);
sequenceIdTable.put(SequenceIdType.containerId, largestContainerId);
LOG.info("upgrade {} to {}",
SequenceIdType.containerId, sequenceIdTable.get(SequenceIdType.containerId.name()));
SequenceIdType.containerId, sequenceIdTable.get(SequenceIdType.containerId));
}

upgradeToCertificateSequenceId(scmMetadataStore, false);
}

public static void upgradeToCertificateSequenceId(
SCMMetadataStore scmMetadataStore, boolean force) throws IOException {
Table<String, Long> sequenceIdTable = scmMetadataStore.getSequenceIdTable();
Table<SequenceIdType, Long> sequenceIdTable = scmMetadataStore.getSequenceIdTable();

// upgrade certificate ID table
if (sequenceIdTable.get(SequenceIdType.CertificateId.name()) == null || force) {
if (sequenceIdTable.get(SequenceIdType.CertificateId) == null || force) {
// Start from ID 2.
// ID 1 - root certificate, ID 2 - first SCM certificate.
long largestCertId = BigInteger.ONE.add(BigInteger.ONE).longValueExact();
Expand All @@ -413,15 +413,15 @@ public static void upgradeToCertificateSequenceId(
}
}

sequenceIdTable.put(SequenceIdType.CertificateId.name(), largestCertId);
sequenceIdTable.put(SequenceIdType.CertificateId, largestCertId);
LOG.info("upgrade {} to {}", SequenceIdType.CertificateId,
sequenceIdTable.get(SequenceIdType.CertificateId.name()));
sequenceIdTable.get(SequenceIdType.CertificateId));
}

// Delete the ROOT_CERTIFICATE_ID record if it exists;
// ROOT_CERTIFICATE_ID has been replaced by CERTIFICATE_ID.
if (sequenceIdTable.get(SequenceIdType.rootCertificateId.name()) != null) {
sequenceIdTable.delete(SequenceIdType.rootCertificateId.name());
if (sequenceIdTable.get(SequenceIdType.rootCertificateId) != null) {
sequenceIdTable.delete(SequenceIdType.rootCertificateId);
}
}

Expand Down
Loading