Skip to content

Latest commit



625 lines (410 loc) · 23.4 KB

File metadata and controls

625 lines (410 loc) · 23.4 KB

PostgreSQL 流式统计 - insert on conflict 实现 流式 UV(distinct), min, max, avg, sum, count ...






PostgreSQL , 流式统计 , insert on conflict , count , avg , min , max , sum


流式统计count, avg, min, max, sum等是一个比较有意思的场景,可用于实时大屏,实时绘制统计图表。


PostgreSQL insert on conflict语法以及rule, trigger的功能,可以实现对数据的实时统计,ECS 56核的性能指标:




实时统计每个SID的value的min, max, sum,以及记录数。


create table tbl (    
  sid int primary key,     
  v1 int,     
  crt_time timestamp,     
  cnt int8 default 1,                       -- 统计值, 默认为1, 等于1时表示第一条记录    
  sum_v float8 default 0,                   -- 统计值, 默认为0      
  min_v float8 default float8 'Infinity',   -- 统计值, 默认设置为这个类型的最大值      
  max_v float8 default float8 '-Infinity'   -- 统计值, 默认设置为这个类型的最小值      


create table tbl_log (    
  sid int,     
  v1 int,     
  crt_time timestamp    

3、流计算算法由insert on conflict SQL完成,如下

insert into tbl (sid, v1, crt_time) values (:sid, :v1, now())     
on conflict (sid) do update set     
  sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end,     
  min_v=least(tbl.min_v, excluded.v1),     
  max_v=greatest(tbl.max_v, excluded.v1)      


vi test.sql    
\set sid random(1,1000000)    
\set v1 random(1,100000000)    
insert into tbl (sid, v1, crt_time) values (:sid, :v1, now()) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);     
insert into tbl_log values (:sid, :v1, now());    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    


postgres=# \timing    
Timing is on.    
postgres=# select sid, count(*), sum(v1), min(v1), max(v1) from tbl_log group by sid order by sid limit 10;    
 sid | count |    sum    |   min    |   max        
   1 |    14 | 740544728 | 11165285 | 90619042    
   2 |    10 | 414224202 |  2813223 | 83077953    
   3 |    11 | 501992396 | 13861878 | 79000001    
   4 |    17 | 902219309 |    23429 | 99312338    
   5 |     6 | 374351692 | 25582424 | 96340616    
   6 |    15 | 649447876 | 12987896 | 80478126    
   7 |     8 | 386687697 | 19697861 | 95097076    
   8 |    12 | 657650588 | 11339236 | 97211546    
   9 |    10 | 594843053 |  9192864 | 97362345    
  10 |     9 | 383123573 |  3877866 | 76604940    
(10 rows)    
Time: 1817.395 ms (00:01.817)    
postgres=# select * from tbl order by sid limit 10;    
 sid |    v1    |          crt_time          | cnt |   sum_v   |  min_v   |  max_v       
   1 | 26479786 | 2017-11-23 20:27:43.134594 |  14 | 740544728 | 11165285 | 90619042    
   2 | 25755108 | 2017-11-23 20:27:43.442651 |  10 | 414224202 |  2813223 | 83077953    
   3 | 51068648 | 2017-11-23 20:27:48.118906 |  11 | 501992396 | 13861878 | 79000001    
   4 | 81160224 | 2017-11-23 20:27:37.183186 |  17 | 902219309 |    23429 | 99312338    
   5 | 70208701 | 2017-11-23 20:27:35.399063 |   6 | 374351692 | 40289886 | 96340616    
   6 | 77536576 | 2017-11-23 20:27:46.04372  |  15 | 649447876 | 12987896 | 80478126    
   7 | 31153753 | 2017-11-23 20:27:46.54858  |   8 | 386687697 | 19697861 | 95097076    
   8 | 11339236 | 2017-11-23 20:27:40.947561 |  12 | 657650588 | 11339236 | 97211546    
   9 | 46103803 | 2017-11-23 20:27:38.450889 |  10 | 594843053 |  9192864 | 92049544    
  10 | 55630877 | 2017-11-23 20:27:28.944168 |   9 | 383123573 |  3877866 | 76604940    
(10 rows)    
Time: 0.512 ms    



vi test.sql    
\set sid random(1,1000000)    
\set v1 random(1,100000000)    
insert into tbl (sid, v1, crt_time) values (:sid, :v1, now()) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);     
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 300    
transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 300 s    
number of transactions actually processed: 57838943    
latency average = 0.166 ms    
latency stddev = 0.057 ms    
tps = 192791.786864 (including connections establishing)    
tps = 192805.650917 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.001  \set sid random(1,1000000)    
         0.000  \set v1 random(1,100000000)    
         0.164  insert into tbl (sid, v1, crt_time) values (:sid, :v1, now()) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);    
