Skip to content

Commit

Permalink
Merge pull request #51141 from ClickHouse/azure_blob_storage_sas_token
Browse files Browse the repository at this point in the history
Azure blob storage sas token
  • Loading branch information
alesapin committed Aug 1, 2023
2 parents 40ce9cf + 74e7ff9 commit 0ee9797
Show file tree
Hide file tree
Showing 68 changed files with 239 additions and 171 deletions.
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterCreateQuery.cpp
Expand Up @@ -764,7 +764,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
/// Table function without columns list.
auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
properties.columns = table_function->getActualTableStructure(getContext());
properties.columns = table_function->getActualTableStructure(getContext(), /*is_insert_query*/ true);
}
else if (create.is_dictionary)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterDescribeQuery.cpp
Expand Up @@ -96,7 +96,7 @@ BlockIO InterpreterDescribeQuery::execute()
else if (table_expression.table_function)
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext(), /*is_insert_query*/ true);
for (const auto & table_function_column_description : table_function_column_descriptions)
columns.emplace_back(table_function_column_description);
}
Expand Down
123 changes: 89 additions & 34 deletions src/Storages/StorageAzureBlob.cpp
Expand Up @@ -86,7 +86,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {

/// Tells whether `candidate` should be treated as a connection string, judged purely by its prefix.
/// NOTE(review): two return statements follow back to back; this looks like the removed and the added
/// line of a diff hunk shown together. As written only the first one is reachable — confirm which
/// check is the intended one ("DefaultEndpointsProtocol" prefix vs. "does not start with http").
bool isConnectionString(const std::string & candidate)
{
return candidate.starts_with("DefaultEndpointsProtocol");
return !candidate.starts_with("http");
}

}
Expand Down Expand Up @@ -257,7 +257,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");

auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext());
auto client = StorageAzureBlob::createClient(configuration);
auto client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
Expand Down Expand Up @@ -309,58 +309,113 @@ void registerStorageAzureBlob(StorageFactory & factory)
});
}

AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration)
/// Checks whether a container named exactly `container_name` is returned by `blob_service_client`.
///
/// Lists containers using the name as a prefix with a page-size hint of 1 and looks for an exact
/// name match in the returned page. Only that first page is inspected.
/// NOTE(review): this presumably relies on the service returning containers in lexicographical
/// order, so that an exact match (the shortest name with this prefix) comes first — confirm.
///
/// The name is taken by const reference and the response's container list is iterated in place,
/// so neither the string nor the vector of container items is copied.
static bool containerExists(std::unique_ptr<BlobServiceClient> & blob_service_client, const std::string & container_name)
{
    Azure::Storage::Blobs::ListBlobContainersOptions options;
    options.Prefix = container_name;
    options.PageSizeHint = 1;

    auto containers_list_response = blob_service_client->ListBlobContainers(options);

    /// A prefix listing may return a different container that merely starts with the same characters,
    /// hence the exact comparison.
    for (const auto & container : containers_list_response.BlobContainers)
    {
        if (container_name == container.Name)
            return true;
    }
    return false;
}

AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration, bool is_read_only)
{
AzureClientPtr result;

if (configuration.is_connection_string)
{
std::unique_ptr<BlobServiceClient> blob_service_client = std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(configuration.connection_url));
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
result->CreateIfNotExists();
}
else
{
if (configuration.account_name.has_value() && configuration.account_key.has_value())
bool container_exists = containerExists(blob_service_client,configuration.container);

if (!container_exists)
{
auto storage_shared_key_credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);

try
{
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
}
catch (const Azure::Storage::StorageException & e)
result->CreateIfNotExists();
} catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
{
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;

result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
}
else
if (!(e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists."))
{
throw;
}
}
}
}
else
{
std::shared_ptr<Azure::Storage::StorageSharedKeyCredential> storage_shared_key_credential;
if (configuration.account_name.has_value() && configuration.account_key.has_value())
{
storage_shared_key_credential
= std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
}

std::unique_ptr<BlobServiceClient> blob_service_client;
if (storage_shared_key_credential)
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
}
else
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url);
}

bool container_exists = containerExists(blob_service_client,configuration.container);

std::string final_url;
size_t pos = configuration.connection_url.find('?');
if (pos != std::string::npos)
{
auto url_without_sas = configuration.connection_url.substr(0, pos);
final_url = url_without_sas + (url_without_sas.back() == '/' ? "" : "/") + configuration.container
+ configuration.connection_url.substr(pos);
}
else
final_url
= configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container;

if (container_exists)
{
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, managed_identity_credential);
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);
try
{
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
}
catch (const Azure::Storage::StorageException & e)
} catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists.")
{
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;

result = std::make_unique<BlobContainerClient>(final_url, managed_identity_credential);
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
Expand Down Expand Up @@ -438,7 +493,7 @@ void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
{
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode",
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode",
configuration.blob_path);
}

Expand Down Expand Up @@ -1203,7 +1258,7 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
return nullptr;
}

