# Quickstart: Delta Lake



In [None]:
from delta import *
from pyspark.sql import SparkSession
from pyspark.errors import PySparkException
from pyspark.sql.functions import col

# Register the Delta SQL extension and make the Delta-aware catalog back the
# built-in session catalog (spark_catalog); the builder is then wrapped by
# configure_spark_with_delta_pip before the session is created.
builder = SparkSession.builder.appName("MyApp")
builder = builder.config(
    "spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()


def get_schemas_from_catalog(catalog_source):
    """Return the names of all schemas (databases) in ``catalog_source``.

    If the ``SHOW SCHEMAS`` query fails, a message is printed and an empty list is
    returned instead of raising. The message assumes the catalog does not exist.

    Args:
        catalog_source: Name of the catalog to list schemas from.

    Returns:
        list[str]: Schema names; empty if the query failed.
    """
    try:
        df_schemas = spark.sql(f"SHOW SCHEMAS IN {catalog_source}")
        # Take the first (only) output column by position. It is called
        # ``databaseName`` on Databricks but ``namespace`` in open-source Spark 3.x,
        # so selecting it by name breaks on one of the two.
        lst_schemas = [row[0] for row in df_schemas.collect()]
    except Exception:
        # Catch Exception rather than using a bare ``except:`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        print(f"Catalog {catalog_source} does not exist.")
        lst_schemas = []
    return lst_schemas


def get_tables_from_schema(catalog_source, schema_name):
    """Return the table names listed by ``SHOW TABLES IN catalog_source.schema_name``.

    If the query fails, a message is printed and an empty list is returned instead
    of raising. The message assumes the schema does not exist.

    Args:
        catalog_source: Catalog containing the schema.
        schema_name: Schema whose tables are listed.

    Returns:
        list[str]: Values of the ``tableName`` column; empty if the query failed.
    """
    try:
        df_tables = spark.sql(f"SHOW TABLES IN {catalog_source}.{schema_name}")
        lst_tables = [row["tableName"] for row in df_tables.select("tableName").collect()]
    except Exception:
        # Catch Exception rather than using a bare ``except:`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        print(f"Schema {schema_name} does not exist in {catalog_source}.")
        lst_tables = []
    return lst_tables


def get_container_from_catalog(catalog_target):
    """Return the storage container parsed from the catalog's ``Storage Root``.

    The value comes from the second column of the ``Storage Root`` row of
    ``DESCRIBE CATALOG EXTENDED``. Everything before the first ``@`` is kept, with
    ``abfss://`` removed. This presumably expects an ADLS URL of the form
    ``abfss://<container>@<account>...`` (TODO confirm).

    If the catalog is missing (error class ``NO_SUCH_CATALOG_EXCEPTION``), the
    string ``"Catalog <name> does not exist."`` is returned instead. Any other
    PySparkException is re-raised.
    """
    try:
        catalog_info = spark.sql(f"DESCRIBE CATALOG EXTENDED {catalog_target}")
        root_row = catalog_info.filter(col("info_name") == "Storage Root").collect()[0]
        before_at = root_row[1].split("@")[0]
        return before_at.replace("abfss://", "")
    except PySparkException as ex:
        if ex.getErrorClass() != "NO_SUCH_CATALOG_EXCEPTION":
            raise
        return f"Catalog {catalog_target} does not exist."


def create_init_catalog(catalog):
    """Announce and run ``CREATE CATALOG IF NOT EXISTS`` for ``catalog``."""
    announcement = f"Creating  catalog {catalog}"
    print(announcement)
    create_stmt = f"CREATE CATALOG IF NOT EXISTS {catalog}"
    spark.sql(create_stmt)


def create_init_schema(catalog, schema_name):
    """Announce and run ``CREATE SCHEMA IF NOT EXISTS catalog.schema_name``."""
    qualified_name = f"{catalog}.{schema_name}"
    print(f"Creating schema {schema_name} on catalog {catalog}")
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {qualified_name}")


def create_schema(catalog_source, catalog_target, schema_name):
    """Create ``schema_name`` in ``catalog_target`` if it exists in ``catalog_source``.

    When ``catalog_source`` has no schema with that name, only a message is printed.
    Errors raised by ``SHOW SCHEMAS`` (e.g. a missing source catalog) propagate to
    the caller.

    Args:
        catalog_source: Catalog that must already contain the schema.
        catalog_target: Catalog in which the schema is created.
        schema_name: Name of the schema to mirror.
    """
    df_schemas = spark.sql(f"SHOW SCHEMAS IN {catalog_source}")
    # Read the name by position: the column is ``databaseName`` on Databricks but
    # ``namespace`` in open-source Spark 3.x. Compare case-insensitively because the
    # catalog may report names lower-cased (e.g. "employeeDB" -> "employeedb").
    wanted = schema_name.lower()
    schema_exists = any(row[0].lower() == wanted for row in df_schemas.collect())
    if schema_exists:
        print(f"Creating schema {schema_name} on catalog {catalog_target}")
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog_target}.{schema_name}")
    else:
        print(f"Schema {schema_name} does not exist in {catalog_source}.")


