From c52509b893ad3d5385f4d725aeb275767488c0a7 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Thu, 29 Apr 2021 15:42:43 +0000 Subject: [PATCH 1/8] Added a explicit Flush Task to flush data at Thread level once it completes the processing --- .../byTask/tasks/FlushIndexTask.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java new file mode 100644 index 000000000000..d951413cccd2 --- /dev/null +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/FlushIndexTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.benchmark.byTask.tasks; + +import org.apache.lucene.benchmark.byTask.PerfRunData; +import org.apache.lucene.index.IndexWriter; + +/** Flush Index Task uses flushNextBuffer() to flush documents at thread level */ +public class FlushIndexTask extends PerfTask { + + public FlushIndexTask(PerfRunData runData) { + super(runData); + } + + @Override + public int doLogic() throws Exception { + IndexWriter iw = getRunData().getIndexWriter(); + if (iw != null) { + iw.flushNextBuffer(); + } + return 1; + } +} From d9d95a91435b057f533817e40ebe2a80ea985911 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Thu, 29 Apr 2021 16:18:33 +0000 Subject: [PATCH 2/8] Included explicit flush per Thread level --- lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg b/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg index c25c9c3fac91..43a6c91bbebb 100644 --- a/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg +++ b/lucene/benchmark/conf/indexing-flush-by-RAM-multithreaded.alg @@ -53,7 +53,7 @@ log.queries=true { "Populate" CreateIndex - [{ "MAddDocs" AddDoc } : 5000] : 4 + [{ {{"MAddDocs" AddDoc } : 5000} FlushIndex } ] : 8 ForceMerge(1) CloseIndex } From 012296644f31d449a9231523444331af63fc1d3a Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Tue, 11 May 2021 07:04:32 +0000 Subject: [PATCH 3/8] Done changes for parallel processing --- .../byTask/feeds/ReutersContentSource.java | 47 ++++++++++++++----- .../benchmark/byTask/tasks/TaskSequence.java | 6 ++- .../lucene/benchmark/byTask/utils/Config.java | 9 ++++ 3 files changed, 50 insertions(+), 12 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 76c229ff9088..65c0bccd485d 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -52,6 +52,8 @@ private static final class DateFormatInfo { private ArrayList inputFiles = new ArrayList<>(); private int nextFile = 0; private int iteration = 0; + private int[] threadIndex; + private volatile boolean threadIndexCreated; @Override public void setConfig(Config config) { @@ -102,17 +104,33 @@ public void close() throws IOException { public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException { Path f = null; String name = null; - synchronized (this) { - if (nextFile >= inputFiles.size()) { - // exhausted files, start a new round, unless forever set to false. - if (!forever) { - throw new NoMoreDataException(); - } - nextFile = 0; - iteration++; - } - f = inputFiles.get(nextFile++); - name = f.toRealPath() + "_" + iteration; + int inputFilesSize = inputFiles.size(); + + if (threadIndexCreated == false) { + createThreadIndex(); + } + + // Getting file index value which is set for each thread + int index = Integer.parseInt(Thread.currentThread().getName().substring(12)); + int fIndex = index + threadIndex[index] * threadIndex.length; + threadIndex[index]++; + + // Sanity check, if # threads is greater than # input files, wrap index + if (index >= inputFilesSize) { + index %= inputFilesSize; + } + + // Check if this thread has exhausted its files + if (fIndex >= inputFilesSize) { + threadIndex[index] = 0; + fIndex = index + threadIndex[index] * threadIndex.length; + threadIndex[index]++; + iteration++; + } + + f = inputFiles.get(fIndex); + name = f.toRealPath() + "_" + iteration; + } try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) { @@ -146,4 +164,11 @@ public synchronized void resetInputs() throws IOException { nextFile = 0; iteration = 0; } + + private synchronized void createThreadIndex() { + if (threadIndexCreated == false) { + threadIndex = new int[getConfig().getNumThreads()]; + threadIndexCreated = true; + } + } } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index f74aaab11f0c..2beec8dd2ffe 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -340,12 +340,16 @@ private int doParallelTasks() throws Exception { initTasksArray(); ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()]; + this.getRunData().getConfig().setNumThreads(t.length); // prepare threads int index = 0; for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index++] = new ParallelTask(task); + t[index] = new ParallelTask(task); + // Set the thread name for guaranteed file index while processing. + t[index].setName("IndexThread-" + index); + index++; } } // run threads diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java index 74709156c9a5..5eafb553fcf7 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/utils/Config.java @@ -54,6 +54,7 @@ public class Config { private HashMap valByRound = new HashMap<>(); private HashMap colForValByRound = new HashMap<>(); private String algorithmText; + private int numThreads = 1; /** * Read both algorithm and config properties. @@ -113,6 +114,14 @@ public Config(Properties props) { } } + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public int getNumThreads() { + return numThreads; + } + @SuppressWarnings({"unchecked", "rawtypes"}) private void printProps() { System.out.println("------------> config properties:"); From 760604df10777f76013de7d3ca332bb9c41158ca Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Tue, 11 May 2021 07:26:33 +0000 Subject: [PATCH 4/8] Removed extra brace --- .../lucene/benchmark/byTask/feeds/ReutersContentSource.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 65c0bccd485d..5ddadbd7c81c 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -131,8 +131,6 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc f = inputFiles.get(fIndex); name = f.toRealPath() + "_" + iteration; - } - try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) { // First line is the date, 3rd is the title, rest is body String dateStr = reader.readLine(); From f660c9ff1d5f1bae4021d69913445cc953f22032 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Tue, 11 May 2021 07:26:33 +0000 Subject: [PATCH 5/8] Removed unused variable --- .../lucene/benchmark/byTask/feeds/ReutersContentSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 5ddadbd7c81c..e8eabe6f95ec 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -50,7 +50,6 @@ private static final class DateFormatInfo { private ThreadLocal dateFormat = new ThreadLocal<>(); private Path dataDir = null; private ArrayList inputFiles = new ArrayList<>(); - private int nextFile = 0; private int iteration = 0; private int[] threadIndex; private volatile boolean threadIndexCreated; From 2a871d50ce4195e3154701a44d38b4b1d48d8895 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Tue, 11 May 2021 07:26:33 +0000 Subject: [PATCH 6/8] Removed unused variable initialization --- .../lucene/benchmark/byTask/feeds/ReutersContentSource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index e8eabe6f95ec..10f688378340 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -158,7 +158,6 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc @Override public synchronized void resetInputs() throws IOException { super.resetInputs(); - nextFile = 0; iteration = 0; } From 04e7d6deea9a0a62519fa09b81b7026b0471153c Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Tue, 11 May 2021 07:26:33 +0000 Subject: [PATCH 7/8] Did the required formating --- .../lucene/benchmark/byTask/feeds/ReutersContentSource.java | 2 +- .../org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 10f688378340..7fbb5034cf18 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -106,7 +106,7 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc int inputFilesSize = inputFiles.size(); if (threadIndexCreated == false) { - createThreadIndex(); + createThreadIndex(); } // Getting file index value which is set for each thread diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index 2beec8dd2ffe..bcd5fabe21f8 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -346,7 +346,7 @@ private int doParallelTasks() throws Exception { for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index] = new ParallelTask(task); + t[index] = new ParallelTask(task); // Set the thread name for guaranteed file index while processing. t[index].setName("IndexThread-" + index); index++; From 8318a16823b52f89876e7c89bd98c10e376a5624 Mon Sep 17 00:00:00 2001 From: balmukundblr Date: Tue, 11 May 2021 07:26:33 +0000 Subject: [PATCH 8/8] Refactored the code and added required comments & checks --- .../byTask/feeds/ReutersContentSource.java | 51 +++++++------------ .../benchmark/byTask/tasks/TaskSequence.java | 5 +- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java index 10f688378340..7c60848ebdb1 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/feeds/ReutersContentSource.java @@ -50,9 +50,8 @@ private static final class DateFormatInfo { private ThreadLocal dateFormat = new ThreadLocal<>(); private Path dataDir = null; private ArrayList inputFiles = new ArrayList<>(); - private int iteration = 0; - private int[] threadIndex; - private volatile boolean threadIndexCreated; + private int[] docCountArr; + private volatile boolean docCountArrCreated; @Override public void setConfig(Config config) { @@ -101,34 +100,23 @@ public void close() throws IOException { @Override public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException { - Path f = null; - String name = null; - int inputFilesSize = inputFiles.size(); - - if (threadIndexCreated == false) { - createThreadIndex(); + if (docCountArrCreated == false) { + docCountArrInit(); } - // Getting file index value which is set for each thread - int index = Integer.parseInt(Thread.currentThread().getName().substring(12)); - int fIndex = index + threadIndex[index] * threadIndex.length; - threadIndex[index]++; - - // Sanity check, if # threads is greater than # input files, wrap index - if (index >= inputFilesSize) { - index %= inputFilesSize; - } + //Extract ThreadIndex from unique ThreadName (at position 12), which is set with '"IndexThread-"+index', in TaskSequence.java's doParallelTasks() + int threadIndex = Integer.parseInt(Thread.currentThread().getName().substring(12)); + assert (threadIndex >= 0 && threadIndex < docCountArr.length):"Please check threadIndex or docCountArr length"; + int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length; + int inFileSize = inputFiles.size(); - // Check if this thread has exhausted its files - if (fIndex >= inputFilesSize) { - threadIndex[index] = 0; - fIndex = index + threadIndex[index] * threadIndex.length; - threadIndex[index]++; - iteration++; - } + //Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads + int fileIndex = stride % inFileSize; + int iteration = stride / inFileSize; + docCountArr[threadIndex]++; - f = inputFiles.get(fIndex); - name = f.toRealPath() + "_" + iteration; + Path f = inputFiles.get(fIndex); + String name = f.toRealPath() + "_" + iteration; try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) { // First line is the date, 3rd is the title, rest is body @@ -158,13 +146,12 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc @Override public synchronized void resetInputs() throws IOException { super.resetInputs(); - iteration = 0; } - private synchronized void createThreadIndex() { - if (threadIndexCreated == false) { - threadIndex = new int[getConfig().getNumThreads()]; - threadIndexCreated = true; + private synchronized void docCountArrInit() { + if (docCountArrCreated == false) { + docCountArr = new int[getConfig().getNumThreads()]; + docCountArrCreated = true; } } } diff --git a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java index 2beec8dd2ffe..3a28c0de6e3f 100644 --- a/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java +++ b/lucene/benchmark/src/java/org/apache/lucene/benchmark/byTask/tasks/TaskSequence.java @@ -340,14 +340,15 @@ private int doParallelTasks() throws Exception { initTasksArray(); ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()]; + //Get number of parallel threads from algo file and set it to use in ReuersContentSource.java's fileIndexArrInit() this.getRunData().getConfig().setNumThreads(t.length); // prepare threads int index = 0; for (int k = 0; k < repetitions; k++) { for (int i = 0; i < tasksArray.length; i++) { final PerfTask task = tasksArray[i].clone(); - t[index] = new ParallelTask(task); - // Set the thread name for guaranteed file index while processing. + t[index] = new ParallelTask(task); + //Setting unique ThreadName with index value which is used in ReuersContentSource.java's getNextDocData() t[index].setName("IndexThread-" + index); index++; }