HIVE-27317: Temporary (local) session files cleanup improvements #4403

Merged (1 commit) on Jun 22, 2023
TestClearDanglingScratchDir.java
@@ -18,16 +18,19 @@
package org.apache.hadoop.hive.ql.session;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.util.UUID;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.util.Shell;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -43,6 +46,8 @@ public class TestClearDanglingScratchDir {
private ByteArrayOutputStream stderr;
private PrintStream origStdoutPs;
private PrintStream origStderrPs;
private static Path customScratchDir;
private static Path customLocalTmpDir;

@BeforeClass
static public void oneTimeSetup() throws Exception {
@@ -64,6 +69,11 @@ static public void oneTimeSetup() throws Exception {
@AfterClass
static public void shutdown() throws Exception {
m_dfs.shutdown();

// Make sure the custom dirs are deleted with the FileSystem they were created on
FileSystem fs = customScratchDir.getFileSystem(new Configuration());
fs.delete(customScratchDir, true);
fs.delete(customLocalTmpDir, true);
}

public void redirectStdOutErr() {
@@ -129,4 +139,71 @@ public void testClearDanglingScratchDir() throws Exception {
Assert.assertEquals(StringUtils.countMatches(stderr.toString(), "removed"), 1);
ss.close();
}

/**
* Tests the behaviour of the ClearDanglingScratchDir service on local tmp files/dirs.
* @throws Exception
*/
@Test
public void testLocalDanglingFilesCleaning() throws Exception {
HiveConf conf = new HiveConf();
conf.set("fs.default.name", "file:///");
String tmpDir = System.getProperty("test.tmp.dir");
conf.set("hive.exec.scratchdir", tmpDir + "/hive-27317-hdfsscratchdir");
conf.set("hive.exec.local.scratchdir", tmpDir + "/hive-27317-localscratchdir");
FileSystem fs = FileSystem.get(conf);

// Constants
String appId = "appId_" + System.currentTimeMillis();
String userName = System.getProperty("user.name");
String hdfs = "hdfs";
String inuse = "inuse.lck";
String l = File.separator;

// Simulating hdfs dangling dir and its inuse.lck file
// Note: Give scratch dirs all the write permissions
FsPermission allPermissions = new FsPermission((short)00777);
customScratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
Utilities.createDirsWithPermission(conf, customScratchDir, allPermissions, true);
Path hdfsRootDir = new Path(customScratchDir + l + userName + l + hdfs);
Path hdfsSessionDir = new Path(hdfsRootDir + l + userName + l + appId);
Path hdfsSessionLock = new Path(hdfsSessionDir + l + inuse);
fs.create(hdfsSessionLock);

// Simulating local dangling files
customLocalTmpDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR));
Path localSessionDir = new Path(customLocalTmpDir + l + appId);
Path localPipeOutFileRemove = new Path(customLocalTmpDir + l
+ appId + "-started-with-session-name.pipeout");
Path localPipeOutFileNotRemove = new Path(customLocalTmpDir + l
+ "not-started-with-session-name-" + appId + ".pipeout");
Path localPipeOutFileFailRemove = new Path(customLocalTmpDir + l
+ appId + "-started-with-session-name-but-fail-delete.pipeout");

// Create dirs/files
Utilities.createDirsWithPermission(conf, localSessionDir, allPermissions, true);
[Review comment from Contributor Author]: Local scratchdir is all writable.

fs.create(localPipeOutFileRemove);
fs.create(localPipeOutFileNotRemove);
fs.create(localPipeOutFileFailRemove);

// Make localPipeOutFileFailRemove non-writable so the cleanup cannot delete it;
// the service checks writability before attempting deletion
fs.setPermission(localPipeOutFileFailRemove, FsPermission.valueOf("-r--r--r--"));

// The service identifies which session files/dirs are dangling and removes them
ClearDanglingScratchDir clearDanglingScratchDirMain = new ClearDanglingScratchDir(false,
false, true, hdfsRootDir.toString(), conf);
clearDanglingScratchDirMain.run();

// localSessionDir and localPipeOutFileRemove should be removed
// localPipeOutFileNotRemove and localPipeOutFileFailRemove should not be removed
Assert.assertFalse("Local session dir '" + localSessionDir
+ "' still exists, should have been removed!", fs.exists(localSessionDir));
Assert.assertFalse("Local .pipeout file '" + localPipeOutFileRemove
+ "' still exists, should have been removed!", fs.exists(localPipeOutFileRemove));
Assert.assertTrue("Local .pipeout file '" + localPipeOutFileNotRemove
+ "' does not exist, should have not been removed!", fs.exists(localPipeOutFileNotRemove));
Assert.assertTrue("Local .pipeout file '" + localPipeOutFileFailRemove
+ "' does not exist, should have not been removed!", fs.exists(localPipeOutFileFailRemove));
}
}
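For reference, the flow the new test exercises can be reduced to a small standalone sketch. This is only an illustration: the example class name is hypothetical, the parameter meanings (dryRun, verbose, useConsole, rootHDFSDir, conf) are inferred from the constructor call in the test above, and the sketch sits in the same package as the service in case the constructor is not public.

package org.apache.hadoop.hive.ql.session;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;

// Illustrative sketch only: runs the dangling scratch dir cleanup the same way the test does.
public class ClearDanglingScratchDirExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Hypothetical scratch locations; in a real deployment these come from hive-site.xml.
    conf.set("hive.exec.scratchdir", "/tmp/example-hdfs-scratchdir");
    conf.set("hive.exec.local.scratchdir", "/tmp/example-local-scratchdir");

    // Root directory to scan for dangling session dirs. The test above points this at a
    // custom sub-path; here we simply reuse the configured scratchdir.
    Path rootHDFSDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));

    // dryRun=false deletes for real, verbose=false, useConsole=true prints progress messages.
    ClearDanglingScratchDir cleaner =
        new ClearDanglingScratchDir(false, false, true, rootHDFSDir.toString(), conf);
    cleaner.run();
  }
}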
ClearDanglingScratchDir.java
@@ -17,10 +17,13 @@
*/
package org.apache.hadoop.hive.ql.session;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -53,6 +56,9 @@
* lease after 10 min, i.e., the HDFS file held by the dead HiveCli/HiveServer2 becomes writable
* again after 10 min. Once it becomes writable, ClearDanglingScratchDir will be able to
* remove it.
* 4. Additionally, once the residual session scratch dirs have been identified, the
*    corresponding files in the local tmp location are removed along with the HDFS dirs.
*    Please see {@link ClearDanglingScratchDir#removeLocalTmpFiles(String, String)}.
*/
public class ClearDanglingScratchDir implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ClearDanglingScratchDir.class);
@@ -141,25 +147,26 @@ public void run() {
// if the file is currently held by a writer
if(AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())){
inuse = true;
} else if (UnsupportedOperationException.class.getName().equals(eAppend.getClassName())) {
// Append is not supported in the cluster, try to use create
try {
IOUtils.closeStream(fs.create(lockFilePath, false));
} catch (RemoteException eCreate) {
if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
// If the file is held by a writer, will throw AlreadyBeingCreatedException
inuse = true;
} else {
consoleMessage("Unexpected error:" + eCreate.getMessage());
}
} catch (FileAlreadyExistsException eCreateNormal) {
// Otherwise, throw FileAlreadyExistsException, which means the file owner is
// dead
removable = true;
}
} else {
consoleMessage("Unexpected error:" + eAppend.getMessage());
}
} catch (UnsupportedOperationException eUnsupported) {
// In Hadoop 3, the append method is not supported.
// As an alternative check for whether the file is in use,
// try to create the file; if it is in use, an IOException is thrown.
try {
IOUtils.closeStream(fs.create(lockFilePath, false));
} catch (RemoteException eCreate) {
if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())){
// If the file is held by a writer, will throw AlreadyBeingCreatedException
inuse = true;
} else {
consoleMessage("Unexpected error:" + eCreate.getMessage());
}
} catch (FileAlreadyExistsException eCreateNormal) {
// Otherwise, throw FileAlreadyExistsException, which means the file owner is dead
removable = true;
}
}
if (inuse) {
// Cannot open the lock file for writing, must be held by a live process
@@ -179,6 +186,7 @@ public void run() {
return;
}
consoleMessage("Removing " + scratchDirToRemove.size() + " scratch directories");
String localTmpDir = HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR);
for (Path scratchDir : scratchDirToRemove) {
if (dryRun) {
System.out.println(scratchDir);
@@ -192,6 +200,8 @@ public void run() {
consoleMessage(message);
}
}
// cleaning up on local file system as well
removeLocalTmpFiles(scratchDir.getName(), localTmpDir);
}
}
} catch (IOException e) {
@@ -236,4 +246,29 @@ static Options createOptions() {

return result;
}

/**
* While deleting dangling scratch dirs from HDFS, clean up the corresponding local tmp files as well.
* @param sessionName session name used as the prefix that identifies removable tmp files
* @param localTmpdir local tmp file location
*/
private void removeLocalTmpFiles(String sessionName, String localTmpdir) {
File[] files = new File(localTmpdir).listFiles(fn -> fn.getName().startsWith(sessionName));
boolean success;
if (files != null) {
for (File file : files) {
success = false;
if (file.canWrite()) {
success = file.delete();
}
if (success) {
consoleMessage("While removing '" + sessionName + "' dangling scratch dir from HDFS, "
+ "local tmp session file '" + file.getPath() + "' has been cleaned as well.");
} else if (file.getName().startsWith(sessionName)) {
consoleMessage("Even though '" + sessionName + "' is marked as dangling session dir, "
+ "local tmp session file '" + file.getPath() + "' could not be removed.");
}
}
}
}
}
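To make the liveness check refactored in run() easier to follow, here is the probe condensed into one illustrative method. The helper name isLockHeld and the standalone class are not part of the patch; the calls themselves (append on the lock file, the create(path, false) fallback for Hadoop 3, and the exception class-name comparisons) mirror the hunk above, so treat this as a sketch rather than the service's actual code.

package org.apache.hadoop.hive.ql.session;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;

// Illustrative condensation of the lock-file probe in ClearDanglingScratchDir.run().
final class LockFileProbeSketch {

  /** Returns true if inuse.lck is still held by a live HiveCli/HiveServer2 process. */
  static boolean isLockHeld(FileSystem fs, Path lockFilePath) throws IOException {
    try {
      // A live owner still holds the HDFS lease, so append fails with
      // AlreadyBeingCreatedException; after the owner dies the lease expires (~10 min)
      // and append succeeds, meaning the scratch dir is removable.
      IOUtils.closeStream(fs.append(lockFilePath));
      return false;
    } catch (RemoteException eAppend) {
      if (AlreadyBeingCreatedException.class.getName().equals(eAppend.getClassName())) {
        return true; // lock file currently held by a writer
      }
      throw eAppend;
    } catch (UnsupportedOperationException eUnsupported) {
      // Hadoop 3 path: append is not supported, so probe with create(overwrite=false).
      try {
        IOUtils.closeStream(fs.create(lockFilePath, false));
        return false;
      } catch (RemoteException eCreate) {
        if (AlreadyBeingCreatedException.class.getName().equals(eCreate.getClassName())) {
          return true; // lock file currently held by a writer
        }
        throw eCreate;
      } catch (FileAlreadyExistsException eCreateNormal) {
        return false; // file exists but has no live writer: the owner is dead
      }
    }
  }
}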