Skip to content

Commit

Permalink
Allow drop cache for protobuf format
Browse files Browse the repository at this point in the history
Previously, it was impossible to update a Protobuf schema without a server
restart. With this commit, it is enough to send the query `SYSTEM DROP
FORMAT SCHEMA CACHE [FOR Protobuf]`.
  • Loading branch information
AVMusorin committed Oct 9, 2023
1 parent 29d3b34 commit 8d0c961
Show file tree
Hide file tree
Showing 13 changed files with 140 additions and 1 deletion.
8 changes: 8 additions & 0 deletions docs/en/interfaces/formats.md
Expand Up @@ -1920,6 +1920,14 @@ SELECT * FROM test.hits format Protobuf SETTINGS format_protobuf_use_autogenerat

In this case, the autogenerated Protobuf schema will be saved in the file `path/to/schema/schema.proto`.

### Drop Protobuf cache

To reload a Protobuf schema loaded from [format_schema_path](../operations/server-configuration-parameters/settings.md/#server_configuration_parameters-format_schema_path), use the [SYSTEM DROP FORMAT SCHEMA CACHE](../sql-reference/statements/system.md/#system-drop-schema-format) statement.

```sql
SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf
```

## ProtobufSingle {#protobufsingle}

Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters.
Expand Down
12 changes: 12 additions & 0 deletions docs/en/sql-reference/statements/system.md
Expand Up @@ -119,6 +119,18 @@ The compiled expression cache is enabled/disabled with the query/user/profile-le

Clears the [query cache](../../operations/query-cache.md).

## DROP FORMAT SCHEMA CACHE {#system-drop-schema-format}

Clears cache for schemas loaded from [format_schema_path](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-format_schema_path).

Supported formats:

- Protobuf

```sql
SYSTEM DROP FORMAT SCHEMA CACHE [FOR Protobuf]
```

## FLUSH LOGS

Flushes buffered log messages to system tables, e.g. system.query_log. Mainly useful for debugging since most system tables have a default flush interval of 7.5 seconds.
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Expand Up @@ -155,6 +155,7 @@ enum class AccessType
M(SYSTEM_DROP_FILESYSTEM_CACHE, "SYSTEM DROP FILESYSTEM CACHE, DROP FILESYSTEM CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_SYNC_FILESYSTEM_CACHE, "SYSTEM REPAIR FILESYSTEM CACHE, REPAIR FILESYSTEM CACHE, SYNC FILESYSTEM CACHE", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
Expand Down
6 changes: 6 additions & 0 deletions src/Formats/ProtobufSchemas.cpp
Expand Up @@ -21,6 +21,12 @@ ProtobufSchemas & ProtobufSchemas::instance()
return instance;
}

/// Empties the `importers` cache while holding `mutex`.
void ProtobufSchemas::clear()
{
    /// The guard releases `mutex` automatically when the function returns.
    std::lock_guard guard{mutex};
    importers.clear();
}

class ProtobufSchemas::ImporterWithSourceTree : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
Expand Down
2 changes: 2 additions & 0 deletions src/Formats/ProtobufSchemas.h
Expand Up @@ -54,6 +54,8 @@ class ProtobufSchemas : private boost::noncopyable
};

static ProtobufSchemas & instance();
/// Clears all cached Protobuf schemas.
void clear();

/// Parses the format schema, then parses the corresponding proto file, and returns the descriptor of the message type.
/// The function never returns nullptr, it throws an exception if it cannot load or parse the file.
Expand Down
19 changes: 19 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Expand Up @@ -64,6 +64,10 @@
#include <algorithm>
#include <unistd.h>

#if USE_PROTOBUF
#include <Formats/ProtobufSchemas.h>
#endif

