# DuckDB on Databricks

This notebook demonstrates how to use DuckDB on Databricks.
It shows three common ways to load Databricks tables into a DuckDB process.

## Installation



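Downloading the gzipped extension binaries into DuckDB's extension directory up front means the extensions are already installed when a connection needs them (the Unity Catalog section still loads its extension explicitly):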
- httpfs: for loading files from S3
- delta: for loading tables using delta table files (my preferred method)
- unity_catalog: for loading tables directly from unity catalog

In [0]:
%sh
DUCK_VER="v1.4.3"
ARCH="linux_amd64"
BASE_EXT_URL="https://extensions.duckdb.org/${DUCK_VER}/${ARCH}"
EXT_PATH="${HOME}/.duckdb/extensions/${DUCK_VER}/${ARCH}"

# Quote the requirement so the shell does not treat [all] as a glob pattern
pip install --upgrade "duckdb[all]==${DUCK_VER}"

EXTENSIONS=("httpfs" "delta" "unity_catalog")

# Download and decompress each extension
mkdir -p "${EXT_PATH}"
for EXT in "${EXTENSIONS[@]}"; do
  echo "Installing DuckDB extension: ${EXT}..."
  wget -q -O "${EXT_PATH}/${EXT}.duckdb_extension.gz" "${BASE_EXT_URL}/${EXT}.duckdb_extension.gz"
  gzip -d -f "${EXT_PATH}/${EXT}.duckdb_extension.gz"
  echo "Successfully installed ${EXT}."
done
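
To confirm the downloads landed where the install cell put them, the expected file locations can be computed from the same version and architecture values. This is a minimal sketch: the helper name `expected_extension_paths` is hypothetical, and it only mirrors the `EXT_PATH` layout used in this notebook rather than querying DuckDB itself.

```python
from pathlib import Path


def expected_extension_paths(version, arch, names, home=None):
    """Return the path where each extension file is expected after the install cell."""
    # Assumption: same directory layout as EXT_PATH in the shell cell above.
    base = Path(home) if home is not None else Path.home()
    ext_dir = base / ".duckdb" / "extensions" / version / arch
    return {name: ext_dir / f"{name}.duckdb_extension" for name in names}


paths = expected_extension_paths(
    "v1.4.3", "linux_amd64", ["httpfs", "delta", "unity_catalog"]
)
for name, path in paths.items():
    status = "found" if path.is_file() else "missing"
    print(f"{name}: {path} ({status})")
```

A `missing` status would suggest the download or decompression step failed for that extension.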

## Loading Table into DuckDB

### Without any extensions - Using Spark intermediary table


This is the easiest method and requires no extensions.
However, it is slower because the table must first be converted to Arrow (or pandas).

In [0]:
# Using spark and converting to arrow (or pandas)
import duckdb

TABLE_NAME = "catalog.myschema.customers"

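spark_df = spark.sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5000000")
# Collect the rows to the driver as an Arrow table (or use toPandas())
customers = spark_df.toArrow()
# DuckDB resolves the Python variable `customers` by name in the query below
con = duckdb.connect()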

In [0]:
df = con.sql("SELECT * FROM customers").to_df()
display(df)

In [0]:
con.close()

### Using Unity Catalog extension

The Unity Catalog extension is in preview and is not recommended for production workloads.
With this extension, DuckDB can access Unity Catalog tables directly.

In [0]:
# connect to UC
import duckdb
region = "us-west-1"
raw_url = spark.conf.get("spark.databricks.workspaceUrl")
workspace_url = f"https://{raw_url}"
print(workspace_url)

# Use the API token from the current notebook session
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

con = duckdb.connect()
con.execute("LOAD unity_catalog")
print(con.execute(f"""
    CREATE OR REPLACE SECRET (
        TYPE unity_catalog,
        TOKEN '{token}',
        ENDPOINT '{workspace_url}',
        AWS_REGION '{region}'
    );
""").df())

catalog_name = "catalog"
con.execute(f"ATTACH OR REPLACE '{catalog_name}' AS unity (TYPE UNITY_CATALOG);")
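
The cell above interpolates the token and workspace URL straight into the SQL text. If a value ever contained a single quote, the statement would break, so one option is to escape values by doubling single quotes before building the statement. The sketch below assumes that approach; the helper names `sql_literal` and `build_uc_secret_sql` are hypothetical, and the values passed in are placeholders, not real credentials.

```python
def sql_literal(value):
    """Wrap a value in single quotes, doubling any embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def build_uc_secret_sql(token, endpoint, region):
    """Build the same CREATE SECRET statement as above, with escaped values."""
    return (
        "CREATE OR REPLACE SECRET (\n"
        "    TYPE unity_catalog,\n"
        f"    TOKEN {sql_literal(token)},\n"
        f"    ENDPOINT {sql_literal(endpoint)},\n"
        f"    AWS_REGION {sql_literal(region)}\n"
        ");"
    )


# Placeholder values for illustration only.
statement = build_uc_secret_sql("dummy'token", "https://example.invalid", "us-west-1")
print(statement)
```

In the notebook this would be used as `con.execute(build_uc_secret_sql(token, workspace_url, region))`.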

In [0]:
# create duckdb table
con.execute("""
    CREATE OR REPLACE TABLE customers AS 
    SELECT * 
    FROM unity.tf_test.customers 
    LIMIT 5000000
""")

In [0]:
# Cast to DOUBLE because the unity_catalog extension does not correctly map
# the high-precision DECIMAL(38, x) data type from Unity Catalog
# Compute the aggregates
query = f"""
SELECT
    customer_id,
    MIN(CAST(amount AS DOUBLE)) AS min_amount,
    MAX(CAST(amount AS DOUBLE)) AS max_amount,
    AVG(CAST(amount AS DOUBLE)) AS avg_amount,
    SUM(CAST(amount AS DOUBLE)) AS total_amount
FROM customers
GROUP BY
    1
"""

print("Building aggregate table")
customers_agg = con.sql(query)
df = con.sql("SELECT * FROM customers_agg").df()
display(df)
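
Casting to DOUBLE works around the type-mapping problem, but it trades exact decimal arithmetic for binary floating point. The sketch below uses Python's `decimal` module with made-up values to illustrate the kind of rounding this can introduce; it does not reproduce DuckDB's own arithmetic.

```python
from decimal import Decimal

# Ten amounts of 0.1, summed once as exact decimals and once as binary doubles.
exact_total = sum([Decimal("0.1")] * 10)
double_total = sum([0.1] * 10)

print("decimal sum equals 1.0:", exact_total == Decimal("1.0"))
print("double sum equals 1.0: ", double_total == 1.0)

# A value with more significant digits than a double can represent.
wide = Decimal("12345678901234567890.123456789")
round_tripped = Decimal(float(wide))
print("difference after round trip through double:", wide - round_tripped)
```

For monetary columns, it may be worth checking whether the aggregates need to be exact before relying on the DOUBLE cast.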

In [0]:
con.close()

### Using S3/Delta extension

With the httpfs and delta extensions, DuckDB can read the Delta table files in S3 directly.

In [0]:
import duckdb

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import TableOperation

table_name = "catalog.myschema.customers"
region = "us-west-1"
db_path = "/Workspace/Users/username/customers.ddb"

w = WorkspaceClient()

#  Fetch Temporary Credentials from Unity Catalog
#  This requires 'EXTERNAL USE SCHEMA' permission on the schema
table_info = w.tables.get(table_name)
creds = w.temporary_table_credentials.generate_temporary_table_credentials(
    table_id=table_info.table_id,
    operation=TableOperation.READ
).aws_temp_credentials

print(f"Connecting to DuckDB at {db_path}")

# Persist the database to a file in the Workspace, which is useful when the table is too large for memory
con = duckdb.connect(database=db_path, read_only=False)

# Configure a DuckDB secret with the temporary session credentials
print("Creating DuckDB secret")
con.execute(f"""
    CREATE OR REPLACE SECRET (
        TYPE S3,
        PROVIDER config,
        KEY_ID '{creds.access_key_id}',
        SECRET '{creds.secret_access_key}',
        SESSION_TOKEN '{creds.session_token}',
        REGION '{region}'
    );
""")

# Storage location of the Delta table; delta_scan reads from it in the next cell
storage_loc = table_info.storage_location
print(f"Scanning: {storage_loc}")
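
The secret created above is of type S3, so it only helps when the table's storage location is an `s3://` path. A quick sanity check before scanning can make a mismatch obvious. This is a minimal sketch using the standard library; the helper name `split_s3_location` is hypothetical and the example location is made up.

```python
from urllib.parse import urlparse


def split_s3_location(location):
    """Split an s3:// URI into (bucket, key prefix); reject other schemes."""
    parsed = urlparse(location)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Expected an s3:// location, got: {location!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Made-up location for illustration; in the notebook, pass storage_loc instead.
bucket, prefix = split_s3_location("s3://example-bucket/tables/customers")
print(bucket, prefix)
```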



In [0]:
# Save as a DuckDB table
con.execute(f"""
    CREATE OR REPLACE TABLE customers_view AS
    SELECT * FROM delta_scan('{storage_loc}') LIMIT 5000000;
""")

In [0]:
# Aggregate the duckdb table
agg_table_name = "catalog.myschema.customers_agg"
query = f"""
SELECT
    customer_id,
    min(amount) AS min_amount,
    max(amount) AS max_amount,
    avg(amount) AS avg_amount,
    sum(amount) AS total_amount
FROM customers_view
GROUP BY
    1
"""

print("Building aggregate table")
adf = con.execute(query).fetch_arrow_table()

print("Writing to Delta table")
sdf = spark.createDataFrame(adf)
sdf.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable(agg_table_name)

display(sdf)

In [0]:
con.close()

## Benchmarking: Spark and Pandas

The code below is for comparison, to see how fast DuckDB is compared to traditional Spark with pandas.

In [0]:
import pandas as pd

TABLE_NAME = "catalog.myschema.customers"
spark_df = spark.sql(f"SELECT * FROM {TABLE_NAME} LIMIT 5000000")

In [0]:
df = spark_df.toPandas().groupby("customer_id").agg(
    max_amount=pd.NamedAgg(column="amount", aggfunc="max"),
    min_amount=pd.NamedAgg(column="amount", aggfunc="min"),
    avg_amount=pd.NamedAgg(column="amount", aggfunc="mean"),
    total_amount=pd.NamedAgg(column="amount", aggfunc="sum")
).reset_index()

display(df)

In [0]:
# Import the functions module under an alias so min, max, and sum do not shadow Python built-ins
from pyspark.sql import functions as F

df = spark_df.groupBy("customer_id").agg(
    F.min("amount").alias("min_amount"),
    F.avg("amount").alias("avg_amount"),
    F.max("amount").alias("max_amount"),
    F.sum("amount").alias("total_amount")
)

display(df)
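
The cells above run each approach but do not record how long it takes. One way to collect comparable numbers is a small timing helper built on `time.perf_counter`. This is a sketch only: the name `timed` is hypothetical, and the workload shown is a stand-in rather than one of the queries from this notebook.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Record the wall-clock seconds spent inside the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed("python sum", timings):
    total = sum(range(1_000_000))  # stand-in workload for illustration

for label, seconds in timings.items():
    print(f"{label}: {seconds:.3f}s")
```

In the notebook, the `with` block would wrap a call such as `con.execute(query).fetch_arrow_table()` or `spark_df.toPandas()`. Spark transformations are lazy, so the timed block should include an action that forces execution, otherwise only the plan construction is measured.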