# Polars UDFs & Custom Functions - Comprehensive Workshop

Learn how to extend Polars with custom Python functions when built-in expressions aren't enough.

## What You'll Learn:
- When to use (and avoid) UDFs
- map_elements() for row-wise operations
- map_batches() for vectorized operations
- Custom aggregation functions
- Plugin system for performance-critical functions
- Performance optimization strategies
- Real-world use cases

## ⚠️ Important:
UDFs run in the Python interpreter, so they are usually **much slower** than native Polars expressions, which execute in compiled Rust. Always try built-in expressions first!

In [None]:
import polars as pl
import numpy as np
from datetime import datetime, date
import time
import hashlib

print(f"Polars version: {pl.__version__}")

---
# Part 1: When to Use UDFs

## ✅ Good Use Cases:
- Complex business logic not available in Polars
- Calling external APIs or libraries
- Custom domain-specific calculations
- Integrating with existing Python functions

## ❌ Avoid UDFs For:
- Math operations (use expressions)
- String manipulation (use .str namespace)
- Date operations (use .dt namespace)
- Aggregations (use .agg())
- Conditional logic (use when/then/otherwise)

In [None]:
# Sample data
df = pl.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [25, 34, 28, 42, 31],
    'salary': [50000, 75000, 60000, 95000, 68000],
    'email': ['alice@example.com', 'bob@test.org', 'charlie@email.com', 'diana@example.org', 'eve@test.com']
})

print(df)

## 1.1 Bad Example: DON'T use UDF for simple operations

In [None]:
# ❌ BAD: Using UDF for simple math
def double_salary_bad(salary):
    return salary * 2

result_bad = df.select([
    pl.col('name'),
    pl.col('salary').map_elements(double_salary_bad, return_dtype=pl.Int64).alias('doubled_salary_bad')
])

print("❌ Bad approach (UDF):")
print(result_bad)

In [None]:
# ✅ GOOD: Use native expression
result_good = df.select([
    pl.col('name'),
    (pl.col('salary') * 2).alias('doubled_salary_good')
])

print("✅ Good approach (native expression):")
print(result_good)

In [None]:
# Performance comparison
large_df = pl.DataFrame({
    'value': range(100000)
})

# UDF approach
# perf_counter() is a high-resolution clock; time.time() can report 0.0s for very fast runs
start = time.perf_counter()
result1 = large_df.select(pl.col('value').map_elements(lambda x: x * 2, return_dtype=pl.Int64))
time_udf = time.perf_counter() - start

# Expression approach
start = time.perf_counter()
result2 = large_df.select(pl.col('value') * 2)
time_expr = time.perf_counter() - start

print(f"UDF approach: {time_udf:.4f}s")
print(f"Expression approach: {time_expr:.4f}s")
print(f"\nExpression is roughly {time_udf/time_expr:.0f}x faster on this run")

---
# Part 2: map_elements() - Element-wise Operations

Applies a Python function to each element in a column.

## 2.1 Basic map_elements()

In [None]:
# Example: classify emails by top-level domain (stands in for more complex custom logic)
def classify_email_domain(email):
    """Classify email by domain type"""
    domain = email.split('@')[1]
    
    if domain.endswith('.com'):
        return 'commercial'
    elif domain.endswith('.org'):
        return 'organization'
    elif domain.endswith('.edu'):
        return 'education'
    else:
        return 'other'

result = df.select([
    pl.col('name'),
    pl.col('email'),
    pl.col('email').map_elements(classify_email_domain, return_dtype=pl.String).alias('domain_type')
])

print(result)

## 2.2 Multiple Input Columns with struct.map_elements()

In [None]:
# Function that needs multiple columns
def calculate_bonus(row):
    """Calculate bonus based on age and salary"""
    age = row['age']
    salary = row['salary']
    
    if age > 40:
        return salary * 0.15  # 15% bonus for senior employees
    elif age > 30:
        return salary * 0.10  # 10% bonus
    else:
        return salary * 0.05  # 5% bonus

result = df.select([
    pl.col('name'),
    pl.col('age'),
    pl.col('salary'),
    pl.struct(['age', 'salary']).map_elements(calculate_bonus, return_dtype=pl.Float64).alias('bonus')
])

print(result)

## 2.3 Return Complex Types from UDFs

In [None]:
# Return a dict (Polars converts it to a struct)
def analyze_name(name):
    """Return multiple values as a dict, which becomes a struct column"""
    return {
        'length': len(name),
        'uppercase': name.upper(),
        'has_vowels': any(v in name.lower() for v in 'aeiou'),
        'first_letter': name[0]
    }

result = df.select([
    pl.col('name'),
    pl.col('name').map_elements(analyze_name, return_dtype=pl.Struct({
        'length': pl.Int64,
        'uppercase': pl.String,
        'has_vowels': pl.Boolean,
        'first_letter': pl.String
    })).alias('name_analysis')
])

print(result)
print("\nUnnested:")
print(result.unnest('name_analysis'))

## 2.4 Return Lists from UDFs

In [None]:
# Return a list
def generate_salary_range(salary):
    """Generate a salary band: 80%, 100%, and 120% of the base salary"""
    return [int(salary * 0.8), int(salary), int(salary * 1.2)]

result = df.select([
    pl.col('name'),
    pl.col('salary'),
    pl.col('salary').map_elements(
        generate_salary_range, 
        return_dtype=pl.List(pl.Int64)
    ).alias('salary_range')
])

print(result)

# Extract from list
result_expanded = result.with_columns([
    pl.col('salary_range').list.get(0).alias('min_salary'),
    pl.col('salary_range').list.get(1).alias('median_salary'),
    pl.col('salary_range').list.get(2).alias('max_salary')
])

print("\nExpanded:")
print(result_expanded)

---
# Part 3: map_batches() - Vectorized Operations

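`map_batches()` is usually faster than `map_elements()` because the function is called once with an entire Series rather than once per value, so vectorized libraries such as NumPy can do the heavy lifting.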

## 3.1 Basic map_batches()

In [None]:
# Function that operates on entire Series (using NumPy)
def normalize_series(series):
    """Z-score normalization using NumPy"""
    arr = series.to_numpy()
    normalized = (arr - arr.mean()) / arr.std()
    return pl.Series(normalized)

result = df.select([
    pl.col('name'),
    pl.col('salary'),
    pl.col('salary').map_batches(normalize_series).alias('normalized_salary')
])

print(result)

## 3.2 Performance: map_elements vs map_batches

In [None]:
# Create larger dataset
large_df = pl.DataFrame({
    'value': np.random.randint(1, 100, 50000)
})

# Element-wise function
def square_root_element(x):
    return x ** 0.5

# Batch function
def square_root_batch(series):
    return pl.Series(np.sqrt(series.to_numpy()))

# Benchmark map_elements
# perf_counter() is a high-resolution clock; time.time() can report 0.0s for very fast runs
start = time.perf_counter()
result1 = large_df.select(
    pl.col('value').map_elements(square_root_element, return_dtype=pl.Float64)
)
time_elements = time.perf_counter() - start

# Benchmark map_batches
start = time.perf_counter()
result2 = large_df.select(
    pl.col('value').map_batches(square_root_batch)
)
time_batches = time.perf_counter() - start

# Benchmark native expression
start = time.perf_counter()
result3 = large_df.select(
    pl.col('value').sqrt()
)
time_native = time.perf_counter() - start

print(f"map_elements: {time_elements:.4f}s")
print(f"map_batches:  {time_batches:.4f}s")
print(f"Native expr:  {time_native:.4f}s")
print(f"\nmap_batches is {time_elements/time_batches:.1f}x faster than map_elements")
print(f"Native expr is {time_batches/time_native:.1f}x faster than map_batches")

## 3.3 Using External Libraries with map_batches

In [None]:
# Example: Using scipy for advanced statistics
from scipy import stats

def calculate_percentile_rank(series):
    """Calculate percentile rank using scipy"""
    arr = series.to_numpy()
    ranks = stats.rankdata(arr, method='average')
    percentiles = (ranks / len(ranks)) * 100
    return pl.Series(percentiles)

result = df.select([
    pl.col('name'),
    pl.col('salary'),
    pl.col('salary').map_batches(calculate_percentile_rank).alias('salary_percentile')
])

print(result)

---
# Part 4: Custom Aggregation Functions

Use UDFs in group_by aggregations.

In [None]:
# Sample data with groups
df_groups = pl.DataFrame({
    'department': ['Sales', 'Sales', 'Sales', 'Engineering', 'Engineering', 'Engineering', 'HR', 'HR'],
    'employee': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve', 'Frank', 'Grace', 'Henry'],
    'salary': [50000, 55000, 52000, 75000, 80000, 78000, 45000, 47000],
    'years': [2, 5, 3, 7, 10, 8, 3, 4]
})

print(df_groups)

## 4.1 Custom Aggregation with map_batches

In [None]:
# Custom aggregation: coefficient of variation
def coefficient_of_variation(series):
    """CV = (std / mean) * 100, using NumPy's population std (ddof=0)"""
    arr = series.to_numpy()
    return float((arr.std() / arr.mean()) * 100)

result = df_groups.group_by('department').agg([
    pl.col('salary').mean().alias('avg_salary'),
    # Polars' std() defaults to the sample std (ddof=1), so it differs from the std inside salary_cv
    pl.col('salary').std().alias('std_salary'),
    # returns_scalar=True keeps each group's result as a single Float64 value
    pl.col('salary').map_batches(
        coefficient_of_variation, return_dtype=pl.Float64, returns_scalar=True
    ).alias('salary_cv'),
    pl.col('employee').count().alias('num_employees')
])

print(result)

## 4.2 Complex Multi-Column Aggregations

In [None]:
# Function that needs multiple columns in aggregation
def calculate_salary_per_year(df_slice):
    """Calculate total salary divided by total years of experience for one group"""
    total_salary = df_slice['salary'].sum()
    total_years = df_slice['years'].sum()
    # Return a float in both branches so the result matches return_dtype=pl.Float64
    return total_salary / total_years if total_years > 0 else 0.0

result = df_groups.group_by('department').agg([
    pl.col('salary').mean().alias('avg_salary'),
    pl.col('years').mean().alias('avg_years'),
    pl.struct(['salary', 'years']).map_batches(
        lambda s: calculate_salary_per_year(s.struct.unnest()),
        return_dtype=pl.Float64,
        returns_scalar=True  # keep one Float64 value per group
    ).alias('salary_per_year')
])

print(result)

---
# Part 5: Real-World Examples

## 5.1 Example: Hash Generation for Data Privacy

In [None]:
# Hash PII data for privacy
def hash_string(text):
    """Return the first 16 hex characters of the SHA-256 digest of a string"""
    return hashlib.sha256(text.encode()).hexdigest()[:16]  # Truncated for readability

df_pii = pl.DataFrame({
    'user_id': [1, 2, 3],
    'email': ['alice@example.com', 'bob@test.com', 'charlie@email.com'],
    'ssn': ['123-45-6789', '987-65-4321', '555-55-5555']
})

result = df_pii.select([
    pl.col('user_id'),
    pl.col('email').map_elements(hash_string, return_dtype=pl.String).alias('email_hash'),
    pl.col('ssn').map_elements(hash_string, return_dtype=pl.String).alias('ssn_hash')
])

print("Original:")
print(df_pii)
print("\nHashed:")
print(result)

## 5.2 Example: Geocoding with External API (Mock)

In [None]:
# Mock geocoding function (in reality, would call an API)
def mock_geocode(city):
    """Mock geocoding - returns fake lat/lon"""
    # In reality: requests.get(f'https://api.geocode.com?city={city}')
    geocode_map = {
        'NYC': {'lat': 40.7128, 'lon': -74.0060},
        'LA': {'lat': 34.0522, 'lon': -118.2437},
        'Chicago': {'lat': 41.8781, 'lon': -87.6298},
        'Boston': {'lat': 42.3601, 'lon': -71.0589}
    }
    return geocode_map.get(city, {'lat': 0.0, 'lon': 0.0})

df_locations = pl.DataFrame({
    'store_id': [1, 2, 3, 4],
    'city': ['NYC', 'LA', 'Chicago', 'Boston']
})

result = df_locations.select([
    pl.col('store_id'),
    pl.col('city'),
    pl.col('city').map_elements(
        mock_geocode,
        return_dtype=pl.Struct({'lat': pl.Float64, 'lon': pl.Float64})
    ).alias('coordinates')
]).unnest('coordinates')

print(result)

## 5.3 Example: Natural Language Processing

In [None]:
# Simple sentiment analysis (mock - in reality use NLTK, spaCy, or transformers)
def simple_sentiment(text):
    """Very simple sentiment analysis based on keywords"""
    text_lower = text.lower()
    positive_words = ['good', 'great', 'excellent', 'love', 'amazing', 'best']
    negative_words = ['bad', 'terrible', 'hate', 'worst', 'awful', 'poor']
    
    pos_count = sum(1 for word in positive_words if word in text_lower)
    neg_count = sum(1 for word in negative_words if word in text_lower)
    
    if pos_count > neg_count:
        return 'positive'
    elif neg_count > pos_count:
        return 'negative'
    else:
        return 'neutral'

df_reviews = pl.DataFrame({
    'review_id': [1, 2, 3, 4, 5],
    'text': [
        'This product is great! I love it.',
        'Terrible experience, worst purchase ever.',
        'It is okay, nothing special.',
        'Amazing quality, best in class.',
        'Bad quality, would not recommend.'
    ]
})

result = df_reviews.select([
    pl.col('review_id'),
    pl.col('text'),
    pl.col('text').map_elements(simple_sentiment, return_dtype=pl.String).alias('sentiment')
])

print(result)

# Count sentiments
print("\nSentiment distribution:")
print(result.group_by('sentiment').agg(pl.len().alias('count')))

## 5.4 Example: Custom Date Business Logic

In [None]:
# Calculate business days between dates (excluding weekends)
def business_days_until_deadline(row):
    """Count weekdays from today up to, but not including, the deadline"""
    from datetime import timedelta
    
    today = row['today']
    deadline = row['deadline']
    
    business_days = 0
    current_date = today
    
    while current_date < deadline:
        if current_date.weekday() < 5:  # Monday=0, Friday=4
            business_days += 1
        current_date += timedelta(days=1)
    
    return business_days

df_deadlines = pl.DataFrame({
    'task': ['Report', 'Presentation', 'Review', 'Analysis'],
    'today': [date(2024, 1, 15)] * 4,  # Monday
    'deadline': [
        date(2024, 1, 19),  # Friday
        date(2024, 1, 22),  # Monday (next week)
        date(2024, 1, 17),  # Wednesday
        date(2024, 1, 26)   # Friday (next week)
    ]
})

result = df_deadlines.select([
    pl.col('task'),
    pl.col('today'),
    pl.col('deadline'),
    pl.struct(['today', 'deadline']).map_elements(
        business_days_until_deadline,
        return_dtype=pl.Int64
    ).alias('business_days')
])

print(result)

---
# Part 6: Error Handling in UDFs

In [None]:
# UDF with error handling
def safe_divide(row):
    """Safely divide two numbers"""
    try:
        numerator = row['numerator']
        denominator = row['denominator']
        
        if denominator == 0:
            return None  # Return null for division by zero
        
        return numerator / denominator
    except Exception as e:
        print(f"Error: {e}")
        return None

df_division = pl.DataFrame({
    'numerator': [10, 20, 30, 40],
    'denominator': [2, 0, 5, 0]  # Some zeros!
})

result = df_division.select([
    pl.col('numerator'),
    pl.col('denominator'),
    pl.struct(['numerator', 'denominator']).map_elements(
        safe_divide,
        return_dtype=pl.Float64
    ).alias('result')
])

print(result)

---
# Part 7: Lazy Evaluation and UDFs

UDFs work in lazy queries, but the optimizer cannot see inside a Python function, so it may be unable to apply some optimizations around it.

In [None]:
# UDF in lazy context
def custom_transform(x):
    return x * 2 + 10

lazy_df = pl.LazyFrame({
    'id': range(1, 6),
    'value': [10, 20, 30, 40, 50]
})

lazy_result = (
    lazy_df
    .filter(pl.col('value') > 20)
    .select([
        pl.col('id'),
        pl.col('value'),
        pl.col('value').map_elements(custom_transform, return_dtype=pl.Int64).alias('transformed')
    ])
)

print("Lazy query plan:")
print(lazy_result.explain())

print("\nCollected result:")
print(lazy_result.collect())

---
# Part 8: Best Practices & Optimization Tips

## 8.1 Prefer Native Expressions

| Task | ❌ UDF | ✅ Native |
|------|--------|----------|
| Math | `map_elements(lambda x: x*2)` | `pl.col('x') * 2` |
| String | `map_elements(lambda s: s.upper())` | `pl.col('s').str.to_uppercase()` |
| Date | `map_elements(lambda d: d.year)` | `pl.col('d').dt.year()` |
| Conditional | `map_elements(lambda x: 'high' if x > 10 else 'low')` | `pl.when(...).then(...).otherwise(...)` |

## 8.2 Use map_batches() When Possible

In [None]:
# If you must use a UDF, prefer map_batches

# ❌ Slower: map_elements
def transform_element(x):
    return (x ** 2 + x ** 0.5) / 2

# ✅ Faster: map_batches with NumPy
def transform_batch(series):
    arr = series.to_numpy()
    result = (arr ** 2 + arr ** 0.5) / 2
    return pl.Series(result)

test_df = pl.DataFrame({'x': range(10000)})

# Benchmark
# perf_counter() is a high-resolution clock; time.time() can report 0.0s for very fast runs
start = time.perf_counter()
result1 = test_df.select(pl.col('x').map_elements(transform_element, return_dtype=pl.Float64))
time1 = time.perf_counter() - start

start = time.perf_counter()
result2 = test_df.select(pl.col('x').map_batches(transform_batch))
time2 = time.perf_counter() - start

print(f"map_elements: {time1:.4f}s")
print(f"map_batches:  {time2:.4f}s")
print(f"\nmap_batches is {time1/time2:.1f}x faster")

## 8.3 Specify return_dtype Explicitly

In [None]:
# Always specify return_dtype for better performance and type safety

# ❌ Bad: without return_dtype, Polars must infer the output type from the returned values
# result = df.select(pl.col('salary').map_elements(double_value))

# ✅ Good: Explicit dtype
def double_value(x):
    return x * 2

result = df.select(
    pl.col('salary').map_elements(double_value, return_dtype=pl.Int64).alias('doubled')
)

print(result)

## 8.4 Cache Expensive Operations

In [None]:
# If UDF is expensive, cache results
from functools import lru_cache

@lru_cache(maxsize=1000)
def expensive_computation(value):
    """Simulate expensive operation"""
    time.sleep(0.001)  # Simulate API call or complex calc
    return value ** 2

df_cached = pl.DataFrame({
    'value': [1, 2, 3, 1, 2, 3, 1, 2, 3]  # Repeated values
})

start = time.time()
result = df_cached.select(
    pl.col('value').map_elements(expensive_computation, return_dtype=pl.Int64).alias('result')
)
elapsed = time.time() - start

print(result)
print(f"\nTime with caching: {elapsed:.4f}s")
print("Note: Repeated values are computed only once!")

---
# Summary

## Key Takeaways:

### 1. **Always Prefer Native Expressions**
   - Often 10-100x faster than UDFs (the exact speedup depends on the workload)
   - Better parallelization
   - Query optimization possible

### 2. **When to Use UDFs**
   - ✅ External API calls
   - ✅ Complex business logic
   - ✅ Third-party libraries (scipy, sklearn, etc.)
   - ✅ Domain-specific calculations
   - ❌ Simple math, string, or date operations

### 3. **Performance Hierarchy** (fastest to slowest)
   1. Native Polars expressions (FASTEST)
   2. `map_batches()` with NumPy (FAST)
   3. `map_elements()` (SLOW)
   4. Row-by-row Python loops, e.g. over `iter_rows()` (SLOWEST - don't use)

### 4. **Best Practices**
   - Always specify `return_dtype`
   - Use `map_batches()` over `map_elements()` when possible
   - Leverage NumPy in batch operations
   - Add error handling in UDFs
   - Cache expensive computations
   - Use `struct` for multi-column inputs

### 5. **Common Patterns**

```python
# Single column UDF
pl.col('x').map_elements(my_func, return_dtype=pl.Float64)

# Multi-column UDF
pl.struct(['x', 'y']).map_elements(my_func, return_dtype=pl.Float64)

# Vectorized UDF
pl.col('x').map_batches(my_batch_func)

# Return complex types
pl.col('x').map_elements(my_func, return_dtype=pl.List(pl.Int64))
pl.col('x').map_elements(my_func, return_dtype=pl.Struct({'a': pl.Int64, 'b': pl.String}))
```

## Remember:
> **UDFs are a last resort. Always try native expressions first!**

---
# Practice Exercises

In [None]:
# Exercise data
exercise_df = pl.DataFrame({
    'product_id': [1, 2, 3, 4, 5],
    'name': ['Laptop', 'Mouse', 'Keyboard', 'Monitor', 'Headphones'],
    'price': [1200, 25, 75, 300, 150],
    'category': ['Electronics', 'Accessories', 'Accessories', 'Electronics', 'Accessories'],
    'description': [
        'High performance laptop',
        'Wireless mouse with great battery',
        'Mechanical keyboard for gaming',
        '4K monitor with amazing colors',
        'Noise cancelling headphones'
    ]
})

print(exercise_df)

In [None]:
# Exercise 1: Create a UDF that classifies products by price range
# Budget: < 50, Mid-range: 50-200, Premium: > 200
# Your code here:


In [None]:
# Exercise 2: Create a UDF that counts vowels in the product name
# Your code here:


In [None]:
# Exercise 3: Create a UDF that generates a product code
# Format: first 3 letters of category + price rounded to nearest 10
# Example: "Electronics" + 1200 -> "ELE1200"
# Your code here:


In [None]:
# Exercise 4: Create a batch UDF that normalizes prices (z-score)
# Your code here:


In [None]:
# Exercise 5: Create a UDF that returns struct with word count and char count
# Your code here:
