In [1]:
import time
import json
import paho.mqtt.client as mqtt
import sys
import threading
import os


In [None]:

# --- Configuration ---
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
TOPIC = "test/qos/#"
CLIENT_ID = "QoS_Test_Subscriber"

# Aggressive parameters to force duplicates
NUM_CYCLES = 5                     # More cycles = more chances for duplicates
CONNECT_TIME = 3                   # Very short connection time
DISCONNECT_TIME = 2                # Short disconnect time
KILL_DURING_PROCESSING = True      # Kill connection while processing messages

# --- Global Counters ---
message_counts = {"test/qos/0": 0, "test/qos/1": 0, "test/qos/2": 0}
received_ids = {"test/qos/0": set(), "test/qos/1": set(), "test/qos/2": set()}
duplicate_counts = {"test/qos/0": 0, "test/qos/1": 0, "test/qos/2": 0}
duplicate_details = {"test/qos/0": [], "test/qos/1": [], "test/qos/2": []}
total_messages = 0
is_connected = False
cycle_count = 0
force_disconnect = False
messages_since_connect = 0

# --- MQTT Callbacks (API v2 signatures) ---
def on_connect(client, userdata, flags, reason_code, properties):
    global is_connected, cycle_count, messages_since_connect
    if reason_code == 0:
        cycle_count += 1
        messages_since_connect = 0
        print(f"\n Connected (Cycle {cycle_count}) at {time.strftime('%H:%M:%S')}")
        is_connected = True

        # Subscribe with QoS 2
        client.subscribe([(TOPIC, 2)])
        print(f"  Subscribed to {TOPIC} with QoS 2")

        # Show session state
        if hasattr(flags, 'session_present') and flags.session_present:
            print(f"  Session RESUMED - queued messages will be delivered")
        else:
            print(f" New session started")
    else:
        print(f" Connection failed: {reason_code}")
        is_connected = False

def on_disconnect(client, userdata, flags, reason_code, properties=None):
    global is_connected
    is_connected = False
    if reason_code == 0:
        print(f" Clean disconnect at {time.strftime('%H:%M:%S')}")
    else:
        print(f" Forced disconnect at {time.strftime('%H:%M:%S')} (reason: {reason_code})")

def on_message(client, userdata, msg):
    global total_messages, force_disconnect, messages_since_connect
    topic = msg.topic
    qos = msg.qos
    total_messages += 1
    messages_since_connect += 1

    # Initialize tracking for new topics
    if topic not in message_counts:
        message_counts[topic] = 0
        received_ids[topic] = set()
        duplicate_counts[topic] = 0
        duplicate_details[topic] = []

    message_counts[topic] += 1

    # Parse payload and detect duplicates
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        message_id = payload.get("message_id")

        if message_id is not None:
            if message_id in received_ids[topic]:
                duplicate_counts[topic] += 1
                duplicate_details[topic].append(message_id)
                print(f" DUPLICATE #{duplicate_counts[topic]}: {topic} | ID={message_id} | QoS={qos}")
            else:
                received_ids[topic].add(message_id)

        # Show progress
        if message_counts[topic] % 10 == 0 or topic == "test/qos/0":
            print(f"  [{topic} | QoS {qos}] Count: {message_counts[topic]}")

    except json.JSONDecodeError:
        print(f"  Non-JSON payload on {topic}")

    # Force disconnect after receiving a few QoS 1 messages (before ACK sent)
    if KILL_DURING_PROCESSING and topic == "test/qos/1" and messages_since_connect >= 3:
        if force_disconnect:
            print(f" Triggering forced disconnect after QoS 1 message...")
            # This simulates network failure during message processing
            try:
                client._sock.close()  # Brutal socket close - no cleanup!
            except:
                pass

# --- Main Execution ---
print("=" * 85)
print("  MQTT QoS Test - EXTREME DUPLICATE GENERATION MODE")
print("=" * 85)
print(f"\n Ultra-Aggressive Strategy:")
print(f"  • Persistent session (clean_session=False)")
print(f"  • {NUM_CYCLES} rapid disconnect/reconnect cycles")
print(f"  • Force socket close during QoS 1 message processing")
print(f"  • This prevents PUBACK from being sent to broker")
print(f"  • Broker will redeliver unacknowledged QoS 1 messages")
print(f"  • Expected: QoS 1 duplicates, QoS 2 no duplicates")
print("=" * 85)

