Skip to content

Commit

Permalink
BlobDirectoryFactoryTest, BlobDirectoryTest, BlobStore configurable, …
Browse files Browse the repository at this point in the history
…and some refactoring.

* Add BlobDirectoryFactoryTest and BlobDirectoryTest.

* Add javadoc overview and package-info.

* Clean BlobDirectoryFactory conf params, make BlobStore configurable, improve tests.
  • Loading branch information
bruno-roustant committed Apr 30, 2021
1 parent 2bdcb96 commit af4702e
Show file tree
Hide file tree
Showing 9 changed files with 595 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private void syncToBlobStore() throws IOException {

log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames);
blobPusher.push(blobDirPath, writes, this::openInputStream, deletedFileNames);

synchronizedFileNames.clear();
deletedFileNames.clear();
}
Expand All @@ -149,7 +150,6 @@ public void close() {
log.debug("close");
isOpen = false;
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(blobPusher);
}

@Override
Expand Down Expand Up @@ -180,7 +180,8 @@ private static class BlobIndexOutput extends FilterIndexOutput {

@Override
public void close() throws IOException {
blobFileSupplier.getBlobFileFromIndexOutput();
// Free the reference to the IndexOutput.
blobFileSupplier.indexOutput = null;
super.close();
}
}
Expand Down Expand Up @@ -212,18 +213,12 @@ void rename(String source, String dest) {

BlobFile getBlobFile() throws IOException {
if (blobFile == null) {
getBlobFileFromIndexOutput();
blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
// log.debug("Freeing IndexOutput {}", indexOutput);
// Free the reference to the IndexOutput.
indexOutput = null;
}
return blobFile;
}

/**
* Gets the {@link BlobFile} of the referenced {@link IndexOutput} and then frees the reference.
*/
void getBlobFileFromIndexOutput() throws IOException {
blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum());
// log.debug("Freeing IndexOutput {}", indexOutput);
indexOutput = null; // Free the reference since we have the checksum.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NativeFSLockFactory;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.*;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
Expand All @@ -50,67 +46,50 @@ public class BlobDirectoryFactory extends CachingDirectoryFactory {
private String localRootPath;
private BlobStore blobStore;
private BlobPusher blobPusher;
private DirectoryFactory delegateFactory;
private String delegateLockType;

// Parameters for MMapDirectory
// TODO: Change DirectoryFactory.get() upstream to allow us to provide a Function<Directory,
// Directory> to wrap the directory when it is created. This would unblock the delegation
// of DirectoryFactory here. And we could get rid of these params, we could simply delegate
// to a delegateFactory instead.
private boolean unmapHack;
private boolean preload;
private int maxChunk;
private MMapParams mMapParams;

@Override
public void initCoreContainer(CoreContainer cc) {
super.initCoreContainer(cc);
if (delegateFactory != null) {
delegateFactory.initCoreContainer(cc);
}
localRootPath = (dataHomePath == null ? cc.getCoreRootDirectory() : dataHomePath).getParent().toString();
// blobListingManager = BlobListingManager.getInstance(cc, "/blobDirListings");
// blobListingManager = BlobListingManager.getInstance(cc, "/blobDirListings");
}

@Override
public void init(NamedList args) {
public void init(@SuppressWarnings("rawtypes") NamedList args) {
super.init(args);
SolrParams params = args.toSolrParams();

String delegateFactoryClass = params.get("delegateFactory");
if (delegateFactoryClass == null) {
throw new IllegalArgumentException("delegateFactory class is required");
}
delegateFactory =
coreContainer.getResourceLoader().newInstance(delegateFactoryClass, DirectoryFactory.class);
delegateFactory.initCoreContainer(coreContainer);
delegateFactory.init(args);

delegateLockType = params.get("delegateLockType");
if (delegateLockType == null) {
throw new IllegalArgumentException("delegateLockType is required");
}

String blobRootDir = params.get("blobRootDir");
if (blobRootDir == null) {
throw new IllegalArgumentException("blobRootDir is required");
}
blobStore = null;//TODO new BlobStore(blobRootDir);
// BlobStore where files are persisted.
blobStore = initBlobStore(params);
blobPusher = new BlobPusher(blobStore);

maxChunk = params.getInt("maxChunkSize", MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
if (maxChunk <= 0) {
throw new IllegalArgumentException("maxChunk must be greater than 0");
// Filesystem MMapDirectory used as a local file cache.
mMapParams = new MMapParams(params);
}

private BlobStore initBlobStore(SolrParams params) {
String blobStoreClass = params.get("blobStore.class");
if (blobStoreClass == null) {
throw new IllegalArgumentException("blobStore.class is required");
}
unmapHack = params.getBool("unmap", true);
preload = params.getBool("preload", false); // default turn-off
BlobStore blobStore = coreContainer.getResourceLoader().newInstance(blobStoreClass, BlobStore.class);
blobStore.init(params);
return blobStore;
}

BlobStore getBlobStore() {
return blobStore;
}

MMapParams getMMapParams() {
return mMapParams;
}

@Override
public void doneWithDirectory(Directory directory) throws IOException {
log.debug("doneWithDirectory {}", directory);
((BlobDirectory) directory).release();
// TODO delegateFactory.doneWithDirectory(directory);
super.doneWithDirectory(directory);
}

Expand All @@ -119,37 +98,56 @@ public void close() throws IOException {
log.debug("close");
IOUtils.closeQuietly(blobStore);
IOUtils.closeQuietly(blobPusher);
IOUtils.closeQuietly(delegateFactory);
super.close();
}

@Override
protected LockFactory createLockFactory(String rawLockType) throws IOException {
return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE)
? NoLockFactory.INSTANCE
: NativeFSLockFactory.INSTANCE;
// TODO return rawLockType.equals(DirectoryFactory.LOCK_TYPE_NONE) ? NoLockFactory.INSTANCE :
// DELEGATE_LOCK_FACTORY;
protected LockFactory createLockFactory(String rawLockType) {
log.debug("createLockFactory {}", rawLockType);
if (rawLockType == null) {
rawLockType = DirectoryFactory.LOCK_TYPE_NATIVE;
log.warn("No lockType configured, assuming '{}'.", rawLockType);
}
String lockType = rawLockType.toLowerCase(Locale.ROOT).trim();
switch (lockType) {
case DirectoryFactory.LOCK_TYPE_SIMPLE:
return SimpleFSLockFactory.INSTANCE;
case DirectoryFactory.LOCK_TYPE_NATIVE:
return NativeFSLockFactory.INSTANCE;
case DirectoryFactory.LOCK_TYPE_SINGLE:
return new SingleInstanceLockFactory();
case DirectoryFactory.LOCK_TYPE_NONE:
return NoLockFactory.INSTANCE;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unrecognized lockType: " + rawLockType);
}
}

@Override
protected Directory create(String path, LockFactory lockFactory, DirContext dirContext)
throws IOException {
log.debug("Create Directory {}", path);
MMapDirectory mapDirectory = new MMapDirectory(new File(path).toPath(), lockFactory, maxChunk);
// Create directly a MMapDirectory without calling MMapDirectoryFactory because this BlobDirectoryFactory
// is already a CachingDirectoryFactory, so we don't want another CachingDirectoryFactory.
MMapDirectory mapDirectory = createMMapDirectory(path, lockFactory);
String blobDirPath = getLocalRelativePath(path);
return new BlobDirectory(mapDirectory, blobDirPath, blobPusher);
}

private MMapDirectory createMMapDirectory(String path, LockFactory lockFactory) throws IOException {
MMapDirectory mapDirectory = new MMapDirectory(new File(path).toPath(), lockFactory, mMapParams.maxChunk);
try {
mapDirectory.setUseUnmap(unmapHack);
mapDirectory.setUseUnmap(mMapParams.unmap);
} catch (IllegalArgumentException e) {
log.warn("Unmap not supported on this JVM, continuing on without setting unmap", e);
}
mapDirectory.setPreload(preload);
Directory delegateDirectory = mapDirectory;
// TODO
// String delegateLockType = lockFactory == NoLockFactory.INSTANCE ?
// DirectoryFactory.LOCK_TYPE_NONE : this.delegateLockType;
// Directory delegateDirectory = delegateFactory.get(path, dirContext, delegateLockType);
String blobDirPath = getRelativePath(path, localRootPath);
return new BlobDirectory(delegateDirectory, blobDirPath, blobPusher);
mapDirectory.setPreload(mMapParams.preload);
return mapDirectory;
}

String getLocalRelativePath(String path) {
return getRelativePath(path, localRootPath);
}

private String getRelativePath(String path, String referencePath) {
Expand All @@ -169,16 +167,14 @@ public boolean exists(String path) throws IOException {
boolean exists = super.exists(path);
log.debug("exists {} = {}", path, exists);
return exists;
// TODO return delegateFactory.exists(path);
}

@Override
protected void removeDirectory(CacheValue cacheValue) throws IOException {
log.debug("removeDirectory {}", cacheValue);
File dirFile = new File(cacheValue.path);
FileUtils.deleteDirectory(dirFile);
// TODO delegateFactory.remove(cacheValue.path);
String blobDirPath = getRelativePath(cacheValue.path, localRootPath);
String blobDirPath = getLocalRelativePath(cacheValue.path);
blobStore.deleteDirectory(blobDirPath);
}

Expand Down Expand Up @@ -212,7 +208,6 @@ public boolean isSharedStorage() {
public void release(Directory directory) throws IOException {
log.debug("release {}", directory);
((BlobDirectory) directory).release();
// TODO delegateFactory.release(directory);
super.release(directory);
}

Expand All @@ -221,7 +216,6 @@ public boolean isAbsolute(String path) {
boolean isAbsolute = new File(path).isAbsolute();
log.debug("isAbsolute {} = {}", path, isAbsolute);
return isAbsolute;
// TODO return delegateFactory.isAbsolute(path);
}

@Override
Expand All @@ -241,15 +235,14 @@ public void cleanupOldIndexDirectories(String dataDirPath, String currentIndexDi
log.debug("cleanupOldIndexDirectories {} {}", dataDirPath, currentIndexDirPath);

super.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, afterCoreReload);
// TODO delegateFactory.cleanupOldIndexDirectories(dataDirPath, currentIndexDirPath, afterCoreReload);

try {
dataDirPath = normalize(dataDirPath);
currentIndexDirPath = normalize(currentIndexDirPath);
} catch (IOException e) {
log.error("Failed to delete old index directories in {} due to: ", dataDirPath, e);
}
String blobDirPath = getRelativePath(dataDirPath, localRootPath);
String blobDirPath = getLocalRelativePath(dataDirPath);
String currentIndexDirName = getRelativePath(currentIndexDirPath, dataDirPath);
List<String> oldIndexDirs;
try {
Expand All @@ -272,9 +265,30 @@ public void cleanupOldIndexDirectories(String dataDirPath, String currentIndexDi
oldIndexDirs = oldIndexDirs.subList(0, oldIndexDirs.size() - 1);
}
try {
blobStore.deleteDirectories(blobDirPath, oldIndexDirs);
for (String oldIndexDir : oldIndexDirs) {
blobStore.deleteDirectory(blobDirPath + '/' + oldIndexDir);
}
} catch (IOException e) {
log.error("Failed to delete old index directories {} in {} due to: ", oldIndexDirs, blobDirPath, e);
}
}

/**
* Parameters to create {@link MMapDirectory}.
*/
static class MMapParams {

final boolean unmap;
final boolean preload;
final int maxChunk;

private MMapParams(SolrParams params) {
maxChunk = params.getInt("mmap.maxChunkSize", MMapDirectory.DEFAULT_MAX_CHUNK_SIZE);
if (maxChunk <= 0) {
throw new IllegalArgumentException("mmap.maxChunkSize must be greater than 0");
}
unmap = params.getBool("mmap.unmap", true);
preload = params.getBool("mmap.preload", false); // default turn-off
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import org.apache.lucene.util.IOUtils;
Expand Down Expand Up @@ -104,6 +101,11 @@ private void deleteFiles(String blobDirPath, Collection<String> fileNames) throw

@Override
public void close() {
// TODO
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.solr.blob;

import org.apache.solr.common.params.SolrParams;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -26,6 +28,11 @@

public abstract class BlobStore implements Closeable {

/**
* Initializes this {@link BlobStore} based on provided parameters.
*/
public abstract void init(SolrParams params);

/**
* Creates a file.
*
Expand All @@ -52,17 +59,10 @@ public abstract void delete(String dirPath, Collection<String> fileNames)
public abstract void deleteDirectory(String dirPath)
throws IOException;

/**
* Deletes directories from the blob storage.
*
* @param dirPath The path to the directory in which to delete sub-directories.
* @param dirNames The directory names. They are deleted even if they are not empty.
*/
public abstract void deleteDirectories(String dirPath, Collection<String> dirNames)
throws IOException;

/**
* Lists all files and sub-directories that are selected by the provided filter in a given directory.
* The output is in sorted order (UTF-16, java's {@link String#compareTo}).
* Returns an empty list if the directory does not exist.
*
* @param dirPath The path to the directory in which to list files/sub-directories.
* @param nameFilter Filters the listed file/directory names (does not include the path).
Expand Down

0 comments on commit af4702e

Please sign in to comment.