## C.N CIPET

### Project Title - Error Detection and Correction Techniques in Computer Networks using Python

# Abstract
 This project demonstrates error detection and correction techniques in computer networks using Python. During data transmission, noise or interference may flip bits, leading to corrupted information. To address this, networks employ redundancy methods such as parity bits, Internet checksums, cyclic redundancy checks (CRC), and Hamming codes. In this project, a text message is converted into binary form, encoded with redundancy, passed through a simulated noisy channel, and then decoded at the receiver’s end. The system reports whether errors were detected, corrected, or missed. Through this simulation, the project highlights the importance of error control in ensuring reliable communication across modern computer networks.

## Problem statement
In computer networks, data often gets corrupted during transmission due to noise, interference, or hardware faults. Without proper error detection and correction mechanisms, corrupted data can lead to communication failures, financial losses, or system malfunctions. This project aims to simulate and analyze different error control techniques to ensure reliable data transfer.

AIM- The project focuses on error detection and correction techniques such as Parity Bit, Internet Checksum, CRC, and Hamming Code, which are widely used in computer networks. These methods ensure data transmission reliability by detecting or correcting bit errors caused by noise and interference.

Project Flow:
1. Input message: User provides a text message.
2. Encoding: Message converted into bits with redundancy (Parity, Checksum, CRC, Hamming).
3. Transmission: Simulated noisy channel randomly flips bits.
4. Decoding/Verification: Receiver checks redundancy; detects or corrects errors.
5. Results: System reports bit flips, error detection status, and message recovery success.

In [None]:
import random
from typing import List, Tuple
# ---------------------------
# Utilities: bits <-> bytes
# ---------------------------
def bytes_to_bits(data: bytes) -> List[int]:
    bits = []
    for b in data:
        for i in range(8):
            bits.append((b >> (7 - i)) & 1)  # msb first
    return bits

def bits_to_bytes(bits: List[int]) -> bytes:
    # assumes len(bits) is multiple of 8
    if len(bits) % 8 != 0:
        raise ValueError("bits length must be multiple of 8")
    out = bytearray()
    for i in range(0, len(bits), 8):
        val = 0
        for j in range(8):
            val = (val << 1) | (bits[i + j] & 1)
        out.append(val)
    return bytes(out)

def count_bit_diff(a: List[int], b: List[int]) -> int:
    return sum(x ^ y for x, y in zip(a, b))

# ---------------------------\
# Channel: flips bits with prob p
# ---------------------------
def transmit_through_bsc(bits: List[int], p: float, rng: random.Random) -> List[int]:
    out = []
    for bit in bits:
        flip = 1 if rng.random() < p else 0
        out.append(bit ^ flip)
    return out

# ---------------------------
# Scheme 1: EVEN PARITY (whole frame parity)
# ---------------------------
def parity_encode(payload_bits: List[int]) -> List[int]:
    parity = sum(payload_bits) % 2  # 1 if odd
    parity_bit = parity  # even parity: parity_bit makes total even
    return payload_bits + [parity_bit]

def parity_check(frame_bits: List[int]) -> Tuple[bool, List[int]]:
    payload_bits = frame_bits[:-1]
    parity_bit = frame_bits[-1]
    total = (sum(payload_bits) + parity_bit) % 2
    ok = (total == 0)  # even total means pass
    return ok, payload_bits

# ---------------------------
# Scheme 2: Internet Checksum (16-bit 1's complement)
# ---------------------------
def ones_complement_add16(a: int, b: int) -> int:
    s = a + b
    return (s & 0xFFFF) + (s >> 16)

def internet_checksum(data: bytes) -> int:
    # pad odd length
    if len(data) % 2 == 1:
        data = data + b"\x00"
    s = 0
    for i in range(0, len(data), 2):
        word = (data[i] << 8) | data[i+1]
        s = ones_complement_add16(s, word)
    return (~s) & 0xFFFF

def checksum_encode(payload: bytes) -> bytes:
    csum = internet_checksum(payload)
    # append checksum (big-endian)
    return payload + bytes([csum >> 8, csum & 0xFF])