catalog = "spark_catalog"
schema = "employeeDB"

# create_init_catalog(catalog)  # NOTE(review): presumably not needed for the built-in spark_catalog
create_init_schema(catalog, schema)

# Create a small sample Delta table on the first run, then read it back. Writing
# only when the path is not already a Delta table keeps the cell re-runnable and
# makes it work in a fresh environment (Restart & Run All).
from delta.tables import DeltaTable

sample_table_path = "/tmp/delta-table3"
if not DeltaTable.isDeltaTable(spark, sample_table_path):
    spark.range(3, 8).write.format("delta").save(sample_table_path)

# Read data from the Delta table
df = spark.read.format("delta").load(sample_table_path)
df.show()

Creating schema employeeDB on catalog spark_catalog
+---+
| id|
+---+
|  3|
|  7|
|  5|
|  4|
|  6|
+---+



## Creating a Delta Lake Table

-- SQL
CREATE TABLE exampleDB.countries (
 id LONG,
 country STRING,
 capital STRING
) USING DELTA;

In [None]:
# Python
from pyspark.sql.types import LongType, StringType
from delta.tables import DeltaTable

# Define employeeDB.countries with the DeltaTable builder. createIfNotExists leaves
# an existing table untouched, so the cell can be re-run.
delta_table = (
    DeltaTable.createIfNotExists(spark)
    .tableName("employeeDB.countries")
    .addColumn("id", dataType=LongType(), nullable=False)
    .addColumn("country", dataType=StringType(), nullable=False)
    .addColumn("capital", dataType=StringType(), nullable=False)
    .execute()
)

# Later cells refer to the table by its short name "countries". Make employeeDB
# the current database so that name resolves to this table, not to a "countries"
# table in the default database.
spark.catalog.setCurrentDatabase("employeeDB")

### INSERT INTO

-- SQL
INSERT INTO countries VALUES
(1, 'United Kingdom', 'London'),
(2, 'Canada', 'Toronto')
With the PySpark DataFrame API, you only need to make the target table the destination of a write operation by calling `insertInto` (note that columns are matched by position, so column names are ignored with this method):

In [None]:
# insertInto matches columns by position, so each tuple must follow the table's
# column order: (id, country, capital).
schema = ["id", "country", "capital"]
data = [(1, "United Kingdom", "London"), (2, "Canada", "Toronto")]
df = spark.createDataFrame(data, schema=schema)
df.write.format("delta").insertInto("countries")

### Append

In [None]:
# Build a one-row DataFrame and append it to the table. With saveAsTable, the
# table is created if it does not exist yet.
schema = ["id", "country", "capital"]
data = [(3, "United States", "Washington, D.C.")]
df = spark.createDataFrame(data, schema=schema)
df.write.mode("append").format("delta").saveAsTable("countries")

### CREATE TABLE AS SELECT

## Querying Data

This section gives a high-level understanding of how partition filtering works (explored much more deeply in Chapters 5 and 10) and of how the transaction log allows querying previous versions of the data with time travel.

In [None]:
from delta.tables import DeltaTable

