In [1]:
from genson import SchemaBuilder

seed_schema = {
    'type': 'array',
    'items': {
        'type': 'object',
        'properties': {
            'mapping': {
                'type': 'object',
                'patternProperties': {
                    # UUID pattern - GenSON will fill in the actual schema
                    r'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$': None,
                    # Also handle the client-created-root pattern
                    r'^client-created-': None
                }
            }
        }
    }
}
builder = SchemaBuilder()
builder.add_schema(seed_schema)

In [None]:
import sys, os
from pathlib import Path
import json
PATH=str((Path().cwd().parent /'src').absolute())
print(PATH)
if PATH not in sys.path:
    sys.path.append(PATH)
import chat2obs
from chat2obs.process_chatgpt import load_conversations, convs_to_articles

root = "../data/ingestion/chatgpt/a40ff5f79c1b3edd3c366f0f628fb79170bae83ecf3a1758b5b258c71f843f53-2025-06-05-03-28-15-df2ed357a4e64443bf464446686c9692/"
fpath = Path(root) / "conversations.json"
convs = json.load(fpath.open())

builder.add_object(convs)


/Users/dmarx/proj/chat2obs/src


In [3]:
builder.to_schema()

{'$schema': 'http://json-schema.org/schema#',
 'type': 'array',
 'items': {'type': 'object',
  'properties': {'mapping': {'type': 'object',
    'patternProperties': {'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$': {'type': 'object',
      'properties': {'id': {'type': 'string'},
       'message': {'anyOf': [{'type': 'null'},
         {'type': 'object',
          'properties': {'id': {'type': 'string'},
           'author': {'type': 'object',
            'properties': {'role': {'type': 'string'},
             'name': {'type': ['null', 'string']},
             'metadata': {'type': 'object',
              'properties': {'real_author': {'type': 'string'}}}},
            'required': ['metadata', 'name', 'role']},
           'create_time': {'type': ['null', 'number']},
           'update_time': {'type': ['null', 'number']},
           'content': {'type': 'object',
            'properties': {'content_type': {'type': 'string'},
             'parts': {'type': 'a

In [4]:
conv_seed_schema = {
        'type': 'object',
        'properties': {
            'mapping': {
                'type': 'object',
                'patternProperties': {
                    # UUID pattern - GenSON will fill in the actual schema
                    r'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$': None,
                    # Also handle the client-created-root pattern
                    r'^client-created-': None
                }
            }
        }
    }

c = convs[2]

conv_schema = SchemaBuilder()
conv_schema.add_schema(conv_seed_schema)
conv_schema.add_object(c)
conv_schema.to_schema()

{'$schema': 'http://json-schema.org/schema#',
 'type': 'object',
 'properties': {'mapping': {'type': 'object',
   'patternProperties': {'^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$': {'type': 'object',
     'properties': {'id': {'type': 'string'},
      'message': {'type': 'object',
       'properties': {'id': {'type': 'string'},
        'author': {'type': 'object',
         'properties': {'role': {'type': 'string'},
          'name': {'type': ['null', 'string']},
          'metadata': {'type': 'object',
           'properties': {'real_author': {'type': 'string'}}}},
         'required': ['metadata', 'name', 'role']},
        'create_time': {'type': ['null', 'number']},
        'update_time': {'type': 'null'},
        'content': {'type': 'object',
         'properties': {'content_type': {'type': 'string'},
          'parts': {'type': 'array', 'items': {'type': 'string'}},
          'thoughts': {'type': 'array',
           'items': {'type': 'object',
  

In [5]:
# conversation_tagger.py
"""
Flexible conversation tagging system for LLM conversation analysis.
Supports base tags (from content) and supplemental tags (from other tags).
"""

from typing import Dict, Any, List, Callable, Set
from collections import defaultdict


class ConversationTagger:
    """
    Flexible system for tagging conversations with multiple criteria.
    Supports base rules (applied directly to conversations) and 
    supplemental rules (applied based on existing tags).
    """
    
    def __init__(self):
        self.base_rules: Dict[str, Callable] = {}
        self.supplemental_rules: Dict[str, Callable] = {}
        self.rule_descriptions: Dict[str, str] = {}
    
    def add_base_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """
        Add a base tagging rule that analyzes conversation content.
        
        Args:
            tag_name: Name of the tag to apply
            rule_function: Function that takes (conversation) -> bool
            description: Human-readable description of the rule
        """
        self.base_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def add_supplemental_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """
        Add a supplemental rule that depends on existing tags.
        
        Args:
            tag_name: Name of the tag to apply  
            rule_function: Function that takes (conversation, current_tags) -> bool
            description: Human-readable description of the rule
        """
        self.supplemental_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def tag_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply all tagging rules to a conversation.
        
        Returns:
            Dict with 'tags', 'debug_info', and original conversation data
        """
        tags = set()
        debug_info = defaultdict(list)
        
        # Apply base rules
        for tag_name, rule_func in self.base_rules.items():
            try:
                if rule_func(conversation):
                    tags.add(tag_name)
                    debug_info['applied_rules'].append(f"BASE: {tag_name}")
                else:
                    debug_info['skipped_rules'].append(f"BASE: {tag_name}")
            except Exception as e:
                debug_info['errors'].append(f"BASE: {tag_name} - {str(e)}")
        
        # Apply supplemental rules (may need multiple passes)
        max_iterations = 5
        for iteration in range(max_iterations):
            initial_tag_count = len(tags)
            
            for tag_name, rule_func in self.supplemental_rules.items():
                if tag_name in tags:  # Already applied
                    continue
                    
                try:
                    if rule_func(conversation, tags):
                        tags.add(tag_name)
                        debug_info['applied_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                    else:
                        debug_info['skipped_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                except Exception as e:
                    debug_info['errors'].append(f"SUPP: {tag_name} - {str(e)}")
            
            # If no new tags were added, we're done
            if len(tags) == initial_tag_count:
                break
        
        return {
            'conversation_id': conversation.get('conversation_id', conversation.get('id', 'unknown')),
            'title': conversation.get('title', 'Untitled'),
            'tags': list(tags),
            'debug_info': dict(debug_info),
            'conversation': conversation
        }
    
    def tag_conversations(self, conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Tag multiple conversations and return results."""
        return [self.tag_conversation(conv) for conv in conversations]
    
    def filter_by_tags(self, tagged_conversations: List[Dict[str, Any]], 
                      include_tags: Set[str] = None, 
                      exclude_tags: Set[str] = None) -> List[Dict[str, Any]]:
        """
        Filter conversations by tags.
        
        Args:
            tagged_conversations: List of tagged conversation results
            include_tags: Only return conversations with ALL these tags
            exclude_tags: Exclude conversations with ANY of these tags
        """
        filtered = []
        
        for tagged_conv in tagged_conversations:
            conv_tags = set(tagged_conv['tags'])
            
            # Check exclusions first
            if exclude_tags and any(tag in conv_tags for tag in exclude_tags):
                continue
                
            # Check inclusions
            if include_tags and not include_tags.issubset(conv_tags):
                continue
                
            filtered.append(tagged_conv)
        
        return filtered
    
    def print_summary(self, tagged_conversations: List[Dict[str, Any]]):
        """Print a summary of tagging results."""
        total = len(tagged_conversations)
        tag_counts = defaultdict(int)
        
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                tag_counts[tag] += 1
        
        print(f"Tagged {total} conversations")
        print(f"Tag distribution:")
        for tag, count in sorted(tag_counts.items()):
            percentage = (count / total) * 100
            print(f"  {tag}: {count} ({percentage:.1f}%)")
    
    def debug_conversation(self, conversation: Dict[str, Any]):
        """Debug a single conversation - show which rules fire."""
        result = self.tag_conversation(conversation)
        
        print(f"Conversation: {result['title'][:50]}...")
        print(f"Tags applied: {result['tags']}")
        print(f"\nRule details:")
        
        for rule in result['debug_info'].get('applied_rules', []):
            print(f"  ✅ {rule}")
            
        for rule in result['debug_info'].get('skipped_rules', []):
            print(f"  ❌ {rule}")
            
        for error in result['debug_info'].get('errors', []):
            print(f"  ⚠️  ERROR: {error}")


# Pre-defined rule functions for common patterns
def has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if conversation has unusually large content (suggests context sharing)."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        if node_id.startswith('client-created'):
            continue
            
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        text = content.get('text', '')
        if len(text) > min_length:
            return True
            
        parts = content.get('parts', [])
        for part in parts:
            if isinstance(part, str) and len(part) > min_length:
                return True
    
    return False


def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for clear code patterns in conversation."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        if node_id.startswith('client-created'):
            continue
            
        message = node.get('message')
        if not message:
            continue
        
        # Check content
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Strong code indicators
        code_indicators = [
            '```',  # Code blocks
            'def ', 'function ', 'class ',  # Definitions
            'import ', 'from ', 'require(',  # Imports
            '#!/bin/', '#include',  # Script headers
        ]
        
        if any(indicator in all_text for indicator in code_indicators):
            return True
    
    return False


def has_github_repos(conversation: Dict[str, Any]) -> bool:
    """Check if GitHub repositories were selected for context."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        if node_id.startswith('client-created'):
            continue
            
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        repos = metadata.get('selected_github_repos', [])
        if repos:  # Non-empty list
            return True
    
    return False


def has_canvas_operations(conversation: Dict[str, Any]) -> bool:
    """Check for canvas/document operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        if node_id.startswith('client-created'):
            continue
            
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('canvas'):
            return True
    
    return False


def has_search_operations(conversation: Dict[str, Any]) -> bool:
    """Check for web search operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        if node_id.startswith('client-created'):
            continue
            
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('search_queries') or 
            metadata.get('search_result_groups') or
            metadata.get('content_references')):
            return True
    
    return False


def has_reasoning_thoughts(conversation: Dict[str, Any]) -> bool:
    """Check for reasoning/thinking patterns."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        if node_id.startswith('client-created'):
            continue
            
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        if content.get('thoughts'):  # Reasoning thoughts
            return True
    
    return False


def get_first_user_message(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """Find the first user message in the conversation."""
    mapping = conversation.get('mapping', {})
    user_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        author = message.get('author', {})
        if author.get('role') == 'user':
            create_time = message.get('create_time')
            user_messages.append((create_time or 0, message, node))
    
    if not user_messages:
        return None
    
    # Sort by create_time and return the first user message
    user_messages.sort(key=lambda x: x[0])
    return user_messages[0][1]  # Return the message object


def first_user_has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if the first user message has large content."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    if len(text) > min_length:
        return True
        
    parts = content.get('parts', [])
    for part in parts:
        if isinstance(part, str) and len(part) > min_length:
            return True
    
    return False


def first_user_has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message contains code patterns."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    parts = content.get('parts', [])
    all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
    
    # Strong code indicators
    code_indicators = [
        '```',  # Code blocks
        'def ', 'function ', 'class ',  # Definitions
        'import ', 'from ', 'require(',  # Imports
        '#!/bin/', '#include',  # Script headers
    ]
    
    return any(indicator in all_text for indicator in code_indicators)


def first_user_has_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    return len(attachments) > 0


# Example usage setup
def create_default_tagger() -> ConversationTagger:
    """Create a tagger with common rules pre-configured."""
    tagger = ConversationTagger()
    
    # Base content analysis rules (anywhere in conversation)
    tagger.add_base_rule(
        'large_content',
        lambda conv: has_large_content(conv, 2000),
        'Content longer than 2000 characters (anywhere in conversation)'
    )
    
    tagger.add_base_rule(
        'code_patterns', 
        has_code_patterns,
        'Contains clear code patterns (```, def, function, etc.)'
    )
    
    tagger.add_base_rule(
        'github_context',
        has_github_repos,
        'GitHub repositories selected for context'
    )
    
    tagger.add_base_rule(
        'canvas_operations',
        has_canvas_operations,
        'Uses canvas/document features'
    )
    
    tagger.add_base_rule(
        'web_search',
        has_search_operations,
        'Includes web search functionality'
    )
    
    tagger.add_base_rule(
        'reasoning',
        has_reasoning_thoughts,
        'Contains reasoning/thinking content'
    )
    
    # First user message specific rules
    tagger.add_base_rule(
        'starts_large_content',
        lambda conv: first_user_has_large_content(conv, 2000),
        'First user message has large content (>2000 chars)'
    )
    
    tagger.add_base_rule(
        'starts_code_patterns',
        first_user_has_code_patterns,
        'First user message contains code patterns'
    )
    
    tagger.add_base_rule(
        'starts_with_attachments',
        first_user_has_attachments,
        'First user message has attachments'
    )
    
    # Supplemental rules based on tag combinations
    tagger.add_supplemental_rule(
        'coding_assistance',
        lambda conv, tags: ('code_patterns' in tags or 'github_context' in tags),
        'Likely coding assistance (has code patterns or GitHub context)'
    )
    
    tagger.add_supplemental_rule(
        'coding_assistance_start',
        lambda conv, tags: ('starts_code_patterns' in tags or 
                           'starts_large_content' in tags or 
                           'starts_with_attachments' in tags),
        'Likely coding assistance based on how conversation starts'
    )
    
    tagger.add_supplemental_rule(
        'research_session',
        lambda conv, tags: ('web_search' in tags and 'large_content' in tags),
        'Research session (web search + substantial content)'
    )
    
    tagger.add_supplemental_rule(
        'complex_analysis',
        lambda conv, tags: ('reasoning' in tags and len(tags) >= 3),
        'Complex analysis (reasoning + multiple other features)'
    )
    
    tagger.add_supplemental_rule(
        'context_heavy_start',
        lambda conv, tags: ('starts_large_content' in tags or 'starts_with_attachments' in tags),
        'Conversation starts with substantial context (large content or files)'
    )
    
    return tagger


# Jupyter notebook usage example:
"""
# Load your conversations (assuming you have them in memory)
# conversations = your_conversation_data

# Create and configure tagger
tagger = create_default_tagger()

# Tag all conversations
tagged_results = tagger.tag_conversations(conversations)

# Print summary
tagger.print_summary(tagged_results)

# Filter for different types of conversations
coding_conversations = tagger.filter_by_tags(
    tagged_results, 
    include_tags={'coding_assistance'}
)

coding_start_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags={'coding_assistance_start'}
)

context_heavy_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags={'context_heavy_start'}
)

# Exclude conversations that start with large content or attachments
lightweight_conversations = tagger.filter_by_tags(
    tagged_results,
    exclude_tags={'starts_large_content', 'starts_with_attachments'}
)

# Debug a specific conversation to see why it was tagged
tagger.debug_conversation(conversations[0])

# Add custom rules for your specific needs
tagger.add_base_rule(
    'starts_with_question',
    lambda conv: get_first_user_message(conv) and 
                 get_first_user_message(conv).get('content', {}).get('text', '').strip().endswith('?'),
    'Conversation starts with a question'
)

# Example: Find conversations that start with large content but aren't coding
large_non_coding = tagger.filter_by_tags(
    tagged_results,
    include_tags={'starts_large_content'},
    exclude_tags={'coding_assistance', 'coding_assistance_start'}
)

print(f"Found {len(large_non_coding)} conversations with large initial content that aren't coding-related")
"""

'\n# Load your conversations (assuming you have them in memory)\n# conversations = your_conversation_data\n\n# Create and configure tagger\ntagger = create_default_tagger()\n\n# Tag all conversations\ntagged_results = tagger.tag_conversations(conversations)\n\n# Print summary\ntagger.print_summary(tagged_results)\n\n# Filter for different types of conversations\ncoding_conversations = tagger.filter_by_tags(\n    tagged_results, \n    include_tags={\'coding_assistance\'}\n)\n\ncoding_start_conversations = tagger.filter_by_tags(\n    tagged_results,\n    include_tags={\'coding_assistance_start\'}\n)\n\ncontext_heavy_conversations = tagger.filter_by_tags(\n    tagged_results,\n    include_tags={\'context_heavy_start\'}\n)\n\n# Exclude conversations that start with large content or attachments\nlightweight_conversations = tagger.filter_by_tags(\n    tagged_results,\n    exclude_tags={\'starts_large_content\', \'starts_with_attachments\'}\n)\n\n# Debug a specific conversation to see why it 

In [6]:
tagger = create_default_tagger()

# Tag all conversations
tagged_results = tagger.tag_conversations(convs)

# Print summary
tagger.print_summary(tagged_results)



Tagged 1673 conversations
Tag distribution:
  canvas_operations: 3 (0.2%)
  code_patterns: 1502 (89.8%)
  coding_assistance: 1502 (89.8%)
  coding_assistance_start: 260 (15.5%)
  complex_analysis: 8 (0.5%)
  context_heavy_start: 127 (7.6%)
  large_content: 1362 (81.4%)
  reasoning: 12 (0.7%)
  research_session: 53 (3.2%)
  starts_code_patterns: 240 (14.3%)
  starts_large_content: 119 (7.1%)
  starts_with_attachments: 8 (0.5%)
  web_search: 66 (3.9%)


In [9]:
# conversation_tagger.py
"""
Enhanced conversation tagging system with structured tags supporting key-value attributes.
Tags can be simple strings or objects with metadata for better analysis and filtering.
"""

from typing import Dict, Any, List, Callable, Set, Union
from collections import defaultdict
import json


class Tag:
    """Represents a tag with optional key-value attributes."""
    
    def __init__(self, name: str, **attributes):
        self.name = name
        self.attributes = attributes
    
    def __str__(self):
        if self.attributes:
            attrs_str = ", ".join(f"{k}={v}" for k, v in self.attributes.items())
            return f"{self.name}({attrs_str})"
        return self.name
    
    def __repr__(self):
        return f"Tag('{self.name}', {self.attributes})"
    
    def __eq__(self, other):
        if isinstance(other, str):
            return self.name == other
        elif isinstance(other, Tag):
            return self.name == other.name and self.attributes == other.attributes
        return False
    
    def __hash__(self):
        return hash((self.name, tuple(sorted(self.attributes.items()))))
    
    def matches(self, name: str, **criteria) -> bool:
        """Check if tag matches name and optional attribute criteria."""
        if self.name != name:
            return False
        
        for key, value in criteria.items():
            if key not in self.attributes:
                return False
            
            attr_value = self.attributes[key]
            
            # Support comparison operators
            if isinstance(value, dict):
                for op, target in value.items():
                    if op == 'gt' and not (attr_value > target):
                        return False
                    elif op == 'gte' and not (attr_value >= target):
                        return False
                    elif op == 'lt' and not (attr_value < target):
                        return False
                    elif op == 'lte' and not (attr_value <= target):
                        return False
                    elif op == 'eq' and not (attr_value == target):
                        return False
                    elif op == 'ne' and not (attr_value != target):
                        return False
                    elif op == 'in' and not (attr_value in target):
                        return False
            else:
                # Direct equality
                if attr_value != value:
                    return False
        
        return True


class ConversationTagger:
    """
    Enhanced tagging system supporting structured tags with attributes.
    """
    
    def __init__(self):
        self.base_rules: Dict[str, Callable] = {}
        self.multi_tag_rules: Dict[str, Callable] = {}
        self.supplemental_rules: Dict[str, Callable] = {}
        self.rule_descriptions: Dict[str, str] = {}
    
    def add_base_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a base tagging rule that returns bool or Tag object."""
        self.base_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def add_multi_tag_rule(self, rule_name: str, rule_function: Callable, description: str = ""):
        """Add a rule that returns multiple tags (strings or Tag objects)."""
        self.multi_tag_rules[rule_name] = rule_function
        self.rule_descriptions[rule_name] = description
    
    def add_supplemental_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a supplemental rule that depends on existing tags."""
        self.supplemental_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def _normalize_tag(self, tag: Union[str, Tag]) -> Tag:
        """Convert string tags to Tag objects."""
        if isinstance(tag, str):
            return Tag(tag)
        return tag
    
    def tag_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply all tagging rules to a conversation."""
        tags = set()
        debug_info = defaultdict(list)
        
        # Apply base rules
        for tag_name, rule_func in self.base_rules.items():
            try:
                result = rule_func(conversation)
                if result:
                    if isinstance(result, bool):
                        tag = Tag(tag_name)
                    else:
                        tag = self._normalize_tag(result)
                    tags.add(tag)
                    debug_info['applied_rules'].append(f"BASE: {tag}")
                else:
                    debug_info['skipped_rules'].append(f"BASE: {tag_name}")
            except Exception as e:
                debug_info['errors'].append(f"BASE: {tag_name} - {str(e)}")
        
        # Apply multi-tag rules
        for rule_name, rule_func in self.multi_tag_rules.items():
            try:
                new_tags = rule_func(conversation)
                if new_tags:
                    normalized_tags = [self._normalize_tag(tag) for tag in new_tags]
                    tags.update(normalized_tags)
                    debug_info['applied_rules'].append(f"MULTI: {rule_name} -> {[str(t) for t in normalized_tags]}")
                else:
                    debug_info['skipped_rules'].append(f"MULTI: {rule_name}")
            except Exception as e:
                debug_info['errors'].append(f"MULTI: {rule_name} - {str(e)}")
        
        # Apply supplemental rules
        max_iterations = 5
        for iteration in range(max_iterations):
            initial_tag_count = len(tags)
            
            for tag_name, rule_func in self.supplemental_rules.items():
                # Check if tag already exists
                if any(tag.name == tag_name for tag in tags):
                    continue
                    
                try:
                    result = rule_func(conversation, tags)
                    if result:
                        if isinstance(result, bool):
                            tag = Tag(tag_name)
                        else:
                            tag = self._normalize_tag(result)
                        tags.add(tag)
                        debug_info['applied_rules'].append(f"SUPP: {tag} (iter {iteration})")
                    else:
                        debug_info['skipped_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                except Exception as e:
                    debug_info['errors'].append(f"SUPP: {tag_name} - {str(e)}")
            
            if len(tags) == initial_tag_count:
                break
        
        return {
            'conversation_id': conversation.get('conversation_id', conversation.get('id', 'unknown')),
            'title': conversation.get('title', 'Untitled'),
            'tags': list(tags),
            'debug_info': dict(debug_info),
            'conversation': conversation
        }
    
    def tag_conversations(self, conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Tag multiple conversations."""
        return [self.tag_conversation(conv) for conv in conversations]
    
    def filter_by_tags(self, tagged_conversations: List[Dict[str, Any]], 
                      include_tags: List[Union[str, Dict]] = None,
                      exclude_tags: List[Union[str, Dict]] = None) -> List[Dict[str, Any]]:
        """
        Filter conversations by tags with attribute support.
        
        Args:
            include_tags: List of tag names or dicts with criteria
                Examples: ['web_search', {'name': 'gizmo', 'type': 'dalle'}]
            exclude_tags: Similar format for exclusions
        """
        filtered = []
        
        for tagged_conv in tagged_conversations:
            tags = tagged_conv['tags']
            
            # Check exclusions first
            if exclude_tags:
                should_exclude = False
                for exclude_criterion in exclude_tags:
                    if self._matches_criterion(tags, exclude_criterion):
                        should_exclude = True
                        break
                if should_exclude:
                    continue
            
            # Check inclusions
            if include_tags:
                should_include = True
                for include_criterion in include_tags:
                    if not self._matches_criterion(tags, include_criterion):
                        should_include = False
                        break
                if not should_include:
                    continue
            
            filtered.append(tagged_conv)
        
        return filtered
    
    def _matches_criterion(self, tags: List[Tag], criterion: Union[str, Dict]) -> bool:
        """Check if any tag matches the given criterion."""
        if isinstance(criterion, str):
            return any(tag.name == criterion for tag in tags)
        
        elif isinstance(criterion, dict):
            name = criterion.get('name')
            if not name:
                return False
            
            criteria = {k: v for k, v in criterion.items() if k != 'name'}
            return any(tag.matches(name, **criteria) for tag in tags)
        
        return False
    
    def get_tag_values(self, tagged_conversations: List[Dict[str, Any]], 
                      tag_name: str, attribute: str) -> List[Any]:
        """Extract attribute values from tags across conversations."""
        values = []
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == tag_name and attribute in tag.attributes:
                    values.append(tag.attributes[attribute])
        return values
    
    def print_summary(self, tagged_conversations: List[Dict[str, Any]]):
        """Print enhanced summary with attribute statistics."""
        total = len(tagged_conversations)
        tag_counts = defaultdict(int)
        tag_attributes = defaultdict(list)
        
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                tag_counts[tag.name] += 1
                if tag.attributes:
                    tag_attributes[tag.name].extend(tag.attributes.items())
        
        print(f"Tagged {total} conversations")
        print(f"Tag distribution:")
        for tag_name, count in sorted(tag_counts.items()):
            percentage = (count / total) * 100
            print(f"  {tag_name}: {count} ({percentage:.1f}%)")
            
            # Show attribute statistics
            if tag_name in tag_attributes:
                attrs = defaultdict(list)
                for key, value in tag_attributes[tag_name]:
                    if isinstance(value, (int, float)):
                        attrs[key].append(value)
                
                for attr_name, values in attrs.items():
                    if values:
                        avg = sum(values) / len(values)
                        min_val = min(values)
                        max_val = max(values)
                        print(f"    {attr_name}: avg={avg:.1f}, range=[{min_val}, {max_val}]")
    
    def debug_conversation(self, conversation: Dict[str, Any]):
        """Debug a single conversation with enhanced tag details."""
        result = self.tag_conversation(conversation)
        
        print(f"Conversation: {result['title'][:50]}...")
        print(f"Tags applied:")
        for tag in result['tags']:
            print(f"  {tag}")
        
        print(f"\nRule details:")
        for rule in result['debug_info'].get('applied_rules', []):
            print(f"  ✅ {rule}")
        
        for error in result['debug_info'].get('errors', []):
            print(f"  ⚠️  ERROR: {error}")


# Enhanced rule functions for all previously discussed tagging rules

def get_all_user_messages(conversation: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Get all user messages in chronological order."""
    mapping = conversation.get('mapping', {})
    user_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        author = message.get('author', {})
        if author.get('role') == 'user':
            create_time = message.get('create_time') or 0
            user_messages.append((create_time, message))
    
    user_messages.sort(key=lambda x: x[0])
    return [msg for _, msg in user_messages]


def get_first_user_message(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """Find the first user message in the conversation."""
    user_messages = get_all_user_messages(conversation)
    return user_messages[0] if user_messages else None


def create_conversation_length_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for conversation length."""
    user_count = len(get_all_user_messages(conversation))
    
    # Determine category
    if user_count == 1:
        category = 'single'
    elif user_count <= 3:
        category = 'short'
    elif user_count <= 10:
        category = 'medium'
    elif user_count <= 25:
        category = 'long'
    else:
        category = 'very_long'
    
    return Tag('conversation_length', count=user_count, category=category)


def create_prompt_stats_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for prompt statistics."""
    user_messages = get_all_user_messages(conversation)
    
    if not user_messages:
        return Tag('prompt_stats', count=0, mean=0, median=0, variance=0, 
                  length_category='none', consistency='none')
    
    # Calculate message lengths
    lengths = []
    for message in user_messages:
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        lengths.append(len(all_text))
    
    # Calculate statistics
    mean_length = sum(lengths) / len(lengths)
    sorted_lengths = sorted(lengths)
    n = len(sorted_lengths)
    median_length = (sorted_lengths[n//2] if n % 2 == 1 
                    else (sorted_lengths[n//2-1] + sorted_lengths[n//2]) / 2)
    variance = sum((x - mean_length) ** 2 for x in lengths) / len(lengths) if len(lengths) > 1 else 0
    
    # Determine categories
    if mean_length < 50:
        length_category = 'very_short'
    elif mean_length < 200:
        length_category = 'short'
    elif mean_length < 1000:
        length_category = 'medium'
    elif mean_length < 3000:
        length_category = 'long'
    else:
        length_category = 'very_long'
    
    if variance < 1000:
        consistency = 'consistent'
    elif variance < 10000:
        consistency = 'mixed'
    else:
        consistency = 'variable'
    
    return Tag('prompt_stats', 
               count=len(lengths),
               mean=round(mean_length, 1),
               median=round(median_length, 1),
               variance=round(variance, 1),
               length_category=length_category,
               consistency=consistency)


def create_gizmo_plugin_tags(conversation: Dict[str, Any]) -> List[Tag]:
    """Create structured tags for gizmos and plugins."""
    tags = []
    gizmos = set()
    plugins = set()
    
    # Check conversation-level
    if conversation.get('gizmo_id'):
        gizmos.add(conversation['gizmo_id'])
    
    plugin_ids = conversation.get('plugin_ids', [])
    if plugin_ids:
        plugins.update(plugin_ids)
    
    # Check message-level
    mapping = conversation.get('mapping', {})
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        
        # Invoked plugins
        invoked_plugin = metadata.get('invoked_plugin', {})
        if invoked_plugin:
            if invoked_plugin.get('plugin_id'):
                plugins.add(invoked_plugin['plugin_id'])
            if invoked_plugin.get('namespace'):
                plugins.add(invoked_plugin['namespace'])
        
        # Gizmo usage
        if metadata.get('gizmo_id'):
            gizmos.add(metadata['gizmo_id'])
    
    # Create tags
    for gizmo in gizmos:
        tags.append(Tag('gizmo', name=gizmo))
    
    for plugin in plugins:
        tags.append(Tag('plugin', name=plugin))
    
    return tags


# Boolean rule functions for basic content analysis
def has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if conversation has unusually large content anywhere."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        text = content.get('text', '')
        if len(text) > min_length:
            return True
            
        parts = content.get('parts', [])
        for part in parts:
            if isinstance(part, str) and len(part) > min_length:
                return True
    
    return False


def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for clear code patterns anywhere in conversation."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Strong code indicators
        code_indicators = [
            '```',  # Code blocks
            'def ', 'function ', 'class ',  # Function/class definitions
            'import ', 'from ', 'require(',  # Import statements
            '#!/bin/', '#include', 'using namespace',  # Script headers
        ]
        
        if any(indicator in all_text for indicator in code_indicators):
            return True
            
        # Also check for high density of coding keywords
        coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
        keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
        if len(all_text) > 1000 and keyword_count >= 3:  # Multiple coding keywords in large text suggest actual code
            return True
    
    return False


def has_github_repos(conversation: Dict[str, Any]) -> bool:
    """Check if GitHub repositories were selected for context."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        repos = metadata.get('selected_github_repos', [])
        if repos:  # Non-empty list
            return True
    
    return False


def has_canvas_operations(conversation: Dict[str, Any]) -> bool:
    """Check for canvas/document operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('canvas'):
            return True
    
    return False


def has_web_search(conversation: Dict[str, Any]) -> bool:
    """Check for web search operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('search_queries') or 
            metadata.get('search_result_groups') or
            metadata.get('content_references')):
            return True
    
    return False


def has_reasoning_thoughts(conversation: Dict[str, Any]) -> bool:
    """Check for reasoning/thinking patterns."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        if content.get('thoughts'):  # Reasoning thoughts
            return True
    
    return False


def has_code_execution(conversation: Dict[str, Any]) -> bool:
    """Check for code execution artifacts."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('aggregate_result') or 
            metadata.get('jupyter_messages')):
            return True
    
    return False


# First user message specific rules
def first_user_has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if the first user message has large content."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    if len(text) > min_length:
        return True
        
    parts = content.get('parts', [])
    for part in parts:
        if isinstance(part, str) and len(part) > min_length:
            return True
    
    return False


def first_user_has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message contains code patterns."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    parts = content.get('parts', [])
    all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
    
    # Strong code indicators
    code_indicators = [
        '```',  # Code blocks
        'def ', 'function ', 'class ',  # Definitions
        'import ', 'from ', 'require(',  # Imports
        '#!/bin/', '#include',  # Script headers
    ]
    
    return any(indicator in all_text for indicator in code_indicators)


def first_user_has_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    return len(attachments) > 0


def first_user_has_code_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has code-related attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    
    for attachment in attachments:
        mime_type = attachment.get('mime_type', '').lower()
        name = attachment.get('name', '').lower()
        
        # Check for code file extensions
        code_extensions = ['.py', '.js', '.java', '.cpp', '.c', '.go', '.rs', '.ts', '.jsx', '.tsx', '.sql', '.sh', '.rb', '.php']
        if any(ext in name for ext in code_extensions):
            return True
            
        # Check for code-related MIME types
        code_mimes = ['text/x-python', 'text/x-java', 'application/javascript', 'text/x-script']
        if any(mime in mime_type for mime in code_mimes):
            return True
    
    return False


def create_default_tagger() -> ConversationTagger:
    """Create a tagger with all previously discussed rules in the enhanced structured framework."""
    tagger = ConversationTagger()
    
    # ===== BASIC CONTENT ANALYSIS RULES (Boolean) =====
    tagger.add_base_rule(
        'large_content', 
        lambda conv: has_large_content(conv, 2000),
        'Content longer than 2000 characters anywhere in conversation'
    )
    
    tagger.add_base_rule(
        'code_patterns', 
        has_code_patterns,
        'Contains clear code patterns (```, def, function, etc.)'
    )
    
    tagger.add_base_rule(
        'github_context',
        has_github_repos,
        'GitHub repositories selected for context'
    )
    
    tagger.add_base_rule(
        'canvas_operations',
        has_canvas_operations,
        'Uses canvas/document features'
    )
    
    tagger.add_base_rule(
        'web_search',
        has_web_search,
        'Includes web search functionality'
    )
    
    tagger.add_base_rule(
        'reasoning',
        has_reasoning_thoughts,
        'Contains reasoning/thinking content'
    )
    
    tagger.add_base_rule(
        'code_execution',
        has_code_execution,
        'Contains code execution (Jupyter, aggregate results)'
    )
    
    # ===== FIRST USER MESSAGE RULES (Boolean) =====
    tagger.add_base_rule(
        'starts_large_content',
        lambda conv: first_user_has_large_content(conv, 2000),
        'First user message has large content (>2000 chars)'
    )
    
    tagger.add_base_rule(
        'starts_code_patterns',
        first_user_has_code_patterns,
        'First user message contains code patterns'
    )
    
    tagger.add_base_rule(
        'starts_with_attachments',
        first_user_has_attachments,
        'First user message has any attachments'
    )
    
    tagger.add_base_rule(
        'starts_code_attachments',
        first_user_has_code_attachments,
        'First user message has code-related attachments'
    )
    
    # ===== STRUCTURED TAG RULES =====
    tagger.add_base_rule(
        'conversation_length',
        create_conversation_length_tag,
        'Conversation length with count and category'
    )
    
    tagger.add_base_rule(
        'prompt_stats',
        create_prompt_stats_tag,
        'User message statistics (length, variance, etc.)'
    )
    
    tagger.add_multi_tag_rule(
        'gizmo_plugin_usage',
        create_gizmo_plugin_tags,
        'Specific gizmos and plugins used in conversation'
    )
    
    # ===== SUPPLEMENTAL RULES (Based on existing tags) =====
    
    # Coding assistance detection
    tagger.add_supplemental_rule(
        'coding_assistance',
        lambda conv, tags: (
            any(tag.name in ['code_patterns', 'github_context', 'code_execution'] for tag in tags)
        ),
        'Likely coding assistance (code patterns, GitHub, or execution)'
    )
    
    tagger.add_supplemental_rule(
        'coding_assistance_start',
        lambda conv, tags: (
            any(tag.name in ['starts_code_patterns', 'starts_large_content', 
                           'starts_with_attachments', 'starts_code_attachments'] for tag in tags)
        ),
        'Likely coding assistance based on how conversation starts'
    )
    
    # Research and analysis patterns  
    tagger.add_supplemental_rule(
        'research_session',
        lambda conv, tags: (
            any(tag.name == 'web_search' for tag in tags) and 
            any(tag.name == 'large_content' for tag in tags)
        ),
        'Research session (web search + substantial content)'
    )
    
    tagger.add_supplemental_rule(
        'complex_analysis',
        lambda conv, tags: (
            any(tag.name == 'reasoning' for tag in tags) and 
            len([tag for tag in tags if tag.name in ['web_search', 'large_content', 'canvas_operations']]) >= 2
        ),
        'Complex analysis (reasoning + multiple advanced features)'
    )
    
    # Context and interaction patterns
    tagger.add_supplemental_rule(
        'context_heavy_start',
        lambda conv, tags: (
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Conversation starts with substantial context'
    )
    
    tagger.add_supplemental_rule(
        'enhanced_conversation',
        lambda conv, tags: (
            any(tag.name in ['gizmo', 'plugin'] for tag in tags)
        ),
        'Uses enhanced features (gizmos or plugins)'
    )
    
    # Length-based classifications using structured tags
    tagger.add_supplemental_rule(
        'brief_interaction',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags)
        ),
        'Brief interaction (1-3 user messages)'
    )
    
    tagger.add_supplemental_rule(
        'extended_conversation',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['long', 'very_long'] for tag in tags)
        ),
        'Extended conversation (11+ user messages)'
    )
    
    # Prompt pattern classifications using structured tags
    tagger.add_supplemental_rule(
        'long_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['long', 'very_long'] for tag in tags)
        ),
        'Conversation has consistently long prompts'
    )
    
    tagger.add_supplemental_rule(
        'short_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['very_short', 'short'] for tag in tags)
        ),
        'Conversation has consistently short prompts'
    )
    
    tagger.add_supplemental_rule(
        'consistent_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'User prompts are consistent in length'
    )
    
    tagger.add_supplemental_rule(
        'variable_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'User prompts vary significantly in length'
    )
    
    # Combined patterns for specific use cases
    tagger.add_supplemental_rule(
        'context_dump',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags) and
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Short conversation starting with large context (likely context dump)'
    )
    
    tagger.add_supplemental_rule(
        'interactive_session',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'Extended back-and-forth conversation with consistent prompt style'
    )
    
    tagger.add_supplemental_rule(
        'evolving_discussion',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'Extended conversation where prompt style evolves'
    )
    
    return tagger


# Enhanced usage examples with all ported rules
"""
# Create enhanced tagger with all previously discussed rules
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(conversations)

# Enhanced summary shows both simple and structured tag statistics
tagger.print_summary(tagged_results)

# ===== BASIC FILTERING (Boolean tags) =====

# Find coding assistance conversations
coding_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=['coding_assistance']
)

# Exclude coding and enhanced features for clean analysis
basic_conversations = tagger.filter_by_tags(
    tagged_results,
    exclude_tags=['coding_assistance', 'enhanced_conversation', 'context_heavy_start']
)

# Find research sessions
research_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=['research_session']
)

# ===== STRUCTURED TAG FILTERING =====

# Find short conversations (1-3 messages)
short_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'conversation_length', 'category': 'short'}]
)

# Find conversations with 5-15 user messages
medium_length_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'conversation_length', 'count': {'gte': 5, 'lte': 15}}]
)

# Find conversations with long prompts (mean > 1000 chars)
long_prompt_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'prompt_stats', 'mean': {'gt': 1000}}]
)

# Find conversations with consistent prompt lengths
consistent_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'prompt_stats', 'consistency': 'consistent'}]
)

# ===== GIZMO/PLUGIN FILTERING =====

# Find conversations using DALL-E specifically
dalle_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'gizmo', 'name': 'dalle'}]
)

# Find any gizmo usage
any_gizmo_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=['gizmo']
)

# Find conversations using browser plugin
browser_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'plugin', 'name': 'browser'}]
)

# ===== COMPLEX PATTERN FILTERING =====

# Find context dumps (short conversations with large initial content)
context_dumps = tagger.filter_by_tags(
    tagged_results,
    include_tags=['context_dump']
)

# Find interactive sessions (long, consistent conversations)
interactive_sessions = tagger.filter_by_tags(
    tagged_results,
    include_tags=['interactive_session']
)

# Find evolving discussions (long conversations with variable prompts)
evolving_discussions = tagger.filter_by_tags(
    tagged_results,
    include_tags=['evolving_discussion']
)

# Find conversations that start with code context
code_context_starts = tagger.filter_by_tags(
    tagged_results,
    include_tags=['starts_code_patterns', 'starts_code_attachments']
)

# ===== STATISTICAL ANALYSIS =====

# Extract conversation lengths for analysis
conversation_lengths = tagger.get_tag_values(tagged_results, 'conversation_length', 'count')
avg_length = sum(conversation_lengths) / len(conversation_lengths)
print(f"Average conversation length: {avg_length:.1f} messages")

# Extract prompt statistics
mean_prompt_lengths = tagger.get_tag_values(tagged_results, 'prompt_stats', 'mean')
overall_avg_prompt = sum(mean_prompt_lengths) / len(mean_prompt_lengths)
print(f"Average prompt length across all conversations: {overall_avg_prompt:.1f} characters")

# Analyze prompt variance
prompt_variances = tagger.get_tag_values(tagged_results, 'prompt_stats', 'variance')
high_variance_count = sum(1 for v in prompt_variances if v > 10000)
print(f"Conversations with highly variable prompts: {high_variance_count}")

# ===== DEBUGGING SPECIFIC CASES =====

# Debug a conversation to see all applied tags
print("\\n=== Debug Example ===")
tagger.debug_conversation(conversations[0])

# Find and analyze the false positive from earlier
problem_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=['canvas_operations'],
    exclude_tags=['coding_assistance']
)
print(f"\\nFound {len(problem_conversations)} canvas conversations that aren't coding-related")

# ===== COMPREHENSIVE FILTERING FOR DOWNSTREAM PROCESSING =====

# Example: Clean conversations for analysis (exclude everything complex)
clean_for_analysis = tagger.filter_by_tags(
    tagged_results,
    exclude_tags=[
        'coding_assistance',           # No coding help
        'coding_assistance_start',     # No coding context
        'enhanced_conversation',       # No gizmos/plugins
        'context_heavy_start',         # No large initial context
        'context_dump',               # No context dumps
        'research_session'            # No research sessions
    ],
    include_tags=[
        {'name': 'conversation_length', 'count': {'gte': 2}},  # At least 2 messages
        {'name': 'prompt_stats', 'mean': {'lt': 1000}}         # Reasonable prompt length
    ]
)

print(f"\\nFiltered to {len(clean_for_analysis)} clean conversations for downstream processing")

# ===== CUSTOM RULES =====

# Add domain-specific rules
tagger.add_base_rule(
    'starts_with_question',
    lambda conv: (get_first_user_message(conv) and 
                 get_first_user_message(conv).get('content', {}).get('text', '').strip().endswith('?')),
    'Conversation starts with a question'
)

tagger.add_supplemental_rule(
    'qa_session',
    lambda conv, tags: (
        any(tag.name == 'starts_with_question' for tag in tags) and
        any(tag.name == 'conversation_length' and 
            tag.attributes.get('category') in ['short', 'medium'] for tag in tags)
    ),
    'Question-and-answer session'
)

# Re-tag with new rules
updated_results = tagger.tag_conversations(conversations)

# ===== DISTRIBUTION ANALYSIS =====

print("\\n=== Tag Distribution Analysis ===")
tag_counts = {}
for result in tagged_results:
    for tag in result['tags']:
        if tag.name not in tag_counts:
            tag_counts[tag.name] = 0
        tag_counts[tag.name] += 1

for tag_name, count in sorted(tag_counts.items(), key=lambda x: x[1], reverse=True):
    percentage = (count / len(tagged_results)) * 100
    print(f"{tag_name}: {count} ({percentage:.1f}%)")

# ===== GIZMO/PLUGIN USAGE ANALYSIS =====

print("\\n=== Gizmo/Plugin Usage ===")
gizmo_usage = {}
plugin_usage = {}

for result in tagged_results:
    for tag in result['tags']:
        if tag.name == 'gizmo':
            gizmo_name = tag.attributes.get('name', 'unknown')
            gizmo_usage[gizmo_name] = gizmo_usage.get(gizmo_name, 0) + 1
        elif tag.name == 'plugin':
            plugin_name = tag.attributes.get('name', 'unknown')
            plugin_usage[plugin_name] = plugin_usage.get(plugin_name, 0) + 1

print("Gizmos:")
for gizmo, count in sorted(gizmo_usage.items(), key=lambda x: x[1], reverse=True):
    print(f"  {gizmo}: {count} conversations")

print("Plugins:")
for plugin, count in sorted(plugin_usage.items(), key=lambda x: x[1], reverse=True):
    print(f"  {plugin}: {count} conversations")

# ===== EXPORT FILTERED DATASETS =====

# Create different filtered datasets for various purposes
datasets = {
    'coding_assistance': tagger.filter_by_tags(tagged_results, include_tags=['coding_assistance']),
    'research_sessions': tagger.filter_by_tags(tagged_results, include_tags=['research_session']),
    'basic_qa': tagger.filter_by_tags(tagged_results, include_tags=['qa_session']),
    'enhanced_features': tagger.filter_by_tags(tagged_results, include_tags=['enhanced_conversation']),
    'clean_conversations': clean_for_analysis,
}

for dataset_name, conversations in datasets.items():
    print(f"{dataset_name}: {len(conversations)} conversations")
    # conversations now contains just the conversation objects you can process further
"""

'\n# Create enhanced tagger with all previously discussed rules\ntagger = create_default_tagger()\ntagged_results = tagger.tag_conversations(conversations)\n\n# Enhanced summary shows both simple and structured tag statistics\ntagger.print_summary(tagged_results)\n\n# ===== BASIC FILTERING (Boolean tags) =====\n\n# Find coding assistance conversations\ncoding_conversations = tagger.filter_by_tags(\n    tagged_results,\n    include_tags=[\'coding_assistance\']\n)\n\n# Exclude coding and enhanced features for clean analysis\nbasic_conversations = tagger.filter_by_tags(\n    tagged_results,\n    exclude_tags=[\'coding_assistance\', \'enhanced_conversation\', \'context_heavy_start\']\n)\n\n# Find research sessions\nresearch_conversations = tagger.filter_by_tags(\n    tagged_results,\n    include_tags=[\'research_session\']\n)\n\n# ===== STRUCTURED TAG FILTERING =====\n\n# Find short conversations (1-3 messages)\nshort_conversations = tagger.filter_by_tags(\n    tagged_results,\n    include

In [10]:
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(convs)
tagger.print_summary(tagged_results)

Tagged 1673 conversations
Tag distribution:
  brief_interaction: 906 (54.2%)
  canvas_operations: 3 (0.2%)
  code_execution: 26 (1.6%)
  code_patterns: 1546 (92.4%)
  coding_assistance: 1551 (92.7%)
  coding_assistance_start: 260 (15.5%)
  complex_analysis: 5 (0.3%)
  consistent_prompts: 896 (53.6%)
  context_dump: 64 (3.8%)
  context_heavy_start: 127 (7.6%)
  conversation_length: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
  evolving_discussion: 247 (14.8%)
  extended_conversation: 304 (18.2%)
  interactive_session: 184 (11.0%)
  large_content: 1362 (81.4%)
  long_prompts: 151 (9.0%)
  prompt_stats: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    mean: avg=1142.4, range=[1.0, 184662.5]
    median: avg=814.2, range=[1.0, 184662.5]
    variance: avg=51563065.2, range=[0, 34065162056.2]
  reasoning: 12 (0.7%)
  research_session: 53 (3.2%)
  short_prompts: 1261 (75.4%)
  starts_code_patterns: 240 (14.3%)
  starts_large_content: 119 (7.1%)
  starts_with_attachments: 8 (0.5%)
  va

In [20]:
# conversation_tagger.py
"""
Enhanced conversation tagging system with structured tags supporting key-value attributes.
Tags can be simple strings or objects with metadata for better analysis and filtering.
"""

from typing import Dict, Any, List, Callable, Set, Union
from collections import defaultdict
import json


class Tag:
    """Represents a tag with optional key-value attributes."""
    
    def __init__(self, name: str, **attributes):
        self.name = name
        self.attributes = attributes
    
    def __str__(self):
        if self.attributes:
            attrs_str = ", ".join(f"{k}={v}" for k, v in self.attributes.items())
            return f"{self.name}({attrs_str})"
        return self.name
    
    def __repr__(self):
        return f"Tag('{self.name}', {self.attributes})"
    
    def __eq__(self, other):
        if isinstance(other, str):
            return self.name == other
        elif isinstance(other, Tag):
            return self.name == other.name and self.attributes == other.attributes
        return False
    
    def __hash__(self):
        return hash((self.name, tuple(sorted(self.attributes.items()))))
    
    def matches(self, name: str, **criteria) -> bool:
        """Check if tag matches name and optional attribute criteria."""
        if self.name != name:
            return False
        
        for key, value in criteria.items():
            if key not in self.attributes:
                return False
            
            attr_value = self.attributes[key]
            
            # Support comparison operators
            if isinstance(value, dict):
                for op, target in value.items():
                    if op == 'gt' and not (attr_value > target):
                        return False
                    elif op == 'gte' and not (attr_value >= target):
                        return False
                    elif op == 'lt' and not (attr_value < target):
                        return False
                    elif op == 'lte' and not (attr_value <= target):
                        return False
                    elif op == 'eq' and not (attr_value == target):
                        return False
                    elif op == 'ne' and not (attr_value != target):
                        return False
                    elif op == 'in' and not (attr_value in target):
                        return False
            else:
                # Direct equality
                if attr_value != value:
                    return False
        
        return True


class ConversationTagger:
    """
    Enhanced tagging system supporting structured tags with attributes.
    """
    
    def __init__(self):
        self.base_rules: Dict[str, Callable] = {}
        self.multi_tag_rules: Dict[str, Callable] = {}
        self.supplemental_rules: Dict[str, Callable] = {}
        self.rule_descriptions: Dict[str, str] = {}
    
    def add_base_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a base tagging rule that returns bool or Tag object."""
        self.base_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def add_multi_tag_rule(self, rule_name: str, rule_function: Callable, description: str = ""):
        """Add a rule that returns multiple tags (strings or Tag objects)."""
        self.multi_tag_rules[rule_name] = rule_function
        self.rule_descriptions[rule_name] = description
    
    def add_supplemental_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a supplemental rule that depends on existing tags."""
        self.supplemental_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def _normalize_tag(self, tag: Union[str, Tag]) -> Tag:
        """Convert string tags to Tag objects."""
        if isinstance(tag, str):
            return Tag(tag)
        return tag
    
    def tag_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply all tagging rules to a conversation."""
        tags = set()
        debug_info = defaultdict(list)
        
        # Apply base rules
        for tag_name, rule_func in self.base_rules.items():
            try:
                result = rule_func(conversation)
                if result:
                    if isinstance(result, bool):
                        tag = Tag(tag_name)
                    else:
                        tag = self._normalize_tag(result)
                    tags.add(tag)
                    debug_info['applied_rules'].append(f"BASE: {tag}")
                else:
                    debug_info['skipped_rules'].append(f"BASE: {tag_name}")
            except Exception as e:
                debug_info['errors'].append(f"BASE: {tag_name} - {str(e)}")
        
        # Apply multi-tag rules
        for rule_name, rule_func in self.multi_tag_rules.items():
            try:
                new_tags = rule_func(conversation)
                if new_tags:
                    normalized_tags = [self._normalize_tag(tag) for tag in new_tags]
                    tags.update(normalized_tags)
                    debug_info['applied_rules'].append(f"MULTI: {rule_name} -> {[str(t) for t in normalized_tags]}")
                else:
                    debug_info['skipped_rules'].append(f"MULTI: {rule_name}")
            except Exception as e:
                debug_info['errors'].append(f"MULTI: {rule_name} - {str(e)}")
        
        # Apply supplemental rules
        max_iterations = 5
        for iteration in range(max_iterations):
            initial_tag_count = len(tags)
            
            for tag_name, rule_func in self.supplemental_rules.items():
                # Check if tag already exists
                if any(tag.name == tag_name for tag in tags):
                    continue
                    
                try:
                    result = rule_func(conversation, tags)
                    if result:
                        if isinstance(result, bool):
                            tag = Tag(tag_name)
                        else:
                            tag = self._normalize_tag(result)
                        tags.add(tag)
                        debug_info['applied_rules'].append(f"SUPP: {tag} (iter {iteration})")
                    else:
                        debug_info['skipped_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                except Exception as e:
                    debug_info['errors'].append(f"SUPP: {tag_name} - {str(e)}")
            
            if len(tags) == initial_tag_count:
                break
        
        return {
            'conversation_id': conversation.get('conversation_id', conversation.get('id', 'unknown')),
            'title': conversation.get('title', 'Untitled'),
            'tags': list(tags),
            'debug_info': dict(debug_info),
            'conversation': conversation
        }
    
    def tag_conversations(self, conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Tag multiple conversations."""
        return [self.tag_conversation(conv) for conv in conversations]
    
    def filter_by_tags(self, tagged_conversations: List[Dict[str, Any]], 
                      include_tags: List[Union[str, Dict]] = None,
                      exclude_tags: List[Union[str, Dict]] = None) -> List[Dict[str, Any]]:
        """
        Filter conversations by tags with attribute support.
        
        Args:
            include_tags: List of tag names or dicts with criteria
                Examples: ['web_search', {'name': 'gizmo', 'type': 'dalle'}]
            exclude_tags: Similar format for exclusions
        """
        filtered = []
        
        for tagged_conv in tagged_conversations:
            tags = tagged_conv['tags']
            
            # Check exclusions first
            if exclude_tags:
                should_exclude = False
                for exclude_criterion in exclude_tags:
                    if self._matches_criterion(tags, exclude_criterion):
                        should_exclude = True
                        break
                if should_exclude:
                    continue
            
            # Check inclusions
            if include_tags:
                should_include = True
                for include_criterion in include_tags:
                    if not self._matches_criterion(tags, include_criterion):
                        should_include = False
                        break
                if not should_include:
                    continue
            
            filtered.append(tagged_conv)
        
        return filtered
    
    def _matches_criterion(self, tags: List[Tag], criterion: Union[str, Dict]) -> bool:
        """Check if any tag matches the given criterion."""
        if isinstance(criterion, str):
            return any(tag.name == criterion for tag in tags)
        
        elif isinstance(criterion, dict):
            name = criterion.get('name')
            if not name:
                return False
            
            criteria = {k: v for k, v in criterion.items() if k != 'name'}
            return any(tag.matches(name, **criteria) for tag in tags)
        
        return False
    
    def get_tag_values(self, tagged_conversations: List[Dict[str, Any]], 
                      tag_name: str, attribute: str) -> List[Any]:
        """Extract attribute values from tags across conversations."""
        values = []
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == tag_name and attribute in tag.attributes:
                    values.append(tag.attributes[attribute])
        return values
    
    def print_summary(self, tagged_conversations: List[Dict[str, Any]], show_details: bool = True):
        """Print comprehensive summary with all tag types and optional details."""
        total = len(tagged_conversations)
        tag_counts = defaultdict(int)
        tag_attributes = defaultdict(lambda: defaultdict(list))
        unique_structured_tags = defaultdict(set)
        
        # Collect all tag information
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                tag_counts[tag.name] += 1
                
                # Collect attribute information
                for attr_name, attr_value in tag.attributes.items():
                    if isinstance(attr_value, (int, float)):
                        tag_attributes[tag.name][attr_name].append(attr_value)
                    else:
                        # For non-numeric attributes, track unique values
                        unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
        
        print(f"Tagged {total} conversations")
        print(f"\n=== TAG SUMMARY ===")
        
        # Sort tags by frequency for better readability
        sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
        
        for tag_name, count in sorted_tags:
            percentage = (count / total) * 100
            print(f"{tag_name}: {count} ({percentage:.1f}%)")
            
            if show_details:
                # Show numeric attribute statistics
                if tag_name in tag_attributes:
                    for attr_name, values in tag_attributes[tag_name].items():
                        if values:
                            avg_val = sum(values) / len(values)
                            min_val = min(values)
                            max_val = max(values)
                            print(f"    {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                
                # Show unique structured values for non-numeric attributes
                if tag_name in unique_structured_tags:
                    unique_vals = sorted(unique_structured_tags[tag_name])
                    if len(unique_vals) <= 10:  # Show all if not too many
                        print(f"    values: {', '.join(unique_vals)}")
                    else:  # Show top 10 most common
                        print(f"    values: {', '.join(unique_vals[:10])} ... (+{len(unique_vals)-10} more)")
    
    def print_detailed_breakdown(self, tagged_conversations: List[Dict[str, Any]]):
        """Print detailed breakdown of specific tag types."""
        total = len(tagged_conversations)
        
        print(f"\n=== DETAILED BREAKDOWN ===")
        
        # Gizmo usage breakdown
        gizmo_counts = defaultdict(int)
        plugin_counts = defaultdict(int)
        
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == 'gizmo':
                    gizmo_id = tag.attributes.get('gizmo_id', 'unknown')
                    gizmo_counts[gizmo_id] += 1
                elif tag.name == 'plugin':
                    plugin_id = tag.attributes.get('plugin_id', 'unknown')
                    plugin_counts[plugin_id] += 1
        
        if gizmo_counts:
            print("\nGizmo Usage:")
            for gizmo_id, count in sorted(gizmo_counts.items(), key=lambda x: x[1], reverse=True):
                percentage = (count / total) * 100
                print(f"  {gizmo_id}: {count} ({percentage:.1f}%)")
        else:
            print("\nGizmo Usage: None detected")
        
        if plugin_counts:
            print("\nPlugin Usage:")
            for plugin_id, count in sorted(plugin_counts.items(), key=lambda x: x[1], reverse=True):
                percentage = (count / total) * 100
                print(f"  {plugin_id}: {count} ({percentage:.1f}%)")
        else:
            print("\nPlugin Usage: None detected")
        
        # Conversation length distribution
        length_distribution = defaultdict(int)
        prompt_length_distribution = defaultdict(int)
        
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == 'conversation_length':
                    category = tag.attributes.get('category', 'unknown')
                    length_distribution[category] += 1
                elif tag.name == 'prompt_stats':
                    length_cat = tag.attributes.get('length_category', 'unknown')
                    prompt_length_distribution[length_cat] += 1
        
        if length_distribution:
            print("\nConversation Length Distribution:")
            for category, count in sorted(length_distribution.items(), key=lambda x: x[1], reverse=True):
                percentage = (count / total) * 100
                print(f"  {category}: {count} ({percentage:.1f}%)")
        
        if prompt_length_distribution:
            print("\nPrompt Length Distribution:")
            for category, count in sorted(prompt_length_distribution.items(), key=lambda x: x[1], reverse=True):
                percentage = (count / total) * 100
                print(f"  {category}: {count} ({percentage:.1f}%)")
    
    def debug_tag_creation(self, conversations: List[Dict[str, Any]], max_conversations: int = 5):
        """Debug tag creation process for the first few conversations."""
        print(f"\n=== TAG CREATION DEBUG (first {max_conversations} conversations) ===")
        
        for i, conv in enumerate(conversations[:max_conversations]):
            print(f"\n--- Conversation {i+1}: {conv.get('title', 'Untitled')[:50]}... ---")
            result = self.tag_conversation(conv)
            
            print("Tags created:")
            for tag in result['tags']:
                if tag.attributes:
                    attrs = ', '.join(f"{k}={v}" for k, v in tag.attributes.items())
                    print(f"  {tag.name}({attrs})")
                else:
                    print(f"  {tag.name}")
            
            # Show applied rules
            applied_rules = result['debug_info'].get('applied_rules', [])
            if applied_rules:
                print("Applied rules:")
                for rule in applied_rules:
                    print(f"  ✅ {rule}")
            
            # Show any errors
            errors = result['debug_info'].get('errors', [])
            if errors:
                print("Errors:")
                for error in errors:
                    print(f"  ⚠️  {error}")
    
    def validate_tag_system(self, tagged_conversations: List[Dict[str, Any]]):
        """Validate that the tag system is working correctly."""
        print(f"\n=== TAG SYSTEM VALIDATION ===")
        
        # Check for conversations with no tags
        no_tags = [conv for conv in tagged_conversations if not conv['tags']]
        print(f"Conversations with no tags: {len(no_tags)}")
        
        # Check for missing expected structured tags
        missing_length = [conv for conv in tagged_conversations 
                         if not any(tag.name == 'conversation_length' for tag in conv['tags'])]
        print(f"Conversations missing conversation_length tag: {len(missing_length)}")
        
        missing_stats = [conv for conv in tagged_conversations 
                        if not any(tag.name == 'prompt_stats' for tag in conv['tags'])]
        print(f"Conversations missing prompt_stats tag: {len(missing_stats)}")
        
        # Count rule types that fired
        rule_types = defaultdict(int)
        for conv in tagged_conversations:
            for rule_info in conv['debug_info'].get('applied_rules', []):
                rule_type = rule_info.split(':')[0]
                rule_types[rule_type] += 1
        
        print(f"\nRule type activity:")
        for rule_type, count in sorted(rule_types.items()):
            print(f"  {rule_type}: {count} instances")
        
        # Check for gizmo/plugin detection specifically
        gizmo_plugin_conversations = []
        for conv in tagged_conversations:
            has_gizmo_plugin = any(tag.name in ['gizmo', 'plugin'] for tag in conv['tags'])
            if has_gizmo_plugin:
                gizmo_plugin_conversations.append(conv)
        
        print(f"\nConversations with gizmo/plugin tags: {len(gizmo_plugin_conversations)}")
        
        if len(gizmo_plugin_conversations) == 0:
            print("⚠️  No gizmo/plugin usage detected. This might indicate:")
            print("   - No conversations actually used gizmos/plugins")
            print("   - The gizmo/plugin detection logic needs debugging")
            print("   - Check raw conversation data for gizmo_id/plugin_ids fields")
    
    def debug_conversation(self, conversation: Dict[str, Any]):
        """Debug a single conversation with enhanced tag details."""
        result = self.tag_conversation(conversation)
        
        print(f"Conversation: {result['title'][:50]}...")
        print(f"Tags applied:")
        for tag in result['tags']:
            print(f"  {tag}")
        
        print(f"\nRule details:")
        for rule in result['debug_info'].get('applied_rules', []):
            print(f"  ✅ {rule}")
        
        for error in result['debug_info'].get('errors', []):
            print(f"  ⚠️  ERROR: {error}")


# Enhanced rule functions for all previously discussed tagging rules

def get_all_user_messages(conversation: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Get all user messages in chronological order."""
    mapping = conversation.get('mapping', {})
    user_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        author = message.get('author', {})
        if author.get('role') == 'user':
            create_time = message.get('create_time') or 0
            user_messages.append((create_time, message))
    
    user_messages.sort(key=lambda x: x[0])
    return [msg for _, msg in user_messages]


def get_first_user_message(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """Find the first user message in the conversation."""
    user_messages = get_all_user_messages(conversation)
    return user_messages[0] if user_messages else None


def create_conversation_length_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for conversation length."""
    user_count = len(get_all_user_messages(conversation))
    
    # Determine category
    if user_count == 1:
        category = 'single'
    elif user_count <= 3:
        category = 'short'
    elif user_count <= 10:
        category = 'medium'
    elif user_count <= 25:
        category = 'long'
    else:
        category = 'very_long'
    
    return Tag('conversation_length', count=user_count, category=category)


def create_prompt_stats_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for prompt statistics."""
    user_messages = get_all_user_messages(conversation)
    
    if not user_messages:
        return Tag('prompt_stats', count=0, mean=0, median=0, variance=0, 
                  length_category='none', consistency='none')
    
    # Calculate message lengths
    lengths = []
    for message in user_messages:
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        lengths.append(len(all_text))
    
    # Calculate statistics
    mean_length = sum(lengths) / len(lengths)
    sorted_lengths = sorted(lengths)
    n = len(sorted_lengths)
    median_length = (sorted_lengths[n//2] if n % 2 == 1 
                    else (sorted_lengths[n//2-1] + sorted_lengths[n//2]) / 2)
    variance = sum((x - mean_length) ** 2 for x in lengths) / len(lengths) if len(lengths) > 1 else 0
    
    # Determine categories
    if mean_length < 50:
        length_category = 'very_short'
    elif mean_length < 200:
        length_category = 'short'
    elif mean_length < 1000:
        length_category = 'medium'
    elif mean_length < 3000:
        length_category = 'long'
    else:
        length_category = 'very_long'
    
    if variance < 1000:
        consistency = 'consistent'
    elif variance < 10000:
        consistency = 'mixed'
    else:
        consistency = 'variable'
    
    return Tag('prompt_stats', 
               count=len(lengths),
               mean=round(mean_length, 1),
               median=round(median_length, 1),
               variance=round(variance, 1),
               length_category=length_category,
               consistency=consistency)


def create_gizmo_plugin_tags(conversation: Dict[str, Any]) -> List[Tag]:
    """Create structured tags for gizmos and plugins."""
    tags = []
    gizmos = set()
    plugins = set()
    
    # Check conversation-level
    if conversation.get('gizmo_id'):
        gizmos.add(conversation['gizmo_id'])
    
    plugin_ids = conversation.get('plugin_ids', [])
    if plugin_ids:
        plugins.update(plugin_ids)
    
    # Check message-level
    mapping = conversation.get('mapping', {})
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        
        # Invoked plugins
        invoked_plugin = metadata.get('invoked_plugin', {})
        if invoked_plugin:
            if invoked_plugin.get('plugin_id'):
                plugins.add(invoked_plugin['plugin_id'])
            if invoked_plugin.get('namespace'):
                plugins.add(invoked_plugin['namespace'])
        
        # Gizmo usage
        if metadata.get('gizmo_id'):
            gizmos.add(metadata['gizmo_id'])
    
    # Create tags - FIX: Use different attribute name to avoid conflict
    for gizmo in gizmos:
        tags.append(Tag('gizmo', gizmo_id=gizmo))
    
    for plugin in plugins:
        tags.append(Tag('plugin', plugin_id=plugin))
    
    return tags


# Boolean rule functions for basic content analysis
def has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if conversation has unusually large content anywhere."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        text = content.get('text', '')
        if len(text) > min_length:
            return True
            
        parts = content.get('parts', [])
        for part in parts:
            if isinstance(part, str) and len(part) > min_length:
                return True
    
    return False


def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for clear code patterns anywhere in conversation."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Strong code indicators
        code_indicators = [
            '```',  # Code blocks
            'def ', 'function ', 'class ',  # Function/class definitions
            'import ', 'from ', 'require(',  # Import statements
            '#!/bin/', '#include', 'using namespace',  # Script headers
        ]
        
        if any(indicator in all_text for indicator in code_indicators):
            return True
            
        # Also check for high density of coding keywords
        coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
        keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
        if len(all_text) > 1000 and keyword_count >= 3:  # Multiple coding keywords in large text suggest actual code
            return True
    
    return False


def has_github_repos(conversation: Dict[str, Any]) -> bool:
    """Check if GitHub repositories were selected for context."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        repos = metadata.get('selected_github_repos', [])
        if repos:  # Non-empty list
            return True
    
    return False


def has_canvas_operations(conversation: Dict[str, Any]) -> bool:
    """Check for canvas/document operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('canvas'):
            return True
    
    return False


def has_web_search(conversation: Dict[str, Any]) -> bool:
    """Check for web search operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('search_queries') or 
            metadata.get('search_result_groups') or
            metadata.get('content_references')):
            return True
    
    return False


def has_reasoning_thoughts(conversation: Dict[str, Any]) -> bool:
    """Check for reasoning/thinking patterns."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        if content.get('thoughts'):  # Reasoning thoughts
            return True
    
    return False


def has_code_execution(conversation: Dict[str, Any]) -> bool:
    """Check for code execution artifacts."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('aggregate_result') or 
            metadata.get('jupyter_messages')):
            return True
    
    return False


# First user message specific rules
def first_user_has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if the first user message has large content."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    if len(text) > min_length:
        return True
        
    parts = content.get('parts', [])
    for part in parts:
        if isinstance(part, str) and len(part) > min_length:
            return True
    
    return False


def first_user_has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message contains code patterns."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    parts = content.get('parts', [])
    all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
    
    # Strong code indicators
    code_indicators = [
        '```',  # Code blocks
        'def ', 'function ', 'class ',  # Definitions
        'import ', 'from ', 'require(',  # Imports
        '#!/bin/', '#include',  # Script headers
    ]
    
    return any(indicator in all_text for indicator in code_indicators)


def first_user_has_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    return len(attachments) > 0


def first_user_has_code_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has code-related attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    
    for attachment in attachments:
        mime_type = attachment.get('mime_type', '').lower()
        name = attachment.get('name', '').lower()
        
        # Check for code file extensions
        code_extensions = ['.py', '.js', '.java', '.cpp', '.c', '.go', '.rs', '.ts', '.jsx', '.tsx', '.sql', '.sh', '.rb', '.php']
        if any(ext in name for ext in code_extensions):
            return True
            
        # Check for code-related MIME types
        code_mimes = ['text/x-python', 'text/x-java', 'application/javascript', 'text/x-script']
        if any(mime in mime_type for mime in code_mimes):
            return True
    
    return False


def create_default_tagger() -> ConversationTagger:
    """Create a tagger with all previously discussed rules in the enhanced structured framework."""
    tagger = ConversationTagger()
    
    # ===== BASIC CONTENT ANALYSIS RULES (Boolean) =====
    tagger.add_base_rule(
        'large_content', 
        lambda conv: has_large_content(conv, 2000),
        'Content longer than 2000 characters anywhere in conversation'
    )
    
    tagger.add_base_rule(
        'code_patterns', 
        has_code_patterns,
        'Contains clear code patterns (```, def, function, etc.)'
    )
    
    tagger.add_base_rule(
        'github_context',
        has_github_repos,
        'GitHub repositories selected for context'
    )
    
    tagger.add_base_rule(
        'canvas_operations',
        has_canvas_operations,
        'Uses canvas/document features'
    )
    
    tagger.add_base_rule(
        'web_search',
        has_web_search,
        'Includes web search functionality'
    )
    
    tagger.add_base_rule(
        'reasoning',
        has_reasoning_thoughts,
        'Contains reasoning/thinking content'
    )
    
    tagger.add_base_rule(
        'code_execution',
        has_code_execution,
        'Contains code execution (Jupyter, aggregate results)'
    )
    
    # ===== FIRST USER MESSAGE RULES (Boolean) =====
    tagger.add_base_rule(
        'starts_large_content',
        lambda conv: first_user_has_large_content(conv, 2000),
        'First user message has large content (>2000 chars)'
    )
    
    tagger.add_base_rule(
        'starts_code_patterns',
        first_user_has_code_patterns,
        'First user message contains code patterns'
    )
    
    tagger.add_base_rule(
        'starts_with_attachments',
        first_user_has_attachments,
        'First user message has any attachments'
    )
    
    tagger.add_base_rule(
        'starts_code_attachments',
        first_user_has_code_attachments,
        'First user message has code-related attachments'
    )
    
    # ===== STRUCTURED TAG RULES =====
    tagger.add_base_rule(
        'conversation_length',
        create_conversation_length_tag,
        'Conversation length with count and category'
    )
    
    tagger.add_base_rule(
        'prompt_stats',
        create_prompt_stats_tag,
        'User message statistics (length, variance, etc.)'
    )
    
    tagger.add_multi_tag_rule(
        'gizmo_plugin_usage',
        create_gizmo_plugin_tags,
        'Specific gizmos and plugins used in conversation'
    )
    
    # ===== SUPPLEMENTAL RULES (Based on existing tags) =====
    
    # Coding assistance detection
    tagger.add_supplemental_rule(
        'coding_assistance',
        lambda conv, tags: (
            any(tag.name in ['code_patterns', 'github_context', 'code_execution'] for tag in tags)
        ),
        'Likely coding assistance (code patterns, GitHub, or execution)'
    )
    
    tagger.add_supplemental_rule(
        'coding_assistance_start',
        lambda conv, tags: (
            any(tag.name in ['starts_code_patterns', 'starts_large_content', 
                           'starts_with_attachments', 'starts_code_attachments'] for tag in tags)
        ),
        'Likely coding assistance based on how conversation starts'
    )
    
    # Research and analysis patterns  
    tagger.add_supplemental_rule(
        'research_session',
        lambda conv, tags: (
            any(tag.name == 'web_search' for tag in tags) and 
            any(tag.name == 'large_content' for tag in tags)
        ),
        'Research session (web search + substantial content)'
    )
    
    tagger.add_supplemental_rule(
        'complex_analysis',
        lambda conv, tags: (
            any(tag.name == 'reasoning' for tag in tags) and 
            len([tag for tag in tags if tag.name in ['web_search', 'large_content', 'canvas_operations']]) >= 2
        ),
        'Complex analysis (reasoning + multiple advanced features)'
    )
    
    # Context and interaction patterns
    tagger.add_supplemental_rule(
        'context_heavy_start',
        lambda conv, tags: (
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Conversation starts with substantial context'
    )
    
    tagger.add_supplemental_rule(
        'enhanced_conversation',
        lambda conv, tags: (
            any(tag.name in ['gizmo', 'plugin'] for tag in tags)
        ),
        'Uses enhanced features (gizmos or plugins)'
    )
    
    # Length-based classifications using structured tags
    tagger.add_supplemental_rule(
        'brief_interaction',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags)
        ),
        'Brief interaction (1-3 user messages)'
    )
    
    tagger.add_supplemental_rule(
        'extended_conversation',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['long', 'very_long'] for tag in tags)
        ),
        'Extended conversation (11+ user messages)'
    )
    
    # Prompt pattern classifications using structured tags
    tagger.add_supplemental_rule(
        'long_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['long', 'very_long'] for tag in tags)
        ),
        'Conversation has consistently long prompts'
    )
    
    tagger.add_supplemental_rule(
        'short_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['very_short', 'short'] for tag in tags)
        ),
        'Conversation has consistently short prompts'
    )
    
    tagger.add_supplemental_rule(
        'consistent_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'User prompts are consistent in length'
    )
    
    tagger.add_supplemental_rule(
        'variable_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'User prompts vary significantly in length'
    )
    
    # Combined patterns for specific use cases
    tagger.add_supplemental_rule(
        'context_dump',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags) and
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Short conversation starting with large context (likely context dump)'
    )
    
    tagger.add_supplemental_rule(
        'interactive_session',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'Extended back-and-forth conversation with consistent prompt style'
    )
    
    tagger.add_supplemental_rule(
        'evolving_discussion',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'Extended conversation where prompt style evolves'
    )
    
    return tagger


# Enhanced debugging and analysis examples
"""
# Create enhanced tagger with all rules
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(conversations)

# ===== COMPREHENSIVE SUMMARY WITH DEBUGGING =====

# Basic summary (shows all tags with counts and percentages)
tagger.print_summary(tagged_results)

# Detailed breakdown (shows gizmo/plugin specifics, distributions)
tagger.print_detailed_breakdown(tagged_results)

# Debug tag creation for first few conversations
tagger.debug_tag_creation(conversations, max_conversations=3)

# Validate the tag system is working correctly
tagger.validate_tag_system(tagged_results)

# ===== INVESTIGATE MISSING GIZMO/PLUGIN TAGS =====

# If no gizmo/plugin tags show up, let's debug why
print("\\n=== INVESTIGATING GIZMO/PLUGIN DETECTION ===")

# Check raw data for gizmo/plugin fields in first few conversations
for i, conv in enumerate(conversations[:5]):
    print(f"\\nConversation {i+1}: {conv.get('title', 'Untitled')[:50]}...")
    
    # Check conversation-level gizmo/plugin fields
    print(f"  gizmo_id: {conv.get('gizmo_id')}")
    print(f"  plugin_ids: {conv.get('plugin_ids')}")
    
    # Check message-level fields
    mapping = conv.get('mapping', {})
    gizmo_messages = []
    plugin_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('gizmo_id'):
            gizmo_messages.append(metadata['gizmo_id'])
        if metadata.get('invoked_plugin'):
            plugin_messages.append(metadata['invoked_plugin'])
    
    if gizmo_messages:
        print(f"  message-level gizmos: {gizmo_messages}")
    if plugin_messages:
        print(f"  message-level plugins: {plugin_messages}")
    
    if not any([conv.get('gizmo_id'), conv.get('plugin_ids'), gizmo_messages, plugin_messages]):
        print(f"  ✅ No gizmo/plugin usage detected in this conversation")

# Test the gizmo/plugin detection function directly
print("\\n=== TESTING GIZMO/PLUGIN DETECTION FUNCTION ===")
for i, conv in enumerate(conversations[:3]):
    print(f"\\nConversation {i+1}:")
    gizmo_plugin_tags = create_gizmo_plugin_tags(conv)
    if gizmo_plugin_tags:
        print(f"  Generated tags: {[str(tag) for tag in gizmo_plugin_tags]}")
    else:
        print(f"  No gizmo/plugin tags generated")

# ===== ADD CUSTOM DEBUGGING RULES =====

# Add a rule to detect any field that might contain gizmo/plugin info
def debug_gizmo_plugin_fields(conversation: Dict[str, Any]) -> List[str]:
    \"\"\"Debug function to find any gizmo/plugin-related fields.\"\"\"
    found_fields = []
    
    # Check top-level fields
    for key, value in conversation.items():
        if 'gizmo' in key.lower() or 'plugin' in key.lower():
            found_fields.append(f"top_level.{key}={value}")
    
    # Check message metadata
    mapping = conversation.get('mapping', {})
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        for key, value in metadata.items():
            if 'gizmo' in key.lower() or 'plugin' in key.lower():
                found_fields.append(f"metadata.{key}={value}")
    
    return found_fields

# Test field detection
print("\\n=== SEARCHING FOR ANY GIZMO/PLUGIN FIELDS ===")
for i, conv in enumerate(conversations[:10]):
    fields = debug_gizmo_plugin_fields(conv)
    if fields:
        print(f"Conversation {i+1}: {fields}")

# ===== ENHANCED SUMMARY USAGE =====

# Quick summary without details
tagger.print_summary(tagged_results, show_details=False)

# Full detailed analysis
print("\\n" + "="*60)
print("FULL DETAILED ANALYSIS")
print("="*60)

tagger.print_summary(tagged_results, show_details=True)
tagger.print_detailed_breakdown(tagged_results)
tagger.validate_tag_system(tagged_results)

# ===== CONTINUOUS MONITORING =====

# Function to monitor tag distribution changes when you add new rules
def compare_tag_distributions(old_results, new_results):
    \"\"\"Compare tag distributions between two runs.\"\"\"
    old_counts = defaultdict(int)
    new_counts = defaultdict(int)
    
    for result in old_results:
        for tag in result['tags']:
            old_counts[tag.name] += 1
    
    for result in new_results:
        for tag in result['tags']:
            new_counts[tag.name] += 1
    
    all_tags = set(old_counts.keys()) | set(new_counts.keys())
    
    print("\\n=== TAG DISTRIBUTION CHANGES ===")
    for tag in sorted(all_tags):
        old_count = old_counts[tag]
        new_count = new_counts[tag]
        if old_count != new_count:
            change = new_count - old_count
            print(f"{tag}: {old_count} → {new_count} ({change:+d})")

# Usage when you add new rules:
# old_results = tagger.tag_conversations(conversations)
# tagger.add_base_rule('new_rule', some_function)
# new_results = tagger.tag_conversations(conversations)
# compare_tag_distributions(old_results, new_results)

# ===== SPECIFIC ISSUE INVESTIGATION =====

# If you suspect certain conversations should have gizmo/plugin tags but don't:
def investigate_specific_conversation(conversation_id_or_title):
    \"\"\"Deep dive into a specific conversation.\"\"\"
    target_conv = None
    for conv in conversations:
        if (conv.get('conversation_id') == conversation_id_or_title or 
            conversation_id_or_title.lower() in conv.get('title', '').lower()):
            target_conv = conv
            break
    
    if not target_conv:
        print(f"Conversation '{conversation_id_or_title}' not found")
        return
    
    print(f"\\n=== INVESTIGATING: {target_conv.get('title', 'Untitled')} ===")
    
    # Full conversation structure
    print("\\nConversation structure:")
    print(f"  Title: {target_conv.get('title')}")
    print(f"  ID: {target_conv.get('conversation_id')}")
    print(f"  Top-level gizmo_id: {target_conv.get('gizmo_id')}")
    print(f"  Top-level plugin_ids: {target_conv.get('plugin_ids')}")
    
    # Message analysis
    mapping = target_conv.get('mapping', {})
    print(f"\\nMessage mapping has {len(mapping)} nodes")
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if message:
            metadata = message.get('metadata', {})
            gizmo_fields = {k: v for k, v in metadata.items() if 'gizmo' in k.lower()}
            plugin_fields = {k: v for k, v in metadata.items() if 'plugin' in k.lower()}
            
            if gizmo_fields or plugin_fields:
                print(f"  Message {node_id[:8]}...")
                if gizmo_fields:
                    print(f"    Gizmo fields: {gizmo_fields}")
                if plugin_fields:
                    print(f"    Plugin fields: {plugin_fields}")
    
    # Test tagging
    result = tagger.tag_conversation(target_conv)
    print(f"\\nGenerated tags:")
    for tag in result['tags']:
        print(f"  {tag}")
    
    # Show debug info
    print(f"\\nDebug info:")
    for rule in result['debug_info'].get('applied_rules', []):
        print(f"  ✅ {rule}")
    for error in result['debug_info'].get('errors', []):
        print(f"  ⚠️  {error}")

# Example usage:
# investigate_specific_conversation("some conversation title")
# investigate_specific_conversation("conv-id-12345")
"""

'\n# Create enhanced tagger with all rules\ntagger = create_default_tagger()\ntagged_results = tagger.tag_conversations(conversations)\n\n# ===== COMPREHENSIVE SUMMARY WITH DEBUGGING =====\n\n# Basic summary (shows all tags with counts and percentages)\ntagger.print_summary(tagged_results)\n\n# Detailed breakdown (shows gizmo/plugin specifics, distributions)\ntagger.print_detailed_breakdown(tagged_results)\n\n# Debug tag creation for first few conversations\ntagger.debug_tag_creation(conversations, max_conversations=3)\n\n# Validate the tag system is working correctly\ntagger.validate_tag_system(tagged_results)\n\n# ===== INVESTIGATE MISSING GIZMO/PLUGIN TAGS =====\n\n# If no gizmo/plugin tags show up, let\'s debug why\nprint("\\n=== INVESTIGATING GIZMO/PLUGIN DETECTION ===")\n\n# Check raw data for gizmo/plugin fields in first few conversations\nfor i, conv in enumerate(conversations[:5]):\n    print(f"\\nConversation {i+1}: {conv.get(\'title\', \'Untitled\')[:50]}...")\n\n    # Check

In [21]:
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(convs)
tagger.print_summary(tagged_results)

Tagged 1673 conversations

=== TAG SUMMARY ===
prompt_stats: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    mean: avg=1142.4, range=[1.0, 184662.5]
    median: avg=814.2, range=[1.0, 184662.5]
    variance: avg=51563065.2, range=[0, 34065162056.2]
    values: consistency=consistent, consistency=mixed, consistency=variable, length_category=long, length_category=medium, length_category=short, length_category=very_long, length_category=very_short
conversation_length: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    values: category=long, category=medium, category=short, category=single, category=very_long
coding_assistance: 1551 (92.7%)
code_patterns: 1546 (92.4%)
large_content: 1362 (81.4%)
short_prompts: 1261 (75.4%)
brief_interaction: 906 (54.2%)
consistent_prompts: 896 (53.6%)
enhanced_conversation: 681 (40.7%)
gizmo: 676 (40.4%)
    values: gizmo_id=g-IibMsD7w8, gizmo_id=g-KpF6lTka3, gizmo_id=g-QsUj0Smzg, gizmo_id=g-WiEAUBGzb, gizmo_id=g-bWPVPw7oK, gizmo_id=g-pYtHuQdGh
vari

In [16]:

# If no gizmo/plugin tags show up, let's debug why
print("\\n=== INVESTIGATING GIZMO/PLUGIN DETECTION ===")

# Check raw data for gizmo/plugin fields in first few conversations
for i, conv in enumerate(convs[:25]):
    print(f"\\nConversation {i+1}: {conv.get('title', 'Untitled')[:50]}...")
    
    # Check conversation-level gizmo/plugin fields
    print(f"  gizmo_id: {conv.get('gizmo_id')}")
    print(f"  plugin_ids: {conv.get('plugin_ids')}")
    
    # Check message-level fields
    mapping = conv.get('mapping', {})
    gizmo_messages = []
    plugin_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('gizmo_id'):
            gizmo_messages.append(metadata['gizmo_id'])
        if metadata.get('invoked_plugin'):
            plugin_messages.append(metadata['invoked_plugin'])
    
    if gizmo_messages:
        print(f"  message-level gizmos: {gizmo_messages}")
    if plugin_messages:
        print(f"  message-level plugins: {plugin_messages}")
    
    if not any([conv.get('gizmo_id'), conv.get('plugin_ids'), gizmo_messages, plugin_messages]):
        print(f"  ✅ No gizmo/plugin usage detected in this conversation")


\n=== INVESTIGATING GIZMO/PLUGIN DETECTION ===
\nConversation 1: SCOTUS Justices Info...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConversation 2: MOTU Meaning Explained...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConversation 3: Poker Face Glasses Inquiry...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConversation 4: OPM Proposed Rule Summary...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConversation 5: Post Tracking Fixes...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConversation 6: Reddit Data Pipeline...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConversation 7: JAX SLURM Setup...
  gizmo_id: None
  plugin_ids: None
  ✅ No gizmo/plugin usage detected in this conversation
\nConver

In [17]:
conversations = convs

In [18]:
# Quick test of gizmo detection
test_conv = next(conv for conv in conversations if conv.get('gizmo_id'))
print(f"Testing: {test_conv.get('title')}")
print(f"Gizmo ID: {test_conv.get('gizmo_id')}")

try:
    tags = create_gizmo_plugin_tags(test_conv)
    print(f"Function returned: {tags}")
    print(f"Tag types: {[type(tag) for tag in tags]}")
except Exception as e:
    print(f"Exception in create_gizmo_plugin_tags: {e}")
    import traceback
    traceback.print_exc()

Testing: Salami Tactics Overview
Gizmo ID: g-IibMsD7w8
Exception in create_gizmo_plugin_tags: Tag.__init__() got multiple values for argument 'name'


Traceback (most recent call last):
  File "/var/folders/l6/nkl_x29x4n37bxnfcr7rt68m0000gn/T/ipykernel_43229/744364887.py", line 7, in <module>
    tags = create_gizmo_plugin_tags(test_conv)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/l6/nkl_x29x4n37bxnfcr7rt68m0000gn/T/ipykernel_43229/3884093179.py", line 578, in create_gizmo_plugin_tags
    tags.append(Tag('gizmo', name=gizmo))
                ^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Tag.__init__() got multiple values for argument 'name'


In [22]:
# enhanced_conversation_tagger.py
"""
Enhanced conversation tagging system with faceting capabilities.
Allows analysis of tag distributions across different facets (gizmos, conversation types, etc.).
"""

from typing import Dict, Any, List, Callable, Set, Union, Optional, Tuple
from collections import defaultdict
import json


class Tag:
    """Represents a tag with optional key-value attributes."""
    
    def __init__(self, name: str, **attributes):
        self.name = name
        self.attributes = attributes
    
    def __str__(self):
        if self.attributes:
            attrs_str = ", ".join(f"{k}={v}" for k, v in self.attributes.items())
            return f"{self.name}({attrs_str})"
        return self.name
    
    def __repr__(self):
        return f"Tag('{self.name}', {self.attributes})"
    
    def __eq__(self, other):
        if isinstance(other, str):
            return self.name == other
        elif isinstance(other, Tag):
            return self.name == other.name and self.attributes == other.attributes
        return False
    
    def __hash__(self):
        return hash((self.name, tuple(sorted(self.attributes.items()))))
    
    def matches(self, name: str, **criteria) -> bool:
        """Check if tag matches name and optional attribute criteria."""
        if self.name != name:
            return False
        
        for key, value in criteria.items():
            if key not in self.attributes:
                return False
            
            attr_value = self.attributes[key]
            
            # Support comparison operators
            if isinstance(value, dict):
                for op, target in value.items():
                    if op == 'gt' and not (attr_value > target):
                        return False
                    elif op == 'gte' and not (attr_value >= target):
                        return False
                    elif op == 'lt' and not (attr_value < target):
                        return False
                    elif op == 'lte' and not (attr_value <= target):
                        return False
                    elif op == 'eq' and not (attr_value == target):
                        return False
                    elif op == 'ne' and not (attr_value != target):
                        return False
                    elif op == 'in' and not (attr_value in target):
                        return False
            else:
                # Direct equality
                if attr_value != value:
                    return False
        
        return True


class ConversationTagger:
    """
    Enhanced tagging system supporting structured tags with attributes and faceting.
    """
    
    def __init__(self):
        self.base_rules: Dict[str, Callable] = {}
        self.multi_tag_rules: Dict[str, Callable] = {}
        self.supplemental_rules: Dict[str, Callable] = {}
        self.rule_descriptions: Dict[str, str] = {}
    
    def add_base_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a base tagging rule that returns bool or Tag object."""
        self.base_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def add_multi_tag_rule(self, rule_name: str, rule_function: Callable, description: str = ""):
        """Add a rule that returns multiple tags (strings or Tag objects)."""
        self.multi_tag_rules[rule_name] = rule_function
        self.rule_descriptions[rule_name] = description
    
    def add_supplemental_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a supplemental rule that depends on existing tags."""
        self.supplemental_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def _normalize_tag(self, tag: Union[str, Tag]) -> Tag:
        """Convert string tags to Tag objects."""
        if isinstance(tag, str):
            return Tag(tag)
        return tag
    
    def tag_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply all tagging rules to a conversation."""
        tags = set()
        debug_info = defaultdict(list)
        
        # Apply base rules
        for tag_name, rule_func in self.base_rules.items():
            try:
                result = rule_func(conversation)
                if result:
                    if isinstance(result, bool):
                        tag = Tag(tag_name)
                    else:
                        tag = self._normalize_tag(result)
                    tags.add(tag)
                    debug_info['applied_rules'].append(f"BASE: {tag}")
                else:
                    debug_info['skipped_rules'].append(f"BASE: {tag_name}")
            except Exception as e:
                debug_info['errors'].append(f"BASE: {tag_name} - {str(e)}")
        
        # Apply multi-tag rules
        for rule_name, rule_func in self.multi_tag_rules.items():
            try:
                new_tags = rule_func(conversation)
                if new_tags:
                    normalized_tags = [self._normalize_tag(tag) for tag in new_tags]
                    tags.update(normalized_tags)
                    debug_info['applied_rules'].append(f"MULTI: {rule_name} -> {[str(t) for t in normalized_tags]}")
                else:
                    debug_info['skipped_rules'].append(f"MULTI: {rule_name}")
            except Exception as e:
                debug_info['errors'].append(f"MULTI: {rule_name} - {str(e)}")
        
        # Apply supplemental rules
        max_iterations = 5
        for iteration in range(max_iterations):
            initial_tag_count = len(tags)
            
            for tag_name, rule_func in self.supplemental_rules.items():
                # Check if tag already exists
                if any(tag.name == tag_name for tag in tags):
                    continue
                    
                try:
                    result = rule_func(conversation, tags)
                    if result:
                        if isinstance(result, bool):
                            tag = Tag(tag_name)
                        else:
                            tag = self._normalize_tag(result)
                        tags.add(tag)
                        debug_info['applied_rules'].append(f"SUPP: {tag} (iter {iteration})")
                    else:
                        debug_info['skipped_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                except Exception as e:
                    debug_info['errors'].append(f"SUPP: {tag_name} - {str(e)}")
            
            if len(tags) == initial_tag_count:
                break
        
        return {
            'conversation_id': conversation.get('conversation_id', conversation.get('id', 'unknown')),
            'title': conversation.get('title', 'Untitled'),
            'tags': list(tags),
            'debug_info': dict(debug_info),
            'conversation': conversation
        }
    
    def tag_conversations(self, conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Tag multiple conversations."""
        return [self.tag_conversation(conv) for conv in conversations]
    
    def filter_by_tags(self, tagged_conversations: List[Dict[str, Any]], 
                      include_tags: List[Union[str, Dict]] = None,
                      exclude_tags: List[Union[str, Dict]] = None) -> List[Dict[str, Any]]:
        """
        Filter conversations by tags with attribute support.
        
        Args:
            include_tags: List of tag names or dicts with criteria
                Examples: ['web_search', {'name': 'gizmo', 'type': 'dalle'}]
            exclude_tags: Similar format for exclusions
        """
        filtered = []
        
        for tagged_conv in tagged_conversations:
            tags = tagged_conv['tags']
            
            # Check exclusions first
            if exclude_tags:
                should_exclude = False
                for exclude_criterion in exclude_tags:
                    if self._matches_criterion(tags, exclude_criterion):
                        should_exclude = True
                        break
                if should_exclude:
                    continue
            
            # Check inclusions
            if include_tags:
                should_include = True
                for include_criterion in include_tags:
                    if not self._matches_criterion(tags, include_criterion):
                        should_include = False
                        break
                if not should_include:
                    continue
            
            filtered.append(tagged_conv)
        
        return filtered
    
    def _matches_criterion(self, tags: List[Tag], criterion: Union[str, Dict]) -> bool:
        """Check if any tag matches the given criterion."""
        if isinstance(criterion, str):
            return any(tag.name == criterion for tag in tags)
        
        elif isinstance(criterion, dict):
            name = criterion.get('name')
            if not name:
                return False
            
            criteria = {k: v for k, v in criterion.items() if k != 'name'}
            return any(tag.matches(name, **criteria) for tag in tags)
        
        return False
    
    def get_facet_value(self, tags: List[Tag], facet_tag_name: str, 
                       facet_attribute: Optional[str] = None) -> str:
        """Extract facet value from a conversation's tags."""
        matching_tags = [tag for tag in tags if tag.name == facet_tag_name]
        
        if not matching_tags:
            return "<none>"
        
        if facet_attribute is None:
            # Just check for presence of the tag
            return f"has_{facet_tag_name}"
        
        # Extract specific attribute values
        values = []
        for tag in matching_tags:
            if facet_attribute in tag.attributes:
                values.append(str(tag.attributes[facet_attribute]))
        
        if not values:
            return f"<{facet_tag_name}_no_{facet_attribute}>"
        
        # If multiple values, join them
        return "; ".join(sorted(set(values)))
    
    def facet_conversations(self, tagged_conversations: List[Dict[str, Any]], 
                           facet_tag_name: str, 
                           facet_attribute: Optional[str] = None,
                           max_facets: int = 50) -> Dict[str, List[Dict[str, Any]]]:
        """
        Group conversations by facet values.
        
        Args:
            facet_tag_name: Tag name to facet by (e.g., 'gizmo')
            facet_attribute: Optional attribute within tag (e.g., 'gizmo_id')
            max_facets: Maximum number of facet values to show
            
        Returns:
            Dictionary mapping facet values to lists of conversations
        """
        facets = defaultdict(list)
        
        for tagged_conv in tagged_conversations:
            facet_value = self.get_facet_value(tagged_conv['tags'], facet_tag_name, facet_attribute)
            facets[facet_value].append(tagged_conv)
        
        # Sort by facet size (largest first) and limit
        sorted_facets = dict(sorted(facets.items(), key=lambda x: len(x[1]), reverse=True))
        
        if len(sorted_facets) > max_facets:
            # Keep top facets and group rest into "others"
            items = list(sorted_facets.items())
            top_facets = dict(items[:max_facets-1])
            
            other_conversations = []
            for _, conversations in items[max_facets-1:]:
                other_conversations.extend(conversations)
            
            if other_conversations:
                top_facets["<other>"] = other_conversations
            
            return top_facets
        
        return sorted_facets
    
    def print_faceted_summary(self, tagged_conversations: List[Dict[str, Any]], 
                             facet_tag_name: str, 
                             facet_attribute: Optional[str] = None,
                             show_details: bool = False,
                             max_facets: int = 20,
                             max_tags_per_facet: int = 15):
        """
        Print tag summary broken down by facets.
        
        Args:
            facet_tag_name: Tag to facet by
            facet_attribute: Optional attribute within the tag  
            show_details: Show detailed tag attribute info
            max_facets: Maximum facets to show
            max_tags_per_facet: Maximum tags to show per facet
        """
        total = len(tagged_conversations)
        facets = self.facet_conversations(tagged_conversations, facet_tag_name, facet_attribute, max_facets)
        
        print(f"Tagged {total} conversations")
        print(f"Faceted by: {facet_tag_name}" + 
              (f".{facet_attribute}" if facet_attribute else ""))
        print(f"Found {len(facets)} facet values")
        
        print(f"\n{'='*80}")
        print(f"FACETED TAG SUMMARY")
        print(f"{'='*80}")
        
        for facet_value, facet_conversations in facets.items():
            facet_size = len(facet_conversations)
            facet_percentage = (facet_size / total) * 100
            
            print(f"\n📊 FACET: {facet_value}")
            print(f"    Conversations: {facet_size} ({facet_percentage:.1f}% of total)")
            print(f"    {'-' * 60}")
            
            # Calculate tag statistics for this facet
            tag_counts = defaultdict(int)
            tag_attributes = defaultdict(lambda: defaultdict(list))
            unique_structured_tags = defaultdict(set)
            
            for tagged_conv in facet_conversations:
                for tag in tagged_conv['tags']:
                    tag_counts[tag.name] += 1
                    
                    # Collect attribute information
                    for attr_name, attr_value in tag.attributes.items():
                        if isinstance(attr_value, (int, float)):
                            tag_attributes[tag.name][attr_name].append(attr_value)
                        else:
                            unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
            
            # Sort and limit tags for this facet
            sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
            display_tags = sorted_tags[:max_tags_per_facet]
            
            for tag_name, count in display_tags:
                percentage = (count / facet_size) * 100
                print(f"    {tag_name}: {count} ({percentage:.1f}%)")
                
                if show_details:
                    # Show numeric attribute statistics
                    if tag_name in tag_attributes:
                        for attr_name, values in tag_attributes[tag_name].items():
                            if values:
                                avg_val = sum(values) / len(values)
                                min_val = min(values)
                                max_val = max(values)
                                print(f"        {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                    
                    # Show unique structured values
                    if tag_name in unique_structured_tags:
                        unique_vals = sorted(unique_structured_tags[tag_name])
                        if len(unique_vals) <= 5:
                            print(f"        values: {', '.join(unique_vals)}")
                        else:
                            print(f"        values: {', '.join(unique_vals[:5])} ... (+{len(unique_vals)-5} more)")
            
            if len(sorted_tags) > max_tags_per_facet:
                remaining = len(sorted_tags) - max_tags_per_facet
                print(f"    ... and {remaining} more tags")
    
    def compare_facets(self, tagged_conversations: List[Dict[str, Any]], 
                      facet_tag_name: str, 
                      facet_attribute: Optional[str] = None,
                      comparison_tags: List[str] = None,
                      min_facet_size: int = 10) -> None:
        """
        Compare specific tags across facets.
        
        Args:
            comparison_tags: List of tags to compare across facets
            min_facet_size: Minimum facet size to include in comparison
        """
        facets = self.facet_conversations(tagged_conversations, facet_tag_name, facet_attribute)
        
        # Filter facets by minimum size
        large_facets = {k: v for k, v in facets.items() if len(v) >= min_facet_size}
        
        if not large_facets:
            print(f"No facets with at least {min_facet_size} conversations found")
            return
        
        # If no specific tags provided, use most common tags overall
        if comparison_tags is None:
            overall_tag_counts = defaultdict(int)
            for tagged_conv in tagged_conversations:
                for tag in tagged_conv['tags']:
                    overall_tag_counts[tag.name] += 1
            
            # Get top 10 most common tags
            comparison_tags = [tag for tag, _ in 
                             sorted(overall_tag_counts.items(), key=lambda x: x[1], reverse=True)[:10]]
        
        print(f"\n{'='*80}")
        print(f"FACET COMPARISON")
        print(f"Comparing tags: {', '.join(comparison_tags)}")
        print(f"Across facets: {facet_tag_name}" + 
              (f".{facet_attribute}" if facet_attribute else ""))
        print(f"{'='*80}")
        
        # Calculate percentages for each tag in each facet
        results = {}
        for facet_value, facet_conversations in large_facets.items():
            facet_size = len(facet_conversations)
            facet_tag_counts = defaultdict(int)
            
            for tagged_conv in facet_conversations:
                for tag in tagged_conv['tags']:
                    facet_tag_counts[tag.name] += 1
            
            results[facet_value] = {
                'size': facet_size,
                'percentages': {tag: (facet_tag_counts[tag] / facet_size) * 100 
                               for tag in comparison_tags}
            }
        
        # Print comparison table
        print(f"\n{'Facet':<30} {'Size':<8} " + 
              "".join(f"{tag:<15}" for tag in comparison_tags))
        print("-" * (30 + 8 + 15 * len(comparison_tags)))
        
        for facet_value, data in results.items():
            facet_display = facet_value[:28] + ".." if len(facet_value) > 30 else facet_value
            row = f"{facet_display:<30} {data['size']:<8} "
            row += "".join(f"{data['percentages'][tag]:<15.1f}" for tag in comparison_tags)
            print(row)
        
        # Highlight interesting differences
        print(f"\n🔍 NOTABLE DIFFERENCES:")
        for tag in comparison_tags:
            percentages = [results[facet]['percentages'][tag] for facet in results.keys()]
            if max(percentages) - min(percentages) > 20:  # 20% difference threshold
                max_facet = max(results.keys(), key=lambda f: results[f]['percentages'][tag])
                min_facet = min(results.keys(), key=lambda f: results[f]['percentages'][tag])
                print(f"    {tag}: {max_facet} ({results[max_facet]['percentages'][tag]:.1f}%) vs " +
                      f"{min_facet} ({results[min_facet]['percentages'][tag]:.1f}%)")
    
    def get_tag_values(self, tagged_conversations: List[Dict[str, Any]], 
                      tag_name: str, attribute: str) -> List[Any]:
        """Extract attribute values from tags across conversations."""
        values = []
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == tag_name and attribute in tag.attributes:
                    values.append(tag.attributes[attribute])
        return values
    
    def print_summary(self, tagged_conversations: List[Dict[str, Any]], show_details: bool = True):
        """Print comprehensive summary with all tag types and optional details."""
        total = len(tagged_conversations)
        tag_counts = defaultdict(int)
        tag_attributes = defaultdict(lambda: defaultdict(list))
        unique_structured_tags = defaultdict(set)
        
        # Collect all tag information
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                tag_counts[tag.name] += 1
                
                # Collect attribute information
                for attr_name, attr_value in tag.attributes.items():
                    if isinstance(attr_value, (int, float)):
                        tag_attributes[tag.name][attr_name].append(attr_value)
                    else:
                        # For non-numeric attributes, track unique values
                        unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
        
        print(f"Tagged {total} conversations")
        print(f"\n=== TAG SUMMARY ===")
        
        # Sort tags by frequency for better readability
        sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
        
        for tag_name, count in sorted_tags:
            percentage = (count / total) * 100
            print(f"{tag_name}: {count} ({percentage:.1f}%)")
            
            if show_details:
                # Show numeric attribute statistics
                if tag_name in tag_attributes:
                    for attr_name, values in tag_attributes[tag_name].items():
                        if values:
                            avg_val = sum(values) / len(values)
                            min_val = min(values)
                            max_val = max(values)
                            print(f"    {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                
                # Show unique structured values for non-numeric attributes
                if tag_name in unique_structured_tags:
                    unique_vals = sorted(unique_structured_tags[tag_name])
                    if len(unique_vals) <= 10:  # Show all if not too many
                        print(f"    values: {', '.join(unique_vals)}")
                    else:  # Show top 10 most common
                        print(f"    values: {', '.join(unique_vals[:10])} ... (+{len(unique_vals)-10} more)")


# Example usage functions to demonstrate the faceting capabilities

def demo_faceting_usage():
    """
    Demonstrate how to use the new faceting capabilities.
    """
    print("""
# FACETING USAGE EXAMPLES

# 1. Facet by gizmo presence (with/without gizmos)
tagger.print_faceted_summary(tagged_results, 'gizmo')

# 2. Facet by specific gizmo IDs  
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')

# 3. Facet by conversation length categories
tagger.print_faceted_summary(tagged_results, 'conversation_length', 'category')

# 4. Facet by prompt length categories
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'length_category')

# 5. Facet by prompt consistency patterns
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'consistency')

# 6. Compare coding patterns across gizmos
tagger.compare_facets(tagged_results, 'gizmo', 'gizmo_id', 
                     comparison_tags=['coding_assistance', 'code_patterns', 'web_search'])

# 7. Compare conversation patterns across length categories
tagger.compare_facets(tagged_results, 'conversation_length', 'category',
                     comparison_tags=['coding_assistance', 'web_search', 'large_content'])

# 8. Show detailed faceted summary
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id', 
                            show_details=True, max_facets=10)

# 9. Focus on specific gizmo usage patterns
gizmo_conversations = tagger.filter_by_tags(tagged_results, include_tags=['gizmo'])
tagger.print_faceted_summary(gizmo_conversations, 'gizmo', 'gizmo_id')

# 10. Analyze how conversation patterns differ by whether they start with code
tagger.print_faceted_summary(tagged_results, 'starts_code_patterns')

# 11. Multi-level analysis: first facet by conversation length, then by gizmo within each
for length_cat in ['single', 'short', 'medium', 'long', 'very_long']:
    length_conversations = tagger.filter_by_tags(
        tagged_results, 
        include_tags=[{'name': 'conversation_length', 'category': length_cat}]
    )
    if length_conversations:
        print(f"\\n=== GIZMO USAGE WITHIN {length_cat.upper()} CONVERSATIONS ===")
        tagger.print_faceted_summary(length_conversations, 'gizmo', 'gizmo_id', max_facets=5)
    """)

# if __name__ == "__main__":
#     demo_faceting_usage()

In [23]:
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(convs)
tagger.print_summary(tagged_results)

Tagged 1673 conversations

=== TAG SUMMARY ===
prompt_stats: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    mean: avg=1142.4, range=[1.0, 184662.5]
    median: avg=814.2, range=[1.0, 184662.5]
    variance: avg=51563065.2, range=[0, 34065162056.2]
    values: consistency=consistent, consistency=mixed, consistency=variable, length_category=long, length_category=medium, length_category=short, length_category=very_long, length_category=very_short
conversation_length: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    values: category=long, category=medium, category=short, category=single, category=very_long
coding_assistance: 1551 (92.7%)
code_patterns: 1546 (92.4%)
large_content: 1362 (81.4%)
short_prompts: 1261 (75.4%)
brief_interaction: 906 (54.2%)
consistent_prompts: 896 (53.6%)
enhanced_conversation: 681 (40.7%)
gizmo: 676 (40.4%)
    values: gizmo_id=g-IibMsD7w8, gizmo_id=g-KpF6lTka3, gizmo_id=g-QsUj0Smzg, gizmo_id=g-WiEAUBGzb, gizmo_id=g-bWPVPw7oK, gizmo_id=g-pYtHuQdGh
vari

In [25]:
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')

Tagged 1673 conversations
Faceted by: gizmo.gizmo_id
Found 7 facet values

FACETED TAG SUMMARY

📊 FACET: <none>
    Conversations: 997 (59.6% of total)
    ------------------------------------------------------------
    prompt_stats: 997 (100.0%)
    conversation_length: 997 (100.0%)
    coding_assistance: 891 (89.4%)
    code_patterns: 886 (88.9%)
    large_content: 696 (69.8%)
    short_prompts: 627 (62.9%)
    brief_interaction: 606 (60.8%)
    consistent_prompts: 427 (42.8%)
    variable_prompts: 330 (33.1%)
    coding_assistance_start: 236 (23.7%)
    starts_code_patterns: 216 (21.7%)
    evolving_discussion: 191 (19.2%)
    long_prompts: 139 (13.9%)
    context_heavy_start: 119 (11.9%)
    extended_conversation: 112 (11.2%)
    ... and 12 more tags

📊 FACET: g-IibMsD7w8
    Conversations: 668 (39.9% of total)
    ------------------------------------------------------------
    gizmo: 668 (100.0%)
    prompt_stats: 668 (100.0%)
    enhanced_conversation: 668 (100.0%)
    conversa

In [29]:
# enhanced_conversation_tagger.py
"""
Enhanced conversation tagging system with faceting capabilities.
Allows analysis of tag distributions across different facets (gizmos, conversation types, etc.).
"""

from typing import Dict, Any, List, Callable, Set, Union, Optional, Tuple
from collections import defaultdict
import json


class Tag:
    """Represents a tag with optional key-value attributes."""
    
    def __init__(self, name: str, **attributes):
        self.name = name
        self.attributes = attributes
    
    def __str__(self):
        if self.attributes:
            attrs_str = ", ".join(f"{k}={v}" for k, v in self.attributes.items())
            return f"{self.name}({attrs_str})"
        return self.name
    
    def __repr__(self):
        return f"Tag('{self.name}', {self.attributes})"
    
    def __eq__(self, other):
        if isinstance(other, str):
            return self.name == other
        elif isinstance(other, Tag):
            return self.name == other.name and self.attributes == other.attributes
        return False
    
    def __hash__(self):
        return hash((self.name, tuple(sorted(self.attributes.items()))))
    
    def matches(self, name: str, **criteria) -> bool:
        """Check if tag matches name and optional attribute criteria."""
        if self.name != name:
            return False
        
        for key, value in criteria.items():
            if key not in self.attributes:
                return False
            
            attr_value = self.attributes[key]
            
            # Support comparison operators
            if isinstance(value, dict):
                for op, target in value.items():
                    if op == 'gt' and not (attr_value > target):
                        return False
                    elif op == 'gte' and not (attr_value >= target):
                        return False
                    elif op == 'lt' and not (attr_value < target):
                        return False
                    elif op == 'lte' and not (attr_value <= target):
                        return False
                    elif op == 'eq' and not (attr_value == target):
                        return False
                    elif op == 'ne' and not (attr_value != target):
                        return False
                    elif op == 'in' and not (attr_value in target):
                        return False
            else:
                # Direct equality
                if attr_value != value:
                    return False
        
        return True


class ConversationTagger:
    """
    Enhanced tagging system supporting structured tags with attributes and faceting.
    """
    
    def __init__(self):
        self.base_rules: Dict[str, Callable] = {}
        self.multi_tag_rules: Dict[str, Callable] = {}
        self.supplemental_rules: Dict[str, Callable] = {}
        self.rule_descriptions: Dict[str, str] = {}
    
    def add_base_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a base tagging rule that returns bool or Tag object."""
        self.base_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def add_multi_tag_rule(self, rule_name: str, rule_function: Callable, description: str = ""):
        """Add a rule that returns multiple tags (strings or Tag objects)."""
        self.multi_tag_rules[rule_name] = rule_function
        self.rule_descriptions[rule_name] = description
    
    def add_supplemental_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a supplemental rule that depends on existing tags."""
        self.supplemental_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def _normalize_tag(self, tag: Union[str, Tag]) -> Tag:
        """Convert string tags to Tag objects."""
        if isinstance(tag, str):
            return Tag(tag)
        return tag
    
    def tag_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply all tagging rules to a conversation."""
        tags = set()
        debug_info = defaultdict(list)
        
        # Apply base rules
        for tag_name, rule_func in self.base_rules.items():
            try:
                result = rule_func(conversation)
                if result:
                    if isinstance(result, bool):
                        tag = Tag(tag_name)
                    else:
                        tag = self._normalize_tag(result)
                    tags.add(tag)
                    debug_info['applied_rules'].append(f"BASE: {tag}")
                else:
                    debug_info['skipped_rules'].append(f"BASE: {tag_name}")
            except Exception as e:
                debug_info['errors'].append(f"BASE: {tag_name} - {str(e)}")
        
        # Apply multi-tag rules
        for rule_name, rule_func in self.multi_tag_rules.items():
            try:
                new_tags = rule_func(conversation)
                if new_tags:
                    normalized_tags = [self._normalize_tag(tag) for tag in new_tags]
                    tags.update(normalized_tags)
                    debug_info['applied_rules'].append(f"MULTI: {rule_name} -> {[str(t) for t in normalized_tags]}")
                else:
                    debug_info['skipped_rules'].append(f"MULTI: {rule_name}")
            except Exception as e:
                debug_info['errors'].append(f"MULTI: {rule_name} - {str(e)}")
        
        # Apply supplemental rules
        max_iterations = 5
        for iteration in range(max_iterations):
            initial_tag_count = len(tags)
            
            for tag_name, rule_func in self.supplemental_rules.items():
                # Check if tag already exists
                if any(tag.name == tag_name for tag in tags):
                    continue
                    
                try:
                    result = rule_func(conversation, tags)
                    if result:
                        if isinstance(result, bool):
                            tag = Tag(tag_name)
                        else:
                            tag = self._normalize_tag(result)
                        tags.add(tag)
                        debug_info['applied_rules'].append(f"SUPP: {tag} (iter {iteration})")
                    else:
                        debug_info['skipped_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                except Exception as e:
                    debug_info['errors'].append(f"SUPP: {tag_name} - {str(e)}")
            
            if len(tags) == initial_tag_count:
                break
        
        return {
            'conversation_id': conversation.get('conversation_id', conversation.get('id', 'unknown')),
            'title': conversation.get('title', 'Untitled'),
            'tags': list(tags),
            'debug_info': dict(debug_info),
            'conversation': conversation
        }
    
    def tag_conversations(self, conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Tag multiple conversations."""
        return [self.tag_conversation(conv) for conv in conversations]
    
    def filter_by_tags(self, tagged_conversations: List[Dict[str, Any]], 
                      include_tags: List[Union[str, Dict]] = None,
                      exclude_tags: List[Union[str, Dict]] = None) -> List[Dict[str, Any]]:
        """
        Filter conversations by tags with attribute support.
        
        Args:
            include_tags: List of tag names or dicts with criteria
                Examples: ['web_search', {'name': 'gizmo', 'type': 'dalle'}]
            exclude_tags: Similar format for exclusions
        """
        filtered = []
        
        for tagged_conv in tagged_conversations:
            tags = tagged_conv['tags']
            
            # Check exclusions first
            if exclude_tags:
                should_exclude = False
                for exclude_criterion in exclude_tags:
                    if self._matches_criterion(tags, exclude_criterion):
                        should_exclude = True
                        break
                if should_exclude:
                    continue
            
            # Check inclusions
            if include_tags:
                should_include = True
                for include_criterion in include_tags:
                    if not self._matches_criterion(tags, include_criterion):
                        should_include = False
                        break
                if not should_include:
                    continue
            
            filtered.append(tagged_conv)
        
        return filtered
    
    def _matches_criterion(self, tags: List[Tag], criterion: Union[str, Dict]) -> bool:
        """Check if any tag matches the given criterion."""
        if isinstance(criterion, str):
            return any(tag.name == criterion for tag in tags)
        
        elif isinstance(criterion, dict):
            name = criterion.get('name')
            if not name:
                return False
            
            criteria = {k: v for k, v in criterion.items() if k != 'name'}
            return any(tag.matches(name, **criteria) for tag in tags)
        
        return False
    
    def get_facet_value(self, tags: List[Tag], facet_tag_name: str, 
                       facet_attribute: Optional[str] = None) -> str:
        """Extract facet value from a conversation's tags."""
        matching_tags = [tag for tag in tags if tag.name == facet_tag_name]
        
        if not matching_tags:
            return "<none>"
        
        if facet_attribute is None:
            # Just check for presence of the tag
            return f"has_{facet_tag_name}"
        
        # Extract specific attribute values
        values = []
        for tag in matching_tags:
            if facet_attribute in tag.attributes:
                values.append(str(tag.attributes[facet_attribute]))
        
        if not values:
            return f"<{facet_tag_name}_no_{facet_attribute}>"
        
        # If multiple values, join them
        return "; ".join(sorted(set(values)))
    
    def facet_conversations(self, tagged_conversations: List[Dict[str, Any]], 
                           facet_tag_name: str, 
                           facet_attribute: Optional[str] = None,
                           max_facets: int = 50) -> Dict[str, List[Dict[str, Any]]]:
        """
        Group conversations by facet values.
        
        Args:
            facet_tag_name: Tag name to facet by (e.g., 'gizmo')
            facet_attribute: Optional attribute within tag (e.g., 'gizmo_id')
            max_facets: Maximum number of facet values to show
            
        Returns:
            Dictionary mapping facet values to lists of conversations
        """
        facets = defaultdict(list)
        
        for tagged_conv in tagged_conversations:
            facet_value = self.get_facet_value(tagged_conv['tags'], facet_tag_name, facet_attribute)
            facets[facet_value].append(tagged_conv)
        
        # Sort by facet size (largest first) and limit
        sorted_facets = dict(sorted(facets.items(), key=lambda x: len(x[1]), reverse=True))
        
        if len(sorted_facets) > max_facets:
            # Keep top facets and group rest into "others"
            items = list(sorted_facets.items())
            top_facets = dict(items[:max_facets-1])
            
            other_conversations = []
            for _, conversations in items[max_facets-1:]:
                other_conversations.extend(conversations)
            
            if other_conversations:
                top_facets["<other>"] = other_conversations
            
            return top_facets
        
        return sorted_facets
    
    def print_faceted_summary(self, tagged_conversations: List[Dict[str, Any]], 
                             facet_tag_name: str, 
                             facet_attribute: Optional[str] = None,
                             show_details: bool = False,
                             max_facets: int = 20):
        """
        Print tag summary broken down by facets.
        
        Args:
            facet_tag_name: Tag to facet by
            facet_attribute: Optional attribute within the tag  
            show_details: Show detailed tag attribute info
            max_facets: Maximum facets to show
        """
        total = len(tagged_conversations)
        facets = self.facet_conversations(tagged_conversations, facet_tag_name, facet_attribute, max_facets)
        
        print(f"Tagged {total} conversations")
        print(f"Faceted by: {facet_tag_name}" + 
              (f".{facet_attribute}" if facet_attribute else ""))
        print(f"Found {len(facets)} facet values")
        
        print(f"\n{'='*80}")
        print(f"FACETED TAG SUMMARY")
        print(f"{'='*80}")
        
        for facet_value, facet_conversations in facets.items():
            facet_size = len(facet_conversations)
            facet_percentage = (facet_size / total) * 100
            
            print(f"\n📊 FACET: {facet_value}")
            print(f"    Conversations: {facet_size} ({facet_percentage:.1f}% of total)")
            print(f"    {'-' * 60}")
            
            # Calculate tag statistics for this facet
            tag_counts = defaultdict(int)
            tag_attributes = defaultdict(lambda: defaultdict(list))
            unique_structured_tags = defaultdict(set)
            
            for tagged_conv in facet_conversations:
                for tag in tagged_conv['tags']:
                    tag_counts[tag.name] += 1
                    
                    # Collect attribute information
                    for attr_name, attr_value in tag.attributes.items():
                        if isinstance(attr_value, (int, float)):
                            tag_attributes[tag.name][attr_name].append(attr_value)
                        else:
                            unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
            
            # Sort tags for this facet and show all (no truncation)
            sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
            
            for tag_name, count in sorted_tags:
                percentage = (count / facet_size) * 100
                print(f"    {tag_name}: {count} ({percentage:.1f}%)")
                
                if show_details:
                    # Show numeric attribute statistics
                    if tag_name in tag_attributes:
                        for attr_name, values in tag_attributes[tag_name].items():
                            if values:
                                avg_val = sum(values) / len(values)
                                min_val = min(values)
                                max_val = max(values)
                                print(f"        {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                    
                    # Show unique structured values
                    if tag_name in unique_structured_tags:
                        unique_vals = sorted(unique_structured_tags[tag_name])
                        if len(unique_vals) <= 5:
                            print(f"        values: {', '.join(unique_vals)}")
                        else:
                            print(f"        values: {', '.join(unique_vals[:5])} ... (+{len(unique_vals)-5} more)")
    
    def compare_facets(self, tagged_conversations: List[Dict[str, Any]], 
                      facet_tag_name: str, 
                      facet_attribute: Optional[str] = None,
                      comparison_tags: List[str] = None,
                      min_facet_size: int = 10) -> None:
        """
        Compare specific tags across facets.
        
        Args:
            comparison_tags: List of tags to compare across facets
            min_facet_size: Minimum facet size to include in comparison
        """
        facets = self.facet_conversations(tagged_conversations, facet_tag_name, facet_attribute)
        
        # Filter facets by minimum size
        large_facets = {k: v for k, v in facets.items() if len(v) >= min_facet_size}
        
        if not large_facets:
            print(f"No facets with at least {min_facet_size} conversations found")
            return
        
        # If no specific tags provided, use most common tags overall
        if comparison_tags is None:
            overall_tag_counts = defaultdict(int)
            for tagged_conv in tagged_conversations:
                for tag in tagged_conv['tags']:
                    overall_tag_counts[tag.name] += 1
            
            # Get top 10 most common tags
            comparison_tags = [tag for tag, _ in 
                             sorted(overall_tag_counts.items(), key=lambda x: x[1], reverse=True)[:10]]
        
        print(f"\n{'='*80}")
        print(f"FACET COMPARISON")
        print(f"Comparing tags: {', '.join(comparison_tags)}")
        print(f"Across facets: {facet_tag_name}" + 
              (f".{facet_attribute}" if facet_attribute else ""))
        print(f"{'='*80}")
        
        # Calculate percentages for each tag in each facet
        results = {}
        for facet_value, facet_conversations in large_facets.items():
            facet_size = len(facet_conversations)
            facet_tag_counts = defaultdict(int)
            
            for tagged_conv in facet_conversations:
                for tag in tagged_conv['tags']:
                    facet_tag_counts[tag.name] += 1
            
            results[facet_value] = {
                'size': facet_size,
                'percentages': {tag: (facet_tag_counts[tag] / facet_size) * 100 
                               for tag in comparison_tags}
            }
        
        # Print comparison table
        print(f"\n{'Facet':<30} {'Size':<8} " + 
              "".join(f"{tag:<15}" for tag in comparison_tags))
        print("-" * (30 + 8 + 15 * len(comparison_tags)))
        
        for facet_value, data in results.items():
            facet_display = facet_value[:28] + ".." if len(facet_value) > 30 else facet_value
            row = f"{facet_display:<30} {data['size']:<8} "
            row += "".join(f"{data['percentages'][tag]:<15.1f}" for tag in comparison_tags)
            print(row)
        
        # Highlight interesting differences
        print(f"\n🔍 NOTABLE DIFFERENCES:")
        for tag in comparison_tags:
            percentages = [results[facet]['percentages'][tag] for facet in results.keys()]
            if max(percentages) - min(percentages) > 20:  # 20% difference threshold
                max_facet = max(results.keys(), key=lambda f: results[f]['percentages'][tag])
                min_facet = min(results.keys(), key=lambda f: results[f]['percentages'][tag])
                print(f"    {tag}: {max_facet} ({results[max_facet]['percentages'][tag]:.1f}%) vs " +
                      f"{min_facet} ({results[min_facet]['percentages'][tag]:.1f}%)")
    
    def get_tag_values(self, tagged_conversations: List[Dict[str, Any]], 
                      tag_name: str, attribute: str) -> List[Any]:
        """Extract attribute values from tags across conversations."""
        values = []
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == tag_name and attribute in tag.attributes:
                    values.append(tag.attributes[attribute])
        return values
    
    def print_summary(self, tagged_conversations: List[Dict[str, Any]], show_details: bool = True):
        """Print comprehensive summary with all tag types and optional details."""
        total = len(tagged_conversations)
        tag_counts = defaultdict(int)
        tag_attributes = defaultdict(lambda: defaultdict(list))
        unique_structured_tags = defaultdict(set)
        
        # Collect all tag information
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                tag_counts[tag.name] += 1
                
                # Collect attribute information
                for attr_name, attr_value in tag.attributes.items():
                    if isinstance(attr_value, (int, float)):
                        tag_attributes[tag.name][attr_name].append(attr_value)
                    else:
                        # For non-numeric attributes, track unique values
                        unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
        
        print(f"Tagged {total} conversations")
        print(f"\n=== TAG SUMMARY ===")
        
        # Sort tags by frequency for better readability
        sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
        
        for tag_name, count in sorted_tags:
            percentage = (count / total) * 100
            print(f"{tag_name}: {count} ({percentage:.1f}%)")
            
            if show_details:
                # Show numeric attribute statistics
                if tag_name in tag_attributes:
                    for attr_name, values in tag_attributes[tag_name].items():
                        if values:
                            avg_val = sum(values) / len(values)
                            min_val = min(values)
                            max_val = max(values)
                            print(f"    {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                
                # Show unique structured values for non-numeric attributes
                if tag_name in unique_structured_tags:
                    unique_vals = sorted(unique_structured_tags[tag_name])
                    if len(unique_vals) <= 10:  # Show all if not too many
                        print(f"    values: {', '.join(unique_vals)}")
                    else:  # Show top 10 most common
                        print(f"    values: {', '.join(unique_vals[:10])} ... (+{len(unique_vals)-10} more)")


# Example usage functions to demonstrate the faceting capabilities

def demo_faceting_usage():
    """
    Demonstrate how to use the new faceting capabilities and improved code detection.
    """
    print("""
# FACETING USAGE EXAMPLES

# 1. Facet by gizmo presence (with/without gizmos)
tagger.print_faceted_summary(tagged_results, 'gizmo')

# 2. Facet by specific gizmo IDs (now without truncation)
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')

# 3. Facet by conversation length categories
tagger.print_faceted_summary(tagged_results, 'conversation_length', 'category')

# 4. Facet by prompt length categories
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'length_category')

# 5. Facet by prompt consistency patterns
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'consistency')

# 6. Compare coding patterns across gizmos (with improved detection)
tagger.compare_facets(tagged_results, 'gizmo', 'gizmo_id', 
                     comparison_tags=['definite_coding_assistance', 'strict_code_patterns', 
                                    'documentation_generation', 'wiki_documentation'])

# 7. Compare old vs new code detection
tagger.compare_facets(tagged_results, 'gizmo', 'gizmo_id',
                     comparison_tags=['code_patterns', 'strict_code_patterns', 
                                    'coding_assistance', 'definite_coding_assistance'])

# 8. Investigate specific gizmo patterns
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id', show_details=True)

# 9. Focus on documentation vs coding patterns
tagger.compare_facets(tagged_results, 'wiki_documentation', None,
                     comparison_tags=['code_patterns', 'strict_code_patterns', 'large_content'])

# 10. Multi-level analysis: documentation generation within each gizmo
for gizmo_id in ['g-IibMsD7w8', 'g-KpF6lTka3', 'g-QsUj0Smzg']:  # Replace with actual IDs
    gizmo_conversations = tagger.filter_by_tags(
        tagged_results, 
        include_tags=[{'name': 'gizmo', 'gizmo_id': gizmo_id}]
    )
    if gizmo_conversations:
        print(f"\\n=== CONTENT ANALYSIS FOR GIZMO {gizmo_id} ===")
        tagger.print_faceted_summary(gizmo_conversations, 'wiki_documentation')

# 11. Debug improved code detection for wiki gizmo
wiki_gizmo_conversations = tagger.filter_by_tags(
    tagged_results,
    include_tags=[{'name': 'gizmo', 'gizmo_id': 'g-IibMsD7w8'}]  # Replace with actual wiki gizmo ID
)

print("\\n=== WIKI GIZMO CODE DETECTION ANALYSIS ===")
print("Before improvements vs After improvements:")

old_code_count = len([c for c in wiki_gizmo_conversations 
                     if any(t.name == 'code_patterns' for t in c['tags'])])
new_strict_count = len([c for c in wiki_gizmo_conversations 
                       if any(t.name == 'strict_code_patterns' for t in c['tags'])])
wiki_doc_count = len([c for c in wiki_gizmo_conversations 
                     if any(t.name == 'wiki_documentation' for t in c['tags'])])

print(f"Original code_patterns detection: {old_code_count} conversations")
print(f"Improved strict_code_patterns: {new_strict_count} conversations") 
print(f"Wiki/documentation detection: {wiki_doc_count} conversations")
print(f"Reduction in false positives: {old_code_count - new_strict_count} conversations")
    """)


def debug_code_detection_improvements():
    """
    Debug function to compare old vs new code detection on specific conversations.
    """
    print("""
# DEBUGGING CODE DETECTION IMPROVEMENTS

# Test improved detection on wiki gizmo conversations
def test_detection_on_sample(conversations, sample_size=10):
    for i, conv in enumerate(conversations[:sample_size]):
        print(f"\\n--- Conversation {i+1}: {conv.get('title', 'Untitled')[:50]}... ---")
        
        # Test old detection
        old_result = has_code_patterns_old(conv)  # You'd need to save the old function
        
        # Test new detection  
        new_result = has_code_patterns(conv)
        strict_result = has_strict_code_patterns(conv)
        wiki_result = has_wiki_documentation_patterns(conv)
        
        print(f"Old code detection: {old_result}")
        print(f"New code detection: {new_result}")
        print(f"Strict code detection: {strict_result}")
        print(f"Wiki documentation: {wiki_result}")
        
        if old_result != new_result:
            print("🔄 DETECTION CHANGED!")
            
        if old_result and wiki_result:
            print("📖 Likely false positive caught by wiki detection")

# Usage:
# wiki_conversations = tagger.filter_by_tags(tagged_results, 
#                                           include_tags=[{'name': 'gizmo', 'gizmo_id': 'g-IibMsD7w8'}])
# test_detection_on_sample(wiki_conversations)
    """)

if __name__ == "__main__":
    demo_faceting_usage()
    debug_code_detection_improvements()


# FACETING USAGE EXAMPLES

# 1. Facet by gizmo presence (with/without gizmos)
tagger.print_faceted_summary(tagged_results, 'gizmo')

# 2. Facet by specific gizmo IDs (now without truncation)
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')

# 3. Facet by conversation length categories
tagger.print_faceted_summary(tagged_results, 'conversation_length', 'category')

# 4. Facet by prompt length categories
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'length_category')

# 5. Facet by prompt consistency patterns
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'consistency')

# 6. Compare coding patterns across gizmos (with improved detection)
tagger.compare_facets(tagged_results, 'gizmo', 'gizmo_id', 
                     comparison_tags=['definite_coding_assistance', 'strict_code_patterns', 
                                    'documentation_generation', 'wiki_documentation'])

# 7. Compare old vs new code detection
tagger.compare_facets(tagge

In [32]:

# Enhanced rule functions for all previously discussed tagging rules

def get_all_user_messages(conversation: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Get all user messages in chronological order."""
    mapping = conversation.get('mapping', {})
    user_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        author = message.get('author', {})
        if author.get('role') == 'user':
            create_time = message.get('create_time') or 0
            user_messages.append((create_time, message))
    
    user_messages.sort(key=lambda x: x[0])
    return [msg for _, msg in user_messages]


def get_first_user_message(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """Find the first user message in the conversation."""
    user_messages = get_all_user_messages(conversation)
    return user_messages[0] if user_messages else None


def create_conversation_length_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for conversation length."""
    user_count = len(get_all_user_messages(conversation))
    
    # Determine category
    if user_count == 1:
        category = 'single'
    elif user_count <= 3:
        category = 'short'
    elif user_count <= 10:
        category = 'medium'
    elif user_count <= 25:
        category = 'long'
    else:
        category = 'very_long'
    
    return Tag('conversation_length', count=user_count, category=category)


def create_prompt_stats_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for prompt statistics."""
    user_messages = get_all_user_messages(conversation)
    
    if not user_messages:
        return Tag('prompt_stats', count=0, mean=0, median=0, variance=0, 
                  length_category='none', consistency='none')
    
    # Calculate message lengths
    lengths = []
    for message in user_messages:
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        lengths.append(len(all_text))
    
    # Calculate statistics
    mean_length = sum(lengths) / len(lengths)
    sorted_lengths = sorted(lengths)
    n = len(sorted_lengths)
    median_length = (sorted_lengths[n//2] if n % 2 == 1 
                    else (sorted_lengths[n//2-1] + sorted_lengths[n//2]) / 2)
    variance = sum((x - mean_length) ** 2 for x in lengths) / len(lengths) if len(lengths) > 1 else 0
    
    # Determine categories
    if mean_length < 50:
        length_category = 'very_short'
    elif mean_length < 200:
        length_category = 'short'
    elif mean_length < 1000:
        length_category = 'medium'
    elif mean_length < 3000:
        length_category = 'long'
    else:
        length_category = 'very_long'
    
    if variance < 1000:
        consistency = 'consistent'
    elif variance < 10000:
        consistency = 'mixed'
    else:
        consistency = 'variable'
    
    return Tag('prompt_stats', 
               count=len(lengths),
               mean=round(mean_length, 1),
               median=round(median_length, 1),
               variance=round(variance, 1),
               length_category=length_category,
               consistency=consistency)


def create_gizmo_plugin_tags(conversation: Dict[str, Any]) -> List[Tag]:
    """Create structured tags for gizmos and plugins."""
    tags = []
    gizmos = set()
    plugins = set()
    
    # Check conversation-level
    if conversation.get('gizmo_id'):
        gizmos.add(conversation['gizmo_id'])
    
    plugin_ids = conversation.get('plugin_ids', [])
    if plugin_ids:
        plugins.update(plugin_ids)
    
    # Check message-level
    mapping = conversation.get('mapping', {})
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        
        # Invoked plugins
        invoked_plugin = metadata.get('invoked_plugin', {})
        if invoked_plugin:
            if invoked_plugin.get('plugin_id'):
                plugins.add(invoked_plugin['plugin_id'])
            if invoked_plugin.get('namespace'):
                plugins.add(invoked_plugin['namespace'])
        
        # Gizmo usage
        if metadata.get('gizmo_id'):
            gizmos.add(metadata['gizmo_id'])
    
    # Create tags - FIX: Use different attribute name to avoid conflict
    for gizmo in gizmos:
        tags.append(Tag('gizmo', gizmo_id=gizmo))
    
    for plugin in plugins:
        tags.append(Tag('plugin', plugin_id=plugin))
    
    return tags


# Boolean rule functions for basic content analysis
def has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if conversation has unusually large content anywhere."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        text = content.get('text', '')
        if len(text) > min_length:
            return True
            
        parts = content.get('parts', [])
        for part in parts:
            if isinstance(part, str) and len(part) > min_length:
                return True
    
    return False


def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for clear code patterns anywhere in conversation."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Strong code indicators
        code_indicators = [
            '```',  # Code blocks
            'def ', 'function ', 'class ',  # Function/class definitions
            'import ', 'from ', 'require(',  # Import statements
            '#!/bin/', '#include', 'using namespace',  # Script headers
        ]
        
        if any(indicator in all_text for indicator in code_indicators):
            return True
            
        # Also check for high density of coding keywords
        coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
        keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
        if len(all_text) > 1000 and keyword_count >= 3:  # Multiple coding keywords in large text suggest actual code
            return True
    
    return False


def has_github_repos(conversation: Dict[str, Any]) -> bool:
    """Check if GitHub repositories were selected for context."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        repos = metadata.get('selected_github_repos', [])
        if repos:  # Non-empty list
            return True
    
    return False


def has_canvas_operations(conversation: Dict[str, Any]) -> bool:
    """Check for canvas/document operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('canvas'):
            return True
    
    return False


def has_web_search(conversation: Dict[str, Any]) -> bool:
    """Check for web search operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('search_queries') or 
            metadata.get('search_result_groups') or
            metadata.get('content_references')):
            return True
    
    return False


def has_reasoning_thoughts(conversation: Dict[str, Any]) -> bool:
    """Check for reasoning/thinking patterns."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        if content.get('thoughts'):  # Reasoning thoughts
            return True
    
    return False


def has_code_execution(conversation: Dict[str, Any]) -> bool:
    """Check for code execution artifacts."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('aggregate_result') or 
            metadata.get('jupyter_messages')):
            return True
    
    return False


# First user message specific rules
def first_user_has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if the first user message has large content."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    if len(text) > min_length:
        return True
        
    parts = content.get('parts', [])
    for part in parts:
        if isinstance(part, str) and len(part) > min_length:
            return True
    
    return False


def first_user_has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message contains code patterns."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    parts = content.get('parts', [])
    all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
    
    # Strong code indicators
    code_indicators = [
        '```',  # Code blocks
        'def ', 'function ', 'class ',  # Definitions
        'import ', 'from ', 'require(',  # Imports
        '#!/bin/', '#include',  # Script headers
    ]
    
    return any(indicator in all_text for indicator in code_indicators)


def first_user_has_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    return len(attachments) > 0


def first_user_has_code_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has code-related attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    
    for attachment in attachments:
        mime_type = attachment.get('mime_type', '').lower()
        name = attachment.get('name', '').lower()
        
        # Check for code file extensions
        code_extensions = ['.py', '.js', '.java', '.cpp', '.c', '.go', '.rs', '.ts', '.jsx', '.tsx', '.sql', '.sh', '.rb', '.php']
        if any(ext in name for ext in code_extensions):
            return True
            
        # Check for code-related MIME types
        code_mimes = ['text/x-python', 'text/x-java', 'application/javascript', 'text/x-script']
        if any(mime in mime_type for mime in code_mimes):
            return True
    
    return False


def create_default_tagger() -> ConversationTagger:
    """Create a tagger with all previously discussed rules in the enhanced structured framework."""
    tagger = ConversationTagger()
    
    # ===== BASIC CONTENT ANALYSIS RULES (Boolean) =====
    tagger.add_base_rule(
        'large_content', 
        lambda conv: has_large_content(conv, 2000),
        'Content longer than 2000 characters anywhere in conversation'
    )
    
    tagger.add_base_rule(
        'code_patterns', 
        has_code_patterns,
        'Contains clear code patterns (```, def, function, etc.)'
    )
    
    tagger.add_base_rule(
        'github_context',
        has_github_repos,
        'GitHub repositories selected for context'
    )
    
    tagger.add_base_rule(
        'canvas_operations',
        has_canvas_operations,
        'Uses canvas/document features'
    )
    
    tagger.add_base_rule(
        'web_search',
        has_web_search,
        'Includes web search functionality'
    )
    
    tagger.add_base_rule(
        'reasoning',
        has_reasoning_thoughts,
        'Contains reasoning/thinking content'
    )
    
    tagger.add_base_rule(
        'code_execution',
        has_code_execution,
        'Contains code execution (Jupyter, aggregate results)'
    )
    
    # ===== FIRST USER MESSAGE RULES (Boolean) =====
    tagger.add_base_rule(
        'starts_large_content',
        lambda conv: first_user_has_large_content(conv, 2000),
        'First user message has large content (>2000 chars)'
    )
    
    tagger.add_base_rule(
        'starts_code_patterns',
        first_user_has_code_patterns,
        'First user message contains code patterns'
    )
    
    tagger.add_base_rule(
        'starts_with_attachments',
        first_user_has_attachments,
        'First user message has any attachments'
    )
    
    tagger.add_base_rule(
        'starts_code_attachments',
        first_user_has_code_attachments,
        'First user message has code-related attachments'
    )
    
    # ===== STRUCTURED TAG RULES =====
    tagger.add_base_rule(
        'conversation_length',
        create_conversation_length_tag,
        'Conversation length with count and category'
    )
    
    tagger.add_base_rule(
        'prompt_stats',
        create_prompt_stats_tag,
        'User message statistics (length, variance, etc.)'
    )
    
    tagger.add_multi_tag_rule(
        'gizmo_plugin_usage',
        create_gizmo_plugin_tags,
        'Specific gizmos and plugins used in conversation'
    )
    
    # ===== SUPPLEMENTAL RULES (Based on existing tags) =====
    
    # Coding assistance detection
    tagger.add_supplemental_rule(
        'coding_assistance',
        lambda conv, tags: (
            any(tag.name in ['code_patterns', 'github_context', 'code_execution'] for tag in tags)
        ),
        'Likely coding assistance (code patterns, GitHub, or execution)'
    )
    
    tagger.add_supplemental_rule(
        'coding_assistance_start',
        lambda conv, tags: (
            any(tag.name in ['starts_code_patterns', 'starts_large_content', 
                           'starts_with_attachments', 'starts_code_attachments'] for tag in tags)
        ),
        'Likely coding assistance based on how conversation starts'
    )
    
    # Research and analysis patterns  
    tagger.add_supplemental_rule(
        'research_session',
        lambda conv, tags: (
            any(tag.name == 'web_search' for tag in tags) and 
            any(tag.name == 'large_content' for tag in tags)
        ),
        'Research session (web search + substantial content)'
    )
    
    tagger.add_supplemental_rule(
        'complex_analysis',
        lambda conv, tags: (
            any(tag.name == 'reasoning' for tag in tags) and 
            len([tag for tag in tags if tag.name in ['web_search', 'large_content', 'canvas_operations']]) >= 2
        ),
        'Complex analysis (reasoning + multiple advanced features)'
    )
    
    # Context and interaction patterns
    tagger.add_supplemental_rule(
        'context_heavy_start',
        lambda conv, tags: (
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Conversation starts with substantial context'
    )
    
    tagger.add_supplemental_rule(
        'enhanced_conversation',
        lambda conv, tags: (
            any(tag.name in ['gizmo', 'plugin'] for tag in tags)
        ),
        'Uses enhanced features (gizmos or plugins)'
    )
    
    # Length-based classifications using structured tags
    tagger.add_supplemental_rule(
        'brief_interaction',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags)
        ),
        'Brief interaction (1-3 user messages)'
    )
    
    tagger.add_supplemental_rule(
        'extended_conversation',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['long', 'very_long'] for tag in tags)
        ),
        'Extended conversation (11+ user messages)'
    )
    
    # Prompt pattern classifications using structured tags
    tagger.add_supplemental_rule(
        'long_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['long', 'very_long'] for tag in tags)
        ),
        'Conversation has consistently long prompts'
    )
    
    tagger.add_supplemental_rule(
        'short_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['very_short', 'short'] for tag in tags)
        ),
        'Conversation has consistently short prompts'
    )
    
    tagger.add_supplemental_rule(
        'consistent_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'User prompts are consistent in length'
    )
    
    tagger.add_supplemental_rule(
        'variable_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'User prompts vary significantly in length'
    )
    
    # Combined patterns for specific use cases
    tagger.add_supplemental_rule(
        'context_dump',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags) and
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Short conversation starting with large context (likely context dump)'
    )
    
    tagger.add_supplemental_rule(
        'interactive_session',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'Extended back-and-forth conversation with consistent prompt style'
    )
    
    tagger.add_supplemental_rule(
        'evolving_discussion',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'Extended conversation where prompt style evolves'
    )
    
    return tagger


In [33]:
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(convs)
tagger.print_summary(tagged_results)
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')


Tagged 1673 conversations

=== TAG SUMMARY ===
prompt_stats: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    mean: avg=1142.4, range=[1.0, 184662.5]
    median: avg=814.2, range=[1.0, 184662.5]
    variance: avg=51563065.2, range=[0, 34065162056.2]
    values: consistency=consistent, consistency=mixed, consistency=variable, length_category=long, length_category=medium, length_category=short, length_category=very_long, length_category=very_short
conversation_length: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    values: category=long, category=medium, category=short, category=single, category=very_long
coding_assistance: 1551 (92.7%)
code_patterns: 1546 (92.4%)
large_content: 1362 (81.4%)
short_prompts: 1261 (75.4%)
brief_interaction: 906 (54.2%)
consistent_prompts: 896 (53.6%)
enhanced_conversation: 681 (40.7%)
gizmo: 676 (40.4%)
    values: gizmo_id=g-IibMsD7w8, gizmo_id=g-KpF6lTka3, gizmo_id=g-QsUj0Smzg, gizmo_id=g-WiEAUBGzb, gizmo_id=g-bWPVPw7oK, gizmo_id=g-pYtHuQdGh
vari

In [34]:
# Enhanced rule functions for all previously discussed tagging rules

def get_all_user_messages(conversation: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Get all user messages in chronological order."""
    mapping = conversation.get('mapping', {})
    user_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        author = message.get('author', {})
        if author.get('role') == 'user':
            create_time = message.get('create_time') or 0
            user_messages.append((create_time, message))
    
    user_messages.sort(key=lambda x: x[0])
    return [msg for _, msg in user_messages]


def get_first_user_message(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """Find the first user message in the conversation."""
    user_messages = get_all_user_messages(conversation)
    return user_messages[0] if user_messages else None


def create_conversation_length_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for conversation length."""
    user_count = len(get_all_user_messages(conversation))
    
    # Determine category
    if user_count == 1:
        category = 'single'
    elif user_count <= 3:
        category = 'short'
    elif user_count <= 10:
        category = 'medium'
    elif user_count <= 25:
        category = 'long'
    else:
        category = 'very_long'
    
    return Tag('conversation_length', count=user_count, category=category)


def create_prompt_stats_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for prompt statistics."""
    user_messages = get_all_user_messages(conversation)
    
    if not user_messages:
        return Tag('prompt_stats', count=0, mean=0, median=0, variance=0, 
                  length_category='none', consistency='none')
    
    # Calculate message lengths
    lengths = []
    for message in user_messages:
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        lengths.append(len(all_text))
    
    # Calculate statistics
    mean_length = sum(lengths) / len(lengths)
    sorted_lengths = sorted(lengths)
    n = len(sorted_lengths)
    median_length = (sorted_lengths[n//2] if n % 2 == 1 
                    else (sorted_lengths[n//2-1] + sorted_lengths[n//2]) / 2)
    variance = sum((x - mean_length) ** 2 for x in lengths) / len(lengths) if len(lengths) > 1 else 0
    
    # Determine categories
    if mean_length < 50:
        length_category = 'very_short'
    elif mean_length < 200:
        length_category = 'short'
    elif mean_length < 1000:
        length_category = 'medium'
    elif mean_length < 3000:
        length_category = 'long'
    else:
        length_category = 'very_long'
    
    if variance < 1000:
        consistency = 'consistent'
    elif variance < 10000:
        consistency = 'mixed'
    else:
        consistency = 'variable'
    
    return Tag('prompt_stats', 
               count=len(lengths),
               mean=round(mean_length, 1),
               median=round(median_length, 1),
               variance=round(variance, 1),
               length_category=length_category,
               consistency=consistency)


def create_gizmo_plugin_tags(conversation: Dict[str, Any]) -> List[Tag]:
    """Create structured tags for gizmos and plugins."""
    tags = []
    gizmos = set()
    plugins = set()
    
    # Check conversation-level
    if conversation.get('gizmo_id'):
        gizmos.add(conversation['gizmo_id'])
    
    plugin_ids = conversation.get('plugin_ids', [])
    if plugin_ids:
        plugins.update(plugin_ids)
    
    # Check message-level
    mapping = conversation.get('mapping', {})
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        
        # Invoked plugins
        invoked_plugin = metadata.get('invoked_plugin', {})
        if invoked_plugin:
            if invoked_plugin.get('plugin_id'):
                plugins.add(invoked_plugin['plugin_id'])
            if invoked_plugin.get('namespace'):
                plugins.add(invoked_plugin['namespace'])
        
        # Gizmo usage
        if metadata.get('gizmo_id'):
            gizmos.add(metadata['gizmo_id'])
    
    # Create tags
    for gizmo in gizmos:
        tags.append(Tag('gizmo', gizmo_id=gizmo))
    
    for plugin in plugins:
        tags.append(Tag('plugin', plugin_id=plugin))
    
    return tags


# Boolean rule functions for basic content analysis
def has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if conversation has unusually large content anywhere."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        text = content.get('text', '')
        if len(text) > min_length:
            return True
            
        parts = content.get('parts', [])
        for part in parts:
            if isinstance(part, str) and len(part) > min_length:
                return True
    
    return False


def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for clear code patterns anywhere in conversation."""
    mapping = conversation.get('mapping', {})
    
    # Get gizmo context for smarter detection
    gizmo_ids = set()
    if conversation.get('gizmo_id'):
        gizmo_ids.add(conversation['gizmo_id'])
    
    # Known gizmos that should have stricter code detection
    wiki_writing_gizmos = {
        'g-IibMsD7w8',  # Your wiki gizmo
        'g-pYtHuQdGh',  # Potentially another writing gizmo
        # Add other known writing/article gizmos here
    }
    
    is_writing_gizmo = bool(gizmo_ids & wiki_writing_gizmos)
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Very strong code indicators (reliable across all gizmos)
        very_strong_indicators = [
            '```',  # Code blocks
            '#!/bin/',  # Script headers
            '#include',  # C includes
            'using namespace',  # C++ using
        ]
        
        if any(indicator in all_text for indicator in very_strong_indicators):
            return True
        
        # Medium strength indicators (require more context)
        medium_indicators = [
            'def ', 'function ', 'class ',  # Function/class definitions
            'import ', 'from ', 'require(',  # Import statements
        ]
        
        if is_writing_gizmo:
            # For writing gizmos, require multiple strong indicators or very specific patterns
            strong_indicator_count = sum(1 for indicator in medium_indicators if indicator in all_text)
            if strong_indicator_count >= 3:  # Multiple function definitions, etc.
                return True
            
            # Look for actual code structure patterns, not just keywords
            if ('def ' in all_text and '(' in all_text and ':' in all_text and 
                'return' in all_text):  # Function definition pattern
                return True
                
        else:
            # For non-writing gizmos, use original logic
            if any(indicator in all_text for indicator in medium_indicators):
                return True
            
            # Check for high density of coding keywords (but stricter for writing gizmos)
            coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
            keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
            
            # Higher threshold for writing gizmos
            min_keywords = 5 if is_writing_gizmo else 3
            if len(all_text) > 1000 and keyword_count >= min_keywords:
                return True
    
    return False


def has_github_repos(conversation: Dict[str, Any]) -> bool:
    """Check if GitHub repositories were selected for context."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        repos = metadata.get('selected_github_repos', [])
        if repos:  # Non-empty list
            return True
    
    return False


def has_canvas_operations(conversation: Dict[str, Any]) -> bool:
    """Check for canvas/document operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('canvas'):
            return True
    
    return False


def has_web_search(conversation: Dict[str, Any]) -> bool:
    """Check for web search operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('search_queries') or 
            metadata.get('search_result_groups') or
            metadata.get('content_references')):
            return True
    
    return False


def has_reasoning_thoughts(conversation: Dict[str, Any]) -> bool:
    """Check for reasoning/thinking patterns."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        if content.get('thoughts'):  # Reasoning thoughts
            return True
    
    return False


def has_code_execution(conversation: Dict[str, Any]) -> bool:
    """Check for code execution artifacts."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('aggregate_result') or 
            metadata.get('jupyter_messages')):
            return True
    
    return False


# First user message specific rules
def first_user_has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if the first user message has large content."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    if len(text) > min_length:
        return True
        
    parts = content.get('parts', [])
    for part in parts:
        if isinstance(part, str) and len(part) > min_length:
            return True
    
    return False


def first_user_has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message contains code patterns."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    parts = content.get('parts', [])
    all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
    
    # Strong code indicators
    code_indicators = [
        '```',  # Code blocks
        'def ', 'function ', 'class ',  # Definitions
        'import ', 'from ', 'require(',  # Imports
        '#!/bin/', '#include',  # Script headers
    ]
    
    return any(indicator in all_text for indicator in code_indicators)


def first_user_has_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    return len(attachments) > 0


def first_user_has_code_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has code-related attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    
    for attachment in attachments:
        mime_type = attachment.get('mime_type', '').lower()
        name = attachment.get('name', '').lower()
        
        # Check for code file extensions
        code_extensions = ['.py', '.js', '.java', '.cpp', '.c', '.go', '.rs', '.ts', '.jsx', '.tsx', '.sql', '.sh', '.rb', '.php']
        if any(ext in name for ext in code_extensions):
            return True
            
        # Check for code-related MIME types
        code_mimes = ['text/x-python', 'text/x-java', 'application/javascript', 'text/x-script']
        if any(mime in mime_type for mime in code_mimes):
            return True
    
    return False


def create_default_tagger() -> ConversationTagger:
    """Create a tagger with all previously discussed rules in the enhanced structured framework."""
    tagger = ConversationTagger()
    
    # ===== BASIC CONTENT ANALYSIS RULES (Boolean) =====
    tagger.add_base_rule(
        'large_content', 
        lambda conv: has_large_content(conv, 2000),
        'Content longer than 2000 characters anywhere in conversation'
    )
    
    tagger.add_base_rule(
        'code_patterns', 
        has_code_patterns,
        'Contains clear code patterns (```, def, function, etc.)'
    )
    
    tagger.add_base_rule(
        'github_context',
        has_github_repos,
        'GitHub repositories selected for context'
    )
    
    tagger.add_base_rule(
        'canvas_operations',
        has_canvas_operations,
        'Uses canvas/document features'
    )
    
    tagger.add_base_rule(
        'web_search',
        has_web_search,
        'Includes web search functionality'
    )
    
    tagger.add_base_rule(
        'reasoning',
        has_reasoning_thoughts,
        'Contains reasoning/thinking content'
    )
    
    tagger.add_base_rule(
        'code_execution',
        has_code_execution,
        'Contains code execution (Jupyter, aggregate results)'
    )
    
    # ===== FIRST USER MESSAGE RULES (Boolean) =====
    tagger.add_base_rule(
        'starts_large_content',
        lambda conv: first_user_has_large_content(conv, 2000),
        'First user message has large content (>2000 chars)'
    )
    
    tagger.add_base_rule(
        'starts_code_patterns',
        first_user_has_code_patterns,
        'First user message contains code patterns'
    )
    
    tagger.add_base_rule(
        'starts_with_attachments',
        first_user_has_attachments,
        'First user message has any attachments'
    )
    
    tagger.add_base_rule(
        'starts_code_attachments',
        first_user_has_code_attachments,
        'First user message has code-related attachments'
    )
    
    # ===== STRUCTURED TAG RULES =====
    tagger.add_base_rule(
        'conversation_length',
        create_conversation_length_tag,
        'Conversation length with count and category'
    )
    
    tagger.add_base_rule(
        'prompt_stats',
        create_prompt_stats_tag,
        'User message statistics (length, variance, etc.)'
    )
    
    tagger.add_multi_tag_rule(
        'gizmo_plugin_usage',
        create_gizmo_plugin_tags,
        'Specific gizmos and plugins used in conversation'
    )
    
    # ===== SUPPLEMENTAL RULES (Based on existing tags) =====
    
    # Coding assistance detection
    tagger.add_supplemental_rule(
        'coding_assistance',
        lambda conv, tags: (
            any(tag.name in ['code_patterns', 'github_context', 'code_execution'] for tag in tags)
        ),
        'Likely coding assistance (code patterns, GitHub, or execution)'
    )
    
    tagger.add_supplemental_rule(
        'coding_assistance_start',
        lambda conv, tags: (
            any(tag.name in ['starts_code_patterns', 'starts_large_content', 
                           'starts_with_attachments', 'starts_code_attachments'] for tag in tags)
        ),
        'Likely coding assistance based on how conversation starts'
    )
    
    # Research and analysis patterns  
    tagger.add_supplemental_rule(
        'research_session',
        lambda conv, tags: (
            any(tag.name == 'web_search' for tag in tags) and 
            any(tag.name == 'large_content' for tag in tags)
        ),
        'Research session (web search + substantial content)'
    )
    
    tagger.add_supplemental_rule(
        'complex_analysis',
        lambda conv, tags: (
            any(tag.name == 'reasoning' for tag in tags) and 
            len([tag for tag in tags if tag.name in ['web_search', 'large_content', 'canvas_operations']]) >= 2
        ),
        'Complex analysis (reasoning + multiple advanced features)'
    )
    
    # Context and interaction patterns
    tagger.add_supplemental_rule(
        'context_heavy_start',
        lambda conv, tags: (
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Conversation starts with substantial context'
    )
    
    tagger.add_supplemental_rule(
        'enhanced_conversation',
        lambda conv, tags: (
            any(tag.name in ['gizmo', 'plugin'] for tag in tags)
        ),
        'Uses enhanced features (gizmos or plugins)'
    )
    
    # Length-based classifications using structured tags
    tagger.add_supplemental_rule(
        'brief_interaction',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags)
        ),
        'Brief interaction (1-3 user messages)'
    )
    
    tagger.add_supplemental_rule(
        'extended_conversation',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['long', 'very_long'] for tag in tags)
        ),
        'Extended conversation (11+ user messages)'
    )
    
    # Prompt pattern classifications using structured tags
    tagger.add_supplemental_rule(
        'long_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['long', 'very_long'] for tag in tags)
        ),
        'Conversation has consistently long prompts'
    )
    
    tagger.add_supplemental_rule(
        'short_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['very_short', 'short'] for tag in tags)
        ),
        'Conversation has consistently short prompts'
    )
    
    tagger.add_supplemental_rule(
        'consistent_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'User prompts are consistent in length'
    )
    
    tagger.add_supplemental_rule(
        'variable_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'User prompts vary significantly in length'
    )
    
    # Combined patterns for specific use cases
    tagger.add_supplemental_rule(
        'context_dump',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags) and
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Short conversation starting with large context (likely context dump)'
    )
    
    tagger.add_supplemental_rule(
        'interactive_session',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'Extended back-and-forth conversation with consistent prompt style'
    )
    
    tagger.add_supplemental_rule(
        'evolving_discussion',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'Extended conversation where prompt style evolves'
    )
    
    return tagger

In [35]:
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(convs)
tagger.print_summary(tagged_results)
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')


Tagged 1673 conversations

=== TAG SUMMARY ===
prompt_stats: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    mean: avg=1142.4, range=[1.0, 184662.5]
    median: avg=814.2, range=[1.0, 184662.5]
    variance: avg=51563065.2, range=[0, 34065162056.2]
    values: consistency=consistent, consistency=mixed, consistency=variable, length_category=long, length_category=medium, length_category=short, length_category=very_long, length_category=very_short
conversation_length: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    values: category=long, category=medium, category=short, category=single, category=very_long
large_content: 1362 (81.4%)
short_prompts: 1261 (75.4%)
coding_assistance: 1005 (60.1%)
code_patterns: 1000 (59.8%)
brief_interaction: 906 (54.2%)
consistent_prompts: 896 (53.6%)
enhanced_conversation: 681 (40.7%)
gizmo: 676 (40.4%)
    values: gizmo_id=g-IibMsD7w8, gizmo_id=g-KpF6lTka3, gizmo_id=g-QsUj0Smzg, gizmo_id=g-WiEAUBGzb, gizmo_id=g-bWPVPw7oK, gizmo_id=g-pYtHuQdGh
vari

In [41]:
# enhanced_conversation_tagger.py
"""
Enhanced conversation tagging system with faceting capabilities.
Allows analysis of tag distributions across different facets (gizmos, conversation types, etc.).
"""

from typing import Dict, Any, List, Callable, Set, Union, Optional, Tuple
from collections import defaultdict
import json


class Tag:
    """Represents a tag with optional key-value attributes."""
    
    def __init__(self, name: str, **attributes):
        self.name = name
        self.attributes = attributes
    
    def __str__(self):
        if self.attributes:
            attrs_str = ", ".join(f"{k}={v}" for k, v in self.attributes.items())
            return f"{self.name}({attrs_str})"
        return self.name
    
    def __repr__(self):
        return f"Tag('{self.name}', {self.attributes})"
    
    def __eq__(self, other):
        if isinstance(other, str):
            return self.name == other
        elif isinstance(other, Tag):
            return self.name == other.name and self.attributes == other.attributes
        return False
    
    def __hash__(self):
        return hash((self.name, tuple(sorted(self.attributes.items()))))
    
    def matches(self, name: str, **criteria) -> bool:
        """Check if tag matches name and optional attribute criteria."""
        if self.name != name:
            return False
        
        for key, value in criteria.items():
            if key not in self.attributes:
                return False
            
            attr_value = self.attributes[key]
            
            # Support comparison operators
            if isinstance(value, dict):
                for op, target in value.items():
                    if op == 'gt' and not (attr_value > target):
                        return False
                    elif op == 'gte' and not (attr_value >= target):
                        return False
                    elif op == 'lt' and not (attr_value < target):
                        return False
                    elif op == 'lte' and not (attr_value <= target):
                        return False
                    elif op == 'eq' and not (attr_value == target):
                        return False
                    elif op == 'ne' and not (attr_value != target):
                        return False
                    elif op == 'in' and not (attr_value in target):
                        return False
            else:
                # Direct equality
                if attr_value != value:
                    return False
        
        return True


class ConversationTagger:
    """
    Enhanced tagging system supporting structured tags with attributes and faceting.
    """
    
    def __init__(self):
        self.base_rules: Dict[str, Callable] = {}
        self.multi_tag_rules: Dict[str, Callable] = {}
        self.supplemental_rules: Dict[str, Callable] = {}
        self.rule_descriptions: Dict[str, str] = {}
    
    def add_base_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a base tagging rule that returns bool or Tag object."""
        self.base_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def add_multi_tag_rule(self, rule_name: str, rule_function: Callable, description: str = ""):
        """Add a rule that returns multiple tags (strings or Tag objects)."""
        self.multi_tag_rules[rule_name] = rule_function
        self.rule_descriptions[rule_name] = description
    
    def add_supplemental_rule(self, tag_name: str, rule_function: Callable, description: str = ""):
        """Add a supplemental rule that depends on existing tags."""
        self.supplemental_rules[tag_name] = rule_function
        self.rule_descriptions[tag_name] = description
    
    def _normalize_tag(self, tag: Union[str, Tag]) -> Tag:
        """Convert string tags to Tag objects."""
        if isinstance(tag, str):
            return Tag(tag)
        return tag
    
    def tag_conversation(self, conversation: Dict[str, Any]) -> Dict[str, Any]:
        """Apply all tagging rules to a conversation."""
        tags = set()
        debug_info = defaultdict(list)
        
        # Apply base rules
        for tag_name, rule_func in self.base_rules.items():
            try:
                result = rule_func(conversation)
                if result:
                    if isinstance(result, bool):
                        tag = Tag(tag_name)
                    else:
                        tag = self._normalize_tag(result)
                    tags.add(tag)
                    debug_info['applied_rules'].append(f"BASE: {tag}")
                else:
                    debug_info['skipped_rules'].append(f"BASE: {tag_name}")
            except Exception as e:
                debug_info['errors'].append(f"BASE: {tag_name} - {str(e)}")
        
        # Apply multi-tag rules
        for rule_name, rule_func in self.multi_tag_rules.items():
            try:
                new_tags = rule_func(conversation)
                if new_tags:
                    normalized_tags = [self._normalize_tag(tag) for tag in new_tags]
                    tags.update(normalized_tags)
                    debug_info['applied_rules'].append(f"MULTI: {rule_name} -> {[str(t) for t in normalized_tags]}")
                else:
                    debug_info['skipped_rules'].append(f"MULTI: {rule_name}")
            except Exception as e:
                debug_info['errors'].append(f"MULTI: {rule_name} - {str(e)}")
        
        # Apply supplemental rules
        max_iterations = 5
        for iteration in range(max_iterations):
            initial_tag_count = len(tags)
            
            for tag_name, rule_func in self.supplemental_rules.items():
                # Check if tag already exists
                if any(tag.name == tag_name for tag in tags):
                    continue
                    
                try:
                    result = rule_func(conversation, tags)
                    if result:
                        if isinstance(result, bool):
                            tag = Tag(tag_name)
                        else:
                            tag = self._normalize_tag(result)
                        tags.add(tag)
                        debug_info['applied_rules'].append(f"SUPP: {tag} (iter {iteration})")
                    else:
                        debug_info['skipped_rules'].append(f"SUPP: {tag_name} (iter {iteration})")
                except Exception as e:
                    debug_info['errors'].append(f"SUPP: {tag_name} - {str(e)}")
            
            if len(tags) == initial_tag_count:
                break
        
        return {
            'conversation_id': conversation.get('conversation_id', conversation.get('id', 'unknown')),
            'title': conversation.get('title', 'Untitled'),
            'tags': list(tags),
            'debug_info': dict(debug_info),
            'conversation': conversation
        }
    
    def tag_conversations(self, conversations: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Tag multiple conversations."""
        return [self.tag_conversation(conv) for conv in conversations]
    
    def filter_by_tags(self, tagged_conversations: List[Dict[str, Any]], 
                      include_tags: List[Union[str, Dict]] = None,
                      exclude_tags: List[Union[str, Dict]] = None) -> List[Dict[str, Any]]:
        """
        Filter conversations by tags with attribute support.
        
        Args:
            include_tags: List of tag names or dicts with criteria
                Examples: ['web_search', {'name': 'gizmo', 'type': 'dalle'}]
            exclude_tags: Similar format for exclusions
        """
        filtered = []
        
        for tagged_conv in tagged_conversations:
            tags = tagged_conv['tags']
            
            # Check exclusions first
            if exclude_tags:
                should_exclude = False
                for exclude_criterion in exclude_tags:
                    if self._matches_criterion(tags, exclude_criterion):
                        should_exclude = True
                        break
                if should_exclude:
                    continue
            
            # Check inclusions
            if include_tags:
                should_include = True
                for include_criterion in include_tags:
                    if not self._matches_criterion(tags, include_criterion):
                        should_include = False
                        break
                if not should_include:
                    continue
            
            filtered.append(tagged_conv)
        
        return filtered
    
    def _matches_criterion(self, tags: List[Tag], criterion: Union[str, Dict]) -> bool:
        """Check if any tag matches the given criterion."""
        if isinstance(criterion, str):
            return any(tag.name == criterion for tag in tags)
        
        elif isinstance(criterion, dict):
            name = criterion.get('name')
            if not name:
                return False
            
            criteria = {k: v for k, v in criterion.items() if k != 'name'}
            return any(tag.matches(name, **criteria) for tag in tags)
        
        return False
    
    def get_facet_value(self, tags: List[Tag], facet_tag_name: str, 
                       facet_attribute: Optional[str] = None) -> str:
        """Extract facet value from a conversation's tags."""
        matching_tags = [tag for tag in tags if tag.name == facet_tag_name]
        
        if not matching_tags:
            return "<none>"
        
        if facet_attribute is None:
            # Just check for presence of the tag
            return f"has_{facet_tag_name}"
        
        # Extract specific attribute values
        values = []
        for tag in matching_tags:
            if facet_attribute in tag.attributes:
                values.append(str(tag.attributes[facet_attribute]))
        
        if not values:
            return f"<{facet_tag_name}_no_{facet_attribute}>"
        
        # If multiple values, join them
        return "; ".join(sorted(set(values)))
    
    def facet_conversations(self, tagged_conversations: List[Dict[str, Any]], 
                           facet_tag_name: str, 
                           facet_attribute: Optional[str] = None,
                           max_facets: int = 50) -> Dict[str, List[Dict[str, Any]]]:
        """
        Group conversations by facet values.
        
        Args:
            facet_tag_name: Tag name to facet by (e.g., 'gizmo')
            facet_attribute: Optional attribute within tag (e.g., 'gizmo_id')
            max_facets: Maximum number of facet values to show
            
        Returns:
            Dictionary mapping facet values to lists of conversations
        """
        facets = defaultdict(list)
        
        for tagged_conv in tagged_conversations:
            facet_value = self.get_facet_value(tagged_conv['tags'], facet_tag_name, facet_attribute)
            facets[facet_value].append(tagged_conv)
        
        # Sort by facet size (largest first) and limit
        sorted_facets = dict(sorted(facets.items(), key=lambda x: len(x[1]), reverse=True))
        
        if len(sorted_facets) > max_facets:
            # Keep top facets and group rest into "others"
            items = list(sorted_facets.items())
            top_facets = dict(items[:max_facets-1])
            
            other_conversations = []
            for _, conversations in items[max_facets-1:]:
                other_conversations.extend(conversations)
            
            if other_conversations:
                top_facets["<other>"] = other_conversations
            
            return top_facets
        
        return sorted_facets
    
    def print_faceted_summary(self, tagged_conversations: List[Dict[str, Any]], 
                             facet_tag_name: str, 
                             facet_attribute: Optional[str] = None,
                             show_details: bool = False,
                             max_facets: int = 20):
        """
        Print tag summary broken down by facets.
        
        Args:
            facet_tag_name: Tag to facet by
            facet_attribute: Optional attribute within the tag  
            show_details: Show detailed tag attribute info
            max_facets: Maximum facets to show
            max_tags_per_facet: Maximum tags to show per facet
        """
        total = len(tagged_conversations)
        facets = self.facet_conversations(tagged_conversations, facet_tag_name, facet_attribute, max_facets)
        
        print(f"Tagged {total} conversations")
        print(f"Faceted by: {facet_tag_name}" + 
              (f".{facet_attribute}" if facet_attribute else ""))
        print(f"Found {len(facets)} facet values")
        
        print(f"\n{'='*80}")
        print(f"FACETED TAG SUMMARY")
        print(f"{'='*80}")
        
        for facet_value, facet_conversations in facets.items():
            facet_size = len(facet_conversations)
            facet_percentage = (facet_size / total) * 100
            
            print(f"\n📊 FACET: {facet_value}")
            print(f"    Conversations: {facet_size} ({facet_percentage:.1f}% of total)")
            print(f"    {'-' * 60}")
            
            # Calculate tag statistics for this facet
            tag_counts = defaultdict(int)
            tag_attributes = defaultdict(lambda: defaultdict(list))
            unique_structured_tags = defaultdict(set)
            
            for tagged_conv in facet_conversations:
                for tag in tagged_conv['tags']:
                    tag_counts[tag.name] += 1
                    
                    # Collect attribute information
                    for attr_name, attr_value in tag.attributes.items():
                        if isinstance(attr_value, (int, float)):
                            tag_attributes[tag.name][attr_name].append(attr_value)
                        else:
                            unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
            
            # Sort tags for this facet (show all tags)
            sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
            
            for tag_name, count in sorted_tags:
                percentage = (count / facet_size) * 100
                print(f"    {tag_name}: {count} ({percentage:.1f}%)")
                
                if show_details:
                    # Show numeric attribute statistics
                    if tag_name in tag_attributes:
                        for attr_name, values in tag_attributes[tag_name].items():
                            if values:
                                avg_val = sum(values) / len(values)
                                min_val = min(values)
                                max_val = max(values)
                                print(f"        {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                    
                    # Show unique structured values
                    if tag_name in unique_structured_tags:
                        unique_vals = sorted(unique_structured_tags[tag_name])
                        if len(unique_vals) <= 5:
                            print(f"        values: {', '.join(unique_vals)}")
                        else:
                            print(f"        values: {', '.join(unique_vals[:5])} ... (+{len(unique_vals)-5} more)")
    
    def compare_facets(self, tagged_conversations: List[Dict[str, Any]], 
                      facet_tag_name: str, 
                      facet_attribute: Optional[str] = None,
                      comparison_tags: List[str] = None,
                      min_facet_size: int = 10) -> None:
        """
        Compare specific tags across facets.
        
        Args:
            comparison_tags: List of tags to compare across facets
            min_facet_size: Minimum facet size to include in comparison
        """
        facets = self.facet_conversations(tagged_conversations, facet_tag_name, facet_attribute)
        
        # Filter facets by minimum size
        large_facets = {k: v for k, v in facets.items() if len(v) >= min_facet_size}
        
        if not large_facets:
            print(f"No facets with at least {min_facet_size} conversations found")
            return
        
        # If no specific tags provided, use most common tags overall
        if comparison_tags is None:
            overall_tag_counts = defaultdict(int)
            for tagged_conv in tagged_conversations:
                for tag in tagged_conv['tags']:
                    overall_tag_counts[tag.name] += 1
            
            # Get top 10 most common tags
            comparison_tags = [tag for tag, _ in 
                             sorted(overall_tag_counts.items(), key=lambda x: x[1], reverse=True)[:10]]
        
        print(f"\n{'='*80}")
        print(f"FACET COMPARISON")
        print(f"Comparing tags: {', '.join(comparison_tags)}")
        print(f"Across facets: {facet_tag_name}" + 
              (f".{facet_attribute}" if facet_attribute else ""))
        print(f"{'='*80}")
        
        # Calculate percentages for each tag in each facet
        results = {}
        for facet_value, facet_conversations in large_facets.items():
            facet_size = len(facet_conversations)
            facet_tag_counts = defaultdict(int)
            
            for tagged_conv in facet_conversations:
                for tag in tagged_conv['tags']:
                    facet_tag_counts[tag.name] += 1
            
            results[facet_value] = {
                'size': facet_size,
                'percentages': {tag: (facet_tag_counts[tag] / facet_size) * 100 
                               for tag in comparison_tags}
            }
        
        # Print comparison table
        print(f"\n{'Facet':<30} {'Size':<8} " + 
              "".join(f"{tag:<15}" for tag in comparison_tags))
        print("-" * (30 + 8 + 15 * len(comparison_tags)))
        
        for facet_value, data in results.items():
            facet_display = facet_value[:28] + ".." if len(facet_value) > 30 else facet_value
            row = f"{facet_display:<30} {data['size']:<8} "
            row += "".join(f"{data['percentages'][tag]:<15.1f}" for tag in comparison_tags)
            print(row)
        
        # Highlight interesting differences
        print(f"\n🔍 NOTABLE DIFFERENCES:")
        for tag in comparison_tags:
            percentages = [results[facet]['percentages'][tag] for facet in results.keys()]
            if max(percentages) - min(percentages) > 20:  # 20% difference threshold
                max_facet = max(results.keys(), key=lambda f: results[f]['percentages'][tag])
                min_facet = min(results.keys(), key=lambda f: results[f]['percentages'][tag])
                print(f"    {tag}: {max_facet} ({results[max_facet]['percentages'][tag]:.1f}%) vs " +
                      f"{min_facet} ({results[min_facet]['percentages'][tag]:.1f}%)")
    
    def get_tag_values(self, tagged_conversations: List[Dict[str, Any]], 
                      tag_name: str, attribute: str) -> List[Any]:
        """Extract attribute values from tags across conversations."""
        values = []
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                if tag.name == tag_name and attribute in tag.attributes:
                    values.append(tag.attributes[attribute])
        return values
    
    def debug_code_detection(self, tagged_conversations: List[Dict[str, Any]], 
                            facet_tag_name: str = None, 
                            facet_value: str = None,
                            max_examples: int = 5):
        """
        Debug why conversations are being tagged as having code patterns.
        
        Args:
            facet_tag_name: Optional facet to filter by (e.g., 'gizmo')
            facet_value: Optional facet value to filter by (e.g., 'g-IibMsD7w8')
            max_examples: Number of example conversations to analyze
        """
        # Filter conversations if faceting specified
        conversations_to_analyze = tagged_conversations
        if facet_tag_name and facet_value:
            conversations_to_analyze = []
            for tagged_conv in tagged_conversations:
                facet_val = self.get_facet_value(tagged_conv['tags'], facet_tag_name, 
                                               'gizmo_id' if facet_tag_name == 'gizmo' else None)
                if facet_val == facet_value:
                    conversations_to_analyze.append(tagged_conv)
        
        # Show breakdown of different code indicators
        indicator_counts = {
            'code_blocks': 0,
            'function_definitions': 0,
            'import_statements': 0,
            'script_headers': 0,
            'high_keyword_density': 0,
            'code_structure_patterns': 0,
            'code_patterns': 0,  # Legacy combined
        }
        
        evidence_counts = {
            'strong_code_evidence': 0,
            'moderate_code_evidence': 0,
            'weak_code_evidence': 0,
            'likely_coding_assistance': 0,
            'conservative_coding_assistance': 0,
        }
        
        for tagged_conv in conversations_to_analyze:
            for tag in tagged_conv['tags']:
                if tag.name in indicator_counts:
                    indicator_counts[tag.name] += 1
                if tag.name in evidence_counts:
                    evidence_counts[tag.name] += 1
        
        total = len(conversations_to_analyze)
        
        print(f"\n{'='*80}")
        print(f"CODE DETECTION DEBUG")
        if facet_tag_name and facet_value:
            print(f"Filtering by: {facet_tag_name} = {facet_value}")
        print(f"Analyzing {total} conversations")
        print(f"{'='*80}")
        
        print(f"\n📊 INDIVIDUAL INDICATOR BREAKDOWN:")
        for indicator, count in indicator_counts.items():
            percentage = (count / total) * 100 if total > 0 else 0
            print(f"    {indicator}: {count} ({percentage:.1f}%)")
        
        print(f"\n🎯 EVIDENCE LEVEL BREAKDOWN:")
        for evidence, count in evidence_counts.items():
            percentage = (count / total) * 100 if total > 0 else 0
            print(f"    {evidence}: {count} ({percentage:.1f}%)")
        
        # Find conversations with any code indicators
        code_indicator_conversations = []
        for tagged_conv in conversations_to_analyze:
            has_any_indicator = any(tag.name in indicator_counts for tag in tagged_conv['tags'])
            if has_any_indicator:
                code_indicator_conversations.append(tagged_conv)
        
        print(f"\n🔍 DETAILED EXAMPLES:")
        print(f"Found {len(code_indicator_conversations)} conversations with code indicators")
        
        # Analyze sample conversations
        for i, tagged_conv in enumerate(code_indicator_conversations[:max_examples]):
            print(f"\n--- Example {i+1}: {tagged_conv['title'][:60]}... ---")
            
            # Show which indicators triggered
            triggered_indicators = [tag.name for tag in tagged_conv['tags'] if tag.name in indicator_counts]
            triggered_evidence = [tag.name for tag in tagged_conv['tags'] if tag.name in evidence_counts]
            
            print(f"    Triggered indicators: {', '.join(triggered_indicators) if triggered_indicators else 'None'}")
            print(f"    Evidence levels: {', '.join(triggered_evidence) if triggered_evidence else 'None'}")
            
            # Get the raw conversation data
            conversation = tagged_conv['conversation']
            
            # Analyze what triggered each detection
            mapping = conversation.get('mapping', {})
            
            detailed_findings = []
            total_text_length = 0
            
            for node_id, node in mapping.items():
                message = node.get('message')
                if not message:
                    continue
                
                content = message.get('content', {})
                text = content.get('text', '')
                parts = content.get('parts', [])
                all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
                total_text_length += len(all_text)
                
                # Check each indicator type
                if '```' in all_text:
                    detailed_findings.append(f"code_blocks: found {all_text.count('```')} occurrences")
                
                def_keywords = ['def ', 'function ', 'class ']
                def_count = sum(all_text.count(kw) for kw in def_keywords)
                if def_count > 0:
                    detailed_findings.append(f"function_definitions: {def_count} keywords")
                
                import_keywords = ['import ', 'from ', 'require(']
                import_count = sum(all_text.count(kw) for kw in import_keywords)
                if import_count > 0:
                    detailed_findings.append(f"import_statements: {import_count} keywords")
                
                script_indicators = ['#!/bin/', '#include', 'using namespace']
                script_count = sum(all_text.count(ind) for ind in script_indicators)
                if script_count > 0:
                    detailed_findings.append(f"script_headers: {script_count} indicators")
                
                # Check keyword density
                if len(all_text) > 1000:
                    coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
                    keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
                    if keyword_count >= 5:
                        detailed_findings.append(f"high_keyword_density: {keyword_count} keywords in {len(all_text)} chars")
                
                # Check structure patterns
                patterns = [
                    ('def ' in all_text and '(' in all_text and ':' in all_text and 'return' in all_text, 'Python function'),
                    ('class ' in all_text and '(' in all_text and ':' in all_text and 'def ' in all_text, 'Python class'),
                    (('function(' in all_text or 'function ' in all_text) and '{' in all_text and '}' in all_text, 'JavaScript function'),
                    (all_text.count('=') >= 3 and ('let ' in all_text or 'const ' in all_text or 'var ' in all_text), 'Variable assignments'),
                ]
                
                for pattern_check, description in patterns:
                    if pattern_check:
                        detailed_findings.append(f"code_structure_patterns: {description}")
            
            print(f"    Total text length: {total_text_length}")
            print(f"    Detailed findings:")
            if detailed_findings:
                for finding in detailed_findings:
                    print(f"        • {finding}")
            else:
                print(f"        • No specific indicators found (potential bug)")
            
            # Show a sample of the content
            print(f"    Content sample:")
            sample_texts = []
            for node_id, node in mapping.items():
                message = node.get('message')
                if message:
                    content = message.get('content', {})
                    text = content.get('text', '')
                    if text and len(text) > 50:
                        sample_texts.append(text[:200] + "..." if len(text) > 200 else text)
                        break
            
            if sample_texts:
                print(f"        \"{sample_texts[0]}\"")
            else:
                print(f"        No substantial text content found")
    
    def suggest_improved_code_detection(self):
        """Suggest improvements to code detection logic."""
        print(f"\n{'='*80}")
        print(f"SUGGESTED IMPROVED CODE DETECTION")
        print(f"{'='*80}")
        
        print("""
Current code detection is too broad. Here are suggestions for improvement:

1. STRICTER INDICATORS:
   - Require multiple strong indicators, not just one
   - Weight indicators differently (``` is stronger than 'def ')
   - Ignore common false positives in natural language

2. CONTEXT AWARENESS:
   - Check if code-like text is actually in code blocks (```)
   - Look for file extensions in attachments
   - Consider the role (user vs assistant) when evaluating code

3. BETTER KEYWORD ANALYSIS:
   - Increase threshold for keyword density detection
   - Use more specific programming keywords
   - Exclude common English words that happen to be programming keywords

4. GIZMO-SPECIFIC RULES:
   - Different detection logic for different gizmo types
   - Wiki gizmos should have much stricter code detection
   - Art/creative gizmos might mention "class" or "function" in non-coding contexts

5. PROPOSED NEW LOGIC:
   ```python
   def improved_has_code_patterns(conversation, gizmo_context=None):
       # Be much stricter for wiki/writing gizmos
       if gizmo_context and 'wiki' in gizmo_context.lower():
           return has_very_strong_code_indicators(conversation)
       else:
           return has_moderate_code_indicators(conversation)
   ```
        """)

    def print_summary(self, tagged_conversations: List[Dict[str, Any]], show_details: bool = True):
        """Print comprehensive summary with all tag types and optional details."""
        total = len(tagged_conversations)
        tag_counts = defaultdict(int)
        tag_attributes = defaultdict(lambda: defaultdict(list))
        unique_structured_tags = defaultdict(set)
        
        # Collect all tag information
        for tagged_conv in tagged_conversations:
            for tag in tagged_conv['tags']:
                tag_counts[tag.name] += 1
                
                # Collect attribute information
                for attr_name, attr_value in tag.attributes.items():
                    if isinstance(attr_value, (int, float)):
                        tag_attributes[tag.name][attr_name].append(attr_value)
                    else:
                        # For non-numeric attributes, track unique values
                        unique_structured_tags[tag.name].add(f"{attr_name}={attr_value}")
        
        print(f"Tagged {total} conversations")
        print(f"\n=== TAG SUMMARY ===")
        
        # Sort tags by frequency for better readability
        sorted_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)
        
        for tag_name, count in sorted_tags:
            percentage = (count / total) * 100
            print(f"{tag_name}: {count} ({percentage:.1f}%)")
            
            if show_details:
                # Show numeric attribute statistics
                if tag_name in tag_attributes:
                    for attr_name, values in tag_attributes[tag_name].items():
                        if values:
                            avg_val = sum(values) / len(values)
                            min_val = min(values)
                            max_val = max(values)
                            print(f"    {attr_name}: avg={avg_val:.1f}, range=[{min_val}, {max_val}]")
                
                # Show unique structured values for non-numeric attributes
                if tag_name in unique_structured_tags:
                    unique_vals = sorted(unique_structured_tags[tag_name])
                    if len(unique_vals) <= 10:  # Show all if not too many
                        print(f"    values: {', '.join(unique_vals)}")
                    else:  # Show top 10 most common
                        print(f"    values: {', '.join(unique_vals[:10])} ... (+{len(unique_vals)-10} more)")


# Example usage functions to demonstrate the faceting capabilities

def demo_faceting_usage():
    """
    Demonstrate how to use the new faceting capabilities.
    """
    print("""
# FACETING USAGE EXAMPLES

# 1. Facet by gizmo presence (with/without gizmos)
tagger.print_faceted_summary(tagged_results, 'gizmo')

# 2. Facet by specific gizmo IDs  
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')

# 3. Facet by conversation length categories
tagger.print_faceted_summary(tagged_results, 'conversation_length', 'category')

# 4. Facet by prompt length categories
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'length_category')

# 5. Facet by prompt consistency patterns
tagger.print_faceted_summary(tagged_results, 'prompt_stats', 'consistency')

# 6. Compare coding patterns across gizmos
tagger.compare_facets(tagged_results, 'gizmo', 'gizmo_id', 
                     comparison_tags=['coding_assistance', 'code_patterns', 'web_search'])

# 7. Compare conversation patterns across length categories
tagger.compare_facets(tagged_results, 'conversation_length', 'category',
                     comparison_tags=['coding_assistance', 'web_search', 'large_content'])

# 8. Show detailed faceted summary (now shows all tags)
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id', 
                            show_details=True, max_facets=10)

# 9. Focus on specific gizmo usage patterns
gizmo_conversations = tagger.filter_by_tags(tagged_results, include_tags=['gizmo'])
tagger.print_faceted_summary(gizmo_conversations, 'gizmo', 'gizmo_id')

# 10. Analyze how conversation patterns differ by whether they start with code
tagger.print_faceted_summary(tagged_results, 'starts_code_patterns')

# 11. Debug code detection issues (especially for wiki gizmos)
tagger.debug_code_detection(tagged_results, 'gizmo', 'g-IibMsD7w8', max_examples=3)
tagger.suggest_improved_code_detection()

# 12. Multi-level analysis: first facet by conversation length, then by gizmo within each
# 12. Multi-level analysis: first facet by conversation length, then by gizmo within each
for length_cat in ['single', 'short', 'medium', 'long', 'very_long']:
    length_conversations = tagger.filter_by_tags(
        tagged_results, 
        include_tags=[{'name': 'conversation_length', 'category': length_cat}]
    )
    if length_conversations:
        print(f"\\n=== GIZMO USAGE WITHIN {length_cat.upper()} CONVERSATIONS ===")
        tagger.print_faceted_summary(length_conversations, 'gizmo', 'gizmo_id', max_facets=5)
    """)

# Enhanced rule functions for all previously discussed tagging rules

def get_all_user_messages(conversation: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Get all user messages in chronological order."""
    mapping = conversation.get('mapping', {})
    user_messages = []
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        author = message.get('author', {})
        if author.get('role') == 'user':
            create_time = message.get('create_time') or 0
            user_messages.append((create_time, message))
    
    user_messages.sort(key=lambda x: x[0])
    return [msg for _, msg in user_messages]


def get_first_user_message(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """Find the first user message in the conversation."""
    user_messages = get_all_user_messages(conversation)
    return user_messages[0] if user_messages else None


def create_conversation_length_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for conversation length."""
    user_count = len(get_all_user_messages(conversation))
    
    # Determine category
    if user_count == 1:
        category = 'single'
    elif user_count <= 3:
        category = 'short'
    elif user_count <= 10:
        category = 'medium'
    elif user_count <= 25:
        category = 'long'
    else:
        category = 'very_long'
    
    return Tag('conversation_length', count=user_count, category=category)


def create_prompt_stats_tag(conversation: Dict[str, Any]) -> Tag:
    """Create structured tag for prompt statistics."""
    user_messages = get_all_user_messages(conversation)
    
    if not user_messages:
        return Tag('prompt_stats', count=0, mean=0, median=0, variance=0, 
                  length_category='none', consistency='none')
    
    # Calculate message lengths
    lengths = []
    for message in user_messages:
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        lengths.append(len(all_text))
    
    # Calculate statistics
    mean_length = sum(lengths) / len(lengths)
    sorted_lengths = sorted(lengths)
    n = len(sorted_lengths)
    median_length = (sorted_lengths[n//2] if n % 2 == 1 
                    else (sorted_lengths[n//2-1] + sorted_lengths[n//2]) / 2)
    variance = sum((x - mean_length) ** 2 for x in lengths) / len(lengths) if len(lengths) > 1 else 0
    
    # Determine categories
    if mean_length < 50:
        length_category = 'very_short'
    elif mean_length < 200:
        length_category = 'short'
    elif mean_length < 1000:
        length_category = 'medium'
    elif mean_length < 3000:
        length_category = 'long'
    else:
        length_category = 'very_long'
    
    if variance < 1000:
        consistency = 'consistent'
    elif variance < 10000:
        consistency = 'mixed'
    else:
        consistency = 'variable'
    
    return Tag('prompt_stats', 
               count=len(lengths),
               mean=round(mean_length, 1),
               median=round(median_length, 1),
               variance=round(variance, 1),
               length_category=length_category,
               consistency=consistency)


def create_gizmo_plugin_tags(conversation: Dict[str, Any]) -> List[Tag]:
    """Create structured tags for gizmos and plugins."""
    tags = []
    gizmos = set()
    plugins = set()
    
    # Check conversation-level
    if conversation.get('gizmo_id'):
        gizmos.add(conversation['gizmo_id'])
    
    plugin_ids = conversation.get('plugin_ids', [])
    if plugin_ids:
        plugins.update(plugin_ids)
    
    # Check message-level
    mapping = conversation.get('mapping', {})
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        
        # Invoked plugins
        invoked_plugin = metadata.get('invoked_plugin', {})
        if invoked_plugin:
            if invoked_plugin.get('plugin_id'):
                plugins.add(invoked_plugin['plugin_id'])
            if invoked_plugin.get('namespace'):
                plugins.add(invoked_plugin['namespace'])
        
        # Gizmo usage
        if metadata.get('gizmo_id'):
            gizmos.add(metadata['gizmo_id'])
    
    # Create tags
    for gizmo in gizmos:
        tags.append(Tag('gizmo', gizmo_id=gizmo))
    
    for plugin in plugins:
        tags.append(Tag('plugin', plugin_id=plugin))
    
    return tags


# Individual code indicator detection functions
def has_code_blocks(conversation: Dict[str, Any]) -> bool:
    """Check for explicit code blocks (``` markdown syntax)."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        if '```' in all_text:
            return True
    
    return False


def has_function_definitions(conversation: Dict[str, Any]) -> bool:
    """Check for function/class definition keywords."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        definition_keywords = ['def ', 'function ', 'class ']
        if any(keyword in all_text for keyword in definition_keywords):
            return True
    
    return False


def has_import_statements(conversation: Dict[str, Any]) -> bool:
    """Check for import/require statements."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        import_keywords = ['import ', 'from ', 'require(']
        if any(keyword in all_text for keyword in import_keywords):
            return True
    
    return False


def has_script_headers(conversation: Dict[str, Any]) -> bool:
    """Check for script headers and system includes."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        script_indicators = ['#!/bin/', '#include', 'using namespace']
        if any(indicator in all_text for indicator in script_indicators):
            return True
    
    return False


def has_high_keyword_density(conversation: Dict[str, Any]) -> bool:
    """Check for high density of programming keywords in large text."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Only check substantial text
        if len(all_text) <= 1000:
            continue
        
        coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
        keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
        
        # High threshold to avoid false positives in articles
        if keyword_count >= 5:
            return True
    
    return False


def has_code_structure_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for actual code structure patterns (syntax combinations that suggest real code)."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Look for combinations that strongly suggest actual code
        patterns = [
            # Function definition pattern
            ('def ' in all_text and '(' in all_text and ':' in all_text and 'return' in all_text),
            # Class definition pattern  
            ('class ' in all_text and '(' in all_text and ':' in all_text and 'def ' in all_text),
            # JavaScript function pattern
            ('function(' in all_text or 'function ' in all_text) and '{' in all_text and '}' in all_text,
            # Multiple assignment pattern
            all_text.count('=') >= 3 and ('let ' in all_text or 'const ' in all_text or 'var ' in all_text),
        ]
        
        if any(pattern for pattern in patterns):
            return True
    
    return False


# Combined code pattern detection (uses individual indicators)
def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for any code patterns (combines individual indicators)."""
    return (has_code_blocks(conversation) or 
            has_function_definitions(conversation) or 
            has_import_statements(conversation) or 
            has_script_headers(conversation) or
            has_code_structure_patterns(conversation) or
            has_high_keyword_density(conversation))


# Boolean rule functions for basic content analysis
def has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if conversation has unusually large content anywhere."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        text = content.get('text', '')
        if len(text) > min_length:
            return True
            
        parts = content.get('parts', [])
        for part in parts:
            if isinstance(part, str) and len(part) > min_length:
                return True
    
    return False


def has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check for clear code patterns anywhere in conversation."""
    mapping = conversation.get('mapping', {})
    
    # Get gizmo context for smarter detection
    gizmo_ids = set()
    if conversation.get('gizmo_id'):
        gizmo_ids.add(conversation['gizmo_id'])
    
    # Known gizmos that should have stricter code detection
    wiki_writing_gizmos = {
        'g-IibMsD7w8',  # Your wiki gizmo
        'g-pYtHuQdGh',  # Potentially another writing gizmo
        # Add other known writing/article gizmos here
    }
    
    is_writing_gizmo = bool(gizmo_ids & wiki_writing_gizmos)
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
        
        content = message.get('content', {})
        text = content.get('text', '')
        parts = content.get('parts', [])
        all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
        
        # Very strong code indicators (reliable across all gizmos)
        very_strong_indicators = [
            '```',  # Code blocks
            '#!/bin/',  # Script headers
            '#include',  # C includes
            'using namespace',  # C++ using
        ]
        
        if any(indicator in all_text for indicator in very_strong_indicators):
            return True
        
        # Medium strength indicators (require more context)
        medium_indicators = [
            'def ', 'function ', 'class ',  # Function/class definitions
            'import ', 'from ', 'require(',  # Import statements
        ]
        
        if is_writing_gizmo:
            # For writing gizmos, require multiple strong indicators or very specific patterns
            strong_indicator_count = sum(1 for indicator in medium_indicators if indicator in all_text)
            if strong_indicator_count >= 3:  # Multiple function definitions, etc.
                return True
            
            # Look for actual code structure patterns, not just keywords
            if ('def ' in all_text and '(' in all_text and ':' in all_text and 
                'return' in all_text):  # Function definition pattern
                return True
                
        else:
            # For non-writing gizmos, use original logic
            if any(indicator in all_text for indicator in medium_indicators):
                return True
            
            # Check for high density of coding keywords (but stricter for writing gizmos)
            coding_keywords = ['function', 'class', 'import', 'def ', 'const ', 'let ', 'var ', 'return', 'if ', 'for ', 'while ']
            keyword_count = sum(1 for keyword in coding_keywords if keyword in all_text.lower())
            
            # Higher threshold for writing gizmos
            min_keywords = 5 if is_writing_gizmo else 3
            if len(all_text) > 1000 and keyword_count >= min_keywords:
                return True
    
    return False


def has_github_repos(conversation: Dict[str, Any]) -> bool:
    """Check if GitHub repositories were selected for context."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        repos = metadata.get('selected_github_repos', [])
        if repos:  # Non-empty list
            return True
    
    return False


def has_canvas_operations(conversation: Dict[str, Any]) -> bool:
    """Check for canvas/document operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if metadata.get('canvas'):
            return True
    
    return False


def has_web_search(conversation: Dict[str, Any]) -> bool:
    """Check for web search operations."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('search_queries') or 
            metadata.get('search_result_groups') or
            metadata.get('content_references')):
            return True
    
    return False


def has_reasoning_thoughts(conversation: Dict[str, Any]) -> bool:
    """Check for reasoning/thinking patterns."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        content = message.get('content', {})
        if content.get('thoughts'):  # Reasoning thoughts
            return True
    
    return False


def has_code_execution(conversation: Dict[str, Any]) -> bool:
    """Check for code execution artifacts."""
    mapping = conversation.get('mapping', {})
    
    for node_id, node in mapping.items():
        message = node.get('message')
        if not message:
            continue
            
        metadata = message.get('metadata', {})
        if (metadata.get('aggregate_result') or 
            metadata.get('jupyter_messages')):
            return True
    
    return False


# First user message specific rules
def first_user_has_large_content(conversation: Dict[str, Any], min_length: int = 2000) -> bool:
    """Check if the first user message has large content."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    if len(text) > min_length:
        return True
        
    parts = content.get('parts', [])
    for part in parts:
        if isinstance(part, str) and len(part) > min_length:
            return True
    
    return False


def first_user_has_code_patterns(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message contains code patterns."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    content = first_message.get('content', {})
    text = content.get('text', '')
    parts = content.get('parts', [])
    all_text = text + ' ' + ' '.join(str(p) for p in parts if isinstance(p, str))
    
    # Strong code indicators
    code_indicators = [
        '```',  # Code blocks
        'def ', 'function ', 'class ',  # Definitions
        'import ', 'from ', 'require(',  # Imports
        '#!/bin/', '#include',  # Script headers
    ]
    
    return any(indicator in all_text for indicator in code_indicators)


def first_user_has_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    return len(attachments) > 0


def first_user_has_code_attachments(conversation: Dict[str, Any]) -> bool:
    """Check if the first user message has code-related attachments."""
    first_message = get_first_user_message(conversation)
    if not first_message:
        return False
    
    metadata = first_message.get('metadata', {})
    attachments = metadata.get('attachments', [])
    
    for attachment in attachments:
        mime_type = attachment.get('mime_type', '').lower()
        name = attachment.get('name', '').lower()
        
        # Check for code file extensions
        code_extensions = ['.py', '.js', '.java', '.cpp', '.c', '.go', '.rs', '.ts', '.jsx', '.tsx', '.sql', '.sh', '.rb', '.php']
        if any(ext in name for ext in code_extensions):
            return True
            
        # Check for code-related MIME types
        code_mimes = ['text/x-python', 'text/x-java', 'application/javascript', 'text/x-script']
        if any(mime in mime_type for mime in code_mimes):
            return True
    
    return False


def create_default_tagger() -> ConversationTagger:
    """Create a tagger with all previously discussed rules in the enhanced structured framework."""
    tagger = ConversationTagger()
    
    # ===== BASIC CONTENT ANALYSIS RULES (Boolean) =====
    tagger.add_base_rule(
        'large_content', 
        lambda conv: has_large_content(conv, 2000),
        'Content longer than 2000 characters anywhere in conversation'
    )
    
    # ===== INDIVIDUAL CODE INDICATOR RULES =====
    tagger.add_base_rule(
        'code_blocks',
        has_code_blocks,
        'Contains explicit code blocks (``` syntax)'
    )
    
    tagger.add_base_rule(
        'function_definitions',
        has_function_definitions,
        'Contains function/class definition keywords (def, function, class)'
    )
    
    tagger.add_base_rule(
        'import_statements',
        has_import_statements,
        'Contains import/require statements'
    )
    
    tagger.add_base_rule(
        'script_headers',
        has_script_headers,
        'Contains script headers or system includes (#!/bin/, #include, using namespace)'
    )
    
    tagger.add_base_rule(
        'high_keyword_density',
        has_high_keyword_density,
        'High density of programming keywords in substantial text (5+ keywords in 1000+ chars)'
    )
    
    tagger.add_base_rule(
        'code_structure_patterns',
        has_code_structure_patterns,
        'Contains actual code structure patterns (function definitions with syntax)'
    )
    
    # ===== LEGACY COMBINED CODE RULE =====
    tagger.add_base_rule(
        'code_patterns', 
        has_code_patterns,
        'Contains any code patterns (combines individual indicators)'
    )
    
    # ===== OTHER BASIC RULES =====
    tagger.add_base_rule(
        'github_context',
        has_github_repos,
        'GitHub repositories selected for context'
    )
    
    tagger.add_base_rule(
        'canvas_operations',
        has_canvas_operations,
        'Uses canvas/document features'
    )
    
    tagger.add_base_rule(
        'web_search',
        has_web_search,
        'Includes web search functionality'
    )
    
    tagger.add_base_rule(
        'reasoning',
        has_reasoning_thoughts,
        'Contains reasoning/thinking content'
    )
    
    tagger.add_base_rule(
        'code_execution',
        has_code_execution,
        'Contains code execution (Jupyter, aggregate results)'
    )
    
    # ===== FIRST USER MESSAGE RULES (Boolean) =====
    tagger.add_base_rule(
        'starts_large_content',
        lambda conv: first_user_has_large_content(conv, 2000),
        'First user message has large content (>2000 chars)'
    )
    
    tagger.add_base_rule(
        'starts_code_patterns',
        first_user_has_code_patterns,
        'First user message contains code patterns'
    )
    
    tagger.add_base_rule(
        'starts_with_attachments',
        first_user_has_attachments,
        'First user message has any attachments'
    )
    
    tagger.add_base_rule(
        'starts_code_attachments',
        first_user_has_code_attachments,
        'First user message has code-related attachments'
    )
    
    # ===== STRUCTURED TAG RULES =====
    tagger.add_base_rule(
        'conversation_length',
        create_conversation_length_tag,
        'Conversation length with count and category'
    )
    
    tagger.add_base_rule(
        'prompt_stats',
        create_prompt_stats_tag,
        'User message statistics (length, variance, etc.)'
    )
    
    tagger.add_multi_tag_rule(
        'gizmo_plugin_usage',
        create_gizmo_plugin_tags,
        'Specific gizmos and plugins used in conversation'
    )
    
    # ===== SUPPLEMENTAL RULES (Based on existing tags) =====
    
    # Strong code evidence (very reliable indicators)
    tagger.add_supplemental_rule(
        'strong_code_evidence',
        lambda conv, tags: (
            any(tag.name in ['code_blocks', 'script_headers', 'code_structure_patterns'] for tag in tags)
        ),
        'Strong evidence of actual code (blocks, headers, or structure patterns)'
    )
    
    # Moderate code evidence (could be false positives in articles)
    tagger.add_supplemental_rule(
        'moderate_code_evidence',
        lambda conv, tags: (
            any(tag.name in ['function_definitions', 'import_statements'] for tag in tags)
        ),
        'Moderate evidence of code (keywords that could appear in articles)'
    )
    
    # Weak code evidence (likely false positive in articles)
    tagger.add_supplemental_rule(
        'weak_code_evidence',
        lambda conv, tags: (
            any(tag.name == 'high_keyword_density' for tag in tags) and
            not any(tag.name in ['strong_code_evidence', 'moderate_code_evidence'] for tag in tags)
        ),
        'Weak evidence of code (keyword density only, no other indicators)'
    )
    
    # Intelligent coding assistance detection
    tagger.add_supplemental_rule(
        'likely_coding_assistance',
        lambda conv, tags: (
            any(tag.name == 'strong_code_evidence' for tag in tags) or
            (any(tag.name == 'moderate_code_evidence' for tag in tags) and
             any(tag.name in ['github_context', 'code_execution'] for tag in tags)) or
            (len([tag for tag in tags if tag.name in ['function_definitions', 'import_statements', 'code_structure_patterns']]) >= 2)
        ),
        'Likely actual coding assistance (strong evidence OR moderate + context OR multiple indicators)'
    )
    
    # Conservative coding assistance (stricter threshold)
    tagger.add_supplemental_rule(
        'conservative_coding_assistance',
        lambda conv, tags: (
            any(tag.name == 'strong_code_evidence' for tag in tags) or
            any(tag.name in ['github_context', 'code_execution'] for tag in tags)
        ),
        'Conservative coding assistance detection (only strongest indicators)'
    )
    
    # Legacy coding assistance (original broad logic)
    tagger.add_supplemental_rule(
        'coding_assistance',
        lambda conv, tags: (
            any(tag.name in ['code_patterns', 'github_context', 'code_execution'] for tag in tags)
        ),
        'Legacy coding assistance (broad detection for comparison)'
    )
    
    tagger.add_supplemental_rule(
        'coding_assistance_start',
        lambda conv, tags: (
            any(tag.name in ['starts_code_patterns', 'starts_large_content', 
                           'starts_with_attachments', 'starts_code_attachments'] for tag in tags)
        ),
        'Likely coding assistance based on how conversation starts'
    )
    
    # Research and analysis patterns  
    tagger.add_supplemental_rule(
        'research_session',
        lambda conv, tags: (
            any(tag.name == 'web_search' for tag in tags) and 
            any(tag.name == 'large_content' for tag in tags)
        ),
        'Research session (web search + substantial content)'
    )
    
    tagger.add_supplemental_rule(
        'complex_analysis',
        lambda conv, tags: (
            any(tag.name == 'reasoning' for tag in tags) and 
            len([tag for tag in tags if tag.name in ['web_search', 'large_content', 'canvas_operations']]) >= 2
        ),
        'Complex analysis (reasoning + multiple advanced features)'
    )
    
    # Context and interaction patterns
    tagger.add_supplemental_rule(
        'context_heavy_start',
        lambda conv, tags: (
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Conversation starts with substantial context'
    )
    
    tagger.add_supplemental_rule(
        'enhanced_conversation',
        lambda conv, tags: (
            any(tag.name in ['gizmo', 'plugin'] for tag in tags)
        ),
        'Uses enhanced features (gizmos or plugins)'
    )
    
    # Length-based classifications using structured tags
    tagger.add_supplemental_rule(
        'brief_interaction',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags)
        ),
        'Brief interaction (1-3 user messages)'
    )
    
    tagger.add_supplemental_rule(
        'extended_conversation',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['long', 'very_long'] for tag in tags)
        ),
        'Extended conversation (11+ user messages)'
    )
    
    # Prompt pattern classifications using structured tags
    tagger.add_supplemental_rule(
        'long_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['long', 'very_long'] for tag in tags)
        ),
        'Conversation has consistently long prompts'
    )
    
    tagger.add_supplemental_rule(
        'short_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('length_category') in ['very_short', 'short'] for tag in tags)
        ),
        'Conversation has consistently short prompts'
    )
    
    tagger.add_supplemental_rule(
        'consistent_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'User prompts are consistent in length'
    )
    
    tagger.add_supplemental_rule(
        'variable_prompts',
        lambda conv, tags: (
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'User prompts vary significantly in length'
    )
    
    # Combined patterns for specific use cases
    tagger.add_supplemental_rule(
        'context_dump',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('category') in ['single', 'short'] for tag in tags) and
            any(tag.name in ['starts_large_content', 'starts_with_attachments'] for tag in tags)
        ),
        'Short conversation starting with large context (likely context dump)'
    )
    
    tagger.add_supplemental_rule(
        'interactive_session',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'consistent' for tag in tags)
        ),
        'Extended back-and-forth conversation with consistent prompt style'
    )
    
    tagger.add_supplemental_rule(
        'evolving_discussion',
        lambda conv, tags: (
            any(tag.name == 'conversation_length' and 
                tag.attributes.get('count', 0) >= 5 for tag in tags) and
            any(tag.name == 'prompt_stats' and 
                tag.attributes.get('consistency') == 'variable' for tag in tags)
        ),
        'Extended conversation where prompt style evolves'
    )
    
    return tagger



In [42]:
tagger = create_default_tagger()
tagged_results = tagger.tag_conversations(convs)
tagger.print_summary(tagged_results)
tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')


Tagged 1673 conversations

=== TAG SUMMARY ===
prompt_stats: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    mean: avg=1142.4, range=[1.0, 184662.5]
    median: avg=814.2, range=[1.0, 184662.5]
    variance: avg=51563065.2, range=[0, 34065162056.2]
    values: consistency=consistent, consistency=mixed, consistency=variable, length_category=long, length_category=medium, length_category=short, length_category=very_long, length_category=very_short
conversation_length: 1673 (100.0%)
    count: avg=7.6, range=[1, 160]
    values: category=long, category=medium, category=short, category=single, category=very_long
moderate_code_evidence: 1485 (88.8%)
import_statements: 1457 (87.1%)
large_content: 1362 (81.4%)
short_prompts: 1261 (75.4%)
coding_assistance: 1005 (60.1%)
code_patterns: 1000 (59.8%)
likely_coding_assistance: 914 (54.6%)
brief_interaction: 906 (54.2%)
consistent_prompts: 896 (53.6%)
function_definitions: 809 (48.4%)
conservative_coding_assistance: 754 (45.1%)
strong_code_evid

In [1]:
import sys, os
from pathlib import Path
import json
PATH=str((Path().cwd().parent /'src').absolute())
print(PATH)
if PATH not in sys.path:
    sys.path.append(PATH)

root = "../data/ingestion/chatgpt/a40ff5f79c1b3edd3c366f0f628fb79170bae83ecf3a1758b5b258c71f843f53-2025-06-05-03-28-15-df2ed357a4e64443bf464446686c9692/"
fpath = Path(root) / "conversations.json"
convs = json.load(fpath.open())



/Users/dmarx/proj/chat2obs/src


In [7]:
from conversation_tagger import create_default_tagger
from conversation_tagger.core.exchange_tagger import ExchangeTagger, DEFAULT_EXCHANGE_RULES
tagger = create_default_tagger()
#[tagger.add_exchange_rule(k, v) for k, v in DEFAULT_EXCHANGE_RULES.items()]

In [8]:
DEFAULT_EXCHANGE_RULES

{'has_wiki_links': <function conversation_tagger.core.exchange_tagger.has_wiki_links(exchange: conversation_tagger.core.exchange.Exchange) -> bool>,
 'has_latex_math': <function conversation_tagger.core.exchange_tagger.has_latex_math(exchange: conversation_tagger.core.exchange.Exchange) -> bool>}

In [9]:
tagged_results = [tagger.tag_conversation(c) for c in convs]

#tagger.print_summary(tagged_results)
#tagger.print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')

In [10]:
tagged_results[0].tags

[Tag('has_multiple_turns', {})]

In [11]:
def simple_summary(tagged_results):
    from collections import Counter
    all_tags = []
    for result in tagged_results:
        all_tags.extend([tag.name for tag in result.tags])
    
    tag_counts = Counter(all_tags)
    print(f"Tagged {len(tagged_results)} conversations")
    for tag, count in tag_counts.most_common():
        pct = count / len(tagged_results) * 100
        print(f"  {tag}: {count} ({pct:.1f}%)")

simple_summary(tagged_results)

Tagged 1673 conversations
  has_latex_math: 643 (38.4%)
  has_wiki_links: 547 (32.7%)
  has_multiple_turns: 432 (25.8%)


In [None]:
from conversation_tagger.analysis.faceting import print_faceted_summary #, facet_conversations

#print_faceted_summary(tagged_results, 'gizmo', 'gizmo_id')
print_faceted_summary(tagged_results, 'gizmo')

Tagged 1673 conversations
Faceted by: gizmo
Found 2 facet values

FACETED TAG SUMMARY

📊 FACET: <none>
    Conversations: 997 (59.6% of total)
    ------------------------------------------------------------
    exchange_count: 997 (100.0%)
    has_assistant_has_code_blocks: 261 (26.2%)
    has_user_has_code_blocks: 29 (2.9%)
    plugin: 5 (0.5%)

📊 FACET: has_gizmo
    Conversations: 676 (40.4% of total)
    ------------------------------------------------------------
    exchange_count: 676 (100.0%)
    gizmo: 676 (100.0%)
    has_assistant_has_code_blocks: 53 (7.8%)
    has_user_has_code_blocks: 1 (0.1%)