#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
Expand Down Expand Up @@ -462,6 +466,20 @@ BlockIO InterpreterSystemQuery::execute()
#if USE_AZURE_BLOB_STORAGE
if (caches_to_drop.contains("AZURE"))
StorageAzureBlob::getSchemaCache(getContext()).clear();
#endif
break;
}
// Handles `SYSTEM DROP FORMAT SCHEMA CACHE [FOR <format>]`.
case Type::DROP_FORMAT_SCHEMA_CACHE:
{
// The query is rejected unless the caller holds the dedicated access right.
getContext()->checkAccess(AccessType::SYSTEM_DROP_FORMAT_SCHEMA_CACHE);
std::unordered_set<String> caches_to_drop;
// No format named in the query: fall back to the default set, which is just Protobuf here.
if (query.schema_cache_format.empty())
caches_to_drop = {"Protobuf"};
else
caches_to_drop = {query.schema_cache_format};
// The Protobuf cache is only cleared when the build defines USE_PROTOBUF;
// otherwise nothing below the access check has any effect.
#if USE_PROTOBUF
if (caches_to_drop.contains("Protobuf"))
ProtobufSchemas::instance().clear();
#endif
break;
}
Expand Down Expand Up @@ -1082,6 +1100,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::DROP_FILESYSTEM_CACHE:
case Type::SYNC_FILESYSTEM_CACHE:
case Type::DROP_SCHEMA_CACHE:
case Type::DROP_FORMAT_SCHEMA_CACHE:
#if USE_AWS_S3
case Type::DROP_S3_CLIENT_CACHE:
#endif
Expand Down
5 changes: 5 additions & 0 deletions src/Parsers/ASTSystemQuery.cpp
Expand Up @@ -211,6 +211,11 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
<< (settings.hilite ? hilite_keyword : "") << " SECOND"
<< (settings.hilite ? hilite_none : "");
}
else if (type == Type::DROP_FORMAT_SCHEMA_CACHE)
{
if (!schema_cache_format.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " FOR " << backQuoteIfNeed(schema_cache_format);
}
else if (type == Type::DROP_FILESYSTEM_CACHE)
{
if (!filesystem_cache_name.empty())
Expand Down
3 changes: 3 additions & 0 deletions src/Parsers/ASTSystemQuery.h
Expand Up @@ -33,6 +33,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
#endif
DROP_FILESYSTEM_CACHE,
DROP_SCHEMA_CACHE,
DROP_FORMAT_SCHEMA_CACHE,
#if USE_AWS_S3
DROP_S3_CLIENT_CACHE,
#endif
Expand Down Expand Up @@ -120,6 +121,8 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster

String schema_cache_storage;

String schema_cache_format;

String fail_point_name;

SyncReplicaMode sync_replica_mode = SyncReplicaMode::DEFAULT;
Expand Down
12 changes: 11 additions & 1 deletion src/Parsers/ParserSystemQuery.cpp
Expand Up @@ -451,7 +451,17 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
break;
}

// Parses the optional `FOR <format>` suffix of `SYSTEM DROP FORMAT SCHEMA CACHE`.
case Type::DROP_FORMAT_SCHEMA_CACHE:
{
// Without the FOR keyword, schema_cache_format is left untouched.
if (ParserKeyword{"FOR"}.ignore(pos, expected))
{
// `Protobuf` is the only format name accepted after FOR; anything else fails the parse.
if (ParserKeyword{"Protobuf"}.ignore(pos, expected))
res->schema_cache_format = "Protobuf";
else
return false;
}
break;
}
case Type::UNFREEZE:
{
ASTPtr ast;
Expand Down
68 changes: 68 additions & 0 deletions tests/integration/test_format_schema_on_server/test.py
@@ -1,5 +1,7 @@
import pytest
import os
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance("instance", clickhouse_path_dir="clickhouse_path")
Expand Down Expand Up @@ -44,3 +46,69 @@ def test_protobuf_format_output(started_cluster):
)
== "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
)


