Skip to content

Commit

Permalink
Use BackupRepository instead of BlobStore.
Browse files Browse the repository at this point in the history
* BlobPusher: max threads, thread local stream buffer.
* Synchronize internal collections in BlobDirectory
  • Loading branch information
bruno-roustant committed Apr 30, 2021
1 parent af4702e commit 13a4eeb
Show file tree
Hide file tree
Showing 9 changed files with 265 additions and 289 deletions.
Expand Up @@ -17,6 +17,15 @@

package org.apache.solr.blob;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.common.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
Expand All @@ -27,15 +36,6 @@
import java.util.Map;
import java.util.Set;

import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.common.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobDirectory extends FilterDirectory {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
Expand All @@ -50,33 +50,35 @@ public class BlobDirectory extends FilterDirectory {
* file, to provide the checksums on {@link #sync(Collection)}. But it is able to free earlier the
* reference each time an {@link IndexOutput} is closed, by getting the checksum at that time.
*/
private final Map<String, BlobFileSupplier> blobFileSupplierMap;
private final Set<String> synchronizedFileNames;
private final Collection<String> deletedFileNames;
private final Map<String, BlobFileSupplier> blobFileSupplierMap = new HashMap<>();
private final Set<String> synchronizedFileNames = new HashSet<>();
private final Collection<String> deletedFileNames = new ArrayList<>();
private final Object lock = new Object();
private volatile boolean isOpen;

public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher blobPusher) {
super(delegate);
this.blobDirPath = blobDirPath;
this.blobPusher = blobPusher;
blobFileSupplierMap = new HashMap<>();
synchronizedFileNames = new HashSet<>();
deletedFileNames = new ArrayList<>();
}

@Override
public void deleteFile(String name) throws IOException {
log.debug("deleteFile {}", name);
in.deleteFile(name);
deletedFileNames.add(name);
synchronized (lock) {
deletedFileNames.add(name);
}
}

@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
log.debug("createOutput {}", name);
IndexOutput indexOutput = in.createOutput(name, context);
BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput);
blobFileSupplierMap.put(name, blobFileSupplier);
synchronized (lock) {
blobFileSupplierMap.put(name, blobFileSupplier);
}
return new BlobIndexOutput(indexOutput, blobFileSupplier);
}

Expand All @@ -86,50 +88,59 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti
public void sync(Collection<String> names) throws IOException {
log.debug("sync {}", names);
in.sync(names);
synchronizedFileNames.addAll(names);
synchronized (lock) {
synchronizedFileNames.addAll(names);
}
}

@Override
public void rename(String source, String dest) throws IOException {
log.debug("rename {} to {}", source, dest);
in.rename(source, dest);
// Also rename the corresponding BlobFile.
BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source);
if (blobFileSupplier != null) {
blobFileSupplier.rename(source, dest);
blobFileSupplierMap.put(dest, blobFileSupplier);
}
// Also rename the tracked synchronized file.
if (synchronizedFileNames.remove(source)) {
synchronizedFileNames.add(dest);
synchronized (lock) {
// Rename the corresponding BlobFile.
BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source);
if (blobFileSupplier != null) {
blobFileSupplier.rename(source, dest);
blobFileSupplierMap.put(dest, blobFileSupplier);
}
// Rename the tracked synchronized file.
if (synchronizedFileNames.remove(source)) {
synchronizedFileNames.add(dest);
}
}
}

@Override
public void syncMetaData() throws IOException {
log.debug("syncMetaData");
in.syncMetaData();
syncToBlobStore();
syncToRepository();
}

private void syncToBlobStore() throws IOException {
private void syncToRepository() throws IOException {
log.debug("File names to sync {}", synchronizedFileNames);

Collection<BlobFile> writes = new ArrayList<>(synchronizedFileNames.size());
for (String fileName : synchronizedFileNames) {
BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName);
if (blobFileSupplier != null) {
// Only sync files that were synced since this directory was released. Previous files don't
// need to be synced.
writes.add(blobFileSupplier.getBlobFile());
Collection<BlobFile> writes;
Collection<String> deletes;
synchronized (lock) {
writes = new ArrayList<>(synchronizedFileNames.size());
for (String fileName : synchronizedFileNames) {
BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName);
if (blobFileSupplier != null) {
// Only sync files that were synced since this directory was released.
// Previous files don't need to be synced.
blobFileSupplier.freeIndexOutput();
writes.add(blobFileSupplier.getBlobFile());
}
}
synchronizedFileNames.clear();
deletes = new ArrayList<>(deletedFileNames);
deletedFileNames.clear();
}

log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames);
blobPusher.push(blobDirPath, writes, this::openInputStream, deletedFileNames);

synchronizedFileNames.clear();
deletedFileNames.clear();
log.debug("Sync to repository writes={} deleted={}", writes, deletes);
blobPusher.push(blobDirPath, writes, this::openInputStream, deletes);
}

private InputStream openInputStream(BlobFile blobFile) throws IOException {
Expand All @@ -138,9 +149,11 @@ private InputStream openInputStream(BlobFile blobFile) throws IOException {

public void release() {
log.debug("release");
blobFileSupplierMap.clear();
synchronizedFileNames.clear();
deletedFileNames.clear();
synchronized (lock) {
blobFileSupplierMap.clear();
synchronizedFileNames.clear();
deletedFileNames.clear();
}
}

// obtainLock(): We get the delegate Directory lock.
Expand Down Expand Up @@ -180,8 +193,14 @@ private static class BlobIndexOutput extends FilterIndexOutput {

@Override
public void close() throws IOException {
// Free the reference to the IndexOutput.
blobFileSupplier.indexOutput = null;
blobFileSupplier.freeIndexOutput();
// TODO
// It could be possible to start pushing the file asynchronously with BlobPusher at this time,
// provided that the file size is larger than a threshold (to avoid sending small intermediate
// files that could be removed later before the sync, e.g. merge-on-commit). We would have to
// wait for the completion of the push in the BlobPusher when sync()/syncMetadata() is called.
// Other option: intercept the calls to the write methods, buffer written data, and start
// pushing to BlobPusher earlier than the call to this close method.
super.close();
}
}
Expand All @@ -208,16 +227,27 @@ void rename(String source, String dest) {
name = dest;
if (blobFile != null) {
blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum());
assert indexOutput == null;
}
}

BlobFile getBlobFile() throws IOException {
if (blobFile == null) {
/**
* Frees the reference to the {@link IndexOutput}.
* Creates the {@link BlobFile} if it has not been created yet.
*/
void freeIndexOutput() throws IOException {
if (indexOutput != null) {
blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
// log.debug("Freeing IndexOutput {}", indexOutput);
// Free the reference to the IndexOutput.
indexOutput = null;
}
}

/**
* Gets the {@link BlobFile}. {@link #freeIndexOutput()} must have been called before.
*/
BlobFile getBlobFile() {
assert indexOutput == null : "IndexOutput must be freed before";
assert blobFile != null;
return blobFile;
}
}
Expand Down

0 comments on commit 13a4eeb

Please sign in to comment.