feature request: scan_parquet on folders #773

Closed
alanpaulkwan opened this issue Jul 21, 2020 · 35 comments · Fixed by #936

Comments

@alanpaulkwan commented Jul 21, 2020

Loving the package and am going to teach it in my data science classes. Question: do you know if scan_parquet can be used on folders? E.g. suppose I sharded data.pq into data.pq/shard1, data.pq/shard2, ...

@hannes (Member) commented Jul 21, 2020

Which API are you using? Python/R/...?

@alanpaulkwan (Author) commented Jul 21, 2020 via email

@hannes (Member) commented Jul 21, 2020

We will eventually support this; right now you can work around it using glob and UNION ALL, like so:

dbGetQuery(con, paste0("SELECT * FROM parquet_scan('", Sys.glob("test/*.pq"), "')", collapse=" UNION ALL "))

I have a folder test with three parquet files which end in .pq here, and then this code produces

SELECT * FROM parquet_scan('test/1.pq') UNION ALL SELECT * FROM parquet_scan('test/2.pq') UNION ALL SELECT * FROM parquet_scan('test/3.pq')
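For repeated use, the generated union can also be wrapped in a view so it only has to be built once (a small sketch along the same lines; the view name shards is arbitrary and con is the same connection as above):

sql_union <- paste0("SELECT * FROM parquet_scan('", Sys.glob("test/*.pq"), "')",
                    collapse = " UNION ALL ")
dbExecute(con, paste0("CREATE VIEW shards AS ", sql_union))
dbGetQuery(con, "SELECT count(*) FROM shards")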

@alanpaulkwan (Author)

How does this globbing method treat memory? I will run some tests, but the query I need to run is probably something like

select a,b,count(distinct c) from (select ... union all ... select ... union all ...) where a between 1 and N and b >= 500 group by a,b

The engine may not know what the outer query needs, so I'd have to impose the filters on the inner queries myself.

@hannes (Member) commented Aug 19, 2020

It should be pushed down; check the EXPLAIN output.
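For example, the plan can be inspected from R by prefixing the query with EXPLAIN (a minimal sketch along the lines of the queries above; the column names a, b, c and the constant 10 are placeholders, and the exact shape of the EXPLAIN result may vary by version):

sql_union <- paste0("SELECT * FROM parquet_scan('", Sys.glob("test/*.pq"), "')",
                    collapse = " UNION ALL ")
plan <- dbGetQuery(con, paste0(
  "EXPLAIN SELECT a, b, count(DISTINCT c) FROM (", sql_union, ") AS t ",
  "WHERE a BETWEEN 1 AND 10 AND b >= 500 GROUP BY a, b"))
print(plan)  # if pushdown happens, the filter shows up at or inside the PARQUET_SCAN nodes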

@alanpaulkwan (Author)

Thanks. Your suggestion worked. The EXPLAIN output is quite complicated, but I think the pushdown is happening.

Not sure I'm contributing any new insight, but I'm definitely observing a performance hit.

I have a stock-date file (96 million rows). I sharded it by partitioning on year and ordering by date within each file, to maximize the chance of a performance boost from the block min/max values. Snappy compressed.

select count() from orig_view group by date takes 7.38 seconds
select count() from sharded_view group by date takes 38.658 seconds

FWIW, hope it's helpful.

@mskyttner

I think I'm running into this too, when using sparklyr to write a Parquet file (resulting in a folder with snappy-compressed part-#.parquet files) and then attempting to use it like so:

#remotes::install_github("cwida/duckdb/tools/rpkg",  build = FALSE) 
library(DBI)
library(duckdb)

con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:", read_only = FALSE)
sql <- "CREATE VIEW mf AS SELECT * FROM parquet_scan('appdir/datasets/masterfile.parquet');"
dbExecute(con, sql)

Which gives:

Error in dbSendQuery(conn, statement, ...) : 
  duckdb_prepare_R: Failed to prepare query CREATE VIEW mf AS SELECT * FROM parquet_scan('appdir/datasets/mf.parquet');
Error: IO: Could not read from file "appdir/datasets/mf.parquet": Is a directory

There seems to be a similar feature request for the SQLite sqlite_parquet_vtable at cldellow/sqlite-parquet-vtable#24, and a similar issue with the Parquet foreign data wrapper for Postgres at adjust/parquet_fdw#15 (comment).

The suggested workaround with "globbing" for Parquet data located in a folder is not super heavy and seems to work when I try this:

sql_union <- paste0("SELECT * FROM parquet_scan('", 
  Sys.glob("appdir/datasets/masterfile.parquet/*.parquet"), "')", 
  collapse=" UNION ALL ")
sql <- sprintf("CREATE VIEW mf AS %s", sql_union)
dbExecute(con, sql)

library(dplyr)
dbGetQuery(con, "select * from mf;") %>% as_tibble()
dbDisconnect(con, shutdown = TRUE)

But I have most parquet files in this partitioned format (a folder with compressed files in it) and would like to up-vote this feature.

I'll end the quacking with a big thanks for the duckdb!

@Mytherin (Collaborator)

We have started working on this by supporting globs directly in the parquet_scan and read_csv functions in the globparquetreader branch.
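With that branch, a folder of shards can be scanned with a single glob pattern instead of a generated UNION ALL (a sketch; the path test/*.pq is a placeholder):

library(DBI)
library(duckdb)

con <- dbConnect(duckdb::duckdb())
dbGetQuery(con, "SELECT count(*) FROM parquet_scan('test/*.pq')")
dbDisconnect(con, shutdown = TRUE)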

@hannes linked a pull request Sep 21, 2020 that will close this issue
@alanpaulkwan (Author)

Is this available now? I tried downloading the latest PR via the "bleeding edge" release, but it wasn't working.

@hannes (Member) commented Sep 22, 2020

What do you mean by "not working"? What did you try and what did not work?

@alanpaulkwan (Author)

I tried install.packages against the bleeding edge version. Not sure if I hit the right syntax (though note it doesn't matter if I use " or '). Thanks!

dbGetQuery(con,"select count() from  glob('*parquet')")**

Error in dbSendQuery(conn, statement, ...) :
duckdb_prepare_R: Failed to prepare query select count() from
glob('*parquet')
Error: Catalog: Table Function with name glob does not exist!

dbGetQuery(con,"select count() from    parquet_scan('*parquet')")

Error in dbSendQuery(conn, statement, ...) :
duckdb_prepare_R: Failed to prepare query select count() from
parquet_scan('*parquet')

@Mytherin (Collaborator)

Should be there now, but testing it I realized there's still a bug where it only works with relative paths. Will push a fix.

@Mytherin (Collaborator)

> I tried install.packages against the bleeding edge version. [...]

This looks to me like you're still on an older version. Can you check the output of PRAGMA version and post it here?

@alanpaulkwan (Author) commented Sep 22, 2020

How very strange. I wanted to be absolutely sure, so I reinstalled, and it works now.

Not sure why running install.packages earlier did not work; perhaps because I tried it almost immediately after I got the notification that Hannes closed this issue :) Maybe I jumped the gun and pulled an old repository before GitHub had pushed the new build.

In any case I will test with larger files, but it seems to work well now. Thanks!

system.time(dbGetQuery(con,"select count(),avg(ret),count(distinct bid) from
parquet_scan('*parquet') as a
group by permno,substring(date,1,4)"))
user system elapsed
1.491 0.404 1.895
system.time(dbGetQuery(con,"select count(),avg(ret),count(distinct bid) from
parquet_scan('/home/alan/msf.parquet') as a
group by permno,substring(date,1,4)"))
user system elapsed
1.474 0.401 1.874

@Mytherin (Collaborator)

> How very strange. I wanted to be absolutely sure, so I reinstalled, and it works now. [...]

Great! I think that is indeed the issue. The closing happens automatically upon merging a PR that references an issue. However, the merge only kicks off the build of the new artifacts; it takes some time for them to actually finish :)

@alanpaulkwan (Author) commented Sep 23, 2020

Also, I really think the implementation was thoughtfully done. Rather than reading the folder (I guess some people may still find that annoying), the glob implementation is great in that it allows implicit indexing, which saves a ton of time.

Will you support partitioned WRITES? I understand there are workarounds for that already, just wondering.

@hannes (Member) commented Sep 23, 2020

There is no support for partitioned writes yet, but as you said you can script around that.
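One possible way to script around it today, assuming a build that includes the Parquet writer (COPY ... TO ... (FORMAT 'parquet')) and a table msf with a year column (both names are placeholders):

years <- dbGetQuery(con, "SELECT DISTINCT year FROM msf ORDER BY year")$year
for (y in years) {
  dir.create(file.path("msf_parts", y), recursive = TRUE, showWarnings = FALSE)
  dbExecute(con, sprintf(
    "COPY (SELECT * FROM msf WHERE year = %s) TO 'msf_parts/%s/data.parquet' (FORMAT 'parquet')",
    y, y))
}
# the shards can then be read back with parquet_scan('msf_parts/*/data.parquet')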

@mskyttner

Here is another confirmation that this works great when installing the R package "duckdb" from the master branch, at least for me.

I like the globbing approach and I'm not too annoyed about not being able to use a folder directly, since I can just append "/*" to my folder name (if I only have .parquet "shards", which is often not the case, since I also have files like _SUCCESS, probably generated by the sparklyr Parquet export or Apache Spark) or, more often, something like "/*.parquet".

The error message if I just use the folder name, or the folder name with a slash appended, is Error: IO: No files found that match the pattern "appdir/datasets/masterfile.parquet", which provides enough of a hint to start digging for how to do the globbing, I guess. Even if it can perhaps be debated whether a folder is also a file ;)

@alanpaulkwan what do you mean by "implicit indexing"? The duckdb docs say indexing happens implicitly/automatically on columns with primary key or unique constraints?

I tried the parquet scan with an explicit set of columns (see below for a halfway reprex) and it works, but it appears that the column names got mangled into lower case, which did not happen with a "select * from parquet_scan(...)" query. Is this by design, or should I open a separate issue for that?

I was also wondering how to query the duckdb version (PRAGMA version; returns 1, see below; perhaps this is correct, although it's more of an integer than a semantic version number?) and how to query the default setting for threads. I didn't find it in the docs yet. My use case is that the library version number is useful in reprexes/tickets, and I'd like to make maximum use of the threads on my laptop (since that probably improves performance?).

#remotes::install_github("cwida/duckdb/tools/rpkg",  build = FALSE) 
library(DBI)
library(duckdb)
library(dplyr)

con <- dbConnect(duckdb::duckdb(), 
  dbdir = ":memory:", read_only = FALSE)

# what does the duckdb version return?
dbGetQuery(con, "PRAGMA version;")
# this returns:
#   library_version source_id
#1          DuckDB    

# is this how I allocate n threads?
# hmmm... and how do I query for this setting?
dbSendStatement(con, "PRAGMA threads=10;")
# this returns:
# [1] NA

dbGetQuery(con, "SELECT 'hello' GLOB 'he*';")
# this returns:
#hello ~~~ he*
#  1          TRUE

if (dbExistsTable(con, "mf")) dbSendQuery(con, "drop view mf;")
# read only two columns from the parquet data
sql <- "CREATE VIEW mf AS SELECT Unit_code, Unit_Fraction FROM 
  parquet_scan('appdir/datasets/masterfile.parquet/*.parquet');"

dbSendQuery(con, sql)

# note, the view's field names are now lower case -> 
# use unit_code and unit_fraction
# this mangling does not happen if using "CREATE VIEW mf AS SELECT * FROM ..."

con %>% tbl("mf") %>% 
  group_by(unit_code) %>% 
  summarize(frac = mean(unit_fraction))

# cannot create index on view, I guess.
db_create_index(con, "mf", "unit_code")
#Error in dbSendQuery(conn, statement, ...) : 
#  duckdb_prepare_R: Failed to prepare query CREATE INDEX "mf_unit_code" ON "mf" ("unit_code")
#Error: Unknown: Can only delete from base table!

# index on table
dbExecute(con, "CREATE TABLE mft AS SELECT * from mf;")
db_create_index(con, "mft", "unit_code")

# table is slightly faster than view due to the index
tictoc::tic()
con %>% tbl("mft") %>% filter(unit_code == "u1yv126n")
tictoc::toc()

tictoc::tic()
con %>% tbl("mf") %>% filter(unit_code == "u1yv126n")
tictoc::toc()

dbDisconnect(con, shutdown = TRUE)

@alanpaulkwan (Author) commented Sep 23, 2020

@mskyttner, I'm referring to the file layout as an implicit index. A partitioned layout like folder/month/[stocksymbol].parquet stores each stock within a per-month folder, so one could do something like

parquet_scan('folder/*/AAPL.parquet')

I'm somewhat new to storing and interacting with Parquet more deeply, but I think this is a Hive partition? I'm not referring to row-group min/max stats or a page index (by which I mean a file-footer index of all row groups); there might be another aspect of indexing I don't fully understand, and I'm not 100% clear how @Mytherin and @hannesmuehleisen process these currently. Whether that information is fully used or not, the option to skip files explicitly saves DuckDB the cost of a scan.

@hannes (Member) commented Sep 23, 2020

We don't consider the hive-style partitioning information at the moment.
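Until it is, the partition value can be re-attached by hand by adding it as a literal column per file and unioning, along the lines of the earlier workaround (a sketch; the layout folder/<year>/data.parquet with plain year sub-folders is hypothetical):

files <- Sys.glob("folder/*/data.parquet")
years <- basename(dirname(files))   # "2018", "2019", ...
sql_union <- paste0("SELECT *, ", years, " AS year FROM parquet_scan('", files, "')",
                    collapse = " UNION ALL ")
dbGetQuery(con, paste0("SELECT year, count(*) FROM (", sql_union, ") AS t GROUP BY year"))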

@hannes (Member) commented Sep 23, 2020

sql <- "CREATE VIEW mf AS SELECT Unit_code, Unit_Fraction FROM 
  parquet_scan('appdir/datasets/masterfile.parquet/*.parquet');"

Try quoting the column names in the view definition, like so:

sql <- "CREATE VIEW mf AS SELECT \"Unit_code\", \"Unit_Fraction\" FROM 
  parquet_scan('appdir/datasets/masterfile.parquet/*.parquet');"

@DonIvanCorleone

Hi there,
thanks for the new feature! This makes the UNION ALL workaround largely unnecessary.

One thing I do not see answered in this thread: in #665 @Mytherin said that it would be possible to use parallel threads to digest Parquet files more efficiently, but of course that requires some implementation. Is this already done with this new feature, or is it something for the future?

To be honest, I went through the merge commit and saw some changes which would suggest that you implemented some parallelization, but I am not really sure. From my perspective it seems as if the Python client does not consider "PRAGMA threads=N", but it might be that I am missing something ...

@hannes (Member) commented Sep 24, 2020

The parallel reading of Parquet files is also implemented. Indeed, if you set the PRAGMA, the Parquet files should be read in parallel. Did you install the latest pre-release version?

@DonIvanCorleone

Yep, latest and greatest (0.2.2.dev220) on Python 3.8.
It seems as if con.execute("PRAGMA threads=X;") does not have an effect. I even tried force_parallelism without noticeable impact.

@hannes (Member) commented Sep 24, 2020

Could you please be more specific and perhaps post an entire example where "does not have an effect" is visible?

@DonIvanCorleone commented Sep 24, 2020

Sure.

I have a directory of several (40) Parquet files. Each file contains 1 million records and 30 columns.

import time
import duckdb

con = duckdb.connect(database='/tmp/myDB', read_only=False)
con.execute("PRAGMA threads=1;")  # vary the number of threads here
start = time.perf_counter()
con.execute("CREATE VIEW test AS SELECT * FROM parquet_scan('data/fixtures/1Mio*.parquet');")
con.execute("SELECT ... FROM test;")  # easiest case would be e.g. "SELECT * FROM test;"
end = time.perf_counter()

The timing difference (end - start) is not affected by the number of threads used.
But maybe I got something mixed up in my head?! :)

@hannes (Member) commented Sep 24, 2020

How many cores does your CPU have? :)

@hannes (Member) commented Sep 24, 2020

How big is the result set that you expect from this? Consider adding an ORDER BY. Right now, straight SELECT queries are not parallelized.

@DonIvanCorleone

> How many cores does your CPU have? :)

More than 1 ;) Right now 12 cores on my local machine; later more, hopefully > 32.

> How big is the result set that you expect from this?

Right now I cannot really say precisely how big the result set will be. I'm still playing around with the query and different approaches.

> Consider adding an ORDER BY. Right now, straight SELECT queries are not parallelized.

I tried adding an ORDER BY clause on one of the columns (a unix timestamp), but the timing is not different between 1 or, let's say, 8 threads.

@alanpaulkwan (Author)

I have not diagnosed this rigorously, but Parquet scans on partitioned files seem much faster than even a few weeks ago. It could be other optimizations, though the impact of threads would not surprise me.

@Mytherin (Collaborator)

> I tried adding an ORDER BY clause on one of the columns (a unix timestamp), but the timing is not different between 1 or, let's say, 8 threads.

Could you perhaps share the exact queries you are running? As Hannes mentioned, certain queries are not parallelized right now and will not benefit from the parallelization yet. This is a current limitation that should be resolved in the future. As the parallelization is new we have focused primarily on parallelizing common operators.

Aggregates should be parallelized, e.g. SELECT SUM(...) FROM tbl. Straight selections will not be, e.g. SELECT column FROM table. If you share the exact query we can investigate and tell you why it doesn't benefit from parallelization.
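As an illustration of that distinction, an aggregate over a glob scan can be timed under different thread counts from R (a sketch; the path shards/*.parquet and the column names are placeholders, and absolute timings will vary):

dbExecute(con, "PRAGMA threads=1;")
system.time(dbGetQuery(con,
  "SELECT permno, sum(ret) FROM parquet_scan('shards/*.parquet') GROUP BY permno"))

dbExecute(con, "PRAGMA threads=8;")
system.time(dbGetQuery(con,
  "SELECT permno, sum(ret) FROM parquet_scan('shards/*.parquet') GROUP BY permno"))

# a plain projection such as SELECT ret FROM parquet_scan('shards/*.parquet')
# would not be expected to benefit from the extra threads at this point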

@DonIvanCorleone

Hi @Mytherin,
I am really thankful for your support. From my perspective it is not necessary to invest more time in this topic. This project has already given us a substantial performance gain in our analytics workflow.

I think the answer that straight SELECT * FROM tbl queries are not parallelized, whereas aggregation queries are, explains some of the effects we saw during testing. Maybe if you have time at some later stage of development and it fits into your roadmap, it would be cool to parallelize those SELECT * FROM tbl queries as well.

Thanks again!

@Mytherin (Collaborator)

> Maybe if you have time at some later stage of development and it fits into your roadmap, it would be cool to parallelize those SELECT * FROM tbl queries as well.

This is definitely on the agenda :)

> Thanks again!

No problem!

@dforsber


Awesome! Yes, we are also looking for efficient parallelised SELECT queries over e.g. Parquet files; in other words, getting good enough performance for row access with filtering and column selection to meet the demands of interactive data browsing. That complements DuckDB's excellent OLAP capabilities in a way that means you do not necessarily need a row store anymore. ORDER BY and paging of filtered results are middle-ground challenges, whether or not they require pre-created indexes. Plus making the Parquet reader state of the art, in the sense that round trips are minimized and metadata is cached. I really enjoy working with DuckDB, really nice work!

ORC support, with bloom filters, is probably something worth thinking about in the future.

@ldacey commented May 21, 2021

Are we able to query a pyarrow dataset by chance?

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
import duckdb

# fs is a filesystem object (Azure Blob in this case), defined elsewhere
partitioning = ds.partitioning(pa.schema([("date_id", pa.int32())]), flavor="hive")
dataset = ds.dataset("analytics/snowflake/issues", filesystem=fs, partitioning=partitioning)

The dataset would have all of the information about all of the files. I am able to filter this if I turn it into a table (and from there I can query the table using duckdb successfully), but I wonder if I will ever be able to query the dataset directly?

Or would I eventually skip the ds.dataset() portion and query the partitioned folder of parquet files directly with duckdb (on Azure Blob in this case using hive-style partitions)?

filter_list = [['date_id', '>=', 20210228], ['date_id', '<', 20210401]]
filter_expression = pq._filters_to_expression(filters=filter_list)
table = dataset.to_table(filter=filter_expression, columns=["id", "date", "kind", "creator_id"])

query = '''
select 
  date
, count(*) as issue_count
from arrow 
group by date
'''
rel = duckdb.arrow(table)
print(rel.query("arrow", query).df())

The code above works, but it would be nice to take care of the partition-level filtering directly in a SQL query.
