Skip to content

Commit

Permalink
Dry up RecoveryRequest serialization code (#94371)
Browse files Browse the repository at this point in the history
We have both the recovery id and the shard id as shared fields and they are
serialized the same way over the wire for all implementations.
-> we can reflect that in code
  • Loading branch information
original-brownbear committed Mar 8, 2023
1 parent 1f5e6f0 commit b27b1fd
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ public void messageReceived(final RecoveryFileChunkRequest request, TransportCha
class RestoreFileFromSnapshotTransportRequestHandler implements TransportRequestHandler<RecoverySnapshotFileRequest> {
@Override
public void messageReceived(final RecoverySnapshotFileRequest request, TransportChannel channel, Task task) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.getRecoveryId(), request.getShardId())) {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.target();
final ActionListener<Void> listener = createOrFinishListener(recoveryRef, channel, request);
if (listener == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
import java.io.IOException;

public class RecoveryCleanFilesRequest extends RecoveryTransportRequest {

private final long recoveryId;
private final ShardId shardId;
private final Store.MetadataSnapshot snapshotFiles;
private final int totalTranslogOps;
private final long globalCheckpoint;
Expand All @@ -31,18 +28,14 @@ public RecoveryCleanFilesRequest(
int totalTranslogOps,
long globalCheckpoint
) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.snapshotFiles = snapshotFiles;
this.totalTranslogOps = totalTranslogOps;
this.globalCheckpoint = globalCheckpoint;
}

RecoveryCleanFilesRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
snapshotFiles = Store.MetadataSnapshot.readFrom(in);
totalTranslogOps = in.readVInt();
globalCheckpoint = in.readZLong();
Expand All @@ -51,8 +44,6 @@ public RecoveryCleanFilesRequest(
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
snapshotFiles.writeTo(out);
out.writeVInt(totalTranslogOps);
out.writeZLong(globalCheckpoint);
Expand All @@ -62,14 +53,6 @@ public Store.MetadataSnapshot sourceMetaSnapshot() {
return snapshotFiles;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public int totalTranslogOps() {
return totalTranslogOps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

public final class RecoveryFileChunkRequest extends RecoveryTransportRequest implements RefCounted {
private final boolean lastChunk;
private final long recoveryId;
private final ShardId shardId;
private final long position;
private final ReleasableBytesReference content;
private final StoreFileMetadata metadata;
Expand All @@ -30,8 +28,6 @@ public final class RecoveryFileChunkRequest extends RecoveryTransportRequest imp

public RecoveryFileChunkRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
final String name = in.readString();
position = in.readVLong();
final long length = in.readVLong();
Expand All @@ -55,9 +51,7 @@ public RecoveryFileChunkRequest(
int totalTranslogOps,
long sourceThrottleTimeInNanos
) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.metadata = metadata;
this.position = position;
this.content = content.retain();
Expand All @@ -66,14 +60,6 @@ public RecoveryFileChunkRequest(
this.sourceThrottleTimeInNanos = sourceThrottleTimeInNanos;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public String name() {
return metadata.name();
}
Expand Down Expand Up @@ -101,8 +87,6 @@ public long sourceThrottleTimeInNanos() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeString(metadata.name());
out.writeVLong(position);
out.writeVLong(metadata.length());
Expand All @@ -116,7 +100,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return shardId + ": name='" + name() + '\'' + ", position=" + position + ", length=" + length();
return shardId() + ": name='" + name() + '\'' + ", position=" + position + ", length=" + length();
}

public StoreFileMetadata metadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

public class RecoveryFilesInfoRequest extends RecoveryTransportRequest {

private final long recoveryId;
private final ShardId shardId;

List<String> phase1FileNames;
List<Long> phase1FileSizes;
List<String> phase1ExistingFileNames;
Expand All @@ -30,8 +27,6 @@ public class RecoveryFilesInfoRequest extends RecoveryTransportRequest {

public RecoveryFilesInfoRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
int size = in.readVInt();
phase1FileNames = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Expand Down Expand Up @@ -68,29 +63,17 @@ public RecoveryFilesInfoRequest(StreamInput in) throws IOException {
List<Long> phase1ExistingFileSizes,
int totalTranslogOps
) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.phase1FileNames = phase1FileNames;
this.phase1FileSizes = phase1FileSizes;
this.phase1ExistingFileNames = phase1ExistingFileNames;
this.phase1ExistingFileSizes = phase1ExistingFileSizes;
this.totalTranslogOps = totalTranslogOps;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);

out.writeStringCollection(phase1FileNames);
out.writeCollection(phase1FileSizes, StreamOutput::writeVLong);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,11 @@
import java.io.IOException;

final class RecoveryFinalizeRecoveryRequest extends RecoveryTransportRequest {

private final long recoveryId;
private final ShardId shardId;
private final long globalCheckpoint;
private final long trimAboveSeqNo;

RecoveryFinalizeRecoveryRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
globalCheckpoint = in.readZLong();
trimAboveSeqNo = in.readZLong();
}
Expand All @@ -36,21 +31,11 @@ final class RecoveryFinalizeRecoveryRequest extends RecoveryTransportRequest {
final long globalCheckpoint,
final long trimAboveSeqNo
) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.globalCheckpoint = globalCheckpoint;
this.trimAboveSeqNo = trimAboveSeqNo;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public long globalCheckpoint() {
return globalCheckpoint;
}
Expand All @@ -62,8 +47,6 @@ public long trimAboveSeqNo() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeZLong(globalCheckpoint);
out.writeZLong(trimAboveSeqNo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,25 @@
import java.io.IOException;

class RecoveryPrepareForTranslogOperationsRequest extends RecoveryTransportRequest {

private final long recoveryId;
private final ShardId shardId;
private final int totalTranslogOps;

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, long requestSeqNo, ShardId shardId, int totalTranslogOps) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.totalTranslogOps = totalTranslogOps;
}

RecoveryPrepareForTranslogOperationsRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
totalTranslogOps = in.readVInt();
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public int totalTranslogOps() {
return totalTranslogOps;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.io.IOException;

public class RecoverySnapshotFileRequest extends RecoveryTransportRequest {
private final long recoveryId;
private final ShardId shardId;
private final String repository;
private final IndexId indexId;
private final BlobStoreIndexShardSnapshot.FileInfo fileInfo;
Expand All @@ -31,18 +29,14 @@ public RecoverySnapshotFileRequest(
IndexId indexId,
BlobStoreIndexShardSnapshot.FileInfo fileInfo
) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.repository = repository;
this.indexId = indexId;
this.fileInfo = fileInfo;
}

public RecoverySnapshotFileRequest(StreamInput in) throws IOException {
super(in);
this.recoveryId = in.readLong();
this.shardId = new ShardId(in);
this.repository = in.readString();
this.indexId = new IndexId(in);
this.fileInfo = new BlobStoreIndexShardSnapshot.FileInfo(in);
Expand All @@ -53,21 +47,11 @@ public void writeTo(StreamOutput out) throws IOException {
assert out.getTransportVersion().onOrAfter(RecoverySettings.SNAPSHOT_RECOVERIES_SUPPORTED_TRANSPORT_VERSION)
: "Unexpected serialization version " + out.getTransportVersion();
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeString(repository);
indexId.writeTo(out);
fileInfo.writeTo(out);
}

public long getRecoveryId() {
return recoveryId;
}

public ShardId getShardId() {
return shardId;
}

public String getRepository() {
return repository;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest implements RawIndexingDataTransportRequest {

private final long recoveryId;
private final ShardId shardId;
private final List<Translog.Operation> operations;
private final int totalTranslogOps;
private final long maxSeenAutoIdTimestampOnPrimary;
Expand All @@ -40,9 +38,7 @@ public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary
) {
super(requestSeqNo);
this.recoveryId = recoveryId;
this.shardId = shardId;
super(requestSeqNo, recoveryId, shardId);
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
Expand All @@ -51,14 +47,6 @@ public class RecoveryTranslogOperationsRequest extends RecoveryTransportRequest
this.mappingVersionOnPrimary = mappingVersionOnPrimary;
}

public long recoveryId() {
return this.recoveryId;
}

public ShardId shardId() {
return shardId;
}

public List<Translog.Operation> operations() {
return operations;
}
Expand Down Expand Up @@ -90,8 +78,6 @@ long mappingVersionOnPrimary() {

RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
super(in);
recoveryId = in.readLong();
shardId = new ShardId(in);
operations = Translog.readOperations(in, "recovery");
totalTranslogOps = in.readVInt();
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
Expand All @@ -103,8 +89,6 @@ long mappingVersionOnPrimary() {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(recoveryId);
shardId.writeTo(out);
Translog.writeOperations(out, operations);
out.writeVInt(totalTranslogOps);
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
Expand Down

0 comments on commit b27b1fd

Please sign in to comment.