In [1]:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *

from delta import *
from delta.tables import *
import os


In [2]:
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [3]:
data = spark.range(0, 5)
data.write.format("delta").save("tmp/delta-table")

In [4]:
deltaTable = DeltaTable.forPath(spark, "tmp/delta-table")

In [5]:
# update and delete (auto applies to the data, not just the varaiable)
deltaTable.update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") }
)
deltaTable.delete(condition = expr("id % 2 != 0"))


In [6]:
deltaTable.toDF().show()

+---+
| id|
+---+
|100|
|102|
|104|
+---+



In [7]:
newData = spark.range(0, 20)

deltaTable.alias("oldData").merge(
    newData.alias("newData"), "oldData.id = newData.id"
).whenMatchedUpdate(
    set = { "id": col("newData.id") }
).whenNotMatchedInsert(
    values = { "id": col("newData.id") }
).execute()

In [8]:
deltaTable.toDF().show()

+---+
| id|
+---+
|  3|
|  4|
|  8|
|  9|
| 18|
| 19|
| 13|
| 14|
|  0|
| 11|
|  7|
| 15|
|  5|
| 17|
|100|
|  1|
|  6|
|102|
| 12|
|  2|
+---+
only showing top 20 rows



In [14]:
df = spark.read.format("delta").option("versionAsOf", 2).load("tmp/delta-table")
df.show()

+---+
| id|
+---+
|100|
|102|
|104|
+---+



In [49]:
df = spark.read.format("csv").option("header", True).load("feature.csv")

df.show()

+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|   id|   domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|75331|     time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|     time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|     time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|     time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|     time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|     time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75337|     time|    ECDF_0|   ACC|   X|2.258406919758802...|2023-07-06 13:19:36|      95941|
|75338|     time|    ECDF_0|   ACC|   Y|2.258406919758802...

In [50]:
df.write.format("delta").save("tmp/feature")


AnalysisException: [DELTA_PATH_EXISTS] Cannot write to already existent path file:/c:/Users/AnChengYang/IPerceptProjects/deltalake/tmp/feature without setting OVERWRITE = 'true'.

In [15]:
spark.read.format("delta").load("tmp/feature").show()

+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|   id|   domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|75331|     time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|     time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|     time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|     time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|     time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|     time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75337|     time|    ECDF_0|   ACC|   X|2.258406919758802...|2023-07-06 13:19:36|      95941|
|75338|     time|    ECDF_0|   ACC|   Y|2.258406919758802...

In [16]:
spark.read.format("delta").load("tmp/feature").count()

1000

In [58]:
# grab a dataframe where aggregator is ECDF_0

df = spark.read.format("delta").load("tmp/feature")
df.show()

df.filter(df.aggregator == "ECDF_0").write.format("delta").save("tmp/feature_ecdf_0")

spark.read.format("delta").load("tmp/feature_ecdf_0").show()

spark.read.format("delta").load("tmp/feature_ecdf_0").count()



+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|   id|   domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|75331|     time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|     time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|     time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|     time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|     time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|     time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75337|     time|    ECDF_0|   ACC|   X|2.258406919758802...|2023-07-06 13:19:36|      95941|
|75338|     time|    ECDF_0|   ACC|   Y|2.258406919758802...

24

In [59]:
# grab a dataframe where aggregator is ECDF_0 and sensor is IMU
df = spark.read.format("delta").load("tmp/feature")
df.show()

df.filter((df.aggregator == "ECDF_0") & (df.sensor == "IMU")).write.format("delta").save("tmp/feature_ecdf_0_imu")

spark.read.format("delta").load("tmp/feature_ecdf_0_imu").show()
spark.read.format("delta").load("tmp/feature_ecdf_0_imu").count()


+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|   id|   domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+---------+----------+------+----+--------------------+-------------------+-----------+
|75331|     time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|     time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|     time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|     time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|     time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|     time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75337|     time|    ECDF_0|   ACC|   X|2.258406919758802...|2023-07-06 13:19:36|      95941|
|75338|     time|    ECDF_0|   ACC|   Y|2.258406919758802...

12

In [71]:
# create a dummy dataframe to be used for merge with feature ecdf_0 imu
new_df = spark.read.format("csv").option("header", True).load("new_feature.csv")

delta_table = DeltaTable.forPath(spark, "tmp/feature_ecdf_0_imu")

# filter the new_df where sensor == imu and aggregator == ecdf_0
filtered_new_df = new_df.filter((col("sensor") == "IMU") & (col("aggregator") == "ECDF_0"))
filtered_new_df.write.format("delta").mode("overwrite").save("tmp/filtered_new_df")
delta_filtered_new_df = spark.read.format("delta").load("tmp/filtered_new_df")

delta_table.alias("oldData").merge(
    source = delta_filtered_new_df.alias("newData"),
    condition = "oldData.id = newData.id"
).whenMatchedUpdateAll().execute()

In [69]:
delta_filtered_new_df.show()

