# Bootstrap the client with ROOT credentials
Using the Python client generated from our OpenAPI spec, we generate a token from the root user's credentials.

In [1]:
from polaris.catalog.api.iceberg_catalog_api import IcebergCatalogAPI
from polaris.catalog.api.iceberg_o_auth2_api import IcebergOAuth2API
from polaris.catalog.api_client import ApiClient as CatalogApiClient
from polaris.catalog.api_client import Configuration as CatalogApiClientConfiguration

# (CHANGE ME): This credential changes on every Polaris service restart
# In the Polaris log, look for the `realm: default-realm root principal credentials:` string
polaris_credential = '74519dd27e1462be:4f6b0906c8bf65ae6d464e7ea2393e07' # pragma: allowlist secret

# The credential is `<client_id>:<client_secret>`. Split on the first colon only so that
# a secret which itself contains ':' still unpacks into exactly two parts.
client_id, client_secret = polaris_credential.split(":", 1)
client = CatalogApiClient(CatalogApiClientConfiguration(username=client_id,
                                                        password=client_secret,
                                                        host='http://polaris:8181/api/catalog'))

# Exchange the root client credentials for a token via the catalog's OAuth2 endpoint.
# The token is scoped to all principal roles and requested for the `default-realm` realm.
oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                            client_id=client_id,
                            client_secret=client_secret,
                            grant_type='client_credentials',
                            _headers={'realm': 'default-realm'})


# Create our first catalog

* Creates a catalog named `polaris_demo` that writes to a specified location on the local filesystem.

In [2]:
from polaris.management import *

# Management API client, authenticated with the root access token obtained above
client = ApiClient(Configuration(access_token=token.access_token,
                                 host='http://polaris:8181/api/management/v1'))
root_client = PolarisDefaultApi(client)

# FILE storage restricted to locations under file:///tmp
storage_conf = FileStorageConfigInfo(storage_type="FILE", allowed_locations=["file:///tmp"])
catalog_name = 'polaris_demo'
# The storage config is supplied through the constructor, so no separate
# attribute assignment is needed afterwards.
catalog = Catalog(name=catalog_name, type='INTERNAL', properties={"default-base-location": "file:///tmp/polaris/"},
                  storage_config_info=storage_conf)
root_client.create_catalog(create_catalog_request=CreateCatalogRequest(catalog=catalog))

# Read the catalog back and display it as the cell output
resp = root_client.get_catalog(catalog_name=catalog.name)
resp

PolarisCatalog(type='INTERNAL', name='polaris_demo', properties=CatalogProperties(default_base_location='file:///tmp/polaris/', additional_properties={}), create_timestamp=1734034158532, last_update_timestamp=1734034158532, entity_version=1, storage_config_info=FileStorageConfigInfo(storage_type='FILE', allowed_locations=['file:///tmp', 'file:///tmp/polaris/']))

# Utility Functions

In [3]:
# Creates a principal with the given name
def create_principal(api, principal_name):
  """Create a SERVICE principal, or rotate its credentials if it already exists.

  Args:
    api: management API object exposing `create_principal` and `rotate_credentials`.
    principal_name: name of the principal to create.

  Returns:
    The result of `api.create_principal`, or of `api.rotate_credentials` when the
    create call fails with HTTP status 409.

  Raises:
    ApiException: re-raised unchanged for any status other than 409.
  """
  service_principal = Principal(name=principal_name, type="SERVICE")
  try:
    return api.create_principal(CreatePrincipalRequest(principal=service_principal))
  except ApiException as err:
    # Anything other than a conflict is a real failure: propagate it.
    if err.status != 409:
      raise
    # 409: the principal already exists, so obtain fresh credentials instead.
    return api.rotate_credentials(principal_name=principal_name)