/// S3 file iterator could get new keys after new iteration, check them in schema cache.
/// The AzureBlobStorage file iterator may pick up new keys after each iteration; check them in the schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageAzureBlob.h
Expand Up @@ -65,7 +65,7 @@ class StorageAzureBlob : public IStorage
ASTPtr partition_by_);

static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration, bool is_read_only);

static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/getStructureOfRemoteTable.cpp
Expand Up @@ -38,7 +38,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
if (shard_info.isLocal())
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
return table_function_ptr->getActualTableStructure(context);
return table_function_ptr->getActualTableStructure(context, /*is_insert_query*/ true);
}

auto table_func_name = queryToString(table_func_ptr);
Expand Down
5 changes: 3 additions & 2 deletions src/TableFunctions/Hive/TableFunctionHive.cpp
Expand Up @@ -49,13 +49,14 @@ namespace DB
actual_columns = parseColumnsListFromString(table_structure, context_);
}

ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/) const { return actual_columns; }
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/, bool /*is_insert_query*/) const { return actual_columns; }

StoragePtr TableFunctionHive::executeImpl(
const ASTPtr & /*ast_function_*/,
ContextPtr context_,
const std::string & table_name_,
ColumnsDescription /*cached_columns_*/) const
ColumnsDescription /*cached_columns_*/,
bool /*is_insert_query*/) const
{
const Settings & settings = context_->getSettings();
ParserExpression partition_by_parser;
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/Hive/TableFunctionHive.h
Expand Up @@ -17,10 +17,10 @@ class TableFunctionHive : public ITableFunction
bool hasStaticStructure() const override { return true; }

StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;

const char * getStorageTypeName() const override { return storage_type_name; }
ColumnsDescription getActualTableStructure(ContextPtr) const override;
ColumnsDescription getActualTableStructure(ContextPtr, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function_, ContextPtr context_) override;

private:
Expand Down
8 changes: 4 additions & 4 deletions src/TableFunctions/ITableFunction.cpp
Expand Up @@ -34,15 +34,15 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
auto context_to_use = use_global_context ? context->getGlobalContext() : context;

if (cached_columns.empty())
return executeImpl(ast_function, context, table_name, std::move(cached_columns));
return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query);

if (hasStaticStructure() && cached_columns == getActualTableStructure(context))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns));
if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query);

auto this_table_function = shared_from_this();
auto get_storage = [=]() -> StoragePtr
{
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns);
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query);
};

/// It will request actual table structure and create underlying storage lazily
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/ITableFunction.h
Expand Up @@ -58,7 +58,7 @@ class ITableFunction : public std::enable_shared_from_this<ITableFunction>
virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {}

/// Returns actual table structure probably requested from remote server, may fail
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/) const = 0;
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/, bool is_insert_query) const = 0;

/// Check if table function needs a structure hint from SELECT query in case of
/// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...)
Expand Down Expand Up @@ -89,7 +89,7 @@ class ITableFunction : public std::enable_shared_from_this<ITableFunction>

private:
virtual StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0;

virtual const char * getStorageTypeName() const = 0;
};
Expand Down
5 changes: 3 additions & 2 deletions src/TableFunctions/ITableFunctionDataLake.h
Expand Up @@ -26,7 +26,8 @@ class ITableFunctionDataLake : public TableFunction
const ASTPtr & /*ast_function*/,
ContextPtr context,
const std::string & table_name,
ColumnsDescription /*cached_columns*/) const override
ColumnsDescription /*cached_columns*/,
bool /*is_insert_query*/) const override
{
ColumnsDescription columns;
if (TableFunction::configuration.structure != "auto")
Expand All @@ -42,7 +43,7 @@ class ITableFunctionDataLake : public TableFunction

const char * getStorageTypeName() const override { return Storage::name; }

ColumnsDescription getActualTableStructure(ContextPtr context) const override
ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override
{
if (TableFunction::configuration.structure == "auto")
{
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunctionFileLike.cpp
Expand Up @@ -110,7 +110,7 @@ void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const S
}
}

StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{
ColumnsDescription columns;
if (structure != "auto")
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunctionFileLike.h
Expand Up @@ -48,7 +48,7 @@ class ITableFunctionFileLike : public ITableFunction
ColumnsDescription structure_hint;

private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;

virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context,
Expand Down
6 changes: 3 additions & 3 deletions src/TableFunctions/ITableFunctionXDBC.cpp
Expand Up @@ -61,7 +61,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const
}
}

ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context) const
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
startBridgeIfNot(context);

Expand Down Expand Up @@ -92,10 +92,10 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
return ColumnsDescription{columns};
}

StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
startBridgeIfNot(context);
auto columns = getActualTableStructure(context);
auto columns = getActualTableStructure(context, is_insert_query);
auto result = std::make_shared<StorageXDBC>(
StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper);
result->startup();
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/ITableFunctionXDBC.h
Expand Up @@ -16,15 +16,15 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;

/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
Poco::Timespan http_timeout_,
const std::string & connection_string_,
bool use_connection_pooling_) const = 0;

ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;

void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;

Expand Down

0 comments on commit 0ee9797

Please sign in to comment.