Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Table engine MaterializedPostgreSQL fix dependency loading #57754

Merged
merged 5 commits into from Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/Databases/DDLLoadingDependencyVisitor.cpp
@@ -1,6 +1,7 @@
#include <Databases/DDLLoadingDependencyVisitor.h>
#include <Databases/DDLDependencyVisitor.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
#include <Interpreters/Context.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTCreateQuery.h>
Expand Down Expand Up @@ -131,6 +132,12 @@ void DDLLoadingDependencyVisitor::visit(const ASTStorage & storage, Data & data)
extractTableNameFromArgument(*storage.engine, data, 3);
else if (storage.engine->name == "Dictionary")
extractTableNameFromArgument(*storage.engine, data, 0);
else if (storage.engine->name == "MaterializedPostgreSQL")
{
const auto * create_query = data.create_query->as<ASTCreateQuery>();
auto nested_table = toString(create_query->uuid) + StorageMaterializedPostgreSQL::NESTED_TABLE_SUFFIX;
data.dependencies.emplace(QualifiedTableName{ .database = create_query->getDatabase(), .table = nested_table });
}
}


Expand Down
2 changes: 2 additions & 0 deletions src/Databases/DatabaseOrdinary.cpp
Expand Up @@ -139,6 +139,8 @@ void DatabaseOrdinary::loadTableFromMetadata(
assert(name.database == TSA_SUPPRESS_WARNING_FOR_READ(database_name));
const auto & query = ast->as<const ASTCreateQuery &>();

LOG_TRACE(log, "Loading table {}", name.getFullName());

try
{
auto [table_name, table] = createTableFromAST(
Expand Down
3 changes: 0 additions & 3 deletions src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.cpp
Expand Up @@ -45,9 +45,6 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}

static const auto NESTED_TABLE_SUFFIX = "_nested";
static const auto TMP_SUFFIX = "_tmp";


/// For the case of single storage.
StorageMaterializedPostgreSQL::StorageMaterializedPostgreSQL(
Expand Down
3 changes: 3 additions & 0 deletions src/Storages/PostgreSQL/StorageMaterializedPostgreSQL.h
Expand Up @@ -63,6 +63,9 @@ namespace DB
class StorageMaterializedPostgreSQL final : public IStorage, WithContext
{
public:
static constexpr auto NESTED_TABLE_SUFFIX = "_nested";
static constexpr auto TMP_SUFFIX = "_tmp";

StorageMaterializedPostgreSQL(const StorageID & table_id_, ContextPtr context_,
const String & postgres_database_name, const String & postgres_table_name);

Expand Down
Expand Up @@ -24,4 +24,10 @@
<database>postgres_database</database>
</postgres2>
</named_collections>
<text_log>
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<level>Test</level>
</text_log>
</clickhouse>
Expand Up @@ -944,6 +944,62 @@ def test_symbols_in_publication_name(started_cluster):
)


def test_dependent_loading(started_cluster):
table = "test_dependent_loading"

pg_manager.create_postgres_table(table)
instance.query(
f"INSERT INTO postgres_database.{table} SELECT number, number from numbers(0, 50)"
)

instance.query(
f"""
SET allow_experimental_materialized_postgresql_table=1;
CREATE TABLE {table} (key Int32, value Int32)
ENGINE=MaterializedPostgreSQL('{started_cluster.postgres_ip}:{started_cluster.postgres_port}', 'postgres_database', '{table}', 'postgres', 'mysecretpassword') ORDER BY key
"""
)

check_tables_are_synchronized(
instance,
table,
postgres_database=pg_manager.get_default_database(),
materialized_database="default",
)

assert 50 == int(instance.query(f"SELECT count() FROM {table}"))

instance.restart_clickhouse()

check_tables_are_synchronized(
instance,
table,
postgres_database=pg_manager.get_default_database(),
materialized_database="default",
)

assert 50 == int(instance.query(f"SELECT count() FROM {table}"))

uuid = instance.query(
f"SELECT uuid FROM system.tables WHERE name='{table}' and database='default' limit 1"
).strip()
nested_table = f"default.`{uuid}_nested`"
instance.contains_in_log(
f"Table default.{table} has 1 dependencies: {nested_table} (level 1)"
)

instance.query("SYSTEM FLUSH LOGS")
nested_time = instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{uuid}_nested' and message not like '%like%'"
).strip()
time = instance.query(
f"SELECT event_time_microseconds FROM system.text_log WHERE message like 'Loading table default.{table}' and message not like '%like%'"
).strip()
instance.query(
f"SELECT toDateTime64('{nested_time}', 6) < toDateTime64('{time}', 6)"
)


if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
Expand Down