[WIP][KERNEL][VARIANT] Add read in delta kernel #3020

Open · wants to merge 24 commits into master
56 changes: 35 additions & 21 deletions build.sbt
@@ -47,6 +47,8 @@ val sparkVersion = settingKey[String]("Spark version")
spark / sparkVersion := getSparkVersion()
connectCommon / sparkVersion := getSparkVersion()
connectServer / sparkVersion := getSparkVersion()
+ kernelDefaults / sparkVersion := getSparkVersion()
+ goldenTables / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
@@ -144,6 +146,25 @@ lazy val commonSettings = Seq(
unidocSourceFilePatterns := Nil,
)

+ /**
+  * Java-/Scala-/Uni-Doc settings aren't working yet against Spark Master:
+  *   1) delta-spark on Spark Master uses JDK 17, while delta-iceberg uses JDK 8 or 11. For some
+  *      reason, generating delta-spark unidoc compiles delta-iceberg.
+  *   2) delta-spark unidoc fails to compile: Spark 3.5 is on its classpath, likely due to the
+  *      iceberg issue above.
+  */
+ def crossSparkUniDocSettings(): Seq[Setting[_]] = getSparkVersion() match {
+   case LATEST_RELEASED_SPARK_VERSION => Seq(
+     // Java-/Scala-/Uni-Doc Settings
+     scalacOptions ++= Seq(
+       "-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
+     ),
+     unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
+   )
+
+   case SPARK_MASTER_VERSION => Seq()
+ }

/**
* Note: we cannot access sparkVersion.value here, since that can only be used within a task or
* setting macro.
@@ -158,12 +179,6 @@ def crossSparkSettings(): Seq[Setting[_]] = getSparkVersion() match {
Compile / unmanagedSourceDirectories += (Compile / baseDirectory).value / "src" / "main" / "scala-spark-3.5",
Test / unmanagedSourceDirectories += (Test / baseDirectory).value / "src" / "test" / "scala-spark-3.5",
Antlr4 / antlr4Version := "4.9.3",
-
-     // Java-/Scala-/Uni-Doc Settings
-     scalacOptions ++= Seq(
-       "-P:genjavadoc:strictVisibility=true" // hide package private types and methods in javadoc
-     ),
-     unidocSourceFilePatterns := Seq(SourceFilePattern("io/delta/tables/", "io/delta/exceptions/"))
)

case SPARK_MASTER_VERSION => Seq(
@@ -188,13 +203,6 @@
"--add-opens=java.base/sun.security.action=ALL-UNNAMED",
"--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
)
-
-     // Java-/Scala-/Uni-Doc Settings
-     // This isn't working yet against Spark Master.
-     // 1) delta-spark on Spark Master uses JDK 17. delta-iceberg uses JDK 8 or 11. For some reason,
-     //    generating delta-spark unidoc compiles delta-iceberg
-     // 2) delta-spark unidoc fails to compile. spark 3.5 is on its classpath. likely due to iceberg
-     //    issue above.
)
}

@@ -221,6 +229,7 @@ lazy val connectCommon = (project in file("spark-connect/common"))
name := "delta-connect-common",
commonSettings,
crossSparkSettings(),
+   crossSparkUniDocSettings(),
releaseSettings,
Compile / compile := runTaskOnlyOnSparkMaster(
task = Compile / compile,
@@ -280,6 +289,7 @@ lazy val connectServer = (project in file("spark-connect/server"))
emptyValue = ()
).value,
crossSparkSettings(),
+   crossSparkUniDocSettings(),
libraryDependencies ++= Seq(
"com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",

@@ -307,6 +317,7 @@ lazy val spark = (project in file("spark"))
sparkMimaSettings,
releaseSettings,
crossSparkSettings(),
+   crossSparkUniDocSettings(),
libraryDependencies ++= Seq(
// Adding test classifier seems to break transitive resolution of the core dependencies
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "provided",
@@ -450,6 +461,7 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
scalaStyleSettings,
javaOnlyReleaseSettings,
Test / javaOptions ++= Seq("-ea"),
+   crossSparkSettings(),
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "1.7.36",
@@ -504,6 +516,7 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
scalaStyleSettings,
javaOnlyReleaseSettings,
Test / javaOptions ++= Seq("-ea"),
+   crossSparkSettings(),
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client-runtime" % hadoopVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5",
@@ -520,10 +533,10 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.openjdk.jmh" % "jmh-core" % "1.37" % "test",
"org.openjdk.jmh" % "jmh-generator-annprocess" % "1.37" % "test",

"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
),
javaCheckstyleSettings("dev/kernel-checkstyle.xml"),
// Unidoc settings
@@ -1218,14 +1231,15 @@ lazy val goldenTables = (project in file("connectors/golden-tables"))
name := "golden-tables",
commonSettings,
skipReleaseSettings,
+   crossSparkSettings(),
libraryDependencies ++= Seq(
// Test Dependencies
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests"
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests"
)
)

10 changes: 10 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Scan.java
@@ -36,6 +36,7 @@
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.PartitionUtils;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.internal.util.VariantUtils;

/**
* Represents a scan of a Delta table.
@@ -196,6 +197,15 @@ public FilteredColumnarBatch next() {
nextDataBatch = nextDataBatch.withDeletedColumnAt(rowIndexOrdinal);
}

// Transform physical variant columns (struct of binaries) into logical variant
// columns.
if (ScanStateRow.getVariantFeatureEnabled(scanState)) {
nextDataBatch = VariantUtils.withVariantColumns(
engine.getExpressionHandler(),
nextDataBatch
);
}

// Add partition columns
nextDataBatch =
PartitionUtils.withPartitionColumns(
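For orientation, here is a rough consumer-side sketch of what a transformed batch exposes once the variant rewrite above has run. This is a sketch only: the scan and iterator wiring is elided, and the column name `v` and the helper class are assumptions, not part of this PR.

```java
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.VariantValue;

class VariantReadExample {
    // Reads every non-null variant in an assumed top-level column "v".
    static void readVariants(FilteredColumnarBatch filteredBatch) {
        ColumnarBatch batch = filteredBatch.getData();
        int ordinal = batch.getSchema().indexOf("v"); // assumed column name
        ColumnVector variants = batch.getColumnVector(ordinal);
        for (int rowId = 0; rowId < variants.getSize(); rowId++) {
            if (variants.isNullAt(rowId)) {
                continue;
            }
            VariantValue v = variants.getVariant(rowId);
            byte[] value = v.getValue();       // encoded variant value
            byte[] metadata = v.getMetadata(); // variant metadata (field-name dictionary)
        }
    }
}
```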
kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnVector.java
@@ -175,6 +175,14 @@ default ArrayValue getArray(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Return the variant value located at {@code rowId}. Returns null if the slot for {@code rowId}
* is null.
*/
default VariantValue getVariant(int rowId) {
throw new UnsupportedOperationException("Invalid value request for data type");
}

/**
* Get the child vector associated with the given ordinal. This method is applicable only to the
* {@code struct} type columns.
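To make the `getVariant` contract concrete, a minimal in-memory vector might look like the sketch below. `SimpleVariantVector` and its `VariantValue[]` backing array are illustrative assumptions; only the interface's required methods are implemented, and the other getters keep their throwing defaults.

```java
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.VariantType;

// Illustrative only: an in-memory variant vector backed by a VariantValue[].
class SimpleVariantVector implements ColumnVector {
    private final VariantValue[] values; // null element => null slot

    SimpleVariantVector(VariantValue[] values) {
        this.values = values;
    }

    @Override
    public DataType getDataType() {
        return VariantType.VARIANT;
    }

    @Override
    public int getSize() {
        return values.length;
    }

    @Override
    public void close() { /* nothing to release */ }

    @Override
    public boolean isNullAt(int rowId) {
        return values[rowId] == null;
    }

    @Override
    public VariantValue getVariant(int rowId) {
        return values[rowId]; // null when the slot is null, per the contract above
    }
}
```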
kernel/kernel-api/src/main/java/io/delta/kernel/data/ColumnarBatch.java
@@ -104,6 +104,22 @@ default ColumnarBatch slice(int start, int end) {
throw new UnsupportedOperationException("Not yet implemented!");
}

/**
* Return a copy of this {@link ColumnarBatch} with the column vector at the given {@code ordinal}
* replaced with {@code newVector} and the schema field at the given {@code ordinal} replaced
* with {@code newColumnSchema}.
*
* @param ordinal Ordinal of the column vector to replace.
* @param newColumnSchema The schema field of the new column.
* @param newVector New column vector that will replace the column vector at the given
* {@code ordinal}.
* @return {@link ColumnarBatch} with a new column vector at the given ordinal.
*/
default ColumnarBatch withReplacedColumnVector(int ordinal, StructField newColumnSchema,
ColumnVector newVector) {
throw new UnsupportedOperationException("Not yet implemented!");
}

/**
* @return iterator of {@link Row}s in this batch
*/
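A sketch of how a simple array-backed batch could satisfy the new default method. `ArrayBackedBatch` is hypothetical, not the kernel's shipped implementation; it assumes `getSchema`, `getColumnVector`, and `getSize` are the interface's required methods.

```java
import java.util.Arrays;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;

// Illustrative only: a batch backed by a ColumnVector[] plus its schema.
class ArrayBackedBatch implements ColumnarBatch {
    private final StructType schema;
    private final ColumnVector[] vectors;

    ArrayBackedBatch(StructType schema, ColumnVector[] vectors) {
        this.schema = schema;
        this.vectors = vectors;
    }

    @Override
    public StructType getSchema() {
        return schema;
    }

    @Override
    public ColumnVector getColumnVector(int ordinal) {
        return vectors[ordinal];
    }

    @Override
    public int getSize() {
        return vectors.length == 0 ? 0 : vectors[0].getSize();
    }

    @Override
    public ColumnarBatch withReplacedColumnVector(
            int ordinal, StructField newColumnSchema, ColumnVector newVector) {
        // Copy-on-write: the original batch stays untouched.
        ColumnVector[] newVectors = Arrays.copyOf(vectors, vectors.length);
        newVectors[ordinal] = newVector;
        StructType newSchema = new StructType();
        for (int i = 0; i < schema.length(); i++) {
            newSchema = newSchema.add(i == ordinal ? newColumnSchema : schema.at(i));
        }
        return new ArrayBackedBatch(newSchema, newVectors);
    }
}
```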
6 changes: 6 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/data/Row.java
@@ -117,4 +117,10 @@ public interface Row {
* Throws error if the column at given ordinal is not of map type,
*/
MapValue getMap(int ordinal);

/**
* Return the variant value of the column located at the given ordinal.
* Throws error if the column at given ordinal is not of variant type.
*/
VariantValue getVariant(int ordinal);
}
kernel/kernel-api/src/main/java/io/delta/kernel/data/VariantValue.java
@@ -0,0 +1,25 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.data;

/**
* Abstraction to represent a single Variant value in a {@link ColumnVector}.
*/
public interface VariantValue {
byte[] getValue();

byte[] getMetadata();
}
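The simplest possible implementation of this interface is a pair of byte arrays. A hedged sketch follows; the `SimpleVariantValue` class is illustrative, not part of this PR.

```java
import io.delta.kernel.data.VariantValue;

// Illustrative only: holds the two byte arrays the interface exposes.
public class SimpleVariantValue implements VariantValue {
    private final byte[] value;    // encoded variant value
    private final byte[] metadata; // variant metadata (field-name dictionary)

    public SimpleVariantValue(byte[] value, byte[] metadata) {
        this.value = value;
        this.metadata = metadata;
    }

    @Override
    public byte[] getValue() {
        return value;
    }

    @Override
    public byte[] getMetadata() {
        return metadata;
    }
}
```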
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableFeatures.java
@@ -51,6 +51,7 @@ public static void validateReadSupportedTable(
break;
case "deletionVectors": // fall through
case "timestampNtz": // fall through
case "variantType-preview": // fall through
case "vacuumProtocolCheck": // fall through
case "v2Checkpoint":
break;
kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ChildVectorBasedRow.java
@@ -21,6 +21,7 @@
import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.StructType;

@@ -111,5 +112,10 @@ public MapValue getMap(int ordinal) {
return getChild(ordinal).getMap(rowId);
}

@Override
public VariantValue getVariant(int ordinal) {
return getChild(ordinal).getVariant(rowId);
}

protected abstract ColumnVector getChild(int ordinal);
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/GenericRow.java
@@ -23,6 +23,7 @@
import io.delta.kernel.data.ArrayValue;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.data.VariantValue;
import io.delta.kernel.types.*;

@@ -134,6 +135,12 @@ public MapValue getMap(int ordinal) {
return (MapValue) getValue(ordinal);
}

@Override
public VariantValue getVariant(int ordinal) {
throwIfUnsafeAccess(ordinal, VariantType.class, "variant");
return (VariantValue) getValue(ordinal);
}

private Object getValue(int ordinal) {
return ordinalToValue.get(ordinal);
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/data/ScanStateRow.java
@@ -41,6 +41,7 @@ public class ScanStateRow extends GenericRow {
.add("partitionColumns", new ArrayType(StringType.STRING, false))
.add("minReaderVersion", IntegerType.INTEGER)
.add("minWriterVersion", IntegerType.INTEGER)
.add("variantFeatureEnabled", BooleanType.BOOLEAN)
.add("tablePath", StringType.STRING);

private static final Map<String, Integer> COL_NAME_TO_ORDINAL =
@@ -64,6 +65,10 @@ public static ScanStateRow of(
valueMap.put(COL_NAME_TO_ORDINAL.get("partitionColumns"), metadata.getPartitionColumns());
valueMap.put(COL_NAME_TO_ORDINAL.get("minReaderVersion"), protocol.getMinReaderVersion());
valueMap.put(COL_NAME_TO_ORDINAL.get("minWriterVersion"), protocol.getMinWriterVersion());
valueMap.put(
COL_NAME_TO_ORDINAL.get("variantFeatureEnabled"),
protocol.getReaderFeatures().contains("variantType-preview")
);
valueMap.put(COL_NAME_TO_ORDINAL.get("tablePath"), tablePath);
return new ScanStateRow(valueMap);
}
@@ -147,4 +152,15 @@ public static String getColumnMappingMode(Row scanState) {
public static String getTableRoot(Row scanState) {
return scanState.getString(COL_NAME_TO_ORDINAL.get("tablePath"));
}

/**
 * Get whether the "variantType-preview" table feature is enabled from the scan state
 * {@link Row} returned by {@link Scan#getScanState(Engine)}.
 *
 * @param scanState Scan state {@link Row}
 * @return Boolean indicating whether "variantType-preview" is enabled.
 */
public static Boolean getVariantFeatureEnabled(Row scanState) {
return scanState.getBoolean(COL_NAME_TO_ORDINAL.get("variantFeatureEnabled"));
}
}
kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/VariantUtils.java
@@ -0,0 +1,67 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.internal.util;

import java.util.Arrays;

import io.delta.kernel.data.ColumnVector;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.ExpressionHandler;
import io.delta.kernel.expressions.*;
import io.delta.kernel.types.*;

public class VariantUtils {
public static ColumnarBatch withVariantColumns(
ExpressionHandler expressionHandler,
ColumnarBatch dataBatch) {
for (int i = 0; i < dataBatch.getSchema().length(); i++) {
    StructField field = dataBatch.getSchema().at(i);
    // Skip columns that can't contain a variant to convert: scalar non-variant
    // columns, and variant columns whose vector already has the logical variant
    // type. Struct/array/map columns are always visited because they may
    // contain nested variants.
    if (!(field.getDataType() instanceof StructType) &&
        !(field.getDataType() instanceof ArrayType) &&
        !(field.getDataType() instanceof MapType) &&
        (field.getDataType() != VariantType.VARIANT ||
            dataBatch.getColumnVector(i).getDataType() == VariantType.VARIANT)) {
        continue;
    }

ExpressionEvaluator evaluator = expressionHandler.getEvaluator(
    // `field` has variant type here if it is actually a variant.
    // TODO: it would probably be better to pass the schema in as an expression
    // argument so it is enforced at the expression level; that requires passing
    // a literal schema.
    new StructType().add(field),
    new ScalarExpression(
        "variant_coalesce",
        Arrays.asList(new Column(field.getName()))
    ),
    VariantType.VARIANT
);

ColumnVector variantCol = evaluator.eval(dataBatch);
dataBatch = dataBatch.withReplacedColumnVector(i, field, variantCol);
}
return dataBatch;
}

private static ColumnVector[] getColumnBatchVectors(ColumnarBatch batch) {
ColumnVector[] res = new ColumnVector[batch.getSchema().length()];
for (int i = 0; i < batch.getSchema().length(); i++) {
res[i] = batch.getColumnVector(i);
}
return res;
}
}
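A hedged sketch of where this rewrite sits in an engine's read path, assuming an `Engine` and a physical batch read from Parquet are in scope. Note the design choice: the kernel delegates the conversion to an engine-evaluated `variant_coalesce` expression rather than rewriting vectors itself, so each engine can materialize variants from its own physical representation; the engine's `ExpressionHandler` must therefore recognize `variant_coalesce`.

```java
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.util.VariantUtils;

class VariantRewriteExample {
    static ColumnarBatch toLogical(Engine engine, ColumnarBatch physicalBatch) {
        // Rewrites any variant-typed column still stored as
        // struct<value: binary, metadata: binary> into a logical variant vector.
        return VariantUtils.withVariantColumns(engine.getExpressionHandler(), physicalBatch);
    }
}
```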
@@ -205,6 +205,8 @@ private static Object getValueAsObject(
return toJavaList(columnVector.getArray(rowId));
} else if (dataType instanceof MapType) {
return toJavaMap(columnVector.getMap(rowId));
} else if (dataType instanceof VariantType) {
return columnVector.getVariant(rowId);
} else {
throw new UnsupportedOperationException("unsupported data type");
}