### Sample: retrieve Delta tables from S3

###### Install awswrangler (skip this cell if you plan to use Spark to retrieve the data)

In [None]:
%sh
pip3 install awswrangler 

##### Imports and configs

In [None]:
import boto3
from delta.tables import DeltaTable
from pyspark import SparkConf
from pyspark.sql import SparkSession

#######################################################
### Spark conf: pull in delta-core and enable Delta ###
#######################################################
conf = SparkConf()
for option, setting in (
    ('spark.jars.packages', 'io.delta:delta-core_2.12:1.0.0'),
    ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
    ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
):
    conf.set(option, setting)

### Reuse a running session if there is one, otherwise start one with the conf above
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)

#############################################################
### AWS credentials for s3a: default AWS provider chain   ###
#############################################################
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

### boto3 session and the S3 resource handle built from it
session = boto3.Session()
s3_resource = session.resource('s3')


In [None]:
### S3 location of the dataset to read (plain string: there is nothing to interpolate)
s3_path_in = "s3://bucketname/uri/"

In [None]:
### Columns to keep when reading the dataset
cols_use = ['field1', 'field2', 'field3']

In [None]:
### Pick ONE reader: both assign `df`, so running both would leave only the pandas result.
USE_SPARK = True

if USE_SPARK:
    ### Spark mode
    ### NOTE(review): this reads the raw parquet files; if s3_path_in is a Delta table,
    ### spark.read.format("delta").load(s3_path_in) is presumably what is wanted -- confirm.
    df = spark.read.parquet(s3_path_in).select(cols_use)
else:
    ### pandas mode (requires awswrangler to be installed)
    import awswrangler as wr

    ### use_threads=True would use every available thread; an int caps the thread count
    df = wr.s3.read_parquet(s3_path_in, dataset=True, boto3_session=session, columns=cols_use, use_threads=6)

In [None]:
### Preview the first 10 rows, whichever reader produced `df`.
### Spark DataFrames have .show(), which prints the rows and returns None;
### pandas DataFrames have no .show(), so use .head(), which the notebook renders.
preview = df.show(10) if hasattr(df, "show") else df.head(10)
preview

### Example: creating and upserting Delta tables on S3

In [1]:
### Bucket and table location for the Delta example.
### The path is built from s3_bucket so that the table is written to the same
### bucket whose objects are listed later in the notebook.
s3_bucket = "data-test-bucket"
s3_path = f"s3://{s3_bucket}/uri/"

### initially prepopulate the table with some data
users_initial = [
    { 'user_id': 1, 'name': 'Gina Burch', 'gender': 'f' },
    { 'user_id': 2, 'name': 'Francesco Coates', 'gender': 'm' },
    { 'user_id': 3, 'name': 'Saeed Wicks', 'gender': 'm' },
    { 'user_id': 4, 'name': 'Raisa Oconnell', 'gender': 'f' },
    { 'user_id': 5, 'name': 'Josh Copeland', 'gender': 'm' },
    { 'user_id': 6, 'name': 'Kaiden Williamson', 'gender': 'm' }
]

In [2]:
### Build a DataFrame from the seed rows and write it to s3_path as a Delta table,
### overwriting whatever is already stored there
df_users_initial = spark.createDataFrame(users_initial)
(
    df_users_initial.write
    .format("delta")
    .mode("overwrite")
    .save(s3_path)
)

In [3]:
### Read the table back through the Spark reader and print it ordered by user_id
print("DF after initial load:")
(
    spark.read
    .format("delta")
    .load(s3_path)
    .orderBy("user_id")
    .show()
)

In [4]:
### Rows to upsert: user_id 6 is already in users_initial, 7 and 8 are not
users_append = [
    {'user_id': 6, 'name': 'Kaiden Mccarty', 'gender': 'm'},
    {'user_id': 7, 'name': 'Melody Gamble', 'gender': 'f'},
    {'user_id': 8, 'name': 'Alexandre Huff', 'gender': 'm'},
]

### Spark DataFrame used as the source side of the merge
df_users_append = spark.createDataFrame(users_append)

In [5]:
### Upsert into the Delta table at s3_path:
### rows whose user_id matches are updated, all other incoming rows are inserted
deltaTable = DeltaTable.forPath(spark, s3_path)

merge_builder = deltaTable.alias("old").merge(
    df_users_append.alias("new"),
    "old.user_id = new.user_id",
)
merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [6]:
### Print the table after the merge, read through the DeltaTable handle
print("\nDF after upserting data:")
df_upserted = deltaTable.toDF()
df_upserted.orderBy("user_id").show()

In [7]:
### List every object key in the bucket before vacuum.
### Reuses the S3 resource created in the setup cell; a plain loop is used so the
### cell does not echo a list of None values as its output.
bucket = s3_resource.Bucket(s3_bucket)
print("\nObjects on S3 level BEFORE vacuum:")
for obj in bucket.objects.all():
    print(obj.key)

In [9]:
### Vacuum the table, then list every object key in the bucket again.
### vacuum() without an argument uses the table's default retention period, so
### recently written files are kept and this listing may match the BEFORE one.
deltaTable.vacuum()

print("\nObjects on S3 level AFTER vacuum:")
for obj in bucket.objects.all():
    print(obj.key)