client = mqtt.Client(
    client_id=CLIENT_ID,
    clean_session=False,  # CRITICAL for message queuing
    protocol=mqtt.MQTTv311,
    callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)

client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message

try:
    # Clear old session
    print("\n[Phase 0] Clearing any old session data...")
    temp_client = mqtt.Client(
        client_id=CLIENT_ID,
        clean_session=True,
        protocol=mqtt.MQTTv311,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )
    temp_client.connect(MQTT_BROKER, MQTT_PORT, 60)
    temp_client.disconnect()
    time.sleep(2)
    print("  Old session cleared")

    # Verify Node-RED is publishing
    print("\n[Phase 1] Verifying Node-RED is publishing...")
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    client.loop_start()

    print("  Waiting for messages...")
    for i in range(6):
        time.sleep(1)
        if total_messages > 0:
            break

    if total_messages == 0:
        print("\n  ERROR: No messages received!")
        print("   Please start Node-RED flow first.")
        sys.exit(1)

    print(f" Receiving messages ({total_messages} received)")

    # Initial disconnect to start fresh
    print("\n[Phase 2] Starting aggressive disconnect cycles...")
    client.loop_stop()
    client.disconnect()
    time.sleep(2)

    # Phase 2: Aggressive cycles with forced disconnects
    for i in range(NUM_CYCLES):
        print(f"\n  ━━━ Cycle {i+1}/{NUM_CYCLES} ━━━")
        force_disconnect = False
        messages_since_connect = 0

        # Connect
        print(f" Connecting...")
        try:
            client.connect(MQTT_BROKER, MQTT_PORT, 60)
            client.loop_start()
            time.sleep(1)  # Wait for connection

            # Enable forced disconnect after receiving messages
            force_disconnect = True

            # Stay connected briefly - will disconnect during message processing
            print(f" Receiving messages (will force disconnect)...")
            time.sleep(CONNECT_TIME)

            # If not already disconnected, force disconnect now
            if is_connected:
                print(f" Forcing brutal disconnect...")
                try:
                    client._sock.close()  # Brutal close
                except:
                    pass
                client.loop_stop()

        except Exception as e:
            print(f" Error during cycle: {e}")

        # Stay disconnected
        print(f" Disconnected for {DISCONNECT_TIME}s (messages queuing)...")
        time.sleep(DISCONNECT_TIME)

    # Phase 3: Final stable connection to receive queued messages
    print(f"\n[Phase 3] Final connection to receive all queued messages...")
    force_disconnect = False

    try:
        client = mqtt.Client(
            client_id=CLIENT_ID,
            clean_session=False,
            protocol=mqtt.MQTTv311,
            callback_api_version=mqtt.CallbackAPIVersion.VERSION2
        )
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_message = on_message

        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()
        print("  Waiting for queued message delivery...")
        time.sleep(15)
    except Exception as e:
        print(f" Error: {e}")

except KeyboardInterrupt:
    print("\n\n Test interrupted by user")
except Exception as e:
    print(f"\n Error: {e}")
    import traceback
    traceback.print_exc()
finally:
    try:
        client.loop_stop()
    except:
        pass
    try:
        if is_connected:
            client.disconnect()
    except:
        pass

# --- Print Final Results ---
print("\n" + "=" * 85)
print("             FINAL RESULTS - DUPLICATE ANALYSIS")
print("=" * 85)
print(f"Total messages received (including duplicates): {total_messages}")
print(f"Number of disconnect/reconnect cycles: {cycle_count - 1}")
print()

TARGET_SENT = message_counts.get("test/qos/2", 0)

print("{:<20} {:<10} {:<12} {:<12} {:<12} {:<10}".format(
    "Topic", "Target", "Received", "Unique", "Duplicates", "Dup %"
))
print("-" * 85)

