Skip to content

Commit

Permalink
Don't split dictionary source's table name into schema and table name…
Browse files Browse the repository at this point in the history
… itself

if ODBC driver doesn't support schema.
  • Loading branch information
Vitaly Baranov committed Jul 6, 2020
1 parent 99e9b15 commit 4733504
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 23 deletions.
1 change: 1 addition & 0 deletions programs/odbc-bridge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set (CLICKHOUSE_ODBC_BRIDGE_SOURCES
ODBCBlockOutputStream.cpp
ODBCBridge.cpp
PingHandler.cpp
SchemaAllowedHandler.cpp
validateODBCConnectionString.cpp
)
set (CLICKHOUSE_ODBC_BRIDGE_LINK
Expand Down
6 changes: 6 additions & 0 deletions programs/odbc-bridge/HandlerFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco:
return new IdentifierQuoteHandler(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/schema_allowed")
#if USE_ODBC
return new SchemaAllowedHandler(keep_alive_timeout, context);
#else
return nullptr;
#endif
else if (uri.getPath() == "/write")
return new ODBCHandler(pool_map, keep_alive_timeout, context, "write");
Expand Down
3 changes: 2 additions & 1 deletion programs/odbc-bridge/HandlerFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "MainHandler.h"
#include "ColumnInfoHandler.h"
#include "IdentifierQuoteHandler.h"
#include "SchemaAllowedHandler.h"

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
Expand All @@ -15,7 +16,7 @@

namespace DB
{
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote' handlers.
/** Factory for '/ping', '/', '/columns_info', '/identifier_quote', '/schema_allowed' handlers.
* Also stores Session pools for ODBC connections
*/
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
Expand Down
76 changes: 76 additions & 0 deletions programs/odbc-bridge/SchemaAllowedHandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include "SchemaAllowedHandler.h"

#if USE_ODBC

# include <IO/WriteBufferFromHTTPServerResponse.h>
# include <IO/WriteHelpers.h>
# include <Poco/Data/ODBC/ODBCException.h>
# include <Poco/Data/ODBC/SessionImpl.h>
# include <Poco/Data/ODBC/Utility.h>
# include <Poco/Net/HTMLForm.h>
# include <Poco/Net/HTTPServerRequest.h>
# include <Poco/Net/HTTPServerResponse.h>
# include <common/logger_useful.h>
# include "validateODBCConnectionString.h"

# define POCO_SQL_ODBC_CLASS Poco::Data::ODBC

namespace DB
{
namespace
{
bool isSchemaAllowed(SQLHDBC hdbc)
{
std::string identifier;

SQLSMALLINT t;
SQLRETURN r = POCO_SQL_ODBC_CLASS::SQLGetInfo(hdbc, SQL_SCHEMA_USAGE, nullptr, 0, &t);

if (POCO_SQL_ODBC_CLASS::Utility::isError(r))
throw POCO_SQL_ODBC_CLASS::ConnectionException(hdbc);

return t != 0;
}
}


void SchemaAllowedHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: {}", request.getURI());

auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
LOG_WARNING(log, message);
};

if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
return;
}

try
{
std::string connection_string = params.get("connection_string");
POCO_SQL_ODBC_CLASS::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
SQLHDBC hdbc = session.dbc().handle();

bool result = isSchemaAllowed(hdbc);

WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
writeBoolText(result, out);
}
catch (...)
{
process_error("Error getting schema usage from ODBC '" + getCurrentExceptionMessage(false) + "'");
tryLogCurrentException(log);
}
}

}

#endif
31 changes: 31 additions & 0 deletions programs/odbc-bridge/SchemaAllowedHandler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>

#if USE_ODBC

namespace DB
{
class Context;


/// This handler establishes connection to database, and retrieve whether schema is allowed.
class SchemaAllowedHandler : public Poco::Net::HTTPRequestHandler
{
public:
SchemaAllowedHandler(size_t keep_alive_timeout_, Context &)
: log(&Poco::Logger::get("SchemaAllowedHandler")), keep_alive_timeout(keep_alive_timeout_)
{
}

void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;

private:
Poco::Logger * log;
size_t keep_alive_timeout;
};

}

