DuckDB: Reading and Writing Parquet Files, Including Remote Parquet Files on s3, oss, http, and Similar Storage

Author

digoal

Date

2022-09-01

Tags

PostgreSQL , DuckDB , s3 , oss , https , parquet


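Background

  • What is a Parquet file? A data file in a columnar storage format. Its fine-grained internal layout speeds up lookups and reduces I/O, and it supports compression.
  • Which scenarios suit Parquet files? Data sharing: the same data can be analyzed from multiple business platforms.
  • What are the advantages of Parquet files?
    • Columnar storage: with projection pushdown, only the columns a query needs are scanned, which is more efficient.
    • Grouped storage: rows inside a Parquet file are stored in groups (for example, 10,000 rows per group), and each group carries metadata (the min/max bounds of every column) that filters use to skip groups, reducing the amount scanned and improving performance.
    • Compressed storage, which saves space and lowers cost.
  • How does DuckDB use Parquet files? It can read and write them directly.
  • Does DuckDB support projection and filter pushdown when querying Parquet files? Yes. Only the columns the query needs to return are fetched, and WHERE conditions are pushed down.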

Parquet files are compressed columnar files that are efficient to load and process. DuckDB provides support for both reading and writing Parquet files in an efficient manner, as well as support for pushing filters and projections into the Parquet file scans.

DuckDB supports projection pushdown into the Parquet file itself. That is to say, when querying a Parquet file, only the columns required for the query are read. This allows you to read only the part of the Parquet file that you are interested in. This will be done automatically by the system.
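
To make the idea of projection pushdown concrete, here is a minimal Python sketch. It is a toy model, not DuckDB's reader or the real Parquet layout: the `columnar_file` dictionary and the `scan` helper are hypothetical names invented for illustration. Each column is stored separately, so a query that asks for one column never touches the others.

```python
# Toy columnar "file": every column lives in its own list, loosely mirroring
# how Parquet keeps column chunks apart. This is NOT the real file format.
columnar_file = {
    "id": [1, 2, 3, 4],
    "info": ["a", "b", "c", "d"],
    "gid": [10, 20, 10, 30],
}


def scan(column_store, wanted):
    """Build rows from the requested columns only and report which columns were read."""
    touched = [name for name in column_store if name in wanted]
    rows = list(zip(*(column_store[name] for name in touched)))
    return rows, touched


# Equivalent in spirit to: SELECT info FROM file
rows, touched = scan(columnar_file, ["info"])
print(touched)  # only the "info" column was read
print(rows)
```

In a row-oriented file the same query would have to read every full row and then discard the unwanted fields.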

DuckDB also supports filter pushdown into the Parquet reader. When you apply a filter to a column that is scanned from a Parquet file, the filter will be pushed down into the scan, and can even be used to skip parts of the file using the built-in zonemaps. Note that this will depend on whether or not your Parquet file contains zonemaps.
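
The zonemap-based skipping described above can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not DuckDB's implementation: `RowGroup`, `build_row_groups`, and `scan_equal` are hypothetical names, and the group size of 10,000 rows is only the illustrative figure used earlier in this article.

```python
from dataclasses import dataclass


@dataclass
class RowGroup:
    min_id: int  # smallest value in the group (the "zonemap" lower bound)
    max_id: int  # largest value in the group (the "zonemap" upper bound)
    ids: list    # the values themselves


def build_row_groups(ids, group_size):
    """Split a column into fixed-size groups and record min/max per group."""
    groups = []
    for start in range(0, len(ids), group_size):
        chunk = ids[start:start + group_size]
        groups.append(RowGroup(min(chunk), max(chunk), chunk))
    return groups


def scan_equal(groups, target):
    """Return the matching values and the number of row groups actually read."""
    matches, groups_read = [], 0
    for group in groups:
        if target < group.min_id or target > group.max_id:
            continue  # statistics prove no row in this group can match: skip it
        groups_read += 1
        matches.extend(v for v in group.ids if v == target)
    return matches, groups_read


groups = build_row_groups(list(range(1, 100001)), 10000)
matches, groups_read = scan_equal(groups, 4242)
print(len(groups), groups_read, matches)
```

With sorted ids, an equality filter reads one group out of ten; a value outside every group's range reads none at all. If the data were randomly ordered, the min/max ranges would overlap and far fewer groups could be skipped.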

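Reading and writing remote Parquet files depends on the httpfs extension, which can read and write files stored in s3, oss, https, and similar locations.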

INSTALL httpfs;  
  
LOAD httpfs;  
  
SELECT * FROM read_parquet('https://<domain>/path/to/file.parquet');  
  
-- s3  
  
SET s3_region='us-east-1';  
SET s3_access_key_id='<AWS access key id>';  
SET s3_secret_access_key='<AWS secret access key>';  
  
-- or  
  
SET s3_region='us-east-1';  
SET s3_session_token='<AWS session token>';  
  
-- then  
  
SELECT * FROM read_parquet('s3://<bucket>/<file>');  

Usage examples

create table test (id int, info text);    
create table tbl (c1 int, c2 text, c3 timestamp, c4 int);    
create table t1 (id int, info text, crt_time timestamp, gid int);    
create table t2 (id int, info text, crt_time timestamp, gid int);    
create table t3 (id int, info text, crt_time timestamp, gid int);    
create table t4 (id int, info text, crt_time timestamp, gid int);    
create table t5 (id int, info text, crt_time timestamp, gid int);    
create table t6 (id int, info text, crt_time timestamp, gid int);    
create table t7 (id int, info text, crt_time timestamp, gid int);    
    
insert into test select generate_series, md5(random()::text) from generate_series(1,100000);    
insert into tbl select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t1 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t2 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t3 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t4 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t5 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t6 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
insert into t7 select generate_series, md5(random()::text), now()+(generate_series||' second')::interval, 100*random() from generate_series(1,100000);    
    
D .tables    
t1    t2    t3    t4    t5    t6    t7    tbl   test    

1. Export a single table to Parquet

copy test to 'test.parquet' (FORMAT 'parquet');    

2. Export a query result to Parquet

copy (select * from test where id<100) to 'test_100.parquet' (FORMAT 'parquet');    

3. Export the entire database to Parquet

EXPORT DATABASE 'duckdb_main_20220901' (FORMAT PARQUET);    

4. Parquet file compression

DuckDB also has support for writing to Parquet files using the COPY statement syntax. You can specify which compression format should be used using the CODEC parameter (options: UNCOMPRESSED, SNAPPY (default), ZSTD, GZIP).

-- write a query to a snappy compressed parquet file    
COPY (SELECT * FROM tbl) TO 'tbl.parquet' (FORMAT 'parquet');    
    
-- write "tbl" to a zstd compressed parquet file    
COPY tbl TO 'tbl_zstd.parquet' (FORMAT 'PARQUET', CODEC 'ZSTD');    
    
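-- export "test" to csv, then write that csv file to an uncompressed parquet file    
-- (note: this overwrites the test.parquet written in step 1)    
copy test to 'test.csv' (format 'csv');    
COPY 'test.csv' TO 'test.parquet' (FORMAT 'PARQUET', CODEC 'UNCOMPRESSED');    
-- write the same csv file to a zstd compressed parquet file    
COPY 'test.csv' TO 'test_zstd.parquet' (FORMAT 'PARQUET', CODEC 'zstd');    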
D .system pwd    
/Users/digoal/Downloads    
D .system ls *.parquet    
tbl.parquet		tbl_zstd.parquet	test.parquet		test_100.parquet	test_zstd.parquet    
D .system ls *.csv    
test.csv    
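
How much a codec saves depends heavily on the data in each column. The following Python sketch gives a rough feel for this using the standard-library `gzip` module on two synthetic columns. It is only an approximation under stated assumptions: it does not use Parquet's own encodings or page layout, the 20,000-row size is arbitrary, and `gid_column`, `info_column`, and `ratio` are hypothetical names.

```python
import gzip
import random

rng = random.Random(0)  # fixed seed so the run is repeatable

# A low-cardinality column (values 0..99 repeating) and a column of random
# 32-character hex strings, similar in shape to md5(random()::text).
gid_column = "\n".join(str(i % 100) for i in range(20000)).encode()
info_column = "\n".join("%032x" % rng.getrandbits(128) for _ in range(20000)).encode()


def ratio(raw):
    """Compressed size divided by raw size (smaller means better compression)."""
    return len(gzip.compress(raw)) / len(raw)


gid_ratio = ratio(gid_column)
info_ratio = ratio(info_column)
print(f"gid: {gid_ratio:.3f}  info: {info_ratio:.3f}")
```

The repetitive column shrinks to a small fraction of its raw size, while the random hex strings compress far less, so the gain from choosing a heavier codec for a table like `test` is limited by its `info` column.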

5. Query the metadata of a Parquet file

SELECT * FROM parquet_metadata('test.parquet');    
    
D .mode markdown    
    
D SELECT * FROM parquet_metadata('test.parquet');    
|  file_name   | row_group_id | row_group_num_rows | row_group_num_columns | row_group_bytes | column_id | file_offset | num_values | path_in_schema |    type    |            stats_min             |            stats_max             | stats_null_count | stats_distinct_count |         stats_min_value          |         stats_max_value          | compression  | encodings | index_page_offset | dictionary_page_offset | data_page_offset | total_compressed_size | total_uncompressed_size |    
|--------------|--------------|--------------------|-----------------------|-----------------|-----------|-------------|------------|----------------|------------|----------------------------------|----------------------------------|------------------|----------------------|----------------------------------|----------------------------------|--------------|-----------|-------------------|------------------------|------------------|-----------------------|-------------------------|    
| test.parquet | 0            | 100000             | 2                     | 0               | 0         | 0           | 100000     | column0        | INT32      | 1                                | 100000                           | 0                |                      | 1                                | 100000                           | UNCOMPRESSED |           | 0                 | 0                      | 4                | 400031                | 0                       |    
| test.parquet | 0            | 100000             | 2                     | 0               | 1         | 0           | 100000     | column1        | BYTE_ARRAY | 00008bbd2f4e4f8583e4cfa4c162c34c | ffff91d8cffae19ca6769735c5ae8a74 | 0                |                      | 00008bbd2f4e4f8583e4cfa4c162c34c | ffff91d8cffae19ca6769735c5ae8a74 | UNCOMPRESSED |           | 0                 | 0                      | 400035           | 3600033               | 0                       |    

6. Query the schema (definition) of a Parquet file

-- fetch the column names and column types    
DESCRIBE SELECT * FROM 'test.parquet';    
-- fetch the internal schema of a parquet file    
SELECT * FROM parquet_schema('test.parquet');    
    
    
D DESCRIBE SELECT * FROM 'test.parquet';    
| column_name | column_type | null | key | default | extra |    
|-------------|-------------|------|-----|---------|-------|    
| column0     | INTEGER     | YES  |     |         |       |    
| column1     | VARCHAR     | YES  |     |         |       |    
    
D SELECT * FROM parquet_schema('test.parquet');    
|  file_name   |     name      |    type    | type_length | repetition_type | num_children | converted_type | scale | precision | field_id | logical_type |    
|--------------|---------------|------------|-------------|-----------------|--------------|----------------|-------|-----------|----------|--------------|    
| test.parquet | duckdb_schema | BOOLEAN    | 0           | REQUIRED        | 2            | UTF8           | 0     | 0         | 0        |              |    
| test.parquet | column0       | INT32      | 0           | OPTIONAL        | 0            | INT_32         | 0     | 0         | 0        |              |    
| test.parquet | column1       | BYTE_ARRAY | 0           | OPTIONAL        | 0            | UTF8           | 0     | 0         | 0        |              |    

7. Query the data of a single Parquet file

SELECT * FROM 'test.parquet' limit 10;    
    
D SELECT * FROM 'test.parquet' limit 10;    
| column0 |             column1              |    
|---------|----------------------------------|    
| 1       | 4e87f07aebd72c8dee6dd1e38c81c117 |    
| 2       | c6593071f52d1fc6b209328f44947783 |    
| 3       | 7a0381740c4316beca3277ad803086d9 |    
| 4       | bb42349afadc86a9b984649a26d64083 |    
| 5       | b1135634112bf4aadfeb2992216ea60a |    
| 6       | 86bf92735799e74ea9d397eccdd0c7c8 |    
| 7       | 9ca0ea5e9e0651024592066fdf7288fb |    
| 8       | 247bab9a54195564744bcf229a6061fd |    
| 9       | e11d4ff8845a329b17f52f419b32d2f2 |    
| 10      | 5d6185e29ff76ee5d7261bcf638146a5 |    

8. Query the data of multiple Parquet files (reading multiple files is supported when their schemas are identical)

DuckDB can also read a series of Parquet files and treat them as if they were a single table. Note that this only works if the Parquet files have the same schema. You can specify which Parquet files you want to read using a list parameter, glob pattern matching syntax, or a combination of both.

-- read 3 parquet files and treat them as a single table    
SELECT * FROM read_parquet(['file1.parquet', 'file2.parquet', 'file3.parquet']);    

| Wildcard | Description |
|----------|-------------|
| *        | matches any number of any characters (including none) |
| ?        | matches any single character |
| [abc]    | matches one character given in the bracket |
| [a-z]    | matches one character from the range given in the bracket |

-- read all files that match the glob pattern    
SELECT * FROM read_parquet('test/*.parquet');    
    
    
-- Read all parquet files from 2 specific folders    
SELECT * FROM read_parquet(['folder1/*.parquet','folder2/*.parquet']);    
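
The four wildcards listed above can be tried out with Python's standard `fnmatch` module, which implements the same `*`, `?`, `[abc]`, and `[a-z]` forms. This is a sketch of the pattern semantics only, not of DuckDB's glob implementation; the file names in `paths` are made up, and note that `fnmatch` lets `*` match `/` as well, which a filesystem glob normally does not.

```python
from fnmatch import fnmatchcase

# Hypothetical file listing used only for this illustration.
paths = [
    "test/a.parquet",
    "test/b1.parquet",
    "test/b2.parquet",
    "test/notes.csv",
    "other/a.parquet",
]


def match(pattern):
    """Return the paths that match a shell-style pattern (case-sensitive)."""
    return [p for p in paths if fnmatchcase(p, pattern)]


all_in_test = match("test/*.parquet")        # * : any run of characters
single_char = match("test/b?.parquet")       # ? : exactly one character
bracket_set = match("test/[ab].parquet")     # [ab] : one character from the set
bracket_range = match("test/b[0-9].parquet") # [0-9] : one character from the range

print(all_in_test)
print(single_char)
print(bracket_set)
print(bracket_range)
```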

9. Insert Parquet data into a local table

-- insert the data from the parquet file in the table    
INSERT INTO people SELECT * FROM read_parquet('test.parquet');    
    
-- create a table directly from a parquet file    
CREATE TABLE people AS SELECT * FROM read_parquet('test.parquet');    
  
-- or  
COPY tbl FROM 'input.parquet' (FORMAT PARQUET);   

10. Create a view over a Parquet file

-- create a view over the parquet file    
CREATE VIEW people AS SELECT * FROM read_parquet('test.parquet');    
    
-- query the parquet file    
SELECT * FROM people;    

11. Inspect the execution plan (observe filter and projection pushdown)

D explain SELECT * FROM 'test.parquet' limit 10;    
    
┌─────────────────────────────┐    
│┌───────────────────────────┐│    
││       Physical Plan       ││    
│└───────────────────────────┘│    
└─────────────────────────────┘    
┌───────────────────────────┐    
│           LIMIT           │    
└─────────────┬─────────────┘                                 
┌─────────────┴─────────────┐    
│        PARQUET_SCAN       │    
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │    
│          column0          │    
│          column1          │    
└───────────────────────────┘                                 
D explain SELECT column1 FROM 'test.parquet' limit 10;    
    
┌─────────────────────────────┐    
│┌───────────────────────────┐│    
││       Physical Plan       ││    
│└───────────────────────────┘│    
└─────────────────────────────┘    
┌───────────────────────────┐    
│           LIMIT           │    
└─────────────┬─────────────┘                                 
┌─────────────┴─────────────┐    
│        PARQUET_SCAN       │    
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │    
│          column1          │    
└───────────────────────────┘     
    
    
D explain SELECT column1 FROM 'test.parquet' where column1='test';    
    
┌─────────────────────────────┐    
│┌───────────────────────────┐│    
││       Physical Plan       ││    
│└───────────────────────────┘│    
└─────────────────────────────┘    
┌───────────────────────────┐    
│        EMPTY_RESULT       │    
└───────────────────────────┘                                 
D explain SELECT column1 FROM 'test.parquet' where column1='b1135634112bf4aadfeb2992216ea60a';    
    
┌─────────────────────────────┐    
│┌───────────────────────────┐│    
││       Physical Plan       ││    
│└───────────────────────────┘│    
└─────────────────────────────┘    
┌───────────────────────────┐    
│        PARQUET_SCAN       │    
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │    
│          column1          │    
│   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   │    
│      Filters: column1     │    
│=b1135634112bf4aadfeb2...  │    
│     column1 IS NOT NULL   │    
└───────────────────────────┘      
