Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix splitting table name of dictionary source #12165

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions programs/odbc-bridge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
ODBCBlockOutputStream.cpp
ODBCBridge.cpp
PingHandler.cpp
SchemaAllowedHandler.cpp
validateODBCConnectionString.cpp
)
set (CLICKHOUSE_ODBC_BRIDGE_LINK
Expand Down
6 changes: 6 additions & 0 deletions programs/odbc-bridge/HandlerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco:
return new IdentifierQuoteHandler(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/schema_allowed")
#if USE_ODBC
return new SchemaAllowedHandler(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/write")
return new ODBCHandler(pool_map, keep_alive_timeout, context, "write");
Expand Down
3 changes: 2 additions & 1 deletion programs/odbc-bridge/HandlerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "MainHandler.h"
#include "ColumnInfoHandler.h"
#include "IdentifierQuoteHandler.h"
#include "SchemaAllowedHandler.h"

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
Expand All @@ -15,7 +16,7 @@

namespace DB
{
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote' handlers.
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
* Also stores Session pools for ODBC connections
*/
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
Expand Down
76 changes: 76 additions & 0 deletions programs/odbc-bridge/SchemaAllowedHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "SchemaAllowedHandler.h"

#if USE_ODBC

# include <IO/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <Poco/Data/ODBC/ODBCException.h>
# include <Poco/Data/ODBC/SessionImpl.h>
# include <Poco/Data/ODBC/Utility.h>
# include <Poco/Net/HTMLForm.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <common/logger_useful.h>
# include "validateODBCConnectionString.h"

# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC

namespace DB
{
namespace
{
bool isSchemaAllowed(SQLHDBC hdbc)
{
std::string identifier;

SQLSMALLINT t;
SQLRETURN r = POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_SCHEMA_USAGE, nullptr, 0, &t);

if (POCO_SQL_ODBC_CLASS::Utility::isError(r))
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);

return t != 0;
}
}


void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: {}", request.getURI());

auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
LOG_WARNING(log, message);
};

if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
return;
}

try
{
std::string connection_string = params.get("connection_string");
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
SQLHDBC hdbc = session.dbc().handle();

bool result = isSchemaAllowed(hdbc);

WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
writeBoolText(result, out);
}
catch (...)
{
process_error("Error getting schema usage from ODBC '" + getCurrentExceptionMessage(false) + "'");
tryLogCurrentException(log);
}
}

}

#endif
31 changes: 31 additions & 0 deletions programs/odbc-bridge/SchemaAllowedHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>

#if USE_ODBC

namespace DB
{
class Context;


/// This handler establishes connection to database, and retrieve whether schema is allowed.
class SchemaAllowedHandler : public Poco::Net::HTTPRequestHandler
{
public:
SchemaAllowedHandler(size_t keep_alive_timeout_, Context &)
: log(&Poco::Logger::get("SchemaAllowedHandler")), keep_alive_timeout(keep_alive_timeout_)
{
}

void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;

private:
Poco::Logger * log;
size_t keep_alive_timeout;
};

}

#endif
24 changes: 24 additions & 0 deletions src/Common/XDBCBridgeHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class IXDBCBridgeHelper
virtual Poco::URI getMainURI() const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual bool isSchemaAllowed() = 0;
virtual String getName() const = 0;

virtual ~IXDBCBridgeHelper() = default;
Expand All @@ -61,6 +62,7 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper");

std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed;

protected:
auto getConnectionString() const
Expand All @@ -80,6 +82,7 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
static constexpr inline auto PING_OK_ANSWER = "Ok.";

XDBCBridgeHelper(const Context & global_context_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
Expand Down Expand Up @@ -128,6 +131,27 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
return *quote_style;
}

bool isSchemaAllowed() override
{
if (!is_schema_allowed.has_value())
{
startBridgeSync();

auto uri = createBaseURI();
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());

ReadWriteBufferFromHTTP buf(
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));

bool res;
readBoolText(res, buf);
is_schema_allowed = res;
}

return *is_schema_allowed;
}

