Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,12 @@ public void testCtasTezUnion() throws Exception {
/*
* Expected result 0th entry is the RecordIdentifier + data. 1st entry file before compact*/
String expected[][] = {
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0001/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":2}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t6", "/delta_0000001_0000001_0001/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":2}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000_0"},
};
Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size());
//verify data and layout
Expand All @@ -500,9 +500,9 @@ public void testCtasTezUnion() throws Exception {
LOG.warn(s);
}
String[][] expected2 = {
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":2}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":2}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000_0"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "/delta_0000002_0000002_0000/bucket_00000_0"}
};
Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size());
Expand Down
4 changes: 4 additions & 0 deletions itests/src/test/resources/testconfiguration.properties
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ minillap.query.files=\
create_table.q,\
create_udaf.q,\
create_view.q,\
ctas_direct.q,\
ctas_direct_with_specified_locations.q,\
ctas_direct_with_suffixed_locations.q,\
ctas_direct_with_transformers.q,\
cte_2.q,\
cte_4.q,\
cttl.q,\
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
List<SQLDefaultConstraint> defaultConstraints;
List<SQLCheckConstraint> checkConstraints;
private ColumnStatistics colStats; // For the sake of replication
private Long initialMmWriteId; // Initial MM write ID for CTAS and import.
private Long initialWriteId; // Initial write ID for CTAS and import.
// The FSOP configuration for the FSOP that is going to write initial data during ctas.
// This is not needed beyond compilation, so it is transient.
private transient FileSinkDesc writer;
Expand Down Expand Up @@ -944,12 +944,12 @@ public Table toTable(HiveConf conf) throws HiveException {
return tbl;
}

public void setInitialMmWriteId(Long mmWriteId) {
this.initialMmWriteId = mmWriteId;
public void setInitialWriteId(Long writeId) {
this.initialWriteId = writeId;
}

public Long getInitialMmWriteId() {
return initialMmWriteId;
public Long getInitialWriteId() {
return initialWriteId;
}

public FileSinkDesc getAndUnsetWriter() {
Expand Down
5 changes: 5 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.io;

import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
import static org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
Expand Down Expand Up @@ -3215,6 +3216,10 @@ private static boolean isSoftDeleteTxn(Configuration conf, ASTNode tree) {
}
}

/**
 * Builds the soft-delete path suffix appended to a table location for a
 * given transaction, e.g. when suffixed CTAS locations or lockless reads
 * are in use.
 *
 * @param txnId transaction id to encode into the suffix
 * @return {@code SOFT_DELETE_PATH_SUFFIX} followed by the txn id formatted
 *         with {@code DELTA_DIGITS} (zero-padded)
 */
public static String getPathSuffix(long txnId) {
  String paddedTxnId = String.format(DELTA_DIGITS, txnId);
  return SOFT_DELETE_PATH_SUFFIX + paddedTxnId;
}

@VisibleForTesting
public static void initDirCache(int durationInMts) {
if (dirCacheInited.get()) {
Expand Down
82 changes: 75 additions & 7 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse;

import static java.util.Objects.nonNull;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.DYNAMICPARTITIONCONVERT;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEARCHIVEENABLED;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_DEFAULT_STORAGE_HANDLER;
Expand Down Expand Up @@ -7598,6 +7599,33 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)

destTableIsTransactional = tblProps != null && AcidUtils.isTablePropertyTransactional(tblProps);
if (destTableIsTransactional) {
isNonNativeTable = MetaStoreUtils.isNonNativeTable(tblProps);
boolean isCtas = tblDesc != null && tblDesc.isCTAS();
isMmTable = isMmCreate = AcidUtils.isInsertOnlyTable(tblProps);
if (!isNonNativeTable && !destTableIsTemporary && isCtas) {
destTableIsFullAcid = AcidUtils.isFullAcidTable(tblProps);
acidOperation = getAcidType(dest);
isDirectInsert = isDirectInsert(destTableIsFullAcid, acidOperation);

// Add suffix only when required confs are present
// and user has not specified a location to the table.
boolean createTableUseSuffix = (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX)
|| HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))
&& tblDesc.getLocation() == null;
if (isDirectInsert || isMmTable) {
destinationPath = getCtasLocation(tblDesc);
// Property SOFT_DELETE_TABLE needs to be added to indicate that suffixing is used.
if (createTableUseSuffix) {
long txnId = ctx.getHiveTxnManager().getCurrentTxnId();
String suffix = AcidUtils.getPathSuffix(txnId);
destinationPath = new Path(destinationPath.toString() + suffix);
tblDesc.getTblProps().put(SOFT_DELETE_TABLE, Boolean.TRUE.toString());
}
// Setting the location so that metadata transformers
// does not change the location later while creating the table.
tblDesc.setLocation(destinationPath.toString());
}
}
try {
if (ctx.getExplainConfig() != null) {
writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
Expand All @@ -7607,13 +7635,14 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
} catch (LockException ex) {
throw new SemanticException("Failed to allocate write Id", ex);
}
if (AcidUtils.isInsertOnlyTable(tblProps, true)) {
isMmTable = isMmCreate = true;
if (isMmTable) {
if (tblDesc != null) {
tblDesc.setInitialMmWriteId(writeId);
tblDesc.setInitialWriteId(writeId);
} else {
viewDesc.setInitialMmWriteId(writeId);
}
} else if (isDirectInsert) {
tblDesc.setInitialWriteId(writeId);
}
}

Expand All @@ -7627,7 +7656,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
// no copy is required. we may want to revisit this policy in future
try {
Path qPath = FileUtils.makeQualified(destinationPath, conf);
queryTmpdir = isMmTable ? qPath : ctx.getTempDirForFinalJobPath(qPath);
queryTmpdir = isMmTable || isDirectInsert ? qPath : ctx.getTempDirForFinalJobPath(qPath);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Setting query directory " + queryTmpdir
+ " from " + destinationPath + " (" + isMmTable + ")");
Expand Down Expand Up @@ -7786,6 +7815,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
}
ltd.setLoadFileType(LoadFileType.KEEP_EXISTING);
ltd.setInsertOverwrite(false);
ltd.setIsDirectInsert(isDirectInsert);
loadTableWork.add(ltd);
} else {
// This is a non-native table.
Expand Down Expand Up @@ -7883,6 +7913,10 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
} else {
createVwDesc.setWriter(fileSinkDesc);
}
} else if (qb.isCTAS() && isDirectInsert) {
if (tableDesc != null) {
tableDesc.setWriter(fileSinkDesc);
}
}

