# Campaign Functions Notebook

This notebook contains functions for matching users to events and generating messages using AI, then uploading to Airtable.

In [1]:
import json
import datetime
import os
import sys
import logging
from typing import List, Dict, Any, Optional

# Add backend directory to path to import utils
# Notebook is in Leo/jupyter/, backend is in Leo/backend/
# Go up one level from jupyter to get to Leo, then into backend
current_dir = os.getcwd()
leo_dir = os.path.dirname(current_dir) if 'jupyter' in current_dir else current_dir
backend_dir = os.path.join(leo_dir, 'backend')

# If that doesn't work, try going up from current directory
if not os.path.exists(os.path.join(backend_dir, 'utils')):
    backend_dir = os.path.join(os.path.dirname(current_dir), 'backend')

# Add to path if it exists
if os.path.exists(os.path.join(backend_dir, 'utils')):
    backend_dir = os.path.abspath(backend_dir)
    sys.path.insert(0, backend_dir)
    print(f"✓ Added backend directory to path: {backend_dir}")
else:
    # Fallback: try absolute path
    abs_backend = os.path.join(os.path.expanduser('~'), 'Documents', 'Development', 'Leos', 'Leo', 'backend')
    if os.path.exists(os.path.join(abs_backend, 'utils')):
        sys.path.insert(0, abs_backend)
        print(f"✓ Added backend directory to path: {abs_backend}")
    else:
        print(f"⚠ Warning: Could not find backend directory automatically.")
        print(f"Current working directory: {current_dir}")
        print(f"Please manually add the backend directory to sys.path")

# Import real utilities from Leo codebase
try:
    from utils.mongodb_pull import MongoDBPull
    from utils.ai_generate.ai_generate import ai_generate_meta_tag_parse
    from utils.airtable_sync.airtable_sync import upload_message_to_airtable
    from bson import ObjectId
    print("✓ Imports loaded successfully")
except ImportError as e:
    print(f"✗ Import error: {e}")
    print("Please adjust the path to the backend directory manually if needed.")

✓ Added backend directory to path: /Users/chriscruz/Documents/Development/Leos/Leo/backend
✓ Imports loaded successfully


## Pull Data from MongoDB

Function to pull users and events from MongoDB using the MongoDBPull utility.

In [2]:
# ============================================================================
# Campaign-Specific Filter Functions
# ============================================================================

