Integrate DiskObjectStorage with IO Scheduler #47009

Merged: 22 commits, Sep 11, 2023
4 changes: 4 additions & 0 deletions docs/en/engines/table-engines/mergetree-family/mergetree.md
@@ -1139,6 +1139,8 @@ Optional parameters:
- `s3_max_put_burst` — Max number of requests that can be issued simultaneously before hitting the request-per-second limit. By default (`0` value) it equals `s3_max_put_rps`.
- `s3_max_get_rps` — Maximum GET requests per second rate before throttling. Default value is `0` (unlimited).
- `s3_max_get_burst` — Max number of requests that can be issued simultaneously before hitting the request-per-second limit. By default (`0` value) it equals `s3_max_get_rps`.
- `read_resource` — Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of read requests to this disk. Default value is an empty string (IO scheduling is not enabled for this disk).
- `write_resource` — Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of write requests to this disk. Default value is an empty string (IO scheduling is not enabled for this disk). A minimal sketch follows this list.
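
A minimal sketch of how these two parameters sit inside a disk definition, assuming resources named `network_read` and `network_write` are declared in the server configuration (see the workload scheduling page for a complete example):

```xml
<s3>
    <type>s3</type>
    <!-- ... other disk parameters ... -->
    <read_resource>network_read</read_resource>
    <write_resource>network_write</write_resource>
</s3>
```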

### Configuring the cache

@@ -1251,6 +1253,8 @@ Other parameters:
* `cache_enabled` - Allows caching of mark and index files on the local FS. Default value is `true`.
* `cache_path` - Path on the local FS where cached mark and index files are stored. Default value is `/var/lib/clickhouse/disks/<disk_name>/cache/`.
* `skip_access_check` - If true, disk access checks will not be performed on disk start-up. Default value is `false`.
* `read_resource` — Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of read requests to this disk. Default value is an empty string (IO scheduling is not enabled for this disk).
* `write_resource` — Resource name to be used for [scheduling](/docs/en/operations/workload-scheduling.md) of write requests to this disk. Default value is an empty string (IO scheduling is not enabled for this disk).

Examples of working configurations can be found in the integration tests directory (see e.g. [test_merge_tree_azure_blob_storage](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_merge_tree_azure_blob_storage/configs/config.d/storage_conf.xml) or [test_azure_blob_storage_zero_copy_replication](https://github.com/ClickHouse/ClickHouse/blob/master/tests/integration/test_azure_blob_storage_zero_copy_replication/configs/config.d/storage_conf.xml)).

64 changes: 64 additions & 0 deletions docs/en/operations/system-tables/scheduler.md
@@ -0,0 +1,64 @@
---
slug: /en/operations/system-tables/scheduler
---
# scheduler

Contains information and status for [scheduling nodes](/docs/en/operations/workload-scheduling.md/#hierarchy) residing on the local server.
This table can be used for monitoring. The table contains a row for every scheduling node.

Example:

``` sql
SELECT *
FROM system.scheduler
WHERE resource = 'network_read' AND path = '/prio/fair/prod'
FORMAT Vertical
```

``` text
Row 1:
──────
resource: network_read
path: /prio/fair/prod
type: fifo
weight: 5
priority: 0
is_active: 0
active_children: 0
dequeued_requests: 67
dequeued_cost: 4692272
busy_periods: 63
vruntime: 938454.1999999989
system_vruntime: ᴺᵁᴸᴸ
queue_length: 0
queue_cost: 0
budget: -60524
is_satisfied: ᴺᵁᴸᴸ
inflight_requests: ᴺᵁᴸᴸ
inflight_cost: ᴺᵁᴸᴸ
max_requests: ᴺᵁᴸᴸ
max_cost: ᴺᵁᴸᴸ
```

Columns:

- `resource` (`String`) - Resource name
- `path` (`String`) - Path to a scheduling node within this resource scheduling hierarchy
- `type` (`String`) - Type of a scheduling node.
- `weight` (`Float64`) - Weight of a node, used by a parent node of the `fair` type.
- `priority` (`Int64`) - Priority of a node, used by a parent node of the `priority` type (lower value means higher priority).
- `is_active` (`UInt8`) - Whether this node is currently active, i.e. has resource requests to be dequeued and constraints satisfied.
- `active_children` (`UInt64`) - The number of children in active state.
- `dequeued_requests` (`UInt64`) - The total number of resource requests dequeued from this node.
- `dequeued_cost` (`UInt64`) - The sum of costs (e.g. size in bytes) of all requests dequeued from this node.
- `busy_periods` (`UInt64`) - The total number of deactivations of this node.
- `vruntime` (`Nullable(Float64)`) - For children of `fair` nodes only. Virtual runtime of a node used by the SFQ algorithm to select the next child to process in a max-min fair manner.
- `system_vruntime` (`Nullable(Float64)`) - For `fair` nodes only. Virtual runtime showing `vruntime` of the last processed resource request. Used during child activation as the new value of `vruntime`.
- `queue_length` (`Nullable(UInt64)`) - For `fifo` nodes only. Current number of resource requests residing in the queue.
- `queue_cost` (`Nullable(UInt64)`) - For `fifo` nodes only. Sum of costs (e.g. size in bytes) of all requests residing in the queue.
- `budget` (`Nullable(Int64)`) - For `fifo` nodes only. The number of available "cost units" for new resource requests. Can appear in case of a discrepancy between the estimated and real costs of resource requests (e.g. after a read/write failure).
- `is_satisfied` (`Nullable(UInt8)`) - For constraint nodes only (e.g. `inflight_limit`). Equals `1` if all the constraints of this node are satisfied.
- `inflight_requests` (`Nullable(Int64)`) - For `inflight_limit` nodes only. The number of resource requests dequeued from this node that are currently in the consumption state.
- `inflight_cost` (`Nullable(Int64)`) - For `inflight_limit` nodes only. The sum of costs (e.g. bytes) of all resource requests dequeued from this node that are currently in the consumption state.
- `max_requests` (`Nullable(Int64)`) - For `inflight_limit` nodes only. Upper limit for `inflight_requests` leading to constraint violation.
- `max_cost` (`Nullable(Int64)`) - For `inflight_limit` nodes only. Upper limit for `inflight_cost` leading to constraint violation.
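
For instance, a sketch of a monitoring query that spots saturated queues by looking only at `fifo` leaves currently holding requests (resource and node names depend on your configuration):

``` sql
SELECT resource, path, queue_length, queue_cost
FROM system.scheduler
WHERE type = 'fifo' AND queue_length > 0
ORDER BY queue_cost DESC
```
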
153 changes: 153 additions & 0 deletions docs/en/operations/workload-scheduling.md
@@ -0,0 +1,153 @@
---
slug: /en/operations/workload-scheduling
sidebar_position: 69
sidebar_label: "Workload scheduling"
title: "Workload scheduling"
---

When ClickHouse executes multiple queries simultaneously, they may be using shared resources (e.g. disks). Scheduling constraints and policies can be applied to regulate how resources are utilized and shared between different workloads. For every resource a scheduling hierarchy can be configured. The hierarchy root represents a resource, while leaves are queues holding requests that exceed resource capacity.

:::note
Currently, only remote disk IO can be scheduled using the described method. For CPU scheduling, see settings about thread pools and [`concurrent_threads_soft_limit_num`](server-configuration-parameters/settings.md#concurrent_threads_soft_limit_num). For flexible memory limits, see [Memory overcommit](settings/memory-overcommit.md).
:::

## Disk configuration {#disk-config}

To enable IO scheduling for a specific disk, you have to specify `read_resource` and/or `write_resource` in the storage configuration. This tells ClickHouse which resource should be used for every read and write request on the given disk. Read and write resources can refer to the same resource name, which is useful for local SSDs or HDDs. Multiple different disks can also refer to the same resource, which is useful for remote disks: for example, if you want to allow fair division of network bandwidth between "production" and "development" workloads (a sketch with two disks sharing one resource follows the example below).

Example:
```xml
<clickhouse>
<storage_configuration>
...
<disks>
<s3>
<type>s3</type>
<endpoint>https://clickhouse-public-datasets.s3.amazonaws.com/my-bucket/root-path/</endpoint>
<access_key_id>your_access_key_id</access_key_id>
<secret_access_key>your_secret_access_key</secret_access_key>
<read_resource>network_read</read_resource>
<write_resource>network_write</write_resource>
</s3>
</disks>
<policies>
<s3_main>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3_main>
</policies>
</storage_configuration>
</clickhouse>
```
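
A sketch of the case mentioned above where several remote disks share the same resources, so that their traffic is scheduled within one hierarchy (disk names and endpoints are illustrative; credentials omitted):

```xml
<disks>
    <s3_hot>
        <type>s3</type>
        <endpoint>https://s3.us-east-1.amazonaws.com/bucket-hot/root/</endpoint>
        <read_resource>network_read</read_resource>
        <write_resource>network_write</write_resource>
    </s3_hot>
    <s3_cold>
        <type>s3</type>
        <endpoint>https://s3.us-east-1.amazonaws.com/bucket-cold/root/</endpoint>
        <read_resource>network_read</read_resource>
        <write_resource>network_write</write_resource>
    </s3_cold>
</disks>
```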

## Workload markup {#workload_markup}

Queries can be marked with the setting `workload` to distinguish between workloads. If `workload` is not set, the value "default" is used. Note that you are able to specify another value using settings profiles. Setting constraints can be used to make `workload` constant if you want all queries from a user to be marked with a fixed value of the `workload` setting (a profile sketch follows the query example below).

Let's consider an example of a system with two different workloads: "production" and "development".

```sql
SELECT count() FROM my_table WHERE value = 42 SETTINGS workload = 'production'
SELECT count() FROM my_table WHERE value = 13 SETTINGS workload = 'development'
```
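
A sketch of a settings profile (in `users.xml`) that pins the `workload` value with a constraint, so that every query of a user assigned this profile is marked as "production"; the profile name is illustrative:

```xml
<clickhouse>
    <profiles>
        <production_profile>
            <workload>production</workload>
            <constraints>
                <workload>
                    <readonly/>
                </workload>
            </constraints>
        </production_profile>
    </profiles>
</clickhouse>
```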

## Resource scheduling hierarchy {#hierarchy}

From the standpoint of the scheduling subsystem, a resource represents a hierarchy of scheduling nodes.

```mermaid
graph TD
subgraph network_read
nr_root(("/"))
-->|100 concurrent requests| nr_fair("fair")
-->|75% bandwidth| nr_prod["prod"]
nr_fair
-->|25% bandwidth| nr_dev["dev"]
end

subgraph network_write
nw_root(("/"))
-->|100 concurrent requests| nw_fair("fair")
-->|75% bandwidth| nw_prod["prod"]
nw_fair
-->|25% bandwidth| nw_dev["dev"]
end
```

**Possible node types:**
* `inflight_limit` (constraint) - blocks if either the number of concurrent in-flight requests exceeds `max_requests` or their total cost exceeds `max_cost`; must have a single child.
* `fair` (policy) - selects the next request to serve from one of its child nodes according to max-min fairness; child nodes can specify `weight` (default is 1).
* `priority` (policy) - selects the next request to serve from one of its child nodes according to static priorities (lower value means higher priority); child nodes can specify `priority` (default is 0).
* `fifo` (queue) - leaf of the hierarchy capable of holding requests that exceed resource capacity.

The following example shows how to define the IO scheduling hierarchies shown in the diagram above:

```xml
<clickhouse>
<resources>
<network_read>
<node path="/">
<type>inflight_limit</type>
<max_requests>100</max_requests>
</node>
<node path="/fair">
<type>fair</type>
</node>
<node path="/fair/prod">
<type>fifo</type>
<weight>3</weight>
</node>
<node path="/fair/dev">
<type>fifo</type>
</node>
</network_read>
<network_write>
<node path="/">
<type>inflight_limit</type>
<max_requests>100</max_requests>
</node>
<node path="/fair">
<type>fair</type>
</node>
<node path="/fair/prod">
<type>fifo</type>
<weight>3</weight>
</node>
<node path="/fair/dev">
<type>fifo</type>
</node>
</network_write>
</resources>
</clickhouse>
```

## Workload classifiers {#workload_classifiers}

Workload classifiers are used to define the mapping from the `workload` specified by a query to the leaf queues that should be used for specific resources. At the moment, workload classification is simple: only static mapping is available.

Example:
```xml
<clickhouse>
<workload_classifiers>
<production>
<network_read>/fair/prod</network_read>
<network_write>/fair/prod</network_write>
</production>
<development>
<network_read>/fair/dev</network_read>
<network_write>/fair/dev</network_write>
</development>
<default>
<network_read>/fair/dev</network_read>
<network_write>/fair/dev</network_write>
</default>
</workload_classifiers>
</clickhouse>
```
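
To check that the classifier routes requests as intended, one can run a query marked with `workload = 'production'` and then inspect the corresponding leaf queues; a sketch using the hierarchy defined above:

```sql
SELECT resource, path, dequeued_requests, dequeued_cost, busy_periods
FROM system.scheduler
WHERE path IN ('/fair/prod', '/fair/dev')
```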


## See also
- [system.scheduler](/docs/en/operations/system-tables/scheduler.md)
8 changes: 8 additions & 0 deletions src/Common/CurrentThread.cpp
@@ -97,6 +97,14 @@ ThreadGroupPtr CurrentThread::getGroup()
return current_thread->getThreadGroup();
}

ContextPtr CurrentThread::getQueryContext()
{
if (unlikely(!current_thread))
return {};

return current_thread->getQueryContext();
}

std::string_view CurrentThread::getQueryId()
{
if (unlikely(!current_thread))
4 changes: 4 additions & 0 deletions src/Common/CurrentThread.h
@@ -86,6 +86,10 @@ class CurrentThread
static void finalizePerformanceCounters();

/// Returns a non-empty string if the thread is attached to a query

/// Returns attached query context
static ContextPtr getQueryContext();

static std::string_view getQueryId();

/// Initializes query with current thread as master thread in constructor, and detaches it in destructor
42 changes: 40 additions & 2 deletions src/Disks/ObjectStorages/DiskObjectStorage.cpp
@@ -7,6 +7,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Common/CurrentThread.h>
#include <Common/createHardLink.h>
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
@@ -65,6 +66,8 @@ DiskObjectStorage::DiskObjectStorage(
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, read_resource_name(config.getString(config_prefix + ".read_resource", ""))
, write_resource_name(config.getString(config_prefix + ".write_resource", ""))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}

@@ -480,6 +483,32 @@ DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
config_prefix);
}

template <class Settings>
static inline Settings updateResourceLink(const Settings & settings, const String & resource_name)
{
if (resource_name.empty())
return settings;
if (auto query_context = CurrentThread::getQueryContext())
{
Settings result(settings);
result.resource_link = query_context->getClassifier()->get(resource_name);
return result;
}
return settings;
}

String DiskObjectStorage::getReadResourceName() const
{
std::unique_lock lock(resource_mutex);
return read_resource_name;
}

String DiskObjectStorage::getWriteResourceName() const
{
std::unique_lock lock(resource_mutex);
return write_resource_name;
}

std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
const String & path,
const ReadSettings & settings,
@@ -495,7 +524,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(

return object_storage->readObjects(
storage_objects,
object_storage->getAdjustedSettingsFromMetadataFile(settings, path),
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getReadResourceName()), path),
read_hint,
file_size);
}
@@ -513,7 +542,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
path,
buf_size,
mode,
object_storage->getAdjustedSettingsFromMetadataFile(settings, path));
object_storage->getAdjustedSettingsFromMetadataFile(updateResourceLink(settings, getWriteResourceName()), path));
}

Strings DiskObjectStorage::getBlobPath(const String & path) const
@@ -543,6 +572,15 @@ void DiskObjectStorage::applyNewSettings(
/// FIXME we cannot use config_prefix that was passed through arguments because the disk may be wrapped with cache and we need another name
const auto config_prefix = "storage_configuration.disks." + name;
object_storage->applyNewSettings(config, config_prefix, context_);

{
std::unique_lock lock(resource_mutex);
if (String new_read_resource_name = config.getString(config_prefix + ".read_resource", ""); new_read_resource_name != read_resource_name)
read_resource_name = new_read_resource_name;
if (String new_write_resource_name = config.getString(config_prefix + ".write_resource", ""); new_write_resource_name != write_resource_name)
write_resource_name = new_write_resource_name;
}

IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}

7 changes: 7 additions & 0 deletions src/Disks/ObjectStorages/DiskObjectStorage.h
@@ -212,6 +212,9 @@ friend class DiskObjectStorageRemoteMetadataRestoreHelper;
/// execution.
DiskTransactionPtr createObjectStorageTransaction();

String getReadResourceName() const;
String getWriteResourceName() const;

const String object_storage_root_path;
Poco::Logger * log;

@@ -226,6 +229,10 @@

const bool send_metadata;

mutable std::mutex resource_mutex;
String read_resource_name;
String write_resource_name;

std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};

1 change: 1 addition & 0 deletions src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp
@@ -54,6 +54,7 @@ void registerDiskHDFS(DiskFactory & factory, bool global_skip_access_check)
std::move(hdfs_storage),
config,
config_prefix);

disk->startup(context, skip_access_check);

return disk;
1 change: 1 addition & 0 deletions src/Disks/ObjectStorages/Web/registerDiskWebServer.cpp
@@ -54,6 +54,7 @@ void registerDiskWebServer(DiskFactory & factory, bool global_skip_access_check)
object_storage,
config,
config_prefix);

disk->startup(context, skip_access_check);
return disk;
};