Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow drop cache for Protobuf format #55064

Merged
merged 1 commit into from Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/en/interfaces/formats.md
Expand Up @@ -1920,6 +1920,14 @@ SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerat

In this case autogenerated Protobuf schema will be saved in file `path/to/schema/schema.capnp`.

### Drop Protobuf cache

To reload Protobuf schema loaded from [format_schema_path](../operations/server-configuration-parameters/settings.md/#server_configuration_parameters-format_schema_path) use [SYSTEM DROP ... FORMAT CACHE](../sql-reference/statements/system.md/#system-drop-schema-format) statement.

```sql
SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf
```

## ProtobufSingle {#protobufsingle}

Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters.
Expand Down
12 changes: 12 additions & 0 deletions docs/en/sql-reference/statements/system.md
Expand Up @@ -119,6 +119,18 @@ The compiled expression cache is enabled/disabled with the query/user/profile-le

Clears the [query cache](../../operations/query-cache.md).

## DROP FORMAT SCHEMA CACHE {#system-drop-schema-format}

Clears cache for schemas loaded from [format_schema_path](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-format_schema_path).

Supported formats:

- Protobuf

```sql
SYSTEM DROP FORMAT SCHEMA CACHE [FOR Protobuf]
```

## FLUSH LOGS

Flushes buffered log messages to system tables, e.g. system.query_log. Mainly useful for debugging since most system tables have a default flush interval of 7.5 seconds.
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Expand Up @@ -155,6 +155,7 @@ enum class AccessType
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_SYNC_FILESYSTEM_CACHE, "SYSTEM REPAIR FILESYSTEM CACHE, REPAIR FILESYSTEM CACHE, SYNC FILESYSTEM CACHE", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
Expand Down
6 changes: 6 additions & 0 deletions src/Formats/ProtobufSchemas.cpp
Expand Up @@ -21,6 +21,12 @@ ProtobufSchemas & ProtobufSchemas::instance()
return instance;
}

void ProtobufSchemas::clear()
{
std::lock_guard lock(mutex);
importers.clear();
}

class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/ProtobufSchemas.h
Expand Up @@ -54,6 +54,8 @@ class ProtobufSchemas : private boost::noncopyable
};

static ProtobufSchemas & instance();
// Clear cached protobuf schemas
void clear();

/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
Expand Down
19 changes: 19 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Expand Up @@ -64,6 +64,10 @@
#include <algorithm>
#include <unistd.h>

#if USE_PROTOBUF
#include <Formats/ProtobufSchemas.h>
#endif

