In [58]:
from delta import *
import pyspark
from pyspark.sql.functions import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Read and write Delta lake tables

In [2]:
#read the credit card data as spark dataframe
df=spark.read.option("header",True).csv("/opt/spark/python/mldatasets/UCI_Credit_Card.csv")

In [59]:
#read the credit card data as spark dataframe
df=spark.read.option("header",True).csv("/mnt/deltalake/df7k3s.ailab.local/delta/dbfs/creditcard/mldatasets/UCI_Credit_Card.csv")

In [57]:
#Remove the directoty 
!rm -rf /opt/spark/python/cc

## Write

In [60]:
# Save table as delta lake table
df.write.format("delta").mode("overwrite").save("/mnt/deltalake/df7k3s.ailab.local/delta/dbfs/creditcard/transactionlog")

In [61]:
df.sort(df.ID.desc()).select(df["ID"]).show(truncate=False)

+----+
|ID  |
+----+
|9999|
|9998|
|9997|
|9996|
|9995|
|9994|
|9993|
|9992|
|9991|
|9990|
|999 |
|9989|
|9988|
|9987|
|9986|
|9985|
|9984|
|9983|
|9982|
|9981|
+----+
only showing top 20 rows



In [5]:
#Read the file from delta lake
df_delta = spark.read.format("delta").load("/opt/spark/python/cc")



In [6]:
df_delta.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LIMIT_BAL: string (nullable = true)
 |-- SEX: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- MARRIAGE: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- PAY_0: string (nullable = true)
 |-- PAY_2: string (nullable = true)
 |-- PAY_3: string (nullable = true)
 |-- PAY_4: string (nullable = true)
 |-- PAY_5: string (nullable = true)
 |-- PAY_6: string (nullable = true)
 |-- BILL_AMT1: string (nullable = true)
 |-- BILL_AMT2: string (nullable = true)
 |-- BILL_AMT3: string (nullable = true)
 |-- BILL_AMT4: string (nullable = true)
 |-- BILL_AMT5: string (nullable = true)
 |-- BILL_AMT6: string (nullable = true)
 |-- PAY_AMT1: string (nullable = true)
 |-- PAY_AMT2: string (nullable = true)
 |-- PAY_AMT3: string (nullable = true)
 |-- PAY_AMT4: string (nullable = true)
 |-- PAY_AMT5: string (nullable = true)
 |-- PAY_AMT6: string (nullable = true)
 |-- default: string (nullable = true)



## Transformation 


In [7]:
# Convert the column type from string to int

In [8]:
df_delta=df_delta.withColumn("ID",col("ID").cast("int"))


In [9]:
print(type(df_delta))

<class 'pyspark.sql.dataframe.DataFrame'>


In [10]:
df_delta.sort(df_delta.ID.desc()).select(df_delta["ID"]).show(truncate=False)

+-----+
|ID   |
+-----+
|30007|
|30006|
|30005|
|30004|
|30003|
|30002|
|30001|
|30000|
|29999|
|29998|
|29997|
|29996|
|29995|
|29994|
|29993|
|29992|
|29991|
|29990|
|29989|
|29988|
+-----+
only showing top 20 rows



## Read 

In [11]:
# Read the file as delta lake 
deltaTable = DeltaTable.forPath(spark, "/opt/spark/python/cc")

In [12]:
print(type(deltaTable))

<class 'delta.tables.DeltaTable'>


## Update 

In [13]:

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('ID') == '30002',
  set = { 'ID': lit(40000) }
)

## Delete

In [14]:
# Delete 1 row
deltaTable=deltaTable.delete(condition = expr("ID == 30001"))
# Check the logs 
print("Delete completed")

Delete completed


## Append

In [16]:
# Add the few rows to the existing delta lake tables
df_append=spark.read.option("header",True).csv("/opt/spark/python/mldatasets/cc-append.csv")

In [17]:
df_append.write.format("delta").mode("append").save("/opt/spark/python/cc")

## Upsert using Merge

Merge has following features
1. Insert new records
2. update existing records
3. Delete records

All above can be done using same statements

Column names
ID,LIMIT_BAL,SEX,EDUCATION,MARRIAGE,AGE,PAY_0,PAY_2,PAY_3,PAY_4,PAY_5,PAY_6,BILL_AMT1,BILL_AMT2,BILL_AMT3,BILL_AMT4,BILL_AMT5,BILL_AMT6,PAY_AMT1,PAY_AMT2,PAY_AMT3,PAY_AMT4,PAY_AMT5,PAY_AMT6,default