# Get a DeltaTable handle by table name, expose it as a DataFrame and display it.
delta_table = DeltaTable.forName(spark, "countries")
df = delta_table.toDF()
df.show()

### Filter query
-- SQL
SELECT * FROM exampleDB.countries
WHERE capital = "London"

In [None]:
# DataFrame version of the WHERE clause; where() is an alias of filter().
filtered_df = df.where(col("capital") == "London")
filtered_df.show()

-- SQL
SELECT
    id,
    capital
FROM
    countries

In [None]:
# Project only the id and capital columns.
select_df = df.select(col("id"), col("capital"))
select_df.show()

-- SQL
SELECT DISTINCT id FROM countries VERSION AS OF 1

In [None]:
# Time travel: read the table as it was at version 1.
# .table() resolves "countries" through the catalog. .load("countries") would
# instead treat it as a file path and read it with the default (Parquet) source.
read_df = (
    spark.read.option("versionAsOf", 1)
    .table("countries")
    .select("id")
    .distinct()
)
read_df.show()

In [None]:
# DataFrame equivalent of:
#   SELECT count(1) FROM countries TIMESTAMP AS OF "2024-04-20"
# .table() resolves the name through the catalog (a bare .load() would treat it
# as a Parquet path).
# NOTE(review): Delta rejects a timestamp earlier than the table's first commit;
# adjust this date to one after the table was created.
(
    spark.read
    .option("timestampAsOf", "2024-04-20")
    .table("countries")
    .count()
)

### Update

-- SQL
UPDATE countries
SET country = 'U.K.'
WHERE id = 1;

In [None]:
# Python equivalent of the SQL UPDATE above. Values in `set` are SQL expressions,
# so the new string value keeps its own single quotes inside the Python string.
delta_table.update(condition="id = 1", set={"country": "'U.K.'"})
df.show()

### Delete

-- SQL
DELETE FROM countries
WHERE id = 1;

In [None]:
from pyspark.sql.functions import col

# The same kind of delete predicate, expressed first as a SQL string and then as
# a Column expression.
delta_table.delete(condition="id = 1")
delta_table.delete(condition=(col("id") == 2))

df.show()

### Overwrite

In [None]:
# Replace the table contents with two new rows (mode "overwrite").
replacement = spark.createDataFrame(
    [(1, "India", "New Delhi"), (4, "Australia", "Canberra")],
    schema=["id", "country", "capital"],
)
replacement.write.format("delta").mode("overwrite").saveAsTable("countries")

df.show()

### Merge

-- SQL
MERGE INTO countries A
USING (select * from parquet.`countries.parquet`) B
ON A.id = B.id
WHEN MATCHED THEN
  UPDATE SET
    id = A.id,
    country = B.country,
    capital = B.capital
WHEN NOT MATCHED
  THEN INSERT (
    id,
    country,
    capital
  )
  VALUES (
    B.id,
    B.country,
    B.capital
  )

In [None]:
idf = spark.createDataFrame(
    [(1, "India", "New Delhi"), (4, "Australia", "Canberra")],
    schema=["id", "country", "capital"],
)

# Upsert keyed on id: matching rows get country/capital from the source, and
# source rows with no match are inserted.
merge_builder = delta_table.alias("target").merge(
    source=idf.alias("source"),
    condition="source.id = target.id",
)
merge_builder = merge_builder.whenMatchedUpdate(
    set={"country": "source.country", "capital": "source.capital"}
)
merge_builder = merge_builder.whenNotMatchedInsert(
    values={"id": "source.id", "country": "source.country", "capital": "source.capital"}
)
merge_builder.execute()

df.show()

## Conversion

-- SQL
CONVERT TO DELTA parquet.`countries.parquet`

In [None]:
from delta.tables import DeltaTable

# In-place conversion of an existing Parquet dataset to Delta. This is left
# disabled because it needs a Parquet dataset at "countries.parquet", which this
# notebook never creates:
# delta_table = DeltaTable.convertToDelta(spark, "parquet.`countries.parquet`")

# detail() and history() return DataFrames. Call show() so their contents are
# printed; otherwise detail() is discarded and history() shows only the repr.
delta_table.detail().show(truncate=False)
delta_table.history().show(truncate=False)