Skip to content

Latest commit

 

History

History
412 lines (320 loc) · 15.3 KB

20180903_01.md

File metadata and controls

412 lines (320 loc) · 15.3 KB

PostgreSQL dblink异步调用实践,跑并行多任务 - 例如开N个并行后台任务创建索引, 开N个后台任务跑若干SQL

作者

digoal

日期

2018-09-03

标签

PostgreSQL , 后台任务 , DBLINK 异步调用


背景

使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

例如,结合我前面的一些文字,可以实现自动选择索引接口、指定并行度、指定表空间、给所有字段创建索引。

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

并行后台任务接口实现

接口效果:

select run_sqls_parallel (  
  参数1:并行度,  
  参数2:要执行的SQLs(数组呈现)  
  参数3:连接串  
);  

准备

1、创建普通测试用户

create role test login encrypted password 'test123' createdb;  

2、创建测试库

create database db with template template0 owner test;  

3、确认本地连接为密码认证

vi $PGDATA/pg_hba.conf  
  
  
# IPv4 local connections:  
host    all             all             127.0.0.1/32            md5  
# host    all             all             127.0.0.1/32            trust  
  
  
pg_ctl reload -D $PGDATA  

实现

以下操作使用test用户, 在db库中完成. (创建插件需要超级用户, 其他操作均使用test用户完成)

postgres=# \c db test  
You are now connected to database "db" as user "test".  

1、创建dblink插件

db=> \c db postgres  
You are now connected to database "db" as user "postgres".  
  
db=# create extension dblink;  
CREATE EXTENSION  

2、创建一个建立连接函数,不报错

create or replace function conn(  
  name,   -- dblink名字  
  text    -- 连接串, URL  
) returns void as $$  
declare  
begin  
  perform dblink_connect($1, $2);  
  return;  
exception when others then  
  return;  
end;  
$$ language plpgsql strict;  

3、创建跑多任务的接口函数

create or replace function run_sqls_parallel(  
  parallels int,    -- 并行度  
  sqls text[],      -- 需要执行的SQLs  
  conn_url text    default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database())       -- 连接串  
)  
returns setof record as $$  
declare  
  app_prefix_stat text := md5(random()::text);   -- 用来获取pg_stat_activity的实时内容 (由于pg_stat_activity的函数是stable的,无法在事务中获取到被其他会话变更的内容)  
  app_prefix text := md5(random()::text);        -- application, dblink name prefix  
  i int := 1;       -- 任务ID变量,1累加  
  app_conn_name text;  -- application_name, dblink conn name = app_prefix+i  
  sql text;       -- SQL 元素  
  current_conns int := 0;  -- 当前活跃的异步调用  
begin  
  -- 建立获取实时pg_stat_activity内容连接  
  perform conn(app_prefix_stat,  conn_url||app_prefix_stat);  
  
  foreach sql in array sqls  
  loop  
    -- 当前是否有空闲异步连接  
    select application_name into app_conn_name from  
      dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
    as t(application_name text);  
    -- 有空闲异步连接  
    if found then  
      -- 消耗掉上一次异步连接的结果,否则会报错。  
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);  
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);  
  
      -- 发送异步DBLINK调用  
      perform dblink_send_query(app_conn_name, sql);  
  
    -- 无空闲异步连接  
    else  
      -- 当前已建立的异步连接数  
      select cn into current_conns from  
        dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
      as t(cn int);  
      loop  
        -- 达到并行度  
        if current_conns >= parallels then  
          -- 是否有空闲异步连接  
          select application_name into app_conn_name from  
            dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
          as t(application_name text);  
          -- 有  
          if found then  
            -- 消耗掉上一次异步连接的结果,否则会报错。  
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);  
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);  
  
            -- 发送异步DBLINK调用  
            perform dblink_send_query(app_conn_name, sql);  
  
            -- 退出循环  
            exit;  
          -- 没有,等  
          else  
            perform pg_sleep(1);  
            raise notice 'current running tasks: %, waiting idle conns.', current_conns;  
          end if;  
        -- 未达到并行度  
        else  
          -- 建立连接  
          perform conn(app_prefix||i,  conn_url||app_prefix||i);             -- 建立连接。  
  
          -- 发送异步DBLINK调用  
          perform dblink_send_query(app_prefix||i, sql);  
  
          -- 连接suffix序号 递增  
          i := i+1;  
  
          -- 退出循环  
          exit;  
        end if;  
      end loop;  
    end if;  
  end loop;  
  
  loop  
    -- 当前已建立的异步连接数  
    select cn into current_conns from  
      dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))  
    as t(cn int);  
    if current_conns=0 then  
      raise notice 'whole tasks done.';  
      for app_conn_name in  
        select application_name from  
          dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
        as t(application_name text)  
      loop  
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);  
      end loop;  
      return;  
    else  
      raise notice 'the last % tasks running.', current_conns;  
      perform pg_sleep(1);  
    end if;  
  end loop;  
  
