
Intermediate changes

alexanderst committed Jan 21, 2019
1 parent 0687c79 commit 2bf77943dd7b8105d891ccccf17004b7452eabf6
@@ -203,7 +203,7 @@ public long getNumberOfSequences() {
*
* @return
*/
public VocabCache<T> buildJointVocabulary(boolean resetCounters, boolean buildHuffmanTree) {
public VocabCache<T> buildJointVocabularyOld(boolean resetCounters, boolean buildHuffmanTree) {
long lastTime = System.currentTimeMillis();
long lastSequences = 0;
long lastElements = 0;
@@ -394,6 +394,203 @@ public long getNumberOfSequences() {
return cache;
}

public VocabCache<T> buildJointVocabulary(boolean resetCounters, boolean buildHuffmanTree) {
long lastTime = System.currentTimeMillis();
long lastSequences = 0;
long lastElements = 0;
long startTime = lastTime;
long startWords = 0;
AtomicLong parsedCount = new AtomicLong(0);
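// resetting counters would wipe the very frequencies the Huffman tree is built from, hence the two options are mutually exclusive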
if (resetCounters && buildHuffmanTree)
throw new IllegalStateException("You can't reset counters and build Huffman tree at the same time!");

if (cache == null)
cache = new AbstractCache.Builder<T>().build();
log.debug("Target vocab size before building: [" + cache.numWords() + "]");
final AtomicLong loopCounter = new AtomicLong(0);

AbstractCache<T> topHolder = new AbstractCache.Builder<T>().minElementFrequency(0).build();

int cnt = 0;
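// size the worker pool at half the available cores, with a floor of two threads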
int numProc = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(numProc / 2, 2);
PriorityScheduler executorService = new PriorityScheduler(numThreads);
final AtomicLong execCounter = new AtomicLong(0);
final AtomicLong finCounter = new AtomicLong(0);

for (VocabSource<T> source : sources) {
SequenceIterator<T> iterator = source.getIterator();
iterator.reset();

log.debug("Trying source iterator: [" + cnt + "]");
log.debug("Target vocab size before building: [" + cache.numWords() + "]");
cnt++;

AbstractCache<T> tempHolder = new AbstractCache.Builder<T>().build();

List<Long> timesHasNext = new ArrayList<>();
List<Long> timesNext = new ArrayList<>();
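// leftover timing instrumentation: nothing populates these lists here; the only references survive in commented-out code in resetTrackers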
int sequences = 0;
long time3 = 0;
while (iterator.hasMoreSequences()) {
Sequence<T> document = iterator.nextSequence();

seqCount.incrementAndGet();
parsedCount.addAndGet(document.size());
tempHolder.incrementTotalDocCount();
execCounter.incrementAndGet();
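// the runnable counts this document's elements into tempHolder and, judging by the counter checks below, signals finCounter on completion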
VocabRunnable runnable = new VocabRunnable(tempHolder, document, finCounter, loopCounter);

executorService.execute(runnable);

// if we're not in parallel mode, wait until this runnable finishes
if (!allowParallelBuilder) {
try {
runnable.awaitDone();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

// as the profiler shows, this contention doesn't actually happen very often;
// we just don't want too many unfinished tasks left in the tail, so cap the backlog at numProc

while (execCounter.get() - finCounter.get() > numProc) {
ThreadUtils.uncheckedSleep(1);
}

sequences++;
resetTrackers(parsedCount, lastTime, lastSequences, lastElements);
fireScavenger(loopCounter, execCounter, finCounter, tempHolder, source);
}

// block until all threads are finished
log.debug("Waiting till all processes stop...");
while (execCounter.get() != finCounter.get()) {
ThreadUtils.uncheckedSleep(1);
}


// apply minWordFrequency set for this source
log.debug("Vocab size before truncation: [" + tempHolder.numWords() + "], NumWords: ["
+ tempHolder.totalWordOccurrences() + "], sequences parsed: [" + seqCount.get()
+ "], counter: [" + parsedCount.get() + "]");
if (source.getMinWordFrequency() > 0) {
filterVocab(tempHolder, source.getMinWordFrequency());
}

log.debug("Vocab size after truncation: [" + tempHolder.numWords() + "], NumWords: ["
+ tempHolder.totalWordOccurrences() + "], sequences parsed: [" + seqCount.get()
+ "], counter: [" + parsedCount.get() + "]");
// at this moment we're ready to transfer
topHolder.importVocabulary(tempHolder);
}

// at this moment we have a vocabulary full of words, and we have to reset counters before transferring everything back to the VocabCache

//topHolder.resetWordCounters();
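
// System.gc() below is only a hint to the JVM; the apparent intent is to reclaim the per-source temporary vocabularies before the final merge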
System.gc();

cache.importVocabulary(topHolder);

// adding UNK word
if (unk != null) {
log.info("Adding UNK element to vocab...");
unk.setSpecial(true);
cache.addToken(unk);
}

if (resetCounters) {
for (T element : cache.vocabWords()) {
element.setElementFrequency(0);
}
cache.updateWordsOccurrences();
}

if (buildHuffmanTree) {
buildHuffmanTree();
}

executorService.shutdown();

System.gc();

long endSequences = seqCount.get();
long endTime = System.currentTimeMillis();
double seconds = (endTime - startTime) / (double) 1000;
double seqPerSec = endSequences / seconds;
log.info("Sequences checked: [{}], Current vocabulary size: [{}]; Sequences/sec: [{}];", seqCount.get(),
cache.numWords(), String.format("%.2f", seqPerSec));
return cache;
}

private void buildHuffmanTree() {
if (limit > 0) {
// we want to sort labels before truncating them, so we keep the most important words
val words = new ArrayList<T>(cache.vocabWords());
Collections.sort(words);

// now rolling through them
for (val element : words) {
if (element.getIndex() > limit && !element.isSpecial() && !element.isLabel())
cache.removeElement(element.getLabel());
}
}

// and now we build the Huffman tree
val huffman = new Huffman(cache.vocabWords());
huffman.build();
huffman.applyIndexes(cache);
}

private void resetTrackers(AtomicLong parsedCount, long lastTime, long lastSequences, long lastElements) {
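// NOTE: lastTime/lastSequences/lastElements are primitives passed by value, so the
// assignments at the bottom of this method update local copies only and never reach
// the caller; harmless while the logging below stays commented out.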
if (seqCount.get() % 100000 == 0) {

long currentTime = System.currentTimeMillis();
long currentSequences = seqCount.get();
long currentElements = parsedCount.get();

double seconds = (currentTime - lastTime) / (double) 1000;

// Collections.sort(timesHasNext);
// Collections.sort(timesNext);

double seqPerSec = (currentSequences - lastSequences) / seconds;
double elPerSec = (currentElements - lastElements) / seconds;
// log.info("Document time: {} us; hasNext time: {} us", timesNext.get(timesNext.size() / 2), timesHasNext.get(timesHasNext.size() / 2));
/*log.info("Sequences checked: [{}]; Current vocabulary size: [{}]; Sequences/sec: {}; Words/sec: {};",
seqCount.get(), tempHolder.numWords(), String.format("%.2f", seqPerSec),
String.format("%.2f", elPerSec));*/
lastTime = currentTime;
lastElements = currentElements;
lastSequences = currentSequences;

// timesHasNext.clear();
// timesNext.clear();
}
}

private void fireScavenger(AtomicLong loopCounter, AtomicLong execCounter, AtomicLong finCounter,
AbstractCache<T> tempHolder, VocabSource<T> source) {
// firing scavenger loop
if (enableScavenger && loopCounter.get() >= 2000000 && tempHolder.numWords() > 10000000) {
log.info("Starting scavenger...");
while (execCounter.get() != finCounter.get()) {
ThreadUtils.uncheckedSleep(1);
}
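// prune entries below half the source's frequency threshold (never below 1) to cap memory, then restart the counting window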

filterVocab(tempHolder, Math.max(1, source.getMinWordFrequency() / 2));
loopCounter.set(0);
}
}


protected void filterVocab(AbstractCache<T> cache, int minWordFrequency) {
int numWords = cache.numWords();
LinkedBlockingQueue<String> labelsToRemove = new LinkedBlockingQueue<>();
