Skip to content

Commit

Permalink
Removed multiple paths from MetadataStateFormat (#72821)
Browse files Browse the repository at this point in the history
relates #71205
  • Loading branch information
rjernst committed May 11, 2021
1 parent f3ce700 commit 76e515a
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 218 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
Expand All @@ -39,12 +39,10 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* MetadataStateFormat is a base class to write checksummed
Expand Down Expand Up @@ -168,10 +166,10 @@ private static void performStateDirectoriesFsync(List<Tuple<Path, Directory>> st
/**
* Writes the given state to the given directories and performs cleanup of old state files if the write succeeds or
* newly created state file if write fails.
* See also {@link #write(Object, Path...)} and {@link #cleanupOldFiles(long, Path[])}.
* See also {@link #write(Object, Path)} and {@link #cleanupOldFiles(long, Path)}.
*/
public final long writeAndCleanup(final T state, final Path... locations) throws WriteStateException {
return write(state, true, locations);
public final long writeAndCleanup(final T state, final Path location) throws WriteStateException {
return write(state, true, location);
}

/**
Expand All @@ -186,29 +184,26 @@ public final long writeAndCleanup(final T state, final Path... locations) throws
* If this method fails with an exception, it performs cleanup of newly created state file.
* But if this method succeeds, it does not perform cleanup of old state files.
* If this write succeeds, but some further write fails, you may want to rollback the transaction and keep old file around.
* After transaction is finished use {@link #cleanupOldFiles(long, Path[])} for the clean-up.
* If this write is not a part of bigger transaction, consider using {@link #writeAndCleanup(Object, Path...)} method instead.
* After transaction is finished use {@link #cleanupOldFiles(long, Path)} for the clean-up.
* If this write is not a part of bigger transaction, consider using {@link #writeAndCleanup(Object, Path)} method instead.
*
* @param state the state object to write
* @param locations the locations where the state should be written to.
* @param location the data dir the state should be written into
* @throws WriteStateException if some exception during writing state occurs. See also {@link WriteStateException#isDirty()}.
* @return generation of newly written state.
*/
public final long write(final T state, final Path... locations) throws WriteStateException {
return write(state, false, locations);
public final long write(final T state, final Path location) throws WriteStateException {
return write(state, false, location);
}

private long write(final T state, boolean cleanup, final Path... locations) throws WriteStateException {
if (locations == null) {
private long write(final T state, boolean cleanup, final Path location) throws WriteStateException {
if (location == null) {
throw new IllegalArgumentException("Locations must not be null");
}
if (locations.length <= 0) {
throw new IllegalArgumentException("One or more locations required");
}

final long oldGenerationId, newGenerationId;
try {
oldGenerationId = findMaxGenerationId(prefix, locations);
oldGenerationId = findMaxGenerationId(prefix, location);
newGenerationId = oldGenerationId + 1;
} catch (Exception e) {
throw new WriteStateException(false, "exception during looking up new generation id", e);
Expand All @@ -220,13 +215,11 @@ private long write(final T state, boolean cleanup, final Path... locations) thro
List<Tuple<Path, Directory>> directories = new ArrayList<>();

try {
for (Path location : locations) {
Path stateLocation = location.resolve(STATE_DIR_NAME);
try {
directories.add(new Tuple<>(location, newDirectory(stateLocation)));
} catch (IOException e) {
throw new WriteStateException(false, "failed to open state directory " + stateLocation, e);
}
Path stateLocation = location.resolve(STATE_DIR_NAME);
try {
directories.add(new Tuple<>(location, newDirectory(stateLocation)));
} catch (IOException e) {
throw new WriteStateException(false, "failed to open state directory " + stateLocation, e);
}

writeStateToFirstLocation(state, directories.get(0).v1(), directories.get(0).v2(), tmpFileName);
Expand All @@ -235,7 +228,7 @@ private long write(final T state, boolean cleanup, final Path... locations) thro
performStateDirectoriesFsync(directories);
} catch (WriteStateException e) {
if (cleanup) {
cleanupOldFiles(oldGenerationId, locations);
cleanupOldFiles(oldGenerationId, location);
}
throw e;
} finally {
Expand All @@ -246,7 +239,7 @@ private long write(final T state, boolean cleanup, final Path... locations) thro
}

if (cleanup) {
cleanupOldFiles(newGenerationId, locations);
cleanupOldFiles(newGenerationId, location);
}

return newGenerationId;
Expand Down Expand Up @@ -307,68 +300,61 @@ protected Directory newDirectory(Path dir) throws IOException {
* Clean ups all state files not matching passed generation.
*
* @param currentGeneration state generation to keep.
* @param locations state paths.
* @param location data dir.
*/
public void cleanupOldFiles(final long currentGeneration, Path... locations) {
public void cleanupOldFiles(final long currentGeneration, Path location) {
final String fileNameToKeep = getStateFileName(currentGeneration);
for (Path location : locations) {
logger.trace("cleanupOldFiles: cleaning up {}", location);
Path stateLocation = location.resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(stateLocation)) {
for (String file : stateDir.listAll()) {
if (file.startsWith(prefix) && file.equals(fileNameToKeep) == false) {
deleteFileIgnoreExceptions(stateLocation, stateDir, file);
}
logger.trace("cleanupOldFiles: cleaning up {}", location);
Path stateLocation = location.resolve(STATE_DIR_NAME);
try (Directory stateDir = newDirectory(stateLocation)) {
for (String file : stateDir.listAll()) {
if (file.startsWith(prefix) && file.equals(fileNameToKeep) == false) {
deleteFileIgnoreExceptions(stateLocation, stateDir, file);
}
} catch (Exception e) {
logger.trace("clean up failed for state location {}", stateLocation);
}
} catch (Exception e) {
logger.trace("clean up failed for state location {}", stateLocation);
}
}

/**
* Finds state file with maximum id.
*
* @param prefix - filename prefix
* @param locations - paths to directories with state folder
* @param dataLocation - path to directory with state folder
* @return maximum id of state file or -1 if no such files are found
* @throws IOException if IOException occurs
*/
private long findMaxGenerationId(final String prefix, Path... locations) throws IOException {
private long findMaxGenerationId(final String prefix, Path dataLocation) throws IOException {
long maxId = -1;
for (Path dataLocation : locations) {
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
if (Files.exists(resolve)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
for (Path stateFile : stream) {
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long id = Long.parseLong(matcher.group(1));
maxId = Math.max(maxId, id);
}
final Path resolve = dataLocation.resolve(STATE_DIR_NAME);
if (Files.exists(resolve)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(resolve, prefix + "*")) {
for (Path stateFile : stream) {
final Matcher matcher = stateFilePattern.matcher(stateFile.getFileName().toString());
if (matcher.matches()) {
final long id = Long.parseLong(matcher.group(1));
maxId = Math.max(maxId, id);
}
}
}
}
return maxId;
}

private List<Path> findStateFilesByGeneration(final long generation, Path... locations) {
List<Path> files = new ArrayList<>();
private Path findStateFilesByGeneration(final long generation, Path dataLocation) {
if (generation == -1) {
return files;
return null;
}

final String fileName = getStateFileName(generation);
for (Path dataLocation : locations) {
final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName);
if (Files.exists(stateFilePath)) {
logger.trace("found state file: {}", stateFilePath);
files.add(stateFilePath);
}
final Path stateFilePath = dataLocation.resolve(STATE_DIR_NAME).resolve(fileName);
if (Files.exists(stateFilePath)) {
logger.trace("found state file: {}", stateFilePath);
return stateFilePath;
}

return files;
return null;
}

public String getStateFileName(long generation) {
Expand All @@ -381,52 +367,40 @@ public String getStateFileName(long generation) {
*
* @param logger a logger instance.
* @param generation the generation to be loaded.
* @param dataLocations the data-locations to try.
* @param dataLocation the data dir to read from
* @return the state of asked generation or <code>null</code> if no state was found.
*/
public T loadGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, long generation, Path... dataLocations) {
List<Path> stateFiles = findStateFilesByGeneration(generation, dataLocations);
public T loadGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, long generation, Path dataLocation) {
Path stateFile = findStateFilesByGeneration(generation, dataLocation);

final List<Throwable> exceptions = new ArrayList<>();
for (Path stateFile : stateFiles) {
if (stateFile != null) {
try {
T state = read(namedXContentRegistry, stateFile);
logger.trace("generation id [{}] read from [{}]", generation, stateFile.getFileName());
return state;
} catch (Exception e) {
exceptions.add(new IOException("failed to read " + stateFile, e));
logger.debug(() -> new ParameterizedMessage(
"{}: failed to read [{}], ignoring...", stateFile, prefix), e);
logger.debug(() -> new ParameterizedMessage("{}: failed to read [{}], ignoring...", stateFile, prefix), e);
throw new ElasticsearchException("failed to read " + stateFile, e);
}
}
// if we reach this something went wrong
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
if (stateFiles.size() > 0) {
// We have some state files but none of them gave us a usable state
throw new IllegalStateException("Could not find a state file to recover from among " +
stateFiles.stream().map(Object::toString).collect(Collectors.joining(", ")));
}
return null;
}

/**
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance.
* @param dataLocations the data-locations to try.
* @param dataLocation the data dir to read from
* @return tuple of the latest state and generation. (null, -1) if no state is found.
*/
public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations)
public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContentRegistry namedXContentRegistry, Path dataLocation)
throws IOException {
long generation = findMaxGenerationId(prefix, dataLocations);
T state = loadGeneration(logger, namedXContentRegistry, generation, dataLocations);
long generation = findMaxGenerationId(prefix, dataLocation);
T state = loadGeneration(logger, namedXContentRegistry, generation, dataLocation);

if (generation > -1 && state == null) {
throw new IllegalStateException("unable to find state files with generation id " + generation +
" returned by findMaxGenerationId function, in data folders [" +
Arrays.stream(dataLocations).
map(Object::toString).collect(Collectors.joining(", ")) +
"], concurrent writes?");
" returned by findMaxGenerationId function, in data folder [" + dataLocation + "], concurrent writes?");
}
return Tuple.tuple(state, generation);
}
Expand All @@ -435,24 +409,19 @@ public Tuple<T, Long> loadLatestStateWithGeneration(Logger logger, NamedXContent
* Tries to load the latest state from the given data-locations.
*
* @param logger a logger instance.
* @param dataLocations the data-locations to try.
* @param dataLocation the data dir to read from
* @return the latest state or <code>null</code> if no state was found.
*/
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path... dataLocations) throws
IOException {
return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocations).v1();
public T loadLatestState(Logger logger, NamedXContentRegistry namedXContentRegistry, Path dataLocation) throws IOException {
return loadLatestStateWithGeneration(logger, namedXContentRegistry, dataLocation).v1();
}

/**
* Deletes all meta state directories recursively for the given data locations
* @param dataLocations the data location to delete
* @param dataLocation the data dir to delete state from
*/
public static void deleteMetaState(Path... dataLocations) throws IOException {
Path[] stateDirectories = new Path[dataLocations.length];
for (int i = 0; i < dataLocations.length; i++) {
stateDirectories[i] = dataLocations[i].resolve(STATE_DIR_NAME);
}
IOUtils.rm(stateDirectories);
public static void deleteMetaState(Path dataLocation) throws IOException {
IOUtils.rm(dataLocation.resolve(STATE_DIR_NAME));
}

public String getPrefix() {
Expand Down

0 comments on commit 76e515a

Please sign in to comment.