def checksum_verify(frame: bytes) -> Tuple[bool, bytes]:
    if len(frame) < 2:
        return False, b""
    payload, recv_csum_bytes = frame[:-2], frame[-2:]
    recv_csum = (recv_csum_bytes[0] << 8) | recv_csum_bytes[1]
    # Verify: sum(payload) + recv_csum should be 0xFFFF
    if len(payload) % 2 == 1:
        payload_padded = payload + b"\x00"
    else:
        payload_padded = payload
    s = 0
    for i in range(0, len(payload_padded), 2):
        word = (payload_padded[i] << 8) | payload_padded[i+1]
        s = ones_complement_add16(s, word)
    s = ones_complement_add16(s, recv_csum)
    ok = (s == 0xFFFF)
    return ok, payload

# ---------------------------
# Scheme 3: CRC-16/CCITT-FALSE (poly 0x1021, init 0xFFFF, no final XOR)
# ---------------------------
def crc16_ccitt(data: bytes, poly=0x1021, init=0xFFFF) -> int:
    crc = init
    for b in data:
        crc ^= (b << 8)
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ poly) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def crc_encode(payload: bytes) -> bytes:
    c = crc16_ccitt(payload)
    return payload + bytes([c >> 8, c & 0xFF])

def crc_verify(frame: bytes) -> Tuple[bool, bytes]:
    if len(frame) < 2:
        return False, b""
    payload, recv_crc_b = frame[:-2], frame[-2:]
    recv_crc = (recv_crc_b[0] << 8) | recv_crc_b[1]
    calc = crc16_ccitt(payload)
    ok = (calc == recv_crc)
    return ok, payload

def hamming74_encode_nibble(d1, d2, d3, d4) -> List[int]:
    p1 = (d1 ^ d2 ^ d4) & 1
    p2 = (d1 ^ d3 ^ d4) & 1
    p3 = (d2 ^ d3 ^ d4) & 1
    # order: 1 2 3 4 5 6 7
    return [p1, p2, d1, p3, d2, d3, d4]

def hamming74_decode_codeword(cw: List[int]) -> Tuple[bool, bool, List[int]]:
    # returns (detected_error, corrected, data_bits[4])
    p1, p2, d1, p3, d2, d3, d4 = cw
    s1 = (p1 ^ d1 ^ d2 ^ d4) & 1   # parity check for p1 set
    s2 = (p2 ^ d1 ^ d3 ^ d4) & 1   # parity check for p2 set
    s3 = (p3 ^ d2 ^ d3 ^ d4) & 1   # parity check for p3 set
    syndrome = (s3 << 2) | (s2 << 1) | s1  # 1..7 or 0
    detected = (syndrome != 0)
    corrected = False
    cw2 = cw[:]
    if detected:
        # correct the bit at 'syndrome' (1-indexed)
        idx = syndrome - 1
        cw2[idx] ^= 1
        corrected = True
    # extract data bits
    _, _, D1, _, D2, D3, D4 = cw2
    return detected, corrected, [D1, D2, D3, D4]

def hamming74_encode_bytes(payload: bytes) -> List[int]:
    bits = bytes_to_bits(payload)
    # nibble-ize: groups of 4
    assert len(bits) % 8 == 0
    out = []
    for i in range(0, len(bits), 4):
        # when i hits 4, we want two nibbles per byte, but list slicing will handle it
        d = bits[i:i+4]
        if len(d) < 4:
            d += [0]*(4-len(d))
        out.extend(hamming74_encode_nibble(*d))
    return out

