In [0]:
# Print a banner naming the topic of this notebook.
print("Delta Table Creation")

Delta Table Creation


In [0]:
# Delta table creation in SQL

In [0]:
%sql
-- Create a Delta table at /tmp/delta-table1 holding five id values.
-- CREATE OR REPLACE (instead of a plain CREATE) keeps this cell re-runnable:
-- a plain CREATE TABLE fails once a table already exists at this path.
CREATE OR REPLACE TABLE delta.`/tmp/delta-table1`
USING DELTA
AS SELECT col1 AS id FROM VALUES 10, 20, 30, 40, 50;

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Read back every row currently stored in the Delta table at /tmp/delta-table1.
SELECT *
FROM delta.`/tmp/delta-table1`;

id
10
20
30
40
50


In [0]:
%sql
-- Update table data
-- 1) Overwrite: replace the entire table contents with a new set of ids.
INSERT OVERWRITE delta.`/tmp/delta-table1`
SELECT col1 AS id
FROM VALUES 5, 6, 7, 8, 9;

num_affected_rows,num_inserted_rows
5,5


In [0]:
%sql
-- Check after overwriting: the previous values should have been replaced.
SELECT *
FROM delta.`/tmp/delta-table1`;

id
5
6
7
8
9


In [0]:
%sql
-- 2) Conditional update (no overwrite):
--    add 1000 to every row whose id is even.
UPDATE delta.`/tmp/delta-table1`
SET id = id + 1000
WHERE id % 2 == 0;

num_affected_rows
2


In [0]:
%sql
-- Check after update: every even id should now be 1000 larger.
SELECT *
FROM delta.`/tmp/delta-table1`;

id
5
1006
7
1008
9


In [0]:
%sql
-- Conditional update (no overwrite):
-- add 200 to every row whose id is odd.
UPDATE delta.`/tmp/delta-table1`
SET id = id + 200
WHERE id % 2 != 0;

num_affected_rows
3


In [0]:
%sql
-- Check after update: every odd id should now be 200 larger.
SELECT *
FROM delta.`/tmp/delta-table1`;

id
205
1006
207
1008
209


In [0]:
%sql
-- Delete every row whose id is even.
DELETE FROM delta.`/tmp/delta-table1`
WHERE id % 2 == 0;

num_affected_rows
2


In [0]:
%sql
-- Check after delete: only the odd ids should remain.
SELECT *
FROM delta.`/tmp/delta-table1`;

id
205
207
209


In [0]:
%sql
-- Upsert (merge) new data.
-- OR REPLACE keeps the staging view re-runnable: a plain CREATE TEMP VIEW
-- fails on a second execution because newData already exists in the session.
CREATE OR REPLACE TEMP VIEW newData AS
SELECT col1 AS id FROM VALUES 1, 3, 5, 7, 9, 11, 13, 15, 17, 19;

-- Rows whose id already exists in the table are updated (the id is rewritten
-- with the same incoming value); ids not yet present are inserted.
MERGE INTO delta.`/tmp/delta-table1` AS oldData
USING newData
ON oldData.id = newData.id
WHEN MATCHED
  THEN UPDATE SET id = newData.id
WHEN NOT MATCHED
  THEN INSERT (id) VALUES (newData.id);

-- Show the table after the merge.
SELECT * FROM delta.`/tmp/delta-table1`;

id
205
207
209
7
9
17
19
1
3
5


In [0]:
%sql
-- Time travel: read an older snapshot of the table.
-- Version 0 is the table's first commit, so this returns the rows that were
-- inserted when the table was originally created.
SELECT *
FROM delta.`/tmp/delta-table1` VERSION AS OF 0;

id
10
20
30
40
50


In [0]:
# Delta table creation in Python

import pyspark
# Explicit names instead of `from delta import *`, so it is clear where each
# name comes from. DeltaTable is not used in this cell; it is imported only to
# keep it in scope, since the wildcard import exposed it as well.
from delta import DeltaTable, configure_spark_with_delta_pip