# Create a catalog role with the given name
def create_catalog_role(api, catalog, role_name):
  """Create a catalog role in `catalog`, tolerating a role that already exists.

  Args:
    api: management API object exposing `create_catalog_role` and `get_catalog_role`.
    catalog: object whose `name` attribute identifies the target catalog.
    role_name: name of the catalog role to create.

  Returns:
    The result of `api.get_catalog_role` for the named role.

  Raises:
    ApiException: re-raised unchanged when creation fails with a status other
      than 409, so that unrelated failures are not mistaken for a conflict.
  """
  catalog_role = CatalogRole(name=role_name)
  try:
    api.create_catalog_role(catalog_name=catalog.name, create_catalog_role_request=CreateCatalogRoleRequest(catalog_role=catalog_role))
  except ApiException as e:
    # Only a 409 means the role is already there; anything else is a real error.
    if e.status != 409:
      raise
  # Whether the role was just created or already existed, fetch and return it.
  return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)

# Create a principal role with the given name
def create_principal_role(api, role_name):
  """Create a principal role, tolerating a role that already exists.

  Args:
    api: management API object exposing `create_principal_role` and `get_principal_role`.
    role_name: name of the principal role to create.

  Returns:
    The result of `api.get_principal_role` for the named role.

  Raises:
    ApiException: re-raised unchanged when creation fails with a status other
      than 409, matching the conflict handling in `create_principal`.
  """
  principal_role = PrincipalRole(name=role_name)
  try:
    api.create_principal_role(CreatePrincipalRoleRequest(principal_role=principal_role))
  except ApiException as e:
    # Only a 409 means the role is already there; anything else is a real error.
    if e.status != 409:
      raise
  # Whether the role was just created or already existed, fetch and return it.
  return api.get_principal_role(principal_role_name=role_name)


# Create a new Principal, Principal Role, and Catalog Role
The new Principal belongs to the `engineer` principal role, which has `CATALOG_MANAGE_CONTENT` privileges on the `polaris_demo` catalog.


`CATALOG_MANAGE_CONTENT` grants create/list/read/write privileges on all entities within the catalog. The same privilege could instead be granted on a namespace, in which case the engineers could create/list/read/write any entity under that namespace.

In [4]:
# Principal, principal role and catalog role for the engineer persona
engineer_principal = create_principal(root_client, "collado")
engineer_role = create_principal_role(root_client, "engineer")
manager_catalog_role = create_catalog_role(root_client, catalog, "manage_catalog")

# Attach the catalog role to the principal role:
# every principal holding the principal role gets the catalog role's privileges
engineer_catalog_role_request = GrantCatalogRoleRequest(catalog_role=manager_catalog_role)
root_client.assign_catalog_role_to_principal_role(
    principal_role_name=engineer_role.name,
    catalog_name=catalog.name,
    grant_catalog_role_request=engineer_catalog_role_request,
)

# Give the catalog role CATALOG_MANAGE_CONTENT at the catalog level
manage_content_grant = CatalogGrant(
    catalog_name=catalog.name,
    type='catalog',
    privilege=CatalogPrivilege.CATALOG_MANAGE_CONTENT,
)
root_client.add_grant_to_catalog_role(
    catalog.name,
    manager_catalog_role.name,
    AddGrantRequest(grant=manage_content_grant),
)

# Finally, put the engineer principal into the principal role
engineer_principal_role_request = GrantPrincipalRoleRequest(principal_role=engineer_role)
root_client.assign_principal_role(
    engineer_principal.principal.name,
    grant_principal_role_request=engineer_principal_role_request,
)

# Create a reader Principal, Principal Role, and Catalog Role
This new principal belongs to the `product_manager` principal role, which is explicitly granted read and list permissions on the catalog.

Permissions cascade, so permissions granted at the catalog level are inherited by namespaces and tables within the catalog.

In [5]:
# Create a reader principal
reader_principal = create_principal(root_client, "mlee")

# Create the principal role
pm_role = create_principal_role(root_client, "product_manager")

# Create the catalog role
read_only_role = create_catalog_role(root_client, catalog, "read_only")

# Grant the catalog role to the principal role
root_client.assign_catalog_role_to_principal_role(principal_role_name=pm_role.name,
                                                  catalog_name=catalog.name,
                                                  grant_catalog_role_request=GrantCatalogRoleRequest(catalog_role=read_only_role))

