## Using SageMaker Studio to ingest data from a Delta Table

In this SageMaker notebook, we show how to access data stored as a Delta Table (https://docs.delta.io/latest/index.html) and load it directly using the open source `delta-spark` library (https://github.com/delta-io/delta). We use the DataFrame API from PySpark to ingest and transform the dataset attributes. Finally, we show how to use the `DeltaTable` class to inspect and modify the underlying table.

Prerequisites: First, we need to install the following libraries into our Jupyter environment:

* OpenJDK, which provides the Java runtime that Spark requires
* PySpark, the Python API for Spark
* the open source Delta Spark library (`delta-spark`)

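OpenJDK is installed with conda (for example, from conda-forge), and the PySpark and Delta Spark Python packages are installed with pip from PyPI. The Java (JAR) dependencies that Spark needs at runtime are downloaded from public Maven repositories when the Spark session is created.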
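After installing these packages (and restarting the kernel if prompted), a quick stdlib-only check can confirm that they are visible to the kernel. This is a sketch: `check_prerequisites` is a hypothetical helper that looks for a `java` executable on the PATH and for importable Python modules; `delta` is the import name of the `delta-spark` package (as in `from delta import DeltaTable` later in this notebook).

```python
import importlib.util
import shutil


def check_prerequisites(modules=("pyspark", "delta")):
    """Report whether a Java launcher and the given Python modules can be found."""
    # shutil.which returns None when no 'java' executable is on the PATH
    status = {"java": shutil.which("java") is not None}
    for name in modules:
        # find_spec returns None when a top-level module cannot be imported
        status[name] = importlib.util.find_spec(name) is not None
    return status


print(check_prerequisites())
```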

Note 1: This notebook is designed to run within SageMaker Studio.
Please make sure you select the `Python 3 (Data Science)` kernel for this notebook.

Note 2: PySpark commands run faster on an instance type with at least 16 GB of RAM, such as `ml.g4dn.xlarge`.

This notebook is covered under the MIT-0 License:
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: MIT-0

In [1]:
%conda install openjdk -q -y 

Collecting package metadata (current_repodata.json): ...working... done
Solving environment: ...working... done

# All requested packages already installed.


Note: you may need to restart the kernel to use updated packages.


In [2]:
%pip install pyspark==3.2.0

Note: you may need to restart the kernel to use updated packages.


In [3]:
%pip install delta-spark==1.1.0

Note: you may need to restart the kernel to use updated packages.


In [4]:
%pip install -U "sagemaker>2.72"

Note: you may need to restart the kernel to use updated packages.


#### Add hostname to the instance /etc/hosts file

The next cell maps the notebook instance's hostname to 127.0.0.1 in its /etc/hosts file, unless an entry is already present, so that the hostname resolves locally when Spark starts. Note: This is a temporary workaround that is needed only until SageMaker releases a fix for this hostname resolution issue.

In [5]:
# add hostname into /etc/hosts 
!grep `hostname` /etc/hosts >/dev/null || echo 127.0.0.1 `hostname` >> /etc/hosts
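For readers who prefer Python to shell, here is a sketch of the same idempotent check-then-append logic. `ensure_hostname_entry` is a hypothetical helper; unlike `grep`, which matches any substring, it compares whole host-name tokens, and writing to the real /etc/hosts requires write permission on that file.

```python
import socket
from pathlib import Path


def ensure_hostname_entry(hosts_path="/etc/hosts", hostname=None, ip="127.0.0.1"):
    """Append '<ip> <hostname>' to hosts_path unless the hostname is already mapped.

    Returns True if a line was appended and False if an entry already existed.
    """
    hostname = hostname or socket.gethostname()
    path = Path(hosts_path)
    text = path.read_text() if path.exists() else ""
    for line in text.splitlines():
        # Drop comments, then compare whole host-name tokens after the IP address
        fields = line.split("#", 1)[0].split()
        if hostname in fields[1:]:
            return False
    with path.open("a") as hosts_file:
        # Keep the new entry on its own line even if the file lacks a trailing newline
        if text and not text.endswith("\n"):
            hosts_file.write("\n")
        hosts_file.write(f"{ip} {hostname}\n")
    return True
```

Calling it a second time is a no-op, which mirrors the `grep ... || echo ...` pattern in the cell above.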

In [6]:
import sagemaker
sagemaker.__version__

'2.75.1'

In [7]:
# Import the PySpark classes used to build the Spark session, plus supporting libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
import numpy as np
import pandas as pd
 

### Configure Spark Session to use additional library packages to satisfy dependencies


In [9]:
# Build list of packages entries using Maven coordinates (groupId:artifactId:version)
pkg_list = []
pkg_list.append("io.delta:delta-core_2.12:1.1.0")
pkg_list.append("org.apache.hadoop:hadoop-aws:3.2.2")

packages = ",".join(pkg_list)
print('packages: '+packages)

packages: io.delta:delta-core_2.12:1.1.0,org.apache.hadoop:hadoop-aws:3.2.2
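Each entry in `spark.jars.packages` is a Maven coordinate. The `_2.12` suffix in `delta-core_2.12` is the Scala binary version the JAR was built for, which has to match the Scala version of the Spark runtime (Spark 3.2.0 uses Scala 2.12), and the `delta-core` version should match the `delta-spark` pip package installed earlier (1.1.0 here). Likewise, `hadoop-aws` is generally expected to match the version of the Hadoop libraries bundled with Spark. The sketch below, with hypothetical helpers `build_packages` and `scala_suffix`, validates the coordinate shape before joining and extracts the Scala suffix for a quick visual check.

```python
def build_packages(coordinates):
    """Join Maven coordinates for spark.jars.packages, checking the groupId:artifactId:version shape."""
    for coordinate in coordinates:
        parts = coordinate.split(":")
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"expected groupId:artifactId:version, got {coordinate!r}")
    return ",".join(coordinates)


def scala_suffix(coordinate):
    """Return the Scala binary version in the artifactId (e.g. '2.12' for delta-core_2.12), or None."""
    artifact_id = coordinate.split(":")[1]
    _, sep, suffix = artifact_id.rpartition("_")
    return suffix if sep else None


packages = build_packages([
    "io.delta:delta-core_2.12:1.1.0",
    "org.apache.hadoop:hadoop-aws:3.2.2",
])
print(packages)
print({c: scala_suffix(c) for c in packages.split(",")})
```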


In [10]:
# Instantiate Spark via builder
# Note: we use the `ContainerCredentialsProvider` to give us access to underlying IAM role permissions

spark = (SparkSession
    .builder
    .appName("PySparkApp") 
    .config("spark.jars.packages", packages) 
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 
    .config("fs.s3a.aws.credentials.provider",'com.amazonaws.auth.ContainerCredentialsProvider') 
    .getOrCreate())

sc = spark.sparkContext

print('Spark version: '+str(sc.version))

Spark version: 3.2.0


### Lending Club Loans Data

In this notebook, we use a publicly available dataset from Lending Club that contains customer loan data. We previously downloaded the "accepted" data file (`accepted_2007_to_2018Q4.csv.gz`) and selected a small subset of attributes. This dataset is available under the Creative Commons CC0 license (https://creativecommons.org/publicdomain/zero/1.0/).

Lending Club Data: https://www.kaggle.com/wordsforthewise/lending-club

In [11]:
import boto3
import sagemaker

# Default SageMaker S3 bucket, used below to store the raw CSV file and the Delta Tables
sm_session = sagemaker.Session()
bucket = sm_session.default_bucket()
region = sm_session.boto_region_name

sm_client = boto3.client('sagemaker')
iam_role = sagemaker.get_execution_role()

print('Default bucket: '+bucket)

Default bucket: sagemaker-us-east-1-572539092864


### Upload raw CSV file to S3 for efficient access by PySpark

Note: PySpark accesses S3 through the Hadoop S3A connector, provided by the `hadoop-aws` package configured above, which handles URIs with the `s3a://` scheme. Therefore, we convert each native `s3://` URI to `s3a://` in the cells throughout this notebook.
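A minimal sketch of the scheme conversion: the cells below use `str.replace('s3:', 's3a:')`, which works for these paths but would rewrite every `s3:` substring, including one inside an object key. `to_s3a` is a hypothetical helper that changes only the URI scheme.

```python
from urllib.parse import urlsplit, urlunsplit


def to_s3a(uri):
    """Rewrite an s3:// (or s3a://) URI so that only its scheme becomes s3a."""
    parts = urlsplit(uri)
    if parts.scheme not in ("s3", "s3a"):
        raise ValueError(f"not an S3 URI: {uri!r}")
    # Bucket (netloc) and key (path) are passed through unchanged
    return urlunsplit(parts._replace(scheme="s3a"))


print(to_s3a("s3://example-bucket/delta_demo/delta_format/"))
```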

In [12]:
from sagemaker.s3 import S3Uploader

local_basename = 'part-00000-tid-3730514337082854868-615b9e6a-1218-4f20-b930-a1fcfa6603a2-29-1-c000.csv'
local_file = '../data/raw_csv/' + local_basename
upload_s3_uri = f's3://{bucket}/delta_demo/raw_csv'

S3Uploader.upload(local_file, upload_s3_uri, sagemaker_session=sm_session)

's3://sagemaker-us-east-1-572539092864/delta_demo/raw_csv/part-00000-tid-3730514337082854868-615b9e6a-1218-4f20-b930-a1fcfa6603a2-29-1-c000.csv'

In [13]:
# Build the S3 URI of the uploaded CSV file, then switch it to the s3a scheme

s3_raw_csv = f'{upload_s3_uri}/{local_basename}'
s3a_raw_csv = s3_raw_csv.replace('s3:','s3a:')

print(s3a_raw_csv)

s3a://sagemaker-us-east-1-572539092864/delta_demo/raw_csv/part-00000-tid-3730514337082854868-615b9e6a-1218-4f20-b930-a1fcfa6603a2-29-1-c000.csv


In [14]:
%%time

loans_df = spark.read.csv(s3a_raw_csv, header=True)

CPU times: user 3.66 ms, sys: 204 µs, total: 3.86 ms
Wall time: 5.35 s


In [15]:
print('Rows: '+str(loans_df.count()))
loans_df.dtypes

Rows: 49999


[('id', 'string'),
 ('loan_amnt', 'string'),
 ('funded_amnt', 'string'),
 ('term', 'string'),
 ('int_rate', 'string'),
 ('addr_state', 'string'),
 ('loan_status', 'string')]

In [16]:
loans_df.show(10)

+--------+---------+-----------+---------+--------+----------+-----------+
|      id|loan_amnt|funded_amnt|     term|int_rate|addr_state|loan_status|
+--------+---------+-----------+---------+--------+----------+-----------+
|68407277|   3600.0|     3600.0|36 months|   13.99|        PA| Fully Paid|
|68355089|  24700.0|    24700.0|36 months|   11.99|        SD| Fully Paid|
|68341763|  20000.0|    20000.0|60 months|   10.78|        IL| Fully Paid|
|66310712|  35000.0|    35000.0|60 months|   14.85|        NJ|    Current|
|68476807|  10400.0|    10400.0|60 months|   22.45|        PA| Fully Paid|
|68426831|  11950.0|    11950.0|36 months|   13.44|        GA| Fully Paid|
|68476668|  20000.0|    20000.0|36 months|    9.17|        MN| Fully Paid|
|67275481|  20000.0|    20000.0|36 months|    8.49|        SC| Fully Paid|
|68466926|  10000.0|    10000.0|36 months|    6.49|        PA| Fully Paid|
|68616873|   8000.0|     8000.0|36 months|   11.48|        RI| Fully Paid|
+--------+---------+-----

### Write the Spark DataFrame in Delta Lake format

Note: No upfront schema definition is needed; Delta takes the table schema from the DataFrame and records it in the table's transaction log.

In [17]:
# Define the Delta Table location, using the 's3a' scheme

s3_delta_table_uri=f's3://{bucket}/delta_demo/delta_format/'
s3a_delta_table_uri=s3_delta_table_uri.replace('s3:','s3a:')

print(s3a_delta_table_uri)

s3a://sagemaker-us-east-1-572539092864/delta_demo/delta_format/


In [18]:
loans_df.write.format("delta").mode("overwrite").save(s3a_delta_table_uri)

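### We can use Spark SQL commands to query table data

Spark SQL can be run inline using the SparkSession object. Passing a valid SQL string to `spark.sql()` executes the command and returns the result set as a Spark DataFrame. A Delta Table can be referenced directly by its path: prefix it with `delta.` and wrap the path in backticks. Note that because the CSV was read without a schema, every column is a string, so `ORDER BY loan_amnt` sorts the amounts as text rather than numerically.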

In [19]:
# Create SQL command inserting the S3 path location

sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY loan_amnt'
print(f'SQL command: {sql_cmd}')


SQL command: SELECT * FROM delta.`s3a://sagemaker-us-east-1-572539092864/delta_demo/delta_format/` ORDER BY loan_amnt


In [20]:
# Execute SQL command which returns dataframe

sql_results = spark.sql(sql_cmd)
print(type(sql_results))

sql_results.show(10)

<class 'pyspark.sql.dataframe.DataFrame'>
+--------+---------+-----------+---------+--------+----------+-----------+
|      id|loan_amnt|funded_amnt|     term|int_rate|addr_state|loan_status|
+--------+---------+-----------+---------+--------+----------+-----------+
|68356556|   1000.0|     1000.0|36 months|   11.48|        NM| Fully Paid|
|68355144|   1000.0|     1000.0|36 months|   17.97|        NY| Fully Paid|
|67347650|   1000.0|     1000.0|36 months|   11.99|        NY| Fully Paid|
|68436190|   1000.0|     1000.0|36 months|   13.99|        TN|Charged Off|
|68356337|   1000.0|     1000.0|36 months|   12.88|        VA| Fully Paid|
|68576654|   1000.0|     1000.0|36 months|   13.44|        NY| Fully Paid|
|68395234|   1000.0|     1000.0|36 months|   12.88|        NY| Fully Paid|
|68536214|   1000.0|     1000.0|36 months|   11.99|        TX| Fully Paid|
|68465543|   1000.0|     1000.0|36 months|    7.49|        PA| Fully Paid|
|67859219|   1000.0|     1000.0|36 months|   11.48|       
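Because `loan_amnt` is a string column, `ORDER BY loan_amnt` compares text character by character. The first rows above happen to be the smallest amounts either way, but further down the order diverges from numeric order. The plain-Python illustration below uses amounts from this notebook's outputs; `order_by_amount_sql` is a hypothetical helper that builds a query casting the column to `DOUBLE` before sorting. Alternatively, the CSV could be read with `spark.read.csv(s3a_raw_csv, header=True, inferSchema=True)`, or columns cast with `col("loan_amnt").cast("double")` before writing the Delta Table.

```python
# Sample loan amounts (as strings) taken from the outputs shown in this notebook
amounts = ["3600.0", "24700.0", "20000.0", "35000.0", "10400.0", "8000.0", "1000.0"]

# Text ordering compares character by character, so "35000.0" sorts before "3600.0"
lexicographic = sorted(amounts)

# Converting to numbers first gives the intended order
numeric = sorted(amounts, key=float)


def order_by_amount_sql(table_uri, column="loan_amnt"):
    """Hypothetical helper: query a path-based Delta table, sorting a string column numerically."""
    return f"SELECT * FROM delta.`{table_uri}` ORDER BY CAST({column} AS DOUBLE)"


print(lexicographic)
print(numeric)
print(order_by_amount_sql("s3a://example-bucket/delta_demo/delta_format/"))
```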

### Leveraging the DeltaTable API

The following notebook cells demonstrate various capabilities of the DeltaTable class (https://docs.delta.io/latest/api/python/index.html).

For example, the DeltaTable API lets us retrieve the table's modification history.

In [21]:
from delta import DeltaTable

# Use the static isDeltaTable method to check whether the path holds a Delta Table
print(DeltaTable.isDeltaTable(spark, s3a_delta_table_uri))

True


In [22]:
deltaTable = DeltaTable.forPath(spark, s3a_delta_table_uri)

In [23]:
history_df = deltaTable.history()
history_df.head(3)

[Row(version=7, timestamp=datetime.datetime(2022, 2, 16, 18, 1, 3), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Overwrite', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=6, isolationLevel='Serializable', isBlindAppend=False, operationMetrics={'numOutputRows': '49999', 'numOutputBytes': '526896', 'numFiles': '1'}, userMetadata=None, engineInfo='Apache-Spark/3.2.0 Delta-Lake/1.1.0'),
 Row(version=6, timestamp=datetime.datetime(2022, 2, 9, 16, 37, 39), userId=None, userName=None, operation='WRITE', operationParameters={'mode': 'Overwrite', 'partitionBy': '[]'}, job=None, notebook=None, clusterId=None, readVersion=5, isolationLevel='Serializable', isBlindAppend=False, operationMetrics={'numOutputRows': '49999', 'numOutputBytes': '526896', 'numFiles': '1'}, userMetadata=None, engineInfo='Apache-Spark/3.2.0 Delta-Lake/1.1.0'),
 Row(version=5, timestamp=datetime.datetime(2022, 2, 8, 20, 49, 24), userId=None, userName=None, operatio
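The history rows above come from the table's transaction log: a `_delta_log/` folder next to the data files, with one JSON commit file per version (named by the version number, zero-padded to 20 digits). Each line of a commit file is one action, and the `commitInfo` action carries fields such as `operation` and `operationParameters`. The sketch below reads that log from a local directory to show the idea; `is_delta_table` and `read_history` are hypothetical helpers, not the delta-spark implementation, and they ignore checkpoints and log retention. For tables on S3, use `DeltaTable.isDeltaTable` and `DeltaTable.history()` as in the cells above.

```python
import json
from pathlib import Path


def is_delta_table(path):
    """Heuristic check: a Delta table directory contains a _delta_log folder with commit files."""
    log_dir = Path(path) / "_delta_log"
    return log_dir.is_dir() and any(log_dir.glob("*.json"))


def read_history(path, limit=None):
    """Return commitInfo entries from a local Delta table's JSON commit files, newest first."""
    log_dir = Path(path) / "_delta_log"
    history = []
    for commit_file in log_dir.glob("*.json"):
        # Commit files are named by table version, zero-padded to 20 digits
        if not commit_file.stem.isdigit():
            continue
        version = int(commit_file.stem)
        with commit_file.open() as fh:
            for line in fh:
                if not line.strip():
                    continue
                # Each non-empty line holds exactly one action, e.g. {"add": {...}}
                action = json.loads(line)
                if "commitInfo" in action:
                    history.append({"version": version, **action["commitInfo"]})
    history.sort(key=lambda entry: entry["version"], reverse=True)
    return history if limit is None else history[:limit]
```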

### Demonstrate Schema Evolution

First, let's read the data back from the Delta Table. Since this data was written in `delta` format, we specify `.format("delta")` when reading it and then provide the S3 URI where the Delta Table is located.

In [24]:
%%time

# optional: include `versionAsOf` option to read specific version
delta_df = (spark.read.format("delta").load(s3a_delta_table_uri))

CPU times: user 0 ns, sys: 2.15 ms, total: 2.15 ms
Wall time: 163 ms
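Reading an older version (time travel) works because the data files of earlier versions stay in storage until they are vacuumed, and any version's snapshot can be rebuilt by replaying the transaction log up to that version. In PySpark, this is requested with a reader option, for example `spark.read.format("delta").option("versionAsOf", 5).load(s3a_delta_table_uri)`, or with `timestampAsOf` and a timestamp string. The sketch below models only the replay step on an in-memory log; `active_files` and the sample commits are hypothetical, and checkpoints are ignored.

```python
def active_files(commits, as_of_version):
    """Replay add/remove actions for versions 0..as_of_version; return the data files in that snapshot.

    `commits` maps a version number to its list of actions, shaped like lines of the JSON log.
    Other action types (metaData, protocol, commitInfo) are ignored here.
    """
    live = set()
    for version in sorted(v for v in commits if v <= as_of_version):
        for action in commits[version]:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


# Hypothetical log: an initial write, an overwrite, then an update that rewrites the file
commits = {
    0: [{"commitInfo": {"operation": "WRITE"}}, {"add": {"path": "part-a.parquet"}}],
    1: [{"commitInfo": {"operation": "WRITE"}}, {"remove": {"path": "part-a.parquet"}},
        {"add": {"path": "part-b.parquet"}}],
    2: [{"commitInfo": {"operation": "UPDATE"}}, {"remove": {"path": "part-b.parquet"}},
        {"add": {"path": "part-c.parquet"}}],
}
for v in range(3):
    print(v, sorted(active_files(commits, v)))
```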


In [25]:
delta_df.count()

49999

#### First, we write the table with its existing schema to a new S3 location

In [26]:
# Specify a new S3 URI to store the original DataFrame with its original schema

s3_delta_update_uri=f's3://{bucket}/delta_demo/delta_schema_update/'
s3a_delta_update_uri = s3_delta_update_uri.replace('s3:','s3a:')
print(s3a_delta_update_uri)

s3a://sagemaker-us-east-1-572539092864/delta_demo/delta_schema_update/


In [27]:
%%time

delta_df.write.format("delta").mode("overwrite").save(s3a_delta_update_uri)

CPU times: user 0 ns, sys: 3.12 ms, total: 3.12 ms
Wall time: 5.13 s


#### Now, let's add two new columns to the existing DataFrame

In [28]:
from pyspark.sql.functions import col, lit

funding_type_col = "funding_type"
excess_int_rate_col = "excess_int_rate"

delta_update_df = (delta_df.withColumn(funding_type_col, lit("NA"))
                           .withColumn(excess_int_rate_col, lit(0.0)))
delta_update_df.dtypes

[('id', 'string'),
 ('loan_amnt', 'string'),
 ('funded_amnt', 'string'),
 ('term', 'string'),
 ('int_rate', 'string'),
 ('addr_state', 'string'),
 ('loan_status', 'string'),
 ('funding_type', 'string'),
 ('excess_int_rate', 'double')]

In [29]:
delta_update_df.show(10)

+--------+---------+-----------+---------+--------+----------+-----------+------------+---------------+
|      id|loan_amnt|funded_amnt|     term|int_rate|addr_state|loan_status|funding_type|excess_int_rate|
+--------+---------+-----------+---------+--------+----------+-----------+------------+---------------+
|68407277|   3600.0|     3600.0|36 months|   13.99|        PA| Fully Paid|          NA|            0.0|
|68355089|  24700.0|    24700.0|36 months|   11.99|        SD| Fully Paid|          NA|            0.0|
|68341763|  20000.0|    20000.0|60 months|   10.78|        IL| Fully Paid|          NA|            0.0|
|66310712|  35000.0|    35000.0|60 months|   14.85|        NJ|    Current|          NA|            0.0|
|68476807|  10400.0|    10400.0|60 months|   22.45|        PA| Fully Paid|          NA|            0.0|
|68426831|  11950.0|    11950.0|36 months|   13.44|        GA| Fully Paid|          NA|            0.0|
|68476668|  20000.0|    20000.0|36 months|    9.17|        MN| F

In [30]:
print('Writing table update to: '+s3a_delta_update_uri)

Writing table update to: s3a://sagemaker-us-east-1-572539092864/delta_demo/delta_schema_update/


In [31]:
(delta_update_df.write.format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true") # option - evolve schema
 .save(s3a_delta_update_uri)
)
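With `mergeSchema` enabled, Delta adds the new `funding_type` and `excess_int_rate` columns to the table schema; without it (or `overwriteSchema`), Delta's schema enforcement would reject this write because the DataFrame's columns no longer match the table's. The sketch below is a simplified model of that additive merge over `(name, type)` pairs like those returned by `DataFrame.dtypes`. `merge_schemas` is hypothetical, and it ignores case-insensitive name matching, nested fields, and any type changes Delta may permit.

```python
def merge_schemas(existing, incoming):
    """Simplified additive schema merge over lists of (column_name, type_name) pairs.

    Existing columns keep their position, columns seen only in `incoming` are appended
    in arrival order, and a type change on an existing column raises TypeError.
    """
    merged = list(existing)
    known = dict(existing)
    for name, dtype in incoming:
        if name not in known:
            merged.append((name, dtype))
            known[name] = dtype
        elif known[name] != dtype:
            raise TypeError(f"column {name!r}: cannot change {known[name]} to {dtype}")
    return merged


table_schema = [("id", "string"), ("loan_amnt", "string"), ("funded_amnt", "string"),
                ("term", "string"), ("int_rate", "string"), ("addr_state", "string"),
                ("loan_status", "string")]
# Same pairs as delta_update_df.dtypes above
dataframe_schema = table_schema + [("funding_type", "string"), ("excess_int_rate", "double")]
print(merge_schemas(table_schema, dataframe_schema))
```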

### DeltaTable Usage

Let's check the modification history of our new table. Each write, including the schema-evolving write above, is recorded as a new table version.

In [32]:
# First, let's confirm that this S3 URI points to a Delta Table

DeltaTable.isDeltaTable(spark, s3a_delta_update_uri)

True

In [33]:
# Using the S3 URI, we instantiate a DeltaTable object

deltaTableUpdate = DeltaTable.forPath(spark, s3a_delta_update_uri)

In [34]:
# Let's retrieve the history BEFORE the update operations (the schema-evolving write above is already the latest version)

history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      5|2022-02-16 18:02:45|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          4|  Serializable|        false|{numFiles -> 1, n...|        null|Apache-Spark/3.2....|
|      4|2022-02-16 18:02:19|  null|    null|    WRITE|{mode -> Overwrit...|null|    null|     null|          3|  Serializable|        false|{numFiles -> 1, n...|        null|Apache-Spark/3.2....|
|      3|2022-0

#### The DeltaTable `update` method applies the `set` expressions to every row for which the `condition` evaluates to true

In [35]:
deltaTableUpdate.update(
    condition=col("loan_amnt") == col("funded_amnt"),
    set={funding_type_col: lit("FullyFunded")},
)
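Conceptually, `update` behaves like SQL `UPDATE ... SET ... WHERE ...`: rows matching the condition receive new values, and all other rows are unchanged. Delta does not edit data files in place; it rewrites the files containing matching rows and commits the change as a new table version, which is why the history shows `UPDATE` operations with `numRemovedFiles` metrics. `DeltaTable.update` also accepts SQL expression strings, e.g. `condition="loan_amnt = funded_amnt"` with `set={"funding_type": "'FullyFunded'"}`. The plain-Python sketch below models only the row-level semantics on a list of dicts; `update_rows` is a hypothetical helper and the second sample row is made up for contrast.

```python
def update_rows(rows, condition, assignments):
    """Model of a conditional update on a list of dict rows.

    Returns (new_rows, num_updated). `assignments` maps a column name to a function of the
    original row, so every new value is computed from pre-update values, as in SQL UPDATE.
    """
    new_rows, num_updated = [], 0
    for row in rows:
        if condition(row):
            changed = dict(row)  # copy, so the input rows are left untouched
            for column, compute in assignments.items():
                changed[column] = compute(row)
            new_rows.append(changed)
            num_updated += 1
        else:
            new_rows.append(row)
    return new_rows, num_updated


rows = [
    # Values from the first output row in this notebook
    {"id": "68407277", "loan_amnt": "3600.0", "funded_amnt": "3600.0", "funding_type": "NA"},
    # Hypothetical partially funded loan
    {"id": "hypothetical-1", "loan_amnt": "5000.0", "funded_amnt": "4000.0", "funding_type": "NA"},
]
updated, count = update_rows(
    rows,
    condition=lambda r: r["loan_amnt"] == r["funded_amnt"],
    assignments={"funding_type": lambda r: "FullyFunded"},
)
print(count, [r["funding_type"] for r in updated])
```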

In [36]:
final_update_df = deltaTableUpdate.toDF()

In [37]:
final_update_df.show(5)

+--------+---------+-----------+---------+--------+----------+-----------+------------+---------------+
|      id|loan_amnt|funded_amnt|     term|int_rate|addr_state|loan_status|funding_type|excess_int_rate|
+--------+---------+-----------+---------+--------+----------+-----------+------------+---------------+
|68407277|   3600.0|     3600.0|36 months|   13.99|        PA| Fully Paid| FullyFunded|            0.0|
|68355089|  24700.0|    24700.0|36 months|   11.99|        SD| Fully Paid| FullyFunded|            0.0|
|68341763|  20000.0|    20000.0|60 months|   10.78|        IL| Fully Paid| FullyFunded|            0.0|
|66310712|  35000.0|    35000.0|60 months|   14.85|        NJ|    Current| FullyFunded|            0.0|
|68476807|  10400.0|    10400.0|60 months|   22.45|        PA| Fully Paid| FullyFunded|            0.0|
+--------+---------+-----------+---------+--------+----------+-----------+------------+---------------+
only showing top 5 rows



#### Build the DeltaTable `update` value with a Python function

In [38]:
# Function that calculates rate overage (amount over 10.0)
def excess_int_rate(rate):
    return (rate-10.0)

print('Test: excess interest rate (rate-10.0) = '+str(excess_int_rate(12.5)))

Test: excess interest rate (rate-10.0) = 2.5


In [39]:
deltaTableUpdate.update(
    condition=col("int_rate") > 10.0,
    set={excess_int_rate_col: excess_int_rate(col("int_rate"))},
)
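When `excess_int_rate(col("int_rate"))` is called, Python runs the function once, on the driver, with a Column argument; `rate - 10.0` then builds the Spark expression `int_rate - 10.0`, which Spark evaluates for each matching row (under default, non-ANSI settings the string column is implicitly cast to double). It is not a Python UDF. Binary floating point explains values such as `1.9900000000000002` in the output below, because 11.99 has no exact binary representation. The sketch reproduces this in plain Python with a hypothetical `excess_over` function; in Spark, wrapping the expression with `pyspark.sql.functions.round(..., 2)` would round the stored value.

```python
def excess_over(rate, threshold=10.0):
    """Amount by which `rate` exceeds `threshold`.

    With a float this returns a number; with a PySpark Column the same `-` operator
    would build a Spark expression instead of computing a value.
    """
    return rate - threshold


print(excess_over(12.5))  # 2.5, matching the test cell above
raw = excess_over(11.99)
print(raw)                # 1.9900000000000002: 11.99 is not exactly representable in binary
print(round(raw, 2))      # 1.99
```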

#### Use a SQL command to display the records that meet the previous condition

In [40]:
sql_cmd = f"SELECT * FROM delta.`{s3a_delta_update_uri}` WHERE int_rate > 10.0"
print(f'SQL command: {sql_cmd}')

SQL command: SELECT * FROM delta.`s3a://sagemaker-us-east-1-572539092864/delta_demo/delta_schema_update/` WHERE int_rate > 10.0


In [41]:
records = spark.sql(sql_cmd)
records.show(5)

+--------+---------+-----------+---------+--------+----------+-----------+------------+------------------+
|      id|loan_amnt|funded_amnt|     term|int_rate|addr_state|loan_status|funding_type|   excess_int_rate|
+--------+---------+-----------+---------+--------+----------+-----------+------------+------------------+
|68407277|   3600.0|     3600.0|36 months|   13.99|        PA| Fully Paid| FullyFunded|              3.99|
|68355089|  24700.0|    24700.0|36 months|   11.99|        SD| Fully Paid| FullyFunded|1.9900000000000002|
|68341763|  20000.0|    20000.0|60 months|   10.78|        IL| Fully Paid| FullyFunded|0.7799999999999994|
|66310712|  35000.0|    35000.0|60 months|   14.85|        NJ|    Current| FullyFunded|              4.85|
|68476807|  10400.0|    10400.0|60 months|   22.45|        PA| Fully Paid| FullyFunded|             12.45|
+--------+---------+-----------+---------+--------+----------+-----------+------------+------------------+
only showing top 5 rows



### Retrieve table history following the updates

In [42]:
# Finally, let's retrieve the table history AFTER the update operations

history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      7|2022-02-16 18:03:19|  null|    null|   UPDATE|{predicate -> (ca...|null|    null|     null|          6|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.2....|
|      6|2022-02-16 18:03:03|  null|    null|   UPDATE|{predicate -> (lo...|null|    null|     null|          5|  Serializable|        false|{numRemovedFiles ...|        null|Apache-Spark/3.2....|
|      5|2022-0