Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ public DeltaFixture()
string deltaVersion = sparkVersion.Major switch
{
2 => "delta-core_2.11:0.6.1",
3 => "delta-core_2.12:0.7.0",
3 => "delta-core_2.12:0.8.0",
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
};

(string, string)[] conf = new[]
{
("spark.databricks.delta.snapshotPartitions", "2"),
("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5")
("spark.sql.sources.parallelPartitionDiscovery.parallelism", "5"),
// Set the writer protocol version for testing UpgradeTableProtocol().
("spark.databricks.delta.minWriterVersion", "2")
};

(string, string)[] extraConf = sparkVersion.Major switch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Microsoft.Spark.E2ETest;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
Expand Down Expand Up @@ -337,7 +336,9 @@ public void TestSignaturesV3_0_X()
_spark.Range(15).Write().Format("delta").SaveAsTable(tableName);

Assert.IsType<DeltaTable>(DeltaTable.ForName(tableName));
Assert.IsType<DeltaTable>(DeltaTable.ForName(_spark, tableName));
DeltaTable table = DeltaTable.ForName(_spark, tableName);

table.UpgradeTableProtocol(1, 3);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ internal static class DeltaLakeVersions
internal const string V0_6_0 = "0.6.0";
internal const string V0_6_1 = "0.6.1";
internal const string V0_7_0 = "0.7.0";
internal const string V0_8_0 = "0.8.0";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,5 +501,21 @@ public DeltaMergeBuilder Merge(DataFrame source, Column condition) =>
"merge",
source,
condition));

/// <summary>
/// Updates the protocol version of the table to leverage new features. Upgrading the reader version
/// will prevent all clients that have an older version of Delta Lake from accessing this table.
/// Upgrading the writer version will prevent older versions of Delta Lake to write to this table.
/// The reader or writer version cannot be downgraded.
///
/// See online documentation and Delta's protocol specification at
/// <see href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md">PROTOCOL.md</see> for more
/// details.
/// </summary>
/// <param name="readerVersion">Version of the Delta read protocol.</param>
/// <param name="writerVersion">Version of the Delta write protocol.</param>
[DeltaLakeSince(DeltaLakeVersions.V0_8_0)]
public void UpgradeTableProtocol(int readerVersion, int writerVersion) =>
_jvmObject.Invoke("upgradeTableProtocol", readerVersion, writerVersion);
}
}