# Skyflow Temporary UDFs - Complete Setup & Usage

This notebook demonstrates tokenization and detokenization using **temporary Pandas UDFs** with optimal API batching.

## Quick Start

1. **Configure cell 1** with your credentials
2. **Run cells 1-3** to register the UDFs
3. **Use the UDFs** from Python or SQL; they are available as soon as registration completes

## Prerequisites

- ✅ Databricks cluster (any configuration)
- ✅ Lambda API deployed and accessible from Databricks
- ✅ Skyflow credentials (Cluster ID, Vault ID, Table name)

## Key Differences from Unity Catalog Approach

| Feature | Temporary UDFs | Unity Catalog Functions |
|---------|----------------|-------------------------|
| **Setup** | Run notebook cells | Create functions once |
| **Lifecycle** | Session-scoped | Persistent (stored in Unity Catalog) |
| **Availability** | Single session | All users with permission |
| **Configuration** | In notebook | Embedded in functions |
| **API Batching** | ✅ Yes (25 rows per Skyflow API call) | ❌ No (1 row per call, scalar) |
| **Performance** | Fewer API calls; suited to high-volume workloads | Relies on Spark parallelism across rows |
| **Views** | Must be temporary | Can be persistent |
| **Use Case** | High-volume, development | Shared access, governance |

## Performance Advantage

Pandas UDFs batch API calls for optimal performance:

```python
# Temporary UDF approach (this notebook)
# 100 values = 1 Lambda request = 4 Skyflow API calls (25 values per call)
skyflow_tokenize(col("email"), lit("email"))

# Unity Catalog approach (databricks_unity_catalog.ipynb)
# 100 values = 100 API calls (1 value per call)
skyflow_tokenize(value, table_name, column_name)
```
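
The call counts above follow from two levels of batching: the UDF sends up to `BATCH_SIZE` rows per Lambda request, and the Lambda splits each request into 25-row Skyflow calls. A minimal sketch of that arithmetic, assuming all rows arrive in a single pandas batch with no NULLs (the helper name `count_calls` is hypothetical and not part of this notebook):

```python
import math
from typing import Tuple


def count_calls(n_rows: int, batch_size: int = 1000, skyflow_chunk: int = 25) -> Tuple[int, int]:
    """Return (lambda_requests, skyflow_calls) for n_rows non-NULL values.

    Assumes one pandas batch and no NULLs; real counts depend on how
    Spark splits the data into batches.
    """
    lambda_requests = 0
    skyflow_calls = 0
    for start in range(0, n_rows, batch_size):
        rows_in_request = min(batch_size, n_rows - start)
        lambda_requests += 1
        # The Lambda is described as chunking each request at 25 rows per Skyflow call
        skyflow_calls += math.ceil(rows_in_request / skyflow_chunk)
    return lambda_requests, skyflow_calls


print(count_calls(100))    # (1, 4)
print(count_calls(2500))   # (3, 100)
```

With a scalar function the same 100 values would need 100 calls, one per row.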

---

# Setup: Register UDFs

Run cells 1-3 to register the temporary UDFs.

In [None]:
# ============================================================================
# Step 1: Configuration - UPDATE THESE VALUES
# ============================================================================

# Lambda API configuration
LAMBDA_URL = "https://YOUR_API_ID.execute-api.YOUR_REGION.amazonaws.com/processDatabricks"

# Skyflow configuration
CLUSTER_ID = "YOUR_CLUSTER_ID"
VAULT_ID = "YOUR_VAULT_ID"
TABLE = "TABLE_NAME"

# Performance tuning
BATCH_SIZE = 1000  # Rows per Lambda call (Lambda batches internally at 25 rows/Skyflow API call)
REQUEST_TIMEOUT = 30  # Timeout in seconds (must be <= Lambda timeout, default 30s)

print("=" * 60)
print("Configuration Summary")
print("=" * 60)
print(f"Lambda URL:  {LAMBDA_URL}")
print(f"Cluster ID:  {CLUSTER_ID}")
print(f"Vault ID:    {VAULT_ID}")
print(f"Table:       {TABLE}")
print(f"Batch Size:  {BATCH_SIZE}")
print(f"Timeout:     {REQUEST_TIMEOUT}s")
print("=" * 60)
print("\n✓ Configuration loaded")
print("\nNext: Run cells 2-3 to register UDFs")
print("")
print("💡 Performance Tips:")
print("   - BATCH_SIZE: Number of rows sent to Lambda per request")
print("   - Lambda internally batches at 25 rows per Skyflow API call")
print("   - Reduce BATCH_SIZE if you see timeout errors (e.g., 500, 250)")
print("   - Increase REQUEST_TIMEOUT if Lambda needs more processing time")

In [None]:
# ============================================================================
# Step 2: Register Tokenization UDF
# ============================================================================
#
# NOTE: This Pandas UDF accepts column_name as a runtime parameter.
# Pass the column name using lit() when calling the function.
#
# Performance characteristics:
# - Batches BATCH_SIZE rows per Lambda call (default: 1000)
# - Lambda further batches at 25 rows per Skyflow API call
# - Significantly reduces API costs and latency vs scalar functions
# - Session-scoped only (must re-register after cluster restart)
#
# For persistent functions with governance, use databricks_unity_catalog.ipynb instead.
#

import pandas as pd
import requests
from pyspark.sql.functions import pandas_udf, lit

@pandas_udf("string")
def skyflow_tokenize(values: pd.Series, column_names: pd.Series) -> pd.Series:
    """
    Tokenize a column of sensitive data using the Lambda API with batching.

    Args:
        values: Pandas Series containing plaintext values to tokenize (may include NULLs)
        column_names: Pandas Series containing the column name (same for all rows via lit())

    Returns:
        Pandas Series containing Skyflow tokens
        
    Usage:
        df.withColumn("email_token", skyflow_tokenize(col("email"), lit("email")))
    """
    # Extract column name (same for all rows)
    column_name = column_names.iloc[0]
    
    results = [None] * len(values)

    # Process in batches to optimize API calls
    for start in range(0, len(values), BATCH_SIZE):
        end = min(start + BATCH_SIZE, len(values))
        # Filter out NULL values (None or NaN) and build records,
        # remembering each row's position so results can be written back
        records = []
        indices = []
        for i in range(start, end):
            val = values.iloc[i]
            if pd.notna(val):
                records.append({column_name: val})
                indices.append(i)

        if not records:
            continue

        # Call Lambda API
        resp = requests.post(
            LAMBDA_URL,
            json={"records": records},
            headers={
                "Content-Type": "application/json",
                "X-Skyflow-Operation": "tokenize",
                "X-Skyflow-Cluster-ID": CLUSTER_ID,
                "X-Skyflow-Vault-ID": VAULT_ID,
                "X-Skyflow-Table": TABLE
            },
            timeout=REQUEST_TIMEOUT
        )
        resp.raise_for_status()

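        # Parse response and extract tokens.
        # Assumes the Lambda returns one record per input record, in the same order.
        data = resp.json().get("data", [])
        if len(data) != len(records):
            raise ValueError(
                f"Expected {len(records)} records from Lambda, got {len(data)}"
            )

        # Fill results for this batch
        for result_index, record in zip(indices, data):
            results[result_index] = record.get(column_name)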

    return pd.Series(results)

# Register the UDF for SQL use
spark.udf.register("skyflow_tokenize", skyflow_tokenize)

print("✓ Created function: skyflow_tokenize(value, column_name)")
print(f"  Lambda URL: {LAMBDA_URL}")
print(f"  Batch Size: {BATCH_SIZE} rows per Lambda call")
print(f"  Timeout: {REQUEST_TIMEOUT}s")
print("")
print("Usage:")
print("  # Python DataFrame API")
print("  df.withColumn('email_token', skyflow_tokenize(col('email'), lit('email')))")
print("  ")
print("  # SQL (after registering)")
print("  SELECT skyflow_tokenize(email, 'email') as token FROM users")

In [None]:
# ============================================================================
# Step 3: Register Detokenization UDF
# ============================================================================
#
# NOTE: This is also a Pandas UDF with 2-level batching.
# Lambda internally batches at 25 tokens per Skyflow API call.
#

@pandas_udf("string")
def skyflow_detokenize(tokens: pd.Series) -> pd.Series:
    """
    Detokenize a column of Skyflow tokens using the Lambda API with batching.

    Args:
        tokens: Pandas Series containing Skyflow tokens (may include NULLs)

    Returns:
        Pandas Series containing detokenized values
    """
    results = [None] * len(tokens)

    # Process in batches to optimize API calls
    for start in range(0, len(tokens), BATCH_SIZE):
        end = min(start + BATCH_SIZE, len(tokens))
        batch = tokens.iloc[start:end].tolist()

        # Filter out NULL values (None or NaN)
        non_null = [t for t in batch if pd.notna(t)]
        if not non_null:
            continue

        # Call Lambda API
        resp = requests.post(
            LAMBDA_URL,
            json={"tokens": non_null},
            headers={
                "Content-Type": "application/json",
                "X-Skyflow-Operation": "detokenize",
                "X-Skyflow-Cluster-ID": CLUSTER_ID,
                "X-Skyflow-Vault-ID": VAULT_ID
            },
            timeout=REQUEST_TIMEOUT
        )
        resp.raise_for_status()

        # Parse response and map tokens to values
        data = resp.json().get("data", [])
        token_to_value = {r["token"]: r["value"] for r in data}

        # Fill results for this batch
        for i in range(start, end):
            tok = tokens.iloc[i]
            results[i] = None if tok is None else token_to_value.get(tok)

    return pd.Series(results)

# Register the UDF for SQL use
spark.udf.register("skyflow_detokenize", skyflow_detokenize)

print("✓ Created function: skyflow_detokenize(token)")
print(f"  Lambda URL: {LAMBDA_URL}")
print(f"  Cluster ID: {CLUSTER_ID}")
print(f"  Vault ID: {VAULT_ID}")
print(f"  Batch Size: {BATCH_SIZE} tokens per Lambda call")
print(f"  Timeout: {REQUEST_TIMEOUT}s")

## Setup Complete!

The following temporary UDFs have been registered:

1. **skyflow_tokenize(value, column_name)** - Tokenizes sensitive data with batching
2. **skyflow_detokenize(token)** - Detokenizes tokens back to plaintext with batching

These functions are now:
- Available in your current Spark session
- Usable in SQL queries and DataFrame operations
- Optimized with two-level batching (up to `BATCH_SIZE` rows per Lambda request, 25 rows per Skyflow API call)

**Note:** Because the UDFs are session-scoped, they must be re-registered whenever a new Spark session starts, for example after a cluster restart.
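
The tokenization UDF skips NULL inputs and writes each returned token back to the row it came from. The following Spark-free sketch shows that pattern on a plain list. `fake_tokenize_batch` is a hypothetical stand-in for the Lambda call, and the one-record-per-input, same-order response is an assumption about the Lambda's behavior:

```python
from typing import Callable, Dict, List, Optional


def fake_tokenize_batch(records: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Hypothetical stand-in for the Lambda API: one output record per input, in order."""
    return [{key: f"tok_{value}" for key, value in rec.items()} for rec in records]


def tokenize_values(
    values: List[Optional[str]],
    column_name: str,
    batch_size: int,
    call_api: Callable[[List[Dict[str, str]]], List[Dict[str, str]]] = fake_tokenize_batch,
) -> List[Optional[str]]:
    results: List[Optional[str]] = [None] * len(values)
    for start in range(0, len(values), batch_size):
        end = min(start + batch_size, len(values))
        # Remember the positions of non-NULL rows so tokens land in the right place
        indices = [i for i in range(start, end) if values[i] is not None]
        if not indices:
            continue  # nothing to send for an all-NULL batch
        records = [{column_name: values[i]} for i in indices]
        data = call_api(records)
        if len(data) != len(records):
            raise ValueError(f"Expected {len(records)} records, got {len(data)}")
        for i, rec in zip(indices, data):
            results[i] = rec.get(column_name)
    return results


tokens = tokenize_values(["a@x.com", None, "b@x.com"], "email", batch_size=2)
print(tokens)  # ['tok_a@x.com', None, 'tok_b@x.com']
```

NULL rows stay NULL in the output and never reach the API, so they do not count toward API calls.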

---

# Usage Examples

The cells below demonstrate how to use the UDFs.

## Generate Test Data

Create sample data for testing the functions:

In [None]:
from pyspark.sql.functions import expr, current_timestamp, col

# Configure number of test rows
NUM_ROWS = 100

# Create test data
test_df = spark.range(NUM_ROWS).select(
    (expr("id + 1").alias("user_id")),
    expr("concat('user_', id)").alias("username"),
    expr("concat('user', id, '@example.com')").alias("email"),
    expr("concat('555-01-', LPAD(id % 10000, 4, '0'))").alias("ssn"),
    expr("concat('+1-555-', LPAD(id % 1000, 3, '0'), '-', LPAD((id * 7) % 10000, 4, '0'))").alias("phone"),
    current_timestamp().alias("created_at")
)

# Save as table
test_df.write.mode("overwrite").saveAsTable("raw_users")

print(f"✓ Created raw_users table with {NUM_ROWS} rows")
display(spark.table("raw_users").limit(10))

## Tokenize Single Column

Test the tokenization function on sample data:

In [None]:
# Tokenize email column - pass column name as runtime parameter
df = spark.table("raw_users")
df_tokenized = df.withColumn("email_token", skyflow_tokenize(col("email"), lit("email")))

# Display results
display(df_tokenized.select("user_id", "username", "email", "email_token", "phone").limit(10))

## Create Tokenized Table

Save tokenized data to a new table (keeping non-sensitive columns as plaintext):

In [None]:
# Create a new table with tokenized sensitive columns
tokenized_df = spark.table("raw_users").select(
    col("user_id"),
    col("username"),
    skyflow_tokenize(col("email"), lit("email")).alias("email_token"),
    col("phone"),  # Non-sensitive: keep as plaintext
    col("created_at")
)

# Save tokenized data
tokenized_df.write.mode("overwrite").saveAsTable("tokenized_users")

count = spark.table("tokenized_users").count()
print(f"✓ Created tokenized_users table with {count} rows")
display(spark.table("tokenized_users").limit(10))

## Detokenize Tokens

Test the detokenization function:

In [None]:
# Detokenize the tokenized_users table
df_tokens = spark.table("tokenized_users")
df_detokenized = df_tokens.withColumn("email_detokenized", skyflow_detokenize(col("email_token")))

# Display: token vs detokenized value
display(df_detokenized.select("user_id", "username", "email_token", "email_detokenized").limit(10))

print("\n✓ Roundtrip complete: Tokens → Detokenized values")

## Create Temporary Detokenized View

Create a temporary view that automatically detokenizes tokens for authorized queries:

**Note:** The view must be temporary because the UDFs are session-scoped.

In [None]:
# Create a temporary view that detokenizes email tokens on-the-fly
spark.sql("""
    CREATE OR REPLACE TEMP VIEW users_detokenized AS
    SELECT 
        user_id,
        username,
        skyflow_detokenize(email_token) as email,
        phone,
        created_at
    FROM tokenized_users
""")

print("✓ Created TEMPORARY view: users_detokenized")
print("  This view exists for the duration of your Spark session")
display(spark.sql("SELECT * FROM users_detokenized LIMIT 10"))

## Verify Roundtrip Accuracy

Compare original values with tokenized and detokenized values:

In [None]:
# Compare original vs detokenized
verification_df = spark.sql("""
    SELECT
        t.user_id,
        r.email as original_email,
        t.email_token,
        d.email as detokenized_email,
        CASE
            WHEN r.email = d.email THEN 'MATCH'
            ELSE 'MISMATCH'
        END as verification
    FROM tokenized_users t
    JOIN raw_users r ON t.user_id = r.user_id
    JOIN users_detokenized d ON t.user_id = d.user_id
    LIMIT 10
""")

display(verification_df)

# Check for any mismatches (verification_df is limited to a 10-row sample)
mismatches = verification_df.filter("verification = 'MISMATCH'").count()
if mismatches == 0:
    print("\n✓ All sampled records match! Tokenization → Detokenization working correctly.")
else:
    print(f"\n✗ Found {mismatches} mismatches in the sample - investigate!")

## SQL Usage Examples

Both UDFs can be used directly in SQL queries:

In [None]:
# Example 1: Tokenize with SQL - pass column name as string literal
result = spark.sql("""
    SELECT 
        user_id,
        email,
        skyflow_tokenize(email, 'email') as email_token
    FROM raw_users
    WHERE user_id <= 5
""")
display(result)

# Example 2: Detokenize with filtering
result = spark.sql("""
    SELECT 
        user_id,
        skyflow_detokenize(email_token) as email
    FROM tokenized_users
    WHERE phone LIKE '+1-555-000%'
    LIMIT 10
""")
display(result)

# Example 3: Create table with inline tokenization
spark.sql("""
    CREATE OR REPLACE TABLE secure_contacts AS
    SELECT 
        user_id,
        username,
        skyflow_tokenize(email, 'email') as email_token,
        phone
    FROM raw_users
""")
print("\n✓ Created secure_contacts table with tokenized emails")

## Cleanup (Optional)

To remove all test resources:

In [None]:
# Uncomment to drop test tables and views
# spark.sql("DROP TABLE IF EXISTS raw_users")
# spark.sql("DROP TABLE IF EXISTS tokenized_users")
# spark.sql("DROP TABLE IF EXISTS secure_contacts")
# spark.sql("DROP VIEW IF EXISTS users_detokenized")
# print("✓ Cleanup complete")

## Summary

This notebook demonstrated temporary Pandas UDFs for Skyflow integration:

**Key Features:**
- Simple setup - Register UDFs in your session (cells 1-3)
- Two-level batching - `BATCH_SIZE` rows per Lambda request (configurable), 25 rows per Skyflow API call
- Session-scoped - Functions available for session duration
- High performance - Significantly fewer API calls than scalar functions
- Temporary views - Create session-scoped detokenized views
- Flexible configuration - All settings in notebook (not embedded)

**Important Performance Note:**

Pandas UDFs with batching provide **optimal API efficiency**:
- Makes 1 Skyflow API call per 25 rows (vs 1 call per row for Unity Catalog)
- Reduces the number of Skyflow API calls by up to ~25x compared to scalar functions
- Best for high-volume tokenization workloads
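
The ~25x figure is an upper bound that is reached only when batches are full. A small sketch of the ratio between scalar calls (one per row) and batched calls (one per 25 rows), using a hypothetical helper `call_reduction` and ignoring NULLs and Spark's own batch boundaries:

```python
import math

SKYFLOW_CHUNK = 25  # rows per Skyflow API call, as stated in this notebook


def call_reduction(n_rows: int, chunk: int = SKYFLOW_CHUNK) -> float:
    """Ratio of scalar calls (one per row) to batched calls (one per chunk)."""
    if n_rows <= 0:
        raise ValueError("n_rows must be positive")
    return n_rows / math.ceil(n_rows / chunk)


for n in (10, 30, 100, 100_000):
    print(n, call_reduction(n))
# 10 10.0
# 30 15.0
# 100 25.0
# 100000 25.0
```

Small or partially filled batches see a smaller reduction, which is one reason the batched approach pays off mainly on high-volume workloads.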

**When to use Temporary UDFs vs Unity Catalog:**

| Use Case | Recommended Approach |
|----------|---------------------|
| High-volume tokenization (100K+ rows) | Temporary Pandas UDFs (this notebook) |
| Ad-hoc analysis, development | Temporary Pandas UDFs (this notebook) |
| Optimal API call batching required | Temporary Pandas UDFs (this notebook) |
| Production pipelines, shared resources | Unity Catalog (databricks_unity_catalog.ipynb) |
| Persistent views required | Unity Catalog (databricks_unity_catalog.ipynb) |
| Team collaboration, governance | Unity Catalog (databricks_unity_catalog.ipynb) |

**To update configuration:**
1. Update variables in cell 1
2. Re-run cells 2-3 to re-register UDFs with new configuration
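
When choosing a new `BATCH_SIZE` for cell 1, a rough upper bound can be estimated from the timeout. The sketch below assumes the Lambda issues its 25-row Skyflow calls one after another and that each call takes a fixed amount of time. Both are assumptions about the deployment, as are the helper name `max_batch_size`, the `per_call_latency_s` parameter, and the `safety` margin; measure your own Lambda before relying on the result.

```python
import math


def max_batch_size(
    timeout_s: float,
    per_call_latency_s: float,
    skyflow_chunk: int = 25,
    safety: float = 0.8,
) -> int:
    """Largest BATCH_SIZE expected to finish within the timeout.

    Assumes sequential Skyflow calls with a fixed per-call latency
    (hypothetical model; a Lambda that calls Skyflow in parallel
    would allow larger batches).
    """
    if per_call_latency_s <= 0:
        raise ValueError("per_call_latency_s must be positive")
    budget_s = timeout_s * safety  # leave headroom for network and cold starts
    calls_that_fit = math.floor(budget_s / per_call_latency_s)
    return max(calls_that_fit, 0) * skyflow_chunk


# Example: 30 s timeout, assumed 0.5 s per Skyflow call
print(max_batch_size(timeout_s=30, per_call_latency_s=0.5))
```

If the estimate falls below the current `BATCH_SIZE`, either lower `BATCH_SIZE` or raise `REQUEST_TIMEOUT` (keeping it within the Lambda timeout), then re-run cells 2-3.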