Skip to content

Commit

Permalink
Project dir cache enhancement (#2017)
Browse files Browse the repository at this point in the history
Race conditions can occur when multiple Azkaban executor processes are running:
Two Azkaban executor processes may perform deletions in the same Azkaban project dir, even when one of the executors is inactive (e.g., a flow can be run on any executor by using the useExecutor label).
If so, the in-memory list of Azkaban project dirs (installedProjects) kept by an Azkaban executor process will be out of sync with what is on disk.

Another case is a race condition in which one executor process is deleting a project dir while another executor process is creating an execution dir based on that project dir.

This PR

removes installedProjects from the executor. So every time a project needs to be downloaded, every project dir is scanned and the total disk usage is summed to decide whether purging is needed. This could take tens of seconds when the number of project dirs is >= 5000, but only a few seconds with the inode cache.

makes project dir cleanup (deleteProjectDirsIfNecessary) synchronized, since the method is a check-then-act process which is vulnerable to race conditions when multiple threads are doing deletion. An alternative is to synchronize on an interned string of project id + project version (https://stackoverflow.com/questions/133988/synchronizing-on-string-objects-in-java); however, this is not that elegant, as the linked post points out. Synchronization at the object level makes sense given that flow setup is a low-frequency operation in most cases (<= 5 ops/min in our production environment).

when a project dir is created, another metadata file keeping the file count is created. Its purpose is to address the race condition in which one executor process is deleting a project dir while another executor process is creating an execution dir based on that project dir. A sanity check on the file count will be conducted against the created execution dir. If the execution dir's file count is not the same as the base project dir's, then fail the flow setup and let the Azkaban web server dispatch it again.

Note that even with this fix, there could still be race conditions. E.g., when two executor processes are calling ProjectCacheDirCleaner#deleteProjectDirsIfNecessary, one might delete a dir while the other is loading the same dir.

A potential long term fix: #2020

Follow-up
add file count sanity check mentioned above.
  • Loading branch information
burgerkingeater committed Nov 14, 2018
1 parent 5044552 commit 1d251bc
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 162 deletions.
15 changes: 14 additions & 1 deletion azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
Expand Up @@ -72,6 +72,19 @@ public static boolean isDirWritable(final File dir) {
return true;
}

/**
 * Recursively counts the non-directory entries located beneath {@code file}.
 *
 * <p>Directories themselves are not counted, only the files they (transitively) contain.
 *
 * @param file the directory to scan
 * @return the number of non-directory entries found under {@code file}; {@code 0} when
 *     {@code file} cannot be listed (it is not a directory, no longer exists, or an I/O error
 *     occurred)
 */
public static int getFileCount(final File file) {
  final File[] files = file.listFiles();
  if (files == null) {
    // File#listFiles returns null rather than an empty array when the path is not a
    // directory or cannot be read; treat that as "nothing to count" instead of failing
    // with a NullPointerException (e.g. when a sub-directory vanishes mid-scan).
    return 0;
  }
  int count = 0;
  for (final File f : files) {
    count += f.isDirectory() ? getFileCount(f) : 1;
  }
  return count;
}


/**
* Dumps a number into a new file.
Expand All @@ -81,7 +94,7 @@ public static boolean isDirWritable(final File dir) {
* @throws IOException if file already exists
*/
public static void dumpNumberToFile(final Path filePath, final long num) throws IOException {
try (BufferedWriter writer = Files
try (final BufferedWriter writer = Files
.newBufferedWriter(filePath, StandardCharsets.UTF_8)) {
writer.write(String.valueOf(num));
} catch (final IOException e) {
Expand Down
Expand Up @@ -134,6 +134,11 @@ public void testDumpNumberToExistingFile() throws IOException {
.isInstanceOf(IOException.class).hasMessageContaining("already exists");
}

// The test fixture rooted at baseDir is expected to hold five regular files in total.
@Test
public void testFileCount() {
  final int actualCount = FileIOUtils.getFileCount(this.baseDir);
  assertThat(actualCount).isEqualTo(5);
}

@Test
public void testHardlinkCopy() throws IOException {
FileIOUtils.createDeepHardlink(this.sourceDir, this.destDir);
Expand Down
201 changes: 130 additions & 71 deletions azkaban-exec-server/src/main/java/azkaban/execapp/FlowPreparer.java
Expand Up @@ -25,19 +25,20 @@
import azkaban.project.ProjectManagerException;
import azkaban.storage.StorageManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Pair;
import azkaban.utils.Utils;
import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
Expand All @@ -47,23 +48,23 @@ public class FlowPreparer {

// Name of the file which keeps project directory size
static final String PROJECT_DIR_SIZE_FILE_NAME = "___azkaban_project_dir_size_in_bytes___";

// Name of the file which keeps count of files inside the directory
static final String PROJECT_DIR_COUNT_FILE_NAME = "___azkaban_project_dir_count___";

private static final Logger log = Logger.getLogger(FlowPreparer.class);
// TODO spyne: move to config class
private final File executionsDir;
// TODO spyne: move to config class
private final File projectsDir;
private final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects;
private final StorageManager storageManager;
private final ProjectCacheDirCleaner projectDirCleaner;

public FlowPreparer(final StorageManager storageManager, final File executionsDir,
final File projectsDir,
final Map<Pair<Integer, Integer>, ProjectVersion> installedProjects,
final Long projectDirMaxSizeInMb) {
final File projectsDir, final Long projectDirMaxSizeInMb) {
this.storageManager = storageManager;
this.executionsDir = executionsDir;
this.projectsDir = projectsDir;
this.installedProjects = installedProjects;
this.projectDirCleaner = new ProjectCacheDirCleaner(projectDirMaxSizeInMb);

}
Expand All @@ -76,16 +77,38 @@ public FlowPreparer(final StorageManager storageManager, final File executionsDi
* @param pv the projectVersion whose size needs to updated.
*/
static void updateDirSize(final File dir, final ProjectVersion pv) {
final long sizeInByte = FileUtils.sizeOfDirectory(dir);
pv.setDirSizeInBytes(sizeInByte);
try {
FileIOUtils.dumpNumberToFile(Paths.get(dir.getPath(), PROJECT_DIR_SIZE_FILE_NAME),
sizeInByte);
final Path path = Paths.get(dir.getPath(), FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
if (!Files.exists(path)) {
final long sizeInByte = FileUtils.sizeOfDirectory(dir);
FileIOUtils.dumpNumberToFile(path, sizeInByte);
}
pv.setDirSizeInBytes(FileIOUtils.readNumberFromFile(path));
} catch (final IOException e) {
log.error("error when dumping dir size to file", e);
}
}

/**
 * Makes sure {@code dir} contains a metadata file recording how many files it holds, then
 * copies the recorded value onto {@code pv}.
 *
 * <p>The metadata file is only written when it does not exist yet; an existing file is
 * trusted and read back as is. I/O failures are logged and otherwise ignored.
 *
 * @param dir the directory whose file count is kept in the metadata file.
 * @param pv the projectVersion whose file count needs to be updated.
 */
static void updateFileCount(final File dir, final ProjectVersion pv) {
  final Path countFile = Paths.get(dir.getPath(), PROJECT_DIR_COUNT_FILE_NAME);
  try {
    final boolean countFileMissing = !Files.exists(countFile);
    if (countFileMissing) {
      // the extra 1 accounts for the count file itself, which is about to be created
      FileIOUtils.dumpNumberToFile(countFile, FileIOUtils.getFileCount(dir) + 1);
    }
    pv.setFileCount((int) FileIOUtils.readNumberFromFile(countFile));
  } catch (final IOException e) {
    log.error("error when dumping file count to file", e);
  }
}

/**
* Prepare the flow directory for execution.
*
Expand All @@ -95,22 +118,21 @@ void setup(final ExecutableFlow flow) {
File execDir = null;
try {
// First get the ProjectVersion
final ProjectVersion projectVersion = getProjectVersion(flow);
final ProjectVersion projectVersion = new ProjectVersion(flow.getProjectId(),
flow.getVersion());

// Setup the project
setupProject(projectVersion);

// Create the execution directory
execDir = createExecDir(flow);

// Synchronized on {@code projectVersion} to prevent one thread deleting a project dir
// in {@link FlowPreparer#setup} while another is creating hardlink from the same project dir
synchronized (projectVersion) {
// Create the symlinks from the project
copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);
log.info(String.format("Flow Preparation complete. [execid: %d, path: %s]",
flow.getExecutionId(), execDir.getPath()));
}
// Create the symlinks from the project
copyCreateHardlinkDirectory(projectVersion.getInstalledDir(), execDir);

log.info(String
.format("Flow Preparation complete. [execid: %d, path: %s]", flow.getExecutionId(),
execDir.getPath()));
} catch (final Exception e) {
log.error("Error in setting up project directory: " + this.projectsDir + ", Exception: " + e);
cleanup(execDir);
Expand Down Expand Up @@ -176,17 +198,18 @@ void setupProject(final ProjectVersion pv)

ProjectFileHandler projectFileHandler = null;
try {
projectFileHandler = requireNonNull(this.storageManager.getProjectFile(projectId, version));
log.info(String.format("Downloading zip file for Project Version {%s}", pv));
projectFileHandler = requireNonNull(
this.storageManager.getProjectFile(pv.getProjectId(), pv.getVersion()));
checkState("zip".equals(projectFileHandler.getFileType()));

log.info("Downloading zip file.");
final File zipFile = requireNonNull(projectFileHandler.getLocalFile());
final ZipFile zip = new ZipFile(zipFile);
Utils.unzip(zip, tempDir);
updateDirSize(tempDir, pv);
updateFileCount(tempDir, pv);
log.info(String.format("Downloading zip file for Project Version {%s} completes", pv));
this.projectDirCleaner.deleteProjectDirsIfNecessary(pv.getDirSizeInBytes());
Files.move(tempDir.toPath(), pv.getInstalledDir().toPath(), StandardCopyOption.ATOMIC_MOVE);
this.installedProjects.put(new Pair<>(pv.getProjectId(), pv.getVersion()), pv);
log.warn(String.format("Project preparation completes. [%s]", pv));
} finally {
if (projectFileHandler != null) {
Expand All @@ -212,84 +235,120 @@ private File createExecDir(final ExecutableFlow flow) {
return execDir;
}

/**
 * Looks up the installed {@link ProjectVersion} matching the project id and version of
 * {@code flow}, falling back to a freshly created one when none is registered.
 */
private ProjectVersion getProjectVersion(final ExecutableFlow flow) {
  // Hold the lock on the shared map for the duration of the lookup.
  synchronized (this.installedProjects) {
    final Pair<Integer, Integer> key = new Pair<>(flow.getProjectId(), flow.getVersion());
    final ProjectVersion fallback = new ProjectVersion(flow.getProjectId(), flow.getVersion());
    return this.installedProjects.getOrDefault(key, fallback);
  }
}

private class ProjectCacheDirCleaner {

private final Long projectDirMaxSizeInMb;

/*
 * Removes the installed directory of {@code pv} from disk. Nothing happens when the project
 * version has no installed directory or the directory is already gone.
 */
private void deleteDirectory(final ProjectVersion pv) throws IOException {
  final File installedDir = pv.getInstalledDir();
  if (installedDir == null || !installedDir.exists()) {
    return;
  }
  FileUtils.deleteDirectory(installedDir);
}

/**
 * Creates a cleaner bound to the given size limit.
 *
 * @param projectDirMaxSizeInMb limit for the project dirs, expressed in megabytes; stored
 *     as given and may be {@code null}
 */
ProjectCacheDirCleaner(final Long projectDirMaxSizeInMb) {
this.projectDirMaxSizeInMb = projectDirMaxSizeInMb;
}

/**
 * Lists the directories directly under the projects dir whose name is made of two numbers
 * separated by a dot (for example {@code 12.3}).
 *
 * @return the matching directories; empty when there are none or when the projects dir
 *     cannot be listed
 */
private List<Path> loadAllProjectDirs() {
  final List<Path> projects = new ArrayList<>();
  final File[] candidates = FlowPreparer.this.projectsDir
      .listFiles((dir, name) -> name.matches("[0-9]+\\.[0-9]+"));
  if (candidates == null) {
    // File#listFiles returns null (not an empty array) when the path is not a directory or
    // an I/O error occurs; report it instead of failing with a NullPointerException.
    FlowPreparer.log.warn(String.format("unable to list project dirs under %s",
        FlowPreparer.this.projectsDir));
    return projects;
  }
  for (final File project : candidates) {
    if (project.exists() && project.isDirectory()) {
      projects.add(project.toPath());
    } else {
      FlowPreparer.log
          .debug(String.format("project %s doesn't exist or is non-dir.", project.getName()));
    }
  }
  return projects;
}

/**
 * Builds a {@link ProjectVersion} for each directory returned by
 * {@code loadAllProjectDirs()}, populating its dir size, file count and last access time.
 *
 * <p>A directory whose metadata cannot be loaded is logged and left out of the result.
 *
 * @return the project versions that were loaded successfully
 */
private List<ProjectVersion> loadAllProjects() {
  final List<ProjectVersion> loaded = new ArrayList<>();
  for (final Path projectDir : this.loadAllProjectDirs()) {
    try {
      // the directory name is parsed as "<projectId>.<version>"
      final String[] idAndVersion = projectDir.getFileName().toString().split("\\.");
      final int projectId = Integer.parseInt(idAndVersion[0]);
      final int versionNum = Integer.parseInt(idAndVersion[1]);

      final ProjectVersion pv = new ProjectVersion(projectId, versionNum, projectDir.toFile());
      FlowPreparer.updateDirSize(pv.getInstalledDir(), pv);
      FlowPreparer.updateFileCount(pv.getInstalledDir(), pv);

      // the modification time of the dir size file is used as the last access time
      final Path dirSizeFile = Paths.get(pv.getInstalledDir().toString(),
          FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME);
      pv.setLastAccessTime(Files.getLastModifiedTime(dirSizeFile));

      loaded.add(pv);
    } catch (final Exception e) {
      FlowPreparer.log
          .error(String.format("error while loading project dir metadata for project %s",
              projectDir.getFileName()), e);
    }
  }
  return loaded;
}

/**
* @return sum of the size of all project dirs
*/
private long getProjectDirsTotalSizeInBytes() throws IOException {
private long getProjectDirsTotalSizeInBytes(final List<ProjectVersion> allProjects) {
long totalSizeInBytes = 0;
for (final ProjectVersion version : FlowPreparer.this.installedProjects.values()) {
for (final ProjectVersion version : allProjects) {
totalSizeInBytes += version.getDirSizeInBytes();
}
return totalSizeInBytes;
}

/**
 * Returns the last modified time of the dir size file kept inside the installed directory
 * of {@code pv}.
 *
 * @throws IOException if the modification time cannot be read
 */
private FileTime getLastReferenceTime(final ProjectVersion pv) throws IOException {
  final String installedDirPath = pv.getInstalledDir().toPath().toString();
  return Files.getLastModifiedTime(
      Paths.get(installedDirPath, FlowPreparer.PROJECT_DIR_SIZE_FILE_NAME));
}

private void deleteLeastRecentlyUsedProjects(long sizeToFreeInBytes,
final List<ProjectVersion>
projectVersions) throws IOException {
final List<ProjectVersion> projectVersions) {
// sort project version by last reference time in ascending order
try {
projectVersions.sort((o1, o2) -> {
try {
final FileTime lastReferenceTime1 = getLastReferenceTime(o1);
final FileTime lastReferenceTime2 = getLastReferenceTime(o2);
return lastReferenceTime1.compareTo(lastReferenceTime2);
} catch (final IOException ex) {
throw new RuntimeException(ex);
}
});
} catch (final RuntimeException ex) {
throw new IOException(ex);
}

projectVersions.sort(Comparator.comparing(ProjectVersion::getLastAccessTime));
for (final ProjectVersion version : projectVersions) {
if (sizeToFreeInBytes > 0) {
try {
// delete the project directory even if flow within is running. It's ok to
// delete the project directory even if flow within is running. It's OK to
// delete the directory since execution dir is HARD linked to project dir.
FlowRunnerManager.deleteDirectory(version);
FlowPreparer.this.installedProjects.remove(new Pair<>(version.getProjectId(), version
.getVersion()));
FlowPreparer.log.info(String.format("deleting project version %s", version));
deleteDirectory(version);
sizeToFreeInBytes -= version.getDirSizeInBytes();
} catch (final IOException ex) {
log.error(ex);
FlowPreparer.log.error(ex);
}
}
}
}

void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) throws IOException {
final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes();
if (this.projectDirMaxSizeInMb != null
&& (currentSpaceInBytes + spaceToDeleteInBytes) >= this
.projectDirMaxSizeInMb * 1024 * 1024) {
deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes,
new ArrayList<>(FlowPreparer.this.installedProjects.values()));
/**
 * Deletes least recently used project dirs when the current total size of all project dirs
 * plus {@code spaceToDeleteInBytes} reaches the configured limit. Does nothing when no limit
 * is configured.
 *
 * <p>The method is synchronized because it is a check-then-act sequence.
 *
 * @param spaceToDeleteInBytes number of bytes that is added to the current usage for the
 *     limit check and that is requested to be freed when the limit is reached
 */
synchronized void deleteProjectDirsIfNecessary(final long spaceToDeleteInBytes) {
  if (this.projectDirMaxSizeInMb != null) {
    final long start = System.currentTimeMillis();
    final List<ProjectVersion> allProjects = loadAllProjects();
    // The elapsed time is a millisecond delta, so it must go through Duration.ofMillis;
    // Duration.ofSeconds would report a value 1000 times too large.
    final long elapsedMillis = System.currentTimeMillis() - start;
    FlowPreparer.log
        .debug(String.format("loading all project dirs metadata completes in %s sec(s)",
            Duration.ofMillis(elapsedMillis).getSeconds()));

    final long currentSpaceInBytes = getProjectDirsTotalSizeInBytes(allProjects);
    if (currentSpaceInBytes + spaceToDeleteInBytes
        >= this.projectDirMaxSizeInMb * 1024 * 1024) {
      FlowPreparer.log.info(String.format("Project dir disk usage[%s bytes] exceeds the "
              + "limit[%s mb], start cleaning up project dirs",
          currentSpaceInBytes + spaceToDeleteInBytes, this.projectDirMaxSizeInMb.longValue()));
      deleteLeastRecentlyUsedProjects(spaceToDeleteInBytes, allProjects);
    }
  }
}
}
Expand Down

0 comments on commit 1d251bc

Please sign in to comment.