Fix for parallel pandas scans (fixes #1220) #1226

Merged
merged 7 commits into duckdb:master on Dec 14, 2020

Mytherin
Collaborator

Pandas scans can run in parallel in threads that do not hold the GIL when background threads are enabled (i.e. PRAGMA threads=X where X > 1). This PR modifies the pandas scans so that they no longer construct Python objects or do anything else that can trigger race conditions (e.g. adding new references to objects). This PR should also significantly speed up pandas scans of strings, "nullable" integers (e.g. Int8, Int16) and timestamps with timezones, as we now scan the underlying NumPy arrays directly rather than constructing Python objects.
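
To make the "nullable" integer case concrete: pandas stores such a column as a flat array of values plus a separate boolean mask marking the missing entries. The sketch below only models that layout with the Python standard library (`array` and `bytearray` stand in for the NumPy buffers); the names `scan_count` and `scan_sum` are hypothetical and this is not the code in this PR, where the equivalent loop would live in C++ and never touch a Python object.

```python
from array import array

# Stand-ins for the two buffers behind a nullable integer column:
# a flat buffer of C integers and a mask where 1 marks a missing entry.
values = array("i", [10, 0, 30, 40])  # the slot under a missing entry holds a placeholder
mask = bytearray([0, 1, 0, 0])


def scan_count(values, mask):
    """Count non-null entries, like SELECT count(i)."""
    if len(values) != len(mask):
        raise ValueError("values and mask must have the same length")
    return sum(1 for is_null in mask if not is_null)


def scan_sum(values, mask):
    """Sum non-null entries, skipping the placeholder slots."""
    if len(values) != len(mask):
        raise ValueError("values and mask must have the same length")
    return sum(v for v, is_null in zip(values, mask) if not is_null)
```

Because both buffers are plain memory, a scan over them needs no reference counting, which is what makes it safe to run without the GIL.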

@Mytherin
Collaborator Author

Some benchmarks from Pandas DF reading (old vs new):

| Benchmark | Old | New |
|---|---|---|
| 100000000 unique ASCII strings | 4.45s | 0.85s |
| 100000000 integers (numpy.int32) | 0.15s | 0.02s |
| 100000000 integers (Int32) | 8.77s | 0.06s |

Benchmark script:

import duckdb
import pandas as pd
import time
import numpy

con = duckdb.connect()
c = con.cursor()

str_df = c.execute('select i::varchar i from range(100000000) tbl(i)').fetchdf()

start = time.time()
c.register('str_df', str_df)
results = c.execute('select count(i) from str_df').fetchall()
end = time.time()
print(results)
print("Pandas string column scan: ", end - start)

int_df = pd.DataFrame({'i': numpy.arange(100000000, dtype=numpy.int32) })

start = time.time()
c.register('int_df', int_df)
results = c.execute('select count(i) from int_df').fetchall()
end = time.time()
print(results)
print("Pandas integer column scan (int32): ", end - start)

int_obj_df = pd.DataFrame({'i': pd.Series(numpy.arange(100000000), dtype='Int32') })

start = time.time()
c.register('int_obj_df', int_obj_df)
results = c.execute('select count(i) from int_obj_df').fetchall()
end = time.time()
print(results)
print("Pandas integer column scan (Int32): ", end - start)
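
The script repeats the same start/stop timing pattern three times. A minimal sketch of a helper that factors it out is shown here; `timed` is a hypothetical name, not part of DuckDB or the original script, and it uses `time.perf_counter`, which is generally better suited to measuring intervals than `time.time`.

```python
import time


def timed(label, fn, *args, **kwargs):
    """Run fn(*args, **kwargs), print the elapsed wall-clock time, and return both."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.3f}s")
    return result, elapsed
```

With the cursor from the script above, one measurement could then be written as `timed("Pandas string column scan", lambda: c.execute('select count(i) from str_df').fetchall())`, with the `register` call moved inside the lambda if it should be included in the timing as in the original.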

@Alex-Monahan
Contributor

Alex-Monahan commented Dec 14, 2020 via email

@hannes
Member

hannes commented Dec 14, 2020

CC @tdoehmen

@Mytherin
Collaborator Author

Mytherin commented Dec 14, 2020

Since unicode is kind of a mess on Python 2, and fixing this issue forces us to deal with unicode ourselves (rather than off-loading it to pybind), I have also disabled unicode support in Pandas scans for Python 2. Python 2 is not used much anymore anyway, and we might drop support for it entirely soon.

@Mytherin Mytherin merged commit 5eff218 into duckdb:master Dec 14, 2020
@tdoehmen
Member

Great! Looking forward to trying it out. I'll report some numbers soon.

@Mytherin
Collaborator Author

Note this is only for Pandas -> DuckDB, not for DuckDB -> Pandas. More performance improvements need to happen there. We also still need to enable parallel scans on Pandas DataFrames (that is, partitioning the DataFrame into separate chunks; the parallel scans mentioned here only occur when the same DataFrame is used twice in one query), as well as parallel Pandas DataFrame construction.

@Alex-Monahan
Contributor

Just a small question for my own understanding: If we have two (or more) different dataframes that we have registered and are querying both in the same SQL statement, will those scans be in parallel? (Ex: scanning DF1 and DF2 at the same time)

@Mytherin
Collaborator Author

So in DuckDB there are two types of parallelism within a single query: inter-pipeline and intra-pipeline parallelism. Pipelines are segments of a query tree that can be run independently. For example, if we look at the query that you supplied in the issue:

explain select *
from main_table
left join left_join_table t1
	on main_table.join_column = t1.join_column
left join left_join_table t2
	on main_table.join_column = t2.join_column;
┌─────────────────────────────┐
│┌───────────────────────────┐│
││       Physical Plan       ││
│└───────────────────────────┘│
└─────────────────────────────┘
┌───────────────────────────┐                                                          
│         HASH_JOIN         │                                                          
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │                                                          
│            LEFT           ├───────────────────────────────────────────┐              
│  join_column=join_column  │                                           │              
└─────────────┬─────────────┘                                           │                                           
┌─────────────┴─────────────┐                             ┌─────────────┴─────────────┐
│         HASH_JOIN         │                             │          SEQ_SCAN         │
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │                             │   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│            LEFT           │                             │      left_join_table      │
│  join_column=join_column  ├──────────────┐              │   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │
│                           │              │              │        join_column        │
│                           │              │              │        other_column       │
└─────────────┬─────────────┘              │              └───────────────────────────┘                             
┌─────────────┴─────────────┐┌─────────────┴─────────────┐                             
│          SEQ_SCAN         ││          SEQ_SCAN         │                             
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │                             
│         main_table        ││      left_join_table      │                             
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ││   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │                             
│        join_column        ││        join_column        │                             
│                           ││        other_column       │                             
└───────────────────────────┘└───────────────────────────┘                                                          

There are three pipelines. The first two are the hash table builds for the two joins (hash tables are always built on the right side): SEQ_SCAN(left_join_table) -> HASH_JOIN (build), once for the bottom join and once for the top join. The final pipeline goes from SEQ_SCAN(main_table) -> HASH_JOIN (probe) -> HASH_JOIN (probe) -> Result. It depends on the first two pipelines (the two hash table builds). However, the two hash table builds are separate and can thus be executed independently. This is what triggered the bug: left_join_table was scanned in parallel in the two separate pipelines. This is what we call inter-pipeline parallelism. It can happen regardless of which scans you are using, as long as the pipelines are independent, but how often it occurs depends on the complexity of the query. Simple queries do not have many pipelines (e.g. SELECT SUM(i) FROM table has only one pipeline).
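
As a rough illustration of the dependency structure described above, the toy model below groups pipelines into "waves" whose members have no unfinished dependencies and runs each wave on a thread pool. Everything here is an assumption made for illustration: the pipeline names, `schedule_waves` and `run` are hypothetical, and DuckDB's actual scheduler is not implemented this way.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical model of the plan above: pipeline name -> pipelines it depends on.
PIPELINES = {
    "build_t1": [],                        # SEQ_SCAN(left_join_table) -> HASH_JOIN (build)
    "build_t2": [],                        # SEQ_SCAN(left_join_table) -> HASH_JOIN (build)
    "probe_main": ["build_t1", "build_t2"],  # SEQ_SCAN(main_table) -> probe -> probe -> Result
}


def schedule_waves(pipelines):
    """Group pipelines into waves; all pipelines in one wave are independent."""
    done = set()
    remaining = dict(pipelines)
    waves = []
    while remaining:
        ready = sorted(name for name, deps in remaining.items() if set(deps) <= done)
        if not ready:
            raise ValueError("cyclic pipeline dependencies")
        waves.append(ready)
        done.update(ready)
        for name in ready:
            del remaining[name]
    return waves


def run(pipelines, execute, workers=2):
    """Execute each wave in parallel; a wave starts only after the previous one finished."""
    results = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for wave in schedule_waves(pipelines):
            for name, result in zip(wave, pool.map(execute, wave)):
                results[name] = result
    return results
```

For the plan above, `schedule_waves(PIPELINES)` puts the two builds in the first wave and the probe pipeline in the second, which is exactly the situation where the same DataFrame ends up being scanned by two threads at once.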

Intra-pipeline parallelism is when the data source (i.e. the scan over the pandas dataframe, in this case) is partitioned and the pipeline itself is run in parallel. This requires the scan operator to "know" about the partitioning, i.e. we need to turn the scan into a parallel scan. This is not very complicated, but just hasn't happened yet for Pandas DataFrames. Currently this is only enabled for base table scans and for parquet scans.
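
The idea of a partitioned scan can be sketched in a few lines: split the row range into chunks, let each worker aggregate its own chunk, and combine the partial results. This is only a structural sketch under stated assumptions; `partition` and `parallel_sum` are hypothetical names, and because of the GIL these Python threads will not actually speed up a pure-Python sum.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(n_rows, n_parts):
    """Split [0, n_rows) into at most n_parts contiguous (start, end) ranges."""
    step = max(1, -(-n_rows // n_parts))  # ceiling division, at least 1
    return [(start, min(start + step, n_rows)) for start in range(0, n_rows, step)]


def parallel_sum(values, n_parts=4):
    """Sum a sequence by aggregating each partition separately, then combining."""
    ranges = partition(len(values), n_parts)
    with ThreadPoolExecutor(max_workers=n_parts) as pool:
        partials = pool.map(lambda r: sum(values[r[0]:r[1]]), ranges)
        return sum(partials)
```

The scan operator "knowing" about the partitioning corresponds to `partition` here: each worker is handed a disjoint row range instead of the whole source.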

Here are some slides if you want to read more:

duckdb-parallelism.pdf

@tdoehmen
Member

tdoehmen commented Dec 29, 2020

Awesome, great improvement! On our test case (1 scan + 5 scan & group-bys on ~3.8M rows), the pandas scan is now quite competitive with regular table scans. For comparison:

| Data type | Table scan | Pandas scan (v.0.2.4.dev256) | Table scan, 8 threads | Pandas scan, 8 threads (v.0.2.4.dev256) | Pandas scan (v.0.2.3) |
|---|---|---|---|---|---|
| numeric | 0.55s | 0.76s | 0.22s | 0.28s | 15.3s |
| string | 2.23s | 3.66s | 1.15s | 1.26s | 25.4s |

@Mytherin Mytherin deleted the pandasscanfix branch February 18, 2021 14:22