In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
df = spark.read.option("inferSchema", "true").option("header", "true").csv("/FileStore/tables/emp.txt")


df.write.format("delta").mode("overwrite").save("/FileStore/tables/delta_train/")




In [0]:
display(dbutils.fs.ls("/FileStore/tables/delta_train/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/delta_train/_delta_log/,_delta_log/,0,0
dbfs:/FileStore/tables/delta_train/part-00000-25119bca-3aa9-4418-b90e-53b8f5491705-c000.snappy.parquet,part-00000-25119bca-3aa9-4418-b90e-53b8f5491705-c000.snappy.parquet,1599,1707892326000


In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS delta_training")


Out[6]: DataFrame[]

In [0]:

ddl_query = """
CREATE TABLE IF NOT EXISTS delta_training.emp_file
USING DELTA
LOCATION '/FileStore/tables/delta_train/'
"""

spark.sql(ddl_query)

Out[8]: DataFrame[]

In [0]:
spark.catalog.setCurrentDatabase("delta_training")
tables = spark.catalog.listTables()
for table in tables:
    print(table)

Table(name='emp_file', catalog='spark_catalog', namespace=['delta_training'], description=None, tableType='EXTERNAL', isTemporary=False)


In [0]:
spark.sql("select * from delta_training.emp_file").show()

+---------+---------+---------+---+
|123234877|  Michael|   Rogers| 14|
+---------+---------+---------+---+
|152934485|    Anand|Manikutty| 14|
|222364883|    Carol|    Smith| 37|
|326587417|      Joe|  Stevens| 37|
|332154719|Mary-Anne|   Foster| 14|
|332569843|   George| ODonnell| 77|
|546523478|     John|      Doe| 59|
|631231482|    David|    Smith| 77|
|654873219|   Zacary|    Efron| 59|
|745685214|     Eric|Goldsmith| 59|
|845657245|Elizabeth|      Doe| 14|
|845657246|    Kumar|    Swamy| 14|
+---------+---------+---------+---+



In [0]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [0]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

In [0]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")

In [0]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [0]:
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")

In [0]:
from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")

deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

In [0]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|105|
|187|
| 73|
|213|
|243|
| 35|
|217|
|255|
|157|
| 15|
|199|
|135|
| 11|
|281|
|149|
|249|
|305|
| 19|
| 61|
|121|
+---+
only showing top 20 rows



In [0]:
deltaTable.delete(condition = expr("id % 2 == 0"))

In [0]:
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()

+---+
| id|
+---+
|105|
|187|
| 73|
|213|
|243|
| 35|
|217|
|255|
|157|
| 15|
|199|
|135|
| 11|
|281|
|149|
|249|
|305|
| 19|
| 61|
|121|
+---+
only showing top 20 rows



In [0]:
newData = spark.range(0, 20)

deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()

deltaTable.toDF().show()

+---+
| id|
+---+
| 21|
| 23|
| 25|
| 27|
| 29|
| 31|
| 33|
| 35|
| 37|
| 39|
| 41|
| 43|
| 45|
| 47|
| 49|
| 51|
| 53|
| 55|
| 57|
| 59|
+---+
only showing top 20 rows

