# Lakeflow Pipeline

This Lakeflow Declarative Pipeline uses Auto Loader to incrementally ingest Databricks cluster driver logs into tables for log analysis. The notebook is run by the pipeline defined in `resources/watchtower.pipeline.yml`.

In [0]:
import dlt
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Root directory of the delivered cluster logs, read from the pipeline configuration
# (presumably set in resources/watchtower.pipeline.yml -- verify there).
# Trailing slashes are stripped so paths built as f"{raw_log_location}/..." never
# contain "//". An unset, empty, or bare "/" value would otherwise make Auto Loader
# scan "/*/driver/" from the filesystem root, so fail fast with a clear message instead.
raw_log_location = (spark.conf.get("raw_log_location", None) or "").rstrip("/")
if not raw_log_location:
    raise ValueError(
        "Pipeline configuration 'raw_log_location' must be set to the root directory "
        "of the delivered cluster logs."
    )

In [0]:
@dlt.table(
    name="bronze.logs",
    table_properties={
        "Quality": "bronze",
    },
)
@dlt.expect("valid_file_size", "file_size > 0")
# Spark's text source never yields a NULL `value`, so a NULL-only check could never
# fail; also flag empty lines, matching the other "non_empty_*" expectations.
@dlt.expect("non_empty_message", "msg IS NOT NULL AND msg != ''")
def bronze_logs():
    """Incrementally ingest raw driver log lines with Auto Loader.

    Streams every file matching ``<raw_log_location>/*/driver/`` using the
    ``text`` format and keeps each line (the text source's ``value`` column) as
    ``msg``, together with the source file's path, name, size and modification
    time from the ``_metadata`` column for lineage and downstream parsing.
    """
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "text")
            .load(f"{raw_log_location}/*/driver/")
            .selectExpr(
                "value as msg",
                "_metadata.file_path",
                "_metadata.file_name",
                "_metadata.file_size",
                "_metadata.file_modification_time",
            )
    )

In [0]:
@dlt.table(
    name="silver.logs",
    table_properties={
        "Quality": "silver",
    },
    cluster_by=["cluster_id", "log_ts"]
)
@dlt.expect("non_empty_cluster_id", "cluster_id IS NOT NULL AND cluster_id != ''")
# regexp_extract returns '' (not NULL) when the pattern does not match, so the
# empty-string check is what actually detects an unrecognised log file name.
@dlt.expect("non_empty_log_source", "log_source IS NOT NULL AND log_source != ''")
@dlt.expect("parsed_log4j_log_level", "log_source != 'log4j' OR log_level != ''")
@dlt.expect("non_empty_log4j_ts", "log_source != 'log4j' OR log_ts IS NOT NULL")
def silver_logs():
    """Parse raw bronze log lines into structured, queryable columns.

    Streams from ``bronze.logs`` and derives:

    * ``cluster_id`` and ``log_source`` from the file path and file name;
    * ``log_level``, ``log_ts`` and ``log_msg`` from the line itself, using
      VARIANT field access when the line parses as JSON (``try_parse_json``)
      and regular expressions otherwise.

    The bronze file-metadata columns are carried through unchanged.
    """
    return (
        dlt.read_stream("bronze.logs")
            .withColumn("parsed_msg", F.try_parse_json("msg"))
            .selectExpr(
                # cluster_id and log_source are part of the structured file path/name.
                "regexp_extract(file_path, '/([0-9]{4}-[0-9]{6}-[0-9a-z]+)/', 1) as cluster_id",
                "regexp_extract(file_name, '^(stdout|stderr|log4j).*$', 1) as log_source",

                # When msg parsed as JSON, fields are read cheaply and reliably through
                # VARIANT; otherwise fall back to regex, which is slower and more fragile.
                #
                # Escaping: every regex backslash is written as `\\\\`. Python turns it
                # into `\\`, and the Spark SQL string literal turns that into the single
                # `\` the regex engine sees.
                """CASE
                    WHEN parsed_msg IS NOT NULL THEN try_variant_get(parsed_msg, '$.level', 'string')
                    ELSE regexp_extract(msg, '\\\\b(TRACE|DEBUG|INFO|WARN|ERROR|FATAL|CRITICAL)\\\\b', 1)
                END as log_level""",

                # JSON: `timestamp`, falling back to `@timestamp`.
                # Text: the first `dd?dd?dd dd:dd:dd` run (separators '/' or '-'),
                # normalised to '-' and parsed as yy-MM-dd HH:mm:ss.
                """CASE
                    WHEN parsed_msg IS NOT NULL THEN ifnull(
                        try_variant_get(parsed_msg, '$.timestamp', 'timestamp'),
                        try_variant_get(parsed_msg, '$.@timestamp', 'timestamp')
                    )
                    ELSE try_to_timestamp(
                        replace(
                            regexp_extract(msg, '(\\\\d{2}[\\\\/\\\\-]\\\\d{2}[\\\\/\\\\-]\\\\d{2}\\\\s\\\\d{2}:\\\\d{2}:\\\\d{2})', 1),
                            '/',
                            '-'
                        ),
                        'yy-MM-dd HH:mm:ss'
                    )
                END as log_ts""",

                # Use the JSON `message` field when present; otherwise keep the raw line.
                # This also covers lines that parse as JSON without a `message` key (or
                # that are not JSON objects), which previously produced a NULL log_msg.
                "ifnull(try_variant_get(parsed_msg, '$.message', 'string'), msg) as log_msg",

                "file_path",
                "file_name",
                "file_size",
                "file_modification_time",
            )
    )

In [0]:
@dlt.table(
    name="silver.job_run_compute",
    table_properties={"Quality": "silver"},
    cluster_by=["compute_id", "job_id"],
    comment="Reference table of compute IDs used in each job run. Compute ID may be a cluster ID or warehouse ID.",
)
@dlt.expect("non_empty_compute_id", "compute_id IS NOT NULL AND compute_id != ''")
@dlt.expect("non_empty_job_id", "job_id IS NOT NULL")
@dlt.expect("non_empty_workspace_id", "workspace_id IS NOT NULL")
@dlt.expect("non_empty_job_run_id", "job_run_id IS NOT NULL")
def job_run_compute():
    """Map job runs to the compute they used, from the Lakeflow system tables.

    Selects rows of ``system.lakeflow.job_task_run_timeline`` whose
    ``period_start_time`` is within the last 7 days and explodes the
    ``compute_ids`` array, so each compute ID gets its own row. The result is
    de-duplicated over ``(workspace_id, job_id, job_run_id, compute_id)``.
    """
    query = """
      SELECT DISTINCT workspace_id, job_id, job_run_id, explode(compute_ids) as compute_id
      FROM system.lakeflow.job_task_run_timeline
      WHERE
        period_start_time >= NOW() - INTERVAL 7 DAYS
    """
    return spark.sql(query)