if (fileSinkDesc.getInsertOverwrite()) {
Expand All @@ -7906,7 +7940,7 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
// operation is the same (the table location) and this can lead to invalid lineage information
// in case of a merge statement.
if (!isDirectInsert || acidOperation == AcidUtils.Operation.INSERT) {
handleLineage(ltd, output);
handleLineage(destinationTable, ltd, output);
}
setWriteIdForSurrogateKeys(ltd, input);

Expand Down Expand Up @@ -7940,6 +7974,35 @@ protected Operator genFileSinkPlan(String dest, QB qb, Operator input)
return output;
}

private Path getCtasLocation(CreateTableDesc tblDesc) throws SemanticException {
Path location;
try {
String protoName = tblDesc.getDbTableName();
String[] names = Utilities.getDbTableName(protoName);

// Handle table translation initially and if not present
// use default table path.
// Property modifications of the table is handled later.
// We are interested in the location if it has changed
// due to table translation.
Table tbl = tblDesc.toTable(conf);
tbl = db.getTranslateTableDryrun(tbl.getTTable());
Copy link
Member

@deniskuzZ deniskuzZ May 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we pass through the transformers first and do the location check after?

tbl = db.getTranslateTableDryrun(tbl.getTTable());
if (tbl.getSd().getLocation() == null
            || tbl.getSd().getLocation().isEmpty()) {
     location = wh.getDefaultTablePath(db.getDatabase(names[0]), names[1], false);
} else {
    location = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
}
tbl.getSd().setLocation(location.toString());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.


Warehouse wh = new Warehouse(conf);
if (tbl.getSd() == null
|| tbl.getSd().getLocation() == null) {
location = wh.getDefaultTablePath(db.getDatabase(names[0]), names[1], false);
} else {
location = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
}
tbl.getSd().setLocation(location.toString());

return location;
} catch (HiveException | MetaException e) {
throw new SemanticException(e);
}
}

