diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java index 6566c7b1f117..fa887a21768e 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/vector/HiveVectorizedReader.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSplit; import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat; import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat; +import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileSplit; @@ -195,7 +196,8 @@ private static RecordReader orcRecordReader(Jo // If LLAP enabled, try to retrieve an LLAP record reader - this might yield to null in some special cases // TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num) if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) && - LlapProxy.getIo() != null && task.deletes().isEmpty()) { + LlapProxy.getIo() != null && task.deletes().isEmpty() && + !HiveCustomStorageHandlerUtils.isWriteOperation(job, tableName)) { boolean isDisableVectorization = job.getBoolean(HiveIcebergInputFormat.getVectorizationConfName(tableName), false); if (isDisableVectorization) { diff --git a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q index 1c6f4c7d6716..08ac2dd04696 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/llap_iceberg_read_orc.q @@ -1,11 +1,87 @@ --test against vectorized LLAP execution mode +-- SORT_QUERY_RESULTS +-- Mask neededVirtualColumns due to non-strict order +--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/ + set hive.llap.io.enabled=true; set hive.vectorized.execution.enabled=true; +set hive.optimize.shared.work.merge.ts.schema=true; DROP TABLE IF EXISTS llap_orders PURGE; DROP TABLE IF EXISTS llap_items PURGE; DROP TABLE IF EXISTS mig_source PURGE; - +DROP TABLE IF EXISTS target_ice PURGE; +DROP TABLE IF EXISTS calls PURGE; +DROP TABLE IF EXISTS display PURGE; + +-- read after a merge call +CREATE EXTERNAL TABLE calls ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS ORC +TBLPROPERTIES ('format-version'='2'); + +INSERT INTO calls (s_key, year) VALUES (1090969, 2022); + + +CREATE EXTERNAL TABLE display ( + skey bigint, + hierarchy_number string, + hierarchy_name string, + language_id int, + hierarchy_display string, + orderby string +) +STORED BY Iceberg STORED AS ORC +TBLPROPERTIES ('format-version'='2'); + +INSERT INTO display (skey, language_id, hierarchy_display) VALUES + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'), + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1'); + +MERGE INTO display USING ( + SELECT distinct display_skey, display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub1 + + UNION ALL + + SELECT distinct display_skey, null as display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub2 +) sub +ON display.skey = sub.display_skey + and display.hierarchy_display = sub.display + +WHEN MATCHED THEN + UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') +WHEN NOT MATCHED THEN + INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1')); + +SELECT * FROM display; + +-- try read after a delete query +CREATE EXTERNAL TABLE target_ice(a int, b string, c int) STORED BY ICEBERG STORED AS ORC tblproperties ('format-version'='2'); +INSERT INTO target_ice values (1, 'one', 50); +DELETE FROM target_ice WHERE a = 1; +SELECT * FROM target_ice; CREATE EXTERNAL TABLE llap_items (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS ORC; INSERT INTO llap_items VALUES diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out index c12562d31871..a3cabd9776c9 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/llap_iceberg_read_orc.q.out @@ -10,6 +10,209 @@ PREHOOK: query: DROP TABLE IF EXISTS mig_source PURGE PREHOOK: type: DROPTABLE POSTHOOK: query: DROP TABLE IF EXISTS mig_source PURGE POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS target_ice PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS target_ice PURGE +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS calls PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS calls PURGE +POSTHOOK: type: DROPTABLE +PREHOOK: query: DROP TABLE IF EXISTS display PURGE +PREHOOK: type: DROPTABLE +POSTHOOK: query: DROP TABLE IF EXISTS display PURGE +POSTHOOK: type: DROPTABLE +PREHOOK: query: CREATE EXTERNAL TABLE calls ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS ORC +TBLPROPERTIES ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@calls +POSTHOOK: query: CREATE EXTERNAL TABLE calls ( + s_key bigint, + year int +) PARTITIONED BY SPEC (year) +STORED BY Iceberg STORED AS ORC +TBLPROPERTIES ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@calls +PREHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@calls +POSTHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@calls +PREHOOK: query: CREATE EXTERNAL TABLE display ( + skey bigint, + hierarchy_number string, + hierarchy_name string, + language_id int, + hierarchy_display string, + orderby string +) +STORED BY Iceberg STORED AS ORC +TBLPROPERTIES ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@display +POSTHOOK: query: CREATE EXTERNAL TABLE display ( + skey bigint, + hierarchy_number string, + hierarchy_name string, + language_id int, + hierarchy_display string, + orderby string +) +STORED BY Iceberg STORED AS ORC +TBLPROPERTIES ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@display +PREHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'), + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@display +POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'), + (1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@display +Warning: Shuffle Join MERGEJOIN[62][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product +Warning: Shuffle Join MERGEJOIN[63][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product +PREHOOK: query: MERGE INTO display USING ( + SELECT distinct display_skey, display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub1 + + UNION ALL + + SELECT distinct display_skey, null as display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub2 +) sub +ON display.skey = sub.display_skey + and display.hierarchy_display = sub.display + +WHEN MATCHED THEN + UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') +WHEN NOT MATCHED THEN + INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1')) +PREHOOK: type: QUERY +PREHOOK: Input: default@calls +PREHOOK: Input: default@display +PREHOOK: Output: default@display +PREHOOK: Output: default@display +PREHOOK: Output: default@merge_tmp_table +POSTHOOK: query: MERGE INTO display USING ( + SELECT distinct display_skey, display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub1 + + UNION ALL + + SELECT distinct display_skey, null as display, display as orig_display + FROM ( + SELECT D.skey display_skey, D.hierarchy_display display + FROM ( + SELECT s_key FROM calls WHERE s_key = 1090969 + ) R + INNER JOIN display D + ON R.s_key = D.skey AND D.language_id = 3 + GROUP BY D.skey, + D.hierarchy_display + ) sub2 +) sub +ON display.skey = sub.display_skey + and display.hierarchy_display = sub.display + +WHEN MATCHED THEN + UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1') +WHEN NOT MATCHED THEN + INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1')) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@calls +POSTHOOK: Input: default@display +POSTHOOK: Output: default@display +POSTHOOK: Output: default@display +POSTHOOK: Output: default@merge_tmp_table +POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(display)display.null, ] +PREHOOK: query: SELECT * FROM display +PREHOOK: type: QUERY +PREHOOK: Input: default@display +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM display +POSTHOOK: type: QUERY +POSTHOOK: Input: default@display +#### A masked pattern was here #### +1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergenew1 NULL +1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergeupdated1 NULL +1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergenew1 NULL +1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergeupdated1 NULL +PREHOOK: query: CREATE EXTERNAL TABLE target_ice(a int, b string, c int) STORED BY ICEBERG STORED AS ORC tblproperties ('format-version'='2') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@target_ice +POSTHOOK: query: CREATE EXTERNAL TABLE target_ice(a int, b string, c int) STORED BY ICEBERG STORED AS ORC tblproperties ('format-version'='2') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@target_ice +PREHOOK: query: INSERT INTO target_ice values (1, 'one', 50) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@target_ice +POSTHOOK: query: INSERT INTO target_ice values (1, 'one', 50) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@target_ice +PREHOOK: query: DELETE FROM target_ice WHERE a = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@target_ice +PREHOOK: Output: default@target_ice +POSTHOOK: query: DELETE FROM target_ice WHERE a = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@target_ice +POSTHOOK: Output: default@target_ice +PREHOOK: query: SELECT * FROM target_ice +PREHOOK: type: QUERY +PREHOOK: Input: default@target_ice +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM target_ice +POSTHOOK: type: QUERY +POSTHOOK: Input: default@target_ice +#### A masked pattern was here #### PREHOOK: query: CREATE EXTERNAL TABLE llap_items (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS ORC PREHOOK: type: CREATETABLE PREHOOK: Output: database:default @@ -551,9 +754,9 @@ POSTHOOK: Input: default@llap_orders #### A masked pattern was here #### München Cybertruck 50000 4.5 99 NULL Model 3 50000 NULL 42 -Venezia Model S 123000 NULL 89 NULL Model S 83000 NULL 185 NULL Model Y 55000 NULL 76 +Venezia Model S 123000 NULL 89 PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items i JOIN llap_orders o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description PREHOOK: type: QUERY PREHOOK: Input: default@llap_items diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java index 74158d1619e2..3d3664eb4da2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; import org.apache.hadoop.hive.ql.session.SessionState; @@ -199,6 +200,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { extraPredicate = s; //i.e. it's the 1st WHEN MATCHED } + HiveCustomStorageHandlerUtils.setFileScanOperationType(conf, targetTable.getFullTableName().toString(), + Context.Operation.UPDATE); break; case HiveParser.TOK_DELETE: numWhenMatchedDeleteClauses++; @@ -208,6 +211,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2) if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) { extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED } + HiveCustomStorageHandlerUtils.setFileScanOperationType(conf, targetTable.getFullTableName().toString(), + Context.Operation.DELETE); break; default: throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java index 8caca5f54714..db816ac2fbf5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveUtils; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils; /** * A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles @@ -62,6 +63,8 @@ protected void analyze(ASTNode tree, Table table, ASTNode tabNameNode) throws Se switch (tree.getToken().getType()) { case HiveParser.TOK_DELETE_FROM: operation = Context.Operation.DELETE; + HiveCustomStorageHandlerUtils.setFileScanOperationType(ctx.getConf(), table.getFullTableName().toString(), + operation); reparseAndSuperAnalyze(tree, table, tabNameNode); break; case HiveParser.TOK_UPDATE_TABLE: @@ -70,6 +73,8 @@ protected void analyze(ASTNode tree, Table table, ASTNode tabNameNode) throws Se throw new SemanticException(ErrorMsg.NON_NATIVE_ACID_UPDATE.getErrorCodedMsg()); } operation = Context.Operation.UPDATE; + HiveCustomStorageHandlerUtils.setFileScanOperationType(ctx.getConf(), table.getFullTableName().toString(), + operation); reparseAndSuperAnalyze(tree, table, tabNameNode); break; default: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java index 8be4cfc5b8f8..bc62af2ec8a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/HiveCustomStorageHandlerUtils.java @@ -22,6 +22,7 @@ import java.util.Optional; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.serde.serdeConstants; @@ -30,6 +31,7 @@ public class HiveCustomStorageHandlerUtils { public static final String WRITE_OPERATION_CONFIG_PREFIX = "file.sink.write.operation."; + public static final String FILESCAN_WRITE_OPERATION_CONFIG_PREFIX = "file.scan.write.operation."; public static String getTablePropsForCustomStorageHandler(Map tableProperties) { @@ -71,4 +73,22 @@ public static void setWriteOperation(Configuration conf, String tableName, Conte conf.set(WRITE_OPERATION_CONFIG_PREFIX + tableName, operation.name()); } + + public static boolean isWriteOperation(Configuration conf, String tableName) { + if (conf == null || tableName == null) { + return false; + } + String dbAndTableName = TableName.fromString(tableName, null, null).getNotEmptyDbTable(); + String operation = conf.get(FILESCAN_WRITE_OPERATION_CONFIG_PREFIX + dbAndTableName); + return Context.Operation.DELETE.name().equalsIgnoreCase(operation) || + Context.Operation.UPDATE.name().equalsIgnoreCase(operation); + } + + public static void setFileScanOperationType(Configuration conf, String tableName, Context.Operation operation) { + if (conf == null || tableName == null) { + return; + } + String dbAndTableName = TableName.fromString(tableName, null, null).getNotEmptyDbTable(); + conf.set(FILESCAN_WRITE_OPERATION_CONFIG_PREFIX + dbAndTableName, operation.name()); + } }