def hamming74_decode_bits(frame_bits: List[int]) -> Tuple[bool, bool, bytes]:
    # decode each 7-bit codeword; track if any detected/corrected
    detected_any = False
    corrected_any = False
    data_bits = []
    if len(frame_bits) % 7 != 0:
        raise ValueError("Hamming(7,4) frame must be multiple of 7 bits")
    for i in range(0, len(frame_bits), 7):
        cw = frame_bits[i:i+7]
        det, cor, d4 = hamming74_decode_codeword(cw)
        detected_any |= det
        corrected_any |= cor
        data_bits.extend(d4)
    # data_bits length is multiple of 4; original payload was multiple of 8 bits
    if len(data_bits) % 8 != 0:
        # drop trailing pad nibble if any (shouldn't happen with clean input)
        data_bits = data_bits[:len(data_bits)//8*8]
    return detected_any, corrected_any, bits_to_bytes(data_bits)

# ---------------------------
# Demo & Comparison
# ---------------------------
def demo(message: bytes, ber: float = 0.02, seed: int = 7):
    rng = random.Random(seed)
    print(f"Message: {message!r}  (len={len(message)} bytes)")
    msg_bits = bytes_to_bits(message)

    # ---- Parity ----
    parity_frame_bits = parity_encode(msg_bits)
    noisy_parity_bits = transmit_through_bsc(parity_frame_bits, ber, rng)
    ok_parity, recv_bits = parity_check(noisy_parity_bits)
    parity_flip_count = count_bit_diff(parity_frame_bits, noisy_parity_bits)
    print("\n[Parity]")
    print(f"  flipped bits: {parity_flip_count}")
    print(f"  parity pass: {ok_parity}  (fails => detected error)")
    print(f"  payload matches: {recv_bits == msg_bits}")

    # ---- Internet Checksum ----
    checksum_frame = checksum_encode(message)
    noisy_checksum_bits = transmit_through_bsc(bytes_to_bits(checksum_frame), ber, rng)
    ok_checksum, recv_payload = checksum_verify(bits_to_bytes(noisy_checksum_bits))
    checksum_flip_count = count_bit_diff(bytes_to_bits(checksum_frame), noisy_checksum_bits)
    print("\n[Internet Checksum]")
    print(f"  flipped bits: {checksum_flip_count}")
    print(f"  checksum pass: {ok_checksum}  (False means detected error)")
    print(f"  payload matches: {recv_payload == message}")

    # ---- CRC-16/CCITT ----
    crc_frame = crc_encode(message)
    noisy_crc_bits = transmit_through_bsc(bytes_to_bits(crc_frame), ber, rng)
    ok_crc, recv_payload_crc = crc_verify(bits_to_bytes(noisy_crc_bits))
    crc_flip_count = count_bit_diff(bytes_to_bits(crc_frame), noisy_crc_bits)
    print("\n[CRC-16/CCITT]")
    print(f"  flipped bits: {crc_flip_count}")
    print(f"  CRC pass: {ok_crc}  (False means detected error)")
    print(f"  payload matches: {recv_payload_crc == message}")

    # ---- Hamming(7,4) SEC ----
    # Encode to codebits, send via channel, decode, correct
    ham_bits = hamming74_encode_bytes(message)
    noisy_ham_bits = transmit_through_bsc(ham_bits, ber, rng)
    ham_flip_count = count_bit_diff(ham_bits, noisy_ham_bits)
    det, cor, recv_payload_ham = hamming74_decode_bits(noisy_ham_bits)
    print("\n[Hamming(7,4) SEC]")
    print(f"  flipped bits: {ham_flip_count}")
    print(f"  detected_any: {det}, corrected_any: {cor}")
    print(f"  payload matches after decoding: {recv_payload_ham == message}")

if __name__ == "__main__":
    # Try with a short message and 2% BER
    demo(b"HELLO", ber=0.02, seed=42)

## Gradio

In [None]:
import gradio as gr
import random
from typing import List, Tuple

# ---------------------------
# Utilities: bits <-> bytes
# ---------------------------
def bytes_to_bits(data: bytes) -> List[int]:
    """Converts a byte string to a list of bits."""
    bits = []
    for b in data:
        for i in range(8):
            bits.append((b >> (7 - i)) & 1)  # msb first
    return bits

def bits_to_bytes(bits: List[int]) -> bytes:
    """Converts a list of bits to a byte string. Assumes len(bits) is a multiple of 8."""
    if len(bits) % 8 != 0:
        # Pad with zeros to make length a multiple of 8
        bits += [0] * (8 - len(bits) % 8)
    out = bytearray()
    for i in range(0, len(bits), 8):
        val = 0
        for j in range(8):
            val = (val << 1) | (bits[i + j] & 1)
        out.append(val)
    return bytes(out)

def count_bit_diff(a: List[int], b: List[int]) -> int:
    """Counts the number of differing bits between two lists of bits."""
    return sum(x ^ y for x, y in zip(a, b))

# ---------------------------
# Channel: flips bits with prob p
# ---------------------------
def transmit_through_bsc(bits: List[int], p: float, rng: random.Random) -> List[int]:
    """Simulates a Binary Symmetric Channel that flips bits with probability p."""
    out = []
    for bit in bits:
        flip = 1 if rng.random() < p else 0
        out.append(bit ^ flip)
    return out

# ---------------------------
# Scheme 1: EVEN PARITY (whole frame parity)
# ---------------------------
def parity_encode(payload_bits: List[int]) -> List[int]:
    """Encodes a bit list with an even parity bit."""
    parity = sum(payload_bits) % 2
    parity_bit = parity
    return payload_bits + [parity_bit]

def parity_check(frame_bits: List[int]) -> Tuple[bool, List[int]]:
    """Checks an even parity frame for errors."""
    payload_bits = frame_bits[:-1]
    parity_bit = frame_bits[-1]
    total = (sum(payload_bits) + parity_bit) % 2
    ok = (total == 0)
    return ok, payload_bits

# ---------------------------
# Scheme 2: Internet Checksum (16-bit 1's complement)
# ---------------------------
def ones_complement_add16(a: int, b: int) -> int:
    """Adds two 16-bit numbers using one's complement arithmetic."""
    s = a + b
    return (s & 0xFFFF) + (s >> 16)

def internet_checksum(data: bytes) -> int:
    """Calculates the 16-bit Internet Checksum of a byte string."""
    if len(data) % 2 == 1:
        data = data + b"\x00"
    s = 0
    for i in range(0, len(data), 2):
        word = (data[i] << 8) | data[i + 1]
        s = ones_complement_add16(s, word)
    return (~s) & 0xFFFF

def checksum_encode(payload: bytes) -> bytes:
    """Encodes a payload with the Internet Checksum."""
    csum = internet_checksum(payload)
    return payload + bytes([csum >> 8, csum & 0xFF])

def checksum_verify(frame: bytes) -> Tuple[bool, bytes]:
    """Verifies the Internet Checksum of a frame."""
    if len(frame) < 2:
        return False, b""
    payload, recv_csum_bytes = frame[:-2], frame[-2:]
    recv_csum = (recv_csum_bytes[0] << 8) | recv_csum_bytes[1]
    if len(payload) % 2 == 1:
        payload_padded = payload + b"\x00"
    else:
        payload_padded = payload
    s = 0
    for i in range(0, len(payload_padded), 2):
        word = (payload_padded[i] << 8) | payload_padded[i + 1]
        s = ones_complement_add16(s, word)
    s = ones_complement_add16(s, recv_csum)
    ok = (s == 0xFFFF)
    return ok, payload

# ---------------------------
# Scheme 3: CRC-16/CCITT-FALSE (poly 0x1021)
# ---------------------------
def crc16_ccitt(data: bytes, poly=0x1021, init=0xFFFF) -> int:
    """Calculates the CRC-16/CCITT-FALSE checksum."""
    crc = init
    for b in data:
        crc ^= (b << 8)
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ poly) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

