From 9223fc96fd15db96aaafae883d19b7a5e6068d8b Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Fri, 5 Dec 2025 17:14:38 +0000 Subject: [PATCH 1/8] Shuffle the files for ServerSide iterators Shuffle the mapFile iterators before the MultiIterator is created to avoid block cache contention. --- .../org/apache/accumulo/tserver/tablet/ScanDataSource.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 6bcfbc4b9e0..1a1ed35681c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -177,8 +177,10 @@ private SortedKeyValueIterator createIterator() files = reservation.getSecond(); } - Collection mapfiles = + List mapfiles = fileManager.openFiles(files, scanParams.isIsolated(), samplerConfig); + // Randomize the ordering of files to avoid block cache contention on seeks + Collections.shuffle(mapfiles); List.of(mapfiles, memIters).forEach(c -> c.forEach(ii -> ii.setInterruptFlag(interruptFlag))); From 28c8f09b2db3679197feda4e880ba8a0b26dcfa4 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Fri, 5 Dec 2025 21:52:39 +0000 Subject: [PATCH 2/8] Modified MultiIterator instead of ScanDataSource Realized the changes could be moved down to MultiIterator so the behavior was similiar for deepCopies as well as constructed MultiIterators. Added a test for deepCopy since one did not exist --- .../iteratorsImpl/system/MultiIterator.java | 3 ++ .../iterators/system/MultiIteratorTest.java | 49 +++++++++++++++++++ .../tserver/tablet/ScanDataSource.java | 6 +-- 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java index 3b3defc5462..7ec8bfb6908 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -50,6 +51,7 @@ private MultiIterator(MultiIterator other, IteratorEnvironment env) { super(other.iters.size()); this.iters = new ArrayList<>(); this.fence = other.fence; + Collections.shuffle(other.iters); for (SortedKeyValueIterator iter : other.iters) { iters.add(iter.deepCopy(env)); } @@ -73,6 +75,7 @@ private MultiIterator(List> iters, Range seekF } this.fence = seekFence; + Collections.shuffle(iters); this.iters = iters; if (init) { diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index c9a52242dc2..933f3c5eb96 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -422,4 +422,53 @@ public void test7() throws IOException { mi.seek(r7, EMPTY_COL_FAMS, false); assertFalse(mi.hasTop()); } + + @Test + public void testDeepCopy() throws IOException { + // TEst setting an endKey + TreeMap tm1 = new TreeMap<>(); + newKeyValue(tm1, 0, 3, false, "1"); + newKeyValue(tm1, 0, 2, false, "2"); + newKeyValue(tm1, 0, 1, false, "3"); + newKeyValue(tm1, 0, 0, false, "4"); + newKeyValue(tm1, 1, 2, false, "5"); + newKeyValue(tm1, 1, 1, false, "6"); + newKeyValue(tm1, 1, 0, false, "7"); + newKeyValue(tm1, 2, 1, false, "8"); + newKeyValue(tm1, 2, 0, false, "9"); + + List> skvil = new ArrayList<>(1); + skvil.add(new SortedMapIterator(tm1)); + + KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); + + MultiIterator mi = new MultiIterator(skvil, extent); + MultiIterator miCopy = mi.deepCopy(null); + + Range r1 = new Range((Text) null, (Text) null); + mi.seek(r1, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("6", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("7", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + miCopy.seek(r1, EMPTY_COL_FAMS, false); + + assertTrue(miCopy.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + miCopy.next(); + assertTrue(miCopy.hasTop()); + assertEquals("6", miCopy.getTopValue().toString()); + miCopy.next(); + assertTrue(miCopy.hasTop()); + assertEquals("7", miCopy.getTopValue().toString()); + miCopy.next(); + assertFalse(miCopy.hasTop()); + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 1a1ed35681c..6bcfbc4b9e0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -177,10 +177,8 @@ private SortedKeyValueIterator createIterator() files = reservation.getSecond(); } - List mapfiles = + Collection mapfiles = fileManager.openFiles(files, scanParams.isIsolated(), samplerConfig); - // Randomize the ordering of files to avoid block cache contention on seeks - Collections.shuffle(mapfiles); List.of(mapfiles, memIters).forEach(c -> c.forEach(ii -> ii.setInterruptFlag(interruptFlag))); From c67d7a0ce5cac2b8079e076d155a06eaa5ba5564 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Fri, 5 Dec 2025 21:55:26 +0000 Subject: [PATCH 3/8] formatting change --- .../accumulo/core/iterators/system/MultiIteratorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index 933f3c5eb96..26f0e2a6289 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -426,7 +426,7 @@ public void test7() throws IOException { @Test public void testDeepCopy() throws IOException { // TEst setting an endKey - TreeMap tm1 = new TreeMap<>(); + TreeMap tm1 = new TreeMap<>(); newKeyValue(tm1, 0, 3, false, "1"); newKeyValue(tm1, 0, 2, false, "2"); newKeyValue(tm1, 0, 1, false, "3"); @@ -437,7 +437,7 @@ public void testDeepCopy() throws IOException { newKeyValue(tm1, 2, 1, false, "8"); newKeyValue(tm1, 2, 0, false, "9"); - List> skvil = new ArrayList<>(1); + List> skvil = new ArrayList<>(1); skvil.add(new SortedMapIterator(tm1)); KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); From f3438ce774e98fd9c04914a2889ed5b93f6bd3eb Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Tue, 16 Dec 2025 20:46:33 -0500 Subject: [PATCH 4/8] Update core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java Co-authored-by: Keith Turner --- .../accumulo/core/iterators/system/MultiIteratorTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index 26f0e2a6289..9ac59c127eb 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -447,11 +447,17 @@ public void testDeepCopy() throws IOException { Range r1 = new Range((Text) null, (Text) null); mi.seek(r1, EMPTY_COL_FAMS, false); + miCopy.seek(r1, EMPTY_COL_FAMS, false); assertTrue(mi.hasTop()); assertEquals("5", mi.getTopValue().toString()); mi.next(); assertTrue(mi.hasTop()); assertEquals("6", mi.getTopValue().toString()); + assertTrue(miCopy.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + . + . + . mi.next(); assertTrue(mi.hasTop()); assertEquals("7", mi.getTopValue().toString()); From bb6102a6f1597b7d0e42bfe184d1cc7c37cb4cec Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Wed, 17 Dec 2025 03:34:14 +0000 Subject: [PATCH 5/8] Adds shuffle prop and fileManager shuffling Adds the table property for shuffling files. Adds shuffling for files in the FileManager. Moves the shuffling logic into a separate Iterator class and changes the ScanDataSource code to select the specific iterator class. --- .../core/client/rfile/RFileScanner.java | 15 +- .../apache/accumulo/core/conf/Property.java | 3 + .../iteratorsImpl/system/MultiIterator.java | 3 - .../system/MultiShuffledIterator.java | 122 +++++ .../iterators/system/MultiIteratorTest.java | 7 +- .../system/MultiShuffledIteratorTest.java | 480 ++++++++++++++++++ .../accumulo/server/fs/FileManager.java | 8 + .../tserver/tablet/ScanDataSource.java | 14 +- 8 files changed, 640 insertions(+), 12 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java create mode 100644 core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index 9266a53e510..e6d821c9195 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; @@ -286,10 +287,20 @@ public Iterator> iterator() { } SortedKeyValueIterator iterator; + boolean shuffled = tableConf.getBoolean(Property.TABLE_SHUFFLE_SOURCES); + if (opts.bounds != null) { - iterator = new MultiIterator(readers, opts.bounds); + if (shuffled) { + iterator = new MultiShuffledIterator(readers, opts.bounds); + } else { + iterator = new MultiIterator(readers, opts.bounds); + } } else { - iterator = new MultiIterator(readers, false); + if (shuffled) { + iterator = new MultiShuffledIterator(readers, false); + } else { + iterator = new MultiIterator(readers, false); + } } Set families = Collections.emptySet(); diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index d6acb504fad..8fd7896bf5a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1194,6 +1194,9 @@ public enum Property { "The maximum amount of memory that will be used to cache results of a client query/scan. " + "Once this limit is reached, the buffered data is sent to the client.", "1.3.5"), + TABLE_SHUFFLE_SOURCES("table.shuffle.sources", "false", PropertyType.BOOLEAN, + "Shuffle the opening order for Rfiles to reduce thread contention on file open operations.", + "2.1.5"), TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.FILENAME_EXT, "Change the type of file a table writes.", "1.3.5"), TABLE_LOAD_BALANCER("table.balancer", "org.apache.accumulo.core.spi.balancer.SimpleLoadBalancer", diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java index 7ec8bfb6908..3b3defc5462 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -51,7 +50,6 @@ private MultiIterator(MultiIterator other, IteratorEnvironment env) { super(other.iters.size()); this.iters = new ArrayList<>(); this.fence = other.fence; - Collections.shuffle(other.iters); for (SortedKeyValueIterator iter : other.iters) { iters.add(iter.deepCopy(env)); } @@ -75,7 +73,6 @@ private MultiIterator(List> iters, Range seekF } this.fence = seekFence; - Collections.shuffle(iters); this.iters = iters; if (init) { diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java new file mode 100644 index 00000000000..d503fc03408 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iteratorsImpl.system; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + +/** + * An iterator capable of iterating over other iterators in sorted order while shuffling the initial + * seek ordering to avoid thread contention. + */ +public class MultiShuffledIterator extends HeapIterator { + + private final List> iters; + private final Range fence; + + // deep copy with no seek/scan state + @Override + public MultiShuffledIterator deepCopy(IteratorEnvironment env) { + return new MultiShuffledIterator(this, env); + } + + private MultiShuffledIterator(MultiShuffledIterator other, IteratorEnvironment env) { + super(other.iters.size()); + this.iters = new ArrayList<>(); + this.fence = other.fence; + Collections.shuffle(other.iters); + for (SortedKeyValueIterator iter : other.iters) { + iters.add(iter.deepCopy(env)); + } + } + + private void init() { + for (SortedKeyValueIterator skvi : iters) { + addSource(skvi); + } + } + + private MultiShuffledIterator(List> iters, Range seekFence, + boolean init) { + super(iters.size()); + + if (seekFence != null && init) { + // throw this exception because multi-iterator does not seek on init, therefore the + // fence would not be enforced in anyway, so do not want to give the impression it + // will enforce this + throw new IllegalArgumentException("Initializing not supported when seek fence set"); + } + + this.fence = seekFence; + Collections.shuffle(iters); + this.iters = iters; + + if (init) { + init(); + } + } + + public MultiShuffledIterator(List> iters, Range seekFence) { + this(iters, seekFence, false); + } + + public MultiShuffledIterator(List> iters2, KeyExtent extent) { + this(iters2, new Range(extent.prevEndRow(), false, extent.endRow(), true), false); + } + + public MultiShuffledIterator(List> readers, boolean init) { + this(readers, null, init); + } + + @Override + public void seek(Range range, Collection columnFamilies, boolean inclusive) + throws IOException { + clear(); + + if (fence != null) { + range = fence.clip(range, true); + if (range == null) { + return; + } + } + + for (SortedKeyValueIterator skvi : iters) { + skvi.seek(range, columnFamilies, inclusive); + addSource(skvi); + } + } + + @Override + public void init(SortedKeyValueIterator source, Map options, + IteratorEnvironment env) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index 9ac59c127eb..cc488fb8869 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -445,7 +445,7 @@ public void testDeepCopy() throws IOException { MultiIterator mi = new MultiIterator(skvil, extent); MultiIterator miCopy = mi.deepCopy(null); - Range r1 = new Range((Text) null, (Text) null); + Range r1 = new Range((Text) null, null); mi.seek(r1, EMPTY_COL_FAMS, false); miCopy.seek(r1, EMPTY_COL_FAMS, false); assertTrue(mi.hasTop()); @@ -455,14 +455,12 @@ public void testDeepCopy() throws IOException { assertEquals("6", mi.getTopValue().toString()); assertTrue(miCopy.hasTop()); assertEquals("5", miCopy.getTopValue().toString()); - . - . - . mi.next(); assertTrue(mi.hasTop()); assertEquals("7", mi.getTopValue().toString()); mi.next(); assertFalse(mi.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); miCopy.seek(r1, EMPTY_COL_FAMS, false); @@ -471,6 +469,7 @@ public void testDeepCopy() throws IOException { miCopy.next(); assertTrue(miCopy.hasTop()); assertEquals("6", miCopy.getTopValue().toString()); + assertFalse(mi.hasTop()); miCopy.next(); assertTrue(miCopy.hasTop()); assertEquals("7", miCopy.getTopValue().toString()); diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java new file mode 100644 index 00000000000..a5cd814337d --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.iterators.system; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.TreeMap; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; +import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Test; + +public class MultiShuffledIteratorTest { + + private static final Collection EMPTY_COL_FAMS = new ArrayList<>(); + + public static Key newKey(int row, long ts) { + return new Key(newRow(row), ts); + } + + public static Range newRange(int row, long ts) { + return new Range(newKey(row, ts), null); + } + + public static void newKeyValue(TreeMap tm, int row, long ts, boolean deleted, + String val) { + Key k = newKey(row, ts); + k.setDeleted(deleted); + tm.put(k, new Value(val)); + } + + public static Text newRow(int row) { + return new Text(String.format("r%03d", row)); + } + + void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boolean init, + boolean incrRow, List> maps) throws IOException { + List> iters = new ArrayList<>(maps.size()); + + for (TreeMap map : maps) { + iters.add(new SortedMapIterator(map)); + } + + MultiShuffledIterator mi; + if (endRow == null && prevEndRow == null) { + mi = new MultiShuffledIterator(iters, init); + } else { + Range range = new Range(prevEndRow, false, endRow, true); + if (init) { + for (SortedKeyValueIterator iter : iters) { + iter.seek(range, Set.of(), false); + } + } + mi = new MultiShuffledIterator(iters, range); + + if (init) { + mi.seek(range, Set.of(), false); + } + } + + if (seekKey != null) { + mi.seek(new Range(seekKey, null), EMPTY_COL_FAMS, false); + } else { + mi.seek(new Range(), EMPTY_COL_FAMS, false); + } + + int i = start; + while (mi.hasTop()) { + if (incrRow) { + assertEquals(newKey(i, 0), mi.getTopKey()); + } else { + assertEquals(newKey(0, i), mi.getTopKey()); + } + + assertEquals("v" + i, mi.getTopValue().toString()); + + mi.next(); + if (incrRow) { + i++; + } else { + i--; + } + } + + assertEquals(end, i, + "start=" + start + " end=" + end + " seekKey=" + seekKey + " endRow=" + endRow + + " prevEndRow=" + prevEndRow + " init=" + init + " incrRow=" + incrRow + " maps=" + + maps); + } + + void verify(int start, Key seekKey, List> maps) throws IOException { + if (seekKey != null) { + verify(start, -1, seekKey, null, null, false, false, maps); + } + + verify(start, -1, seekKey, null, null, true, false, maps); + } + + @Test + public void test1() throws IOException { + // TEST non overlapping inputs + + TreeMap tm1 = new TreeMap<>(); + List> tmpList = new ArrayList<>(2); + + for (int i = 0; i < 4; i++) { + newKeyValue(tm1, 0, i, false, "v" + i); + } + tmpList.add(tm1); + tm1 = new TreeMap<>(); + for (int i = 4; i < 8; i++) { + newKeyValue(tm1, 0, i, false, "v" + i); + } + tmpList.add(tm1); + for (int seek = -1; seek < 8; seek++) { + if (seek == 7) { + verify(seek, null, tmpList); + } + verify(seek, newKey(0, seek), tmpList); + } + } + + @Test + public void test2() throws IOException { + // TEST overlapping inputs + + TreeMap tm1 = new TreeMap<>(); + TreeMap tm2 = new TreeMap<>(); + List> tmpList = new ArrayList<>(2); + + for (int i = 0; i < 8; i++) { + if (i % 2 == 0) { + newKeyValue(tm1, 0, i, false, "v" + i); + } else { + newKeyValue(tm2, 0, i, false, "v" + i); + } + } + tmpList.add(tm1); + tmpList.add(tm2); + for (int seek = -1; seek < 8; seek++) { + if (seek == 7) { + verify(seek, null, tmpList); + } + verify(seek, newKey(0, seek), tmpList); + } + } + + @Test + public void test3() throws IOException { + // TEST single input + + TreeMap tm1 = new TreeMap<>(); + List> tmpList = new ArrayList<>(2); + + for (int i = 0; i < 8; i++) { + newKeyValue(tm1, 0, i, false, "v" + i); + } + tmpList.add(tm1); + + for (int seek = -1; seek < 8; seek++) { + if (seek == 7) { + verify(seek, null, tmpList); + } + verify(seek, newKey(0, seek), tmpList); + } + } + + @Test + public void test4() throws IOException { + // TEST empty input + + TreeMap tm1 = new TreeMap<>(); + + List> skvil = new ArrayList<>(1); + skvil.add(new SortedMapIterator(tm1)); + MultiIterator mi = new MultiIterator(skvil, true); + + assertFalse(mi.hasTop()); + + mi.seek(newRange(0, 6), EMPTY_COL_FAMS, false); + assertFalse(mi.hasTop()); + } + + @Test + public void test5() throws IOException { + // TEST overlapping inputs AND prevRow AND endRow AND seek + + TreeMap tm1 = new TreeMap<>(); + TreeMap tm2 = new TreeMap<>(); + List> tmpList = new ArrayList<>(2); + + for (int i = 0; i < 8; i++) { + if (i % 2 == 0) { + newKeyValue(tm1, i, 0, false, "v" + i); + } else { + newKeyValue(tm2, i, 0, false, "v" + i); + } + } + + tmpList.add(tm1); + tmpList.add(tm2); + for (int seek = -1; seek < 9; seek++) { + verify(Math.max(0, seek), 8, newKey(seek, 0), null, null, true, true, tmpList); + verify(Math.max(0, seek), 8, newKey(seek, 0), null, null, false, true, tmpList); + + for (int er = seek; er < 10; er++) { + + int end = seek > er ? seek : Math.min(er + 1, 8); + + int noSeekEnd = Math.min(er + 1, 8); + if (er < 0) { + noSeekEnd = 0; + } + + verify(0, noSeekEnd, null, newRow(er), null, true, true, tmpList); + verify(Math.max(0, seek), end, newKey(seek, 0), newRow(er), null, true, true, tmpList); + verify(Math.max(0, seek), end, newKey(seek, 0), newRow(er), null, false, true, tmpList); + + for (int per = -1; per < er; per++) { + + int start = Math.max(per + 1, seek); + + if (start > er) { + end = start; + } + + if (per >= 8) { + end = start; + } + + int noSeekStart = Math.max(0, per + 1); + + if (er < 0 || per >= 7) { + noSeekEnd = noSeekStart; + } + + verify(noSeekStart, noSeekEnd, null, newRow(er), newRow(per), true, true, tmpList); + verify(Math.max(0, start), end, newKey(seek, 0), newRow(er), newRow(per), true, true, + tmpList); + verify(Math.max(0, start), end, newKey(seek, 0), newRow(er), newRow(per), false, true, + tmpList); + } + } + } + } + + @Test + public void test6() throws IOException { + // TEst setting an endKey + TreeMap tm1 = new TreeMap<>(); + newKeyValue(tm1, 3, 0, false, "1"); + newKeyValue(tm1, 4, 0, false, "2"); + newKeyValue(tm1, 6, 0, false, "3"); + + List> skvil = new ArrayList<>(1); + skvil.add(new SortedMapIterator(tm1)); + MultiIterator mi = new MultiIterator(skvil, true); + mi.seek(new Range(null, true, newKey(5, 9), false), EMPTY_COL_FAMS, false); + + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(3, 0)); + assertEquals("1", mi.getTopValue().toString()); + mi.next(); + + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(4, 0)); + assertEquals("2", mi.getTopValue().toString()); + mi.next(); + + assertFalse(mi.hasTop()); + + mi.seek(new Range(newKey(4, 10), true, newKey(5, 9), false), EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(4, 0)); + assertEquals("2", mi.getTopValue().toString()); + mi.next(); + + assertFalse(mi.hasTop()); + + mi.seek(new Range(newKey(4, 10), true, newKey(6, 0), false), EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(4, 0)); + assertEquals("2", mi.getTopValue().toString()); + mi.next(); + + assertFalse(mi.hasTop()); + + mi.seek(new Range(newKey(4, 10), true, newKey(6, 0), true), EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(4, 0)); + assertEquals("2", mi.getTopValue().toString()); + mi.next(); + + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(6, 0)); + assertEquals("3", mi.getTopValue().toString()); + mi.next(); + + assertFalse(mi.hasTop()); + + mi.seek(new Range(newKey(4, 0), true, newKey(6, 0), false), EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(4, 0)); + assertEquals("2", mi.getTopValue().toString()); + mi.next(); + + assertFalse(mi.hasTop()); + + mi.seek(new Range(newKey(4, 0), false, newKey(6, 0), false), EMPTY_COL_FAMS, false); + assertFalse(mi.hasTop()); + + mi.seek(new Range(newKey(4, 0), false, newKey(6, 0), true), EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals(mi.getTopKey(), newKey(6, 0)); + assertEquals("3", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + } + + @Test + public void test7() throws IOException { + // TEst setting an endKey + TreeMap tm1 = new TreeMap<>(); + newKeyValue(tm1, 0, 3, false, "1"); + newKeyValue(tm1, 0, 2, false, "2"); + newKeyValue(tm1, 0, 1, false, "3"); + newKeyValue(tm1, 0, 0, false, "4"); + newKeyValue(tm1, 1, 2, false, "5"); + newKeyValue(tm1, 1, 1, false, "6"); + newKeyValue(tm1, 1, 0, false, "7"); + newKeyValue(tm1, 2, 1, false, "8"); + newKeyValue(tm1, 2, 0, false, "9"); + + List> skvil = new ArrayList<>(1); + skvil.add(new SortedMapIterator(tm1)); + + KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); + + MultiIterator mi = new MultiIterator(skvil, extent); + + Range r1 = new Range((Text) null, (Text) null); + mi.seek(r1, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("6", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("7", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + Range r2 = new Range(newKey(0, 0), true, newKey(1, 1), true); + mi.seek(r2, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("6", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + Range r3 = new Range(newKey(0, 0), false, newKey(1, 1), false); + mi.seek(r3, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + Range r4 = new Range(newKey(1, 2), true, newKey(1, 1), false); + mi.seek(r4, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + Range r5 = new Range(newKey(1, 2), false, newKey(1, 1), true); + mi.seek(r5, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("6", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + + Range r6 = new Range(newKey(2, 1), true, newKey(2, 0), true); + mi.seek(r6, EMPTY_COL_FAMS, false); + assertFalse(mi.hasTop()); + + Range r7 = new Range(newKey(0, 3), true, newKey(0, 1), true); + mi.seek(r7, EMPTY_COL_FAMS, false); + assertFalse(mi.hasTop()); + } + + @Test + public void testDeepCopy() throws IOException { + // TEst setting an endKey + TreeMap tm1 = new TreeMap<>(); + newKeyValue(tm1, 0, 3, false, "1"); + newKeyValue(tm1, 0, 2, false, "2"); + newKeyValue(tm1, 0, 1, false, "3"); + newKeyValue(tm1, 0, 0, false, "4"); + newKeyValue(tm1, 1, 2, false, "5"); + newKeyValue(tm1, 1, 1, false, "6"); + newKeyValue(tm1, 1, 0, false, "7"); + newKeyValue(tm1, 2, 1, false, "8"); + newKeyValue(tm1, 2, 0, false, "9"); + + List> skvil = new ArrayList<>(1); + skvil.add(new SortedMapIterator(tm1)); + + KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); + + MultiShuffledIterator mi = new MultiShuffledIterator(skvil, extent); + MultiShuffledIterator miCopy = mi.deepCopy(null); + + Range r1 = new Range((Text) null, null); + mi.seek(r1, EMPTY_COL_FAMS, false); + miCopy.seek(r1, EMPTY_COL_FAMS, false); + assertTrue(mi.hasTop()); + assertEquals("5", mi.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("6", mi.getTopValue().toString()); + assertTrue(miCopy.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + mi.next(); + assertTrue(mi.hasTop()); + assertEquals("7", mi.getTopValue().toString()); + mi.next(); + assertFalse(mi.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + + miCopy.seek(r1, EMPTY_COL_FAMS, false); + + assertTrue(miCopy.hasTop()); + assertEquals("5", miCopy.getTopValue().toString()); + miCopy.next(); + assertTrue(miCopy.hasTop()); + assertEquals("6", miCopy.getTopValue().toString()); + assertFalse(mi.hasTop()); + miCopy.next(); + assertTrue(miCopy.hasTop()); + assertEquals("7", miCopy.getTopValue().toString()); + miCopy.next(); + assertFalse(miCopy.hasTop()); + } +} diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java index b2f9daeccd3..151a343c482 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java @@ -467,6 +467,7 @@ public class ScanFileManager { private final KeyExtent tablet; private boolean continueOnFailure; private final CacheProvider cacheProvider; + private final boolean shuffleFiles; ScanFileManager(KeyExtent tablet, CacheProvider cacheProvider) { tabletReservedReaders = new ArrayList<>(); @@ -477,6 +478,9 @@ public class ScanFileManager { continueOnFailure = context.getTableConfiguration(tablet.tableId()) .getBoolean(Property.TABLE_FAILURES_IGNORE); + shuffleFiles = context.getTableConfiguration(tablet.tableId()) + .getBoolean(Property.TABLE_SHUFFLE_SOURCES); + if (tablet.isMeta()) { continueOnFailure = false; } @@ -494,6 +498,10 @@ private Map openFiles(List files) + maxOpen + " tablet = " + tablet); } + if (shuffleFiles) { + Collections.shuffle(files); + } + Map newlyReservedReaders = reserveReaders(tablet, files, continueOnFailure, cacheProvider); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 6bcfbc4b9e0..2056cdf599b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -20,13 +20,13 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.thrift.IterInfo; @@ -35,9 +35,11 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; +import org.apache.accumulo.core.iteratorsImpl.system.HeapIterator; import org.apache.accumulo.core.iteratorsImpl.system.InterruptibleIterator; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator.DataSource; import org.apache.accumulo.core.iteratorsImpl.system.StatsIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; @@ -177,7 +179,7 @@ private SortedKeyValueIterator createIterator() files = reservation.getSecond(); } - Collection mapfiles = + List mapfiles = fileManager.openFiles(files, scanParams.isIsolated(), samplerConfig); List.of(mapfiles, memIters).forEach(c -> c.forEach(ii -> ii.setInterruptFlag(interruptFlag))); @@ -188,7 +190,13 @@ private SortedKeyValueIterator createIterator() iters.addAll(mapfiles); iters.addAll(memIters); - MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); + HeapIterator multiIter; + if (tablet.getContext().getTableConfiguration(tablet.getExtent().tableId()) + .getBoolean(Property.TABLE_SHUFFLE_SOURCES)) { + multiIter = new MultiShuffledIterator(iters, tablet.getExtent()); + } else { + multiIter = new MultiIterator(iters, tablet.getExtent()); + } var builder = new SystemIteratorEnvironmentImpl.Builder(tablet.getContext()) .withTopLevelIterators(new ArrayList<>()).withScope(IteratorScope.scan) From fa0e2b697483c1dce35a24fdc0cf65425d387d08 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Wed, 17 Dec 2025 19:24:27 +0000 Subject: [PATCH 6/8] Clean up constructors for MuliIterator Support shuffled iterators in GeneratedSplits and OfflineIterator --- .../core/client/rfile/RFileScanner.java | 4 +- .../core/clientImpl/OfflineIterator.java | 8 +- .../core/file/rfile/GenerateSplits.java | 4 +- .../iteratorsImpl/system/MultiIterator.java | 31 ++++--- .../system/MultiShuffledIterator.java | 86 ++----------------- .../iterators/system/MultiIteratorTest.java | 4 +- .../system/MultiShuffledIteratorTest.java | 6 +- .../SystemIteratorEnvironmentImpl.java | 2 +- .../tserver/tablet/ScanDataSource.java | 4 +- .../performance/scan/CollectTabletStats.java | 2 +- 10 files changed, 49 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java index e6d821c9195..49e424b4563 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java @@ -297,9 +297,9 @@ public Iterator> iterator() { } } else { if (shuffled) { - iterator = new MultiShuffledIterator(readers, false); + iterator = new MultiShuffledIterator(readers); } else { - iterator = new MultiIterator(readers, false); + iterator = new MultiIterator(readers); } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java index 7ec99bd9c49..251523b445c 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java @@ -56,6 +56,7 @@ import org.apache.accumulo.core.iteratorsImpl.ClientIteratorEnvironment; import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; +import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -243,7 +244,12 @@ private SortedKeyValueIterator createIterator(KeyExtent extent, readers.add(reader); } - MultiIterator multiIter = new MultiIterator(readers, extent); + MultiIterator multiIter; + if (tableCC.getBoolean(Property.TABLE_SHUFFLE_SOURCES)) { + multiIter = new MultiShuffledIterator(readers, extent.toDataRange()); + } else { + multiIter = new MultiIterator(readers, extent.toDataRange()); + } ClientIteratorEnvironment.Builder iterEnvBuilder = new ClientIteratorEnvironment.Builder().withAuthorizations(authorizations) diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java index 865210a9708..3937ca383b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/GenerateSplits.java @@ -338,7 +338,7 @@ private TreeSet getSplitsFromFullScan(SiteConfiguration accumuloConf, readers.add(reader); fileReaders.add(reader); } - iterator = new MultiIterator(readers, false); + iterator = new MultiIterator(readers); iterator.seek(new Range(), Collections.emptySet(), false); splitArray = getQuantiles(iterator, numSplits); } finally { @@ -372,7 +372,7 @@ private TreeSet getSplitsBySize(AccumuloConfiguration accumuloConf, readers.add(reader); fileReaders.add(reader); } - iterator = new MultiIterator(readers, false); + iterator = new MultiIterator(readers); iterator.seek(new Range(), Collections.emptySet(), false); while (iterator.hasTop()) { Key key = iterator.getTopKey(); diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java index 3b3defc5462..7bdd086065c 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java @@ -28,7 +28,6 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -37,7 +36,7 @@ */ public class MultiIterator extends HeapIterator { - private final List> iters; + private List> iters; private final Range fence; // deep copy with no seek/scan state @@ -48,11 +47,16 @@ public MultiIterator deepCopy(IteratorEnvironment env) { private MultiIterator(MultiIterator other, IteratorEnvironment env) { super(other.iters.size()); - this.iters = new ArrayList<>(); + var tmpIters = new ArrayList>(); this.fence = other.fence; for (SortedKeyValueIterator iter : other.iters) { - iters.add(iter.deepCopy(env)); + tmpIters.add(iter.deepCopy(env)); } + setIters(tmpIters); + } + + protected void setIters(List> iters) { + this.iters = iters; } private void init() { @@ -67,27 +71,32 @@ private MultiIterator(List> iters, Range seekF if (seekFence != null && init) { // throw this exception because multi-iterator does not seek on init, therefore the - // fence would not be enforced in anyway, so do not want to give the impression it + // fence would not be enforced in any way, so do not want to give the impression it // will enforce this throw new IllegalArgumentException("Initializing not supported when seek fence set"); } this.fence = seekFence; - this.iters = iters; + setIters(iters); if (init) { init(); } } - public MultiIterator(List> iters, Range seekFence) { - this(iters, seekFence, false); + /** + * Creates a MultiIterator that doesn't have a fence range and therefore doesn't seek on creation. + * + * @param iters List of source iterators + */ + public MultiIterator(List> iters) { + this(iters, null, false); } - public MultiIterator(List> iters2, KeyExtent extent) { - this(iters2, new Range(extent.prevEndRow(), false, extent.endRow(), true), false); + public MultiIterator(List> iters, Range seekFence) { + this(iters, seekFence, false); } - + public MultiIterator(List> readers, boolean init) { this(readers, null, init); } diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java index d503fc03408..e563c5467e0 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiShuffledIterator.java @@ -18,105 +18,37 @@ */ package org.apache.accumulo.core.iteratorsImpl.system; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; /** * An iterator capable of iterating over other iterators in sorted order while shuffling the initial * seek ordering to avoid thread contention. */ -public class MultiShuffledIterator extends HeapIterator { +public class MultiShuffledIterator extends MultiIterator { - private final List> iters; - private final Range fence; - - // deep copy with no seek/scan state - @Override - public MultiShuffledIterator deepCopy(IteratorEnvironment env) { - return new MultiShuffledIterator(this, env); - } - - private MultiShuffledIterator(MultiShuffledIterator other, IteratorEnvironment env) { - super(other.iters.size()); - this.iters = new ArrayList<>(); - this.fence = other.fence; - Collections.shuffle(other.iters); - for (SortedKeyValueIterator iter : other.iters) { - iters.add(iter.deepCopy(env)); - } - } - - private void init() { - for (SortedKeyValueIterator skvi : iters) { - addSource(skvi); - } - } - - private MultiShuffledIterator(List> iters, Range seekFence, - boolean init) { - super(iters.size()); - - if (seekFence != null && init) { - // throw this exception because multi-iterator does not seek on init, therefore the - // fence would not be enforced in anyway, so do not want to give the impression it - // will enforce this - throw new IllegalArgumentException("Initializing not supported when seek fence set"); - } - - this.fence = seekFence; - Collections.shuffle(iters); - this.iters = iters; - - if (init) { - init(); - } + public MultiShuffledIterator(List> readers) { + super(readers); } public MultiShuffledIterator(List> iters, Range seekFence) { - this(iters, seekFence, false); - } - - public MultiShuffledIterator(List> iters2, KeyExtent extent) { - this(iters2, new Range(extent.prevEndRow(), false, extent.endRow(), true), false); + super(iters, seekFence); } public MultiShuffledIterator(List> readers, boolean init) { - this(readers, null, init); - } - - @Override - public void seek(Range range, Collection columnFamilies, boolean inclusive) - throws IOException { - clear(); - - if (fence != null) { - range = fence.clip(range, true); - if (range == null) { - return; - } - } - - for (SortedKeyValueIterator skvi : iters) { - skvi.seek(range, columnFamilies, inclusive); - addSource(skvi); - } + super(readers, init); } @Override - public void init(SortedKeyValueIterator source, Map options, - IteratorEnvironment env) throws IOException { - throw new UnsupportedOperationException(); + protected void setIters(List> iters) { + var copy = new ArrayList<>(iters); + Collections.shuffle(copy); + super.setIters(copy); } } diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index cc488fb8869..19edabe9007 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -368,7 +368,7 @@ public void test7() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiIterator mi = new MultiIterator(skvil, extent); + MultiIterator mi = new MultiIterator(skvil, extent.toDataRange()); Range r1 = new Range((Text) null, (Text) null); mi.seek(r1, EMPTY_COL_FAMS, false); @@ -442,7 +442,7 @@ public void testDeepCopy() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiIterator mi = new MultiIterator(skvil, extent); + MultiIterator mi = new MultiIterator(skvil, extent.toDataRange()); MultiIterator miCopy = mi.deepCopy(null); Range r1 = new Range((Text) null, null); diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java index a5cd814337d..5c11490ea82 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java @@ -369,7 +369,7 @@ public void test7() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiIterator mi = new MultiIterator(skvil, extent); + MultiIterator mi = new MultiIterator(skvil, extent.toDataRange()); Range r1 = new Range((Text) null, (Text) null); mi.seek(r1, EMPTY_COL_FAMS, false); @@ -443,8 +443,8 @@ public void testDeepCopy() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiShuffledIterator mi = new MultiShuffledIterator(skvil, extent); - MultiShuffledIterator miCopy = mi.deepCopy(null); + SortedKeyValueIterator mi = new MultiShuffledIterator(skvil, extent.toDataRange()); + SortedKeyValueIterator miCopy = mi.deepCopy(null); Range r1 = new Range((Text) null, null); mi.seek(r1, EMPTY_COL_FAMS, false); diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java index 2705703ad6f..3ef3a5fc9b1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/SystemIteratorEnvironmentImpl.java @@ -125,7 +125,7 @@ public ServerContext getServerContext() { } ArrayList> allIters = new ArrayList<>(topLevelIterators); allIters.add(iter); - return new MultiIterator(allIters, false); + return new MultiIterator(allIters); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 2056cdf599b..2455d70a472 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@ -193,9 +193,9 @@ private SortedKeyValueIterator createIterator() HeapIterator multiIter; if (tablet.getContext().getTableConfiguration(tablet.getExtent().tableId()) .getBoolean(Property.TABLE_SHUFFLE_SOURCES)) { - multiIter = new MultiShuffledIterator(iters, tablet.getExtent()); + multiIter = new MultiShuffledIterator(iters, tablet.getExtent().toDataRange()); } else { - multiIter = new MultiIterator(iters, tablet.getExtent()); + multiIter = new MultiIterator(iters, tablet.getExtent().toDataRange()); } var builder = new SystemIteratorEnvironmentImpl.Builder(tablet.getContext()) diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java index 62e034bbef5..ae789e0d88b 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java @@ -438,7 +438,7 @@ private static SortedKeyValueIterator createScanIterator(KeyExtent ke iters.addAll(mapfiles); iters.add(smi); - MultiIterator multiIter = new MultiIterator(iters, ke); + MultiIterator multiIter = new MultiIterator(iters, ke.toDataRange()); SortedKeyValueIterator delIter = DeletingIterator.wrap(multiIter, false, Behavior.PROCESS); ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); From 147a98037fbda8d71ba57db678143984cb0ae929 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Thu, 18 Dec 2025 15:16:16 +0000 Subject: [PATCH 7/8] fix formatting --- .../accumulo/core/iteratorsImpl/system/MultiIterator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java index 7bdd086065c..a7b7a76dae9 100644 --- a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/system/MultiIterator.java @@ -96,7 +96,7 @@ public MultiIterator(List> iters) { public MultiIterator(List> iters, Range seekFence) { this(iters, seekFence, false); } - + public MultiIterator(List> readers, boolean init) { this(readers, null, init); } From ed88068c1e573ecb88b317b4f8a129bea7ce3763 Mon Sep 17 00:00:00 2001 From: Daniel Roberts ddanielr Date: Tue, 23 Dec 2025 21:55:02 +0000 Subject: [PATCH 8/8] Remove duplicate test code Removed the duplicate test code by extending the original test class --- .../iterators/system/MultiIteratorTest.java | 20 +- .../system/MultiShuffledIteratorTest.java | 452 +----------------- 2 files changed, 18 insertions(+), 454 deletions(-) diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java index 19edabe9007..d669565c011 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiIteratorTest.java @@ -45,6 +45,14 @@ public class MultiIteratorTest { private static final Collection EMPTY_COL_FAMS = new ArrayList<>(); + protected MultiIterator makeIterator(List> list, Range range) { + return new MultiIterator(list, range); + } + + protected MultiIterator makeIterator(List> list, Boolean init) { + return new MultiIterator(list, init); + } + public static Key newKey(int row, long ts) { return new Key(newRow(row), ts); } @@ -74,7 +82,7 @@ void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boole MultiIterator mi; if (endRow == null && prevEndRow == null) { - mi = new MultiIterator(iters, init); + mi = makeIterator(iters, init); } else { Range range = new Range(prevEndRow, false, endRow, true); if (init) { @@ -82,7 +90,7 @@ void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boole iter.seek(range, Set.of(), false); } } - mi = new MultiIterator(iters, range); + mi = makeIterator(iters, range); if (init) { mi.seek(range, Set.of(), false); @@ -204,7 +212,7 @@ public void test4() throws IOException { List> skvil = new ArrayList<>(1); skvil.add(new SortedMapIterator(tm1)); - MultiIterator mi = new MultiIterator(skvil, true); + MultiIterator mi = makeIterator(skvil, true); assertFalse(mi.hasTop()); @@ -285,7 +293,7 @@ public void test6() throws IOException { List> skvil = new ArrayList<>(1); skvil.add(new SortedMapIterator(tm1)); - MultiIterator mi = new MultiIterator(skvil, true); + MultiIterator mi = makeIterator(skvil, true); mi.seek(new Range(null, true, newKey(5, 9), false), EMPTY_COL_FAMS, false); assertTrue(mi.hasTop()); @@ -368,7 +376,7 @@ public void test7() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiIterator mi = new MultiIterator(skvil, extent.toDataRange()); + MultiIterator mi = makeIterator(skvil, extent.toDataRange()); Range r1 = new Range((Text) null, (Text) null); mi.seek(r1, EMPTY_COL_FAMS, false); @@ -442,7 +450,7 @@ public void testDeepCopy() throws IOException { KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - MultiIterator mi = new MultiIterator(skvil, extent.toDataRange()); + MultiIterator mi = makeIterator(skvil, extent.toDataRange()); MultiIterator miCopy = mi.deepCopy(null); Range r1 = new Range((Text) null, null); diff --git a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java index 5c11490ea82..50ab6fae5d6 100644 --- a/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/iterators/system/MultiShuffledIteratorTest.java @@ -18,463 +18,19 @@ */ package org.apache.accumulo.core.iterators.system; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.Set; -import java.util.TreeMap; -import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import org.apache.accumulo.core.iteratorsImpl.system.MultiIterator; import org.apache.accumulo.core.iteratorsImpl.system.MultiShuffledIterator; -import org.apache.accumulo.core.iteratorsImpl.system.SortedMapIterator; -import org.apache.hadoop.io.Text; -import org.junit.jupiter.api.Test; - -public class MultiShuffledIteratorTest { - - private static final Collection EMPTY_COL_FAMS = new ArrayList<>(); - - public static Key newKey(int row, long ts) { - return new Key(newRow(row), ts); - } - - public static Range newRange(int row, long ts) { - return new Range(newKey(row, ts), null); - } - - public static void newKeyValue(TreeMap tm, int row, long ts, boolean deleted, - String val) { - Key k = newKey(row, ts); - k.setDeleted(deleted); - tm.put(k, new Value(val)); - } - - public static Text newRow(int row) { - return new Text(String.format("r%03d", row)); - } - - void verify(int start, int end, Key seekKey, Text endRow, Text prevEndRow, boolean init, - boolean incrRow, List> maps) throws IOException { - List> iters = new ArrayList<>(maps.size()); - - for (TreeMap map : maps) { - iters.add(new SortedMapIterator(map)); - } - - MultiShuffledIterator mi; - if (endRow == null && prevEndRow == null) { - mi = new MultiShuffledIterator(iters, init); - } else { - Range range = new Range(prevEndRow, false, endRow, true); - if (init) { - for (SortedKeyValueIterator iter : iters) { - iter.seek(range, Set.of(), false); - } - } - mi = new MultiShuffledIterator(iters, range); - - if (init) { - mi.seek(range, Set.of(), false); - } - } - - if (seekKey != null) { - mi.seek(new Range(seekKey, null), EMPTY_COL_FAMS, false); - } else { - mi.seek(new Range(), EMPTY_COL_FAMS, false); - } - - int i = start; - while (mi.hasTop()) { - if (incrRow) { - assertEquals(newKey(i, 0), mi.getTopKey()); - } else { - assertEquals(newKey(0, i), mi.getTopKey()); - } - - assertEquals("v" + i, mi.getTopValue().toString()); - - mi.next(); - if (incrRow) { - i++; - } else { - i--; - } - } - - assertEquals(end, i, - "start=" + start + " end=" + end + " seekKey=" + seekKey + " endRow=" + endRow - + " prevEndRow=" + prevEndRow + " init=" + init + " incrRow=" + incrRow + " maps=" - + maps); - } - - void verify(int start, Key seekKey, List> maps) throws IOException { - if (seekKey != null) { - verify(start, -1, seekKey, null, null, false, false, maps); - } - - verify(start, -1, seekKey, null, null, true, false, maps); - } - - @Test - public void test1() throws IOException { - // TEST non overlapping inputs - - TreeMap tm1 = new TreeMap<>(); - List> tmpList = new ArrayList<>(2); - - for (int i = 0; i < 4; i++) { - newKeyValue(tm1, 0, i, false, "v" + i); - } - tmpList.add(tm1); - tm1 = new TreeMap<>(); - for (int i = 4; i < 8; i++) { - newKeyValue(tm1, 0, i, false, "v" + i); - } - tmpList.add(tm1); - for (int seek = -1; seek < 8; seek++) { - if (seek == 7) { - verify(seek, null, tmpList); - } - verify(seek, newKey(0, seek), tmpList); - } - } - - @Test - public void test2() throws IOException { - // TEST overlapping inputs - - TreeMap tm1 = new TreeMap<>(); - TreeMap tm2 = new TreeMap<>(); - List> tmpList = new ArrayList<>(2); - - for (int i = 0; i < 8; i++) { - if (i % 2 == 0) { - newKeyValue(tm1, 0, i, false, "v" + i); - } else { - newKeyValue(tm2, 0, i, false, "v" + i); - } - } - tmpList.add(tm1); - tmpList.add(tm2); - for (int seek = -1; seek < 8; seek++) { - if (seek == 7) { - verify(seek, null, tmpList); - } - verify(seek, newKey(0, seek), tmpList); - } - } - - @Test - public void test3() throws IOException { - // TEST single input - - TreeMap tm1 = new TreeMap<>(); - List> tmpList = new ArrayList<>(2); - - for (int i = 0; i < 8; i++) { - newKeyValue(tm1, 0, i, false, "v" + i); - } - tmpList.add(tm1); - - for (int seek = -1; seek < 8; seek++) { - if (seek == 7) { - verify(seek, null, tmpList); - } - verify(seek, newKey(0, seek), tmpList); - } - } - - @Test - public void test4() throws IOException { - // TEST empty input - - TreeMap tm1 = new TreeMap<>(); - - List> skvil = new ArrayList<>(1); - skvil.add(new SortedMapIterator(tm1)); - MultiIterator mi = new MultiIterator(skvil, true); - - assertFalse(mi.hasTop()); - - mi.seek(newRange(0, 6), EMPTY_COL_FAMS, false); - assertFalse(mi.hasTop()); - } - - @Test - public void test5() throws IOException { - // TEST overlapping inputs AND prevRow AND endRow AND seek - - TreeMap tm1 = new TreeMap<>(); - TreeMap tm2 = new TreeMap<>(); - List> tmpList = new ArrayList<>(2); - - for (int i = 0; i < 8; i++) { - if (i % 2 == 0) { - newKeyValue(tm1, i, 0, false, "v" + i); - } else { - newKeyValue(tm2, i, 0, false, "v" + i); - } - } - - tmpList.add(tm1); - tmpList.add(tm2); - for (int seek = -1; seek < 9; seek++) { - verify(Math.max(0, seek), 8, newKey(seek, 0), null, null, true, true, tmpList); - verify(Math.max(0, seek), 8, newKey(seek, 0), null, null, false, true, tmpList); - - for (int er = seek; er < 10; er++) { - - int end = seek > er ? seek : Math.min(er + 1, 8); - - int noSeekEnd = Math.min(er + 1, 8); - if (er < 0) { - noSeekEnd = 0; - } - - verify(0, noSeekEnd, null, newRow(er), null, true, true, tmpList); - verify(Math.max(0, seek), end, newKey(seek, 0), newRow(er), null, true, true, tmpList); - verify(Math.max(0, seek), end, newKey(seek, 0), newRow(er), null, false, true, tmpList); - - for (int per = -1; per < er; per++) { - - int start = Math.max(per + 1, seek); - - if (start > er) { - end = start; - } - - if (per >= 8) { - end = start; - } - - int noSeekStart = Math.max(0, per + 1); - - if (er < 0 || per >= 7) { - noSeekEnd = noSeekStart; - } - - verify(noSeekStart, noSeekEnd, null, newRow(er), newRow(per), true, true, tmpList); - verify(Math.max(0, start), end, newKey(seek, 0), newRow(er), newRow(per), true, true, - tmpList); - verify(Math.max(0, start), end, newKey(seek, 0), newRow(er), newRow(per), false, true, - tmpList); - } - } - } - } - - @Test - public void test6() throws IOException { - // TEst setting an endKey - TreeMap tm1 = new TreeMap<>(); - newKeyValue(tm1, 3, 0, false, "1"); - newKeyValue(tm1, 4, 0, false, "2"); - newKeyValue(tm1, 6, 0, false, "3"); - - List> skvil = new ArrayList<>(1); - skvil.add(new SortedMapIterator(tm1)); - MultiIterator mi = new MultiIterator(skvil, true); - mi.seek(new Range(null, true, newKey(5, 9), false), EMPTY_COL_FAMS, false); - - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(3, 0)); - assertEquals("1", mi.getTopValue().toString()); - mi.next(); - - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(4, 0)); - assertEquals("2", mi.getTopValue().toString()); - mi.next(); - - assertFalse(mi.hasTop()); - - mi.seek(new Range(newKey(4, 10), true, newKey(5, 9), false), EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(4, 0)); - assertEquals("2", mi.getTopValue().toString()); - mi.next(); - - assertFalse(mi.hasTop()); - - mi.seek(new Range(newKey(4, 10), true, newKey(6, 0), false), EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(4, 0)); - assertEquals("2", mi.getTopValue().toString()); - mi.next(); - - assertFalse(mi.hasTop()); - - mi.seek(new Range(newKey(4, 10), true, newKey(6, 0), true), EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(4, 0)); - assertEquals("2", mi.getTopValue().toString()); - mi.next(); - - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(6, 0)); - assertEquals("3", mi.getTopValue().toString()); - mi.next(); - - assertFalse(mi.hasTop()); - - mi.seek(new Range(newKey(4, 0), true, newKey(6, 0), false), EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(4, 0)); - assertEquals("2", mi.getTopValue().toString()); - mi.next(); - - assertFalse(mi.hasTop()); - - mi.seek(new Range(newKey(4, 0), false, newKey(6, 0), false), EMPTY_COL_FAMS, false); - assertFalse(mi.hasTop()); - - mi.seek(new Range(newKey(4, 0), false, newKey(6, 0), true), EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals(mi.getTopKey(), newKey(6, 0)); - assertEquals("3", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - - } - - @Test - public void test7() throws IOException { - // TEst setting an endKey - TreeMap tm1 = new TreeMap<>(); - newKeyValue(tm1, 0, 3, false, "1"); - newKeyValue(tm1, 0, 2, false, "2"); - newKeyValue(tm1, 0, 1, false, "3"); - newKeyValue(tm1, 0, 0, false, "4"); - newKeyValue(tm1, 1, 2, false, "5"); - newKeyValue(tm1, 1, 1, false, "6"); - newKeyValue(tm1, 1, 0, false, "7"); - newKeyValue(tm1, 2, 1, false, "8"); - newKeyValue(tm1, 2, 0, false, "9"); - - List> skvil = new ArrayList<>(1); - skvil.add(new SortedMapIterator(tm1)); - - KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - - MultiIterator mi = new MultiIterator(skvil, extent.toDataRange()); - - Range r1 = new Range((Text) null, (Text) null); - mi.seek(r1, EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals("5", mi.getTopValue().toString()); - mi.next(); - assertTrue(mi.hasTop()); - assertEquals("6", mi.getTopValue().toString()); - mi.next(); - assertTrue(mi.hasTop()); - assertEquals("7", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - - Range r2 = new Range(newKey(0, 0), true, newKey(1, 1), true); - mi.seek(r2, EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals("5", mi.getTopValue().toString()); - mi.next(); - assertTrue(mi.hasTop()); - assertEquals("6", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - - Range r3 = new Range(newKey(0, 0), false, newKey(1, 1), false); - mi.seek(r3, EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals("5", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - - Range r4 = new Range(newKey(1, 2), true, newKey(1, 1), false); - mi.seek(r4, EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals("5", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - - Range r5 = new Range(newKey(1, 2), false, newKey(1, 1), true); - mi.seek(r5, EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals("6", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - - Range r6 = new Range(newKey(2, 1), true, newKey(2, 0), true); - mi.seek(r6, EMPTY_COL_FAMS, false); - assertFalse(mi.hasTop()); - - Range r7 = new Range(newKey(0, 3), true, newKey(0, 1), true); - mi.seek(r7, EMPTY_COL_FAMS, false); - assertFalse(mi.hasTop()); - } - - @Test - public void testDeepCopy() throws IOException { - // TEst setting an endKey - TreeMap tm1 = new TreeMap<>(); - newKeyValue(tm1, 0, 3, false, "1"); - newKeyValue(tm1, 0, 2, false, "2"); - newKeyValue(tm1, 0, 1, false, "3"); - newKeyValue(tm1, 0, 0, false, "4"); - newKeyValue(tm1, 1, 2, false, "5"); - newKeyValue(tm1, 1, 1, false, "6"); - newKeyValue(tm1, 1, 0, false, "7"); - newKeyValue(tm1, 2, 1, false, "8"); - newKeyValue(tm1, 2, 0, false, "9"); - - List> skvil = new ArrayList<>(1); - skvil.add(new SortedMapIterator(tm1)); - - KeyExtent extent = new KeyExtent(TableId.of("tablename"), newRow(1), newRow(0)); - - SortedKeyValueIterator mi = new MultiShuffledIterator(skvil, extent.toDataRange()); - SortedKeyValueIterator miCopy = mi.deepCopy(null); - - Range r1 = new Range((Text) null, null); - mi.seek(r1, EMPTY_COL_FAMS, false); - miCopy.seek(r1, EMPTY_COL_FAMS, false); - assertTrue(mi.hasTop()); - assertEquals("5", mi.getTopValue().toString()); - mi.next(); - assertTrue(mi.hasTop()); - assertEquals("6", mi.getTopValue().toString()); - assertTrue(miCopy.hasTop()); - assertEquals("5", miCopy.getTopValue().toString()); - mi.next(); - assertTrue(mi.hasTop()); - assertEquals("7", mi.getTopValue().toString()); - mi.next(); - assertFalse(mi.hasTop()); - assertEquals("5", miCopy.getTopValue().toString()); - miCopy.seek(r1, EMPTY_COL_FAMS, false); +public class MultiShuffledIteratorTest extends MultiIteratorTest { - assertTrue(miCopy.hasTop()); - assertEquals("5", miCopy.getTopValue().toString()); - miCopy.next(); - assertTrue(miCopy.hasTop()); - assertEquals("6", miCopy.getTopValue().toString()); - assertFalse(mi.hasTop()); - miCopy.next(); - assertTrue(miCopy.hasTop()); - assertEquals("7", miCopy.getTopValue().toString()); - miCopy.next(); - assertFalse(miCopy.hasTop()); + @Override + protected MultiIterator makeIterator(List> list, Range range) { + return new MultiShuffledIterator(list, range); } }