Skip to content

Commit

Permalink
Introduce SortMergeReader (apache#12)
Browse files Browse the repository at this point in the history
* Introduce SortMergeReader to perform a merge sort on the sorted reader list.

* Fix several typos.
  • Loading branch information
LadyForest committed Dec 2, 2021
1 parent 80ff29c commit 94b4f25
Show file tree
Hide file tree
Showing 7 changed files with 684 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ public enum CombinePolicy implements Serializable {
* number.
*/
public RecordReader<KeyValue> combine(
RecordReader<KeyValue> iterator,
RecordReader<KeyValue> reader,
Comparator<RowData> comparator,
RowDataSerializer userKeySerializer,
RowDataSerializer valueSerializer) {
switch (this) {
case VALUE_COUNT:
return new ValueCountCombineReader(
iterator, comparator, userKeySerializer, valueSerializer);
reader, comparator, userKeySerializer, valueSerializer);
case DEDUPLICATE:
return new DeduplicateCombineReader(
iterator, comparator, userKeySerializer, valueSerializer);
reader, comparator, userKeySerializer, valueSerializer);
default:
throw new UnsupportedOperationException("Unsupported strategy: " + this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.storage.filestore.combine;

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.storage.filestore.KeyValue;
import org.apache.flink.table.storage.filestore.utils.RecordIterator;
import org.apache.flink.table.storage.filestore.utils.RecordReader;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/**
 * A {@link RecordReader} that merges a list of {@link RecordReader}s into a single stream using a
 * sort-merge algorithm. Every input reader must already be sorted by user key and sequence number.
 * Values are not combined at this phase: records with the same key are emitted one after another,
 * ordered by sequence number.
 *
 * <p>A merged batch ends as soon as the current batch of any input reader is exhausted, because
 * the merge cannot continue safely before that reader has been asked for its next batch. The next
 * call to {@link #readBatch()} refills that reader and hands out a new iterator.
 */
public class SortMergeReader implements RecordReader<KeyValue> {

    private final List<RecordReader<KeyValue>> sortedReaders;
    private final Comparator<RowData> userKeyComparator;

    /** Min-heap over the input iterators; created lazily by the first {@link #readBatch()}. */
    private PriorityQueue<Element> minHeap;

    /** The element whose current record is exposed by the merged iterator. */
    private Element minElement;

    /**
     * The element whose batch ran out during iteration and which is therefore no longer in the
     * heap. It is refilled (or dropped for good) by the next {@link #readBatch()}; {@code null}
     * when nothing is waiting for a refill.
     */
    private Element exhaustedElement;

    public SortMergeReader(
            List<RecordReader<KeyValue>> sortedReaders, Comparator<RowData> userKeyComparator) {
        this.sortedReaders = sortedReaders;
        this.userKeyComparator = userKeyComparator;
    }

    /**
     * Returns an iterator over the next merged batch, or {@code null} when all input readers are
     * exhausted. Calling this method again after it returned {@code null} keeps returning {@code
     * null}.
     */
    @Nullable
    @Override
    public RecordIterator<KeyValue> readBatch() throws IOException {
        if (sortedReaders.isEmpty()) {
            return null;
        }
        if (minHeap == null) {
            initElements();
        } else {
            supplyElements();
        }
        return minHeap.isEmpty() ? null : new SortMergeIterator();
    }

    /**
     * Closes every input reader. A failure while closing one reader does not prevent the remaining
     * readers from being closed; the first {@link IOException} is rethrown afterwards with any
     * later ones attached as suppressed exceptions.
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;
        for (RecordReader<KeyValue> reader : sortedReaders) {
            try {
                reader.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Creates the heap and seeds it with the first record of every non-empty input reader. */
    private void initElements() throws IOException {
        minHeap = new PriorityQueue<>(this::compareElements);
        for (int i = 0; i < sortedReaders.size(); i++) {
            RecordIterator<KeyValue> iterator = nextNonEmptyBatch(sortedReaders.get(i));
            if (iterator != null) {
                minHeap.offer(new Element(iterator, i));
            }
        }
    }

    /**
     * Refills the reader whose batch was exhausted by the previous merged iterator, if any. Does
     * nothing when no element is waiting for a refill, e.g. when the input has ended or the
     * previous iterator was not iterated to its end.
     */
    private void supplyElements() throws IOException {
        Element element = exhaustedElement;
        if (element == null) {
            return;
        }
        exhaustedElement = null;
        // the caller asked for the next merged batch, so the finished inner batch can be recycled
        element.iterator.releaseBatch();
        RecordIterator<KeyValue> iterator = nextNonEmptyBatch(sortedReaders.get(element.index));
        if (iterator != null) {
            element.iterator = iterator;
            minHeap.offer(element);
        }
    }

    /**
     * Reads batches from the given reader until one with at least one record is found. Batches
     * without any record are released and skipped, so that an empty batch is not mistaken for the
     * end of the reader.
     *
     * @return an iterator already positioned on its first record, or {@code null} if the reader
     *     has no more batches
     */
    @Nullable
    private static RecordIterator<KeyValue> nextNonEmptyBatch(RecordReader<KeyValue> reader)
            throws IOException {
        RecordIterator<KeyValue> iterator;
        while ((iterator = reader.readBatch()) != null) {
            if (iterator.advanceNext()) {
                return iterator;
            }
            // no record was handed out from this batch, releasing it right away is safe
            iterator.releaseBatch();
        }
        return null;
    }

    /** Orders elements by the user key of their current record, then by sequence number. */
    private int compareElements(Element e1, Element e2) {
        KeyValue left = e1.iterator.current();
        KeyValue right = e2.iterator.current();
        int result = userKeyComparator.compare(left.key(), right.key());
        if (result != 0) {
            return result;
        }
        return Long.compare(left.sequenceNumber(), right.sequenceNumber());
    }

    /** The iterator iterates on {@link SortMergeReader}. */
    private class SortMergeIterator implements RecordIterator<KeyValue> {

        /**
         * Every element in the heap is already positioned on a record that has not been emitted
         * yet, so the very first call must only expose the heap top without advancing anything.
         */
        private boolean justPeek = true;

        @Override
        public boolean advanceNext() throws IOException {
            // once an inner batch ran out, this merged batch is over until readBatch() refills it;
            // polling further here would silently skip a record
            if (exhaustedElement != null || minHeap.isEmpty()) {
                return false;
            }
            if (justPeek) {
                // keep pace with the inner iterators
                justPeek = false;
                minElement = minHeap.peek();
                return true;
            }
            // the heap top is the element whose record was emitted last, move it forward
            minElement = minHeap.poll();
            if (minElement.iterator.advanceNext()) {
                minHeap.offer(minElement);
                minElement = minHeap.peek();
                return true;
            }
            // reach the end of current batch, should invoke the next batch
            exhaustedElement = minElement;
            return false;
        }

        @Override
        public boolean singleInstance() {
            return false;
        }

        @Override
        public KeyValue current() {
            return minElement.iterator.current();
        }

        /** Inner batches are released by the enclosing reader, so nothing to do here. */
        @Override
        public void releaseBatch() {}
    }

    /**
     * A POJO class composed of a {@link RecordIterator} with the index in the input sorted readers.
     * The index helps track which reader should invoke readBatch operation.
     */
    private static class Element {

        private final int index;

        private RecordIterator<KeyValue> iterator;

        private Element(RecordIterator<KeyValue> iterator, int index) {
            this.iterator = iterator;
            this.index = index;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface RecordIterator<T> {

/**
* Releases the batch that this iterator iterated over. This is not supposed to close the reader
* and its resources, but is simply a signal that this iterator is no used any more. This method
* and its resources, but is simply a signal that this iterator is not used anymore. This method
* can be used as a hook to recycle/reuse heavyweight object structures.
*/
void releaseBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,17 @@

package org.apache.flink.table.storage.filestore.combine;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.storage.filestore.KeyValue;
import org.apache.flink.table.storage.filestore.ValueKind;
import org.apache.flink.table.storage.filestore.utils.RecordIterator;
import org.apache.flink.table.storage.filestore.utils.RecordReader;
import org.apache.flink.table.storage.filestore.utils.TestRecordReader;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.storage.filestore.ValueKind.ADD;
import static org.apache.flink.table.storage.filestore.ValueKind.DELETE;
import static org.apache.flink.table.storage.filestore.utils.TestRecordReader.USER_KEY_COMPARATOR;
import static org.apache.flink.table.storage.filestore.utils.TestRecordReader.USER_KEY_SERIALIZER;
import static org.apache.flink.table.storage.filestore.utils.TestRecordReader.VALUE_SERIALIZER;
import static org.apache.flink.table.storage.filestore.utils.TestRecordReader.testCombine;

/** Test for {@link DeduplicateCombineReader}. */
public class DeduplicateCombineReaderTest {
Expand All @@ -49,89 +37,74 @@ public class DeduplicateCombineReaderTest {

@Test
public void testEmpty() throws IOException {
doTest(Collections.emptyList(), Collections.emptyList());
testCombine(
CombinePolicy.DEDUPLICATE,
Collections.emptyList(),
Collections.emptyList(),
BATCH_SIZE);
}

@Test
public void testDelete() throws IOException {
doTest(
Arrays.asList(new Tuple2<>(1, ADD), new Tuple2<>(1, ADD), new Tuple2<>(1, DELETE)),
Collections.singletonList(new Tuple2<>(1, DELETE)));
testCombine(
CombinePolicy.DEDUPLICATE,
Arrays.asList(
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(1, DELETE)),
Collections.singletonList(new TestRecordReader.TestKeyValue(1, DELETE)),
BATCH_SIZE);
}

@Test
public void testCrossBatch() throws IOException {
doTest(
testCombine(
CombinePolicy.DEDUPLICATE,
Arrays.asList(
new Tuple2<>(1, ADD),
new Tuple2<>(1, ADD),
new Tuple2<>(1, ADD),
new Tuple2<>(1, DELETE)),
Collections.singletonList(new Tuple2<>(1, DELETE)));
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(1, DELETE)),
Collections.singletonList(new TestRecordReader.TestKeyValue(1, DELETE)),
BATCH_SIZE);
}

@Test
public void testMultipleKeys() throws IOException {
doTest(
testCombine(
CombinePolicy.DEDUPLICATE,
Arrays.asList(
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(2, ADD),
new TestRecordReader.TestKeyValue(2, DELETE),
new TestRecordReader.TestKeyValue(3, ADD)),
Arrays.asList(
new Tuple2<>(1, ADD),
new Tuple2<>(2, ADD),
new Tuple2<>(2, DELETE),
new Tuple2<>(3, ADD)),
Arrays.asList(new Tuple2<>(1, ADD), new Tuple2<>(2, DELETE), new Tuple2<>(3, ADD)));
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(2, DELETE),
new TestRecordReader.TestKeyValue(3, ADD)),
BATCH_SIZE);
}

@Test
public void testMultipleKeysCross() throws IOException {
doTest(
testCombine(
CombinePolicy.DEDUPLICATE,
Arrays.asList(
new Tuple2<>(1, ADD),
new Tuple2<>(2, ADD),
new Tuple2<>(2, ADD),
new Tuple2<>(2, DELETE),
new Tuple2<>(3, ADD),
new Tuple2<>(4, ADD),
new Tuple2<>(5, ADD),
new Tuple2<>(6, ADD)),
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(2, ADD),
new TestRecordReader.TestKeyValue(2, ADD),
new TestRecordReader.TestKeyValue(2, DELETE),
new TestRecordReader.TestKeyValue(3, ADD),
new TestRecordReader.TestKeyValue(4, ADD),
new TestRecordReader.TestKeyValue(5, ADD),
new TestRecordReader.TestKeyValue(6, ADD)),
Arrays.asList(
new Tuple2<>(1, ADD),
new Tuple2<>(2, DELETE),
new Tuple2<>(3, ADD),
new Tuple2<>(4, ADD),
new Tuple2<>(5, ADD),
new Tuple2<>(6, ADD)));
}

private void doTest(
List<Tuple2<Integer, ValueKind>> inputs, List<Tuple2<Integer, ValueKind>> expected)
throws IOException {
TestRecordReader reader =
new TestRecordReader(
inputs.stream()
.map(
tuple2 ->
new Tuple3<>(
tuple2.f0,
tuple2.f1,
Long.valueOf(tuple2.f0)))
.collect(Collectors.toList()),
BATCH_SIZE);
RecordReader<KeyValue> combineReader =
CombinePolicy.DEDUPLICATE.combine(
reader, USER_KEY_COMPARATOR, USER_KEY_SERIALIZER, VALUE_SERIALIZER);

List<Tuple2<Integer, ValueKind>> values = new ArrayList<>();
RecordIterator<KeyValue> batch;
while ((batch = combineReader.readBatch()) != null) {
while (batch.advanceNext()) {
values.add(
new Tuple2<>(batch.current().key().getInt(0), batch.current().valueKind()));
}
}

combineReader.close();
Assert.assertTrue(reader.isClosed());

Assert.assertEquals(expected, values);
new TestRecordReader.TestKeyValue(1, ADD),
new TestRecordReader.TestKeyValue(2, DELETE),
new TestRecordReader.TestKeyValue(3, ADD),
new TestRecordReader.TestKeyValue(4, ADD),
new TestRecordReader.TestKeyValue(5, ADD),
new TestRecordReader.TestKeyValue(6, ADD)),
BATCH_SIZE);
}
}

0 comments on commit 94b4f25

Please sign in to comment.