
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px">
</div>

<i18n value="d2c3cdc2-5fcf-4edc-8101-2964a9355000"/>


# Extract and Load Data Lab

In this lab, you will extract and load raw data from JSON files into a Delta table.

## Learning Objectives
By the end of this lab, you should be able to:
- Create an external table to extract data from JSON files
- Create an empty Delta table with a provided schema
- Insert records from an existing table into a Delta table
- Use a CTAS statement to create a Delta table from files

<i18n value="e261fd97-ffd7-44b2-b1ca-61b843ee8961"/>


## Run Setup

Run the following cell to configure variables and datasets for this lesson.

In [0]:
%run ../Includes/Classroom-Setup-04.5L

<i18n value="d7759322-f9b9-4abe-9b30-25f5a7e30d9c"/>


## Overview of the Data

We will work with a sample of raw Kafka data written as JSON files. 

Each file contains all records consumed during a 5-second interval, stored with the full Kafka schema as a multiple-record JSON file. 

The schema for the table:

| field  | type | description |
| ------ | ---- | ----------- |
| key    | BINARY | The **`user_id`** field is used as the key; this is a unique alphanumeric field that corresponds to session/cookie information |
| offset | LONG | This value is unique and monotonically increasing within each partition |
| partition | INTEGER | Our current Kafka implementation uses only 2 partitions (0 and 1) |
| timestamp | LONG    | This timestamp is recorded as milliseconds since epoch, and represents the time at which the producer appends a record to a partition |
| topic | STRING | While the Kafka service hosts multiple topics, only those records from the **`clickstream`** topic are included here |
| value | BINARY | This is the full data payload (to be discussed later), sent as JSON |
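
To make the schema concrete, here is a minimal sketch of decoding a single record with plain Python. The record is hypothetical (the key and payload are invented); only the timestamp is taken from the values checked later in this lab. It also assumes that **`BINARY`** fields are stored as base64-encoded strings in the JSON files, which is an assumption about the file contents rather than something stated above.

```python
import base64
import json
from datetime import datetime, timedelta, timezone

# Hypothetical record shaped like the schema above; key and payload are invented.
# Assumption: BINARY fields are stored as base64-encoded strings in the JSON.
raw_line = json.dumps({
    "key": base64.b64encode(b"UA000000000000001").decode("ascii"),
    "offset": 42,
    "partition": 0,
    "timestamp": 1593879303631,
    "topic": "clickstream",
    "value": base64.b64encode(
        json.dumps({"event_name": "main"}).encode("utf-8")
    ).decode("ascii"),
})

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_record(line):
    """Decode one JSON record into Python-native values."""
    record = json.loads(line)
    return {
        "key": base64.b64decode(record["key"]).decode("utf-8"),
        "offset": record["offset"],
        "partition": record["partition"],
        # Milliseconds since epoch -> timezone-aware UTC datetime (integer math, no float rounding).
        "timestamp": EPOCH + timedelta(milliseconds=record["timestamp"]),
        "topic": record["topic"],
        # The payload is itself JSON, so decode the bytes and parse a second time.
        "value": json.loads(base64.b64decode(record["value"])),
    }


decoded = decode_record(raw_line)
print(decoded["timestamp"].isoformat())
print(decoded["key"], decoded["value"])
```

The two-step handling of **`value`** (base64 to bytes, then bytes to JSON) mirrors why the payload is typed as **`BINARY`** rather than as a nested structure at this stage.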

<i18n value="f2cd70fe-65a1-4dce-b264-c0c7d225640a"/>

 
## Extract Raw Events From JSON Files
To load this data into Delta properly, we first need to extract the JSON data using the correct schema.

Create an external table against JSON files located at the filepath provided below. Name this table **`events_json`** and declare the schema above.

In [0]:
%python
dbutils.fs.ls(f"{DA.paths.datasets}/ecommerce/raw/events-kafka")

In [0]:
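-- TODO
-- External table: only metadata is registered; the data stays in the JSON files at `path`.
create table if not exists events_json 
(key binary, offset long, partition integer, timestamp long, topic string, value binary)
using json
options (path = "${da.paths.datasets}/ecommerce/raw/events-kafka/")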


In [0]:
select * from events_json limit 10

<i18n value="07ce3850-fdc7-4dea-9335-2a093c2e200c"/>


**NOTE**: We'll use Python to run checks occasionally throughout the lab. The following cell raises an error, with a message explaining what needs to change, if you have not followed the instructions. No output from the cell means that you have completed this step.

In [0]:
%python
assert spark.table("events_json"), "Table named `events_json` does not exist"
assert spark.table("events_json").columns == ['key', 'offset', 'partition', 'timestamp', 'topic', 'value'], "Please name the columns in the order provided above"
assert spark.table("events_json").dtypes == [('key', 'binary'), ('offset', 'bigint'), ('partition', 'int'), ('timestamp', 'bigint'), ('topic', 'string'), ('value', 'binary')], "Please make sure the column types are identical to those provided above"

total = spark.table("events_json").count()
assert total == 2252, f"Expected 2252 records, found {total}"
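
The type check above compares against **`bigint`** and **`int`** even though the schema table lists **`LONG`** and **`INTEGER`**. The sketch below illustrates why: Spark reports canonical type names in **`dtypes`**, so SQL aliases appear under a different spelling. The lookup dictionary and the helper **`expected_dtypes`** are hand-written for this illustration and cover only the types used in this lab; they are not part of any Spark API.

```python
# Hypothetical lookup covering only the types used in this lab.
# Spark reports the canonical names on the right in DataFrame.dtypes.
CANONICAL_TYPE_NAMES = {
    "long": "bigint",
    "bigint": "bigint",
    "integer": "int",
    "int": "int",
    "string": "string",
    "binary": "binary",
}


def expected_dtypes(declared_columns):
    """Map (name, declared SQL type) pairs to the (name, dtype) pairs a check compares against."""
    return [
        (name, CANONICAL_TYPE_NAMES[sql_type.lower()])
        for name, sql_type in declared_columns
    ]


# Column names and types as listed in the schema table.
declared = [
    ("key", "BINARY"), ("offset", "LONG"), ("partition", "INTEGER"),
    ("timestamp", "LONG"), ("topic", "STRING"), ("value", "BINARY"),
]
print(expected_dtypes(declared))
```

Declaring a column as **`long`** or as **`bigint`** therefore should make no difference to the check.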

<i18n value="ae3b8554-d0e7-4fd7-b25a-27bfbc5f7c13"/>



## Insert Raw Events Into Delta Table
Create an empty managed Delta table named **`events_raw`** using the same schema.

In [0]:
-- TODO
-- No location is given, so the table is managed; on Databricks it defaults to the Delta format.
create or replace table events_raw
(key binary, offset bigint, partition int, timestamp bigint, topic string, value binary)

<i18n value="3d56975b-47ba-4678-ae7b-7c5e4ac20a97"/>



Run the cell below to confirm the table was created correctly.

In [0]:
%python
assert spark.table("events_raw"), "Table named `events_raw` does not exist"
assert spark.table("events_raw").columns == ['key', 'offset', 'partition', 'timestamp', 'topic', 'value'], "Please name the columns in the order provided above"
assert spark.table("events_raw").dtypes == [('key', 'binary'), ('offset', 'bigint'), ('partition', 'int'), ('timestamp', 'bigint'), ('topic', 'string'), ('value', 'binary')], "Please make sure the column types are identical to those provided above"
assert spark.table("events_raw").count() == 0, "The table should have 0 records"

<i18n value="61815a62-6d4f-47fb-98a9-73c39842ac56"/>



Once the extracted data and Delta table are ready, insert the JSON records from the **`events_json`** table into the new **`events_raw`** Delta table.

In [0]:
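-- TODO
-- Append every row from events_json; columns are matched by position, so the column order must agree.
insert into events_raw 
select * from events_json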

<i18n value="4f545052-31c6-442b-a5e8-4c5892ec912f"/>


Manually review the table contents to ensure data was written as expected.

In [0]:
-- TODO
select * from events_raw limit 10;

<i18n value="0d66f26b-3df6-4819-9d84-22da9f55aeaa"/>



Run the cell below to confirm the data has been loaded correctly.

In [0]:
%python
import pyspark.sql.functions as F
assert spark.table("events_raw").count() == 2252, "The table should have 2252 records"

first_5 = [row['timestamp'] for row in spark.table("events_raw").select("timestamp").orderBy(F.col("timestamp").asc()).limit(5).collect()]
assert first_5 == [1593879303631, 1593879304224, 1593879305465, 1593879305482, 1593879305746], "Make sure you have not modified the data provided"

last_5 = [row['timestamp'] for row in spark.table("events_raw").select("timestamp").orderBy(F.col("timestamp").desc()).limit(5).collect()]
assert last_5 == [1593881096290, 1593881095799, 1593881093452, 1593881093394, 1593881092076], "Make sure you have not modified the data provided"
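
As a rough sanity check on the values asserted above, the earliest and latest timestamps can be turned into a time span. This is a small sketch using only the two numbers from the check cell; the count of five-second windows is plain arithmetic and says nothing about how many files actually exist in the dataset.

```python
from datetime import timedelta

# Values copied from the assertions in the check cell.
first_ms = 1593879303631  # earliest timestamp in events_raw
last_ms = 1593881096290   # latest timestamp in events_raw

# Timestamps are milliseconds since epoch, so their difference is a duration in ms.
span = timedelta(milliseconds=last_ms - first_ms)

# Number of complete five-second intervals that fit in the span.
full_windows = span // timedelta(seconds=5)

print(span)
print(full_windows)
```

The sample therefore covers a little under half an hour of clickstream activity.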

<i18n value="e9565088-1762-4f89-a06f-49576a53526a"/>



## Create Delta Table from a Query
In addition to new events data, let's also load a small lookup table that provides product details that we'll use later in the course.
Use a CTAS statement to create a managed Delta table named **`item_lookup`** that extracts data from the Parquet directory provided below.

In [0]:
%python
dbutils.fs.ls(f"{DA.paths.datasets}/ecommerce/raw/")

In [0]:
-- TODO
-- CTAS: the schema is inferred from the Parquet files, so no column list is declared.
create or replace table item_lookup as 
select * from parquet.`${DA.paths.datasets}/ecommerce/raw/item-lookup`

In [0]:
select * from item_lookup;
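
This lab uses two loading patterns: declaring a schema and then inserting rows, and CTAS, where the schema comes from the query. The sketch below contrasts them in miniature. It uses Python's built-in SQLite purely as a stand-in so that it runs without a cluster; it is not Delta or Spark SQL, and the table names, columns, and rows are hypothetical.

```python
import sqlite3

# SQLite is only a stand-in here; the source table and its rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source_items (item_id TEXT, price REAL)")
conn.executemany(
    "INSERT INTO source_items VALUES (?, ?)",
    [("A_1", 10.0), ("B_2", 20.0)],
)

# Pattern 1: declare the schema first, then append rows from a query.
conn.execute("CREATE TABLE items_declared (item_id TEXT, price REAL)")
conn.execute("INSERT INTO items_declared SELECT * FROM source_items")

# Pattern 2 (CTAS): the new table's columns come from the query itself.
conn.execute("CREATE TABLE items_ctas AS SELECT * FROM source_items")


def column_names(table):
    """Return the column names of a table; index 1 of each PRAGMA row is the name."""
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


declared_rows = conn.execute("SELECT COUNT(*) FROM items_declared").fetchone()[0]
ctas_rows = conn.execute("SELECT COUNT(*) FROM items_ctas").fetchone()[0]
print(column_names("items_declared"), declared_rows)
print(column_names("items_ctas"), ctas_rows)
```

Both tables end up with the same columns and rows. The difference is where the schema is stated: explicitly in the first pattern, implicitly by the query in the second, which is why CTAS suits self-describing sources such as Parquet.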

<i18n value="9f1ad20f-1238-4a12-ad2a-f10169ed6475"/>



Run the cell below to confirm the lookup table has been loaded correctly.

In [0]:
%python
assert spark.table("item_lookup").count() == 12, "The table should have 12 records"
assert set(row['item_id'] for row in spark.table("item_lookup").select("item_id").orderBy('item_id').limit(5).collect()) == {'M_PREM_F', 'M_PREM_K', 'M_PREM_Q', 'M_PREM_T', 'M_STAN_F'}, "Make sure you have not modified the data provided"

<i18n value="c24885ea-010e-4b76-9e9d-cc749f10993a"/>

 
Run the following cell to delete the tables and files associated with this lesson.

In [0]:
%python
DA.cleanup()

&copy; 2023 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="https://help.databricks.com/">Support</a>