7 changes: 4 additions & 3 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3666,9 +3666,9 @@ public static enum ConfVars {
"For example, arithmetic expressions which can overflow the output data type can be evaluated using\n" +
" checked vector expressions so that they produce same result as non-vectorized evaluation."),
HIVE_VECTORIZED_ADAPTOR_SUPPRESS_EVALUATE_EXCEPTIONS(
"hive.vectorized.adaptor.suppress.evaluate.exceptions", false,
"hive.vectorized.adaptor.suppress.evaluate.exceptions", false,
"This flag should be set to true to suppress HiveException from the generic UDF function\n" +
"evaluate call and turn them into NULLs. Assume, by default, this is not needed"),
"evaluate call and turn them into NULLs. Assume, by default, this is not needed"),
HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED(
"hive.vectorized.input.format.supports.enabled",
"decimal_64",
@@ -4110,7 +4110,8 @@ public static enum ConfVars {
LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR("hive.llap.mapjoin.memory.oversubscribe.factor", 0.2f,
"Fraction of memory from hive.auto.convert.join.noconditionaltask.size that can be over subscribed\n" +
"by queries running in LLAP mode. This factor has to be from 0.0 to 1.0. Default is 20% over subscription.\n"),
LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query", 3,
LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY("hive.llap.memory.oversubscription.max.executors.per.query",
-1,
"Used along with hive.llap.mapjoin.memory.oversubscribe.factor to limit the number of executors from\n" +
"which memory for mapjoin can be borrowed. Default 3 (from 3 other executors\n" +
"hive.llap.mapjoin.memory.oversubscribe.factor amount of memory can be borrowed based on which mapjoin\n" +
@@ -782,6 +782,51 @@ public void testGetSplitsLocks() throws Exception {
assertEquals(1, rows.size());
}

@Test
public void testGetSplitsLocksWithMaterializedView() throws Exception {
// Need to test this with LLAP settings, which requires some additional configurations set.
HiveConf modConf = new HiveConf(hiveConf);
setupTez(modConf);
modConf.setVar(ConfVars.HIVE_EXECUTION_ENGINE, "tez");
modConf.setVar(ConfVars.HIVEFETCHTASKCONVERSION, "more");
modConf.setVar(HiveConf.ConfVars.LLAP_DAEMON_SERVICE_HOSTS, "localhost");

// SessionState/Driver needs to be restarted with the Tez conf settings.
restartSessionAndDriver(modConf);
TxnStore txnHandler = TxnUtils.getTxnStore(modConf);
String mvName = "mv_acidTbl";
try {
runStatementOnDriver("create materialized view " + mvName + " as select a from " + Table.ACIDTBL + " where a > 5");

// Request LLAP splits for a table.
String queryParam = "select a from " + Table.ACIDTBL + " where a > 5";
runStatementOnDriver("select get_splits(\"" + queryParam + "\", 1)");

// The get_splits call should have resulted in locks on ACIDTBL and the materialized view mv_acidTbl.
ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED,
"default", Table.ACIDTBL.name, null, slr.getLocks());
TestDbTxnManager2.checkLock(LockType.SHARED_READ, LockState.ACQUIRED,
"default", mvName, null, slr.getLocks());
assertEquals(2, slr.getLocksSize());
} finally {
// Close the session which should free up the TxnHandler/locks held by the session.
// Done in the finally block to make sure we free up the locks; otherwise
// the cleanup in tearDown() will get stuck waiting on the lock held here on ACIDTBL.
restartSessionAndDriver(hiveConf);
runStatementOnDriver("drop materialized view if exists " + mvName);
}

// Lock should be freed up now.
ShowLocksResponse slr = txnHandler.showLocks(new ShowLocksRequest());
assertEquals(0, slr.getLocksSize());

List<String> rows = runStatementOnDriver("show transactions");
// Transactions should be committed.
// No transactions - just the header row
assertEquals(1, rows.size());
}

private void restartSessionAndDriver(HiveConf conf) throws Exception {
SessionState ss = SessionState.get();
if (ss != null) {
@@ -129,8 +129,11 @@ public BuddyAllocator(boolean isDirectVal, boolean isMappedVal, int minAllocVal,
maxAllocation = maxAllocVal;
if (isMapped) {
try {
cacheDir = Files.createTempDirectory(
FileSystems.getDefault().getPath(mapPath), "llap-", RWX);
Path path = FileSystems.getDefault().getPath(mapPath);
if (!Files.exists(path)) {
Files.createDirectory(path);
}
cacheDir = Files.createTempDirectory(path, "llap-", RWX);
} catch (IOException ioe) {
// conf validator already checks this, so it will never trigger usually
throw new AssertionError("Configured mmap directory should be writable", ioe);
@@ -542,7 +545,7 @@ private void discardAllBuffersFromCtx(Arena arena, DiscardContext ctx) {
/**
* Unlocks the buffer after the discard has been abandoned.
*/
private void cancelDiscard(LlapAllocatorBuffer buf, int arenaIx, int headerIx) {
private void cancelDiscard(LlapAllocatorBuffer buf, int arenaIx) {
Boolean result = buf.cancelDiscard();
if (result == null) return;
// If the result is not null, the buffer was evicted during the move.
@@ -967,12 +970,12 @@ private void abandonOneHeaderBeingMoved(int headerIx, CasLog.Src src) {
if (assertsEnabled) {
assertBufferLooksValid(freeListIx, buf, arenaIx, headerIx);
}
cancelDiscard(buf, arenaIx, headerIx);
cancelDiscard(buf, arenaIx);
} else {
if (assertsEnabled) {
checkHeader(headerIx, -1, true);
}
addToFreeListWithMerge(headerIx, freeListIx, null, src);
addToFreeListWithMerge(headerIx, freeListIx, src);
}
}

@@ -1234,7 +1237,7 @@ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest,
lastSplitNextHeader = headerIx; // If anything remains, this is where it starts.
headerIx = getNextFreeListItem(origOffset);
}
replaceListHeadUnderLock(splitList, headerIx, splitListIx); // In the end, update free list head.
replaceListHeadUnderLock(splitList, headerIx); // In the end, update free list head.
} finally {
splitList.lock.unlock();
}
@@ -1250,7 +1253,7 @@ private int allocateWithSplit(int freeListIx, MemoryBuffer[] dest,
int newListIndex = freeListIx;
while (lastSplitBlocksRemaining > 0) {
if ((lastSplitBlocksRemaining & 1) == 1) {
addToFreeListWithMerge(lastSplitNextHeader, newListIndex, null, src);
addToFreeListWithMerge(lastSplitNextHeader, newListIndex, src);
lastSplitNextHeader += (1 << newListIndex);
}
lastSplitBlocksRemaining >>>= 1;
@@ -1269,7 +1272,7 @@ private void initializeNewlyAllocated(
buffer.setNewAllocLocation(arenaIx, headerIx);
}

private void replaceListHeadUnderLock(FreeList freeList, int headerIx, int ix) {
private void replaceListHeadUnderLock(FreeList freeList, int headerIx) {
if (headerIx == freeList.listHead) return;
if (headerIx >= 0) {
int newHeadOffset = offsetFromHeaderIndex(headerIx);
@@ -1339,7 +1342,7 @@ public int allocateFromFreeListUnderLock(FreeList freeList, int freeListIx,
}
++destIx;
}
replaceListHeadUnderLock(freeList, current, freeListIx);
replaceListHeadUnderLock(freeList, current);
return destIx;
}

@@ -1367,11 +1370,10 @@ public void deallocate(LlapAllocatorBuffer buffer, boolean isAfterMove) {
checkHeader(headerIx, freeListIx, true);
}
buffers[headerIx] = null;
addToFreeListWithMerge(headerIx, freeListIx, buffer, CasLog.Src.DEALLOC);
addToFreeListWithMerge(headerIx, freeListIx, CasLog.Src.DEALLOC);
}

private void addToFreeListWithMerge(int headerIx, int freeListIx,
LlapAllocatorBuffer buffer, CasLog.Src src) {
private void addToFreeListWithMerge(int headerIx, int freeListIx, CasLog.Src src) {
while (true) {
FreeList freeList = freeLists[freeListIx];
int bHeaderIx = getBuddyHeaderIx(freeListIx, headerIx);
@@ -187,6 +187,13 @@ public Void call() throws Exception {
}
}

@Test
public void testCachedirCreated() throws Exception {
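// A mapped BuddyAllocator pointed at a not-yet-existing mmap directory should
// now create that directory itself instead of failing with an AssertionError.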
int min = 3, max = 8, maxAlloc = 1 << max;
new BuddyAllocator(isDirect, isMapped, 1 << min, maxAlloc, maxAlloc, maxAlloc, 0, tmpDir + "/testifcreated",
new DummyMemoryManager(), LlapDaemonCacheMetrics.create("test", "1"), null);
}

static void syncThreadStart(final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
cdlIn.countDown();
try {
@@ -363,6 +363,8 @@ public final class FunctionRegistry {
system.registerGenericUDF("restrict_information_schema", GenericUDFRestrictInformationSchema.class);
system.registerGenericUDF("current_authorizer", GenericUDFCurrentAuthorizer.class);

system.registerGenericUDF("surrogate_key", GenericUDFSurrogateKey.class);

system.registerGenericUDF("isnull", GenericUDFOPNull.class);
system.registerGenericUDF("isnotnull", GenericUDFOPNotNull.class);
system.registerGenericUDF("istrue", GenericUDFOPTrue.class);
52 changes: 25 additions & 27 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -110,14 +110,15 @@ public String getName() {
@Override
protected int execute(DriverContext driverContext) {
try {
Hive hiveDb = getHive();
Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
Long lastReplId;
if (work.isBootStrapDump()) {
lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot);
lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot, hiveDb);
} else {
lastReplId = incrementalDump(dumpRoot, dmd, cmRoot);
lastReplId = incrementalDump(dumpRoot, dmd, cmRoot, hiveDb);
}
prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
} catch (RuntimeException e) {
@@ -140,7 +141,7 @@ private void prepareReturnValues(List<String> values, String schema) throws Sema
Utils.writeOutput(values, new Path(work.resultTempPath), conf);
}

private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
Long lastReplId;// get list of events matching dbPattern & tblPattern
// go through each event, and dump out each event to a event-level dump dir inside dumproot

@@ -150,15 +151,15 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
// same factory, restricting by message format is effectively a guard against
// older leftover data that would cause us problems.

work.overrideEventTo(getHive());
work.overrideEventTo(hiveDb);

IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
new EventBoundaryFilter(work.eventFrom, work.eventTo),
new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));

EventUtils.MSClientNotificationFetcher evFetcher
= new EventUtils.MSClientNotificationFetcher(getHive());
= new EventUtils.MSClientNotificationFetcher(hiveDb);

EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
@@ -174,7 +175,7 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
NotificationEvent ev = evIter.next();
lastReplId = ev.getEventId();
Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
dumpEvent(ev, evRoot, cmRoot);
dumpEvent(ev, evRoot, cmRoot, hiveDb);
}

replLogger.endLog(lastReplId.toString());
@@ -192,11 +193,11 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throw
return lastReplId;
}

private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot, Hive hiveDb) throws Exception {
EventHandler.Context context = new EventHandler.Context(
evRoot,
cmRoot,
getHive(),
hiveDb,
conf,
getNewEventOnlyReplicationSpec(ev.getEventId()),
work.dbNameOrPattern,
@@ -215,31 +216,30 @@ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) {
return rspec;
}

private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
// bootstrap case
// Last repl id would've been captured during compile phase in queryState configs before opening txn.
// This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
// concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
Hive hiveDb = getHive();
Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
assert (bootDumpBeginReplId >= 0L);

String validTxnList = getValidTxnListForReplDump(hiveDb);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
Utils.getAllTables(getHive(), dbName).size(),
getHive().getAllFunctions().size());
Utils.getAllTables(hiveDb, dbName).size(),
hiveDb.getAllFunctions().size());
replLogger.startLog();
Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId);
dumpFunctionMetadata(dbName, dumpRoot);
Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb);
dumpFunctionMetadata(dbName, dumpRoot, hiveDb);

String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId);
dumpConstraintMetadata(dbName, tblName, dbRoot);
dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb);
dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
}
Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
replLogger.endLog(bootDumpBeginReplId.toString());
@@ -256,19 +256,18 @@ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws
return bootDumpBeginReplId;
}

private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception {
Path dbRoot = new Path(dumpRoot, dbName);
// TODO : instantiating FS objects are generally costly. Refactor
FileSystem fs = dbRoot.getFileSystem(conf);
Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName, lastReplId).database();
HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database();
EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
return dbRoot;
}

private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId) throws Exception {
private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId, Hive db) throws Exception {
try {
Hive db = getHive();
HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
TableSpec tableSpec = new TableSpec(tuple.object);
TableExport.Paths exportPaths =
@@ -383,11 +382,11 @@ private String getNextDumpDir() {
}
}

private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
private void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception {
Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
List<String> functionNames = getHive().getFunctions(dbName, "*");
List<String> functionNames = hiveDb.getFunctions(dbName, "*");
for (String functionName : functionNames) {
HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName);
HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
if (tuple == null) {
continue;
}
@@ -402,12 +401,11 @@ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception
}
}

private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot, Hive db) throws Exception {
try {
Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
Path fkConstraintsFile = new Path(constraintsRoot, ConstraintFileType.FOREIGNKEY.getPrefix() + tblName);
Hive db = getHive();
List<SQLPrimaryKey> pks = db.getPrimaryKeyList(dbName, tblName);
List<SQLForeignKey> fks = db.getForeignKeyList(dbName, tblName);
List<SQLUniqueConstraint> uks = db.getUniqueConstraintList(dbName, tblName);
@@ -434,9 +432,9 @@ private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot)
}
}

private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) {
private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName, Hive db) {
try {
HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName);
HiveWrapper.Tuple<Function> tuple = new HiveWrapper(db, dbName).function(functionName);
if (tuple.object.getResourceUris().isEmpty()) {
LOG.warn("Not replicating function: " + functionName + " as it seems to have been created "
+ "without USING clause");
@@ -251,7 +251,7 @@ long openTxn(Context ctx, String user, long delay) throws LockException {
tableWriteIds.clear();
isExplicitTransaction = false;
startTransactionCount = 0;
LOG.debug("Opened " + JavaUtils.txnIdToString(txnId));
LOG.info("Opened " + JavaUtils.txnIdToString(txnId));
ctx.setHeartbeater(startHeartbeat(delay));
return txnId;
} catch (TException e) {