def filter_fill_table_users(users_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Filter users for Fill The Table campaign.
    Criteria: Profile complete (at least 4 of 5 required fields filled), 
    has attended at least 1 event, sorted by event_count descending.
    Required fields: interests, tableTypePreference, homeNeighborhood, gender, relationship_status
    """
    
    # Logging: Count input
    input_count = len(users_array)
    print(f"[filter_fill_table_users] Input: {input_count} users")
    
    required_fields = ['interests', 'tableTypePreference', 'homeNeighborhood', 'gender', 'relationship_status']
    
    filtered = []
    for user in users_array:
        # Must have attended at least 1 event
        event_count = user.get("event_count", 0)
        if event_count < 1:
            continue
        
        # Calculate personalization_ready manually
        # Count filled fields (non-empty values)
        filled_count = 0
        for field in required_fields:
            value = user.get(field)
            # Check if field is filled (not None, not empty string, not empty list)
            if value is not None:
                if field == 'interests':
                    # For interests, check if list is non-empty with non-empty values
                    if isinstance(value, list) and len(value) > 0:
                        if any(item for item in value if item):
                            filled_count += 1
                elif isinstance(value, str):
                    # For strings, check if non-empty after stripping whitespace
                    if value.strip():
                        filled_count += 1
                else:
                    # For other types (numbers, etc.), just check if truthy
                    if value:
                        filled_count += 1
        
        # personalization_ready = at least 4 of 5 fields filled
        is_ready = filled_count >= 4
        
        if is_ready:
            filtered.append(user)
    
    # Sort by event_count descending (most events first)
    filtered.sort(key=lambda x: x.get("event_count", 0), reverse=True)
    
    # Logging: Count output
    
    
    
    # Logging: Count output
    output_count = len(filtered)
    print(f"[filter_fill_table_users] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return filtered


def filter_fill_table_events(events_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Filter events for Fill The Table campaign.
    Criteria: Future events, public type, participationPercentage < 50%.
    Sorted by participationPercentage ascending (most underfilled first).
    """
    
    # Logging: Count input
    input_count = len(events_array)
    print(f"[filter_fill_table_events] Input: {input_count} events")
    
    future_events = filter_future_events(events_array)
    filtered = []
    
    for event in future_events:
        if event.get("type") != "public":
            continue
        
        # Calculate participationPercentage manually
        participants = event.get("participants", []) or []
        participant_count = len(participants)
        max_participants = event.get("maxParticipants") or 0
        
        if max_participants > 0:
            participation_percentage = (participant_count / max_participants) * 100
        else:
            participation_percentage = 0
        
        # Only include events with < 50% participation
        if participation_percentage < 50:
            # Store calculated percentage for sorting
            event_copy = event.copy()
            event_copy["participationPercentage"] = participation_percentage
            filtered.append(event_copy)
    
    # Sort by participationPercentage ascending (most underfilled first)
    filtered.sort(key=lambda x: x.get("participationPercentage", 0))
    
    # Logging: Count output
    # Logging: Count output
    output_count = len(filtered)
    print(f"[filter_fill_table_events] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return filtered


def filter_return_table_users(users_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Filter users for Return To Table campaign.
    Criteria: Dormant (31-90 days inactive), has attended at least 1 event,
    profile complete, has interests. Sorted by reactivation_score descending.
    """
    
    # Logging: Count input
    input_count = len(users_array)
    print(f"[filter_return_table_users] Input: {input_count} users")
    
    filtered = [
        user for user in users_array
        if (31 <= user.get("days_inactive", 9999) <= 90)
        and user.get("event_count", 0) >= 1
        and user.get("personalization_ready", False) == True
        and user.get("interests")  # Has interests (not None/empty)
    ]
    # Sort by reactivation_score descending
    filtered.sort(key=lambda x: x.get("reactivation_score", 0), reverse=True)
    
    # Logging: Count output
    # Logging: Count output
    output_count = len(filtered)
    print(f"[filter_return_table_users] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return filtered


def filter_return_table_events(events_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Filter events for Return To Table campaign.
    Criteria: Future events, public type, participationPercentage >= 60%.
    Sorted by participationPercentage descending (most attended first).
    """
    
    # Logging: Count input
    input_count = len(events_array)
    print(f"[filter_return_table_events] Input: {input_count} events")
    
    future_events = filter_future_events(events_array)
    filtered = [
        event for event in future_events
        if event.get("type") == "public"
        and event.get("participationPercentage", 0) >= 60
    ]
    # Sort by participationPercentage descending (most attended first)
    filtered.sort(key=lambda x: x.get("participationPercentage", 0), reverse=True)
    
    # Logging: Count output
    # Logging: Count output
    output_count = len(filtered)
    print(f"[filter_return_table_events] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return filtered


def filter_seat_newcomers_users(users_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Filter users for Seat Newcomers campaign.
    Criteria: 0-2 events attended, joined within 90 days, profile complete, has interests.
    Sorted by newcomer_score descending (prioritize 0 events).
    """
    
    # Logging: Count input
    input_count = len(users_array)
    print(f"[filter_seat_newcomers_users] Input: {input_count} users")
    
    filtered = [
        user for user in users_array
        if (0 <= user.get("event_count", 0) <= 2)
        and user.get("days_since_registration", 9999) <= 90
        and user.get("personalization_ready", False) == True
        and user.get("interests")  # Has interests (not None/empty)
    ]
    # Sort by newcomer_score descending
    filtered.sort(key=lambda x: x.get("newcomer_score", 0), reverse=True)
    
    # Logging: Count output
    # Logging: Count output
    output_count = len(filtered)
    print(f"[filter_seat_newcomers_users] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return filtered


def filter_seat_newcomers_events(events_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Filter events for Seat Newcomers campaign.
    Criteria: Future events, public type, participationPercentage 50-80% (beginner-friendly).
    Sorted by proximity to 65% (middle of ideal range).
    """
    
    # Logging: Count input
    input_count = len(events_array)
    print(f"[filter_seat_newcomers_events] Input: {input_count} events")
    
    future_events = filter_future_events(events_array)
    filtered = [
        event for event in future_events
        if event.get("type") == "public"
        and 50 <= event.get("participationPercentage", 0) <= 80
    ]
    # Sort by proximity to 65% (middle of ideal range)
    def sort_key(event):
        pct = event.get("participationPercentage", 0)
        return abs(65 - pct)
    filtered.sort(key=sort_key)
    
    # Logging: Count output
    # Logging: Count output
    output_count = len(filtered)
    print(f"[filter_seat_newcomers_events] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return filtered

In [3]:
def pull_users_and_events(
    users_filter: Optional[Dict[str, Any]] = None,
    users_limit: Optional[int] = None,
    events_filter: Optional[Dict[str, Any]] = None,
    events_limit: Optional[int] = None,
    generate_report: bool = False,
    save_data: bool = False,
    cache_file: Optional[str] = "cached_users_events.json"
) -> tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Pull users and events from MongoDB using MongoDBPull utility.
    
    Args:
        users_filter: Optional MongoDB filter for users
        users_limit: Optional limit on number of users
        events_filter: Optional MongoDB filter for events
        events_limit: Optional limit on number of events
        generate_report: Whether to generate reports (default: False)
        save_data: Whether to save data to files (default: False)
        cache_file: Optional path to cache file. If provided and file exists, loads from cache.
                     If file does not exist, pulls from MongoDB and saves to cache file.
                     Set to None to always pull from MongoDB.
        
    Returns:
        Tuple of (users_list, events_list)
    """
    # Check if cache file exists and should be used
    if cache_file and os.path.exists(cache_file):
        print(f"Loading data from cache file: {cache_file}")
        try:
            with open(cache_file, "r") as f:
                cached_data = json.load(f)
                users = cached_data.get("users", [])
                events = cached_data.get("events", [])
                print(f"✓ Loaded {len(users)} users and {len(events)} events from cache")
                return users, events
        except Exception as e:
            print(f"⚠ Error loading cache file: {e}. Pulling from MongoDB instead...")
    
    # Pull from MongoDB if cache not available
    print("Initializing MongoDBPull...")
    mongodb_pull = MongoDBPull()
    
    try:
        print("Pulling users from MongoDB...")
        users = mongodb_pull.users_pull(
            filter=users_filter,
            limit=users_limit,
            generate_report=generate_report,
            save_data=save_data
        )
        print(f"✓ Fetched {len(users)} users")
        
        print("Pulling events from MongoDB...")
        events = mongodb_pull.events_pull(
            filter=events_filter,
            limit=events_limit,
            generate_report=generate_report,
            save_data=save_data
        )
        print(f"✓ Fetched {len(events)} events")
        
        # Save to cache file if cache_file is provided
        if cache_file:
            print(f"Saving data to cache file: {cache_file}")
            try:
                # Convert ObjectIds to strings for JSON serialization
                users_serializable = convert_objectid_to_string(users)
                events_serializable = convert_objectid_to_string(events)
                
                cache_data = {
                    "users": users_serializable,
                    "events": events_serializable
                }
                
                with open(cache_file, "w") as f:
                    json.dump(cache_data, f, indent=2)
                print(f"✓ Saved {len(users)} users and {len(events)} events to cache")
            except Exception as e:
                print(f"⚠ Error saving cache file: {e}")
        
        return users, events
        
    finally:
        # Close connection
        mongodb_pull.close()
        print("✓ MongoDB connection closed")


## Filter Functions

Functions to filter users and events based on criteria.

In [4]:
def format_users_summary_list(users_array: List[Dict[str, Any]]) -> str:
    """
    Creates a numbered list string of user summaries.
    Every user summary is on its own line.
    Used for Fill The Table campaign (multiple users for one event).
    """
    summary_lines = []
    for i, user in enumerate(users_array):
        user_name = f"{user.get('firstName', '')} {user.get('lastName', '')}".strip()
        user_summary = user.get('summary', 'No summary provided')
        summary_lines.append(f"{i}. {user_name}: {user_summary}")
    
    return "\n".join(summary_lines)


def match_prompt_create_fill_table(prompt_template: str, event_object: Dict[str, Any], users_summary_str: str) -> str:
    """
    Creates prompt for Fill The Table campaign (multiple users for one event).
    Uses event_summary and user_summaries.
    """
    event_summary = event_object.get("summary", "")
    
    # Replace variable placeholders
    updated_prompt = prompt_template.replace("event_summary", event_summary)
    updated_prompt = updated_prompt.replace("user_summaries", users_summary_str)
    
    return updated_prompt

In [5]:
def filter_target_users(users_array: List[Dict[str, Any]], target_email: str = "cruzc09@gmail.com") -> List[Dict[str, Any]]:
    """
    
    # Logging: Count input
    input_count = len(users_array)
    print(f"[filter_target_users] Input: {input_count} users")
    Step 1: Filter Users for those whose email matches the target email.
    """
    
    # Logging: Count input
    input_count = len(users_array)
    print(f"[filter_target_users] Input: {input_count} users")
    
    return [user for user in users_array if user.get("email") == target_email]


def convert_objectid_to_string(obj: Any) -> Any:
    """
    Convert ObjectId to string for JSON serialization.
    Handles nested dicts and lists recursively.
    
    Args:
        obj: Object to convert (may be ObjectId, dict, list, or other)
        
    Returns:
        Object with ObjectIds converted to strings.
    """
    if isinstance(obj, ObjectId):
        return str(obj)
    elif isinstance(obj, dict):
        return {k: convert_objectid_to_string(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [convert_objectid_to_string(item) for item in obj]
    else:
        return obj


def filter_future_events(events_array: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Step 2: Filter events for events in the future.
    Handles ISO 8601 datetime strings (e.g., "2025-10-10T17:00:00.000Z" or "2025-10-10").
    """
    
    # Logging: Count input
    input_count = len(events_array)
    print(f"[filter_future_events] Input: {input_count} events")
    
    future_events = []
    today = datetime.date.today()
    
    for event in events_array:
        start_date_str = event.get("startDate")
        if start_date_str:
            try:
                # Handle ISO 8601 datetime strings (with time) or date strings
                if isinstance(start_date_str, str):
                    # Replace 'Z' with '+00:00' for Python's fromisoformat
                    iso_str = start_date_str.replace('Z', '+00:00')
                    # Parse ISO format datetime
                    event_datetime = datetime.datetime.fromisoformat(iso_str)
                    event_date = event_datetime.date()
                elif isinstance(start_date_str, datetime.datetime):
                    # If it's already a datetime object
                    event_date = start_date_str.date()
                elif isinstance(start_date_str, datetime.date):
                    # If it's already a date object
                    event_date = start_date_str
                else:
                    continue
                    
                if event_date > today:
                    future_events.append(event)
            except (ValueError, AttributeError) as e:
                # Skip events with invalid date formats
                print(f"Warning: Could not parse startDate '{start_date_str}' for event {event.get('id', 'unknown')}: {e}")
                continue
                
    
    # Logging: Count output
    output_count = len(future_events)
    print(f"[filter_target_users] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    
    # Logging: Count output
    output_count = len(future_events)
    print(f"[filter_future_events] Output: {output_count} items ({input_count - output_count} filtered out)")
    
    return future_events


## Formatting Functions

Functions to format prompts and event summaries.

In [6]:
def format_events_summary_list(events_array: List[Dict[str, Any]]) -> str:
    """
    Step 3b (Partial): Creates a numbered list string of event summaries.
    Every event summary is on its own line.
    """
    summary_lines = []
    for i, event in enumerate(events_array):
        # We assume the index in this list matches the index required for selection
        summary_lines.append(f"{i}. {event.get('summary', 'No summary provided')}")
    
    return "\n".join(summary_lines)


def match_prompt_create(prompt_template: str, user_object: Dict[str, Any], events_summary_str: str) -> str:
    """
    Step 3c: Takes in prompt along with user_object and formatted events string 
    to return the updated prompt string.
    """
    user_summary = user_object.get("summary", "")
    
    # Replace variable placeholders
    # Note: Assuming literal string replacement based on prompt description
    updated_prompt = prompt_template.replace("user_summary", user_summary)
    updated_prompt = updated_prompt.replace("events_summaries", events_summary_str)
    
    return updated_prompt


def message_prompt_create(prompt_template: str, user_summary: str, event_summary: str) -> str:
    """
    Step 3g (Helper): Takes in prompt along user_summary and event_summary fields 
    to return one formatted prompt.
    """
    updated_prompt = prompt_template.replace("user_summary", user_summary)
    updated_prompt = updated_prompt.replace("event_summary", event_summary)
    
    return updated_prompt

In [12]:
def process_fill_table_campaign(
    users: List[Dict[str, Any]], 
    events: List[Dict[str, Any]],
    match_prompt_template: str,
    message_prompt_template: str,
    events: List[Dict[str, Any]],
        campaign: str = "Fill The Table",

    match_prompt_template: str,
    message_prompt_template: str
) -> List[Dict[str, Any]]:
    """
    Process Fill The Table campaign: Match multiple users to each event.
    This is different from other campaigns - we match multiple users to one event.
    
    Args:
        users: List of user dictionaries
        events: List of event dictionaries
        match_prompt_template: Template for matching prompt
        message_prompt_template: Template for message generation prompt
        campaign: Campaign name (default: "Fill The Table")
    """
    generated_messages = []
    
    # Format users summary list once
    users_summaries_str = format_users_summary_list(users)
    
    # Process each event
    for event in events:
        # Create matching prompt for this event with all users
        filled_match_prompt = match_prompt_create_fill_table(
            match_prompt_template, 
            event, 
            users_summaries_str
        )
        
        # Get matches (array of users for this event)
        match_output_str = ai_generate(filled_match_prompt)
        matches_data = json.loads(match_output_str)
        
        # matches_data should be an array of matches
        if not isinstance(matches_data, list):
            matches_data = [matches_data]
        
        # Process each match
        for match in matches_data:
            user_name = match.get("user_name", "")
            
            # Find user by name
            user = next(
                (u for u in users if f"{u.get('firstName', '')} {u.get('lastName', '')}".strip() == user_name),
                None
            )
            
            if not user:
                print(f"Warning: Could not find user {user_name}")
                continue
            
            match_reasoning = match.get("reasoning", "")
            match_confidence = match.get("confidence", 0)
            
            # Generate message for this user-event pair
            user_summary = user.get("summary", "")
            event_summary = event.get("summary", "")
            
            filled_message_prompt = message_prompt_create(
                message_prompt_template,
                user_summary,
                event_summary
            )
            
            message_output_str = ai_generate(filled_message_prompt)
            message_data = json.loads(message_output_str)
            
            # Get the message text and append the event link programmatically
            message_text = message_data.get("message", "").strip()
            event_id = str(event.get("id") or event.get("_id", ""))
            event_link = f"https://cucu.li/bookings/{event_id}"
            
            # Append link to message if not already present
            if event_link not in message_text:
                message_text = f"{message_text} {event_link}"
            
            
            # Create Airtable record
            event_id = str(event.get("id") or event.get("_id", ""))
            # Extract event_date from startDate
            event_date_value = None
            start_date = event.get("startDate")
            if start_date:
                try:
                    if isinstance(start_date, str):
                        # Handle ISO 8601 datetime strings
                        iso_str = start_date.replace("Z", "+00:00")
                        event_datetime = datetime.datetime.fromisoformat(iso_str)
                        event_date_value = event_datetime.isoformat()
                    elif isinstance(start_date, datetime.datetime):
                        event_date_value = start_date.isoformat()
                    elif isinstance(start_date, datetime.date):
                        event_date_value = datetime.datetime.combine(start_date, datetime.time()).isoformat()
                except (ValueError, AttributeError) as e:
                    print(f"Warning: Could not parse startDate for event {event_id}: {e}")
            payload = {
                "user_name": f"{user.get("firstName", "")} {user.get("lastName", "")}".strip(),
                "event_name": event.get("name", ""),
                "user_id": user.get("id") or user.get("_id"),
                "event_id": event_id,
                "user_email": user.get("email", ""),
                "user_phone": user.get("phone", ""),
                "user_summary": user_summary,
                "event_summary": event_summary,
                "event_date": event_date_value,
                "filled_message_prompt": filled_message_prompt,
                "match_reasoning": match_reasoning,
                "match_confidence": match_confidence,
                "generated_message": message_text,
                "message_reasoning": message_data.get("reasoning", ""),
                "message_confidence": message_data.get("confidence", 0),
                "campaign": campaign,
                "event_name": event.get("name", ""),
                "user_id": user.get("id") or user.get("_id"),
                "event_id": event_id,
                "user_email": user.get("email", ""),
                "user_phone": user.get("phone", ""),
                "user_summary": user_summary,
                "event_summary": event_summary,
                "filled_message_prompt": filled_message_prompt,
                "match_reasoning": match_reasoning,
                "match_confidence": match_confidence,
                "generated_message": message_text,
                "message_reasoning": message_data.get("reasoning", ""),
                "message_confidence": message_data.get("confidence", 0),
                "campaign": campaign
            }


SyntaxError: duplicate argument 'events' in function definition (1896334263.py, line 6)

## AI Generation Wrapper

Wrapper function to use the real ai_generate_meta_tag_parse utility.

In [None]:
def ai_generate(prompt: str) -> str:
    """
    Wrapper around ai_generate_meta_tag_parse that returns a JSON string.
    This maintains compatibility with the existing code that expects JSON strings.
    """
    # Hardcoded API key from .env file
    api_key = "YOUR_ANTHROPIC_API_KEY_HERE"
    
    # Log the request (prompt)
    print("=" * 80)
    print("[AI Generate] REQUEST:")
    print("=" * 80)
    print(prompt)
    print("=" * 80)
    
    # Call the real AI generate function with the API key
    result = ai_generate_meta_tag_parse(prompt=prompt, api_key=api_key)
    
    # Log the response
    result_json_str = json.dumps(result, indent=2)
    print("[AI Generate] RESPONSE:")
    print("=" * 80)
    print(result_json_str)
    print("=" * 80)
    
    # Convert result to JSON string
    return result_json_str

## Campaign Prompt Templates

Prompt templates for the three campaign segments: Fill The Table, Return To Table, and Seat Newcomers.

In [None]:
# Fill The Table Campaign Prompts

fill_table_match_prompt = """You are an expert event marketer focused on filling underbooked events.

PRIORITY: Match users to this event which has LOW participation. Your goal is to maximize attendance for events that need more participants.

MATCHING CRITERIA (in order of importance):
1. Spots left and urgency (event starts soon)
2. Interest alignment between user interests and event categories/features
3. Location proximity (user neighborhood vs event neighborhood)
4. User engagement history (event attendance count + recency)
5. Professional background relevance

Return a JSON array of match objects. Each object must contain:
- 'user_name': The user's full name (exact match from list below)
- 'event_name': The event name
- 'reasoning': Brief explanation focusing on why this match helps fill the event AND why the user would be interested
- 'confidence': Number 0-100 (prioritize matches for low-fill events)

Select the best 5-10 matches for this event. Higher confidence scores should go to better matches.

Event:
event_summary

Users:
user_summaries

Return only the JSON array, no additional text."""


fill_table_message_prompt = """You are an expert SMS copywriter specializing in high-conversion, personalized messages for a social dining app.

GOAL: Drive RSVPs and attendance (fill underbooked events). Motivate immediate action.

SMS BEST PRACTICES:
1) LENGTH: <180 chars total (including link that will be appended). Be concise.
2) TONE: Friendly, concise, 0–2 relevant emojis. URGENT, scarcity-driven.
3) STRUCTURE: [Greeting + Name] [Hook tied to interests/occupation/location] [Spots left + time urgency + social proof] [CTA].
4) SCARCITY: Make spots left/time explicit; small-group feel when true.
5) SOCIAL PROOF: Mention participants already in if available.
6) PROXIMITY: Call out neighborhood convenience explicitly.
7) CTA: End with action phrase like "Tap to RSVP" or "Join us" (link will be appended automatically).
8) AVOID: ALL CAPS, multiple questions, generic hype, long sentences.

TWILIO RULES:
- Banned words: poker, casino, gambling, betting, marijuana, cannabis, CBD, crypto → use alternatives
- Limits: 0 emojis = <150 chars, 1-2 emojis = <65 chars (link will be appended, so leave room)
- Do NOT include any links in your message - a link will be appended automatically
- Max 2 emojis if relevant

User: user_summary
Event: event_summary

Return JSON:
{
  "message": "text ending with CTA (no link)",
  "reasoning": "explanation",
  "confidence": <0-100>
}"""

In [None]:
# Return To Table Campaign Prompts

return_table_match_prompt = """You are an expert user reactivation specialist focused on re-engaging dormant users.

PRIORITY: Find the SINGLE BEST event for this dormant user (31-90 days inactive) that will re-engage them and drive an RSVP.

MATCHING CRITERIA (in order of importance):
1. Interest alignment: User interests MUST match event categories/features (this is critical)
2. Location proximity: Prefer events in or near user's neighborhood
3. Event quality: Prefer events with HIGH participation (50-100% filled) to show social proof and quality
4. Reactivation potential: Events similar to their past attendance patterns
5. Event timing: Prefer events happening soon (creates urgency for reactivation)
6. Welcome back vibe: Events that feel welcoming for returning users

IMPORTANT:
- Return ONLY the SINGLE BEST match (not multiple)
- Prioritize events with higher participation (50-100% filled) over empty events
- The goal is to reactivate this user with a high-quality, engaging event
- Quality and relevance are more important than filling empty events

Return a JSON object (not array) with:
- 'event_index': The index number from the events list below (0-based)
- 'reasoning': Detailed explanation (3-4 sentences) focusing on why this specific event will reactivate THIS user, how interests align, why the event quality/participation makes it appealing, and why location/timing work for reactivation
- 'confidence': Number 0-100 (should be 80+ for a good match)

User: user_summary

Events:
events_summaries

Return only the JSON object, no additional text."""


return_table_message_prompt = """You are an expert SMS copywriter specializing in user reactivation for a social dining app.

GOAL: Re-engage dormant users (31-90 days inactive) and drive RSVPs to future events.

SMS BEST PRACTICES:
1) LENGTH: <180 chars total (including link that will be appended). Be concise.
2) TONE: Warm, welcoming, friend-like. Acknowledge time away without being pushy. NOSTALGIC.
3) STRUCTURE: [Greeting + Name] [Welcome back hook] [Event hook by interests/occupation/location] [Spots left + time urgency + social proof] [CTA].
4) REACTIVATION: Acknowledge they haven't been around ("We miss you!", "Welcome back!", "It's been a while!").
5) PERSONALIZATION: Reference specific interests, neighborhood convenience, occupation if relevant.
6) SCARCITY: Make spots left/time explicit; create urgency for reactivation.
7) SOCIAL PROOF: Mention participants already in if available.
8) CTA: End with action phrase like "Tap to RSVP" or "Welcome back!" (link will be appended automatically).
9) AVOID: ALL CAPS, multiple questions, generic hype, long sentences.

TWILIO RULES:
- Banned words: poker, casino, gambling, betting, marijuana, cannabis, CBD, crypto → use alternatives
- Limits: 0 emojis = <150 chars, 1-2 emojis = <65 chars (link will be appended, so leave room)
- Do NOT include any links in your message - a link will be appended automatically
- Max 2 emojis if relevant

User: user_summary
Event: event_summary

Return JSON:
{
  "message": "text ending with CTA (no link)",
  "reasoning": "explanation",
  "confidence": <0-100>
}"""

In [None]:
# Seat Newcomers Campaign Prompts

seat_newcomers_match_prompt = """You are an expert user onboarding specialist focused on converting new users to their first event attendance.

PRIORITY: Find the SINGLE BEST event for this newcomer that will convert them to RSVP to their FIRST table.

MATCHING CRITERIA (in order of importance):
1. Interest alignment: User interests MUST match event categories/features (this is critical for first-timers)
2. Location proximity: Prefer events in or near user's neighborhood for convenience
3. Event welcomingness: Prefer events with GOOD participation (50-80% filled) to show quality without feeling exclusive
4. Beginner-friendly: Consider events with welcoming descriptions, group-friendly features
5. Event timing: Prefer events happening soon (creates urgency for first RSVP)
6. First-time appeal: Events that feel welcoming and not intimidating for newcomers

IMPORTANT:
- Return ONLY the SINGLE BEST match (not multiple)
- This is their FIRST (or one of their first) events - make it count!
- Prioritize events with moderate participation (50-80% filled) - good social proof, not too exclusive
- The goal is to convert this user to their first RSVP
- Quality and relevance are critical for first-time conversion

Return a JSON object (not array) with:
- 'event_index': The index number from the events list below (0-based)
- 'reasoning': Detailed explanation (3-4 sentences) focusing on why this specific event is perfect for THIS user's FIRST table, how interests align (critical for first-timers), why the event is welcoming and beginner-friendly, and how location/timing work for first-time attendance
- 'confidence': Number 0-100 (should be 80+ for a good match)

User: user_summary

Events:
events_summaries

Return only the JSON object, no additional text."""


seat_newcomers_message_prompt = """You are an expert SMS copywriter specializing in converting new users to their first event attendance.

GOAL: Convert newcomers (0-2 events attended) to RSVP to their FIRST table.

SMS BEST PRACTICES:
1) LENGTH: <180 chars total (including link that will be appended). Be concise.
2) TONE: Warm, welcoming, encouraging. Make them feel excited about their first event. WELCOMING.
3) STRUCTURE: [Greeting + Name] [Welcome to community hook] [Event hook by interests/occupation/location] [Spots left + time urgency + social proof] [CTA].
4) FIRST-TIME FOCUS: Welcome them ("Welcome to Cuculi!", "Ready for your first table?", "Join us for your first event!").
5) PERSONALIZATION: Reference specific interests, neighborhood convenience, occupation if relevant.
6) WELCOMING: Emphasize that the event is welcoming, beginner-friendly, perfect for first-timers.
7) SCARCITY: Make spots left/time explicit; create urgency for first RSVP.
8) SOCIAL PROOF: Mention participants already in if available (shows community is active)
9) CTA: End with action phrase like "Tap to RSVP" or "Join us!" (link will be appended automatically).
10) AVOID: ALL CAPS, multiple questions, generic hype, long sentences.

TWILIO RULES:
- Banned words: poker, casino, gambling, betting, marijuana, cannabis, CBD, crypto → use alternatives
- Limits: 0 emojis = <150 chars, 1-2 emojis = <65 chars (link will be appended, so leave room)
- Do NOT include any links in your message - a link will be appended automatically
- Max 2 emojis if relevant

User: user_summary
Event: event_summary

Return JSON:
{
  "message": "text ending with CTA (no link)",
  "reasoning": "explanation",
  "confidence": <0-100>
}"""

## Airtable Message Creation Wrapper

Wrapper function to use the real upload_message_to_airtable utility.

In [None]:
def airtable_message_create(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Wrapper around upload_message_to_airtable that returns a dict with 'id' field.
    This maintains compatibility with the existing code that expects a dict response.
    """
    user_email = record.get("user_email", "unknown")
    event_id = record.get("event_id", "unknown")
    print(f"[Airtable] Creating message record for user: {user_email}, event: {event_id}")
    
    # Call the real Airtable upload function
    success, record_id = upload_message_to_airtable(record)
    
    if success and record_id:
        # Return the record with the Airtable ID
        record['id'] = record_id
        print(f"[Airtable] ✓ Successfully created message record: {record_id}")
        return record
    else:
        # If upload failed, still return the record but log the error
        print(f"[Airtable] ✗ Warning: Failed to upload message to Airtable for user: {user_email}")
        record['id'] = None
        return record

## Main Processing Function

Orchestrates the user-event matching and message generation workflow.

In [None]:
def process_user_event_generation(
    users: List[Dict[str, Any]], 
    events: List[Dict[str, Any]],
    match_prompt_template: str,
    message_prompt_template: str,
    campaign: str = "Return To Table"
) -> List[Dict[str, Any]]:
    """
    Orchestrates Step 3 (3a - 3h):
    Loops through users, matches them to events using AI, generates messages,
    and uploads to Airtable.
    
    Args:
        users: List of user dictionaries
        events: List of event dictionaries
        match_prompt_template: Template for matching prompt
        message_prompt_template: Template for message generation prompt
        campaign: Campaign name (default: "Return To Table")
    """
    generated_messages = []
    
    # Pre-calculate the events summary string once if the list matches for all users
    # Step 3b: Define events_summaries
    events_summaries_str = format_events_summary_list(events)

    for user in users:
        # Step 3a: We have user and events_array available
        
        # Step 3b: Define user_summary
        user_summary = user.get("summary", "")

        # Step 3c: Replace string literal variables to update prompt
        filled_match_prompt = match_prompt_create(match_prompt_template, user, events_summaries_str)

        # Step 3d: Pass prompt to ai_generate
        match_output_str = ai_generate(filled_match_prompt)

        # Step 3e: Parse the string into json object
        match_data = json.loads(match_output_str)

        # Step 3f: Retain reasoning, confidence. Identify event_object by index.
        event_index = match_data.get("event_index")
        
        # Validate index existence to avoid runtime list index errors (optional but good practice)
        # However, requirements say "Let failures happen naturally", so we proceed directly.
        selected_event = events[event_index]
        match_reasoning = match_data.get("reasoning")
        match_confidence = match_data.get("confidence")

        # Step 3g: Replace event_summary and user_summary within message_generation_prompt
        filled_message_prompt = message_prompt_create(
            message_prompt_template, 
            user_summary, 
            selected_event.get("summary", "")
        )

        # Run AI Generate for the actual message
        # (This was implied by "so that you can run it within the ai_generate" in step 3g)
        message_output_str = ai_generate(filled_message_prompt)
        message_data = json.loads(message_output_str)

        # Get the message text and append the event link programmatically
        message_text = message_data.get("message", "").strip()
        event_id = str(selected_event.get("id") or selected_event.get("_id", ""))
        event_link = f"https://cucu.li/bookings/{event_id}"
        
        # Append link to message if not already present
        if event_link not in message_text:
            message_text = f"{message_text} {event_link}"

        # Extract event_date from startDate
        event_date_value = None
        start_date = selected_event.get("startDate")
        if start_date:
            try:
                if isinstance(start_date, str):
                    # Handle ISO 8601 datetime strings
                    iso_str = start_date.replace("Z", "+00:00")
                    event_datetime = datetime.datetime.fromisoformat(iso_str)
                    event_date_value = event_datetime.isoformat()
                elif isinstance(start_date, datetime.datetime):
                    event_date_value = start_date.isoformat()
                elif isinstance(start_date, datetime.date):
                    event_date_value = datetime.datetime.combine(start_date, datetime.time()).isoformat()
            except (ValueError, AttributeError) as e:
                print(f"Warning: Could not parse startDate for event {event_id}: {e}")
        
        # Get user_summary and event_summary
        # Get event_summary (user_summary already defined above)
        event_summary = selected_event.get("summary", "")

        event_summary = selected_event.get("summary", "")

        # Step 3h: Run airtable_message_create
        # Constructing the payload dictionary
        payload = {
            "user_name": f"{user.get("firstName", "")} {user.get("lastName", "")}".strip(),
            "event_name": selected_event.get("name", ""),
            "user_id": user.get("id") or user.get("_id"),
            "event_id": event_id,
            "user_email": user.get("email", ""),
            "user_phone": user.get("phone", ""),
            "user_summary": user_summary,
            "event_summary": event_summary,
            "event_date": event_date_value,
            "filled_message_prompt": filled_message_prompt,
            "match_reasoning": match_reasoning,
            "match_confidence": match_confidence,
            "generated_message": message_text,
            "message_reasoning": message_data.get("reasoning", ""),
            "message_confidence": message_data.get("confidence", 0),
            "campaign": campaign
        }
        
        record = airtable_message_create(payload)
        generated_messages.append(record)

    return generated_messages

## Main Function

Main orchestrator function that ties everything together.

In [None]:
def main(
    users_array: List[Dict[str, Any]], 
    events_array: List[Dict[str, Any]], 
    match_users_events_prompt: str, 
    message_generation_prompt: str
) -> Dict[str, Any]:
    """
    Main orchestrator function.
    
    Args:
        users_array: List of user dictionaries.
        events_array: List of event dictionaries.
        match_users_events_prompt: Template string for matching users to events.
        message_generation_prompt: Template string for generating the message.
        
    Returns:
        Dictionary containing the processed users, events, and generated messages.
    """
    
    # Step 1: Filter Users
    filtered_users = filter_target_users(users_array)
    
    # Step 2: Filter Events
    filtered_events = filter_future_events(events_array)
    
    # Step 3: Process matching and message generation
    # This encapsulates steps 3a through 3h
    messages = process_user_event_generation(
        filtered_users,
        filtered_events,
        match_users_events_prompt,
        message_generation_prompt
    )
    
    # Output formatting
    output = {
        "users": filtered_users,
        "events": filtered_events,
        "messages": messages
    }
    
    # Convert ObjectIds to strings for JSON serialization
    output = convert_objectid_to_string(output)
    
    return output

## Example Execution

Example of how to use the functions to pull data from MongoDB and run the campaign.

In [None]:
# Example: Pull data from MongoDB
# Uncomment the lines below to pull real data from MongoDB

# users, events = pull_users_and_events(
#     users_limit=10,  # Limit for testing
#     events_limit=10,  # Limit for testing
#     generate_report=False,
#     save_data=False
# )
#filtered_users = filter_target_users(users)
#filtered_events = filter_future_events(events)
# Sample Prompt Templates
match_prompt = """
Match this user: user_summary
To one of these events:
events_summaries

Return JSON with event_index, reasoning, confidence.
"""

msg_prompt = """
Write SMS for user: user_summary
Event: event_summary

TWILIO RULES:
- Banned words: poker, casino, gambling, betting, marijuana, cannabis, CBD, crypto → use alternatives
- Limits: 0 emojis = <150 chars, 1-2 emojis = <65 chars (link will be appended, so leave room)
- Do NOT include any links in your message - a link will be appended automatically
- Max 2 emojis if relevant

Format: [Greeting] [Interest/location hook] [Urgency: spots] [RSVP: link]

Return JSON:
{
  "message": "text ending with CTA (no link)",
  "reasoning": "explanation",
  "confidence": <0-100>
}
"""

# Execute Main with sample data
# For real execution, replace sample_users and sample_events with users and events from MongoDB
#result = main(filtered_users, filtered_events, match_prompt, msg_prompt)
#result
# Print Result to console (Pretty printed)
#print(json.dumps(result, indent=2))

## Example Execution: Fill The Table Campaign

Example execution for Fill The Table campaign (Yield Management) - matches multiple users to underfilled events.

In [None]:
# Example: Fill The Table Campaign
# Uncomment the lines below to pull real data from MongoDB

users, events = pull_users_and_events(
   # users_limit=50,  # Get more users for Fill The Table
  #  events_limit=20,  # Limit for testing
    generate_report=False,
    save_data=False
)

# #Filter users and events for Fill The Table campaign
filtered_users = filter_fill_table_users(users)
filtered_events = filter_fill_table_events(events)[:1]
# filtered_events[0]
# len(filtered_users)
# len(filtered_events)
#Use Fill The Table prompts
result_messages = process_fill_table_campaign(
    filtered_users,
    filtered_events,
    fill_table_match_prompt,
    fill_table_message_prompt
)
len(result_messages)
# Output formatting
# output = {
#     "users": filtered_users,
#     "events": filtered_events,
#     "messages": result_messages
# }
# output = convert_objectid_to_string(output)

# Print Result to console (Pretty printed)
# print(json.dumps(output, indent=2))

## Example Execution: Return To Table Campaign

Example execution for Return To Table campaign (Reactivation) - matches dormant users to high-quality events.

In [None]:
# Example: Return To Table Campaign
# Uncomment the lines below to pull real data from MongoDB

# users, events = pull_users_and_events(
#     users_limit=50,  # Get more users to find dormant ones
#     events_limit=20,  # Limit for testing
#     generate_report=False,
#     save_data=False
# )

# Filter users and events for Return To Table campaign
# filtered_users = filter_return_table_users(users)
# filtered_events = filter_return_table_events(events)

# Execute Main with Return To Table prompts
# result = main(filtered_users, filtered_events, return_table_match_prompt, return_table_message_prompt)

# Print Result to console (Pretty printed)
# print(json.dumps(result, indent=2))

## Example Execution: Seat Newcomers Campaign

Example execution for Seat Newcomers campaign (Acquisition) - matches newcomers to beginner-friendly events.

In [None]:
# Example: Seat Newcomers Campaign
# Uncomment the lines below to pull real data from MongoDB

# users, events = pull_users_and_events(
#     users_limit=50,  # Get more users to find newcomers
#     events_limit=20,  # Limit for testing
#     generate_report=False,
#     save_data=False
# )

# Filter users and events for Seat Newcomers campaign
# filtered_users = filter_seat_newcomers_users(users)
# filtered_events = filter_seat_newcomers_events(events)

# Execute Main with Seat Newcomers prompts
# result = main(filtered_users, filtered_events, seat_newcomers_match_prompt, seat_newcomers_message_prompt)

# Print Result to console (Pretty printed)
# print(json.dumps(result, indent=2))