Skip to content

Latest commit

 

History

History
470 lines (364 loc) · 29.8 KB

20220302_01.md

File metadata and controls

470 lines (364 loc) · 29.8 KB

PostgreSQL + FDW + vector 插件加速向量检索 - 在不确定世界寻找确定答案 (例如图像相似)

作者

digoal

日期

2022-03-02

标签

PostgreSQL , 开源


背景

向量: 简单的理解是一个number数组, 非结构化对象(图像、视频、音频等)经过转换可以成为N维的number数组, 包含了非结构化对象的特征, 针对数组可以进行识别、相似等搜索(向量检索).

向量检索属于比较耗费CPU和IO的查询, 即使有索引也如此, 所以首先会想到的是怎么提升扩展性, 例如怎么用多机资源提高请求处理吞吐能力.

本文介绍PostgreSQL postgres_fdw和vector插件的组合使用. 加速图像识别相似搜索, 但是还有一些并行的问题, 建议再和postgresql社区沟通一下fdw的问题. 目前看来性能最好的还是dblink异步调用的方式.

当然, 如果单机用vector插件和向量索引就能满足你的需求, 就不需要看下去了.

  • 需要提醒一下, 一定要看看vector的ivfflat原理, 在造数据时考虑实际数据集的特征, 不要完全随机, 也不要完全一样的数据. 在创建索引时分几个桶、查询时找几个中心点的桶进行筛选, 都需要注意.

其他方法亦可参考: citus, dblink异步, polardb for postgresql(https://github.com/ApsaraDB/PolarDB-for-PostgreSQL).

postgres_fdw 如何实现select并行

https://www.postgresql.org/docs/devel/postgres-fdw.html

1、关于SELECT语句WHERE条件的下推必须符合:

  • they use only data types, operators, and functions that are built-in
  • or belong to an extension that's listed in the foreign server's extensions option.
  • Operators and functions in such clauses must be IMMUTABLE as well.

2、支持fdw分区表

  • 已支持

3、支持fdw分区异步查询, 并行多fdw server同时查询

4、支持分区并行开关

  • 已支持

5、sort 下推

  • 已支持

6、merge sort append

  • 已支持

7、limit 下推 或 fetch_size (默认是100)设置

  • 已支持

例子

部署vector插件

git clone https://github.com/pgvector/pgvector  
  
export PATH=/Users/digoal/pg14/bin:$PATH  
export PGDATA=/Users/digoal/data14  
export PGUSER=postgres  
export PGPORT=1922  
  
USE_PGXS=1 make  
make install  

由于是本机测试, 所以配置一下密码登录认证

pg_hba.conf  
host    all             all             127.0.0.1/32            md5  
  
pg_ctl reload  

创建角色

postgres=# create role test login encrypted password 'test123';  
CREATE ROLE  

创建几个数据库(真实场景使用多机上不同的实例, 本文测试采用同一个实例中不同的database来模拟).

create database db0;  
create database db1;  
create database db2;  
create database db3;  
create database db4;  
postgres=# grant all on database db0,db1,db2,db3,db4 to test;  
GRANT  


\c db0;  
create extension postgres_fdw;  
create extension vector ;  

\c db1  
create extension vector ;  
\c db2  
create extension vector ;  
\c db3  
create extension vector ;  
\c db4  
create extension vector ;  

db0 为查询入口, 所以在db0创建入口表、postgres_fdw插件、vector插件和fdw分区表.

db0=# \c db0 test  
You are now connected to database "db0" as user "test".  
db0=>   
  
CREATE TABLE tbl (id int, c1 vector(32), c2 text, c3 timestamp) PARTITION BY hash (id);  
CREATE INDEX idx_tbl_1 ON tbl USING ivfflat (c1 vector_l2_ops);  
SELECT * FROM tbl ORDER BY c1 <-> '[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' LIMIT 5;  

db1,db2,db3,db4 / test role 登录

CREATE TABLE tbl (id int, c1 vector(32), c2 text, c3 timestamp);  
CREATE INDEX idx_tbl_1 ON tbl USING ivfflat (c1 vector_l2_ops);  

db0 , 创建foreign server 开启异步请求, 设置插件参数(使得op可以下推). 配置user mapping.

\c db0 postgres  
db0=# show port;  
 port   
------  
 1922  
(1 row)  
  
  
create server db1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '127.0.0.1', dbname 'db1', port '1922', async_capable 'true', extensions 'vector', batch_size '200', use_remote_estimate 'true');  
create server db2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '127.0.0.1', dbname 'db2', port '1922', async_capable 'true', extensions 'vector', batch_size '200', use_remote_estimate 'true');  
create server db3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '127.0.0.1', dbname 'db3', port '1922', async_capable 'true', extensions 'vector', batch_size '200', use_remote_estimate 'true');  
create server db4 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '127.0.0.1', dbname 'db4', port '1922', async_capable 'true', extensions 'vector', batch_size '200', use_remote_estimate 'true');  
  