top - 20:57:35 up 16 days,  3:44,  2 users,  load average: 8.67, 2.08, 1.68    
Tasks: 497 total,  28 running, 469 sleeping,   0 stopped,   0 zombie    
%Cpu(s): 34.8 us, 13.7 sy,  0.0 ni, 51.3 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st    
KiB Mem : 23094336+total, 79333744 free,  1588292 used, 15002134+buff/cache    
KiB Swap:        0 total,        0 free,        0 used. 22219502+avail Mem     





vi test.sql    
\set sid random(1,1000000)    
\set v1 random(1,100000000)    
insert into tbl (sid, v1, crt_time) select :sid+id, :v1+id, now() from generate_series(1,100) t(id) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);     
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 28 -j 28 -T 300    
scaling factor: 1  
query mode: prepared  
number of clients: 28  
number of threads: 28  
duration: 120 s  
number of transactions actually processed: 1411743  
latency average = 2.380 ms  
latency stddev = 0.815 ms  
tps = 11764.276597 (including connections establishing)  
tps = 11765.715797 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set sid random(1,1000000)    
         0.000  \set v1 random(1,100000000)    
         2.378  insert into tbl (sid, v1, crt_time) select :sid+id, :v1+id, now() from generate_series(1,100) t(id) on conflict (sid) do update set v1=excluded.v1, crt_time=excluded.crt_time, cnt=tbl.cnt+1, sum_v=case tbl.cnt when 1 then tbl.v1+excluded.v1 else tbl.sum_v+excluded.v1 end, min_v=least(tbl.min_v, excluded.v1), max_v=greatest(tbl.max_v, excluded.v1);  
34 processes: 22 running, 11 sleeping, 1 uninterruptable  
CPU states: 28.5% user,  0.0% nice,  6.3% system, 65.0% idle,  0.1% iowait  
Memory: 173G used, 47G free, 247M buffers, 168G cached  
DB activity: 9613 tps,  0 rollbs/s,   0 buffer r/s, 100 hit%, 961050 row r/s, 960824 row w/s  
DB I/O:     0 reads/s,     0 KB/s,     0 writes/s,     0 KB/s    
DB disk: 1455.4 GB total, 441.9 GB free (69% used)  


我们这个测试CASE,单个维度,单表,批量写入,根据CPU剩余(单ECS使用多实例或者使用UNLOGGED TABLE),估算得到的性能应该是:





create table tbl(c1 int, c2 int, c3 int, c4 int, c5 int);    
select c1, count(*) from tbl group by c1;    
select c2,c3, sum(c5) , count(*) from tbl group by c2,c3;    
..... 更多维度    


除了使用pipelinedb,实际上PostgreSQL使用insert on conflict和trigger或rule,也能实现一样的功能。




3、定义维度表的insert on conflict SQL

4、定义明细表trigger或rule,顺序调用insert on conflict 写入多个维度表



create table tbl(c1 int not null, c2 int not null, c3 int not null, c4 int not null, c5 int not null);    


create table cv1_tbl (c1 int primary key, cnt int8 default 1);    
create table cv2_tbl (c2 int, c3 int, c5 int, sum_v float8 default 0, cnt int8 default 1, primary key (c2,c3)) ;     
..... 其他维度    

3、定义维度表的insert on conflict SQL

insert into cv1_tbl (c1) values (NEW.c1) on conflict (c1) do update set cnt=cv1_tbl.cnt+1;    
insert into cv2_tbl (c2,c3,c5) values (NEW.c2, NEW.c3, NEW.c5) on conflict (c2,c3) do update set cnt=cv2_tbl.cnt+1, sum_v=case cv2_tbl.cnt when 1 then cv2_tbl.c5+excluded.c5 else cv2_tbl.sum_v+excluded.c5 end;    

4、定义明细表trigger或rule,顺序调用insert on conflict 写入多个维度表

create rule r1 as on insert to tbl do instead insert into cv1_tbl (c1) values (NEW.c1) on conflict (c1) do update set cnt=cv1_tbl.cnt+1;    
create rule r2 as on insert to tbl do instead insert into cv2_tbl (c2,c3,c5) values (NEW.c2, NEW.c3, NEW.c5) on conflict (c2,c3) do update set cnt=cv2_tbl.cnt+1, sum_v=case cv2_tbl.cnt when 1 then cv2_tbl.c5+excluded.c5 else cv2_tbl.sum_v+excluded.c5 end;    