#endif
24 changes: 24 additions & 0 deletions src/Common/XDBCBridgeHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class IXDBCBridgeHelper
virtual Poco::URI getMainURI() const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
virtual IdentifierQuotingStyle getIdentifierQuotingStyle() = 0;
virtual bool isSchemaAllowed() = 0;
virtual String getName() const = 0;

virtual ~IXDBCBridgeHelper() = default;
Expand All @@ -61,6 +62,7 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
Poco::Logger * log = &Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper");

std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed;

protected:
auto getConnectionString() const
Expand All @@ -80,6 +82,7 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
static constexpr inline auto MAIN_HANDLER = "/";
static constexpr inline auto COL_INFO_HANDLER = "/columns_info";
static constexpr inline auto IDENTIFIER_QUOTE_HANDLER = "/identifier_quote";
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
static constexpr inline auto PING_OK_ANSWER = "Ok.";

XDBCBridgeHelper(const Context & global_context_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
Expand Down Expand Up @@ -128,6 +131,27 @@ class XDBCBridgeHelper : public IXDBCBridgeHelper
return *quote_style;
}

bool isSchemaAllowed() override
{
if (!is_schema_allowed.has_value())
{
startBridgeSync();

auto uri = createBaseURI();
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());

ReadWriteBufferFromHTTP buf(
uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));

bool res;
readBoolText(res, buf);
is_schema_allowed = res;
}

return *is_schema_allowed;
}

