Skip to content

Commit

Permalink
Dry up Translog.Operation classes (#95106)
Browse files Browse the repository at this point in the history
Pulling some common logic into an abstract base class,
removing error-prone source getters from the implementations
without a source and simplifying serialization by implementing
Writeable.
  • Loading branch information
original-brownbear committed Apr 10, 2023
1 parent f662a19 commit 9347999
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeZLong(trimAboveSeqNo);
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
out.writeArray(Translog.Operation::writeOperation, operations);
out.writeArray(operations);
}

@Override
Expand Down
172 changes: 65 additions & 107 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
Expand Down Expand Up @@ -1064,8 +1065,8 @@ public void close() throws IOException {
* A generic interface representing an operation performed on the transaction log.
* Each is associated with a type.
*/
public interface Operation {
enum Type {
public abstract static class Operation implements Writeable {
public enum Type {
@Deprecated
CREATE((byte) 1),
INDEX((byte) 2),
Expand Down Expand Up @@ -1093,47 +1094,50 @@ public static Type fromId(byte id) {
}
}

Type opType();
protected final long seqNo;

long estimateSize();
protected final long primaryTerm;

BytesReference source();
protected Operation(long seqNo, long primaryTerm) {
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
}

public abstract Type opType();

long seqNo();
public abstract long estimateSize();

public final long seqNo() {
return seqNo;
}

long primaryTerm();
public final long primaryTerm() {
return primaryTerm;
}

/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeOperation(StreamOutput, Operation)}
* Reads the type and the operation from the given stream.
*/
static Operation readOperation(final StreamInput input) throws IOException {
public static Operation readOperation(final StreamInput input) throws IOException {
final Translog.Operation.Type type = Translog.Operation.Type.fromId(input.readByte());
return switch (type) {
// the de-serialization logic in Index was identical to that of Create when create was deprecated
case CREATE, INDEX -> new Index(input);
case DELETE -> new Delete(input);
case CREATE, INDEX -> Index.readFrom(input);
case DELETE -> Delete.readFrom(input);
case NO_OP -> new NoOp(input);
};
}

/**
* Writes the type and translog operation to the given stream
*/
static void writeOperation(final StreamOutput output, final Operation operation) throws IOException {
output.writeByte(operation.opType().id());
switch (operation.opType()) {
// the serialization logic in Index was identical to that of Create when create was deprecated
case CREATE, INDEX -> ((Index) operation).write(output);
case DELETE -> ((Delete) operation).write(output);
case NO_OP -> ((NoOp) operation).write(output);
default -> throw new AssertionError("no case for [" + operation.opType() + "]");
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeByte(opType().id());
writeBody(out);
}

protected abstract void writeBody(StreamOutput out) throws IOException;
}

public static class Index implements Operation {
public static class Index extends Operation {

public static final int FORMAT_NO_PARENT = 9; // since 7.0
public static final int FORMAT_NO_VERSION_TYPE = FORMAT_NO_PARENT + 1;
Expand All @@ -1142,39 +1146,40 @@ public static class Index implements Operation {

private final String id;
private final long autoGeneratedIdTimestamp;
private final long seqNo;
private final long primaryTerm;
private final long version;
private final BytesReference source;
private final String routing;

private Index(final StreamInput in) throws IOException {
private static Index readFrom(StreamInput in) throws IOException {
final int format = in.readVInt(); // SERIALIZATION_FORMAT
assert format >= FORMAT_NO_PARENT : "format was: " + format;
id = in.readString();
String id = in.readString();
if (format < FORMAT_NO_DOC_TYPE) {
in.readString();
// can't assert that this is _doc because pre-8.0 indexes can have any name for a type
}
source = in.readBytesReference();
routing = in.readOptionalString();
this.version = in.readLong();
BytesReference source = in.readBytesReference();
String routing = in.readOptionalString();
long version = in.readLong();
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // _version_type
}
this.autoGeneratedIdTimestamp = in.readLong();
seqNo = in.readLong();
primaryTerm = in.readLong();
long autoGeneratedIdTimestamp = in.readLong();
long seqNo = in.readLong();
long primaryTerm = in.readLong();
return new Index(id, seqNo, primaryTerm, version, source, routing, autoGeneratedIdTimestamp);
}

public Index(Engine.Index index, Engine.IndexResult indexResult) {
this.id = index.id();
this.source = index.source();
this.routing = index.routing();
this.seqNo = indexResult.getSeqNo();
this.primaryTerm = index.primaryTerm();
this.version = indexResult.getVersion();
this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
this(
index.id(),
indexResult.getSeqNo(),
index.primaryTerm(),
indexResult.getVersion(),
index.source(),
index.routing(),
index.getAutoGeneratedIdTimestamp()
);
}

public Index(
Expand All @@ -1186,10 +1191,9 @@ public Index(
String routing,
long autoGeneratedIdTimestamp
) {
super(seqNo, primaryTerm);
this.id = id;
this.source = source;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.routing = routing;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
Expand All @@ -1216,26 +1220,16 @@ public String routing() {
return this.routing;
}

@Override
public BytesReference source() {
return this.source;
}

@Override
public long seqNo() {
return seqNo;
}

@Override
public long primaryTerm() {
return primaryTerm;
}

public long version() {
return this.version;
}

private void write(final StreamOutput out) throws IOException {
@Override
public void writeBody(final StreamOutput out) throws IOException {
final int format = out.getTransportVersion().onOrAfter(TransportVersion.V_8_0_0)
? SERIALIZATION_FORMAT
: FORMAT_NO_VERSION_TYPE;
Expand Down Expand Up @@ -1309,7 +1303,7 @@ public long getAutoGeneratedIdTimestamp() {

}

public static class Delete implements Operation {
public static class Delete extends Operation {

private static final int FORMAT_6_0 = 4; // 6.0 - *
public static final int FORMAT_NO_PARENT = FORMAT_6_0 + 1; // since 7.0
Expand All @@ -1318,29 +1312,28 @@ public static class Delete implements Operation {
public static final int SERIALIZATION_FORMAT = FORMAT_NO_DOC_TYPE;

private final String id;
private final long seqNo;
private final long primaryTerm;
private final long version;

private Delete(final StreamInput in) throws IOException {
private static Delete readFrom(StreamInput in) throws IOException {
final int format = in.readVInt();// SERIALIZATION_FORMAT
assert format >= FORMAT_6_0 : "format was: " + format;
if (format < FORMAT_NO_DOC_TYPE) {
in.readString();
// Can't assert that this is _doc because pre-8.0 indexes can have any name for a type
}
id = in.readString();
String id = in.readString();
if (format < FORMAT_NO_DOC_TYPE) {
final String docType = in.readString();
assert docType.equals(IdFieldMapper.NAME) : docType + " != " + IdFieldMapper.NAME;
in.readBytesRef(); // uid
}
this.version = in.readLong();
long version = in.readLong();
if (format < FORMAT_NO_VERSION_TYPE) {
in.readByte(); // versionType
}
seqNo = in.readLong();
primaryTerm = in.readLong();
long seqNo = in.readLong();
long primaryTerm = in.readLong();
return new Delete(id, seqNo, primaryTerm, version);
}

public Delete(Engine.Delete delete, Engine.DeleteResult deleteResult) {
Expand All @@ -1353,9 +1346,8 @@ public Delete(String id, long seqNo, long primaryTerm) {
}

public Delete(String id, long seqNo, long primaryTerm, long version) {
super(seqNo, primaryTerm);
this.id = Objects.requireNonNull(id);
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
}

Expand All @@ -1373,26 +1365,12 @@ public String id() {
return id;
}

@Override
public long seqNo() {
return seqNo;
}

@Override
public long primaryTerm() {
return primaryTerm;
}

public long version() {
return this.version;
}

@Override
public BytesReference source() {
throw new IllegalStateException("trying to read doc source from delete operation");
}

private void write(final StreamOutput out) throws IOException {
public void writeBody(final StreamOutput out) throws IOException {
final int format = out.getTransportVersion().onOrAfter(TransportVersion.V_8_0_0)
? SERIALIZATION_FORMAT
: FORMAT_NO_VERSION_TYPE;
Expand Down Expand Up @@ -1439,42 +1417,27 @@ public String toString() {
}
}

public static class NoOp implements Operation {

private final long seqNo;
private final long primaryTerm;
public static class NoOp extends Operation {
private final String reason;

@Override
public long seqNo() {
return seqNo;
}

@Override
public long primaryTerm() {
return primaryTerm;
}

public String reason() {
return reason;
}

private NoOp(final StreamInput in) throws IOException {
seqNo = in.readLong();
primaryTerm = in.readLong();
reason = in.readString();
this(in.readLong(), in.readLong(), in.readString());
}

public NoOp(final long seqNo, final long primaryTerm, final String reason) {
super(seqNo, primaryTerm);
assert seqNo > SequenceNumbers.NO_OPS_PERFORMED;
assert primaryTerm >= 0;
assert reason != null;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.reason = reason;
}

private void write(final StreamOutput out) throws IOException {
@Override
public void writeBody(final StreamOutput out) throws IOException {
out.writeLong(seqNo);
out.writeLong(primaryTerm);
out.writeString(reason);
Expand All @@ -1490,11 +1453,6 @@ public long estimateSize() {
return 2 * reason.length() + 2 * Long.BYTES;
}

@Override
public BytesReference source() {
throw new UnsupportedOperationException("source does not exist for a no-op");
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down Expand Up @@ -1629,7 +1587,7 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
// because closing it closes the underlying stream, which we don't
// want to do here.
out.resetDigest();
Translog.Operation.writeOperation(out, op);
op.writeTo(out);
long checksum = out.getChecksum();
out.writeInt((int) checksum);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ public void testResyncAfterPrimaryPromotion() throws Exception {
translogOperations++;
assertThat("unexpected op: " + next, (int) next.seqNo(), lessThan(initialDocs + extraDocs));
assertThat("unexpected primaryTerm: " + next.primaryTerm(), next.primaryTerm(), is(oldPrimary.getPendingPrimaryTerm()));
assertThat(next.source().utf8ToString(), is("{ \"f\": \"normal\"}"));
assertThat(((Translog.Index) next).source().utf8ToString(), is("{ \"f\": \"normal\"}"));
}
}
assertThat(translogOperations, either(equalTo(initialDocs + extraDocs)).or(equalTo(task.getResyncedOperations())));
Expand Down

0 comments on commit 9347999

Please sign in to comment.