# 🏷️ Entity Labeling for Security Log Analysis

This notebook provides tools for manually labeling entities in security logs to create training data for the Named Entity Recognition (NER) model.

## Entity Types
- **USER**: User identities, usernames, IAM user ARNs, assumed-role session ARNs
- **ROLE**: IAM roles, service roles
- **RESOURCE**: AWS resources, S3 buckets, EC2 instances
- **IP**: IP addresses, source IPs

## BIO Tagging Format
- **B-**: Beginning of entity
- **I-**: Inside/continuation of entity
- **O**: Outside any entity

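Examples (token counts follow this notebook's tokenizer, which splits on `/`):
- `john.doe@company.com` → `B-USER`
- `arn:aws:iam::123:role/S3Role` → `B-ROLE I-ROLE I-ROLE` (tokens: `arn:aws:iam::123:role`, `/`, `S3Role`)
- `192.168.1.100` → `B-IP`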
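
To make the multi-token case concrete, here is a minimal sketch of BIO tagging. The helper `to_bio` is hypothetical (it is not part of this notebook), and it assumes plain whitespace tokenization for brevity, which differs from the tokenizer defined later.

```python
from typing import List, Tuple


def to_bio(tokens: List[str], entities: List[Tuple[List[str], str]]) -> List[str]:
    """Tag tokens with BIO labels; `entities` pairs a token sequence with its type."""
    labels = ["O"] * len(tokens)
    for entity_tokens, etype in entities:
        n = len(entity_tokens)
        if n == 0:
            continue
        for start in range(len(tokens) - n + 1):
            window_free = all(label == "O" for label in labels[start:start + n])
            if tokens[start:start + n] == entity_tokens and window_free:
                labels[start] = f"B-{etype}"          # first token of the entity
                for offset in range(1, n):
                    labels[start + offset] = f"I-{etype}"  # continuation tokens
                break  # tag only the first free occurrence
    return labels


tokens = "Role Power User was assumed from 10.0.1.50".split()
labels = to_bio(tokens, [(["Power", "User"], "ROLE"), (["10.0.1.50"], "IP")])
for token, label in zip(tokens, labels):
    print(f"{token}\t{label}")
```

Only the first token of an entity receives `B-`; every following token of the same entity receives `I-`, and everything else stays `O`.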

In [None]:
import json
import pandas as pd
import numpy as np
from pathlib import Path
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output
import re
from datetime import datetime
import pickle
from typing import List, Dict, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

# Import project modules
import sys
sys.path.append('../')
from src.detector.schemas import CloudTrailEvent

print("📚 Libraries loaded successfully!")

## 📊 Data Loading and Preparation
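
The labeling widget reads each event's `rawText` field (falling back to `eventDescription`), which raw CloudTrail records do not carry. As a rough sketch, a record could be flattened into a sentence before labeling. The helper name `flatten_record` and the sentence template are assumptions made for illustration; the fields read (`userIdentity`, `eventName`, `eventSource`, `sourceIPAddress`) are standard CloudTrail record fields.

```python
from typing import Any, Dict


def flatten_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of a CloudTrail record with a `rawText` summary added."""
    identity = record.get("userIdentity", {})
    # Prefer the ARN, fall back to the user name, then to a placeholder
    actor = identity.get("arn") or identity.get("userName") or "unknown"
    text = (
        f"{actor} called {record.get('eventName', 'UnknownEvent')} "
        f"on {record.get('eventSource', 'unknown')} "
        f"from {record.get('sourceIPAddress', 'unknown')}"
    )
    return {**record, "rawText": text}


# Illustrative record, not real log data
sample = {
    "eventName": "GetObject",
    "eventSource": "s3.amazonaws.com",
    "sourceIPAddress": "203.0.113.1",
    "userIdentity": {"type": "IAMUser", "arn": "arn:aws:iam::123456789012:user/alice"},
}
print(flatten_record(sample)["rawText"])
```

After `load_events`, something like `labeler.events = [flatten_record(r) for r in labeler.events]` would give every record text to label.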

In [None]:
class EntityLabeler:
    """Interactive entity labeling tool for security logs."""
    
    def __init__(self):
        self.entity_types = ['USER', 'ROLE', 'RESOURCE', 'IP']
        self.bio_tags = ['O'] + [f'{prefix}-{etype}' for etype in self.entity_types for prefix in ['B', 'I']]
        self.labeled_data = []
        self.current_index = 0
        self.events = []
        
        # Pre-defined patterns for auto-labeling suggestions
        self.patterns = {
            'USER': [
                r'\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b',  # Email
                r'arn:aws:iam::[0-9]+:user/[^\s]+',  # IAM User ARN
                r'arn:aws:sts::[0-9]+:assumed-role/[^/]+/[^\s]+',  # Assumed role session
            ],
            'ROLE': [
                r'arn:aws:iam::[0-9]+:role/[^\s]+',  # IAM Role ARN
                r'\b[A-Za-z][A-Za-z0-9]*Role\b',  # Role names ending in 'Role'
            ],
            'RESOURCE': [
                r'arn:aws:(?!iam:|sts:)[^:]+:[^:]*:[0-9]*:[^\s]+',  # Generic AWS ARN (IAM/STS ARNs are covered by USER and ROLE)
                r'\bi-[0-9a-f]{8,17}\b',  # EC2 Instance ID
                r'\bvol-[0-9a-f]{8,17}\b',  # EBS Volume ID
                r'\bsg-[0-9a-f]{8,17}\b',  # Security Group ID
                r'\bs3://[^\s]+',  # S3 URI
            ],
            'IP': [
                r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b',  # IPv4
                r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b',  # IPv6
            ]
        }
    
    def load_events(self, file_path: str) -> None:
        """Load CloudTrail events from JSON file."""
        with open(file_path, 'r') as f:
            data = json.load(f)
            
        if isinstance(data, list):
            self.events = data
        elif 'Records' in data:
            self.events = data['Records']
        else:
            self.events = [data]
            
        print(f"📄 Loaded {len(self.events)} events for labeling")
    
    def create_synthetic_events(self, count: int = 50) -> None:
        """Create synthetic CloudTrail events for demonstration."""
        synthetic_events = []
        
        templates = [
            "User {user} assumed role {role} from IP {ip}",
            "Admin {user} created IAM user {new_user} with policy {policy}",
            "Service {service} launched EC2 instance {instance} in subnet {subnet}",
            "User {user} accessed S3 bucket {bucket} from {ip}",
            "Failed login attempt for {user} from {ip} at {timestamp}",
            "Role {role} was attached to user {user} by {admin}",
            "Resource {resource} was modified by {user} using {tool}",
        ]
        
        sample_data = {
            'user': ['john.doe@company.com', 'alice.smith', 'bob.jones@corp.com', 'admin@company.com'],
            'role': ['arn:aws:iam::123456789012:role/S3ReadOnlyRole', 'PowerUserRole', 'AdminRole'],
            'ip': ['192.168.1.100', '10.0.1.50', '172.16.0.25', '203.0.113.1'],
            'instance': ['i-0123456789abcdef0', 'i-0987654321fedcba0'],
            'bucket': ['company-logs-bucket', 'user-data-store', 'backup-bucket'],
            'service': ['lambda.amazonaws.com', 'ec2.amazonaws.com'],
            'policy': ['PowerUserAccess', 'S3FullAccess', 'ReadOnlyAccess'],
            'resource': ['arn:aws:s3:::my-bucket/file.txt', 'arn:aws:ec2:us-east-1:123456789012:instance/i-1234567890abcdef0'],
            'subnet': ['subnet-12345678', 'subnet-87654321'],
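            'tool': ['aws-cli', 'terraform', 'cloudformation'],
            # Values for template placeholders that would otherwise be left unfilled
            'new_user': ['alice.smith', 'bob.jones@corp.com'],
            'admin': ['admin@company.com'],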
        }
        
        for i in range(count):
            template = np.random.choice(templates)
            
            # Fill template with random values
            event_text = template
            for key, values in sample_data.items():
                if f'{{{key}}}' in event_text:
                    event_text = event_text.replace(f'{{{key}}}', np.random.choice(values))
            
            # Add timestamp
            event_text = event_text.replace('{timestamp}', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
            
            synthetic_events.append({
                'eventName': f'SyntheticEvent{i+1}',
                'eventSource': 'synthetic.generator',
                'eventDescription': event_text,
                'rawText': event_text
            })
        
        self.events = synthetic_events
        print(f"🔧 Generated {len(self.events)} synthetic events for labeling")
    
    def suggest_labels(self, text: str) -> List[Tuple[str, str, str]]:
        """Suggest entity labels based on patterns."""
        suggestions = []
        
        for entity_type, patterns in self.patterns.items():
            for pattern in patterns:
                matches = re.finditer(pattern, text)
                for match in matches:
                    suggestions.append((
                        match.group(),
                        entity_type,
                        f"Pattern: {pattern[:30]}..."
                    ))
        
        return suggestions

# Initialize the labeler
labeler = EntityLabeler()
print("🏷️ Entity Labeler initialized!")

In [None]:
# Create synthetic data for demonstration
labeler.create_synthetic_events(100)

# Display first few events
print("📝 Sample events to label:")
for i, event in enumerate(labeler.events[:5]):
    print(f"\n{i+1}. {event['rawText']}")
    suggestions = labeler.suggest_labels(event['rawText'])
    if suggestions:
        print("   💡 Auto-detected entities:")
        for entity, etype, reason in suggestions[:3]:  # Show top 3
            print(f"      • {entity} → {etype}")

## 🖱️ Interactive Labeling Interface
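
The regex patterns behind Auto-Label can match overlapping text: a role name such as `AdminRole` also appears inside a full role ARN. One way to settle such conflicts before tagging is to keep the longest match and drop anything that overlaps it. The sketch below assumes character-offset spans, whereas `suggest_labels` returns matched text; `find_spans` and `resolve_overlaps` are hypothetical helpers, not part of the classes in this notebook.

```python
import re
from typing import Dict, List, Tuple

Span = Tuple[int, int, str]  # (start, end, entity_type)


def find_spans(text: str, patterns: Dict[str, List[str]]) -> List[Span]:
    """Collect every regex match as a character span."""
    return [
        (m.start(), m.end(), etype)
        for etype, regexes in patterns.items()
        for regex in regexes
        for m in re.finditer(regex, text)
    ]


def resolve_overlaps(spans: List[Span]) -> List[Span]:
    """Visit spans longest first and drop any span overlapping a kept one."""
    kept: List[Span] = []
    for span in sorted(spans, key=lambda s: (-(s[1] - s[0]), s[0])):
        if all(span[1] <= k[0] or span[0] >= k[1] for k in kept):
            kept.append(span)
    return sorted(kept)  # back to reading order


patterns = {
    "ROLE": [r"arn:aws:iam::[0-9]+:role/[^\s]+", r"\b[A-Za-z][A-Za-z0-9]*Role\b"],
    "IP": [r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"],
}
text = "User bob assumed arn:aws:iam::123456789012:role/AdminRole from 10.0.1.50"
for start, end, etype in resolve_overlaps(find_spans(text, patterns)):
    print(etype, text[start:end])
```

Longest-match-wins is only one possible policy; a priority order over entity types would work as well.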

In [None]:
class InteractiveLabelingWidget:
    """Interactive widget for labeling entities in text."""
    
    def __init__(self, labeler: EntityLabeler):
        self.labeler = labeler
        self.current_labels = []
        self.current_tokens = []
        
        # Create widgets
        self.progress_label = widgets.HTML(value="Progress: 0/0")
        self.event_text = widgets.HTML(value="", layout=widgets.Layout(border='1px solid #ccc', padding='10px'))
        self.token_buttons = widgets.VBox()
        self.suggestions_box = widgets.VBox()
        
        # Navigation buttons
        self.prev_btn = widgets.Button(description="← Previous", button_style='info')
        self.next_btn = widgets.Button(description="Next →", button_style='success')
        self.save_btn = widgets.Button(description="💾 Save Labels", button_style='warning')
        self.auto_label_btn = widgets.Button(description="🤖 Auto-Label", button_style='primary')
        
        # Connect button events
        self.prev_btn.on_click(self.prev_event)
        self.next_btn.on_click(self.next_event)
        self.save_btn.on_click(self.save_current_labels)
        self.auto_label_btn.on_click(self.apply_auto_labels)
        
        # Layout
        nav_box = widgets.HBox([self.prev_btn, self.next_btn, self.auto_label_btn, self.save_btn])
        self.widget = widgets.VBox([
            self.progress_label,
            self.event_text,
            widgets.HTML("<h4>🎯 Token Labeling:</h4>"),
            self.token_buttons,
            widgets.HTML("<h4>💡 Auto-Suggestions:</h4>"),
            self.suggestions_box,
            nav_box
        ])
        
        # Initialize with first event
        if self.labeler.events:
            self.show_current_event()
    
    def tokenize_text(self, text: str) -> List[str]:
        """Split on whitespace and punctuation, keeping '@', '.', '-', '_' and ':' inside tokens."""
        # Simple tokenization for demo; in practice, use a proper tokenizer
        tokens = []
        current_token = ""
        
        for char in text:
            if char.isalnum() or char in ['@', '.', '-', '_', ':']:
                current_token += char
            else:
                if current_token:
                    tokens.append(current_token)
                    current_token = ""
                if not char.isspace():
                    tokens.append(char)
        
        if current_token:
            tokens.append(current_token)
        
        return tokens
    
    def show_current_event(self):
        """Display current event for labeling."""
        if not self.labeler.events:
            return
        
        event = self.labeler.events[self.labeler.current_index]
        text = event.get('rawText', event.get('eventDescription', ''))
        
        # Update progress
        progress = f"Progress: {self.labeler.current_index + 1}/{len(self.labeler.events)}"
        self.progress_label.value = f"<h3>{progress}</h3>"
        
        # Show event text
        self.event_text.value = f"<h4>Event Text:</h4><p style='font-family: monospace; background: #f5f5f5; padding: 10px;'>{text}</p>"
        
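        # Tokenize and create labeling buttons, restoring saved labels for this event if any
        self.current_tokens = self.tokenize_text(text)
        saved = next(
            (item for item in self.labeler.labeled_data
             if item.get('index') == self.labeler.current_index),
            None
        )
        if saved and len(saved['labels']) == len(self.current_tokens):
            self.current_labels = saved['labels'].copy()
        else:
            self.current_labels = ['O'] * len(self.current_tokens)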
        
        self.create_token_buttons()
        self.show_suggestions(text)
        
        # Update navigation buttons
        self.prev_btn.disabled = (self.labeler.current_index == 0)
        self.next_btn.disabled = (self.labeler.current_index >= len(self.labeler.events) - 1)
    
    def create_token_buttons(self):
        """Create interactive buttons for each token."""
        token_widgets = []
        
        for i, token in enumerate(self.current_tokens):
            # Create dropdown for label selection
            label_dropdown = widgets.Dropdown(
                options=self.labeler.bio_tags,
                value=self.current_labels[i],
                description=f'{i}:',
                layout=widgets.Layout(width='150px')
            )
            
            # Create label update handler
            def make_update_handler(index):
                def update_label(change):
                    self.current_labels[index] = change['new']
                    self.update_token_display()
                return update_label
            
            label_dropdown.observe(make_update_handler(i), names='value')
            
            # Token display
            token_label = widgets.HTML(
                value=f"<span style='padding: 2px 5px; margin: 2px; background: #e7e7e7; border-radius: 3px;'>{token}</span>",
                layout=widgets.Layout(width='auto')
            )
            
            token_row = widgets.HBox([token_label, label_dropdown])
            token_widgets.append(token_row)
        
        # Show tokens in groups of 5 for better readability
        grouped_widgets = []
        for i in range(0, len(token_widgets), 5):
            group = widgets.VBox(token_widgets[i:i+5])
            grouped_widgets.append(group)
        
        self.token_buttons.children = tuple(grouped_widgets)
    
    def update_token_display(self):
        """Update visual display of tokens with their labels."""
        # This could be enhanced to show colored tokens based on their labels
        pass
    
    def show_suggestions(self, text: str):
        """Show auto-labeling suggestions."""
        suggestions = self.labeler.suggest_labels(text)
        
        if not suggestions:
            self.suggestions_box.children = (widgets.HTML("<p>No auto-suggestions found.</p>"),)
            return
        
        suggestion_widgets = []
        for entity, etype, reason in suggestions[:10]:  # Show top 10
            suggestion_html = widgets.HTML(
                value=f"<div style='margin: 5px; padding: 5px; border: 1px solid #ccc; background: #f9f9f9;'>"
                      f"<strong>{entity}</strong> → <span style='color: blue;'>{etype}</span><br>"
                      f"<small>{reason}</small></div>"
            )
            suggestion_widgets.append(suggestion_html)
        
        self.suggestions_box.children = tuple(suggestion_widgets)
    
    def apply_auto_labels(self, btn):
        """Apply automatic labels based on patterns."""
        event = self.labeler.events[self.labeler.current_index]
        text = event.get('rawText', event.get('eventDescription', ''))
        suggestions = self.labeler.suggest_labels(text)
        
        # Apply suggestions to tokens
        for entity, etype, _ in suggestions:
            # Find entity in tokens and apply BIO tags
            entity_tokens = self.tokenize_text(entity)
            
            # Simple matching - find consecutive tokens that match entity
            n = len(entity_tokens)
            if n == 0:
                continue
            for i in range(len(self.current_tokens) - n + 1):
                # Only tag spans that are still unlabeled, so overlapping
                # patterns cannot overwrite tags that are already set
                if (self.current_tokens[i:i+n] == entity_tokens
                        and all(label == 'O' for label in self.current_labels[i:i+n])):
                    # Apply BIO tags
                    self.current_labels[i] = f'B-{etype}'
                    for j in range(1, n):
                        self.current_labels[i + j] = f'I-{etype}'
                    break
        
        # Refresh display
        self.create_token_buttons()
        print("🤖 Auto-labels applied!")
    
    def save_current_labels(self, btn):
        """Save current labels to dataset."""
        event = self.labeler.events[self.labeler.current_index]
        text = event.get('rawText', event.get('eventDescription', ''))
        
        labeled_example = {
            'text': text,
            'tokens': self.current_tokens.copy(),
            'labels': self.current_labels.copy(),
            'event_metadata': {
                'eventName': event.get('eventName', 'Unknown'),
                'eventSource': event.get('eventSource', 'Unknown'),
                'timestamp': datetime.now().isoformat()
            }
        }
        
        # Add or update in labeled data
        # Remove existing entry for this index if exists
        self.labeler.labeled_data = [
            item for item in self.labeler.labeled_data 
            if item.get('index') != self.labeler.current_index
        ]
        
        labeled_example['index'] = self.labeler.current_index
        self.labeler.labeled_data.append(labeled_example)
        
        print(f"💾 Saved labels for event {self.labeler.current_index + 1}")
        print(f"📊 Total labeled events: {len(self.labeler.labeled_data)}")
    
    def prev_event(self, btn):
        """Go to previous event."""
        if self.labeler.current_index > 0:
            self.labeler.current_index -= 1
            self.show_current_event()
    
    def next_event(self, btn):
        """Go to next event."""
        if self.labeler.current_index < len(self.labeler.events) - 1:
            self.labeler.current_index += 1
            self.show_current_event()
    
    def display(self):
        """Display the widget."""
        return self.widget

# Create and display the interactive labeling widget
if labeler.events:
    interactive_widget = InteractiveLabelingWidget(labeler)
    print("🖱️ Interactive labeling widget ready!")
    print("📝 Instructions:")
    print("1. Review the event text")
    print("2. Use the dropdowns to label each token")
    print("3. Click 'Auto-Label' for suggestions")
    print("4. Click 'Save Labels' when done")
    print("5. Use navigation buttons to move between events")
    
    # Display the widget explicitly; an expression inside an `if` block is not auto-rendered
    display(interactive_widget.display())
else:
    print("❌ No events loaded. Please load data first.")

## 📈 Labeling Progress and Statistics

In [None]:
def show_labeling_progress():
    """Display labeling progress and statistics."""
    if not labeler.labeled_data:
        print("📊 No labeled data yet. Start labeling events!")
        return
    
    total_events = len(labeler.events)
    labeled_events = len(labeler.labeled_data)
    progress_pct = (labeled_events / total_events) * 100
    
    print(f"📊 Labeling Progress: {labeled_events}/{total_events} ({progress_pct:.1f}%)")
    print(f"{'█' * int(progress_pct // 2)}{'░' * int(50 - progress_pct // 2)} {progress_pct:.1f}%")
    
    # Entity statistics
    entity_counts = {'O': 0}
    for etype in labeler.entity_types:
        entity_counts[f'B-{etype}'] = 0
        entity_counts[f'I-{etype}'] = 0
    
    total_tokens = 0
    for example in labeler.labeled_data:
        for label in example['labels']:
            entity_counts[label] = entity_counts.get(label, 0) + 1
            total_tokens += 1
    
    print("\n🏷️ Entity Distribution:")
    for label, count in sorted(entity_counts.items()):
        if count > 0:
            pct = (count / total_tokens) * 100 if total_tokens > 0 else 0
            print(f"  {label:12} {count:6} ({pct:5.1f}%)")
    
    # Show some examples
    print("\n📝 Recent Labeled Examples:")
    for i, example in enumerate(labeler.labeled_data[-3:]):
        print(f"\n{i+1}. {example['text'][:100]}...")
        
        # Show entities found
        entities_found = []
        current_entity = None
        current_tokens = []
        
        for token, label in zip(example['tokens'], example['labels']):
            if label.startswith('B-'):
                if current_entity and current_tokens:
                    entities_found.append((current_entity, ' '.join(current_tokens)))
                current_entity = label[2:]
                current_tokens = [token]
            elif label.startswith('I-') and current_entity:
                current_tokens.append(token)
            else:
                if current_entity and current_tokens:
                    entities_found.append((current_entity, ' '.join(current_tokens)))
                current_entity = None
                current_tokens = []
        
        if current_entity and current_tokens:
            entities_found.append((current_entity, ' '.join(current_tokens)))
        
        if entities_found:
            print("   Entities:", ", ".join([f"{entity} ({etype})" for etype, entity in entities_found]))
        else:
            print("   No entities labeled")

# Button to show progress
progress_btn = widgets.Button(description="📊 Show Progress", button_style='info')
progress_btn.on_click(lambda x: show_labeling_progress())
display(progress_btn)

## 💾 Export Labeled Data
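
The CoNLL export in this section writes one `token<TAB>label` pair per line, with an empty line between examples. As a quick round-trip check, a file in that layout can be read back with a few lines of Python. The helper `read_conll` is a hypothetical sketch, not part of the notebook.

```python
from typing import List, Tuple


def read_conll(lines: List[str]) -> List[Tuple[List[str], List[str]]]:
    """Parse token<TAB>label lines into (tokens, labels) pairs, one per example."""
    examples, tokens, labels = [], [], []
    for line in lines:
        line = line.rstrip("\n")
        if not line:  # an empty line closes the current example
            if tokens:
                examples.append((tokens, labels))
                tokens, labels = [], []
            continue
        token, label = line.rsplit("\t", 1)
        tokens.append(token)
        labels.append(label)
    if tokens:  # the input may not end with an empty line
        examples.append((tokens, labels))
    return examples


sample = "alice.smith\tB-USER\nfrom\tO\n10.0.1.50\tB-IP\n\nAdminRole\tB-ROLE\n\n"
parsed = read_conll(sample.splitlines())
print(parsed)
```

For an exported file, `read_conll(Path(filename).read_text().splitlines())` would return the same structure.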

In [None]:
def export_labeled_data(format_type='json'):
    """Export labeled data in various formats."""
    if not labeler.labeled_data:
        print("❌ No labeled data to export")
        return
    
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    
    if format_type == 'json':
        filename = f'labeled_entities_{timestamp}.json'
        with open(filename, 'w') as f:
            json.dump(labeler.labeled_data, f, indent=2)
        print(f"💾 Exported {len(labeler.labeled_data)} labeled examples to {filename}")
    
    elif format_type == 'conll':
        # Two-column CoNLL-style format for NER: token<TAB>label, blank line between examples
        filename = f'labeled_entities_{timestamp}.conll'
        with open(filename, 'w') as f:
            for example in labeler.labeled_data:
                for token, label in zip(example['tokens'], example['labels']):
                    f.write(f"{token}\t{label}\n")
                f.write("\n")  # Empty line between examples
        print(f"💾 Exported {len(labeler.labeled_data)} labeled examples to {filename} (CoNLL format)")
    
    elif format_type == 'csv':
        # CSV format
        filename = f'labeled_entities_{timestamp}.csv'
        rows = []
        for example in labeler.labeled_data:
            rows.append({
                'text': example['text'],
                'tokens': '|'.join(example['tokens']),
                'labels': '|'.join(example['labels']),
                'event_name': example['event_metadata']['eventName']
            })
        
        df = pd.DataFrame(rows)
        df.to_csv(filename, index=False)
        print(f"💾 Exported {len(labeler.labeled_data)} labeled examples to {filename} (CSV format)")
    
    elif format_type == 'pickle':
        # Pickle format for Python
        filename = f'labeled_entities_{timestamp}.pkl'
        with open(filename, 'wb') as f:
            pickle.dump(labeler.labeled_data, f)
        print(f"💾 Exported {len(labeler.labeled_data)} labeled examples to {filename} (Pickle format)")

# Export buttons
export_json_btn = widgets.Button(description="📄 Export JSON", button_style='success')
export_conll_btn = widgets.Button(description="📄 Export CoNLL", button_style='success')
export_csv_btn = widgets.Button(description="📄 Export CSV", button_style='success')
export_pickle_btn = widgets.Button(description="📄 Export Pickle", button_style='success')

export_json_btn.on_click(lambda x: export_labeled_data('json'))
export_conll_btn.on_click(lambda x: export_labeled_data('conll'))
export_csv_btn.on_click(lambda x: export_labeled_data('csv'))
export_pickle_btn.on_click(lambda x: export_labeled_data('pickle'))

export_box = widgets.HBox([export_json_btn, export_conll_btn, export_csv_btn, export_pickle_btn])
display(widgets.VBox([
    widgets.HTML("<h3>📤 Export Labeled Data</h3>"),
    export_box
]))

## ✅ Data Validation and Quality Checks
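
One issue the validator in this section reports is an `I-` tag that does not follow a `B-` or `I-` tag of the same entity type. A common repair, sketched here as an assumption rather than as part of the notebook's workflow, is to promote such orphan tags to `B-`. The helper `repair_bio` is hypothetical.

```python
from typing import List


def repair_bio(labels: List[str]) -> List[str]:
    """Promote any I- tag that does not continue an entity of its type to B-."""
    repaired = []
    prev = "O"
    for label in labels:
        if label.startswith("I-"):
            etype = label[2:]
            if prev not in (f"B-{etype}", f"I-{etype}"):
                label = f"B-{etype}"  # orphan continuation becomes a new entity
        repaired.append(label)
        prev = label
    return repaired


print(repair_bio(["O", "I-USER", "I-USER", "B-ROLE", "I-IP"]))
# → ['O', 'B-USER', 'I-USER', 'B-ROLE', 'B-IP']
```

Automatic repair hides labeling mistakes, so it is safer to review the reported tokens first and repair only what remains.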

In [None]:
def validate_labeled_data():
    """Validate labeled data for common issues."""
    if not labeler.labeled_data:
        print("❌ No labeled data to validate")
        return
    
    print("🔍 Validating labeled data...\n")
    
    issues = []
    warnings = []
    
    for i, example in enumerate(labeler.labeled_data):
        tokens = example['tokens']
        labels = example['labels']
        
        # Check 1: Token and label count mismatch
        if len(tokens) != len(labels):
            issues.append(f"Example {i+1}: Token count ({len(tokens)}) != Label count ({len(labels)})")
        
        # Check 2: Invalid BIO sequence
        prev_label = 'O'
        for j, label in enumerate(labels):
            if label.startswith('I-'):
                entity_type = label[2:]
                if prev_label != f'B-{entity_type}' and not prev_label.startswith(f'I-{entity_type}'):
                    issues.append(f"Example {i+1}, Token {j+1}: Invalid I- tag without preceding B- tag")
            prev_label = label
        
        # Check 3: Unrecognized labels
        for j, label in enumerate(labels):
            if label not in labeler.bio_tags:
                issues.append(f"Example {i+1}, Token {j+1}: Unrecognized label '{label}'")
        
        # Check 4: Very long entities (potential issues)
        current_entity_length = 0
        for label in labels:
            if label.startswith('B-'):
                current_entity_length = 1
            elif label.startswith('I-'):
                current_entity_length += 1
                if current_entity_length == 11:  # warn once per entity, when it first exceeds 10 tokens
                    warnings.append(f"Example {i+1}: Very long entity (more than 10 tokens)")
            else:
                current_entity_length = 0
        
        # Check 5: No entities in example
        if all(label == 'O' for label in labels):
            warnings.append(f"Example {i+1}: No entities labeled (all O tags)")
    
    # Report results
    if not issues and not warnings:
        print("✅ All labeled data looks good!")
    else:
        if issues:
            print(f"❌ Found {len(issues)} issues:")
            for issue in issues[:10]:  # Show first 10
                print(f"  • {issue}")
            if len(issues) > 10:
                print(f"  ... and {len(issues) - 10} more")
        
        if warnings:
            print(f"\n⚠️ Found {len(warnings)} warnings:")
            for warning in warnings[:10]:  # Show first 10
                print(f"  • {warning}")
            if len(warnings) > 10:
                print(f"  ... and {len(warnings) - 10} more")
    
    # Summary statistics
    print(f"\n📊 Quality Summary:")
    print(f"  • Total examples: {len(labeler.labeled_data)}")
    print(f"  • Issues found: {len(issues)}")
    print(f"  • Warnings: {len(warnings)}")
    
    if len(labeler.labeled_data) > 0:
        quality_score = max(0, 100 - (len(issues) * 10) - (len(warnings) * 2))
        print(f"  • Quality score: {quality_score}/100")

# Validation button
validate_btn = widgets.Button(description="🔍 Validate Data", button_style='warning')
validate_btn.on_click(lambda x: validate_labeled_data())
display(validate_btn)

## 🎯 Quick Start Guide

### For First-Time Users:

1. **📊 Run all cells above** to initialize the labeling system
2. **🖱️ Use the Interactive Widget** to label entities in security logs
3. **🤖 Try Auto-Label** for suggestions based on patterns
4. **💾 Save your work** frequently using the Save button
5. **📈 Check Progress** to see your labeling statistics
6. **✅ Validate Data** to ensure quality
7. **💾 Export Data** when ready for training

### Entity Labeling Guidelines:

- **USER**: `john.doe@company.com`, `alice.smith`, IAM user ARNs
- **ROLE**: `AdminRole`, `PowerUserRole`, IAM role ARNs
- **RESOURCE**: S3 buckets, EC2 instances, any AWS resource
- **IP**: `192.168.1.100`, `10.0.1.50`, any IP address

### BIO Tagging Rules:

- Use **B-** for the beginning of an entity
- Use **I-** for inside/continuation of an entity
- Use **O** for tokens outside any entity
- Example: `john.doe@company.com` → `B-USER`
- Example: `EC2 instance i-1234567890` → `O O B-RESOURCE`
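
A quick way to check a tagging against these rules is to read the entities back out of it. The sketch below groups tagged tokens into entities; `decode_bio` is a hypothetical helper, and joining tokens with spaces is an assumption made for display.

```python
from typing import List, Tuple


def decode_bio(tokens: List[str], labels: List[str]) -> List[Tuple[str, str]]:
    """Group BIO-tagged tokens into (entity_type, entity_text) pairs."""
    entities = []
    current = None  # [entity_type, [tokens]] of the entity being built
    for token, label in zip(tokens, labels):
        if label.startswith("B-"):
            current = [label[2:], [token]]
            entities.append(current)
        elif label.startswith("I-") and current and current[0] == label[2:]:
            current[1].append(token)
        else:
            current = None  # an O tag or an orphan I- tag ends the entity
    return [(etype, " ".join(parts)) for etype, parts in entities]


print(decode_bio(["EC2", "instance", "i-1234567890"], ["O", "O", "B-RESOURCE"]))
# → [('RESOURCE', 'i-1234567890')]
print(decode_bio(["Role", "Power", "User", "from", "10.0.1.50"],
                 ["O", "B-ROLE", "I-ROLE", "O", "B-IP"]))
# → [('ROLE', 'Power User'), ('IP', '10.0.1.50')]
```

If the decoded entities do not match what you meant to label, the boundaries or tag types need another look.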

### Tips for High-Quality Labels:

1. **Be consistent** with entity boundaries
2. **Include context** when unsure (e.g., label the full ARN rather than only the trailing name)
3. **Use auto-suggestions** as starting points
4. **Validate frequently** to catch issues early
5. **Export regularly** to save your work

Happy labeling! 🏷️✨