From 13a4eeb51cf8c88c28e84ee95294cbff835a6a7c Mon Sep 17 00:00:00 2001 From: Bruno Roustant <33934988+bruno-roustant@users.noreply.github.com> Date: Thu, 22 Apr 2021 09:36:09 +0200 Subject: [PATCH] Use BackupRepository instead of BlobStore. * BlobPusher: max threads, thread local stream buffer. * Synchronize internal collections in BlobDirectory --- .../org/apache/solr/blob/BlobDirectory.java | 130 +++++++++++------- .../solr/blob/BlobDirectoryFactory.java | 89 ++++++++---- .../java/org/apache/solr/blob/BlobPusher.java | 94 ++++++++++--- .../java/org/apache/solr/blob/BlobStore.java | 81 ----------- .../apache/solr/blob/FilterIndexOutput.java | 3 + .../org/apache/solr/blob/LocalBlobStore.java | 95 ------------- .../org/apache/solr/blob/package-info.java | 2 +- .../solr/blob/BlobDirectoryFactoryTest.java | 32 +++-- .../apache/solr/blob/BlobDirectoryTest.java | 28 ++-- 9 files changed, 265 insertions(+), 289 deletions(-) delete mode 100644 solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java delete mode 100644 solr/contrib/blob-directory/src/java/org/apache/solr/blob/LocalBlobStore.java diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java index a539ce32e23..6743f48b847 100644 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java +++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectory.java @@ -17,6 +17,15 @@ package org.apache.solr.blob; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.solr.common.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandles; @@ -27,15 +36,6 @@ import java.util.Map; import java.util.Set; -import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FilterDirectory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexOutput; -import org.apache.solr.common.util.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class BlobDirectory extends FilterDirectory { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -50,25 +50,25 @@ public class BlobDirectory extends FilterDirectory { * file, to provide the checksums on {@link #sync(Collection)}. But it is able to free earlier the * reference each time an {@link IndexOutput} is closed, by getting the checksum at that time. */ - private final Map blobFileSupplierMap; - private final Set synchronizedFileNames; - private final Collection deletedFileNames; + private final Map blobFileSupplierMap = new HashMap<>(); + private final Set synchronizedFileNames = new HashSet<>(); + private final Collection deletedFileNames = new ArrayList<>(); + private final Object lock = new Object(); private volatile boolean isOpen; public BlobDirectory(Directory delegate, String blobDirPath, BlobPusher blobPusher) { super(delegate); this.blobDirPath = blobDirPath; this.blobPusher = blobPusher; - blobFileSupplierMap = new HashMap<>(); - synchronizedFileNames = new HashSet<>(); - deletedFileNames = new ArrayList<>(); } @Override public void deleteFile(String name) throws IOException { log.debug("deleteFile {}", name); in.deleteFile(name); - deletedFileNames.add(name); + synchronized (lock) { + deletedFileNames.add(name); + } } @Override @@ -76,7 +76,9 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti log.debug("createOutput {}", name); IndexOutput indexOutput = in.createOutput(name, context); BlobFileSupplier blobFileSupplier = new BlobFileSupplier(indexOutput); - blobFileSupplierMap.put(name, blobFileSupplier); + synchronized (lock) { + blobFileSupplierMap.put(name, blobFileSupplier); + } return new BlobIndexOutput(indexOutput, blobFileSupplier); } @@ -86,22 +88,26 @@ public IndexOutput createOutput(String name, IOContext context) throws IOExcepti public void sync(Collection names) throws IOException { log.debug("sync {}", names); in.sync(names); - synchronizedFileNames.addAll(names); + synchronized (lock) { + synchronizedFileNames.addAll(names); + } } @Override public void rename(String source, String dest) throws IOException { log.debug("rename {} to {}", source, dest); in.rename(source, dest); - // Also rename the corresponding BlobFile. - BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source); - if (blobFileSupplier != null) { - blobFileSupplier.rename(source, dest); - blobFileSupplierMap.put(dest, blobFileSupplier); - } - // Also rename the tracked synchronized file. - if (synchronizedFileNames.remove(source)) { - synchronizedFileNames.add(dest); + synchronized (lock) { + // Rename the corresponding BlobFile. + BlobFileSupplier blobFileSupplier = blobFileSupplierMap.remove(source); + if (blobFileSupplier != null) { + blobFileSupplier.rename(source, dest); + blobFileSupplierMap.put(dest, blobFileSupplier); + } + // Rename the tracked synchronized file. + if (synchronizedFileNames.remove(source)) { + synchronizedFileNames.add(dest); + } } } @@ -109,27 +115,32 @@ public void rename(String source, String dest) throws IOException { public void syncMetaData() throws IOException { log.debug("syncMetaData"); in.syncMetaData(); - syncToBlobStore(); + syncToRepository(); } - private void syncToBlobStore() throws IOException { + private void syncToRepository() throws IOException { log.debug("File names to sync {}", synchronizedFileNames); - Collection writes = new ArrayList<>(synchronizedFileNames.size()); - for (String fileName : synchronizedFileNames) { - BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName); - if (blobFileSupplier != null) { - // Only sync files that were synced since this directory was released. Previous files don't - // need to be synced. - writes.add(blobFileSupplier.getBlobFile()); + Collection writes; + Collection deletes; + synchronized (lock) { + writes = new ArrayList<>(synchronizedFileNames.size()); + for (String fileName : synchronizedFileNames) { + BlobFileSupplier blobFileSupplier = blobFileSupplierMap.get(fileName); + if (blobFileSupplier != null) { + // Only sync files that were synced since this directory was released. + // Previous files don't need to be synced. + blobFileSupplier.freeIndexOutput(); + writes.add(blobFileSupplier.getBlobFile()); + } } + synchronizedFileNames.clear(); + deletes = new ArrayList<>(deletedFileNames); + deletedFileNames.clear(); } - log.debug("Sync to BlobStore writes={} deleted={}", writes, deletedFileNames); - blobPusher.push(blobDirPath, writes, this::openInputStream, deletedFileNames); - - synchronizedFileNames.clear(); - deletedFileNames.clear(); + log.debug("Sync to repository writes={} deleted={}", writes, deletes); + blobPusher.push(blobDirPath, writes, this::openInputStream, deletes); } private InputStream openInputStream(BlobFile blobFile) throws IOException { @@ -138,9 +149,11 @@ private InputStream openInputStream(BlobFile blobFile) throws IOException { public void release() { log.debug("release"); - blobFileSupplierMap.clear(); - synchronizedFileNames.clear(); - deletedFileNames.clear(); + synchronized (lock) { + blobFileSupplierMap.clear(); + synchronizedFileNames.clear(); + deletedFileNames.clear(); + } } // obtainLock(): We get the delegate Directory lock. @@ -180,8 +193,14 @@ private static class BlobIndexOutput extends FilterIndexOutput { @Override public void close() throws IOException { - // Free the reference to the IndexOutput. - blobFileSupplier.indexOutput = null; + blobFileSupplier.freeIndexOutput(); + // TODO + // It could be possible to start pushing the file asynchronously with BlobPusher at this time, + // provided that the file size is larger than a threshold (to avoid sending small intermediate + // files that could be removed later before the sync, e.g. merge-on-commit). We would have to + // wait for the completion of the push in the BlobPusher when sync()/syncMetadata() is called. + // Other option: intercept the calls to the write methods, buffer written data, and start + // pushing to BlobPusher earlier than the call to this close method. super.close(); } } @@ -208,16 +227,27 @@ void rename(String source, String dest) { name = dest; if (blobFile != null) { blobFile = new BlobFile(name, blobFile.size(), blobFile.checksum()); + assert indexOutput == null; } } - BlobFile getBlobFile() throws IOException { - if (blobFile == null) { + /** + * Frees the reference to the {@link IndexOutput}. + * Creates the {@link BlobFile} if it has not been created yet. + */ + void freeIndexOutput() throws IOException { + if (indexOutput != null) { blobFile = new BlobFile(name, indexOutput.getFilePointer(), indexOutput.getChecksum()); - // log.debug("Freeing IndexOutput {}", indexOutput); - // Free the reference to the IndexOutput. indexOutput = null; } + } + + /** + * Gets the {@link BlobFile}. {@link #freeIndexOutput()} must have been called before. + */ + BlobFile getBlobFile() { + assert indexOutput == null : "IndexOutput must be freed before"; + assert blobFile != null; return blobFile; } } diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java index 16bc03a0116..432b56f8d87 100644 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java +++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobDirectoryFactory.java @@ -20,13 +20,17 @@ import java.io.File; import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; import org.apache.lucene.store.*; import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.NamedList; @@ -34,6 +38,7 @@ import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.DirectoryFactory; +import org.apache.solr.core.backup.repository.BackupRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,10 +46,23 @@ public class BlobDirectoryFactory extends CachingDirectoryFactory { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + /** + * Default maximum number of threads that push files concurrently to the repository. + */ + private static final int DEFAULT_MAX_THREADS = 20; + + /** + * Default size of the buffer used to transfer input-output stream to the repository, in bytes. + * The appropriate size depends on the repository. + * There is one buffer per thread. + */ + public static final int DEFAULT_STREAM_BUFFER_SIZE = 32_768; + private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("index(?:\\.[0-9]{17})?"); private String localRootPath; - private BlobStore blobStore; + private BackupRepository repository; + private URI repositoryLocation; private BlobPusher blobPusher; private MMapParams mMapParams; @@ -60,26 +78,42 @@ public void init(@SuppressWarnings("rawtypes") NamedList args) { super.init(args); SolrParams params = args.toSolrParams(); - // BlobStore where files are persisted. - blobStore = initBlobStore(params); - blobPusher = new BlobPusher(blobStore); + // Repository where files are persisted. + // 'repository' parameter must be defined, we don't want to use a default one. + String repositoryName = params.get(CoreAdminParams.BACKUP_REPOSITORY); + if (repositoryName == null) { + throw new IllegalArgumentException("Parameter '" + CoreAdminParams.BACKUP_REPOSITORY + "' is required"); + } + repository = coreContainer.newBackupRepository(repositoryName); + + // 'location' parameter must be defined, we don't want to use a default one. + String location = repository.getBackupLocation(params.get(CoreAdminParams.BACKUP_LOCATION)); + if (location == null) { + throw new IllegalArgumentException("Parameter '" + CoreAdminParams.BACKUP_LOCATION + "' is required"); + } + repositoryLocation = repository.createURI(location); + + // 'threads' defines the maximum number of threads that push files concurrently to the repository. + int maxThreads = params.getInt("threads", DEFAULT_MAX_THREADS); + + // 'streamBufferSize' defines the size of the stream copy buffer. + // This determines the size of the chunk of data sent to the repository during files transfer. + // It is optional but recommended to set the appropriate size for the selected repository. + // There is one buffer per thread. + int streamBufferSize = params.getInt("streamBufferSize", DEFAULT_STREAM_BUFFER_SIZE); + + blobPusher = new BlobPusher(repository, repositoryLocation, maxThreads, streamBufferSize); // Filesystem MMapDirectory used as a local file cache. mMapParams = new MMapParams(params); } - private BlobStore initBlobStore(SolrParams params) { - String blobStoreClass = params.get("blobStore.class"); - if (blobStoreClass == null) { - throw new IllegalArgumentException("blobStore.class is required"); - } - BlobStore blobStore = coreContainer.getResourceLoader().newInstance(blobStoreClass, BlobStore.class); - blobStore.init(params); - return blobStore; + BackupRepository getRepository() { + return repository; } - BlobStore getBlobStore() { - return blobStore; + URI getRepositoryLocation() { + return repositoryLocation; } MMapParams getMMapParams() { @@ -96,7 +130,7 @@ public void doneWithDirectory(Directory directory) throws IOException { @Override public void close() throws IOException { log.debug("close"); - IOUtils.closeQuietly(blobStore); + IOUtils.closeQuietly(repository); IOUtils.closeQuietly(blobPusher); super.close(); } @@ -152,8 +186,8 @@ String getLocalRelativePath(String path) { private String getRelativePath(String path, String referencePath) { if (!path.startsWith(referencePath)) { - throw new IllegalArgumentException("Path=" + path + " is expected to start with referencePath=" - + referencePath + " otherwise we have to adapt the code"); + throw new IllegalArgumentException("Path '" + path + "' is expected to start with referencePath '" + + referencePath + "' otherwise we have to adapt the code"); } String relativePath = path.substring(referencePath.length()); if (relativePath.startsWith("/")) { @@ -162,6 +196,10 @@ private String getRelativePath(String path, String referencePath) { return relativePath; } + URI resolveBlobPath(String blobPath) { + return repository.resolve(repositoryLocation, blobPath); + } + @Override public boolean exists(String path) throws IOException { boolean exists = super.exists(path); @@ -175,7 +213,7 @@ protected void removeDirectory(CacheValue cacheValue) throws IOException { File dirFile = new File(cacheValue.path); FileUtils.deleteDirectory(dirFile); String blobDirPath = getLocalRelativePath(cacheValue.path); - blobStore.deleteDirectory(blobDirPath); + repository.deleteDirectory(resolveBlobPath(blobDirPath)); } @Override @@ -241,18 +279,21 @@ public void cleanupOldIndexDirectories(String dataDirPath, String currentIndexDi currentIndexDirPath = normalize(currentIndexDirPath); } catch (IOException e) { log.error("Failed to delete old index directories in {} due to: ", dataDirPath, e); + return; } String blobDirPath = getLocalRelativePath(dataDirPath); - String currentIndexDirName = getRelativePath(currentIndexDirPath, dataDirPath); - List oldIndexDirs; + URI blobDirUri = resolveBlobPath(blobDirPath); + String[] dirEntries; try { - oldIndexDirs = blobStore.listInDirectory(blobDirPath, - (name) -> !name.equals(currentIndexDirName) - && INDEX_NAME_PATTERN.matcher(name).matches()); + dirEntries = repository.listAll(blobDirUri); } catch (IOException e) { log.error("Failed to delete old index directories in {} due to: ", blobDirPath, e); return; } + String currentIndexDirName = getRelativePath(currentIndexDirPath, dataDirPath); + List oldIndexDirs = Arrays.stream(dirEntries) + .filter((name) -> !name.equals(currentIndexDirName) && INDEX_NAME_PATTERN.matcher(name).matches()) + .collect(Collectors.toList()); if (oldIndexDirs.isEmpty()) { return; } @@ -266,7 +307,7 @@ public void cleanupOldIndexDirectories(String dataDirPath, String currentIndexDi } try { for (String oldIndexDir : oldIndexDirs) { - blobStore.deleteDirectory(blobDirPath + '/' + oldIndexDir); + repository.deleteDirectory(repository.resolve(blobDirUri, oldIndexDir)); } } catch (IOException e) { log.error("Failed to delete old index directories {} in {} due to: ", oldIndexDirs, blobDirPath, e); diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java index 4421e2bd145..b8ca28aca81 100644 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java +++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobPusher.java @@ -17,36 +17,63 @@ package org.apache.solr.blob; +import org.apache.lucene.util.IOUtils; +import org.apache.solr.common.util.ExecutorUtil; +import org.apache.solr.common.util.SolrNamedThreadFactory; +import org.apache.solr.core.backup.repository.BackupRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.invoke.MethodHandles; +import java.net.URI; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.concurrent.*; +import java.util.regex.Pattern; import java.util.stream.Collectors; -import org.apache.lucene.util.IOUtils; -import org.apache.solr.common.util.ExecutorUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** Pushes a set of files to Blob, and works with listings. */ +/** + * Pushes a set of files to Blob, and works with listings. + */ public class BlobPusher implements Closeable { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - // WORK IN PROGRESS!! + private static final String PATH_SEPARATOR = "/"; + private static final Pattern PATH_SPLITTER = Pattern.compile(PATH_SEPARATOR); - private final BlobStore blobStore; - private final ExecutorService executor = ExecutorUtil.newMDCAwareCachedThreadPool("blobPusher"); + // WORK IN PROGRESS!! - public BlobPusher(BlobStore blobStore) { - this.blobStore = blobStore; + private final BackupRepository repository; + private final URI repositoryLocation; + private final ExecutorService executor; + // Pool of stream buffers, one per executor thread, to reuse them as the buffer size may be large + // to have efficient stream transfer to the remote repository. + private final ThreadLocal streamBuffers; + + /** + * @param repository The repository to push files to. + * @param repositoryLocation Base location in the repository. This is used to build the URIs. + * @param maxThreads Maximum number of threads to push files concurrently. + * @param streamBufferSize Size of the stream copy buffer, in bytes. This determines the size of the chunk + * of data sent to the repository during files transfer. There is one buffer per thread. + */ + public BlobPusher(BackupRepository repository, URI repositoryLocation, int maxThreads, int streamBufferSize) { + this.repository = Objects.requireNonNull(repository); + this.repositoryLocation = Objects.requireNonNull(repositoryLocation); + executor = ExecutorUtil.newMDCAwareCachedThreadPool( + maxThreads, + new SolrNamedThreadFactory(BlobPusher.class.getSimpleName())); + streamBuffers = ThreadLocal.withInitial(() -> new byte[streamBufferSize]); } public void push( - String blobDirPath, + String blobDirPath, Collection writes, IOUtils.IOFunction inputStreamSupplier, Collection deletes) @@ -55,7 +82,7 @@ public void push( // update "foreign" listings // TODO David - // send files to BlobStore and delete our files too + // Send files to repository and delete our files too. log.debug("Pushing {}", writes); executeAll(pushFiles(blobDirPath, writes, inputStreamSupplier)); log.debug("Deleting {}", deletes); @@ -79,24 +106,55 @@ private void executeAll(List> actions) throws IOException { } private List> pushFiles( - String blobDirPath, + String blobDirPath, Collection blobFiles, - IOUtils.IOFunction inputStreamSupplier) { + IOUtils.IOFunction inputStreamSupplier) + throws IOException { + URI blobDirUri = repository.resolve(repositoryLocation, blobDirPath); + createDirectories(blobDirUri, blobDirPath); return blobFiles.stream() .map( (blobFile) -> (Callable) () -> { - try (InputStream in = inputStreamSupplier.apply(blobFile)) { - blobStore.create(blobDirPath, blobFile.fileName(), in, blobFile.size()); + try (InputStream in = inputStreamSupplier.apply(blobFile); + OutputStream out = repository.createOutput(repository.resolve(blobDirUri, blobFile.fileName()))) { + copyStream(in, out); } return null; }) .collect(Collectors.toList()); } + private void copyStream(InputStream input, OutputStream output) throws IOException { + byte[] buffer = streamBuffers.get(); + int n; + while ((n = input.read(buffer)) != -1) { + output.write(buffer, 0, n); + } + } + + private void createDirectories(URI blobDirUri, String blobDirPath) throws IOException { + // Create the parent directories if needed. + // The goal is to have a minimal number of calls to the repository in most cases. + // Common case: the directory exists or the parent directory exists. + // Try a direct call to 'createDirectory', avoiding a call to 'exists' in many cases. + // The API says if the directory already exists, it is a no-op. + try { + repository.createDirectory(blobDirUri); + } catch (IOException e) { + // Create the parent directories. + URI pathUri = repositoryLocation; + for (String pathElement : PATH_SPLITTER.split(blobDirPath)) { + pathUri = repository.resolve(pathUri, pathElement); + repository.createDirectory(pathUri); + } + } + } + private void deleteFiles(String blobDirPath, Collection fileNames) throws IOException { - blobStore.delete(blobDirPath, fileNames); + URI blobDirUri = repository.resolve(repositoryLocation, blobDirPath); + repository.delete(blobDirUri, fileNames, true); } @Override diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java deleted file mode 100644 index 2c5eb55faa2..00000000000 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/BlobStore.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.solr.blob; - -import org.apache.solr.common.params.SolrParams; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.util.Collection; -import java.util.List; -import java.util.function.Predicate; - -public abstract class BlobStore implements Closeable { - - /** - * Initializes this {@link BlobStore} based on provided parameters. - */ - public abstract void init(SolrParams params); - - /** - * Creates a file. - * - * @param dirPath The path to the directory in which to create the file. - * @param fileName The file name. - */ - public abstract void create(String dirPath, String fileName, InputStream inputStream, long contentLength) - throws IOException; - - /** - * Deletes files. - * - * @param dirPath The path to the directory in which to delete files. - * @param fileNames The file names. - */ - public abstract void delete(String dirPath, Collection fileNames) - throws IOException; - - /** - * Deletes a directory. - * - * @param dirPath The path to the directory to delete. It is deleted even if it is not empty. - */ - public abstract void deleteDirectory(String dirPath) - throws IOException; - - /** - * Lists all files and sub-directories that are selected by the provided filter in a given directory. - * The output is in sorted order (UTF-16, java's {@link String#compareTo}). - * Returns an empty list if the directory does not exist. - * - * @param dirPath The path to the directory in which to list files/sub-directories. - * @param nameFilter Filters the listed file/directory names (does not include the path). - */ - public abstract List listInDirectory(String dirPath, Predicate nameFilter) - throws IOException; - - /** - * Reads a file. - * - * @param dirPath The path to the directory in which to read the file. - * @param fileName The file name. - */ - public abstract InputStream read(String dirPath, String fileName) - throws IOException; -} diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java index ff9331dfe4d..aaabdad2644 100644 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java +++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/FilterIndexOutput.java @@ -24,6 +24,9 @@ import org.apache.lucene.store.DataInput; import org.apache.lucene.store.IndexOutput; +/** + * Delegates to another {@link IndexOutput}. + */ public class FilterIndexOutput extends IndexOutput { protected final IndexOutput delegate; diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/LocalBlobStore.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/LocalBlobStore.java deleted file mode 100644 index 5ae0a386fc8..00000000000 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/LocalBlobStore.java +++ /dev/null @@ -1,95 +0,0 @@ -package org.apache.solr.blob; - -import org.apache.commons.io.FileUtils; -import org.apache.solr.common.params.SolrParams; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.function.Predicate; - -public class LocalBlobStore extends BlobStore { - - private String blobRootDir; - - public String getBlobRootDir() { - return blobRootDir; - } - - @Override - public void init(SolrParams params) { - blobRootDir = params.get("blobStore.rootDir"); - if (blobRootDir == null) { - throw new IllegalArgumentException("blobStore.rootDir is required"); - } - } - - @Override - public void create(String dirPath, String fileName, InputStream inputStream, long contentLength) throws IOException { - File file = new File(new File(blobRootDir, dirPath), fileName); - if (file.exists()) { - throw new IOException("\"" + file + "\" already exists"); - } - FileUtils.copyToFile(inputStream, file); - } - - @Override - public void delete(String dirPath, Collection fileNames) throws IOException { - for (String fileName : fileNames) { - File file = new File(new File(blobRootDir, dirPath), fileName); - if (!file.delete() && file.exists()) { - throw new IOException("IO error while deleting file \"" + file + "\""); - } - } - } - - @Override - public void deleteDirectory(String dirPath) throws IOException { - File dir = new File(blobRootDir, dirPath); - if (!dir.isDirectory()) { - if (dir.exists()) { - throw new IOException("\"" + dir + "\" is not a directory"); - } - return; - } - FileUtils.deleteDirectory(dir); - if (dir.exists()) { - throw new IOException("IO error while deleting file \"" + dir + "\""); - } - } - - @Override - public List listInDirectory(String dirPath, Predicate nameFilter) throws IOException { - File dir = new File(blobRootDir, dirPath); - if (!dir.exists()) { - return Collections.emptyList(); - } - String[] list = dir.list((__, name) -> nameFilter.test(name)); - if (list == null) { - if (!dir.isDirectory()) { - throw new IOException("\"" + dir + "\" is not a directory"); - } - throw new IOException("IO error while listing in directory \"" + dir + "\""); - } - if (list.length == 0) { - return Collections.emptyList(); - } - Arrays.sort(list); - return Arrays.asList(list); - } - - @Override - public InputStream read(String dirPath, String fileName) throws IOException { - return new FileInputStream(new File(new File(blobRootDir, dirPath), fileName)); - } - - @Override - public void close() { - // Nothing to do. - } -} diff --git a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/package-info.java b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/package-info.java index 119f4c52f78..23305befa13 100644 --- a/solr/contrib/blob-directory/src/java/org/apache/solr/blob/package-info.java +++ b/solr/contrib/blob-directory/src/java/org/apache/solr/blob/package-info.java @@ -17,7 +17,7 @@ /** * A {@link org.apache.solr.core.DirectoryFactory} for storing every directories in a remote - * {@link org.apache.solr.blob.BlobStore}, while keeping a local + * {@link org.apache.solr.core.backup.repository.BackupRepository}, while keeping a local * {@link org.apache.solr.core.MMapDirectoryFactory} as a cache for fast read/write access. */ package org.apache.solr.blob; diff --git a/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryFactoryTest.java b/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryFactoryTest.java index 540beeeb292..b7d3ddbc425 100644 --- a/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryFactoryTest.java +++ b/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryFactoryTest.java @@ -6,18 +6,28 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.*; +import org.apache.solr.core.backup.repository.LocalFileSystemRepository; import org.junit.*; import java.io.IOException; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; public class BlobDirectoryFactoryTest extends SolrTestCaseJ4 { - private static final String SOLR_CONFIG = ""; + private static final String REPOSITORY_NAME = "local"; + + private static final String SOLR_CONFIG = "" + + " " + + " " + + " " + + ""; private static Path blobRootDir; private static NodeConfig nodeConfig; @@ -62,24 +72,24 @@ public void testInitArgs() { // Then it throws an exception. expectThrows(IllegalArgumentException.class, () -> blobDirectoryFactory.init(args)); - // Given only the LocalBlobStore class arg. - args.add("blobStore.class", LocalBlobStore.class.getName()); + // Given only the 'repository' arg. + args.add(CoreAdminParams.BACKUP_REPOSITORY, REPOSITORY_NAME); // When the factory is initialized. // Then it throws an exception. expectThrows(IllegalArgumentException.class, () -> blobDirectoryFactory.init(args)); - // Given the LocalBlobStore root dir args is provided. - args.add("blobStore.rootDir", blobRootDir.toString()); + // Given the 'location' args is provided. + args.add(CoreAdminParams.BACKUP_LOCATION, blobRootDir.toString()); // Given other optional MMapDirectory params are provided. args.add("mmap.maxChunkSize", Integer.toString(10)); args.add("mmap.unmap", Boolean.toString(false)); args.add("mmap.preload", Boolean.toString(true)); // When the factory is initialized. blobDirectoryFactory.init(args); - // Then the BlobStore is correctly initialized. - assertTrue(blobDirectoryFactory.getBlobStore() instanceof LocalBlobStore); - // Then the LocalBlobStore root dir is correctly initialized. - assertEquals(blobRootDir.toString(), ((LocalBlobStore) blobDirectoryFactory.getBlobStore()).getBlobRootDir()); + // Then the BackupRepository is correctly initialized. + assertTrue(blobDirectoryFactory.getRepository() instanceof LocalFileSystemRepository); + // Then the backup location is correctly initialized. + assertEquals(blobRootDir, Paths.get(blobDirectoryFactory.getRepositoryLocation())); // Then the optional MMapDirectory params are correctly initialized. assertEquals(10, blobDirectoryFactory.getMMapParams().maxChunk); assertFalse(blobDirectoryFactory.getMMapParams().unmap); @@ -90,8 +100,8 @@ public void testInitArgs() { public void testCleanupOldIndexDirectories() throws Exception { // Given a correctly initialized BlobDirectoryFactory. NamedList args = new NamedList<>(); - args.add("blobStore.class", LocalBlobStore.class.getName()); - args.add("blobStore.rootDir", blobRootDir.toString()); + args.add(CoreAdminParams.BACKUP_REPOSITORY, REPOSITORY_NAME); + args.add(CoreAdminParams.BACKUP_LOCATION, blobRootDir.toString()); blobDirectoryFactory.init(args); // Given a mock Core with 3 mock indexes. The last one is the active index. diff --git a/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java b/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java index aa6714d6649..748c54d1736 100644 --- a/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java +++ b/solr/contrib/blob-directory/src/test/org/apache/solr/blob/BlobDirectoryTest.java @@ -3,22 +3,31 @@ import org.apache.lucene.mockfile.FilterPath; import org.apache.lucene.store.*; import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.DirectoryFactory; import org.apache.solr.core.NodeConfig; import org.apache.solr.core.SolrXmlConfig; +import org.apache.solr.core.backup.repository.LocalFileSystemRepository; import org.junit.*; import java.io.IOException; -import java.io.InputStream; +import java.net.URI; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.util.Arrays; public class BlobDirectoryTest extends SolrTestCaseJ4 { - private static final String SOLR_CONFIG = ""; + private static final String REPOSITORY_NAME = "local"; + + private static final String SOLR_CONFIG = "" + + " " + + " " + + " " + + ""; private static Path blobRootDir; private static NodeConfig nodeConfig; @@ -49,8 +58,8 @@ public void setupDirectory() throws Exception { directoryFactory = new BlobDirectoryFactory(); directoryFactory.initCoreContainer(coreContainer); NamedList args = new NamedList<>(); - args.add("blobStore.class", LocalBlobStore.class.getName()); - args.add("blobStore.rootDir", blobRootDir.toString()); + args.add(CoreAdminParams.BACKUP_REPOSITORY, REPOSITORY_NAME); + args.add(CoreAdminParams.BACKUP_LOCATION, blobRootDir.toString()); directoryFactory.init(args); directoryPath = nodeConfig.getCoreRootDirectory() .resolve("core").resolve("data").resolve("index").toAbsolutePath().toString(); @@ -165,8 +174,9 @@ private void checkDirListing(String... expectedFileNames) throws IOException { // Check the file listing in the local filesystem directory. assertEquals(Arrays.asList(expectedFileNames), Arrays.asList(directory.listAll())); // Check the file listing in the blob store. - assertEquals(Arrays.asList(expectedFileNames), directoryFactory.getBlobStore() - .listInDirectory(directoryFactory.getLocalRelativePath(directoryPath), __ -> true)); + URI blobDirUri = directoryFactory.resolveBlobPath(directoryFactory.getLocalRelativePath(directoryPath)); + assertEquals(Arrays.asList(expectedFileNames), Arrays.asList(directoryFactory.getRepository() + .listAll(blobDirUri))); } private void checkFileContent(String name, String expectedContent) throws IOException { @@ -175,9 +185,9 @@ private void checkFileContent(String name, String expectedContent) throws IOExce assertEquals(expectedContent, input.readString()); } // Check the file content in the blob store. - try (InputStream input = directoryFactory.getBlobStore() - .read(directoryFactory.getLocalRelativePath(directoryPath), name)) { - assertEquals(expectedContent, new InputStreamDataInput(input).readString()); + URI blobDirUri = directoryFactory.resolveBlobPath(directoryFactory.getLocalRelativePath(directoryPath)); + try (IndexInput input = directoryFactory.getRepository().openInput(blobDirUri, name, IOContext.READ)) { + assertEquals(expectedContent, input.readString()); } } }