(long-array) Implement java.lang.foreign.Arena-based lifecycle control for LongArray.

Further de-ByteBuffer-ing of these classes remains to be done, but this is the smallest, most urgently needed improvement.

This commit is a WIP but in a fully working state. It is being pushed now because of the importance of offering lifecycle control over mmaps.
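For context on what Arena buys here: with java.lang.foreign, a memory-mapped segment is bound to an arena and unmapped deterministically when that arena closes, rather than lingering until the GC eventually collects a MappedByteBuffer. A minimal sketch of the underlying JDK mechanism (illustrative only, not code from this commit; the path is made up):

```java
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class ArenaMmapSketch {
    public static void main(String[] args) throws Exception {
        Path file = Path.of("/tmp/example.dat"); // hypothetical file

        try (Arena arena = Arena.ofConfined();
             FileChannel channel = FileChannel.open(file,
                     StandardOpenOption.READ, StandardOpenOption.WRITE)) {

            // The mapping's lifetime is tied to the arena, not to GC
            MemorySegment segment = channel.map(
                    FileChannel.MapMode.READ_WRITE, 0, channel.size(), arena);

            segment.setAtIndex(ValueLayout.JAVA_LONG, 0, 42L);
        } // arena closes here; the mapping is released immediately
    }
}
```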
vlofgren committed Sep 24, 2023
1 parent dbe9235 commit d0aa754
Showing 27 changed files with 555 additions and 88 deletions.
@@ -1,7 +1,7 @@
package nu.marginalia.index.forward;

import com.upserve.uppend.blobs.NativeIO;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.index.journal.reader.IndexJournalReader;
import nu.marginalia.array.LongArray;
import nu.marginalia.model.id.UrlIdCodec;
@@ -71,7 +71,7 @@ public void convert() throws IOException {

// docIdToIdx -> file offset for id

-LongArray docFileData = LongArray.mmapForWriting(outputFileDocsData, ForwardIndexParameters.ENTRY_SIZE * docsFileId.size());
+LongArray docFileData = LongArrayFactory.mmapForWritingConfined(outputFileDocsData, ForwardIndexParameters.ENTRY_SIZE * docsFileId.size());

var pointer = journalReader.newPointer();
while (pointer.nextDocument()) {
@@ -106,7 +106,7 @@ private LongArray getDocIds(Path outputFileDocs, IndexJournalReader journalReader
Roaring64Bitmap rbm = new Roaring64Bitmap();
journalReader.forEachDocId(rbm::add);

-LongArray ret = LongArray.mmapForWriting(outputFileDocs, rbm.getIntCardinality());
+LongArray ret = LongArrayFactory.mmapForWritingConfined(outputFileDocs, rbm.getIntCardinality());
rbm.forEach(new LongConsumer() {
int offset;
@Override
@@ -3,6 +3,7 @@
import com.upserve.uppend.blobs.NativeIO;
import gnu.trove.map.hash.TLongIntHashMap;
import nu.marginalia.array.LongArray;
+import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +53,7 @@ else if (!Files.exists(idsFile)) {
}

private static TLongIntHashMap loadIds(Path idsFile) throws IOException {
-var idsArray = LongArray.mmapRead(idsFile);
+var idsArray = LongArrayFactory.mmapForReadingShared(idsFile);

var ids = new TLongIntHashMap((int) idsArray.size(), 0.5f, -1, -1);

@@ -63,7 +64,7 @@ private static TLongIntHashMap loadIds(Path idsFile) throws IOException {
}

private static LongArray loadData(Path dataFile) throws IOException {
-var data = LongArray.mmapRead(dataFile);
+var data = LongArrayFactory.mmapForReadingShared(dataFile);

// Total data is small, try to keep it in RAM for speed
data.advice(NativeIO.Advice.WillNeed);
@@ -1,6 +1,7 @@
package nu.marginalia.index;

import nu.marginalia.array.LongArray;
+import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.btree.BTreeReader;
import nu.marginalia.index.query.EmptyEntrySource;
import nu.marginalia.index.query.EntrySource;
@@ -36,8 +37,8 @@ public ReverseIndexReader(Path words, Path documents) throws IOException {

logger.info("Switching reverse index");

-this.words = LongArray.mmapRead(words);
-this.documents = LongArray.mmapRead(documents);
+this.words = LongArrayFactory.mmapForReadingShared(words);
+this.documents = LongArrayFactory.mmapForReadingShared(documents);

wordsBTreeReader = new BTreeReader(this.words, ReverseIndexParameters.wordsBTreeContext, 0);
wordsDataOffset = wordsBTreeReader.getHeader().dataOffsetLongs();
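A note on the Confined/Shared suffixes in the new factory methods: they presumably mirror the two Arena flavors in java.lang.foreign, where a confined arena's segments are only accessible from the owning thread (a fit for single-threaded index writers), while a shared arena's segments can be read from any thread (a fit for a long-lived, query-serving reader like this one). A small sketch of the distinction, assumed rather than taken from this commit:

```java
import java.lang.foreign.Arena;
import java.lang.foreign.ValueLayout;

class ArenaFlavors {
    static void demo() {
        // Confined: only the creating thread may touch the memory;
        // access from another thread throws at runtime.
        try (Arena confined = Arena.ofConfined()) {
            var seg = confined.allocate(ValueLayout.JAVA_LONG);
            seg.set(ValueLayout.JAVA_LONG, 0, 1L);
        }

        // Shared: memory is accessible from any thread, at the cost
        // of a heavier close() that must coordinate with readers.
        try (Arena shared = Arena.ofShared()) {
            var seg = shared.allocate(ValueLayout.JAVA_LONG);
            long value = seg.get(ValueLayout.JAVA_LONG, 0);
        }
    }
}
```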
@@ -9,6 +9,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;

public class ReverseIndexConstructor {
@@ -37,7 +38,7 @@ public static void createReverseIndex(
}

try (var heartbeat = processHeartbeat.createProcessTaskHeartbeat(CreateReverseIndexSteps.class, "createReverseIndex")) {
-List<ReversePreindex> preindexes = new ArrayList<>();
+List<ReversePreindexReference> preindexes = new ArrayList<>();

heartbeat.progress(CreateReverseIndexSteps.CREATE_PREINDEXES);

@@ -47,69 +48,81 @@

preindexHeartbeat.progress(input.toFile().getName(), i, inputs.size());

-preindexes.add(ReversePreindex.constructPreindex(readerSource.construct(input), docIdRewriter, tmpDir));
+preindexes.add(
+        ReversePreindex
+                .constructPreindex(readerSource.construct(input), docIdRewriter, tmpDir)
+                .closeToReference()
+);
}

preindexHeartbeat.progress("FINISHED", inputs.size(), inputs.size());
}

heartbeat.progress(CreateReverseIndexSteps.MERGE_PREINDEXES);
-ReversePreindex finalPreindex;
+ReversePreindex finalPreindex = null;

        try (var mergeHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("mergePreindexes")) {
-           finalPreindex = mergePreindexes(tmpDir, mergeHeartbeat, preindexes);
-       }
-
-       heartbeat.progress(CreateReverseIndexSteps.FINALIZE);
-       finalPreindex.finalizeIndex(outputFileDocs, outputFileWords);
+           finalPreindex = mergePreindexes(tmpDir, mergeHeartbeat, preindexes)
+                   .open();
+
+           heartbeat.progress(CreateReverseIndexSteps.FINALIZE);
+           finalPreindex.finalizeIndex(outputFileDocs, outputFileWords);
+       }
+       finally {
+           if (null != finalPreindex)
+               finalPreindex.delete();
+       }

        heartbeat.progress(CreateReverseIndexSteps.FINISHED);
-       finalPreindex.delete();
    }
}

-private static ReversePreindex mergePreindexes(Path workDir, ProcessAdHocTaskHeartbeat mergeHeartbeat, List<ReversePreindex> preindexes) throws IOException {
+private static ReversePreindexReference mergePreindexes(Path workDir,
+                                                        ProcessAdHocTaskHeartbeat mergeHeartbeat,
+                                                        List<ReversePreindexReference> preindexes) throws IOException {
assert !preindexes.isEmpty();

if (preindexes.size() == 1) {
logger.info("Single preindex, no merge necessary");
return preindexes.get(0);
}

-List<ReversePreindex> toMerge = new ArrayList<>(preindexes);
-List<ReversePreindex> merged = new ArrayList<>();
+LinkedList<ReversePreindexReference> toMerge = new LinkedList<>(preindexes);
+List<ReversePreindexReference> mergedItems = new ArrayList<>(preindexes.size() / 2);

int pass = 0;
-while (toMerge.size() != 1) {
-    String stage = String.format("PASS[%d]: %d -> %d", ++pass,
-            toMerge.size(),
-            toMerge.size()/2 + (toMerge.size() % 2)
-    );
+while (toMerge.size() > 1) {
+    String stage = String.format("PASS[%d]: %d -> %d", ++pass, toMerge.size(), toMerge.size()/2 + (toMerge.size() % 2));

-       for (int i = 0; i + 1 < toMerge.size(); i+=2) {
-           mergeHeartbeat.progress(stage, i/2, toMerge.size()/2);
-
-           var left = toMerge.get(i);
-           var right = toMerge.get(i+1);
-
-           merged.add(ReversePreindex.merge(workDir, left, right));
+       int totalToMergeCount = toMerge.size()/2;
+       int toMergeProgress = 0;
+
+       while (toMerge.size() >= 2) {
+           mergeHeartbeat.progress(stage, toMergeProgress++, totalToMergeCount);
+
+           var left = toMerge.removeFirst().open();
+           var right = toMerge.removeFirst().open();
+
+           mergedItems.add(
+                   ReversePreindex
+                           .merge(workDir, left, right)
+                           .closeToReference()
+           );

            left.delete();
            right.delete();
        }

-       if ((toMerge.size() % 2) != 0) {
-           merged.add(toMerge.get(toMerge.size()-1));
-       }
-
-       toMerge.clear();
-       toMerge.addAll(merged);
-       merged.clear();
+       // Pour the merged items back into the toMerge queue
+       // (note: toMerge may still hold a single item, in the case
+       // where it had an odd population)
+       toMerge.addAll(mergedItems);
+       mergedItems.clear();
}

mergeHeartbeat.progress("FINISHED", 1, 1);

-return toMerge.get(0);
+return toMerge.getFirst();
}

}
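The reshaped loop above is easier to see without the index plumbing: every preindex waits in the queue as a dehydrated reference, and only the two being merged (plus their output) are mmapped at any moment. A schematic sketch of the pattern, with hypothetical Ref and Index types standing in for ReversePreindexReference and ReversePreindex:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class MergePassSketch {
    interface Ref { Index open(); }                        // re-mmaps the files
    interface Index { Ref closeToReference(); void delete(); }

    static Ref mergePair(Ref a, Ref b) {
        Index left = a.open();                             // mmaps live only
        Index right = b.open();                            // inside this call
        try {
            return combine(left, right).closeToReference(); // unmap the result
        } finally {
            left.delete();                                 // inputs are spent
            right.delete();
        }
    }

    static Ref mergeAll(List<Ref> refs) {
        LinkedList<Ref> toMerge = new LinkedList<>(refs);
        List<Ref> merged = new ArrayList<>();
        while (toMerge.size() > 1) {
            while (toMerge.size() >= 2)
                merged.add(mergePair(toMerge.removeFirst(), toMerge.removeFirst()));
            toMerge.addAll(merged);   // an odd leftover stays in toMerge
            merged.clear();
        }
        return toMerge.getFirst();
    }

    static Index combine(Index l, Index r) {
        throw new UnsupportedOperationException("stand-in for ReversePreindex.merge");
    }
}
```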
@@ -1,6 +1,7 @@
package nu.marginalia.index.construction;

import nu.marginalia.array.LongArray;
+import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.btree.BTreeWriter;
import nu.marginalia.index.ReverseIndexParameters;
import nu.marginalia.index.journal.reader.IndexJournalReader;
@@ -54,6 +55,20 @@ public static ReversePreindex constructPreindex(IndexJournalReader reader,
return new ReversePreindex(segments, docs);
}

+    /** Close the associated memory-mapped areas and return
+     * a dehydrated version of this object that can be re-opened
+     * later.
+     */
+    public ReversePreindexReference closeToReference() {
+        try {
+            return new ReversePreindexReference(segments, documents);
+        }
+        finally {
+            segments.close();
+            documents.close();
+        }
+    }

/** Transform the preindex into a reverse index */
public void finalizeIndex(Path outputFileDocs, Path outputFileWords) throws IOException {
var offsets = segments.counts;
@@ -67,7 +82,7 @@ public void finalizeIndex(Path outputFileDocs, Path outputFileWords) throws IOException {
offsets.fold(0, 0, offsets.size(), sizeEstimator);

// Write the docs file
-LongArray finalDocs = LongArray.mmapForWriting(outputFileDocs, sizeEstimator.size);
+LongArray finalDocs = LongArrayFactory.mmapForWritingConfined(outputFileDocs, sizeEstimator.size);
try (var intermediateDocChannel = documents.createDocumentsFileChannel()) {
offsets.transformEachIO(0, offsets.size(),
new ReverseIndexBTreeTransformer(finalDocs, 2,
@@ -84,7 +99,7 @@ public void finalizeIndex(Path outputFileDocs, Path outputFileWords) throws IOException {
long wordsSize = ReverseIndexParameters.wordsBTreeContext.calculateSize((int) offsets.size());

// Construct the tree
-LongArray wordsArray = LongArray.mmapForWriting(outputFileWords, wordsSize);
+LongArray wordsArray = LongArrayFactory.mmapForWritingConfined(outputFileWords, wordsSize);

new BTreeWriter(wordsArray, ReverseIndexParameters.wordsBTreeContext)
.write(0, (int) offsets.size(), mapRegion -> {
@@ -95,7 +110,9 @@ public void finalizeIndex(Path outputFileDocs, Path outputFileWords) throws IOException {
});

 finalDocs.force();
+finalDocs.close();
 wordsArray.force();
+wordsArray.close();

}

@@ -118,7 +135,7 @@ public static ReversePreindex merge(Path destDir,

Path docsFile = Files.createTempFile(destDir, "docs", ".dat");

-LongArray mergedDocuments = LongArray.mmapForWriting(docsFile, 2 * (left.documents.size() + right.documents.size()));
+LongArray mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, 2 * (left.documents.size() + right.documents.size()));

leftIter.next();
rightIter.next();
@@ -194,14 +211,14 @@ static ReversePreindexWordSegments createMergedSegmentWordFile(Path destDir,
0, left.wordIds.size(),
0, right.wordIds.size());

-LongArray wordIdsFile = LongArray.mmapForWriting(segmentWordsFile, segmentsSize);
+LongArray wordIdsFile = LongArrayFactory.mmapForWritingConfined(segmentWordsFile, segmentsSize);

mergeArrays(wordIdsFile, left.wordIds, right.wordIds,
0, wordIdsFile.size(),
0, left.wordIds.size(),
0, right.wordIds.size());

-LongArray counts = LongArray.mmapForWriting(segmentCountsFile, segmentsSize);
+LongArray counts = LongArrayFactory.mmapForWritingConfined(segmentCountsFile, segmentsSize);

return new ReversePreindexWordSegments(wordIdsFile, counts, segmentWordsFile, segmentCountsFile);
}
@@ -218,7 +235,9 @@ private static LongArray shrinkMergedDocuments(LongArray mergedDocuments, Path docsFile
bc.truncate(sizeLongs * 8);
}
long afterSize = mergedDocuments.size();
-mergedDocuments = LongArray.mmapForWriting(docsFile, sizeLongs);
+mergedDocuments.close();
+
+mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, sizeLongs);

if (beforeSize != afterSize) {
logger.info("Shrunk {} from {}b to {}b", docsFile, beforeSize, afterSize);
@@ -2,6 +2,7 @@

import lombok.SneakyThrows;
import nu.marginalia.array.LongArray;
+import nu.marginalia.array.LongArrayFactory;
import nu.marginalia.index.journal.reader.IndexJournalReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,7 +20,7 @@
* the associated ReversePreindexWordSegments data
*/
public class ReversePreindexDocuments {
-private final Path file;
+final Path file;
public final LongArray documents;
private static final int RECORD_SIZE_LONGS = 2;
private static final Logger logger= LoggerFactory.getLogger(ReversePreindexDocuments.class);
@@ -39,7 +40,7 @@ public static ReversePreindexDocuments construct(
logger.info("Transferring data");
createUnsortedDocsFile(docsFile, reader, segments, docIdRewriter);

-LongArray docsFileMap = LongArray.mmapForModifying(docsFile);
+LongArray docsFileMap = LongArrayFactory.mmapForModifyingShared(docsFile);
logger.info("Sorting data");
sortDocsFile(docsFileMap, segments);

@@ -64,26 +65,28 @@ private static void createUnsortedDocsFile(Path docsFile,
ReversePreindexWordSegments segments,
DocIdRewriter docIdRewriter) throws IOException {
        long fileSize = RECORD_SIZE_LONGS * segments.totalSize();
-       LongArray outArray = LongArray.mmapForWriting(docsFile, fileSize);
-
-       var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
-       offsetMap.defaultReturnValue(0);
-
-       var pointer = reader.newPointer();
-       while (pointer.nextDocument()) {
-           long rankEncodedId = docIdRewriter.rewriteDocId(pointer.documentId());
-           while (pointer.nextRecord()) {
-               long wordId = pointer.wordId();
-               long wordMeta = pointer.wordMeta();
-
-               long offset = offsetMap.addTo(wordId, RECORD_SIZE_LONGS);
-
-               outArray.set(offset + 0, rankEncodedId);
-               outArray.set(offset + 1, wordMeta);
-           }
-       }
-
-       outArray.force();
+       try (LongArray outArray = LongArrayFactory.onHeapConfined(fileSize)) {
+
+           var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
+           offsetMap.defaultReturnValue(0);
+
+           var pointer = reader.newPointer();
+           while (pointer.nextDocument()) {
+               long rankEncodedId = docIdRewriter.rewriteDocId(pointer.documentId());
+               while (pointer.nextRecord()) {
+                   long wordId = pointer.wordId();
+                   long wordMeta = pointer.wordMeta();
+
+                   long offset = offsetMap.addTo(wordId, RECORD_SIZE_LONGS);
+
+                   outArray.set(offset + 0, rankEncodedId);
+                   outArray.set(offset + 1, wordMeta);
+               }
+           }
+
+           outArray.write(docsFile);
+       }
    }

@SneakyThrows
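The createUnsortedDocsFile rewrite above is the same cleanup in a different flavor: instead of mmapping the output file for writing, the records are now assembled in a heap-confined LongArray whose lifetime is bounded by try-with-resources, then written out in a single pass. In plain FFM terms the pattern looks roughly like this (a sketch under assumed semantics of onHeapConfined, using the finalized java.lang.foreign API rather than the actual LongArray internals):

```java
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class BufferedWriteSketch {
    static void writeRecords(Path docsFile, long sizeLongs) throws Exception {
        try (Arena arena = Arena.ofConfined();
             FileChannel out = FileChannel.open(docsFile,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {

            // Assemble the records in memory ...
            MemorySegment buf = arena.allocate(ValueLayout.JAVA_LONG, sizeLongs);
            for (long i = 0; i < sizeLongs; i++)
                buf.setAtIndex(ValueLayout.JAVA_LONG, i, i); // placeholder data

            // ... then write them out in one pass (assumes < 2 GB here)
            ByteBuffer bb = buf.asByteBuffer();
            while (bb.hasRemaining())
                out.write(bb);
        } // the buffer is freed here even if the write throws
    }
}
```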
@@ -116,5 +119,10 @@ private static void sortDocsFile(LongArray docsFileMap, ReversePreindexWordSegments

    public void delete() throws IOException {
        Files.delete(this.file);
+       documents.close();
    }
+
+   public void close() {
+       documents.close();
+   }
}
