# The Delta Lake

![image of DeltaLake](images/deltaLake.png)

# Data Objects in Databricks

![Data objects in Databricks](images/dataObjectInDataBricks.png)

## Set Up and Load Delta Tables

Querying Files

```sql
SELECT * FROM parquet.`${DA.paths.datasets}/ecommerce/raw/sales-historical` LIMIT 10;
```

Create Table as Select (CTAS): `CREATE TABLE ... AS SELECT ...`

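```sql
-- Reads the CSV files with default options (comma delimiter, no header),
-- so the pipe-delimited data is not parsed into proper columns
CREATE OR REPLACE TEMP VIEW sales_unparsed AS
SELECT * FROM csv.`${da.paths.datasets}/ecommerce/raw/sales-csv`;

SELECT * FROM sales_unparsed LIMIT 10;
```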

The read_files() Table-Valued Function

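Options used below:

1. format => "csv": the source file format
2. sep => "|": the field delimiter
3. header => true: the first line of each file contains the column names
4. mode => "FAILFAST": stop with an error on the first malformed record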

```sql
CREATE TABLE sales_bronze
USING DELTA AS
SELECT * FROM read_files(
    "${da.paths.datasets}/ecommerce/raw/sales-csv",
    format => "csv",
    sep => "|",
    header => true,
    mode => "FAILFAST"
);
```

Catalogs, Schemas, and Tables on Databricks

```sql
DESCRIBE CATALOG `${DA.catalog_name}`;

DESCRIBE SCHEMA `${DA.schema_name}`;
```

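Loading Data with COPY INTO

COPY INTO loads files from a source location into a Delta table. It skips files that have already been loaded, so re-running the command is idempotent.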

```sql
COPY INTO users_bronze
    FROM '${DA.paths.datasets}/ecommerce/raw/users-30m'
    FILEFORMAT = parquet
    -- Let the table schema evolve to match the incoming files
    COPY_OPTIONS ('mergeSchema' = 'true');
```

Creating External Tables

```sql
CREATE TABLE sales_csv (
    order_id LONG,
    email STRING,
    transaction_timestamp LONG,
    total_item_quantity INTEGER,
    purchase_revenue_in_usd DOUBLE,
    unique_items INTEGER,
    items STRING
)
USING CSV
OPTIONS (
    header = "true",
    delimiter = "|"
)
-- Replace the placeholder with the directory that holds the CSV files
LOCATION "${table location}";
```
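An external table like `sales_csv` only points at the files under its `LOCATION`. Spark can cache those files and their listing, so files added later may not show up until the cache is refreshed. This is a minimal sketch that assumes the `sales_csv` table defined above:

```sql
-- Check the table type: the output should list Type = EXTERNAL and the Location
DESCRIBE EXTENDED sales_csv;

-- Invalidate cached data/metadata so newly written files in LOCATION are read
REFRESH TABLE sales_csv;

SELECT count(*) FROM sales_csv;
```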

Built-In Functions

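```sql
-- Create an empty Delta table if needed; COPY INTO with mergeSchema fills in its schema
CREATE TABLE IF NOT EXISTS users_bronze;

COPY INTO users_bronze FROM (
    SELECT *,
        -- microseconds -> timestamp -> date
        cast(cast(user_first_touch_timestamp/1e6 AS TIMESTAMP) AS DATE) first_touch_date,
        current_timestamp() updated,      -- load time
        input_file_name() source_file     -- path of the file each row came from
    FROM '${da.paths.datasets}/ecommerce/raw/users-historical'
)
FILEFORMAT = PARQUET
COPY_OPTIONS ('mergeSchema' = 'true');
```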

# Basic Transformation

## Cloning Delta Lake Tables


```sql
CREATE OR REPLACE TABLE historical_sales_bronze
    USING DELTA AS
        SELECT * FROM parquet.`${DA.paths.datasets}/ecommerce/raw/sales-historical`;

DESCRIBE historical_sales_bronze;
```

```sql
-- DEEP CLONE copies the source table's metadata and data files
CREATE OR REPLACE TABLE historical_sales_clone
DEEP CLONE historical_sales_bronze;
```

```sql
-- SHALLOW CLONE copies only the metadata; it keeps referencing the source's data files
CREATE OR REPLACE TABLE historical_sales_shallow_clone
SHALLOW CLONE historical_sales_bronze;
```
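Clones also accept time travel, so you can take a clone of an earlier table version. Re-running a deep clone syncs it incrementally, copying only the files added since the previous clone. This sketch assumes `historical_sales_bronze` from above. Version 0 is the table's first commit, and `historical_sales_v0` is a hypothetical name:

```sql
-- Hypothetical target name: a shallow clone of the table as of its first commit
CREATE OR REPLACE TABLE historical_sales_v0
SHALLOW CLONE historical_sales_bronze VERSION AS OF 0;

-- Re-running the deep clone copies only the changes since the last clone
CREATE OR REPLACE TABLE historical_sales_clone
DEEP CLONE historical_sales_bronze;
```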


Complete Overwrites

```sql
CREATE OR REPLACE TABLE events AS
    SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/events-historical`;

DESCRIBE HISTORY events;
```

INSERT OVERWRITE

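`INSERT OVERWRITE` replaces all data in the target table with the result of the query. Unlike `CREATE OR REPLACE TABLE`, it can only overwrite an existing table, and it fails if the query's schema does not match the table's schema.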

```sql
INSERT OVERWRITE events
    SELECT * FROM parquet.`${da.paths.datasets}/ecommerce/raw/events-historical`;

DESCRIBE HISTORY events;
```

MERGE UPDATES

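`MERGE INTO` upserts data from a source table, view, or DataFrame into a target Delta table in a single transaction. Matched rows can be updated or deleted, and unmatched rows can be inserted.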

```sql
MERGE INTO target a
USING source b
ON {merge_condition}
WHEN MATCHED THEN {matched_action}
WHEN NOT MATCHED THEN {not_matched_action}
```

```sql
CREATE OR REPLACE TEMP VIEW users_update AS
SELECT *, current_timestamp() AS updated
FROM parquet.`${da.paths.datasets}/ecommerce/raw/users-30m`;

MERGE INTO users a
USING users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
    UPDATE SET email = b.email, updated = b.updated
WHEN NOT MATCHED THEN
    INSERT (user_id, email, updated)
    VALUES (b.user_id, b.email, b.updated);
```
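To see both MERGE branches fire, you can run the same pattern on tiny inline data. This is a self-contained sketch. The `demo_users` and `demo_users_update` names and the email values are hypothetical:

```sql
-- Hypothetical demo tables (names and values are illustrative only)
CREATE OR REPLACE TABLE demo_users (user_id STRING, email STRING);
INSERT INTO demo_users VALUES ('u1', NULL), ('u2', 'b@example.com');

CREATE OR REPLACE TEMP VIEW demo_users_update AS
SELECT * FROM VALUES
    ('u1', 'a@example.com'),
    ('u3', 'c@example.com')
AS t(user_id, email);

-- Fill in missing emails for existing users; insert users that are new
MERGE INTO demo_users a
USING demo_users_update b
ON a.user_id = b.user_id
WHEN MATCHED AND a.email IS NULL AND b.email IS NOT NULL THEN
    UPDATE SET email = b.email
WHEN NOT MATCHED THEN
    INSERT (user_id, email) VALUES (b.user_id, b.email);

-- Expected: u1 gets a@example.com, u2 is unchanged, u3 is inserted
SELECT * FROM demo_users ORDER BY user_id;
```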

INSERT-ONLY MERGE for Deduplication

```sql
MERGE INTO events a
USING events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN
    INSERT *;
```
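An insert-only MERGE deduplicates because a source row that matches an existing row is simply skipped. This self-contained sketch uses hypothetical `demo_events` data to show that only new rows that also pass the extra condition get written:

```sql
-- Hypothetical demo data: u1's event is already in the table
CREATE OR REPLACE TABLE demo_events (user_id STRING, event_timestamp INT, traffic_source STRING);
INSERT INTO demo_events VALUES ('u1', 100, 'email');

CREATE OR REPLACE TEMP VIEW demo_events_update AS
SELECT * FROM VALUES
    ('u1', 100, 'email'),   -- duplicate of an existing row: not inserted
    ('u2', 200, 'email'),   -- new email event: inserted
    ('u3', 300, 'google')   -- new, but filtered out by the traffic_source condition
AS t(user_id, event_timestamp, traffic_source);

MERGE INTO demo_events a
USING demo_events_update b
ON a.user_id = b.user_id AND a.event_timestamp = b.event_timestamp
WHEN NOT MATCHED AND b.traffic_source = 'email' THEN
    INSERT *;

-- Expected: 2 rows (u1 and u2)
SELECT * FROM demo_events ORDER BY user_id;
```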

Declare Schema with Generated Columns

```sql
CREATE OR REPLACE TABLE purchase_dates (
    id STRING,
    transaction_timestamp STRING,
    price STRING,
    date DATE GENERATED ALWAYS AS (
        cast(cast(transaction_timestamp/1e6 AS TIMESTAMP) AS DATE)
    )
    COMMENT "generated based on `transaction_timestamp` column"
);
```
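A generated column is computed on write, so inserts can leave it out. This sketch reuses the same cast chain on a hypothetical `demo_generated` table with a sample microsecond timestamp. The resulting date falls in July 2020, and the exact day depends on the session time zone:

```sql
-- Hypothetical demo table; event_date is derived from ts_micros on every write
CREATE OR REPLACE TABLE demo_generated (
    id STRING,
    ts_micros BIGINT,
    event_date DATE GENERATED ALWAYS AS (CAST(CAST(ts_micros / 1e6 AS TIMESTAMP) AS DATE))
);

-- event_date is omitted from the column list; Delta fills it in
INSERT INTO demo_generated (id, ts_micros) VALUES ('a', 1593879303631846);

SELECT * FROM demo_generated;
```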

Enabling Schema Evolution with SET

```sql
SET spark.databricks.delta.schema.autoMerge.enabled=true;

MERGE INTO purchase_dates a
USING purchases b
ON a.id = b.id
WHEN NOT MATCHED THEN
    INSERT *;

SELECT * FROM purchase_dates;
```

Add a Table Constraint

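Delta Lake supports two kinds of table constraints:

- `NOT NULL`: prevents NULL values in a column
- `CHECK`: requires a Boolean expression to be true for every row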

```sql
ALTER TABLE purchase_dates ADD CONSTRAINT valid_date CHECK (date > '2020-01-01');
```

```sql
DESCRIBE EXTENDED purchase_dates;
```
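Constraints are checked on every write, and a violating write fails as a whole. The sketch below uses a hypothetical `demo_constraints` table to show both kinds. Run the statements one at a time, because the last two are expected to raise errors:

```sql
-- Hypothetical demo table: id is declared NOT NULL in the schema
CREATE OR REPLACE TABLE demo_constraints (id STRING NOT NULL, price DOUBLE);
ALTER TABLE demo_constraints ADD CONSTRAINT positive_price CHECK (price > 0);

INSERT INTO demo_constraints VALUES ('a', 10.0);  -- succeeds

-- Each of the next two statements is rejected and writes nothing:
INSERT INTO demo_constraints VALUES ('b', -5.0);  -- violates positive_price
INSERT INTO demo_constraints VALUES (NULL, 3.0);  -- violates NOT NULL on id
```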

# Cleaning Data

```sql
CREATE TABLE IF NOT EXISTS users_silver (
    user_id STRING,
    user_first_touch_timestamp BIGINT,
    email STRING,
    updated TIMESTAMP,
    first_touch TIMESTAMP,
    first_touch_date DATE,
    first_touch_time STRING,
    email_domain STRING
);

CREATE OR REPLACE TABLE users_silver_working AS
    SELECT * FROM users_bronze;
```



Data Profile

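In the notebook results pane, click **+** and select **Visualization** to chart the result, or **Data Profile** to generate summary statistics for each column.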

Missing Data

```sql
SELECT * FROM users_silver_working WHERE user_id IS NULL;
```

```sql
CREATE OR REPLACE TABLE users_silver_working AS
    SELECT * FROM users_silver_working WHERE user_id IS NOT NULL;
```

```sql
INSERT OVERWRITE users_silver_working
    SELECT DISTINCT * FROM users_silver_working;
```

Deduplicate Rows Based on Specific Columns

```sql
INSERT OVERWRITE users_silver_working
    SELECT user_id, user_first_touch_timestamp, max(email) AS email, max(updated) AS updated
    FROM users_silver_working
    WHERE user_id IS NOT NULL
    GROUP BY user_id, user_first_touch_timestamp;
```
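`max()` works as a deduplication aggregate here because it ignores NULLs. When two records for the same key differ only in whether `email` is missing, the non-NULL value survives. This is a minimal sketch on hypothetical inline rows:

```sql
-- Hypothetical duplicate records for u1: one with a NULL email, one without
CREATE OR REPLACE TEMP VIEW demo_dedup AS
SELECT user_id, max(email) AS email
FROM VALUES
    ('u1', NULL),
    ('u1', 'a@example.com'),
    ('u2', 'b@example.com')
AS t(user_id, email)
GROUP BY user_id;

-- Expected: u1 -> a@example.com (max ignores NULLs), u2 -> b@example.com
SELECT * FROM demo_dedup ORDER BY user_id;
```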

Validate Dataset

```sql
-- Returns true if no user_id appears more than once
SELECT max(row_count) <= 1 no_duplicate_ids FROM (
    SELECT user_id, count(*) AS row_count
    FROM users_silver_working
    GROUP BY user_id
);
```

```sql
-- Returns true if every non-NULL email maps to at most one user_id
SELECT max(user_id_count) <= 1 at_most_one_id FROM (
    SELECT email, count(user_id) AS user_id_count
    FROM users_silver_working
    WHERE email IS NOT NULL
    GROUP BY email
);
```

Date Format and Regex

```sql
INSERT INTO users_silver
    SELECT *,
        to_date(date_format(first_touch, "yyyy-M-d")) AS first_touch_date,
        date_format(first_touch, "HH:mm:ss") AS first_touch_time,
        -- group 1 is the text after "@" (group 0 would include the "@")
        regexp_extract(email, "@(.*)", 1) AS email_domain
    FROM (
        SELECT *, CAST(user_first_touch_timestamp / 1e6 AS timestamp) AS first_touch
        FROM users_silver_working
    );
```
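The group index passed to `regexp_extract` decides what you get back. Index 0 returns the whole match and 1 returns the first capture group. The sketch below runs both on a hypothetical literal email, alongside `date_format` on a literal timestamp:

```sql
SELECT
    regexp_extract('user@example.com', '@(.*)', 0) AS whole_match,    -- '@example.com'
    regexp_extract('user@example.com', '@(.*)', 1) AS email_domain,   -- 'example.com'
    date_format(TIMESTAMP '2020-07-04 16:15:03', 'HH:mm:ss') AS first_touch_time;  -- '16:15:03'
```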

# Complex Transformations

```sql
-- STRUCT<......> stands for the full event schema, elided in these notes
CREATE OR REPLACE TEMP VIEW parsed_events AS SELECT json.* FROM (
    SELECT from_json(value, 'STRUCT<......>') AS json FROM event_strings
);
```
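Rather than typing the `STRUCT<...>` schema by hand, you can infer it from a sample record with `schema_of_json` and pass the result to `from_json`. This is a sketch with a hypothetical one-row `demo_event_strings` view. The JSON sample is illustrative, not the course dataset:

```sql
-- Hypothetical sample: one JSON event string
CREATE OR REPLACE TEMP VIEW demo_event_strings AS
SELECT '{"user_id":"u1","event_name":"add_item","items":[{"item_id":"M_STAN_Q","quantity":1}]}' AS value;

-- Infer the schema from a sample string, then parse every row with it
CREATE OR REPLACE TEMP VIEW demo_parsed AS
SELECT json.* FROM (
    SELECT from_json(
        value,
        schema_of_json('{"user_id":"u1","event_name":"add_item","items":[{"item_id":"M_STAN_Q","quantity":1}]}')
    ) AS json
    FROM demo_event_strings
);

SELECT * FROM demo_parsed;
```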

Manipulate Arrays

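Spark SQL has a number of functions for manipulating array data, including:

- explode(): returns one row for each element of an array
- size(): returns the number of elements in an array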

```sql
CREATE OR REPLACE TEMP VIEW exploded_events AS
SELECT *, explode(items) AS item
FROM parsed_events;

SELECT * FROM exploded_events WHERE size(items) > 2;
```
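Both functions can be tried on a literal array, without any table. This sketch uses a hypothetical three-element array:

```sql
-- size() counts the elements of an array: returns 3
SELECT size(array('A', 'B', 'C')) AS n;

-- explode() turns one array into one row per element: returns 3 rows
SELECT explode(array('A', 'B', 'C')) AS item;
```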

Nesting Functions

collect_set(): collects the unique values of a field, including fields inside arrays

```sql
SELECT user_id, collect_set(items.item_id) AS cart_history
FROM exploded_events
GROUP BY user_id
ORDER BY user_id;
```

flatten(): combines an array of arrays into a single array

```sql
SELECT user_id, flatten(collect_set(items.item_id)) AS cart_history
FROM exploded_events
GROUP BY user_id
ORDER BY user_id;
```

array_distinct(): removes duplicate elements from an array

```sql
SELECT user_id, array_distinct(flatten(collect_set(items.item_id))) AS cart_history
FROM exploded_events
GROUP BY user_id
ORDER BY user_id;
```
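The three nested functions can be checked on small inline data. `collect_set` gathers each user's arrays, `flatten` merges them, and `array_distinct` drops the repeated IDs. This is a sketch with hypothetical rows. Element order inside the result is not guaranteed:

```sql
-- Hypothetical rows: each user has several arrays of item IDs
CREATE OR REPLACE TEMP VIEW demo_carts AS
SELECT user_id, array_distinct(flatten(collect_set(item_ids))) AS cart_history
FROM VALUES
    ('u1', array('A', 'B')),
    ('u1', array('B', 'C')),
    ('u2', array('A'))
AS t(user_id, item_ids)
GROUP BY user_id;

-- u1 gets the three distinct IDs A, B, C; u2 gets only A
SELECT * FROM demo_carts ORDER BY user_id;
```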

Join Tables

```sql
CREATE OR REPLACE TEMP VIEW item_purchases AS
    SELECT * FROM (
        SELECT *, explode(items) AS item FROM sales
    ) a
    INNER JOIN item_lookup b
    ON a.item.item_id = b.item_id;
```

Pivot Tables

```sql
SELECT * FROM item_purchases PIVOT (
    sum(item.quantity) FOR item_id IN (
        'P_FOAM_K',
        'M_STAN_Q',
        ''
    )
)
```
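PIVOT turns each listed `item_id` value into its own column. It implicitly groups by every column not used in the pivot. This sketch runs it on a hypothetical `demo_order_items` view, so the reshaping is visible row by row:

```sql
-- Hypothetical order lines: one row per (order, item)
CREATE OR REPLACE TEMP VIEW demo_order_items AS
SELECT * FROM VALUES
    ('o1', 'P_FOAM_K', 2),
    ('o1', 'M_STAN_Q', 1),
    ('o2', 'P_FOAM_K', 3)
AS t(order_id, item_id, quantity);

-- One output row per order_id, one column per listed item_id
CREATE OR REPLACE TEMP VIEW demo_pivot AS
SELECT * FROM demo_order_items PIVOT (
    sum(quantity) FOR item_id IN ('P_FOAM_K', 'M_STAN_Q')
);

-- Expected: o1 -> P_FOAM_K 2, M_STAN_Q 1; o2 -> P_FOAM_K 3, M_STAN_Q NULL
SELECT * FROM demo_pivot ORDER BY order_id;
```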

# SQL UDFs

SQL UDFs and Control Flow

User-Defined Functions

```sql
CREATE OR REPLACE FUNCTION sale_announcement(item_name STRING, item_price INT)
RETURNS STRING
RETURN concat("The ", item_name, " is on sale for $", round(item_price * 0.8, 0));

SELECT *, sale_announcement(name, price) AS message FROM item_lookup;
```

Scoping and Permissions of SQL UDFs

- SQL UDFs persist between execution environments (notebooks, DBSQL queries, and jobs).
- They exist as objects in the metastore and are governed by the same Table ACLs as databases, tables, or views.
- To create a SQL UDF, you need:
    - USE CATALOG on the catalog
    - USE SCHEMA and CREATE FUNCTION on the schema
- To use a SQL UDF, you need:
    - USE CATALOG on the catalog
    - USE SCHEMA on the schema
    - EXECUTE on the function


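DESCRIBE FUNCTION EXTENDED shows a function's metadata, including its parameters, return type, and body:

```sql
DESCRIBE FUNCTION EXTENDED sale_announcement;
```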

Simple Control Flow Functions

```sql
CREATE OR REPLACE FUNCTION item_preference(name STRING, price INT)
RETURNS STRING
RETURN CASE
    WHEN name = "Standard Queen Mattress" THEN "This is my default mattress"
    WHEN name = "Premium Queen Mattress" THEN "This is my favorite mattress"
    WHEN price > 100 THEN concat("I'd wait until the ", name, " is on sale for $", round(price * 0.8, 0))
    ELSE concat("I don't need a ", name)
END;
```
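The CASE branches are evaluated top to bottom, and the first true condition wins. This sketch calls `item_preference` on inline rows chosen to hit the "favorite" branch and the ELSE branch. "Bamboo Sheets" is a hypothetical item name:

```sql
-- Hypothetical rows exercising two branches of item_preference
SELECT name, price, item_preference(name, price) AS preference
FROM VALUES
    ('Premium Queen Mattress', 1595),
    ('Bamboo Sheets', 50)
AS t(name, price);
-- Expected preferences: "This is my favorite mattress" and "I don't need a Bamboo Sheets"
```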

# Advanced Delta Lake Features

Liquid Clustering 

```sql
CREATE OR REPLACE TABLE events_liquid CLUSTER BY (user_id) AS
    SELECT * FROM events;
```

```sql
ALTER TABLE events CLUSTER BY (user_id);
```

Choosing Clustering Keys

Triggering Liquid Clustering

```sql
OPTIMIZE events;
```
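Clustering keys can be changed or removed later without rewriting data that is already clustered. New keys apply to subsequent clustering operations such as OPTIMIZE. This is a sketch that assumes the `events` table above. Using `event_name` as a second key is an illustrative assumption:

```sql
-- Change the clustering keys (affects data clustered from now on)
ALTER TABLE events CLUSTER BY (user_id, event_name);

-- Inspect the table; the clusteringColumns field lists the current keys
DESCRIBE DETAIL events;

-- Turn liquid clustering off; already-clustered data is not rewritten
ALTER TABLE events CLUSTER BY NONE;
```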

Creating a Delta Table with History

```sql
CREATE TABLE students (id INT, name STRING, value DOUBLE);

INSERT INTO students VALUES (1, "Yue", 1.0);
INSERT INTO students VALUES (2, "Omar", 2.0);
INSERT INTO students VALUES (3, "YBue", 2.5);

INSERT INTO students VALUES (4, "Ted", 4.7), (5, "Tifeny", 5.5), (6, "Vini", 6.3);

UPDATE students SET value = value + 1 WHERE name LIKE "T%";

DELETE FROM students WHERE value > 6;

-- (.....) marks rows elided in these notes
CREATE OR REPLACE TEMP VIEW updates(id, name, value, type) AS VALUES (2, "Omar", 15.2, "update"), (.....), (.....);
```
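The `updates` view carries a `type` column that says what should happen to each row. A single MERGE with conditional clauses can apply updates, deletes, and inserts in one transaction, and that transaction shows up as one entry in the table history. This sketch assumes `students` has the columns `id`, `name`, `value`, and that the elided rows use the types "update", "delete", and "insert". Columns are listed explicitly so the extra `type` column is never written to `students`:

```sql
MERGE INTO students s
USING updates u
ON s.id = u.id
WHEN MATCHED AND u.type = "update" THEN
    UPDATE SET name = u.name, value = u.value
WHEN MATCHED AND u.type = "delete" THEN
    DELETE
WHEN NOT MATCHED AND u.type = "insert" THEN
    INSERT (id, name, value) VALUES (u.id, u.name, u.value);
```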


```sql
DESCRIBE HISTORY students;
```



Deletion Vectors
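With deletion vectors enabled, DELETE, UPDATE, and MERGE mark affected rows as removed instead of rewriting whole Parquet files. The files are rewritten later, for example by OPTIMIZE. This is a sketch that assumes the `students` table above:

```sql
-- Enable deletion vectors on an existing Delta table
ALTER TABLE students SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

-- Physically rewrite files so soft-deleted rows are removed from storage
REORG TABLE students APPLY (PURGE);
```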

Delta Lake Time Travel

```sql
SELECT * FROM students VERSION AS OF 3;
```

```sql
RESTORE TABLE students TO VERSION AS OF 9;
```
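Time travel also accepts a timestamp instead of a version number. It only works while the old data files still exist. `VACUUM` deletes unreferenced files older than the retention threshold (7 days by default), after which versions that depended on them can no longer be queried. This is a sketch in which the timestamp literal is a hypothetical value:

```sql
-- Query the table as of a point in time (must fall within the table's history)
SELECT * FROM students TIMESTAMP AS OF '2024-01-01 00:00:00';

-- Delete data files no longer referenced by versions inside the 7-day window
VACUUM students RETAIN 168 HOURS;
```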