In [17]:
import torch
import torch.nn as nn
import numpy as np
from typing import List, Dict, Any

class CrimeEventEmbedding:
    """
    Embed categorical crime-event features and compare the resulting vectors.

    Every feature listed in ``self.features`` gets its own ``nn.Embedding``
    table of width ``embedding_dim``. An event vector is the mean of the
    weighted embeddings of the features supplied for that event.
    """

    def __init__(self, feature_dims: int = 10):
        """
        Initialize the Crime Event Embedding model

        Args:
            feature_dims: Uniform embedding dimension for all features
        """
        # NOTE(review): this class does not subclass nn.Module, so the
        # embedding tables are reachable only through self.feature_embeddings.
        self.feature_embeddings = nn.ModuleDict()
        self.feature_weights = {}
        self.embedding_dim = feature_dims

        # Predefined features with their vocabulary sizes
        self.features = {
            'location_type': 50,     # Max 50 unique location types
            'crime_type': 50,        # Max 50 unique crime types
            'time_of_day': 24,       # 24 hours
            'suspect_age_group': 10  # 10 age group categories
        }

        # Create embeddings for each feature type with consistent dimension;
        # every feature starts with a neutral weight of 1.0.
        for feature, vocab_size in self.features.items():
            self.feature_embeddings[feature] = nn.Embedding(vocab_size, self.embedding_dim)
            self.feature_weights[feature] = 1.0

    def encode_event(self, event_features: Dict[str, torch.Tensor]) -> torch.Tensor:
        """
        Encode a single event by combining feature embeddings

        Features missing from ``event_features`` are ignored, and keys that
        are not known features are never looked at. A feature whose lookup
        fails is reported via ``print`` and skipped.

        Args:
            event_features: Dictionary of feature tensors for a single event

        Returns:
            Combined event embedding vector (mean of the weighted feature
            embeddings that could be computed)

        Raises:
            ValueError: If no feature produced an embedding
        """
        feature_embeddings = []

        for feature, embedding_layer in self.feature_embeddings.items():
            if feature not in event_features:
                continue
            try:
                # Look up the embedding and scale it by the feature weight
                feature_emb = embedding_layer(event_features[feature])
                weighted_emb = feature_emb * self.feature_weights[feature]
            except Exception as e:
                # Deliberate best-effort: one bad feature must not abort
                # encoding of the whole event, so report it and move on.
                print(f"Error processing feature {feature}: {e}")
                continue
            feature_embeddings.append(weighted_emb)

        # Combine embeddings by averaging, handling potential empty list
        if not feature_embeddings:
            raise ValueError("No valid feature embeddings found")

        return torch.mean(torch.stack(feature_embeddings), dim=0)

    def calculate_similarity(self, event1: torch.Tensor, event2: torch.Tensor, method: str = 'cosine') -> float:
        """
        Calculate similarity between two event embeddings

        Args:
            event1: First event embedding
            event2: Second event embedding
            method: Similarity calculation method, 'cosine' or 'euclidean'

        Returns:
            Similarity score; for 'euclidean' this is the negated distance,
            so it is never positive and higher still means more similar

        Raises:
            ValueError: If ``method`` is not a supported method
        """
        if method == 'cosine':
            # Cosine similarity
            return torch.nn.functional.cosine_similarity(event1.unsqueeze(0), event2.unsqueeze(0)).item()
        elif method == 'euclidean':
            # Euclidean distance (inverse, so higher means more similar)
            return -torch.dist(event1, event2).item()
        else:
            raise ValueError("Unsupported similarity method")

    def group_similarity(
        self,
        target_event: torch.Tensor,
        event_group: List[torch.Tensor],
        threshold: float = 0.7,
        method: str = 'cosine',
    ) -> bool:
        """
        Determine if a target event is similar to a group of events

        Args:
            target_event: Embedding of the event to compare
            event_group: List of event embeddings to compare against
            threshold: Similarity threshold for group classification. 'euclidean'
                scores are negated distances, so use a negative threshold
                with that method.
            method: Similarity calculation method forwarded to
                ``calculate_similarity``

        Returns:
            Boolean indicating if the event is similar to the group; False
            for an empty group

        Raises:
            ValueError: If ``method`` is not a supported method and the
                group is non-empty
        """
        # The generator lets any() stop at the first event above the threshold
        return any(
            self.calculate_similarity(target_event, event, method=method) > threshold
            for event in event_group
        )

def create_sample_crime_dataset():
    """
    Run a small demonstration: encode a few hand-written crime events,
    print the similarity of two of them, then print whether one of them
    is similar to a two-event reference group.
    """
    def build_event(location_type, crime_type, time_of_day, suspect_age_group):
        # Pack already-encoded category ids into the feature dictionary layout
        return {
            'location_type': torch.tensor(location_type),
            'crime_type': torch.tensor(crime_type),
            'time_of_day': torch.tensor(time_of_day),
            'suspect_age_group': torch.tensor(suspect_age_group)
        }

    # Embedding model with a fixed dimension of 10
    model = CrimeEventEmbedding(feature_dims=10)

    # Two events sharing location and age group, differing in crime type and hour
    first_event = build_event(25, 10, 3, 2)
    second_event = build_event(25, 11, 4, 2)

    first_vector = model.encode_event(first_event)
    second_vector = model.encode_event(second_event)

    # Pairwise similarity with the default method
    score = model.calculate_similarity(first_vector, second_vector)
    print(f"Event Similarity: {score}")

    # Reference group: the first event plus a variant with another location type
    reference_group = [
        model.encode_event(features)
        for features in (first_event, build_event(26, 10, 3, 2))
    ]

    # Does the second event match anything in the reference group?
    verdict = model.group_similarity(second_vector, reference_group)
    print(f"Is event similar to group: {verdict}")

# Run the demonstration: prints the pairwise event similarity and whether the
# second event is similar to the reference group.
create_sample_crime_dataset()

Event Similarity: 0.8152422308921814
Is event similar to group: True
