diff --git a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
index 498d4f3b5803..4e8babe15fb7 100644
--- a/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
+++ b/src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h
@@ -166,6 +166,9 @@ class IDataLakeMetadata : boost::noncopyable
     virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
     virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
 
+    virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
+    virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }
+
 protected:
     virtual ObjectIterator createKeysIterator(
         Strings && data_files_,
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
index 0cadc66c6d76..12fcb29b24ac 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp
@@ -258,6 +258,192 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
     return previous_snapshot_schema_id != relevant_snapshot_schema_id;
 }
 
+namespace
+{
+
+using IdToName = std::unordered_map<Int32, String>;
+
+IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj)
+{
+    IdToName map;
+    if (!metadata_obj || !metadata_obj->has("current-schema-id") || !metadata_obj->has("schemas"))
+        return map;
+
+    const auto current_schema_id = metadata_obj->getValue<Int32>("current-schema-id");
+    auto schemas = metadata_obj->getArray("schemas");
+    if (!schemas)
+        return map;
+
+    for (size_t i = 0; i < schemas->size(); ++i)
+    {
+        auto schema = schemas->getObject(i);
+
+        if (!schema || !schema->has("schema-id") || (schema->getValue<Int32>("schema-id") != current_schema_id))
+            continue;
+
+        if (auto fields = schema->getArray("fields"))
+        {
+            for (size_t j = 0; j < fields->size(); ++j)
+            {
+                auto f = fields->getObject(j);
+                if (!f || !f->has("id") || !f->has("name"))
+                    continue;
+                map.emplace(f->getValue<Int32>("id"), f->getValue<String>("name"));
+            }
+        }
+        break;
+    }
+    return map;
+}
+
+String formatTransform(
+    const String & transform,
+    const Poco::JSON::Object::Ptr & field_obj,
+    const IdToName & id_to_name)
+{
+    Int32 source_id = (field_obj && field_obj->has("source-id"))
+        ? field_obj->getValue<Int32>("source-id")
+        : -1;
+
+    const auto it = id_to_name.find(source_id);
it->second : ("col_" + toString(source_id)); + + String base = transform; + String param; + if (const auto lpos = transform.find('['); lpos != String::npos && transform.back() == ']') + { + base = transform.substr(0, lpos); + param = transform.substr(lpos + 1, transform.size() - lpos - 2); // strip [ and ] + } + + String result; + if (base == "identity") + result = col; + else if (base == "year" || base == "month" || base == "day" || base == "hour") + result = base + "(" + col + ")"; + else if (base != "void") + { + if (!param.empty()) + result = base + "(" + param + ", " + col + ")"; + else + result = base + "(" + col + ")"; + } + return result; +} + +Poco::JSON::Array::Ptr findActivePartitionFields(const Poco::JSON::Object::Ptr & metadata_obj) +{ + if (!metadata_obj) + return nullptr; + + if (metadata_obj->has("partition-spec")) + return metadata_obj->getArray("partition-spec"); + + // If for some reason there is no partition-spec, try partition-specs + default- + if (metadata_obj->has("partition-specs") && metadata_obj->has("default-spec-id")) + { + const auto default_spec_id = metadata_obj->getValue("default-spec-id"); + if (auto specs = metadata_obj->getArray("partition-specs")) + { + for (size_t i = 0; i < specs->size(); ++i) + { + auto spec = specs->getObject(i); + if (!spec || !spec->has("spec-id")) + continue; + if (spec->getValue("spec-id") == default_spec_id) + return spec->has("fields") ? spec->getArray("fields") : nullptr; + } + } + } + + return nullptr; +} + +Poco::JSON::Array::Ptr findActiveSortFields(const Poco::JSON::Object::Ptr & metadata_obj) +{ + if (!metadata_obj || !metadata_obj->has("default-sort-order-id") || !metadata_obj->has("sort-orders")) + return nullptr; + + const auto default_sort_order_id = metadata_obj->getValue("default-sort-order-id"); + auto orders = metadata_obj->getArray("sort-orders"); + if (!orders) + return nullptr; + + for (size_t i = 0; i < orders->size(); ++i) + { + auto order = orders->getObject(i); + if (!order || !order->has("order-id")) + continue; + if (order->getValue("order-id") == default_sort_order_id) + return order->has("fields") ? order->getArray("fields") : nullptr; + } + return nullptr; +} + +String composeList( + const Poco::JSON::Array::Ptr & fields, + const IdToName & id_to_name, + bool lookup_sort_modifiers) +{ + if (!fields || fields->size() == 0) + return {}; + + Strings parts; + parts.reserve(fields->size()); + + for (size_t i = 0; i < fields->size(); ++i) + { + auto field = fields->getObject(i); + if (!field) + continue; + + const String transform = field->has("transform") ? field->getValue("transform") : "identity"; + String expr = formatTransform(transform, field, id_to_name); + if (expr.empty()) + continue; + + if (lookup_sort_modifiers) + { + if (field->has("direction")) + { + auto d = field->getValue("direction"); + expr += (Poco::icompare(d, "desc") == 0) ? 
" DESC" : " ASC"; + } + } + + parts.push_back(std::move(expr)); + } + + if (parts.empty()) + return {}; + + String res; + for (size_t i = 0; i < parts.size(); ++i) + { + if (i) res += ", "; + res += parts[i]; + } + return res; +} + +std::pair, std::optional> extractIcebergKeys(const Poco::JSON::Object::Ptr & metadata_obj) +{ + std::optional partition_key; + std::optional sort_key; + + if (metadata_obj) + { + auto id_to_name = buildIdToNameMap(metadata_obj); + + partition_key = composeList(findActivePartitionFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ false); + sort_key = composeList(findActiveSortFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ true); + } + + return {partition_key, sort_key}; +} + +} + void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) { auto configuration_ptr = configuration.lock(); @@ -309,6 +495,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec } } + auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object); relevant_snapshot = std::make_shared( getManifestList( object_storage, @@ -324,7 +511,9 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec relevant_snapshot_id, total_rows, total_bytes, - total_position_deletes); + total_position_deletes, + partition_key, + sorting_key); if (!snapshot->has(f_schema_id)) throw Exception( @@ -777,6 +966,19 @@ std::optional IcebergMetadata::totalBytes(ContextPtr local_context) cons return result; } +std::optional IcebergMetadata::partitionKey(ContextPtr) const +{ + SharedLockGuard lock(mutex); + return relevant_snapshot->partition_key; +} + +std::optional IcebergMetadata::sortingKey(ContextPtr) const +{ + SharedLockGuard lock(mutex); + return relevant_snapshot->sorting_key; +} + + ObjectIterator IcebergMetadata::iterate( const ActionsDAG * filter_dag, FileProgressCallback callback, diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h index e755c72946b8..8af8e68dca13 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h @@ -121,6 +121,9 @@ class IcebergMetadata : public IDataLakeMetadata void checkAlterIsPossible(const AlterCommands & commands) override; void alter(const AlterCommands & params, ContextPtr context) override; + std::optional partitionKey(ContextPtr) const override; + std::optional sortingKey(ContextPtr) const override; + protected: ObjectIterator iterate(const ActionsDAG * filter_dag, FileProgressCallback callback, size_t list_batch_size, ContextPtr local_context) const override; diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h index ec1553b65212..53eb4ec9d390 100644 --- a/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h +++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h @@ -17,6 +17,8 @@ struct IcebergDataSnapshot std::optional total_rows; std::optional total_bytes; std::optional total_position_delete_rows; + std::optional partition_key; + std::optional sorting_key; std::optional getTotalRows() const { diff --git a/src/Storages/System/StorageSystemTables.cpp b/src/Storages/System/StorageSystemTables.cpp index ff1d570a8024..7c28b0558bb5 100644 --- a/src/Storages/System/StorageSystemTables.cpp +++ b/src/Storages/System/StorageSystemTables.cpp @@ -22,6 +22,8 @@ #include #include #include 
+#include
+#include
 #include
 #include
 #include
@@ -604,18 +606,54 @@ class TablesBlockSource : public ISource
             ASTPtr expression_ptr;
             if (columns_mask[src_index++])
             {
-                if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
-                    res_columns[res_index++]->insert(format({context, *expression_ptr}));
-                else
-                    res_columns[res_index++]->insertDefault();
+                bool inserted = false;
+                // Extract from specific DataLake metadata if suitable
+                if (auto * obj = dynamic_cast<StorageObjectStorage *>(table.get()))
+                {
+                    if (auto * dl_meta = obj->getExternalMetadata(context))
+                    {
+                        if (auto p = dl_meta->partitionKey(context); p.has_value())
+                        {
+                            res_columns[res_index++]->insert(*p);
+                            inserted = true;
+                        }
+                    }
+
+                }
+
+                if (!inserted)
+                {
+                    if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
+                        res_columns[res_index++]->insert(format({context, *expression_ptr}));
+                    else
+                        res_columns[res_index++]->insertDefault();
+                }
             }
 
             if (columns_mask[src_index++])
             {
-                if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
-                    res_columns[res_index++]->insert(format({context, *expression_ptr}));
-                else
-                    res_columns[res_index++]->insertDefault();
+                bool inserted = false;
+
+                // Extract from specific DataLake metadata if suitable
+                if (auto * obj = dynamic_cast<StorageObjectStorage *>(table.get()))
+                {
+                    if (auto * dl_meta = obj->getExternalMetadata(context))
+                    {
+                        if (auto p = dl_meta->sortingKey(context); p.has_value())
+                        {
+                            res_columns[res_index++]->insert(*p);
+                            inserted = true;
+                        }
+                    }
+                }
+
+                if (!inserted)
+                {
+                    if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
+                        res_columns[res_index++]->insert(format({context, *expression_ptr}));
+                    else
+                        res_columns[res_index++]->insertDefault();
+                }
             }
 
             if (columns_mask[src_index++])
diff --git a/tests/integration/test_storage_iceberg/test.py b/tests/integration/test_storage_iceberg/test.py
index bcce89ac357a..791234dece09 100644
--- a/tests/integration/test_storage_iceberg/test.py
+++ b/tests/integration/test_storage_iceberg/test.py
@@ -499,7 +499,7 @@ def make_query_from_function(
     # Cluster Query with node1 as coordinator and storage type as arg
     select_cluster_with_type_arg, query_id_cluster_with_type_arg = make_query_from_function(
         run_on_cluster=True,
-        storage_type_as_arg=True, 
+        storage_type_as_arg=True,
     )
 
     # Cluster Query with node1 as coordinator and storage type in named collection
@@ -1042,7 +1042,7 @@ def test_metadata_file_selection_from_version_hint(started_cluster, format_versi
     spark.sql(
         f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
     )
-    
+
     # test the case where version_hint.text file contains just the version number
     with open(f"/iceberg_data/default/{TABLE_NAME}/metadata/version-hint.text", "w") as f:
         f.write('5')
@@ -2645,7 +2645,7 @@ def test_writes_create_table(started_cluster, format_version, storage_type):
     with pytest.raises(Exception):
         create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, "(x String)", format_version)
 
-    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, "(x String)", format_version, "", True) 
+    create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, "(x String)", format_version, "", True)
 
     assert '`x` String' in instance.query(f"SHOW CREATE TABLE {TABLE_NAME}")
 
@@ -2710,7 +2710,7 @@ def test_relevant_iceberg_schema_chosen(started_cluster, storage_type):
     instance = started_cluster.instances["node1"]
     spark = started_cluster.spark_session
"test_relevant_iceberg_schema_chosen_" + storage_type + "_" + get_uuid_str() - + spark.sql( f""" CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( @@ -3211,7 +3211,7 @@ def get_iceberg_metadata_to_dict(query_id: str): result[name] = list(filter(lambda x: len(x) > 0, result[name])) result['row_in_file'] = list(map(lambda x : int(x) if x.isdigit() else None, result['row_in_file'])) return result - + def verify_result_dictionary(diction : dict, allowed_content_types : set): # Expected content_type and only it is present if set(diction['content_type']) != allowed_content_types: @@ -3254,7 +3254,7 @@ def verify_result_dictionary(diction : dict, allowed_content_types : set): raise ValueError("Row should be specified for an entry {}, file_path: {}".format(diction['content_type'][i], file_path)) number_of_missing_row_values += 1 - + # We have exactly one metadata file if number_of_missing_row_values != 1: raise ValueError("Not a one row value (corresponding to metadata file) is missing for file path: {}".format(file_path)) @@ -3492,3 +3492,49 @@ def compare_selects(query): compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL") compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL") compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL") + + +@pytest.mark.parametrize("storage_type", ["s3"]) +def test_system_tables_partition_sorting_keys(started_cluster, storage_type): + instance = started_cluster.instances["node1"] + spark = started_cluster.spark_session + + table_name = f"test_sys_tables_keys_{storage_type}_{uuid.uuid4().hex[:8]}" + fq_table = f"spark_catalog.default.{table_name}" + + spark.sql(f"DROP TABLE IF EXISTS {fq_table}") + spark.sql(f""" + CREATE TABLE {fq_table} ( + id INT, + ts TIMESTAMP, + payload STRING + ) + USING iceberg + PARTITIONED BY (bucket(16, id), day(ts)) + TBLPROPERTIES ('format-version' = '2') + """) + spark.sql(f"ALTER TABLE {fq_table} WRITE ORDERED BY (id DESC NULLS LAST, hour(ts))") + spark.sql(f""" + INSERT INTO {fq_table} VALUES + (1, timestamp'2024-01-01 10:00:00', 'a'), + (2, timestamp'2024-01-02 11:00:00', 'b'), + (NULL, timestamp'2024-01-03 12:00:00', 'c') + """) + + time.sleep(2) + default_upload_directory( + started_cluster, + storage_type, + f"/iceberg_data/default/{table_name}/", + f"/iceberg_data/default/{table_name}/", + ) + + create_iceberg_table(storage_type, instance, table_name, started_cluster) + + res = instance.query(f""" + SELECT partition_key, sorting_key + FROM system.tables + WHERE name = '{table_name}' FORMAT csv + """).strip().lower() + + assert res == '"bucket(16, id), day(ts)","id desc, hour(ts) asc"'