Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Azure blob storage sas token #51141

Merged
merged 12 commits into from Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterCreateQuery.cpp
Expand Up @@ -764,7 +764,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti
/// Table function without columns list.
auto table_function_ast = create.as_table_function->ptr();
auto table_function = TableFunctionFactory::instance().get(table_function_ast, getContext());
properties.columns = table_function->getActualTableStructure(getContext());
properties.columns = table_function->getActualTableStructure(getContext(), /*is_insert_query*/ true);
}
else if (create.is_dictionary)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterDescribeQuery.cpp
Expand Up @@ -96,7 +96,7 @@ BlockIO InterpreterDescribeQuery::execute()
else if (table_expression.table_function)
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_expression.table_function, getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext());
auto table_function_column_descriptions = table_function_ptr->getActualTableStructure(getContext(), /*is_insert_query*/ true);
for (const auto & table_function_column_description : table_function_column_descriptions)
columns.emplace_back(table_function_column_description);
}
Expand Down
123 changes: 89 additions & 34 deletions src/Storages/StorageAzureBlob.cpp
Expand Up @@ -87,7 +87,7 @@ const std::unordered_set<std::string_view> optional_configuration_keys = {

bool isConnectionString(const std::string & candidate)
{
return candidate.starts_with("DefaultEndpointsProtocol");
return !candidate.starts_with("http");
}

}
Expand Down Expand Up @@ -258,7 +258,7 @@ void registerStorageAzureBlob(StorageFactory & factory)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");

auto configuration = StorageAzureBlob::getConfiguration(engine_args, args.getLocalContext());
auto client = StorageAzureBlob::createClient(configuration);
auto client = StorageAzureBlob::createClient(configuration, /* is_read_only */ false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
Expand Down Expand Up @@ -310,58 +310,113 @@ void registerStorageAzureBlob(StorageFactory & factory)
});
}

AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration)
/// Returns true if the storage account reachable through `blob_service_client`
/// has a container whose name is exactly `container_name`.
///
/// The service is asked to list containers whose names start with
/// `container_name`, with a page-size hint of 1, and the names in the returned
/// page are compared for an exact match. Only the first returned page is
/// inspected; no further pages are requested.
/// NOTE(review): this relies on the exact-name container (if any) being present
/// in that first page — confirm against the service's listing order guarantees.
///
/// Any exception thrown by the listing request propagates to the caller.
static bool containerExists(const std::unique_ptr<BlobServiceClient> & blob_service_client, const std::string & container_name)
{
    Azure::Storage::Blobs::ListBlobContainersOptions options;
    options.Prefix = container_name;
    options.PageSizeHint = 1;

    const auto containers_list_response = blob_service_client->ListBlobContainers(options);

    /// Iterate the response's container list in place instead of copying it.
    for (const auto & container : containers_list_response.BlobContainers)
    {
        /// A prefix match is not enough: "foo" must not match "foobar".
        if (container_name == container.Name)
            return true;
    }
    return false;
}

AzureClientPtr StorageAzureBlob::createClient(StorageAzureBlob::Configuration configuration, bool is_read_only)
{
AzureClientPtr result;

if (configuration.is_connection_string)
{
std::unique_ptr<BlobServiceClient> blob_service_client = std::make_unique<BlobServiceClient>(BlobServiceClient::CreateFromConnectionString(configuration.connection_url));
result = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(configuration.connection_url, configuration.container));
result->CreateIfNotExists();
}
else
{
if (configuration.account_name.has_value() && configuration.account_key.has_value())
bool container_exists = containerExists(blob_service_client,configuration.container);

if (!container_exists)
{
auto storage_shared_key_credential = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);

try
{
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
}
catch (const Azure::Storage::StorageException & e)
result->CreateIfNotExists();
} catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
{
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;

result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
}
else
if (!(e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists."))
{
throw;
}
}
}
}
else
{
std::shared_ptr<Azure::Storage::StorageSharedKeyCredential> storage_shared_key_credential;
if (configuration.account_name.has_value() && configuration.account_key.has_value())
{
storage_shared_key_credential
= std::make_shared<Azure::Storage::StorageSharedKeyCredential>(*configuration.account_name, *configuration.account_key);
}

std::unique_ptr<BlobServiceClient> blob_service_client;
if (storage_shared_key_credential)
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, storage_shared_key_credential);
}
else
{
blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url);
}

bool container_exists = containerExists(blob_service_client,configuration.container);

std::string final_url;
size_t pos = configuration.connection_url.find('?');
if (pos != std::string::npos)
{
auto url_without_sas = configuration.connection_url.substr(0, pos);
final_url = url_without_sas + (url_without_sas.back() == '/' ? "" : "/") + configuration.container
+ configuration.connection_url.substr(pos);
}
else
final_url
= configuration.connection_url + (configuration.connection_url.back() == '/' ? "" : "/") + configuration.container;

