Конфигурация сессии, импорт модулей

In [None]:
import pyspark
from delta import *
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [None]:
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")

In [None]:
df = spark.read.format("delta").load("/tmp/delta-table/")
df.show()

In [None]:
df = spark.sql('SELECT * FROM delta.`/tmp/delta-table`')
df.show()

In [None]:
spark.sql('''DROP DATABASE delta1''')

Создание таблицы с помощью SQL:

In [None]:
# Refuses to work with databases other than delta, even if I create them
spark.sql('''CREATE OR REPLACE TABLE delta.`/tmp/delta-people` (
  name STRING,
  surname STRING)
USING DELTA''')

Добавление строк в таблицу:

In [None]:
spark.sql("INSERT INTO delta.`/tmp/delta-people` VALUES ('Василий', 'Петров')")
spark.sql("INSERT INTO delta.`/tmp/delta-people` VALUES ('Алексеев', 'Евгений')")

In [None]:
df = spark.read.format("delta").load("/tmp/delta-people/")
df.show()

Чтение предыдущих версий данных:

In [None]:
df = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta-people")
df.show()

Вставить в таблицу данные, не соответствующие схеме, невозможно:

In [None]:
spark.sql("INSERT INTO delta.`/tmp/delta-people` VALUES ('Романов', 'Петр', 1)")

Можно добавлять constraints:

In [None]:
spark.sql("ALTER TABLE delta.`/tmp/delta-people` ADD CONSTRAINT not_too_long_names CHECK (length(name) <= 10)")

Вставить в таблицу данные, не соответствующие constraints, невозможно:

In [None]:
spark.sql("INSERT INTO delta.`/tmp/delta-people` VALUES ('ВасилийВасилийВасилий', 'Петров')")

Добавление столбца в таблицу:

In [None]:
spark.sql("ALTER TABLE delta.`/tmp/delta-people` ADD COLUMN surname_length INT ")
spark.read.format("delta").load("/tmp/delta-people").show()

Вычисление значений для добавленного столбца:

In [None]:
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-people")
deltaTable.update(
  set = { "surname_length": expr("length(surname)") })
spark.read.format("delta").load("/tmp/delta-people").show()