grant all on FOREIGN server db1,db2,db3,db4 to test;  
  
  
CREATE USER MAPPING FOR test SERVER db1 OPTIONS (user 'test', password 'test123');  
CREATE USER MAPPING FOR test SERVER db2 OPTIONS (user 'test', password 'test123');  
CREATE USER MAPPING FOR test SERVER db3 OPTIONS (user 'test', password 'test123');  
CREATE USER MAPPING FOR test SERVER db4 OPTIONS (user 'test', password 'test123');  

db0, 创建fdw分区表, 参数参考postgres_fdw帮助手册, 不一定要完全参照本文, 一定要理解清楚配置的目的(例如返回量大fetch_size也可以调大, 返回量小可以调小).

\c db0 test  
  
CREATE FOREIGN TABLE tbl_0  
    PARTITION OF tbl FOR VALUES WITH ( MODULUS 4, REMAINDER 0)  
    SERVER db1 OPTIONS (schema_name 'public', table_name 'tbl', async_capable 'true', fetch_size '1');  
  
CREATE FOREIGN TABLE tbl_1  
    PARTITION OF tbl FOR VALUES WITH ( MODULUS 4, REMAINDER 1)  
    SERVER db2 OPTIONS (schema_name 'public', table_name 'tbl', async_capable 'true', fetch_size '1');  
  
CREATE FOREIGN TABLE tbl_2  
    PARTITION OF tbl FOR VALUES WITH ( MODULUS 4, REMAINDER 2)  
    SERVER db3 OPTIONS (schema_name 'public', table_name 'tbl', async_capable 'true', fetch_size '1');  
  
CREATE FOREIGN TABLE tbl_3  
    PARTITION OF tbl FOR VALUES WITH ( MODULUS 4, REMAINDER 3)  
    SERVER db4 OPTIONS (schema_name 'public', table_name 'tbl', async_capable 'true', fetch_size '1');  

确认vector的欧式距离计算操作符对应的是immutable函数

\do+  
                                                        List of operators  
 Schema | Name | Left arg type | Right arg type |   Result type    |           Function            |         Description            
--------+------+---------------+----------------+------------------+-------------------------------+------------------------------  
 public | <#>  | vector        | vector         | double precision | vector_negative_inner_product |   
 public | <->  | vector        | vector         | double precision | l2_distance                   |   
 public | <=>  | vector        | vector         | double precision | cosine_distance               |   
  
  
postgres=# \df+ l2_distance  
                                                                               List of functions  
 Schema |    Name     | Result data type | Argument data types | Type | Volatility | Parallel |  Owner   | Security | Access privileges | Language | Source code | Description   
--------+-------------+------------------+---------------------+------+------------+----------+----------+----------+-------------------+----------+-------------+-------------  
 public | l2_distance | double precision | vector, vector      | func | immutable  | safe     | postgres | invoker  |                   | c        | l2_distance |   
(1 row)  

查询一下影响fdw 分区并行的参数是否都开启了.

postgres=# show enable_gathermerge ;  
 enable_gathermerge   
--------------------  
 on  
(1 row)  
  
postgres=# show enable_async_append ;  
 enable_async_append   
---------------------  
 on  
(1 row)  
  
db0=> show enable_parallel_append ;  
 enable_parallel_append   
------------------------  
 on  
(1 row)  

检查执行计划.

db0=> explain (verbose) SELECT * FROM tbl ORDER BY c1 <-> '[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' LIMIT 5;  
                                                                 QUERY PLAN                                                                   
