PostgreSQL Citus 10 released - columnar storage support, open-sourced shard rebalancer, JOINs between reference, distributed, and local tables, and more

Author

digoal

Date

2021-03-07

Tags

PostgreSQL , citus , columnar storage


Background

Citus 10 has been released, adding columnar storage, open-sourcing the shard rebalancer, and supporting JOINs between reference, distributed, and local tables, among other improvements.

For details, see:

https://www.citusdata.com/blog/2021/03/05/citus-10-release-open-source-rebalancer-and-columnar-for-postgres/

https://github.com/citusdata/citus

https://github.com/citusdata/citus/blob/master/src/backend/columnar/README.md

Development on Citus first started around a decade ago and once a year we release a major new Citus open source version. We wanted to make number 10 something special, but I could not have imagined how truly spectacular this release would become. Citus 10 extends Postgres (12 and 13) with many new superpowers:

  • Columnar storage for Postgres: Compress your PostgreSQL and Citus tables to reduce storage cost and speed up your analytical queries.
  • Sharding on a single Citus node: Make your single-node Postgres server ready to scale out by sharding tables locally using Citus.
  • Shard rebalancer in Citus open source: We have open sourced the shard rebalancer so you can easily add Citus nodes and rebalance your cluster.
  • Joins and foreign keys between local PostgreSQL tables and Citus tables: Mix and match PostgreSQL and Citus tables with foreign keys and joins.
  • Functions to change the way your tables are distributed: Redistribute your tables in a single step using new alter table functions.
  • Much more: Better naming, improved SQL & DDL support, simplified operations.

Columnar storage for PostgreSQL

The data sizes of some new Citus users are truly gigantic, which meant we needed a way to lower storage cost and get more out of the hardware. That is why we implemented columnar storage for PostgreSQL as part of the Citus extension. Citus Columnar will give you compression ratios of 3-10x, and even greater I/O reductions. Best part? We have made columnar available as part of the open source Citus 10 release and you can use Citus columnar with or without the Citus scale-out features!

Our team has a long history with columnar storage in PostgreSQL, as we originally developed the cstore_fdw extension which offered columnar storage via the foreign data wrapper (fdw) API. PostgreSQL 12 introduced “table access methods”, which allows extensions to define custom storage formats in a much more native way.

Citus makes columnar storage available in PostgreSQL via the table access method APIs, which means that you can now create columnar tables by simply adding USING columnar when creating a table:

CREATE TABLE order_history (...) USING columnar;  

If you have an existing row-based (“heap”) table that you’d like to convert to columnar, you can do that too, using the alter_table_set_access_method function:

-- compress a table using columnar storage (indexes are dropped)  
SELECT alter_table_set_access_method('orders_2019', 'columnar');  

When you use columnar storage, you will typically see a 60-90% reduction in data size. In addition, Citus will only read the columns used in the SQL query. This can give dramatic speed ups for I/O bound queries, and a big reduction in storage cost.
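For instance, an aggregate that touches only two columns never reads the remaining columns from disk. A minimal sketch against the order_history table above (the order_date and order_total column names are illustrative assumptions, not from the release notes):

-- only order_date and order_total are read from disk; all other
-- columns of the columnar table are skipped entirely
SELECT date_trunc('day', order_date) AS day, sum(order_total)
FROM order_history
GROUP BY 1
ORDER BY 1;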

Compared to cstore_fdw, Citus columnar has a better compression ratio thanks to zstd compression. Citus Columnar also supports rollback, streaming replication, archival, and pg_upgrade.

There are still a few limitations to be aware of: Indexes and update/delete are not supported, and it is best to avoid single-row inserts, since compression only works well in batches. We plan to address these limitations in upcoming Citus releases, but you can also avoid them by using partitioning.

If you partition time series data by time, you can use row-based storage for recent partitions to enable single-row, update/delete/upsert and indexes—while using columnar storage to archive data that is no longer changing. To make this easy, we also added a function to compress all your old partitions in one go:

-- compress all partitions older than 7 days  
CALL alter_old_partitions_set_access_method('orders_partitioned', now() - interval '7 days', 'columnar');  

This procedure commits after every partition to release locks as quickly as possible. You can use pg_cron to run this new function as a nightly compression job.
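A minimal sketch of such a nightly job, assuming pg_cron is installed and a pg_cron version that supports named jobs via cron.schedule (the job name and schedule below are illustrative):

-- run the compression procedure every night at 03:00
SELECT cron.schedule(
    'compress-old-partitions',   -- illustrative job name
    '0 3 * * *',                 -- every day at 03:00
    $$CALL alter_old_partitions_set_access_method('orders_partitioned', now() - interval '7 days', 'columnar')$$
);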

Stay tuned for a Citus 10 Columnar blog post by Jeff Davis describing the new columnar storage access method in detail.

Open sourcing the Citus shard rebalancer

The managed Hyperscale (Citus) service on Azure and its predecessor on AWS used to have a separate extension for rebalancing shards after adding a node, which was not part of the Citus open source repo.

With Citus 10, we have introduced the shard rebalancing functionality into the open source Citus repo. That means that scaling out is now as simple as running 2 SQL queries after you’ve set up a new PostgreSQL server with the Citus extension:

-- add a new worker node  
SELECT citus_add_node('10.0.0.6', 5432);  
  
-- move shards to the new worker node(s) to even out the number of shards per node  
SELECT rebalance_table_shards();  
  
-- or, move shards based on disk size in case they are skewed  
SELECT rebalance_table_shards(rebalance_strategy := 'by_disk_size');  

While a shard is being moved, writes to that shard are blocked, but all reads and writes to other shards can continue. If you are using the Hyperscale (Citus) option in Azure Database for PostgreSQL, we have some extra tricks to make writes to the shards being moved non-blocking as well.

By default, the Citus shard rebalancer comes with 2 rebalance strategies: by_shard_count (the default), which evens out the number of shards per node, and by_disk_size, which evens out the number of bytes per node. You can also create your own rebalance strategies.
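To see which strategies exist (built-in and custom) and to watch a rebalance from another session, you can query the Citus catalogs. A sketch, assuming get_rebalance_progress is available in your Citus version:

-- list the rebalance strategies known to the cluster
SELECT name, default_strategy FROM pg_dist_rebalance_strategy;

-- from a separate session: monitor shard moves while a rebalance runs
SELECT * FROM get_rebalance_progress();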

Joins and foreign keys between PostgreSQL and Citus tables

With the new single-node Citus capabilities and the shard rebalancer, you can be ready to scale out by distributing your tables. However, distributing tables does involve certain trade-offs, such as extra network round trips when querying shards on worker nodes, and a few unsupported SQL features.

If you have a very large Postgres table and a data-intensive workload (e.g. the frequently-queried part of the table exceeds memory), then the performance gains from distributing the table over multiple nodes with Citus will vastly outweigh any downsides. However, if most of your other Postgres tables are small, then you might end up having to make additional changes without much additional benefit.

A simple solution would be to not distribute the smaller tables at all. In most Citus deployments, the application connects to a single coordinator node (which is usually sufficient), and the coordinator is a fully functional PostgreSQL server. That means you could organize your database as follows:

  • convert large tables into Citus distributed tables,
  • convert smaller tables that frequently JOIN with distributed tables into reference tables,
  • convert smaller tables that have foreign keys from distributed tables into reference tables,
  • keep all other tables as regular PostgreSQL tables local to the coordinator.

Diagram 2: Example of a data model where the really large table (clicks) is distributed. Because the Clicks table has a foreign key to Ads, we turn Ads into a reference table. Ads also has foreign keys to other tables, but we can keep those other tables (Campaigns, Publishers, Advertisers) as local tables on the coordinator.

That way, you can scale out CPU, memory, and I/O where you need it, and minimize application changes and other trade-offs where you don’t. To make this model work seamlessly, Citus 10 adds support for 2 important features:

  • foreign keys between local tables and reference tables
  • direct joins between local tables and distributed tables

With these new features, you can mix and match PostgreSQL tables and Citus tables to get the best of both worlds without having to separate them in your data model.
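A minimal sketch of this layout, using the table names from Diagram 2 (the column names and the choice of distribution column are illustrative assumptions, not taken from the release notes):

-- the very large table is distributed across the worker nodes
SELECT create_distributed_table('clicks', 'ad_id');

-- the smaller table that clicks references is replicated to every node
SELECT create_reference_table('ads');

-- campaigns stays a regular local table on the coordinator;
-- foreign keys between reference tables and local tables work in Citus 10
ALTER TABLE ads ADD FOREIGN KEY (campaign_id) REFERENCES campaigns (id);

-- direct joins between distributed, reference, and local tables also work
SELECT campaigns.name, count(*)
FROM clicks
JOIN ads       ON clicks.ad_id = ads.id
JOIN campaigns ON ads.campaign_id = campaigns.id
GROUP BY campaigns.name;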

Using columnar storage

Introduction

Citus Columnar offers a per-table option for columnar storage to
reduce IO requirements through compression and projection pushdown.

Design Trade-Offs

Existing PostgreSQL row tables work well for OLTP:

  • Support UPDATE/DELETE efficiently
  • Efficient single-tuple lookups

The Citus Columnar tables work best for analytic or DW workloads:

  • Compression
  • Doesn't read unnecessary columns
  • Efficient VACUUM

Next generation of cstore_fdw

Citus Columnar is the next generation of
cstore_fdw.

Benefits of Citus Columnar over cstore_fdw:

  • Citus Columnar is based on the Table Access Method API, which
    allows it to behave exactly like an ordinary heap (row) table for
    most operations.
  • Supports Write-Ahead Log (WAL).
  • Supports ROLLBACK.
  • Supports physical replication.
  • Supports recovery, including Point-In-Time Restore (PITR).
  • Supports pg_dump and pg_upgrade without the need for special
    options or extra steps.
  • Better user experience; simple USING clause.
  • Supports more features that work on ordinary heap (row) tables.

Limitations

  • Append-only (no UPDATE/DELETE support)
  • No space reclamation (e.g. rolled-back transactions may still
    consume disk space)
  • No index support, index scans, or bitmap index scans
  • No TID scans
  • No sample scans
  • No TOAST support (large values supported inline)
  • No support for ON CONFLICT
    statements (except DO NOTHING actions with no target specified).
  • No support for tuple locks (SELECT ... FOR SHARE, SELECT ... FOR UPDATE)
  • No support for serializable isolation level
  • Support for PostgreSQL server versions 12+ only
  • No support for foreign keys, unique constraints, or exclusion
    constraints
  • No support for logical decoding
  • No support for intra-node parallel scans
  • No support for AFTER ... FOR EACH ROW triggers
  • No UNLOGGED columnar tables
  • No TEMPORARY columnar tables

Future iterations will incrementally lift the limitations listed above.

User Experience

Create a Columnar table by specifying USING columnar when creating
the table.

CREATE TABLE my_columnar_table  
(  
    id INT,  
    i1 INT,  
    i2 INT8,  
    n NUMERIC,  
    t TEXT  
) USING columnar;  

Insert data into the table and read from it like normal (subject to
the limitations listed above).

To see internal statistics about the table, use VACUUM VERBOSE. Note that VACUUM (without FULL) is much faster on a
columnar table, because it scans only the metadata, and not the actual
data.
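For example, with the table defined above (the generated values are purely illustrative):

-- batched inserts compress much better than single-row inserts
INSERT INTO my_columnar_table (id, i1, i2, n, t)
SELECT g, g % 10, g * 2, g / 3.0, 'value ' || g
FROM generate_series(1, 100000) g;

-- read it back like a normal table
SELECT count(*), avg(n) FROM my_columnar_table;

-- show per-table columnar statistics (stripes, chunks, compression rate)
VACUUM VERBOSE my_columnar_table;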

Options

Set options using:

alter_columnar_table_set(  
    relid REGCLASS,  
    chunk_group_row_limit INT4 DEFAULT NULL,  
    stripe_row_limit INT4 DEFAULT NULL,  
    compression NAME DEFAULT NULL,  
    compression_level INT4)  

For example:

SELECT alter_columnar_table_set(  
    'my_columnar_table',  
    compression => 'none',  
    stripe_row_limit => 10000);  

The following options are available:

  • compression: [none|pglz|zstd|lz4|lz4hc] - set the compression type
    for newly-inserted data. Existing data will not be
    recompressed/decompressed. The default value is zstd (if support
    has been compiled in).
  • compression_level: <integer> - Sets compression level. Valid
    settings are from 1 through 19. If the compression method does not
    support the level chosen, the closest level will be selected
    instead.
  • stripe_row_limit: <integer> - the maximum number of rows per
    stripe for newly-inserted data. Existing stripes of data will not
    be changed and may have more rows than this maximum value. The
    default value is 150000.
  • chunk_group_row_limit: <integer> - the maximum number of rows per
    chunk for newly-inserted data. Existing chunks of data will not be
    changed and may have more rows than this maximum value. The default
    value is 10000.
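For example, to favor maximum compression for rarely-read archive data (a sketch; whether zstd support is compiled in depends on how Citus was built, and the exact values are illustrative):

SELECT alter_columnar_table_set(
    'my_columnar_table',
    compression => 'zstd',
    compression_level => 19,
    stripe_row_limit => 300000);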

View options for all tables with:

SELECT * FROM columnar.options;  

You can also adjust options with a SET command of one of the
following GUCs:

  • columnar.compression
  • columnar.compression_level
  • columnar.stripe_row_limit
  • columnar.chunk_group_row_limit

GUCs only affect newly-created tables, not any newly-created
stripes on an existing table.
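For example (a sketch; the archived_events table name is illustrative, and the GUC values only change the defaults picked up by tables created afterwards in this session):

SET columnar.compression TO 'pglz';
SET columnar.stripe_row_limit TO 100000;

-- this table is created with the session defaults set above
CREATE TABLE archived_events (LIKE my_columnar_table) USING columnar;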

Partitioning

Columnar tables can be used as partitions; and a partitioned table may
be made up of any combination of row and columnar partitions.

CREATE TABLE parent(ts timestamptz, i int, n numeric, s text)  
  PARTITION BY RANGE (ts);  
  
-- columnar partition  
CREATE TABLE p0 PARTITION OF parent  
  FOR VALUES FROM ('2020-01-01') TO ('2020-02-01')  
  USING COLUMNAR;  
-- columnar partition  
CREATE TABLE p1 PARTITION OF parent  
  FOR VALUES FROM ('2020-02-01') TO ('2020-03-01')  
  USING COLUMNAR;  
-- row partition  
CREATE TABLE p2 PARTITION OF parent  
  FOR VALUES FROM ('2020-03-01') TO ('2020-04-01');  
  
INSERT INTO parent VALUES ('2020-01-15', 10, 100, 'one thousand'); -- columnar  
INSERT INTO parent VALUES ('2020-02-15', 20, 200, 'two thousand'); -- columnar  
INSERT INTO parent VALUES ('2020-03-15', 30, 300, 'three thousand'); -- row  

When performing operations on a partitioned table with a mix of row
and columnar partitions, take note of the following behaviors for
operations that are supported on row tables but not columnar
(e.g. UPDATE, DELETE, tuple locks, etc.):

  • If the operation is targeted at a specific row partition
    (e.g. UPDATE p2 SET i = i + 1), it will succeed; if targeted at
    a specified columnar partition (e.g. UPDATE p1 SET i = i + 1),
    it will fail.
  • If the operation is targeted at the partitioned table and has a
    WHERE clause that excludes all columnar partitions
    (e.g. UPDATE parent SET i = i + 1 WHERE ts = '2020-03-15'), it
    will succeed.
  • If the operation is targeted at the partitioned table, but does not
    exclude all columnar partitions, it will fail; even if the actual
    data to be updated only affects row tables (e.g. UPDATE parent SET i = i + 1 WHERE n = 300).

Because columnar tables do not support indexes, it's impossible to
create indexes on the partitioned table if some partitions are
columnar. Instead, you must create indexes on the individual row
partitions. Similarly for constraints that require indexes, e.g.:

CREATE INDEX p2_ts_idx ON p2 (ts);  
CREATE UNIQUE INDEX p2_i_unique ON p2 (i);  
ALTER TABLE p2 ADD UNIQUE (n);  

Converting Between Row and Columnar

Note: ensure that you understand any advanced features that may be
used with the table before converting it (e.g. row-level security,
storage options, constraints, inheritance, etc.), and ensure that they
are reproduced in the new table or partition appropriately. LIKE,
used below, is a shorthand that works only in simple cases.

CREATE TABLE my_table(i INT8 DEFAULT '7');  
INSERT INTO my_table VALUES(1);  
-- convert to columnar  
SELECT alter_table_set_access_method('my_table', 'columnar');  
-- back to row  
SELECT alter_table_set_access_method('my_table', 'heap');  

Performance Microbenchmark

Important: This microbenchmark is not intended to represent any real
workload. Compression ratios, and therefore performance, will depend
heavily on the specific workload. This is only for the purpose of
illustrating a "columnar friendly" contrived workload that showcases
the benefits of columnar.

Schema

CREATE TABLE perf_row(  
    id INT8,  
    ts TIMESTAMPTZ,  
    customer_id INT8,  
    vendor_id INT8,  
    name TEXT,  
    description TEXT,  
    value NUMERIC,  
    quantity INT4  
);  
  
CREATE TABLE perf_columnar(LIKE perf_row) USING COLUMNAR;  

Data

CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE plpython2u AS $$  
import random  
t = ''  
words = ['zero','one','two','three','four','five','six','seven','eight','nine','ten']  
for i in xrange(0,n):  
  if (i != 0):  
    t += ' '  
  r = random.randint(0,len(words)-1)  
  t += words[r]  
return t  
$$;  
INSERT INTO perf_row  
   SELECT  
    g, -- id  
    '2020-01-01'::timestamptz + ('1 minute'::interval * g), -- ts  
    (random() * 1000000)::INT4, -- customer_id  
    (random() * 100)::INT4, -- vendor_id  
    random_words(7), -- name  
    random_words(100), -- description  
    (random() * 100000)::INT4/100.0, -- value  
    (random() * 100)::INT4 -- quantity  
   FROM generate_series(1,75000000) g;  
  
INSERT INTO perf_columnar SELECT * FROM perf_row;  

Compression Ratio

=> SELECT pg_total_relation_size('perf_row')::numeric/pg_total_relation_size('perf_columnar') AS compression_ratio;  
 compression_ratio  
--------------------  
 5.3958044063457513  
(1 row)  

The overall compression ratio of the columnar table, versus the same data
stored with row storage, is 5.4X.

=> VACUUM VERBOSE perf_columnar;  
INFO:  statistics for "perf_columnar":  
storage id: 10000000000  
total file size: 8761368576, total data size: 8734266196  
compression rate: 5.01x  
total row count: 75000000, stripe count: 500, average rows per stripe: 150000  
chunk count: 60000, containing data for dropped columns: 0, zstd compressed: 60000  

VACUUM VERBOSE reports a smaller compression ratio, because it
only averages the compression ratio of the individual chunks, and does
not account for the metadata savings of the columnar format.

System

  • Azure VM: Standard D2s v3 (2 vcpus, 8 GiB memory)
  • Linux (ubuntu 18.04)
  • Data Drive: Standard HDD (512GB, 500 IOPS Max, 60 MB/s Max)
  • PostgreSQL 13 (--with-llvm, --with-python)
  • shared_buffers = 128MB
  • max_parallel_workers_per_gather = 0
  • jit = on

Note: because this was run on a system with enough physical memory to
hold a substantial fraction of the table, the IO benefits of columnar
won't be entirely realized by the query runtime unless the data size
is substantially increased.

Query

-- OFFSET 1000 so that no rows are returned, and we collect only timings  
  
SELECT vendor_id, SUM(quantity) FROM perf_row GROUP BY vendor_id OFFSET 1000;  
SELECT vendor_id, SUM(quantity) FROM perf_row GROUP BY vendor_id OFFSET 1000;  
SELECT vendor_id, SUM(quantity) FROM perf_row GROUP BY vendor_id OFFSET 1000;  
SELECT vendor_id, SUM(quantity) FROM perf_columnar GROUP BY vendor_id OFFSET 1000;  
SELECT vendor_id, SUM(quantity) FROM perf_columnar GROUP BY vendor_id OFFSET 1000;  
SELECT vendor_id, SUM(quantity) FROM perf_columnar GROUP BY vendor_id OFFSET 1000;  

Timing (median of three runs):

  • row: 436s
  • columnar: 16s
  • speedup: 27X