# Assign privileges to the catalog role
# Here, the catalog role is granted READ and LIST privileges at the catalog level
# Privileges cascade down
# The privileges are listed once and granted in this order, one request per privilege.
read_only_privileges = [
    CatalogPrivilege.TABLE_LIST,
    CatalogPrivilege.TABLE_READ_PROPERTIES,
    CatalogPrivilege.TABLE_READ_DATA,
    CatalogPrivilege.VIEW_LIST,
    CatalogPrivilege.VIEW_READ_PROPERTIES,
    CatalogPrivilege.NAMESPACE_READ_PROPERTIES,
    CatalogPrivilege.NAMESPACE_LIST,
]
for read_only_privilege in read_only_privileges:
    root_client.add_grant_to_catalog_role(catalog.name, read_only_role.name,
                                          AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,
                                                                             type='catalog',
                                                                             privilege=read_only_privilege)))

# Assign the principal role to the principal
root_client.assign_principal_role(reader_principal.principal.name, grant_principal_role_request=GrantPrincipalRoleRequest(principal_role=pm_role))

In [None]:
# Create a Spark session against Delta

In [27]:
from pyspark.sql import SparkSession

# Credentials issued for the engineer principal, used as the catalog credential below
delta_engineer_credentials = engineer_principal.credentials

# Session settings, applied to the builder in the order listed
delta_session_settings = [
  # Delta SQL extension and Delta as the built-in session catalog
  ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
  ("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"),
  ('spark.sql.iceberg.vectorization.enabled', 'false'),

  # Configure the 'polaris' catalog as an Iceberg rest catalog
  ("spark.sql.catalog.polaris.type", "rest"),
  ("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog"),
  # Specify the rest catalog endpoint
  ("spark.sql.catalog.polaris.uri", "http://polaris:8181/api/catalog"),
  # Enable token refresh
  ("spark.sql.catalog.polaris.token-refresh-enabled", "true"),
  # specify the client_id:client_secret pair
  ("spark.sql.catalog.polaris.credential",
   f"{delta_engineer_credentials.client_id}:{delta_engineer_credentials.client_secret}"),

  # Set the warehouse to the name of the catalog we created
  ("spark.sql.catalog.polaris.warehouse", catalog_name),

  # Scope set to PRINCIPAL_ROLE:ALL
  ("spark.sql.catalog.polaris.scope", 'PRINCIPAL_ROLE:ALL'),

  # Enable access credential delegation
  ("spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation", 'vended-credentials'),

  ("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.io.ResolvingFileIO"),
  ("spark.sql.catalog.polaris.s3.region", "us-west-2"),
  ("spark.history.fs.logDirectory", "/home/iceberg/spark-events"),
]

delta_builder = SparkSession.builder
for setting_key, setting_value in delta_session_settings:
  delta_builder = delta_builder.config(setting_key, setting_value)

deltaSpark = delta_builder.getOrCreate()



In [12]:
from pyspark.context import SparkContext
from pyspark import SparkFiles
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *

