
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning">
</div>


# Load Data Lab

In this lab, you will load data into new and existing Delta tables.

## Learning Objectives
By the end of this lab, you should be able to:
- Create an empty Delta table with a provided schema
- Use `COPY INTO` and `CAST` to ingest data into an existing Delta table
- Use a CTAS statement to create a Delta table from files

## REQUIRED - SELECT CLASSIC COMPUTE

Before executing cells in this notebook, please select your classic compute cluster in the lab. Be aware that **Serverless** is enabled by default.

Follow these steps to select the classic compute cluster:

1. Navigate to the top-right of this notebook and click the drop-down menu to select your cluster. By default, the notebook will use **Serverless**.

1. If your cluster is available, select it and continue to the next cell. If the cluster is not shown:

   - In the drop-down, select **More**.

   - In the **Attach to an existing compute resource** pop-up, open the first drop-down and select the unique cluster name shown there.

**NOTE:** If your cluster has terminated, you might need to restart it in order to select it. To do this:

1. Right-click on **Compute** in the left navigation pane and select *Open in new tab*.

1. Find the triangle icon to the right of your compute cluster name and click it.

1. Wait a few minutes for the cluster to start.

1. Once the cluster is running, complete the steps above to select your cluster.

## Classroom Setup

Run the following cell to configure your working environment for this course. The setup also runs the `USE` statements shown below, which set your default catalog to **dbacademy** and your default schema to your unique schema name.
<br></br>


```
USE CATALOG dbacademy;
USE SCHEMA dbacademy.<your unique schema name>;
```

**NOTE:** The `DA` object is only used in Databricks Academy courses and is not available outside of these courses. It will dynamically reference the information needed to run the course.

In [0]:
%run ./Includes/Classroom-Setup-3L

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


0,1
Course Catalog:,
Your Schema:,


## Data Overview

We will work with a sample of raw Kafka data written as JSON files. 

Each file contains all records consumed during a 5-second interval, stored with the full Kafka schema as a multiple-record JSON file. 

The schema for the table:

| field  | type | description |
| ------ | ---- | ----------- |
| key    | BINARY | The **`user_id`** field is used as the key; this is a unique alphanumeric field that corresponds to session/cookie information |
| offset | BIGINT | This is a unique value, monotonically increasing for each partition |
| partition | INTEGER | Our current Kafka implementation uses only 2 partitions (0 and 1) |
| timestamp | BIGINT    | This timestamp is recorded as milliseconds since epoch, and represents the time at which the producer appends a record to a partition |
| topic | STRING | While the Kafka service hosts multiple topics, only those records from the **`clickstream`** topic are included here |
| value | BINARY | This is the full data payload (to be discussed later), sent as JSON |
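The `timestamp` field stores milliseconds since the Unix epoch. As a minimal standard-library sketch (not part of the lab), the conversion to a UTC datetime for `1593880885085`, one of the timestamps loaded later in this lab, might look like this. The helper name `kafka_millis_to_utc` is hypothetical. Adding a `timedelta` to the epoch keeps the arithmetic exact instead of going through a floating-point number of seconds.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch, expressed in UTC
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def kafka_millis_to_utc(millis: int) -> datetime:
    """Convert a Kafka record timestamp (milliseconds since epoch) to a UTC datetime.

    Hypothetical helper for illustration; uses exact integer arithmetic.
    """
    return EPOCH + timedelta(milliseconds=millis)


print(kafka_millis_to_utc(1593880885085).isoformat())
# 2020-07-04T16:41:25.085000+00:00
```

In Spark SQL, the built-in `timestamp_millis` function (available in Spark 3.1 and later) performs a similar conversion directly on the column.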


## Define Schema for Empty Delta Table
Create an empty managed Delta table named **`events_bronze`** using the same schema.

In [0]:
CREATE TABLE events_bronze
(
    key BINARY,
    offset BIGINT,
    partition INTEGER,
    timestamp BIGINT,
    topic STRING,
    value BINARY
)

Run the cell below to confirm the table was created correctly.

In [0]:
%python
from pyspark.sql.types import BinaryType, IntegerType, LongType, StringType

assert spark.catalog.tableExists("events_bronze"), "The table does not exist"
df = spark.table("events_bronze")
assert df.count() == 0, "The table should have 0 records"

# Expected column names and Spark data types for events_bronze
expected_types = {
    "key": BinaryType(),
    "offset": LongType(),
    "partition": IntegerType(),
    "timestamp": LongType(),
    "topic": StringType(),
    "value": BinaryType(),
}
for name in expected_types:
    assert name in df.columns, f"The column {name} is missing"
for name, dtype in expected_types.items():
    assert df.schema[name].dataType == dtype, f"Column {name} is wrong type"

## Using `CAST` with JSON Data
In the next cell, you will use `COPY INTO` to ingest data into the table.

To force the JSON data to fit the schema you used when you created the table, you need the `CAST` keyword, whose syntax is `CAST(column AS data_type)`. To use `CAST` with `COPY INTO`, replace the path in the `COPY INTO` command you learned in the previous lesson with a `SELECT` query, and make sure you wrap the query in parentheses:

```
(SELECT
  CAST(key AS BINARY) AS key,
  CAST(offset AS BIGINT) AS offset,
  CAST(partition AS INT) AS partition,
  CAST(timestamp AS BIGINT) AS timestamp,
  CAST(topic AS STRING) AS topic,
  CAST(value AS BINARY) AS value
FROM '/Volumes/dbacademy_ecommerce/v01/raw/events-kafka/')
```

**NOTE:** Because the data files are in JSON format, you do not need the `delimiter` or `header` options.
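Because every cast in the projection follows the same `CAST(column AS data_type) AS column` pattern, the `SELECT` can also be generated from the table schema. The following is a minimal sketch in plain Python; the helper `build_copy_into_sql` and the `EVENTS_BRONZE_SCHEMA` list are hypothetical names introduced for illustration, and the helper inserts identifiers and the path verbatim without quoting or escaping them.

```python
# Target column names and types for events_bronze, in table order
EVENTS_BRONZE_SCHEMA = [
    ("key", "BINARY"),
    ("offset", "BIGINT"),
    ("partition", "INT"),
    ("timestamp", "BIGINT"),
    ("topic", "STRING"),
    ("value", "BINARY"),
]


def build_copy_into_sql(table, source_path, schema, file_format="JSON"):
    """Build a COPY INTO statement that casts each source column to its target type.

    Hypothetical helper for illustration only: identifiers and the path are
    inserted verbatim, without quoting or escaping.
    """
    casts = ",\n    ".join(f"CAST({col} AS {dtype}) AS {col}" for col, dtype in schema)
    return (
        f"COPY INTO {table}\n"
        "FROM\n"
        "  (SELECT\n"
        f"    {casts}\n"
        f"  FROM '{source_path}')\n"
        f"FILEFORMAT = {file_format}"
    )


sql = build_copy_into_sql(
    "events_bronze",
    "/Volumes/dbacademy_ecommerce/v01/raw/events-kafka/",
    EVENTS_BRONZE_SCHEMA,
)
print(sql)
```

In a Databricks notebook, the resulting string could be passed to `spark.sql(...)`. For a single statement like the one in this lab, writing the SQL by hand is simpler; generating it mainly pays off when many tables share the same pattern.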

In [0]:
-- Load the raw Kafka JSON files, casting each field to the type declared for events_bronze
COPY INTO events_bronze
FROM
  (SELECT
    CAST(key AS BINARY) AS key,
    CAST(offset AS BIGINT) AS offset,
    CAST(partition AS INT) AS partition,
    CAST(timestamp AS BIGINT) AS timestamp,
    CAST(topic AS STRING) AS topic,
    CAST(value AS BINARY) AS value
  FROM '/Volumes/dbacademy_ecommerce/v01/raw/events-kafka/')
FILEFORMAT = JSON

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
2252,2252,0


Manually review the table contents to ensure data was written as expected.

In [0]:
SELECT * FROM events_bronze LIMIT 10;

key,offset,partition,timestamp,topic,value
VlVFd01EQXdNREF4TURjek9UZ3dOVFE9,219255030,0,1593880885085,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltMWhhVzRpTENKbGRtVnVkRjkwYVcxbGMzUmhiWEFpT2pFMU9UTTRPREE0T0RVd016WXhNamtzSW1kbGJ5STY= (truncated)
VlVFd01EQXdNREF4TURjek9USTBOVGc9,219255043,0,1593880892303,clickstream,ZXlKa1pYWnBZMlVpT2lKcFQxTWlMQ0psWTI5dGJXVnlZMlVpT250OUxDSmxkbVZ1ZEY5dVlXMWxJam9pWVdSa1gybDBaVzBpTENKbGRtVnVkRjl3Y21WMmFXOTFjMTkwYVcxbGMzUmhiWEFpT2pFMU9UTTRPREF6TURBMk9UWTM= (truncated)
VlVFd01EQXdNREF4TURjek9UVTVOamc9,219255108,0,1593880889174,clickstream,ZXlKa1pYWnBZMlVpT2lKdFlXTlBVeUlzSW1WamIyMXRaWEpqWlNJNmUzMHNJbVYyWlc1MFgyNWhiV1VpT2lKd2NtVnRhWFZ0SWl3aVpYWmxiblJmY0hKbGRtbHZkWE5mZEdsdFpYTjBZVzF3SWpveE5Ua3pPRGd3T0RZeE1ETXc= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3dNekE9,219255118,0,1593880889725,clickstream,ZXlKa1pYWnBZMlVpT2lKcFQxTWlMQ0psWTI5dGJXVnlZMlVpT250OUxDSmxkbVZ1ZEY5dVlXMWxJam9pYjNKcFoybHVZV3dpTENKbGRtVnVkRjl3Y21WMmFXOTFjMTkwYVcxbGMzUmhiWEFpT2pFMU9UTTRPREE0T0RJME1qazU= (truncated)
VlVFd01EQXdNREF4TURjek9ESXlNek09,219438025,1,1593880886106,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltTmpYMmx1Wm04aUxDSmxkbVZ1ZEY5d2NtVjJhVzkxYzE5MGFXMWxjM1JoYlhBaU9qRTFPVE00T0RBek5qUXo= (truncated)
VlVFd01EQXdNREF4TURjek9ESXlNek09,219438069,1,1593880886106,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltTmpYMmx1Wm04aUxDSmxkbVZ1ZEY5d2NtVjJhVzkxYzE5MGFXMWxjM1JoYlhBaU9qRTFPVE00T0RBek5qUXo= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3dNemM9,219438089,1,1593880887640,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltUmxiR2wyWlhKNUlpd2laWFpsYm5SZmNISmxkbWx2ZFhOZmRHbHRaWE4wWVcxd0lqb3hOVGt6T0Rnd09EZ3k= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3hOVGs9,219438114,1,1593880894803,clickstream,ZXlKa1pYWnBZMlVpT2lKdFlXTlBVeUlzSW1WamIyMXRaWEpqWlNJNmUzMHNJbVYyWlc1MFgyNWhiV1VpT2lKdFlXbHVJaXdpWlhabGJuUmZkR2x0WlhOMFlXMXdJam94TlRrek9EZ3dPRGswTnpnNU5UYzVMQ0puWlc4aU9uc2k= (truncated)
VlVFd01EQXdNREF4TURjek56WTBOamM9,219438126,1,1593880888445,clickstream,ZXlKa1pYWnBZMlVpT2lKWGFXNWtiM2R6SWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltTmhjblFpTENKbGRtVnVkRjl3Y21WMmFXOTFjMTkwYVcxbGMzUmhiWEFpT2pFMU9UTTROemsyTVRrNE5USTI= (truncated)
VlVFd01EQXdNREF4TURjek9UZ3dNemM9,219438135,1,1593880887640,clickstream,ZXlKa1pYWnBZMlVpT2lKQmJtUnliMmxrSWl3aVpXTnZiVzFsY21ObElqcDdmU3dpWlhabGJuUmZibUZ0WlNJNkltUmxiR2wyWlhKNUlpd2laWFpsYm5SZmNISmxkbWx2ZFhOZmRHbHRaWE4wWVcxd0lqb3hOVGt6T0Rnd09EZ3k= (truncated)
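The `key` and `value` columns look unreadable because they appear to carry two layers of base64. Assuming the raw JSON files store these Kafka fields as base64 text, `CAST(... AS BINARY)` stores the bytes of that text rather than decoded bytes, and the notebook then renders the `BINARY` column as base64 again. The standard-library sketch below decodes the first displayed `key` twice, which yields a `user_id`-style string, and decodes the first 16 displayed characters of the first `value` (a prefix only, since the display is truncated) to show that the payload is JSON, as the schema table describes. The helper `decode_twice` is hypothetical.

```python
import base64


def decode_twice(displayed: str) -> bytes:
    """Undo the two base64 layers seen in the notebook output (assumed encoding)."""
    return base64.b64decode(base64.b64decode(displayed))


# First `key` shown in the events_bronze output above
key_bytes = decode_twice("VlVFd01EQXdNREF4TURjek9UZ3dOVFE9")
print(key_bytes.decode("utf-8"))  # UA000000107398054

# First 16 characters of the first `value` shown above (the full value is truncated)
value_prefix = decode_twice("ZXlKa1pYWnBZMlVp")
print(value_prefix.decode("utf-8"))  # {"device"
```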


Run the cell below to confirm the data has been loaded correctly.

In [0]:
%python
import pyspark.sql.functions as F

assert spark.catalog.tableExists("events_bronze"), "The table does not exist"
assert spark.table("events_bronze").count() == 2252, "The table should have 2252 records"

first_five = [r["timestamp"] for r in spark.table("events_bronze").orderBy(F.col("timestamp").asc()).limit(5).collect()]
assert first_five == [1593879303631, 1593879304224, 1593879305465, 1593879305482, 1593879305746], "First 5 values are not correct"

last_five = [r["timestamp"] for r in spark.table("events_bronze").orderBy(F.col("timestamp").desc()).limit(5).collect()]
assert last_five == [1593881096290, 1593881095799, 1593881093452, 1593881093394, 1593881092076], "Last 5 values are not correct"

## Create a Delta Table From Query Results

In addition to the new events data, let's also load a small lookup table that provides product details we'll use later in the course.
Use a CTAS statement to create a managed Delta table named **`item_lookup`** that extracts data from the Parquet directory provided below.

Parquet files directory: `/Volumes/dbacademy_ecommerce/v01/raw/item-lookup/`

In [0]:
CREATE OR REPLACE TABLE item_lookup AS
SELECT * 
FROM parquet.`/Volumes/dbacademy_ecommerce/v01/raw/item-lookup/`;

num_affected_rows,num_inserted_rows


Run the cell below to review the contents of the lookup table, then run the following cell to confirm it was loaded correctly.

In [0]:
SELECT * 
FROM item_lookup 
LIMIT 10;

item_id,name,price
M_PREM_Q,Premium Queen Mattress,1795.0
M_STAN_F,Standard Full Mattress,945.0
M_PREM_F,Premium Full Mattress,1695.0
M_PREM_T,Premium Twin Mattress,1095.0
M_PREM_K,Premium King Mattress,1995.0
P_DOWN_S,Standard Down Pillow,119.0
M_STAN_Q,Standard Queen Mattress,1045.0
M_STAN_K,Standard King Mattress,1195.0
M_STAN_T,Standard Twin Mattress,595.0
P_FOAM_S,Standard Foam Pillow,59.0


In [0]:
%python

assert spark.catalog.tableExists("item_lookup"), "The table does not exist"

# Compare sorted lists because the row order returned by collect() is not guaranteed
actual_values = sorted(r["item_id"] for r in spark.table("item_lookup").collect())
expected_values = sorted(['M_PREM_Q','M_STAN_F','M_PREM_F','M_PREM_T','M_PREM_K','P_DOWN_S','M_STAN_Q','M_STAN_K','M_STAN_T','P_FOAM_S','P_FOAM_K','P_DOWN_K'])
assert actual_values == expected_values, "Does not contain the 12 expected item IDs"


&copy; 2025 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/><a href="https://databricks.com/privacy-policy">Privacy Policy</a> | 
<a href="https://databricks.com/terms-of-use">Terms of Use</a> | 
<a href="https://help.databricks.com/">Support</a>