if (container_exists)
{
auto managed_identity_credential = std::make_shared<Azure::Identity::ManagedIdentityCredential>();
auto blob_service_client = std::make_unique<BlobServiceClient>(configuration.connection_url, managed_identity_credential);
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
if (is_read_only)
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"AzureBlobStorage container does not exist '{}'",
configuration.container);
try
{
result = std::make_unique<BlobContainerClient>(blob_service_client->CreateBlobContainer(configuration.container).Value);
}
catch (const Azure::Storage::StorageException & e)
} catch (const Azure::Storage::StorageException & e)
{
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict)
if (e.StatusCode == Azure::Core::Http::HttpStatusCode::Conflict
&& e.ReasonPhrase == "The specified container already exists.")
{
auto final_url = configuration.connection_url
+ (configuration.connection_url.back() == '/' ? "" : "/")
+ configuration.container;

result = std::make_unique<BlobContainerClient>(final_url, managed_identity_credential);
if (storage_shared_key_credential)
result = std::make_unique<BlobContainerClient>(final_url, storage_shared_key_credential);
else
result = std::make_unique<BlobContainerClient>(final_url);
}
else
{
Expand Down Expand Up @@ -439,7 +494,7 @@ void StorageAzureBlob::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
{
throw Exception(
ErrorCodes::DATABASE_ACCESS_DENIED,
"S3 key '{}' contains globs, so the table is in readonly mode",
"AzureBlobStorage key '{}' contains globs, so the table is in readonly mode",
configuration.blob_path);
}

Expand Down Expand Up @@ -1228,7 +1283,7 @@ ColumnsDescription StorageAzureBlob::getTableStructureFromData(
return nullptr;
}

/// S3 file iterator could get new keys after new iteration, check them in schema cache.
/// The AzureBlobStorage file iterator may return new keys after each iteration; check them in the schema cache.
if (ctx->getSettingsRef().schema_inference_use_cache_for_azure && read_keys.size() > prev_read_keys_size)
{
columns_from_cache = tryGetColumnsFromCache(read_keys.begin() + prev_read_keys_size, read_keys.end(), configuration, format_settings, ctx);
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/StorageAzureBlob.h
Expand Up @@ -65,7 +65,7 @@ class StorageAzureBlob : public IStorage
ASTPtr partition_by_);

static StorageAzureBlob::Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration);
static AzureClientPtr createClient(StorageAzureBlob::Configuration configuration, bool is_read_only);

static AzureObjectStorage::SettingsPtr createSettings(ContextPtr local_context);

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/getStructureOfRemoteTable.cpp
Expand Up @@ -39,7 +39,7 @@ ColumnsDescription getStructureOfRemoteTableInShard(
if (shard_info.isLocal())
{
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_func_ptr, context);
return table_function_ptr->getActualTableStructure(context);
return table_function_ptr->getActualTableStructure(context, /*is_insert_query*/ true);
}

auto table_func_name = queryToString(table_func_ptr);
Expand Down
5 changes: 3 additions & 2 deletions src/TableFunctions/Hive/TableFunctionHive.cpp
Expand Up @@ -49,13 +49,14 @@ namespace DB
actual_columns = parseColumnsListFromString(table_structure, context_);
}

ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/) const { return actual_columns; }
ColumnsDescription TableFunctionHive::getActualTableStructure(ContextPtr /*context_*/, bool /*is_insert_query*/) const { return actual_columns; }

StoragePtr TableFunctionHive::executeImpl(
const ASTPtr & /*ast_function_*/,
ContextPtr context_,
const std::string & table_name_,
ColumnsDescription /*cached_columns_*/) const
ColumnsDescription /*cached_columns_*/,
bool /*is_insert_query*/) const
{
const Settings & settings = context_->getSettings();
ParserExpression partition_by_parser;
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/Hive/TableFunctionHive.h
Expand Up @@ -17,10 +17,10 @@ class TableFunctionHive : public ITableFunction
bool hasStaticStructure() const override { return true; }

StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;

const char * getStorageTypeName() const override { return storage_type_name; }
ColumnsDescription getActualTableStructure(ContextPtr) const override;
ColumnsDescription getActualTableStructure(ContextPtr, bool is_insert_query) const override;
void parseArguments(const ASTPtr & ast_function_, ContextPtr context_) override;

private:
Expand Down
8 changes: 4 additions & 4 deletions src/TableFunctions/ITableFunction.cpp
Expand Up @@ -34,15 +34,15 @@ StoragePtr ITableFunction::execute(const ASTPtr & ast_function, ContextPtr conte
auto context_to_use = use_global_context ? context->getGlobalContext() : context;

if (cached_columns.empty())
return executeImpl(ast_function, context, table_name, std::move(cached_columns));
return executeImpl(ast_function, context, table_name, std::move(cached_columns), is_insert_query);

if (hasStaticStructure() && cached_columns == getActualTableStructure(context))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns));
if (hasStaticStructure() && cached_columns == getActualTableStructure(context,is_insert_query))
return executeImpl(ast_function, context_to_use, table_name, std::move(cached_columns), is_insert_query);

