MINOR: Replace deepIterator/shallowIterator with deepEntries/shallowEntries

The latter return `Iterable` instead of `Iterator` so that Java's enhanced for loop can be used.
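
As a quick illustration of the motivation, under the assumption that `Records`, `LogEntry`, and the new `deepEntries()` method behave as shown in the diff below (the `DeepEntriesExample` class and `printOffsets` method here are hypothetical, not part of the commit):

import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Records;

class DeepEntriesExample {
    // Before this commit, deepIterator() returned an Iterator, forcing callers to write:
    //     Iterator<LogEntry> it = records.deepIterator();
    //     while (it.hasNext()) {
    //         LogEntry entry = it.next();
    //         System.out.println(entry.offset());
    //     }

    // After this commit, deepEntries() returns an Iterable, so for-each works directly:
    static void printOffsets(Records records) {
        for (LogEntry entry : records.deepEntries())
            System.out.println(entry.offset());
    }
}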

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #2261 from ijuma/deepEntries-shallowEntries
ijuma authored and hachikuji committed Dec 16, 2016
1 parent e55205b commit b58b6a1
Showing 38 changed files with 241 additions and 224 deletions.
Fetcher.java
@@ -61,7 +61,6 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
@@ -687,9 +686,7 @@ private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
                }

                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
-               Iterator<LogEntry> deepIterator = partition.records.deepIterator();
-               while (deepIterator.hasNext()) {
-                   LogEntry logEntry = deepIterator.next();
+               for (LogEntry logEntry : partition.records.deepEntries()) {
                    // Skip the messages earlier than current position.
                    if (logEntry.offset() >= position) {
                        parsed.add(parseRecord(tp, logEntry));
AbstractRecords.java
@@ -16,19 +16,37 @@
  **/
 package org.apache.kafka.common.record;

-import org.apache.kafka.common.utils.AbstractIterator;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;

 public abstract class AbstractRecords implements Records {

+    private final Iterable<Record> records = new Iterable<Record>() {
+        @Override
+        public Iterator<Record> iterator() {
+            return new Iterator<Record>() {
+                private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
+                @Override
+                public boolean hasNext() {
+                    return deepEntries.hasNext();
+                }
+                @Override
+                public Record next() {
+                    return deepEntries.next().record();
+                }
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException("Removal not supported");
+                }
+            };
+        }
+    };
+
     @Override
     public boolean hasMatchingShallowMagic(byte magic) {
-        Iterator<? extends LogEntry> iterator = shallowIterator();
-        while (iterator.hasNext())
-            if (iterator.next().magic() != magic)
+        for (LogEntry entry : shallowEntries())
+            if (entry.magic() != magic)
                 return false;
         return true;
     }
@@ -39,11 +57,8 @@ public boolean hasMatchingShallowMagic(byte magic) {
     @Override
     public Records toMessageFormat(byte toMagic) {
         List<LogEntry> converted = new ArrayList<>();
-        Iterator<LogEntry> deepIterator = deepIterator();
-        while (deepIterator.hasNext()) {
-            LogEntry entry = deepIterator.next();
+        for (LogEntry entry : deepEntries())
             converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
-        }

         if (converted.isEmpty()) {
             // This indicates that the message is too large, which indicates that the buffer is not large
@@ -60,7 +75,7 @@ public Records toMessageFormat(byte toMagic) {
             // cause some timestamp information to be lost (e.g. if the timestamp type was changed) since
             // we are essentially merging multiple message sets. However, currently this method is only
             // used for down-conversion, so we've ignored the problem.
-            CompressionType compressionType = shallowIterator().next().record().compressionType();
+            CompressionType compressionType = shallowEntries().iterator().next().record().compressionType();
             return MemoryRecords.withLogEntries(compressionType, converted);
         }
     }
@@ -77,16 +92,8 @@ public static int estimatedSize(CompressionType compressionType, Iterable<LogEnt
      * Get an iterator over the deep records.
      * @return An iterator over the records
      */
-    public Iterator<Record> records() {
-        return new AbstractIterator<Record>() {
-            private final Iterator<? extends LogEntry> deepEntries = deepIterator();
-            @Override
-            protected Record makeNext() {
-                if (deepEntries.hasNext())
-                    return deepEntries.next().record();
-                return allDone();
-            }
-        };
+    public Iterable<Record> records() {
+        return records;
     }

 }
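
The anonymous-class wrapping seen above in `records()` is the pattern this commit applies throughout. A minimal self-contained sketch of the same idea, using a hypothetical `NumberSource` class that is not part of Kafka: any method that can hand out a fresh `Iterator` per call can be exposed as an `Iterable`, which is all the enhanced for loop needs. An anonymous class is used rather than a lambda, presumably because the clients module still needed to compile on Java 7 at the time.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class NumberSource {
    private final List<Integer> values = Arrays.asList(1, 2, 3);

    // Iterator-producing method, analogous to the private deepIterator() above.
    private Iterator<Integer> numberIterator() {
        return values.iterator();
    }

    // Iterable wrapper, analogous to deepEntries(): each call to iterator()
    // produces a fresh Iterator, so the Iterable can be iterated repeatedly.
    public Iterable<Integer> numbers() {
        return new Iterable<Integer>() {
            @Override
            public Iterator<Integer> iterator() {
                return numberIterator();
            }
        };
    }

    public static void main(String[] args) {
        NumberSource source = new NumberSource();
        for (int n : source.numbers())  // enhanced for loop needs an Iterable
            System.out.println(n);
        for (int n : source.numbers())  // reusable: this loop gets a fresh Iterator
            System.out.println(n);
    }
}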
FileRecords.java
@@ -44,6 +44,15 @@ public class FileRecords extends AbstractRecords implements Closeable {
     private volatile File file;
     private final AtomicInteger size;

+    private final Iterable<FileChannelLogEntry> shallowEntries;
+
+    private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() {
+        @Override
+        public Iterator<LogEntry> iterator() {
+            return deepIterator();
+        }
+    };
+
     public FileRecords(File file,
                        FileChannel channel,
                        int start,
@@ -58,6 +67,8 @@ public FileRecords(File file,

         // set the initial size of the buffer
         resize();
+
+        shallowEntries = shallowEntriesFrom(start);
     }

     public void resize() throws IOException {
@@ -246,9 +257,7 @@ public long writeTo(GatheringByteChannel destChannel, long offset, int length) t
      * @param startingPosition The starting position in the file to begin searching from.
      */
     public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
-        Iterator<FileChannelLogEntry> iterator = shallowIteratorFrom(Integer.MAX_VALUE, startingPosition);
-        while (iterator.hasNext()) {
-            FileChannelLogEntry entry = iterator.next();
+        for (FileChannelLogEntry entry : shallowEntriesFrom(startingPosition)) {
             long offset = entry.offset();
             if (offset >= targetOffset)
                 return new LogEntryPosition(offset, entry.position(), entry.sizeInBytes());
@@ -264,9 +273,7 @@ public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingP
      * @return The timestamp and offset of the message found. None, if no message is found.
      */
    public TimestampAndOffset searchForTimestamp(long targetTimestamp, int startingPosition) {
-        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
-        while (shallowIterator.hasNext()) {
-            LogEntry shallowEntry = shallowIterator.next();
+        for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
             Record shallowRecord = shallowEntry.record();
             if (shallowRecord.timestamp() >= targetTimestamp) {
                 // We found a message
@@ -292,9 +299,7 @@ public TimestampAndOffset largestTimestampAfter(int startingPosition) {
         long maxTimestamp = Record.NO_TIMESTAMP;
         long offsetOfMaxTimestamp = -1L;

-        Iterator<FileChannelLogEntry> shallowIterator = shallowIteratorFrom(startingPosition);
-        while (shallowIterator.hasNext()) {
-            LogEntry shallowEntry = shallowIterator.next();
+        for (LogEntry shallowEntry : shallowEntriesFrom(startingPosition)) {
             long timestamp = shallowEntry.record().timestamp();
             if (timestamp > maxTimestamp) {
                 maxTimestamp = timestamp;
@@ -311,24 +316,33 @@ public TimestampAndOffset largestTimestampAfter(int startingPosition) {
      * @return An iterator over the shallow entries
      */
     @Override
-    public Iterator<FileChannelLogEntry> shallowIterator() {
-        return shallowIteratorFrom(start);
+    public Iterable<FileChannelLogEntry> shallowEntries() {
+        return shallowEntries;
     }

     /**
      * Get an iterator over the shallow entries, enforcing a maximum record size
      * @param maxRecordSize The maximum allowable size of individual records (including compressed record sets)
      * @return An iterator over the shallow entries
      */
-    public Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize) {
-        return shallowIteratorFrom(maxRecordSize, start);
+    public Iterable<FileChannelLogEntry> shallowEntries(int maxRecordSize) {
+        return shallowEntries(maxRecordSize, start);
     }

-    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int start) {
-        return shallowIteratorFrom(Integer.MAX_VALUE, start);
+    private Iterable<FileChannelLogEntry> shallowEntriesFrom(int start) {
+        return shallowEntries(Integer.MAX_VALUE, start);
     }

-    private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int start) {
+    private Iterable<FileChannelLogEntry> shallowEntries(final int maxRecordSize, final int start) {
+        return new Iterable<FileChannelLogEntry>() {
+            @Override
+            public Iterator<FileChannelLogEntry> iterator() {
+                return shallowIterator(maxRecordSize, start);
+            }
+        };
+    }
+
+    private Iterator<FileChannelLogEntry> shallowIterator(int maxRecordSize, int start) {
         final int end;
         if (isSlice)
             end = this.end;
@@ -339,7 +353,11 @@ private Iterator<FileChannelLogEntry> shallowIteratorFrom(int maxRecordSize, int
     }

     @Override
-    public Iterator<LogEntry> deepIterator() {
+    public Iterable<LogEntry> deepEntries() {
+        return deepEntries;
+    }
+
+    private Iterator<LogEntry> deepIterator() {
         final int end;
         if (isSlice)
             end = this.end;
MemoryRecords.java
@@ -31,8 +31,17 @@ public class MemoryRecords extends AbstractRecords {

     public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

-    // the underlying buffer used for read; while the records are still writable it is null
-    private ByteBuffer buffer;
+    private final ByteBuffer buffer;
+
+    private final Iterable<ByteBufferLogEntry> shallowEntries = new Iterable<ByteBufferLogEntry>() {
+        @Override
+        public Iterator<ByteBufferLogEntry> iterator() {
+            return shallowIterator();
+        }
+    };
+
+    private final Iterable<LogEntry> deepEntries = deepEntries(false);
+
     private int validBytes = -1;

     // Construct a writable memory records
@@ -79,9 +88,7 @@ public int validBytes() {
             return validBytes;

         int bytes = 0;
-        Iterator<ByteBufferLogEntry> iterator = shallowIterator();
-        while (iterator.hasNext())
-            bytes += iterator.next().sizeInBytes();
+        for (LogEntry entry : shallowEntries())
+            bytes += entry.sizeInBytes();

         this.validBytes = bytes;
         return bytes;
@@ -102,9 +110,7 @@ public FilterResult filterTo(LogEntryFilter filter, ByteBuffer buffer) {
         int messagesRetained = 0;
         int bytesRetained = 0;

-        Iterator<ByteBufferLogEntry> shallowIterator = shallowIterator();
-        while (shallowIterator.hasNext()) {
-            ByteBufferLogEntry shallowEntry = shallowIterator.next();
+        for (ByteBufferLogEntry shallowEntry : shallowEntries()) {
             bytesRead += shallowEntry.sizeInBytes();

             // We use the absolute offset to decide whether to retain the message or not (this is handled by the
@@ -174,27 +180,36 @@ public ByteBuffer buffer() {
     }

     @Override
-    public Iterator<ByteBufferLogEntry> shallowIterator() {
+    public Iterable<ByteBufferLogEntry> shallowEntries() {
+        return shallowEntries;
+    }
+
+    private Iterator<ByteBufferLogEntry> shallowIterator() {
         return RecordsIterator.shallowIterator(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
     }

     @Override
-    public Iterator<LogEntry> deepIterator() {
-        return deepIterator(false);
+    public Iterable<LogEntry> deepEntries() {
+        return deepEntries;
     }

-    public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic) {
-        return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+    public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) {
+        return new Iterable<LogEntry>() {
+            @Override
+            public Iterator<LogEntry> iterator() {
+                return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+            }
+        };
     }

-    public Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+    private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
         return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
                 ensureMatchingMagic, maxMessageSize);
     }

     @Override
     public String toString() {
-        Iterator<LogEntry> iter = deepIterator();
+        Iterator<LogEntry> iter = deepEntries().iterator();
         StringBuilder builder = new StringBuilder();
         builder.append('[');
         while (iter.hasNext()) {
Records.java
@@ -18,14 +18,13 @@

 import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
-import java.util.Iterator;

 /**
  * Interface for accessing the records contained in a log. The log itself is represented as a sequence of log entries.
  * Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
  * If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
  * compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
- * over the shallow records, use {@link #shallowIterator()}; for the deep records, use {@link #deepIterator()}. Note
+ * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note
  * that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
  * shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
  * See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
@@ -61,16 +60,16 @@ public interface Records {
      * record data (see {@link FileLogInputStream.FileChannelLogEntry#magic()}.
      * @return An iterator over the shallow entries of the log
      */
-    Iterator<? extends LogEntry> shallowIterator();
+    Iterable<? extends LogEntry> shallowEntries();

     /**
      * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
      * there are fewer options for optimization since the data must be decompressed before it can be
      * returned. Hence there is little advantage in allowing subclasses to return a more specific type
-     * as we do for {@link #shallowIterator()}.
+     * as we do for {@link #shallowEntries()}.
      * @return An iterator over the deep entries of the log
      */
-    Iterator<LogEntry> deepIterator();
+    Iterable<LogEntry> deepEntries();

     /**
      * Check whether all shallow entries in this buffer have a certain magic value.
RecordAccumulatorTest.java
@@ -101,7 +101,7 @@ public void testFull() throws Exception {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);

-        Iterator<LogEntry> iter = batch.records().deepIterator();
+        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
         for (int i = 0; i < appends; i++) {
             LogEntry entry = iter.next();
             assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -130,7 +130,7 @@ public void testLinger() throws Exception {
         assertEquals(1, batches.size());
         RecordBatch batch = batches.get(0);

-        Iterator<LogEntry> iter = batch.records().deepIterator();
+        Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
         LogEntry entry = iter.next();
         assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
         assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -182,11 +182,8 @@ public void run() {
                     List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
                     if (batches != null) {
                         for (RecordBatch batch : batches) {
-                            Iterator<LogEntry> deepEntries = batch.records().deepIterator();
-                            while (deepEntries.hasNext()) {
-                                deepEntries.next();
+                            for (LogEntry entry : batch.records().deepEntries())
                                 read++;
-                            }
                             accum.deallocate(batch);
                         }
                     }
