Spark 3.3: Support storage-partitioned joins
aokolnychyi committed Dec 7, 2022
1 parent b176202 commit d62a5a1
Showing 19 changed files with 1,075 additions and 189 deletions.
24 changes: 18 additions & 6 deletions core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -200,6 +200,16 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) {
/**
* Builds a grouping key type considering all provided specs.
*
* @param specs one or many specs
* @return the constructed grouping key type
*/
public static StructType groupingKeyType(Collection<PartitionSpec> specs) {
return groupingKeyType(null, specs);
}

/**
* Builds a grouping key type considering the provided schema and specs.
*
* <p>A grouping key defines how data is split between files and consists of partition fields with
* non-void transforms that are present in each provided spec. Iceberg guarantees that records
* with different values for the grouping key are disjoint and are stored in separate files.
@@ -215,11 +225,12 @@ public Void alwaysNull(int fieldId, String sourceName, int sourceId) {
* that have the same field ID but use a void transform under the hood. Such fields cannot be part
* of the grouping key as void transforms always return null.
*
* @param schema a schema
* @param specs one or many specs
* @return the constructed grouping key type
*/
public static StructType groupingKeyType(Collection<PartitionSpec> specs) {
return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(specs));
public static StructType groupingKeyType(Schema schema, Collection<PartitionSpec> specs) {
return buildPartitionProjectionType("grouping key", specs, commonActiveFieldIds(schema, specs));
}

/**
@@ -341,15 +352,15 @@ private static Set<Integer> allFieldIds(Collection<PartitionSpec> specs) {
}

// collects IDs of partition fields with non-void transforms that are present in each spec
private static Set<Integer> commonActiveFieldIds(Collection<PartitionSpec> specs) {
private static Set<Integer> commonActiveFieldIds(Schema schema, Collection<PartitionSpec> specs) {
Set<Integer> commonActiveFieldIds = Sets.newHashSet();

int specIndex = 0;
for (PartitionSpec spec : specs) {
if (specIndex == 0) {
commonActiveFieldIds.addAll(activeFieldIds(spec));
commonActiveFieldIds.addAll(activeFieldIds(schema, spec));
} else {
commonActiveFieldIds.retainAll(activeFieldIds(spec));
commonActiveFieldIds.retainAll(activeFieldIds(schema, spec));
}

specIndex++;
@@ -358,8 +369,9 @@ private static Set<Integer> commonActiveFieldIds(Collection<PartitionSpec> specs
return commonActiveFieldIds;
}

private static List<Integer> activeFieldIds(PartitionSpec spec) {
private static List<Integer> activeFieldIds(Schema schema, PartitionSpec spec) {
return spec.fields().stream()
.filter(field -> schema == null || schema.findField(field.sourceId()) != null)
.filter(field -> !isVoidTransform(field))
.map(PartitionField::fieldId)
.collect(Collectors.toList());
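For illustration, a minimal usage sketch (not part of this commit) of how the two groupingKeyType overloads differ; GroupingKeyExample and projectedGroupingKey are hypothetical names introduced only for this example:

import java.util.Collection;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;

// Hypothetical helper showing the schema-aware overload added by this commit.
public class GroupingKeyExample {

  // Returns the grouping key restricted to partition fields whose source
  // columns survive the projection; fields sourced from projected-out columns
  // are dropped, as are void-transform fields.
  static Types.StructType projectedGroupingKey(Table table, Schema projectedSchema) {
    Collection<PartitionSpec> specs = table.specs().values();

    // Existing behavior: consider every non-void partition field in every spec.
    Types.StructType fullKey = Partitioning.groupingKeyType(specs);

    // New behavior: additionally ignore fields whose source column was projected out.
    Types.StructType projectedKey = Partitioning.groupingKeyType(projectedSchema, specs);

    System.out.println("all columns:       " + fullKey);
    System.out.println("projected columns: " + projectedKey);
    return projectedKey;
  }
}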
13 changes: 13 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -349,6 +349,19 @@ public void testGroupingKeyTypeWithEvolvedUnpartitionedSpec() {
Assert.assertEquals("Types must match", expectedType, actualType);
}

@Test
public void testGroupingKeyTypeWithProjectedSchema() {
TestTables.TestTable table =
TestTables.create(tableDir, "test", SCHEMA, BY_CATEGORY_DATA_SPEC, V1_FORMAT_VERSION);

Schema projectedSchema = table.schema().select("id", "data");

StructType expectedType =
StructType.of(NestedField.optional(1001, "data", Types.StringType.get()));
StructType actualType = Partitioning.groupingKeyType(projectedSchema, table.specs().values());
Assert.assertEquals("Types must match", expectedType, actualType);
}

@Test
public void testGroupingKeyTypeWithIncompatibleSpecEvolution() {
TestTables.TestTable table =
@@ -404,15 +404,15 @@ public void testSparkTableAddDropPartitions() throws Exception {
assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");

sql("ALTER TABLE %s ADD PARTITION FIELD truncate(data, 4)", tableName);
assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
assertPartitioningEquals(sparkTable(), 2, "truncate(4, data)");

sql("ALTER TABLE %s ADD PARTITION FIELD years(ts)", tableName);
assertPartitioningEquals(sparkTable(), 3, "years(ts)");

sql("ALTER TABLE %s DROP PARTITION FIELD years(ts)", tableName);
assertPartitioningEquals(sparkTable(), 2, "truncate(data, 4)");
assertPartitioningEquals(sparkTable(), 2, "truncate(4, data)");

sql("ALTER TABLE %s DROP PARTITION FIELD truncate(data, 4)", tableName);
sql("ALTER TABLE %s DROP PARTITION FIELD truncate(4, data)", tableName);
assertPartitioningEquals(sparkTable(), 1, "bucket(16, id)");

sql("ALTER TABLE %s DROP PARTITION FIELD shard", tableName);
@@ -30,6 +30,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableOperations;
@@ -255,74 +256,90 @@ public static org.apache.iceberg.Table toIcebergTable(Table table) {
return sparkTable.table();
}

public static Transform[] toTransforms(Schema schema, List<PartitionField> fields) {
SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(schema);

List<Transform> transforms = Lists.newArrayList();

for (PartitionField field : fields) {
Transform transform = PartitionSpecVisitor.visit(schema, field, visitor);
if (transform != null) {
transforms.add(transform);
}
}

return transforms.toArray(new Transform[0]);
}

/**
* Converts a PartitionSpec to Spark transforms.
*
* @param spec a PartitionSpec
* @return an array of Transforms
*/
public static Transform[] toTransforms(PartitionSpec spec) {
Map<Integer, String> quotedNameById = SparkSchemaUtil.indexQuotedNameById(spec.schema());
List<Transform> transforms =
PartitionSpecVisitor.visit(
spec,
new PartitionSpecVisitor<Transform>() {
@Override
public Transform identity(String sourceName, int sourceId) {
return Expressions.identity(quotedName(sourceId));
}
SpecTransformToSparkTransform visitor = new SpecTransformToSparkTransform(spec.schema());
List<Transform> transforms = PartitionSpecVisitor.visit(spec, visitor);
return transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new);
}

@Override
public Transform bucket(String sourceName, int sourceId, int numBuckets) {
return Expressions.bucket(numBuckets, quotedName(sourceId));
}
private static class SpecTransformToSparkTransform implements PartitionSpecVisitor<Transform> {
private final Map<Integer, String> quotedNameById;

@Override
public Transform truncate(String sourceName, int sourceId, int width) {
return Expressions.apply(
"truncate",
Expressions.column(quotedName(sourceId)),
Expressions.literal(width));
}
SpecTransformToSparkTransform(Schema schema) {
this.quotedNameById = SparkSchemaUtil.indexQuotedNameById(schema);
}

@Override
public Transform year(String sourceName, int sourceId) {
return Expressions.years(quotedName(sourceId));
}
@Override
public Transform identity(String sourceName, int sourceId) {
return Expressions.identity(quotedName(sourceId));
}

@Override
public Transform month(String sourceName, int sourceId) {
return Expressions.months(quotedName(sourceId));
}
@Override
public Transform bucket(String sourceName, int sourceId, int numBuckets) {
return Expressions.bucket(numBuckets, quotedName(sourceId));
}

@Override
public Transform day(String sourceName, int sourceId) {
return Expressions.days(quotedName(sourceId));
}
@Override
public Transform truncate(String sourceName, int sourceId, int width) {
NamedReference column = Expressions.column(quotedName(sourceId));
return Expressions.apply("truncate", Expressions.literal(width), column);
}

@Override
public Transform hour(String sourceName, int sourceId) {
return Expressions.hours(quotedName(sourceId));
}
@Override
public Transform year(String sourceName, int sourceId) {
return Expressions.years(quotedName(sourceId));
}

@Override
public Transform alwaysNull(int fieldId, String sourceName, int sourceId) {
// do nothing for alwaysNull, it doesn't need to be converted to a transform
return null;
}
@Override
public Transform month(String sourceName, int sourceId) {
return Expressions.months(quotedName(sourceId));
}

@Override
public Transform unknown(
int fieldId, String sourceName, int sourceId, String transform) {
return Expressions.apply(transform, Expressions.column(quotedName(sourceId)));
}
@Override
public Transform day(String sourceName, int sourceId) {
return Expressions.days(quotedName(sourceId));
}

private String quotedName(int id) {
return quotedNameById.get(id);
}
});
@Override
public Transform hour(String sourceName, int sourceId) {
return Expressions.hours(quotedName(sourceId));
}

return transforms.stream().filter(Objects::nonNull).toArray(Transform[]::new);
@Override
public Transform alwaysNull(int fieldId, String sourceName, int sourceId) {
// do nothing for alwaysNull, it doesn't need to be converted to a transform
return null;
}

@Override
public Transform unknown(int fieldId, String sourceName, int sourceId, String transform) {
return Expressions.apply(transform, Expressions.column(quotedName(sourceId)));
}

private String quotedName(int id) {
return quotedNameById.get(id);
}
}

public static NamedReference toNamedReference(String name) {
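As an illustration of the conversion performed by the SpecTransformToSparkTransform visitor above, here is a hedged sketch (not from the commit) that builds a spec partitioned by truncate(data, 4) and prints the Spark-side transforms; after this change the truncate width is expected to appear before the column, i.e. truncate(4, data), though the exact describe() output should be treated as an assumption:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.connector.expressions.Transform;

public class TransformConversionExample {
  public static void main(String[] args) {
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Iceberg-side partitioning: truncate the "data" column to a width of 4.
    PartitionSpec spec = PartitionSpec.builderFor(schema).truncate("data", 4).build();

    // Convert to Spark DSv2 transforms and print their textual form.
    for (Transform transform : Spark3Util.toTransforms(spec)) {
      System.out.println(transform.describe()); // expected: truncate(4, data)
    }
  }
}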
@@ -228,4 +228,12 @@ public Long streamFromTimestamp() {
.defaultValue(Long.MIN_VALUE)
.parse();
}

public boolean preserveDataGrouping() {
return confParser
.booleanConf()
.sessionConf(SparkSQLProperties.PRESERVE_DATA_GROUPING)
.defaultValue(SparkSQLProperties.PRESERVE_DATA_GROUPING_DEFAULT)
.parse();
}
}
@@ -42,4 +42,9 @@ private SparkSQLProperties() {}
// Controls whether to check the order of fields during writes
public static final String CHECK_ORDERING = "spark.sql.iceberg.check-ordering";
public static final boolean CHECK_ORDERING_DEFAULT = true;

// Controls whether to preserve the existing grouping of data while planning splits
public static final String PRESERVE_DATA_GROUPING =
"spark.sql.iceberg.split.preserve-data-grouping";
public static final boolean PRESERVE_DATA_GROUPING_DEFAULT = false;
}
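A hypothetical snippet (not part of the commit) showing how the new session property might be enabled from a Spark application; the spark.sql.sources.v2.bucketing.enabled flag is Spark 3.3's own switch for storage-partitioned joins and is included here as an assumption about the intended setup:

import org.apache.spark.sql.SparkSession;

public class PreserveGroupingExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .appName("storage-partitioned-join-demo")
            .getOrCreate();

    // Off by default; opt in so Iceberg keeps the existing data grouping while
    // planning splits instead of combining tasks across partitions.
    spark.conf().set("spark.sql.iceberg.split.preserve-data-grouping", "true");

    // Assumption: Spark 3.3 additionally gates storage-partitioned joins behind
    // its own V2 bucketing flag.
    spark.conf().set("spark.sql.sources.v2.bucketing.enabled", "true");

    spark.stop();
  }
}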
@@ -28,6 +28,7 @@
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.spark.api.java.JavaSparkContext;
@@ -41,6 +42,7 @@ class SparkBatch implements Batch {
private final JavaSparkContext sparkContext;
private final Table table;
private final SparkReadConf readConf;
private final Types.StructType groupingKeyType;
private final List<? extends ScanTaskGroup<?>> taskGroups;
private final Schema expectedSchema;
private final boolean caseSensitive;
@@ -51,12 +53,14 @@ class SparkBatch implements Batch {
JavaSparkContext sparkContext,
Table table,
SparkReadConf readConf,
Types.StructType groupingKeyType,
List<? extends ScanTaskGroup<?>> taskGroups,
Schema expectedSchema,
int scanHashCode) {
this.sparkContext = sparkContext;
this.table = table;
this.readConf = readConf;
this.groupingKeyType = groupingKeyType;
this.taskGroups = taskGroups;
this.expectedSchema = expectedSchema;
this.caseSensitive = readConf.caseSensitive();
@@ -80,6 +84,7 @@ public InputPartition[] planInputPartitions() {
index ->
partitions[index] =
new SparkInputPartition(
groupingKeyType,
taskGroups.get(index),
tableBroadcast,
expectedSchemaString,