end;  
$$ language plpgsql strict;  

试用

1、运行5条SQL,开2个并行任务

select * from run_sqls_parallel(  
 -- 并行度  
 parallels := 2,  
 -- SQLs: 将并行执行的SQL放到一个数组里面  
 sqls := array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)'],  
 -- 连接串  
 conn_url := format('hostaddr=%s port=%s user=%s dbname=%s password=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database(), 'test123')  
)  
as t(a text);  
  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  the last 2 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel  
-------------------  
  
(1 row)  
  
Time: 30070.275 ms (00:30.070)  

每次调用完任务后, 记得释放空闲的dblink连接, 防止连接被打满.

select dblink_disconnect (n) from unnest(dblink_get_connections ()) n where  dblink_is_busy(n) = 0;  

2、运行10个并行任务,跑6条SQL

postgres=# select * from run_sqls_parallel(  
 -- 并行度  
 10,  
 -- SQLs: 将并行执行的SQL放到一个数组里面  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)'],  
 -- 连接串  
 conn_url := format('hostaddr=%s port=%s user=%s dbname=%s password=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database(), 'test123')  
)  
as t(a text);  
  
NOTICE:  the last 6 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel  
-------------------  
  
(1 row)  
  
Time: 10050.064 ms (00:10.050)  

完全符合预期。

每次调用完任务后, 记得释放空闲的dblink连接, 防止连接被打满.

select dblink_disconnect (n) from unnest(dblink_get_connections ()) n where  dblink_is_busy(n) = 0;  

3、结合前面写的文档,我们如果要创建很多索引,可以使用同样的方法实现并行任务

create table t1(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t2(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t3(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
do language plpgsql $$  
declare  
  tables name[] := array['t1','t2','t3'];     -- 表名  
  n name;   -- 表名  
  x name;   -- 字段名  
  i int;    -- LOOP值  
  sql text;  
  sqls text[];  
  -- tbs name := 'tbs1';    -- 索引表空间  
begin  
  set maintenance_work_mem='1GB';  
  
  foreach n in array tables loop  
    i := 1;  
    for x in select attname from pg_attribute where attrelid=n::regclass and attnum>=1 and not attisdropped  
    loop  
      -- 结合自动选择索引接口(btree,hash,gin,gist等)的功能,可以实现更完美的全字段创建索引  
      -- [《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》](../201706/20170617_01.md)  
      -- [《PostgreSQL SQL自动优化案例 - 极简,自动推荐索引》](../201801/20180111_02.md)  
      -- [《PostgreSQL 快速给指定表每个字段创建索引 - 2 (近乎完美)》](../201809/20180903_03.md)  
      -- sql := format('create index on %s (%s) tablespace %s', n, x, tbs);      -- 封装创建索引的SQL , 指定索引表空间  
      sql := format('create index on %s (%s)', n, x);      -- 封装创建索引的SQL , 不指定索引表空间  
      sqls := array_append(sqls, sql);  
      i:=i+1;  
    end loop;  
  end loop;  
  
  perform * from run_sqls_parallel(  
    parallels := 10,   -- 并行度  
    sqls := sqls,  -- 执行index SQL数组  
    conn_url := format('hostaddr=%s port=%s user=%s dbname=%s password=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database(), 'test123')  
  ) as t(a text);  
  
  foreach n in array tables loop  
    execute format('analyze %s', n);  
  end loop;  
  
  -- 释放空闲的dblink连接, 防止连接被打满.  
  perform dblink_disconnect (unnest) from unnest(dblink_get_connections ()) unnest where  dblink_is_busy(unnest) = 0;  
end;  
$$;  

完全符合预期。

小结

本文使用dblink异步调用的功能,增加了一个API函数,可以用于开启N个并行,跑若干条长SQL,例如用来创建索引非常给力。

接口效果:

select * from run_sqls_parallel (  
  参数1:并行度,  
  参数2:要执行的SQLs(数组呈现)  
  参数3:连接串  
)  
as t(a text);  

参考

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

《PostgreSQL AB表切换最佳实践 - 提高切换成功率,杜绝雪崩 - 珍藏级》

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

digoal's wechat