# Build (or reuse, via getOrCreate) a single-threaded local session configured
# with the Delta SQL extension and the Delta session catalog.
deltaSpark = (
    SparkSession.builder
    .master("local[1]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Target location of the Delta table on the local filesystem
local_base_path = "file:///tmp/deltatest/"

# Sample rows: (id, name, age, city, create_ts)
records = [
   (1, 'John', 25, 'NYC', '2023-09-28 00:00:00'),
   (2, 'Emily', 30, 'SFO', '2023-09-28 00:00:00'),
   (3, 'Michael', 35, 'ORD', '2023-09-28 00:00:00'),
   (4, 'Andrew', 40, 'NYC', '2023-10-28 00:00:00'),
   (5, 'Bob', 28, 'SEA', '2023-09-23 00:00:00'),
   (6, 'Charlie', 31, 'DFW', '2023-08-29 00:00:00')
]

# Explicit schema; create_ts is kept as a string column
schema = StructType([
   StructField("id", IntegerType(), True),
   StructField("name", StringType(), True),
   StructField("age", IntegerType(), True),
   StructField("city", StringType(), True),
   StructField("create_ts", StringType(), True)
])

df = deltaSpark.createDataFrame(records, schema)

# NOTE(review): the recorded output for this cell is a NoSuchMethodError raised from
# Delta internals, which presumably means the Delta jar on the classpath was built
# for a different Spark version - confirm the Delta package matches the Spark runtime.
# NOTE(review): no save mode is set, so Spark's default applies; re-running against
# an already-populated path presumably fails - consider an explicit mode if intended.
(
   df.write
   .format("delta")
   .partitionBy("city")
   .save(local_base_path)
)



Py4JJavaError: An error occurred while calling o719.save.
: com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: 'java.lang.String org.apache.spark.ErrorInfo.messageFormat()'
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2261)
	at com.google.common.cache.LocalCache.get(LocalCache.java:4000)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
	at org.apache.spark.sql.delta.DeltaLog$.getDeltaLogFromCache$1(DeltaLog.scala:606)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:613)
	at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:496)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:153)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.NoSuchMethodError: 'java.lang.String org.apache.spark.ErrorInfo.messageFormat()'
	at org.apache.spark.sql.delta.DeltaThrowableHelper$.getMessage(DeltaThrowableHelper.scala:80)
	at org.apache.spark.sql.delta.DeltaFileNotFoundException.<init>(DeltaErrors.scala:2448)
	at org.apache.spark.sql.delta.DeltaErrorsBase.fileOrDirectoryNotFoundException(DeltaErrors.scala:446)
	at org.apache.spark.sql.delta.DeltaErrorsBase.fileOrDirectoryNotFoundException$(DeltaErrors.scala:443)
	at org.apache.spark.sql.delta.DeltaErrors$.fileOrDirectoryNotFoundException(DeltaErrors.scala:2264)
	at org.apache.spark.sql.delta.storage.S3SingleDriverLogStore.listFromInternal(S3SingleDriverLogStore.scala:122)
	at org.apache.spark.sql.delta.storage.S3SingleDriverLogStore.listFrom(S3SingleDriverLogStore.scala:141)
	at org.apache.spark.sql.delta.SnapshotManagement.listFrom(SnapshotManagement.scala:69)
	at org.apache.spark.sql.delta.SnapshotManagement.listFrom$(SnapshotManagement.scala:68)
	at org.apache.spark.sql.delta.DeltaLog.listFrom(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromOrNone(SnapshotManagement.scala:86)
	at org.apache.spark.sql.delta.SnapshotManagement.listFromOrNone$(SnapshotManagement.scala:82)
	at org.apache.spark.sql.delta.DeltaLog.listFromOrNone(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$listDeltaAndCheckpointFiles$1(SnapshotManagement.scala:106)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:141)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:139)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.listDeltaAndCheckpointFiles(SnapshotManagement.scala:106)
	at org.apache.spark.sql.delta.SnapshotManagement.getLogSegmentForVersion(SnapshotManagement.scala:138)
	at org.apache.spark.sql.delta.SnapshotManagement.getLogSegmentForVersion$(SnapshotManagement.scala:133)
	at org.apache.spark.sql.delta.DeltaLog.getLogSegmentForVersion(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.getLogSegmentFrom(SnapshotManagement.scala:64)
	at org.apache.spark.sql.delta.SnapshotManagement.getLogSegmentFrom$(SnapshotManagement.scala:62)
	at org.apache.spark.sql.delta.DeltaLog.getLogSegmentFrom(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.$anonfun$getSnapshotAtInit$1(SnapshotManagement.scala:262)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:141)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:139)
	at org.apache.spark.sql.delta.DeltaLog.recordFrameProfile(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit(SnapshotManagement.scala:260)
	at org.apache.spark.sql.delta.SnapshotManagement.getSnapshotAtInit$(SnapshotManagement.scala:258)
	at org.apache.spark.sql.delta.DeltaLog.getSnapshotAtInit(DeltaLog.scala:65)
	at org.apache.spark.sql.delta.SnapshotManagement.$init$(SnapshotManagement.scala:54)
	at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:70)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$3(DeltaLog.scala:595)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$2(DeltaLog.scala:591)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:141)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:139)
	at org.apache.spark.sql.delta.DeltaLog$.recordFrameProfile(DeltaLog.scala:460)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:134)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
	at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:460)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:133)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:123)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:111)
	at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:460)
	at org.apache.spark.sql.delta.DeltaLog$.createDeltaLog$1(DeltaLog.scala:590)
	at org.apache.spark.sql.delta.DeltaLog$.$anonfun$apply$4(DeltaLog.scala:606)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
	... 47 more


