4 changes: 4 additions & 0 deletions docs/en/engines/table-engines/special/hybrid.md
@@ -103,6 +103,10 @@ ALTER TABLE tiered MODIFY SETTING
hybrid_watermark_cold = '2025-08-01';
```

### Inspecting current watermarks

Current effective watermark values are exposed through [`system.hybrid_watermarks`](../../../operations/system-tables/hybrid_watermarks.md). The table contains one row per declared `hybridParam()` name; if the metadata read fails, a single diagnostic row with `last_exception` populated is emitted instead.

### Restrictions

- Only `hybrid_watermark_*` settings are accepted on Hybrid tables. Regular `DistributedSettings` (e.g. `bytes_to_delay_insert`) are rejected.
78 changes: 78 additions & 0 deletions docs/en/operations/system-tables/hybrid_watermarks.md
@@ -0,0 +1,78 @@
---
description: 'System table exposing current effective watermark values for Hybrid-engine tables.'
keywords: ['system table', 'hybrid_watermarks', 'hybrid']
slug: /operations/system-tables/hybrid_watermarks
title: 'system.hybrid_watermarks'
doc_type: 'reference'
---

Exposes the current effective watermark values for every attached [Hybrid](../../engines/table-engines/special/hybrid.md) table. Use this table for monitoring, alerting, and runbooks instead of parsing `SHOW CREATE TABLE` or `system.tables.create_table_query`.

Columns:

<!--AUTOGENERATED_START-->
<!--AUTOGENERATED_END-->

## Row contract

For every in-scope Hybrid table, `system.hybrid_watermarks` emits exactly one of:

1. **N rows** — one per declared `hybridParam()` name. `value` is taken from the runtime snapshot, `type` from the declaration, `last_exception` is empty.
2. **Zero rows** — if the table's predicates contain no `hybridParam()` calls.
3. **One diagnostic row** — if reading the table's hybrid metadata raised an exception or a keyspace consistency check failed. `name`, `value`, and `type` are empty; `last_exception` is populated.

The three cases are mutually exclusive for a given `(database, table)`: you never see, e.g., some healthy rows plus a diagnostic row for the same table.

`SELECT * FROM system.hybrid_watermarks WHERE last_exception != ''` is therefore a reliable alert query for attached Hybrid tables that cannot be introspected.

## Scope

Covers Hybrid tables visible to `getTablesIterator()` plus session-local temporary Hybrid tables — the same set that appears in `system.tables`. Temporary tables are emitted with `database = ''`, matching the `system.tables` convention.

Out of scope:

- **On-disk metadata that fails to load.** A `.sql` file that fails the factory-time validation of `hybridParam()` arity, literal types, or declared-type conflicts is rejected during startup and the table never attaches. Such failures surface in the server log, not in this table.
- **Detached tables.** Re-`ATTACH` re-runs factory validation.
- **Non-Hybrid `Distributed` tables.** Filtered out by `getName() == "Hybrid"`.

## Access control

Rows for persistent tables require `SHOW_TABLES` on `(database, table)`, the same check used by `system.distribution_queue`. Session-local temporary tables are visible only to their owning session and are not gated by `SHOW_TABLES`, matching `system.tables`.

## Example

```sql
SELECT * FROM system.hybrid_watermarks FORMAT Vertical;
```

```text
Row 1:
──────
database: default
table: hybrid_hot_cold
name: hybrid_watermark_hot
value: 2025-10-01
type: DateTime
last_exception:
```

Find all tables whose `hybrid_watermark_hot` watermark is set to a given value:

```sql
SELECT database, table
FROM system.hybrid_watermarks
WHERE name = 'hybrid_watermark_hot' AND value = '2025-10-01';
```

Alert on tables that currently cannot be introspected:

```sql
SELECT database, table, last_exception
FROM system.hybrid_watermarks
WHERE last_exception != '';
```
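
For consumers that process these rows outside SQL, the row contract can also be checked client-side. The following is a minimal C++ sketch, not part of ClickHouse: `WatermarkRow`, `TableState`, and `classify` are hypothetical names, and the rows are assumed to have already been fetched from `system.hybrid_watermarks`. It groups rows by `(database, table)` and flags any table that mixes healthy and diagnostic rows, which the contract says should never happen.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical client-side mirror of one system.hybrid_watermarks row.
struct WatermarkRow
{
    std::string database, table, name, value, type, last_exception;
};

enum class TableState { Healthy, Diagnostic, ContractViolation };

using TableKey = std::pair<std::string, std::string>;

// Groups rows by (database, table) and classifies each table.
// Tables that emit zero rows never appear in the input, so that case is implicit.
std::map<TableKey, TableState> classify(const std::vector<WatermarkRow> & rows)
{
    struct Counts { std::size_t healthy = 0; std::size_t diagnostic = 0; };
    std::map<TableKey, Counts> counts;

    for (const auto & row : rows)
    {
        auto & c = counts[TableKey{row.database, row.table}];
        if (row.last_exception.empty())
            ++c.healthy;
        else
            ++c.diagnostic;
    }

    std::map<TableKey, TableState> result;
    for (const auto & [key, c] : counts)
    {
        if (c.diagnostic == 0)
            result[key] = TableState::Healthy;
        else if (c.diagnostic == 1 && c.healthy == 0)
            result[key] = TableState::Diagnostic;
        else
            result[key] = TableState::ContractViolation; // mixed or repeated diagnostic rows
    }
    return result;
}
```

A monitoring job could treat `Diagnostic` as an alert and `ContractViolation` as a bug report against the table function itself.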

**See Also**

- [Hybrid table engine](../../engines/table-engines/special/hybrid.md)
- [Distributed table engine](../../engines/table-engines/special/distributed.md)
1 change: 1 addition & 0 deletions src/Common/FailPoint.cpp
@@ -40,6 +40,7 @@ static struct InitFiu
    REGULAR(use_delayed_remote_source) \
    REGULAR(cluster_discovery_faults) \
    REGULAR(stripe_log_sink_write_fallpoint) \
    REGULAR(hybrid_watermarks_read_fail) \
    ONCE(smt_commit_merge_mutate_zk_fail_after_op) \
    ONCE(smt_commit_merge_mutate_zk_fail_before_op) \
    ONCE(smt_commit_write_zk_fail_after_op) \
7 changes: 7 additions & 0 deletions src/Storages/StorageDistributed.cpp
@@ -1797,6 +1797,13 @@ static std::unordered_map<String, String> collectHybridParamTypes(
    return result;
}

std::unordered_map<String, String> StorageDistributed::getDeclaredHybridParamTypes() const
{
    if (getName() != "Hybrid")
        return {};
    return collectHybridParamTypes(base_segment_predicate, segments);
}

void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
{
    std::optional<NameDependencies> name_deps{};
8 changes: 8 additions & 0 deletions src/Storages/StorageDistributed.h
@@ -180,6 +180,14 @@ class StorageDistributed final : public IStorage, WithContext
    MultiVersion<WatermarkParams>::Version getHybridWatermarkParams() const
    { return hybrid_watermark_params.get(); }

    /// Returns (name -> declared type) for every `hybridParam()` call in the
    /// stored Hybrid predicates. Empty for non-Hybrid tables.
    ///
    /// For any attached Hybrid table this returns a consistent map, because
    /// registerStorageHybrid() rejects conflicting types for the same name at
    /// factory time. The result is used by system.hybrid_watermarks.
    std::unordered_map<String, String> getDeclaredHybridParamTypes() const;

    void loadHybridWatermarkParams(SettingsChanges & changes);

    /// Getter methods for ClusterProxy::executeQuery
236 changes: 236 additions & 0 deletions src/Storages/System/StorageSystemHybridWatermarks.cpp
@@ -0,0 +1,236 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <Storages/System/StorageSystemHybridWatermarks.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDistributed.h>
#include <Storages/VirtualColumnUtils.h>
#include <Access/ContextAccess.h>
#include <Common/FailPoint.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Databases/IDatabase.h>

#include <algorithm>

namespace DB
{

namespace ErrorCodes
{
    extern const int FAULT_INJECTED;
}

namespace FailPoints
{
    extern const char hybrid_watermarks_read_fail[];
}


ColumnsDescription StorageSystemHybridWatermarks::getColumnsDescription()
{
    return ColumnsDescription
    {
        {"database", std::make_shared<DataTypeString>(), "Name of the database."},
        {"table", std::make_shared<DataTypeString>(), "Name of the Hybrid table."},
        {"name", std::make_shared<DataTypeString>(),
            "Watermark parameter name from hybridParam() (always starts with hybrid_watermark_). Empty on a diagnostic row."},
        {"value", std::make_shared<DataTypeString>(),
            "Current effective watermark value from the runtime snapshot. Empty on a diagnostic row."},
        {"type", std::make_shared<DataTypeString>(),
            "Declared type from hybridParam('name', 'type'). Empty on a diagnostic row."},
        {"last_exception", std::make_shared<DataTypeString>(),
            "Empty on success. Populated when reading this table's hybrid metadata raised an exception or produced an inconsistent view."},
    };
}

Block StorageSystemHybridWatermarks::getFilterSampleBlock() const
{
    return {
        { {}, std::make_shared<DataTypeString>(), "database" },
        { {}, std::make_shared<DataTypeString>(), "table" },
    };
}

void StorageSystemHybridWatermarks::fillData(
    MutableColumns & res_columns, ContextPtr context, const ActionsDAG::Node * predicate, std::vector<UInt8>) const
{
    const auto access = context->getAccess();
    const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);

    /// Enumerate only Hybrid tables (StorageDistributed with getName() == "Hybrid").
    std::map<String, std::map<String, StoragePtr>> tables;
    for (const auto & db : DatabaseCatalog::instance().getDatabases(GetDatabasesOptions{.with_datalake_catalogs = false}))
    {
        if (db.second->isExternal())
            continue;

        /// Temp tables are surfaced via the session-local branch below; mirrors system.tables.
        if (db.first == DatabaseCatalog::TEMPORARY_DATABASE)
            continue;

        const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, db.first);

        for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
        {
            StoragePtr table = iterator->table();
            if (!table)
                continue;

            const auto * distributed = dynamic_cast<const StorageDistributed *>(table.get());
            if (!distributed || distributed->getName() != "Hybrid")
                continue;

            if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, db.first, iterator->name()))
                continue;

            tables[db.first][iterator->name()] = table;
        }
    }

    /// Session-local temporary tables, mirroring `system.tables` (see
    /// StorageSystemTables::read(), the temporary-table branch around line 371).
    /// They are emitted with database = "" to match `system.tables`' convention.
    /// No SHOW_TABLES gate: externals are session-scoped and only visible to the
    /// owning session, the same as in `system.tables`.
    if (context->hasSessionContext())
    {
        for (auto & [name, storage] : context->getSessionContext()->getExternalTables())
        {
            if (!storage)
                continue;

            const auto * distributed = dynamic_cast<const StorageDistributed *>(storage.get());
            if (!distributed || distributed->getName() != "Hybrid")
                continue;

            tables[""][name] = storage;
        }
    }

    MutableColumnPtr col_database_mut = ColumnString::create();
    MutableColumnPtr col_table_mut = ColumnString::create();

    for (auto & db : tables)
    {
        for (auto & table : db.second)
        {
            col_database_mut->insert(db.first);
            col_table_mut->insert(table.first);
        }
    }

    ColumnPtr col_database_to_filter = std::move(col_database_mut);
    ColumnPtr col_table_to_filter = std::move(col_table_mut);

    /// Apply pushed-down predicate on (database, table).
    {
        Block filtered_block
        {
            { col_database_to_filter, std::make_shared<DataTypeString>(), "database" },
            { col_table_to_filter, std::make_shared<DataTypeString>(), "table" },
        };

        VirtualColumnUtils::filterBlockWithPredicate(predicate, filtered_block, context);

        if (!filtered_block.rows())
            return;

        col_database_to_filter = filtered_block.getByName("database").column;
        col_table_to_filter = filtered_block.getByName("table").column;
    }

    auto emit_diagnostic = [&](const String & database, const String & table, const String & message)
    {
        size_t c = 0;
        res_columns[c++]->insert(database);
        res_columns[c++]->insert(table);
        res_columns[c++]->insertDefault(); /// name
        res_columns[c++]->insertDefault(); /// value
        res_columns[c++]->insertDefault(); /// type
        res_columns[c++]->insert(message);
    };

    for (size_t i = 0, tables_size = col_database_to_filter->size(); i < tables_size; ++i)
    {
        String database = (*col_database_to_filter)[i].safeGet<String>();
        String table = (*col_table_to_filter)[i].safeGet<String>();

        auto & distributed_table = dynamic_cast<StorageDistributed &>(*tables[database][table]);

        std::unordered_map<String, String> types;
        MultiVersion<StorageDistributed::WatermarkParams>::Version snapshot;

        /// Per-table fault isolation: mirrors StorageSystemTables (see its lines 450-462).
        /// In normal operation this try block never throws, but a single broken attached
        /// Hybrid table must never take down the whole scan.
        try
        {
            /// Test-only hook for exercising the diagnostic row path from SQL.
            fiu_do_on(FailPoints::hybrid_watermarks_read_fail,
            {
                throw Exception(ErrorCodes::FAULT_INJECTED,
                    "Injected fault for system.hybrid_watermarks");
            });

            types = distributed_table.getDeclaredHybridParamTypes();
            snapshot = distributed_table.getHybridWatermarkParams();
        }
        catch (...)
        {
            tryLogCurrentException(getLogger("StorageSystemHybridWatermarks"),
                fmt::format("Failed to read hybrid watermarks for {}.{}", database, table),
                LogsLevel::information);
            emit_diagnostic(database, table, getCurrentExceptionMessage(/*with_stacktrace=*/false));
            continue;
        }

        /// Row contract case 2: zero declared watermarks → emit zero rows.
        if (types.empty())
            continue;

        /// Row contract case 3 (defense in depth): keyspace mismatch. CREATE
        /// enforces declared-keys == snapshot-keys at [StorageDistributed.cpp]
        /// lines 3043-3051, so this only triggers on unexpected runtime drift.
        bool consistent = snapshot && snapshot->size() == types.size();
        if (consistent)
        {
            for (const auto & [name, _] : types)
            {
                if (!snapshot->contains(name))
                {
                    consistent = false;
                    break;
                }
            }
        }

        if (!consistent)
        {
            emit_diagnostic(database, table, fmt::format(
                "Hybrid watermark keyspace mismatch: {} declared, {} in snapshot",
                types.size(), snapshot ? snapshot->size() : 0));
            continue;
        }

        /// Row contract case 1: N healthy rows, sorted by name for reference-file stability.
        std::vector<String> sorted_names;
        sorted_names.reserve(types.size());
        for (const auto & [name, _] : types)
            sorted_names.push_back(name);
        std::sort(sorted_names.begin(), sorted_names.end());

        for (const auto & name : sorted_names)
        {
            size_t c = 0;
            res_columns[c++]->insert(database);
            res_columns[c++]->insert(table);
            res_columns[c++]->insert(name);
            res_columns[c++]->insert(snapshot->at(name));
            res_columns[c++]->insert(types.at(name));
            res_columns[c++]->insertDefault(); /// last_exception
        }
    }
}

}
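
The per-table logic in `fillData()` above can be read as one function that maps a single table to its rows. The following is a simplified, standalone sketch of that decision sequence using only the standard library. `Row`, `TableView`, and `rowsForTable` are hypothetical names for illustration, not ClickHouse APIs. As simplifying assumptions, the snapshot is passed by value rather than through a nullable `MultiVersion` pointer, and only `std::exception` is caught instead of `catch (...)`.

```cpp
#include <algorithm>
#include <functional>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>

using ParamMap = std::unordered_map<std::string, std::string>;

// Hypothetical flattened output row (database and table columns omitted).
struct Row
{
    std::string name, value, type, last_exception;
};

// Hypothetical stand-in for the two storage getters used by fillData().
struct TableView
{
    std::function<ParamMap()> declared_types; // name -> declared type
    std::function<ParamMap()> snapshot;       // name -> current value
};

std::vector<Row> rowsForTable(const TableView & table)
{
    ParamMap types;
    ParamMap snapshot;
    try
    {
        types = table.declared_types();
        snapshot = table.snapshot();
    }
    catch (const std::exception & e)
    {
        // Fault isolation: one diagnostic row, the caller keeps scanning.
        return {Row{"", "", "", e.what()}};
    }

    // No declared watermarks: zero rows.
    if (types.empty())
        return {};

    // Keyspace check: same size and every declared name present in the snapshot.
    const bool consistent = snapshot.size() == types.size()
        && std::all_of(types.begin(), types.end(),
                       [&](const auto & kv) { return snapshot.count(kv.first) > 0; });
    if (!consistent)
        return {Row{"", "", "", "keyspace mismatch: " + std::to_string(types.size())
                                    + " declared, " + std::to_string(snapshot.size()) + " in snapshot"}};

    // Healthy rows, sorted by name so the output order is deterministic.
    std::vector<std::string> names;
    names.reserve(types.size());
    for (const auto & kv : types)
        names.push_back(kv.first);
    std::sort(names.begin(), names.end());

    std::vector<Row> rows;
    for (const auto & name : names)
        rows.push_back(Row{name, snapshot.at(name), types.at(name), ""});
    return rows;
}
```

Because every early return yields either nothing or exactly one diagnostic row, the three row-contract cases stay mutually exclusive by construction.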