In [18]:
deltaTable = DeltaTable.forPath(spark, '/opt/spark/python/cc')
df_merge=spark.read.option("header",True).csv("/opt/spark/python/mldatasets/cc-upsert.csv")


In [19]:
df_merge.printSchema()

root
 |-- ID: string (nullable = true)
 |-- LIMIT_BAL: string (nullable = true)
 |-- SEX: string (nullable = true)
 |-- EDUCATION: string (nullable = true)
 |-- MARRIAGE: string (nullable = true)
 |-- AGE: string (nullable = true)
 |-- PAY_0: string (nullable = true)
 |-- PAY_2: string (nullable = true)
 |-- PAY_3: string (nullable = true)
 |-- PAY_4: string (nullable = true)
 |-- PAY_5: string (nullable = true)
 |-- PAY_6: string (nullable = true)
 |-- BILL_AMT1: string (nullable = true)
 |-- BILL_AMT2: string (nullable = true)
 |-- BILL_AMT3: string (nullable = true)
 |-- BILL_AMT4: string (nullable = true)
 |-- BILL_AMT5: string (nullable = true)
 |-- BILL_AMT6: string (nullable = true)
 |-- PAY_AMT1: string (nullable = true)
 |-- PAY_AMT2: string (nullable = true)
 |-- PAY_AMT3: string (nullable = true)
 |-- PAY_AMT4: string (nullable = true)
 |-- PAY_AMT5: string (nullable = true)
 |-- PAY_AMT6: string (nullable = true)
 |-- default: string (nullable = true)



In [20]:
deltaTable.alias('cc') \
  .merge(df_merge.alias('upsrt'),'cc.ID = upsrt.ID') \
  .whenMatchedUpdate(set ={\
"ID": "upsrt.ID",
"LIMIT_BAL":"upsrt.LIMIT_BAL",
"SEX": "upsrt.SEX",
"EDUCATION": "upsrt.EDUCATION",
"MARRIAGE": "upsrt.MARRIAGE",
"AGE": "upsrt.AGE",
"PAY_0": "upsrt.PAY_0",
"PAY_2": "upsrt.PAY_2",
"PAY_3": "upsrt.PAY_3",
"PAY_4":"upsrt.PAY_4",
"PAY_5": "upsrt.PAY_5",
"PAY_6":"upsrt.PAY_6",
"BILL_AMT1": "upsrt.BILL_AMT1",
"BILL_AMT2": "upsrt.BILL_AMT2",
"BILL_AMT3": "upsrt.BILL_AMT3",
"BILL_AMT4": "upsrt.BILL_AMT4",
"BILL_AMT5": "upsrt.BILL_AMT5",
"BILL_AMT6": "upsrt.BILL_AMT6",
"PAY_AMT1": "upsrt.PAY_AMT1"  ,
"PAY_AMT2": "upsrt.PAY_AMT2",
"PAY_AMT3": "upsrt.PAY_AMT3",
"PAY_AMT4": "upsrt.PAY_AMT4",
"PAY_AMT5": "upsrt.PAY_AMT5",
"PAY_AMT6": "upsrt.PAY_AMT6",
"default": "upsrt.default"  }) \
.whenNotMatchedInsert(values ={ \
"ID": "upsrt.ID",
"LIMIT_BAL":"upsrt.LIMIT_BAL",
"SEX": "upsrt.SEX",
"EDUCATION": "upsrt.EDUCATION",
"MARRIAGE": "upsrt.MARRIAGE",
"AGE": "upsrt.AGE",
"PAY_0": "upsrt.PAY_0",
"PAY_2": "upsrt.PAY_2",
"PAY_3": "upsrt.PAY_3",
"PAY_4":"upsrt.PAY_4",
"PAY_5": "upsrt.PAY_5",
"PAY_6":"upsrt.PAY_6",
"BILL_AMT1": "upsrt.BILL_AMT1",
"BILL_AMT2": "upsrt.BILL_AMT2",
"BILL_AMT3": "upsrt.BILL_AMT3",
"BILL_AMT4": "upsrt.BILL_AMT4",
"BILL_AMT5": "upsrt.BILL_AMT5",
"BILL_AMT6": "upsrt.BILL_AMT6",
"PAY_AMT1": "upsrt.PAY_AMT1"  ,
"PAY_AMT2": "upsrt.PAY_AMT2",
"PAY_AMT3": "upsrt.PAY_AMT3",
"PAY_AMT4": "upsrt.PAY_AMT4",
"PAY_AMT5": "upsrt.PAY_AMT5",
"PAY_AMT6": "upsrt.PAY_AMT6",
"default": "upsrt.default"  }) .execute()