# Create a Spark session with the engineer credentials

* Catalog URI points to our Polaris installation
* Credential set using the client_id and client_secret generated for the principal
* Scope set to `PRINCIPAL_ROLE:ALL`
* `X-Iceberg-Access-Delegation` is set to vended-credentials

In [36]:
from pyspark.sql import SparkSession

# Credentials issued for the engineer principal, used as the catalog credential below
engineer_credentials = engineer_principal.credentials

# Session settings keyed by Spark property name; applied in insertion order
spark_session_settings = {
  # Iceberg session catalog plus the runtime packages to fetch
  "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
  "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.1,org.apache.hadoop:hadoop-aws:3.4.0,software.amazon.awssdk:bundle:2.23.19,software.amazon.awssdk:url-connection-client:2.23.19",
  'spark.sql.iceberg.vectorization.enabled': 'false',

  # Configure the 'polaris' catalog as an Iceberg rest catalog
  "spark.sql.catalog.polaris.type": "rest",
  "spark.sql.catalog.polaris": "org.apache.iceberg.spark.SparkCatalog",
  # Specify the rest catalog endpoint
  "spark.sql.catalog.polaris.uri": "http://polaris:8181/api/catalog",
  # Enable token refresh
  "spark.sql.catalog.polaris.token-refresh-enabled": "true",
  # specify the client_id:client_secret pair
  "spark.sql.catalog.polaris.credential": f"{engineer_credentials.client_id}:{engineer_credentials.client_secret}",

  # Set the warehouse to the name of the catalog we created
  "spark.sql.catalog.polaris.warehouse": catalog_name,

  # Scope set to PRINCIPAL_ROLE:ALL
  "spark.sql.catalog.polaris.scope": 'PRINCIPAL_ROLE:ALL',

  # Enable access credential delegation
  "spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation": 'vended-credentials',

  "spark.sql.catalog.polaris.io-impl": "org.apache.iceberg.io.ResolvingFileIO",
  "spark.sql.catalog.polaris.s3.region": "us-west-2",
  "spark.history.fs.logDirectory": "/home/iceberg/spark-events",
}

spark_builder = SparkSession.builder
for property_name, property_value in spark_session_settings.items():
  spark_builder = spark_builder.config(property_name, property_value)

spark = spark_builder.getOrCreate()


# USE polaris
Tell Spark to use the Polaris catalog

In [7]:
# Switch the session to the `polaris` catalog, then print its top-level namespaces
spark.sql("USE polaris")
top_level_namespaces = spark.sql("SHOW NAMESPACES")
top_level_namespaces.show()

+---------+
|namespace|
+---------+
+---------+



# Create Nested Namespaces

In [8]:
# Parent namespace first, then the nested one; both statements are safe to repeat
nested_namespace_statements = (
    "CREATE NAMESPACE IF NOT EXISTS COLLADO_TEST",
    "CREATE NAMESPACE IF NOT EXISTS COLLADO_TEST.PUBLIC",
)
for namespace_statement in nested_namespace_statements:
    spark.sql(namespace_statement)

# Print the namespaces nested under COLLADO_TEST
child_namespaces = spark.sql("SHOW NAMESPACES IN COLLADO_TEST")
child_namespaces.show()

+-------------------+
|          namespace|
+-------------------+
|COLLADO_TEST.PUBLIC|
+-------------------+



# Create a table

In [9]:
# DDL for the demo table: a required bigint id and a string payload, stored as Iceberg
create_test_table_statement = """CREATE TABLE IF NOT EXISTS TEST_TABLE (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg;
"""

# Work inside the nested namespace, then create the table there.
# The create call stays the cell's last expression so its result is displayed.
spark.sql("USE NAMESPACE COLLADO_TEST.PUBLIC")
spark.sql(create_test_table_statement)

DataFrame[]

# It's Empty

In [None]:
# Query every row of TEST_TABLE and print the result
test_table_rows = spark.sql("SELECT * FROM TEST_TABLE")
test_table_rows.show()