Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ static final class Bytes {
}

byte[] array() {
return array;
if (null != array) {
return array;
}
return buffer.getArray();
}

ByteBuffer asReadOnlyByteBuffer() {
Expand Down Expand Up @@ -258,6 +261,9 @@ String batchSizeDiscardedString() {
countSize2String(discardedCount, discardedSize));
}

public Map<Bytes, Object> getOps() {
return ops;
}
@Override
public String toString() {
return name + ": " + family.getName();
Expand Down Expand Up @@ -320,6 +326,27 @@ String getCommitString() {
countSize2String(discardedCount, discardedSize),
countSize2String(opCount - discardedCount, opSize - discardedSize));
}

Map<String, Map<byte[], byte[]>> getCachedTransaction() {
Map<String, Map<byte[], byte[]>> tableMap = new HashMap<>();
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
Map<byte[], byte[]> dataMap = tableMap.computeIfAbsent(e.getKey(), (p) -> new HashMap<>());
for (Map.Entry<Bytes, Object> d : e.getValue().getOps().entrySet()) {
Object value = d.getValue();
if (value instanceof byte[]) {
dataMap.put(d.getKey().array(), (byte[]) value);
} else if (value instanceof CodecBuffer) {
dataMap.put(d.getKey().array(), (byte[]) ((CodecBuffer) value).getArray());
} else if (value == Op.DELETE) {
dataMap.put(d.getKey().array(), null);
} else {
throw new IllegalStateException("Unexpected value: " + value
+ ", class=" + value.getClass().getSimpleName());
}
}
}
return tableMap;
}
}

private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
Expand Down Expand Up @@ -378,4 +405,8 @@ public void put(ColumnFamily family, byte[] key, byte[] value)
throws IOException {
opCache.put(family, key, value);
}

public Map<String, Map<byte[], byte[]>> getCachedTransaction() {
return opCache.getCachedTransaction();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,10 @@ void deleteBatchWithPrefix(BatchOperation batch, KEY prefix)
* @throws IOException
*/
void loadFromFile(File externalFile) throws IOException;

default Table getRawTable() {
return this;
}

/**
* Class used to represent the key and value pair of a db entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,10 @@ public void dumpToFileWithPrefix(File externalFile, KEY prefix)
public void loadFromFile(File externalFile) throws IOException {
rawTable.loadFromFile(externalFile);
}

@Override
public Table getRawTable() {
return rawTable;
}
@Override
public void cleanupCache(List<Long> epochs) {
cache.cleanup(epochs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ public static boolean isReadOnly(
case SetTimes:
case AbortExpiredMultiPartUploads:
case SetSnapshotProperty:
case QuotaRepair:
case QuotaRepair:
case PersistDb:
case UnknownCommand:
return false;
case EchoRPC:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ enum Type {
RenameSnapshot = 131;
ListOpenFiles = 132;
QuotaRepair = 133;
PersistDb = 134;
}

enum SafeMode {
Expand Down Expand Up @@ -287,6 +288,7 @@ message OMRequest {
optional RenameSnapshotRequest RenameSnapshotRequest = 129;
optional ListOpenFilesRequest ListOpenFilesRequest = 130;
optional QuotaRepairRequest QuotaRepairRequest = 131;
optional PersistDbRequest PersistDbRequest = 132;
}

message OMResponse {
Expand Down Expand Up @@ -412,6 +414,7 @@ message OMResponse {
optional RenameSnapshotResponse RenameSnapshotResponse = 132;
optional ListOpenFilesResponse ListOpenFilesResponse = 133;
optional QuotaRepairResponse QuotaRepairResponse = 134;
optional PersistDbResponse PersistDbResponse = 135;
}

enum Status {
Expand Down Expand Up @@ -2202,6 +2205,20 @@ message BucketQuotaCount {
message QuotaRepairResponse {
}

message PersistDbRequest {
repeated DBTableUpdate tableUpdates = 1;
required int64 cacheIndex = 2;
}
message DBTableUpdate {
required string tableName = 1;
repeated DBTableRecord records = 2;
}
message DBTableRecord {
required bytes key = 1;
optional bytes value = 2;
}
message PersistDbResponse {
}
message OMLockDetailsProto {
optional bool isLockAcquired = 1;
optional uint64 waitLockNanos = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -121,6 +122,7 @@ public final class OzoneManagerRatisServer {
private final OzoneManagerStateMachine omStateMachine;
private final String ratisStorageDir;
private final OMPerformanceMetrics perfMetrics;
private final OzoneManagerRequestExecutor omRequestExecutor;

private final ClientId clientId = ClientId.randomId();
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
Expand Down Expand Up @@ -188,6 +190,7 @@ private OzoneManagerRatisServer(ConfigurationSource conf, OzoneManager om,
}
});
this.perfMetrics = om.getPerfMetrics();
omRequestExecutor = new OzoneManagerRequestExecutor(this);
}

/**
Expand Down Expand Up @@ -310,9 +313,18 @@ private RaftClientRequest createRaftRequest(OMRequest omRequest) {
*/
public OMResponse submitRequest(OMRequest omRequest,
RaftClientRequest raftClientRequest) throws ServiceException {
RaftClientReply raftClientReply =
submitRequestToRatis(raftClientRequest);
return createOmResponse(omRequest, raftClientReply);
try {
CompletableFuture<OMResponse> reply = omRequestExecutor.submit(omRequest);
return reply.get();
} catch (ExecutionException ex) {
throw new ServiceException(ex.getMessage(), ex);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new ServiceException(ex.getMessage(), ex);
}
//RaftClientReply raftClientReply =
// submitRequestToRatis(raftClientRequest);
//return createOmResponse(omRequest, raftClientReply);
}

private RaftClientReply submitRequestToRatisImpl(
Expand Down Expand Up @@ -452,10 +464,12 @@ public void removeRaftPeer(OMNodeDetails omNodeDetails) {
* ratis server.
*/
private RaftClientRequest createRaftRequestImpl(OMRequest omRequest) {
if (!ozoneManager.isTestSecureOmFlag()) {
Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID);
Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID);
}
// TODO remove as Server.getClientId() is set for external request, but with change
// in mode, this is not required as submit will be done internally
//if (!ozoneManager.isTestSecureOmFlag()) {
// Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID);
// Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID);
//}
return RaftClientRequest.newBuilder()
.setClientId(
ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId())))
Expand Down
Loading