Delta Universal Format (UniForm) allows you to read Delta tables with Iceberg clients.

## Description

UniForm takes advantage of the fact that both Delta and Iceberg consist of Parquet data files and a metadata layer. UniForm automatically generates Iceberg metadata asynchronously, allowing Iceberg clients to read Delta tables as if they were Iceberg tables. You can expect negligible Delta write overhead when UniForm is enabled, as the Iceberg conversion and transaction occur asynchronously after the Delta commit.

A single copy of the data files serves clients of both formats.

This PR adds the implementation for Universal Format (Iceberg) as well as the IcebergCompatV1 protocol validation.

To create a table with UniForm:

```sql
CREATE TABLE T(c1 INT) USING DELTA TBLPROPERTIES(
  'delta.universalFormat.enabledFormats' = 'iceberg');
```

To enable UniForm on an existing table:

```sql
ALTER TABLE T SET TBLPROPERTIES(
  'delta.columnMapping.mode' = 'name',
  'delta.universalFormat.enabledFormats' = 'iceberg');
```
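Both statements only set table properties, so you can confirm that UniForm took effect by inspecting them. A minimal sketch (the exact output shape depends on your Spark version):

```sql
-- Check that the property is set on table T
SHOW TBLPROPERTIES T ('delta.universalFormat.enabledFormats');
```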

See the IcebergCompatV1 protocol specification PR here: #1869.

## How was this patch tested?

New unit test suite `iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertToIcebergSuite.scala`, as well as manual local publishing and integration testing with two Spark shells, one loaded with Delta and the other with Iceberg.
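The two-shell setup described above might look like the following; the package coordinates, versions, and configuration values are illustrative assumptions, not part of this PR:

```shell
# Shell 1: writes through Delta (coordinates/versions are illustrative)
spark-shell \
  --packages io.delta:delta-core_2.12:2.4.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog

# Shell 2: reads through Iceberg
spark-shell \
  --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
```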

## Does this PR introduce _any_ user-facing changes?

Yes: a new optional Delta table property, `delta.universalFormat.enabledFormats`.

Closes #1870

GitOrigin-RevId: 8a4723680b12bb112190ee1f94a5eae9c4904a83
scottsand-db committed Jun 28, 2023 (commit 9b50cd2, 1 parent 27111ee)

Showing 24 changed files with 2,622 additions and 8 deletions.
build.sbt (50 additions, 7 deletions):

```scala
val icebergSparkRuntimeArtifactName = {
  val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion)
  s"iceberg-spark-runtime-$expMaj.$expMin"
}

// Build using: build/sbt clean icebergShaded/compile iceberg/compile
// It will fail the first time, just re-run it.
lazy val iceberg = (project in file("iceberg"))
  .dependsOn(spark % "compile->compile;test->test;provided->provided")
  .settings (
    name := "delta-iceberg",
    commonSettings,
    scalaStyleSettings,
    releaseSettings,
    libraryDependencies ++= Seq(
      // Fix Iceberg's legacy java.lang.NoClassDefFoundError: scala/jdk/CollectionConverters$
      // error due to legacy scala.
      "org.scala-lang.modules" %% "scala-collection-compat" % "2.1.1",
      "org.apache.iceberg" %% icebergSparkRuntimeArtifactName % "1.3.0" % "provided",
      "com.github.ben-manes.caffeine" % "caffeine" % "2.9.3"
    ),
    Compile / unmanagedJars += (icebergShaded / assembly).value,
    // Generate the assembly JAR as the package JAR
    Compile / packageBin := assembly.value,
    assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
    assembly / logLevel := Level.Info,
    assembly / test := {},
    assemblyPackageScala / assembleArtifact := false
  )

lazy val generateIcebergJarsTask = TaskKey[Unit]("generateIcebergJars", "Generate Iceberg JARs")

lazy val icebergShaded = (project in file("icebergShaded"))
  .dependsOn(spark % "provided")
  .settings (
    name := "iceberg-shaded",
    commonSettings,
    skipReleaseSettings,

    // Compile, patch, and generate the Iceberg JARs
    generateIcebergJarsTask := {
      import sys.process._
      val scriptPath = baseDirectory.value / "generate_iceberg_jars.py"
      // Download iceberg code in `iceberg_src` dir and generate the JARs in `lib` dir
      Seq("python3", scriptPath.getPath)!
    },
    Compile / unmanagedJars := (Compile / unmanagedJars).dependsOn(generateIcebergJarsTask).value,
    cleanFiles += baseDirectory.value / "iceberg_src",
    cleanFiles += baseDirectory.value / "lib",

    // Generate the shaded Iceberg JARs; 'compile' invokes the 'assembly' task
    // to produce the uber JAR.
    Compile / packageBin := assembly.value,
    assembly / assemblyJarName := s"${name.value}_${scalaBinaryVersion.value}-${version.value}.jar",
    assembly / logLevel := Level.Info,
    assembly / test := {},
    assembly / assemblyShadeRules := Seq(
      ShadeRule.rename("org.apache.iceberg.**" -> "shadedForDelta.@0").inAll
    ),
    assemblyPackageScala / assembleArtifact := false
  )

lazy val hive = (project in file("connectors/hive"))
  .dependsOn(standaloneCosmetic)
  // ... (rest of file unchanged)
```
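The `icebergSparkRuntimeArtifactName` value computed in build.sbt simply pairs the Iceberg Spark runtime with the Spark line being built against. A minimal sketch of the idea, assuming `getMajorMinorPatch` (a helper defined elsewhere in build.sbt) splits a version string into its parts:

```scala
// Assumption: getMajorMinorPatch splits "major.minor.patch" into its components,
// mirroring the helper defined elsewhere in build.sbt.
def getMajorMinorPatch(v: String): (String, String, String) = {
  val Array(maj, min, patch) = v.split("\\.", 3)
  (maj, min, patch)
}

val sparkVersion = "3.4.0" // illustrative
val icebergSparkRuntimeArtifactName = {
  val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion)
  s"iceberg-spark-runtime-$expMaj.$expMin"
}
// yields "iceberg-spark-runtime-3.4", matching Iceberg's published artifact naming
```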
