In [None]:
import random
import numpy as np
from typing import List, Dict, Tuple
import json

class TwoTowerPracticeGenerator:
    """
    Generates simplified Two-Tower model practice problems that can be solved by hand.
    Focuses on core concepts like embeddings, dot products, ranking, and negative sampling.

    Each ``generate_*`` method returns a dict with the problem statement
    (``type``, ``description``, the sampled features, a copy of the embedding
    lookup tables, ``instructions``) and a ``solution`` entry.  Every number in
    a solution is a built-in ``float`` rounded to 3 decimal places; NumPy
    scalars are converted before they are stored because they print as
    ``np.float64(...)`` inside lists/tuples on NumPy 2.
    """

    def __init__(self):
        # Simple feature vocabularies for realistic problems
        self.user_features = {
            'age_group': ['18-25', '26-35', '36-45', '46+'],
            'location': ['urban', 'suburban', 'rural'],
            'device': ['mobile', 'desktop', 'tablet']
        }

        self.item_features = {
            'category': ['electronics', 'books', 'clothing', 'food', 'sports'],
            'price_range': ['low', 'medium', 'high'],
            'brand': ['A', 'B', 'C', 'D']
        }

        # Simple embedding mappings (in practice these would be learned)
        self.feature_embeddings = {
            # User feature embeddings (2D for hand calculation)
            'age_group': {'18-25': [1.0, 0.5], '26-35': [0.8, 0.8], '36-45': [0.5, 1.0], '46+': [0.2, 0.9]},
            'location': {'urban': [1.0, 0.3], 'suburban': [0.6, 0.6], 'rural': [0.2, 0.9]},
            'device': {'mobile': [0.9, 0.4], 'desktop': [0.4, 0.9], 'tablet': [0.7, 0.7]},

            # Item feature embeddings (2D for hand calculation)
            'category': {'electronics': [1.0, 0.2], 'books': [0.3, 1.0], 'clothing': [0.8, 0.6],
                         'food': [0.5, 0.8], 'sports': [0.9, 0.3]},
            'price_range': {'low': [0.2, 0.9], 'medium': [0.6, 0.6], 'high': [1.0, 0.2]},
            'brand': {'A': [1.0, 0.0], 'B': [0.7, 0.7], 'C': [0.0, 1.0], 'D': [0.5, 0.5]}
        }

    # ------------------------------------------------------------------
    # Private helpers shared by the problem generators
    # ------------------------------------------------------------------

    def _random_user(self) -> Dict:
        """Return a user dict with one randomly chosen value per user feature."""
        # Dicts keep insertion order, so features are drawn in the order they
        # are declared in __init__ (age_group, location, device).
        return {
            feature: random.choice(values)
            for feature, values in self.user_features.items()
        }

    def _random_item(self, item_id=None) -> Dict:
        """Return an item dict with one randomly chosen value per item feature.

        Args:
            item_id: Optional identifier. When given it is stored under the
                ``'id'`` key, placed before the feature keys.
        """
        item = {}
        if item_id is not None:
            item['id'] = item_id
        for feature, values in self.item_features.items():
            item[feature] = random.choice(values)
        return item

    def _embedding_tables_copy(self) -> Dict:
        """Return an independent copy of the embedding lookup tables.

        Problems carry their own copy so that editing a returned problem
        cannot alter the tables this generator uses for later problems.
        """
        return {
            feature: {value: list(vector) for value, vector in table.items()}
            for feature, table in self.feature_embeddings.items()
        }

    @staticmethod
    def _round3(value) -> float:
        """Round ``value`` to 3 decimal places and return a built-in float."""
        return float(round(value, 3))

    def _average_feature_embeddings(self, features: Dict, default=None) -> List[float]:
        """Average the embeddings of all features found in the lookup tables.

        Args:
            features: Mapping of feature name to feature value.  Entries whose
                name or value has no embedding (for example an ``'id'`` key)
                are skipped unless ``default`` is given.
            default: Embedding substituted for every entry without a known
                embedding, or ``None`` to skip such entries.

        Returns:
            The element-wise mean, each component rounded to 3 decimals.

        Raises:
            ValueError: If no embedding at all is available to average.
        """
        vectors = []
        for feature, value in features.items():
            table = self.feature_embeddings.get(feature)
            if table is not None and value in table:
                vectors.append(table[value])
            elif default is not None:
                vectors.append(default)

        if not vectors:
            # np.mean of an empty list yields a NaN scalar, which would only
            # fail later with an unhelpful "not iterable" error.
            raise ValueError(
                f"No known feature embeddings to average in {features!r}"
            )

        # Average all embeddings
        avg_embedding = np.mean(vectors, axis=0)
        return [self._round3(x) for x in avg_embedding]

    # ------------------------------------------------------------------
    # Problem generators
    # ------------------------------------------------------------------

    def generate_embedding_calculation_problem(self) -> Dict:
        """Generate a problem about calculating user/item embeddings from features"""

        user = self._random_user()
        item = self._random_item()

        problem = {
            'type': 'embedding_calculation',
            'description': 'Calculate user and item embeddings, then compute similarity score',
            'user_features': user,
            'item_features': item,
            'feature_embeddings': self._embedding_tables_copy(),
            'instructions': [
                '1. Calculate user embedding by averaging feature embeddings',
                '2. Calculate item embedding by averaging feature embeddings',
                '3. Compute dot product similarity score',
                '4. Round final answer to 3 decimal places'
            ]
        }

        # Calculate solution (the dot product uses the already-rounded embeddings)
        user_embedding = self._calculate_user_embedding(user)
        item_embedding = self._calculate_item_embedding(item)
        similarity_score = np.dot(user_embedding, item_embedding)

        problem['solution'] = {
            'user_embedding': user_embedding,
            'item_embedding': item_embedding,
            'similarity_score': self._round3(similarity_score)
        }

        return problem

    def generate_ranking_problem(self) -> Dict:
        """Generate a problem about ranking items for a user"""

        user = self._random_user()

        # Create 4-5 items to rank
        num_items = random.randint(4, 5)
        items = [self._random_item(f'item_{i+1}') for i in range(num_items)]

        problem = {
            'type': 'ranking',
            'description': 'Rank items by similarity score for the given user',
            'user_features': user,
            'items': items,
            'feature_embeddings': self._embedding_tables_copy(),
            'instructions': [
                '1. Calculate user embedding',
                '2. Calculate embedding for each item',
                '3. Compute similarity scores (dot products)',
                '4. Rank items from highest to lowest score'
            ]
        }

        # Calculate solution
        user_embedding = self._calculate_user_embedding(user)
        item_scores = []
        for item in items:
            item_embedding = self._calculate_item_embedding(item)
            score = np.dot(user_embedding, item_embedding)
            item_scores.append((item['id'], self._round3(score)))

        # Sort by (rounded) score descending; ties keep their original order
        item_scores.sort(key=lambda pair: pair[1], reverse=True)

        problem['solution'] = {
            'user_embedding': user_embedding,
            'ranked_items': item_scores
        }

        return problem

    def generate_negative_sampling_problem(self) -> Dict:
        """Generate a problem about understanding negative sampling"""

        # Create a user who interacted with one item (positive)
        user = self._random_user()

        # Positive item (user interacted with this)
        positive_item = self._random_item('positive_item')

        # Generate 3 negative items (user didn't interact)
        negative_items = [
            self._random_item(f'negative_item_{i+1}') for i in range(3)
        ]

        problem = {
            'type': 'negative_sampling',
            'description': 'Calculate loss for positive and negative samples',
            'user_features': user,
            'positive_item': positive_item,
            'negative_items': negative_items,
            'feature_embeddings': self._embedding_tables_copy(),
            'instructions': [
                '1. Calculate user embedding',
                '2. Calculate positive item embedding and score',
                '3. Calculate negative item embeddings and scores',
                '4. For each negative, calculate: max(0, negative_score - positive_score + 0.1)',
                '5. Sum all losses (this is a simplified margin loss)'
            ]
        }

        # Calculate solution
        user_embedding = self._calculate_user_embedding(user)
        positive_embedding = self._calculate_item_embedding(positive_item)
        positive_score = np.dot(user_embedding, positive_embedding)

        negative_scores = []
        total_loss = 0.0
        margin = 0.1  # must match the constant quoted in instruction 4

        for item in negative_items:
            item_embedding = self._calculate_item_embedding(item)
            score = np.dot(user_embedding, item_embedding)
            negative_scores.append((item['id'], self._round3(score)))

            # Simplified margin loss: max(0, negative_score - positive_score + margin)
            # (computed from the unrounded scores; only the total is rounded)
            total_loss += max(0, score - positive_score + margin)

        problem['solution'] = {
            'user_embedding': user_embedding,
            'positive_score': self._round3(positive_score),
            'negative_scores': negative_scores,
            'total_loss': self._round3(total_loss)
        }

        return problem

    def generate_cold_start_problem(self) -> Dict:
        """Generate a problem about handling cold start (new user/item)"""

        # New user with minimal features
        new_user = {
            'age_group': random.choice(self.user_features['age_group']),
            'location': 'unknown',  # Missing feature
            'device': random.choice(self.user_features['device'])
        }

        # Regular item
        item = self._random_item()

        problem = {
            'type': 'cold_start',
            'description': 'Handle missing user features in embedding calculation',
            'user_features': new_user,
            'item_features': item,
            'feature_embeddings': self._embedding_tables_copy(),
            'instructions': [
                '1. For missing "location" feature, use default embedding [0.5, 0.5]',
                '2. Calculate user embedding by averaging available + default embeddings',
                '3. Calculate item embedding normally',
                '4. Compute similarity score'
            ]
        }

        # Calculate solution with default for missing feature
        user_embedding = self._calculate_user_embedding_with_default(new_user)
        item_embedding = self._calculate_item_embedding(item)
        similarity_score = np.dot(user_embedding, item_embedding)

        problem['solution'] = {
            'user_embedding': user_embedding,
            'item_embedding': item_embedding,
            'similarity_score': self._round3(similarity_score)
        }

        return problem

    # ------------------------------------------------------------------
    # Embedding calculations
    # ------------------------------------------------------------------

    def _calculate_user_embedding(self, user: Dict) -> List[float]:
        """Calculate user embedding by averaging feature embeddings.

        Features without a known embedding are ignored.

        Raises:
            ValueError: If none of the user's features has an embedding.
        """
        return self._average_feature_embeddings(user)

    def _calculate_user_embedding_with_default(self, user: Dict) -> List[float]:
        """Calculate user embedding with default for missing features.

        Every feature without a known embedding contributes ``[0.5, 0.5]``
        to the average instead of being skipped.

        Raises:
            ValueError: If ``user`` is empty.
        """
        return self._average_feature_embeddings(user, default=[0.5, 0.5])

    def _calculate_item_embedding(self, item: Dict) -> List[float]:
        """Calculate item embedding by averaging feature embeddings.

        Keys without a known embedding (such as ``'id'``) are ignored.

        Raises:
            ValueError: If none of the item's features has an embedding.
        """
        return self._average_feature_embeddings(item)

    def generate_random_problem(self) -> Dict:
        """Generate a random problem type"""
        problem_types = [
            self.generate_embedding_calculation_problem,
            self.generate_ranking_problem,
            self.generate_negative_sampling_problem,
            self.generate_cold_start_problem
        ]

        return random.choice(problem_types)()

    def print_problem(self, problem: Dict, show_solution: bool = False) -> None:
        """Pretty print a problem.

        Args:
            problem: A dict produced by one of the ``generate_*`` methods.
            show_solution: When true, the ``solution`` entry is printed after
                the problem statement.
        """
        print("="*60)
        print(f"PROBLEM TYPE: {problem['type'].upper()}")
        print("="*60)
        print(f"Description: {problem['description']}")
        print()

        print("USER FEATURES:")
        for k, v in problem['user_features'].items():
            print(f"  {k}: {v}")
        print()

        # Only the sections relevant to the problem type are present.
        if 'item_features' in problem:
            print("ITEM FEATURES:")
            for k, v in problem['item_features'].items():
                print(f"  {k}: {v}")
            print()

        if 'items' in problem:
            print("ITEMS TO RANK:")
            for item in problem['items']:
                print(f"  {item['id']}: {dict(item)}")
            print()

        if 'positive_item' in problem:
            print("POSITIVE ITEM:")
            for k, v in problem['positive_item'].items():
                print(f"  {k}: {v}")
            print()

            print("NEGATIVE ITEMS:")
            for item in problem['negative_items']:
                print(f"  {item['id']}: {dict(item)}")
            print()

        print("FEATURE EMBEDDINGS:")
        for category, mappings in problem['feature_embeddings'].items():
            print(f"  {category}:")
            for feature, embedding in mappings.items():
                print(f"    {feature}: {embedding}")
        print()

        print("INSTRUCTIONS:")
        for instruction in problem['instructions']:
            print(f"  {instruction}")
        print()

        if show_solution:
            print("SOLUTION:")
            print("-"*40)
            for k, v in problem['solution'].items():
                print(f"  {k}: {v}")
            print()

