Hudi uniform support #2333
Conversation
This is my first time contributing to Delta Lake and I need some assistance on a few points:
These are the instructions that work for me. Can you tell me a bit more about the Kerberos issue?
@lzlfred ^
I ran into this error, which seems like it may be a version issue.

Not sure how this was tested; perhaps hadoop and hadoop-common both need to be pinned to the same version, e.g. 3.3.6.
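If the mismatch is an artifact-version skew, one way to pin them is a fragment like the following (a hypothetical build.sbt sketch; the exact set of Hadoop artifacts this module pulls in is an assumption):

```scala
// Pin all Hadoop artifacts to a single version so hadoop-common and the
// client jars cannot drift apart and cause linkage errors at runtime.
val hadoopVersion = "3.3.6"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion
)
```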
@the-other-tim-brown Thanks for putting this together. Delta appreciates this effort; let's work together to add the support.
Yes, sure. At this point we do not need to read the Hudi table through Spark yet.
I'm not an sbt expert either... I will circle back with the committers and see.
Without having reviewed this in detail yet, here is some high-level feedback:
I will review this in more detail in the next few weeks. Thanks for the effort!
Looks great! Please do:
- Clean up and make a separate Hudi package, as previously mentioned.
- Do a grep and fix other Iceberg occurrences, mostly in comments.
import scala.collection.JavaConverters._

/**
 * Used to prepare (convert) and then commit a set of Delta actions into the Iceberg table located
 * at the same path as [[postCommitSnapshot]]
 *
 * @param conf Configuration for Iceberg Hadoop interactions.
 * @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit.
 * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.

Suggested changes (replace the leftover Iceberg references with Hudi):

- * Used to prepare (convert) and then commit a set of Delta actions into the Iceberg table located
+ * Used to prepare (convert) and then commit a set of Delta actions into the Hudi table metadata located
- * @param conf Configuration for Iceberg Hadoop interactions.
+ * @param conf Configuration for Hudi Hadoop interactions.
- * @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit.
+ * @param postCommitSnapshot Latest Delta snapshot associated with this Hudi commit.
- * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.
+ * @param tableOp How to instantiate the underlying Hudi table. Defaults to WRITE_TABLE.
import scala.util.control.NonFatal

object HudiConverter {
nit: remove empty line
Removed
if (!UniversalFormat.hudiEnabled(snapshotToConvert.metadata)) {
  return None
}
txn.catalogTable match {
Iceberg almost always requires a catalog, so we had this requirement. If that's not true for Hudi, I would suggest relaxing this constraint; otherwise it won't work if we access/write to the Delta table by path.
On the other hand, I understand that we need to pass the tableName to the Hudi table creator.
Can we add a TODO here saying that requiring a catalogTable is a constraint we can relax in the future?
We don't require a catalog for Hudi. I just figured that if a name is present we can use it when creating the table.
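A possible shape for that name fallback (a self-contained sketch, not the PR's code; `CatalogTable` here is a stand-in for Spark's class, and the path-based naming rule is illustrative):

```scala
// Sketch: use the catalog name when a catalog entry exists, otherwise
// fall back to deriving a table name from the Delta table's path.
// TODO (per review): requiring a catalogTable is a constraint we can relax.
case class CatalogTable(name: String) // stand-in for Spark's CatalogTable

def hudiTableName(catalogTable: Option[CatalogTable], tablePath: String): String =
  catalogTable match {
    case Some(t) => t.name                    // name registered in the catalog
    case None    => tablePath.split('/').last // e.g. "/data/events" -> "events"
  }
```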
    !UniversalFormat.hudiEnabled(postCommitSnapshot.metadata)) {
  return
}
extra line.
Fixed
}

private def runArchiver(table: HoodieJavaTable[_ <: HoodieAvroPayload],
                        config: HoodieWriteConfig,
Indent... please indent 4 spaces from `private`.
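For reference, the requested style would look roughly like this (a sketch; the return type and body are placeholders, since only the first two parameters appear in the diff):

```scala
// Requested style: break after the opening paren, one parameter per line,
// each indented 4 spaces from the start of the definition.
private def runArchiver(
    table: HoodieJavaTable[_ <: HoodieAvroPayload],
    config: HoodieWriteConfig): Unit = {
  // ...
}
```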
Done
}

private def markInstantsAsCleaned(table: HoodieJavaTable[_],
                                  writeConfig: HoodieWriteConfig,
Indent.
Done
}

private def getWriteConfig(schema: Schema,
                           numCommitsToKeep: Int,
Indent.
Done
@lzlfred I've moved the code into a separate package but I do not have any experience with setting up a new …
What an epic feature! Thanks for all the effort.
 * removed from the wait queue. Only one snapshot is queued at any point of time.
 *
 */
override def enqueueSnapshotForConversion(
Ideally we want to consolidate all this async logic with the Iceberg conversion... I will take that part, and I am not asking for a TODO in this PR.
    isNullable)
// TODO: Add List and Map support: https://github.com/delta-io/delta/issues/2738
case ArrayType(elementType, containsNull) =>
  throw new UnsupportedOperationException("UniForm doesn't support Array columns")
nit: say UniForm Hudi
  throw new UnsupportedOperationException("UniForm doesn't support Array columns")

case MapType(keyType, valueType, valueContainsNull) =>
  throw new UnsupportedOperationException("UniForm doesn't support Map columns")
same as above.
case atomicType: AtomicType => convertAtomic(atomicType, isNullable)

case other =>
  throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Iceberg")
Suggested change:

- throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Iceberg")
+ throw new UnsupportedOperationException(s"Cannot convert Delta type $other to Hudi")
SBT-related changes LGTM.
package org.apache.spark.sql.delta.hooks

import org.apache.spark.sql.SparkSession
I don't know why scalastyle does not report an error, but please follow
/spark/src/main/scala/org/apache/spark/sql/delta/hooks/IcebergConverterHook.scala
and make the imports:
import org.apache.spark.sql.delta.{OptimisticTransactionImpl, Snapshot, UniversalFormat}
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.{ArrayType, NullType, MapType}
Please swap NullType and MapType to be alphabetical.
Which Delta project/connector is this regarding?
Description
How was this patch tested?
Some basic tests were added.
Does this PR introduce any user-facing changes?
Yes, this allows users to expose their Delta tables as Hudi tables.