Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce random-access translog #28205

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]translog[/\\]BaseTranslogReader.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]translog[/\\]Translog.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]translog[/\\]TranslogReader.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]translog[/\\]TranslogSnapshot.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]translog[/\\]TranslogWriter.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndexingMemoryController.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]IndicesService.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.index.translog;

import com.carrotsearch.hppc.LongLongMap;
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;

import java.io.IOException;
Expand Down Expand Up @@ -77,15 +78,30 @@ protected final int readSize(ByteBuffer reusableBuffer, long position) throws IO
return size;
}

/**
* Creates a new snapshot.
*
* @return a snapshot
*/
public TranslogSnapshot newSnapshot() {
return new TranslogSnapshot(this, sizeInBytes());
}

/**
* Creates a new random-access snapshot using the specified index to access the underlying snapshot by sequence number.
*
* @param index the random-access index for this snapshot
* @return a random-access snapshot
*/
public RandomAccessTranslogSnapshot newRandomAccessSnapshot(final LongLongMap index) {
return new RandomAccessTranslogSnapshot(this, sizeInBytes(), index);
}

/**
* reads an operation at the given position and returns it. The buffer length is equal to the number
* of bytes reads.
*/
protected final BufferedChecksumStreamInput checksummedStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException {
final BufferedChecksumStreamInput checksumStream(ByteBuffer reusableBuffer, long position, int opSize, BufferedChecksumStreamInput reuse) throws IOException {
final ByteBuffer buffer;
if (reusableBuffer.capacity() >= opSize) {
buffer = reusableBuffer;
Expand All @@ -106,7 +122,7 @@ protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws I
/**
* reads bytes at position into the given buffer, filling it.
*/
protected abstract void readBytes(ByteBuffer buffer, long position) throws IOException;
protected abstract void readBytes(ByteBuffer buffer, long position) throws IOException;

@Override
public String toString() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ public int overriddenOperations() {
public Translog.Operation next() throws IOException {
for (; index >= 0; index--) {
final TranslogSnapshot current = translogs[index];
Translog.Operation op;
while ((op = current.next()) != null) {
Translog.OperationWithPosition it;
while ((it = current.next()) != null) {
final Translog.Operation op = it.operation();
if (op.seqNo() == SequenceNumbers.UNASSIGNED_SEQ_NO || seenSeqNo.getAndSet(op.seqNo()) == false) {
return op;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import java.io.Closeable;
import java.io.IOException;

/**
* A random-access multi-snapshot providing random access to a collection of indexed snapshots.
*/
final class RandomAccessMultiSnapshot implements Translog.RandomAccessSnapshot {

private final RandomAccessTranslogSnapshot[] snapshots;
private final Closeable onClose;

/**
* Creates a new random-access multi-snapshot from the specified random-access snapshots.
*
* @param snapshots the random-access snapshots to wrap in this multi-snapshot
* @param onClose resource to close when this snapshot is closed
*/
RandomAccessMultiSnapshot(final RandomAccessTranslogSnapshot[] snapshots, final Closeable onClose) {
this.snapshots = snapshots;
this.onClose = onClose;
}

@Override
public Translog.Operation operation(final long seqNo) throws IOException {
for (int i = snapshots.length - 1; i >= 0; i--) {
final Translog.Operation operation = snapshots[i].operation(seqNo);
if (operation != null) {
return operation;
}
}
return null;
}

@Override
public void close() throws IOException {
onClose.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.translog;

import com.carrotsearch.hppc.LongLongMap;

import java.io.IOException;

/**
* A random-access iterator over a fixed snapshot of a single translog generation.
*/
final class RandomAccessTranslogSnapshot extends TranslogSnapshotReader {

private final LongLongMap index;

/**
* Create a snapshot of the translog file channel. This gives a random-access iterator over the operations in the snapshot, indexed by
* the specified map from sequence number to position.
*
* @param reader the underlying reader
* @param length the size in bytes of the underlying snapshot
* @param index the random-access index from sequence number to position for this snapshot
*/
RandomAccessTranslogSnapshot(final BaseTranslogReader reader, final long length, final LongLongMap index) {
super(reader, length);
this.index = index;
}

Translog.Operation operation(final long seqNo) throws IOException {
if (index.containsKey(seqNo)) {
return readOperation(index.get(seqNo));
} else {
return null;
}
}

}
Loading