In [None]:
%pip install install git+https://github.com/henrikbulldog/xdbutils.git

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting git+https://github.com/henrikbulldog/xdbutils.git
  Cloning https://github.com/henrikbulldog/xdbutils.git to /tmp/pip-req-build-5s_16fm8
  Running command git clone --filter=blob:none --quiet https://github.com/henrikbulldog/xdbutils.git /tmp/pip-req-build-5s_16fm8
  Resolved https://github.com/henrikbulldog/xdbutils.git to commit 5e4d826adaf34ee857ba9250372af39724dad78d
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting install
  Downloading install-1.3.5-py3-none-any.whl (3.2 kB)
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 1.4 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200

In [None]:
from xdbutils import XDBUtils

# `spark` and `dbutils` are not defined in this notebook; they are assumed to
# be the globals injected by the Databricks runtime.
xdbutils = XDBUtils(spark, dbutils)

# Create the batch pipeline object that the following cells attach their
# raw -> bronze -> silver -> gold steps to.
pipeline = xdbutils.create_dlt_batch_pipeline(
  source_system="eds",
  entity="co2emis",
  catalog="testing_dlt",
  tags={
    "data_owner": "Henrik Thomsen",
    "cost_center": "123456",
    "documentation": "https://github.com/henrikbulldog/xdbutils"
  },
  # The token is read from a secret scope instead of being hard-coded.
  # `dbutils.secrets` is accessed as an attribute (documented API), not called.
  # Replace the <scope>/<secret> placeholders with real names before running.
  databricks_token=dbutils.secrets.get(scope="<scope>", key="<secret>")
)

In [None]:
# Named data-quality rules for the raw -> bronze step, keyed by rule name with
# a SQL boolean expression as value.
# NOTE(review): `array_size(records) > 1` fails for payloads holding exactly
# one record — confirm that `> 0` was not intended.
bronze_expectations = {
  "Valid dataset": "dataset = 'CO2Emis'",
  "Valid data": "records IS NOT NULL AND array_size(records) > 1",
}

# Register the raw -> bronze step, reading JSON from the raw base path.
pipeline.raw_to_bronze(
  raw_base_path="dbfs:/FileStore/datalakehouse/raw",
  raw_format="json",
  expectations=bronze_expectations,
)

Name,Type
dataset,string
records,"array<struct<CO2Emission:double,Minutes5DK:string,Minutes5UTC:string,PriceArea:string>>"
total,bigint
_rescued_data,string
_ingest_time,timestamp
_quarantined,boolean


In [None]:
from pyspark.sql.functions import explode, col, lit

def _flatten_records(df):
  """Turn each element of the ``records`` array into its own row.

  Keeps three fields of every exploded record, renamed to ``value``,
  ``timestamp`` (cast to a timestamp) and ``price_area``.
  """
  exploded = df.withColumn("record", explode("records"))
  return exploded.select(
    col("record.CO2Emission").alias("value"),
    col("record.Minutes5UTC").cast("timestamp").alias("timestamp"),
    col("record.PriceArea").alias("price_area"),
  )


# Named data-quality rules for the bronze -> silver step.
silver_expectations = {
  "valid_timestamp": "timestamp IS NOT NULL",
  "valid_value": "value IS NOT NULL",
}

# Register the bronze -> silver step with the parser and rules above.
pipeline.bronze_to_silver(
  parse=_flatten_records,
  expectations=silver_expectations,
)

Name,Type
value,double
timestamp,timestamp
price_area,string


In [None]:
def _top_10_dk1(df):
  """Return ``value`` and ``timestamp`` of the 10 highest-``value`` DK1 rows."""
  dk1_rows = df.filter(col("price_area") == lit("DK1"))
  ranked = dk1_rows.orderBy(col("value").desc())
  return ranked.limit(10).select("value", "timestamp")


# Register the silver -> gold step under the name "top_10".
pipeline.silver_to_gold(name="top_10", parse=_top_10_dk1)

Name,Type
value,double
timestamp,timestamp