for topic in sorted(message_counts.keys()):
    received = message_counts[topic]
    duplicates = duplicate_counts.get(topic, 0)
    unique = len(received_ids[topic])

    if received > 0:
        dup_pct = (duplicates / received) * 100
    else:
        dup_pct = 0.0

    print(
        "{:<20} {:<10} {:<12} {:<12} {:<12} {:<10.1f}".format(
            topic,
            TARGET_SENT if TARGET_SENT > 0 else "N/A",
            received,
            unique,
            duplicates,
            dup_pct
        )
    )

print("\n" + "=" * 85)
print(" Analysis:")
print("=" * 85)

qos1_dups = duplicate_counts.get("test/qos/1", 0)
qos2_dups = duplicate_counts.get("test/qos/2", 0)

if qos1_dups > 0:
    print(f"  SUCCESS: QoS 1 generated {qos1_dups} duplicates!")
    print(f"  This demonstrates 'at-least-once' delivery behavior")
    print(f"  Duplicates occurred because PUBACK wasn't sent before disconnect")
else:
    print(f"  QoS 1: No duplicates generated")
    print(f"  The disconnects may not have been aggressive enough")
    print(f"  Try running the test again or increasing NUM_CYCLES")

if qos2_dups > 0:
    print(f"\n UNEXPECTED: QoS 2 has {qos2_dups} duplicates")
    print(f"  Exactly-once delivery should prevent this")
else:
    print(f"\n QoS 2: No duplicates (correct exactly-once behavior)")
    print(f"  4-way handshake prevents duplicate delivery")

qos0_received = message_counts.get("test/qos/0", 0)
qos0_loss = TARGET_SENT - qos0_received if TARGET_SENT > 0 else 0
print(f"\n QoS 0: Lost {qos0_loss}/{TARGET_SENT} messages ({(qos0_loss/TARGET_SENT*100) if TARGET_SENT > 0 else 0:.1f}%)")
print(f"  Fire-and-forget delivery - loss during disconnects is expected")

print()

  MQTT QoS Test - EXTREME DUPLICATE GENERATION MODE

 Ultra-Aggressive Strategy:
  • Persistent session (clean_session=False)
  • 5 rapid disconnect/reconnect cycles
  • Force socket close during QoS 1 message processing
  • This prevents PUBACK from being sent to broker
  • Broker will redeliver unacknowledged QoS 1 messages
  • Expected: QoS 1 duplicates, QoS 2 no duplicates

[Phase 0] Clearing any old session data...

 Error: [Errno 111] Connection refused

             FINAL RESULTS - DUPLICATE ANALYSIS
Total messages received (including duplicates): 0
Number of disconnect/reconnect cycles: -1

Topic                Target     Received     Unique       Duplicates   Dup %     
-------------------------------------------------------------------------------------
test/qos/0           N/A        0            0            0            0.0       
test/qos/1           N/A        0            0            0            0.0       
test/qos/2           N/A        0            0            0   

Traceback (most recent call last):
  File "/tmp/ipykernel_269560/1273314451.py", line 133, in <module>
    temp_client.connect(MQTT_BROKER, MQTT_PORT, 60)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/paho/mqtt/client.py", line 1435, in connect
    return self.reconnect()
           ~~~~~~~~~~~~~~^^
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/paho/mqtt/client.py", line 1598, in reconnect
    self._sock = self._create_socket()
                 ~~~~~~~~~~~~~~~~~~~^^
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/paho/mqtt/client.py", line 4609, in _create_socket
    sock = self._create_socket_connection()
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/paho/mqtt/client.py", line 4640, in _create_socket_connection
    return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source)
    

ERROR:tornado.general:Uncaught exception in ZMQStream callback
Traceback (most recent call last):
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/zmq/eventloop/zmqstream.py", line 565, in _log_error
    f.result()
    ~~~~~~~~^^
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/ipykernel/kernelbase.py", line 577, in shell_channel_thread_main
    _, msg2 = self.session.feed_identities(msg, copy=False)
              ~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/jupyter_client/session.py", line 994, in feed_identities
    raise ValueError(msg)
ValueError: DELIM not in msg_list
ERROR:tornado.general:Uncaught exception in ZMQStream callback
Traceback (most recent call last):
  File "/home/von_riecken/.pyenv/versions/3.14.0/lib/python3.14/site-packages/zmq/eventloop/zmqstream.py", line 565, in _log_error
    f.result()
    ~~~~~~~~^^
  File "/home/von_riecke