In [0]:
import logging
from datetime import datetime, timezone

import requests
import urllib3
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, TimestampType, MapType, DateType

import sailpoint
import sailpoint.v2024
from sailpoint.configuration import (Configuration, ConfigurationParams)
from sailpoint.paginator import Paginator

In [0]:
# Configure the root logger so INFO-level messages emitted by this notebook are shown.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Name used to recognise the console handler installed by this cell.
# The root logger outlives individual cell runs, so without this check every
# re-run of the cell would attach one more StreamHandler and each message
# would be printed once per attached handler.
_CONSOLE_HANDLER_NAME = 'sailpoint-notebook-console'

# Reuse the handler from a previous run of this cell, or create it exactly once
ch = next((h for h in logger.handlers if h.get_name() == _CONSOLE_HANDLER_NAME), None)
if ch is None:
    ch = logging.StreamHandler()
    ch.set_name(_CONSOLE_HANDLER_NAME)
    logger.addHandler(ch)
ch.setLevel(logging.INFO)

# Create formatter
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# (Re)apply the formatter so edits to the format string take effect on re-run
ch.setFormatter(formatter)

In [0]:
# Notebook parameters.
# `tenant` is interpolated into the API host name; `schema` is not referenced
# by any of the cells visible here.
tenant, schema = 'devrel-ga-5668', 'dev'

# Keys under which the OAuth client id / client secret are stored in the
# Databricks secret scope.
client_name, secret_name = 'sailpoint-readall-client', 'sailpoint-readall-secret'

In [0]:
logger.info('Defining API credentials')

# The token endpoint lives on the same host as the API itself.
api_url = 'https://{}.api.identitynow-demo.com'.format(tenant)
token_url = '{}/oauth/token'.format(api_url)

# Read the OAuth client credentials from the 'sailpoint-devrel' secret scope
# instead of embedding them in the notebook.
client_id = dbutils.secrets.get(key=client_name, scope='sailpoint-devrel')
client_secret = dbutils.secrets.get(key=secret_name, scope='sailpoint-devrel')

In [0]:
logger.info('Configuring SailPoint SDK')

# Connection parameters: OAuth client credentials plus the tenant endpoints.
configurationParams = ConfigurationParams()
configurationParams.client_id = client_id
configurationParams.client_secret = client_secret
configurationParams.base_url = api_url
configurationParams.token_url = token_url

# Retry policy applied by the SDK's HTTP layer: only GET requests are retried,
# and only for the listed throttling / server-error status codes.
retry_policy = urllib3.Retry(
    allowed_methods=["GET"],
    status_forcelist=[429, 500, 502, 503, 504],
    backoff_factor=10,
    total=5,
)

configuration = Configuration(configurationParams)
configuration.experimental = True
configuration.retries = retry_policy

In [0]:
# Nested struct used for the `manager_ref` column.
_manager_ref_struct = StructType([
    StructField("type", StringType(), nullable=True),
    StructField("id", StringType(), nullable=True),
    StructField("name", StringType(), nullable=True),
])

# Nested struct used for the `lifecycle_state` column.
_lifecycle_state_struct = StructType([
    StructField("state_name", StringType(), nullable=True),
    StructField("manually_updated", BooleanType(), nullable=True),
])

# Explicit Spark schema for the identity records; `id` is the only
# non-nullable column.
identity_schema = StructType([
    StructField("id", StringType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("created", TimestampType(), nullable=True),
    StructField("modified", TimestampType(), nullable=True),
    StructField("alias", StringType(), nullable=True),
    StructField("email_address", StringType(), nullable=True),
    StructField("processing_state", StringType(), nullable=True),
    StructField("identity_status", StringType(), nullable=True),
    StructField("manager_ref", _manager_ref_struct, nullable=True),
    StructField("is_manager", BooleanType(), nullable=True),
    StructField("last_refresh", TimestampType(), nullable=True),
    # String-to-string map whose values may be null.
    StructField("attributes", MapType(StringType(), StringType(), True)),
    StructField("lifecycle_state", _lifecycle_state_struct, nullable=True),
])

In [0]:
async def get_total_identity_count():
    """Return the total number of identities reported by the tenant.

    Requests a single identity with ``count=True`` and reads the
    ``X-Total-Count`` response header. Although declared ``async`` (callers
    ``await`` it), the body contains no ``await``: the SDK call is made
    synchronously.

    Returns:
        The raw ``X-Total-Count`` header value; the caller converts it with
        ``int``.

    Raises:
        Exception: if the request fails or the header is absent. The original
            error is attached as ``__cause__``.
    """
    logger.info('Retrieving total identity count...')
    try:
        with sailpoint.v2024.ApiClient(configuration) as api_client:
            api_instance = sailpoint.v2024.IdentitiesApi(api_client)
            # limit=1 keeps the payload minimal; only the count header is needed.
            response = api_instance.list_identities_with_http_info(limit=1, count=True)
            total_count = response.headers['X-Total-Count']
        logger.info(f'Total identity count: {total_count}')
        return total_count
    except Exception as e:
        # Chain explicitly so the underlying SDK/HTTP error stays visible as the cause.
        raise Exception(f'Error Retrieving total identity count: {e}') from e

async def get_identities(total_identity_count):
    """Retrieve identities from the tenant using the SDK paginator.

    Although declared ``async`` (callers ``await`` it), the body contains no
    ``await``: pagination runs synchronously.

    Args:
        total_identity_count: passed to the paginator as ``result_limit``.

    Returns:
        The collection returned by ``Paginator.paginate``.

    Raises:
        Exception: if any page request fails. The original error is attached
            as ``__cause__``.
    """
    logger.info('Retrieving identities...')
    try:
        with sailpoint.v2024.ApiClient(configuration) as api_client:
            api_instance = sailpoint.v2024.IdentitiesApi(api_client)
            all_identities = Paginator.paginate(
                api_instance.list_identities,
                result_limit=total_identity_count,
                limit=250  # page size requested per call
            )
        logger.info(f'Retrieved {len(all_identities)} identities')
        return all_identities
    except Exception as e:
        # Add context (matching get_total_identity_count) and keep the
        # original error as the explicit cause instead of a bare re-wrap.
        raise Exception(f'Error retrieving identities: {e}') from e

In [0]:
total_identity_count = int(await get_total_identity_count())
logger.info(f'Total Identities: {total_identity_count}')
all_identities = await get_identities(total_identity_count)
logger.info(f'Total Retrieved Identities: {len(all_identities)}')

In [0]:
# Build a DataFrame from the retrieved identities, stamp it with the current
# UTC date and append it to the Delta table, partitioned by that date.
try:
    logger.info('Creating data frame...')
    identities_df = spark.createDataFrame(all_identities, identity_schema)
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # yields the same UTC calendar date.
    today = datetime.now(timezone.utc).date()
    identities_df = identities_df.withColumn('day', lit(today))

    logger.info('Writing to delta...')
    # NOTE(review): mode('append') means re-running this cell on the same day
    # adds the same rows to that day's partition again — confirm this is intended.
    (
        identities_df.write
        .format('delta')
        .mode('append')
        .partitionBy('day')
        .saveAsTable('`dunker_databricks_space`.sailpoint.identities')
    )

except Exception as e:
    # Log with traceback, then re-raise: swallowing the error here would let
    # the notebook finish "successfully" even though nothing was written.
    logger.exception(f'Error in creating data frame or writing to delta: {e}')
    raise