# Data Ingestion with Lakeflow Connect

This module introduces **Lakeflow Connect** as a scalable, production-ready framework for ingesting data into Databricks from diverse source systems.

You will start by understanding the two connector types:
- **Standard Connectors** (object storage, files, event-based ingestion)
- **Managed Connectors** (enterprise SaaS and database sources)

Next, you will learn common ingestion patterns:
- Batch ingestion  
- Incremental batch ingestion  
- Streaming ingestion  

The module reinforces why **Delta tables** and the **Medallion Architecture (Bronze → Silver → Gold)** are foundational for reliable ingestion pipelines.

You will then gain hands-on exposure to ingesting data from cloud object storage using **Lakeflow Connect Standard Connectors**, covering:
- `CREATE TABLE AS SELECT (CTAS)`
- `COPY INTO`
- **Auto Loader**

Each approach is discussed with its benefits, trade-offs, and when to use it in real production scenarios.

You will also learn how to:
- Append ingestion metadata columns at the **Bronze** layer
- Use and manage the **rescued data column** to safely handle schema drift and malformed records

The module further covers:
- Ingesting and flattening **semi-structured JSON data**
- Enterprise-grade ingestion using **Lakeflow Connect Managed Connectors**

Finally, you will explore alternative ingestion strategies such as:
- `MERGE INTO`–based ingestion patterns
- Leveraging datasets from the **Databricks Marketplace**

By the end of this module, you will have a solid foundation to design and implement robust, scalable ingestion pipelines in modern data engineering architectures.

---

**Environment Setup:**
- **Catalog:** `lakeflow_demo`
- **Schema:** `lakeflow_schema`
- **Volume:** `raw` (located at `/Volumes/lakeflow_demo/lakeflow_schema/raw/`)

**Note:** Make sure to run the `00_Setup_Environment.ipynb` notebook first to create the catalog, schema, volume, and sample data files.


## Unity Catalog Overview

Unity Catalog = unified governance, not a single namespace.

- "Unity" does NOT mean "one catalog."
- It means one control plane.

It unifies:

- Security (GRANT/REVOKE once, enforced everywhere)
- Identity (users, groups, service principals)
- Lineage
- Auditing
- Metastore

All of that lives in one metastore per region, governed centrally by Databricks.

That's the "Unity."

### Unity Catalog Hierarchy

- **Cloud Storage** (S3 / ADLS / GCS)
  - ↓
  - **Unity Catalog Metastore** (one per region)
    - ↓
    - **Catalog** (e.g., `lakeflow_demo`)
      - ↓
      - **Schema** (e.g., `lakeflow_schema`)
        - ↓
        - **Table / View / Volume / Function**


## A. Setup

Run the following cell to configure your working environment for this notebook.


In [0]:
%sql
-- Set default catalog and schema
USE CATALOG lakeflow_demo;
USE SCHEMA lakeflow_schema;

-- View current catalog and schema
SELECT 
  current_catalog(), 
  current_schema();


# 1. Data Ingestion with CREATE TABLE AS and COPY INTO

In this demonstration, we'll explore ingesting data from cloud storage into Delta tables with the `CREATE TABLE AS` (CTAS) and `COPY INTO` statements.

### Learning Objectives

By the end of this lesson, you should be able to:

- Use the CTAS statement with `read_files()` to ingest Parquet files into a Delta table.
- Use `COPY INTO` to incrementally load Parquet files from cloud object storage into a Delta table.


## B. Explore the Data Source Files

1. We'll create a table containing historical user data from Parquet files stored in the volume  
   `'/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/'` within Unity Catalog.

   Use the `LIST` statement to view the files in this volume. Run the cell and review the results.

   Notice the files in the **name** column begin with **part-**. This shows that this volume contains multiple **Parquet** files.


In [0]:
%sql
LIST '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/';


2. Query the Parquet files by path in the `/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/` directory to quickly preview the raw data in tabular format.


In [0]:
%sql
SELECT * 
FROM parquet.`/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/`
LIMIT 10;


## C. Batch Data Ingestion with CTAS and read_files()

The `CREATE TABLE AS` (CTAS) statement is used to create and populate tables using the results of a query. This allows you to create a table and load it with data in a single step, streamlining data ingestion workflows.

#### Automatic Schema Inference for Parquet Files

Apache Parquet is a columnar storage format optimized for analytical queries. It includes embedded schema metadata (e.g., column names and data types), which enables automatic schema inference when creating tables from Parquet files. This eliminates the need for manual schema definitions and simplifies the process of converting Parquet files into Delta format by leveraging the built-in schema metadata.


### C1. CTAS with the `read_files()` Function

The code in the next cell creates a table using CTAS with the `read_files()` function.

The `read_files()` table-valued function (TVF) enables reading a variety of file formats and provides additional options for data ingestion.

1. Use the `read_files()` function to query the same Parquet files located in `/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/`. The `LIMIT` clause limits the number of rows returned during exploration and development.

   - The first parameter in `read_files` is the path to the data.

   - The `format => "parquet"` option specifies the file format.

   The `read_files` function automatically detects the file format and infers a unified schema across all files. It also supports explicit schema definitions and `schemaHints`. For more details on schema inference capabilities, refer to the [Schema inference](https://docs.databricks.com/aws/en/sql/language-manual/functions/read_files#schema-inference) documentation.

**NOTE:** A **_rescued_data** column is automatically included by default to capture any data that doesn't match the inferred schema.


In [0]:
%sql
SELECT * 
FROM read_files(
  '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/',
  format => 'parquet'
)
LIMIT 10;


2. Next, let's use `read_files()` with a CTAS statement to create the table **historical_users_bronze_ctas_rf**, then display the table.

   Notice that the Parquet files were ingested to create a table (Delta by default).


In [0]:
%sql
-- Drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS historical_users_bronze_ctas_rf;

-- Create the Delta table
CREATE TABLE historical_users_bronze_ctas_rf 
AS
SELECT * 
FROM read_files(
        '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/',
        format => 'parquet'
      );

-- Preview the Delta table
SELECT * 
FROM historical_users_bronze_ctas_rf 
LIMIT 10;


3. Run the `DESCRIBE TABLE EXTENDED` statement to view column names, data types, and additional table metadata.  

   Review the results and notice the following:
   
   - The table was created in your schema within the catalog **lakeflow_demo**.

   - The *Type* row indicates that the table is *MANAGED*.
   
   - The *Provider* row specifies that the table is a Delta table.


In [0]:
%sql
DESCRIBE TABLE EXTENDED historical_users_bronze_ctas_rf;


## D. Incremental Data Ingestion with `COPY INTO`

`COPY INTO` is a Databricks SQL command that loads data from a file location into a Delta table. The operation is retriable and idempotent: files in the source location that have already been loaded are skipped. This makes the command useful when you need to load data into an existing Delta table.

[COPY INTO](https://docs.databricks.com/aws/en/sql/language-manual/delta-copy-into)

### D1. Ingesting Parquet Files with COPY INTO

Using the same set of Parquet files as before, let's use `COPY INTO` to create our Bronze table again.

We will look at two examples:

1. Example 1: Common Schema Mismatch Error

2. Example 2: Preemptively Handling Schema Evolution


#### Example 1: Common Schema Mismatch Error

1. The cell below creates an empty table named **historical_users_bronze_ci** with a defined schema for only the **user_id** and **user_first_touch_timestamp** columns.

   However, the Parquet files in `'/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/'` contain three columns: 
    - **user_id**
    - **user_first_touch_timestamp** 
    - **email**

   Run the cell below and review the error. You should see the `[COPY_INTO_SCHEMA_MISMATCH_WITH_TARGET_TABLE]` error. This error occurs because there is a schema mismatch: the Parquet files contain 3 columns, but the target table **historical_users_bronze_ci** only has 2 columns.

   How can you handle this error?


In [0]:
%sql
--------------------------------------------
-- This cell returns an error
--------------------------------------------

-- Drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS historical_users_bronze_ci;

-- Create an empty table with the specified table schema (only 2 out of the 3 columns)
CREATE TABLE historical_users_bronze_ci (
  user_id STRING,
  user_first_touch_timestamp BIGINT
);

-- Use COPY INTO to populate the Delta table (no mergeSchema option, so the load fails)
COPY INTO historical_users_bronze_ci
  FROM '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/'
  FILEFORMAT = parquet;


2. We can fix this error by adding `COPY_OPTIONS` with the `mergeSchema = 'true'` option. When set to `true`, this option allows the schema to evolve based on the incoming data.

   Run the next cell with the `COPY_OPTIONS` option added. You should notice that the Parquet files were successfully ingested into the table.


In [0]:
%sql
COPY INTO historical_users_bronze_ci
  FROM '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/'
  FILEFORMAT = parquet
  COPY_OPTIONS ('mergeSchema' = 'true');     -- Merge the schema of each file
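
To make the effect of `mergeSchema` more concrete, the following is a minimal Python sketch of the idea only. It is not how Databricks implements schema evolution: the `merge_schema` function and the dict-based schema representation are hypothetical, and the `STRING` type for **email** is an assumption.

```python
def merge_schema(target: dict[str, str], incoming: dict[str, str]) -> dict[str, str]:
    """Return the target schema with columns found only in the incoming files appended."""
    merged = dict(target)  # keep existing columns and their order
    for column, data_type in incoming.items():
        if column not in merged:
            merged[column] = data_type  # new column discovered in the source files
    return merged


# Target table defined with 2 columns; the source files carry 3 (types are assumed).
target_table = {"user_id": "STRING", "user_first_touch_timestamp": "BIGINT"}
parquet_files = {
    "user_id": "STRING",
    "user_first_touch_timestamp": "BIGINT",
    "email": "STRING",
}

evolved = merge_schema(target_table, parquet_files)
print(list(evolved))
```

Without the merge step, the two-column target cannot accept the three-column source, which is the situation the mismatch error reports.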


3. Preview the data in the **historical_users_bronze_ci** table.


In [0]:
%sql
SELECT *
FROM historical_users_bronze_ci
LIMIT 10;


### D2. Idempotency (Incremental Ingestion)

`COPY INTO` tracks the files it has previously ingested. If the command is run again, no additional data is ingested because the files in the source directory haven't changed.

1. Let's run the `COPY INTO` command again and check if any data is re-ingested into the table.

   Run the cell and view the results. Notice that the values for **num_affected_rows**, **num_inserted_rows**, and **num_skipped_corrupt_files** are all 0 because the data has already been ingested into the Delta table.

**NOTE**: If new files are added to the cloud storage location, `COPY INTO` will only ingest those files. `COPY INTO` is a great option if you want to run a job for incremental batch ingestion from a cloud storage location without re-reading files that have already been loaded.


In [0]:
%sql
COPY INTO historical_users_bronze_ci
  FROM '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/'
  FILEFORMAT = parquet
  COPY_OPTIONS ('mergeSchema' = 'true');
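
The idempotent behavior described above can be illustrated with a small Python sketch. This is a toy model under stated assumptions, not the actual `COPY INTO` implementation: the `IdempotentLoader` class, its in-memory file tracking, and the file names are all hypothetical.

```python
class IdempotentLoader:
    """Toy loader that remembers which source files it has already ingested."""

    def __init__(self) -> None:
        self.loaded_files: set[str] = set()
        self.rows: list[dict] = []

    def copy_into(self, source_files: dict[str, list[dict]]) -> int:
        """Ingest only files not seen before and return the number of inserted rows."""
        inserted = 0
        for file_name, file_rows in sorted(source_files.items()):
            if file_name in self.loaded_files:
                continue  # already ingested on a previous run, so skip it
            self.rows.extend(file_rows)
            self.loaded_files.add(file_name)
            inserted += len(file_rows)
        return inserted


source = {"part-0000.parquet": [{"user_id": "u1"}, {"user_id": "u2"}]}
loader = IdempotentLoader()
first_run = loader.copy_into(source)    # loads both rows
second_run = loader.copy_into(source)   # nothing new, so nothing is loaded
source["part-0001.parquet"] = [{"user_id": "u3"}]
third_run = loader.copy_into(source)    # only the new file is loaded
print(first_run, second_run, third_run)
```

Re-running the load is safe because the tracking state, not the caller, decides which files still need to be read.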


# 2. Adding Metadata Columns During Ingestion

In this demonstration, we'll explore how to add metadata columns during data ingestion. 

This process includes converting Unix timestamps to a standard `DATE` format and adding file metadata and row ingestion times.

### Learning Objectives

By the end of this lesson, you should be able to:

- Modify columns during data ingestion from cloud storage to your bronze table.
- Add the current ingestion timestamp to the bronze table.
- Use the `_metadata` column to extract file-level metadata (e.g., file name, modification time) during ingestion.


## B. Explore the Data Source Files

1. We'll create a table containing historical user data from Parquet files stored in the volume  
   `'/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/'` within Unity Catalog.

   Use the `LIST` statement to view the files in this volume. Run the cell and review the results.

   View the values in the **name** column that begin with **part-**. This shows that this volume contains multiple **Parquet** files.


In [0]:
%sql
LIST '/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/';


## C. Adding Metadata Columns to the Bronze Table During Ingestion

When ingesting data into the Bronze layer, you can apply transformations during ingestion and also retrieve metadata about the input files using the **_metadata** column.

The **_metadata** column is a hidden column available for all supported file formats. To include it in the returned data, you must explicitly select it in the read query that specifies the source.

### Ingestion Requirements

During data ingestion, we'll perform the following actions:

1. Convert the parquet Unix timestamp to a `DATE` column.

2. Include the **input file name** to indicate the raw data source.

3. Include the **last modification** timestamp of the input file.

4. Add the **file ingestion time** to the Bronze table.

**Note:** The `_metadata` column is available across all supported input file formats.


1. Run the cell below to display the parquet data in the `"/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/"` volume and view the results.

    Notice that the **user_first_touch_timestamp** column has a Unix timestamp.


In [0]:
%sql
SELECT *
FROM read_files(
  "/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/",
  format => 'parquet')
LIMIT 10;


### C1. Convert the Unix Time on Ingestion to Bronze

The Unix timestamp column **user_first_touch_timestamp** values represent the time in microseconds since the Unix epoch (January 1, 1970).

To create a readable date column, use the [`from_unixtime()`](https://docs.databricks.com/en/sql/language-manual/functions/from_unixtime.html) function, converting the **user_first_touch_timestamp** from microseconds to seconds by dividing by 1,000,000.

1. Run the query and review the results. The query generates a new column, **first_touch_date**, by converting the Unix timestamp into a human-readable date column.

   Run the cell and view the **first_touch_date** column. Notice the **first_touch_date** column is cast to a data type of **DATE**.


In [0]:
%sql
SELECT
  *,
  cast(from_unixtime(user_first_touch_timestamp/1000000) AS DATE) AS first_touch_date
FROM read_files(
  "/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/",
  format => 'parquet')
LIMIT 10;
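
The same arithmetic can be checked outside SQL. The Python sketch below divides a microsecond timestamp by 1,000,000 and keeps only the date part. It assumes UTC, whereas the SQL function uses the session time zone, and the function name `first_touch_date` and the sample values are illustrative only.

```python
from datetime import date, datetime, timezone

MICROSECONDS_PER_SECOND = 1_000_000


def first_touch_date(unix_micros: int) -> date:
    """Convert microseconds since the Unix epoch to a calendar date in UTC."""
    seconds = unix_micros / MICROSECONDS_PER_SECOND
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()


print(first_touch_date(0))
print(first_touch_date(1_600_000_000_000_000))
```

Skipping the division would treat microseconds as seconds and produce dates far in the future, which is a quick way to spot a unit mistake.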


### C2. Adding Column Metadata on Ingestion

The following metadata can be added to the bronze table:

- `_metadata.file_modification_time`: Adds the last modification time of the input file.

- `_metadata.file_name`: Adds the input file name.

- [`current_timestamp()`](https://docs.databricks.com/aws/en/sql/language-manual/functions/current_timestamp): Returns the current timestamp (`TIMESTAMP` data type) when the query starts, useful for tracking ingestion time.

You can read more about the `_metadata` column in the [Databricks documentation](https://docs.databricks.com/en/ingestion/file-metadata-column.html).

1. Run the query below to add the following columns:

   - **file_modification_time** and **source_file**, using the **_metadata** column to capture input file details.  
   
   - **ingestion_time**, which records the exact time the data was ingested.

   Review the results. You should see the new columns **file_modification_time**, **source_file**, and **ingestion_time** added to the output.


In [0]:
%sql
SELECT
  *,
  cast(from_unixtime(user_first_touch_timestamp / 1000000) AS DATE) AS first_touch_date,
  _metadata.file_modification_time AS file_modification_time,      -- Last data source file modification time
  _metadata.file_name AS source_file,                              -- Ingest data source file name
  current_timestamp() as ingestion_time                            -- Ingestion timestamp
FROM read_files(
  "/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/",
  format => 'parquet')
LIMIT 10;
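
As a rough analogy for what these three columns capture, the Python sketch below attaches a file name, a file modification time, and a single ingestion timestamp to each row read from a file. It uses only the local file system and the standard library; the helper `with_file_metadata` and the placeholder file are hypothetical and unrelated to the Databricks `_metadata` implementation.

```python
import os
import tempfile
from datetime import datetime, timezone


def with_file_metadata(path: str, rows: list[dict]) -> list[dict]:
    """Attach file-level metadata and one shared ingestion timestamp to every row."""
    stat = os.stat(path)
    modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
    ingested = datetime.now(tz=timezone.utc)  # captured once, like a query start time
    return [
        {
            **row,
            "file_modification_time": modified,
            "source_file": os.path.basename(path),
            "ingestion_time": ingested,
        }
        for row in rows
    ]


# Hypothetical stand-in for one source file in the volume.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "part-00000.parquet")
    with open(sample_path, "wb") as handle:
        handle.write(b"placeholder bytes")
    enriched = with_file_metadata(sample_path, [{"user_id": "u1"}, {"user_id": "u2"}])

print(enriched[0]["source_file"])
```

Every row from the same load shares one ingestion timestamp, which makes it straightforward to identify or roll back a single batch later.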


### C3. Creating the Final Bronze Table

1. Put it all together with the `CTAS` statement to create the Delta table.

    Run the cell to create and view the new table **historical_users_bronze**.
    
    Confirm that the new columns **first_touch_date**, **file_modification_time**, **source_file** and **ingestion_time** were created successfully in the bronze table.


In [0]:
%sql
-- Drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS historical_users_bronze;

-- Create the Delta table with the added metadata columns
CREATE TABLE historical_users_bronze AS
SELECT
  *,
  cast(from_unixtime(user_first_touch_timestamp / 1000000) AS DATE) AS first_touch_date,
  _metadata.file_modification_time AS file_modification_time,      -- Last data source file modification time
  _metadata.file_name AS source_file,                              -- Ingest data source file name
  current_timestamp() as ingestion_time                            -- Ingestion timestamp
FROM read_files(
  "/Volumes/lakeflow_demo/lakeflow_schema/raw/users-historical/",
  format => 'parquet');


-- View the final bronze table
SELECT * 
FROM historical_users_bronze
LIMIT 10;


# 3. Handling CSV Ingestion with the Rescued Data Column

In this demonstration, we will focus on ingesting CSV files into Delta Lake using the `CTAS` (`CREATE TABLE AS SELECT`) pattern with the `read_files()` function and exploring the rescued data column. 

### Learning Objectives

By the end of this lesson, you will be able to:

- Ingest CSV files as Delta tables using the `CREATE TABLE AS SELECT` (CTAS) statement with the `read_files()` function.
- Define and apply an explicit schema with `read_files()` to ensure consistent and reliable data ingestion.
- Handle and inspect rescued data that does not conform to the defined schema.


## B. Inspect the Dataset

1. Let's take a look at our CSV file with malformed data. 

    The query should use `text.<path>` to return the headers and rows from the CSV file. 

    Run the cell and view row 4. Notice that the value for the price contains a `$`.


In [0]:
%sql
SELECT *
FROM text.`/Volumes/lakeflow_demo/lakeflow_schema/raw/products-csv/lab_malformed_data.csv`;


### B2. Ingesting and Rescuing Malformed Data

Begin developing your query to ingest the CSV file in the specified path and view malformed records using the **_rescued_data** column.

#### Requirements
Your final SQL query should ingest the CSV file using CTAS and `read_files`. **In the cell below, do not create a table yet. Simply start developing your query to ingest and create the table**:

1. Select all columns from the raw CSV file.

2. Use the `read_files()` function with appropriate options to read the CSV file. 
   - **HINT:** Note that the delimiter is a comma (`,`) not a pipe (`|`).

3. Explicitly define the schema for ingestion. The schema is defined as follows:  
   - `item_id` (STRING)  
   - `name` (STRING)  
   - `price` (DOUBLE)

4. Use the correct option to include the rescued data column and name it **_rescued_data** to capture malformed rows.

   - **HINT**: If you define a schema you must [use the rescuedDataColumn option](https://docs.databricks.com/aws/en/sql/language-manual/functions/read_files#csv-options) to add the **_rescued_data** column.


In [0]:
%sql
SELECT * 
FROM read_files(
        '/Volumes/lakeflow_demo/lakeflow_schema/raw/products-csv/lab_malformed_data.csv',
        format => "csv",
        sep => ",",
        header => true,
        schema => '''
              item_id STRING, 
              name STRING, 
              price DOUBLE
        ''',
        rescueddatacolumn => "_rescued_data"
      );
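
To show what "rescuing" a value means, here is a simplified Python sketch: values that cannot be cast to the declared type become `None` in the typed column, and the original text is kept in a JSON string. The sample rows and the `read_with_rescue` helper are hypothetical, and the real **_rescued_data** column also records the source file path, which this sketch omits.

```python
import csv
import io
import json

SCHEMA = {"item_id": str, "name": str, "price": float}


def read_with_rescue(csv_text: str) -> list[dict]:
    """Parse CSV rows against SCHEMA, moving values that fail to cast into _rescued_data."""
    parsed_rows = []
    for raw in csv.DictReader(io.StringIO(csv_text)):
        row, rescued = {}, {}
        for column, caster in SCHEMA.items():
            value = raw.get(column)
            try:
                row[column] = None if value is None else caster(value)
            except ValueError:
                row[column] = None        # the typed column becomes NULL
                rescued[column] = value   # the original text is preserved
        row["_rescued_data"] = json.dumps(rescued) if rescued else None
        parsed_rows.append(row)
    return parsed_rows


# Hypothetical sample rows; the real file contents may differ.
sample = "item_id,name,price\nM_1,Mattress,1045.0\nP_1,Pillow,$59.0\n"
rows = read_with_rescue(sample)
print(rows[1])
```

The malformed price is not lost: it can be cleaned later (for example, by stripping the `$`) and written back to the typed column.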


### B3. Add Additional Metadata Columns During Ingestion

Next, you can create the final bronze table named **products_bronze** that contains the additional metadata columns. Use the query you created above as the starting point.

### Final Table Requirements

Incorporate the SQL query you created in the previous section and complete the following:

1. Use a CTAS statement to create the final bronze Delta table named **products_bronze**. 

2. Ingest the same CSV file `/Volumes/lakeflow_demo/lakeflow_schema/raw/products-csv/lab_malformed_data.csv`

3. Use the same defined schema:  
   - `item_id` (STRING)  
   - `name` (STRING)  
   - `price` (DOUBLE)

4. Use the `_metadata` column to create two new columns named **file_modification_time** and **source_file**  within your SELECT statement.
   - **HINT:** [_metadata](https://docs.databricks.com/en/ingestion/file-metadata-column.html)

5. Add a column named **ingestion_time** that provides a timestamp for ingestion. 
   - **HINT:** Use the [current_timestamp()](https://docs.databricks.com/aws/en/sql/language-manual/functions/current_timestamp) to record the current timestamp at the start of the query evaluation.


In [0]:
%sql
-- Drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS products_bronze;

-- Create the Delta table
CREATE TABLE products_bronze 
AS
SELECT
  *,
  _metadata.file_modification_time AS file_modification_time,
  _metadata.file_name AS source_file, 
  current_timestamp() as ingestion_time
FROM read_files(
        '/Volumes/lakeflow_demo/lakeflow_schema/raw/products-csv/lab_malformed_data.csv',
        format => "csv",
        sep => ",",
        header => true,
        schema => 'item_id STRING, name STRING, price DOUBLE', 
        rescueddatacolumn => "_rescued_data"
      );

-- View the final table
SELECT * 
FROM products_bronze;


# 4. Ingesting JSON Files with Databricks

In this demonstration, we'll explore how to ingest JSON files and perform foundational JSON-specific transformations during ingestion, including decoding encoded fields and flattening nested JSON strings. We'll be working with simulated Kafka event data.

### Learning Objectives
By the end of this lesson, you should be able to:
- Ingest raw JSON data into Unity Catalog using CTAS and `read_files()`.
- Apply multiple techniques to flatten JSON string columns with and without converting to a STRUCT type.
- Understand the difference between `explode()` and `explode_outer()`.


## B. Overview of CTAS with `read_files()` for Ingestion of JSON files

### B1. Inspect JSON files

1. Run the next cell to verify that there are JSON files located at `/Volumes/lakeflow_demo/lakeflow_schema/raw/events-kafka/`.


In [0]:
%sql
LIST '/Volumes/lakeflow_demo/lakeflow_schema/raw/events-kafka/';


2. Run the cell below to view the raw JSON data in the output. Note the following:

   - Each row contains JSON with 6 key/value pairs.

   - The **key** and **value** fields are encoded in base64. Base64 is an encoding scheme that converts binary data into a readable ASCII string.


In [0]:
%sql
SELECT * 
FROM text.`/Volumes/lakeflow_demo/lakeflow_schema/raw/events-kafka/`
LIMIT 5;


3. Run the cell below to see how to use `read_files()` to read the JSON data. Notice the following:

   - The JSON file is cleanly read into a tabular format with 6 columns.

   - The **key** and **value** columns are base64-encoded and returned as STRING data type.
   
   - The **_rescued_data** column contains no values.


In [0]:
%sql
SELECT *
FROM read_files(
  "/Volumes/lakeflow_demo/lakeflow_schema/raw/events-kafka/",
  format => "json"
)
LIMIT 10;


### B2. Using CTAS and `read_files()` with JSON

Ingesting JSON files using `read_files()` is as straightforward as reading CSV files.

1. Run the cell below to store this raw data in the **kafka_events_bronze_raw** table and view the table. When inspecting the results, you'll notice that:

   - The **key** and **value** columns are of type STRING and contain data that is **base64-encoded**.

   - This means the actual content has been encoded into base64 format and stored as a string. 
   
   - They have not yet been transformed into a readable string in the first bronze table we create.

**NOTE:** Base64 encoding is commonly used when ingesting data from sources like message queues or streaming platforms, where preserving formatting and avoiding data corruption is important.


In [0]:
%sql
-- Drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS kafka_events_bronze_raw;

-- Create the Delta table
CREATE TABLE kafka_events_bronze_raw AS
SELECT *
FROM read_files(
  "/Volumes/lakeflow_demo/lakeflow_schema/raw/events-kafka/",
  format => "json"
);

-- Display the table
SELECT *
FROM kafka_events_bronze_raw
LIMIT 10;


### B3. Decoding base64 Strings for the Bronze Table

1. Let's take a look at decoding the **key** and **value** columns by inspecting their data types after applying the `unbase64()` function. The `unbase64` function returns a decoded base64 string as binary.

    - **encoded_key**: The original encoded **key** column as a base64 string.

    - **decoded_key**: A new column created by decoding **key** from a base64 string to BINARY.

    - **encoded_value**: The original encoded **value** column as a base64 string.

    - **decoded_value**: A new column created by decoding **value** from a base64 string to BINARY.

    Run the cell and view the results. Notice that the **decoded_key** and **decoded_value** columns are now BINARY.


In [0]:
%sql
SELECT
  key AS encoded_key,
  unbase64(key) AS decoded_key,
  value AS encoded_value,
  unbase64(value) AS decoded_value
FROM kafka_events_bronze_raw
LIMIT 5;


2. Run the next cell to convert the BINARY columns to STRING columns using the `CAST` function. Notice the following in the results:

    - The **decoded_key** and **decoded_value** columns are now of type STRING and readable.

    - The **decoded_value** column is a JSON-formatted string.


In [0]:
%sql
SELECT
  key AS encoded_key,
  cast(unbase64(key) AS STRING) AS decoded_key,
  value AS encoded_value,
  cast(unbase64(value) AS STRING) AS decoded_value
FROM kafka_events_bronze_raw
LIMIT 5;
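
The two-step decode (base64 text to binary, then binary to string) works the same way in plain Python. The sketch below uses the standard `base64` module; the payload is a hypothetical stand-in for a Kafka message value, not data from the course files.

```python
import base64

# Hypothetical payload standing in for one Kafka message value.
original = '{"device": "macOS", "user_id": "UA000000107384208"}'
encoded_value = base64.b64encode(original.encode("utf-8")).decode("ascii")

decoded_bytes = base64.b64decode(encoded_value)   # BINARY equivalent: raw bytes
decoded_value = decoded_bytes.decode("utf-8")     # STRING equivalent: readable text

print(type(decoded_bytes).__name__, decoded_value)
```

The intermediate bytes are why the SQL version needs an explicit cast to STRING: decoding base64 alone does not say which text encoding the bytes use.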


3. Now, let's put it all together to create another bronze-level table named **kafka_events_bronze_decoded**. This table will store the STRING values for the **key** and **value** columns from the original **kafka_events_bronze_raw** table.


In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_decoded AS
SELECT
  cast(unbase64(key) AS STRING) AS decoded_key,
  offset,
  partition,
  timestamp,
  topic,
  cast(unbase64(value) AS STRING) AS decoded_value
FROM kafka_events_bronze_raw;

-- View the new table
SELECT *
FROM kafka_events_bronze_decoded
LIMIT 5;


## C. Working with JSON Formatted Strings in a Table

### C1. Flattening JSON String Columns

Next, we will explore how to extract a field from a column containing a JSON-formatted string. 

**BENEFITS**
- **Simple** - Easy to implement and store JSON as plain text.
- **Flexible** - Can hold any JSON structure without schema constraints.

**CONSIDERATIONS**
- **Performance** - STRING columns are slower when querying and processing complex data.
- **No Schema** - The lack of a defined schema for STRING columns can lead to data integrity issues.
- **Complex to Query** - Requires additional code to parse and retrieve data, which can be complex.

#### C1.1 Query JSON strings

You can extract a column from fields containing JSON strings using the syntax: `<column-name>:<extraction-path>`, where `<column-name>` is the string column name and `<extraction-path>` is the path to the field to extract. The returned results are strings. You can also do this with nested fields by using either `.` or `[]`.

This utilizes Spark SQL's built-in functionality to interact directly with nested data stored as JSON strings.

[Query JSON strings](https://docs.databricks.com/aws/en/semi-structured/json)

1. For example, let's extract the following values from the JSON-formatted string:
    - `decoded_value:device`
    - `decoded_value:traffic_source`
    - `decoded_value:geo`
    - `decoded_value:items`

    Run the cell and view the results. Notice that we have successfully extracted the values from the JSON formatted string.

    - **device** is a STRING

    - **traffic_source** is a STRING

    - **geo** is a STRING containing another JSON formatted string
    
    - **items** is a STRING containing an array of JSON formatted strings


In [0]:
%sql
SELECT 
  decoded_value,
  decoded_value:device,
  decoded_value:traffic_source,
  decoded_value:geo,       -- Contains another JSON formatted string
  decoded_value:items      -- Contains a nested-array of JSON formatted strings
FROM kafka_events_bronze_decoded
LIMIT 5;
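
The behavior of path extraction on a JSON string (results come back as strings, and nested objects come back as JSON text) can be approximated in Python. This is a sketch of the concept only: the `extract` helper supports just dot-separated keys, and the payload is a hypothetical event shaped like the fields referenced above.

```python
import json


def extract(json_string: str, path: str):
    """Return the value at a dot-separated path as a string, or None if it is absent."""
    current = json.loads(json_string)
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    if current is None or isinstance(current, str):
        return current
    return json.dumps(current)  # nested objects and arrays come back as JSON text


# Hypothetical event payload shaped like the fields referenced above.
decoded_value = json.dumps(
    {
        "device": "macOS",
        "geo": {"city": "Houston", "state": "TX"},
        "items": [{"item_id": "M_STAN_K", "quantity": 1}],
    }
)

print(extract(decoded_value, "device"))
print(extract(decoded_value, "geo.city"))
print(extract(decoded_value, "geo"))
```

Note that the string is re-parsed on every call, which hints at why querying JSON stored as plain strings tends to be slower than querying typed columns.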


2. We can then parse out the required values from the JSON-formatted string and create another bronze table that flattens the column for downstream processing.


In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_string_flattened AS
SELECT
  decoded_key,
  offset,
  partition,
  timestamp,
  topic,
  decoded_value:device,
  decoded_value:traffic_source,
  decoded_value:geo,       -- Contains another JSON formatted string
  decoded_value:items      -- Contains a nested-array of JSON formatted strings
FROM kafka_events_bronze_decoded;

-- Display the table
SELECT *
FROM kafka_events_bronze_string_flattened
LIMIT 10;


### C2. Flattening JSON-Formatted Strings via STRUCT Conversion

Similar to the previous section, we will discuss how to flatten our JSON STRING column **decoded_value** using a STRUCT column.

#### Benefits and Considerations of STRUCT Columns

**Benefits**
- **Schema Enforcement** – STRUCT columns define and enforce a schema, helping maintain data integrity.
- **Improved Performance** – STRUCTs are generally more efficient for querying and processing than plain strings.

**Considerations**
- **Schema Enforcement** – Because the schema is enforced, issues can arise if the JSON structure changes over time.
- **Reduced Flexibility** – The data must consistently match the defined schema, leaving less room for structural variation.

#### C2.1 Converting a JSON STRING to a STRUCT Column

To convert a JSON-formatted STRING column to a STRUCT column, you will need to derive the schema of the JSON-formatted string and then parse each row into a STRUCT type.

We can do this in two steps:
1. Get the STRUCT type of the JSON formatted string.
2. Apply the STRUCT to the JSON formatted string column.

1. To derive the schema with the [`schema_of_json()`](https://docs.databricks.com/en/sql/language-manual/functions/schema_of_json.html) function, which returns the schema inferred from a JSON-formatted string, first retrieve a sample JSON string from the table.

   Run the cell and view the results. Notice that the output displays a raw JSON string whose structure we need to describe.


In [0]:
%sql
-- First, get a sample JSON string from the decoded table to determine schema
SELECT decoded_value 
FROM kafka_events_bronze_decoded 
LIMIT 1;


2. Use `schema_of_json()` with a sample JSON string to get the schema. Then use `from_json()` to convert the JSON string column to a STRUCT.

   **Note:** For this exercise, we'll use a simplified schema based on the web events structure. Copy the output from `schema_of_json` into the `from_json()` function.


In [0]:
%sql
-- Get schema from a sample JSON string
SELECT schema_of_json('{"browser":"Chrome","page":"home","action":"view","event_timestamp":1234567890,"location":{"city":"San Francisco","country":"US"},"session_id":"SESS00000001","customer_id":"CUST00000001"}')
AS schema;
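
For intuition about how a schema can be derived from one sample, the Python sketch below walks a parsed JSON value and builds a SQL-style type string. It is a rough approximation under stated assumptions: the `infer_type` helper is hypothetical, and the real `schema_of_json()` output may differ in field ordering and in how it treats nulls or mixed-type arrays.

```python
import json


def infer_type(value) -> str:
    """Map a parsed JSON value to a SQL-style type name."""
    if isinstance(value, bool):          # check bool before int: bool is an int subclass
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, dict):
        fields = ", ".join(f"{key}: {infer_type(item)}" for key, item in value.items())
        return f"STRUCT<{fields}>"
    if isinstance(value, list):
        element = infer_type(value[0]) if value else "STRING"
        return f"ARRAY<{element}>"
    return "STRING"                      # strings and nulls fall back to STRING


sample = (
    '{"browser": "Chrome", "event_timestamp": 1234567890, '
    '"location": {"city": "San Francisco", "country": "US"}}'
)
inferred = infer_type(json.loads(sample))
print(inferred)
```

Because the result depends entirely on the sample, a field that is missing from the chosen row will also be missing from the derived schema.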


3. Use `from_json()` to parse the JSON string column into a STRUCT type and create a new table named **kafka_events_bronze_struct**.

   Run the cell and view the results. Notice that the **value** column has been transformed into a nested STRUCT that includes scalar fields and nested structs.


In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_struct AS
SELECT 
  * EXCEPT (decoded_value),
  from_json(
      decoded_value,    -- JSON formatted string column
      'STRUCT<browser: STRING, page: STRING, action: STRING, event_timestamp: BIGINT, location: STRUCT<city: STRING, country: STRING>, session_id: STRING, customer_id: STRING>'
  ) AS value
FROM kafka_events_bronze_decoded;

-- View the new table
SELECT *
FROM kafka_events_bronze_struct
LIMIT 5;
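
The effect of applying a fixed schema can be sketched in plain Python. This is an illustration under stated assumptions, not the Spark implementation: the helper `apply_schema` and the dict-based `SCHEMA` are hypothetical. It shows that fields declared in the schema but absent from the JSON come back as null, and fields not declared in the schema are dropped.

```python
import json

# Hypothetical schema description: field name -> Python type, or a nested dict for a struct
SCHEMA = {
    "browser": str,
    "event_timestamp": int,
    "location": {"city": str, "country": str},
}

def apply_schema(value, schema):
    """Project a decoded JSON object onto a schema, keeping declared fields only."""
    if not isinstance(value, dict):
        return None
    result = {}
    for name, expected in schema.items():
        field = value.get(name)
        if isinstance(expected, dict):
            result[name] = apply_schema(field, expected)
        elif isinstance(field, expected):
            result[name] = field
        else:
            # Missing or mismatched values become None (a simplification;
            # Spark's own coercion rules are richer than this)
            result[name] = None
    return result

raw = '{"browser":"Chrome","location":{"city":"Austin"},"extra":"ignored"}'
parsed = apply_schema(json.loads(raw), SCHEMA)
print(parsed)
```

The practical consequence is that a schema copied from one sample silently discards any attribute it does not list, which is worth checking before replacing the raw string column.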


#### C2.2 Extract fields and nested fields from STRUCT columns

We can query the STRUCT column using `value.browser` or `value.location` in our SELECT statement.

1. Using this syntax, we can obtain values from the **value** struct column. Run the cell and view the results. Notice the following:

   - We obtained top-level fields such as **browser** and **page**, and nested fields such as **city** and **country** from **location**.

   - A STRUCT column provides better performance and type safety than extracting values from a JSON string.


In [0]:
%sql
SELECT 
  decoded_key,
  value.browser as browser,           -- Field
  value.page as page,                 -- Field
  value.location.city as city,        -- Nested-field from location field
  value.location.country as country,  -- Nested-field from location field
  value.customer_id as customer_id    -- Field
FROM kafka_events_bronze_struct
LIMIT 10;


## D. Working with a VARIANT Column (Public Preview)

#### VARIANT Column Benefits and Considerations:

**BENEFITS**
- **Open** - Fully open-sourced, no proprietary data lock-in.
- **Flexible** - No strict schema. You can put any type of semi-structured data into VARIANT.
- **Performant** - Improved performance over existing methods, such as storing JSON as strings and parsing it at query time.

**CONSIDERATIONS**
- Currently in public preview as of 2025 Q2.
- See [Variant support in Delta Lake](https://docs.databricks.com/aws/en/delta/variant) for current requirements and limitations.

**RESOURCES**:
- [Introducing the Open Variant Data Type in Delta Lake and Apache Spark](https://www.databricks.com/blog/introducing-open-variant-data-type-delta-lake-and-apache-spark)
- [Say goodbye to messy JSON headaches with VARIANT](https://www.youtube.com/watch?v=fWdxF7nL3YI)
- [Variant Data Type - Making Semi-Structured Data Fast and Simple](https://www.youtube.com/watch?v=jtjOfggD4YY)

**NOTE:** Variant data type will not work on Serverless Version 1.

1. View the **kafka_events_bronze_decoded** table. Confirm the **decoded_value** column contains a JSON formatted string.


In [0]:
%sql
SELECT *
FROM kafka_events_bronze_decoded
LIMIT 5;


2. Use the [`parse_json`](https://docs.databricks.com/aws/en/sql/language-manual/functions/parse_json) function to return a VARIANT value from the JSON formatted string.

   Run the cell and view the results. Notice that the **json_variant_value** column is of type VARIANT.


In [0]:
%sql
CREATE OR REPLACE TABLE kafka_events_bronze_variant AS
SELECT
  decoded_key,
  offset,
  partition,
  timestamp,
  topic,
  parse_json(decoded_value) AS json_variant_value   -- Convert the decoded_value column to a variant data type
FROM kafka_events_bronze_decoded;

-- View the table
SELECT *
FROM kafka_events_bronze_variant
LIMIT 5;


3. Extract fields from the VARIANT column with the `:` path operator, and cast the results with `::`, to create your desired table.

   See the [VARIANT type](https://docs.databricks.com/aws/en/sql/language-manual/data-types/variant-type) documentation for details.


In [0]:
%sql
SELECT
  json_variant_value,
  json_variant_value:browser :: STRING,  -- Obtain the value of browser and cast to a string
  json_variant_value:page :: STRING,
  json_variant_value:location
FROM kafka_events_bronze_variant
LIMIT 10;
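
The path-and-cast pattern above can be mimicked in plain Python to show why a schema-less column tolerates rows of different shapes. This is only a sketch: `variant_get` is a hypothetical helper, and the second sample row is invented to show a missing path, which yields a null rather than an error.

```python
import json

def variant_get(value, path, cast=None):
    """Follow a dotted path through nested dicts; return None if any key is missing."""
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return cast(value) if cast is not None and value is not None else value

rows = [
    '{"browser":"Chrome","page":"home","location":{"city":"San Francisco","country":"US"}}',
    '{"browser":"Firefox","page":"cart"}',   # hypothetical row without a location object
]
variants = [json.loads(r) for r in rows]

# Roughly analogous to json_variant_value:location.city :: STRING
cities = [variant_get(v, "location.city", str) for v in variants]
print(cities)
```

In the SQL cell, nested paths are written the same way, for example `json_variant_value:location.city :: STRING`.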


# 5. Creating Streaming Tables with SQL using Auto Loader

In this demonstration we will create a streaming table to incrementally ingest files from a volume using Auto Loader with SQL. 

When you create a streaming table using the `CREATE OR REFRESH STREAMING TABLE` statement, the initial data refresh and population begin immediately. These operations do not consume DBSQL warehouse compute. Instead, streaming tables rely on serverless DLT for both creation and refresh. A dedicated serverless DLT pipeline is automatically created and managed by the system for each streaming table.

### Learning Objectives

By the end of this lesson, you should be able to:
- Create streaming tables in Databricks SQL for incremental data ingestion.
- Refresh streaming tables using the REFRESH statement.

### RECOMMENDATION

The `CREATE STREAMING TABLE` SQL command is the recommended alternative to the legacy `COPY INTO` SQL command for incremental ingestion from cloud object storage. Databricks recommends using streaming tables to ingest data using Databricks SQL.

A streaming table is a table registered to Unity Catalog with extra support for streaming or incremental data processing. A DLT pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.


## A. Setup for Streaming Tables

**REQUIRED - SELECT YOUR SERVERLESS SQL WAREHOUSE**

**NOTE: Creating streaming tables with Databricks SQL requires a SQL warehouse.**

Before executing cells in this notebook, please select a **SQL WAREHOUSE** in the lab. Follow these steps:

1. Navigate to the top-right of this notebook and click the drop-down to select compute (it might say **Connect**).
2. Select **More**.
3. Then select the **SQL Warehouse** button.
4. Select or create a SQL warehouse.
5. Then, at the bottom of the pop-up, select **Start and attach**.


## B. Create Streaming Tables for Incremental Processing

1. Explore the volume `/Volumes/lakeflow_demo/lakeflow_schema/csv_files_autoloader_source` and confirm it contains CSV file(s).

   Use the `LIST` statement to view the files in this volume.


In [0]:
%sql
LIST '/Volumes/lakeflow_demo/lakeflow_schema/csv_files_autoloader_source';


2. Run the query below to view the data in the CSV file(s) in your cloud storage location. Notice that the data is returned in tabular format.


In [0]:
%sql
SELECT *
FROM read_files(
  '/Volumes/lakeflow_demo/lakeflow_schema/csv_files_autoloader_source',
  format => 'CSV',
  sep => '|',
  header => true
)
LIMIT 10;
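
For readers less familiar with the `sep` and `header` options, the following pure-Python sketch shows the same idea with the standard `csv` module. The file contents and column names are invented for illustration and are not the course data.

```python
import csv
import io

# Hypothetical pipe-delimited content with a header row
pipe_delimited = (
    "customer_id|browser|page\n"
    "CUST00000001|Chrome|home\n"
    "CUST00000002|Firefox|cart\n"
)

# delimiter="|" plays the role of sep => '|'; DictReader treats
# the first row as column names, like header => true
reader = csv.DictReader(io.StringIO(pipe_delimited), delimiter="|")
records = list(reader)
print(records[0])
```

Unlike this sketch, which returns every value as a string, `read_files` also infers column types.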


#### Create a STREAMING TABLE using Databricks SQL

3. Your goal is to create an incremental pipeline that only ingests new files (instead of using traditional batch ingestion). You can achieve this by using [streaming tables in Databricks SQL](https://docs.databricks.com/aws/en/dlt/dbsql/streaming), which use Auto Loader to read files from cloud object storage.

   - The SQL code below creates a streaming table that will incrementally ingest only new data.
   
   - A pipeline is automatically created for each streaming table. You can use streaming tables for incremental data loading from Kafka and cloud object storage.

   **NOTE:** Incremental batch ingestion automatically detects new records in the data source and ignores records that have already been ingested. This reduces the amount of data processed, making ingestion jobs faster and more efficient in their use of compute resources.

   **REQUIRED: This process will take about a minute to run and set up the incremental ingestion pipeline.**


In [0]:
%sql
-- Create the streaming table; OR REFRESH lets this cell be rerun safely once the table exists
CREATE OR REFRESH STREAMING TABLE sql_csv_autoloader
AS
SELECT *
FROM STREAM read_files(
  '/Volumes/lakeflow_demo/lakeflow_schema/csv_files_autoloader_source',
  format => 'CSV',
  sep => '|',
  header => true
);
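
The core idea behind incremental file ingestion can be sketched in a few lines of plain Python. This is a conceptual illustration only: the function `ingest_new_files`, the in-memory `processed` set, and the file names are assumptions, whereas a real streaming table persists its progress in state managed by the pipeline.

```python
import tempfile
from pathlib import Path

def ingest_new_files(source_dir, processed, table):
    """Append rows from files not seen before; return how many files were ingested."""
    new_files = sorted(p for p in Path(source_dir).glob("*.csv") if p.name not in processed)
    for path in new_files:
        lines = path.read_text().splitlines()
        table.extend(lines[1:])      # skip the header row
        processed.add(path.name)     # remember the file so it is never re-read
    return len(new_files)

with tempfile.TemporaryDirectory() as source:
    processed, table = set(), []
    Path(source, "001.csv").write_text("id|page\n1|home\n2|cart\n")
    first = ingest_new_files(source, processed, table)    # initial load reads 001.csv
    Path(source, "002.csv").write_text("id|page\n3|checkout\n")
    second = ingest_new_files(source, processed, table)   # only 002.csv is read
    third = ingest_new_files(source, processed, table)    # nothing new to read

print(first, second, third, len(table))
```

Each call does work proportional to the new files only, which is the property that makes repeated refreshes cheap.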


4. Run the cell below to view the streaming table. Confirm that the results contain the expected number of rows.


In [0]:
%sql
SELECT *
FROM sql_csv_autoloader;


5. Describe the STREAMING TABLE and view the results. Notice the following:

   - Under **Detailed Table Information**, find these rows:
     - **View Text**: The query that created the table.
     - **Type**: Specifies that it is a STREAMING TABLE.
     - **Provider**: Indicates that it is a Delta table.

   - Under **Refresh Information**, you can see specific refresh details including Last Refreshed, Last Refresh Type, Latest Refresh Status, etc.


In [0]:
%sql
DESCRIBE TABLE EXTENDED sql_csv_autoloader;


6. The `DESCRIBE HISTORY` statement displays a detailed list of all changes, versions, and metadata associated with a Delta streaming table, including information on updates, deletions, and schema changes.

   Run the cell below and view the results. Notice the following:

   - In the **operation** column, you can see the operations recorded for the streaming table: **CREATE TABLE**, **DLT SETUP**, and **STREAMING UPDATE**.
   
   - Scroll to the right and find the **operationMetrics** column to see the number of rows processed.


In [0]:
%sql
DESCRIBE HISTORY sql_csv_autoloader;


7. To demonstrate incremental ingestion, manually add another file to your cloud storage location: `/Volumes/lakeflow_demo/lakeflow_schema/csv_files_autoloader_source`.

   **Option 1 - Using Python:**
   - Copy a file from the staging volume to the source volume

   **Option 2 - Using UI:**
   - Click the catalog icon on the left
   - Expand the **lakeflow_demo** catalog
   - Expand your **lakeflow_schema** schema
   - Expand **Volumes**
   - Open the **autoloader_staging_files** volume
   - Copy a file from there to the **csv_files_autoloader_source** volume


In [0]:
%python
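# Option 1: Copy a file from staging to source volume using Python
def copy_files(copy_from, copy_to, n=1):
    """Copy up to n files from copy_from that are not already in copy_to."""
    # Skip names already present in the destination so each run adds a new file
    existing = {f.name for f in dbutils.fs.ls(copy_to)}
    new_files = [f for f in dbutils.fs.ls(copy_from) if f.name not in existing][:n]
    for f in new_files:
        dbutils.fs.cp(f.path, f"{copy_to}/{f.name}")
    print(f"Copied {len(new_files)} file(s) from {copy_from} to {copy_to}")
    return len(new_files)

# Copy one additional file for incremental ingestion demo
copied = copy_files(
    copy_from="/Volumes/lakeflow_demo/lakeflow_schema/autoloader_staging_files",
    copy_to="/Volumes/lakeflow_demo/lakeflow_schema/csv_files_autoloader_source",
    n=1
)

if copied:
    print("File copied. You can now refresh the streaming table to see incremental ingestion.")
else:
    print("No new files found in the staging volume; nothing was copied.")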


8. Next, manually refresh the STREAMING TABLE using `REFRESH STREAMING TABLE table-name`. 

   - [Refresh a streaming table](https://docs.databricks.com/aws/en/dlt/dbsql/streaming#refresh-a-streaming-table) documentation

   **NOTE:** Running a `CREATE OR REFRESH STREAMING TABLE` statement against an existing streaming table also refreshes it, incrementally ingesting only new files.


In [0]:
%sql
REFRESH STREAMING TABLE lakeflow_demo.lakeflow_schema.sql_csv_autoloader;


9. Run the cell below to view the data in the **sql_csv_autoloader** table. Notice that the table now contains additional rows from the newly added file.


In [0]:
%sql
SELECT *
FROM sql_csv_autoloader;


10. Describe the history of the **sql_csv_autoloader** table. Observe the following:

  - Additional versions of the streaming table include **STREAMING UPDATE** operations.

  - Expand the **operationMetrics** column and note the number of rows that were incrementally ingested.

In [0]:
%sql
DESCRIBE HISTORY sql_csv_autoloader;
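
If you want to compare row counts across refreshes rather than reading them one by one, the history output can be summarized programmatically. The sketch below works on plain dictionaries shaped like history rows. The sample values are invented, and the metric key `numOutputRows` is an assumption to verify against your own **operationMetrics** output.

```python
# Hypothetical rows shaped like DESCRIBE HISTORY output; all values are made up
history = [
    {"version": 2, "operation": "STREAMING UPDATE", "operationMetrics": {"numOutputRows": "1000"}},
    {"version": 1, "operation": "DLT SETUP", "operationMetrics": {}},
    {"version": 0, "operation": "CREATE TABLE", "operationMetrics": {}},
]

def rows_per_update(history_rows):
    """Map each STREAMING UPDATE version to the number of rows it wrote."""
    return {
        row["version"]: int(row["operationMetrics"].get("numOutputRows", 0))
        for row in history_rows
        if row["operation"] == "STREAMING UPDATE"
    }

summary = rows_per_update(history)
print(summary)
```

The metric values are converted with `int()` because operation metrics are reported as strings.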


## Additional Resources

- [Streaming Tables Documentation](https://docs.databricks.com/aws/en/dlt/dbsql/streaming)
- [CREATE STREAMING TABLE Syntax](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-create-streaming-table)
- [REFRESH (MATERIALIZED VIEW or STREAMING TABLE)](https://docs.databricks.com/aws/en/sql/language-manual/sql-ref-syntax-ddl-refresh-full)
