# Simplify ETL with Delta Live Tables

DLT makes data engineering accessible to everyone. Just declare your transformations in SQL or Python, and DLT handles the data engineering complexity for you.

<img style="float:right" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/dlt-golden-demo-loan-1.png" width="700"/>

**Accelerate ETL development** <br/>
Enable analysts and data engineers to innovate rapidly with simple pipeline development and maintenance 

**Remove operational complexity** <br/>
By automating complex administrative tasks and gaining broader visibility into pipeline operations

**Trust your data** <br/>
With built-in quality controls and quality monitoring to ensure accurate and useful BI, Data Science, and ML 

**Simplify batch and streaming** <br/>
With self-optimization and auto-scaling data pipelines for batch or streaming processing 

## Our Delta Live Tables pipeline

We will again be using AXPO data, but this time you can choose which dataset you want to investigate.


The CSV data used here has already been landed on a volume and is ready to be ingested from there (_/Volumes/dbw_databricks_dna_hackathon_databricks_stream/alexander_genser/data_volume/data/_). In this simple example, each dataset is a single CSV file on that volume.


Feel free to choose one of the four CSV files that are available for a try-out:
- dataset_1.csv -> Carbon emission trading, specifically European Union Allowances (EUA)
- dataset_2.csv -> Coal pricing for API 2
- dataset_3.csv -> French electricity market, specifically power futures contracts
- dataset_4.csv -> Foreign exchange (FX) rate information for EUR-USD

The implementation structure below is set up for dataset_1, but feel free to change it. The SQL query in the next cell lets you inspect the data you want to transform with your pipeline; to query any of the four datasets, just change the table name.

```sql
%sql
SELECT * FROM dbw_databricks_dna_hackathon.team3_databricks.dataset_1
```

Sample of the query output for `dataset_1`:

```text
TimeSeries_FID,QuoteDateIndex_FID,QuoteTime,PublicationDateIndex_FID,PublicationDate,DeliveryGridPointName,DeliveryStartDate,DeliveryEndDate,DeliveryRelativeBucketName,DeliveryRelativeBucketNumber,settlement price,traded volume,open interest,Block Volume,Spread Volume,Point_ID,AdjustedDeliveryStartDate,AdjustedDeliveryEndDate,AdjustedDeliveryRelativeBucketName,AdjustedDeliveryGridPointName,AdjustedRelativeBucketNumber
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M03-2009[ICE],2009-03-31T17:00:00.000Z,2009-04-02T17:30:00.000Z,M,2,10.89,,,,,203826151089,2009-03-31T17:00:00.000Z,2009-04-02T17:30:00.000Z,M,M03-2009[ICE],2
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M06-2009[ICE],2009-06-30T17:00:00.000Z,2009-07-02T17:30:00.000Z,M,5,12.99,,,,,203826175112,2009-06-30T17:00:00.000Z,2009-07-02T17:30:00.000Z,M,M06-2009[ICE],5
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M09-2009[ICE],2009-09-29T17:00:00.000Z,2009-10-01T17:30:00.000Z,M,8,13.05,,,,,203826148864,2009-09-29T17:00:00.000Z,2009-10-01T17:30:00.000Z,M,M09-2009[ICE],8
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M12-2009[ICE],2009-12-15T18:00:00.000Z,2009-12-17T18:30:00.000Z,M,11,13.19,,,,,203826173197,2009-12-15T18:00:00.000Z,2009-12-17T18:30:00.000Z,M,M12-2009[ICE],11
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M03-2010[ICE],2010-03-23T18:00:00.000Z,2010-03-25T18:30:00.000Z,M,14,13.8,,,,,203826160700,2010-03-23T18:00:00.000Z,2010-03-25T18:30:00.000Z,M,M03-2010[ICE],14
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M06-2010[ICE],2010-06-29T17:00:00.000Z,2010-07-01T17:30:00.000Z,M,17,13.8,,,,,203826175228,2010-06-29T17:00:00.000Z,2010-07-01T17:30:00.000Z,M,M06-2010[ICE],17
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M09-2010[ICE],2010-09-28T17:00:00.000Z,2010-09-30T17:30:00.000Z,M,20,13.8,,,,,203826163109,2010-09-28T17:00:00.000Z,2010-09-30T17:30:00.000Z,M,M09-2010[ICE],20
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M12-2010[ICE],2010-12-21T18:00:00.000Z,2010-12-23T18:30:00.000Z,M,23,13.8,,,,,203826152587,2010-12-21T18:00:00.000Z,2010-12-23T18:30:00.000Z,M,M12-2010[ICE],23
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M03-2011[ICE],2011-03-29T17:00:00.000Z,2011-03-31T17:30:00.000Z,M,26,14.3,,,,,203826167215,2011-03-29T17:00:00.000Z,2011-03-31T17:30:00.000Z,M,M03-2011[ICE],26
88021503,20090101,2008-12-31T23:00:00.000Z,20151019,2015-10-19T15:12:39.320Z,M06-2011[ICE],2011-06-28T17:00:00.000Z,2011-06-30T17:30:00.000Z,M,29,14.3,,,,,203826146473,2011-06-28T17:00:00.000Z,2011-06-30T17:30:00.000Z,M,M06-2011[ICE],29
```


```python
# importing libraries
import dlt
import pyspark.sql.functions as F
# `round` and `sum` are deliberately not imported by name so they do not shadow
# Python's built-ins; use F.round / F.sum instead
from pyspark.sql.functions import col, weekofyear, year, expr, avg, to_date, concat, lit, window, date_format
from pyspark.sql.window import Window
# no SparkSession import needed: `spark` is provided by the Databricks runtime

# path to the volume where the data is landed (you can find the volume in Unity Catalog)
source = '/Volumes/dbw_databricks_dna_hackathon_databricks_stream/alexander_genser/data_volume/data/'
```
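Switching datasets only means changing the `dataset_<n>` suffix, both in the table name of the exploration query and in the Auto Loader load path. The hypothetical helper below (not part of the original notebook) derives both from a dataset number, assuming all four datasets follow the naming pattern used for `dataset_1`:

```python
# Hypothetical helper: derives the exploration table name and the Auto Loader
# load path for one of the four datasets, assuming they all follow the
# dataset_<n> naming pattern used in this notebook.
SOURCE = "/Volumes/dbw_databricks_dna_hackathon_databricks_stream/alexander_genser/data_volume/data/"
TABLE_PREFIX = "dbw_databricks_dna_hackathon.team3_databricks.dataset_"

DATASETS = {
    1: "Carbon emission trading (EUA)",
    2: "Coal pricing (API 2)",
    3: "French power futures",
    4: "FX rate EUR-USD",
}


def dataset_locations(n: int) -> dict:
    """Return description, table name and load path for dataset n (1 to 4)."""
    if n not in DATASETS:
        raise ValueError(f"dataset must be one of {sorted(DATASETS)}, got {n}")
    return {
        "description": DATASETS[n],
        "table": f"{TABLE_PREFIX}{n}",
        "path": f"{SOURCE}dataset_{n}",
    }


print(dataset_locations(3)["table"])
```

Validating the number up front gives a clear error instead of a pipeline that silently points at a folder that does not exist.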


## What is Databricks Auto Loader?

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/product_demos/autoloader/autoloader-edited-anim.gif" style="float:right; margin-left: 10px" />

[Databricks Auto Loader](https://docs.databricks.com/ingestion/auto-loader/index.html) lets you scan a cloud storage folder (S3, ADLS, GCS) and ingest only the new files that arrived since the previous run.

This is called **incremental ingestion**.

Auto Loader can be used in a near real-time stream or in a batch fashion, e.g., running every night to ingest daily data.

Auto Loader provides an exactly-once guarantee when used with a Delta sink (each file is ingested only once).
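Conceptually, incremental ingestion works because Auto Loader remembers which files it has already processed, so each run (continuous or nightly) picks up only new files, and re-running over the same folder ingests nothing twice. The toy model below is plain Python, not Auto Loader's implementation; the `processed` set stands in for the checkpoint and the file names are made up:

```python
# Toy model of incremental, exactly-once file ingestion.
# The `processed` set plays the role of Auto Loader's checkpoint; this only
# illustrates the idea, it is not how Auto Loader is implemented.

def ingest_new_files(available_files, processed):
    """Return the files not seen before and record them as processed."""
    new_files = [f for f in sorted(available_files) if f not in processed]
    processed.update(new_files)
    return new_files


checkpoint = set()
run_1 = ingest_new_files({"day_01.csv", "day_02.csv"}, checkpoint)
run_2 = ingest_new_files({"day_01.csv", "day_02.csv", "day_03.csv"}, checkpoint)
run_3 = ingest_new_files({"day_01.csv", "day_02.csv", "day_03.csv"}, checkpoint)
print(run_1, run_2, run_3)  # ['day_01.csv', 'day_02.csv'] ['day_03.csv'] []
```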

## How Auto Loader simplifies data ingestion

Ingesting data from cloud storage can be really hard at scale. Auto Loader makes it easy, offering these benefits:


* **Incremental** & **cost-efficient** ingestion (removes unnecessary listing or state handling)
* **Simple** and **resilient** operation: no tuning or manual code required
* Scalable to **billions of files**
  * Using incremental listing (recommended, relies on filename order)
  * Leveraging notification + message queue (when incremental listing can't be used)
* **Schema inference** and **schema evolution** are handled out of the box for most formats (csv, json, avro, images...)
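Incremental listing relies on new files sorting after old ones (for example, date-prefixed names): instead of listing the whole folder, only names greater than the last one already seen need to be considered. A plain-Python sketch of that idea using `bisect` on a sorted listing; the file names are hypothetical and this is not Auto Loader's code:

```python
import bisect

# Toy sketch of incremental listing: if new files always sort after old ones,
# only the part of the sorted listing after the last file seen is new.

def list_after(sorted_names, last_seen):
    """Return the names strictly greater than last_seen (all names if None)."""
    if last_seen is None:
        return list(sorted_names)
    start = bisect.bisect_right(sorted_names, last_seen)
    return list(sorted_names[start:])


listing = ["2024-05-01.csv", "2024-05-02.csv", "2024-05-03.csv"]
print(list_after(listing, "2024-05-01.csv"))  # ['2024-05-02.csv', '2024-05-03.csv']
```

The sketch also shows the weakness: a file with an older name that lands late (say `2024-04-30.csv`) sorts before the last seen name and is skipped, which is why notification mode exists for folders without a reliable filename order.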


## Bronze layer: incremental ingestion with Auto Loader

<img style="padding-left: 10px" src="https://github.com/genseral/axpo_dna_summit_2024/blob/main/figures/Power_data_simple_DLT.drawio.png?raw=true" width="600"/>

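```python
@dlt.table(
    name="bronze_simple_carbon",
    comment="Raw carbon (EUA) quotes ingested incrementally from CSV with Auto Loader",
)
# Example expectation (adapt to your dataset): drop rows without a quote timestamp
@dlt.expect_or_drop("valid_quote_time", "QuoteTime IS NOT NULL")
def bronze_simple_carbon():
    raw = (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")  # the first CSV line holds the column names
        .option("cloudFiles.inferColumnTypes", "true")
        .load(f"{source}dataset_1")  # change the suffix to ingest another dataset
    )
    # Delta rejects spaces in column names (e.g. "settlement price") unless
    # column mapping is enabled, so replace them with underscores
    return raw.select([F.col(f"`{c}`").alias(c.replace(" ", "_")) for c in raw.columns])
```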
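The header of `dataset_1` contains names with spaces (`settlement price`, `traded volume`, `open interest`, `Block Volume`, `Spread Volume`). Delta tables reject spaces in column names unless column mapping is enabled, so such names are typically normalized in the bronze step. A plain-Python sketch of one possible normalization (spaces to underscores; the choice of replacement is an assumption), applied to an excerpt of the header shown in the exploration output:

```python
import csv
import io

# Excerpt of the dataset_1 header as shown in the exploration output.
HEADER = (
    "TimeSeries_FID,QuoteTime,DeliveryGridPointName,"
    "settlement price,traded volume,open interest,Block Volume,Spread Volume"
)


def sanitize(name: str) -> str:
    """Replace spaces with underscores so the name contains no spaces."""
    return name.strip().replace(" ", "_")


columns = next(csv.reader(io.StringIO(HEADER)))
# Only keep the columns whose name actually changes
renamed = {c: sanitize(c) for c in columns if sanitize(c) != c}
print(renamed)
```

Checking the mapping up front makes it easy to spot collisions, e.g. if a dataset already had both `Block Volume` and `Block_Volume`.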


## Silver layer: cleaning data while ensuring data quality

<img style="float: right; padding-left: 10px" src="https://github.com/genseral/axpo_dna_summit_2024/blob/main/figures/Power_data_simple_DLT_silver.drawio.png?raw=true" width="600"/>

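Once the bronze layer is defined, we create the silver layer on top of it. In SQL, tables of the same pipeline are referenced through the `LIVE` namespace (e.g. `LIVE.bronze_simple_carbon`); in Python, they are read with `dlt.read("bronze_simple_carbon")`.

To consume only the increment (the rows added since the last update) from a bronze table, use the `stream` keyword in SQL, e.g. `stream(LIVE.bronze_simple_carbon)`, or `dlt.read_stream("bronze_simple_carbon")` in Python.

Note that we don't have to worry about compaction; DLT handles it for us.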

#### Expectations
By defining expectations, you can enforce and track your data quality. In SQL, an expectation is written as `CONSTRAINT <name> EXPECT (<condition>)`; in Python, it is a decorator such as `@dlt.expect_or_drop("<name>", "<condition>")`. See the [documentation](https://docs.databricks.com/data-engineering/delta-live-tables/delta-live-tables-expectations.html) for more details.
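The Python expectation decorators differ in what happens to violating rows: `@dlt.expect` keeps them and records the violation in the pipeline metrics, `@dlt.expect_or_drop` removes them, and `@dlt.expect_or_fail` stops the update. The snippet below does not use DLT; it is a plain-Python model of these three policies on two rows (the first taken from the `dataset_1` sample, the second made up), just to show how they differ:

```python
# Plain-Python model of the three DLT expectation policies (not the DLT API).
rows = [
    {"QuoteTime": "2008-12-31T23:00:00", "settlement_price": 10.89},
    {"QuoteTime": "2008-12-31T23:00:00", "settlement_price": None},  # made up
]


def has_price(row):
    """Expectation condition: the settlement price must be present."""
    return row["settlement_price"] is not None


def apply_expectation(rows, condition, policy):
    """Return (kept_rows, n_violations) for policy 'warn', 'drop' or 'fail'."""
    violations = [r for r in rows if not condition(r)]
    if policy == "warn":  # like @dlt.expect: keep all rows, count violations
        return list(rows), len(violations)
    if policy == "drop":  # like @dlt.expect_or_drop: remove violating rows
        return [r for r in rows if condition(r)], len(violations)
    if policy == "fail":  # like @dlt.expect_or_fail: abort on any violation
        if violations:
            raise ValueError(f"{len(violations)} row(s) violated the expectation")
        return list(rows), 0
    raise ValueError(f"unknown policy: {policy}")


kept, n_bad = apply_expectation(rows, has_price, "drop")
print(len(kept), n_bad)  # 1 1
```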

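```python
@dlt.table(
    name="silver_simple_carbon",
    comment="Cleaned carbon (EUA) quotes",
)
# Example expectation (adapt to your dataset): drop rows whose delivery period ends before it starts
@dlt.expect_or_drop("valid_delivery_period", "DeliveryStartDate <= DeliveryEndDate")
def silver_simple_carbon():
    # read_stream consumes only the rows added to bronze since the last update
    bronze_carbon = dlt.read_stream("bronze_simple_carbon")

    return bronze_carbon.select("*")
```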


## Gold layer

<img style="float: right; padding-left: 10px" src="https://github.com/genseral/axpo_dna_summit_2024/blob/main/figures/Power_data_simple_DLT_gold.drawio.png?raw=true" width="600"/>

Our last step is to materialize the Gold Layer.

Because these tables will be queried at scale through a SQL warehouse (formerly SQL endpoint), we Z-order them at the table level with the `pipelines.autoOptimize.zOrderCols` table property to speed up queries, and DLT handles the rest.
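Z-ordering co-locates rows with similar values in the chosen columns by sorting them along a Z-order (Morton) curve, whose key interleaves the bits of the column values. Delta first maps each column to a range-partition id before interleaving, so the toy below only illustrates the curve itself on small integers:

```python
# Toy Morton (Z-order) key for two small non-negative integers.
def morton_key(x: int, y: int, bits: int = 16) -> int:
    """Interleave bits: bit i of x goes to position 2*i, bit i of y to 2*i + 1."""
    key = 0
    for i in range(bits):
        key |= ((x >> i) & 1) << (2 * i)
        key |= ((y >> i) & 1) << (2 * i + 1)
    return key


points = [(x, y) for x in range(4) for y in range(4)]
ordered = sorted(points, key=lambda p: morton_key(*p))
print(ordered[:4])  # [(0, 0), (1, 0), (0, 1), (1, 1)]
```

The first four points in Z-order form a 2x2 block, so rows that are close in both columns end up close together in the files; per-file min/max statistics can then skip more files when a query filters on either column.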

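```python
@dlt.table(
    name="gold_simple_carbon",
    comment="Carbon (EUA) settlement prices, gold layer",
    # Z-order the table files by these columns (example choice, adapt as needed)
    table_properties={"pipelines.autoOptimize.zOrderCols": "QuoteTime,DeliveryStartDate"},
)
def gold_simple_carbon():
    return (
        dlt.read("silver_simple_carbon")
            .select("*")
    )
```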
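The gold cell above passes the silver data through unchanged; a gold table usually aggregates. As an illustration (the choice of metric is an assumption), take the average settlement price per delivery year. In the pipeline this would be a PySpark `groupBy(...).agg(F.avg(...))`; the plain-Python version below checks the logic on the ten sample rows from the exploration output:

```python
from collections import defaultdict
from statistics import mean

# (DeliveryStartDate, settlement price) pairs from the dataset_1 sample output.
SAMPLE = [
    ("2009-03-31T17:00:00.000Z", 10.89),
    ("2009-06-30T17:00:00.000Z", 12.99),
    ("2009-09-29T17:00:00.000Z", 13.05),
    ("2009-12-15T18:00:00.000Z", 13.19),
    ("2010-03-23T18:00:00.000Z", 13.8),
    ("2010-06-29T17:00:00.000Z", 13.8),
    ("2010-09-28T17:00:00.000Z", 13.8),
    ("2010-12-21T18:00:00.000Z", 13.8),
    ("2011-03-29T17:00:00.000Z", 14.3),
    ("2011-06-28T17:00:00.000Z", 14.3),
]


def avg_price_per_delivery_year(rows):
    """Group settlement prices by the year of DeliveryStartDate and average them."""
    by_year = defaultdict(list)
    for start_date, price in rows:
        by_year[int(start_date[:4])].append(price)
    return {year: round(mean(prices), 4) for year, prices in sorted(by_year.items())}


print(avg_price_per_delivery_year(SAMPLE))  # {2009: 12.53, 2010: 13.8, 2011: 14.3}
```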