Skip to content

Commit

Permalink
Merge pull request #3570 from dita-ot/feature/processing-queue-implem…
Browse files Browse the repository at this point in the history
…entation

Use concurrent sets or queues
  • Loading branch information
jelovirt committed Sep 12, 2020
2 parents a18da24 + cc115ba commit f7f7e3e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 37 deletions.
23 changes: 13 additions & 10 deletions src/main/java/org/dita/dost/module/GenMapAndTopicListModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.io.*;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -99,11 +102,11 @@ public final class GenMapAndTopicListModule extends SourceReaderModule {
private final Set<URI> relFlagImagesSet;

/** List of files waiting for parsing. Values are absolute URI references. */
private final Queue<Reference> waitList;
private final NavigableMap<URI, Reference> waitList;

/** List of parsed files */
private final List<URI> doneList;
private final List<URI> failureList;
private final Set<URI> doneList;
private final Set<URI> failureList;

/** Set of outer dita files */
private final Set<URI> outDitaFilesSet;
Expand Down Expand Up @@ -168,9 +171,9 @@ public GenMapAndTopicListModule() {
htmlSet = SetMultimapBuilder.hashKeys().hashSetValues().build();
hrefTargetSet = new HashSet<>(128);
coderefTargetSet = new HashSet<>(16);
waitList = new LinkedList<>();
doneList = new LinkedList<>();
failureList = new LinkedList<>();
waitList = new ConcurrentSkipListMap<>();
doneList = ConcurrentHashMap.newKeySet();
failureList = ConcurrentHashMap.newKeySet();
conrefTargetSet = new HashSet<>(128);
nonConrefCopytoTargetSet = new HashSet<>(128);
outDitaFilesSet = new HashSet<>(128);
Expand Down Expand Up @@ -344,8 +347,8 @@ private void parseInputParameters(final AbstractPipelineInput input) {
}

private void processWaitList() throws DITAOTException {
while (!waitList.isEmpty()) {
processFile(waitList.remove());
for (Map.Entry<URI, Reference> entry = waitList.pollFirstEntry(); entry != null; entry = waitList.pollFirstEntry()) {
processFile(entry.getValue());
}
}

Expand Down Expand Up @@ -659,11 +662,11 @@ private void updateUplevels(final URI file) {
private void addToWaitList(final Reference ref) {
final URI file = ref.filename;
assert file.isAbsolute() && file.getFragment() == null;
if (doneList.contains(file) || waitList.contains(ref) || file.equals(currentFile)) {
if (doneList.contains(file) || waitList.containsKey(ref.filename) || file.equals(currentFile)) {
return;
}

waitList.add(ref);
waitList.put(ref.filename, ref);
}

/**
Expand Down
57 changes: 30 additions & 27 deletions src/main/java/org/dita/dost/module/reader/AbstractReaderModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import java.io.*;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -52,47 +55,47 @@ public abstract class AbstractReaderModule extends AbstractPipelineModuleImpl {

Predicate<String> formatFilter;
/** FileInfos keyed by src. */
private final Map<URI, Collection<FileInfo>> fileinfos = new HashMap<>();
private final Map<URI, Collection<FileInfo>> fileinfos = new ConcurrentHashMap<>();
/** Set of all topic files */
final Set<URI> fullTopicSet = new HashSet<>(128);
final Set<URI> fullTopicSet = ConcurrentHashMap.newKeySet();
/** Set of all map files */
final Set<URI> fullMapSet = new HashSet<>(128);
final Set<URI> fullMapSet = ConcurrentHashMap.newKeySet();
/** Set of topic files containing href */
private final Set<URI> hrefTopicSet = new HashSet<>(128);
private final Set<URI> hrefTopicSet = ConcurrentHashMap.newKeySet();
/** Set of dita files containing conref */
final Set<URI> conrefSet = new HashSet<>(128);
final Set<URI> conrefSet = ConcurrentHashMap.newKeySet();
/** Set of topic files containing coderef */
private final Set<URI> coderefSet = new HashSet<>(128);
private final Set<URI> coderefSet = ConcurrentHashMap.newKeySet();
/** Set of all images */
final Set<Reference> formatSet = new HashSet<>();
final Set<Reference> formatSet = ConcurrentHashMap.newKeySet();
/** Set of all images used for flagging */
private final Set<URI> flagImageSet = new LinkedHashSet<>(128);
private final Set<URI> flagImageSet = ConcurrentHashMap.newKeySet();
/** Set of all HTML and other non-DITA or non-image files */
final SetMultimap<String, URI> htmlSet = SetMultimapBuilder.hashKeys().hashSetValues().build();
/** Set of all the href targets */
final Set<URI> hrefTargetSet = new HashSet<>(128);
final Set<URI> hrefTargetSet = ConcurrentHashMap.newKeySet();
/** Set of all the conref targets */
Set<URI> conrefTargetSet = new HashSet<>(128);
Set<URI> conrefTargetSet = ConcurrentHashMap.newKeySet();
/** Set of all targets except conref and copy-to */
final Set<URI> nonConrefCopytoTargetSet = new HashSet<>(128);
final Set<URI> nonConrefCopytoTargetSet = ConcurrentHashMap.newKeySet();
/** Set of subsidiary files */
private final Set<URI> coderefTargetSet = new HashSet<>(16);
private final Set<URI> coderefTargetSet = ConcurrentHashMap.newKeySet();
/** Set of absolute flag image files */
private final Set<URI> relFlagImagesSet = new LinkedHashSet<>(128);
private final Set<URI> relFlagImagesSet = ConcurrentHashMap.newKeySet();
/** List of files waiting for parsing. Values are absolute URI references. */
@VisibleForTesting
final Queue<Reference> waitList = new LinkedList<>();
final NavigableMap<URI, Reference> waitList = new ConcurrentSkipListMap<>();
/** List of parsed files */
final List<URI> doneList = new LinkedList<>();
final List<URI> failureList = new LinkedList<>();
final Set<URI> doneList = ConcurrentHashMap.newKeySet();
final Set<URI> failureList = ConcurrentHashMap.newKeySet();
/** Set of outer dita files */
final Set<URI> outDitaFilesSet = new HashSet<>(128);
final Set<URI> outDitaFilesSet = ConcurrentHashMap.newKeySet();
/** Set of sources of conacion */
final Set<URI> conrefpushSet = new HashSet<>(128);
final Set<URI> conrefpushSet = ConcurrentHashMap.newKeySet();
/** Set of files containing keyref */
final Set<URI> keyrefSet = new HashSet<>(128);
final Set<URI> keyrefSet = ConcurrentHashMap.newKeySet();
/** Set of files with "@processing-role=resource-only" */
final Set<URI> resourceOnlySet = new HashSet<>(128);
final Set<URI> resourceOnlySet = ConcurrentHashMap.newKeySet();
/** Absolute basedir for processing */
private URI baseInputDir;
GenListModuleReader listFilter;
Expand All @@ -105,10 +108,10 @@ public abstract class AbstractReaderModule extends AbstractPipelineModuleImpl {
URI rootFile;
List<URI> resources;
/** Subject scheme absolute file paths. */
private final Set<URI> schemeSet = new HashSet<>(128);
private final Set<URI> schemeSet = ConcurrentHashMap.newKeySet();
/** Subject scheme usage. Key is absolute file path, value is set of applicable subject schemes. */
private final Map<URI, Set<URI>> schemeDictionary = new HashMap<>();
private final Map<URI, URI> copyTo = new HashMap<>();
private final Map<URI, URI> copyTo = new ConcurrentHashMap<>();
Mode processingMode;
/** Generate {@code xtrf} and {@code xtrc} attributes */
boolean genDebugInfo;
Expand All @@ -130,7 +133,7 @@ public abstract class AbstractReaderModule extends AbstractPipelineModuleImpl {
DitaWriterFilter ditaWriterFilter;
TopicFragmentFilter topicFragmentFilter;
/** Files found during additional resource crawl. **/
final Set<URI> additionalResourcesSet = new HashSet<>();
final Set<URI> additionalResourcesSet = ConcurrentHashMap.newKeySet();

public abstract void readStartFile() throws DITAOTException;

Expand Down Expand Up @@ -306,8 +309,8 @@ void parseInputParameters(final AbstractPipelineInput input) {
}

void processWaitList() throws DITAOTException {
while (!waitList.isEmpty()) {
readFile(waitList.remove(), null);
for (Map.Entry<URI, Reference> entry = waitList.pollFirstEntry(); entry != null; entry = waitList.pollFirstEntry()) {
readFile(entry.getValue(), null);
}
}

Expand Down Expand Up @@ -565,11 +568,11 @@ void categorizeCurrentFile(final Reference ref) {
void addToWaitList(final Reference ref) {
final URI file = ref.filename;
assert file.isAbsolute() && file.getFragment() == null;
if (doneList.contains(file) || waitList.contains(ref) || file.equals(currentFile)) {
if (doneList.contains(file) || waitList.containsKey(ref.filename) || file.equals(currentFile)) {
return;
}

waitList.add(ref);
waitList.put(ref.filename, ref);
}

/**
Expand Down

0 comments on commit f7f7e3e

Please sign in to comment.