In [1]:
from pyspark.sql import SparkSession, Row
from delta import *

# Path for a Delta table; not referenced by the other cells visible in this section.
table_path = "./tmp/delta-table"

# Session builder with the Delta SQL extension and the Delta catalog
# registered as the default `spark_catalog`.
builder = (
    SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Pass the builder through the Delta pip helper, then create (or reuse) the session.
delta_ready_builder = configure_spark_with_delta_pip(builder)
spark = delta_ready_builder.getOrCreate()

# Echo the session object as the cell output.
spark

In [4]:
from datetime import datetime, date

# Three sample rows mixing int, float, string, date and datetime values.
sample_rows = [
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
]

# Build the DataFrame from the rows and print it.
df = spark.createDataFrame(sample_rows)

df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



Create a table

In [6]:

# Sample soccer event data: one Row per event.
df = spark.createDataFrame([
    Row(event_id=1, is_touch=True, minute=5, period=1, player_id=101, second=23, team_id=1, is_shot=False, is_goal=False),
    Row(event_id=2, is_touch=True, minute=15, period=1, player_id=102, second=45, team_id=2, is_shot=False, is_goal=False),
    Row(event_id=3, is_touch=True, minute=30, period=1, player_id=101, second=10, team_id=1, is_shot=True, is_goal=False),
    Row(event_id=4, is_touch=True, minute=40, period=2, player_id=103, second=5, team_id=1, is_shot=True, is_goal=True),
    Row(event_id=5, is_touch=True, minute=50, period=2, player_id=104, second=32, team_id=2, is_shot=True, is_goal=False),
    Row(event_id=6, is_touch=True, minute=55, period=2, player_id=101, second=10, team_id=1, is_shot=False, is_goal=False)
])

# Directory where the Delta table is stored.
delta_path = "./tmp/soccer_events_delta"

# Write the DataFrame as a Delta table. The default save mode raises when the
# target path already exists, which makes this cell fail on a second run
# (e.g. Restart Kernel -> Run All). Overwriting explicitly keeps it re-runnable.
df.write.format("delta").mode("overwrite").save(delta_path)

df.show()


+--------+--------+------+------+---------+------+-------+-------+-------+
|event_id|is_touch|minute|period|player_id|second|team_id|is_shot|is_goal|
+--------+--------+------+------+---------+------+-------+-------+-------+
|       1|    true|     5|     1|      101|    23|      1|  false|  false|
|       2|    true|    15|     1|      102|    45|      2|  false|  false|
|       3|    true|    30|     1|      101|    10|      1|   true|  false|
|       4|    true|    40|     2|      103|     5|      1|   true|   true|
|       5|    true|    50|     2|      104|    32|      2|   true|  false|
|       6|    true|    55|     2|      101|    10|      1|  false|  false|
+--------+--------+------+------+---------+------+-------+-------+-------+



In [None]:
# Read the Delta table stored at delta_path back into a DataFrame and print it.
delta_reader = spark.read.format("delta")
df = delta_reader.load(delta_path)
df.show()

+--------+--------+------+------+---------+------+-------+-------+-------+
|event_id|is_touch|minute|period|player_id|second|team_id|is_shot|is_goal|
+--------+--------+------+------+---------+------+-------+-------+-------+
|       4|    true|    40|     2|      103|     5|      1|   true|   true|
|       2|    true|    15|     1|      102|    45|      2|  false|  false|
|       1|    true|     5|     1|      101|    23|      1|  false|  false|
|       5|    true|    50|     2|      104|    32|      2|   true|  false|
|       6|    true|    55|     2|      101|    10|      1|  false|  false|
|       3|    true|    30|     1|      101|    10|      1|   true|  false|
+--------+--------+------+------+---------+------+-------+-------+-------+



Overwrite

In [10]:
# Replacement event rows that will overwrite the table contents.
replacement_rows = [
    Row(event_id=1, is_touch=True, minute=5, period=1, player_id=101, second=23, team_id=1, is_shot=False, is_goal=False),
    Row(event_id=2, is_touch=True, minute=15, period=1, player_id=102, second=45, team_id=2, is_shot=False, is_goal=False),
    Row(event_id=3, is_touch=True, minute=30, period=1, player_id=101, second=10, team_id=1, is_shot=True, is_goal=False),
    Row(event_id=4, is_touch=True, minute=40, period=2, player_id=103, second=5, team_id=1, is_shot=True, is_goal=True),
    Row(event_id=5, is_touch=False, minute=50, period=2, player_id=104, second=32, team_id=2, is_shot=True, is_goal=False),
    Row(event_id=6, is_touch=False, minute=55, period=2, player_id=101, second=10, team_id=1, is_shot=True, is_goal=True),
]
df = spark.createDataFrame(replacement_rows)

# Print the new rows first, then replace the existing table at delta_path with them.
df.show()

overwrite_writer = df.write.format("delta").mode("overwrite")
overwrite_writer.save(delta_path)


+--------+--------+------+------+---------+------+-------+-------+-------+
|event_id|is_touch|minute|period|player_id|second|team_id|is_shot|is_goal|
+--------+--------+------+------+---------+------+-------+-------+-------+
|       1|    true|     5|     1|      101|    23|      1|  false|  false|
|       2|    true|    15|     1|      102|    45|      2|  false|  false|
|       3|    true|    30|     1|      101|    10|      1|   true|  false|
|       4|    true|    40|     2|      103|     5|      1|   true|   true|
|       5|   false|    50|     2|      104|    32|      2|   true|  false|
|       6|   false|    55|     2|      101|    10|      1|   true|   true|
+--------+--------+------+------+---------+------+-------+-------+-------+



In [None]:
# Load the current contents of the Delta table at delta_path and print them.
delta_reader = spark.read.format("delta")
df = delta_reader.load(delta_path)
df.show()

+--------+--------+------+------+---------+------+-------+-------+-------+
|event_id|is_touch|minute|period|player_id|second|team_id|is_shot|is_goal|
+--------+--------+------+------+---------+------+-------+-------+-------+
|       2|    true|    15|     1|      102|    45|      2|  false|  false|
|       1|    true|     5|     1|      101|    23|      1|  false|  false|
|       4|    true|    40|     2|      103|     5|      1|   true|   true|
|       5|   false|    50|     2|      104|    32|      2|   true|  false|
|       6|   false|    55|     2|      101|    10|      1|   true|   true|
|       3|    true|    30|     1|      101|    10|      1|   true|  false|
+--------+--------+------+------+---------+------+-------+-------+-------+



Conditional update without overwrite

In [16]:
from delta.tables import *
from pyspark.sql.functions import expr

# Open the Delta table stored at delta_path so it can be modified in place.
deltaTable = DeltaTable.forPath(spark, delta_path)

# Conditional update: set is_goal to true for every event whose minute is
# below 10. Only rows matching the condition are changed.
deltaTable.update(
    condition=expr("minute < 10"),
    set={"is_goal": expr("True")},
)

# Print the table contents after the update, then echo the DataFrame so the
# cell output also shows its schema.
df = deltaTable.toDF()
df.show()
df

+--------+--------+------+------+---------+------+-------+-------+-------+
|event_id|is_touch|minute|period|player_id|second|team_id|is_shot|is_goal|
+--------+--------+------+------+---------+------+-------+-------+-------+
|       2|    true|    15|     1|      102|    45|      2|  false|  false|
|       1|    true|     5|     1|      101|    23|      1|  false|   true|
|       4|    true|    40|     2|      103|     5|      1|   true|   true|
|       5|   false|    50|     2|      104|    32|      2|   true|  false|
|       6|   false|    55|     2|      101|    10|      1|   true|   true|
|       3|    true|    30|     1|      101|    10|      1|   true|  false|
+--------+--------+------+------+---------+------+-------+-------+-------+



DataFrame[event_id: bigint, is_touch: boolean, minute: bigint, period: bigint, player_id: bigint, second: bigint, team_id: bigint, is_shot: boolean, is_goal: boolean]

Read older versions of data using time travel

In [18]:
# Time travel: load the snapshot recorded as version 1 of the table instead
# of its latest state, then print it.
df = (
    spark.read.format("delta")
    .option("versionAsOf", 1)
    .load(delta_path)
)

df.show()

+--------+--------+------+------+---------+------+-------+-------+-------+
|event_id|is_touch|minute|period|player_id|second|team_id|is_shot|is_goal|
+--------+--------+------+------+---------+------+-------+-------+-------+
|       5|   false|    50|     2|      104|    32|      2|   true|  false|
|       6|   false|    55|     2|      101|    10|      1|   true|   true|
+--------+--------+------+------+---------+------+-------+-------+-------+