# Interactive walkthrough: build one problem of each type, show it, wait for
# the reader, then show it again with the solution attached.
generator = TwoTowerPracticeGenerator()

# Title and underline, followed by one blank line.
print(
    "Two-Tower Model Practice Problem Generator",
    "========================================",
    sep="\n",
)
print()

# One problem per generator method, created in this fixed order.
problems = [
    build_problem()
    for build_problem in (
        generator.generate_embedding_calculation_problem,
        generator.generate_ranking_problem,
        generator.generate_negative_sampling_problem,
        generator.generate_cold_start_problem,
    )
]

for i, problem in enumerate(problems, start=1):
    print(f"\n{'='*20} PROBLEM {i} {'='*20}")

    # First pass: statement only.
    generator.print_problem(problem, show_solution=False)

    # Second pass, after the reader confirms: statement plus solution.
    input("Press Enter to see solution...")
    generator.print_problem(problem, show_solution=True)

    # No pause after the final problem.
    if i != len(problems):
        input("Press Enter for next problem...")

# Closing hint on how to keep practising with the same generator object.
print(
    "\nGenerate more problems by running:",
    "problem = generator.generate_random_problem()",
    "generator.print_problem(problem)",
    sep="\n",
)

Two-Tower Model Practice Problem Generator


PROBLEM TYPE: EMBEDDING_CALCULATION
Description: Calculate user and item embeddings, then compute similarity score

USER FEATURES:
  age_group: 18-25
  location: suburban
  device: mobile

ITEM FEATURES:
  category: electronics
  price_range: medium
  brand: B

FEATURE EMBEDDINGS:
  age_group:
    18-25: [1.0, 0.5]
    26-35: [0.8, 0.8]
    36-45: [0.5, 1.0]
    46+: [0.2, 0.9]
  location:
    urban: [1.0, 0.3]
    suburban: [0.6, 0.6]
    rural: [0.2, 0.9]
  device:
    mobile: [0.9, 0.4]
    desktop: [0.4, 0.9]
    tablet: [0.7, 0.7]
  category:
    electronics: [1.0, 0.2]
    books: [0.3, 1.0]
    clothing: [0.8, 0.6]
    food: [0.5, 0.8]
    sports: [0.9, 0.3]
  price_range:
    low: [0.2, 0.9]
    medium: [0.6, 0.6]
    high: [1.0, 0.2]
  brand:
    A: [1.0, 0.0]
    B: [0.7, 0.7]
    C: [0.0, 1.0]
    D: [0.5, 0.5]

INSTRUCTIONS:
  1. Calculate user embedding by averaging feature embeddings
  2. Calculate item embedding by averaging fe