# Bootstrap Client with Root Credentials

In [1]:
from polaris.catalog.api.iceberg_catalog_api import IcebergCatalogAPI
from polaris.catalog.api.iceberg_o_auth2_api import IcebergOAuth2API
from polaris.catalog.api_client import ApiClient as CatalogApiClient
from polaris.catalog.api_client import Configuration as CatalogApiClientConfiguration
from polaris.management import *

client_id = '4f7e2797afd677ff'
client_secret = '679d81e89256142452d8dd0b74fd1cf4' # pragma: allowlist secret

client = CatalogApiClient(CatalogApiClientConfiguration(username=client_id,
                                 password=client_secret,
                                 host='http://polaris:8181/api/catalog'))

oauth_api = IcebergOAuth2API(client)
token = oauth_api.get_token(scope='PRINCIPAL_ROLE:ALL',
                            client_id=client_id,
                          client_secret=client_secret,
                          grant_type='client_credentials',
                          _headers={'realm': 'default-realm'})

client = ApiClient(Configuration(access_token=token.access_token,
                                   host='http://polaris:8181/api/management/v1'))
root_client = PolarisDefaultApi(client)

print(token)

access_token='principal:root;password:679d81e89256142452d8dd0b74fd1cf4;realm:default-realm;role:ALL' token_type='bearer' expires_in=3600 issued_token_type=None refresh_token=None scope='PRINCIPAL_ROLE:ALL'


# Create Catalog

In [2]:
from polaris.management import *

storage_conf = AwsStorageConfigInfo(storage_type="S3",
                                  allowed_locations=["s3://polaris-test-data-lake/polaris_test/"],
                                  role_arn="arn:aws:iam::479320942928:role/PolarisS3FullAccess")
catalog_name = 'polaris_demo'
try:
    catalog = Catalog(name=catalog_name, type='INTERNAL', properties={"default-base-location": "s3://polaris-test-data-lake/polaris_test/polaris_catalog"},
                    storage_config_info=storage_conf)
    catalog.storage_config_info = storage_conf
    root_client.create_catalog(create_catalog_request=CreateCatalogRequest(catalog=catalog))
    resp = root_client.get_catalog(catalog_name=catalog.name)
    print(resp)
except ApiException as e:
    if e.status == 409:
        # catalog already exists
        print(f"Catalog {catalog_name} already exists.")
    else:
        raise e

type='INTERNAL' name='polaris_demo' properties=CatalogProperties(default_base_location='s3://polaris-test-data-lake/polaris_test/polaris_catalog', additional_properties={}) create_timestamp=1725665539239 last_update_timestamp=1725665539239 entity_version=1 storage_config_info=AwsStorageConfigInfo(storage_type='S3', allowed_locations=['s3://polaris-test-data-lake/polaris_test/', 's3://polaris-test-data-lake/polaris_test/polaris_catalog'], role_arn='arn:aws:iam::479320942928:role/PolarisS3FullAccess', external_id=None, user_arn=None)


# Utility Functions

In [3]:
# Creates a principal with the given name
def create_principal(api, principal_name):
  principal = Principal(name=principal_name, type="SERVICE")
  try:
    principal_result = api.create_principal(CreatePrincipalRequest(principal=principal))
    return principal_result
  except ApiException as e:
    if e.status == 409:
        print(api.get_principal(principal_name=principal_name))
        return api.rotate_credentials(principal_name=principal_name)
    else:
      raise e

# Create a catalog role with the given name
def create_catalog_role(api, catalog, role_name):
  catalog_role = CatalogRole(name=role_name)
  try:
    api.create_catalog_role(catalog_name=catalog.name, create_catalog_role_request=CreateCatalogRoleRequest(catalog_role=catalog_role))
    return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)
  except ApiException as e:
    return api.get_catalog_role(catalog_name=catalog.name, catalog_role_name=role_name)
  else:
    raise e

# Create a principal role with the given name
def create_principal_role(api, role_name):
  principal_role = PrincipalRole(name=role_name)
  try:
    api.create_principal_role(CreatePrincipalRoleRequest(principal_role=principal_role))
    return api.get_principal_role(principal_role_name=role_name)
  except ApiException as e:
    return api.get_principal_role(principal_role_name=role_name)


# Create Principal, Role and Catalog Role

In [4]:
# Create the engineer_principal
engineer_principal = create_principal(root_client, "jarrod")

# Create the principal role
engineer_role = create_principal_role(root_client, "engineer")

# Create the catalog role
manager_catalog_role = create_catalog_role(root_client, catalog, "manage_catalog")

# Grant the catalog role to the principal role
# All principals in the principal role have the catalog role's privileges
root_client.assign_catalog_role_to_principal_role(principal_role_name=engineer_role.name,
                                                  catalog_name=catalog.name,
                                                  grant_catalog_role_request=GrantCatalogRoleRequest(catalog_role=manager_catalog_role))

# Assign privileges to the catalog role
# Here, we grant CATALOG_MANAGE_CONTENT
root_client.add_grant_to_catalog_role(catalog.name, manager_catalog_role.name,
                                      AddGrantRequest(grant=CatalogGrant(catalog_name=catalog.name,
                                                                       type='catalog',
                                                                       privilege=CatalogPrivilege.CATALOG_MANAGE_CONTENT)))

