# Low-Level Design (LLD) of a Pub/Sub System

### 1. Requirements

1. The system should support multiple publishers and subscribers.

2. Publishing must be thread-safe so that concurrent publishers do not cause race conditions.

3. Allow dynamic addition of publishers, topics, and subscribers at runtime.

4. Support concurrent processing of messages for subscribers.

5. Ensure message ordering per topic.

---

### 2. Constraints

1. Thread-safety must be ensured when multiple publishers push messages.
2. Subscribers should process messages asynchronously to avoid delays.
3. The system should handle thousands of messages concurrently without bottlenecks.
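
The first constraint can be illustrated with a minimal sketch, assuming the standard library's `queue.Queue` as the shared buffer. The helper names `publish_many` and `run_publishers` are hypothetical and not part of the design below; they only show that several publisher threads can push into one queue without losing messages or reordering any single publisher's messages.

```python
import threading
from queue import Empty, Queue


def publish_many(shared_queue: Queue, publisher_id: int, count: int) -> None:
    # Each publisher pushes its own numbered sequence of messages.
    for seq in range(count):
        shared_queue.put((publisher_id, seq))


def run_publishers(num_publishers: int = 4, per_publisher: int = 1000) -> list:
    shared_queue: Queue = Queue()
    threads = [
        threading.Thread(target=publish_many, args=(shared_queue, pid, per_publisher))
        for pid in range(num_publishers)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Drain without blocking once every publisher has finished.
    drained = []
    while True:
        try:
            drained.append(shared_queue.get_nowait())
        except Empty:
            return drained


messages = run_publishers()
print(len(messages))  # 4 publishers x 1000 messages each
```

Messages from different publishers interleave arbitrarily, but each publisher's own sequence stays in the order it was pushed, because `queue.Queue` is FIFO and does its own locking.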


---

### 3. Identify Entities

1. **Message**:
    - The data payload sent from publishers to topics.

2. **Topic**:
    - A channel that holds messages and notifies subscribers.

3. **Publisher**:
    - Publishes messages to a topic.

4. **Subscriber**:
    - Listens to one or more topics and processes messages asynchronously.

5. **MessageQueue**:
    - A FIFO queue for managing messages per topic.

6. **PubSubManager**:
    - Manages publishers, topics, and subscribers.

### 4. Class Design

#### 4.1. Message Class

In [5]:
class Message:
    def __init__(self, content: str):
        self.content = content

#### 4.2. Publisher Class

In [7]:
class Publisher:
    def __init__(self, name: str, manager):
        self.name = name
        self.manager = manager

    def publish(self, topic: str, message: Message):
        self.manager.publish(topic, message)

#### 4.3. Subscriber Class

In [9]:
class Subscriber:
    def __init__(self, name: str):
        self.name = name

    def receive(self, topic: str, message: Message):
        print(f"{self.name} received message on {topic}: {message.content}")

#### 4.4. MessageQueue Class

In [11]:
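from queue import Empty, Queue


class MessageQueue:
    def __init__(self):
        # queue.Queue is thread-safe, so no extra lock is needed here.
        self.queue = Queue()

    def enqueue(self, message: Message):
        self.queue.put(message)

    def dequeue(self):
        # get_nowait() avoids the race between empty() and get(): another
        # thread could drain the queue after the check, and a blocking get()
        # would then wait forever.
        try:
            return self.queue.get_nowait()
        except Empty:
            return None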

#### 4.5. Topic Class

In [13]:
import threading


class Topic:
    def __init__(self, name: str):
        self.name = name
        self.subscribers = []
        self.message_queue = MessageQueue()
        self.lock = threading.RLock()

    def add_subscriber(self, subscriber: Subscriber):
        with self.lock:
            self.subscribers.append(subscriber)

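    def remove_subscriber(self, subscriber: Subscriber):
        with self.lock:
            # list.remove raises ValueError for an unknown subscriber; surface
            # it as the domain exception defined in section 5 instead.
            if subscriber not in self.subscribers:
                raise SubscriberNotFoundException(subscriber.name)
            self.subscribers.remove(subscriber)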

    def add_message(self, message: Message):
        self.message_queue.enqueue(message)

    def get_messages(self):
        return self.message_queue.dequeue()
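
Because subscribers can be added and removed at runtime, any code that iterates over the subscriber list should work on a copy taken under the lock. The sketch below isolates that pattern in a hypothetical `SubscriberRegistry` helper (not part of the classes above); plain strings stand in for `Subscriber` objects to keep it self-contained.

```python
import threading


class SubscriberRegistry:
    """Hypothetical helper: a lock-protected subscriber list with snapshots."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []

    def add(self, subscriber) -> None:
        with self._lock:
            # Ignore duplicates so a subscriber is notified once per message.
            if subscriber not in self._subscribers:
                self._subscribers.append(subscriber)

    def remove(self, subscriber) -> bool:
        with self._lock:
            try:
                self._subscribers.remove(subscriber)
                return True
            except ValueError:
                return False

    def snapshot(self) -> tuple:
        # Return an immutable copy; callers iterate without holding the lock.
        with self._lock:
            return tuple(self._subscribers)


registry = SubscriberRegistry()
registry.add("Subscriber1")
registry.add("Subscriber2")

# Removing while iterating is safe because the loop walks the copy.
for name in registry.snapshot():
    registry.remove(name)
```

Returning a tuple keeps the time spent under the lock short and prevents callers from mutating the shared list by accident.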

#### 4.6. PubSubManager Class

In [15]:
import threading


class PubSubManager:
    def __init__(self):
        self.topics = {}
        self.lock = threading.RLock()

    def add_topic(self, topic_name: str):
        with self.lock:
            if topic_name not in self.topics:
                self.topics[topic_name] = Topic(topic_name)

    def subscribe(self, topic_name: str, subscriber: Subscriber):
        with self.lock:
            if topic_name not in self.topics:
                self.add_topic(topic_name)
            self.topics[topic_name].add_subscriber(subscriber)

    def unsubscribe(self, topic_name: str, subscriber: Subscriber):
        with self.lock:
            if topic_name in self.topics:
                self.topics[topic_name].remove_subscriber(subscriber)

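    def publish(self, topic_name: str, message: Message):
        with self.lock:
            if topic_name not in self.topics:
                raise TopicNotFoundException(topic_name)

            topic = self.topics[topic_name]
            try:
                topic.add_message(message)
                self.notify(topic_name)
            except Exception as exc:
                # Chain the original error so the root cause is not lost.
                raise PublishException(topic_name) from exc

    def notify(self, topic_name: str):
        topic = self.topics[topic_name]
        while True:
            message = topic.get_messages()
            if message is None:
                break

            # Snapshot the list under the topic lock so a concurrent
            # subscribe/unsubscribe cannot mutate it mid-iteration.
            with topic.lock:
                subscribers = list(topic.subscribers)

            # One thread per delivery keeps publishers from blocking, but the
            # threads are scheduled independently, so a subscriber may see
            # messages out of publish order.
            for subscriber in subscribers:
                threading.Thread(target=subscriber.receive, args=(topic_name, message)).start()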
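
Starting a new thread for every delivery does not by itself guarantee the per-topic ordering listed in the requirements, since the threads may run in any order. One possible alternative, sketched here under the assumption that each subscriber owns an inbox queue and a single worker thread, keeps delivery asynchronous while preserving order. `OrderedSubscriber`, `_STOP`, and `close` are hypothetical names, and plain strings stand in for `Message` objects.

```python
import threading
from queue import Queue

_STOP = object()  # sentinel that tells the worker thread to exit


class OrderedSubscriber:
    """Hypothetical subscriber that processes its inbox on one worker thread."""

    def __init__(self, name: str):
        self.name = name
        self.received = []
        self._inbox = Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def receive(self, topic: str, content: str) -> None:
        # Called on the publishing side; only enqueues, so it returns quickly.
        self._inbox.put((topic, content))

    def _run(self) -> None:
        # A single consumer drains the FIFO inbox, so order is preserved.
        while True:
            item = self._inbox.get()
            if item is _STOP:
                break
            self.received.append(item)

    def close(self) -> None:
        # Everything enqueued before the sentinel is processed first.
        self._inbox.put(_STOP)
        self._worker.join()


sub = OrderedSubscriber("Subscriber1")
for i in range(100):
    sub.receive("TechNews", f"message {i}")
sub.close()
```

With this approach the order a subscriber observes is the order in which `receive` was called, so the caller still has to invoke `receive` in publish order, for example while holding the manager lock as `publish` does.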

### 5. Exception Handling

In [17]:
class TopicNotFoundException(Exception):
    def __init__(self, topic_name):
        super().__init__(f"Topic '{topic_name}' not found.")


class SubscriberNotFoundException(Exception):
    def __init__(self, subscriber_name):
        super().__init__(f"Subscriber '{subscriber_name}' not found.")


class PublishException(Exception):
    def __init__(self, topic_name):
        super().__init__(f"Failed to publish message to '{topic_name}'.")
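
When a low-level error is wrapped in a domain exception, chaining with `raise ... from` keeps the original cause available to the caller. The following is a self-contained sketch: `PublishError` is a hypothetical stand-in for `PublishException`, and `publish_or_wrap` and `failing_delivery` exist only for illustration.

```python
class PublishError(Exception):
    """Hypothetical stand-in for PublishException that also keeps the topic."""

    def __init__(self, topic_name: str):
        super().__init__(f"Failed to publish message to '{topic_name}'.")
        self.topic_name = topic_name


def publish_or_wrap(topic_name: str, deliver) -> None:
    try:
        deliver()
    except Exception as exc:
        # "from exc" stores the original error in __cause__.
        raise PublishError(topic_name) from exc


def failing_delivery() -> None:
    raise RuntimeError("queue unavailable")


caught = None
try:
    publish_or_wrap("TechNews", failing_delivery)
except PublishError as err:
    caught = err  # keep a reference; "err" is cleared after the except block

print(caught)                 # the wrapper's message
print(repr(caught.__cause__)) # the underlying RuntimeError
```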

### 6. Implementation

In [19]:
if __name__ == "__main__":
    manager = PubSubManager()

    # Create Publishers
    publisher1 = Publisher("Publisher1", manager)
    publisher2 = Publisher("Publisher2", manager)
    
    # Create Subscribers
    subscriber1 = Subscriber("Subscriber1")
    subscriber2 = Subscriber("Subscriber2")

    # Subscribe to topics
    manager.subscribe("TechNews", subscriber1)
    manager.subscribe("TechNews", subscriber2)
    manager.subscribe("Sports", subscriber1)

    # Multiple Publishers Publishing Messages
    publisher1.publish("TechNews", Message("New AI Model Released!"))
    publisher2.publish("TechNews", Message("Python 4.0 Announced!"))
    publisher1.publish("Sports", Message("Football World Cup Begins!"))

    # Unsubscribe
    manager.unsubscribe("TechNews", subscriber1)

    # More messages after unsubscription
    publisher2.publish("TechNews", Message("New JavaScript Framework Launched!"))


Sample output (deliveries run on separate threads, so the line order can vary between runs):

Subscriber1 received message on TechNews: New AI Model Released!
Subscriber2 received message on TechNews: New AI Model Released!
Subscriber1 received message on TechNews: Python 4.0 Announced!
Subscriber2 received message on TechNews: Python 4.0 Announced!
Subscriber1 received message on Sports: Football World Cup Begins!
Subscriber2 received message on TechNews: New JavaScript Framework Launched!
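
Since printed output from concurrent deliveries is hard to check reliably, a test can record messages instead and compare them without assuming an order. This is a minimal sketch with hypothetical names (`RecordingSubscriber`, `deliver_concurrently`); it mimics the thread-per-delivery behaviour rather than using the classes above.

```python
import threading


class RecordingSubscriber:
    """Hypothetical test double that stores messages instead of printing."""

    def __init__(self, name: str):
        self.name = name
        self._lock = threading.Lock()
        self.messages = []

    def receive(self, topic: str, content: str) -> None:
        with self._lock:
            self.messages.append((topic, content))


def deliver_concurrently(subscriber, topic: str, contents: list) -> None:
    # One thread per delivery; join them all before inspecting the results.
    threads = [
        threading.Thread(target=subscriber.receive, args=(topic, content))
        for content in contents
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


recorder = RecordingSubscriber("Subscriber1")
deliver_concurrently(recorder, "TechNews", ["a", "b", "c"])

# Compare sorted copies because arrival order is not guaranteed.
assert sorted(recorder.messages) == [
    ("TechNews", "a"),
    ("TechNews", "b"),
    ("TechNews", "c"),
]
```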
