In [1]:
import pyspark
from delta import *


# configure spark with Delta extension using Spark config options
builder = pyspark.sql.SparkSession.builder.appName("helloWorld") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/28 19:01:21 WARN Utils: Your hostname, korto, resolves to a loopback address: 127.0.1.1; using 192.168.1.147 instead (on interface wlp4s0)
25/07/28 19:01:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/data1/home/fidok/workspace_python/spark-lakehouse/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/fidok/.ivy2.5.2/cache
The jars for the packages stored in: /home/fidok/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-08f7e9fd-dbea-4939-a71d-de27e6e59055;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 125ms :: artifacts 

In [None]:
# create table using dataframe
data = spark.range(0, 5)
data.write.format("delta").option("mode", "override").save("data/delta-table")

25/07/28 17:57:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [None]:
# read existing table as dataframe
df = spark.read.format("delta").load("data/delta-table")
df.show()

+---+
| id|
+---+
|  0|
|  4|
|  2|
|  3|
|  1|
+---+



In [13]:
# read delta as DeltaTable type
delta_table = DeltaTable.forPath(spark, "data/delta-table")
delta_table.history().show(truncate=False)
delta_table.detail().show(truncate=False)

+-------+-----------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                       |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                           |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------+------------+-----------------------------------+
|0      |2025-07-28 17:57:21.835|NULL  |NULL    |WRITE    |{mode -> ErrorIfExists, partitionBy -> []}|NULL|NULL    |NULL     |NULL       |Serializable  |true         |{n

In [5]:
# Create table using schema and DeltaTableBuilder
# Create or replace table with path and add properties
st = DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("salary", "INT") \
  .property("description", "table with people's salary") \
  .location("/data1/home/fidok/workspace_python/spark-lakehouse/chapter_1/data/salary_table") \
  .execute()

In [6]:
st.toDF().show()

+---+---------+--------+------+
| id|firstName|lastName|salary|
+---+---------+--------+------+
+---+---------+--------+------+



In [9]:
# create table using Spark schema
import os 
import pyspark.sql.types as T
from pyspark.sql import functions as F

schema = T.StructType([
    T.StructField("id", T.IntegerType()),
    T.StructField("firstName", T.StringType()),
    T.StructField("lastName", T.StringType()),
    T.StructField("salary", T.IntegerType())
])

df = spark.createDataFrame([
    (1, "John", "Does", 1500),
    (2, "LongDark", "Man", 2000)
    ], schema=schema)

df.show()

delta_path = os.path.join(os.getcwd(), "chapter_1/data/salary_table")

df.write.format("delta").mode("append").save("data/salary_table")

+---+---------+--------+------+
| id|firstName|lastName|salary|
+---+---------+--------+------+
|  1|     John|    Does|  1500|
|  2| LongDark|     Man|  2000|
+---+---------+--------+------+



In [8]:
salary_df = DeltaTable.forPath(spark, "data/salary_table").toDF()
salary_df.show()

+---+---------+--------+------+
| id|firstName|lastName|salary|
+---+---------+--------+------+
|  2|Invisible|     Man|  2000|
|  1|     John|     Doe|  1500|
+---+---------+--------+------+