--------------------------------------------------------------------------------------------------------------------------------------------  
 Limit  (cost=400.04..400.36 rows=5 width=84)  
   Output: tbl.id, tbl.c1, tbl.c2, tbl.c3, ((tbl.c1 <-> '[1,2,3]'::vector))  
   ->  Merge Append  (cost=400.04..610.20 rows=3276 width=84)  
         Sort Key: ((tbl.c1 <-> '[1,2,3]'::vector))  
         ->  Foreign Scan on public.tbl_0 tbl_1  (cost=100.00..140.26 rows=819 width=84)  
               Output: tbl_1.id, tbl_1.c1, tbl_1.c2, tbl_1.c3, (tbl_1.c1 <-> '[1,2,3]'::vector)  
               Remote SQL: SELECT id, c1, c2, c3 FROM public.tbl ORDER BY (c1 OPERATOR(public.<->) '[1,2,3]'::public.vector) ASC NULLS LAST  
         ->  Foreign Scan on public.tbl_1 tbl_2  (cost=100.00..140.26 rows=819 width=84)  
               Output: tbl_2.id, tbl_2.c1, tbl_2.c2, tbl_2.c3, (tbl_2.c1 <-> '[1,2,3]'::vector)  
               Remote SQL: SELECT id, c1, c2, c3 FROM public.tbl ORDER BY (c1 OPERATOR(public.<->) '[1,2,3]'::public.vector) ASC NULLS LAST  
         ->  Foreign Scan on public.tbl_2 tbl_3  (cost=100.00..140.26 rows=819 width=84)  
               Output: tbl_3.id, tbl_3.c1, tbl_3.c2, tbl_3.c3, (tbl_3.c1 <-> '[1,2,3]'::vector)  
               Remote SQL: SELECT id, c1, c2, c3 FROM public.tbl ORDER BY (c1 OPERATOR(public.<->) '[1,2,3]'::public.vector) ASC NULLS LAST  
         ->  Foreign Scan on public.tbl_3 tbl_4  (cost=100.00..140.26 rows=819 width=84)  
               Output: tbl_4.id, tbl_4.c1, tbl_4.c2, tbl_4.c3, (tbl_4.c1 <-> '[1,2,3]'::vector)  
               Remote SQL: SELECT id, c1, c2, c3 FROM public.tbl ORDER BY (c1 OPERATOR(public.<->) '[1,2,3]'::public.vector) ASC NULLS LAST  
 Query Identifier: -3107671033622996886  
(17 rows)  
  
  
db0=> explain (verbose) select count(*) from tbl;  
                                          QUERY PLAN                                             
-----------------------------------------------------------------------------------------------  
 Aggregate  (cost=951.95..951.96 rows=1 width=8)  
   Output: count(*)  
   ->  Append  (cost=100.00..917.82 rows=13652 width=0)  
         ->  Async Foreign Scan on public.tbl_0 tbl_1  (cost=100.00..212.39 rows=3413 width=0)  
               Remote SQL: SELECT NULL FROM public.tbl  
         ->  Async Foreign Scan on public.tbl_1 tbl_2  (cost=100.00..212.39 rows=3413 width=0)  
               Remote SQL: SELECT NULL FROM public.tbl  
         ->  Async Foreign Scan on public.tbl_2 tbl_3  (cost=100.00..212.39 rows=3413 width=0)  
               Remote SQL: SELECT NULL FROM public.tbl  
         ->  Async Foreign Scan on public.tbl_3 tbl_4  (cost=100.00..212.39 rows=3413 width=0)  
               Remote SQL: SELECT NULL FROM public.tbl  
 Query Identifier: -7696835127160622742  
(12 rows)  

写入100万数据后再检查一下计划.

db0=> insert into tbl select generate_series(1,1000000), '[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]', 'test', now();  
  
db1=> explain (analyze,verbose) select count(*) from tbl;  
                                                       QUERY PLAN                                                         
------------------------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=8924.86..8924.87 rows=1 width=8) (actual time=51.989..51.991 rows=1 loops=1)  
   Output: count(*)  
   ->  Seq Scan on public.tbl  (cost=0.00..8300.89 rows=249589 width=0) (actual time=0.016..34.891 rows=249589 loops=1)  
         Output: id, c1, c2, c3  
 Query Identifier: 5443052778932622058  
 Planning Time: 2.148 ms  
 Execution Time: 52.154 ms  
(7 rows)  
  
db0=> explain (analyze,verbose) select count(*) from tbl;   
                                                                  QUERY PLAN                                                                     
