
Adds AddFiles Procedure #2210

Merged 18 commits into apache:master on Mar 23, 2021

Conversation

RussellSpitzer
Member

@RussellSpitzer RussellSpitzer commented Feb 4, 2021

No description provided.

This procedure mimics our old support for "IMPORT DATA" but does not allow for dynamic
overwriting of files in partitions. Removing a partition now requires a separate DELETE
command. Other than that, the capabilities are identical to the previous functionality,
except that the functionality is now a procedure rather than a SQL command.
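
As a sketch of the new two-step workflow (a hypothetical example; the table, column, and path names are made up, and the procedure's exact arguments appear later in this thread):

-- Remove the partition's existing data with an explicit DELETE
DELETE FROM db.tbl WHERE part_col = 'a'

-- Then import the replacement files with the new procedure
CALL spark_catalog.system.add_files(
  table => 'db.tbl',
  source_table => '`parquet`.`path/to/partition`'
)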
@github-actions github-actions bot added the spark label Feb 4, 2021
@RussellSpitzer
Member Author

Implementation of #2068

@aokolnychyi
Contributor

Thanks for working on this, @RussellSpitzer! Let me take a look today.

}
validatePartitionSpec(table, dataPath, fs, partition);

if (table.properties().get(TableProperties.DEFAULT_NAME_MAPPING) == null) {
Contributor

I am +1 on this too. I think it will be safer.

@pvary
Contributor

pvary commented Feb 18, 2021

I am looking for a way to migrate a native Hive table to a Hive table backed by an Iceberg table, and during my search I found this PR (along with #2068 - Procedure for adding files to a Table), but I have also found the same thing for Flink: #2217 (Flink: migrate hive table to iceberg table).

Maybe we should factor out the common parts into a place that is accessible to both Spark and Flink (and to Hive as well), like a common Java API or something similar. Would this be possible?

@RussellSpitzer
Member Author

@pvary we also have the migrate and snapshot actions.
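
In recent Iceberg versions these are also exposed as Spark procedures; hypothetical invocations (catalog and table names made up) look like:

-- Replace a Hive table in place with an Iceberg table
CALL spark_catalog.system.migrate('db.hive_tbl')

-- Or create a new Iceberg table from it, leaving the source untouched
CALL spark_catalog.system.snapshot('db.hive_tbl', 'db.iceberg_tbl')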

fs = dataPath.getFileSystem(conf);
isFile = fs.getFileStatus(dataPath).isFile();
} catch (IOException e) {
throw new RuntimeException("Unable to access add_file path", e);
Contributor

Maybe UncheckedIOException makes more sense. Or maybe just replace this with Util.getFs?
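
A minimal sketch of the first suggestion, reusing the variables from the snippet above (fs, dataPath, conf, and isFile are assumed from the surrounding code):

try {
  fs = dataPath.getFileSystem(conf);
  isFile = fs.getFileStatus(dataPath).isFile();
} catch (IOException e) {
  // java.io.UncheckedIOException keeps the IO failure typed without a checked signature
  throw new UncheckedIOException("Unable to access add_file path " + dataPath, e);
}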

Member Author

I need to go back through the PR and switch these to unchecked IOExceptions. This is a good suggestion, but in the midst of refactoring I forgot to do it.

table.name(), partition.size(), partitionFields.size()));
}
partitionFields.forEach(field -> {
if (!partition.containsKey(field.name())) {
Contributor

From the above discussion, I gather that Hive partitions can also be imported? Do we see issues here around Hive's lowercasing of columns? If yes, then something to consider for future enhancements...

Member Author

Yeah, this could be a problem if the entries that Spark returns are different from those we expect based on column names. I'll need to add some Hive tests for that.

Previously we always used the Spark in-memory file index to list the partitions in a table,
regardless of whether it was a pure file-based table or a Hive-based table. Now we instead
have two distinct paths: either a file-based table, which is identified like `parquet.path`,
or a Hive table, which uses the standard `database.table`. When a table identifier is given
for Hive, we attempt to read the catalog's partition listing, which lets us discover alternate
partition locations and match the underlying Hive catalog.
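
Concretely, the two source forms for the same procedure call look like this (hypothetical table names):

-- File-based source: partitions are listed from the path itself
CALL spark_catalog.system.add_files(
  table => 'db.tbl',
  source_table => '`parquet`.`path/to/table`'
)

-- Hive source: the partition listing is read from the catalog
CALL spark_catalog.system.add_files(
  table => 'db.tbl',
  source_table => 'hive_db.hive_tbl'
)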
Change function calls and some error messages
There seems to be a classpath issue with ORC and Spark, not Iceberg related. I'll
check this out tomorrow.
@RussellSpitzer
Member Author

I'm having issues with writing to ORC tables from Spark in our tests, probably a version conflict?

Caused by: java.lang.NoSuchMethodError: org.apache.orc.TypeDescription.createRowBatch()Lorg/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch;

@RussellSpitzer
Member Author

We have some strange classpath issues with ORC in our test classpath. To make things worse, when I attempt to debug the problem, the debug console does not exhibit it and can call the missing method without any error.

@pvary
Contributor

pvary commented Mar 10, 2021

@pvary + @aokolnychyi Should be ready for another look if y'all have time. Thanks in advance!

LGTM on the Hive-related stuff. Only minor comments, and mostly questions.
When we start working on Hive migration we will definitely move some of the code from SparkTableUtil and Spark3Util to some more accessible place, but that's another story.
There is too much Spark code here for me to be comfortable giving a +1.

Contributor

@aokolnychyi aokolnychyi left a comment

I left a few nits and I'd love to see a couple of Avro tests. Other than that, looks ready to go.

Thanks for the great work, @RussellSpitzer!

}


@Ignore // Classpath issues prevent us from actually writing to a Spark ORC table
Contributor

Can we add a couple of tests for Avro?

Member Author

Sure

@github-actions github-actions bot added the INFRA label Mar 16, 2021
Contributor

@aokolnychyi aokolnychyi left a comment

I have a few nits/questions, but it should be ready to go otherwise.

Contributor

@aokolnychyi aokolnychyi left a comment

LGTM. I think we need to revert the no-longer-needed change in the style file and then it should be good to go. There were a few super minor nits, but those are optional.

Thanks for the hard work, @RussellSpitzer! I am sure this will be really useful.

@aokolnychyi aokolnychyi merged commit f0a6b71 into apache:master Mar 23, 2021
@aokolnychyi
Contributor

Thank you, @RussellSpitzer! Thanks everyone who participated in the review!

@aokolnychyi
Contributor

@RussellSpitzer, could you please create issues for the ORC and Avro tests? We already have an issue to stop depending on the Spark in-memory file index.

@RussellSpitzer
Member Author

Thanks everybody! I think the review period really helped make this code better. I hope it's of use to folks in the future.

@RussellSpitzer RussellSpitzer deleted the AddFilesProcedure branch March 23, 2021 15:03
@zhaoyuxiaoxiao

@RussellSpitzer Hello, how do I associate an existing Iceberg table with a Hive table? What is the Spark SQL syntax? Can you give an example? Thanks!

@RussellSpitzer
Member Author

I'm not sure what you are asking; the syntax for this command is:

CALL spark_catalog.system.add_files(
  table => 'db.tbl',
  source_table => '`parquet`.`path/to/table`'
)

I'll be adding docs shortly, but you should probably ask your question, with more detail about what you are trying to accomplish, on the mailing list or Slack, since this may not do what you expect it to do.

@talgos1

talgos1 commented Nov 22, 2021

Hi, I'm wondering if it's possible to add a specific file (or even an unpartitioned parquet/orc table) to a specific partition of a partitioned Iceberg table?

@rdblue
Contributor

rdblue commented Nov 22, 2021

@talgos1, have you tried using a full file path as the source table?

@talgos1

talgos1 commented Nov 23, 2021

@talgos1, have you tried using a full file path as the source table?

@rdblue
Using the CALL command, it expects the source to have the same partition spec as the destination (I got an error that the source has no partitions).

Since my last comment, I have succeeded in doing that using the Spark and Java APIs, explicitly defining a synthetic SparkPartition for the file/path:

// Assumes spark: SparkSession, table: org.apache.iceberg.Table,
// spec: org.apache.iceberg.PartitionSpec, and stagingDir: String are already defined
import java.util
import scala.collection.JavaConverters._
import org.apache.iceberg.spark.SparkTableUtil
import org.apache.iceberg.spark.SparkTableUtil.SparkPartition

// Define the source file
val uri = "/some/path/to/orc/file.orc"
val format = "orc"
val partitionSpec: util.Map[String, String] =
  Map("some_partition_key" -> "some_partition_value").asJava
// Define a synthetic Spark partition for that file
val sparkPartition = new SparkPartition(partitionSpec, uri, format)

// Do the add call, importing the synthetic partition as a partitioned source
SparkTableUtil.importSparkPartitions(spark, Seq(sparkPartition).asJava, table, spec, stagingDir)

WDYT?

@rdblue
Contributor

rdblue commented Nov 23, 2021

I'm glad you were able to find a way to get it working! We may want to update this so that we can detect when there's only a single file and handle that case. But in the meantime, it looks like this is a good way to get it working using the API directly.

@chenwyi2

Is there a way to import a Hive table into a specific partition in Iceberg? The partition schema between the Hive table and Iceberg is different; for example, a Hive table has two partitions but an Iceberg table has three partitions, one of which is a bucket(id) partition. We want to import this Hive table into a specific partition in Iceberg, but when we add files into Iceberg it fails with "because that table is partitioned and contains non-identity partition transforms which will not be compatible. Found non-identity fields [1000: bucketid: bucket[2]]".