auto this_table_function = shared_from_this();
auto get_storage = [=]() -> StoragePtr
{
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns);
return this_table_function->executeImpl(ast_function, context_to_use, table_name, cached_columns, is_insert_query);
};

/// It will request actual table structure and create underlying storage lazily
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/ITableFunction.h
Expand Up @@ -58,7 +58,7 @@ class ITableFunction : public std::enable_shared_from_this<ITableFunction>
virtual void parseArguments(const ASTPtr & /*ast_function*/, ContextPtr /*context*/) {}

/// Returns actual table structure probably requested from remote server, may fail
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/) const = 0;
virtual ColumnsDescription getActualTableStructure(ContextPtr /*context*/, bool is_insert_query) const = 0;

/// Check if table function needs a structure hint from SELECT query in case of
/// INSERT INTO FUNCTION ... SELECT ... and INSERT INTO ... SELECT ... FROM table_function(...)
Expand Down Expand Up @@ -89,7 +89,7 @@ class ITableFunction : public std::enable_shared_from_this<ITableFunction>

private:
virtual StoragePtr executeImpl(
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const = 0;
const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const = 0;

virtual const char * getStorageTypeName() const = 0;
};
Expand Down
5 changes: 3 additions & 2 deletions src/TableFunctions/ITableFunctionDataLake.h
Expand Up @@ -26,7 +26,8 @@ class ITableFunctionDataLake : public TableFunction
const ASTPtr & /*ast_function*/,
ContextPtr context,
const std::string & table_name,
ColumnsDescription /*cached_columns*/) const override
ColumnsDescription /*cached_columns*/,
bool /*is_insert_query*/) const override
{
ColumnsDescription columns;
if (TableFunction::configuration.structure != "auto")
Expand All @@ -42,7 +43,7 @@ class ITableFunctionDataLake : public TableFunction

const char * getStorageTypeName() const override { return Storage::name; }

ColumnsDescription getActualTableStructure(ContextPtr context) const override
ColumnsDescription getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const override
{
if (TableFunction::configuration.structure == "auto")
{
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunctionFileLike.cpp
Expand Up @@ -110,7 +110,7 @@ void ITableFunctionFileLike::addColumnsStructureToArguments(ASTs & args, const S
}
}

StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool /*is_insert_query*/) const
{
ColumnsDescription columns;
if (structure != "auto")
Expand Down
2 changes: 1 addition & 1 deletion src/TableFunctions/ITableFunctionFileLike.h
Expand Up @@ -48,7 +48,7 @@ class ITableFunctionFileLike : public ITableFunction
ColumnsDescription structure_hint;

private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;

virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, ContextPtr global_context,
Expand Down
6 changes: 3 additions & 3 deletions src/TableFunctions/ITableFunctionXDBC.cpp
Expand Up @@ -61,7 +61,7 @@ void ITableFunctionXDBC::startBridgeIfNot(ContextPtr context) const
}
}

ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context) const
ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr context, bool /*is_insert_query*/) const
{
startBridgeIfNot(context);

Expand Down Expand Up @@ -92,10 +92,10 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
return ColumnsDescription{columns};
}

StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
StoragePtr ITableFunctionXDBC::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/, bool is_insert_query) const
{
startBridgeIfNot(context);
auto columns = getActualTableStructure(context);
auto columns = getActualTableStructure(context, is_insert_query);
auto result = std::make_shared<StorageXDBC>(
StorageID(getDatabaseName(), table_name), schema_name, remote_table_name, columns, ConstraintsDescription{}, String{}, context, helper);
result->startup();
Expand Down
4 changes: 2 additions & 2 deletions src/TableFunctions/ITableFunctionXDBC.h
Expand Up @@ -16,15 +16,15 @@ namespace DB
class ITableFunctionXDBC : public ITableFunction
{
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns) const override;
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const std::string & table_name, ColumnsDescription cached_columns, bool is_insert_query) const override;

/* A factory method to create bridge helper, that will assist in remote interaction */
virtual BridgeHelperPtr createBridgeHelper(ContextPtr context,
Poco::Timespan http_timeout_,
const std::string & connection_string_,
bool use_connection_pooling_) const = 0;

ColumnsDescription getActualTableStructure(ContextPtr context) const override;
ColumnsDescription getActualTableStructure(ContextPtr context, bool is_insert_query) const override;

void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;

Expand Down