# Using Databricks Pipelines for Data Workflows

## 1. Introduction to Databricks Pipelines

**Databricks Pipelines** provide an easy-to-use, scalable way to create ETL workflows using Delta Live Tables (DLT). You declare tables and their transformations in a notebook, and the pipeline handles data ingestion, transformation, and storage in dependency order.

### Key Benefits:
- **Automated Workflow Management**: Reduces manual ETL tasks by automating data transformations and quality checks.
- **Data Quality Assurance**: Built-in tools enforce data quality with expectations and automatic schema evolution.
- **Reliability with ACID Transactions**: Delta Lake offers ACID transactions for reliable data processing.
- **Real-Time Data Processing**: DLT supports both batch and real-time data workflows.

---

## 2. Setting Up a Delta Live Tables Pipeline

To use Databricks Pipelines, set up a Delta Live Tables pipeline within your Databricks environment.

### Step-by-Step Setup

1. **Navigate to Delta Live Tables**:
   - In your Databricks workspace, go to the **Workflows** tab, then select **Delta Live Tables**.

2. **Create a New Pipeline**:
   - Click on **Create Pipeline**.
   - Name the pipeline (e.g., `RetailDataWorkflow`).
   - **Mode**: Choose **Triggered** for batch processing or **Continuous** for streaming (real-time) processing.

3. **Define Source and Target Locations**:
   - Specify the path to your source data (e.g., an S3 or ADLS path).
   - Define a storage location for the DLT pipeline, such as `dbfs:/pipelines/retail_data_workflow/`.

4. **Configure Advanced Settings** (Optional):
   - Enable **Auto Schema Evolution** to adapt to changes in the source schema.
   - Configure **Data Quality Expectations** to enforce rules on specific columns.

### Example Pipeline Configuration

```plaintext
Pipeline Name: RetailDataWorkflow
Pipeline Mode: Continuous
Notebook Library: /Workspace/DeltaLiveTables/RetailDataPipeline
Storage Location: dbfs:/pipelines/retail_data_workflow/
```
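
The same configuration can also be written as pipeline settings JSON, which is convenient to keep in version control. The sketch below builds that JSON in Python. The field names (`name`, `continuous`, `libraries`, `storage`) are assumed to match the pipeline settings schema in your workspace, so check them against the JSON view of your own pipeline before relying on them.

```python
import json

# Sketch of pipeline settings mirroring the configuration above.
# Field names are assumptions to verify against your workspace's JSON settings view.
pipeline_settings = {
    "name": "RetailDataWorkflow",
    "continuous": True,  # True = Continuous mode, False = Triggered mode
    "libraries": [
        {"notebook": {"path": "/Workspace/DeltaLiveTables/RetailDataPipeline"}}
    ],
    "storage": "dbfs:/pipelines/retail_data_workflow/",
}

settings_json = json.dumps(pipeline_settings, indent=2)
print(settings_json)
```

Switching `continuous` to `False` would correspond to choosing **Triggered** mode in the UI.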

---

## 3. Building a Data Workflow with Delta Live Tables

The core of a Databricks Pipeline is the Delta Live Tables notebook, where tables and views are defined with data transformations and quality checks.

### Step 1: Data Ingestion

Define the ingestion layer with Delta Live Tables to load data from the source into a raw table.

```python
import dlt
from pyspark.sql.functions import col
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
)

# Define a schema for raw retail data
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("invoice_no", StringType(), True),
    StructField("quantity", IntegerType(), True),
    # Double rather than integer, since prices usually have a fractional part
    StructField("unit_price", DoubleType(), True),
    StructField("invoice_date", TimestampType(), True),
    StructField("country", StringType(), True)
])

@dlt.table(
    name="raw_retail_data",
    comment="Ingest raw retail data from S3"
)
def raw_retail_data():
    # Auto Loader ("cloudFiles") incrementally picks up new CSV files from the source path
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(schema)
        .load("/mnt/s3dataread/retail-data/by-day/test-auto-loaded/")
    )
```

### Step 2: Data Transformation

Create transformation layers to clean and standardize the raw data.

```python
@dlt.table(
    name="cleaned_retail_data",
    comment="Cleaned and standardized retail data"
)
def cleaned_retail_data():
    return (
        dlt.read("raw_retail_data")
        # Keep only rows where both inputs to total_value are present
        .filter(col("quantity").isNotNull() & col("unit_price").isNotNull())
        .withColumn("total_value", col("quantity") * col("unit_price"))
    )
```

### Step 3: Data Aggregation and Enrichment

Aggregate data to create a summary table for analytical queries, such as total sales per customer.

```python
from pyspark.sql import functions as F

@dlt.table(
    name="customer_sales_summary",
    comment="Aggregated sales data by customer"
)
def customer_sales_summary():
    return (
        dlt.read("cleaned_retail_data")
        .groupBy("customer_id")
        # Alias the aggregate directly instead of renaming the generated column
        .agg(F.sum("total_value").alias("total_sales"))
    )
```
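
To make the cleaning and aggregation logic concrete, here is a small plain-Python walk-through of what Steps 2 and 3 compute. It does not use Spark or DLT, and the sample rows are hypothetical values chosen only for illustration.

```python
# Hypothetical sample rows; values are illustrative only.
rows = [
    {"customer_id": 1, "quantity": 2, "unit_price": 5.0},
    {"customer_id": 1, "quantity": 1, "unit_price": 3.5},
    {"customer_id": 2, "quantity": 4, "unit_price": 2.0},
    {"customer_id": 2, "quantity": None, "unit_price": 9.0},  # removed by the null filter
]

# Step 2 equivalent: drop rows with a null quantity or unit_price, then add total_value.
cleaned = [
    {**r, "total_value": r["quantity"] * r["unit_price"]}
    for r in rows
    if r["quantity"] is not None and r["unit_price"] is not None
]

# Step 3 equivalent: sum total_value per customer_id.
total_sales = {}
for r in cleaned:
    key = r["customer_id"]
    total_sales[key] = total_sales.get(key, 0.0) + r["total_value"]

print(total_sales)  # {1: 13.5, 2: 8.0}
```

The row with a missing quantity never reaches the aggregation, which is why customer 2 totals 8.0 rather than including the 9.0 price.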

### Step 4: Data Quality Checks

Use **expectations** to enforce data quality rules. Depending on the expectation type, rows that fail can be recorded and kept, dropped, or cause the update to fail.

```python
@dlt.table(
    name="validated_retail_data",
    comment="Data with quality checks on quantity and unit price"
)
@dlt.expect("positive_quantity", "quantity > 0")
@dlt.expect("valid_price", "unit_price > 0")
def validated_retail_data():
    return dlt.read("cleaned_retail_data")
```

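In this example:
- `@dlt.expect`: Declares a named expectation as a SQL boolean expression. Rows that violate it are still written to the target table, and the violations are counted in the pipeline's data quality metrics. To drop violating rows instead, use `@dlt.expect_or_drop`; to stop the update on a violation, use `@dlt.expect_or_fail`.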
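
The difference between keeping, dropping, and failing on invalid rows can be illustrated with a plain-Python model. This is not the DLT API: `apply_expectation` and the sample rows are hypothetical, and the three `action` values are only meant to mirror the behavior of `@dlt.expect`, `@dlt.expect_or_drop`, and `@dlt.expect_or_fail`.

```python
def apply_expectation(rows, predicate, action="warn"):
    """Hypothetical model of expectation behavior on a list of dict rows.

    action="warn": keep every row and count violations
    action="drop": remove violating rows and count them
    action="fail": raise an error if any row violates the rule
    """
    failed = [r for r in rows if not predicate(r)]
    if action == "fail" and failed:
        raise ValueError(f"{len(failed)} row(s) violated the expectation")
    kept = [r for r in rows if predicate(r)] if action == "drop" else list(rows)
    return kept, len(failed)


def positive_quantity(row):
    # Same rule as the "positive_quantity" expectation: quantity > 0
    return row["quantity"] > 0


sample = [{"quantity": 3}, {"quantity": 0}, {"quantity": -1}]

warn_rows, warn_violations = apply_expectation(sample, positive_quantity, "warn")
drop_rows, drop_violations = apply_expectation(sample, positive_quantity, "drop")

print(len(warn_rows), warn_violations)  # 3 2
print(len(drop_rows), drop_violations)  # 1 2
```

In both cases two violations are counted; only the drop action changes which rows reach the target table.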

---

## 4. Scheduling and Automation

To automate this pipeline, configure it to run on a defined schedule.

### Step-by-Step Scheduling Setup

1. **Go to the Pipeline Settings**: In the Delta Live Tables UI, open the pipeline you created.
2. **Select Schedule**:
   - Configure the pipeline to run at intervals, such as daily or hourly, depending on your data update frequency.
3. **Choose Mode**:
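   - **Triggered Mode**: Runs one update each time the pipeline is started, either on a schedule or manually, and then stops. Schedules apply to this mode.
   - **Continuous Mode**: Keeps the pipeline running and processes new data as it arrives, so no schedule is needed.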

With a configured schedule, Databricks will automatically trigger the pipeline based on the set interval.
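
A schedule can also be defined programmatically by creating a job with a pipeline task. The sketch below only builds the request payload. The field names (`schedule.quartz_cron_expression`, `schedule.timezone_id`, `tasks[].pipeline_task.pipeline_id`) are assumed to follow the Databricks Jobs API, while `build_pipeline_job`, the job name, the task key, and the placeholder pipeline ID are hypothetical.

```python
import json


def build_pipeline_job(pipeline_id, cron, timezone="UTC"):
    """Build a job payload that runs a triggered pipeline on a cron schedule.

    Hypothetical helper; field names are assumed to match the Jobs API.
    """
    return {
        "name": "RetailDataWorkflow schedule",
        "schedule": {
            # Quartz syntax: seconds minutes hours day-of-month month day-of-week
            "quartz_cron_expression": cron,
            "timezone_id": timezone,
            "pause_status": "UNPAUSED",
        },
        "tasks": [
            {
                "task_key": "run_retail_pipeline",
                "pipeline_task": {"pipeline_id": pipeline_id},
            }
        ],
    }


# "0 0 * * * ?" fires at the start of every hour; replace the placeholder ID with your own.
hourly_job = build_pipeline_job("<your-pipeline-id>", "0 0 * * * ?")
print(json.dumps(hourly_job, indent=2))
```

For a daily run at 02:00, the cron expression would be `0 0 2 * * ?`.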

---


## 7. Summary

This guide covered how to use Databricks Pipelines to create, manage, and automate ETL workflows:
1. **Setting Up a Delta Live Tables Pipeline**: Configured a new pipeline and defined ingestion settings.
2. **Building the Data Workflow**: Ingested data from source, transformed it, performed aggregation, and applied quality checks.
3. **Scheduling and Automation**: Configured pipeline