2 changes: 1 addition & 1 deletion ci/workflows/pull_request.py
@@ -85,7 +85,7 @@
         # "python3 ./ci/jobs/scripts/workflow_hooks/pr_description.py", # NOTE (strtgbb): relies on labels we don't use
         "python3 ./ci/jobs/scripts/workflow_hooks/version_log.py",
         # "python3 ./ci/jobs/scripts/workflow_hooks/quick_sync.py", # NOTE (strtgbb): we don't do this
-        "python3 ./ci/jobs/scripts/workflow_hooks/new_tests_check.py",
+        # "python3 ./ci/jobs/scripts/workflow_hooks/new_tests_check.py", # NOTE (strtgbb): we don't use this
     ],
     workflow_filter_hooks=[should_skip_job],
     post_hooks=[
24 changes: 14 additions & 10 deletions src/Databases/DataLake/GlueCatalog.cpp
@@ -405,10 +405,12 @@ bool GlueCatalog::empty() const
 bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
 {
     String metadata_path;
+    String metadata_uri;
     if (auto table_specific_properties = table_metadata.getDataLakeSpecificProperties();
         table_specific_properties.has_value())
     {
         metadata_path = table_specific_properties->iceberg_metadata_file_location;
+        metadata_uri = metadata_path;
         if (metadata_path.starts_with("s3:/"))
             metadata_path = metadata_path.substr(5);

@@ -420,22 +422,24 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
     else
         throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Metadata specific properties should be defined");

-    if (!metadata_objects.get(metadata_path))
+    if (!metadata_objects.get(metadata_uri))
     {
         DB::ASTStorage * storage = table_engine_definition->as<DB::ASTStorage>();
         DB::ASTs args = storage->engine->arguments->children;

-        auto table_endpoint = settings[DB::DatabaseDataLakeSetting::storage_endpoint].value;
+        String storage_endpoint = !settings[DB::DatabaseDataLakeSetting::storage_endpoint].value.empty() ? settings[DB::DatabaseDataLakeSetting::storage_endpoint].value : metadata_uri;

         if (args.empty())
-            args.emplace_back(std::make_shared<DB::ASTLiteral>(table_endpoint));
+            args.emplace_back(std::make_shared<DB::ASTLiteral>(storage_endpoint));
         else
-            args[0] = std::make_shared<DB::ASTLiteral>(table_endpoint);
+            args[0] = std::make_shared<DB::ASTLiteral>(storage_endpoint);

-        if (args.size() == 1 && table_metadata.hasStorageCredentials())
+        if (args.size() == 1)
         {
-            auto storage_credentials = table_metadata.getStorageCredentials();
-            if (storage_credentials)
-                storage_credentials->addCredentialsToEngineArgs(args);
+            if (table_metadata.hasStorageCredentials())
+                table_metadata.getStorageCredentials()->addCredentialsToEngineArgs(args);
+            else if (!credentials.IsExpiredOrEmpty())
+                DataLake::S3Credentials(credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), credentials.GetSessionToken()).addCredentialsToEngineArgs(args);
         }

         auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
@@ -454,9 +458,9 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMetadata & table_metadata) const
         Poco::JSON::Parser parser;
         Poco::Dynamic::Var result = parser.parse(metadata_file);
         auto metadata_object = result.extract<Poco::JSON::Object::Ptr>();
-        metadata_objects.set(metadata_path, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
+        metadata_objects.set(metadata_uri, std::make_shared<Poco::JSON::Object::Ptr>(metadata_object));
     }
-    auto metadata_object = *metadata_objects.get(metadata_path);
+    auto metadata_object = *metadata_objects.get(metadata_uri);
     auto current_schema_id = metadata_object->getValue<Int64>("current-schema-id");
     auto schemas = metadata_object->getArray(Iceberg::f_schemas);
     for (size_t i = 0; i < schemas->size(); ++i)
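The GlueCatalog change keeps two copies of the Iceberg metadata location: metadata_path, which has the "s3:/" scheme prefix stripped before the file is read, and metadata_uri, the untouched location that now serves both as the cache key for metadata_objects and as the fallback storage endpoint when the storage_endpoint setting is empty. The credentials block gains a fallback of its own: if the table metadata carries no storage credentials, the catalog's own AWS credentials are wrapped in DataLake::S3Credentials and appended to the engine arguments. Below is a minimal, self-contained sketch of the path/URI handling only; it uses standard C++ and an illustrative URI, not the ClickHouse types or API.

    #include <iostream>
    #include <map>
    #include <string>

    int main()
    {
        // Hypothetical metadata file location, as a catalog might report it.
        std::string metadata_uri = "s3://bucket/warehouse/db/table/metadata/v3.metadata.json";

        // The lookup path drops the scheme prefix, mirroring substr(5) in the patch.
        std::string metadata_path = metadata_uri;
        if (metadata_path.starts_with("s3:/"))
            metadata_path = metadata_path.substr(5); // "bucket/warehouse/db/table/metadata/v3.metadata.json"

        // Stand-in for the metadata_objects cache: keyed by the full URI, so a later
        // lookup with the same location finds the parsed object even though the
        // string used for reading was rewritten.
        std::map<std::string, std::string> metadata_objects;
        metadata_objects[metadata_uri] = "parsed metadata object";

        std::cout << metadata_path << '\n';
        std::cout << metadata_objects.count(metadata_uri) << '\n'; // prints 1
    }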