In [1]:
!pip install pyspark==3.1.2
!pip install delta-spark



In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '\
--packages "io.delta:delta-core_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.2.0" \
--conf "spark.hadoop.fs.s3a.endpoint=http://172.17.0.1:9000" \
--conf "spark.hadoop.fs.s3a.access.key=miniouser" \
--conf "spark.hadoop.fs.s3a.secret.key=miniouser" \
--conf "spark.hadoop.fs.s3a.path.style.access=true" \
--conf "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore" \
pyspark-shell'

In [3]:
from pyspark.sql import SparkSession
from delta import *

spark = SparkSession.builder \
    .appName("quickstart1") \
    .getOrCreate()

# Create a Delta table on S3:
spark.range(5).write.format("delta").save("s3a://miniouser/test01")

# Read a Delta table on S3:
spark.read.format("delta").load("s3a://miniouser/test01").show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [4]:
from pyspark.sql import SparkSession

tableName = "delta.`s3a://miniouser/test02/`"

# Enable SQL/DML commands and Metastore tables for the current spark session.
# We need to set the following configs

spark = SparkSession.builder \
    .appName("quickstart_sql1") \
    .master("local[*]") \
    .getOrCreate()

# Clear any previous runs
spark.sql("DROP TABLE IF EXISTS newData")

try: 
    # Create a table
    print("############# Creating a table ###############")
    spark.sql("CREATE TABLE %s(id LONG) USING delta" % tableName)
    spark.sql("INSERT INTO %s VALUES 0, 1, 2, 3, 4" % tableName)

    # Read the table
    print("############ Reading the table ###############")
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Upsert (merge) new data
    print("########### Upsert new data #############")
    spark.sql("CREATE TABLE newData(id LONG) USING parquet")
    spark.sql("INSERT INTO newData VALUES 3, 4, 5, 6")

    spark.sql('''MERGE INTO {0} USING newData
            ON {0}.id = newData.id
            WHEN MATCHED THEN
              UPDATE SET {0}.id = newData.id
            WHEN NOT MATCHED THEN INSERT *
        '''.format(tableName))

    spark.sql("SELECT * FROM %s" % tableName).show()

    # Update table data
    print("########## Overwrite the table ###########")
    spark.sql("INSERT OVERWRITE %s select * FROM (VALUES 5, 6, 7, 8, 9) x (id)" % tableName)
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Update every even value by adding 100 to it
    print("########### Update to the table(add 100 to every even value) ##############")
    spark.sql("UPDATE {0} SET id = (id + 100) WHERE (id % 2 == 0)".format(tableName))
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Delete every even value
    print("######### Delete every even value ##############")
    spark.sql("DELETE FROM {0} WHERE (id % 2 == 0)".format(tableName))
    spark.sql("SELECT * FROM %s" % tableName).show()

    # Read old version of data using time travel
    print("######## Read old data using time travel ############")
    df = spark.read.format("delta").option("versionAsOf", 0).table(tableName)
    df.show()
    
finally:
    # cleanup
    spark.sql("DROP TABLE IF EXISTS newData")
    spark.stop()    

############# Creating a table ###############
############ Reading the table ###############
+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

########### Upsert new data #############
+---+
| id|
+---+
|  4|
|  5|
|  1|
|  0|
|  3|
|  6|
|  2|
+---+

########## Overwrite the table ###########
+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

########### Update to the table(add 100 to every even value) ##############
+---+
| id|
+---+
|  5|
|106|
|  7|
|108|
|  9|
+---+

######### Delete every even value ##############
+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+

######## Read old data using time travel ############
+---+
| id|
+---+
+---+



In [27]:
tableName = "delta.`s3a://miniouser/test02/`"

spark.sql("DESCRIBE HISTORY %s" % tableName).show()

+-------+-------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|version|          timestamp|userId|userName|   operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|
+-------+-------------------+------+--------+------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+
|      5|2021-10-20 07:32:18|  null|    null|      DELETE|{predicate -> ["(...|null|    null|     null|          4|          null|        false|{numRemovedFiles ...|        null|
|      4|2021-10-20 07:32:14|  null|    null|      UPDATE|{predicate -> ((i...|null|    null|     null|          3|          null|        false|{numRemovedFiles ...|        null|
|      3|2021-10-20 07:32:09|  null|    null|       WRITE|{mode -> Overwrit...|null|    null|     null|  

In [28]:
spark.sql("DROP TABLE IF EXISTS test02")

tableName = "delta.`s3a://miniouser/test02/`"
# spark.sql("VACUUM %s"  % tableName)

# Read the table
print("############ Reading the table ###############")
spark.sql("SELECT * FROM %s" % tableName).show()

############ Reading the table ###############
+---+
| id|
+---+
|  5|
|  7|
|  9|
+---+