#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
Expand Down Expand Up @@ -462,6 +466,20 @@ BlockIO InterpreterSystemQuery::execute()
#if USE_AZURE_BLOB_STORAGE
if (caches_to_drop.contains("AZURE"))
StorageAzureBlob::getSchemaCache(getContext()).clear();
#endif
break;
}
case Type::DROP_FORMAT_SCHEMA_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_FORMAT_SCHEMA_CACHE);
std::unordered_set<String> caches_to_drop;
if (query.schema_cache_format.empty())
caches_to_drop = {"Protobuf"};
else
caches_to_drop = {query.schema_cache_format};
#if USE_PROTOBUF
if (caches_to_drop.contains("Protobuf"))
ProtobufSchemas::instance().clear();
#endif
break;
}
Expand Down Expand Up @@ -1082,6 +1100,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
#if USE_AWS_S3
case Type::DROP_S3_CLIENT_CACHE:
#endif
Expand Down
5 changes: 5 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Expand Up @@ -211,6 +211,11 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
<< (settings.hilite ? hilite_keyword : "") << " SECOND"
<< (settings.hilite ? hilite_none : "");
}
else if (type == Type::DROP_FORMAT_SCHEMA_CACHE)
{
if (!schema_cache_format.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " FOR " << backQuoteIfNeed(schema_cache_format);
}
else if (type == Type::DROP_FILESYSTEM_CACHE)
{
if (!filesystem_cache_name.empty())
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTSystemQuery.h
Expand Up @@ -33,6 +33,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
#endif
DROP_FILESYSTEM_CACHE,
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
#if USE_AWS_S3
DROP_S3_CLIENT_CACHE,
#endif
Expand Down Expand Up @@ -120,6 +121,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster

String schema_cache_storage;

String schema_cache_format;

String fail_point_name;

SyncReplicaMode sync_replica_mode = SyncReplicaMode::DEFAULT;
Expand Down
12 changes: 11 additions & 1 deletion src/Parsers/ParserSystemQuery.cpp
Expand Up @@ -451,7 +451,17 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
break;
}

case Type::DROP_FORMAT_SCHEMA_CACHE:
{
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
if (ParserKeyword{"Protobuf"}.ignore(pos, expected))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also support for CapnProto, should be easy to add

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would be nice to add, but if you do not mind I can add it in the next PR?

Copy link
Member

@Avogar Avogar Sep 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, you can add it in the next PR

res->schema_cache_format = "Protobuf";
else
return false;
}
break;
}
case Type::UNFREEZE:
{
ASTPtr ast;
Expand Down
68 changes: 68 additions & 0 deletions tests/integration/test_format_schema_on_server/test.py
@@ -1,5 +1,7 @@
import pytest
import os
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("instance", clickhouse_path_dir="clickhouse_path")
Expand Down Expand Up @@ -44,3 +46,69 @@ def test_protobuf_format_output(started_cluster):
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)


def test_drop_cache_protobuf_format(started_cluster):
create_simple_table()
instance.query("INSERT INTO test.simple VALUES (1, 'abc'), (2, 'def')")

schema = """
syntax = "proto3";

message MessageTmp {
uint64 key = 1;
string value = 2;
}
"""

protobuf_schema_path_name = "message_tmp.proto"

database_path = os.path.abspath(os.path.join(instance.path, "database"))
with open(
os.path.join(database_path, "format_schemas", protobuf_schema_path_name), "w"
) as file:
file.write(schema)
assert (
instance.http_query(
"SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)
# Replace simple.proto with a new Protobuf schema
new_schema = """
syntax = "proto3";

message MessageTmp {
uint64 key2 = 1;
string value2 = 2;
}
"""
with open(
os.path.join(database_path, "format_schemas", protobuf_schema_path_name), "w"
) as file:
file.write(new_schema)

instance.query("DROP TABLE IF EXISTS test.new_simple")
instance.query(
"""
CREATE TABLE test.new_simple (key2 UInt64, value2 String)
ENGINE = MergeTree ORDER BY tuple();
"""
)
instance.query("INSERT INTO test.new_simple VALUES (1, 'abc'), (2, 'def')")

instance.query("SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf")

# Tets works with new scheme
assert (
instance.http_query(
"SELECT * FROM test.new_simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)
# Tests that stop working with old scheme
with pytest.raises(Exception) as exc:
instance.http_query(
"SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
)
assert "NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS)" in str(exc.value)
1 change: 1 addition & 0 deletions tests/queries/0_stateless/01271_show_privileges.reference
Expand Up @@ -106,6 +106,7 @@ SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP C
SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM SYNC FILESYSTEM CACHE ['SYSTEM REPAIR FILESYSTEM CACHE','REPAIR FILESYSTEM CACHE','SYNC FILESYSTEM CACHE'] GLOBAL SYSTEM
SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
Expand Down
@@ -0,0 +1,2 @@
SYSTEM DROP FORMAT SCHEMA CACHE
SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf
2 changes: 2 additions & 0 deletions tests/queries/0_stateless/02889_system_drop_format_schema.sql
@@ -0,0 +1,2 @@
EXPLAIN SYNTAX SYSTEM DROP FORMAT SCHEMA CACHE;
EXPLAIN SYNTAX SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf;