Pull request #109 (Closed)

Changes from all commits (39 commits)
d0f5b89
HIVE-14635 : establish a separate path for FSOP to write into final p…
sershe-apache Aug 29, 2016
67f1b92
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Aug 30, 2016
2cef25d
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Aug 31, 2016
87dcab4
HIVE-14636 : pass information from FSOP/TezTask to commit to take car…
sershe-apache Aug 31, 2016
38409da
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Sep 8, 2016
30fd19f
HIVE-14637 : edit or split MoveTask to commit job results to metastor…
sershe-apache Sep 8, 2016
815e069
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Sep 12, 2016
3e481b4
HIVE-14644 : use metastore information on the read path appropriately…
sershe-apache Sep 12, 2016
8708398
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Sep 19, 2016
70299dc
HIVE-14700 : clean up file/txn information via a metastore thread (Se…
sershe-apache Sep 19, 2016
6d91448
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 1, 2016
e02691b
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 3, 2016
ad3df23
HIVE-14641 : handle writing to dynamic partitions (Sergey Shelukhin)
sershe-apache Oct 3, 2016
754443e
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 4, 2016
0ce24b9
HIVE-14638 : handle unions (Sergey Shelukhin)
sershe-apache Oct 4, 2016
7d50a15
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 11, 2016
b9e8157
HIVE-14639 : handle SKEWED BY for MM tables (Sergey Shelukhin)
sershe-apache Oct 11, 2016
bd78d66
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 14, 2016
eacf9f9
HIVE-14640 : handle hive.merge.*files in select queries (Sergey Shelu…
sershe-apache Oct 14, 2016
2474f06
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 17, 2016
af4ff37
HIVE-14643 : handle ctas for the MM tables (Sergey Shelukhin)
sershe-apache Oct 17, 2016
edaebb4
HIVE-14932 : handle bucketing for MM tables (Sergey Shelukhin)
sershe-apache Oct 17, 2016
9ecffcb
HIVE-14899 : MM: support (or disable) alter table concatenate (Sergey…
sershe-apache Oct 17, 2016
b6571ea
HIVE-14878 : integrate MM tables into ACID: add separate ACID type (W…
weiatwork Oct 18, 2016
3f34134
HIVE-14671 : merge master into hive-14535 (Wei Zheng)
weiatwork Oct 19, 2016
f60bf24
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 19, 2016
4de0d96
HIVE-14642 : handle insert overwrite for MM tables (Sergey Shelukhin)
sershe-apache Oct 19, 2016
a057e12
HIVE-14996 : handle load for MM tables (Sergey Shelukhin)
sershe-apache Oct 19, 2016
98b0b8e
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 20, 2016
8004f71
HIVE-15019 : handle import for MM tables (Sergey Shelukhin)
sershe-apache Oct 20, 2016
ff1ea20
HIVE-15020 : handle truncate for MM tables (not atomic yet) (Sergey S…
sershe-apache Oct 20, 2016
b60bbc2
HIVE-15021 : handle (or add a test for) multi-insert into MM tables (…
sershe-apache Oct 20, 2016
423537a
HIVE-14671 : merge master into hive-14535 (Sergey Shelukhin)
sershe-apache Oct 25, 2016
65a380d
HIVE-14954 : put FSOP manifests for the instances of the same vertex …
sershe-apache Oct 25, 2016
0f7f4ed
HIVE-14953 : don't use globStatus on S3 in MM tables (Sergey Shelukhin)
sershe-apache Oct 25, 2016
c050e69
HIVE-15064 : fix explain for MM tables - don't output for non-MM tabl…
sershe-apache Oct 26, 2016
38cd0a6
HIVE-15064 : fix explain for MM tables - don't output for non-MM tabl…
sershe-apache Oct 26, 2016
36ad3a4
HIVE-14990 : run all tests for MM tables and fix the issues that are …
sershe-apache Oct 26, 2016
b143f5c
HIVE-14990 : run all tests for MM tables and fix the issues that are …
sershe-apache Oct 27, 2016
common/src/java/org/apache/hadoop/hive/common/FileUtils.java (6 changes: 5 additions & 1 deletion)
@@ -330,9 +330,13 @@ public static String unescapePathName(String path) {
*/
public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
List<FileStatus> results) throws IOException {
listStatusRecursively(fs, fileStatus, HIDDEN_FILES_PATH_FILTER, results);
}

public static void listStatusRecursively(FileSystem fs, FileStatus fileStatus,
PathFilter filter, List<FileStatus> results) throws IOException {
if (fileStatus.isDir()) {
for (FileStatus stat : fs.listStatus(fileStatus.getPath(), HIDDEN_FILES_PATH_FILTER)) {
for (FileStatus stat : fs.listStatus(fileStatus.getPath(), filter)) {
listStatusRecursively(fs, stat, results);
}
} else {
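To make the new overload concrete, here is a minimal, hypothetical usage sketch (not part of this PR; the class name, table path, and write ID are invented). One subtlety: the custom filter applies only to the immediate children of the starting directory, because the recursive call goes through the original three-argument overload and therefore uses HIDDEN_FILES_PATH_FILTER below that level.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidWriteIds;

public class ListMmDirExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path tableDir = new Path("/warehouse/mm_tbl"); // hypothetical table location
    FileSystem fs = tableDir.getFileSystem(conf);

    // Top-level children are kept only if named mm_1; recursion then collects
    // all (non-hidden) files under the matched directories. Passing false
    // instead would keep everything except mm_1.
    List<FileStatus> results = new ArrayList<>();
    FileUtils.listStatusRecursively(fs, fs.getFileStatus(tableDir),
        new ValidWriteIds.IdPathFilter(1, true), results);
    for (FileStatus stat : results) {
      System.out.println(stat.getPath());
    }
  }
}

This appears to match how IdPathFilter (added below in ValidWriteIds.java) is meant to be used: select the mm_N directories at the top level, then collect every file inside them.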
common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java (47 changes: 43 additions & 4 deletions)
@@ -19,16 +19,20 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
* HiveStatsUtils.
* A collection of utilities used for hive statistics.
@@ -50,15 +54,26 @@ public class HiveStatsUtils {
* @return array of FileStatus
* @throws IOException
*/
public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
throws IOException {
return getFileStatusRecurse(path, level, fs, FileUtils.HIDDEN_FILES_PATH_FILTER, false);
}

public static FileStatus[] getFileStatusRecurse(
Path path, int level, FileSystem fs, PathFilter filter) throws IOException {
return getFileStatusRecurse(path, level, fs, filter, false);
}

public static FileStatus[] getFileStatusRecurse(
Path path, int level, FileSystem fs, PathFilter filter, boolean allLevelsBelow)
throws IOException {

// if level is < 0, then return all files/directories under the specified path
if ( level < 0) {
if (level < 0) {
List<FileStatus> result = new ArrayList<FileStatus>();
try {
FileStatus fileStatus = fs.getFileStatus(path);
FileUtils.listStatusRecursively(fs, fileStatus, result);
FileUtils.listStatusRecursively(fs, fileStatus, filter, result);
} catch (IOException e) {
// globStatus() API returns empty FileStatus[] when the specified path
// does not exist. But getFileStatus() throw IOException. To mimic the
@@ -75,7 +90,31 @@ public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem
sb.append(Path.SEPARATOR).append("*");
}
Path pathPattern = new Path(path, sb.toString());
return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (!allLevelsBelow) {
return fs.globStatus(pathPattern, filter);
}
LinkedList<FileStatus> queue = new LinkedList<>();
List<FileStatus> results = new ArrayList<FileStatus>();
for (FileStatus status : fs.globStatus(pathPattern)) {
if (filter.accept(status.getPath())) {
results.add(status);
}
if (status.isDirectory()) {
queue.add(status);
}
}
while (!queue.isEmpty()) {
FileStatus status = queue.poll();
for (FileStatus child : fs.listStatus(status.getPath())) {
if (filter.accept(child.getPath())) {
results.add(child);
}
if (child.isDirectory()) {
queue.add(child);
}
}
}
return results.toArray(new FileStatus[results.size()]);
}

public static int getNumBitVectorsForNDVEstimation(Configuration conf) throws Exception {
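A sketch of how the new allLevelsBelow flag might be used (again hypothetical; the path and partitioning scheme are invented). With level = 2 the glob matches tableDir/*/*; the flag then adds a breadth-first walk below the matched entries, so e.g. mm_N directories nested inside partitions, and the files inside them, are also returned.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;

public class RecurseAllLevelsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path tableDir = new Path("/warehouse/part_tbl"); // hypothetical 2-level partitioned table
    FileSystem fs = tableDir.getFileSystem(conf);

    // level = 2 globs tableDir/*/* (the partition directories). With
    // allLevelsBelow = true, a breadth-first walk also collects everything
    // underneath them; the filter decides what lands in the result array.
    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
        tableDir, 2, fs, FileUtils.HIDDEN_FILES_PATH_FILTER, true);
    for (FileStatus s : statuses) {
      System.out.println((s.isDirectory() ? "dir  " : "file ") + s.getPath());
    }
  }
}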
common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java (171 changes: 171 additions & 0 deletions)
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.common;

import java.util.Arrays;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ValidWriteIds {
public static final ValidWriteIds NO_WRITE_IDS = new ValidWriteIds(-1, -1, false, null);

public static final String MM_PREFIX = "mm";

private final static Logger LOG = LoggerFactory.getLogger(ValidWriteIds.class);

private static final String VALID_WRITEIDS_PREFIX = "hive.valid.write.ids.";
private final long lowWatermark, highWatermark;
private final boolean areIdsValid;
private final HashSet<Long> ids;
private String source = null;

public ValidWriteIds(
long lowWatermark, long highWatermark, boolean areIdsValid, HashSet<Long> ids) {
this.lowWatermark = lowWatermark;
this.highWatermark = highWatermark;
this.areIdsValid = areIdsValid;
this.ids = ids;
}

public static ValidWriteIds createFromConf(Configuration conf, String dbName, String tblName) {
return createFromConf(conf, dbName + "." + tblName);
}

public static ValidWriteIds createFromConf(Configuration conf, String fullTblName) {
String idStr = conf.get(createConfKey(fullTblName), null);
if (idStr == null || idStr.isEmpty()) return null;
return new ValidWriteIds(idStr);
}

private static String createConfKey(String dbName, String tblName) {
return createConfKey(dbName + "." + tblName);
}

private static String createConfKey(String fullName) {
return VALID_WRITEIDS_PREFIX + fullName;
}

private ValidWriteIds(String src) {
// TODO: lifted from ACID config implementation... optimize if needed? e.g. ranges, base64
String[] values = src.split(":");
highWatermark = Long.parseLong(values[0]);
lowWatermark = Long.parseLong(values[1]);
if (values.length > 2) {
areIdsValid = Long.parseLong(values[2]) > 0;
ids = new HashSet<Long>();
for(int i = 3; i < values.length; ++i) {
ids.add(Long.parseLong(values[i]));
}
} else {
areIdsValid = false;
ids = null;
}
}

public void addToConf(Configuration conf, String dbName, String tblName) {
if (source == null) {
source = toString();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + createConfKey(dbName, tblName) + " => " + source);
}
conf.set(createConfKey(dbName, tblName), source);
}

public String toString() {
// TODO: lifted from ACID config implementation... optimize if needed? e.g. ranges, base64
StringBuilder buf = new StringBuilder();
buf.append(highWatermark);
buf.append(':');
buf.append(lowWatermark);
if (ids != null) {
buf.append(':');
buf.append(areIdsValid ? 1 : 0);
for (long id : ids) {
buf.append(':');
buf.append(id);
}
}
return buf.toString();
}

public boolean isValid(long writeId) {
if (writeId < 0) throw new RuntimeException("Incorrect write ID " + writeId);
if (writeId <= lowWatermark) return true;
if (writeId >= highWatermark) return false;
return ids != null && (areIdsValid == ids.contains(writeId));
}

public static String getMmFilePrefix(long mmWriteId) {
return MM_PREFIX + "_" + mmWriteId;
}


public static class IdPathFilter implements PathFilter {
private final String mmDirName;
private final boolean isMatch;
public IdPathFilter(long writeId, boolean isMatch) {
this.mmDirName = ValidWriteIds.getMmFilePrefix(writeId);
this.isMatch = isMatch;
}

@Override
public boolean accept(Path path) {
String name = path.getName();
return isMatch == name.equals(mmDirName);
}
}

public static class AnyIdDirFilter implements PathFilter {
@Override
public boolean accept(Path path) {
String name = path.getName();
if (!name.startsWith(MM_PREFIX + "_")) return false;
String idStr = name.substring(MM_PREFIX.length() + 1);
try {
Long.parseLong(idStr);
} catch (NumberFormatException ex) {
return false;
}
return true;
}
}

public static Long extractWriteId(Path file) {
String fileName = file.getName();
String[] parts = fileName.split("_", 3);
if (parts.length < 2 || !MM_PREFIX.equals(parts[0])) {
LOG.info("Cannot extract write ID for a MM table: " + file
+ " (" + Arrays.toString(parts) + ")");
return null;
}
long writeId = -1;
try {
writeId = Long.parseLong(parts[1]);
} catch (NumberFormatException ex) {
LOG.info("Cannot extract write ID for a MM table: " + file
+ "; parsing " + parts[1] + " got " + ex.getMessage());
return null;
}
return writeId;
}
}
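A self-contained sketch of the ValidWriteIds semantics, with made-up watermark values (not part of the diff): IDs at or below the low watermark are always valid, IDs at or above the high watermark never are, and the set in between enumerates either the valid or the invalid IDs depending on areIdsValid. The same object round-trips through a Configuration, which is presumably how the list travels from the planner to tasks.

import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIds;

public class ValidWriteIdsExample {
  public static void main(String[] args) {
    // Made-up state: low watermark 3, high watermark 11; of the IDs in
    // between, only 5 is valid (areIdsValid = true, so the set lists valid IDs).
    HashSet<Long> validOnes = new HashSet<>();
    validOnes.add(5L);
    ValidWriteIds ids = new ValidWriteIds(3, 11, true, validOnes);

    System.out.println(ids.isValid(2));  // true: at or below the low watermark
    System.out.println(ids.isValid(5));  // true: in the valid-ID set
    System.out.println(ids.isValid(7));  // false: between watermarks, not in the set
    System.out.println(ids.isValid(11)); // false: at or above the high watermark

    // Round-trip through a Configuration under hive.valid.write.ids.default.mm_tbl.
    Configuration conf = new Configuration();
    ids.addToConf(conf, "default", "mm_tbl");
    ValidWriteIds restored = ValidWriteIds.createFromConf(conf, "default", "mm_tbl");
    System.out.println(restored.isValid(5)); // true again

    // Map an MM directory name to/from its write ID.
    System.out.println(ValidWriteIds.getMmFilePrefix(5));                     // mm_5
    System.out.println(ValidWriteIds.extractWriteId(new Path("/wh/t/mm_5"))); // 5
  }
}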
common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (37 changes: 32 additions & 5 deletions)
@@ -297,7 +297,10 @@ private static URL checkConfigFile(File f) {
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_MEMORY_TTL,
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_INVALIDATOR_FREQUENCY,
HiveConf.ConfVars.METASTORE_HBASE_AGGR_STATS_HBASE_TTL,
HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS
HiveConf.ConfVars.METASTORE_HBASE_FILE_METADATA_THREADS,
HiveConf.ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL,
HiveConf.ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT,
HiveConf.ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT
};

/**
@@ -1206,6 +1209,8 @@ public static enum ConfVars {
HIVETESTMODE("hive.test.mode", false,
"Whether Hive is running in test mode. If yes, it turns on sampling and prefixes the output tablename.",
false),
HIVEEXIMTESTMODE("hive.exim.test.mode", false,
"The subset of test mode that only enables custom path handling for ExIm.", false),
HIVETESTMODEPREFIX("hive.test.mode.prefix", "test_",
"In test mode, specfies prefixes for the output table", false),
HIVETESTMODESAMPLEFREQ("hive.test.mode.samplefreq", 32,
@@ -1784,10 +1789,12 @@ public static enum ConfVars {

HIVE_TXN_OPERATIONAL_PROPERTIES("hive.txn.operational.properties", 0,
"Sets the operational properties that control the appropriate behavior for various\n"
+ "versions of the Hive ACID subsystem. Setting it to zero will turn on the legacy mode\n"
+ "for ACID, while setting it to one will enable a split-update feature found in the newer\n"
+ "version of Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
+ "for future versions of ACID. (See HIVE-14035 for details.)"),
+ "versions of the Hive ACID subsystem. Mostly it is intended to be used as an internal property\n"
+ "for future versions of ACID. (See HIVE-14035 for details.)\n"
+ "0: Turn on the legacy mode for ACID\n"
+ "1: Enable split-update feature found in the newer version of Hive ACID subsystem\n"
+ "2: Hash-based merge, which combines delta files using GRACE hash join based approach (not implemented)\n"
+ "3: Make the table 'quarter-acid' as it only supports insert. But it doesn't require ORC or bucketing."),

HIVE_MAX_OPEN_TXNS("hive.max.open.txns", 100000, "Maximum number of open transactions. If \n" +
"current open transactions reach this limit, future open transaction requests will be \n" +
@@ -3117,6 +3124,26 @@ public static enum ConfVars {
"Log tracing id that can be used by upstream clients for tracking respective logs. " +
"Truncated to " + LOG_PREFIX_LENGTH + " characters. Defaults to use auto-generated session id."),

HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL("hive.metastore.mm.thread.scan.interval", "900s",
new TimeValidator(TimeUnit.SECONDS),
"MM table housekeeping thread interval in this metastore instance. 0 to disable."),

HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT("hive.metastore.mm.heartbeat.timeout", "1800s",
new TimeValidator(TimeUnit.SECONDS),
"MM write ID times out after this long if a heartbeat is not send. Currently disabled."),

HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT("hive.metastore.mm.absolute.timeout", "7d",
new TimeValidator(TimeUnit.SECONDS),
"MM write ID cannot be outstanding for more than this long."),

HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD("hive.metastore.mm.aborted.grace.period", "1d",
new TimeValidator(TimeUnit.SECONDS),
"MM write ID will not be removed up for that long after it has been aborted;\n" +
"this is to work around potential races e.g. with FS visibility, when deleting files."),


HIVE_MM_AVOID_GLOBSTATUS_ON_S3("hive.mm.avoid.s3.globstatus", true,
"Whether to use listFiles (optimized on S3) instead of globStatus when on S3."),

HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
"hive.security.authenticator.manager,hive.security.authorization.manager," +
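To illustrate the new settings, a minimal configuration sketch (my example, assuming HiveConf's existing typed accessors setTimeVar/setBoolVar apply to these vars as they do to other time- and boolean-typed ConfVars):

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;

public class MmConfExample {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();

    // Run the metastore MM housekeeping scan every 5 minutes instead of the
    // default 900s.
    conf.setTimeVar(HiveConf.ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL,
        300, TimeUnit.SECONDS);

    // Cap how long an MM write ID may stay outstanding (default 7d).
    conf.setTimeVar(HiveConf.ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT,
        3, TimeUnit.DAYS);

    // Revert to globStatus on S3 (the optimized listFiles path is on by default).
    conf.setBoolVar(HiveConf.ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3, false);

    System.out.println(conf.getTimeVar(
        HiveConf.ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL, TimeUnit.SECONDS));
  }
}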