Skip to content

Latest commit

 

History

History
257 lines (203 loc) · 9.86 KB

20240423_01.md

File metadata and controls

257 lines (203 loc) · 9.86 KB

也太好用了, 让PG拥有DuckDB csv解析和copy的灵活性

作者

digoal

日期

2024-04-23

标签

PostgreSQL , PolarDB , DuckDB , sniff_csv , csv , 元数据格式 , 错误记录


背景

PostgreSQL COPY 的问题备受诟病:

  • 1、虽然COPY导入速度非常快, 但是一旦遇到任何错误都会报错, 所有导入的数据都会被回滚. 所以COPY在迁移、ETL等场景中, 带来了非常多的不便利.
  • 2、不支持自动解析csv文件的格式: 是否第一行为头信息, 每个字段的类型, 分隔符, 换行符, 逃逸字符, quote字符等. 导入的时候也会增加复杂度, 需要人为分析.

详细可以参考我之前的吐槽:

直到17版本, 增加了copy跳过错误的能力, 但是并不会把错误记录下来, 意味着我们要定位错误的行非常麻烦.

在从文件导入数据库这个工作中DuckDB就方便多了.

那么我们有没有办法在PostgreSQL中利用duckdb的能力呢? 答案是有的, 使用duckdb_fdw插件:

例子

csv文件样本 /tmp/a.csv

Pedro,31  
Oogie Boogie, three  

1、创建duckdb_fdw插件

create extension duckdb_fdw ;   
  
CREATE SERVER duckdb_server FOREIGN DATA WRAPPER duckdb_fdw OPTIONS (database ':memory:');   
  
alter server duckdb_server options ( keep_connections 'true');     

2、解析csv文件格式

SELECT duckdb_execute('duckdb_server',       
$$      
create table a_csv_parser as select * FROM sniff_csv('/tmp/a.csv', sample_size = 1000);     
$$);     

将解析结果导入本地

IMPORT FOREIGN SCHEMA public limit to (a_csv_parser) FROM SERVER    
duckdb_server INTO public;  

查询解析结果prompt:

postgres=# select "Prompt" from a_csv_parser ;  
-[ RECORD 1 ]----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------  
Prompt | FROM read_csv('/tmp/a.csv', auto_detect=false, delim=',', quote='"', escape='"', new_line='\n', skip=0, header=true, columns={'Pedro': 'VARCHAR', '31': 'VARCHAR'}, sample_size=1000);  

3、跳过错误行导入, 并将错误行记录下来

-- 创建csv本地表  
create table t (n name, age int);   
-- 清理  
SELECT duckdb_execute('duckdb_server',       
$$      
drop view v;   
drop table rejects_table;   
$$);   
-- 使用duckdb 视图导入csv  
SELECT duckdb_execute('duckdb_server',       
$$      
create view v as   
select *   
FROM read_csv(   
  '/tmp/a.csv',  
  columns = {'name': 'VARCHAR', 'age': 'INTEGER'},  
  rejects_table = 'rejects_table',  
  ignore_errors = true  
);  
$$);   
-- 定义外表  
IMPORT FOREIGN SCHEMA public limit to (v) FROM SERVER    
duckdb_server INTO public;  
-- 导入csv本地表  
insert into t select * from v;  

3、查看错误记录

-- 创建外部表, 查看错误记录  
create foreign TABLE rejects_table(  
scan_id text,  
line text,    
column_idx text,  
column_name text,  
error_type text,  
csv_line text,  
error_message text  
)  
  SERVER duckdb_server OPTIONS (table 'rejects_table');  

注意每次扫描都会被记录下来:

postgres=# select * from rejects_table;  
 scan_id | line | column_idx | column_name | error_type |      csv_line       |                                   error_message                                      
---------+------+------------+-------------+------------+---------------------+------------------------------------------------------------------------------------  
 184     | 2    | 2          | age         | CAST       | Oogie Boogie, three | Error when converting column "age". Could not convert string " three" to 'INTEGER'  
 193     | 2    | 2          | age         | CAST       | Oogie Boogie, three | Error when converting column "age". Could not convert string " three" to 'INTEGER'  
(2 rows)  

注意, 如果查询foreign table时遇到类似以下错误, 可能是类型定义不正确, 把foreign table的类型改成text就可以.

ERROR:  invalid input syntax for type =1, column type =3  

4、DuckDB甚至支持文件通配符, 还支持不同schema的文件导入到同一个表, 例如一些文件中缺失某些字段内容.

https://duckdb.org/docs/data/csv/tips#provide-names-if-the-file-does-not-contain-a-header

The union_by_name option can be used to unify the schema of files that have different or missing columns. For files that do not have certain columns, NULL values are filled in.

SELECT * FROM read_csv('flights*.csv', union_by_name = true);  

例子

csv文件样本 /tmp/a.csv

name, age  
Pedro,31    
Oogie Boogie, three    

csv文件样本 /tmp/a0.csv

name, age, desc  
Pedro,31  , good  
Oogie Boogie, three , very good    

csv文件样本 /tmp/a1.csv

name, mail, desc  
Pedro,a@b.com  , good  
Oogie Boogie, three@b.com , very good    
root@364a16ddb485:/tmp# su - postgres  
postgres@364a16ddb485:~$ ./duckdb   
v0.10.2 1601d94f94  
Enter ".help" for usage hints.  
Connected to a transient in-memory database.  
Use ".open FILENAME" to reopen on a persistent database.  
D FROM read_csv(     
    '/tmp/a*.csv',    
    ignore_errors = true,  
    union_by_name = true   
  );    
┌──────────────┬──────────┬──────────────┬───────────────┐  
│     name     │   age    │     desc     │     mail      │  
│   varchar    │ varchar  │   varchar    │    varchar    │  
├──────────────┼──────────┼──────────────┼───────────────┤  
│ Pedro        │ 31       │              │               │  
│ Oogie Boogie │  three   │              │               │  
│ Pedro        │ 31       │  good        │               │  
│ Oogie Boogie │  three   │  very good   │               │  
│ Pedro        │          │  good        │ a@b.com       │  
│ Oogie Boogie │          │  very good   │  three@b.com  │  
└──────────────┴──────────┴──────────────┴───────────────┘  

PG 白嫖这个功能:

-- 使用duckdb 视图导入csv    
SELECT duckdb_execute('duckdb_server',         
$$        
create view v1 as     
select *     
FROM read_csv(     
  '/tmp/a*.csv',    
  ignore_errors = true,  
  union_by_name = true   
);    
$$);     
-- 定义外表    
IMPORT FOREIGN SCHEMA public limit to (v1) FROM SERVER      
duckdb_server INTO public;    
postgres=# select * from v1;  
     name     |   age    |     desc     |     mail        
--------------+----------+--------------+---------------  
 Pedro        | 31       |              |   
 Oogie Boogie |  three   |              |   
 Pedro        | 31       |  good        |   
 Oogie Boogie |  three   |  very good   |   
 Pedro        |          |  good        | a@b.com    
 Oogie Boogie |          |  very good   |  three@b.com   
(6 rows)  

digoal's wechat