Skip to content

Commit

Permalink
project LRU cache part 2 (#1865)
Browse files Browse the repository at this point in the history
The problem this PR targets to solve is detailed in #1803

Drawback of previous design is detailed in #1841

New design:

Create a file in each project directory and write the size of the project to the file when the project is created. The project files are not supposed to change after creation. Touch this file each time the project is used. This way, we can have a more efficient LRU algorithm based on last access time, not creation time.

Maintain the total size of the project cache in memory to avoid the overhead of re-calculating it. The size shouldn't change too often. This way we can afford to run the check more frequently. Project dir size check and corresponding deletion will be performed when a new project is downloaded.

This PR implements part 2.

Next step is to shorten execution dir retention period to really free up space, given there's always a hard link from execution to project directory.
  • Loading branch information
burgerkingeater committed Jul 25, 2018
1 parent c6509ee commit 0408919
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 79 deletions.
3 changes: 3 additions & 0 deletions az-core/src/main/java/azkaban/Constants.java
Expand Up @@ -226,6 +226,9 @@ public static class ConfigurationKeys {
public static final String QUEUEPROCESSING_ENABLED = "azkaban.queueprocessing.enabled";

public static final String SESSION_TIME_TO_LIVE = "session.time.to.live";

// allowed max size of shared project dir in MB
public static final String PROJECT_DIR_MAX_SIZE_IN_MB = "azkaban.project_cache_max_size_in_mb";
}

public static class FlowProperties {
Expand Down
113 changes: 97 additions & 16 deletions azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
Expand Up @@ -35,6 +35,8 @@
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
Expand All @@ -52,16 +54,21 @@ public class FlowPreparer {
private final File projectsDir;
private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
private final StorageManager storageManager;
private final ProjectCacheDirCleaner projectDirCleaner;

public FlowPreparer(final StorageManager storageManager, final File executionsDir,
final File projectsDir,
final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects) {
final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects,
final Long projectDirMaxSizeInMb) {
this.storageManager = storageManager;
this.executionsDir = executionsDir;
this.projectsDir = projectsDir;
this.installedProjects = installedProjects;
this.projectDirCleaner = new ProjectCacheDirCleaner(projectDirMaxSizeInMb);

}


/**
* Prepare the flow directory for execution.
*
Expand All @@ -79,18 +86,31 @@ void setup(final ExecutableFlow flow) {
// Create the execution directory
execDir = createExecDir(flow);

// Create the symlinks from the project
copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);

log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
flow.getExecutionId(), execDir.getPath()));
// Synchronized on {@code projectVersion} to prevent one thread deleting a project dir
// in {@link FlowPreparer#setup} while another is creating hardlink from the same project dir
synchronized (projectVersion) {
// Create the symlinks from the project
copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
flow.getExecutionId(), execDir.getPath()));
}
} catch (final Exception e) {
log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
cleanup(execDir);
throw new RuntimeException(e);
}
}

private void cleanup(final File execDir) {
if (execDir != null) {
try {
FileUtils.deleteDirectory(execDir);
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}

/**
* Touch the file if it exists.
*
Expand All @@ -105,7 +125,6 @@ void touchIfExists(final Path path) {
}
}


/**
* Prepare the project directory.
*
Expand Down Expand Up @@ -148,7 +167,9 @@ void setupProject(final ProjectVersion pv)
final ZipFile zip = new ZipFile(zipFile);
Utils.unzip(zip, tempDir);
updateDirSize(tempDir, pv);
this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
this.installedProjects.put(new Pair<>(pv.getProjectId(), pv.getVersion()), pv);
log.warn(String.format("Project preparation completes. [%s]", pv));
} finally {
if (projectFileHandler != null) {
Expand All @@ -168,7 +189,7 @@ void setupProject(final ProjectVersion pv)
*/
private void updateDirSize(final File dir, final ProjectVersion pv) {
final long sizeInByte = FileUtils.sizeOfDirectory(dir);
pv.setDirSize(sizeInByte);
pv.setDirSizeInBytes(sizeInByte);
try {
FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
sizeInByte);
Expand Down Expand Up @@ -197,19 +218,79 @@ private ProjectVersion getProjectVersion(final ExecutableFlow flow) {
// to set up.
final ProjectVersion projectVersion;
synchronized (this.installedProjects) {
projectVersion = this.installedProjects
.computeIfAbsent(new Pair<>(flow.getProjectId(), flow.getVersion()),
k -> new ProjectVersion(flow.getProjectId(), flow.getVersion()));
final Pair<Integer, Integer> pair = new Pair<>(flow.getProjectId(), flow.getVersion());
projectVersion = this.installedProjects.getOrDefault(pair, new ProjectVersion(flow
.getProjectId(), flow.getVersion()));
}
return projectVersion;
}

private void cleanup(final File execDir) {
if (execDir != null) {
private class ProjectCacheDirCleaner {

private final Long projectDirMaxSizeInMb;

ProjectCacheDirCleaner(final Long projectDirMaxSizeInMb) {
this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
}

/**
* @return sum of the size of all project dirs
*/
private long getProjectDirsTotalSizeInBytes() throws IOException {
long totalSizeInBytes = 0;
for (final ProjectVersion version : FlowPreparer.this.installedProjects.values()) {
totalSizeInBytes += version.getDirSizeInBytes();
}
return totalSizeInBytes;
}

private FileTime getLastReferenceTime(final ProjectVersion pv) throws IOException {
final Path dirSizeFile = Paths
.get(pv.getInstalledDir().toPath().toString(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
return Files.getLastModifiedTime(dirSizeFile);
}

private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
final List<ProjectVersion>
projectVersions) throws IOException {
// sort project version by last reference time in ascending order
try {
FileUtils.deleteDirectory(execDir);
} catch (final IOException e) {
throw new RuntimeException(e);
projectVersions.sort((o1, o2) -> {
try {
final FileTime lastReferenceTime1 = getLastReferenceTime(o1);
final FileTime lastReferenceTime2 = getLastReferenceTime(o2);
return lastReferenceTime1.compareTo(lastReferenceTime2);
} catch (final IOException ex) {
throw new RuntimeException(ex);
}
});
} catch (final RuntimeException ex) {
throw new IOException(ex);
}

for (final ProjectVersion version : projectVersions) {
if (sizeToFreeInBytes > 0) {
try {
// delete the project directory even if flow within is running. It's ok to
// delete the directory since execution dir is HARD linked to project dir.
FlowRunnerManager.deleteDirectory(version);
FlowPreparer.this.installedProjects.remove(new Pair<>(version.getProjectId(), version
.getVersion()));
sizeToFreeInBytes -= version.getDirSizeInBytes();
} catch (final IOException ex) {
log.error(ex);
}
}
}
}

void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) throws IOException {
final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes();
if (this.projectDirMaxSizeInMb != null
&& (currentSpaceInBytes + spaceToDeleteInBytes) >= this
.projectDirMaxSizeInMb * 1024 * 1024) {
deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes,
new ArrayList<>(FlowPreparer.this.installedProjects.values()));
}
}
}
Expand Down

0 comments on commit 0408919

Please sign in to comment.