Spark: add procedure to generate symlink manifests #4401
jackye1995 wants to merge 1 commit into apache:master from jackye1995:symlink
Conversation
if (partitionType.fields().isEmpty()) {
  entries.select("data_file.file_path")
      .write()
      .format("parquet")
[question] Should the format here be "text" instead? As per this: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/SymlinkTextInputFormat.java#L47-L52
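For reference, a rough sketch of what a text-format write might look like here (a guess at intent, not this PR's code; `entries` and `symlinkLocation` stand in for the surrounding procedure variables, and Spark's text source expects a single string column):

```java
// SymlinkTextInputFormat reads plain text manifest files that list one data file path
// per line, so writing with the "text" source (rather than "parquet") would match that.
entries.select("data_file.file_path")
    .coalesce(1)                // optional: keep each manifest in a single text file
    .write()
    .format("text")
    .mode("overwrite")
    .save(symlinkLocation);     // hypothetical target, e.g. the symlink manifest directory
```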
Types.StructType partitionType = Partitioning.partitionType(icebergTable);
Dataset<Row> entries = SparkTableUtil.loadCatalogMetadataTable(spark(), icebergTable, MetadataTableType.ENTRIES)
    .filter("status < 2 AND data_file.content = 0");
We should cache the DF as well; otherwise we will end up scanning the metadata table twice (see the sketch below), once in each of:
- count()
- write()
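A minimal sketch of that suggestion against the snippet above (the `unpersist()` placement and the write target `symlinkLocation` are illustrative, not part of the diff):

```java
// Cache so the two actions below (count and write) don't each re-scan the ENTRIES metadata table.
Dataset<Row> entries = SparkTableUtil.loadCatalogMetadataTable(spark(), icebergTable, MetadataTableType.ENTRIES)
    .filter("status < 2 AND data_file.content = 0")
    .cache();
try {
  long dataFileCount = entries.count();          // first action: materializes and caches the rows
  entries.select("data_file.file_path")
      .write()
      .format("parquet")
      .save(symlinkLocation);                    // second action: served from the cache
} finally {
  entries.unpersist();                           // release the cached blocks once the procedure is done
}
```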
private void checkDirectoryExists(String... paths) {
  String path = String.join("/", paths);
  Assert.assertTrue("Directory should exist: " + path, Files.isDirectory(Paths.get(URI.create(path))));
}
Should we also check that the symlink folder, when used as the external table, returns the same result as the Iceberg table? WDYT?
kbendick left a comment
Thanks for this addition @jackye1995!
> Regarding merge-on-read, generated symlink table does not consider delete files. This is basically a "snapshot view" of the table. A compaction is needed to generate the most up-to-date view of the table.
Should we error out if there are delete files present?
We can add an option to still allow for ignoring them to get this "snapshot view", but I think by default a symlink manifest shouldn't be generated that can represent a table at a state it never existed in.
For example, if we're using MOR, and we have one file that gets written as a plain data file (as it only contains appends), and then some number of delta files, the currently proposed symlink-manifest-based table will contain that new file but won't contain the deltas.
Correct me if I'm wrong, but I think that would be a state the table was never in. I can see how it would avoid unnecessary compute to skip trying to process any delta files, e.g. if the user did compact it ahead of time, but I don't think that should be the default for the procedure.
The default output should return a symlink that represents the table at least as it is at some point in time.
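A rough sketch of what that default could look like in the procedure (the `ignore_deletes` option and the content-code filter are assumptions here, not part of this PR):

```java
// Hypothetical guard: fail by default when live delete files exist, unless the caller
// explicitly asks for the "snapshot view" behavior via an ignore_deletes flag.
long deleteFileCount = SparkTableUtil.loadCatalogMetadataTable(spark(), icebergTable, MetadataTableType.ENTRIES)
    .filter("status < 2 AND data_file.content != 0")   // 0 = data, 1/2 = position/equality deletes
    .count();
Preconditions.checkArgument(ignoreDeletes || deleteFileCount == 0,
    "Cannot generate symlink manifests: table has %s delete files; " +
    "compact the table first, or set ignore_deletes => true to produce a snapshot view",
    deleteFileCount);
```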
In the above default location `<table_root>/_symlink_format_manifest/<snapshot_id>`, in either case, does this mean whenever I regenerate the symlink format manifest file, I have to alter the external table location to reflect the latest manifest file? How do I execute the procedure to override / exclude the snapshot_id prefix and generate the manifest file like below?
Preconditions.checkArgument(tableIdent != null && !tableIdent.isEmpty(),
    "Cannot handle an empty identifier for argument table");

CatalogPlugin defaultCatalog = spark().sessionState().catalogManager().currentCatalog();
Nit: these lines can be replaced by: Spark3Util.loadIcebergTable()
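Something along these lines, I think (a sketch; the exact exceptions thrown by Spark3Util.loadIcebergTable may differ, so treat the handling as illustrative):

```java
// Hypothetical simplification: resolve the identifier against the session catalog manager
// and load the underlying Iceberg table in a single call.
org.apache.iceberg.Table icebergTable;
try {
  icebergTable = Spark3Util.loadIcebergTable(spark(), tableIdent);
} catch (Exception e) {
  throw new IllegalArgumentException("Cannot load Iceberg table: " + tableIdent, e);
}
```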
int len = tableLocation.length();
StringBuilder sb = new StringBuilder();
sb.append(tableLocation);
if (sb.charAt(len - 1) != '/') {
Optional: can save one line above to get "len" by just checking tableLocation.endsWith(). (Not sure if there's any other reason to do it this way)
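Something like this, presumably (a one-line sketch; `tableLocation` comes from the surrounding code):

```java
// Append the trailing slash only when it's missing, without tracking the length separately.
String rootLocation = tableLocation.endsWith("/") ? tableLocation : tableLocation + "/";
```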
public InternalRow[] call(InternalRow args) {
  String tableIdent = args.getString(0);
  Preconditions.checkArgument(tableIdent != null && !tableIdent.isEmpty(),
      "Cannot handle an empty identifier for argument table");
Nit: maybe a slightly more user-friendly error message, "table cannot be null or empty"?
| "Cannot generate symlink manifests for an empty table"); | ||
|
|
||
| long snapshotId = icebergTable.currentSnapshot().snapshotId(); | ||
| String symlinkRootLocation = args.isNullAt(1) ? |
Curious, do we need to add any validation that path ends with /? (Noticed we put a / at the end of the default path, not sure if that was a hard requirement)
| sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); | ||
| List<Object[]> result = sql("CALL %s.system.generate_symlink_format_manifest('%s')", catalogName, tableIdent); | ||
| Table table = validationCatalog.loadTable(tableIdent); | ||
| List<Object[]> expected = Lists.newArrayList(); |
Nit: can we use Lists.newArrayList(row(table...))?
(same for other tests)
  String path = String.join("/", paths);
  Assert.assertTrue("Directory should exist: " + path, Files.isDirectory(Paths.get(URI.create(path))));
}
}
If not too hard, can we add one more test about evolving partition spec?
List<Object[]> expected = Lists.newArrayList();
expected.add(row(table.currentSnapshot().snapshotId(), 2L));
assertEquals("Should find 2 files", expected, result);
checkDirectoryExists(customLocation);
Can we add a check (maybe in this test or another) that a directory exists under customLocation for each partition? To make sure that the custom location works with a partitioned table.
}

@Override
public InternalRow[] call(InternalRow args) {
Do you think it's beneficial to add an Action and have the Procedure call it, like the other ones? I am not sure. cc @RussellSpitzer @aokolnychyi for any thoughts.
@jackye1995 Is there a way to do the same thing using the Java API? How can we read the locations of all data files given a tableId and snapshotId?
@prashantgohel1 You may have already figured it out, but if you're looking to get the data files for a table at a given snapshot, going through the Table API's table.snapshot to get the snapshot, and then using the snapshot's data manifest API to get all the data file manifests, should work: https://github.com/apache/iceberg/blob/master/api/src/main/java/org/apache/iceberg/Snapshot.java#L101 Then it's a matter of using the Java library to read the manifest files: https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/ManifestFiles.java#L71 Another way is to query the data files metadata table.
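A rough sketch of that manifest-based approach, assuming a recent Iceberg version where Snapshot exposes dataManifests(FileIO); method names may differ slightly across versions, and loading the Table itself is left out:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;

public class DataFileLocations {
  // Collects the locations of all data files that belong to the given snapshot of a table.
  public static List<String> forSnapshot(Table table, long snapshotId) throws IOException {
    Snapshot snapshot = table.snapshot(snapshotId);
    FileIO io = table.io();
    List<String> locations = new ArrayList<>();
    for (ManifestFile manifest : snapshot.dataManifests(io)) {
      // Each data manifest lists a set of data files; read it and record each file's path.
      try (CloseableIterable<DataFile> files = ManifestFiles.read(manifest, io)) {
        for (DataFile file : files) {
          locations.add(file.path().toString());
        }
      }
    }
    return locations;
  }
}
```

As noted above, querying the data files metadata table (for example via Spark) gives the same information without touching the manifest APIs directly.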
Discussed offline with @jackye1995; I'll be carrying this PR forward. My thinking is the following:
1. I do think we need an explicit flag for ignoring deletes; it makes sense to me that the operation fails if there are deletes. This is because, as a user, by default I expect the actual state of the table to be represented in the symlink file. If the operation fails, it does force a user to do a compaction prior to running this procedure, but I think that makes sense. And if they really don't want this behavior, they can pass in the flag to ignore deletes. @kbendick @jackye1995 let me know your thinking, or we can discuss on the PRs I plan on raising.
2. Looks like there's interest in having this be an actual Spark Action, so I will add that.
@jackye1995 Feel free to close this; I will add you as a co-author on the PR for the procedure implementation. Thanks!
Add a Spark procedure to generate symlink manifests, so that systems without Iceberg support can read Iceberg table data using an external table.
I did not add an action for this because it is just meant to give a gateway for users of any existing query engine that does not natively support Iceberg (in my case it's Redshift Spectrum) to start reading Iceberg, because most engines support Hive with the symlink input format to some extent. If we think it deserves an action in the core API, I can also add that.
The procedure looks like:
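Roughly, based on the tests in this PR (here `spark` is an assumed SparkSession, and the catalog, table, and path are placeholders; the second argument is the optional root location, per the args handling shown above):

```java
// Default symlink location: <table_root>/_symlink_format_manifest/<snapshot_id>
spark.sql("CALL my_catalog.system.generate_symlink_format_manifest('db.sample')");

// Optional second argument overrides the symlink root location.
spark.sql("CALL my_catalog.system.generate_symlink_format_manifest('db.sample', 's3://bucket/custom/symlink/')");
```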
The `symlink_root_location` is optional. The default is `<table_root>/_symlink_format_manifest/<snapshot_id>`. A snapshot ID suffix is added because, if this procedure is executed twice against the same table after the table has been updated, we don't want to mix the results. If users want to use a consistent root path for the symlink table, it can be input as an override.

I thought about adding another option for `snapshot_id` in the input, so we can generate a symlink table for any historical snapshot, but decided not to do that to avoid making the procedure too complicated. We can add it as a follow-up if needed.

The procedure currently returns the `snapshot_id` that the procedure is executed against, and `data_file_count` for the number of data files in the symlink manifests.

Regarding partitioning, the generated symlink table exposes all the hidden partitions and uses the union of all historical table partition specs. For example, if the table is partitioned by spec 1 `category` and spec 2 `bucket(16, id)`, users are expected to create a symlink table with `PARTITIONED BY (id_bucket int, category string)`.

Regarding merge-on-read, the generated symlink table does not consider delete files. This is basically a "snapshot view" of the table. A compaction is needed to generate the most up-to-date view of the table.
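For context, the kind of external table this feeds would look roughly like the following Hive-style DDL (not part of this PR; the table name, columns, and location are placeholders, and the SerDe / input format classes shown are the usual Hive ones for symlink manifests over Parquet data):

```java
// Hypothetical external table over the generated symlink manifests, created from a Spark
// session with Hive support (the same DDL could be run in Hive or another Hive-compatible engine).
// SymlinkTextInputFormat reads the manifest files under LOCATION and then scans the
// Parquet data files listed inside them.
spark.sql(
    "CREATE EXTERNAL TABLE db.sample_symlink (id bigint, data string) " +
    "PARTITIONED BY (id_bucket int, category string) " +
    "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' " +
    "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' " +
    "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " +
    "LOCATION 's3://bucket/warehouse/db/sample/_symlink_format_manifest/8744736658442914487'");
```

For a partitioned table like this, each partition would still need to be registered (for example via MSCK REPAIR TABLE or explicit ADD PARTITION) so that it points at the corresponding manifest directory.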