-----------------------------------------------------------------------------------------------------------------------------------------------  
 Aggregate  (cost=951.95..951.96 rows=1 width=8) (actual time=528.009..528.016 rows=1 loops=1)  
   Output: count(*)  
   ->  Append  (cost=100.00..917.82 rows=13652 width=0) (actual time=6.367..467.490 rows=1000000 loops=1)  
         ->  Async Foreign Scan on public.tbl_0 tbl_1  (cost=100.00..212.39 rows=3413 width=0) (actual time=1.964..85.057 rows=249589 loops=1)  
               Remote SQL: SELECT NULL FROM public.tbl  
         ->  Async Foreign Scan on public.tbl_1 tbl_2  (cost=100.00..212.39 rows=3413 width=0) (actual time=1.430..82.288 rows=250376 loops=1)  
               Remote SQL: SELECT NULL FROM public.tbl  
         ->  Async Foreign Scan on public.tbl_2 tbl_3  (cost=100.00..212.39 rows=3413 width=0) (actual time=1.422..79.067 rows=249786 loops=1)  
               Remote SQL: SELECT NULL FROM public.tbl  
         ->  Async Foreign Scan on public.tbl_3 tbl_4  (cost=100.00..212.39 rows=3413 width=0) (actual time=1.557..76.345 rows=250249 loops=1)  
               Remote SQL: SELECT NULL FROM public.tbl  
 Query Identifier: 6515815319459192952  
 Planning Time: 2.976 ms  
 Execution Time: 582.248 ms  
(14 rows)  

并不是50ms左右完成, 而且count没有下推.

db0=> SELECT * FROM tbl ORDER BY c1 <-> '[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' LIMIT 5;  
 id |                                            c1                                            |  c2  |             c3               
----+------------------------------------------------------------------------------------------+------+----------------------------  
  1 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 12 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 14 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 16 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 17 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
(5 rows)  
  
  
db1=> SELECT * FROM tbl ORDER BY c1 <-> '[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' LIMIT 5;  
 id |                                            c1                                            |  c2  |             c3               
----+------------------------------------------------------------------------------------------+------+----------------------------  
  1 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 12 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 14 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 16 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 17 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
(5 rows)  
  
  
db2=> SELECT * FROM tbl ORDER BY c1 <-> '[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' LIMIT 5;  
 id |                                            c1                                            |  c2  |             c3               
----+------------------------------------------------------------------------------------------+------+----------------------------  
  3 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
  5 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
  8 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
  9 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
 11 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-02 15:55:26.214479  
(5 rows)  

发现了几个问题:

  • 并不是所有请求都用了async foreign scan , 发起请求后需要等待返回第一批结果后再向第二个ftbl发送请求.
\c db1
insert into tbl select generate_series(1,1), '[2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]', 'test', now();  

\c db2
insert into tbl select generate_series(1,1), '[2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]', 'test', now();  


db0=> select * from
(select * from tbl_0
union all                                                                                                                                                                    select * from tbl_1 ) t                                                                                                                                                      order by c1 <-> '[2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' limit 5;
   id   |                                            c1                                            |  c2  |             c3             
--------+------------------------------------------------------------------------------------------+------+----------------------------
      1 | [2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:27:54.132848
 862019 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
 862021 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
 862024 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
 862029 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
(5 rows)

Time: 215.103 ms

db0=> select * from
(select * from tbl_0
union all
select * from tbl_1
union all
select * from tbl_2 ) t
order by c1 <-> '[2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' limit 5;
   id   |                                            c1                                            |  c2  |             c3             
--------+------------------------------------------------------------------------------------------+------+----------------------------
      1 | [2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:27:54.132848
      1 | [2,3,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:28:52.980465
 864133 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
 864134 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
 864136 | [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32] | test | 2022-03-03 11:11:52.261664
(5 rows)

Time: 348.644 ms
  • count虽然是build-in聚合函数, 并且是parallel safe的, 但是没有下推.
db0=> \df+ count
                                                                                                          List of functions
   Schema   | Name  | Result data type | Argument data types | Type | Volatility | Parallel |  Owner   | Security | Access privileges | Language |   Source code   |                           Description                           
------------+-------+------------------+---------------------+------+------------+----------+----------+----------+-------------------+----------+-----------------+-----------------------------------------------------------------
 pg_catalog | count | bigint           |                     | agg  | immutable  | safe     | postgres | invoker  |                   | internal | aggregate_dummy | number of input rows
 pg_catalog | count | bigint           | "any"               | agg  | immutable  | safe     | postgres | invoker  |                   | internal | aggregate_dummy | number of input rows for which the input expression is not null
(2 rows)

希望fdw的异步越来越完善: 《PostgreSQL 15 preview - postgres_fdw 支持更多异步, 增强基于FDW的sharding能力.例如 DML异步写入、分区表、union all、union》

参考

digoal's wechat