<a href="https://colab.research.google.com/github/iamatul1214/System-Design-For-Machine-Learning/blob/main/Rate_Limiter_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import time
from collections import defaultdict

# -----------------------------
# Token Bucket Rate Limiter
# -----------------------------
class TokenBucket:
    """Token-bucket rate limiter with continuous, time-based refill.

    The bucket starts full. Each call to ``allow_request`` first credits the
    tokens earned since the previous call (capped at ``capacity``) and then
    tries to spend the requested amount.
    """

    def __init__(self, capacity, refill_rate):
        """
        capacity: max tokens the bucket can hold (also the initial fill)
        refill_rate: tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        # Monotonic clock: unaffected by wall-clock adjustments (NTP, manual
        # changes), so elapsed intervals are never skewed by a clock step.
        self.last_refill = time.monotonic()

    def allow_request(self, tokens=1):
        """Try to spend ``tokens`` from the bucket.

        tokens: number of tokens this request costs (default 1)

        Returns True and deducts the tokens when enough are available;
        returns False and deducts nothing otherwise. The refill is applied
        and the refill timestamp advanced in both cases.
        """
        now = time.monotonic()
        # Clamp at zero so a timestamp that is ahead of ``now`` can never
        # produce a negative refill and silently drain the bucket.
        elapsed = max(0.0, now - self.last_refill)

        # Credit tokens earned since the last call, never exceeding capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False


# -----------------------------
# Rate Limiter Manager
# -----------------------------
class RateLimiterManager:
    """Registry of token buckets, grouped by layer name and then by key."""

    def __init__(self):
        # layer name -> {key -> TokenBucket}; inner dicts are created on first use.
        self.buckets = defaultdict(dict)

    def check(self, layer, key, capacity, refill_rate):
        """Consume one token from the bucket for (layer, key).

        layer: name of the rate-limiting layer
        key: identity being limited within that layer
        capacity, refill_rate: used only when the bucket is first created;
            an existing bucket keeps the settings it was created with.

        Returns the result of the bucket's ``allow_request()``.
        """
        per_layer = self.buckets[layer]
        try:
            bucket = per_layer[key]
        except KeyError:
            # First request for this key in this layer: start with a fresh bucket.
            bucket = per_layer[key] = TokenBucket(capacity, refill_rate)
        return bucket.allow_request()


# Single shared manager: every layer function below looks up its buckets here.
rate_limiter = RateLimiterManager()

# -----------------------------
# Microservices
# -----------------------------
def service_b(user_id):
    """Handle a request at Service B, rate limited per user.

    user_id: key for this layer's bucket (capacity 2, refilled at 2 tokens
        per 10 seconds).

    Returns the Service B response string, or the blocked message when the
    user's bucket has no token available.
    """
    allowed = rate_limiter.check(
        layer="service_b",
        key=user_id,
        capacity=2,
        refill_rate=2 / 10,  # 2 requests per 10 sec
    )
    if allowed:
        return "‚úÖ Service B response"
    return "‚ùå Blocked at Service B"


def service_a(user_id):
    """Handle a request at Service A, rate limited per user, then call Service B.

    user_id: key for this layer's bucket (capacity 4, refilled at 4 tokens
        per 10 seconds); also passed on to Service B.

    Returns whatever ``service_b`` returns, or the blocked message when this
    layer rejects the request. A token is spent here before Service B is
    consulted, so it is consumed even if Service B then rejects.
    """
    allowed = rate_limiter.check(
        layer="service_a",
        key=user_id,
        capacity=4,
        refill_rate=4 / 10,
    )
    if allowed:
        # Service A calls Service B
        return service_b(user_id)
    return "‚ùå Blocked at Service A"


def api_gateway(user_id):
    """Apply the API gateway's per-user rate limit, then forward to Service A.

    user_id: key for this layer's bucket (capacity 3, refilled at 3 tokens
        per 10 seconds); also passed on to Service A.

    Returns whatever ``service_a`` returns, or the blocked message when the
    gateway rejects the request.
    """
    allowed = rate_limiter.check(
        layer="api_gateway",
        key=user_id,
        capacity=3,
        refill_rate=3 / 10,
    )
    if allowed:
        return service_a(user_id)
    return "‚ùå Blocked at API Gateway"


def edge(ip, user_id):
    """Entry point: apply the edge rate limit per IP, then forward to the gateway.

    ip: key for this layer's bucket (capacity 5, refilled at 5 tokens per
        10 seconds).
    user_id: not used for limiting here; passed on to the API gateway.

    Returns whatever ``api_gateway`` returns, or the blocked message when the
    IP's bucket has no token available.
    """
    allowed = rate_limiter.check(
        layer="edge",
        key=ip,
        capacity=5,
        refill_rate=5 / 10,
    )
    if allowed:
        return api_gateway(user_id)
    return "‚ùå Blocked at Edge (IP Rate Limit)"


# -----------------------------
# Simulation
# -----------------------------
# Fixed client identity for the demo: IP keys the edge bucket, USER keys the
# buckets of the inner layers.
IP = "192.168.1.10"
USER = "user_123"

print("üöÄ Sending requests...\n")

# Send 10 requests about one second apart and print, for each, either the
# final response or the layer that rejected it.
for i in range(1, 11):
    result = edge(IP, USER)
    print(f"Request {i}: {result}")
    time.sleep(1)  # pause so the buckets partially refill between requests


üöÄ Sending requests...

Request 1: ‚úÖ Service B response
Request 2: ‚úÖ Service B response
Request 3: ‚ùå Blocked at Service B
Request 4: ‚ùå Blocked at API Gateway
Request 5: ‚ùå Blocked at Service B
Request 6: ‚ùå Blocked at API Gateway
Request 7: ‚ùå Blocked at API Gateway
Request 8: ‚úÖ Service B response
Request 9: ‚ùå Blocked at API Gateway
Request 10: ‚ùå Blocked at Edge (IP Rate Limit)