vi test.sql    
\set c1 random(1,1000000)    
\set c2 random(1,1000000)    
\set c3 random(1,1000000)    
\set c4 random(1,1000000)    
\set c5 random(1,1000000)    
insert into tbl values (:c1, :c2, :c3, :c4, :c5);    
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120    


transaction type: ./test.sql    
scaling factor: 1    
query mode: prepared    
number of clients: 32    
number of threads: 32    
duration: 120 s    
number of transactions actually processed: 18618957    
latency average = 0.206 ms    
latency stddev = 0.212 ms    
tps = 155154.880841 (including connections establishing)    
tps = 155174.283641 (excluding connections establishing)    
script statistics:    
 - statement latencies in milliseconds:    
         0.001  \set c1 random(1,1000000)    
         0.000  \set c2 random(1,1000000)    
         0.000  \set c3 random(1,1000000)    
         0.000  \set c4 random(1,1000000)    
         0.001  \set c5 random(1,1000000)    
         0.203  insert into tbl values (:c1, :c2, :c3, :c4, :c5);    


postgres=# select * from cv2_tbl order by cnt desc limit 10;    
   c2   |   c3   |   c5   |  sum_v  | cnt     
 500568 | 119352 | 173877 |  436710 |   2    
 873168 |  20848 | 730385 | 1688835 |   2    
  90752 | 526912 | 622354 |  734505 |   2    
 273533 | 886999 | 766661 | 1085038 |   2    
 895573 | 466493 | 648095 | 1191965 |   2    
 338402 | 436092 | 940920 | 1372244 |   2    
 915723 | 866856 | 255638 |  947606 |   2    
 586692 | 543596 |  32905 |  996466 |   2    
 839232 | 928197 | 402745 | 1249665 |   2    
 401808 | 997216 | 493644 | 1423618 |   2    
(10 rows)    
postgres=# select * from cv1_tbl order by cnt desc limit 10;    
   c1   | cnt     
 952009 |  44    
 373778 |  43    
 483788 |  42    
  25749 |  42    
  93605 |  41    
 386201 |  41    
 596955 |  40    
 526220 |  40    
  91289 |  40    
 429061 |  40    
(10 rows)    

壳子表没有写入,把rule改成do also的话,就会写入本地表。是不是很爽呢?

postgres=# select * from tbl;    
 c1 | c2 | c3 | c4 | c5     
(0 rows)    






3、定义维度表的insert on conflict SQL

4、定义明细分区表trigger或rule,顺序调用insert on conflict 写入多个维度表







create extension hll;  
create table tbl (grpid int, userid int, dt date, cnt int8 default 1, hll_userid hll default hll_empty(), primary key (grpid, dt));  
insert into tbl (grpid, userid, dt) values () on conflict (grpid, dt) do update set   
  case tbl.cnt   
  when 1 then hll_add(hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid)), hll_hash_integer(excluded.userid))   
  else hll_add(tbl.hll_userid, hll_hash_integer(excluded.userid))  
  end ;  


vi test.sql  
\set grpid random(1,1000000)  
\set userid random(1,1000000000)  
insert into tbl (grpid, userid, dt) values (:grpid,:userid,'2017-11-24') on conflict (grpid, dt) do update set cnt=tbl.cnt+1, hll_userid=case tbl.cnt when 1 then hll_add(hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid)), hll_hash_integer(excluded.userid))   else hll_add(tbl.hll_userid, hll_hash_integer(excluded.userid))  end ;  
transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 28  
number of threads: 28  
duration: 120 s  
number of transactions actually processed: 21713334  
latency average = 0.155 ms  
latency stddev = 0.071 ms  
tps = 180938.313421 (including connections establishing)  
tps = 180959.906404 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set grpid random(1,1000000)  
         0.000  \set userid random(1,1000000000)  
         0.153  insert into tbl (grpid, userid, dt) values (:grpid,:userid,'2017-11-24') on conflict (grpid, dt) do update set cnt=tbl.cnt+1, hll_userid=  case tbl.cnt   when 1 then hll_add(hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid)), hll_hash_integer(excluded.userid))   else hll_add(tbl.hll_userid, hll_hash_integer(excluded.userid))  end ;  