# Assign the principal role to the principal
root_client.assign_principal_role(engineer_principal.principal.name, grant_principal_role_request=GrantPrincipalRoleRequest(principal_role=engineer_role))

# List Namespaces

In [7]:
client = CatalogApiClient(CatalogApiClientConfiguration(access_token=token.access_token,
              host='http://polaris:8181/api/catalog'))
catalog_client = IcebergCatalogAPI(client)

print(catalog_client.list_namespaces(
    prefix=catalog_name
))

next_page_token=None namespaces=[['test_namespace']]


# Create Namespace

In [6]:
# Import the required class
from polaris.catalog.models import CreateNamespaceRequest

demo_namespace = "test_namespace"

# Create the CreateNamespaceRequest object
create_namespace_request = CreateNamespaceRequest(namespace=[demo_namespace])

# Call the create_namespace method with the correct parameters
response = catalog_client.create_namespace(
    prefix=catalog_name,
    create_namespace_request=create_namespace_request
)

# Print the response
print(response)

namespace=['test_namespace'] properties={'location': 's3://polaris-test-data-lake/polaris_test/polaris_catalog/test_namespace'}


# Create Table

In [10]:
from polaris.catalog.models import CreateTableRequest, ModelSchema, StructField, Type

demo_table = 'test_table'

# How to create a table request
create_table_req = CreateTableRequest(
    name=demo_table,
    var_schema=ModelSchema(
        type='struct',
        fields=[
            StructField(
                id=0,
                name='user_id',
                type=Type('string'),
                required=True
            ),
            StructField(
                id=1,
                name='username',
                type=Type('string'),
                required=True
            ),
            StructField(
                id=2,
                name='email',
                type=Type('string'),
                required=True
            )
        ]
    )
)

response = catalog_client.create_table(
    prefix=catalog_name,
    namespace=demo_namespace,
    create_table_request=create_table_req
)

response

LoadTableResult(metadata_location='s3://polaris-test-data-lake/polaris_catalog/test_namespace/test_table/metadata/00000-90fa39d0-ad26-4d22-8190-32e6784784d5.metadata.json', metadata=TableMetadata(format_version=2, table_uuid='4ea2f7a5-cc1e-4c19-945a-e1d7a65f653c', location='s3://polaris-test-data-lake/polaris_catalog/test_namespace/test_table', last_updated_ms=1725623660669, properties={'write.parquet.compression-codec': 'zstd'}, schemas=[ModelSchema(type='struct', fields=[StructField(id=1, name='user_id', type=Type(oneof_schema_1_validator='string', oneof_schema_2_validator=None, oneof_schema_3_validator=None, oneof_schema_4_validator=None, actual_instance='string', one_of_schemas={'str', 'ListType', 'MapType', 'StructType'}), required=True, doc=None), StructField(id=2, name='username', type=Type(oneof_schema_1_validator='string', oneof_schema_2_validator=None, oneof_schema_3_validator=None, oneof_schema_4_validator=None, actual_instance='string', one_of_schemas={'str', 'ListType', 'M

# Display Table Metadata

In [9]:
import json
from IPython.display import display, JSON

# Call loadTable
tbl_meta = catalog_client.load_table(prefix=catalog_name, namespace=demo_namespace, table=demo_table)
display(JSON(tbl_meta.to_dict(), expanded=True))

<IPython.core.display.JSON object>

# Setup Spark

In [1]:
from pyspark.sql import SparkSession

client_id = '0c93d949d1128d8f'
client_secret = 'c501c218c4039f98e8a40e73e40bade9'

spark = (SparkSession.builder
  .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
  .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.0,org.apache.hadoop:hadoop-aws:3.4.0,software.amazon.awssdk:bundle:2.23.19,software.amazon.awssdk:url-connection-client:2.23.19")
  .config('spark.sql.iceberg.vectorization.enabled', 'false')
         
  # Configure the 'polaris' catalog as an Iceberg rest catalog
  .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.polaris.type", "rest")
  # Specify the rest catalog endpoint       
  .config("spark.sql.catalog.polaris.uri", "http://polaris:8181/api/catalog")
  # Enable token refresh
  .config("spark.sql.catalog.polaris.token-refresh-enabled", "true")
  # specify the client_id:client_secret pair
  .config("spark.sql.catalog.polaris.credential", f"{client_id}:{client_secret}")

  # Set the warehouse to the name of the catalog we created
  .config("spark.sql.catalog.polaris.warehouse", 'polaris_demo')

  # Scope set to PRINCIPAL_ROLE:ALL
  .config("spark.sql.catalog.polaris.scope", 'PRINCIPAL_ROLE:ALL')

  # Enable access credential delegation
  .config("spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation", 'vended-credentials')

  .config("spark.sql.catalog.polaris.io-impl", "org.apache.iceberg.io.ResolvingFileIO")
  .config("spark.sql.catalog.polaris.s3.region", "us-west-1")
  .config("spark.history.fs.logDirectory", "/home/iceberg/spark-events")).getOrCreate()

print(spark)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f65f841d180>

# Insert Data

In [None]:
spark.sql("USE polaris")
spark.sql("SHOW NAMESPACES").show()

# Read Data

In [6]:
spark.stop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…