Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a Spark procedure to collect NDV #6582

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ private StandardBlobTypes() {}
* href="https://datasketches.apache.org/">Apache DataSketches</a> library
*/
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1";

public static final String NDV_BLOB = "ndv-blob";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark doesn't use Apache DataSketches to collect approximate NDV, so I am adding a new blob type. Hope this is OK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findepi What are you using for NDV stats here? I figure we should have a common blob type

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'blob' seems a bit redundant as they are all blobs? And also looking at the code, it's an approx ndv, which I didnt get from this name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use one blob type for NDV ideally, although Spark doesn't have the sketch data. I'm also curious how sketch data is useful for a table level metric. It is absolutely useful for file-level and partition-level since we can merge them later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho Right, we should have a better name for this. I am not sure if we can have a common blob type here. I will wait for @findepi 's input before changing this one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it is impossible for us to make Theta Sketches using Spark? Things like that would be healthier for the long run if we implement that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you @RussellSpitzer . BTW engine interop is the primary reason why we have settled on Theta sketches. For Trino it would be easier to go with HLL, since that's what Trino engine & SPI are supporting for years now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RussellSpitzer @findepi I agree it would be ideal if Spark can support Theta sketches. I will take a look to see the possibility to implement this.

@findepi Besides using NDV, Spark also uses other column stats such as NumOfNulls, Min, Max, etc. for CBO. I am wondering if Trino also use these stats and if these stats should also be stored in TableMetaData.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should create real Theta sketches. If Spark only needs the NDV integer, then that's great. We can either keep track of NDV sketch and incrementally update internal to Iceberg, or we can do it async. Either way, there should be no need for a different sketch type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear I don't think we need to get this into OSS Spark, I think it's fine if we generate these sketches in user land code.

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ transformArgument
expression
: constant
| stringMap
| stringArray
;

constant
Expand All @@ -133,6 +134,10 @@ stringMap
: MAP '(' constant (',' constant)* ')'
;

stringArray
: ARRAY '(' constant (',' constant)* ')'
;

booleanValue
: TRUE | FALSE
;
Expand Down Expand Up @@ -171,7 +176,7 @@ nonReserved
: ADD | ALTER | AS | ASC | BY | CALL | DESC | DROP | FIELD | FIRST | LAST | NULLS | ORDERED | PARTITION | TABLE | WRITE
| DISTRIBUTED | LOCALLY | UNORDERED | REPLACE | WITH | IDENTIFIER_KW | FIELDS | SET
| TRUE | FALSE
| MAP
| MAP | ARRAY
;

ADD: 'ADD';
Expand Down Expand Up @@ -203,6 +208,7 @@ TRUE: 'TRUE';
FALSE: 'FALSE';

MAP: 'MAP';
ARRAY: 'ARRAY';