def test_drop_cache_protobuf_format(started_cluster):
    """Check that SYSTEM DROP FORMAT SCHEMA CACHE makes a rewritten .proto file take effect.

    Steps:
      1. Write message_tmp.proto and serialize test.simple with it.
      2. Overwrite message_tmp.proto with a schema that uses different field names.
      3. Drop the Protobuf format schema cache.
      4. Verify that a table matching the new field names serializes correctly and
         that the old table is rejected with NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS.
    """
    create_simple_table()
    instance.query("INSERT INTO test.simple VALUES (1, 'abc'), (2, 'def')")

    schema = """
syntax = "proto3";
message MessageTmp {
uint64 key = 1;
string value = 2;
}
"""

    protobuf_schema_path_name = "message_tmp.proto"

    database_path = os.path.abspath(os.path.join(instance.path, "database"))
    # Both versions of the schema are written to the same file, so build the path once.
    schema_file_path = os.path.join(
        database_path, "format_schemas", protobuf_schema_path_name
    )

    with open(schema_file_path, "w") as file:
        file.write(schema)
    assert (
        instance.http_query(
            "SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
        )
        == "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
    )

    # Replace message_tmp.proto with a new Protobuf schema that uses different field names
    new_schema = """
syntax = "proto3";
message MessageTmp {
uint64 key2 = 1;
string value2 = 2;
}
"""
    with open(schema_file_path, "w") as file:
        file.write(new_schema)

    instance.query("DROP TABLE IF EXISTS test.new_simple")
    instance.query(
        """
CREATE TABLE test.new_simple (key2 UInt64, value2 String)
ENGINE = MergeTree ORDER BY tuple();
"""
    )
    instance.query("INSERT INTO test.new_simple VALUES (1, 'abc'), (2, 'def')")

    instance.query("SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf")

    # The table whose columns match the new schema must serialize successfully
    assert (
        instance.http_query(
            "SELECT * FROM test.new_simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
        )
        == "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"
    )

    # The table that matched the old schema must now be rejected
    with pytest.raises(Exception) as exc:
        instance.http_query(
            "SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='message_tmp:MessageTmp'"
        )
    # Checked after the `with` block: code placed after the raising call inside it would never run.
    assert "NO_COLUMNS_SERIALIZED_TO_PROTOBUF_FIELDS)" in str(exc.value)
1 change: 1 addition & 0 deletions tests/queries/0_stateless/01271_show_privileges.reference
Expand Up @@ -106,6 +106,7 @@ SYSTEM DROP COMPILED EXPRESSION CACHE ['SYSTEM DROP COMPILED EXPRESSION','DROP C
SYSTEM DROP FILESYSTEM CACHE ['SYSTEM DROP FILESYSTEM CACHE','DROP FILESYSTEM CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM SYNC FILESYSTEM CACHE ['SYSTEM REPAIR FILESYSTEM CACHE','REPAIR FILESYSTEM CACHE','SYNC FILESYSTEM CACHE'] GLOBAL SYSTEM
SYSTEM DROP SCHEMA CACHE ['SYSTEM DROP SCHEMA CACHE','DROP SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP FORMAT SCHEMA CACHE ['SYSTEM DROP FORMAT SCHEMA CACHE','DROP FORMAT SCHEMA CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP S3 CLIENT CACHE ['SYSTEM DROP S3 CLIENT','DROP S3 CLIENT CACHE'] GLOBAL SYSTEM DROP CACHE
SYSTEM DROP CACHE ['DROP CACHE'] \N SYSTEM
SYSTEM RELOAD CONFIG ['RELOAD CONFIG'] GLOBAL SYSTEM RELOAD
Expand Down
@@ -0,0 +1,2 @@
SYSTEM DROP FORMAT SCHEMA CACHE
SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf
2 changes: 2 additions & 0 deletions tests/queries/0_stateless/02889_system_drop_format_schema.sql
@@ -0,0 +1,2 @@
EXPLAIN SYNTAX SYSTEM DROP FORMAT SCHEMA CACHE;
EXPLAIN SYNTAX SYSTEM DROP FORMAT SCHEMA CACHE FOR Protobuf;

0 comments on commit 8d0c961

Please sign in to comment.