def crc_encode(payload: bytes) -> bytes:
    """Encodes a payload with a CRC-16/CCITT checksum."""
    c = crc16_ccitt(payload)
    return payload + bytes([c >> 8, c & 0xFF])

def crc_verify(frame: bytes) -> Tuple[bool, bytes]:
    """Verifies the CRC-16/CCITT checksum of a frame."""
    if len(frame) < 2:
        return False, b""
    payload, recv_crc_b = frame[:-2], frame[-2:]
    recv_crc = (recv_crc_b[0] << 8) | recv_crc_b[1]
    calc = crc16_ccitt(payload)
    ok = (calc == recv_crc)
    return ok, payload

# ---------------------------
# Scheme 4: Hamming(7,4) SEC (single-error-correction)
# ---------------------------
def hamming74_encode_nibble(d1, d2, d3, d4) -> List[int]:
    """Encodes a 4-bit nibble using Hamming(7,4)."""
    p1 = (d1 ^ d2 ^ d4) & 1
    p2 = (d1 ^ d3 ^ d4) & 1
    p3 = (d2 ^ d3 ^ d4) & 1
    return [p1, p2, d1, p3, d2, d3, d4]

def hamming74_decode_codeword(cw: List[int]) -> Tuple[bool, bool, List[int]]:
    """Decodes and corrects a 7-bit Hamming codeword."""
    p1, p2, d1, p3, d2, d3, d4 = cw
    s1 = (p1 ^ d1 ^ d2 ^ d4) & 1
    s2 = (p2 ^ d1 ^ d3 ^ d4) & 1
    s3 = (p3 ^ d2 ^ d3 ^ d4) & 1
    syndrome = (s3 << 2) | (s2 << 1) | s1
    detected = (syndrome != 0)
    corrected = False
    cw2 = cw[:]
    if detected:
        idx = syndrome - 1
        if 0 <= idx < len(cw2):
            cw2[idx] ^= 1
            corrected = True

    # Check if a corrected codeword is valid for extraction
    if corrected:
        _, _, D1, _, D2, D3, D4 = cw2
        return detected, corrected, [D1, D2, D3, D4]
    else:
        # If not corrected, return original data bits
        _, _, D1, _, D2, D3, D4 = cw
        return detected, corrected, [D1, D2, D3, D4]