/**
* @todo leaky abstraction - used by external API's
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Dictionaries/CassandraDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ CassandraDictionarySource::CassandraDictionarySource(
, dict_struct(dict_struct_)
, settings(settings_)
, sample_block(sample_block_)
, query_builder(dict_struct, settings.db, settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
, query_builder(dict_struct, settings.db, "", settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
{
cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
if (settings.port)
Expand Down
4 changes: 2 additions & 2 deletions src/Dictionaries/ClickHouseDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, where{config.getString(config_prefix + ".where", "")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block_}
, context(context_)
, is_local{isLocalAddress({host, port}, secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort())}
Expand Down Expand Up @@ -97,7 +97,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, update_field{other.update_field}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, sample_block{other.sample_block}
, context(other.context)
, is_local{other.is_local}
Expand Down
16 changes: 3 additions & 13 deletions src/Dictionaries/ExternalQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,12 @@ namespace ErrorCodes
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_)
: dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
{
if (auto pos = table_.find('.'); pos != std::string::npos)
{
schema = table_.substr(0, pos);
table = table_.substr(pos + 1);
}
else
{
schema = "";
table = table_;
}
}
: dict_struct(dict_struct_), db(db_), schema(schema_), table(table_), where(where_), quoting_style(quoting_style_)
{}


void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out) const
Expand Down
3 changes: 2 additions & 1 deletion src/Dictionaries/ExternalQueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ struct ExternalQueryBuilder
{
const DictionaryStructure & dict_struct;
std::string db;
std::string table;
std::string schema;
std::string table;
const std::string & where;

IdentifierQuotingStyle quoting_style;
Expand All @@ -28,6 +28,7 @@ struct ExternalQueryBuilder
ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_);
Expand Down
4 changes: 2 additions & 2 deletions src/Dictionaries/MySQLDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ MySQLDictionarySource::MySQLDictionarySource(
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
, sample_block{sample_block_}
, pool{mysqlxx::PoolFactory::instance().get(config, config_prefix)}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
Expand All @@ -87,7 +87,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, dont_check_update_time{other.dont_check_update_time}
, sample_block{other.sample_block}
, pool{other.pool}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{other.load_all_query}
, last_modification{other.last_modification}
, invalidate_query{other.invalidate_query}
Expand Down
39 changes: 37 additions & 2 deletions src/Dictionaries/XDBCDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace DB
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}

namespace
Expand Down Expand Up @@ -60,6 +61,39 @@ namespace
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
BlockInputStreamPtr reader;
};


ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & where_,
IXDBCBridgeHelper & bridge_)
{
std::string schema = schema_;
std::string table = table_;

if (bridge_.isSchemaAllowed())
{
if (schema.empty())
{
if (auto pos = table.find('.'); pos != std::string::npos)
{
schema = table.substr(0, pos);
table = table.substr(pos + 1);
}
}
}
else
{
if (!schema.empty())
throw Exception{"Dictionary source of type " + bridge_.getName() + " specifies a schema but schema is not supported by "
+ bridge_.getName() + "-driver",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}

return {dict_struct_, db_, schema, table, where_, bridge_.getIdentifierQuotingStyle()};
}
}

static const UInt64 max_block_size = 8192;
Expand All @@ -76,11 +110,12 @@ XDBCDictionarySource::XDBCDictionarySource(
, update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, db{config_.getString(config_prefix_ + ".db", "")}
, schema{config_.getString(config_prefix_ + ".schema", "")}
, table{config_.getString(config_prefix_ + ".table")}
, where{config_.getString(config_prefix_ + ".where", "")}
, update_field{config_.getString(config_prefix_ + ".update_field", "")}
, sample_block{sample_block_}
, query_builder{dict_struct, db, table, where, bridge_->getIdentifierQuotingStyle()}
, query_builder{makeExternalQueryBuilder(dict_struct, db, schema, table, where, *bridge_)}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
, bridge_helper{bridge_}
Expand All @@ -104,7 +139,7 @@ XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
, where{other.where}
, update_field{other.update_field}
, sample_block{other.sample_block}
, query_builder{dict_struct, db, table, where, other.bridge_helper->getIdentifierQuotingStyle()}
, query_builder{other.query_builder}
, load_all_query{other.load_all_query}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
Expand Down
1 change: 1 addition & 0 deletions src/Dictionaries/XDBCDictionarySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class XDBCDictionarySource final : public IDictionarySource
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string schema;
const std::string table;
const std::string where;
const std::string update_field;
Expand Down
1 change: 0 additions & 1 deletion tests/integration/test_dictionaries_dependency/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def check():
check()


@pytest.mark.skip(reason="TODO: should be fixed")
@pytest.mark.parametrize("node", nodes)
def test_dependency_via_dictionary_database(node):
node.query("CREATE DATABASE dict_db ENGINE=Dictionary")
Expand Down