In [21]:
#Convert the deltalake table to spark dataframe
deltaTable = DeltaTable.forPath(spark, '/opt/spark/python/cc')
df_merge_upsert=deltaTable.toDF()

In [22]:
df_merge_upsert_int=df_merge_upsert.withColumn("ID",col("ID").cast("int"))


In [23]:
df_merge_upsert_int.count()


30011

In [24]:
df_merge_upsert_int.sort(df_merge_upsert_int.ID.desc()).select(df_merge_upsert_int["ID"],df_merge_upsert_int["LIMIT_BAL"]).show(truncate=False)



+-----+---------+
|ID   |LIMIT_BAL|
+-----+---------+
|50011|120000   |
|50003|50000    |
|50002|90000    |
|50001|120000   |
|50000|10000    |
|40000|50000    |
|30007|50000    |
|30006|50000    |
|30005|50000    |
|30004|50000    |
|30003|50000    |
|30000|50000    |
|29999|80000    |
|29998|30000    |
|29997|150000   |
|29996|220000   |
|29995|80000    |
|29994|1e+05    |
|29993|10000    |
|29992|210000   |
+-----+---------+
only showing top 20 rows



New row has been added and the existing row is updated
1. New row with ID 50011 is added
2. The row id with 50000 has the balance of 10000

## History

In [26]:
fullHistoryDF = deltaTable.history() 

In [27]:
print(type(fullHistoryDF))

<class 'pyspark.sql.dataframe.DataFrame'>


In [28]:
fullHistoryDF.printSchema()

root
 |-- version: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- userId: string (nullable = true)
 |-- userName: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- operationParameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- job: struct (nullable = true)
 |    |-- jobId: string (nullable = true)
 |    |-- jobName: string (nullable = true)
 |    |-- runId: string (nullable = true)
 |    |-- jobOwnerId: string (nullable = true)
 |    |-- triggerType: string (nullable = true)
 |-- notebook: struct (nullable = true)
 |    |-- notebookId: string (nullable = true)
 |-- clusterId: string (nullable = true)
 |-- readVersion: long (nullable = true)
 |-- isolationLevel: string (nullable = true)
 |-- isBlindAppend: boolean (nullable = true)
 |-- operationMetrics: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- userMetadata: string (nullable =

In [29]:
fullHistoryDF.select(col("timestamp"),col("operation"),col("version")).show(truncate=False)

+-----------------------+---------+-------+
|timestamp              |operation|version|
+-----------------------+---------+-------+
|2022-09-29 15:52:32.714|MERGE    |4      |
|2022-09-29 15:51:05.804|WRITE    |3      |
|2022-09-29 15:49:58.131|DELETE   |2      |
|2022-09-29 15:49:56.495|UPDATE   |1      |
|2022-09-29 15:49:38.518|WRITE    |0      |
+-----------------------+---------+-------+



## Detail 

In [33]:
detailDF = deltaTable.detail()

In [34]:
detailDF.printSchema()

root
 |-- format: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- location: string (nullable = true)
 |-- createdAt: timestamp (nullable = true)
 |-- lastModified: timestamp (nullable = true)
 |-- partitionColumns: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- numFiles: long (nullable = true)
 |-- sizeInBytes: long (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- minReaderVersion: integer (nullable = true)
 |-- minWriterVersion: integer (nullable = true)



In [35]:
detailDF.show(truncate=False)

+------+------------------------------------+----+-----------+-------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+
|format|id                                  |name|description|location                 |createdAt              |lastModified           |partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+------------------------------------+----+-----------+-------------------------+-----------------------+-----------------------+----------------+--------+-----------+----------+----------------+----------------+
|delta |0ce09a69-b58c-4da7-8faf-5ece9655d710|null|null       |file:/opt/spark/python/cc|2022-09-29 15:49:35.636|2022-09-29 15:52:32.714|[]              |2       |1770751    |{}        |1               |2               |
+------+------------------------------------+----+-----------+-------------------------+-----------------------+--------

## Time Travel 

In [36]:
df_time_travel = spark.read.format("delta").option("versionAsOf", 1).load("/opt/spark/python/cc")


In [37]:
df_time_travel.count()

30007

In [None]:
df_time_travel.s

##  Transaction Log

In [47]:
## Create tables using SQL

In [42]:
from pyspark.sql import SparkSession




In [None]:
pysparkdataframe> view(saurabh)

In [46]:
df = spark.sql('''
CREATE TABLE IF NOT EXISTS saurabh (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA

''')  
df.printSchema()

root



In [52]:
%SQL

UsageError: Line magic function `%SQL` not found.


In [None]:
CREATE TABLE IF NOT EXISTS default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA
