digoal
2024-05-06
PostgreSQL , PolarDB , DuckDB , bridge , data lake , fdw , crunchydata
DuckDB is genuinely pleasant for developers. For example, if your data sits in OSS, you can query it without creating any tables, and DuckDB automatically parses content in a variety of formats.
Writing local data out to external files or to OSS in csv, json, parquet, and other formats is just as convenient. And it is not only convenient: the performance is excellent too. Here are some examples:
-- read all files with a name ending in ".csv" in the folder "dir"
SELECT * FROM 'dir/*.csv';
-- read all files with a name ending in ".csv", two directories deep
SELECT * FROM '*/*/*.csv';
-- read all files with a name ending in ".csv", at any depth in the folder "dir"
SELECT * FROM 'dir/**/*.csv';
-- read the CSV files 'flights1.csv' and 'flights2.csv'
SELECT * FROM read_csv(['flights1.csv', 'flights2.csv']);
-- read the CSV files 'flights1.csv' and 'flights2.csv', unifying schemas by name and outputting a `filename` column
SELECT * FROM read_csv(['flights1.csv', 'flights2.csv'], union_by_name = true, filename = true);
-- read a single Parquet file
SELECT * FROM 'test.parquet';
-- figure out which columns/types are in a Parquet file
DESCRIBE SELECT * FROM 'test.parquet';
-- create a table from a Parquet file
CREATE TABLE test AS SELECT * FROM 'test.parquet';
-- if the file does not end in ".parquet", use the read_parquet function
SELECT * FROM read_parquet('test.parq');
-- use list parameter to read 3 Parquet files and treat them as a single table
SELECT * FROM read_parquet(['file1.parquet', 'file2.parquet', 'file3.parquet']);
-- read all files that match the glob pattern
SELECT * FROM 'test/*.parquet';
-- read all files that match the glob pattern, and include a "filename" column
-- that specifies which file each row came from
SELECT * FROM read_parquet('test/*.parquet', filename = true);
-- use a list of globs to read all Parquet files from 2 specific folders
SELECT * FROM read_parquet(['folder1/*.parquet', 'folder2/*.parquet']);
-- read over https
SELECT * FROM read_parquet('https://some.url/some_file.parquet');
-- query the metadata of a Parquet file
SELECT * FROM parquet_metadata('test.parquet');
-- query the schema of a Parquet file
SELECT * FROM parquet_schema('test.parquet');
-- write the results of a query to a Parquet file using the default compression (Snappy)
COPY
(SELECT * FROM tbl)
TO 'result-snappy.parquet'
(FORMAT 'parquet');
-- write the results from a query to a Parquet file with specific compression and row group size
COPY
(FROM generate_series(100_000))
TO 'test.parquet'
(FORMAT 'parquet', COMPRESSION 'zstd', ROW_GROUP_SIZE 100_000);
-- export the table contents of the entire database as parquet
EXPORT DATABASE 'target_directory' (FORMAT PARQUET);
-- read a Parquet file directly from S3
SELECT *
FROM 's3://my-bucket/file.parquet';
-- write a table to S3 as a Hive-partitioned Parquet dataset
COPY tbl TO 's3://my-bucket/partitioned' (
FORMAT PARQUET,
PARTITION_BY (part_col_a, part_col_b)
);
Smooth, isn't it? Even crunchydata, the company where PostgreSQL heavyweight Tom Lane works, is borrowing DuckDB's ideas: some features of crunchydata's cloud product Bridge are not an exact copy, but the developer experience they deliver is heading in the same direction.
They make the OLTP database blend more tightly with data lake products and object storage.
https://www.crunchydata.com/blog/crunchy-bridge-for-analytics-your-data-lake-in-postgresql
Here are some examples from crunchydata.
No need to specify column names and types! For Parquet files, if you leave the column definitions empty, the schema is inferred directly from the file metadata.
-- create a table from a Parquet file, column definitions can be empty
create foreign table hits ()
server crunchy_lake_analytics
options (path 's3://mybucket/hits.parquet');
\d hits
┌───────────────────────┬──────────────────────────┬───────────┬──────────┬─────────┐
│ Column │ Type │ Collation │ Nullable │ Default │
├───────────────────────┼──────────────────────────┼───────────┼──────────┼─────────┤
│ watchid │ bigint │ │ │ │
│ javaenable │ smallint │ │ │ │
│ title │ text │ │ │ │
│ goodevent │ smallint │ │ │ │
...
-- count ~100M rows
select count(*) from hits;
┌──────────┐
│ count │
├──────────┤
│ 99997497 │
└──────────┘
(1 row)
Time: 55.530 ms
You can use wildcards in the path (e.g. s3://mybucket/hits/*.parquet) to query a whole list of files.
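For instance, a single foreign table can span every matching file under a prefix. A minimal sketch following the crunchy_lake_analytics syntax above (the bucket layout and table name are hypothetical):
-- one foreign table over all Parquet files under a prefix;
-- the schema is inferred from the files' metadata, as described above
create foreign table hits_all ()
server crunchy_lake_analytics
options (path 's3://mybucket/hits/*.parquet');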
Bridge for Analytics uses range requests to speed up queries on Parquet files. In the background, files are also cached automatically on NVMe drives for better performance; once the download completes, queries get even faster.
Example:
-- Run a query on a ~100M row Parquet file in S3
select AdvEngineID, count(*) from hits where AdvEngineID <> 0
group by 1 order by 2 desc limit 5;
┌─────────────┬────────┐
│ advengineid │ count │
├─────────────┼────────┤
│ 2 │ 404602 │
│ 27 │ 113167 │
│ 13 │ 45631 │
│ 45 │ 38960 │
│ 44 │ 9730 │
└─────────────┴────────┘
(5 rows)
Time: 317.460 ms
Queries use a combination of the PostgreSQL executor and an analytics query engine. That way every SQL query is supported (including joins with regular PostgreSQL tables), but queries that cannot be pushed down to the analytics engine may run slower. You are most likely to see performance benefits with Parquet, thanks to its columnar format and compression.
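For example, a join between the foreign table and an ordinary heap table is plain SQL. A minimal sketch, assuming a hypothetical dimension table adv_engines (not from the original post):
-- hypothetical dimension table, stored as a regular PostgreSQL heap table
create table adv_engines (advengineid int primary key, name text);
-- the join itself runs in the PostgreSQL executor, so it works even though
-- it cannot be pushed down to the analytics engine in its entirety
select e.name, count(*)
from hits h
join adv_engines e using (advengineid)
where advengineid <> 0
group by 1 order by 2 desc limit 5;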
-- Add a file to the cache, or wait for background caching to be done.
select * from crunchy_file_cache.add('s3://mybucket/hits.parquet');
-- Run a query on a ~100M row Parquet file in cache
select AdvEngineID, count(*) from hits where AdvEngineID <> 0
group by 1 order by 2 desc limit 5;
┌─────────────┬────────┐
│ advengineid │ count │
├─────────────┼────────┤
│ 2 │ 404602 │
│ 27 │ 113167 │
│ 13 │ 45631 │
│ 45 │ 38960 │
│ 44 │ 9730 │
└─────────────┴────────┘
(5 rows)
Time: 90.109 ms
To make import and export easy, crunchydata modified the copy and create table commands (via an extension) to accept URLs and new formats. You can use copy .. to/from 's3://…' and specify a format (csv, parquet, json) and a compression (none, gzip, zstd, snappy). By default, the format and compression are inferred from the file extension. You can also load a file directly in a create table statement with the load_from option, or use definition_from if you only need the column definitions.
Do the operations below look familiar? They are strikingly similar to DuckDB, and just as smooth.
-- Create a temporary table from compressed JSON
create temp table log_input ()
with (load_from = 's3://mybucket/logs/20240101.json', compression = 'zstd');
-- Alternatively, only infer columns and load data separately using the copy command
create temp table log_input ()
with (definition_from = 's3://mybucket/logs/20240101.json', compression = 'zstd');
copy log_input from 's3://mybucket/logs/20240101.json' WITH (compression 'zstd');
copy log_input from 's3://mybucket/logs/20240102.json' WITH (compression 'zstd');
-- Clean the input and insert into a heap table
insert into log_errors
select event_time, code, message from log_input where level = 'ERROR';
-- Export query result to a Parquet file, compressed using snappy by default
copy (
select date_trunc('minute', event_time), code, count(*)
from log_errors where event_time between '2024-01-01' and '2024-01-02'
group by 1, 2
) to 's3://mybucket/summaries/log_errors/20240101.parquet';
Even psql's client-side syntactic sugar has been updated:
$ psql
-- Import a compressed newline-delimited JSON file from local disk
\copy data from '/tmp/data.json.gz' with (format 'json', compression 'gzip')
-- Export a Parquet file to local disk
\copy data to '/tmp/data.parquet' with (format 'parquet')
-- Note: always specify format & compression when using \copy in psql, because the
-- local file extension is not visible to the server.
Combined with the pg_cron extension, you can periodically archive historical data to object storage such as OSS, stored in Parquet format. A sketch of that idea follows.
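This is a minimal sketch, assuming pg_cron's standard cron.schedule() API and the COPY-to-S3 syntax shown above (table name, bucket, and schedule are hypothetical):
-- every night at 03:00, dump the previous day's rows to object storage as Parquet
select cron.schedule(
  'archive-log-errors',
  '0 3 * * *',
  $$copy (
      select * from log_errors
      where event_time >= date_trunc('day', now()) - interval '1 day'
        and event_time <  date_trunc('day', now())
    ) to 's3://mybucket/archive/log_errors.parquet'$$
);
-- a real job would generate dated file names, e.g. from a plpgsql wrapper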
A recap of the Bridge for Analytics sample syntax:
- Create an analytics table from Parquet/CSV/JSON files in object storage:
CREATE FOREIGN TABLE data () SERVER crunchy_lake_analytics OPTIONS (path 's3://mybucket/data/*.parquet');
- Create a regular table from a file and load the data immediately:
CREATE TABLE data () WITH (load_from = 's3://mybucket/data.csv.gz');
- Create a regular table whose columns are based on a file:
CREATE TABLE data () WITH (definition_from = 's3://mybucket/data.json');
- Load data into a regular table:
COPY data FROM 's3://mybucket/data.parquet';
- Export a table to a file:
COPY data TO 's3://mybucket/data.csv.zst' WITH (header);
- Save a query result to a file:
COPY (SELECT * FROM data JOIN dim USING (id)) TO 's3://mybucket/joined.json.gz';
- Import/export local files in Parquet and JSON formats:
\copy data TO 'data.parquet' WITH (format 'parquet')
The goal of Bridge for Analytics is to give you a Swiss Army knife for interacting with your data lake: work with your local data alongside Parquet (uncompressed/gzip/zstd/snappy), CSV (uncompressed/gzip/zstd), and newline-delimited JSON (uncompressed/gzip/zstd).