Skip to content

Commit

Permalink
Rebalance strategies (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonels-msft committed Mar 2, 2020
1 parent 6590e05 commit 8c3685d
Show file tree
Hide file tree
Showing 3 changed files with 253 additions and 2 deletions.
7 changes: 6 additions & 1 deletion admin_guide/cluster_management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,14 @@ Rebalance Shards without Downtime

If you want to move existing shards to a newly added worker, Citus Enterprise provides a :ref:`rebalance_table_shards` function to make it easier. This function will move the shards of a given table to distribute them evenly among the workers.

The function is configurable to rebalance shards according to a number of
strategies, to best match your database workload. See the function reference to
learn which strategy to choose. Here's an example of rebalancing shards using
the default strategy:

.. code-block:: postgresql
SELECT rebalance_table_shards('github_events');
SELECT rebalance_table_shards();
Many products, like multi-tenant SaaS applications, cannot tolerate downtime, and Citus rebalancing is able to honor this requirement on PostgreSQL 10 or above. This means reads and writes from the application can continue with minimal interruption while data is being moved.

Expand Down
125 changes: 125 additions & 0 deletions develop/api_metadata.rst
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,131 @@ The pg_dist_colocation table contains information about which tables' shards sho
2 | 32 | 2 | 20
(1 row)

.. _pg_dist_rebalance_strategy:

Rebalancer strategy table
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. note::

The pg_dist_rebalance_strategy table is a part of Citus Enterprise. Please `contact us <https://www.citusdata.com/about/contact_us>`_ to obtain this functionality.

This table defines strategies that :ref:`rebalance_table_shards` can use to determine where to move shards.

+--------------------------------+----------------------+---------------------------------------------------------------------------+
| Name | Type | Description |
+================================+======================+===========================================================================+
| name | name | | Unique name for the strategy |
+--------------------------------+----------------------+---------------------------------------------------------------------------+
| default_strategy | boolean | | Whether :ref:`rebalance_table_shards` should choose this strategy by |
| | | | default. Use :ref:`citus_set_default_rebalance_strategy` to update |
| | | | this column |
+--------------------------------+----------------------+---------------------------------------------------------------------------+
| shard_cost_function | regproc | | Identifier for a cost function, which must take a shardid as bigint, |
| | | | and return its notion of a cost, as type real |
+--------------------------------+----------------------+---------------------------------------------------------------------------+
| node_capacity_function | regproc | | Identifier for a capacity function, which must take a nodeid as int, |
| | | | and return its notion of node capacity as type real |
+--------------------------------+----------------------+---------------------------------------------------------------------------+
| shard_allowed_on_node_function | regproc | | Identifier for a function that given shardid bigint, and nodeidarg int, |
| | | | returns boolean for whether the shard is allowed to be stored on the |
| | | | node |
+--------------------------------+----------------------+---------------------------------------------------------------------------+
| default_threshold | float4 | | Threshold for deeming a node too full or too empty, which determines |
| | | | when the rebalance_table_shards should try to move shards |
+--------------------------------+----------------------+---------------------------------------------------------------------------+
| minimum_threshold | float4 | | A safeguard to prevent the threshold argument of |
| | | | rebalance_table_shards() from being set too low |
+--------------------------------+----------------------+---------------------------------------------------------------------------+

A Citus installation ships with these strategies in the table:

.. code-block:: postgres
SELECT * FROM pg_dist_rebalance_strategy;
::

-[ RECORD 1 ]-------------------+-----------------------------------
Name | by_shard_count
default_strategy | true
shard_cost_function | citus_shard_cost_1
node_capacity_function | citus_node_capacity_1
shard_allowed_on_node_function | citus_shard_allowed_on_node_true
default_threshold | 0
minimum_threshold | 0
-[ RECORD 2 ]-------------------+-----------------------------------
Name | by_disk_size
default_strategy | false
shard_cost_function | citus_shard_cost_by_disk_size
node_capacity_function | citus_node_capacity_1
shard_allowed_on_node_function | citus_shard_allowed_on_node_true
default_threshold | 0.1
minimum_threshold | 0.01

The default strategy, ``by_shard_count``, assigns every shard the same cost. Its effect is to equalize the shard count across nodes. The other predefined strategy, ``by_disk_size``, assigns a cost to each shard matching its disk size in bytes plus that of the shards that are colocated with it. The disk size is calculated using ``pg_total_relation_size``, so it includes indices. This strategy attempts to achieve the same disk space on every node. Note the threshold of 0.1 -- it prevents unnecessary shard movement caused by insigificant differences in disk space.

.. _custom_rebalancer_strategies:

Creating custom rebalancer strategies
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

Here are examples of functions that can be used within new shard rebalancer strategies, and registered in the :ref:`pg_dist_rebalance_strategy` with the :ref:`citus_add_rebalance_strategy` function.

* Setting a node capacity exception by hostname pattern:

.. code-block:: postgres
-- example of node_capacity_function
CREATE FUNCTION v2_node_double_capacity(nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodename LIKE '%.v2.worker.citusdata.com' THEN 2 ELSE 1 END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
* Rebalancing by number of queries that go to a shard, as measured by the :ref:`citus_stat_statements`:

.. code-block:: postgres
-- example of shard_cost_function
CREATE FUNCTION cost_of_shard_by_number_of_queries(shardid bigint)
RETURNS real AS $$
SELECT coalesce(sum(calls)::real, 0.001) as shard_total_queries
FROM citus_stat_statements
WHERE partition_key is not null
AND get_shard_id_for_distribution_column('tab', partition_key) = shardid;
$$ LANGUAGE sql;
* Isolating a specific shard (10000) on a node (address '10.0.0.1'):

.. code-block:: postgres
-- example of shard_allowed_on_node_function
CREATE FUNCTION isolate_shard_10000_on_10_0_0_1(shardid bigint, nodeidarg int)
RETURNS boolean AS $$
SELECT
(CASE WHEN nodename = '10.0.0.1' THEN shardid = 10000 ELSE shardid != 10000 END)
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
-- The next two definitions are recommended in combination with the above function.
-- This way the average utilization of nodes is not impacted by the isolated shard.
CREATE FUNCTION no_capacity_for_10_0_0_1(nodeidarg int)
RETURNS real AS $$
SELECT
(CASE WHEN nodename = '10.0.0.1' THEN 0 ELSE 1 END)::real
FROM pg_dist_node where nodeid = nodeidarg
$$ LANGUAGE sql;
CREATE FUNCTION no_cost_for_10000(shardid bigint)
RETURNS real AS $$
SELECT
(CASE WHEN shardid = 10000 THEN 0 ELSE 1 END)::real
$$ LANGUAGE sql;
.. _citus_stat_statements:

Query statistics table
Expand Down
123 changes: 122 additions & 1 deletion develop/api_udf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,32 @@ $$$$$$$$$$$$$$$$$$$$$$$$$$$
.. note::
The rebalance_table_shards function is a part of Citus Enterprise. Please `contact us <https://www.citusdata.com/about/contact_us>`_ to obtain this functionality.

The rebalance_table_shards() function moves shards of the given table to make them evenly distributed among the workers. The function first calculates the list of moves it needs to make in order to ensure that the cluster is balanced within the given threshold. Then, it moves shard placements one by one from the source node to the destination node and updates the corresponding shard metadata to reflect the move.
The rebalance_table_shards() function moves shards of the given table to make
them evenly distributed among the workers. The function first calculates the
list of moves it needs to make in order to ensure that the cluster is balanced
within the given threshold. Then, it moves shard placements one by one from the
source node to the destination node and updates the corresponding shard
metadata to reflect the move.

Every shard is assigned a cost when determining whether shards are "evenly
distributed." By default each shard has the same cost (a value of 1), so
distributing to equalize the cost across workers is the same as equalizing the
number of shards on each. The constant cost strategy is called "by_shard_count"
and is the default rebalancing strategy.

The default strategy is appropriate under these circumstances:

1. The shards are roughly the same size
2. The shards get roughly the same amount of traffic
3. Worker nodes are all the same size/type
4. Shards haven't been pinned to particular workers

If any of these assumptions don't hold, then the default rebalancing can result
in a bad plan. In this case you may customize the strategy, using the
``rebalance_strategy`` parameter.

It's advisable to call :ref:`get_rebalance_table_shards_plan` before running
rebalance_table_shards, to see and verify the actions to be performed.

Arguments
**************************
Expand All @@ -1052,6 +1077,8 @@ Arguments

**drain_only:** (Optional) When true, move shards off worker nodes who have ``shouldhaveshards`` set to false in :ref:`pg_dist_node`; move no other shards.

**rebalance_strategy:** (Optional) the name of a strategy in :ref:`pg_dist_rebalance_strategy`. If this argument is omitted, the function chooses the default strategy, as indicated in the table.

Return Value
*********************************

Expand All @@ -1072,6 +1099,41 @@ This example usage will attempt to rebalance the github_events table without mov
SELECT rebalance_table_shards('github_events', excluded_shard_list:='{1,2}');
.. _get_rebalance_table_shards_plan:

get_rebalance_table_shards_plan
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

.. note::
The get_rebalance_table_shards_plan function is a part of Citus Enterprise. Please `contact us <https://www.citusdata.com/about/contact_us>`_ to obtain this functionality.

Output the planned shard movements of :ref:`rebalance_table_shards` without
performing them. While it's unlikely, get_rebalance_table_shards_plan can
output a slightly different plan than what a rebalance_table_shards call with
the same arguments will do. This could happen because they are not executed at
the same time, so facts about the cluster -- e.g. disk space -- might differ
between the calls.

Arguments
**************************

The same arguments as rebalance_table_shards: relation, threshold,
max_shard_moves, excluded_shard_list, and drain_only. See documentation of that
function for the arguments' meaning.

Return Value
*********************************

Tuples containing these columns:

* **table_name**: The table whose shards would move
* **shardid**: The shard in question
* **shard_size**: Size in bytes
* **sourcename**: Hostname of the source node
* **sourceport**: Port of the source node
* **targetname**: Hostname of the destination node
* **targetport**: Port of the destination node

.. _get_rebalance_progress:

get_rebalance_progress
Expand Down Expand Up @@ -1121,6 +1183,63 @@ Example
│ 7083 │ foo │ 102019 │ 8192 │ n3.foobar.com │ 5432 │ n4.foobar.com │ 5432 │ 2 │
└───────────┴────────────┴─────────┴────────────┴───────────────┴────────────┴───────────────┴────────────┴──────────┘

.. _citus_add_rebalance_strategy:

citus_add_rebalance_strategy
$$$$$$$$$$$$$$$$$$$$$$$$$$$$

Append a row to the ``pg_dist_rebalance_strategy``.

Arguments
**************************

For more about these arguments, see the corresponding column values in :ref:`pg_dist_rebalance_strategy`.

**name:** identifier for the new strategy

**shard_cost_function:** identifies the function used to determine the "cost" of each shard

**node_capacity_function:** identifies the function to measure node capacity

**shard_allowed_on_node_function:** identifies the function which determines which shards can be placed on which nodes

**default_threshold:** a floating point threshold that tunes how precisely the cumulative shard cost should be balanced between nodes

**minimum_threshold:** (Optional) a safeguard column that holds the minimum value allowed for the threshold argument of rebalance_table_shards(). Its default value is 0

Return Value
*********************************

N/A

.. _citus_set_default_rebalance_strategy:

citus_set_default_rebalance_strategy
$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$

.. note::
The citus_set_default_rebalance_strategy function is a part of Citus Enterprise. Please `contact us <https://www.citusdata.com/about/contact_us>`_ to obtain this functionality.

Update the :ref:`pg_dist_rebalance_strategy` table, changing the strategy named
by its argument to be the default chosen when rebalancing shards.

Arguments
**************************

**name:** the name of the strategy in pg_dist_rebalance_strategy

Return Value
*********************************

N/A

Example
**************************

.. code-block:: postgresql
SELECT citus_set_default_rebalance_strategy('by_disk_size');
.. _master_drain_node:

master_drain_node
Expand All @@ -1144,6 +1263,8 @@ Arguments
* ``force_logical``: Use logical replication even if the table doesn't have a replica identity. Any concurrent update/delete statements to the table will fail during replication.
* ``block_writes``: Use COPY (blocking writes) for tables lacking primary key or replica identity.

**rebalance_strategy:** (Optional) the name of a strategy in :ref:`pg_dist_rebalance_strategy`. If this argument is omitted, the function chooses the default strategy, as indicated in the table.

Return Value
*********************************

Expand Down

0 comments on commit 8c3685d

Please sign in to comment.