diff --git a/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg b/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg
index c25c9c3fac9..43a6c91bbeb 100644
--- a/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg
+++ b/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg
@@ -53,7 +53,7 @@ log.queries=true
 
     { "Populate"
         CreateIndex
-        [{ "MAddDocs" AddDoc } : 5000] : 4
+        [{ {{"MAddDocs" AddDoc } : 5000} FlushIndex } ] : 8
         ForceMerge(1)
         CloseIndex
     }
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
index 53a1a25fc0b..86534eaf99c 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/Constants.java
@@ -26,4 +26,7 @@ public class Constants {
   public static Boolean[] BOOLEANS = new Boolean[] {Boolean.FALSE, Boolean.TRUE};
 
   public static final int DEFAULT_MAXIMUM_DOCUMENTS = Integer.MAX_VALUE;
+
+  /** Name prefix of the threads that TaskSequence creates for parallel tasks. */
+  public static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";
 }
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
index 76c229ff908..8e040b7f134 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java
@@ -28,6 +28,7 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Locale;
+import org.apache.lucene.benchmark.Constants;
 import org.apache.lucene.benchmark.byTask.utils.Config;
 
 /**
@@ -50,8 +51,8 @@ private static final class DateFormatInfo {
   private ThreadLocal dateFormat = new ThreadLocal<>();
   private Path dataDir = null;
   private ArrayList inputFiles = new ArrayList<>();
-  private int nextFile = 0;
-  private int iteration = 0;
+  private int[] docCountArr;
+  private volatile boolean docCountArrCreated;
 
   @Override
   public void setConfig(Config config) {
@@ -100,21 +101,40 @@ public void close() throws IOException {
 
   @Override
   public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
-    Path f = null;
-    String name = null;
-    synchronized (this) {
-      if (nextFile >= inputFiles.size()) {
-        // exhausted files, start a new round, unless forever set to false.
-        if (!forever) {
-          throw new NoMoreDataException();
-        }
-        nextFile = 0;
-        iteration++;
-      }
-      f = inputFiles.get(nextFile++);
-      name = f.toRealPath() + "_" + iteration;
+    if (docCountArrCreated == false) {
+      docCountArrInit();
     }
 
+    // Threads started by TaskSequence.java's doParallelTasks() are named
+    // PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index. Any other thread has no index in its
+    // name and uses slot 0.
+    String threadName = Thread.currentThread().getName();
+    String indexedPrefix = Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-";
+    int threadIndex = 0;
+    if (threadName.startsWith(indexedPrefix)) {
+      threadIndex = Integer.parseInt(threadName.substring(indexedPrefix.length()));
+    }
+
+    assert (threadIndex >= 0 && threadIndex < docCountArr.length)
+        : "Please check threadIndex or docCountArr length";
+    int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
+    int inFileSize = inputFiles.size();
+
+    // The modulo operator covers all three possible scenarios:
+    // 1. inputFiles.size() < number of threads
+    // 2. inputFiles.size() == number of threads
+    // 3. inputFiles.size() > number of threads
+    int fileIndex = stride % inFileSize;
+    int iteration = stride / inFileSize;
+    if (iteration > 0 && !forever) {
+      // exhausted files, do not start a new round since forever is set to false.
+      throw new NoMoreDataException();
+    }
+    docCountArr[threadIndex]++;
+
+    Path f = inputFiles.get(fileIndex);
+    String name = f.toRealPath() + "_" + iteration;
+
     try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
       // First line is the date, 3rd is the title, rest is body
       String dateStr = reader.readLine();
@@ -143,7 +163,15 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc
   @Override
   public synchronized void resetInputs() throws IOException {
     super.resetInputs();
-    nextFile = 0;
-    iteration = 0;
+    // Invalidate the per-thread counters so that reading restarts from the first file; the
+    // next getNextDocData() call re-creates them through docCountArrInit().
+    docCountArrCreated = false;
+  }
+
+  private synchronized void docCountArrInit() {
+    if (docCountArrCreated == false) {
+      docCountArr = new int[getConfig().getNumThreads()];
+      docCountArrCreated = true;
+    }
   }
 }
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java
new file mode 100644
index 00000000000..d951413cccd
--- /dev/null
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.benchmark.byTask.tasks;
+
+import org.apache.lucene.benchmark.byTask.PerfRunData;
+import org.apache.lucene.index.IndexWriter;
+
+/** Flush Index Task uses flushNextBuffer() to flush documents at thread level */
+public class FlushIndexTask extends PerfTask {
+
+  public FlushIndexTask(PerfRunData runData) {
+    super(runData);
+  }
+
+  @Override
+  public int doLogic() throws Exception {
+    IndexWriter iw = getRunData().getIndexWriter();
+    if (iw != null) {
+      iw.flushNextBuffer();
+    }
+    return 1;
+  }
+}
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
index f74aaab11f0..bb049d490ff 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
+import org.apache.lucene.benchmark.Constants;
 import org.apache.lucene.benchmark.byTask.PerfRunData;
 import org.apache.lucene.benchmark.byTask.feeds.NoMoreDataException;
 import org.apache.lucene.benchmark.byTask.stats.TaskStats;
@@ -340,12 +341,20 @@ private int doParallelTasks() throws Exception {
 
     initTasksArray();
     ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
+    // Record the number of parallel threads of this sequence in the config;
+    // ReutersContentSource.java's docCountArrInit() reads it to size its per-thread counters.
+    this.getRunData().getConfig().setNumThreads(t.length);
     // prepare threads
     int index = 0;
     for (int k = 0; k < repetitions; k++) {
       for (int i = 0; i < tasksArray.length; i++) {
         final PerfTask task = tasksArray[i].clone();
-        t[index++] = new ParallelTask(task);
+        t[index] = new ParallelTask(task);
+        // Give each thread a unique name ending in its index; ReutersContentSource.java's
+        // getNextDocData() parses the index back out of the name. Keep both places in sync
+        // before making any modifications here.
+        t[index].setName(Constants.PARALLEL_TASK_THREAD_NAME_PREFIX + "-" + index);
+        index++;
       }
     }
     // run threads
diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
index 74709156c9a..5eafb553fcf 100644
--- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
+++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java
@@ -54,6 +54,7 @@ public class Config {
   private HashMap valByRound = new HashMap<>();
   private HashMap colForValByRound = new HashMap<>();
   private String algorithmText;
+  private int numThreads = 1;
 
   /**
    * Read both algorithm and config properties.
@@ -113,6 +114,16 @@ public Config(Properties props) {
     }
   }
 
+  /** Sets the number of threads; TaskSequence calls this before starting parallel tasks. */
+  public void setNumThreads(int numThreads) {
+    this.numThreads = numThreads;
+  }
+
+  /** Returns the thread count last set via {@link #setNumThreads(int)}; 1 if never set. */
+  public int getNumThreads() {
+    return numThreads;
+  }
+
   @SuppressWarnings({"unchecked", "rawtypes"})
   private void printProps() {
     System.out.println("------------> config properties:");