def hamming74_encode_bytes(payload: bytes) -> List[int]:
    """Encodes a byte string using Hamming(7,4)."""
    bits = bytes_to_bits(payload)
    out = []
    for i in range(0, len(bits), 4):
        d = bits[i:i + 4]
        if len(d) < 4:
            d += [0] * (4 - len(d))
        out.extend(hamming74_encode_nibble(*d))
    return out

def hamming74_decode_bits(frame_bits: List[int]) -> Tuple[bool, bool, bytes]:
    """Decodes Hamming-encoded bits and corrects errors."""
    detected_any = False
    corrected_any = False
    data_bits = []
    if len(frame_bits) % 7 != 0:
        return False, False, b""
    for i in range(0, len(frame_bits), 7):
        cw = frame_bits[i:i + 7]
        det, cor, d4 = hamming74_decode_codeword(cw)
        detected_any |= det
        corrected_any |= cor
        data_bits.extend(d4)
    if len(data_bits) % 8 != 0:
        data_bits = data_bits[:len(data_bits) // 8 * 8]
    return detected_any, corrected_any, bits_to_bytes(data_bits)

# ---------------------------
# Gradio Interface Function
# ---------------------------
def run_simulation(message: str, ber: float, seed: int):
    """
    Main function for the Gradio interface. Runs all simulations and returns formatted output.
    """
    if not message:
        return "Please enter a message to simulate transmission.", "", "", ""

    # Clear previous random seed and set a new one for reproducibility
    random.seed(seed)
    rng = random.Random(seed)

    message_bytes = message.encode('utf-8')
    msg_bits = bytes_to_bits(message_bytes)

    # Parity Simulation
    parity_frame_bits = parity_encode(msg_bits)
    noisy_parity_bits = transmit_through_bsc(parity_frame_bits, ber, rng)
    ok_parity, recv_bits_parity = parity_check(noisy_parity_bits)
    parity_flip_count = count_bit_diff(parity_frame_bits, noisy_parity_bits)

    parity_result = (
        f"**Parity**\n"
        f"  - Flipped bits: {parity_flip_count}\n"
        f"  - Error detected: {'Yes' if not ok_parity else 'No'}\n"
        f"  - Message matches original: {'Yes' if recv_bits_parity == msg_bits else 'No'}\n"
        f"  - Original Message: {message_bytes}\n"
        f"  - Received Message: {bits_to_bytes(recv_bits_parity) if recv_bits_parity == msg_bits else 'N/A'}"
    )

    # Reset RNG for next simulation
    rng = random.Random(seed)

    # Internet Checksum Simulation
    checksum_frame = checksum_encode(message_bytes)
    noisy_checksum_bits = transmit_through_bsc(bytes_to_bits(checksum_frame), ber, rng)
    ok_checksum, recv_payload_checksum = checksum_verify(bits_to_bytes(noisy_checksum_bits))
    checksum_flip_count = count_bit_diff(bytes_to_bits(checksum_frame), noisy_checksum_bits)

    checksum_result = (
        f"**Internet Checksum**\n"
        f"  - Flipped bits: {checksum_flip_count}\n"
        f"  - Error detected: {'Yes' if not ok_checksum else 'No'}\n"
        f"  - Message matches original: {'Yes' if recv_payload_checksum == message_bytes else 'No'}\n"
        f"  - Original Message: {message_bytes}\n"
        f"  - Received Message: {recv_payload_checksum if recv_payload_checksum == message_bytes else 'N/A'}"
    )

    # Reset RNG for next simulation
    rng = random.Random(seed)

    # CRC-16/CCITT Simulation
    crc_frame = crc_encode(message_bytes)
    noisy_crc_bits = transmit_through_bsc(bytes_to_bits(crc_frame), ber, rng)
    ok_crc, recv_payload_crc = crc_verify(bits_to_bytes(noisy_crc_bits))
    crc_flip_count = count_bit_diff(bytes_to_bits(crc_frame), noisy_crc_bits)

    crc_result = (
        f"**CRC-16/CCITT**\n"
        f"  - Flipped bits: {crc_flip_count}\n"
        f"  - Error detected: {'Yes' if not ok_crc else 'No'}\n"
        f"  - Message matches original: {'Yes' if recv_payload_crc == message_bytes else 'No'}\n"
        f"  - Original Message: {message_bytes}\n"
        f"  - Received Message: {recv_payload_crc if recv_payload_crc == message_bytes else 'N/A'}"
    )

    # Reset RNG for next simulation
    rng = random.Random(seed)

    # Hamming Simulation
    ham_bits = hamming74_encode_bytes(message_bytes)
    noisy_ham_bits = transmit_through_bsc(ham_bits, ber, rng)
    ham_flip_count = count_bit_diff(ham_bits, noisy_ham_bits)
    det, cor, recv_payload_ham = hamming74_decode_bits(noisy_ham_bits)

    hamming_result = (
        f"**Hamming (7,4) SEC**\n"
        f"  - Flipped bits: {ham_flip_count}\n"
        f"  - Error detected: {'Yes' if det else 'No'}\n"
        f"  - Error corrected: {'Yes' if cor else 'No'}\n"
        f"  - Message matches original: {'Yes' if recv_payload_ham == message_bytes else 'No'}\n"
        f"  - Original Message: {message_bytes}\n"
        f"  - Received Message: {recv_payload_ham if recv_payload_ham == message_bytes else 'N/A'}"
    )

    return parity_result, checksum_result, crc_result, hamming_result


# Gradio Interface definition
with gr.Blocks(title="Error Detection and Correction") as demo:
    gr.Markdown("# Error Detection and Correction in Computer Networks")
    gr.Markdown("A simulation of various error control techniques. Enter a message and adjust the Bit Error Rate (BER) to see how different methods handle data corruption.")

    with gr.Row():
        message_input = gr.Textbox(
            label="Input Message",
            placeholder="Enter a message to transmit...",
            value="Hello World"
        )
        ber_slider = gr.Slider(
            minimum=0,
            maximum=0.1,
            step=0.001,
            value=0.01,
            label="Bit Error Rate (BER)",
            info="The probability of a single bit flipping during transmission."
        )
        seed_number = gr.Number(
            label="Random Seed",
            value=42,
            precision=0,
            info="Set a seed for reproducible results."
        )

    run_button = gr.Button("Run Simulation", variant="primary")

    with gr.Row():
        parity_output = gr.Markdown(label="Parity Result")
        checksum_output = gr.Markdown(label="Internet Checksum Result")
        crc_output = gr.Markdown(label="CRC-16 Result")
        hamming_output = gr.Markdown(label="Hamming (7,4) Result")

    run_button.click(
        fn=run_simulation,
        inputs=[message_input, ber_slider, seed_number],
        outputs=[parity_output, checksum_output, crc_output, hamming_output]
    )

if __name__ == "__main__":
    demo.launch()


It looks like you are running Gradio on a hosted Jupyter notebook, which requires `share=True`. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://07ebf42ec78fb2a37a.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
