# Use Delta Tables in Apache Spark - R version
R code by Antti Rask

Original Python version: [https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03-delta-lake.html](https://microsoftlearning.github.io/mslearn-fabric/Instructions/Labs/03-delta-lake.html)

**Note!** This notebook was made primarily to be run inside Microsoft Fabric / Azure Synapse Data Engineering. You might have to make changes if you decide to open it in VS Code, for instance.

Tables in a Microsoft Fabric lakehouse are based on the open source Delta Lake format for Apache Spark. Delta Lake adds support for relational semantics for both batch and streaming data operations, and enables the creation of a Lakehouse architecture in which Apache Spark can be used to process and query data in tables that are based on underlying files in a data lake.

This exercise should take approximately **40** minutes to complete.

**Note:** You need a Microsoft Fabric trial to complete this exercise.

## Create a workspace
Before working with data in Fabric, create a workspace with the Fabric trial enabled.

1. On the [Microsoft Fabric home page](https://app.fabric.microsoft.com/) at ``` https://app.fabric.microsoft.com ```, select **Synapse Data Engineering**.
2. In the menu bar on the left, select **Workspaces** (the icon looks similar to 🗇).
3. Create a new workspace with a name of your choice, selecting a licensing mode that includes Fabric capacity (Trial, Premium, or Fabric).
4. When your new workspace opens, it should be empty.

## Create a lakehouse and upload data
Now that you have a workspace, it’s time to create a data lakehouse for the data you’re going to analyze.

1. In the **Synapse Data Engineering** home page, create a new **Lakehouse** with a name of your choice.

After a minute or so, a new empty lakehouse will be created. You need to ingest some data into the data lakehouse for analysis. There are multiple ways to do this, but in this exercise you’ll simply download a text file to your local computer (or lab VM if applicable) and then upload it to your lakehouse.

2. Download the [data file](https://github.com/MicrosoftLearning/dp-data/raw/main/products.csv) for this exercise from  ``` https://github.com/MicrosoftLearning/dp-data/raw/main/products.csv ```, saving it as **products.csv** on your local computer (or lab VM if applicable).

3. Return to the web browser tab containing your lakehouse, and in the **…** menu for the **Files** folder in the **Explorer** pane, select **New subfolder** and create a folder named **products**.

4. In the **…** menu for the **products** folder, select **Upload** and **Upload files**, and then upload the **products.csv** file from your local computer (or lab VM if applicable) to the lakehouse.

5. After the file has been uploaded, select the **products** folder and verify that the **products.csv** file is listed.

## Housekeeping

We'll get to the actual code soon, but first some housekeeping: installing and loading packages, and sorting out possible function conflicts.

**Note:** Spark supports multiple coding languages, including Scala, Java, and R. In this exercise, we’ll use SparkR, an R front end for Apache Spark, although the heavy lifting will be done by [sparklyr](https://spark.posit.co/), a tidyverse-friendly R interface to Apache Spark.

### Installing and loading needed R packages

In [1]:
# Install packages
install.packages(
  c(
    "conflicted",
    "dplyr",
    "sparklyr"
  )
)

Installing packages into ‘/nfs4/R/user-lib/application_1711834346503_0001’
(as ‘lib’ is unspecified)
also installing the dependencies ‘tidyselect’, ‘dbplyr’

trying URL 'https://cloud.r-project.org/src/contrib/tidyselect_1.2.1.tar.gz'
Content type 'application/x-gzip' length 103591 bytes (101 KB)
downloaded 101 KB

trying URL 'https://cloud.r-project.org/src/contrib/dbplyr_2.5.0.tar.gz'
Content type 'application/x-gzip' length 770647 bytes (752 KB)
downloaded 752 KB

trying URL 'https://cloud.r-project.org/src/contrib/conflicted_1.2.0.tar.gz'
Content type 'application/x-gzip' length 17071 bytes (16 KB)
downloaded 16 KB

trying URL 'https://cloud.r-project.org/src/contrib/dplyr_1.1.4.tar.gz'
Content type 'application/x-gzip' length 1207521 bytes (1.2 MB)
downloaded 1.2 MB

trying URL 'https://cloud.r-project.org/src/contrib/sparklyr_1.8.5.tar.gz'
Content type 'application/x-gzip' length 3210411 bytes (3.1 MB)
downloaded 3.1 MB

Loading required package: usethis
* installing *source* pac

In [2]:
# Load packages
library(conflicted) # Handling function conflicts between packages
library(dplyr)      # Data wrangling
library(sparklyr)   # Using Spark with R

### Handling function conflicts between packages using the [{conflicted}](https://conflicted.r-lib.org/) package

In [3]:
# Handling conflicts
conflicts_prefer(dplyr::summarize)

[conflicted] Will prefer dplyr::summarize over any other package.

### Setting up a Spark connection with {sparklyr}

In [4]:
# Setting up sparklyr
spark_version <- sparkR.version()
config        <- spark_config()
sc            <- spark_connect(
    master  = "yarn",
    method  = "synapse",
    version = spark_version,
    config  = config
)

## Explore data in a dataframe
1. On the **Home** page while viewing the contents of the **products** folder in your lakehouse, in the **Open notebook** menu, select **New notebook**.

After a few seconds, a new notebook containing a single cell will open. Notebooks are made up of one or more cells that can contain code or markdown (formatted text).

2. Select the existing cell in the notebook, which contains some simple code, and then use its 🗑 (Delete) icon at its top-right to remove it - you will not need this code.

3. In the **Lakehouse explorer** pane on the left, expand **Files** and select **products** to reveal a new pane showing the **products.csv** file you uploaded previously.

4. In the **…** menu for **products.csv**, select **Load data** > **Spark**. A new code cell containing the following code should be added to the notebook:

**Tip:** You can hide the pane containing the files on the left by using its « icon. Doing so will help you focus on the notebook.

5. Use the **▷ (Run cell)** button on the left of the cell to run it.

**Note:** Since this is the first time you’ve run any Spark code in this notebook, a Spark session must be started. This means that the first run can take a minute or so to complete. Subsequent runs will be quicker.

6. When the cell command has completed, review the output below the cell.

In [5]:
df <- spark_read_csv(
    sc,
    path = "Files/products/products.csv"
)
# df now is a Spark DataFrame containing CSV data from "Files/products/products.csv".
df

# Source: spark<products_7e1abce8_5676_4ef9_b259_c6b1ffeb2ff2> [?? x 4]
   ProductID ProductName             Category       ListPrice
       <int> <chr>                   <chr>              <dbl>
 1       771 Mountain-100 Silver, 38 Mountain Bikes     3400.
 2       772 Mountain-100 Silver, 42 Mountain Bikes     3400.
 3       773 Mountain-100 Silver, 44 Mountain Bikes     3400.
 4       774 Mountain-100 Silver, 48 Mountain Bikes     3400.
 5       775 Mountain-100 Black, 38  Mountain Bikes     3375.
 6       776 Mountain-100 Black, 42  Mountain Bikes     3375.
 7       777 Mountain-100 Black, 44  Mountain Bikes     3375.
 8       778 Mountain-100 Black, 48  Mountain Bikes     3375.
 9       779 Mountain-200 Silver, 38 Mountain Bikes     2320.
10       780 Mountain-200 Silver, 42 Mountain Bikes     2320.
# ℹ more rows

## Create delta tables
You can save the dataframe as a delta table by using the **spark_write_table()** function and adding delta as a format in the options. Delta Lake supports the creation of both managed and external tables.

### Create a managed table
Managed tables are tables for which both the schema metadata and the data files are managed by Fabric. The data files for the table are created in the **Tables** folder.

1. Under the results returned by the first code cell, use the **+ Code** icon to add a new code cell if one doesn’t already exist.

**Tip:** To see the **+ Code** icon, move the mouse to just below and to the left of the output from the current cell. Alternatively, in the menu bar, on the **Edit** tab, select **+ Add code cell**.

2. Enter the following code in the new cell and run it:

3. In the **Lakehouse explorer** pane, in the **…** menu for the **Tables** folder, select **Refresh**. Then expand the **Tables** node and verify that the **managed_products** table has been created.

In [6]:
df %>%
    spark_write_table(
        name    = "managed_products",
        mode    = "overwrite",
        options = list(format = "delta")
    )

### Create an external table
You can also create external tables for which the schema metadata is defined in the metastore for the lakehouse, but the data files are stored in an external location.

1. Add another new code cell, and add the following code to it:

2. In the **Lakehouse explorer** pane, in the **…** menu for the **Files** folder, select **Copy ABFS path**.

The ABFS path is the fully qualified path to the **Files** folder in the OneLake storage for your lakehouse - similar to this:

_abfss://workspace@tenant-onelake.dfs.fabric.microsoft.com/lakehousename.Lakehouse/Files_

3. In the code you entered into the code cell, replace **abfs_path** with the path you copied to the clipboard so that the code saves the dataframe as an external table with data files in a folder named **external_products** in your **Files** folder location. The full path should look similar to this:

_abfss://workspace@tenant-onelake.dfs.fabric.microsoft.com/lakehousename.Lakehouse/Files/external_products_

4. In the **Lakehouse explorer** pane, in the **…** menu for the **Tables** folder, select **Refresh**. Then expand the **Tables** node and verify that the **external_products** table has been created.

5. In the **Lakehouse explorer** pane, in the **…** menu for the **Files** folder, select **Refresh**. Then expand the **Files** node and verify that the **external_products** folder has been created for the table’s data files.
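
As a small illustration of step 3, the full path can be assembled in R before it is passed to **spark_write_table()**. The `abfs_path` value below is the placeholder from the example above, not a real location, and `external_path` is a name introduced here only for illustration; substitute the path you copied from your own lakehouse.

```r
# Placeholder taken from the example path above; replace it with the
# ABFS path copied from the Files folder of your own lakehouse.
abfs_path <- "abfss://workspace@tenant-onelake.dfs.fabric.microsoft.com/lakehousename.Lakehouse/Files"

# Append the folder that will hold the external table's data files.
external_path <- paste0(abfs_path, "/external_products")

external_path
```

You could then set `path = external_path` in the `options` list instead of typing the full string by hand.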

In [7]:
df %>%
    spark_write_table(
        name    = "external_products",
        mode    = "overwrite",
        options = list(
            format = "delta",
            path   = "abfs_path/external_products",
            type   = "EXTERNAL"
        )
    )

### Compare managed and external tables
Let’s explore the differences between managed and external tables.

1. Add another code cell and run the following code:

In the results, view the **Location** property for the table, which should be a path to the OneLake storage for the lakehouse ending with **/Tables/managed_products** (you may need to widen the **Data type** column to see the full path).

In [8]:
%%sql

DESCRIBE FORMATTED managed_products;

<Spark SQL result set with 12 rows and 3 fields>

2. Modify the ``` DESCRIBE ``` command to show the details of the external_products table as shown here:

In the results, view the **Location** property for the table, which should be a path to the OneLake storage for the lakehouse ending with **/Files/external_products** (you may need to widen the **Data Type** column to see the full path).

The files for the managed table are stored in the **Tables** folder in the OneLake storage for the lakehouse. In this case, a folder named **managed_products** has been created to store the Parquet files and the **_delta_log** folder for the table you created.

In [9]:
%%sql

DESCRIBE FORMATTED external_products;

<Spark SQL result set with 12 rows and 3 fields>

3. Add another code cell and run the following code:

4. In the **Lakehouse explorer** pane, in the **…** menu for the **Tables** folder, select **Refresh**. Then expand the **Tables** node and verify that no tables are listed.

5. In the **Lakehouse explorer** pane, expand the **Files** folder and verify that the **external_products** folder has not been deleted. Select this folder to view the Parquet data files and **_delta_log** folder for the data that was previously in the **external_products** table. The table metadata for the external table was deleted, but the files were not affected.

In [10]:
%%sql

DROP TABLE managed_products;
DROP TABLE external_products;

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

### Use SQL to create a table
1. Add another code cell and run the following code:

**Note!** I added a ``` DROP TABLE ``` statement so that the whole notebook can be rerun. Without it, the ``` CREATE TABLE ``` statement produces an error on a rerun because the **products** table already exists.

2. In the **Lakehouse explorer** pane, in the **…** menu for the **Tables** folder, select **Refresh**. Then expand the **Tables** node and verify that a new table named **products** is listed. Then expand the table to verify that its schema matches the original dataframe that was saved in the **external_products** folder.

In [14]:
%%sql

DROP TABLE IF EXISTS products;

CREATE TABLE products
USING DELTA
LOCATION 'Files/external_products';

<Spark SQL result set with 0 rows and 0 fields>

<Spark SQL result set with 0 rows and 0 fields>

3. Add another code cell and run the following code:

In [13]:
%%sql

SELECT * FROM products;

<Spark SQL result set with 295 rows and 4 fields>

## Explore table versioning
Transaction history for delta tables is stored in JSON files in the **_delta_log** folder. You can use this transaction log to manage data versioning.
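
For orientation, Delta Lake names each commit file in the **_delta_log** folder after the table version, zero-padded to 20 digits. The sketch below builds those file names in plain R; `delta_log_file()` is a hypothetical helper introduced here for illustration and is not part of sparklyr.

```r
# Build the name of the transaction log file for a given table version.
# Hypothetical helper for illustration only.
delta_log_file <- function(version) {
  sprintf("%020d.json", as.integer(version))
}

delta_log_file(0)  # log file for the first commit (version 0)
delta_log_file(1)  # log file for the next commit (version 1)
```

Each later write or update to the table adds one more file of this kind, which is what makes it possible to read an earlier version.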

1. Add a new code cell to the notebook and run the following code:

This code implements a 10% reduction in the price for mountain bikes.

In [15]:
%%sql

UPDATE products
SET ListPrice = ListPrice * 0.9
WHERE Category = 'Mountain Bikes';

<Spark SQL result set with 1 rows and 1 fields>

2. Add another code cell and run the following code:

The results show the history of transactions recorded for the table.

In [16]:
%%sql

DESCRIBE HISTORY products;

<Spark SQL result set with 4 rows and 15 fields>

3. Add another code cell and run the following code:

The results show two totals: the sum of the list prices after the price reduction, and the sum for the original version of the data.

In [20]:
delta_table_path <- "Files/external_products"

# Get the current data
current_data <- spark_read_delta(sc, path = delta_table_path)

current_data %>% 
    summarize(sum = sum(ListPrice, na.rm = TRUE))

# Source: spark<?> [?? x 1]
      sum
    <dbl>
1 214269.

In [18]:
# Get the version 0 data
original_data <- spark_read_delta(
    sc,
    path    = delta_table_path,
    version = 0
)
original_data %>% 
    summarize(sum = sum(ListPrice, na.rm = TRUE))

# Source: spark<?> [?? x 1]
      sum
    <dbl>
1 219656.
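
As a quick sanity check, the two totals can be compared in plain R. This sketch assumes that the 10% price reduction is the only change between version 0 and the current version, and it uses the totals as displayed above, which are rounded.

```r
# Totals copied from the two outputs above (rounded as displayed).
sum_current  <- 214269
sum_original <- 219656

# The UPDATE reduced mountain-bike prices by 10%, so the drop in the
# overall total is one tenth of the original mountain-bike total.
price_drop          <- sum_original - sum_current
mountain_bike_total <- price_drop * 10

price_drop
mountain_bike_total
```

Under those assumptions, the drop of about 5,387 implies that mountain bikes accounted for roughly 53,870 of the original total.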

## Use delta tables for streaming data
**Note!** SparkR isn't yet supported for streaming data. That's why this part is using PySpark code.

Delta Lake supports streaming data. Delta tables can be a sink or a source for data streams created using the Spark Structured Streaming API. In this example, you’ll use a delta table as a sink for some streaming data in a simulated internet of things (IoT) scenario.

1. Add a new code cell in the notebook. Then, in the new cell, add the following code and run it:

Ensure the message ``` Source stream created... ``` is printed. The code you just ran has created a streaming data source based on a folder to which some data has been saved, representing readings from hypothetical IoT devices.

In [21]:
%%pyspark

from notebookutils         import mssparkutils
from pyspark.sql.types     import *
from pyspark.sql.functions import *

# Create a folder
inputPath = 'Files/data/'
mssparkutils.fs.mkdirs(inputPath)

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
    StructField("device", StringType(), False),
    StructField("status", StringType(), False)
])
iotstream = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)

# Write some event data to the folder
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "data.txt", device_data, True)

print("Source stream created...")

Source stream created...


2. In a new code cell, add and run the following code:

This code writes the streaming device data in delta format to a folder named **iotdevicedata**. Because the path for the folder location is in the **Tables** folder, a table will automatically be created for it.

In [22]:
%%pyspark

# Write the stream to a delta table
delta_stream_table_path = 'Tables/iotdevicedata'
checkpointpath          = 'Files/delta/checkpoint'
deltastream             = iotstream.writeStream.format("delta").option("checkpointLocation", checkpointpath).start(delta_stream_table_path)

print("Streaming to delta sink...")

Streaming to delta sink...


3. In a new code cell, add and run the following code:

This code queries the **iotdevicedata** table, which contains the device data from the streaming source.

In [23]:
%%sql

SELECT * FROM iotdevicedata;

<Spark SQL result set with 16 rows and 2 fields>

4. In a new code cell, add and run the following code:

This code writes more hypothetical device data to the streaming source.

In [24]:
%%pyspark

# Add more data to the source stream
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

True

5. Re-run the cell containing the following code:

This code queries the **iotdevicedata** table again, which should now include the additional data that was added to the streaming source.

In [25]:
%%sql

SELECT * FROM iotdevicedata;

<Spark SQL result set with 16 rows and 2 fields>
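
The row count can be cross-checked against the two batches of device data written earlier. The sketch below repeats those JSON lines in plain R and counts them; the variable names mirror the PySpark cells, and `split_records()` is a helper introduced here for illustration only.

```r
# JSON lines copied from the two PySpark cells above.
device_data <- '{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'

more_data <- '{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'

# Split a block of newline-delimited JSON into one record per line.
split_records <- function(text) strsplit(text, "\n", fixed = TRUE)[[1]]

all_records <- c(split_records(device_data), split_records(more_data))

# Total number of records across both files.
n_records <- length(all_records)

# Number of readings whose status is "error".
n_errors <- sum(grepl('"status":"error"', all_records, fixed = TRUE))

n_records
n_errors
```

The first file holds 9 records and the second holds 7, so the 16 rows reported above are what you would expect once both files have been processed by the stream.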

6. In a new code cell, add and run the following code:

This code stops the stream.

In [26]:
%%pyspark

deltastream.stop()

## Save the notebook and end the Spark session
Now that you’ve finished working with the data, you can save the notebook with a meaningful name and end the Spark session.

1. In the notebook menu bar, use the **⚙️ Settings** icon to view the notebook settings.
2. Set the **Name** of the notebook to **Use Delta Tables in Apache Spark - R version**, and then close the settings pane.
3. Run the following code to **disconnect** the Spark connection:

In [27]:
# Disconnect the Spark connection 
spark_disconnect(sc)