postgres=# select * from tbl limit 10;  
 grpid  |  userid   |     dt     | cnt |                                                                                hll_userid                                                                                  
  71741 | 197976232 | 2017-11-24 |   5 | \x128b7fd534b8dfe5a72bbedd5b6c577ce9fb9fef7835561513628850f173084507f0bd7ed996166036a970  
 801374 | 373207765 | 2017-11-24 |   3 | \x128b7f1dd66eba7e70d9c550284e6d9870994f5f5b52f71f224d6e  
 565216 | 502576520 | 2017-11-24 |   7 | \x128b7f9c4eb2a37de228d8b959a3eb6875033eb9e5dae4c7a7a873037cc095c3f7b01506556992f5aeee9c2a29d4eeb4db71f92ce501619432a864  
  35036 | 868953081 | 2017-11-24 |  10 | \x128b7fa2249c2c7ca51016c477335c6c4e539dd369dd2ea9ab587ce6e3c3c88019dfc33361f5e97ab2db9e3475e0afefc5dc84547c9cc650d2c3ae61b7772ff8a3b36b63bfef7de0eff9f779d598d341edae11  
 950403 | 122708335 | 2017-11-24 |   9 | \x128b7fbb52bc26a18960fec0e5ef0b5d38015dc59f0bad2126d34ce0f19952682a1359257a39cb05a02cf0437f98ce664da1094e8173f33cc1df79547c86939e25bc096179d0a0cfe98b5c  
 173872 | 321068334 | 2017-11-24 |   7 | \x128b7fab5e34d66f513600c19356d876f80d37f13d28f4efc2d6ae0974487c0aa3f5e509affd49827908d35b7c4b009f57ff6376be2b1ea27b1204  
 786334 | 501502479 | 2017-11-24 |   5 | \x128b7f8b5e2d419433c147df779ac0ab34b25a060ecbdd5a896ee229a5ad32a00a060d516c141199609d3f  
 960665 | 855235921 | 2017-11-24 |   7 | \x128b7f95b32567416b5750ecb0c44a76480566f1d98aa6632a3ceeffe5dd8b8de96ffc2447dd5d74e20e993b38a6b242f2c78c678b60d542d68949  
  61741 | 945239318 | 2017-11-24 |   6 | \x128b7f885766f21f40b6b5b3783e764d90fd28c10af4a996cb5dcec8ea749905d0c5cb1de8b191b4f9e6775d597c247710ab71  
postgres=# select grpid,userid,cnt,hll_cardinality(hll_userid) from tbl limit 10;  
 grpid  |  userid   | cnt | hll_cardinality   
 775333 | 642518584 |  13 |              13  
  17670 | 542792727 |  11 |              11  
  30079 | 311255630 |  14 |              14  
  61741 | 945239318 |  10 |              10  
 808051 | 422418318 |  14 |              14  
 620850 | 461130760 |  12 |              12  
 256591 | 415325936 |  15 |              15  
 801374 | 373207765 |   9 |               9  
 314023 | 553568037 |  12 |              12  

HLL插件的知识参考如下: (兼容PostgreSQL 10头文件)

《Greenplum 最佳实践 - 估值插件hll的使用(以及hll分式聚合函数优化)》

《PostgreSQL hll (HyperLogLog) extension for "State of The Art Cardinality Estimation Algorithm" - 3》

《PostgreSQL hll (HyperLogLog) extension for "State of The Art Cardinality Estimation Algorithm" - 2》

《PostgreSQL hll (HyperLogLog) extension for "State of The Art Cardinality Estimation Algorithm" - 1》




create or replace function func1(int, int, date) returns void as $$  
  insert into tbl (grpid, userid, dt) values ($1,$2,$3) on conflict (grpid, dt)   
  do update set   
    case tbl.cnt     
    when 1   
      then hll_add(hll_add(tbl.hll_userid, hll_hash_integer(tbl.userid)), hll_hash_integer(excluded.userid))     
      hll_add(tbl.hll_userid, hll_hash_integer(excluded.userid))    
    end ;  
$$ language plpgsql strict;  




create or replace rule R1 AS on INSERT TO log_table do also XXXXXXXXXXXXXXX;


create or replace rule R1 AS on INSERT TO log_table WHERE (位点条件,如 id>10000000) do also XXXXXXXXXXXXXXX;

这种方法,数据既写log_table,同时又会执行流式统计XXXXX COMMAND。


《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《HTAP数据库 PostgreSQL 场景与性能测试之 22 - (OLTP) merge insert|upsert|insert on conflict|合并写入》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《打造云端流计算、在线业务、数据分析的业务数据闭环 - 阿里云RDS、HybridDB for PostgreSQL最佳实践》

您的愿望将传达给PG kernel hacker、数据库厂商等, 帮助提高数据库产品质量和功能, 说不定下一个PG版本就有您提出的功能点. 针对非常好的提议,奖励限量版PG文化衫、纪念品、贴纸、PG热门书籍等,奖品丰富,快来许愿。开不开森.

digoal's wechat