From fb12b652d917d24163f455f30d9c414f808b6cf7 Mon Sep 17 00:00:00 2001
From: Sankar Hariappan
Date: Sun, 1 Jul 2018 22:48:06 +0530
Subject: [PATCH 1/4] HIVE-20025: Clean-up of event files created by
 HiveProtoLoggingHook.

---
 .../org/apache/hadoop/hive/conf/HiveConf.java |  11 ++
 .../TestHiveProtoEventsCleanerTask.java       | 141 ++++++++++++++
 .../metastore/HiveProtoEventsCleanerTask.java | 176 ++++++++++++++++++
 .../hive/ql/hooks/HiveProtoLoggingHook.java   |  15 +-
 .../ql/hooks/TestHiveProtoLoggingHook.java    |   2 +-
 .../hive/metastore/conf/MetastoreConf.java    |   3 +-
 6 files changed, 339 insertions(+), 9 deletions(-)
 create mode 100644 itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
 create mode 100644 metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7ef22d6ff276..2da908673169 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -625,6 +625,17 @@ public static enum ConfVars {
         "Table alias will be added to column names for queries of type \"select *\" or \n" +
         "if query explicitly uses table alias \"select r1.x..\"."),
 
+    HIVE_PROTO_EVENTS_BASE_PATH("hive.hook.proto.base-directory", "",
+        "Base directory into which the proto event messages are written by HiveProtoLoggingHook."),
+    HIVE_PROTO_EVENTS_QUEUE_CAPACITY("hive.hook.proto.queue.capacity", 64,
+        "Queue capacity for the proto events logging threads."),
+    HIVE_PROTO_EVENTS_CLEAN_FREQ("hive.hook.proto.events.clean.freq", "1d",
+        new TimeValidator(TimeUnit.DAYS),
+        "Frequency at which the timer task runs to purge expired proto event files."),
+    HIVE_PROTO_EVENTS_TTL("hive.hook.proto.events.ttl", "7d",
+        new TimeValidator(TimeUnit.DAYS),
+        "Time-To-Live (TTL) of proto event files before cleanup."),
+
     // Hadoop Configuration Properties
     // Properties with null values are ignored and exist only for the purpose of giving us
     // a symbolic name to reference in the Hive source code. Properties with non-null
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
new file mode 100644
index 000000000000..e187fadb990d
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveProtoEventsCleanerTask.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHiveProtoEventsCleanerTask {
+  protected static final Logger LOG = LoggerFactory.getLogger(TestHiveProtoEventsCleanerTask.class);
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+  private Path baseDir;
+  private HiveConf hiveConf;
+  private SystemClock clock = SystemClock.getInstance();
+  private HiveProtoEventsCleanerTask cleanerTask;
+  private FileSystem fs;
+
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+
+  @Before
+  public void setup() throws Exception {
+    hiveConf = new HiveConf(TestHiveProtoEventsCleanerTask.class);
+    String tmpFolder = folder.newFolder().getAbsolutePath();
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder + "/" + eventsSubDirs[0]);
+    HiveConf.setTimeVar(hiveConf, ConfVars.HIVE_PROTO_EVENTS_TTL, 2, TimeUnit.DAYS);
+
+    baseDir = new Path(tmpFolder);
+    fs = baseDir.getFileSystem(hiveConf);
+    cleanerTask = JavaUtils.newInstance(HiveProtoEventsCleanerTask.class);
+    cleanerTask.setConf(hiveConf);
+  }
+
+  /**
+   * Returns the current date, using the underlying clock in UTC time.
+   */
+  private LocalDate getNow() {
+    // Use UTC date to ensure the reader's date is the same in all timezones.
+    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC).toLocalDate();
+  }
+
+  /**
+   * Returns the directory name for a given date.
+   */
+  public String getDirForDate(LocalDate date) {
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
+  }
+
+  private void addDatePartition(Path basePath, LocalDate date) throws Exception {
+    if (!fs.exists(basePath)) {
+      fs.mkdirs(basePath);
+      fs.setPermission(basePath, FsPermission.createImmutable((short) 01777));
+    }
+
+    Path datePtn = new Path(basePath, getDirForDate(date));
+    fs.mkdirs(datePtn);
+    fs.setPermission(datePtn, FsPermission.createImmutable((short) 01777));
+    FsPermission.setUMask(hiveConf, FsPermission.createImmutable((short) 0066));
+    Path partFile = new Path(datePtn, "data");
+    FSDataOutputStream out = fs.create(partFile);
+    out.writeInt(1000);
+    out.close();
+  }
+
+  @Test
+  public void testCleanup() throws Exception {
+    int[] inRange = { 3, 5, 2, 1 }; // Must have one entry per eventsSubDirs
+    int[] outRange = { 2, 2, 2, 1 }; // Must have one entry per eventsSubDirs
+    LocalDate today = getNow();
+
+    // Add partitions for the given range of dates, from today going backwards.
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir + "/" + eventsSubDirs[i]);
+      for (int j = 0; j < inRange[i]; j++) {
+        addDatePartition(basePath, today.minusDays(j));
+      }
+    }
+
+    // Run the task to clean up the expired events.
+    cleanerTask.run();
+
+    // Verify that the remaining partitions are not expired.
+    String expiredPtn = getDirForDate(today.minusDays(2));
+    for (int i = 0; i < inRange.length; i++) {
+      Path basePath = new Path(baseDir + "/" + eventsSubDirs[i]);
+      FileStatus[] statuses = fs.listStatus(basePath);
+
+      // If setup created the dirs just before midnight (UTC) and the cleaner runs just after it,
+      // then one extra dir will be deleted. So, check for both cases.
+      assertTrue((statuses.length == outRange[i]) || (statuses.length == (outRange[i] - 1)));
+      for (FileStatus status : statuses) {
+        assertTrue(status.getPath().getName().compareTo(expiredPtn) >= 0);
+      }
+    }
+  }
+}
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
new file mode 100644
index 000000000000..df25a6f0c4c6
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
+  public static final Logger LOG = LoggerFactory.getLogger(HiveProtoEventsCleanerTask.class);
+
+  private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
+  private List<Path> eventsBasePaths = new ArrayList<>(eventsSubDirs.length);
+  private Configuration conf;
+  private static long ttl;
+  private static String expiredDatePtn = null;
+  private static final SystemClock clock = SystemClock.getInstance();
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
+
+    Path hiveEventsBasePath = new Path(HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH));
+    Path baseDir = hiveEventsBasePath.getParent();
+    for (String subDir : eventsSubDirs) {
+      eventsBasePaths.add(new Path(baseDir, subDir));
+    }
+    assert(eventsBasePaths.get(0).equals(hiveEventsBasePath));
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public long runFrequency(TimeUnit unit) {
+    return HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_CLEAN_FREQ, unit);
+  }
+
+  @Override
+  public void run() {
+    // Expired date should be computed each time we run the cleaner thread.
+    expiredDatePtn = getExpiredDatePtn();
+    for (Path basePath : eventsBasePaths) {
+      cleanupDir(basePath);
+    }
+  }
+
+  /**
+   * Returns the expired date partition, using the underlying clock in UTC time.
+   */
+  private static String getExpiredDatePtn() {
+    // Use UTC date to ensure the reader's date is the same in all timezones.
+    LocalDate expiredDate
+        = LocalDateTime.ofEpochSecond((clock.getTime() - ttl) / 1000, 0, ZoneOffset.UTC).toLocalDate();
+    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
+  }
+
+  /**
+   * Path filter that accepts only the date partitions which have expired based on the TTL.
+   */
+  private static final PathFilter expiredDatePartitionsFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      String dirName = path.getName();
+      return ((dirName.startsWith("date="))
+          && (dirName.compareTo(expiredDatePtn) <= 0));
+    }
+  };
+
+  /**
+   * Finds the date partitioned events directories that have expired based on the TTL and deletes them.
+   */
+  private void cleanupDir(Path eventsBasePath) {
+    LOG.debug("Trying to delete expired proto events from " + eventsBasePath);
+    try {
+      FileSystem fs = FileSystem.get(eventsBasePath.toUri(), conf);
+      if (!fs.exists(eventsBasePath)) {
+        return;
+      }
+      FileStatus[] statuses = fs.listStatus(eventsBasePath, expiredDatePartitionsFilter);
+      for (FileStatus dir : statuses) {
+        deleteDir(fs, dir);
+        LOG.info("Deleted expired proto events dir: " + dir.getPath());
+      }
+    } catch (IOException e) {
+      LOG.error("Error while trying to delete expired proto events from " + eventsBasePath, e);
+    }
+  }
+
+  /**
+   * Delete the dir and if it fails, then retry deleting each file with the file owner as proxy user.
+   */
+  private void deleteDir(FileSystem fs, FileStatus eventsDir) throws IOException {
+    try {
+      deleteByOwner(fs, eventsDir);
+      return;
+    } catch (IOException e) {
+      // Fall through.
+      LOG.info("Unable to delete the events dir " + eventsDir.getPath()
+          + " and so trying to delete event files one by one.");
+    }
+
+    FileStatus[] statuses = fs.listStatus(eventsDir.getPath());
+    for (FileStatus file : statuses) {
+      deleteByOwner(fs, file);
+    }
+    deleteByOwner(fs, eventsDir);
+  }
+
+  /**
+   * Delete the file/dir with its owner as proxy user.
+   */
+  private void deleteByOwner(FileSystem fs, FileStatus fileStatus) throws IOException {
+    String owner = fileStatus.getOwner();
+    if (owner.equals(System.getProperty("user.name"))) {
+      fs.delete(fileStatus.getPath(), true);
+    } else {
+      LOG.info("Deleting " + fileStatus.getPath() + " as user " + owner);
+      UserGroupInformation ugi = UserGroupInformation.createProxyUser(owner,
+          UserGroupInformation.getLoginUser());
+      try {
+        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+          @Override
+          public Object run() throws Exception {
+            fs.delete(fileStatus.getPath(), true);
+            return null;
+          }
+        });
+      } catch (InterruptedException ie) {
+        LOG.error("Could not delete " + fileStatus.getPath() + " for UGI: " + ugi, ie);
+      }
+      try {
+        FileSystem.closeAllForUGI(ugi);
+      } catch (IOException e) {
+        LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+            fileStatus.getPath(), e);
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
index 0820beabf7ae..f463437fae78 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/HiveProtoLoggingHook.java
@@ -102,8 +102,10 @@
 import java.util.stream.Collectors;
 
 import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
@@ -158,9 +160,7 @@ public class HiveProtoLoggingHook implements ExecuteWithHookContext {
         .collect(Collectors.toSet());
   }
 
-  public static final String HIVE_EVENTS_BASE_PATH = "hive.hook.proto.base-directory";
-  public static final String HIVE_HOOK_PROTO_QUEUE_CAPACITY = "hive.hook.proto.queue.capacity";
-  public static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
+  private static final int HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT = 64;
   private static final int WAIT_TIME = 5;
 
   public enum EventType {
@@ -190,9 +190,10 @@ static class EventLogger {
       this.clock = clock;
       // randomUUID is slow, since its cryptographically secure, only first query will take time.
       this.logFileName = "hive_" + UUID.randomUUID().toString();
-      String baseDir = conf.get(HIVE_EVENTS_BASE_PATH);
-      if (baseDir == null) {
-        LOG.error(HIVE_EVENTS_BASE_PATH + " is not set, logging disabled.");
+      String baseDir = conf.getVar(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+      if (StringUtils.isBlank(baseDir)) {
+        baseDir = null;
+        LOG.error(ConfVars.HIVE_PROTO_EVENTS_BASE_PATH.varname + " is not set, logging disabled.");
       }
 
       DatePartitionedLogger<HiveHookEventProto> tmpLogger = null;
@@ -211,7 +212,7 @@ static class EventLogger {
         return;
       }
 
-      int queueCapacity = conf.getInt(HIVE_HOOK_PROTO_QUEUE_CAPACITY,
+      int queueCapacity = conf.getInt(ConfVars.HIVE_PROTO_EVENTS_QUEUE_CAPACITY.varname,
           HIVE_HOOK_PROTO_QUEUE_CAPACITY_DEFAULT);
 
       ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
index 96fb73c5f7ca..8124528f35d5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/hooks/TestHiveProtoLoggingHook.java
@@ -64,7 +64,7 @@ public class TestHiveProtoLoggingHook {
   public void setup() throws Exception {
     conf = new HiveConf();
     tmpFolder = folder.newFolder().getAbsolutePath();
-    conf.set(HiveProtoLoggingHook.HIVE_EVENTS_BASE_PATH, tmpFolder);
+    conf.setVar(HiveConf.ConfVars.HIVE_PROTO_EVENTS_BASE_PATH, tmpFolder);
     QueryState state = new QueryState.Builder().withHiveConf(conf).build();
     @SuppressWarnings("serial")
     QueryPlan queryPlan = new QueryPlan(HiveOperation.QUERY) {};
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 19da432bb8a1..74a301fce612 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -763,7 +763,8 @@ public enum ConfVars {
         EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
             "org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
             MaterializationsCacheCleanerTask.class.getName() + "," +
-            MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName(),
+            MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
+            "org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
         "Comma separated list of tasks that will be started in separate threads. These will " +
             "always be started, regardless of whether the metastore is running in embedded mode " +
            "or in server mode. They must implement " + MetastoreTaskThread.class.getName()),

From d77bfbcf6972e8b6a5aba71e0269de86720ccb1e Mon Sep 17 00:00:00 2001
From: Sankar Hariappan
Date: Mon, 2 Jul 2018 18:06:52 +0530
Subject: [PATCH 2/4] Bug fix

---
 .../metastore/HiveProtoEventsCleanerTask.java | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
index df25a6f0c4c6..7c7db586bfab 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.metastore;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,7 +44,7 @@ public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
   public static final Logger LOG = LoggerFactory.getLogger(HiveProtoEventsCleanerTask.class);
 
   private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
-  private List<Path> eventsBasePaths = new ArrayList<>(eventsSubDirs.length);
+  private List<Path> eventsBasePaths = new ArrayList<>();
   private Configuration conf;
   private static long ttl;
   private static String expiredDatePtn = null;
   private static final SystemClock clock = SystemClock.getInstance();
@@ -52,14 +53,18 @@ public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
 
-    Path hiveEventsBasePath = new Path(HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH));
+    String hiveEventsDir = HiveConf.getVar(conf, ConfVars.HIVE_PROTO_EVENTS_BASE_PATH);
+    if (StringUtils.isBlank(hiveEventsDir)) {
+      return;
+    }
+    Path hiveEventsBasePath = new Path(hiveEventsDir);
     Path baseDir = hiveEventsBasePath.getParent();
     for (String subDir : eventsSubDirs) {
       eventsBasePaths.add(new Path(baseDir, subDir));
     }
     assert(eventsBasePaths.get(0).equals(hiveEventsBasePath));
+    ttl = HiveConf.getTimeVar(conf, ConfVars.HIVE_PROTO_EVENTS_TTL, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -74,6 +79,10 @@ public long runFrequency(TimeUnit unit) {
 
   @Override
   public void run() {
+    if (eventsBasePaths.isEmpty()) {
+      return;
+    }
+
     // Expired date should be computed each time we run the cleaner thread.
     expiredDatePtn = getExpiredDatePtn();
     for (Path basePath : eventsBasePaths) {

From 7a35c56023f44d823e5db31511023bc96117a3a2 Mon Sep 17 00:00:00 2001
From: Sankar Hariappan
Date: Tue, 3 Jul 2018 10:18:53 +0530
Subject: [PATCH 3/4] Review comments fix

---
 .../metastore/HiveProtoEventsCleanerTask.java | 53 +++++++------------
 1 file changed, 18 insertions(+), 35 deletions(-)

diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
index 7c7db586bfab..bcff883310b0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -79,6 +79,7 @@ public long runFrequency(TimeUnit unit) {
 
   @Override
   public void run() {
+    // If Hive proto logging is not enabled, then there is nothing to clean up.
     if (eventsBasePaths.isEmpty()) {
       return;
     }
@@ -124,8 +125,13 @@ private void cleanupDir(Path eventsBasePath) {
       }
       FileStatus[] statuses = fs.listStatus(eventsBasePath, expiredDatePartitionsFilter);
       for (FileStatus dir : statuses) {
-        deleteDir(fs, dir);
-        LOG.info("Deleted expired proto events dir: " + dir.getPath());
+        try {
+          deleteDirByOwner(fs, dir);
+          LOG.info("Deleted expired proto events dir: " + dir.getPath());
+        } catch (IOException ioe) {
+          // Log the error and continue to delete the other expired dirs.
+          LOG.error("Error deleting expired proto events dir " + dir.getPath(), ioe);
+        }
       }
     } catch (IOException e) {
       LOG.error("Error while trying to delete expired proto events from " + eventsBasePath, e);
@@ -133,52 +139,29 @@ private void cleanupDir(Path eventsBasePath) {
   }
 
   /**
-   * Delete the dir and if it fails, then retry deleting each file with the file owner as proxy user.
+   * Delete the events dir with its owner as proxy user.
    */
-  private void deleteDir(FileSystem fs, FileStatus eventsDir) throws IOException {
-    try {
-      deleteByOwner(fs, eventsDir);
-      return;
-    } catch (IOException e) {
-      // Fall through.
-      LOG.info("Unable to delete the events dir " + eventsDir.getPath()
-          + " and so trying to delete event files one by one.");
-    }
-
-    FileStatus[] statuses = fs.listStatus(eventsDir.getPath());
-    for (FileStatus file : statuses) {
-      deleteByOwner(fs, file);
-    }
-    deleteByOwner(fs, eventsDir);
-  }
-
-  /**
-   * Delete the file/dir with its owner as proxy user.
-   */
-  private void deleteByOwner(FileSystem fs, FileStatus fileStatus) throws IOException {
-    String owner = fileStatus.getOwner();
+  private void deleteDirByOwner(FileSystem fs, FileStatus eventsDir) throws IOException {
+    String owner = eventsDir.getOwner();
     if (owner.equals(System.getProperty("user.name"))) {
-      fs.delete(fileStatus.getPath(), true);
+      fs.delete(eventsDir.getPath(), true);
     } else {
-      LOG.info("Deleting " + fileStatus.getPath() + " as user " + owner);
+      LOG.info("Deleting " + eventsDir.getPath() + " as user " + owner);
       UserGroupInformation ugi = UserGroupInformation.createProxyUser(owner,
           UserGroupInformation.getLoginUser());
       try {
         ugi.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            fs.delete(fileStatus.getPath(), true);
+            // A new FileSystem object must be obtained in the user context for the doAs flow.
+            try (FileSystem doAsFs = FileSystem.newInstance(eventsDir.getPath().toUri(), conf)) {
+              doAsFs.delete(eventsDir.getPath(), true);
+            }
             return null;
           }
         });
       } catch (InterruptedException ie) {
-        LOG.error("Could not delete " + fileStatus.getPath() + " for UGI: " + ugi, ie);
-      }
-      try {
-        FileSystem.closeAllForUGI(ugi);
-      } catch (IOException e) {
-        LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
-            fileStatus.getPath(), e);
+        LOG.error("Could not delete " + eventsDir.getPath() + " for UGI: " + ugi, ie);
       }
     }
   }

From 623a32f4f1985ae4bd8889ec23ba3f88759d1f90 Mon Sep 17 00:00:00 2001
From: Sankar Hariappan
Date: Tue, 3 Jul 2018 15:07:55 +0530
Subject: [PATCH 4/4] Findbugs fix

---
 .../hive/metastore/HiveProtoEventsCleanerTask.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
index bcff883310b0..2a772e2e0f44 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveProtoEventsCleanerTask.java
@@ -46,7 +46,7 @@ public class HiveProtoEventsCleanerTask implements MetastoreTaskThread {
   private final String[] eventsSubDirs = new String[] { "query_data", "dag_meta", "dag_data", "app_data" };
   private List<Path> eventsBasePaths = new ArrayList<>();
   private Configuration conf;
-  private static long ttl;
+  private long ttl;
   private static String expiredDatePtn = null;
   private static final SystemClock clock = SystemClock.getInstance();
 
@@ -85,20 +85,20 @@ public void run() {
     }
 
     // Expired date should be computed each time we run the cleaner thread.
-    expiredDatePtn = getExpiredDatePtn();
+    computeExpiredDatePtn(ttl);
     for (Path basePath : eventsBasePaths) {
       cleanupDir(basePath);
     }
   }
 
   /**
-   * Returns the expired date partition, using the underlying clock in UTC time.
+   * Computes the expired date partition, using the underlying clock in UTC time.
    */
-  private static String getExpiredDatePtn() {
+  private static void computeExpiredDatePtn(long ttl) {
     // Use UTC date to ensure the reader's date is the same in all timezones.
     LocalDate expiredDate
         = LocalDateTime.ofEpochSecond((clock.getTime() - ttl) / 1000, 0, ZoneOffset.UTC).toLocalDate();
-    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
+    expiredDatePtn = "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(expiredDate);
   }
 
   /**
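
Notes on enabling the feature (an illustrative configuration sketch, not part of the patches above):
the cleaner is a no-op unless proto event logging is enabled, i.e. hive.hook.proto.base-directory is
set and HiveProtoLoggingHook is registered as a post-execution hook. Assuming the standard
hive.exec.post.hooks wiring, a hive-site.xml along these lines exercises the new task. The directory
value below is illustrative (the patch defines no default for it), while 1d and 7d mirror the
HIVE_PROTO_EVENTS_CLEAN_FREQ and HIVE_PROTO_EVENTS_TTL defaults added to HiveConf in patch 1:

  <!-- Register the proto logging hook (assumes the standard post-exec hook mechanism). -->
  <property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook</value>
  </property>
  <!-- Illustrative path; it must end in the "query_data" component, since
       HiveProtoEventsCleanerTask derives the sibling event dirs (dag_meta,
       dag_data, app_data) from the parent of this directory. -->
  <property>
    <name>hive.hook.proto.base-directory</name>
    <value>/warehouse/sys.db/query_data</value>
  </property>
  <!-- Purge cadence and retention; these values match the HiveConf defaults. -->
  <property>
    <name>hive.hook.proto.events.clean.freq</name>
    <value>1d</value>
  </property>
  <property>
    <name>hive.hook.proto.events.ttl</name>
    <value>7d</value>
  </property>

Because the partition dirs are named date=YYYY-MM-DD (ISO-8601), lexicographic order coincides with
chronological order, which is why expiredDatePartitionsFilter can select the expired days with a
plain string comparison, dirName.compareTo(expiredDatePtn) <= 0.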