
Optimize postgres performance #531

Merged · 18 commits · Jul 18, 2022
Conversation

@dimberman dimberman (Collaborator) commented Jul 12, 2022

Description

What is the current behavior?

Currently, Postgres load times are far too slow for any production use case. We need optimizations that bring the 10 GB loading time under 5 minutes so customers can reasonably use this feature.

Previous numbers:

| database | dataset | total_time | memory_rss | cpu_time_user | cpu_time_system |
|---|---|---|---|---|---|
| postgres_conn_benchmark | few_kb | 494.42ms | 36.06MB | 570.0ms | 50.0ms |
| postgres_conn_benchmark | ten_kb | 689.05ms | 42.96MB | 540.0ms | 40.0ms |
| postgres_conn_benchmark | hundred_kb | 580.7ms | 36.43MB | 570.0ms | 50.0ms |
| postgres_conn_benchmark | ten_mb | 44.67s | 1.38GB | 31.03s | 4.03s |
| postgres_conn_benchmark | one_gb | 5.56min | 62.5MB | 14.07s | 1.14s |
| postgres_conn_benchmark | five_gb | 24.44min | 78.15MB | 1.34min | 5.73s |
| postgres_conn_benchmark | ten_gb | 45.64min | 61.71MB | 2.37min | 11.48s |

closes: #428

What is the new behavior?

In this PR we found two primary optimizations that led to a significant speed-up.

The first optimization is to not pass a smart_open file object directly into pandas' read functions. There have been multiple reports of pandas and smart_open being very slow when used together. Instead, we read the smart_open file into an in-memory buffer (either StringIO or BytesIO). This also has the benefit that we do not need to write to disk if there is enough memory to hold the object.
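As a rough illustration of this first optimization (a minimal sketch, not the exact astro-sdk code; the function name is made up for this example):

```python
import io

import pandas as pd
import smart_open


def read_csv_via_buffer(path: str) -> pd.DataFrame:
    # Copy the remote file into an in-memory buffer first, then let pandas
    # parse the buffer instead of reading through the smart_open stream.
    with smart_open.open(path, "r") as remote_file:
        buffer = io.StringIO(remote_file.read())  # io.BytesIO for binary formats
    return pd.read_csv(buffer)
```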

The second optimization is Postgres-specific. The pandas.to_sql function uses the HIGHLY suboptimal INSERT statement for Postgres, which is 10X slower than the COPY command. By serializing objects as CSV buffers, we can use COPY to stream the data into Postgres instead of inserting one value at a time.
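A minimal sketch of the COPY approach (illustrative only; the function and variable names are assumptions, not the actual astro-sdk implementation):

```python
from io import StringIO

import pandas as pd
from sqlalchemy.engine import Engine


def copy_dataframe_to_postgres(df: pd.DataFrame, table_name: str, engine: Engine) -> None:
    # Serialize the DataFrame to CSV in memory, then stream it into Postgres
    # with COPY instead of issuing row-by-row INSERT statements.
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)

    raw_conn = engine.raw_connection()  # underlying psycopg2 connection
    try:
        with raw_conn.cursor() as cursor:
            cursor.copy_expert(f"COPY {table_name} FROM STDIN WITH (FORMAT CSV)", buffer)
        raw_conn.commit()
    finally:
        raw_conn.close()
```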

Here are the new numbers with the optimizations:

| database | dataset | total_time | memory_rss | cpu_time_user | cpu_time_system |
|---|---|---|---|---|---|
| postgres | ten_kb | 432.93ms | 43.19MB | 540.0ms | 60.0ms |
| postgres | hundred_kb | 417.2ms | 31.34MB | 560.0ms | 40.0ms |
| postgres | ten_mb | 2.21s | 79.91MB | 1.76s | 170.0ms |
| postgres | one_gb | 13.9s | 35.43MB | 7.97s | 5.76s |
| postgres | five_gb | 50.23s | 43.27MB | 26.64s | 18.7s |
| postgres | ten_gb | 3.11min | 43.29MB | 1.36min | 1.48min |

Does this introduce a breaking change?

No

Checklist

  • Created tests which fail without the change (if possible)
  • Extended the README / documentation, if necessary

@@ -63,7 +63,7 @@ def create_dag(database_name, table_args, dataset):
output_table=table,
chunk_size=chunk_size,
)
-    aql.truncate(my_table)
+    aql.cleanup([my_table])
tatiana (Collaborator):
Why have we introduced this change?

dimberman (Collaborator, Author):

@tatiana two reasons

  1. there are issues in truncate that I'm gonna make tickets for today
  2. Truncate empties but doesn't delete the table. We want to delete the table here.

tatiana (Collaborator):

@dimberman Thank you! :)

I see, since we are not naming these tables, that will work.

Moving forward, we may have to add an aql.drop, since cleanup does not drop tables which are not temporary. What do you think? Should we log a ticket for this? Another possibility would be to have a parameter in cleanup to also delete non-temporary tables!

tatiana (Collaborator):

BTW: are there more bugs besides #515?

dimberman (Collaborator, Author):

@tatiana that's the bug I was referring to, though I don't think it's limited to just load_file.

It's a pretty simple fix too. Just need to set template_fields = ("table",) in the TruncateOperator class. I can do that this afternoon or tomorrow if no one else gets around to it.
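For reference, the fix being described would look roughly like this (a simplified sketch, not the actual operator code):

```python
from airflow.models.baseoperator import BaseOperator


class TruncateOperator(BaseOperator):  # simplified stand-in for the real operator
    # Listing "table" in template_fields makes Airflow render Jinja/XCom values
    # (e.g. a table produced by an upstream task) before execute() is called.
    template_fields = ("table",)

    def __init__(self, table, **kwargs):
        super().__init__(**kwargs)
        self.table = table
```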

],
indirect=True,
ids=["snowflake", "bigquery", "postgresql"],
ids=["snowflake", "bigquery"],
)
def test_load_file_chunks(sample_dag, database_table_fixture):
tatiana (Collaborator):

Shouldn't we still support chunks in Postgres?

dimberman (Collaborator, Author):

@tatiana AFAICT, since we're now using StringIO, the buffering is handled for us; I can investigate further, though.

tatiana (Collaborator):

Yes, please, @dimberman , I'd love to understand this.

dimberman (Collaborator, Author):

@tatiana if you look here you'll see that we no longer use to_sql, so there actually isn't anywhere to put a chunksize! Instead we're just using an IOBuffer which is far more performant and simply pulls raw data as needed.

@tatiana tatiana (Collaborator) commented Jul 13, 2022

Amazing results, @dimberman , some minor comments!

One of the cases we're covering in the Snowflake optimization is when the user provides the Table columns; should we also cover this in the Postgres optimization? It could be in a separate PR.

Comment on lines +146 to +148
    with patch(type_method_map_fixture[FileType(filetype)]) as mocked_read, patch(
        "astro.files.base.smart_open.open", mock_open(read_data=data)
    ) as mocked_smart_open:
tatiana (Collaborator):

What is the benefit of declaring the patches within the test instead of using the decorator? It would be great if we were consistent with other tests.

dimberman (Collaborator, Author):

@tatiana from what I can tell, mock_open must be used as part of a context manager. When I tried adding mock_open to the decorator it didn't work. I can try again.

dimberman (Collaborator, Author):

@tatiana is that ok? :)

tatiana (Collaborator):

Hey @dimberman if the value of read_data didn't change, we could do something like this:
@patch( "astro.files.base.smart_open.open", new_callable=mock_open, read_data="data")
But since it changes, I believe it would add much complexity to have this fixture dependent on the filetype! So it's okay, thank you!
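For context, here is a minimal illustration of the two patching styles discussed above (the test names and the data fixture are made up for this example):

```python
from unittest.mock import mock_open, patch


# Decorator form: works when read_data is a fixed value known up front.
@patch("astro.files.base.smart_open.open", new_callable=mock_open, read_data="data")
def test_read_with_decorator(mocked_smart_open):
    ...


# Context-manager form: needed when read_data depends on a fixture/parameter.
def test_read_with_context(data):
    with patch(
        "astro.files.base.smart_open.open", mock_open(read_data=data)
    ) as mocked_smart_open:
        ...
```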

@dimberman dimberman (Collaborator, Author) commented:
@tatiana can you point me to where this work is being done (re: allowing users to define their own schema)?

@codecov codecov bot commented Jul 13, 2022

Codecov Report

Merging #531 (56ac86a) into main (b77e22e) will increase coverage by 0.07%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main     #531      +/-   ##
==========================================
+ Coverage   92.35%   92.43%   +0.07%     
==========================================
  Files          40       40              
  Lines        1557     1573      +16     
  Branches      203      204       +1     
==========================================
+ Hits         1438     1454      +16     
- Misses         92       93       +1     
+ Partials       27       26       -1     
| Impacted Files | Coverage Δ |
|---|---|
| src/astro/sql/operators/cleanup.py | 97.67% <ø> (ø) |
| src/astro/databases/base.py | 95.80% <100.00%> (+0.05%) ⬆️ |
| src/astro/databases/postgres.py | 72.85% <100.00%> (+3.50%) ⬆️ |
| src/astro/files/base.py | 94.23% <100.00%> (+0.75%) ⬆️ |

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@@ -46,7 +46,7 @@ def create_dag(database_name, table_args, dataset):
dag_name = f"load_file_{dataset_name}_into_{database_name}"

with DAG(dag_name, schedule_interval=None, start_date=START_DATE) as dag:
chunk_size = int(os.getenv("ASTRO_CHUNK_SIZE", DEFAULT_CHUNK_SIZE))
chunk_size = int(os.getenv("ASTRO_CHUNK_SIZE", str(DEFAULT_CHUNK_SIZE)))
@utkarsharma2 utkarsharma2 (Collaborator) commented Jul 15, 2022

@dimberman We should use Airflow's config manager here.

Suggested change
-chunk_size = int(os.getenv("ASTRO_CHUNK_SIZE", str(DEFAULT_CHUNK_SIZE)))
+chunk_size = conf.get("astro_sdk", "chunk_size", str(DEFAULT_CHUNK_SIZE))

dimberman (Collaborator, Author):

@utkarsharma2 this isn't really an astro config. It's just an env variable we set for the benchmarking.
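For completeness, if this were ever promoted to an SDK-level setting, the Airflow config lookup would look roughly like this (the [astro_sdk] section and key are hypothetical, and fallback must be passed as a keyword argument):

```python
from airflow.configuration import conf

DEFAULT_CHUNK_SIZE = 100_000  # illustrative value, not the SDK default

# Hypothetical [astro_sdk] section; conf.get returns a string, so cast to int.
chunk_size = int(conf.get("astro_sdk", "chunk_size", fallback=str(DEFAULT_CHUNK_SIZE)))
```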

@tatiana tatiana (Collaborator) commented Jul 18, 2022

@dimberman Sorry for the delay!

> @tatiana can you point me to where this work is being done (re: allowing users to define their schema)?

I recently added it to the base class, so it should work for Postgres as well:
https://github.com/astronomer/astro-sdk/blob/main/src/astro/databases/base.py#L279-L282
https://github.com/astronomer/astro-sdk/blob/main/src/astro/databases/base.py#L204
https://github.com/astronomer/astro-sdk/blob/main/src/astro/databases/base.py#L167

Successfully merging this pull request may close these issues:

Support loading datasets of 10GB in Postgres in less than 5 min