Lighthouse is a library developed by DataBeans to optimize Lakehouse performance and reduce its total cost of ownership. It is designed to monitor the health of Lakehouse tables from a data layout perspective and provide valuable insights into how well data is clustered. This information helps users identify when data maintenance operations (vacuum, compaction, clustering, …) should be performed, leading to improved query performance and reduced storage costs.
The Lighthouse library can assist in addressing the following questions:
- How well is my data clustered?
- Does my data layout favor skipping based on statistics?
- Is it advisable to Z-order on a certain column before running a query against it?
- Is my data suffering from the many small files problem?
- How frequently should I re-cluster my data to maintain its optimal clustering state?
Lighthouse is compiled using SBT.
To compile, run
sbt compile
To generate artifacts, run
sbt package
To execute tests, run
sbt test
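If you want to use Lighthouse as a dependency in your own sbt project rather than building it from source, a build.sbt along the following lines could work. This is a minimal sketch; it assumes the artifact is published under the Maven coordinates used later in this README (io.github.Databeans:lighthouse_2.12:0.1.0) together with the Spark and Delta versions listed below.
// build.sbt: minimal sketch, versions and coordinates taken from this README
scalaVersion := "2.12.13"

libraryDependencies ++= Seq(
  "org.apache.spark"    %% "spark-sql"       % "3.3.2" % Provided,
  "io.delta"            %% "delta-core"      % "2.3.0",
  "io.github.Databeans" %  "lighthouse_2.12" % "0.1.0"
)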
- Apache Spark 3.3.2
- Delta 2.3.0
- Open the terminal and run the following command:
spark-shell \
--packages io.delta:delta-core_2.12:2.3.0,io.github.Databeans:lighthouse_2.12:0.1.0 \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
- Import the DeltaClusteringMetrics class:
import fr.databeans.lighthouse.metrics.delta.DeltaClusteringMetrics
- Compute clustering metrics for a given column of the Delta table:
val clusteringMetrics = DeltaClusteringMetrics.forPath("path/to/your/deltaTable", spark).computeForColumn("col_name")
- Display the computed clustering metrics using the show() method:
clusteringMetrics.show()
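If you want to see the metrics end to end, the sketch below (run inside the spark-shell session started above) writes a small Delta table and then computes clustering metrics on its id column. The path /tmp/lighthouse-demo is purely illustrative; point it at your own storage location.
// Minimal sketch: write a small Delta table, then compute clustering metrics on it.
import fr.databeans.lighthouse.metrics.delta.DeltaClusteringMetrics

// /tmp/lighthouse-demo is a placeholder path for this example.
spark.range(0, 1000000)
  .repartition(5)
  .write.format("delta").mode("overwrite").save("/tmp/lighthouse-demo")

val clusteringMetrics = DeltaClusteringMetrics
  .forPath("/tmp/lighthouse-demo", spark)
  .computeForColumn("id")

clusteringMetrics.show(false)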
Submit the application to a Spark cluster:
spark-submit \
--class com.example.MyApp \
--master <master-url> \
--packages io.delta:delta-core_2.12:2.3.0,io.github.Databeans:lighthouse_2.12:0.1.0 \
</path/to/your/spark/application.jar>
This command specifies the following options:
- --class: Name of the main class of your application.
- --master: URL of the Spark cluster to use.
- --packages: Maven coordinates of the Delta Lake and Lighthouse libraries to use.
- The path to your application's JAR file.
Example:
spark-submit \
--class Quickstart \
--master local[*] \
--packages io.delta:delta-core_2.12:2.3.0,io.github.Databeans:lighthouse_2.12:0.1.0 \
target/scala-2.12/clustering-metrics-example_2.12-0.1.jar
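The Quickstart class referenced above is not shown in this README; as a rough sketch, a standalone application submitted this way could look like the following, where the Delta table path is taken from the first program argument and the column name id is illustrative.
// Quickstart.scala: minimal sketch of a standalone application using Lighthouse.
import org.apache.spark.sql.SparkSession
import fr.databeans.lighthouse.metrics.delta.DeltaClusteringMetrics

object Quickstart {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lighthouse-quickstart")
      // Delta extensions are required to read Delta tables.
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // args(0): path of the Delta table to analyze.
    val clusteringMetrics = DeltaClusteringMetrics
      .forPath(args(0), spark)
      .computeForColumn("id")

    clusteringMetrics.show(false)
    spark.stop()
  }
}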
- Install our Maven library on your cluster:
Go to Compute > your cluster > Libraries > Install New > Source = Maven, Coordinates = io.github.Databeans:lighthouse_2.12:0.1.0
(or add the Lighthouse_2.12-0.1.0.jar to your cluster).
- Download this notebook and import it to your workspace.
- Create a new cell in your notebook and insert:
%run <path/to/DeltaClusteringMetrics>
PS: Replace <path/to/DeltaClusteringMetrics> with the actual path to the DeltaClusteringMetrics notebook.
- Run the cell.
With these steps completed, you'll be able to use the DeltaClusteringMetrics library.
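As an illustrative follow-up cell, assuming the %run cell above has brought DeltaClusteringMetrics into scope, something like the following could compute metrics for a table registered in the metastore; the table name my_delta_table and the column names id and value are placeholders.
// Illustrative notebook cell; my_delta_table, id and value are placeholders for your own table and columns.
val clusteringMetrics = DeltaClusteringMetrics
  .forName("my_delta_table", spark)
  .computeForColumns("id", "value")

// display() renders the DataFrame in a Databricks notebook.
display(clusteringMetrics)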
- forName(deltaTable: String, spark: SparkSession): DeltaClusteringMetrics
  - deltaTable: Name of the Delta table
  - spark: SparkSession instance
- forPath(deltaPath: String, spark: SparkSession): DeltaClusteringMetrics
  - deltaPath: Path of the Delta table
  - spark: SparkSession instance
- computeForColumn(column: String): DataFrame
  - column: column name to compute metrics for
- computeForColumns(columns: String*): DataFrame
  - columns: list of columns to compute metrics for
- computeForAllColumns(): DataFrame
Assuming that you have a Delta table:
Import DeltaClusteringMetrics:
import fr.databeans.lighthouse.metrics.delta.DeltaClusteringMetrics
Compute clustering information for a given column:
val clusteringMetric = DeltaClusteringMetrics
.forPath("path/to/deltaTable", spark)
.computeForColumn("id")
Compute clustering information for multiple columns:
val clusteringMetrics = DeltaClusteringMetrics
.forName("DeltaTable", spark)
.computeForColumns("id", "value")
Compute clustering information for all columns of the table:
val clusteringMetrics = DeltaClusteringMetrics
.forName("DeltaTable", spark)
.computeForAllColumns()
The library then computes the clustering metrics and returns a DataFrame containing the following columns:
column | total_file_count | total_uniform_file_count | average_overlap | average_overlap_depth | file_depth_histogram |
---|---|---|---|---|---|
col_name | 5 | 5 | 3.0 | 4.0 | {5.0 -> 0, 10.0 -... |
total_file_count
Total number of files composing the Delta table.
total_uniform_file_count
Number of files in which the min and max values of a given ordering column are equal.
average_overlap
Average number of overlapping files for each file in the delta table.
The higher the average_overlap, the worse the clustering.
average_overlap_depth
The average number of files that will be read when an overlap occurs.
The higher the average_overlap_depth, the worse the clustering.
file_depth_histogram
A histogram detailing the distribution of overlap depth across the table, grouping the table's files into buckets by their overlap depth. The buckets are defined as follows:
- 0 to 16 with increments of 1.
- For buckets larger than 16, increments of twice the width of the previous bucket (e.g. 32, 64, 128, …)
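As a rough illustration of how these metrics might feed maintenance decisions, the sketch below flags columns whose average_overlap exceeds an arbitrary threshold as candidates for re-clustering (for example with OPTIMIZE ... ZORDER BY). The threshold of 10 is an assumption made for illustration, not a value recommended by the library.
// Illustrative only: the threshold of 10 is an arbitrary assumption, not a library default.
import org.apache.spark.sql.functions.col
import fr.databeans.lighthouse.metrics.delta.DeltaClusteringMetrics

val metrics = DeltaClusteringMetrics
  .forName("DeltaTable", spark)
  .computeForAllColumns()

// Columns with a high average_overlap are poorly clustered and may benefit from re-clustering.
val candidates = metrics
  .filter(col("average_overlap") > 10)
  .select("column", "average_overlap", "average_overlap_depth")

candidates.show(false)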
To gain a comprehensive understanding of the library in action, including:
- how to use the Lighthouse library for metric extraction
- how to interpret the extracted metrics when performing maintenance operations on your data layout
we recommend reading the following blog post:
- Lighthouse cannot compute metrics for a column without statistics: Lighthouse relies on a column's Delta statistics to compute clustering metrics, so if statistics have not been collected for a column, its metrics cannot be computed (see the sketch after this list).
- Clustering metrics cannot be computed for partitioning columns.
- When handling a column with all null values, the average_overlap and average_overlap_depth metrics will be assigned a value of -1, while the file_depth_histogram metric will be assigned a null value.
- Lighthouse currently supports the following data types: Int, Long, Decimal, and String.
- Lighthouse supports only Delta tables and may not work with other table formats.
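Regarding the statistics limitation above: if a column falls outside the set of columns for which Delta collects statistics (the first 32 columns by default), one option is to widen that set with the standard delta.dataSkippingNumIndexedCols table property, as sketched below. The value 40 and the table name my_delta_table are placeholders, and statistics are only collected for files written after the change.
// Sketch: widen the set of columns for which Delta collects statistics.
// 40 is an illustrative value; my_delta_table is a placeholder for your own table.
spark.sql("ALTER TABLE my_delta_table SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '40')")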
Lighthouse supports:
- Scala 2.12.13
- Spark 3.3.2
- Delta 2.3.0
Lighthouse is an open-source project, and we welcome contributors from the community. If you have a new feature or improvement, feel free to submit a pull request.