In [8]:
pip install delta-spark

Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark<3.3.0,>=3.2.0
  Using cached pyspark-3.2.4-py2.py3-none-any.whl
Installing collected packages: pyspark
Successfully installed pyspark-3.2.4
Note: you may need to restart the kernel to use updated packages.


In [18]:
pip show pyspark

Name: pyspark
Version: 3.1.2
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /opt/spark-3.1.2-bin-hadoop3.2/python
Requires: py4j
Required-by: delta-spark
Note: you may need to restart the kernel to use updated packages.


In [None]:
!pyspark --packages io.delta:delta-core_2.12:1.0.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

/opt/spark2/bin/pyspark: line 45: python: command not found
Python 3.6.9 (default, Dec  8 2021, 21:08:43) 
[GCC 8.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/opt/spark-3.1.2-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/itversity/.ivy2/cache
The jars for the packages stored in: /home/itversity/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0146762b-b61d-4418-8009-7c35c252eb15;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in cen

In [5]:
from delta import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [6]:
# Initialize the Spark session

spark = SparkSession.builder \
    .appName("DeltaExample") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0") \
    .getOrCreate()


In [30]:
# Create a sample DataFrame
data = [("Alice", 1), ("Bob", 2), ("Cathy", 3)]
df = spark.createDataFrame(data, ["name", "id"])


In [31]:
# Save DataFrame as a Delta table
df.write.format("delta").save("/delta-final11")

In [33]:
df_read = spark.read.format("delta").load("/delta-final11")
df_read.show()

+-----+---+
| name| id|
+-----+---+
|Alice|  1|
|  Bob|  2|
|Cathy|  3|
+-----+---+



In [19]:
deltaTable = DeltaTable.forPath(spark, '/delta-final11')

In [35]:
deltaTable.toDF().show()

+-----+---+
| name| id|
+-----+---+
|Alice|  1|
|  Bob|  2|
|Cathy|  3|
+-----+---+



In [36]:
# Create a sample DataFrame
data = [("Ahmed", 1), ("Bob", 2), ("Cathy", 3), ("mooo", 4)]
df_new = spark.createDataFrame(data, ["name", "id"])


In [37]:
# here we merge (upsert) our target with the source 

deltaTable.alias("target").merge(
    df_new.alias("source"),
    "target.id = source.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

In [20]:
deltaTable.toDF().show()

+-----+---+
| name| id|
+-----+---+
|Cathy|  3|
|Ahmed|  1|
| mooo|  4|
|  Bob|  2|
+-----+---+



In [32]:
# you could review the history of the operations you did and vesions of your data
deltaTable.history()

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
2,2024-07-19 16:06:...,,,WRITE,{mode -> Overwrit...,,,,1.0,,False,"{numFiles -> 1, n...",
1,2024-07-19 13:07:...,,,MERGE,{predicate -> (ta...,,,,0.0,,False,{numTargetRowsCop...,
0,2024-07-19 13:04:...,,,WRITE,{mode -> ErrorIfE...,,,,,,True,"{numFiles -> 2, n...",


In [61]:
# read from version 0 of my data

df_read = spark.read.format("delta").option("versionAsof",0).load("/delta-final11")
df_read.show()

+-----+---+
| name| id|
+-----+---+
|Alice|  1|
|  Bob|  2|
|Cathy|  3|
+-----+---+



In [62]:
# read from version 1 of my data

df_read = spark.read.format("delta").option("versionAsof",1).load("/delta-final11")
df_read.show()

+-----+---+
| name| id|
+-----+---+
|Cathy|  3|
|Ahmed|  1|
| mooo|  4|
|  Bob|  2|
+-----+---+



In [None]:
#deltaTable.optimize().executeCompaction()

In [22]:
from delta.tables import *

In [29]:
# compact all parquet files into one file

path = "/delta-final11"
numFiles = 1

(
    spark.read.format("delta")
    .load(path)
    .repartition(numFiles)
    .write.option("dataChange", "false")
    .format("delta")
    .mode("overwrite")
    .save(path)
)

In [31]:
# read from the new compact file to make sure the data is saved in one file

fi =spark.read.parquet("/delta-final11/part-00000-881fe57e-ed6b-48ed-8405-630987b3c5f8-c000.snappy.parquet").show()

+-----+---+
| name| id|
+-----+---+
|  Bob|  2|
|Cathy|  3|
|Ahmed|  1|
| mooo|  4|
+-----+---+



In [None]:
spark.stop()