
HIVE-26385: Iceberg integration: Implement merge into iceberg table #3430

Merged (9 commits) on Jul 20, 2022

Conversation

kasakrisz (Contributor)

What changes were proposed in this pull request?

  1. Extract the common parts that collect and append the sort and delete columns for split update and merge.
  2. Merge the IcebergWriter data and delete file collections.
  3. Append the number of writers in the job to the operationId to generate a unique file name for each writer.

Why are the changes needed?

  1. Implement merge into Iceberg tables by reusing the split update code paths.
  2. A merge with an insert and an update branch has at least two insert IcebergWriters. If the target table is not partitioned, no sorting or exchange is required, and both FileSink operators wrapping the IcebergWriters are located in the same Reducer and Job.

Does this PR introduce any user-facing change?

Yes. Merge statements should no longer throw exceptions when the target is an Iceberg table.

How was this patch tested?

mvn test -Dtest.output.overwrite -DskipSparkTests -Dtest=TestIcebergLlapLocalCliDriver -Dqfile=merge_iceberg_orc.q,merge_iceberg_partitioned_orc.q -pl itests/qtest-iceberg -Piceberg -Pitests

@@ -297,15 +309,16 @@ private boolean handleCardinalityViolation(StringBuilder rewrittenQueryStr, ASTN
//this is a tmp table and thus Session scoped and acid requires SQL statement to be serial in a
// given session, i.e. the name can be fixed across all invocations
String tableName = "merge_tmp_table";
List<String> sortKeys = columnAppender.getSortKeys();
Contributor
Please talk with @lcspinter about the sorted Iceberg tables and how the two PRs will work together.

Thanks,
Peter

Contributor Author

I discussed this with @lcspinter; his patch makes independent changes.
I also applied both his patch and this one to the same branch locally and wrote a test case for updating a sorted table. It works only in non-vectorized mode.
The reason it does not work in vectorized mode is not related to this patch: it depends on the iceberg.mr.in.memory.data.model setting, which is set to HIVE when vectorization is enabled:

job.setEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL, InputFormatConfig.InMemoryDataModel.HIVE);

Delete delta files are not read with this setting.

Comment on lines 280 to 282
boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
int columnOffset;
List<String> deleteValues;
if (nonNativeAcid) {
  List<FieldSchema> acidSelectColumns = mTable.getStorageHandler().acidSelectColumns(mTable, operation);
  deleteValues = new ArrayList<>(acidSelectColumns.size());
  for (FieldSchema fieldSchema : acidSelectColumns) {
    String identifier = HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf);
    rewrittenQueryStr.append(identifier).append(" AS ");
    String prefixedIdentifier = HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf);
    rewrittenQueryStr.append(prefixedIdentifier);
    rewrittenQueryStr.append(",");
    deleteValues.add(String.format("%s.%s", SUB_QUERY_ALIAS, prefixedIdentifier));
  }

  columnOffset = acidSelectColumns.size();
} else {
  rewrittenQueryStr.append("ROW__ID,");
  deleteValues = new ArrayList<>(1 + mTable.getPartCols().size());
  deleteValues.add(SUB_QUERY_ALIAS + ".ROW__ID");
  for (FieldSchema fieldSchema : mTable.getPartCols()) {
    deleteValues.add(SUB_QUERY_ALIAS + "." + HiveUtils.unparseIdentifier(fieldSchema.getName(), conf));
  }
  columnOffset = 1;
}
ColumnAppender columnAppender = nonNativeAcid ? new NonNativeAcidColumnAppender(mTable, conf, SUB_QUERY_ALIAS) :
    new NativeAcidColumnAppender(mTable, conf, SUB_QUERY_ALIAS);
Contributor
Would it be worth creating a util method for this?

public static ColumnAppender AcidUtils.getAppender(mTable);

Contributor Author

I prefer keeping this in RewriteSemanticAnalyzer because these classes are used for generating query text when rewriting updates and merges.
Added a factory method:

protected ColumnAppender getColumnAppender(String subQueryAlias)
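The dispatch behind that factory method can be sketched as follows. This is a hypothetical, self-contained illustration, not the actual Hive code: the stub appender classes and the `describe` method stand in for the real `NonNativeAcidColumnAppender`/`NativeAcidColumnAppender` constructors shown in the diff, which take the table, the conf, and the subquery alias.

```java
// Hypothetical sketch of the ColumnAppender factory dispatch.
// Non-native ACID tables (e.g. Iceberg) get their own appender implementation.
public class ColumnAppenderSketch {
  interface ColumnAppender {
    String describe();
  }

  static class NativeAcidColumnAppender implements ColumnAppender {
    public String describe() { return "native"; }
  }

  static class NonNativeAcidColumnAppender implements ColumnAppender {
    public String describe() { return "non-native"; }
  }

  // Mirrors the ternary in the diff: the nonNativeAcid flag selects the
  // implementation used to generate the rewritten query text.
  static ColumnAppender getColumnAppender(boolean nonNativeAcid) {
    return nonNativeAcid ? new NonNativeAcidColumnAppender()
                         : new NativeAcidColumnAppender();
  }

  public static void main(String[] args) {
    System.out.println(getColumnAppender(true).describe());
    System.out.println(getColumnAppender(false).describe());
  }
}
```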

@@ -2519,23 +2519,23 @@ STAGE PLANS:
Map 7
Map Operator Tree:
TableScan
alias: t
alias: tmerge
Contributor
How sure are we about these changes?
My first guess is that this is an independent change/fix where the alias usage is corrected, but I might be wrong.

Contributor Author

In the past the statement

explain MERGE INTO tmerge as t using nonacid as s ON t.key = s.key
WHEN MATCHED AND s.key < 5 THEN DELETE
WHEN MATCHED AND s.key < 3 THEN UPDATE set a1 = '1'
WHEN NOT MATCHED THEN INSERT VALUES (s.key, s.a1, s.value)

was rewritten to

FROM
`tmerge` `t`
  RIGHT OUTER JOIN
  `default`.`nonacid` `s`
  ON `t`.`key` = `s`.`key`
<insert branches>

In this patch I introduced a subquery because some of the target table columns are needed twice when the target is an Iceberg table, and the subquery is generated the same way as for updates.

FROM
(SELECT ROW__ID, `key`, `a1`, `value` FROM `default`.`tmerge`) `t`
  RIGHT OUTER JOIN
  `default`.`nonacid` `s`
  ON `t`.`key` = `s`.`key`
<insert branches>

In both the new and the old plan, the TableScan you highlighted scans the same table: tmerge.
But in the old plan, tmerge and t refer to the same object. In the new plan the table has no alias, so it is referenced by its name; the alias t refers to the subquery.

Before:

String operationId = queryId + "-" + attemptID.getJobID();

After:

Map<String, List<HiveIcebergWriter>> writers = WriterRegistry.writers(attemptID);
int writerCount = writers == null ? 0 : writers.size();
String operationId = queryId + "-" + attemptID.getJobID() + "-" + writerCount;
Contributor
What about adding a random UUID to this, or using an AtomicInteger?

Contributor Author

Added AtomicInteger instead of getting the number of writers.
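The AtomicInteger approach can be sketched as below. This is a minimal illustration under stated assumptions, not the actual patch: the class name `OperationIdSketch` and the static counter are hypothetical, but the idea matches the review outcome, where each call draws a fresh, monotonically increasing suffix instead of reading the current size of the writer registry (which can race or repeat).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: derive a unique operationId suffix per writer
// from an AtomicInteger rather than the registry's current writer count.
public class OperationIdSketch {
  private static final AtomicInteger WRITER_COUNTER = new AtomicInteger(0);

  static String operationId(String queryId, String jobId) {
    // getAndIncrement() is atomic, so concurrent writers in the same JVM
    // can never observe the same suffix.
    return queryId + "-" + jobId + "-" + WRITER_COUNTER.getAndIncrement();
  }

  public static void main(String[] args) {
    System.out.println(operationId("query1", "job_1")); // query1-job_1-0
    System.out.println(operationId("query1", "job_1")); // query1-job_1-1
  }
}
```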

@kasakrisz kasakrisz merged commit 8fdf802 into apache:master Jul 20, 2022
@kasakrisz kasakrisz deleted the HIVE-26385-master-iceberg-merge branch July 20, 2022 12:46