private boolean isDirectInsert(boolean destTableIsFullAcid, AcidUtils.Operation acidOp) {
// In case of an EXPLAIN ANALYZE query, the direct insert has to be turned off. HIVE-24336
if (ctx.getExplainAnalyze() == AnalyzeState.RUNNING) {
Expand Down Expand Up @@ -8219,7 +8282,7 @@ private FileSinkDesc createFileSinkDesc(String dest, TableDesc table_desc,
return fileSinkDesc;
}

private void handleLineage(LoadTableDesc ltd, Operator output)
private void handleLineage(Table destinationTable, LoadTableDesc ltd, Operator output)
throws SemanticException {
if (ltd != null) {
queryState.getLineageState().mapDirToOp(ltd.getSourcePath(), output);
Expand All @@ -8229,9 +8292,14 @@ private void handleLineage(LoadTableDesc ltd, Operator output)
Path tlocation = null;
String tName = Utilities.getDbTableName(tableDesc.getDbTableName())[1];
try {
String suffix = "";
if (AcidUtils.isTableSoftDeleteEnabled(destinationTable, conf)) {
long txnId = ctx.getHiveTxnManager().getCurrentTxnId();
suffix = AcidUtils.getPathSuffix(txnId);
}
Warehouse wh = new Warehouse(conf);
tlocation = wh.getDefaultTablePath(db.getDatabase(tableDesc.getDatabaseName()),
tName, tableDesc.isExternal());
tName + suffix, tableDesc.isExternal());
} catch (MetaException|HiveException e) {
throw new SemanticException(e);
}
Expand Down
34 changes: 20 additions & 14 deletions ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@
import java.util.Optional;
import java.util.Set;

import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX;
import static org.apache.hadoop.hive.ql.io.AcidUtils.DELTA_DIGITS;

/**
* TaskCompiler is a the base class for classes that compile
* operator pipelines into tasks.
Expand Down Expand Up @@ -470,7 +467,7 @@ private void setLoadFileLocation(
if (pCtx.getQueryProperties().isCTAS()) {
CreateTableDesc ctd = pCtx.getCreateTable();
dataSink = ctd.getAndUnsetWriter();
txnId = ctd.getInitialMmWriteId();
txnId = ctd.getInitialWriteId();
loc = ctd.getLocation();
} else {
CreateMaterializedViewDesc cmv = pCtx.getCreateViewDesc();
Expand Down Expand Up @@ -517,21 +514,18 @@ private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticExce
try {
String protoName = null, suffix = "";
boolean isExternal = false;

boolean createTableOrMVUseSuffix = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX)
|| HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);

if (pCtx.getQueryProperties().isCTAS()) {
protoName = pCtx.getCreateTable().getDbTableName();
isExternal = pCtx.getCreateTable().isExternal();

createTableOrMVUseSuffix &= AcidUtils.isTransactionalTable(pCtx.getCreateTable());
suffix = getTableOrMVSuffix(pCtx, createTableOrMVUseSuffix);
} else if (pCtx.getQueryProperties().isMaterializedView()) {
protoName = pCtx.getCreateViewDesc().getViewName();
boolean createMVUseSuffix = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX)
|| HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);

if (createMVUseSuffix) {
long txnId = Optional.ofNullable(pCtx.getContext())
.map(ctx -> ctx.getHiveTxnManager().getCurrentTxnId()).orElse(0L);
suffix = SOFT_DELETE_PATH_SUFFIX + String.format(DELTA_DIGITS, txnId);
}
createTableOrMVUseSuffix &= AcidUtils.isTransactionalView(pCtx.getCreateViewDesc());
suffix = getTableOrMVSuffix(pCtx, createTableOrMVUseSuffix);
}
String[] names = Utilities.getDbTableName(protoName);
if (!db.databaseExists(names[0])) {
Expand All @@ -544,6 +538,18 @@ private Path getDefaultCtasLocation(final ParseContext pCtx) throws SemanticExce
}
}

/**
 * Computes the directory-name suffix used for suffixed (soft-delete style)
 * CTAS table or materialized-view locations.
 *
 * @param pCtx parse context; its {@code Context}, when present, supplies the
 *             current transaction id
 * @param createTableOrMVUseSuffix whether location suffixing is enabled for
 *                                 this table / materialized view
 * @return the path suffix derived from the current txn id, or the empty
 *         string when suffixing is disabled, no context is available, or no
 *         transaction is open (txn id 0)
 */
public String getTableOrMVSuffix(ParseContext pCtx, boolean createTableOrMVUseSuffix) {
  if (!createTableOrMVUseSuffix) {
    return "";
  }
  long txnId = Optional.ofNullable(pCtx.getContext())
      .map(c -> c.getHiveTxnManager().getCurrentTxnId())
      .orElse(0L);
  return txnId == 0 ? "" : AcidUtils.getPathSuffix(txnId);
}

private void patchUpAfterCTASorMaterializedView(List<Task<?>> rootTasks,
Set<ReadEntity> inputs, Set<WriteEntity> outputs, Task<?> createTask,
boolean createTaskAfterMoveTask) {
Expand Down
20 changes: 10 additions & 10 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ public void testCTAS() throws Exception {
"'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL);
List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas order by ROW__ID");
String expected[][] = {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas/delta_0000001_0000001_0000/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/myctas/delta_0000001_0000001_0000/bucket_00001"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/myctas/delta_0000001_0000001_0000/bucket_00001_0"},
};
checkExpected(rs, expected, "Unexpected row count after ctas from non acid table");

Expand All @@ -265,8 +265,8 @@ public void testCTAS() throws Exception {
"'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL);
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID");
String expected2[][] = {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00001"}
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000001_0000001_0000/bucket_00001_0"}
};
checkExpected(rs, expected2, "Unexpected row count after ctas from acid table");

Expand All @@ -275,10 +275,10 @@ public void testCTAS() throws Exception {
" union all select a, b from " + Table.ACIDTBL);
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas3 order by ROW__ID");
String expected3[][] = {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00001"},
{"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00002"},
{"{\"writeid\":1,\"bucketid\":537067520,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00003"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00001_0"},
{"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00002_0"},
{"{\"writeid\":1,\"bucketid\":537067520,\"rowid\":0}\t1\t2", "warehouse/myctas3/delta_0000001_0000001_0000/bucket_00003_0"},
};
checkExpected(rs, expected3, "Unexpected row count after ctas from union all query");

Expand All @@ -287,8 +287,8 @@ public void testCTAS() throws Exception {
" union distinct select a, b from " + Table.ACIDTBL);
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas4 order by ROW__ID");
String expected4[][] = {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0000/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0000/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0000/bucket_00000_0"},
};
checkExpected(rs, expected4, "Unexpected row count after ctas from union distinct query");
}
Expand Down
Loading