
Add indexes to columns used when upserting for faster loads of big tables#152

Merged
AlexanderMann merged 2 commits into datamill-co:master from airhorns:upsert-indexes
Oct 7, 2019

Conversation

@airhorns
Contributor

@airhorns airhorns commented Sep 15, 2019

This fixes #123.

target-postgres leverages the database to do much of the heavy lifting for the deduplication phase of a run. The big query that selects the most recent row to insert into the final table joins the existing table and the temporary table on the primary keys and the sequence key, and to make that join efficient we can add an index covering those columns. We add a compound index on all of the table's primary keys plus the _sdc_sequence column so that it covers the whole join condition, and we put the primary keys first in the index so that it is more likely to be useful for downstream consumers as well.
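Concretely, for the test cats table below, the added index is roughly of this shape (a sketch inferred from the index name that appears in the query plans, not necessarily the exact generated DDL):

```sql
-- Compound index: primary key column(s) first, then the sequence column,
-- so it covers the whole upsert join condition.
CREATE INDEX "tp_cats_id__sdc_sequence_idx"
    ON "public"."cats" ("id", "_sdc_sequence");
```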

The new structure of the test cats table looks like this:

[Screenshot: structure of the test cats table, showing the new tp_cats_id__sdc_sequence_idx index]

The DELETE deduplication query looks like this:

```sql
DELETE FROM "public"."cats" USING (
    SELECT "dedupped".*
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY "public"."tmp_1a07aa57_fa76_4062_bf80_48f84851832f"."id"
                                   ORDER BY "public"."tmp_1a07aa57_fa76_4062_bf80_48f84851832f"."id", "public"."tmp_1a07aa57_fa76_4062_bf80_48f84851832f"."_sdc_sequence" DESC) AS "pk_ranked"
        FROM "public"."tmp_1a07aa57_fa76_4062_bf80_48f84851832f"
         ORDER BY "public"."tmp_1a07aa57_fa76_4062_bf80_48f84851832f"."id", "public"."tmp_1a07aa57_fa76_4062_bf80_48f84851832f"."_sdc_sequence" DESC) AS "dedupped"
    JOIN "public"."cats" ON "public"."cats"."id" = "dedupped"."id" AND "dedupped"."_sdc_sequence" >= "public"."cats"."_sdc_sequence"
    WHERE pk_ranked = 1
) AS "pks" WHERE "public"."cats"."id" = "pks"."id";
```
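For intuition, the ROW_NUMBER() dedup above does the same thing as this plain-Python sketch (the rows here are hypothetical `(id, _sdc_sequence, payload)` tuples, not real data from the target):

```python
# Mimic ROW_NUMBER() OVER (PARTITION BY id ORDER BY _sdc_sequence DESC):
# for each primary key, keep only the row with the highest sequence value.
def dedup_latest(rows):
    latest = {}
    for row in rows:
        pk, seq = row[0], row[1]
        if pk not in latest or seq > latest[pk][1]:
            latest[pk] = row
    return sorted(latest.values())

rows = [
    (1, 100, "a"),
    (1, 200, "b"),  # newer version of id=1 wins
    (2, 150, "c"),
]
print(dedup_latest(rows))  # [(1, 200, 'b'), (2, 150, 'c')]
```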

and before this commit the EXPLAIN looks like this:

"QUERY PLAN"
"Delete on cats  (cost=63.06..78.86 rows=3 width=198)"
"  ->  Hash Join  (cost=63.06..78.86 rows=3 width=198)"
"        Hash Cond: (cats.id = cats_1.id)"
"        ->  Seq Scan on cats  (cost=0.00..14.20 rows=420 width=14)"
"        ->  Hash  (cost=63.05..63.05 rows=1 width=208)"
"              ->  Hash Join  (cost=47.22..63.05 rows=1 width=208)"
"                    Hash Cond: (cats_1.id = dedupped.id)"
"                    Join Filter: (dedupped._sdc_sequence >= cats_1._sdc_sequence)"
"                    ->  Seq Scan on cats cats_1  (cost=0.00..14.20 rows=420 width=22)"
"                    ->  Hash  (cost=47.20..47.20 rows=2 width=202)"
"                          ->  Subquery Scan on dedupped  (cost=32.50..47.20 rows=2 width=202)"
"                                Filter: (dedupped.pk_ranked = 1)"
"                                ->  WindowAgg  (cost=32.50..41.95 rows=420 width=170)"
"                                      ->  Sort  (cost=32.50..33.55 rows=420 width=162)"
"                                            Sort Key: tmp_ba182dd2_f462_4dea_b27f_22d6ec0b77e1.id, tmp_ba182dd2_f462_4dea_b27f_22d6ec0b77e1._sdc_sequence DESC"
"                                            ->  Seq Scan on tmp_ba182dd2_f462_4dea_b27f_22d6ec0b77e1  (cost=0.00..14.20 rows=420 width=162)"

and after this change, it looks like this:

"QUERY PLAN"
"Delete on cats  (cost=6.62..18.31 rows=1 width=118)"
"  ->  Nested Loop  (cost=6.62..18.31 rows=1 width=118)"
"        ->  Nested Loop  (cost=6.47..18.00 rows=1 width=128)"
"              ->  Subquery Scan on dedupped  (cost=6.32..9.82 rows=1 width=122)"
"                    Filter: (dedupped.pk_ranked = 1)"
"                    ->  WindowAgg  (cost=6.32..8.57 rows=100 width=90)"
"                          ->  Sort  (cost=6.32..6.57 rows=100 width=82)"
"                                Sort Key: tmp_1a07aa57_fa76_4062_bf80_48f84851832f.id, tmp_1a07aa57_fa76_4062_bf80_48f84851832f._sdc_sequence DESC"
"                                ->  Seq Scan on tmp_1a07aa57_fa76_4062_bf80_48f84851832f  (cost=0.00..3.00 rows=100 width=82)"
"              ->  Index Scan using tp_cats_id__sdc_sequence_idx on cats cats_1  (cost=0.15..8.17 rows=1 width=22)"
"                    Index Cond: ((id = dedupped.id) AND (dedupped._sdc_sequence >= _sdc_sequence))"
"        ->  Index Scan using tp_cats_id__sdc_sequence_idx on cats  (cost=0.15..0.29 rows=2 width=14)"
"              Index Cond: (id = cats_1.id)"

So, the hash join becomes an index scan! Hooray!

Another option would be to add multiple indexes, one for each of the columns involved, but I think the compound index is better: it's only one index to maintain, and Postgres isn't very good at combining bitmap index scans. I tried that in this setup, but Postgres 11 locally still only used one index.
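For reference, the rejected alternative would have looked something like this (a sketch; per the comment above, the local Postgres 11 planner ended up using only one of these rather than BitmapAnd-ing both):

```sql
-- One single-column index per join column, instead of one compound index.
CREATE INDEX "tp_cats_id_idx" ON "public"."cats" ("id");
CREATE INDEX "tp_cats__sdc_sequence_idx" ON "public"."cats" ("_sdc_sequence");
```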

One concern with this change is that data loads could actually get slower, especially for small tables, because Postgres spends time maintaining an index that doesn't make the load much faster, so overall load time goes up. I think this change should still go in despite that risk: operators of target-postgres are most sensitive to the speed of loading big tables, not small ones, as those are what is more likely to break or consume undue resources.

This new behaviour is behind a config flag, but I chose to make it on by default, as I think performance shouldn't be opt-in. I can only think of pretty contrived reasons to opt out of indexing, like severely restricted disk space environments, but if anyone thinks this should be off by default I'm all ears.
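Opting out would then be a one-line config change, something like the following (the `add_upsert_indexes` key name here is illustrative; check the README for the flag's actual name):

```json
{
  "postgres_host": "localhost",
  "postgres_database": "example",
  "add_upsert_indexes": false
}
```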

@awm33
Member

awm33 commented Sep 17, 2019

@airhorns This is interesting! A couple questions come to mind:

  • It looks like it creates it for every table? We probably don't want to create indices on full replication / versioned tables that use ACTIVATE_VERSION. Upserts don't occur on these tables.
  • I think this would also create the indices on the temp tables, since we use CREATE TABLE .. LIKE .... Do we want to skip creating indices in that case, since loads would be slower but we only load to temp tables? Temp tables are much, much smaller than target tables in the large-table join scenario; I believe Postgres could use the index on the large-table side when joining to the small temp table.

@AlexanderMann Would love your thoughts and maybe checking of the logic as well.

@awm33
Member

awm33 commented Sep 17, 2019

Another note would be that a couple other targets use this code downstream (redshift and snowflake), so we would need to make sure the code/interface can performantly ignore indices. Like placing the add index loop inside of an if block that checks for index support.

@AlexanderMann
Collaborator

CREATE TABLE LIKE ... doesn't include the indexes:

Indexes, PRIMARY KEY, UNIQUE, and EXCLUDE constraints on the original table will be created on the new table only if INCLUDING INDEXES is specified.
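The quoted behavior means the default form used for temp tables copies column definitions only; indexes come along only on request (a sketch against the example table, not the target's actual DDL):

```sql
-- Default: columns and types only, no indexes copied.
CREATE TABLE "tmp_cats" (LIKE "public"."cats");

-- Indexes would only be copied with an explicit option:
CREATE TABLE "tmp_cats_indexed" (LIKE "public"."cats" INCLUDING INDEXES);
```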

So that's at least one concern knocked out.

@airhorns
Contributor Author

It looks like it creates it for every table? We probably don't want to create indices on full replication / versioned tables that use ACTIVATE_VERSION. Upserts don't occur on these tables.

Forgive my insolence -- I'm not super familiar with ACTIVATE_VERSION and not exactly sure when and how it gets used in the wild. Is this more just convention over anything that streams managed by version numbers are replaced wholesale every time as opposed to ever appended to? I thought that you could mix and match so to speak and ACTIVATE_VERSION could be used to do a "full refresh" of a source stream, but then you could carry on upserting against that in the target after that refresh was complete.

Another note would be that a couple other targets use this code downstream

Indeed, I tried to account for that with https://github.com/datamill-co/target-postgres/pull/152/files#diff-5b906fece7e68cf803bb91e89b80047aR886 where the base target just never creates indexes for anything, and then that's overridden in the PostgresTarget. Does that do what you mean?
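The inheritance pattern being described is roughly this (names here are illustrative, not the actual target-postgres API): the base target's index hook is a no-op, and only the Postgres target overrides it.

```python
# Sketch: base SQL target emits no index DDL, so downstream targets
# (e.g. Redshift, Snowflake) performantly ignore indices by default.
class SQLInterface:
    def add_table_indexes(self, table_name, key_columns):
        return []  # no-op in the base target

class PostgresTarget(SQLInterface):
    def add_table_indexes(self, table_name, key_columns):
        # Compound index over the primary keys plus _sdc_sequence,
        # matching the tp_<table>_<cols>_idx naming seen in the plans.
        cols = list(key_columns) + ["_sdc_sequence"]
        stmt = 'CREATE INDEX IF NOT EXISTS "tp_{0}_{1}_idx" ON "{0}" ({2})'.format(
            table_name,
            "_".join(cols),
            ", ".join('"{}"'.format(c) for c in cols),
        )
        return [stmt]

print(SQLInterface().add_table_indexes("cats", ["id"]))      # []
print(PostgresTarget().add_table_indexes("cats", ["id"])[0])
```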

CREATE TABLE LIKE ... doesn't include the indexes:

Ah, I didn't know that, and that's handy but totally by accident. I can add a comment if we want?

And, in the original issue, @AlexanderMann mentioned that the way tables are created makes this change a little bit tricky, and I think that is a potential source of issues. The code paths where the different tables are created are pretty distinct, but the spot where the indexes are made doesn't know what "kind" of table is being created, so if there are more classes of table that shouldn't be indexed, that's hard to implement.

@AlexanderMann
Collaborator

And, in the original issue, @AlexanderMann mentioned that the way that tables are created makes this change a little bit tricky

Yurp, that was the case, but I changed things for pure performance sake with regards to creating temp tables. ie, before we used upsert logic from SQLBase (or whatever) and now we're using CREATE TABLE LIKE... so that's 👍


@airhorns I'm still looking this over. My gut reaction is that this is a bit of code smell, because this isn't really being done for the sake of having "indexes" but rather that it's being done as a sort of post processing on tables to make larger changes to the data more efficient.

The tradeoff, as I understand it, is that this should speed things up dramatically over time, but it also uses more space on the remote. I'm hoping to have clearer thoughts on this by end of day...

@airhorns
Contributor Author

airhorns commented Oct 2, 2019

@airhorns I'm still looking this over. My gut reaction is that this is a bit of code smell, because this isn't really being done for the sake of having "indexes" but rather that it's being done as a sort of post processing on tables to make larger changes to the data more efficient.

What is the difference between less-smelly post-processing-for-performance and smelly indexes in your head?

The tradeoff, as I understand it, is that this should speed things up dramatically over time, but it also uses more space on the remote.

For my deployment, it does indeed speed things up dramatically, but to the point where it actually succeeds instead of times out after hours of upserting, so for me it's kind of mission critical. Our database is somewhat underpowered and PG 12 apparently has some stuff that will make the baseline performance of stuff like this improve, but, indexes are like the #1 weapon in RDBMS land for making things go fast. You can't beat the right data structure. You're right that it's CPU/space tradeoff, but I feel pretty good about making that kind of tradeoff these days.

I also feel good about automating that tradeoff. If you folks end up manually provisioning indexes when data loads get too slow, it seems like the tradeoff is worth it for you at that point, and I would argue that the potentially superfluous indexes created before that point won't take up enough space to be meaningful, so wouldn't it be nice to just not have to care about that manual process and the time spent figuring it out when it breaks?

@airhorns airhorns closed this Oct 2, 2019
@airhorns airhorns reopened this Oct 2, 2019
@airhorns
Contributor Author

airhorns commented Oct 2, 2019

Whoops accidental close my bad

@AlexanderMann
Collaborator

Hey @airhorns, sorry for the delay here. "Real Work" has gotten in the way of open source stuffs (gotta make rent somehow 😉)

I'm planning on devoting next Monday to target-* maintenance and will most likely have merges etc. for everything then. There are a number of other housekeeping things which would be good to get in as well.

@AlexanderMann AlexanderMann merged commit 5635473 into datamill-co:master Oct 7, 2019


Development

Successfully merging this pull request may close these issues.

Large datasets cause extremely slow imports

3 participants