![databricks_academy_logo.png](../Includes/images/databricks_academy_logo.png "databricks_academy_logo.png")

# Ingesting Data into Delta Lake

## Important: Select Environment 4
The cells below may not work in other environments. To choose environment 4: 
1. Click the ![environment.png](../Includes/images/environment.png "environment.png") button on the right sidebar
1. Open the **Environment version** dropdown
1. Select **4**

## Classroom Setup

Run the following cell to configure your working environment for this lesson.

In [0]:
####################################################################################
# Set python variables for catalog, schema, and volume names (change, if desired)
catalog_name = "dbacademy"
schema_name = "ingesting_data"
volume_name = "myfiles"
####################################################################################

####################################################################################
# Create the catalog, schema, and volume if they don't exist already
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog_name}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_name}.{schema_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_name}")
####################################################################################

####################################################################################
# Creates a file called employees.csv in the specified catalog.schema.volume
import pandas as pd
data = [
    ["1111", "Kristi", "USA", "Manager"],
    ["2222", "Sophia", "Greece", "Developer"],
    ["3333", "Peter", "USA", "Developer"],
    ["4444", "Zebi", "Pakistan", "Administrator"]
]
columns = ["ID", "Firstname", "Country", "Role"]
df = pd.DataFrame(data, columns=columns)
file_path = f"/Volumes/{catalog_name}/{schema_name}/{volume_name}/employees.csv"
df.to_csv(file_path, index=False)
################################################################################

## Please note:
We call `spark.sql()` instead of running regular SQL cells. This lets us use Python f-strings to insert variables into the SQL commands.
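
As a small illustration of this pattern, the sketch below builds one of the setup statements with an f-string, exactly as it would look before being passed to `spark.sql()`. The `quote_identifier` helper is hypothetical and not part of the lesson. It assumes Spark SQL's backtick quoting, which only matters when a name contains special characters.

```python
# Same variable names as the Classroom Setup cell.
catalog_name = "dbacademy"
schema_name = "ingesting_data"
volume_name = "myfiles"


def quote_identifier(name: str) -> str:
    """Hypothetical helper: wrap a name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"


# Python resolves the f-string first; Spark only ever receives the finished text.
statement = f"CREATE VOLUME IF NOT EXISTS {catalog_name}.{schema_name}.{volume_name}"
print(statement)

# Quoted variant for names that contain characters such as hyphens.
qualified_name = ".".join(
    quote_identifier(part) for part in (catalog_name, schema_name, volume_name)
)
print(qualified_name)
```

In a notebook, `spark.sql(statement)` would then execute the generated text.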

## A. Configure and Explore Your Environment


#### 1. Setting Up the Catalog and Schema
Set the default catalog and schema. Then, view the available tables to confirm that no tables currently exist in your schema.

In [0]:
## Set the default catalog and schema
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {schema_name}")

DataFrame[]

##### 1A. Using SQL

In [0]:
%sql
-- Display available tables in your schema
SHOW TABLES;

database,tableName,isTemporary
default,healthcare_dataset,False


##### 1B. Using PySpark

In [0]:
# Set the default catalog and schema (requires Spark 3.4.0 or later)
spark.catalog.setCurrentCatalog(catalog_name)
spark.catalog.setCurrentDatabase(schema_name)

# Display available tables in your schema
spark.catalog.listTables(schema_name)

[]

#### 2. Viewing the Available Files
View the available files in your schema's **myfiles** volume. Confirm that only the **employees.csv** file is available.

**NOTE:** Remember, when referencing data in volumes, use the path provided by Unity Catalog, which always has the following format: */Volumes/catalog_name/schema_name/volume_name/*.

In [0]:
spark.sql(f"LIST '/Volumes/{catalog_name}/{schema_name}/{volume_name}/' ").display()

path,name,size,modification_time
/Volumes/dbacademy/ingesting_data/myfiles/employees.csv,employees.csv,137,1771405202000
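
The path format from the note above can be captured in a small helper so that it is written in one place only. This is a minimal sketch: `volume_path` is a hypothetical name, not a Databricks function, and it simply joins the pieces in the documented order.

```python
import posixpath


def volume_path(catalog: str, schema: str, volume: str, *parts: str) -> str:
    """Hypothetical helper: build /Volumes/<catalog>/<schema>/<volume>/... paths."""
    return posixpath.join("/Volumes", catalog, schema, volume, *parts)


# Path of the volume itself, and of one file inside it.
volume_dir = volume_path("dbacademy", "ingesting_data", "myfiles")
employees_file = volume_path("dbacademy", "ingesting_data", "myfiles", "employees.csv")
print(volume_dir)
print(employees_file)
```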


## B. Delta Lake Ingestion Techniques
**Objective**: Create a Delta table from the **employees.csv** file using various methods.

- CREATE TABLE AS (CTAS)
- UPLOAD UI (user interface)
- COPY INTO
- AUTO LOADER (overview only; outside the scope of this module)

#### 1. CREATE TABLE AS (CTAS)
1. Create a table from the **employees.csv** file using the `CREATE TABLE AS` statement, similar to the previous demonstration. Run the query and confirm that the **current_employees_ctas** table was successfully created.

In [0]:
#  Drop the table if it exists for demonstration purposes
spark.sql(f"DROP TABLE IF EXISTS current_employees_ctas;")

#  Create the table using CTAS
spark.sql(f"""
CREATE TABLE current_employees_ctas
AS
SELECT ID, FirstName, Country, Role 
FROM read_files(
  '/Volumes/{catalog_name}/{schema_name}/{volume_name}/',
  format => 'csv',
  header => true,
  inferSchema => true
 );""")

#  Display available tables in your schema
spark.sql(f"SHOW TABLES;").display()

database,tableName,isTemporary
ingesting_data,current_employees_ctas,False


2. Query the **current_employees_ctas** table and confirm that it contains 4 rows and 4 columns.

In [0]:
%sql
SELECT *
FROM current_employees_ctas;

ID,FirstName,Country,Role
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator


#### 2. UPLOAD UI
The **Add data** UI allows you to manually load data into Databricks from a variety of sources.

1. Complete the following steps to manually download the **employees.csv** file from your volume:

   a. Select the Catalog icon ![catalog_icon.png](../../Includes/images/catalog_icon.png "catalog_icon.png") in the left navigation bar. 

   b. Select the refresh icon ![refresh_icon.png](../../Includes/images/refresh_icon.png "refresh_icon.png") to refresh the catalog.

   c. Expand the **dbacademy** catalog. Within the catalog, you should see a variety of schemas (databases).

   d. Expand **ingesting_data** schema. Notice that your schema contains **Tables** and **Volumes**.

   e. Expand **Volumes** then **myfiles**. The **myfiles** volume should contain a single CSV file named **employees.csv**. 

   f. Click the kebab menu on the right-hand side of the **employees.csv** file and select **Download Volume file**. This downloads the CSV file to your browser's download folder.

2. Complete the following steps to manually upload the **employees.csv** file to your schema. This will mimic loading a local file to Databricks:

   a. In the navigation bar select the **ingesting_data** schema. 

   b. Click the ellipsis (three-dot) icon next to your schema and select **Open in Catalog Explorer**.

   c. Select the **Create** drop down icon ![create_drop_down](../../Includes/images/create_drop_down.png), and select **Table**.

   d. Drag the **employees.csv** file you downloaded earlier into the upload area in the browser, or select **browse**, navigate to your downloads folder, and select the **employees.csv** file.

3. Complete the following steps to create the Delta table using the UPLOAD UI.

   a. In the UI confirm the table will be created in the catalog **dbacademy** and the **ingesting_data** schema. 

   b. Under **Table name**, name the table **employees_upload**.

   c. Select the **Create table** button at the bottom of the screen to create the table.

   d. Confirm the table was created successfully. Then close out of the Catalog Explorer browser.

**Example**

![create_table_ui](../../Includes/images/create_table_ui.png)


4. Use the `SHOW TABLES` statement to view the available tables in your schema. Confirm that the **employees_upload** table has been created.


In [0]:
%sql
SHOW TABLES;

5. Lastly, query the table to review its contents.

**NOTE**: If you did not upload the table using the UPLOAD UI and name it **employees_upload**, an error will be returned.

In [0]:
%sql
SELECT * 
FROM employees_upload;

ID,Firstname,Country,Role
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator


#### 3. COPY INTO
Create a table from the **employees.csv** file using the [COPY INTO](https://docs.databricks.com/en/sql/language-manual/delta-copy-into.html) statement. 

The `COPY INTO` statement incrementally loads data from a file location into a Delta table. This is a retryable and idempotent operation. Files in the source location that have already been loaded are skipped. This is true even if the files have been modified since they were loaded.
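
The skip behavior described above can be pictured with a toy model. The sketch below is only a conceptual illustration in plain Python, not how Databricks implements `COPY INTO`. The class name, the dictionary of files, and the assumption that files are tracked by path are all made up for the example.

```python
class CopyIntoModel:
    """Toy model of COPY INTO bookkeeping; not the Databricks implementation."""

    def __init__(self):
        self.table_rows = []       # rows in the destination table
        self.loaded_files = set()  # paths that were already ingested

    def copy_into(self, source_files: dict) -> int:
        """Load rows from files not seen before; return the number of inserted rows."""
        inserted = 0
        for path, rows in sorted(source_files.items()):
            if path in self.loaded_files:
                continue  # skipped, even if the file's content has changed
            self.table_rows.extend(rows)
            self.loaded_files.add(path)
            inserted += len(rows)
        return inserted


source = {"employees.csv": [("1111", "Kristi"), ("2222", "Sophia")]}
table = CopyIntoModel()
first_run = table.copy_into(source)   # loads both rows
second_run = table.copy_into(source)  # nothing new, so nothing is loaded

# Modifying a file that was already loaded does not trigger a reload.
source["employees.csv"].append(("3333", "Peter"))
third_run = table.copy_into(source)

print(first_run, second_run, third_run)
```

Running the same load twice leaves the table unchanged, which is what makes the operation safe to retry.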

1. Create an empty table named **current_employees_copyinto** and define the column data types.

**NOTE:** You can also create an empty table with no columns and evolve the schema with `COPY INTO`.

In [0]:
%sql
-- Drop the table if it exists for demonstration purposes
DROP TABLE IF EXISTS current_employees_copyinto;

-- Create an empty table with the column data types
CREATE TABLE current_employees_copyinto (
  ID INT,
  FirstName STRING,
  Country STRING,
  Role STRING
);
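
The note above mentions creating a table with no columns and letting `COPY INTO` evolve the schema. A minimal sketch of what those two statements might look like follows, assuming the `mergeSchema` option is set in both `FORMAT_OPTIONS` and `COPY_OPTIONS`. The table name `current_employees_evolved` is hypothetical and is not used elsewhere in this lesson.

```python
catalog_name = "dbacademy"
schema_name = "ingesting_data"
volume_name = "myfiles"
table_name = "current_employees_evolved"  # hypothetical table name

# A table created without a column list starts with an empty schema.
create_statement = f"CREATE TABLE IF NOT EXISTS {table_name}"

# mergeSchema lets COPY INTO add the columns it finds in the source files.
copy_statement = f"""
COPY INTO {table_name}
  FROM '/Volumes/{catalog_name}/{schema_name}/{volume_name}/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true', 'mergeSchema' = 'true')
  COPY_OPTIONS ('mergeSchema' = 'true')
"""

print(create_statement)
print(copy_statement)

# In a Databricks notebook, where `spark` is predefined, these would be run as:
# spark.sql(create_statement)
# spark.sql(copy_statement).display()
```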

2. Use the `COPY INTO` statement to load all files from the **myfiles** volume (currently only the **employees.csv** file exists) using the path provided by Unity Catalog. Confirm that the data is loaded into the **current_employees_copyinto** table.
   
    Confirm the following:
    - **num_affected_rows** is 4
    - **num_inserted_rows** is 4
    - **num_skipped_corrupt_files** is 0

In [0]:
spark.sql(f"""
COPY INTO current_employees_copyinto
  FROM '/Volumes/{catalog_name}/{schema_name}/{volume_name}/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS (
      'header' = 'true', 
      'inferSchema' = 'true'
    )
  """).display()

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
4,4,0
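
The confirmations listed for this step can also be checked in code instead of by eye. The sketch below assumes the single result row has been turned into a dictionary, for example with `spark.sql(...).first().asDict()` in a notebook. The function name `copy_metrics_ok` is hypothetical, and the second dictionary stands for a rerun that finds no new files.

```python
def copy_metrics_ok(metrics: dict, expected_rows: int) -> bool:
    """Return True when the row counts match and no corrupt files were skipped."""
    return (
        metrics["num_affected_rows"] == expected_rows
        and metrics["num_inserted_rows"] == expected_rows
        and metrics["num_skipped_corrupt_files"] == 0
    )


# Values copied from the result row shown above.
first_load = {"num_affected_rows": 4, "num_inserted_rows": 4, "num_skipped_corrupt_files": 0}
# Assumed result of running the same statement again with no new files.
rerun = {"num_affected_rows": 0, "num_inserted_rows": 0, "num_skipped_corrupt_files": 0}

print(copy_metrics_ok(first_load, expected_rows=4))
print(copy_metrics_ok(rerun, expected_rows=0))
```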


3. Query the **current_employees_copyinto** table and confirm that all 4 rows have been copied into the Delta table correctly.   

In [0]:
%sql
SELECT * 
FROM current_employees_copyinto;

ID,FirstName,Country,Role
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator


4. Run the `COPY INTO` statement again and confirm that it did not re-add the data that was already loaded from the volume. Remember, `COPY INTO` is a retryable and idempotent operation: files in the source location that have already been loaded are skipped.
    - **num_affected_rows** is 0
    - **num_inserted_rows** is 0
    - **num_skipped_corrupt_files** is 0

 

In [0]:
spark.sql(f'''
COPY INTO current_employees_copyinto
  FROM '/Volumes/{catalog_name}/{schema_name}/{volume_name}/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS (
      'header' = 'true', 
      'inferSchema' = 'true'
    )
  ''').display()

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
0,0,0


5. Run the script below to create an additional CSV file named **employees2.csv** in your **myfiles** volume. View the results and confirm that your volume now contains two CSV files: the original **employees.csv** file and the new **employees2.csv** file.

In [0]:
## Create the new employees2.csv file in your volume
dbutils.fs.cp(f'/Volumes/{catalog_name}/{schema_name}/{volume_name}/employees.csv', f'/Volumes/{catalog_name}/{schema_name}/{volume_name}/employees2.csv')

## View the files in your myfiles volume
files = dbutils.fs.ls(f'/Volumes/{catalog_name}/{schema_name}/{volume_name}')
display(files)

path,name,size,modificationTime
dbfs:/Volumes/dbacademy/ingesting_data/myfiles/employees.csv,employees.csv,137,1771405202000
dbfs:/Volumes/dbacademy/ingesting_data/myfiles/employees2.csv,employees2.csv,137,1771406657000


6. Query the new **employees2.csv** file directly. Confirm that it contains the same 4 rows as **employees.csv**, because it is a copy of that file.

In [0]:
spark.sql(f"""
SELECT 
  ID, 
  FirstName, 
  Country, 
  Role 
FROM read_files(
  '/Volumes/{catalog_name}/{schema_name}/{volume_name}/employees2.csv',
  format => 'csv',
  header => true,
  inferSchema => true
 );""").display()

ID,FirstName,Country,Role
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator


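7. Execute the `COPY INTO` statement again using your volume's path. Notice that only the 4 rows from the new **employees2.csv** file are added to the **current_employees_copyinto** table; **employees.csv** is skipped because it was already loaded. If you rerun the cell after **employees2.csv** has been loaded, all three values are 0.

    - **num_affected_rows** is 4
    - **num_inserted_rows** is 4
    - **num_skipped_corrupt_files** is 0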

In [0]:
spark.sql(f'''
COPY INTO current_employees_copyinto
  FROM '/Volumes/{catalog_name}/{schema_name}/{volume_name}/'
  FILEFORMAT = CSV
  FORMAT_OPTIONS (
      'header' = 'true', 
      'inferSchema' = 'true'
    )
  ''').display()

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
0,0,0


8. View the updated **current_employees_copyinto** table and confirm that it now contains 8 rows, including the new data that was added.

Note that `COPY INTO` tracks which files have been loaded, not which rows. If a new source file repeats rows that are already in the table, those rows are loaded again as duplicates.

In [0]:
%sql
SELECT * 
FROM current_employees_copyinto;

ID,FirstName,Country,Role
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator
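
The duplicate rows in this result can be reproduced outside Spark. The sketch below is a plain-Python illustration: the CSV text is a stand-in for the contents of **employees.csv**, and since **employees2.csv** is a copy of that file, the same text is parsed twice.

```python
import csv
import io
from collections import Counter

# Stand-in for the contents of employees.csv; employees2.csv is a copy of it.
employees_csv = """ID,Firstname,Country,Role
1111,Kristi,USA,Manager
2222,Sophia,Greece,Developer
3333,Peter,USA,Developer
4444,Zebi,Pakistan,Administrator
"""


def read_rows(text: str) -> list:
    """Parse CSV text into a list of row tuples, skipping the header."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # header row
    return [tuple(row) for row in reader]


# Two different files, each loaded once, both end up in the table.
table_rows = read_rows(employees_csv) + read_rows(employees_csv)

counts = Counter(table_rows)
duplicates = {row: n for row, n in counts.items() if n > 1}
print(len(table_rows), len(counts), len(duplicates))
```

Each of the four distinct employees appears twice, which matches the 8 rows returned by the query.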


9. View the table's history. Notice that there are 3 versions.
    - **Version 0** is the initial empty table created by the CREATE TABLE statement.
    - **Version 1** is the first `COPY INTO` statement that loaded the **employees.csv** file into the Delta table.
    - **Version 2** is the second `COPY INTO` statement that only loaded the new **employees2.csv** file into the Delta table.

In [0]:
%sql
DESCRIBE HISTORY current_employees_copyinto;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,queryHistoryStatementId,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2026-02-18T09:25:38.000Z,75851138266010,alladisindhu24@gmail.com,COPY INTO,Map(statsOnLoad -> true),,List(4374631413018150),1df367a2-f899-4ac7-852d-7596c70106d3,0218-085934-m4hjui1m-v2n,1.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1418, numSkippedCorruptFiles -> 0)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
1,2026-02-18T09:22:48.000Z,75851138266010,alladisindhu24@gmail.com,COPY INTO,Map(statsOnLoad -> true),,List(4374631413018150),670f045e-3be0-4942-ac85-3c321c072814,0218-085934-m4hjui1m-v2n,0.0,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 4, numOutputBytes -> 1418, numSkippedCorruptFiles -> 0)",,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
0,2026-02-18T09:22:24.000Z,75851138266010,alladisindhu24@gmail.com,CREATE TABLE,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.parquet.compression.codec"":""zstd"",""delta.enableDeletionVectors"":""true"",""delta.enableRowTracking"":""true"",""delta.rowTracking.materializedRowCommitVersionColumnName"":""_row-commit-version-col-9e93ef84-b641-4f24-8362-1cceceef6c3c"",""delta.rowTracking.materializedRowIdColumnName"":""_row-id-col-e61338a2-7445-4ec9-be48-12ea7d858716""}, statsOnLoad -> false)",,List(4374631413018150),206ce5c9-d9fb-44ea-b6b4-f95f7dc23eee,0218-085934-m4hjui1m-v2n,,WriteSerializable,True,Map(),,Databricks-Runtime/18.0.x-aarch64-photon-scala2.13
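
As a quick cross-check, the row counts recorded in the history should add up to the 8 rows in the table. The sketch below transcribes only the relevant fields from the output above into plain Python dictionaries. Representing the metrics as integers is a simplification made for this example.

```python
# Fields transcribed from the DESCRIBE HISTORY output above (simplified).
history = [
    {"version": 2, "operation": "COPY INTO", "operationMetrics": {"numFiles": 1, "numOutputRows": 4}},
    {"version": 1, "operation": "COPY INTO", "operationMetrics": {"numFiles": 1, "numOutputRows": 4}},
    {"version": 0, "operation": "CREATE TABLE", "operationMetrics": {}},
]

# Only the COPY INTO runs that loaded a file produced a new version.
copy_commits = [entry for entry in history if entry["operation"] == "COPY INTO"]

# CREATE TABLE wrote no rows, so its metrics map is empty.
total_rows = sum(entry["operationMetrics"].get("numOutputRows", 0) for entry in history)

print(len(history), len(copy_commits), total_rows)
```

The rerun in step 4, which loaded nothing, does not appear in the history, so only two `COPY INTO` versions are listed.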


#### 4. AUTO LOADER

**NOTE: Auto Loader is outside the scope of this course.**

Auto Loader incrementally and efficiently processes new data files as they arrive in cloud storage without any additional setup.

![autoloader](../../Includes/images/autoloader.png)

The key benefits of using Auto Loader are:
- No file state management: The source incrementally processes new files as they land on cloud storage. You don't need to manage any state information on what files arrived.
- Scalable: The source will efficiently track the new files arriving by leveraging cloud services and RocksDB without having to list all the files in a directory. This approach is scalable even with millions of files in a directory.
- Easy to use: The source will automatically set up notification and message queue services required for incrementally processing the files. No setup needed on your side.
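
For orientation only, a minimal sketch of what an Auto Loader ingest might look like is shown below. It assumes a Databricks runtime where the `cloudFiles` source is available. The function names, the paths passed in, and the choice to reuse the checkpoint directory as the schema location are all assumptions made for illustration.

```python
def auto_loader_options(schema_location: str) -> dict:
    """Options for a CSV Auto Loader source; the schema location is an assumed path."""
    return {
        "cloudFiles.format": "csv",
        "cloudFiles.schemaLocation": schema_location,
        "header": "true",
    }


def start_ingest(spark, source_dir: str, checkpoint_dir: str, table_name: str):
    """Read new files with Auto Loader and append them to a Delta table."""
    stream = (
        spark.readStream.format("cloudFiles")
        .options(**auto_loader_options(checkpoint_dir))
        .load(source_dir)
    )
    return (
        stream.writeStream
        .option("checkpointLocation", checkpoint_dir)  # where progress is tracked
        .trigger(availableNow=True)  # process the files present now, then stop
        .toTable(table_name)
    )


# Building the options needs no Spark session, so it can be inspected anywhere.
options = auto_loader_options("/Volumes/dbacademy/ingesting_data/myfiles/_checkpoint")
print(options)
```

`start_ingest` is defined but not called here, because it needs the `spark` session that a Databricks notebook provides.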

See the [What is Auto Loader?](https://docs.databricks.com/en/ingestion/auto-loader/index.html) documentation for more information.

## C. Cleanup
1. Drop the **ingesting_data** schema.

In [0]:
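# Qualify the schema with its catalog so the statement does not depend on the current catalog
spark.sql(f"DROP SCHEMA IF EXISTS {catalog_name}.{schema_name} CASCADE")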

DataFrame[]