Skip to content

Commit

Permalink
[FLINK-1483] IOManager puts temp files in dedicated directory and rem…
Browse files Browse the repository at this point in the history
…oves that on shutdown

This closes #417
  • Loading branch information
StephanEwen committed Feb 19, 2015
1 parent 681ea06 commit c1a334e
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 85 deletions.
Expand Up @@ -84,24 +84,33 @@ public static class ID {

private static final int RANDOM_BYTES_LENGTH = 16;

private final String path;
private final File path;

private final int threadNum;

protected ID(String path, int threadNum) {
protected ID(File path, int threadNum) {
this.path = path;
this.threadNum = threadNum;
}

protected ID(String basePath, int threadNum, Random random) {
this.path = basePath + File.separator + randomString(random) + ".channel";
protected ID(File basePath, int threadNum, Random random) {
this.path = new File(basePath, randomString(random) + ".channel");
this.threadNum = threadNum;
}

/**
* Returns the path to the underlying temporary file.
* @return The path to the underlying temporary file..
*/
public String getPath() {
return path.getAbsolutePath();
}

/**
* Returns the path to the underlying temporary file as a File.
* @return The path to the underlying temporary file as a File.
*/
public File getPathFile() {
return path;
}

Expand All @@ -126,11 +135,11 @@ public int hashCode() {

@Override
public String toString() {
return path;
return path.getAbsolutePath();
}

private static final String randomString(final Random random) {
final byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
private static String randomString(Random random) {
byte[] bytes = new byte[RANDOM_BYTES_LENGTH];
random.nextBytes(bytes);
return StringUtils.byteToHexString(bytes);
}
Expand All @@ -140,24 +149,23 @@ private static final String randomString(final Random random) {
* An enumerator for channels that logically belong together.
*/
public static final class Enumerator {

private static final String FORMAT = "%s%s%s.%06d.channel";

private final String[] paths;
private final File[] paths;

private final String namePrefix;

private int counter;

protected Enumerator(String[] basePaths, Random random) {
protected Enumerator(File[] basePaths, Random random) {
this.paths = basePaths;
this.namePrefix = ID.randomString(random);
this.counter = 0;
}

public ID next() {
final int threadNum = counter % paths.length;
return new ID(String.format(FORMAT, this.paths[threadNum], File.separator, namePrefix, (counter++)), threadNum);
int threadNum = counter % paths.length;
String filename = String.format(" %s.%06d.channel", namePrefix, (counter++));
return new ID(new File(paths[threadNum], filename), threadNum);
}
}
}
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.disk.iomanager;

import org.apache.commons.io.FileUtils;
import org.apache.flink.core.memory.MemorySegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,6 +27,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;

/**
Expand All @@ -37,41 +39,104 @@ public abstract class IOManager {
protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);

/** The temporary directories for files */
private final String[] paths;
private final File[] paths;

/** A random number generator for the anonymous ChannelIDs. */
private final Random random;

/** The number of the next path to use. */
private volatile int nextPath;

/** Shutdown hook to make sure that the directories are removed on exit */
private final Thread shutdownHook;

// -------------------------------------------------------------------------
// Constructors / Destructors
// -------------------------------------------------------------------------

/**
* Constructs a new IOManager.
*
* @param paths
* the basic directory paths for files underlying anonymous channels.
* @param tempDirs The basic directories for files underlying anonymous channels.
*/
protected IOManager(String[] paths) {
this.paths = paths;
protected IOManager(String[] tempDirs) {
if (tempDirs == null || tempDirs.length == 0) {
throw new IllegalArgumentException("The temporary directories must not be null or empty.");
}

this.random = new Random();
this.nextPath = 0;

this.paths = new File[tempDirs.length];
for (int i = 0; i < tempDirs.length; i++) {
File baseDir = new File(tempDirs[i]);
String subfolder = String.format("flink-io-%s", UUID.randomUUID().toString());
File storageDir = new File(baseDir, subfolder);

if (!storageDir.exists() && !storageDir.mkdirs()) {
throw new RuntimeException(
"Could not create storage directory for IOManager: " + storageDir.getAbsolutePath());
}
paths[i] = storageDir;
LOG.info("I/O manager uses directory {} for spill files.", storageDir.getAbsolutePath());
}

this.shutdownHook = new Thread("I/O manager shutdown hook") {
@Override
public void run() {
shutdown();
}
};
Runtime.getRuntime().addShutdownHook(this.shutdownHook);
}

/**
* Close method, marks the I/O manager as closed.
* Close method, marks the I/O manager as closed
* and removed all temporary files.
*/
public abstract void shutdown();
public void shutdown() {
// remove all of our temp directories
for (File path : paths) {
try {
if (path != null) {
if (path.exists()) {
FileUtils.deleteDirectory(path);
LOG.info("I/O manager removed spill file directory {}", path.getAbsolutePath());
}
}
} catch (Throwable t) {
LOG.error("IOManager failed to properly clean up temp file directory: " + path, t);
}
}

// Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself
if (shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
}
catch (Throwable t) {
LOG.warn("Exception while unregistering IOManager's shutdown hook.", t);
}
}
}

/**
* Utility method to check whether the IO manager has been properly shut down.
* For this base implementation, this means that all files have been removed.
*
* @return True, if the IO manager has properly shut down, false otherwise.
*/
public abstract boolean isProperlyShutDown();
public boolean isProperlyShutDown() {
for (File path : paths) {
if (path != null && path.exists()) {
return false;
}
}
return true;
}

// ------------------------------------------------------------------------
// Channel Instantiations
Expand Down Expand Up @@ -107,7 +172,9 @@ public FileIOChannel.Enumerator createChannelEnumerator() {
*/
public void deleteChannel(FileIOChannel.ID channel) throws IOException {
if (channel != null) {
new File(channel.getPath()).delete();
if (channel.getPathFile().exists() && !channel.getPathFile().delete()) {
LOG.warn("IOManager failed to delete temporary file {}", channel.getPath());
}
}
}

Expand Down Expand Up @@ -193,7 +260,8 @@ public abstract BlockChannelReader createBlockChannelReader(FileIOChannel.ID cha
*/
public abstract BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
List<MemorySegment> targetSegments, int numBlocks) throws IOException;



// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkState;

Expand All @@ -38,12 +39,9 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle

/** The reader threads used for asynchronous block oriented channel reading. */
private final ReaderThread[] readers;

/** Lock object to guard shutdown */
private final Object shutdownLock = new Object();

/** Flag to mark the I/O manager as alive or shut down */
private volatile boolean shutdown;

/** Flag to signify that the IOManager has been shut down already */
private final AtomicBoolean isShutdown = new AtomicBoolean();

// -------------------------------------------------------------------------
// Constructors / Destructors
Expand Down Expand Up @@ -103,13 +101,12 @@ public IOManagerAsync(String[] tempDirs) {
*/
@Override
public void shutdown() {
synchronized (shutdownLock) {
if (shutdown) {
return;
}

shutdown = true;

// mark shut down and exit if it already was shut down
if (!isShutdown.compareAndSet(false, true)) {
return;
}

try {
if (LOG.isDebugEnabled()) {
LOG.debug("Shutting down I/O manager.");
}
Expand Down Expand Up @@ -141,7 +138,14 @@ public void shutdown() {
rt.join();
}
}
catch (InterruptedException iex) {}
catch (InterruptedException iex) {
// ignore this on shutdown
}
}
finally {
// make sure we all the super implementation in any case and at the last point,
// because this will clean up the I/O directories
super.shutdown();
}
}

Expand All @@ -160,17 +164,17 @@ public boolean isProperlyShutDown() {

boolean writersShutDown = true;
for (WriterThread wt : writers) {
readersShutDown &= wt.getState() == Thread.State.TERMINATED;
writersShutDown &= wt.getState() == Thread.State.TERMINATED;
}

return shutdown && writersShutDown && readersShutDown;
return isShutdown.get() && readersShutDown && writersShutDown && super.isProperlyShutDown();
}


@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
shutdown();
LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Shutting down I/O Manager.", e);
shutdown();
}

// ------------------------------------------------------------------------
Expand All @@ -181,13 +185,13 @@ public void uncaughtException(Thread t, Throwable e) {
public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
checkState(!shutdown, "I/O-Manger is closed.");
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
}

@Override
public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
checkState(!shutdown, "I/O-Manger is closed.");
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
}

Expand All @@ -205,7 +209,7 @@ public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID
public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
checkState(!shutdown, "I/O-Manger is closed.");
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
}

Expand All @@ -228,7 +232,7 @@ public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
List<MemorySegment> targetSegments, int numBlocks) throws IOException
{
checkState(!shutdown, "I/O-Manger is closed.");
checkState(!isShutdown.get(), "I/O-Manger is shut down.");
return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
}

Expand Down

0 comments on commit c1a334e

Please sign in to comment.