PLUS: '+';
MINUS: '-';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.BlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestDistinctCountProcedure extends SparkExtensionsTestBase {

public TestDistinctCountProcedure(
String catalogName, String implementation, Map<String, String> config) {
super(catalogName, implementation, config);
}

@Before
public void setupTable() {
sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
}

@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@Test
public void testAnalyze() {
sql("INSERT INTO %s VALUES (1, 'a')", tableName);
sql("INSERT INTO %s VALUES (2, 'b')", tableName);
sql("INSERT INTO %s VALUES (3, 'a')", tableName);
sql("INSERT INTO %s VALUES (4, 'b')", tableName);
sql("INSERT INTO %s VALUES (5, 'a')", tableName);
sql("INSERT INTO %s VALUES (6, 'b')", tableName);

List<Object[]> returns =
sql(
"CALL %s.system.distinct_count("
+ "table => '%s',"
+ "distinct_count_view => '%s',"
+ "columns => array('%s','%s'))",
catalogName, tableName, "result", "id", "data");

Table table = validationCatalog.loadTable(tableIdent);
List files = table.statisticsFiles();
List<BlobMetadata> metadataList = ((GenericStatisticsFile) files.get(0)).blobMetadata();

BlobMetadata firstBlob = metadataList.get(0);
Assertions.assertThat(firstBlob.type()).as("type").isEqualTo(StandardBlobTypes.NDV_BLOB);
Assertions.assertThat(firstBlob.fields()).as("columns").isEqualTo(ImmutableList.of(0));
Assertions.assertThat(Long.parseLong(firstBlob.properties().get("ndv"))).isEqualTo(6);

BlobMetadata secondBlob = metadataList.get(1);
Assertions.assertThat(secondBlob.type()).as("type").isEqualTo(StandardBlobTypes.NDV_BLOB);
Assertions.assertThat(secondBlob.fields()).as("columns").isEqualTo(ImmutableList.of(1));
Assertions.assertThat(Long.parseLong(secondBlob.properties().get("ndv"))).isEqualTo(2);

String viewName = (String) returns.get(0)[0];
assertEquals(
"Rows should match", ImmutableList.of(row(6L, 2L)), sql("select * from %s", viewName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ abstract class BaseProcedure implements Procedure {
protected static final DataType STRING_MAP =
DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);

protected static final DataType STRING_ARRAY =
DataTypes.createArrayType(DataTypes.StringType, false);

private final SparkSession spark;
private final TableCatalog tableCatalog;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.procedures;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.puffin.Blob;
import org.apache.iceberg.puffin.Puffin;
import org.apache.iceberg.puffin.PuffinWriter;
import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A procedure that gets approximate NDV (number of distinct value) for the requested columns and
* sets this to the table's StatisticsFile.
*/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am debating myself if I should collect ndv only or also collect everything else such as max, min, num_nulls etc. in ANALYZE TABLE. I will just collect ndv for now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we have all those in the Iceberg file level metadata already, wonder if its necessary as we could combine those to have an aggregate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have file level metadata for max, min, num_nulls etc. That's why I was hesitate to include those here. We don't have file level ndv, though.

public class DistinctCountProcedure extends BaseProcedure {
private static final Logger LOG = LoggerFactory.getLogger(DistinctCountProcedure.class);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("distinct_count_view", DataTypes.StringType),
ProcedureParameter.optional("columns", STRING_ARRAY),
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("view_name", DataTypes.StringType, false, Metadata.empty())
});

public static SparkProcedures.ProcedureBuilder builder() {
return new Builder<DistinctCountProcedure>() {
@Override
protected DistinctCountProcedure doBuild() {
return new DistinctCountProcedure(tableCatalog());
}
};
}

private DistinctCountProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
String tableName = args.getString(0);
Identifier tableIdent = toIdentifier(tableName, PARAMETERS[0].name());
SparkTable sparkTable = loadSparkTable(tableIdent);
StructType schema = sparkTable.schema();
Table table = sparkTable.table();
ArrayData columns = args.getArray(2);
int columnSizes = columns.numElements();

long[] ndvs = new long[columnSizes];
int[] fieldId = new int[columnSizes];
String query = "SELECT ";
for (int i = 0; i < columnSizes; i++) {
String colName = columns.getUTF8String(i).toString();
query += "APPROX_COUNT_DISTINCT(" + colName + "), ";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are technically not using distinct here, maybe we should be calling the procedure "analyze"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change the procedure name from DistinctCountProcedure to AnalyzeTableProcedure

fieldId[i] = schema.fieldIndex(colName);
}

query = query.substring(0, query.length() - 2) + " FROM " + tableName;
Dataset<Row> df = spark().sql(query);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RussellSpitzer, @flyrain, @huaxingao: Is it good to have a spark action first and call that action from this procedure? This way the users who use only APIs can also leverage this feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds reasonable. RewriteManifestsProcedure did the same thing.


for (int i = 0; i < columnSizes; i++) {
ndvs[i] = df.head().getLong(i);
}

TableOperations operations = ((HasTableOperations) table).operations();
FileIO fileIO = ((HasTableOperations) table).operations().io();
String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it exists, we throw FileNotFoundException? Should we just check and throw better exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean AlreadyExistsException, right? Yes, we should check. I guess we can probably keep the AlreadyExistsException but make the error message better.

OutputFile outputFile = fileIO.newOutputFile(path);

try (PuffinWriter writer =
Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we move "Spark DistinctCountProcedure" to a separate constant?

for (int i = 0; i < columnSizes; i++) {
writer.add(
new Blob(
StandardBlobTypes.NDV_BLOB,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue with defining a new a new blob type here is we probably need to describe in the spec, otherwise folks won't be able to deserialize it

ImmutableList.of(fieldId[i]),
table.currentSnapshot().snapshotId(),
table.currentSnapshot().sequenceNumber(),
ByteBuffer.allocate(0),
amogh-jahagirdar marked this conversation as resolved.
Show resolved Hide resolved
null,
ImmutableMap.of("ndv", Long.toString(ndvs[i]))));
}
writer.finish();

GenericStatisticsFile statisticsFile =
new GenericStatisticsFile(
table.currentSnapshot().snapshotId(),
path,
writer.fileSize(),
writer.footerSize(),
writer.writtenBlobsMetadata().stream()
.map(GenericBlobMetadata::from)
.collect(ImmutableList.toImmutableList()));

table
.updateStatistics()
.setStatistics(table.currentSnapshot().snapshotId(), statisticsFile)
.commit();
} catch (IOException exception) {
throw new UncheckedIOException(exception);
}

String viewName = viewName(args, tableName);
// Create a view for users to query
df.createOrReplaceTempView(viewName);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I missed something, is there a point to keeping it as view, if its already returned by the procedure?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept this as a view so users will have an easy way to query the statistics information after calling the stored procedure.

The main reason I am adding this store procedure is because I can't get an agreement to implement ANALYZE TABLE for data source V2 in Spark. This stored procedure is doing something similar to ANALYZE TABLE. Normally after users analyze table, they will DESCRIBE to get the statistics information. I create a view so users can query the statistics.

return toOutputRows(viewName);
}

@NotNull
private static String viewName(InternalRow args, String tableName) {
String viewName = args.isNullAt(1) ? null : args.getString(1);
if (viewName == null) {
String shortTableName =
tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;
viewName = shortTableName + "_changes";
}
return viewName;
}

private InternalRow[] toOutputRows(String viewName) {
InternalRow row = newInternalRow(UTF8String.fromString(viewName));
return new InternalRow[] {row};
}

@Override
public String description() {
return "DistinctCountProcedure";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
mapBuilder.put("register_table", RegisterTableProcedure::builder);
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
mapBuilder.put("distinct_count", DistinctCountProcedure::builder);
return mapBuilder.build();
}

Expand Down