
Merge Into Performance to Iceberg Table #3607

Closed
codejoyan opened this issue Nov 25, 2021 · 9 comments

@codejoyan

Hi Team,

Can you suggest a way to tune a slow-running MERGE query? It is taking ~20 minutes to upsert 1.5 million records.
Sample Merge query:

df.createOrReplaceTempView("source")
df.cache()

MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING (SELECT * FROM source)
ON target.col1 = source.col1 AND target.col2 = source.col2 AND target.col3 = source.col3
WHEN MATCHED AND part_date_col between '2021-01-01' and '2021-01-16' THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

The source dataset is a temporary view containing 1.5 million records, with data between '2021-01-01' and '2021-01-16'.
The target Iceberg table is partitioned by day and has 60 partitions. The source dataset should only upsert a few trailing partitions of the target table, but from the Spark UI it looks like the merge is touching all the partitions instead of only those between '2021-01-01' and '2021-01-16'.

  1. Can partition pruning happen in MERGE?
  2. Is there a way to tune and improve the performance?
  3. Is there a Java or Spark API to perform the MERGE instead of the SQL syntax?

Let me know if you need any further details.

Regards
Joyan

@RussellSpitzer
Member

The match expressions are not pushed down and are only applied to the result of the "ON" join clause. This means any pushdowns you want need to occur in the "ON" clause. So in your case you can just add the "part_date_col" predicate to the "ON" restriction and it should push down as you expect.

@elferherrera

@RussellSpitzer is it possible for you to elaborate a bit more on what you mean by pushdown?

Also, based on your suggestion, should the merge query look like this?

MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING (SELECT * FROM source)
ON target.col1 = source.col1 AND target.col2 = source.col2 AND target.col3 = source.col3 AND part_date_col between '2021-01-01' and '2021-01-16'

Btw, how does this work with hidden partitions?

@RussellSpitzer
Member

MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING source -- no need to select here, I believe
ON target.col1 = source.col1 AND target.col2 = source.col2 AND target.col3 = source.col3 AND part_date_col between '2021-01-01' and '2021-01-16'

It works with hidden partitions as well.

The MERGE INTO is performed by first doing a join of target vs. source where (ON CLAUSE).
In the join it can then push the contents of the ON clause down to both target and source if possible. The join type depends on the types of "matched" clauses. For example a "not matched" clause requires an outer join, while a "matched" clause only requires an inner join.

Matched clauses are then applied to specific rows in the result of this join. Because they are not applied universally, the predicates inside the match clauses cannot be pushed down to "source" or "target". This is why any pushdown clauses must be in the "ON".

This works with hidden partitioning as well, just as it would in a normal query. If the predicate is on a column that has been partitioned, we transform the predicate into one on the value that was used in partitioning. Certain predicates cannot be transformed, though, and require a full scan.

For example if you say purchase_ts = timestampOf(2021-01-01) and you actually have partitioning on day(purchase_ts), it transforms the predicate into day(purchase_ts) = day(timestampOf(2021-01-01)).

But if the partitioning was bucket(userId) and your predicate was userId > 50, there is no way to transform the > 50 because bucket uses hashing. In this case you would want to query on userId IN (50, 51, 52, ...) since we can transform equality predicates with the bucket function.
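
To make the distinction concrete, here is a minimal sketch (the table and column names are hypothetical, not from this issue) of which predicates Iceberg can project onto day and bucket partition transforms:

-- Hypothetical table partitioned by day(purchase_ts) and bucket(16, userId)
CREATE TABLE iceberg_hive_cat.iceberg_poc_db.purchases (
  userId      BIGINT,
  purchase_ts TIMESTAMP,
  amount      DECIMAL(10, 2))
USING iceberg
PARTITIONED BY (days(purchase_ts), bucket(16, userId));

-- Prunes: a range on purchase_ts can be projected onto day(purchase_ts)
SELECT * FROM iceberg_hive_cat.iceberg_poc_db.purchases
WHERE purchase_ts >= TIMESTAMP '2021-01-01 00:00:00'
  AND purchase_ts <  TIMESTAMP '2021-01-02 00:00:00';

-- Prunes: equality / IN on userId can be projected onto bucket(16, userId)
SELECT * FROM iceberg_hive_cat.iceberg_poc_db.purchases
WHERE userId IN (50, 51, 52);

-- Does NOT prune on the bucket partition: a range cannot be projected
-- through a hash, so all buckets are read and rows are filtered by the engine
SELECT * FROM iceberg_hive_cat.iceberg_poc_db.purchases
WHERE userId > 50;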

@codejoyan
Author

Thanks @RussellSpitzer, this helped. Will close this ticket.

@smaspe

smaspe commented Jan 28, 2022

Hi @RussellSpitzer, sorry to jump on this thread, but I have a question regarding your last message, if you could elaborate, as I think I'm missing a step:

For example if you say purchase_ts = timestampOf(2021-01-01) and you actually have partitioning on day(purchase_ts), it transforms the predicate into day(purchase_ts) = day(timestampOf(2021-01-01)).

Surely purchase_ts, being a timestamp, has all sorts of values that are not a round day, so to have a predicate transformed into day(purchase_ts) = day(timestampOf(2021-01-01)), how would you write it?

  • on target.purchase_ts = timestampOf(2021-01-01) doesn't seem right, because if purchase_ts = 2021-01-01 14:10:00.001, the meaning is actually different
  • on target.purchase_ts = timestamp '2021-01-01 14:10:00.001', if you know the exact value of the partition key in the target row
  • on target.purchase_ts >= timestamp '2021-01-01' and target.purchase_ts < timestamp '2021-01-02'
  • I don't think on day(target.purchase_ts) = '2021-01-01' or similar works, as far as I've tried?

Thanks!

@RussellSpitzer
Member

I think I may have misled you by oversimplifying. The user here still only writes queries using their exact restrictions; Iceberg then uses those restrictions to create new restrictions which match the partitioning. For example, Iceberg knows a specific timestamp can only occur within a certain day, and it can use that information to limit the files read. Iceberg doesn't disregard the original predicate; it stays with the execution engine for actually evaluating rows, but Iceberg can still use the timestamp for partition pruning and file evaluation.

For example say you are looking for
ts = 3PM on Aug 12

The first thing we do is look at our manifest_list file (see the docs in the spec).
Each entry there will have a partitions field-summary column and a partition_spec_id to let us know how to use that data.

We load up the partition spec for the given spec ID and transform the original predicate into one that matches that spec. If our spec contains a day(ts) transform, we take the original predicate and transform it using the day transform. So for evaluating this entry, the original predicate becomes day(ts) = projectDay(3PM Aug 12) = Aug 12.

With this transformed predicate we evaluate all the partition summaries listed in this file. These values only contain day(ts), since that is the only thing kept by the spec. If any pass, we know the particular manifest file may have valid data files to be scanned.

Once we have a list of all the possible manifest files that may have hits, we play this game again. Now we check against ManifestEntries. Every entry contains a partition value and then details about the data file (the spec_id is inherited from the entry in manifest_list).

Here we can do two steps of evaluation for each individual data file. First we use the transformed predicate (day(ts) = Aug 12) to check if the partition value is a match; if so, we then move on to evaluating the individual metrics of the file. Here we use the original predicate and check whether 3PM on Aug 12 is a possible value for the timestamp column of each file, based on the min and max values for that column. If the data file passes both of these checks we keep it for the scan.

The scan then consists of all data files which we know may contain our given row; this is transformed into a set of tasks for whatever execution engine is in use and evaluated. The execution engine will then use its own logic to filter individual rows with the original predicates (YMMV based on engine-specific implementations).
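
If you want to see these layers for a specific table, the Iceberg metadata tables can be queried from Spark SQL. A minimal sketch, reusing the hypothetical purchases table from the earlier example:

-- Manifest files referenced by the current snapshot (the manifest_list level)
SELECT path, partition_spec_id, added_data_files_count, partition_summaries
FROM iceberg_hive_cat.iceberg_poc_db.purchases.manifests;

-- Individual data files with their partition values and per-column bounds
-- (the partition value plus min/max metrics used in the second pruning step)
SELECT file_path, partition, record_count, lower_bounds, upper_bounds
FROM iceberg_hive_cat.iceberg_poc_db.purchases.files;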

So what happens if we cannot transform a predicate into the partition spec? Or what if a data file was inserted into the table when it was unpartitioned? In both of these cases we default to "this file may contain the row we are looking for" and return it to the engine. For example, suppose you have a predicate age > 10 and a partition spec of bucket(age, 128). There is no way to project a greater-than predicate into a valid bucket predicate, so we simply have to say all partitions may match.

My big TLDR here is:

As a user, you query on normal columns; Iceberg attempts to transform your predicates into ones that match the partitioning of the files within the table in order to prune out files. When Iceberg cannot transform the predicates, it simply assumes there may be a match and returns those files to the execution engine, which does the actual row-level filtering.
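
Applied back to the MERGE in the original question, this suggests writing the static restriction as a literal range on the target's timestamp column so the day transform can project it. A sketch under that assumption (the column names are illustrative, not from a real schema):

MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING source
ON target.col1 = source.col1
AND target.purchase_ts = source.purchase_ts
-- Literal range on the target's partition source column: Iceberg can project
-- it through day(purchase_ts) for pruning, while the engine still evaluates
-- the exact timestamps row by row.
AND target.purchase_ts >= TIMESTAMP '2021-01-01 00:00:00'
AND target.purchase_ts <  TIMESTAMP '2021-01-17 00:00:00'
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *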

@Neuw84
Contributor

Neuw84 commented Jun 20, 2024

Hi,

Sorry to jump in, but this thread is gold for understanding how partition filtering works with MERGE INTO.

I have one question regarding bucketed columns. It is clear in this thread that we should use userId IN (50, 51, 52, ...), but what happens if I have around 100,000 userIds (in a table with millions)?

Can't we inject just the bucketing info into the ON clause with the already-transformed "column"? For example, in this table we could inject at most 8 values rather than thousands (I suppose the SQL query has some size limits in Spark and we may hit those).

    CREATE TABLE IF NOT EXISTS employee (
        employee_id bigint,
        age int,
        start_date timestamp,
        team string,
        role string,
        address string,
        name string)
    PARTITIONED BY (bucket(8, employee_id), hours(start_date), team)

I know that I could build 'n' MERGE INTO queries (and maybe that's good for performance), but I just want to know if it is possible to do this by directly specifying the transformed partition values.
Thanks!

@Alsaxian

Alsaxian commented Jul 16, 2024

Hi @RussellSpitzer, does this mean that in the following case

MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING (SELECT * FROM source)
ON target.col1 = source.col1 AND target.purchase_date = source.purchase_date
WHEN NOT MATCHED THEN INSERT *

where purchase_date is the partition key of the target table, the pushdown will happen as well?

Can I expect almost the same performance as for INSERT INTO when the source table doesn't share any partitions (here purchase_dates) with the target at all?

@RussellSpitzer
Member

In the above example you would only be able to get dynamic pushdown, so it depends on the partitioning. Column = column means there is no static pushdown, only runtime filtering.
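
In other words (a sketch; the date range below is a placeholder, not from the question above), the two situations look like this:

-- Runtime filtering only: the join condition compares columns, so Iceberg
-- cannot statically prune target partitions at planning time; the engine may
-- still prune at runtime based on the purchase_date values found in source.
MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING source
ON target.col1 = source.col1 AND target.purchase_date = source.purchase_date
WHEN NOT MATCHED THEN INSERT *

-- Static pruning as well: a literal range on the target's partition column
-- can be projected onto the partition spec during planning.
MERGE INTO iceberg_hive_cat.iceberg_poc_db.iceberg_tab target
USING source
ON target.col1 = source.col1
AND target.purchase_date = source.purchase_date
AND target.purchase_date BETWEEN DATE '2021-01-01' AND DATE '2021-01-16'
WHEN NOT MATCHED THEN INSERT *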
