Added support for dynamically getting delta table features #428

Open · wants to merge 4 commits into base: main
4 changes: 4 additions & 0 deletions .gitignore
@@ -1,6 +1,9 @@
# Compiled class file
*.class

# vscode configs
.vscode/*

# Log file
*.log

@@ -41,3 +44,4 @@ target/
*.crc
demo/jars/*
demo/notebook/.ipynb_checkpoints/*
/.history
3 changes: 3 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,3 @@
{
Contributor: Can you update the .gitignore with .vscode/* as well?

Author: Absolutely!

"java.compile.nullAnalysis.mode": "automatic"
}
@@ -32,6 +32,8 @@
import lombok.ToString;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.types.StructField;
@@ -65,12 +67,16 @@
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.sync.ConversionTarget;

// to support getting the min reader and writer versions dynamically
import io.delta.tables.*;

public class DeltaConversionTarget implements ConversionTarget {
private static final String MIN_READER_VERSION = String.valueOf(1);
// private static final String MIN_READER_VERSION = String.valueOf(1);
// gets access to generated columns.
private static final String MIN_WRITER_VERSION = String.valueOf(4);
// private static final String MIN_WRITER_VERSION = String.valueOf(4);

private DeltaLog deltaLog;
private DeltaTable deltaTable;
private DeltaSchemaExtractor schemaExtractor;
private DeltaPartitionExtractor partitionExtractor;
private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor;
@@ -79,6 +85,9 @@ public class DeltaConversionTarget implements ConversionTarget {
private int logRetentionInHours;
private TransactionState transactionState;

private String minReaderVersion;
private String minWriterVersion;

public DeltaConversionTarget() {}

public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSession) {
@@ -112,6 +121,7 @@ public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSe
dataFileUpdatesExtractor);
}


private void _init(
String tableDataPath,
String tableName,
@@ -121,6 +131,9 @@ private void _init(
DeltaPartitionExtractor partitionExtractor,
DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor) {
DeltaLog deltaLog = DeltaLog.forTable(sparkSession, tableDataPath);
DeltaTable deltaTable = DeltaTable.forPath(sparkSession, tableDataPath);
minReaderVersion = String.valueOf(1);
minWriterVersion = String.valueOf(4);
boolean deltaTableExists = deltaLog.tableExists();
if (!deltaTableExists) {
deltaLog.ensureLogDirectoryExist();
@@ -129,6 +142,7 @@ private void _init(
this.partitionExtractor = partitionExtractor;
this.dataFileUpdatesExtractor = dataFileUpdatesExtractor;
this.deltaLog = deltaLog;
this.deltaTable = deltaTable;
this.tableName = tableName;
this.logRetentionInHours = logRetentionInHours;
}
@@ -267,10 +281,25 @@ private void commitTransaction() {
new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))));
}

@VisibleForTesting
private Map<String, String> getConfigurationsForDeltaSync() {

Contributor: We can test this here. I would expect we can add the timestamp_ntz and see that the delta table has the correct versions set for min reader/writer?

// deltaTable.detail() returns a single row describing the table. The DeltaTable
// instance is created in _init() and stored in the deltaTable field.

// Limit the result to the attributes needed
Dataset<Row> record = deltaTable.detail().select("minWriterVersion", "minReaderVersion");
Contributor: Can you get this information from deltaLog.snapshot().metadata()?

Author: I don't think so. I looked through the API for deltaLog.snapshot().metadata() and did not find a reference to it. The only place I found it was here: https://docs.delta.io/latest/delta-utility.html

Contributor: It would be good to see if we can just let the Delta Lake library automatically set the versions for us. Can you try that as well? If I remember correctly, it looked like it would auto update them for us.
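For reference, a minimal sketch of the alternative discussed above, assuming the internal Snapshot API exposes a protocol() accessor with minReaderVersion()/minWriterVersion() case-class accessors (these internals can vary between Delta Lake releases and are an assumption here, not part of this PR):

// Hypothetical alternative (not part of this PR): read the protocol versions from
// the DeltaLog snapshot rather than from DeltaTable.detail(). Assumes
// snapshot().protocol() is accessible, which may differ across Delta Lake versions.
org.apache.spark.sql.delta.actions.Protocol protocol = deltaLog.snapshot().protocol();
minReaderVersion = String.valueOf(protocol.minReaderVersion());
minWriterVersion = String.valueOf(protocol.minWriterVersion());

If the auto-upgrade route the reviewer mentions works as expected, the explicit version keys in the configMap below could potentially be dropped and Delta would bump the protocol itself once the synced schema requires features such as timestamp_ntz or generated columns.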


// Collect the first row and extract data (in this instance the function only yields a single row)
Row row = record.first();

minWriterVersion = row.getAs("minWriterVersion").toString();
minReaderVersion = row.getAs("minReaderVersion").toString();

Map<String, String> configMap = new HashMap<>();
configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), MIN_READER_VERSION);
configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), MIN_WRITER_VERSION);
configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), minReaderVersion);
configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), minWriterVersion);
configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
// Sets retention for the Delta Log, does not impact underlying files in the table
configMap.put(
@@ -42,6 +42,7 @@
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;

import org.apache.xtable.schema.SchemaUtils;

/**
@@ -60,6 +61,10 @@ public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();

// Timestamps in Delta are microsecond precision by default
private static final Map<InternalSchema.MetadataKey, Object> DEFAULT_TIMESTAMP_PRECISION_METADATA = Collections.singletonMap(
InternalSchema.MetadataKey.TIMESTAMP_PRECISION, InternalSchema.MetadataValue.MICROS);

public static DeltaSchemaExtractor getInstance() {
return INSTANCE;
}
@@ -86,7 +91,6 @@ private DataType convertFieldType(InternalField field) {
case INT:
return DataTypes.IntegerType;
case LONG:
case TIMESTAMP_NTZ:
return DataTypes.LongType;
case BYTES:
case FIXED:
@@ -99,6 +103,8 @@
return DataTypes.DateType;
case TIMESTAMP:
return DataTypes.TimestampType;
case TIMESTAMP_NTZ:
return DataTypes.TimestampNTZType;
case DOUBLE:
return DataTypes.DoubleType;
case DECIMAL:
@@ -183,10 +189,11 @@ private InternalSchema toInternalSchema(
case "timestamp":
type = InternalType.TIMESTAMP;
// Timestamps in Delta are microsecond precision by default
metadata =
Collections.singletonMap(
InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
InternalSchema.MetadataValue.MICROS);
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
break;
case "timestamp_ntz":
type = InternalType.TIMESTAMP_NTZ;
metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
break;
case "struct":
StructType structType = (StructType) dataType;
17 changes: 17 additions & 0 deletions core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -44,8 +44,11 @@

import io.delta.tables.DeltaTable;

import org.apache.spark.sql.types.DataTypes;
import org.apache.xtable.delta.TestDeltaHelper;

import static org.apache.spark.sql.types.DataTypes.TimestampNTZType;

@Getter
public class TestSparkDeltaTable implements GenericTable<Row, Object>, Closeable {
// typical inserts or upserts do not use this partition value.
@@ -110,6 +113,20 @@ public List<Row> insertRowsForPartition(int numRows, Integer year, String level)
return rows;
}

public List<Row> insertRowsForTimestampPartition() {
List<Row> rows = testDeltaHelper.generateRows(10);
Dataset<Row> df = sparkSession.createDataFrame(rows, testDeltaHelper.getTableStructSchema());
// Add a TimestampNTZ column manually; schema().add(...) alone does not modify the
// DataFrame, so attach the column with withColumn instead.
df = df.withColumn(
"timestamp_ntz", org.apache.spark.sql.functions.current_timestamp().cast(TimestampNTZType));
// Write the DataFrame in Delta format, partitioned by the TimestampNTZ column
df.write()
.format("delta")
.mode("append")
.partitionBy("timestamp_ntz") // Replace "timestamp_ntz" with the actual column name
.save(basePath);

return rows;
}

@Override
public List<Row> insertRecordsForSpecialPartition(int numRows) {
return insertRowsForPartition(numRows, SPECIAL_DATE_PARTITION_VALUE, SPECIAL_PARTITION_VALUE);
@@ -19,7 +19,6 @@
package org.apache.xtable.delta;

import static org.junit.jupiter.api.Assertions.*;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
@@ -35,10 +34,13 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import io.delta.tables.DeltaTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -71,6 +73,7 @@
import org.apache.xtable.model.storage.*;
import org.apache.xtable.model.storage.InternalDataFile;


public class ITDeltaConversionTargetSource {

private static final InternalField COL1_INT_FIELD =
@@ -731,4 +734,25 @@ private boolean checkIfFileIsRemoved(String activePath, TableChange tableChange)
.collect(Collectors.toSet());
return filePathsRemoved.contains(activePath);
}

@Test
public void getConfigurationsForDeltaSync() {

String tableName = GenericTable.getTableName();
TestSparkDeltaTable testSparkDeltaTable =
new TestSparkDeltaTable(
tableName, tempDir, sparkSession, "timestamp_field", false);
testSparkDeltaTable.insertRowsForTimestampPartition();
DeltaTable dt = testSparkDeltaTable.getDeltaTable();
Dataset<Row> record = dt.detail().select("minWriterVersion", "minReaderVersion");

// Collect the first row and extract data (in this instance the function only yields a single row)
Row row = record.first();

String minWriterVersion = row.getAs("minWriterVersion").toString();
String minReaderVersion = row.getAs("minReaderVersion").toString();
// Assert the results
assertEquals("7", minWriterVersion);
assertEquals("3", minReaderVersion);
}
}
23 changes: 23 additions & 0 deletions core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
@@ -154,6 +154,29 @@ public void createTable(SparkSession sparkSession, String tableName, String base
.partitionedBy("yearOfBirth");
} else if ("level".equals(partitionField)) {
tableBuilder.partitionedBy(partitionField);
} else if ("timestamp_field".equals(partitionField)) {
tableBuilder
.addColumn(
DeltaTable.columnBuilder("YEAR")
.dataType(IntegerType)
.generatedAlwaysAs("YEAR(timestamp_field)")
.build())
.addColumn(
DeltaTable.columnBuilder("MONTH")
.dataType(IntegerType)
.generatedAlwaysAs("MONTH(timestamp_field)")
.build())
.addColumn(
DeltaTable.columnBuilder("DAY")
.dataType(IntegerType)
.generatedAlwaysAs("DAY(timestamp_field)")
.build())
.addColumn(
DeltaTable.columnBuilder("HOUR")
.dataType(IntegerType)
.generatedAlwaysAs("HOUR(timestamp_field)")
.build())
.partitionedBy("timestamp_field");
} else if (partitionField != null) {
throw new IllegalArgumentException("Unexpected partition field: " + partitionField);
}
@@ -391,8 +391,8 @@ public void testTimestamps() {

StructType structRepresentationTimestampNtz =
new StructType()
.add("requiredTimestampNtz", DataTypes.LongType, false)
.add("optionalTimestampNtz", DataTypes.LongType, true);
.add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
.add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);

Assertions.assertEquals(
structRepresentationTimestamp,
@@ -892,4 +892,5 @@ void generateColumnsAreNotTranslatedToInternalSchema() {
Assertions.assertEquals(
internalSchema, DeltaSchemaExtractor.getInstance().toInternalSchema(structRepresentation));
}

}
@@ -57,7 +57,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.spark.sql.delta.DeltaConfigs;
Contributor: @ForeverAngry Can you remove these changes to this file if they are no longer required?

import org.apache.spark.sql.delta.GeneratedColumn;

import scala.collection.JavaConverters;
@@ -406,6 +408,8 @@ private void validateDeltaTable(
internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation");
}



private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) {
return InternalSnapshot.builder()
.table(table)
@@ -508,4 +510,5 @@ private static SparkSession buildSparkSession() {
.set("spark.master", "local[2]");
return SparkSession.builder().config(sparkConf).getOrCreate();
}

}