better iceberg support for merge-on-read
menishmueli committed Mar 25, 2024
1 parent fd85cf1 commit 84b0444
Showing 5 changed files with 173 additions and 18 deletions.
@@ -1,8 +1,11 @@
package io.dataflint.example

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

object IcebergExample extends App{
object IcebergExample extends App with Logging {
logInfo("start iceberg example")
logInfo("IcebergExample class loader: " + getClass.getClassLoader.toString)
val spark = SparkSession
.builder()
.appName("Iceberg Example")
@@ -11,20 +14,20 @@ object IcebergExample extends App{
.config("spark.ui.port", "10000")
.config("spark.sql.maxMetadataStringLength", "10000")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.spark_catalog.type", "hive")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-example/warehouse")
.config("spark.sql.defaultCatalog", "local")
.config("spark.sql.catalog.local.metrics-reporter-impl", "org.apache.spark.dataflint.iceberg.DataflintIcebergMetricsReporter")
.config("spark.dataflint.iceberg.autoCatalogDiscovery", true)
.config("spark.eventLog.enabled", "true")
.master("local[*]")
.getOrCreate()

spark.sparkContext.setJobDescription("Drop table if exists")
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis PURGE")
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_unpartitoned PURGE")
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_unpartitioned PURGE")
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_small_files PURGE")
spark.sql("DROP TABLE IF EXISTS demo.nyc.taxis_read_on_merge PURGE")

spark.sparkContext.setJobDescription("Create taxis table")
spark.sql(
@@ -110,7 +113,7 @@ object IcebergExample extends App{
spark.sparkContext.setJobDescription("Create taxis unpartitioned table")
spark.sql(
"""
|CREATE TABLE demo.nyc.taxis_unpartitoned
|CREATE TABLE demo.nyc.taxis_unpartitioned
|(
| vendor_id bigint,
| trip_id bigint,
@@ -124,7 +127,7 @@ object IcebergExample extends App{
|""".stripMargin)

spark.sparkContext.setJobDescription("Set table sorting order")
spark.sql("ALTER TABLE demo.nyc.taxis_unpartitoned WRITE ORDERED BY vendor_id, trip_id")
spark.sql("ALTER TABLE demo.nyc.taxis_unpartitioned WRITE ORDERED BY vendor_id, trip_id")
spark.sparkContext.setJobDescription("Insert 100 records to taxis unpartitioned table")
spark.sql(
"""
@@ -138,20 +141,20 @@ object IcebergExample extends App{
| SELECT id FROM range(1, 101)
|) t
|""".stripMargin)
.writeTo("demo.nyc.taxis_unpartitoned")
.writeTo("demo.nyc.taxis_unpartitioned")
.append()

spark.sparkContext.setJobDescription("Delete record from unpartitioned table")
spark.sql(
"""
|DELETE FROM demo.nyc.taxis_unpartitoned
|DELETE FROM demo.nyc.taxis_unpartitioned
|WHERE trip_id = 1000371
|""".stripMargin)

spark.sparkContext.setJobDescription("Merge 20% of records to table")
spark.sql(
"""
MERGE INTO demo.nyc.taxis_unpartitoned t USING(
MERGE INTO demo.nyc.taxis_unpartitioned t USING(
SELECT
id as vendor_id,
1000370 + id as trip_id,
@@ -167,5 +170,90 @@ object IcebergExample extends App{
WHEN NOT MATCHED THEN INSERT *;
""")

spark.sparkContext.setJobDescription("Create taxis small files table")
spark.sql(
"""
|CREATE TABLE demo.nyc.taxis_small_files
|(
| vendor_id bigint,
| trip_id bigint,
| trip_distance float,
| fare_amount double,
| store_and_fwd_flag string
|)
|""".stripMargin)

spark.sparkContext.setJobDescription("Insert small files to table")
spark.sql(
"""
|SELECT
| id as vendor_id,
| 1000370 + id as trip_id,
| 1.5 + (id % 100) * 0.1 as trip_distance,
| 15.0 + (id % 100) * 0.7 as fare_amount,
| 'N' as store_and_fwd_flag
|FROM (
| SELECT id FROM range(1, 201)
|) t
|""".stripMargin)
.repartition(200)
.writeTo("demo.nyc.taxis_small_files")
.append()
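
// Editor's note: repartition(200) over the ~200 generated rows writes roughly one record
// per data file, deliberately creating a small-files scenario (presumably for the DataFlint
// UI to surface).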

spark.sparkContext.setJobDescription("Select from table with small files")
spark.sql("SELECT * FROM demo.nyc.taxis_small_files").show(200)

spark.sparkContext.setJobDescription("Create taxis read on merge")
spark.sql(
"""
|CREATE TABLE demo.nyc.taxis_read_on_merge
|(
| vendor_id bigint,
| trip_id bigint,
| trip_distance float,
| fare_amount double,
| store_and_fwd_flag string
|)
|TBLPROPERTIES (
|'write.delete.mode'='merge-on-read',
|'write.update.mode'='merge-on-read',
|'write.merge.mode'='merge-on-read',
|'write.distribution.mode'='range'
|)
|""".stripMargin)

spark.sql("ALTER TABLE demo.nyc.taxis_read_on_merge WRITE ORDERED BY vendor_id, trip_id")

spark.sparkContext.setJobDescription("Insert 2 records to taxis_read_on_merge")
spark.sql(
"""
|INSERT INTO demo.nyc.taxis_read_on_merge
|VALUES (1, 1000371, 1.8, 15.32, 'N'), (1, 1000372, 2.5, 22.15, 'N');
|""".stripMargin)

spark.sparkContext.setJobDescription("Delete record from taxis_read_on_merge")
spark.sql(
"""
|DELETE FROM demo.nyc.taxis_read_on_merge
|WHERE trip_id = 1000371
|""".stripMargin)

spark.sparkContext.setJobDescription("Update record in taxis_read_on_merge")
spark.sql(
"""
|UPDATE demo.nyc.taxis_read_on_merge
|SET fare_amount = 5.0
|WHERE trip_id = 1000372
|""".stripMargin)

spark.sparkContext.setJobDescription("Merge all records in taxis_read_on_merge")
spark.sql(
"""
MERGE INTO demo.nyc.taxis_read_on_merge t USING(SELECT * FROM demo.nyc.taxis_read_on_merge) u ON t.trip_id = u.trip_id
WHEN MATCHED THEN UPDATE SET
t.fare_amount = t.fare_amount + u.fare_amount
WHEN NOT MATCHED THEN INSERT *;
""")

scala.io.StdIn.readLine()
}
@@ -2,9 +2,12 @@ package org.apache.spark.dataflint

import org.apache.spark.SparkContext
import org.apache.spark.dataflint.api.{DataFlintTab, DataflintApplicationInfoPage, DataflintIcebergPage, DataflintJettyUtils, DataflintSQLMetricsPage, DataflintSQLPlanPage, DataflintSQLStagesRddPage}
import org.apache.spark.dataflint.iceberg.ClassLoaderChecker
import org.apache.spark.dataflint.iceberg.ClassLoaderChecker.isMetricLoaderInRightClassLoader
import org.apache.spark.dataflint.listener.{DataflintListener, DataflintStore}
import org.apache.spark.dataflint.saas.DataflintRunExporterListener
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SQLAppStatusListener
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.ui.SparkUI
@@ -26,10 +29,33 @@ class DataflintSparkUIInstaller extends Logging {
}
}

// DataflintListener currently only relevant for iceberg support, so no need to add the listener if iceberg support is off
val icebergInstalled = context.conf.get("spark.sql.extensions", "").contains("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
val icebergEnabled = context.conf.getBoolean("spark.dataflint.iceberg.enabled", defaultValue = true)
val icebergAutoCatalogDiscovery = context.conf.getBoolean("spark.dataflint.iceberg.autoCatalogDiscovery", defaultValue = false)
if (icebergInstalled && icebergEnabled) {
  if (isMetricLoaderInRightClassLoader()) {
    context.conf.getAll
      .filter(_._1.startsWith("spark.sql.catalog"))
      .filter(keyValue => keyValue._2 == "org.apache.iceberg.spark.SparkCatalog" || keyValue._2 == "org.apache.iceberg.spark.SparkSessionCatalog")
      .foreach(keyValue => {
        val configName = s"${keyValue._1}.metrics-reporter-impl"
        context.conf.getOption(configName) match {
          case Some(currentConfig) =>
            if (currentConfig == "org.apache.spark.dataflint.iceberg.DataflintIcebergMetricsReporter") {
              logInfo(s"Metrics reporter already set in ${configName}, no need to set it via dataflint iceberg auto catalog discovery")
            } else {
              logWarning(s"A different metrics reporter is already set in ${configName}, cannot set it to DataflintIcebergMetricsReporter")
            }
          case None =>
            if (icebergAutoCatalogDiscovery) {
              context.conf.set(configName, "org.apache.spark.dataflint.iceberg.DataflintIcebergMetricsReporter")
              logInfo(s"Set ${configName} to DataflintIcebergMetricsReporter")
            } else {
              logWarning(s"DataflintIcebergMetricsReporter is not configured for iceberg catalog ${keyValue._1}. For dataflint iceberg observability, set spark.dataflint.iceberg.autoCatalogDiscovery to true or set ${configName} to org.apache.spark.dataflint.iceberg.DataflintIcebergMetricsReporter manually")
            }
        }
      })
  }
// DataflintListener currently only relevant for iceberg support, so no need to add the listener if iceberg support is off
context.listenerBus.addToQueue(new DataflintListener(context.statusStore.store.asInstanceOf[ElementTrackingStore]), "dataflint")
}
loadUI(context.ui.get, sqlListener)
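
Editor's note: with the auto catalog discovery added above, wiring the metrics reporter per catalog (as IcebergExample does) becomes optional. A minimal sketch of the user-facing configuration, assuming the same local Hadoop catalog as the example and not taken from this diff:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Iceberg job with DataFlint observability")
  .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "hadoop")
  .config("spark.sql.catalog.local.warehouse", "/tmp/iceberg-example/warehouse")
  // no metrics-reporter-impl needed: the installer sets it when autoCatalogDiscovery is on
  .config("spark.dataflint.iceberg.autoCatalogDiscovery", "true")
  .master("local[*]")
  .getOrCreate()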
@@ -0,0 +1,23 @@
package org.apache.spark.dataflint.iceberg

import org.apache.iceberg.CatalogUtil
import org.apache.spark.internal.Logging

object ClassLoaderChecker extends Logging {
def isMetricLoaderInRightClassLoader(): Boolean = {
val metricReporterClass = classOf[DataflintIcebergMetricsReporter]
val classLoaderMetricReporter = metricReporterClass.getClassLoader.toString
val classLoaderIcebergCatalog = classOf[CatalogUtil].getClassLoader.toString
try {
Class.forName(metricReporterClass.getCanonicalName, false, classOf[CatalogUtil].getClassLoader)
} catch {
case _: ClassNotFoundException =>
logWarning(s"Cannot load DataflintIcebergMetricsReporter from iceberg classloader, which prevents dataflint iceberg observability support. iceberg classloader: ${classLoaderIcebergCatalog} metric reporter classloader: ${classLoaderMetricReporter}")
return false
case error: Throwable =>
logError(s"Unexpected error while trying to load, can use DataflintIcebergMetricsReporter. iceberg classloader: ${classLoaderIcebergCatalog} metric reporter classloader: ${classLoaderMetricReporter}", error)
return false
}
true
}
}
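
Editor's note: a minimal sketch (assuming both the Iceberg runtime and the dataflint jar are on the driver classpath; not part of this commit) showing how the same classloader probe can be reproduced, e.g. in a spark-shell, to debug why iceberg observability is disabled:

import org.apache.iceberg.CatalogUtil
import org.apache.spark.dataflint.iceberg.{ClassLoaderChecker, DataflintIcebergMetricsReporter}

// Print the two classloaders, then run the same check the UI installer uses.
println(s"iceberg classloader:  ${classOf[CatalogUtil].getClassLoader}")
println(s"reporter classloader: ${classOf[DataflintIcebergMetricsReporter].getClassLoader}")
println(s"reporter visible to iceberg classloader: ${ClassLoaderChecker.isMetricLoaderInRightClassLoader()}")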
23 changes: 21 additions & 2 deletions spark-ui/src/components/SqlFlow/StageNode.tsx
@@ -121,7 +121,10 @@ function handleAddedRemovedMetrics(name: string, added: number, removed: number,
const previousSnapshotTotal = total - added + removed;
const currentSnapshot = total;

if (added !== 0 && removed === 0) {
if (added === 0 && removed === 0) {
  return [];
} else if (added !== 0 && removed === 0) {
const addedPercentage = calculatePercentage(added, currentSnapshot).toFixed(1);
return [{
name: `Added ${name}`,
@@ -154,7 +157,6 @@
}
];
}
return []
}

export const StageNode: FC<{
@@ -180,6 +182,19 @@ export const StageNode: FC<{
"Table Name",
commit.tableName,
);
// ReplaceData commits rewrite whole data files (copy-on-write), while WriteDelta commits write delete files (merge-on-read)
let modeName: string | undefined = undefined;
if (data.node.nodeName === "ReplaceData") {
  modeName = "copy on write";
}
if (data.node.nodeName === "WriteDelta") {
  modeName = "merge on read";
}
if (modeName !== undefined) {
dataTable.push({
name: "Mode",
value: modeName,
});
}

dataTable.push({
name: "Commit id",
@@ -193,7 +208,11 @@

dataTable.push(...handleAddedRemovedMetrics("Records", metrics.addedRecords, metrics.removedRecords, metrics.totalRecords, (x) => x.toString()));
dataTable.push(...handleAddedRemovedMetrics("Files", metrics.addedDataFiles, metrics.removedDataFiles, metrics.totalDataFiles, (x) => x.toString()));
dataTable.push(...handleAddedRemovedMetrics("Delete Files", metrics.addedPositionalDeletes, metrics.removedPositionalDeletes, metrics.totalPositionalDeletes, (x) => x.toString()));
dataTable.push(...handleAddedRemovedMetrics("Bytes", metrics.addedFilesSizeInBytes, metrics.removedFilesSizeInBytes, metrics.totalFilesSizeInBytes, humanFileSize));
dataTable.push(...handleAddedRemovedMetrics("Positional Deletes", metrics.addedPositionalDeletes, metrics.removedPositionalDeletes, metrics.totalPositionalDeletes, (x) => x.toString()));
dataTable.push(...handleAddedRemovedMetrics("Positional Deletes", metrics.addedEqualityDeletes, metrics.removedEqualityDeletes, metrics.totalEqualityDeletes, (x) => x.toString()));

}
if (data.node.parsedPlan !== undefined) {
const parsedPlan = data.node.parsedPlan;
9 changes: 4 additions & 5 deletions spark-ui/src/reducers/Alerts/IcebergReplacesReducer.ts
@@ -25,8 +25,8 @@ export function reduceIcebergReplaces(sql: SparkSQLStore, alerts: Alerts) {
location: `In: SQL query "${sql.description}" (id: ${sql.id}) and node "${node.nodeName}"`,
message: `${tableChangedPercentage.toFixed(1)}% of table ${node.icebergCommit.tableName} files were replaced, while only ${recordsChangedPercentage.toFixed(1)}% of records were changed`,
suggestion: `
1. Switch to merge-on-read mode to avoid the need to write the entire table data
2. Partition the table in such a way that use of update/merge/delete operation to update only the required partitions
1. Switch the write mode to merge-on-read, so that instead of re-writing entire data files, only the changed records are written
2. Partition the table so that update/merge/delete operations update only the required partitions
`,
type: "warning",
source: {
@@ -44,10 +44,9 @@
name: "replacedMostOfIcebergTable",
title: "Replaced Most Of Iceberg Table",
location: `In: SQL query "${sql.description}" (id: ${sql.id}) and node "${node.nodeName}"`,
message: `${tableChangedPercentage.toFixed(1)}% of table ${node.icebergCommit.tableName} files were replaced, which is a mis-use of iceberg update/merge/delete operations `,
message: `${tableChangedPercentage.toFixed(1)}% of table ${node.icebergCommit.tableName} files were replaced, which is a potential misuse of Iceberg`,
suggestion: `
1. Partition the table in such a way that use of update/merge/delete operation to update only the required partitions
2. Switch to merge-on-read mode to avoid the need to write the entire table data
1. Partition the table so that update/merge/delete operations rewrite as few files as possible
`,
type: "warning",
source: {
