Streaming Data Import From S3 #49086

Merged
merged 50 commits on Aug 2, 2023
Changes from 27 commits

Commits (50)
b412314
wip
s-kat Apr 19, 2023
2b7a403
wip: select and streaming to views
s-kat Apr 27, 2023
281694f
pull upstream
s-kat Apr 28, 2023
60d7e7f
update settings
s-kat Apr 29, 2023
dd5e95e
add ordered mode
s-kat May 1, 2023
9b936fd
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 1, 2023
624d846
add template for tests
s-kat May 1, 2023
f21c838
add some tests
s-kat May 2, 2023
841987a
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 2, 2023
559e8fa
fix codestyle
s-kat May 2, 2023
7073d37
fix typo
s-kat May 2, 2023
7498ed2
check meta, reformat code, add tests
s-kat May 4, 2023
db6e7f1
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 4, 2023
c24ec8f
fix
s-kat May 4, 2023
5af6765
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 8, 2023
a8d56b2
fix ordered mode + more tests
s-kat May 8, 2023
751337f
reformat code
s-kat May 8, 2023
a443c7f
fix
s-kat May 9, 2023
d376224
fix tests
s-kat May 9, 2023
446cf3c
handle failed files
s-kat May 10, 2023
eac1abf
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 10, 2023
a330648
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 11, 2023
abf1d45
fix build
s-kat May 15, 2023
c443090
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 15, 2023
1880ded
fix build
s-kat May 15, 2023
ba55c11
add documentation
s-kat May 15, 2023
d7b4d9b
add new table engine to index
s-kat May 15, 2023
0e55c7c
Merge branch 'master' of github.com:ClickHouse/ClickHouse into fix-build
s-kat May 18, 2023
bc7c67e
code review fixes
s-kat May 25, 2023
d2b8dd3
Merge branch 'master' of github.com:ClickHouse/ClickHouse into fix-build
s-kat May 25, 2023
528fa9a
fix
s-kat May 25, 2023
3629e0c
fix
s-kat May 26, 2023
3e39ce7
fix build
s-kat May 26, 2023
f09bdc0
Merge branch 'master' of github.com:ClickHouse/ClickHouse into s3queue
s-kat May 27, 2023
5de8697
Merge remote-tracking branch 'upstream/master' into s3queue
kssenii Jun 30, 2023
a347408
Fix style check
kssenii Jul 17, 2023
7359dd5
Merge remote-tracking branch 'upstream/master' into s3queue
kssenii Jul 17, 2023
fe53cd3
Minor changes
kssenii Jul 17, 2023
d2195cf
Small fixes
kssenii Jul 20, 2023
f82364d
Fix flaky test
kssenii Jul 21, 2023
4fc3d0b
Merge remote-tracking branch 'upstream/master' into s3queue
kssenii Jul 22, 2023
f207e5b
Add TODO
kssenii Jul 23, 2023
6e99e5c
Fix another flaky test
kssenii Jul 30, 2023
c13fdca
Merge remote-tracking branch 'upstream/master' into s3queue
kssenii Jul 31, 2023
870a506
Some fixes
kssenii Jul 31, 2023
75f6a50
Add todo
kssenii Jul 31, 2023
be458fd
Fix style check
kssenii Aug 1, 2023
08f5ebf
Fix test
kssenii Aug 1, 2023
a14a6b5
Better
kssenii Aug 1, 2023
8893670
Merge branch 'master' into s3queue
kssenii Aug 1, 2023
1 change: 1 addition & 0 deletions docs/en/engines/table-engines/index.md
@@ -59,6 +59,7 @@ Engines in the family:
- [EmbeddedRocksDB](../../engines/table-engines/integrations/embedded-rocksdb.md)
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md)
- [PostgreSQL](../../engines/table-engines/integrations/postgresql.md)
- [S3Queue](../../engines/table-engines/integrations/s3queue.md)

### Special Engines {#special-engines}

194 changes: 194 additions & 0 deletions docs/en/engines/table-engines/integrations/s3queue.md
@@ -0,0 +1,194 @@
---
slug: /en/engines/table-engines/integrations/s3queue
sidebar_position: 7
sidebar_label: S3Queue
---

# S3Queue Table Engine
This engine provides integration with the [Amazon S3](https://aws.amazon.com/s3/) ecosystem and allows streaming imports. It is similar to the [Kafka](../../../engines/table-engines/integrations/kafka.md) and [RabbitMQ](../../../engines/table-engines/integrations/rabbitmq.md) engines, but provides S3-specific features.

## Create Table {#creating-a-table}

``` sql
CREATE TABLE s3_queue_engine_table (name String, value UInt32)
ENGINE = S3Queue(path [, NOSIGN | aws_access_key_id, aws_secret_access_key,] format, [compression])
[SETTINGS]
[mode = 'unordered',]
[after_processing = 'keep',]
[keeper_path = '',]
[s3queue_loading_retries = 0,]
[s3queue_polling_min_timeout_ms = 1000,]
[s3queue_polling_max_timeout_ms = 10000,]
[s3queue_polling_backoff_ms = 0,]
[s3queue_max_set_size = 1000,]
[s3queue_max_set_age_s = 0,]
[s3queue_polling_size = 50]
```

**Engine parameters**

- `path` — Bucket URL with a path to the file. Supports the following wildcards in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` are numbers and `'abc'`, `'def'` are strings. For more information, see [below](#wildcards-in-path).
- `NOSIGN` — If this keyword is provided in place of credentials, requests will not be signed (see the example after this list).
- `format` — The [format](../../../interfaces/formats.md#formats) of the file.
- `aws_access_key_id`, `aws_secret_access_key` — Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. The parameter is optional. If credentials are not specified, they are taken from the configuration file. For more information see [Using S3 for Data Storage](../mergetree-family/mergetree.md#table_engine-mergetree-s3).
- `compression` — Compression type. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. The parameter is optional. By default, the compression type is autodetected from the file extension.
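
For example, a table over a public bucket can be created without credentials by passing `NOSIGN`. A minimal sketch (the bucket URL and table name below are hypothetical):

``` sql
CREATE TABLE s3_queue_public (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/data/*.csv', NOSIGN, 'CSV')
SETTINGS mode = 'unordered';
```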


## Settings {#s3queue-settings}

### mode {#mode}

Sets the file processing mode, which determines how already processed files are tracked in ZooKeeper.

Possible values:

- unordered — With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKeeper.
- ordered — With ordered mode, only the max name of the successfully consumed file and the names of files that will be retried after an unsuccessful loading attempt are stored in ZooKeeper.

Default value: `unordered`.
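
A sketch of an ordered-mode table (the bucket and ZooKeeper path are hypothetical), presumably suited to buckets where new files arrive with lexicographically increasing names:

``` sql
CREATE TABLE s3_queue_ordered (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/logs/*.csv', 'CSV')
SETTINGS
    mode = 'ordered',
    keeper_path = '/clickhouse/s3queue/ordered_demo';
```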

### after_processing {#after_processing}

Delete or keep the file after successful processing.
Possible values:

- keep.
- delete.

Default value: `keep`.
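
A sketch of a consume-and-delete table (hypothetical bucket and table name): each file is removed from the bucket after it has been successfully processed:

``` sql
CREATE TABLE s3_queue_consume_once (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/inbox/*.csv', 'CSV')
SETTINGS
    mode = 'unordered',
    after_processing = 'delete';
```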

### keeper_path {#keeper_path}

The path in ZooKeeper can be specified as a table engine setting, or a default path can be formed from the configuration-provided path (the `s3queue_default_zookeeper_path` setting) and the table UUID.
Possible values:

- String.

Default value: ``.
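
A sketch of relying on the default path, assuming the `s3queue_default_zookeeper_path` setting added in this PR applies at `CREATE` time (all names below are hypothetical); `keeper_path` is omitted, so the table's ZooKeeper path is derived from the configured prefix and the table UUID:

``` sql
-- assumption: the query-level setting is picked up by the subsequent CREATE
SET s3queue_default_zookeeper_path = '/clickhouse/s3queue/';

CREATE TABLE s3_queue_default_path (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/data/*.csv', 'CSV')
SETTINGS mode = 'unordered';
```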

### s3queue_loading_retries {#s3queue_loading_retries}

Retry file loading up to the specified number of times. By default, there are no retries.
Possible values:

- Positive integer.

Default value: `0`.

### s3queue_polling_min_timeout_ms {#s3queue_polling_min_timeout_ms}

Minimal timeout before the next polling attempt (in milliseconds).

Possible values:

- Positive integer.

Default value: `1000`.

### s3queue_polling_max_timeout_ms {#s3queue_polling_max_timeout_ms}

Maximum timeout before the next polling attempt (in milliseconds).

Possible values:

- Positive integer.

Default value: `10000`.

### s3queue_polling_backoff_ms {#s3queue_polling_backoff_ms}

Polling backoff (in milliseconds).

Possible values:

- Positive integer.

Default value: `0`.

### s3queue_max_set_size {#s3queue_max_set_size}

Maximum size of the set of processed files that is tracked in ZooKeeper in unordered mode.

Possible values:

- Positive integer.

Default value: `1000`.

### s3queue_max_set_age_s {#s3queue_max_set_age_s}

Maximum number of seconds to store processed files in the ZooKeeper node (stored forever by default).

Possible values:

- Positive integer.

Default value: `0`.

### s3queue_polling_size {#s3queue_polling_size}

Maximum number of files to fetch from S3 with a single `SELECT` or background task.

Possible values:

- Positive integer.

Default value: `50`.
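
The retry and polling settings above can be combined; a sketch with hypothetical values (bucket, table, and ZooKeeper path included):

``` sql
CREATE TABLE s3_queue_tuned (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/events/*.csv', 'CSV')
SETTINGS
    mode = 'unordered',
    keeper_path = '/clickhouse/s3queue/tuned_demo',
    s3queue_loading_retries = 3,           -- retry each failed file up to 3 times
    s3queue_polling_min_timeout_ms = 500,  -- poll at most every 0.5 s
    s3queue_polling_max_timeout_ms = 5000, -- and at least every 5 s
    s3queue_polling_backoff_ms = 200,
    s3queue_polling_size = 100;            -- fetch up to 100 files at a time
```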


## S3-related Settings {#s3-settings}

The engine supports all S3-related settings. For more information about S3 settings, see [here](../../../engines/table-engines/integrations/s3.md).


## Description {#description}

`SELECT` is not particularly useful for streaming import (except for debugging), because each file can be imported only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:

1. Use the engine to create a table for consuming from a specified path in S3 and consider it a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.

When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background.

Example:

``` sql
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/s3queue/';

CREATE TABLE stats (name String, value UInt32)
ENGINE = MergeTree() ORDER BY name;

CREATE MATERIALIZED VIEW consumer TO stats
AS SELECT name, value FROM s3queue_engine_table;

SELECT * FROM stats ORDER BY name;
```

## Virtual columns {#virtual-columns}

- `_path` — Path to the file.
- `_file` — Name of the file.

For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).
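
For debugging, the virtual columns can be selected directly from the engine table defined in the example above (note that a `SELECT` consumes the files it reads):

``` sql
SELECT _path, _file, name, value
FROM s3queue_engine_table
LIMIT 10;
```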


## Wildcards In Path {#wildcards-in-path}

`path` argument can specify multiple files using bash-like wildcards. To be processed, a file should exist and match the whole path pattern. The listing of files is determined during `SELECT` (not at `CREATE` time).

- `*` — Substitutes any number of any characters except `/`, including the empty string.
- `?` — Substitutes any single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of the strings `'some_string'`, `'another_string'`, `'yet_another_one'`.
- `{N..M}` — Substitutes any number in the range from N to M, including both borders. N and M can have leading zeroes, e.g. `000..078`.

Constructions with `{}` are similar to those in the [remote](../../../sql-reference/table-functions/remote.md) table function.

:::note
If the listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::
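
For instance, per the note above, a hypothetical set of files `file-000.csv` through `file-999.csv` can be matched by spelling out each digit with its own brace range (bucket and table name are illustrative):

``` sql
CREATE TABLE s3_queue_range (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/file-{0..9}{0..9}{0..9}.csv', 'CSV')
SETTINGS mode = 'unordered';
```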

1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -248,6 +248,7 @@ add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)
add_object_library(clickhouse_server Server)
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -94,6 +94,7 @@ class IColumn;
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "", "Default zookeeper path prefix for S3Queue engine", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \
7 changes: 7 additions & 0 deletions src/Core/SettingsEnums.cpp
@@ -202,4 +202,11 @@ IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})

IMPLEMENT_SETTING_ENUM(S3QueueMode, ErrorCodes::BAD_ARGUMENTS,
{{"ordered", S3QueueMode::ORDERED},
{"unordered", S3QueueMode::UNORDERED}})

IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS,
{{"keep", S3QueueAction::KEEP},
{"delete", S3QueueAction::DELETE}})
}
17 changes: 17 additions & 0 deletions src/Core/SettingsEnums.h
@@ -218,4 +218,21 @@ enum class ParallelReplicasCustomKeyFilterType : uint8_t
DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)

DECLARE_SETTING_ENUM(LocalFSReadMethod)

enum class S3QueueMode
{
ORDERED,
UNORDERED,
};

DECLARE_SETTING_ENUM(S3QueueMode)

enum class S3QueueAction
{
KEEP,
DELETE,
};

DECLARE_SETTING_ENUM(S3QueueAction)

}