Fixes for storage S3Queue #54422

Merged
merged 42 commits into master from s3-queue-fixes on Oct 18, 2023

Changes from all commits (42 commits):
7338b56
S3Queue fixes
kssenii Sep 7, 2023
220a67e
Better
kssenii Sep 8, 2023
afcb0b2
Merge remote-tracking branch 'upstream/master' into s3-queue-fixes
kssenii Sep 14, 2023
342755d
Better
kssenii Sep 14, 2023
6846fe3
Fxi
kssenii Sep 15, 2023
47050c2
Merge remote-tracking branch 'upstream/master' into s3-queue-fixes
kssenii Sep 25, 2023
e4256eb
Better
kssenii Sep 25, 2023
3b54b6b
Add system tables
kssenii Sep 25, 2023
251b016
Better
kssenii Sep 25, 2023
4d78dba
Fix style check, fix build without s3
kssenii Sep 25, 2023
14b09d3
Add caching
kssenii Sep 26, 2023
6c778d4
Fix build without s3
kssenii Sep 27, 2023
d77452c
Fix style check
kssenii Sep 27, 2023
6b191a1
Better
kssenii Sep 27, 2023
f753b91
Better maintenance of processing node
kssenii Sep 27, 2023
e0ff76a
Fix
kssenii Sep 27, 2023
4278389
Add comments
kssenii Sep 27, 2023
c579f5b
Fix style check
kssenii Sep 27, 2023
57cfb88
Fix typos check, fix build wihtout s3
kssenii Sep 27, 2023
1749874
Fxi
kssenii Sep 28, 2023
7d91ba1
Update doc
kssenii Sep 28, 2023
4a79225
Minor changes
kssenii Sep 28, 2023
d644992
Fxi
kssenii Sep 28, 2023
1ef21ba
Fix data race
kssenii Sep 29, 2023
d64b990
Merge remote-tracking branch 'origin/master' into s3-queue-fixes
kssenii Oct 13, 2023
bfe174f
Fix test
kssenii Oct 13, 2023
a9c0c20
Update documentation
kssenii Oct 13, 2023
b6b124f
Usability improvement
kssenii Oct 13, 2023
44ea61e
Improve shutdown
kssenii Oct 13, 2023
6fde98a
Minor improvement
kssenii Oct 13, 2023
0ddee6a
Fix
kssenii Oct 15, 2023
0a6a4b3
Review fixes
kssenii Oct 16, 2023
96c518b
Merge branch 'master' into s3-queue-fixes
kssenii Oct 16, 2023
8744cd9
Fix
kssenii Oct 17, 2023
4464c86
Merge remote-tracking branch 'origin/master' into s3-queue-fixes
kssenii Oct 17, 2023
42ed249
Fix build
kssenii Oct 17, 2023
4d2cf52
Update documentation
kssenii Oct 17, 2023
c549083
Fix
kssenii Oct 17, 2023
fd37e25
Fix doc
kssenii Oct 17, 2023
eb4519b
Fix
kssenii Oct 17, 2023
c4bad25
Fix
kssenii Oct 18, 2023
f90e31e
Enable log by default
kssenii Oct 18, 2023
152 changes: 139 additions & 13 deletions docs/en/engines/table-engines/integrations/s3queue.md
@@ -24,12 +24,15 @@ CREATE TABLE s3_queue_engine_table (name String, value UInt32)
[after_processing = 'keep',]
[keeper_path = '',]
[s3queue_loading_retries = 0,]
[s3queue_processing_threads_num = 1,]
[s3queue_enable_logging_to_s3queue_log = 0,]
[s3queue_polling_min_timeout_ms = 1000,]
[s3queue_polling_max_timeout_ms = 10000,]
[s3queue_polling_backoff_ms = 0,]
[s3queue_tracked_files_limit = 1000,]
[s3queue_tracked_file_ttl_sec = 0,]
[s3queue_polling_size = 50,]
[s3queue_cleanup_interval_min_ms = 10000,]
[s3queue_cleanup_interval_max_ms = 30000,]
```

**Engine parameters**
@@ -46,7 +49,7 @@ CREATE TABLE s3_queue_engine_table (name String, value UInt32)
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
- mode = 'ordered';
+ mode = 'unordered';
```

Using named collections:
@@ -109,6 +112,18 @@ Possible values:

Default value: `0`.

### s3queue_processing_threads_num {#processing_threads_num}

Number of threads performing the processing. Applies only to `Unordered` mode.

Default value: `1`.

### s3queue_enable_logging_to_s3queue_log {#enable_logging_to_s3queue_log}

Enable logging to `system.s3queue_log`.

Default value: `0`.

### s3queue_polling_min_timeout_ms {#polling_min_timeout_ms}

Minimal timeout before next polling (in milliseconds).
@@ -161,18 +176,17 @@ Possible values:

Default value: `0`.

- ### s3queue_polling_size {#polling_size}
-
- Maximum files to fetch from S3 with SELECT or in background task.
- Engine takes files for processing from S3 in batches.
- We limit the batch size to increase concurrency if multiple table engines with the same `keeper_path` consume files from the same path.
-
- Possible values:
-
- - Positive integer.
-
- Default value: `50`.
+ ### s3queue_cleanup_interval_min_ms {#cleanup_interval_min_ms}
+
+ For 'Ordered' mode. Defines the minimum boundary of the reschedule interval for the background task that maintains the tracked-file TTL and the maximum size of the tracked-files set.
+
+ Default value: `10000`.
+
+ ### s3queue_cleanup_interval_max_ms {#cleanup_interval_max_ms}
+
+ For 'Ordered' mode. Defines the maximum boundary of the reschedule interval for the background task that maintains the tracked-file TTL and the maximum size of the tracked-files set.
+
+ Default value: `30000`.
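
For example, a minimal sketch combining several of the new settings in `Unordered` mode (the bucket URL and setting values are illustrative):

``` sql
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/data/*', 'CSV')
SETTINGS
    mode = 'unordered',
    s3queue_processing_threads_num = 4,
    s3queue_enable_logging_to_s3queue_log = 1,
    s3queue_tracked_files_limit = 1000,
    s3queue_tracked_file_ttl_sec = 3600;
```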

## S3-related Settings {#s3-settings}

@@ -227,6 +241,118 @@ For more information about virtual columns see [here](../../../engines/table-eng

Constructions with `{}` are similar to the [remote](../../../sql-reference/table-functions/remote.md) table function.

:::note
If the listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::
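
For example, to match `file-000.csv` through `file-099.csv` despite the leading zeros, spell out each digit with its own braces (a sketch; the bucket URL and table name are illustrative):

``` sql
CREATE TABLE s3queue_padded (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/file-0{0..9}{0..9}.csv', 'CSV')
SETTINGS mode = 'unordered';
```
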
## Limitations {#limitations}

1. Duplicated rows can appear as a result of:

- an exception occurs during parsing in the middle of file processing, and retries are enabled via `s3queue_loading_retries`;

- `S3Queue` is configured on multiple servers pointing to the same path in ZooKeeper, and the Keeper session expires before one server manages to commit a processed file; another server may then take over processing of a file that was already partially or fully processed by the first server;

- abnormal server termination.

2. If `S3Queue` is configured on multiple servers pointing to the same path in ZooKeeper and `Ordered` mode is used, then `s3queue_loading_retries` will not work. This will be fixed soon.


## Introspection {#introspection}

For introspection, use the stateless `system.s3queue` table and the persistent `system.s3queue_log` table.

1. `system.s3queue`. This table is not persistent and shows the in-memory state of `S3Queue`: which files are currently being processed, and which files have been processed or have failed.

``` sql
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue
(
`database` String,
`table` String,
`file_name` String,
`rows_processed` UInt64,
`status` String,
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64),
`exception` String
)
ENGINE = SystemS3Queue
COMMENT 'SYSTEM TABLE is built on the fly.' │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

Example:

``` sql

SELECT *
FROM system.s3queue

Row 1:
──────
zookeeper_path: /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name: wikistat/original/pageviews-20150501-030000.gz
rows_processed: 5068534
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:31
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
```

2. `system.s3queue_log`. Persistent table. Has the same information as `system.s3queue`, but for `processed` and `failed` files.

The table has the following structure:

``` sql
SHOW CREATE TABLE system.s3queue_log

Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314

┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue_log
(
`event_date` Date,
`event_time` DateTime,
`table_uuid` String,
`file_name` String,
`rows_processed` UInt64,
`status` Enum8('Processed' = 0, 'Failed' = 1),
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64),
`exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```

To use `system.s3queue_log`, define its configuration in the server config file:

``` xml
<s3queue_log>
<database>system</database>
<table>s3queue_log</table>
</s3queue_log>
```

Example:

``` sql
SELECT *
FROM system.s3queue_log

Row 1:
──────
event_date: 2023-10-13
event_time: 2023-10-13 13:10:12
table_uuid:
file_name: wikistat/original/pageviews-20150501-020000.gz
rows_processed: 5112621
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:12
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
```
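
For example, to list recently failed files and their exceptions (a sketch against the schema above):

``` sql
SELECT file_name, exception
FROM system.s3queue_log
WHERE status = 'Failed'
ORDER BY event_time DESC
LIMIT 10;
```
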
2 changes: 1 addition & 1 deletion programs/copier/ClusterCopier.cpp
@@ -391,7 +391,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
auto code = zookeeper->tryMulti(ops, responses);

if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
- return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
+ return zkutil::EphemeralNodeHolder::existing(current_worker_path, *zookeeper);

if (code == Coordination::Error::ZBADVERSION)
{
14 changes: 13 additions & 1 deletion src/Common/ProfileEvents.cpp
@@ -530,6 +530,13 @@ The server successfully detected this situation and will download merged part fr
M(OverflowThrow, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'throw' and exception was thrown.") \
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
\
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to cleanup the tracked files set by max set size or TTL")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
\
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
@@ -589,9 +596,14 @@ Timer::Timer(Counters & counters_, Event timer_event_, Event counter_event, Reso
counters.increment(counter_event);
}

UInt64 Timer::get()
{
return watch.elapsedNanoseconds() / static_cast<UInt64>(resolution);
}

void Timer::end()
{
- counters.increment(timer_event, watch.elapsedNanoseconds() / static_cast<UInt64>(resolution));
+ counters.increment(timer_event, get());
watch.reset();
}
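
The new `get()` accessor lets a caller read the elapsed time of a running timer without committing it. A minimal sketch; the `Counters` instance, the counter-event choice, and the `Resolution` spelling are assumptions, as they are not shown in this diff:

``` cpp
// Sketch only: names outside this diff are assumed, not definitive.
ProfileEvents::Timer timer(
    counters,                                    // a ProfileEvents::Counters & (assumed available)
    ProfileEvents::S3QueuePullMicroseconds,      // timer event added in this PR
    ProfileEvents::SelectedRows,                 // counter event bumped on construction (illustrative choice)
    ProfileEvents::Timer::Resolution::Microseconds); // assumed enum spelling

pull_file_data();                    // the timed work (hypothetical helper)

UInt64 elapsed_us = timer.get();     // read elapsed time; does not end the measurement
if (elapsed_us > budget_us)
    timer.cancel();                  // discard the measurement entirely
// otherwise ~Timer() calls end(), committing the elapsed time to the counter
```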

1 change: 1 addition & 0 deletions src/Common/ProfileEvents.h
@@ -41,6 +41,7 @@ namespace ProfileEvents
~Timer() { end(); }
void cancel() { watch.reset(); }
void end();
UInt64 get();

private:
Counters & counters;
1 change: 1 addition & 0 deletions src/Common/SystemLogBase.cpp
@@ -10,6 +10,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>
1 change: 1 addition & 0 deletions src/Common/SystemLogBase.h
@@ -27,6 +27,7 @@
M(ZooKeeperLogElement) \
M(ProcessorProfileLogElement) \
M(TextLogElement) \
M(S3QueueLogElement) \
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \
29 changes: 25 additions & 4 deletions src/Common/ZooKeeper/ZooKeeper.h
@@ -644,31 +644,51 @@ class EphemeralNodeHolder
public:
using Ptr = std::shared_ptr<EphemeralNodeHolder>;

- EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
+ EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool try_create, bool sequential, const std::string & data)
: path(path_), zookeeper(zookeeper_)
{
if (create)
{
path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
need_remove = created = true;
}
else if (try_create)
{
need_remove = created = Coordination::Error::ZOK == zookeeper.tryCreate(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
}
}

std::string getPath() const
{
return path;
}

bool isCreated() const
{
return created;
}

static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
- return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, data);
+ return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, false, data);
}

static Ptr tryCreate(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
auto node = std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, true, false, data);
if (node->isCreated())
return node;
return nullptr;
}

static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
- return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, true, data);
+ return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, true, data);
}

static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
{
- return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
+ return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, false, "");
}

void setAlreadyRemoved()
@@ -702,6 +722,7 @@ class EphemeralNodeHolder
ZooKeeper & zookeeper;
CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
bool need_remove = true;
bool created = false;
};

using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;
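
The new `tryCreate` factory returns `nullptr` instead of throwing when the ephemeral node cannot be created (for example, because it already exists). A minimal usage sketch; the session object, path, and data are illustrative:

``` cpp
// Sketch: try to claim exclusive processing of a file via an ephemeral node.
// `zookeeper` is an already-established zkutil::ZooKeeper session (assumed).
auto holder = zkutil::EphemeralNodeHolder::tryCreate(
    "/clickhouse/s3queue/processing/file-0001.csv", zookeeper, /* data = */ "replica-1");

if (!holder)
    return; // another replica already owns the node; skip this file

// ... process the file ...
// The ephemeral node is removed automatically when `holder` is destroyed.
```
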
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -108,6 +108,7 @@ class IColumn;
M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(Bool, s3queue_enable_logging_to_s3queue_log, false, "Enable writing to system.s3queue_log. The value can be overwritten per table with table settings", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \
9 changes: 9 additions & 0 deletions src/Interpreters/Context.cpp
@@ -3588,6 +3588,15 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
return shared->system_logs->filesystem_cache_log;
}

std::shared_ptr<S3QueueLog> Context::getS3QueueLog() const
{
auto lock = getGlobalSharedLock();
if (!shared->system_logs)
return {};

return shared->system_logs->s3_queue_log;
}
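
For reference, a sketch of how storage code might write to the new log when it is configured. The field names follow the documented `system.s3queue_log` schema, but the exact members and methods of `S3QueueLogElement` are assumptions, as its definition is not shown in this diff:

``` cpp
// Sketch only: field and method names are assumed, not taken from this diff.
if (auto s3_queue_log = context->getS3QueueLog())
{
    S3QueueLogElement elem;
    elem.event_time = time(nullptr);                            // assumed field
    elem.file_name = file_name;                                 // assumed field
    elem.rows_processed = rows;                                 // assumed field
    elem.status = S3QueueLogElement::S3QueueStatus::Processed;  // assumed enum
    s3_queue_log->add(elem);                                    // SystemLog::add (assumed signature)
}
```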

std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetchesLog() const
{
auto lock = getGlobalSharedLock();
2 changes: 2 additions & 0 deletions src/Interpreters/Context.h
@@ -105,6 +105,7 @@ class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class S3QueueLog;
class AsynchronousInsertLog;
class BackupLog;
class IAsynchronousReader;
@@ -1041,6 +1042,7 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<S3QueueLog> getS3QueueLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;