# Repository Pattern Tutorial 📚

## What is the Repository Pattern?

The Repository Pattern encapsulates the logic needed to access data sources. It centralizes common data access functionality, providing better maintainability and decoupling the infrastructure or technology used to access databases from the domain model layer.

**Real-world analogy**: Think of a library. You don't need to know exactly where books are stored, how the filing system works, or whether they're physical or digital. You just ask the librarian (repository) for a book, and they handle all the details of finding and retrieving it.

## Table of Contents
1. [What is the Repository Pattern?](#what-is-the-repository-pattern)
2. [Why Do We Need It?](#why-do-we-need-it)
3. [Simple Implementation](#simple-implementation)
4. [Understanding the Implementation](#understanding-the-implementation)
5. [Multiple Implementations and Testability (Advanced)](#advanced-example-multiple-implementations-and-testability)
6. [When to Use (and When NOT to Use)](#when-to-use-and-when-not-to-use)

## Why Do We Need It?

Let's see what happens when we directly access data in our business logic:

In [None]:
# Problem: Direct database access scattered throughout the code

import sqlite3
from typing import List, Optional
import tempfile
import os

class User:
    def __init__(self, id: int, name: str, email: str):
        self.id = id
        self.name = name
        self.email = email
    
    def __repr__(self):
        return f"User(id={self.id}, name='{self.name}', email='{self.email}')"

class BadUserService:
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        ''')
        conn.commit()
        conn.close()
    
    def register_user(self, name: str, email: str) -> User:
        # Database access mixed with business logic!
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # Check if email exists
        cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
        if cursor.fetchone():
            conn.close()
            raise ValueError("Email already exists")
        
        # Insert user
        cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", (name, email))
        user_id = cursor.lastrowid
        conn.commit()
        conn.close()
        
        return User(user_id, name, email)
    
    def send_welcome_email(self, user_id: int):
        # More database access in business logic!
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()
        conn.close()
        
        if row:
            print(f"Sending welcome email to {row[2]}")
        else:
            raise ValueError("User not found")

# Problems:
# 1. Database code scattered everywhere
# 2. Hard to test (tightly coupled to SQLite)
# 3. Difficult to change database technology
# 4. Business logic mixed with data access
# 5. Code duplication (connection handling)

# Let's see the problems in action (using temp file instead of :memory:)
temp_db = tempfile.NamedTemporaryFile(delete=False, suffix='.db')
temp_db.close()

try:
    service = BadUserService(temp_db.name)
    user = service.register_user("John Doe", "john@example.com")
    print(f"Created: {user}")
    service.send_welcome_email(user.id)
finally:
    # Clean up
    os.unlink(temp_db.name)

## Simple Implementation

Let's solve this with the Repository Pattern:

In [None]:
from abc import ABC, abstractmethod
from typing import List, Optional
import sqlite3

# Domain model
class User:
    def __init__(self, id: Optional[int], name: str, email: str):
        self.id = id
        self.name = name
        self.email = email
    
    def __repr__(self):
        return f"User(id={self.id}, name='{self.name}', email='{self.email}')"
    
    def __eq__(self, other):
        if not isinstance(other, User):
            return False
        return self.id == other.id and self.name == other.name and self.email == other.email

# Repository interface
class UserRepository(ABC):
    @abstractmethod
    def save(self, user: User) -> User:
        pass
    
    @abstractmethod
    def find_by_id(self, user_id: int) -> Optional[User]:
        pass
    
    @abstractmethod
    def find_by_email(self, email: str) -> Optional[User]:
        pass
    
    @abstractmethod
    def find_all(self) -> List[User]:
        pass
    
    @abstractmethod
    def delete(self, user_id: int) -> bool:
        pass

# SQLite implementation
class SqliteUserRepository(UserRepository):
    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()
    
    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL
            )
        ''')
        conn.commit()
        conn.close()
    
    def save(self, user: User) -> User:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        if user.id is None:
            # Insert new user
            cursor.execute("INSERT INTO users (name, email) VALUES (?, ?)", 
                         (user.name, user.email))
            user.id = cursor.lastrowid
        else:
            # Update existing user
            cursor.execute("UPDATE users SET name = ?, email = ? WHERE id = ?", 
                         (user.name, user.email, user.id))
        
        conn.commit()
        conn.close()
        return user
    
    def find_by_id(self, user_id: int) -> Optional[User]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))
        row = cursor.fetchone()
        conn.close()
        
        if row:
            return User(row[0], row[1], row[2])
        return None
    
    def find_by_email(self, email: str) -> Optional[User]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users WHERE email = ?", (email,))
        row = cursor.fetchone()
        conn.close()
        
        if row:
            return User(row[0], row[1], row[2])
        return None
    
    def find_all(self) -> List[User]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM users")
        rows = cursor.fetchall()
        conn.close()
        
        return [User(row[0], row[1], row[2]) for row in rows]
    
    def delete(self, user_id: int) -> bool:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute("DELETE FROM users WHERE id = ?", (user_id,))
        deleted = cursor.rowcount > 0
        conn.commit()
        conn.close()
        return deleted

In [None]:
# Business logic service (now clean and focused)
class UserService:
    def __init__(self, user_repository: UserRepository):
        self.user_repository = user_repository
    
    def register_user(self, name: str, email: str) -> User:
        # Business logic validation
        if not name.strip():
            raise ValueError("Name cannot be empty")
        
        if "@" not in email:
            raise ValueError("Invalid email format")
        
        # Check if email already exists
        existing_user = self.user_repository.find_by_email(email)
        if existing_user:
            raise ValueError("Email already exists")
        
        # Create and save user
        user = User(None, name, email)
        return self.user_repository.save(user)
    
    def get_user(self, user_id: int) -> Optional[User]:
        return self.user_repository.find_by_id(user_id)
    
    def send_welcome_email(self, user_id: int):
        user = self.user_repository.find_by_id(user_id)
        if not user:
            raise ValueError("User not found")
        
        # Business logic for welcome email
        print(f"Sending welcome email to {user.email}")
        print(f"Welcome {user.name}! Thanks for joining us.")
    
    def get_all_users(self) -> List[User]:
        return self.user_repository.find_all()

# Testing the clean implementation with temp file
import tempfile
import os

temp_db = tempfile.NamedTemporaryFile(delete=False, suffix='.db')
temp_db.close()

try:
    repository = SqliteUserRepository(temp_db.name)
    service = UserService(repository)
    
    print("--- Testing Repository Pattern ---")
    
    # Register users
    user1 = service.register_user("Alice Johnson", "alice@example.com")
    user2 = service.register_user("Bob Smith", "bob@example.com")
    
    print(f"Registered: {user1}")
    print(f"Registered: {user2}")
    
    # Send welcome emails
    service.send_welcome_email(user1.id)
    
    # List all users
    print("\nAll users:")
    for user in service.get_all_users():
        print(f"  {user}")
        
finally:
    # Clean up
    os.unlink(temp_db.name)

## Understanding the Implementation

### Key Concepts:

1. **Repository Interface**: Defines the contract for data access operations
2. **Concrete Repository**: Implements the interface for a specific data source
3. **Domain Model**: Business entities that are independent of data access
4. **Service Layer**: Contains business logic and uses repositories

### Benefits:
- **Separation of Concerns**: Business logic is separate from data access
- **Testability**: Easy to mock repositories for unit testing
- **Flexibility**: Can switch between different data sources
- **Maintainability**: Changes to data access don't affect business logic

## Advanced Example: Multiple Implementations and Testability

Let's create multiple repository implementations and show how easy testing becomes:

In [None]:
# In-memory implementation for testing
class InMemoryUserRepository(UserRepository):
    def __init__(self):
        self._users = {}
        self._next_id = 1
    
    def save(self, user: User) -> User:
        if user.id is None:
            user.id = self._next_id
            self._next_id += 1
        
        # Check for duplicate email
        for existing_user in self._users.values():
            if existing_user.email == user.email and existing_user.id != user.id:
                raise ValueError("Email already exists")
        
        self._users[user.id] = User(user.id, user.name, user.email)
        return self._users[user.id]
    
    def find_by_id(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)
    
    def find_by_email(self, email: str) -> Optional[User]:
        for user in self._users.values():
            if user.email == email:
                return user
        return None
    
    def find_all(self) -> List[User]:
        return list(self._users.values())
    
    def delete(self, user_id: int) -> bool:
        if user_id in self._users:
            del self._users[user_id]
            return True
        return False

# File-based implementation
import json
import os
import tempfile

class JsonFileUserRepository(UserRepository):
    def __init__(self, file_path: str):
        self.file_path = file_path
        self._ensure_file_exists()
    
    def _ensure_file_exists(self):
        if not os.path.exists(self.file_path):
            with open(self.file_path, 'w') as f:
                json.dump({"users": [], "next_id": 1}, f)
    
    def _load_data(self):
        try:
            with open(self.file_path, 'r') as f:
                return json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            # If file is corrupted or doesn't exist, return default structure
            return {"users": [], "next_id": 1}
    
    def _save_data(self, data):
        with open(self.file_path, 'w') as f:
            json.dump(data, f, indent=2)
    
    def save(self, user: User) -> User:
        data = self._load_data()
        
        # Check for duplicate email
        for existing_user in data["users"]:
            if existing_user["email"] == user.email and existing_user["id"] != user.id:
                raise ValueError("Email already exists")
        
        if user.id is None:
            user.id = data["next_id"]
            data["next_id"] += 1
            data["users"].append({
                "id": user.id,
                "name": user.name,
                "email": user.email
            })
        else:
            # Update existing user
            for i, u in enumerate(data["users"]):
                if u["id"] == user.id:
                    data["users"][i] = {
                        "id": user.id,
                        "name": user.name,
                        "email": user.email
                    }
                    break
        
        self._save_data(data)
        return user
    
    def find_by_id(self, user_id: int) -> Optional[User]:
        data = self._load_data()
        for user_data in data["users"]:
            if user_data["id"] == user_id:
                return User(user_data["id"], user_data["name"], user_data["email"])
        return None
    
    def find_by_email(self, email: str) -> Optional[User]:
        data = self._load_data()
        for user_data in data["users"]:
            if user_data["email"] == email:
                return User(user_data["id"], user_data["name"], user_data["email"])
        return None
    
    def find_all(self) -> List[User]:
        data = self._load_data()
        return [User(u["id"], u["name"], u["email"]) for u in data["users"]]
    
    def delete(self, user_id: int) -> bool:
        data = self._load_data()
        for i, user_data in enumerate(data["users"]):
            if user_data["id"] == user_id:
                del data["users"][i]
                self._save_data(data)
                return True
        return False

# Testing different implementations
print("--- Testing In-Memory Repository ---")
memory_repo = InMemoryUserRepository()
memory_service = UserService(memory_repo)

user = memory_service.register_user("Charlie Brown", "charlie@example.com")
print(f"Created in memory: {user}")

print("\n--- Testing JSON File Repository ---")
# Create a unique temp file for this test
temp_json = tempfile.NamedTemporaryFile(delete=False, suffix='.json')
temp_json.close()

try:
    json_repo = JsonFileUserRepository(temp_json.name)
    json_service = UserService(json_repo)
    
    user = json_service.register_user("Diana Prince", "diana@example.com")
    print(f"Created in JSON file: {user}")
    
    # Show that the same service works with different repositories
    print(f"\nUsers in memory repo: {len(memory_service.get_all_users())}")
    print(f"Users in JSON repo: {len(json_service.get_all_users())}")
    
finally:
    # Clean up
    os.unlink(temp_json.name)

In [None]:
# Demonstrating easy testing with mock repository
class MockUserRepository(UserRepository):
    def __init__(self):
        self.save_called = False
        self.find_by_email_called = False
        self.should_find_existing = False
    
    def save(self, user: User) -> User:
        self.save_called = True
        user.id = 123  # Mock ID
        return user
    
    def find_by_email(self, email: str) -> Optional[User]:
        self.find_by_email_called = True
        if self.should_find_existing:
            return User(1, "Existing User", email)
        return None
    
    def find_by_id(self, user_id: int) -> Optional[User]:
        return None
    
    def find_all(self) -> List[User]:
        return []
    
    def delete(self, user_id: int) -> bool:
        return False

# Unit test example
def test_register_user_success():
    # Arrange
    mock_repo = MockUserRepository()
    service = UserService(mock_repo)
    
    # Act
    user = service.register_user("Test User", "test@example.com")
    
    # Assert
    assert mock_repo.find_by_email_called, "Should check if email exists"
    assert mock_repo.save_called, "Should save the user"
    assert user.id == 123, "Should return user with mock ID"
    assert user.name == "Test User", "Should preserve user name"
    print("✓ Test passed: User registration works correctly")

def test_register_user_duplicate_email():
    # Arrange
    mock_repo = MockUserRepository()
    mock_repo.should_find_existing = True  # Mock finding existing user
    service = UserService(mock_repo)
    
    # Act & Assert
    try:
        service.register_user("Test User", "existing@example.com")
        assert False, "Should have raised ValueError"
    except ValueError as e:
        assert str(e) == "Email already exists"
        print("✓ Test passed: Duplicate email handling works correctly")

# Run tests
print("--- Running Unit Tests ---")
test_register_user_success()
test_register_user_duplicate_email()

## Advanced Pattern: Unit of Work

The Repository Pattern often works with the Unit of Work pattern to manage transactions:

In [None]:
# Unit of Work pattern with Repository
class UnitOfWork:
    def __init__(self):
        self._new_objects = []
        self._dirty_objects = []
        self._removed_objects = []
        self._repositories = {}
    
    def register_new(self, obj):
        self._new_objects.append(obj)
    
    def register_dirty(self, obj):
        if obj not in self._dirty_objects:
            self._dirty_objects.append(obj)
    
    def register_removed(self, obj):
        self._removed_objects.append(obj)
    
    def commit(self):
        print("Starting transaction...")
        try:
            # In a real implementation, you'd start a database transaction here
            
            # Insert new objects
            for obj in self._new_objects:
                print(f"Inserting: {obj}")
            
            # Update dirty objects
            for obj in self._dirty_objects:
                print(f"Updating: {obj}")
            
            # Delete removed objects
            for obj in self._removed_objects:
                print(f"Deleting: {obj}")
            
            print("Transaction committed successfully")
            self._clear()
            
        except Exception as e:
            print(f"Transaction failed: {e}")
            print("Rolling back...")
            self._clear()
            raise
    
    def _clear(self):
        self._new_objects.clear()
        self._dirty_objects.clear()
        self._removed_objects.clear()

# Repository that works with Unit of Work
class TransactionalUserRepository(UserRepository):
    def __init__(self, unit_of_work: UnitOfWork):
        self.unit_of_work = unit_of_work
        self._users = {}  # Simplified storage
        self._next_id = 1
    
    def save(self, user: User) -> User:
        if user.id is None:
            user.id = self._next_id
            self._next_id += 1
            self.unit_of_work.register_new(user)
        else:
            self.unit_of_work.register_dirty(user)
        return user
    
    def delete(self, user_id: int) -> bool:
        user = self.find_by_id(user_id)
        if user:
            self.unit_of_work.register_removed(user)
            return True
        return False
    
    # Other methods remain the same for this example
    def find_by_id(self, user_id: int) -> Optional[User]:
        return self._users.get(user_id)
    
    def find_by_email(self, email: str) -> Optional[User]:
        for user in self._users.values():
            if user.email == email:
                return user
        return None
    
    def find_all(self) -> List[User]:
        return list(self._users.values())

# Service with Unit of Work
class TransactionalUserService:
    def __init__(self, user_repository: TransactionalUserRepository, unit_of_work: UnitOfWork):
        self.user_repository = user_repository
        self.unit_of_work = unit_of_work
    
    def bulk_register_users(self, users_data: List[tuple]):
        """Register multiple users in a single transaction"""
        try:
            for name, email in users_data:
                user = User(None, name, email)
                self.user_repository.save(user)
            
            # Commit all changes at once
            self.unit_of_work.commit()
            print("All users registered successfully")
            
        except Exception as e:
            print(f"Failed to register users: {e}")
            # Unit of Work handles rollback

# Testing Unit of Work
print("--- Testing Unit of Work Pattern ---")
uow = UnitOfWork()
trans_repo = TransactionalUserRepository(uow)
trans_service = TransactionalUserService(trans_repo, uow)

users_to_register = [
    ("Alice", "alice@company.com"),
    ("Bob", "bob@company.com"),
    ("Charlie", "charlie@company.com")
]

trans_service.bulk_register_users(users_to_register)

## When to Use (and When NOT to Use)

### Use Repository Pattern when:
- You want to centralize data access logic
- You need to support multiple data sources
- You want to make your code easily testable
- You have complex queries that you want to encapsulate
- You're working in a domain-driven design context
- You want to isolate domain logic from data access concerns

### Don't use when:
- You have a simple application with minimal data access
- You're using an ORM that already provides good abstraction
- The overhead of creating interfaces and multiple implementations isn't justified
- You have very specific, complex queries that are tightly coupled to the database

### Real-world applications:
- Domain-driven design applications
- Microservices that need to switch between different data stores
- Applications requiring extensive unit testing
- Systems that aggregate data from multiple sources
- Applications with complex business logic that needs to be isolated from data concerns
- Multi-tenant applications with different storage strategies