# Session builder with the Delta SQL extension and the Delta catalog enabled.
builder = (
    pyspark.sql.SparkSession.builder.appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Pass the builder through the delta-spark pip helper, then create the session
# (or reuse an already running one).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [0]:
# Display the SparkSession object to confirm it is available.
spark

In [0]:
# Creating table
# mode("overwrite") keeps this cell re-runnable: the default save mode raises
# an error when data already exists at the target path.
data = spark.range(0, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-tablex")

In [0]:
# Load the Delta table from its path and print its rows
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-tablex")
)
df.show()

+---+
| id|
+---+
|  3|
|  4|
|  8|
|  9|
|  0|
|  1|
|  2|
|  5|
|  6|
|  7|
+---+



In [0]:
# Update table data
# 1) Overwrite: replace the table contents with the ids 55..59
data = spark.range(55, 60)
(
    data.write
    .format("delta")
    .mode("overwrite")
    .save("/tmp/delta-tablex")
)

In [0]:
# Check after overwriting: reload the table and print its rows
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-tablex")
)
df.show()

+---+
| id|
+---+
| 55|
| 56|
| 57|
| 58|
| 59|
+---+



In [0]:
# 2) Conditional update without overwriting
# Import only the names the notebook uses: `from pyspark.sql.functions import *`
# shadows Python built-ins such as sum/min/max/abs for every later cell.
# `col` is not used in this cell but is needed by the merge cell further down.
from delta.tables import DeltaTable
from pyspark.sql.functions import col, expr

# Handle to the existing Delta table, used for in-place update/delete/merge.
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-tablex")

# Update every even value by adding 100 to it
deltaTable.update(
    condition=expr("id % 2 == 0"),
    set={"id": expr("id + 100")},
)

In [0]:
# Check after updating: even values should now be 100 larger
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-tablex")
)
df.show()

+---+
| id|
+---+
| 55|
| 57|
| 59|
|156|
|158|
+---+



In [0]:
# Remove every row whose id is even
deltaTable.delete(condition=expr("id % 2 == 0"))

# Check after deleting: reload the table and print the remaining rows
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-tablex")
)
df.show()

+---+
| id|
+---+
| 55|
| 57|
| 59|
+---+



In [0]:
# Upsert (merge) new data: ids 0..19 are merged into the table, matched on id
newData = spark.range(0, 20)

(
    deltaTable.alias("oldData")
    .merge(newData.alias("newData"), "oldData.id = newData.id")
    # ids already in the table: rewrite id with the incoming value
    .whenMatchedUpdate(set={"id": col("newData.id")})
    # ids not yet in the table: insert them
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

# Print up to 25 rows of the merged table
deltaTable.toDF().show(25)

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  7|
|  8|
|  9|
| 12|
| 13|
| 14|
| 17|
| 18|
| 19|
|  0|
|  1|
|  5|
|  6|
| 10|
| 11|
| 15|
| 16|
| 55|
| 57|
| 59|
+---+



In [0]:
# After merging: reload the table from its path and print up to 25 rows
df = (
    spark.read
    .format("delta")
    .load("/tmp/delta-tablex")
)
df.show(25)

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  7|
|  8|
|  9|
| 12|
| 13|
| 14|
| 17|
| 18|
| 19|
|  0|
|  1|
|  5|
|  6|
| 10|
| 11|
| 15|
| 16|
| 55|
| 57|
| 59|
+---+



In [0]:
# Read older versions using time travel
# versionAsOf=0 selects the table's first version, i.e. the values written
# when the table was created
df = (
    spark.read
    .format("delta")
    .option("versionAsOf", 0)
    .load("/tmp/delta-tablex")
)
df.show()

+---+
| id|
+---+
|  3|
|  4|
|  8|
|  9|
|  0|
|  1|
|  2|
|  5|
|  6|
|  7|
+---+



In [0]:
# Writing a stream of data to table
# The "rate" streaming source is read, its `value` column is renamed to `id`,
# and the result is written as a Delta stream to /tmp/delta-tabley.
streamingDf = spark.readStream.format("rate").load()
stream = (
    streamingDf.selectExpr("value as id")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start("/tmp/delta-tabley")
)

In [0]:
# Read stream of changes from table
# The Delta table at /tmp/delta-tabley is read as a stream and every batch is
# written to the console sink.
stream2 = (
    spark.readStream
    .format("delta")
    .load("/tmp/delta-tabley")
    .writeStream
    .format("console")
    .start()
)