Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel processing #132

Merged
merged 13 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ log.queries=true

{ "Populate"
CreateIndex
[{ "MAddDocs" AddDoc } : 5000] : 4
[{ {{"MAddDocs" AddDoc } : 5000} FlushIndex } ] : 8
ForceMerge(1)
CloseIndex
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ private static final class DateFormatInfo {
private ThreadLocal<DateFormatInfo> dateFormat = new ThreadLocal<>();
private Path dataDir = null;
private ArrayList<Path> inputFiles = new ArrayList<>();
private int nextFile = 0;
private int iteration = 0;
private int[] docCountArr;
private volatile boolean docCountArrCreated;

@Override
public void setConfig(Config config) {
Expand Down Expand Up @@ -100,21 +100,24 @@ public void close() throws IOException {

@Override
public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOException {
Path f = null;
String name = null;
synchronized (this) {
if (nextFile >= inputFiles.size()) {
// exhausted files, start a new round, unless forever set to false.
if (!forever) {
throw new NoMoreDataException();
}
nextFile = 0;
iteration++;
}
f = inputFiles.get(nextFile++);
name = f.toRealPath() + "_" + iteration;
if (docCountArrCreated == false) {
docCountArrInit();
}

//Extract ThreadIndex from unique ThreadName (at position 12), which is set with '"IndexThread-"+index', in TaskSequence.java's doParallelTasks()
int threadIndex = Integer.parseInt(Thread.currentThread().getName().substring(12));
assert (threadIndex >= 0 && threadIndex < docCountArr.length):"Please check threadIndex or docCountArr length";
int stride = threadIndex + docCountArr[threadIndex] * docCountArr.length;
int inFileSize = inputFiles.size();

//Modulo Operator covers all three possible senarios i.e. 1. If inputFiles.size() < Num Of Threads 2.inputFiles.size() == Num Of Threads 3.inputFiles.size() > Num Of Threads
int fileIndex = stride % inFileSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm do we already guard for the (degenerate) case of inFileSize == 0? If not can we add some protection here, e.g. maybe throw a clear exception that there is nothing to index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mike, its already handling in ReutersContentSource.java's setConfig(). Please find the code snippet for the same.
if (inputFiles.size() == 0) {
throw new RuntimeException("No txt files in dataDir: "+dataDir.toAbsolutePath());
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry Mike, i forgot to mention that i've tested with inFileSize == 0 and it throws expected exception.

int iteration = stride / inFileSize;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for improving this logic -- much easier to understand now!

docCountArr[threadIndex]++;

Path f = inputFiles.get(fileIndex);
String name = f.toRealPath() + "_" + iteration;

try (BufferedReader reader = Files.newBufferedReader(f, StandardCharsets.UTF_8)) {
// First line is the date, 3rd is the title, rest is body
String dateStr = reader.readLine();
Expand Down Expand Up @@ -143,7 +146,12 @@ public DocData getNextDocData(DocData docData) throws NoMoreDataException, IOExc
@Override
public synchronized void resetInputs() throws IOException {
super.resetInputs();
nextFile = 0;
iteration = 0;
}

private synchronized void docCountArrInit() {
if (docCountArrCreated == false) {
docCountArr = new int[getConfig().getNumThreads()];
docCountArrCreated = true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.benchmark.byTask.tasks;

import org.apache.lucene.benchmark.byTask.PerfRunData;
import org.apache.lucene.index.IndexWriter;

/** Flush Index Task uses flushNextBuffer() to flush documents at thread level */
public class FlushIndexTask extends PerfTask {

public FlushIndexTask(PerfRunData runData) {
super(runData);
}

@Override
public int doLogic() throws Exception {
IndexWriter iw = getRunData().getIndexWriter();
if (iw != null) {
iw.flushNextBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flushes one thread; not all. I'm honestly not sure what the use-case is of that method. Did you mean to call iw.flush()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for delay response. We observed that post- processing was taking longer time because it was singly threaded. Also, it was depending upon the per-thread indexed data. Hence, we are explicitly flushing the per thread data as soon as it finishes the indexing process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this task is optional and can be used purely on need basis.

}
return 1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,12 +340,17 @@ private int doParallelTasks() throws Exception {

initTasksArray();
ParallelTask t[] = runningParallelTasks = new ParallelTask[repetitions * tasks.size()];
//Get number of parallel threads from algo file and set it to use in ReuersContentSource.java's docCountArrInit()
this.getRunData().getConfig().setNumThreads(t.length);
// prepare threads
int index = 0;
for (int k = 0; k < repetitions; k++) {
for (int i = 0; i < tasksArray.length; i++) {
final PerfTask task = tasksArray[i].clone();
t[index++] = new ParallelTask(task);
t[index] = new ParallelTask(task);
//Setting unique ThreadName with index value which is used in ReuersContentSource.java's getNextDocData()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you strengthen the comment to state that we should NOT change this thread name, unless we also fix the String -> int parsing logic in ReutersContentSource?

Actually, could we factor out this string part of the thread name into a static final String constant, e.g.static final String PARALLEL_TASK_THREAD_NAME_PREFIX = "ParallelTaskThread";, and reference that constant from both places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incorporated the required changes through adding it in Constants.java file and referred from both places.

t[index].setName("IndexThread-" + index);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, parallel tasks might be running queries too right? Maybe we should pick a more generic name? Maybe ParallelTaskThread-N?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Mike, did the required changes.

index++;
}
}
// run threads
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class Config {
private HashMap<String, Object> valByRound = new HashMap<>();
private HashMap<String, String> colForValByRound = new HashMap<>();
private String algorithmText;
private int numThreads = 1;

/**
* Read both algorithm and config properties.
Expand Down Expand Up @@ -113,6 +114,14 @@ public Config(Properties props) {
}
}

public void setNumThreads(int numThreads) {
this.numThreads = numThreads;
}

public int getNumThreads() {
return numThreads;
}

@SuppressWarnings({"unchecked", "rawtypes"})
private void printProps() {
System.out.println("------------> config properties:");
Expand Down