# A brief tutorial on the Unity Catalog Python client

In this example I'll:
1. Create a catalog.
2. Create a schema within that catalog.
3. Register an *existing* Delta Lake table in that schema.
4. Given the table's name in Unity Catalog, load the table into Polars.

[Here](https://docs.databricks.com/aws/en/database-objects/#what-are-database-objects-in-databricks) is a nice description of the database objects in Unity Catalog.

In [None]:
from unitycatalog.client import (
    ApiClient, 
    Configuration, 
    CatalogsApi, 
    SchemasApi, 
    TablesApi,
    )
from unitycatalog.client.models import (
    CreateCatalog, 
    CreateSchema, 
    CreateTable, 
    )
import deltalake as dl
import pandas as pd
import polars as pl

# Point the generated client at the Unity Catalog REST endpoint. The host below
# looks like a Kubernetes in-cluster service address -- replace it with the URL
# of your own Unity Catalog server.
config = Configuration(
    host="http://unitycatalog-server.uc.svc.cluster.local:8080/api/2.1/unity-catalog"
)
client = ApiClient(configuration=config)
# One API object per resource family; this one covers the catalog endpoints.
catalogs_api = CatalogsApi(api_client=client)

In [None]:
# Define a few helper functions that wrap the raw Unity Catalog client in a friendlier API.

async def create_catalog(catalog_name, catalog_api, comment=None):
    """Create a catalog on the Unity Catalog server.

    Args:
        catalog_name: Name of the catalog to create.
        catalog_api: Object exposing an awaitable
            ``create_catalog(create_catalog=...)`` method; the notebook passes
            a ``CatalogsApi``.
        comment: Optional free-text description. ``None`` (or any other falsy
            value) is sent as an empty string.

    Returns:
        Whatever ``catalog_api.create_catalog`` resolves to.
    """
    description = comment if comment else ""
    request = CreateCatalog(name=catalog_name, comment=description)
    response = await catalog_api.create_catalog(create_catalog=request)
    return response

async def create_schema(schema_name, catalog_name, schema_api, comment=None):
    """Create a schema inside the catalog named ``catalog_name``.

    Args:
        schema_name: Name of the schema to create.
        catalog_name: Name of the catalog that will contain the schema.
        schema_api: Object exposing an awaitable
            ``create_schema(create_schema=...)`` method; the notebook passes a
            ``SchemasApi``.
        comment: Optional free-text description. ``None`` (or any other falsy
            value) is sent as an empty string.

    Returns:
        Whatever ``schema_api.create_schema`` resolves to.
    """
    request_fields = {
        "name": schema_name,
        "catalog_name": catalog_name,
        "comment": comment if comment else "",
    }
    request = CreateSchema(**request_fields)
    response = await schema_api.create_schema(create_schema=request)
    return response

async def register_delta_table(
        table_name,
        schema_name,
        catalog_name,
        table_location,
        tables_api,
        columns=None,
    ):
    """Register an existing Delta Lake table with Unity Catalog.

    The table is registered as ``EXTERNAL`` with the ``DELTA`` data source
    format, pointing at ``table_location``.

    Args:
        table_name: Name the table will have inside the schema.
        schema_name: Name of the schema that will contain the table.
        catalog_name: Name of the catalog that contains the schema.
        table_location: Storage path/URI of the Delta table.
        tables_api: Object exposing an awaitable
            ``create_table(create_table=...)`` method; the notebook passes a
            ``TablesApi``.
        columns: Optional iterable of column definitions to record in the
            catalog (presumably the client's ``ColumnInfo`` models -- confirm
            against the client documentation). Defaults to no columns, which
            is what this helper has always sent.

    Returns:
        Whatever ``tables_api.create_table`` resolves to.
    """
    # NOTE: with the default, the catalog entry is stored with ``columns=[]``
    # (see the ``list_tables`` output in this notebook); the column schema is
    # not filled in from the Delta metadata. Readers such as deltalake/polars
    # still get the real schema from the Delta log at ``storage_location``.
    column_defs = [] if columns is None else list(columns)
    new_table = CreateTable(
        name=table_name,
        catalog_name=catalog_name,
        schema_name=schema_name,
        # EXTERNAL: the data already lives at ``table_location``. 'MANAGED' is
        # the other table type but is not covered by this tutorial.
        table_type="EXTERNAL",
        data_source_format="DELTA",
        storage_location=table_location,  # S3/HDFS/local path to your Delta table
        columns=column_defs,
    )
    return await tables_api.create_table(create_table=new_table)

# Create Catalog

In [50]:
catalogs_api = CatalogsApi(api_client=client)
await create_catalog("Test_Catalog", catalog_api=catalogs_api, comment="This is a new catalog.")

my_catalogs = await catalogs_api.list_catalogs()
print(my_catalogs)

catalogs=[CatalogInfo(name='Test_Catalog', comment='This is a new catalog.', properties={}, owner=None, created_at=1754624386067, created_by=None, updated_at=1754624386067, updated_by=None, id='e490d7f9-784a-4ab1-9892-69ae9a3b0b2d')] next_page_token=None


# Create Schema

In [51]:
schemas_api = SchemasApi(api_client=client)
await create_schema(schema_name="Test_Schema", catalog_name="Test_Catalog", schema_api=schemas_api, comment="This is a new schema.")

my_schemas = await schemas_api.list_schemas(catalog_name="Test_Catalog")
print(my_schemas)

schemas=[SchemaInfo(name='Test_Schema', catalog_name='Test_Catalog', comment='This is a new schema.', properties={}, full_name='Test_Catalog.Test_Schema', owner=None, created_at=1754624386102, created_by=None, updated_at=1754624386102, updated_by=None, schema_id='81acaece-f241-4ea8-9247-3192fec414bf')] next_page_token=None


# Create a Delta Lake table and register it in the schema

In [52]:
# Create Delta Lake table
table_path = "/lab/users/barnold/k8s_apps/unity_catalog_test/test_delta_table"  # Replace with your storage path
data = {
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [25, 30, 35, 28, 32],
    'department': ['Engineering', 'Sales', 'Marketing', 'Engineering', 'Sales']
}
df = pd.DataFrame(data)
dl.write_deltalake(table_path, df, mode="overwrite")

# Register the Delta table with Unity Catalog
tables_api = TablesApi(api_client=client)
await register_delta_table(
    table_name="test_delta_table",
    schema_name="Test_Schema",
    catalog_name="Test_Catalog",
    table_location=table_path,
    tables_api=tables_api
)

my_tables = await tables_api.list_tables(catalog_name="Test_Catalog", 
                                         schema_name="Test_Schema")
print(my_tables)

tables=[TableInfo(name='test_delta_table', catalog_name='Test_Catalog', schema_name='Test_Schema', table_type=<TableType.EXTERNAL: 'EXTERNAL'>, data_source_format=<DataSourceFormat.DELTA: 'DELTA'>, columns=[], storage_location='file:///lab/users/barnold/k8s_apps/unity_catalog_test/test_delta_table', comment=None, properties={}, owner=None, created_at=1754624386167, created_by=None, updated_at=1754624386167, updated_by=None, table_id='3cf7c0c2-8328-45d9-b9d1-b97ae55e8497')] next_page_token=None


# Use the table's name in Unity Catalog to load it into Polars

In [53]:
table_info = await tables_api.get_table(
    full_name="Test_Catalog.Test_Schema.test_delta_table"
)

# Extract the storage location
table_location = table_info.storage_location

dt = dl.DeltaTable(table_location)

# Read Delta table with Polars
df = pl.scan_delta(table_location).collect()
# Or use lazy evaluation (recommended for large tables)
lazy_df = pl.scan_delta(table_location)

df

id,name,age,department
i64,str,i64,str
1,"""Alice""",25,"""Engineering"""
2,"""Bob""",30,"""Sales"""
3,"""Charlie""",35,"""Marketing"""
4,"""Diana""",28,"""Engineering"""
5,"""Eve""",32,"""Sales"""


# Remove table, schema, and catalog

In [None]:
# Remove a table
await tables_api.delete_table(
    full_name="Test_Catalog.Test_Schema.test_delta_table"
)
my_tables = await tables_api.list_tables(catalog_name="Test_Catalog", 
                                         schema_name="Test_Schema")
print(my_tables)

tables=[] next_page_token=None


In [56]:
# Remove a schema
await schemas_api.delete_schema(
    full_name="Test_Catalog.Test_Schema"
)
my_schemas = await schemas_api.list_schemas(catalog_name="Test_Catalog")
print(my_schemas)

schemas=[] next_page_token=None


In [57]:
# Remove the catalog (its table and schema were deleted in the cells above),
# then list all catalogs to confirm it is gone.
await catalogs_api.delete_catalog(name="Test_Catalog")
my_catalogs = await catalogs_api.list_catalogs()
print(my_catalogs)

catalogs=[] next_page_token=None


# Appendix

In [58]:
# a simple example of consuming a paginated response

async def list_all_catalogs(catalog_api):
    """Collect every catalog by following ``next_page_token`` until exhausted.

    Args:
        catalog_api: Object exposing an awaitable
            ``list_catalogs(page_token=...)`` method whose result has
            ``catalogs`` and ``next_page_token`` attributes; the notebook
            passes a ``CatalogsApi``.

    Returns:
        A list with the catalogs from all pages, in the order received.
    """
    catalogs = []
    token = None  # no token on the first request
    while True:
        page = await catalog_api.list_catalogs(page_token=token)
        # A page may carry no catalog list at all (None); treat it as empty
        # rather than failing on list concatenation.
        catalogs.extend(page.catalogs or [])
        token = page.next_page_token
        if not token:  # None or "" means there are no further pages
            break
    return catalogs

# Gather the catalogs from every page; the bare name on the last line makes
# the notebook display the result.
my_catalogs = await list_all_catalogs(catalogs_api)
my_catalogs

[]

In [59]:
# Uncomment to list every name exported by the client's models module:
# import unitycatalog.client.models as models
# for i in dir(models):
#     print(i)