/**
* @todo leaky abstraction - used by external API's
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Dictionaries/CassandraDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ CassandraDictionarySource::CassandraDictionarySource(
, dict_struct(dict_struct_)
, settings(settings_)
, sample_block(sample_block_)
, query_builder(dict_struct, settings.db, settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
, query_builder(dict_struct, settings.db, "", settings.table, settings.where, IdentifierQuotingStyle::DoubleQuotes)
{
cassandraCheck(cass_cluster_set_contact_points(cluster, settings.host.c_str()));
if (settings.port)
Expand Down
4 changes: 2 additions & 2 deletions src/Dictionaries/ClickHouseDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(
, where{config.getString(config_prefix + ".where", "")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, sample_block{sample_block_}
, context(context_)
, is_local{isLocalAddress({host, port}, secure ? context.getTCPPortSecure().value_or(0) : context.getTCPPort())}
Expand Down Expand Up @@ -97,7 +97,7 @@ ClickHouseDictionarySource::ClickHouseDictionarySource(const ClickHouseDictionar
, update_field{other.update_field}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, sample_block{other.sample_block}
, context(other.context)
, is_local{other.is_local}
Expand Down
16 changes: 3 additions & 13 deletions src/Dictionaries/ExternalQueryBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,12 @@ namespace ErrorCodes
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_)
: dict_struct(dict_struct_), db(db_), where(where_), quoting_style(quoting_style_)
{
if (auto pos = table_.find('.'); pos != std::string::npos)
{
schema = table_.substr(0, pos);
table = table_.substr(pos + 1);
}
else
{
schema = "";
table = table_;
}
}
: dict_struct(dict_struct_), db(db_), schema(schema_), table(table_), where(where_), quoting_style(quoting_style_)
{}


void ExternalQueryBuilder::writeQuoted(const std::string & s, WriteBuffer & out) const
Expand Down
3 changes: 2 additions & 1 deletion src/Dictionaries/ExternalQueryBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ struct ExternalQueryBuilder
{
const DictionaryStructure & dict_struct;
std::string db;
std::string table;
std::string schema;
std::string table;
const std::string & where;

IdentifierQuotingStyle quoting_style;
Expand All @@ -28,6 +28,7 @@ struct ExternalQueryBuilder
ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & where_,
IdentifierQuotingStyle quoting_style_);
Expand Down
4 changes: 2 additions & 2 deletions src/Dictionaries/MySQLDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ MySQLDictionarySource::MySQLDictionarySource(
, dont_check_update_time{config.getBool(config_prefix + ".dont_check_update_time", false)}
, sample_block{sample_block_}
, pool{mysqlxx::PoolFactory::instance().get(config, config_prefix)}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config.getString(config_prefix + ".invalidate_query", "")}
, close_connection{config.getBool(config_prefix + ".close_connection", false) || config.getBool(config_prefix + ".share_connection", false)}
Expand All @@ -87,7 +87,7 @@ MySQLDictionarySource::MySQLDictionarySource(const MySQLDictionarySource & other
, dont_check_update_time{other.dont_check_update_time}
, sample_block{other.sample_block}
, pool{other.pool}
, query_builder{dict_struct, db, table, where, IdentifierQuotingStyle::Backticks}
, query_builder{dict_struct, db, "", table, where, IdentifierQuotingStyle::Backticks}
, load_all_query{other.load_all_query}
, last_modification{other.last_modification}
, invalidate_query{other.invalidate_query}
Expand Down
39 changes: 37 additions & 2 deletions src/Dictionaries/XDBCDictionarySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace DB
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}

namespace
Expand Down Expand Up @@ -60,6 +61,39 @@ namespace
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
BlockInputStreamPtr reader;
};


ExternalQueryBuilder makeExternalQueryBuilder(const DictionaryStructure & dict_struct_,
const std::string & db_,
const std::string & schema_,
const std::string & table_,
const std::string & where_,
IXDBCBridgeHelper & bridge_)
{
std::string schema = schema_;
std::string table = table_;

if (bridge_.isSchemaAllowed())
{
if (schema.empty())
{
if (auto pos = table.find('.'); pos != std::string::npos)
{
schema = table.substr(0, pos);
table = table.substr(pos + 1);
}
}
}
else
{
if (!schema.empty())
throw Exception{"Dictionary source of type " + bridge_.getName() + " specifies a schema but schema is not supported by "
+ bridge_.getName() + "-driver",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}

return {dict_struct_, db_, schema, table, where_, bridge_.getIdentifierQuotingStyle()};
}
}

static const UInt64 max_block_size = 8192;
Expand All @@ -76,11 +110,12 @@ XDBCDictionarySource::XDBCDictionarySource(
, update_time{std::chrono::system_clock::from_time_t(0)}
, dict_struct{dict_struct_}
, db{config_.getString(config_prefix_ + ".db", "")}
, schema{config_.getString(config_prefix_ + ".schema", "")}
, table{config_.getString(config_prefix_ + ".table")}
, where{config_.getString(config_prefix_ + ".where", "")}
, update_field{config_.getString(config_prefix_ + ".update_field", "")}
, sample_block{sample_block_}
, query_builder{dict_struct, db, table, where, bridge_->getIdentifierQuotingStyle()}
, query_builder{makeExternalQueryBuilder(dict_struct, db, schema, table, where, *bridge_)}
, load_all_query{query_builder.composeLoadAllQuery()}
, invalidate_query{config_.getString(config_prefix_ + ".invalidate_query", "")}
, bridge_helper{bridge_}
Expand All @@ -104,7 +139,7 @@ XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
, where{other.where}
, update_field{other.update_field}
, sample_block{other.sample_block}
, query_builder{dict_struct, db, table, where, other.bridge_helper->getIdentifierQuotingStyle()}
, query_builder{other.query_builder}
, load_all_query{other.load_all_query}
, invalidate_query{other.invalidate_query}
, invalidate_query_response{other.invalidate_query_response}
Expand Down
1 change: 1 addition & 0 deletions src/Dictionaries/XDBCDictionarySource.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class XDBCDictionarySource final : public IDictionarySource
std::chrono::time_point<std::chrono::system_clock> update_time;
const DictionaryStructure dict_struct;
const std::string db;
const std::string schema;
const std::string table;
const std::string where;
const std::string update_field;
Expand Down
1 change: 0 additions & 1 deletion tests/integration/test_dictionaries_dependency/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def check():
check()


@pytest.mark.skip(reason="TODO: should be fixed")
@pytest.mark.parametrize("node", nodes)
def test_dependency_via_dictionary_database(node):
node.query("CREATE DATABASE dict_db ENGINE=Dictionary")
Expand Down

0 comments on commit 4733504

Please sign in to comment.