@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
@@ -195,7 +196,8 @@ private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(Jo
// If LLAP enabled, try to retrieve an LLAP record reader - this might yield to null in some special cases
// TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
LlapProxy.getIo() != null && task.deletes().isEmpty()) {
LlapProxy.getIo() != null && task.deletes().isEmpty() &&
!HiveCustomStorageHandlerUtils.isWriteOperation(job, tableName)) {
boolean isDisableVectorization =
job.getBoolean(HiveIcebergInputFormat.getVectorizationConfName(tableName), false);
if (isDisableVectorization) {
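The guard above skips the LLAP record reader whenever the current statement writes to the Iceberg table (UPDATE, DELETE, or MERGE), since LLAP cannot yet supply the file row numbers that positional deletes require. The bodies of HiveCustomStorageHandlerUtils.setFileScanOperationType and isWriteOperation are not part of this diff; the following is only a minimal sketch of how such a per-table flag could be written to and read back from the job configuration. The conf key prefix and internal details are assumptions, not the committed implementation.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.Context;

public final class HiveCustomStorageHandlerUtils {

  // Hypothetical conf key prefix; the real property name is not shown in this diff.
  private static final String WRITE_OPERATION_CONF_PREFIX = "hive.io.write.operation.";

  private HiveCustomStorageHandlerUtils() {
  }

  // Called by the DML analyzers at compile time to record that the given table
  // is the target of a write operation.
  public static void setFileScanOperationType(Configuration conf, String tableName,
      Context.Operation operation) {
    conf.set(WRITE_OPERATION_CONF_PREFIX + tableName, operation.name());
  }

  // Called from the Iceberg input format: any recorded operation for the table means
  // the statement writes to it, so the LLAP reader path must be skipped.
  public static boolean isWriteOperation(Configuration conf, String tableName) {
    return conf.get(WRITE_OPERATION_CONF_PREFIX + tableName) != null;
  }
}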
@@ -1,11 +1,87 @@
--test against vectorized LLAP execution mode
-- SORT_QUERY_RESULTS
-- Mask neededVirtualColumns due to non-strict order
--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/

set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

DROP TABLE IF EXISTS llap_orders PURGE;
DROP TABLE IF EXISTS llap_items PURGE;
DROP TABLE IF EXISTS mig_source PURGE;

DROP TABLE IF EXISTS target_ice PURGE;
DROP TABLE IF EXISTS calls PURGE;
DROP TABLE IF EXISTS display PURGE;

-- read after a merge call
CREATE EXTERNAL TABLE calls (
s_key bigint,
year int
) PARTITIONED BY SPEC (year)
STORED BY Iceberg STORED AS ORC
TBLPROPERTIES ('format-version'='2');

INSERT INTO calls (s_key, year) VALUES (1090969, 2022);


CREATE EXTERNAL TABLE display (
skey bigint,
hierarchy_number string,
hierarchy_name string,
language_id int,
hierarchy_display string,
orderby string
)
STORED BY Iceberg STORED AS ORC
TBLPROPERTIES ('format-version'='2');

INSERT INTO display (skey, language_id, hierarchy_display) VALUES
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1');

MERGE INTO display USING (
SELECT distinct display_skey, display, display as orig_display
FROM (
SELECT D.skey display_skey, D.hierarchy_display display
FROM (
SELECT s_key FROM calls WHERE s_key = 1090969
) R
INNER JOIN display D
ON R.s_key = D.skey AND D.language_id = 3
GROUP BY D.skey,
D.hierarchy_display
) sub1

UNION ALL

SELECT distinct display_skey, null as display, display as orig_display
FROM (
SELECT D.skey display_skey, D.hierarchy_display display
FROM (
SELECT s_key FROM calls WHERE s_key = 1090969
) R
INNER JOIN display D
ON R.s_key = D.skey AND D.language_id = 3
GROUP BY D.skey,
D.hierarchy_display
) sub2
) sub
ON display.skey = sub.display_skey
and display.hierarchy_display = sub.display

WHEN MATCHED THEN
UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
WHEN NOT MATCHED THEN
INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'));

SELECT * FROM display;

-- try read after a delete query
CREATE EXTERNAL TABLE target_ice(a int, b string, c int) STORED BY ICEBERG STORED AS ORC tblproperties ('format-version'='2');
INSERT INTO target_ice values (1, 'one', 50);
DELETE FROM target_ice WHERE a = 1;
SELECT * FROM target_ice;

CREATE EXTERNAL TABLE llap_items (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS ORC;
INSERT INTO llap_items VALUES
@@ -10,6 +10,209 @@ PREHOOK: query: DROP TABLE IF EXISTS mig_source PURGE
PREHOOK: type: DROPTABLE
POSTHOOK: query: DROP TABLE IF EXISTS mig_source PURGE
POSTHOOK: type: DROPTABLE
PREHOOK: query: DROP TABLE IF EXISTS target_ice PURGE
PREHOOK: type: DROPTABLE
POSTHOOK: query: DROP TABLE IF EXISTS target_ice PURGE
POSTHOOK: type: DROPTABLE
PREHOOK: query: DROP TABLE IF EXISTS calls PURGE
PREHOOK: type: DROPTABLE
POSTHOOK: query: DROP TABLE IF EXISTS calls PURGE
POSTHOOK: type: DROPTABLE
PREHOOK: query: DROP TABLE IF EXISTS display PURGE
PREHOOK: type: DROPTABLE
POSTHOOK: query: DROP TABLE IF EXISTS display PURGE
POSTHOOK: type: DROPTABLE
PREHOOK: query: CREATE EXTERNAL TABLE calls (
s_key bigint,
year int
) PARTITIONED BY SPEC (year)
STORED BY Iceberg STORED AS ORC
TBLPROPERTIES ('format-version'='2')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@calls
POSTHOOK: query: CREATE EXTERNAL TABLE calls (
s_key bigint,
year int
) PARTITIONED BY SPEC (year)
STORED BY Iceberg STORED AS ORC
TBLPROPERTIES ('format-version'='2')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@calls
PREHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@calls
POSTHOOK: query: INSERT INTO calls (s_key, year) VALUES (1090969, 2022)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@calls
PREHOOK: query: CREATE EXTERNAL TABLE display (
skey bigint,
hierarchy_number string,
hierarchy_name string,
language_id int,
hierarchy_display string,
orderby string
)
STORED BY Iceberg STORED AS ORC
TBLPROPERTIES ('format-version'='2')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@display
POSTHOOK: query: CREATE EXTERNAL TABLE display (
skey bigint,
hierarchy_number string,
hierarchy_name string,
language_id int,
hierarchy_display string,
orderby string
)
STORED BY Iceberg STORED AS ORC
TBLPROPERTIES ('format-version'='2')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@display
PREHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@display
POSTHOOK: query: INSERT INTO display (skey, language_id, hierarchy_display) VALUES
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1'),
(1090969, 3, 'f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@display
Warning: Shuffle Join MERGEJOIN[62][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
Warning: Shuffle Join MERGEJOIN[63][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 8' is a cross product
PREHOOK: query: MERGE INTO display USING (
SELECT distinct display_skey, display, display as orig_display
FROM (
SELECT D.skey display_skey, D.hierarchy_display display
FROM (
SELECT s_key FROM calls WHERE s_key = 1090969
) R
INNER JOIN display D
ON R.s_key = D.skey AND D.language_id = 3
GROUP BY D.skey,
D.hierarchy_display
) sub1

UNION ALL

SELECT distinct display_skey, null as display, display as orig_display
FROM (
SELECT D.skey display_skey, D.hierarchy_display display
FROM (
SELECT s_key FROM calls WHERE s_key = 1090969
) R
INNER JOIN display D
ON R.s_key = D.skey AND D.language_id = 3
GROUP BY D.skey,
D.hierarchy_display
) sub2
) sub
ON display.skey = sub.display_skey
and display.hierarchy_display = sub.display

WHEN MATCHED THEN
UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
WHEN NOT MATCHED THEN
INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
PREHOOK: type: QUERY
PREHOOK: Input: default@calls
PREHOOK: Input: default@display
PREHOOK: Output: default@display
PREHOOK: Output: default@display
PREHOOK: Output: default@merge_tmp_table
POSTHOOK: query: MERGE INTO display USING (
SELECT distinct display_skey, display, display as orig_display
FROM (
SELECT D.skey display_skey, D.hierarchy_display display
FROM (
SELECT s_key FROM calls WHERE s_key = 1090969
) R
INNER JOIN display D
ON R.s_key = D.skey AND D.language_id = 3
GROUP BY D.skey,
D.hierarchy_display
) sub1

UNION ALL

SELECT distinct display_skey, null as display, display as orig_display
FROM (
SELECT D.skey display_skey, D.hierarchy_display display
FROM (
SELECT s_key FROM calls WHERE s_key = 1090969
) R
INNER JOIN display D
ON R.s_key = D.skey AND D.language_id = 3
GROUP BY D.skey,
D.hierarchy_display
) sub2
) sub
ON display.skey = sub.display_skey
and display.hierarchy_display = sub.display

WHEN MATCHED THEN
UPDATE SET hierarchy_display = concat(sub.display, '-mergeupdated1')
WHEN NOT MATCHED THEN
INSERT (skey, language_id, hierarchy_display) values (sub.display_skey, 3, concat(sub.orig_display, '-mergenew1'))
POSTHOOK: type: QUERY
POSTHOOK: Input: default@calls
POSTHOOK: Input: default@display
POSTHOOK: Output: default@display
POSTHOOK: Output: default@display
POSTHOOK: Output: default@merge_tmp_table
POSTHOOK: Lineage: merge_tmp_table.val EXPRESSION [(display)display.null, ]
PREHOOK: query: SELECT * FROM display
PREHOOK: type: QUERY
PREHOOK: Input: default@display
#### A masked pattern was here ####
POSTHOOK: query: SELECT * FROM display
POSTHOOK: type: QUERY
POSTHOOK: Input: default@display
#### A masked pattern was here ####
1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergenew1 NULL
1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-insertnew1-mergeupdated1 NULL
1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergenew1 NULL
1090969 NULL NULL 3 f9e59bae9b131de1d8f02d887ee91e20-mergeupdated1-updated1-mergeupdated1 NULL
PREHOOK: query: CREATE EXTERNAL TABLE target_ice(a int, b string, c int) STORED BY ICEBERG STORED AS ORC tblproperties ('format-version'='2')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@target_ice
POSTHOOK: query: CREATE EXTERNAL TABLE target_ice(a int, b string, c int) STORED BY ICEBERG STORED AS ORC tblproperties ('format-version'='2')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@target_ice
PREHOOK: query: INSERT INTO target_ice values (1, 'one', 50)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@target_ice
POSTHOOK: query: INSERT INTO target_ice values (1, 'one', 50)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@target_ice
PREHOOK: query: DELETE FROM target_ice WHERE a = 1
PREHOOK: type: QUERY
PREHOOK: Input: default@target_ice
PREHOOK: Output: default@target_ice
POSTHOOK: query: DELETE FROM target_ice WHERE a = 1
POSTHOOK: type: QUERY
POSTHOOK: Input: default@target_ice
POSTHOOK: Output: default@target_ice
PREHOOK: query: SELECT * FROM target_ice
PREHOOK: type: QUERY
PREHOOK: Input: default@target_ice
#### A masked pattern was here ####
POSTHOOK: query: SELECT * FROM target_ice
POSTHOOK: type: QUERY
POSTHOOK: Input: default@target_ice
#### A masked pattern was here ####
PREHOOK: query: CREATE EXTERNAL TABLE llap_items (itemid INT, price INT, category STRING, name STRING, description STRING) STORED BY ICEBERG STORED AS ORC
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
@@ -551,9 +754,9 @@ POSTHOOK: Input: default@llap_orders
#### A masked pattern was here ####
München Cybertruck 50000 4.5 99
NULL Model 3 50000 NULL 42
Venezia Model S 123000 NULL 89
NULL Model S 83000 NULL 185
NULL Model Y 55000 NULL 76
Venezia Model S 123000 NULL 89
PREHOOK: query: SELECT i.name, i.description, SUM(o.quantity) FROM llap_items i JOIN llap_orders o ON i.itemid = o.itemid WHERE region = 'EU' and i.cost >= 50000 GROUP BY i.name, i.description
PREHOOK: type: QUERY
PREHOOK: Input: default@llap_items
@@ -39,6 +39,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.ql.session.SessionState;


@@ -199,6 +200,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
}
HiveCustomStorageHandlerUtils.setFileScanOperationType(conf, targetTable.getFullTableName().toString(),
Context.Operation.UPDATE);
break;
case HiveParser.TOK_DELETE:
numWhenMatchedDeleteClauses++;
@@ -208,6 +211,8 @@ WHEN NOT MATCHED THEN INSERT VALUES (source.a2, source.b2)
if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
}
HiveCustomStorageHandlerUtils.setFileScanOperationType(conf, targetTable.getFullTableName().toString(),
Context.Operation.DELETE);
break;
default:
throw new IllegalStateException("Unexpected WHEN clause type: " + whenClause.getType() +
@@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;

/**
* A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
@@ -62,6 +63,8 @@ protected void analyze(ASTNode tree, Table table, ASTNode tabNameNode) throws Se
switch (tree.getToken().getType()) {
case HiveParser.TOK_DELETE_FROM:
operation = Context.Operation.DELETE;
HiveCustomStorageHandlerUtils.setFileScanOperationType(ctx.getConf(), table.getFullTableName().toString(),
operation);
reparseAndSuperAnalyze(tree, table, tabNameNode);
break;
case HiveParser.TOK_UPDATE_TABLE:
@@ -70,6 +73,8 @@ protected void analyze(ASTNode tree, Table table, ASTNode tabNameNode) throws Se
throw new SemanticException(ErrorMsg.NON_NATIVE_ACID_UPDATE.getErrorCodedMsg());
}
operation = Context.Operation.UPDATE;
HiveCustomStorageHandlerUtils.setFileScanOperationType(ctx.getConf(), table.getFullTableName().toString(),
operation);
reparseAndSuperAnalyze(tree, table, tabNameNode);
break;
default:
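Taken together, the two analyzer changes tag the target table with the write operation at compile time, and the Iceberg input format change consults that tag before handing a split to LLAP. Below is a rough usage sketch of that interaction, assuming the Configuration-based signatures from the sketch earlier in this review; the table name literal is illustrative only and JobConf stands in for whatever conf object reaches the input format.

import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.mapred.JobConf;

public class WriteOperationFlagExample {
  public static void main(String[] args) {
    JobConf job = new JobConf();

    // What the DELETE/UPDATE/MERGE analyzers do for the target table at compile time.
    HiveCustomStorageHandlerUtils.setFileScanOperationType(job, "default.target_ice",
        Context.Operation.DELETE);

    // What the Iceberg input format checks before choosing the LLAP record reader:
    // a recorded write operation forces the regular vectorized reader instead.
    boolean skipLlap = HiveCustomStorageHandlerUtils.isWriteOperation(job, "default.target_ice");
    System.out.println("LLAP reader skipped for write: " + skipLlap);
  }
}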