# Pub/Sub Lab: From Queues to Auditors

Welcome to the hands-on lab for Week 4. Over the next several sections, you will build
a working publish-subscribe system using Amazon Web Services (AWS), and then watch it
break in instructive ways.

**What you will learn:**
- How message queues work (Amazon SQS)
- How publish-subscribe (pub/sub) decouples publishers from subscribers (Amazon SNS)
- How to define and publish structured events
- What happens when messaging is unreliable
- Why multiple auditors disagree, and how to resolve it

**How this notebook works:**
- Read the explanations carefully. They replace a textbook chapter.
- Run each code cell **in order**; later cells depend on earlier ones.
- Helper functions are defined once and reused throughout. Do not skip the setup cells.
- When you see **"Your Turn"** or **"Question"**, stop and think before moving on.

> This lab uses real AWS services. Every message you send costs fractions of a cent.
> Be thoughtful, but don't worry about cost: the entire lab costs less than a dollar.

**Estimated time:** 60-90 minutes

**By the end of this lab you will be able to:**
1. Explain the difference between point-to-point messaging and publish-subscribe
2. Send and receive messages using Amazon SQS queues
3. Publish events to an SNS topic and observe fan-out to multiple subscribers
4. Define a structured event schema and use it in a real system
5. Detect duplicate messages, traffic anomalies, and ordering violations
6. Compare independent auditor counts and implement majority voting
7. Articulate why a peer-to-peer architecture might replace centralized pub/sub

**Prerequisite knowledge:** Basic Python (functions, dicts, loops).
No prior AWS experience is required — everything is explained step by step.

---
## Setup

Before we begin, we need two things:
1. The `boto3` library, the official AWS SDK for Python
   ([documentation](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html))
2. Your AWS credentials, which your instructor has provided

Run the next two cells to get set up.

In [None]:
# Install the AWS SDK for Python
# boto3 docs: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html
!pip install -q boto3

In [None]:
import os

# --- FILL IN YOUR CREDENTIALS ---
# Your instructor will provide these. They are shared for the class.
os.environ["AWS_ACCESS_KEY_ID"]     = ""  # paste your access key
os.environ["AWS_SECRET_ACCESS_KEY"] = ""  # paste your secret key
os.environ["AWS_DEFAULT_REGION"]    = "us-east-1"

# --- YOUR IDENTITY ---
# Enter your first name in lowercase. This determines which queue you read from.
MY_NAME = ""  # e.g. "bilge", "sam", "phin", "manuel", "bruno"

### Connecting to AWS

[`boto3.client()`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client)
creates a connection to a specific AWS service. We will use two services:

| Service | What it does | Think of it as... |
|---------|-------------|-------------------|
| **SQS** (Simple Queue Service) | Message queues: store and retrieve messages | A mailbox |
| **SNS** (Simple Notification Service) | Pub/sub topics: broadcast messages to many subscribers | A radio station |

[SQS Documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html) ·
[SNS Documentation](https://docs.aws.amazon.com/sns/latest/dg/welcome.html)

In [None]:
import boto3
import json
import uuid
import time
from datetime import datetime, timezone
from collections import Counter

# Connect to AWS services
sqs = boto3.client("sqs", region_name="us-east-1")
sns = boto3.client("sns", region_name="us-east-1")

print("Connected to AWS SQS and SNS in us-east-1")

### Helper Functions

The cell below defines functions we will reuse throughout the entire lab.
**Run this cell now**; every later section depends on it.

Take a moment to read each function. They are short on purpose.

In [None]:
# ========================================================
# HELPER FUNCTIONS -- used throughout the entire lab
# ========================================================

# --- Resource Configuration ---
# These are the AWS resources your instructor has already created.

TOPIC_ARN = "arn:aws:sns:us-east-1:194722398367:ds2032-view-events.fifo"
FIFO_MESSAGE_GROUP = "view-events"

# Tier 1: simple standard queues (Section 1 only -- private, NOT connected to SNS)
SIMPLE_QUEUES = {
    "bilge":  "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-simple-bilge",
    "sam":    "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-simple-sam",
    "phin":   "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-simple-phin",
    "manuel": "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-simple-manuel",
    "bruno":  "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-simple-bruno",
    "gil":    "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-simple-gil",
}

# Tier 2: FIFO queues subscribed to SNS topic (Sections 2-7)
STUDENT_QUEUES = {
    "bilge":  "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-node-bilge.fifo",
    "sam":    "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-node-sam.fifo",
    "phin":   "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-node-phin.fifo",
    "manuel": "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-node-manuel.fifo",
    "bruno":  "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-node-bruno.fifo",
    "gil":    "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-node-gil.fifo",
}

AUDITOR_QUEUES = {
    "auditor-a": "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-auditor-a.fifo",
    "auditor-b": "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-auditor-b.fifo",
    "auditor-c": "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-auditor-c.fifo",
}

CHAOS_QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/194722398367/ds2032-chaos-observer"

# Validate student name
assert MY_NAME in STUDENT_QUEUES, f"Set MY_NAME to one of: {list(STUDENT_QUEUES.keys())}"
MY_SIMPLE_QUEUE_URL = SIMPLE_QUEUES[MY_NAME]   # Section 1 only
MY_QUEUE_URL        = STUDENT_QUEUES[MY_NAME]   # Sections 2-7
print(f"You are: {MY_NAME}")
print(f"Section 1 queue (simple): {MY_SIMPLE_QUEUE_URL.split('/')[-1]}")
print(f"Section 2+ queue (FIFO):  {MY_QUEUE_URL.split('/')[-1]}")


# --- Sending Messages ---

def send_message(queue_url, body, group_id="default"):
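    """Send a single message to an SQS FIFO queue.

    No MessageDeduplicationId is passed, so this assumes the queue has
    content-based deduplication enabled; otherwise SQS rejects the send.
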
    Args:
        queue_url: The URL of the SQS queue.
        body: A dict that will be JSON-serialized.
        group_id: FIFO message group (messages in the same group are ordered).

    Returns:
        The SQS response dict.

    Docs: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
    """
    return sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(body),
        MessageGroupId=group_id,
    )




def send_direct(queue_url, body):
    """Send a message to a standard (non-FIFO) SQS queue.

    Used in Section 1 with MY_SIMPLE_QUEUE_URL.
    No MessageGroupId or deduplication needed.
    """
    return sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(body),
    )

def publish_to_topic(message, topic_arn=TOPIC_ARN, group_id=FIFO_MESSAGE_GROUP):
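    """Publish a message to the SNS FIFO topic (broadcast to all subscribers).

    No MessageDeduplicationId is passed, so this assumes the topic has
    content-based deduplication enabled; otherwise SNS rejects the publish.
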
    Args:
        message: A dict that will be JSON-serialized.
        topic_arn: The ARN of the SNS topic.
        group_id: FIFO message group for ordering.

    Returns:
        The SNS response dict.

    Docs: https://docs.aws.amazon.com/sns/latest/api/API_Publish.html
    """
    return sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps(message),
        MessageGroupId=group_id,
    )


# --- Receiving Messages ---

def receive_messages(queue_url, max_messages=10, wait_seconds=5, delete=True):
    """Receive messages from an SQS queue.

    Args:
        queue_url: The URL of the SQS queue.
        max_messages: Max messages per batch (1-10).
        wait_seconds: Long polling wait time (0-20).
        delete: If True, delete messages after receiving (acknowledge them).

    Returns:
        A list of parsed message body dicts.

    Docs: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
    """
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=min(max_messages, 10),
        WaitTimeSeconds=wait_seconds,
    )
    results = []
    for m in resp.get("Messages", []):
        body = json.loads(m["Body"])
        results.append(body)
        if delete:
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"])
    return results


def receive_all(queue_url, max_total=200, timeout=30, delete=True):
    """Receive ALL available messages from a queue (multiple batches).

    Keeps reading until the queue is empty or limits are reached.

    Args:
        queue_url: The URL of the SQS queue.
        max_total: Stop after this many messages.
        timeout: Stop after this many seconds.
        delete: If True, delete messages after receiving.

    Returns:
        A list of parsed message body dicts.
    """
    all_msgs = []
    start = time.time()
    while len(all_msgs) < max_total and (time.time() - start) < timeout:
        batch = receive_messages(queue_url, max_messages=10, wait_seconds=3, delete=delete)
        if not batch:
            break
        all_msgs.extend(batch)
    return all_msgs


def drain_queue(queue_url):
    """Remove all messages from a queue. Returns the count of drained messages."""
    count = 0
    while True:
        batch = receive_messages(queue_url, max_messages=10, wait_seconds=1, delete=True)
        if not batch:
            break
        count += len(batch)
    return count


# --- ViewEvent Creation ---

def create_view_event(host_id, content_id, ad_id):
    """Create a ViewEvent dict with a unique ID and current timestamp.

    This is the standard event format for the 2032 system.
    """
    return {
        "event_id": str(uuid.uuid4()),
        "host_id": host_id,
        "content_id": content_id,
        "ad_id": ad_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }


# --- Analysis ---

def count_by(events, field):
    """Count events by a given field. Returns a Counter."""
    return Counter(e.get(field, "?") for e in events)


def summarize_events(events, title="Event Summary"):
    """Print a summary of events by host, content, and ad."""
    print(f"\n--- {title} ---")
    print(f"Total events: {len(events)}")
    for field, label in [("host_id", "By host"), ("content_id", "By content"), ("ad_id", "By ad")]:
        counts = count_by(events, field)
        print(f"\n  {label}:")
        for val, cnt in counts.most_common():
            print(f"    {val}: {cnt}")


print("Helper functions loaded.")

---
# Section 1: I Can Talk Using Queues

## What is a message queue?

A **message queue** is one of the simplest building blocks in distributed systems.
It works exactly like it sounds:

1. A **sender** puts a message into the queue.
2. The message sits there until someone picks it up.
3. A **receiver** reads the message and processes it.
4. The receiver **deletes** the message to confirm it was handled.

This pattern decouples the sender from the receiver. The sender does not need to know
*who* will read the message, or *when*. It just drops the message and moves on.
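
The four steps above can be sketched without any AWS calls. The class below is a toy, in-memory stand-in (the name `TinyQueue` and its methods are made up for illustration and say nothing about how SQS is built). It only shows that receiving and deleting are separate operations.

```python
from collections import deque


class TinyQueue:
    """Toy in-memory queue (hypothetical; not how SQS is implemented)."""

    def __init__(self):
        self._waiting = deque()   # messages not yet handed to a receiver
        self._in_flight = {}      # receipt handle -> message awaiting delete
        self._next_handle = 0

    def send(self, body):
        """Step 1: the sender drops a message and moves on."""
        self._waiting.append(body)

    def receive(self):
        """Step 3: hand out the oldest message, or None if nothing is waiting."""
        if not self._waiting:
            return None
        body = self._waiting.popleft()
        self._next_handle += 1
        handle = f"rh-{self._next_handle}"
        self._in_flight[handle] = body
        return handle, body

    def delete(self, handle):
        """Step 4: confirm the message was handled."""
        self._in_flight.pop(handle, None)

    def pending(self):
        """Messages that are waiting or received but not yet deleted."""
        return len(self._waiting) + len(self._in_flight)


toy = TinyQueue()
toy.send({"greeting": "hello"})
handle, body = toy.receive()
pending_before_delete = toy.pending()   # the message is read but still tracked
toy.delete(handle)
print(body, pending_before_delete, toy.pending())
```

The sender never learns who called `receive`, which is the decoupling described above.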

## Amazon SQS

[Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html)
is a managed message queue service. We are starting with a **standard queue** — the simplest kind.
It gives you the core send/receive/delete model without extra complexity.

In Section 2 we will upgrade to **FIFO queues** which add two guarantees:
1. **Ordering**: Messages come out in the order they went in.
2. **Exactly-once processing**: Duplicates sent within a 5-minute deduplication window are dropped.

> Your Section 1 queue is named `ds2032-simple-<your name>` (the helper cell printed the exact name).
> It is **private** — not connected to any SNS topic. Only you send and receive here.

### 1A: Send a message

Let's start simple. We will send a single message to your queue and then read it back.

In [None]:
# 1A: Send a single message to YOUR queue

message = {"greeting": "Hello from the queue!", "sender": MY_NAME}

response = send_direct(MY_SIMPLE_QUEUE_URL, message)

print(f"SENT: {json.dumps(message)}")
print(f"  MessageId:      {response['MessageId']}")
print(f"  (No SequenceNumber -- that only exists on FIFO queues)")

The `MessageId` is assigned by SQS; it is the queue's receipt for your message.
The response has no `SequenceNumber` because this is a standard queue. FIFO queues
(Section 2 onward) return one to show where a message falls in its group's ordering.

Now let's read it back.

In [None]:
# 1A: Receive the message from YOUR queue

time.sleep(1)  # Brief pause to let the message arrive

received = receive_messages(MY_SIMPLE_QUEUE_URL, max_messages=1, wait_seconds=5)

if received:
    print(f"RECEIVED: {json.dumps(received[0])}")
    print(f"\nThe message was automatically deleted after reading (delete=True).")
else:
    print("No message received. The queue might be empty.")

Notice that `receive_messages` **deleted** the message after reading it.
This is important. In SQS, reading a message does not remove it.
Deletion is a separate, explicit step; our helper function handles it for you.

**Why does deletion exist?** Because reading might fail. If your code crashes
after reading but before processing, the message should come back so someone
else can try. This is called **at-least-once delivery**.

### Your Turn: Send a custom message

Modify the code below to send a message with your own content.
Try including different data types — a number, a list, a boolean.
What happens when you receive it back?

In [None]:
# YOUR TURN: Send your own custom message
# Change the message content to whatever you want.

my_message = {
    "from": MY_NAME,
    # Add your own fields here:
    # "favorite_number": ???,
    # "languages": ???,
}

send_direct(MY_SIMPLE_QUEUE_URL, my_message)
time.sleep(1)
result = receive_messages(MY_SIMPLE_QUEUE_URL, max_messages=1)
if result:
    print("Got back:", json.dumps(result[0], indent=2))

### 1B: Message ordering

FIFO means "first in, first out." Your Section 1 queue is a standard queue, so ordering
is best-effort rather than guaranteed. Let's send 10 numbered messages and check
whether they arrive in the same order.

In [None]:
# 1B: Send 10 numbered messages
print("Sending 10 numbered messages...")
for i in range(1, 11):
    send_direct(MY_SIMPLE_QUEUE_URL, {"sequence": i, "label": f"Message #{i}"})
    print(f"  Sent #{i}")

In [None]:
# 1B: Receive them and check order
time.sleep(1)

received = receive_all(MY_SIMPLE_QUEUE_URL, max_total=10, timeout=15)
received_order = [m.get("sequence") for m in received]

print(f"Sent order:     {list(range(1, 11))}")
print(f"Received order: {received_order}")
print(f"In order?       {received_order == list(range(1, 11))}")

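Your Section 1 queue is a **standard** queue, which offers only best-effort ordering.
With ten small messages you will often see them arrive in order, but SQS does not
promise it. A FIFO queue does: messages that share a **message group** (the `group_id`
argument of `send_message`) are guaranteed to arrive in the order they were sent.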

> **Key concept:** Ordering is not free. It requires coordination by the queue
> service. Standard (non-FIFO) queues skip this coordination, which makes them
> faster but unordered. More on this in Section 6.
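
To make "ordered within a group" concrete, here is a small checker. It is a sketch, not part of the lab helpers: the function name `ordered_within_groups` is made up, and it assumes each message carries `group` and `seq` fields like the ones used in the exercise cell that follows.

```python
def ordered_within_groups(messages, group_key="group", seq_key="seq"):
    """Return True if sequence numbers strictly increase inside every group."""
    last_seen = {}  # group -> most recent sequence number seen in that group
    for m in messages:
        group, seq = m[group_key], m[seq_key]
        if group in last_seen and seq <= last_seen[group]:
            return False  # this group went backwards (or repeated)
        last_seen[group] = seq
    return True


# Groups may interleave freely as long as each one stays in sequence.
interleaved = [
    {"group": "A", "seq": 1},
    {"group": "B", "seq": 1},
    {"group": "B", "seq": 2},
    {"group": "A", "seq": 2},
]
swapped = [{"group": "A", "seq": 2}, {"group": "A", "seq": 1}]

print(ordered_within_groups(interleaved))  # True
print(ordered_within_groups(swapped))      # False
```

Passing a received batch to this function tells you whether each stream kept its own order, even when the two streams arrived mixed together.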

### Your Turn: Break the ordering

What happens when two interleaved streams of messages share one queue? Standard queues
have no message groups, so nothing keeps each stream in its own sequence. Try it:

In [None]:
# YOUR TURN: Send multiple messages and see what arrives
# Standard queues don't have message groups -- all messages share one pool.

# Clear the queue first
drain_queue(MY_SIMPLE_QUEUE_URL)

# Send 5 'A' messages and 5 'B' messages alternating
for i in range(1, 6):
    send_direct(MY_SIMPLE_QUEUE_URL, {"group": "A", "seq": i})
    send_direct(MY_SIMPLE_QUEUE_URL, {"group": "B", "seq": i})

time.sleep(1)
received = receive_all(MY_SIMPLE_QUEUE_URL, max_total=10, timeout=10)

print("Received order:")
for m in received:
    print(f"  Group {m['group']}, Seq {m['seq']}")

# In Section 2 we'll use FIFO queues where group ordering is guaranteed.
# How does the ordering here compare to what you'd expect from FIFO?

### 1C: What happens if you don't delete?

SQS has a feature called **visibility timeout**. When you read a message,
it becomes *invisible* to other readers for a set period (default: 30 seconds).
If you don't delete it within that window, it reappears, as if you never read it.

This is *by design*. It prevents messages from being lost if a reader crashes.
But it can also cause **double-counting** if you are not careful.

Let's see it in action.

> **Note:** This cell takes about 35 seconds to run (it waits for the timeout to expire).

In [None]:
# 1C: Visibility timeout demo

# Send one message
send_direct(MY_SIMPLE_QUEUE_URL, {"purpose": "visibility demo", "value": 42})
print("Sent a message.")

time.sleep(1)

# Read it but DO NOT delete it
batch = receive_messages(MY_SIMPLE_QUEUE_URL, max_messages=1, wait_seconds=5, delete=False)
if batch:
    print(f"Read (1st time): {json.dumps(batch[0])}")
    print("  --> NOT deleting this message.")
else:
    print("No message received.")

# Wait for the visibility timeout to expire (30 seconds)
print(f"\nWaiting 35 seconds for visibility timeout to expire...")
time.sleep(35)

# Read again -- the same message should reappear
batch2 = receive_messages(MY_SIMPLE_QUEUE_URL, max_messages=1, wait_seconds=5, delete=True)
if batch2:
    print(f"Read (2nd time!): {json.dumps(batch2[0])}")
    print("  --> The message came BACK because we didn't delete it the first time!")
    print("  --> Now deleted for real.")
else:
    print("Message did not reappear (might have been consumed by another reader).")
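
The same behavior can be reproduced offline, without waiting 35 real seconds. The sketch below is a toy model with a manually advanced clock; `VisibilityQueue` and its methods are hypothetical names, and real SQS tracks visibility per receipt handle rather than per list entry.

```python
class VisibilityQueue:
    """Toy queue with a visibility timeout, driven by a simulated clock."""

    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self.now = 0           # simulated seconds since start
        self._messages = []    # each entry: {"body": ..., "visible_at": ...}

    def send(self, body):
        self._messages.append({"body": body, "visible_at": self.now})

    def receive(self):
        """Return the first visible entry and hide it for the timeout period."""
        for entry in self._messages:
            if entry["visible_at"] <= self.now:
                entry["visible_at"] = self.now + self.visibility_timeout
                return entry
        return None

    def delete(self, entry):
        """Remove exactly this entry (compared by identity, not by content)."""
        self._messages = [m for m in self._messages if m is not entry]

    def advance(self, seconds):
        self.now += seconds


vq = VisibilityQueue(visibility_timeout=30)
vq.send({"purpose": "visibility demo", "value": 42})

first = vq.receive()    # read, but do not delete
hidden = vq.receive()   # nothing visible while the timeout is running
vq.advance(35)          # simulated wait, longer than the timeout
second = vq.receive()   # the same message is handed out again
vq.delete(second)       # now it is gone for good

print(first["body"], hidden, second is first)
```

A reader that counted the message at `first` and again at `second` would have double-counted one view.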

**Takeaways from Section 1:**

| Concept | What you saw |
|---------|-------------|
| Message queues | Send, receive, delete: three separate operations |
| Ordering | Standard queues usually, but not always, deliver in the order sent |
| Visibility timeout | Undeleted messages reappear after 30 seconds |
| At-least-once delivery | The same message *can* be delivered more than once |
| Explicit deletion | You must explicitly confirm you handled a message |

These properties will matter a lot when we start counting things.

---
# Section 2: Pub/Sub Concepts

## From point-to-point to broadcast

In Section 1, you sent messages to *your own queue*. That is **point-to-point**
messaging. One sender, one queue, one receiver.

But what if you want to send a message to *many* receivers at once?

That is **publish-subscribe** (pub/sub). Instead of sending to a specific queue,
you publish to a **topic**. Every queue that is **subscribed** to that topic
gets its own copy of the message.

## Amazon SNS

[Amazon SNS](https://docs.aws.amazon.com/sns/latest/dg/welcome.html) provides
managed pub/sub topics. Here is our setup:

```
  Publisher (you)
       |
       v
  [SNS Topic: ds2032-view-events.fifo]
       |         |         |         |       |       |
       v         v         v         v       v       v
  [bilge]    [sam]    [phin]   [manuel]  [bruno]  [gil]
  (SQS)      (SQS)   (SQS)    (SQS)     (SQS)   (SQS)
```

When you publish **one** message to the topic, **all six queues** receive a copy.
This is called **fan-out**.

The publisher does not know who is subscribed. The subscribers do not know who
published. Neither side needs to change when subscribers are added or removed.
This is **decoupling**, one of the most powerful ideas in distributed systems.
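
Fan-out can be sketched in a few lines of plain Python. This is a toy model only: `TinyTopic` is a made-up name, and the subscriber names are borrowed from the diagram above purely for illustration.

```python
from collections import deque


class TinyTopic:
    """Toy pub/sub topic: every subscriber queue gets its own copy."""

    def __init__(self):
        self._subscribers = {}  # subscriber name -> that subscriber's queue

    def subscribe(self, name):
        """Register a new queue and return it to the subscriber."""
        queue = deque()
        self._subscribers[name] = queue
        return queue

    def publish(self, message):
        """Deliver an independent copy to each queue; return the copy count."""
        for queue in self._subscribers.values():
            queue.append(dict(message))
        return len(self._subscribers)


topic = TinyTopic()
queues = {name: topic.subscribe(name) for name in ["bilge", "sam", "phin"]}

copies = topic.publish({"demo": True, "from": "publisher"})
print(copies, [len(q) for q in queues.values()])

# Consuming from one queue does not touch the others.
queues["bilge"].popleft()
print([len(q) for q in queues.values()])
```

The publisher calls `publish` once and never sees the subscriber list, so adding a fourth subscriber requires no change on the publishing side.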

### 2A: Publish to the topic

Let's publish a single message to the SNS topic and see if it arrives in multiple queues.

First, we need to drain your queue so we start with a clean slate.

In [None]:
# 2A: Clean start
drained = drain_queue(MY_QUEUE_URL)
print(f"Drained {drained} old messages from your queue.")

In [None]:
# 2A: Publish ONE message to the SNS topic

message = {
    "demo": True,
    "content": "This is a pub/sub broadcast",
    "from": MY_NAME,
}

response = publish_to_topic(message)
print(f"PUBLISHED to SNS topic:")
print(f"  Message: {json.dumps(message)}")
print(f"  MessageId: {response['MessageId']}")

That message was sent to the **topic**, not to any specific queue.
SNS will now deliver a copy to every subscribed queue.

Let's check your queue.

In [None]:
# 2A: Check YOUR queue for the message
time.sleep(2)

received = receive_messages(MY_QUEUE_URL, max_messages=1, wait_seconds=5)
if received:
    print(f"YOUR queue received: {json.dumps(received[0])}")
else:
    print("Nothing in your queue yet. Try running this cell again after a moment.")

Your queue got the message, but so did everyone else's queue.
One publish, six copies. That is fan-out.
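
One detail worth knowing when a queue is fed by SNS: unless raw message delivery is enabled on the subscription, SNS wraps the payload in a JSON envelope whose `Message` field holds your original text. Whether the class subscriptions use raw delivery is not stated here, so treat the helper below as a defensive sketch. The name `unwrap_sns_body` is hypothetical, and the sample envelope is trimmed to two fields for illustration.

```python
import json


def unwrap_sns_body(raw_body):
    """Return the published payload whether or not SNS wrapped it."""
    body = json.loads(raw_body)
    is_envelope = (
        isinstance(body, dict)
        and body.get("Type") == "Notification"
        and "Message" in body
    )
    if is_envelope:
        # The original payload travels as a JSON string inside "Message".
        return json.loads(body["Message"])
    return body


payload = {"demo": True, "from": "sam"}

raw_delivery = json.dumps(payload)  # what the queue holds with raw delivery on
wrapped = json.dumps({"Type": "Notification", "Message": json.dumps(payload)})

print(unwrap_sns_body(raw_delivery) == payload)
print(unwrap_sns_body(wrapped) == payload)
```

If a received message ever shows keys such as `Type` and `Message` instead of your own fields, this kind of unwrapping is the usual fix.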

### 2B: Fan-out at scale

Let's make this more convincing. We will publish 5 messages and then check
multiple queues to confirm each one got all 5.

### Your Turn: Predict before you run

Before running the next cell, **write down your prediction**:
- If you publish 5 messages to the SNS topic, and there are 6 student queues
  plus 3 auditor queues subscribed... how many total message copies will exist
  across all queues?

Think about it, write your answer, then run the cell to check.

In [None]:
# 2B: Drain first, then publish 5 messages
drain_queue(MY_QUEUE_URL)

print("Publishing 5 messages to the topic...")
for i in range(1, 6):
    msg = {"sequence": i, "label": f"broadcast-{i}", "from": MY_NAME}
    publish_to_topic(msg)
    print(f"  Published #{i}")

time.sleep(2)

In [None]:
# 2B: Read from YOUR queue and count
received = receive_all(MY_QUEUE_URL, max_total=10, timeout=10)
sequences = [m.get("sequence") for m in received]

print(f"Your queue received: {len(received)} messages")
print(f"Sequences: {sequences}")
print(f"\nEvery subscribed queue gets its own independent copy.")
print(f"That is fan-out: one publish, many subscribers.")

**Takeaways from Section 2:**

| Concept | What you saw |
|---------|-------------|
| SNS topics | A broadcast channel: publish once, deliver to many |
| Fan-out | Each subscriber queue gets its own copy |
| Decoupling | Publisher and subscribers are independent |
| Subscriptions | Queues opt in to a topic; adding/removing does not affect publishers |

> **Think about it:** If the publisher does not know who is listening,
> and the subscribers do not know who is publishing...
> who is in control? Is anyone?

---
# Section 3: I Am a Host

*The year is 2032. The One Persona Act has changed how online advertising works.
Content creators, called "hosts," serve content and display ads. They get paid based
on verified view counts. But who verifies the views?*

**You are a host.** When someone views your content and sees an ad, you need
to record that event. You publish a `ViewEvent` so that auditors (independent
observers) can verify the view actually happened.

Your livelihood depends on accurate counts. If auditors miss views, you lose money.
If they double-count, advertisers overpay. Everyone needs the numbers to be right.

### 3A: The ViewEvent schema

Every time a view happens, you create a structured event with these fields:

| Field | Type | Meaning |
|-------|------|---------|
| `event_id` | UUID | Unique identifier for this specific view |
| `host_id` | string | Who is hosting the content (you) |
| `content_id` | string | What content was viewed |
| `ad_id` | string | What ad was displayed alongside it |
| `timestamp` | ISO 8601 | When the view happened (UTC) |

The `event_id` is critical. It is how auditors detect duplicates. Two events
with the same `event_id` represent the *same view*, not two separate views.
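
A schema is only useful if someone checks it. The sketch below shows one way an auditor might test an incoming event against the table above. The function name `validate_view_event` is made up, and the checks are deliberately shallow: they confirm the shape of an event, not that the view really happened.

```python
import uuid
from datetime import datetime, timezone

REQUIRED_FIELDS = ("event_id", "host_id", "content_id", "ad_id", "timestamp")


def validate_view_event(event):
    """Return a list of problems; an empty list means the event is well-formed."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]
    if "event_id" in event:
        try:
            uuid.UUID(str(event["event_id"]))
        except ValueError:
            problems.append("event_id is not a UUID")
    if "timestamp" in event:
        try:
            # Accepts the format produced by datetime.isoformat()
            datetime.fromisoformat(str(event["timestamp"]))
        except ValueError:
            problems.append("timestamp is not ISO 8601")
    return problems


good_event = {
    "event_id": str(uuid.uuid4()),
    "host_id": "sam",
    "content_id": "video-matrix",
    "ad_id": "ad-pepsi",
    "timestamp": datetime.now(timezone.utc).isoformat(),
}
bad_event = {"event_id": "not-a-uuid", "host_id": "sam", "timestamp": "yesterday"}

print(validate_view_event(good_event))
for problem in validate_view_event(bad_event):
    print(" -", problem)
```

Note what this cannot catch: a well-formed event describing a view that never took place passes every check.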

In [None]:
# 3A: Create a sample ViewEvent

sample = create_view_event(
    host_id=MY_NAME,
    content_id="video-matrix-reloaded",
    ad_id="ad-coca-cola-2032",
)

print("ViewEvent:")
print(json.dumps(sample, indent=2))

### 3B: Publishing view events

Let's simulate 10 views on your content. In the real system, these would be
generated automatically as users watch your videos or read your articles.

In [None]:
# 3B: Simulate 10 views on your content

CONTENT_CATALOG = [
    ("video-matrix", "ad-pepsi"),
    ("article-ai-future", "ad-nvidia"),
    ("video-matrix-reloaded", "ad-coca-cola-2032"),
    ("podcast-distributed-sys", "ad-aws"),
    ("article-quantum-2032", "ad-google"),
]

published_events = []

print(f"Simulating 10 views on host '{MY_NAME}'...\n")
for i in range(10):
    content_id, ad_id = CONTENT_CATALOG[i % len(CONTENT_CATALOG)]
    event = create_view_event(MY_NAME, content_id, ad_id)
    publish_to_topic(event)
    published_events.append(event)
    print(f"  View #{i+1}: {content_id} with {ad_id}")

print(f"\nPublished {len(published_events)} ViewEvents.")

### 3C: Reading as an auditor

Those events went to the SNS topic and were delivered to every subscribed queue.
Now let's read from an **auditor queue**, a separate queue that exists solely
to observe and count events.

Think of the auditor as an independent accountant. They watch the event stream
and keep their own tally.

In [None]:
# 3C: Read from an auditor queue
time.sleep(2)

auditor_events = receive_all(AUDITOR_QUEUES["auditor-a"], max_total=50, timeout=15)

print(f"Auditor A received {len(auditor_events)} events.\n")
for i, e in enumerate(auditor_events[:10]):  # Show first 10
    print(f"  [{i+1}] host={e.get('host_id')}, content={e.get('content_id')}")
if len(auditor_events) > 10:
    print(f"  ... and {len(auditor_events) - 10} more")

In [None]:
# 3C: Count and summarize what the auditor saw
summarize_events(auditor_events, title="Auditor A's View")

# Verify: do the counts match what we published?
print(f"\n--- Verification ---")
print(f"You published:     {len(published_events)} events")
my_events = [e for e in auditor_events if e.get('host_id') == MY_NAME]
print(f"Auditor saw yours: {len(my_events)}")

pub_ids = {e['event_id'] for e in published_events}
aud_ids = {e.get('event_id') for e in my_events}
print(f"All IDs match:     {pub_ids == aud_ids}")

Right now, everything matches. The auditor saw exactly what you published.
Every event ID checks out. The count is correct.

**But ask yourself:**
- What if the auditor missed some events?
- What if the same event arrived twice?
- What if you had *no* auditor at all?
- Who gets the final say on whether a view "happened"?

We will break these assumptions one by one.

### Your Turn: Be a dishonest host

What if a host wanted to inflate their view count? They could publish
fake ViewEvents with made-up content IDs. Write code to do this —
then think about how an auditor would detect it.

In [None]:
# YOUR TURN: Publish 5 "fake" view events
# Use a content_id that doesn't exist in the catalog.
# Then think: how would an auditor know these are fake?

# fake_event = create_view_event(
#     host_id=MY_NAME,
#     content_id="video-DOES-NOT-EXIST",
#     ad_id="ad-FAKE",
# )
# publish_to_topic(fake_event)
# print("Published fake event:", fake_event["event_id"][:12])

### Checkpoint: Section 3

1. Why is the `event_id` field important? What would happen without it?
2. If you publish 10 events but the auditor only sees 8, whose count is "correct"?
3. The host creates the event and the timestamp. Can the auditor trust either of those?
4. What would prevent a host from publishing millions of fake events?

---
# Section 4: Guarantees and Their Limits

So far, everything has worked perfectly. Messages arrived in order. Counts matched.
No duplicates. That is because we used **FIFO queues**, which AWS guarantees will:

1. Deliver messages in order (within a message group)
2. Deduplicate identical messages (within a 5-minute window)

But these guarantees come with trade-offs.

### FIFO vs. Standard Queues

| Property | FIFO Queue | Standard Queue |
|----------|-----------|----------------|
| Ordering | Guaranteed (within message group) | Best-effort (no guarantee) |
| Deduplication | Exactly-once (5-min window) | At-least-once (may duplicate) |
| Throughput | 300 messages/sec (3,000 with batching) | Nearly unlimited |
| Cost | Higher | Lower |

[FIFO queue documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html) ·
[Standard queue documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html)

**The trade-off:** FIFO gives you correctness but limits throughput. Standard gives
you speed but no ordering or dedup guarantees. Most real-world distributed systems
use standard queues and handle ordering/dedup in application code.
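
As a rough idea of what "handling ordering in application code" can look like, the sketch below re-sorts a received batch by its `timestamp` field, using `event_id` as a tie-breaker. The function name `order_events` and the sample events are made up for illustration. The approach also assumes every timestamp carries a UTC offset and that host clocks can be trusted, which is a real weakness.

```python
from datetime import datetime


def order_events(events):
    """Return a new list sorted by timestamp, then by event_id for ties."""
    return sorted(
        events,
        key=lambda e: (datetime.fromisoformat(e["timestamp"]), e["event_id"]),
    )


# Illustrative events as they might arrive from a standard queue.
arrived = [
    {"event_id": "b", "timestamp": "2032-01-01T00:00:02+00:00"},
    {"event_id": "a", "timestamp": "2032-01-01T00:00:01+00:00"},
    {"event_id": "c", "timestamp": "2032-01-01T00:00:02+00:00"},
]

ordered = order_events(arrived)
print([e["event_id"] for e in ordered])  # ['a', 'b', 'c']
```

Sorting only works on events you already hold. A late arrival can still land after you have processed the batch, which is why ordering cannot be fully repaired after the fact.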

### Visibility Timeout Revisited

You saw in Section 1 that undeleted messages reappear. Here is why this matters
for counting: if your auditor reads a message, starts counting it, but crashes
before deleting it, the message comes back. A second reader counts it again.
Now you have a **double count**.

This is not a bug. It is a fundamental property of at-least-once delivery.
The queue guarantees your message will not be *lost*, but it cannot guarantee
it will be delivered only *once*.

### How do you handle duplicates?

The standard approach is called **idempotent processing**:

1. Keep a set of event IDs you have already processed
2. When you receive a message, check if you have seen that `event_id` before
3. If yes, skip it; do not count it again
4. If no, process it and add the `event_id` to your set

In [None]:
# Section 4: Idempotent processing demo

# Simulate receiving the same events multiple times
fake_stream = published_events + published_events[:3]  # 13 messages, 3 are repeats

print(f"Incoming stream: {len(fake_stream)} messages")

# Naive count (no dedup)
naive_count = len(fake_stream)
print(f"Naive count (just len): {naive_count}")

# Idempotent count (with dedup)
seen_ids = set()
deduped = []
for event in fake_stream:
    eid = event["event_id"]
    if eid not in seen_ids:
        seen_ids.add(eid)
        deduped.append(event)

print(f"Deduplicated count:     {len(deduped)}")
print(f"Duplicates caught:      {naive_count - len(deduped)}")
print(f"\nIdempotent processing prevents double-counting.")

### Message Retention and Dead Letter Queues

Two more concepts that affect how queues behave in the real world:

**Message retention**: SQS keeps unread messages for a configurable time
(default: 4 days, max: 14 days). After that, they are permanently deleted.
([docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-basic-architecture.html))

If your auditor is offline for a week, those messages are gone. You lost data.

**Dead Letter Queues (DLQ)**: If a message fails processing repeatedly
(e.g., it keeps being received but never deleted), SQS can move it to a
separate "dead letter" queue for inspection.
([docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html))

This is useful for debugging. Instead of a bad message blocking the queue
forever, it gets moved aside so the rest of the queue keeps flowing.
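
The dead-letter idea can be simulated locally. In SQS the threshold is the `maxReceiveCount` setting in a queue's redrive policy. The function below is only a toy model of that behavior, and `process_with_dlq` and `count_view` are hypothetical names.

```python
from collections import deque


def process_with_dlq(messages, handler, max_receive_count=3):
    """Retry each message up to max_receive_count times, then dead-letter it."""
    main = deque((msg, 0) for msg in messages)  # (message, receives so far)
    processed, dead_letters = [], []
    while main:
        msg, receives = main.popleft()
        receives += 1
        try:
            handler(msg)
            processed.append(msg)             # success: like deleting the message
        except Exception:
            if receives >= max_receive_count:
                dead_letters.append(msg)      # give up: move it aside
            else:
                main.append((msg, receives))  # it becomes visible again later
    return processed, dead_letters


def count_view(msg):
    """Example handler that cannot process events without an event_id."""
    if "event_id" not in msg:
        raise ValueError("malformed event")


batch = [{"event_id": "a"}, {"bad": True}, {"event_id": "b"}]
processed, dead_letters = process_with_dlq(batch, count_view)
print(len(processed), len(dead_letters))
```

The malformed message is retried three times and then set aside, while the two valid events are processed without being blocked behind it.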

> **Your Turn:** Think about what would happen if your 2032 auditor went offline
> for longer than the message retention period. Views happened, events were
> published, but the auditor never saw them. What happens to your payment?

---
# Section 5: I Am Not Alone

Up until now, you have been the only host publishing events. Your auditor
saw only your events, and everything was clean.

But in the real 2032 system, you are one of many hosts. The SNS topic carries
events from *all* hosts. Your auditor queue receives *everyone's* events.

**Your instructor has started a simulator** that publishes events from
multiple simulated hosts. Let's see what your queue looks like now.

In [None]:
# Section 5: Read all events from your queue

print(f"Reading from your queue ({MY_NAME})...\n")

events = receive_all(MY_QUEUE_URL, max_total=200, timeout=20)

print(f"Total events received: {len(events)}")

In [None]:
# Section 5: Who is publishing?

host_counts = count_by(events, "host_id")

print("Events by host:\n")
total = len(events) if events else 1
for host, count in host_counts.most_common():
    pct = count / total * 100
    bar = "#" * int(pct / 2)
    print(f"  {host:25s}: {count:4d} ({pct:5.1f}%) {bar}")

# How many are yours?
my_count = host_counts.get(MY_NAME, 0)
print(f"\nYour events: {my_count}")
print(f"Other hosts:  {len(events) - my_count}")

In [None]:
# Section 5: Full summary
summarize_events(events, title="Multi-Host Traffic")

# Check ordering
from datetime import datetime as dt
timestamps = []
for e in events:
    try:
        timestamps.append(dt.fromisoformat(e.get("timestamp", "")))
    except (ValueError, TypeError):
        pass

out_of_order = sum(1 for i in range(1, len(timestamps)) if timestamps[i] < timestamps[i-1])
print(f"\nOrdering: {out_of_order} events arrived out of chronological order")

**Questions:**

1. How many different hosts are publishing events?
2. Which host has the most traffic? Does that seem proportional?
3. If you were auditing only *your own* views, how would you filter
   out everyone else's events?
4. Could a host fake their own view events to inflate their count?
   What would stop them?
5. Were events in chronological order? If not, what does that mean
   for an auditor that processes events sequentially?

---
# Section 6: Something Is Wrong

Your instructor has set up a second environment. This one uses a **standard**
(non-FIFO) queue. The chaos scripts have been running. Things that were reliable
in the FIFO world no longer hold.

In this section, you will read from the chaos queue and see what broke.

In [None]:
# Section 6: Read from the chaos queue

print("Reading from the chaos queue (standard, non-FIFO)...\n")

chaos_events = receive_all(CHAOS_QUEUE_URL, max_total=200, timeout=20)

print(f"Total messages received: {len(chaos_events)}")

In [None]:
# Section 6: Detect duplicates

event_id_counts = Counter(e.get("event_id", "?") for e in chaos_events)
duplicates = {eid: cnt for eid, cnt in event_id_counts.items() if cnt > 1}

print(f"--- Duplicate Detection ---")
print(f"  Total messages:       {len(chaos_events)}")
print(f"  Unique event IDs:     {len(event_id_counts)}")
print(f"  Duplicated event IDs: {len(duplicates)}")
print(f"  Extra copies:         {len(chaos_events) - len(event_id_counts)}")

if duplicates:
    print(f"\n  Worst offenders:")
    for eid, cnt in sorted(duplicates.items(), key=lambda x: x[1], reverse=True)[:5]:
        print(f"    {eid[:16]}... appeared {cnt} times")

In [None]:
# Section 6: Detect traffic spikes

host_counts = count_by(chaos_events, "host_id")
total = len(chaos_events) if chaos_events else 1

print(f"--- Traffic Spike Detection ---\n")
for host, count in host_counts.most_common():
    pct = count / total * 100
    bar = "#" * int(pct / 2)
    flag = "  ** SUSPICIOUS" if pct > 40 else ""
    print(f"  {host:25s}: {count:4d} ({pct:5.1f}%) {bar}{flag}")

In [None]:
# Section 6: Detect late events

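from datetime import datetime, timezone as tz

now = datetime.now(tz.utc)
late = []
for e in chaos_events:
    try:
        ts = datetime.fromisoformat(e.get("timestamp", ""))
    except (ValueError, TypeError):
        continue  # missing or malformed timestamp: skip it
    if ts.tzinfo is None:
        # Assumption: timestamps without an offset are UTC. Without this,
        # subtracting from `now` raises TypeError and the event is silently dropped.
        ts = ts.replace(tzinfo=tz.utc)
    age = (now - ts).total_seconds()
    if age > 300:  # older than 5 minutes
        late.append({"host": e.get("host_id"), "age_min": age / 60, "eid": str(e.get("event_id", "?"))[:16]})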

print(f"--- Late Event Detection ---")
print(f"  Events older than 5 minutes: {len(late)}")
for item in late[:5]:
    print(f"    {item['eid']}... from {item['host']}, claimed {item['age_min']:.0f} min ago")

**What broke and why:**

| Problem | Cause | What it breaks |
|---------|-------|---------------|
| Duplicate events | Same event published twice; standard queues deliver at least once and do not deduplicate | Count inflation |
| Traffic spike | One host flooding the topic | Proportionality assumptions |
| Late timestamps | Events claim to be from 30 minutes ago | Temporal ordering, windowed counting |
| Out-of-order arrival | Standard queues do not guarantee ordering | Sequential processing logic |

**The bottom line:** If you are building an auditor, you cannot blindly trust:
- Message counts (duplicates exist)
- Traffic patterns (spikes can be injected)
- Timestamps (can be faked by the sender)
- Ordering (not guaranteed by standard queues)
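
One way to act on the last two points is to stop relying on arrival order and to set aside events whose claimed time looks implausible. The sketch below is a minimal illustration, not the lab's method. The helper names `parse_ts` and `order_and_flag` are hypothetical, naive timestamps are assumed to be UTC, and the 300-second tolerance simply mirrors the 5-minute threshold used in the late-event cell.

```python
from datetime import datetime, timezone

def parse_ts(raw):
    """Parse an ISO-8601 string; return None if it is missing or malformed."""
    try:
        ts = datetime.fromisoformat(raw)
    except (ValueError, TypeError):
        return None
    if ts.tzinfo is None:
        ts = ts.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    return ts

def order_and_flag(events, now, max_age_seconds=300):
    """Split events into (ordered, suspicious).

    ordered:    events with a plausible timestamp, sorted by claimed time
    suspicious: events with an unreadable, too old, or future timestamp
    """
    plausible, suspicious = [], []
    for event in events:
        ts = parse_ts(event.get("timestamp"))
        if ts is None:
            suspicious.append(event)
            continue
        age = (now - ts).total_seconds()
        if age < 0 or age > max_age_seconds:
            suspicious.append(event)
        else:
            plausible.append((ts, event))
    plausible.sort(key=lambda pair: pair[0])  # sort on the timestamp only
    return [event for _, event in plausible], suspicious

# Made-up sample data for illustration.
now = datetime(2032, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
sample = [
    {"event_id": "b", "timestamp": "2032-01-01T11:59:30+00:00"},
    {"event_id": "a", "timestamp": "2032-01-01T11:59:00+00:00"},
    {"event_id": "c", "timestamp": "2032-01-01T11:30:00+00:00"},  # 30 minutes old
    {"event_id": "d", "timestamp": "not-a-date"},
]
ordered, suspicious = order_and_flag(sample, now)
print([e["event_id"] for e in ordered])
print([e["event_id"] for e in suspicious])
```

Sorting restores a consistent order, but it still relies on sender-supplied timestamps. Treat the flagged events as candidates for review, not as proof of fraud.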

> **Your Turn:** Write a function called `safe_count(events)` that takes a list of
> events and returns a count that handles duplicates. Use the idempotent pattern
> from Section 4.

In [None]:
# YOUR TURN: Implement safe_count
# This should return the number of UNIQUE events (deduplicated by event_id).

def safe_count(events):
    # --- YOUR CODE HERE ---
    pass

# Test it (the stub returns None until you implement it):
result = safe_count(chaos_events)
if result is not None:
    print(f"Raw count:  {len(chaos_events)}")
    print(f"Safe count: {result}")
else:
    print("safe_count is not implemented yet.")

---
# Section 7: Multiple Auditors: Who Is Right?

If one auditor can miss events, get duplicates, or see inflated traffic,
perhaps the answer is **more auditors**. If three independent observers
each count the same events, you can compare their results.

Our system has three auditor queues (A, B, and C), all subscribed to
the same topic. Let's see what each one saw and whether they agree.

In [None]:
# Section 7: Read from all three auditor queues

auditor_results = {}

for name, url in AUDITOR_QUEUES.items():
    events = receive_all(url, max_total=200, timeout=15)
    event_ids = {e.get("event_id") for e in events}
    auditor_results[name] = {
        "events": events,
        "event_ids": event_ids,
        "count": len(events),
        "unique": len(event_ids),
        "hosts": count_by(events, "host_id"),
    }
    print(f"{name}: {len(events)} messages, {len(event_ids)} unique")

In [None]:
# Section 7: Do they agree?

print("\n--- Agreement Check ---\n")

# Event ID overlap
all_ids = [d["event_ids"] for d in auditor_results.values()]
common = all_ids[0]
for ids in all_ids[1:]:
    common = common & ids

union = set()
for ids in all_ids:
    union |= ids

print(f"Events ALL three saw:    {len(common)}")
print(f"Events at least one saw: {len(union)}")
if len(common) != len(union):
    print(f"Events NOT seen by all:  {len(union) - len(common)}")
    print("  --> Some auditors missed events that others saw!")

In [None]:
# Section 7: Per-host comparison

print("\n--- Per-Host Counts by Auditor ---\n")

all_hosts = set()
for d in auditor_results.values():
    all_hosts.update(d["hosts"].keys())

names = list(auditor_results.keys())
header = f"  {'Host':25s}" + "".join(f"{n:>12s}" for n in names) + "   Agree?"
print(header)
print("  " + "-" * (len(header) - 2))

for host in sorted(all_hosts):
    counts = [auditor_results[n]["hosts"].get(host, 0) for n in names]
    agree = "YES" if len(set(counts)) == 1 else "NO"
    row = f"  {host:25s}" + "".join(f"{c:12d}" for c in counts) + f"   {agree}"
    print(row)

In [None]:
# Section 7: Majority vote

print("\n--- Majority Vote ---\n")
print("When auditors disagree, we take the majority.\n")

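n_auditors = len(names)
for host in sorted(all_hosts):
    counts = [auditor_results[n]["hosts"].get(host, 0) for n in names]
    value, votes = Counter(counts).most_common(1)[0]
    if votes > n_auditors / 2:  # strict majority, not just the most common value
        print(f"  {host:25s}: majority says {value:4d} views ({votes}/{n_auditors} agree)")
    else:
        print(f"  {host:25s}: NO MAJORITY, counts were {counts}")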
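
The vote above compares per-host totals, so two auditors can agree on a number while having seen different events. A complementary sketch is to vote per event ID: accept an event only if a strict majority of auditors saw it. The function name `quorum_event_ids` and the sample sets are hypothetical. This is one possible approach, not the lab's official method.

```python
from collections import Counter

def quorum_event_ids(id_sets, quorum=None):
    """Return the event IDs that appear in at least `quorum` of the sets.

    If quorum is omitted, a strict majority is required
    (2 of 3, 3 of 4, 3 of 5, ...).
    """
    if quorum is None:
        quorum = len(id_sets) // 2 + 1
    sightings = Counter()
    for ids in id_sets:
        sightings.update(set(ids))  # each auditor counts once per event
    return {eid for eid, seen_by in sightings.items() if seen_by >= quorum}

# Made-up example: three auditors with partly overlapping views.
example = {
    "A": {"e1", "e2", "e3"},
    "B": {"e1", "e2"},
    "C": {"e1", "e3", "e4"},
}
accepted = quorum_event_ids(list(example.values()))
print(sorted(accepted))  # e4 was seen by only one auditor and is rejected
```

With the lab data, the input would be `[d["event_ids"] for d in auditor_results.values()]`. Because the result is a set of IDs, duplicates inside any one queue cannot inflate it.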

### The Bridge

Right now, these three auditors are **SQS queues**. They have no relationship
with each other. They do not know the others exist. They do not communicate
or compare notes.

The majority vote happened in **your code**. You were the authority who
compared them and decided the answer.

But in the 2032 system, there is no central authority. Nobody runs YOUR code
on behalf of everyone. Each host, each auditor, each node in the network
needs to be able to:

1. **Discover** other nodes without a central registry
2. **Communicate** directly with peers
3. **Agree** on view counts without anyone being "in charge"

That is **peer-to-peer**. That is the next phase.

---

**End of Pub/Sub Lab.**
You have seen queues, pub/sub, fan-out, ViewEvents, unreliable messaging,
chaos detection, multi-auditor comparison, and majority voting.

The question you should be asking now:
*What if the auditors could talk to each other?*