[SIP Don't merge] support Spark 3.1 #3296

Closed · wants to merge 3 commits
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
@@ -29,7 +29,7 @@ caffeine = "2.9.3"
rocksdbjni = "7.10.2"
iceberg = '1.3.1' # 1.4.0 causes test to fail
trino = '426'
spark = "3.4.1" # 3.5.0 causes tests to fail
spark = "3.1.2" # 3.5.0 causes tests to fail
scala-collection-compat = "2.7.0"
scala-java-compat = "1.0.2"
sqlite-jdbc = "3.42.0.0"
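The version pin is the whole point of this SIP: dropping from 3.4.1 to 3.1.2 removes several connector APIs that the files below relied on. A minimal sketch of a runtime version gate a multi-version connector might use instead of hard removal (a hypothetical helper, not part of this diff; the only Spark API assumed is SparkSession.version()):

import org.apache.spark.sql.SparkSession;

public final class SparkVersionGate {
  private SparkVersionGate() {}

  // True when the active Spark runtime is at least major.minor, e.g. atLeast(spark, 3, 2)
  // to decide whether classes like SortedBucketTransform or TimestampNTZType can be referenced.
  public static boolean atLeast(SparkSession spark, int major, int minor) {
    String[] parts = spark.version().split("\\.");
    int runtimeMajor = Integer.parseInt(parts[0]);
    int runtimeMinor = Integer.parseInt(parts[1]);
    return runtimeMajor > major || (runtimeMajor == major && runtimeMinor >= minor);
  }
}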
@@ -25,7 +25,6 @@
import org.apache.spark.sql.connector.expressions.HoursTransform;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.MonthsTransform;
import org.apache.spark.sql.connector.expressions.SortedBucketTransform;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.expressions.YearsTransform;
import org.apache.spark.sql.types.DataType;
@@ -114,9 +113,8 @@ static SparkTableInfo create(SparkBaseTable baseTable) {
Arrays.stream(baseTable.partitioning())
.forEach(
transform -> {
if (transform instanceof BucketTransform
|| transform instanceof SortedBucketTransform) {
if (isBucketPartition(supportsBucketPartition, transform)) {
if (transform instanceof BucketTransform) {
if (isBucketPartition(supportsBucketPartition)) {
sparkTableInfo.addPartition(transform);
} else {
sparkTableInfo.setBucket(transform);
@@ -149,8 +147,8 @@ static SparkTableInfo create(SparkBaseTable baseTable) {
return sparkTableInfo;
}

private static boolean isBucketPartition(boolean supportsBucketPartition, Transform transform) {
return supportsBucketPartition && !(transform instanceof SortedBucketTransform);
private static boolean isBucketPartition(boolean supportsBucketPartition) {
return supportsBucketPartition;
}

public List<SparkColumnInfo> getUnPartitionedColumns() {
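SortedBucketTransform does not exist in Spark 3.1's expressions package, which is why the instanceof check above collapses to BucketTransform alone. A small sketch of the only bucket transform Spark 3.1 can produce (assumes nothing beyond the public Expressions factory):

import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;

public class BucketTransformDemo {
  public static void main(String[] args) {
    // Spark 3.1 only knows plain bucketing; there is no sorted variant to test for.
    Transform bucket = Expressions.bucket(16, "id");
    System.out.println(bucket instanceof BucketTransform); // true
    System.out.println(bucket.name());                     // "bucket"
  }
}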
@@ -10,7 +10,6 @@
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrders;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.expressions.transforms.Transforms;
import com.google.common.annotations.VisibleForTesting;
@@ -21,7 +20,6 @@
import javax.ws.rs.NotSupportedException;
import lombok.Getter;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.spark.sql.connector.expressions.ApplyTransform;
import org.apache.spark.sql.connector.expressions.BucketTransform;
import org.apache.spark.sql.connector.expressions.DaysTransform;
@@ -31,7 +29,6 @@
import org.apache.spark.sql.connector.expressions.Literal;
import org.apache.spark.sql.connector.expressions.LogicalExpressions;
import org.apache.spark.sql.connector.expressions.MonthsTransform;
import org.apache.spark.sql.connector.expressions.SortedBucketTransform;
import org.apache.spark.sql.connector.expressions.YearsTransform;
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
@@ -49,9 +46,8 @@
public class SparkTransformConverter {

/**
* If supportsBucketPartition is true, BucketTransform is transformed to a partition, and
* SortedBucketTransform is not supported. If false, BucketTransform and SortedBucketTransform are
* transformed to Distribution and SortOrder.
* If supportsBucketPartition is true, BucketTransform is transformed to a partition. If false,
* BucketTransform is transformed to Distribution and SortOrder.
*/
private final boolean supportsBucketPartition;

@@ -142,19 +138,13 @@ public DistributionAndSortOrdersInfo toGravitinoDistributionAndSortOrders(
.filter(transform -> !isPartitionTransform(transform))
.forEach(
transform -> {
if (transform instanceof SortedBucketTransform) {
Pair<Distribution, SortOrder[]> pair =
toGravitinoDistributionAndSortOrders((SortedBucketTransform) transform);
distributionAndSortOrdersInfo.setDistribution(pair.getLeft());
distributionAndSortOrdersInfo.setSortOrders(pair.getRight());
} else if (transform instanceof BucketTransform) {
if (transform instanceof BucketTransform) {
BucketTransform bucketTransform = (BucketTransform) transform;
Distribution distribution = toGravitinoDistribution(bucketTransform);
distributionAndSortOrdersInfo.setDistribution(distribution);
} else {
throw new NotSupportedException(
"Only support BucketTransform and SortedBucketTransform, but get: "
+ transform.name());
"Only support BucketTransform , but get: " + transform.name());
}
});

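With the SortedBucketTransform branch gone, only the hash-distribution path survives, and this converter never produces a SortOrder on Spark 3.1. A condensed sketch of the remaining conversion (Distributions.hash is taken from this diff; NamedReference.field is assumed to be Gravitino's field-reference factory):

import com.datastrato.gravitino.rel.expressions.NamedReference;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.distributions.Distributions;

public class BucketToDistributionSketch {
  // Mirrors toGravitinoDistribution: bucket columns become a hash distribution.
  public static Distribution toHashDistribution(int bucketNum, String[] columns) {
    NamedReference[] refs = new NamedReference[columns.length];
    for (int i = 0; i < columns.length; i++) {
      refs[i] = NamedReference.field(columns[i]);
    }
    return Distributions.hash(bucketNum, refs);
  }
}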
@@ -255,26 +245,6 @@ private static Distribution toGravitinoDistribution(BucketTransf
return Distributions.hash(bucketNum, expressions);
}

// Spark DataSourceV2 doesn't support specifying the sort order direction, so ASCENDING is used as the default.
private static Pair<Distribution, SortOrder[]> toGravitinoDistributionAndSortOrders(
SortedBucketTransform sortedBucketTransform) {
int bucketNum = (Integer) sortedBucketTransform.numBuckets().value();
Expression[] bucketColumns =
toGravitinoNamedReference(JavaConverters.seqAsJavaList(sortedBucketTransform.columns()));

Expression[] sortColumns =
toGravitinoNamedReference(
JavaConverters.seqAsJavaList(sortedBucketTransform.sortedColumns()));
SortOrder[] sortOrders =
Arrays.stream(sortColumns)
.map(
sortColumn ->
SortOrders.of(sortColumn, ConnectorConstants.SPARK_DEFAULT_SORT_DIRECTION))
.toArray(SortOrder[]::new);

return Pair.of(Distributions.hash(bucketNum, bucketColumns), sortOrders);
}

private static org.apache.spark.sql.connector.expressions.Transform toSparkBucketTransform(
Distribution distribution, SortOrder[] sortOrders) {
if (distribution == null) {
@@ -325,8 +295,7 @@ private static Expression toGravitinoNamedReference(

public static org.apache.spark.sql.connector.expressions.Transform createSortBucketTransform(
int bucketNum, String[] bucketFields, String[] sortFields) {
return LogicalExpressions.bucket(
bucketNum, createSparkNamedReference(bucketFields), createSparkNamedReference(sortFields));
return null;
}
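Returning null here is presumably a stopgap: Spark 3.1's LogicalExpressions exposes bucket(numBuckets, references) but not the sorted-bucket overload the deleted code relied on, so there is nothing equivalent to build. A hedged sketch of what 3.1 can construct (throwing might be a safer stopgap than returning null, since callers requesting a sorted bucket would otherwise hit an NPE later):

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.LogicalExpressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;

public class Spark31BucketFactory {
  // Spark 3.1 can only express an unsorted bucket transform; sort fields are dropped.
  public static Transform bucket(int bucketNum, String[] bucketFields) {
    NamedReference[] refs = new NamedReference[bucketFields.length];
    for (int i = 0; i < bucketFields.length; i++) {
      refs[i] = Expressions.column(bucketFields[i]);
    }
    return LogicalExpressions.bucket(bucketNum, refs);
  }
}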

// columnName could be "a" or "a.b" for nested column
@@ -382,12 +351,9 @@ private static String getFieldNameFromGravitinoNamedReference(
private boolean isPartitionTransform(
org.apache.spark.sql.connector.expressions.Transform transform) {
if (supportsBucketPartition) {
Preconditions.checkArgument(
!(transform instanceof SortedBucketTransform),
"Spark doesn't support SortedBucketTransform as partition transform");
return true;
}
return !(transform instanceof BucketTransform || transform instanceof SortedBucketTransform);
return !(transform instanceof BucketTransform);
}

// Referred from org.apache.iceberg.spark.Spark3Util
@@ -30,7 +30,6 @@
import org.apache.spark.sql.types.StringType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.TimestampNTZType;
import org.apache.spark.sql.types.TimestampType;
import org.apache.spark.sql.types.VarcharType;

@@ -68,8 +67,6 @@ public static Type toGravitinoType(DataType sparkType) {
return Types.DateType.get();
} else if (sparkType instanceof TimestampType) {
return Types.TimestampType.withTimeZone();
} else if (sparkType instanceof TimestampNTZType) {
return Types.TimestampType.withoutTimeZone();
} else if (sparkType instanceof ArrayType) {
ArrayType arrayType = (ArrayType) sparkType;
return Types.ListType.of(toGravitinoType(arrayType.elementType()), arrayType.containsNull());
@@ -131,9 +128,6 @@ public static DataType toSparkType(Type gravitinoType) {
} else if (gravitinoType instanceof Types.TimestampType
&& ((Types.TimestampType) gravitinoType).hasTimeZone()) {
return DataTypes.TimestampType;
} else if (gravitinoType instanceof Types.TimestampType
&& !((Types.TimestampType) gravitinoType).hasTimeZone()) {
return DataTypes.TimestampNTZType;
} else if (gravitinoType instanceof Types.ListType) {
Types.ListType listType = (Types.ListType) gravitinoType;
return DataTypes.createArrayType(
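TimestampNTZType is not available in Spark 3.1 (it appeared in a later 3.x release), so both mapping branches are dropped rather than guarded. One consequence: a Gravitino timestamp without time zone now falls through to the generic unsupported-type path. A hedged sketch of an explicit guard (hypothetical; only hasTimeZone() and DataTypes.TimestampType are taken from this diff):

import com.datastrato.gravitino.rel.types.Types;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

public class TimestampMapping31 {
  // Hypothetical guard for Spark 3.1: only timestamps with time zone are representable.
  public static DataType toSparkTimestamp(Types.TimestampType gravitinoType) {
    if (gravitinoType.hasTimeZone()) {
      return DataTypes.TimestampType;
    }
    throw new UnsupportedOperationException(
        "Spark 3.1 has no TimestampNTZType; timestamp without time zone is unsupported");
  }
}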
@@ -13,7 +13,6 @@
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.rel.Schema;
import com.datastrato.gravitino.rel.SchemaChange;
import com.datastrato.gravitino.rel.expressions.literals.Literals;
import com.datastrato.gravitino.spark.connector.ConnectorConstants;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
@@ -26,14 +25,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.ws.rs.NotSupportedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.Column;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.NamespaceChange.SetProperty;
@@ -42,6 +38,7 @@
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

@@ -162,15 +159,32 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti
}
}

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
try {
String database = getDatabase(ident);
com.datastrato.gravitino.rel.Table table =
gravitinoCatalogClient
.asTableCatalog()
.loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
// Will create a catalog-specific table
return createSparkTable(
ident, table, sparkCatalog, propertiesConverter, sparkTransformConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
}

@SuppressWarnings("deprecation")
@Override
public Table createTable(
Identifier ident, Column[] columns, Transform[] transforms, Map<String, String> properties)
Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
NameIdentifier gravitinoIdentifier =
NameIdentifier.of(metalakeName, catalogName, getDatabase(ident), ident.name());
com.datastrato.gravitino.rel.Column[] gravitinoColumns =
Arrays.stream(columns)
.map(column -> createGravitinoColumn(column))
Arrays.stream(schema.fields())
.map(structField -> createGravitinoColumn(structField))
.toArray(com.datastrato.gravitino.rel.Column[]::new);

Map<String, String> gravitinoProperties =
@@ -179,9 +193,9 @@ public Table createTable(
String comment = gravitinoProperties.remove(ConnectorConstants.COMMENT);

DistributionAndSortOrdersInfo distributionAndSortOrdersInfo =
sparkTransformConverter.toGravitinoDistributionAndSortOrders(transforms);
sparkTransformConverter.toGravitinoDistributionAndSortOrders(partitions);
com.datastrato.gravitino.rel.expressions.transforms.Transform[] partitionings =
sparkTransformConverter.toGravitinoPartitionings(transforms);
sparkTransformConverter.toGravitinoPartitionings(partitions);

try {
com.datastrato.gravitino.rel.Table table =
@@ -204,30 +218,6 @@ public Table createTable(
}
}
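The createTable override now matches the StructType-based signature that Spark 3.1's TableCatalog declares; the Column[]-based overload it replaces arrived in a later Spark release. Column comments travel inside StructField metadata, which is what the new createGravitinoColumn below reads back via getComment(). A small sketch of a schema this path would receive (assumes only the standard Spark types API):

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SchemaExample {
  public static StructType example() {
    StructField id = new StructField("id", DataTypes.LongType, false, Metadata.empty());
    // withComment stores the comment in the field's metadata; getComment() recovers it.
    StructField name =
        new StructField("name", DataTypes.StringType, true, Metadata.empty())
            .withComment("display name");
    return new StructType(new StructField[] {id, name});
  }
}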

@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
try {
String database = getDatabase(ident);
com.datastrato.gravitino.rel.Table table =
gravitinoCatalogClient
.asTableCatalog()
.loadTable(NameIdentifier.of(metalakeName, catalogName, database, ident.name()));
// Will create a catalog-specific table
return createSparkTable(
ident, table, sparkCatalog, propertiesConverter, sparkTransformConverter);
} catch (com.datastrato.gravitino.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
}

@SuppressWarnings("deprecation")
@Override
public Table createTable(
Identifier ident, StructType schema, Transform[] partitions, Map<String, String> properties)
throws TableAlreadyExistsException, NoSuchNamespaceException {
throw new NotSupportedException("Deprecated create table method");
}

@Override
public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
com.datastrato.gravitino.rel.TableChange[] gravitinoTableChanges =
@@ -366,15 +356,14 @@ public void alterNamespace(String[] namespace, NamespaceChange... changes)
}

@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException, NonEmptyNamespaceException {
public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
validateNamespace(namespace);
try {
return gravitinoCatalogClient
.asSchemas()
.dropSchema(NameIdentifier.of(metalakeName, catalogName, namespace[0]), cascade);
.dropSchema(NameIdentifier.of(metalakeName, catalogName, namespace[0]), false);
} catch (NonEmptySchemaException e) {
throw new NonEmptyNamespaceException(namespace);
return false;
}
}
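Spark 3.1's SupportsNamespaces only declares dropNamespace(String[]); the cascade flag was added in a later release. The override therefore always asks Gravitino for a non-cascading drop and reports a non-empty schema as a plain false instead of NonEmptyNamespaceException. If cascade semantics were needed on 3.1, they would have to be emulated; a hypothetical sketch, not part of this PR:

import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;

public class CascadeHelper {
  // Hypothetical cascade emulation: drop every table first, then the empty namespace.
  public static <C extends TableCatalog & SupportsNamespaces> boolean dropCascade(
      C catalog, String[] namespace) throws NoSuchNamespaceException {
    for (Identifier table : catalog.listTables(namespace)) {
      catalog.dropTable(table);
    }
    return catalog.dropNamespace(namespace);
  }
}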

@@ -392,12 +381,12 @@ private String getCatalogDefaultNamespace() {
return catalogDefaultNamespace[0];
}

private com.datastrato.gravitino.rel.Column createGravitinoColumn(Column sparkColumn) {
private com.datastrato.gravitino.rel.Column createGravitinoColumn(StructField structField) {
return com.datastrato.gravitino.rel.Column.of(
sparkColumn.name(),
SparkTypeConverter.toGravitinoType(sparkColumn.dataType()),
sparkColumn.comment(),
sparkColumn.nullable(),
structField.name(),
SparkTypeConverter.toGravitinoType(structField.dataType()),
structField.getComment().isEmpty() ? null : structField.getComment().get(),
structField.nullable(),
// Spark doesn't support autoIncrement
false,
// todo: support default value
@@ -437,8 +426,7 @@ static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange
addColumn.isNullable());
} else if (change instanceof TableChange.DeleteColumn) {
TableChange.DeleteColumn deleteColumn = (TableChange.DeleteColumn) change;
return com.datastrato.gravitino.rel.TableChange.deleteColumn(
deleteColumn.fieldNames(), deleteColumn.ifExists());
return com.datastrato.gravitino.rel.TableChange.deleteColumn(deleteColumn.fieldNames(), true);
} else if (change instanceof TableChange.UpdateColumnType) {
TableChange.UpdateColumnType updateColumnType = (TableChange.UpdateColumnType) change;
return com.datastrato.gravitino.rel.TableChange.updateColumnType(
@@ -471,12 +459,6 @@ static com.datastrato.gravitino.rel.TableChange transformTableChange(TableChange
(TableChange.UpdateColumnNullability) change;
return com.datastrato.gravitino.rel.TableChange.updateColumnNullability(
updateColumnNullability.fieldNames(), updateColumnNullability.nullable());
} else if (change instanceof TableChange.UpdateColumnDefaultValue) {
TableChange.UpdateColumnDefaultValue updateColumnDefaultValue =
(TableChange.UpdateColumnDefaultValue) change;
return com.datastrato.gravitino.rel.TableChange.updateColumnDefaultValue(
updateColumnDefaultValue.fieldNames(),
Literals.stringLiteral(updateColumnDefaultValue.newDefaultValue()));
} else {
throw new UnsupportedOperationException(
String.format("Unsupported table change %s", change.getClass().getName()));
@@ -16,12 +16,8 @@
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
@@ -31,7 +27,7 @@
* StagingTableCatalog and FunctionCatalog, allowing for advanced operations like table staging and
* function management tailored to the needs of Iceberg tables.
*/
public class GravitinoIcebergCatalog extends BaseCatalog implements FunctionCatalog {
public class GravitinoIcebergCatalog extends BaseCatalog {

@Override
protected TableCatalog createAndInitSparkCatalog(
@@ -86,16 +82,6 @@ protected SparkTransformConverter getSparkTransformConverter() {
return new SparkTransformConverter(true);
}

@Override
public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
return ((SparkCatalog) sparkCatalog).listFunctions(namespace);
}

@Override
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
return ((SparkCatalog) sparkCatalog).loadFunction(ident);
}

private void initHiveProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
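The FunctionCatalog interface, and with it listFunctions/loadFunction, only exists from Spark 3.2 onward, so on 3.1 this catalog stops exposing Iceberg's system functions. A quick classpath probe makes the dependency visible without assuming a version string:

public class FunctionCatalogProbe {
  public static void main(String[] args) {
    boolean present;
    try {
      // Present on Spark 3.2+, absent on the 3.1.2 runtime this PR pins.
      Class.forName("org.apache.spark.sql.connector.catalog.FunctionCatalog");
      present = true;
    } catch (ClassNotFoundException e) {
      present = false;
    }
    System.out.println("FunctionCatalog available: " + present);
  }
}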