+-----+------+----------+------+----+--------------------+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+------+----------+------+----+--------------------+-------------------+-----------+
|15343|  time|    ECDF_0|   IMU|   X|6.584021243214336...|2023-07-10 11:09:52|      95941|
+-----+------+----------+------+----+--------------------+-------------------+-----------+



In [72]:
delta_table.toDF().show()

+-----+------+----------+------+----+--------------------+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+------+----------+------+----+--------------------+-------------------+-----------+
|75331|  time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|  time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|  time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|  time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|  time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|  time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75979|  time|    ECDF_0|   IMU|   X|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75980|  time|    ECDF_0|   IMU|   Y|1.681548369738855...|2023-07-06 13:19:36|      95942|

In [73]:
updated_df = spark.read.format("delta").load("tmp/feature_ecdf_0_imu")

In [74]:
updated_df.show()

# didnt get updated because the id is got no match

+-----+------+----------+------+----+--------------------+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+------+----------+------+----+--------------------+-------------------+-----------+
|75331|  time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|  time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|  time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|  time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|  time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|  time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75979|  time|    ECDF_0|   IMU|   X|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75980|  time|    ECDF_0|   IMU|   Y|1.681548369738855...|2023-07-06 13:19:36|      95942|

In [76]:
delta_table.alias("oldData").merge(
    source = delta_filtered_new_df.alias("newData"),
    condition = "oldData.id = newData.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute() # add insert all when not matched

In [77]:
updated_df = spark.read.format("delta").load("tmp/feature_ecdf_0_imu")
updated_df.show()

+-----+------+----------+------+----+--------------------+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+------+----------+------+----+--------------------+-------------------+-----------+
|75331|  time|    ECDF_0|   IMU|   X|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|  time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75333|  time|    ECDF_0|   IMU|   Z|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75334|  time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75335|  time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75336|  time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75979|  time|    ECDF_0|   IMU|   X|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75980|  time|    ECDF_0|   IMU|   Y|1.681548369738855...|2023-07-06 13:19:36|      95942|

In [87]:
delta_df = spark.read.format("delta").load("tmp/feature_ecdf_0_imu")
schema = delta_df.schema
schema


StructType([StructField('id', StringType(), True), StructField('domain', StringType(), True), StructField('aggregator', StringType(), True), StructField('sensor', StringType(), True), StructField('axis', StringType(), True), StructField('value', StringType(), True), StructField('create_time', StringType(), True), StructField('test_div_id', StringType(), True)])

In [89]:
# Define the data for the single entry
data = [75331, "time", "ECDF_0", "IMU", "X", 3.0, "2023-07-06 13:19:36", 95941]

# Create a Row object representing the single entry
single_entry_row = Row(*data)


# Create a DataFrame with the single entry
single_entry_df = spark.createDataFrame([single_entry_row], schema=schema)

In [90]:
single_entry_df.show()

+-----+------+----------+------+----+-----+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|value|        create_time|test_div_id|
+-----+------+----------+------+----+-----+-------------------+-----------+
|75331|  time|    ECDF_0|   IMU|   X|  3.0|2023-07-06 13:19:36|      95941|
+-----+------+----------+------+----+-----+-------------------+-----------+



In [91]:
delta_table.alias("oldData").merge(
    source = single_entry_df.alias("newData"),
    condition = "oldData.id = newData.id"
).whenMatchedUpdateAll().execute()

In [93]:
spark.read.format("delta").load("tmp/feature_ecdf_0_imu").show()

+-----+------+----------+------+----+--------------------+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+------+----------+------+----+--------------------+-------------------+-----------+
|75335|  time|    ECDF_0|   IMU|  GY|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75981|  time|    ECDF_0|   IMU|   Z|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75979|  time|    ECDF_0|   IMU|   X|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75331|  time|    ECDF_0|   IMU|   X|                 3.0|2023-07-06 13:19:36|      95941|
|75334|  time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75984|  time|    ECDF_0|   IMU|  GZ|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75980|  time|    ECDF_0|   IMU|   Y|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75336|  time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|

In [94]:
# delete entry from delta table where axis = GY

delta_table.delete(condition = "axis = 'GY'")

In [17]:
spark.read.format("delta").load("tmp/feature_ecdf_0_imu").show()

+-----+------+----------+------+----+--------------------+-------------------+-----------+
|   id|domain|aggregator|sensor|axis|               value|        create_time|test_div_id|
+-----+------+----------+------+----+--------------------+-------------------+-----------+
|75981|  time|    ECDF_0|   IMU|   Z|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75979|  time|    ECDF_0|   IMU|   X|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75331|  time|    ECDF_0|   IMU|   X|                 3.0|2023-07-06 13:19:36|      95941|
|75334|  time|    ECDF_0|   IMU|  GX|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75984|  time|    ECDF_0|   IMU|  GZ|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75980|  time|    ECDF_0|   IMU|   Y|1.681548369738855...|2023-07-06 13:19:36|      95942|
|75336|  time|    ECDF_0|   IMU|  GZ|3.607920106217168...|2023-07-06 13:19:36|      95941|
|75332|  time|    ECDF_0|   IMU|   Y|3.607920106217168...|2023-07-06 13:19:36|      95941|