# Imports

In [None]:
from scapy.all import Ether, wrpcap

# Functions

## Character and packet generating

In [None]:
def ascii_to_bits(character):
    """Convert ASCII character to 8-bit binary."""
    return f"{ord(character):08b}"

def bits_to_ascii(bits):
    """Convert 8-bit binary to ASCII character."""
    return chr(int(bits, 2))

def calculate_parity(bits):
    """Return parity bit to ensure odd parity."""
    return '0' if sum(int(b) for b in bits) % 2 == 1 else '1'

def build_data_char(ascii_char, prev_payload):
    """Build data character (P + 0 + 8-bit payload)."""
    control_flag = '0'
    payload = ascii_to_bits(ascii_char)
    p = calculate_parity(prev_payload + control_flag)
    return p, control_flag, payload

def build_control_char(name, prev_payload):
    """Build control character (P + 1 + 2-bit control code)."""
    codes = {
        "FCT": "00",
        "EOP": "01",
        "EEP": "10",
        "Escape": "11"
    }
    if name not in codes:
        raise ValueError(f"Invalid control character name: {name}")
    control_flag = '1'
    payload = codes[name]
    p = calculate_parity(prev_payload + control_flag)
    return p, control_flag, payload

def create_null(prev_payload):
    """Build a NULL character (P + 1 + 110100)."""
    control_flag = '1'
    payload = '110100'
    p = calculate_parity(prev_payload + control_flag)
    return p + control_flag + payload

def create_fct(prev_payload):
    """Build an FCT character."""
    p, control_flag, payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
    return p + control_flag + payload

def create_character(char, is_control, prev_payload):
    """Unified builder for data/control characters."""
    return build_control_char(char, prev_payload) if is_control else build_data_char(char, prev_payload)

def create_data_packet(data, address, initial_prev_payload):
    """Builds a SpaceWire data packet (addr + payload + EOP)."""
    packet = ""
    prev_payload = initial_prev_payload

    for char in address:
        p, control_flag, payload = build_data_char(char, prev_payload)
        packet += p + control_flag + payload
        prev_payload = payload

    for char in data:
        p, control_flag, payload = build_data_char(char, prev_payload)
        packet += p + control_flag + payload
        prev_payload = payload

    p, control_flag, payload = build_control_char("EOP", prev_payload)
    packet += p + control_flag + payload

    return packet

## Functions for generating signal and transmission

In [None]:
def create_clock(length):
    """Generate clock sequence starting at 1"""
    return ''.join(str((i + 1) % 2) for i in range(length))

def create_strobe(bits, clock):
    """Generate strobe signal by XORing data and clock."""
    return ''.join(str(int(b) ^ int(c)) for b, c in zip(bits, clock))

def create_reset_signal(signal_length, reset_position):
    """Generate reset line with single 1 at reset_position."""
    reset = ['0'] * signal_length
    if 0 <= reset_position < signal_length:
        reset[reset_position] = '1'
    return ''.join(reset)

def save_to_file(data, strobe, reset, filename):
    """Save data, strobe, and reset signals to file."""
    with open(filename, "w") as file:
        file.writelines(f"{d}\t{s}\t{r}\n" for d, s, r in zip(data, strobe, reset))

def generate_transmission(null_count, include_fct, payload, buffer_length=1, initial_prev_payload=""):
    """Generate complete SpaceWire transmission."""
    bitstream = ""
    prev_payload = initial_prev_payload

    for _ in range(null_count):
        null_char = create_null(prev_payload)
        bitstream += null_char
        prev_payload = "110100"

    if include_fct:
        p, control_flag, payload_bits = create_character("FCT", is_control=True, prev_payload=prev_payload)
        bitstream += p + control_flag + payload_bits
        prev_payload = payload_bits

    packet = create_data_packet(payload, address="", initial_prev_payload=prev_payload)
    bitstream = bitstream + packet

    clk = create_clock(len(bitstream))
    strobe = create_strobe(bitstream, clk)
    
    # Create buffer if specified
    if buffer_length > 0:
        buffer = '0' * buffer_length
        bitstream = buffer + bitstream
        strobe = buffer + strobe
        

    reset = create_reset_signal(len(bitstream), reset_position=3)

    return bitstream, strobe, reset

def calculate_downtime_cycles(clock_cycle_speed, downtime):
    """Calculate number of clock cycles (i.e., '0's) during downtime."""
    clock_cycle_speed = float(clock_cycle_speed)
    downtime = float(downtime)

    if clock_cycle_speed <= 0:
        raise ValueError("Clock cycle speed must be positive.")
    if downtime < 0:
        return 0
    return int(clock_cycle_speed * downtime)

## Functions for VHDL formatting

In [None]:
def format_bitstream_for_vhdl(data_bits, strobe_bits):
    """Format paired data and strobe bits into VHDL ROM format as 2-bit vectors."""
    if len(data_bits) != len(strobe_bits):
        raise ValueError("Data and strobe bitstreams must be the same length.")
    
    entries = [f"{i} => \"{data_bits[i]}{strobe_bits[i]}\"" for i in range(len(data_bits))]
    entries.append("others => \"00\"")
    return ", ".join(entries)

def process_bitfile_to_vhdl(filepath):
    """Read first two columns (data, strobe) and format them for VHDL ROM."""
    data_bits = ""
    strobe_bits = ""
    with open(filepath, 'r') as f:
        for line in f:
            columns = line.strip().split()
            if len(columns) >= 2:
                data_bits += columns[0]
                strobe_bits += columns[1]
    return format_bitstream_for_vhdl(data_bits, strobe_bits)

## Functions for simulating QSYS/Platform Designer functionality

In [None]:
def unpack_modelsim_output_by_packets(filename):
    """
    Unpack ModelSim output file where:
    - Line N contains data_bits and nchar
    - Line N+1 contains valid
    - Bit 8 = EEP, bit 9 = EOP (either ends packet)
    Returns: list of (bit_matrix, nchar_flags, valid_flags) tuples, one per packet
    """
    packets = []
    current_bits = []
    current_nchars = []
    current_valids = []

    with open(filename, 'r') as file:
        raw_lines = [line.strip().ljust(13, '0') for line in file if line.strip()]

    for i in range(len(raw_lines) - 1):
        data_line = raw_lines[i]
        valid_line = raw_lines[i + 1]

        try:
            data_bits = [int(b) for b in data_line[0:8]]
            nchar = int(data_line[10])
            valid = int(valid_line[12])
        except (IndexError, ValueError):
            continue  # skip malformed pairs

        current_bits.append(data_bits)
        current_nchars.append(nchar)
        current_valids.append(valid)

        # Check for packet boundary: EEP or EOP
        try:
            eep = int(data_line[8])
            eop = int(data_line[9])
            if eep or eop:
                packets.append((current_bits, current_nchars, current_valids))
                current_bits, current_nchars, current_valids = [], [], []
        except IndexError:
            continue

    # Add trailing packet if any
    if current_bits:
        packets.append((current_bits, current_nchars, current_valids))

    return packets

def filter_valid_data_characters(bit_matrix, nchar_flags, valid_flags):
    """Keep only entries where valid == 1 and nchar == 1."""
    filtered_bits = []
    filtered_nchars = []

    for bits, nchar, valid in zip(bit_matrix, nchar_flags, valid_flags):
        if valid and nchar:
            filtered_bits.append(bits)
            filtered_nchars.append(nchar)

    return filtered_bits, filtered_nchars

def export_spacewire_pcap_ethertype(bit_matrix, nchar_flags, output_filename, ethertype=0x88B5):
    payload_bytes = bytearray()

    for bits, nchar in zip(bit_matrix, nchar_flags):
        if nchar == 1:
            byte_val = int("".join(str(b) for b in bits), 2)
            payload_bytes.append(byte_val)

    if payload_bytes:
        eth = Ether(dst="aa:bb:cc:dd:ee:ff", src="11:22:33:44:55:66", type=ethertype)
        frame = eth / payload_bytes
        wrpcap(output_filename, [frame])

def generate_spacewire_pcaps_from_modelsim(input_filename, output_prefix="packet", ethertype=0x88B5):
    """
    Extracts packets from a ModelSim output file and saves each as a .pcap file.
    """
    packets = unpack_modelsim_output_by_packets(input_filename)
    total_valid = 0

    for i, (bit_matrix, nchar_flags, valid_flags) in enumerate(packets):
        filtered_bits, filtered_nchars = filter_valid_data_characters(bit_matrix, nchar_flags, valid_flags)
        
        if filtered_bits:
            filename = f"{output_prefix}_{i + 1}.pcap"
            export_spacewire_pcap_ethertype(filtered_bits, filtered_nchars, filename, ethertype)
            print(f"Packet {i + 1}: {len(filtered_bits)} valid bytes → saved to {filename}")
            total_valid += len(filtered_bits)

    print(f"\n Total valid SpaceWire data characters exported: {total_valid}")

In [None]:
generate_spacewire_pcaps_from_modelsim("output_file case1.txt", output_prefix="packet_case_1_", ethertype=0x88B5)
generate_spacewire_pcaps_from_modelsim("rigtig output_file case 3.txt", output_prefix="packet_case_2_", ethertype=0x88B5)
generate_spacewire_pcaps_from_modelsim("output_file case 4.txt", output_prefix="packet_case_4_", ethertype=0x88B5)

In [None]:
# Run the transmission generation
bitstream, strobe, reset = generate_transmission(
    null_count=11,
    include_fct=True,
    payload="H",
    buffer_length=10,
    initial_prev_payload=""
)

# Preview output
print(bitstream)
print(strobe)
print(reset)

# Save to file
save_to_file(bitstream, strobe, reset, "transmission.txt")

# Data for test cases

## Buffer-Nulls-FCT-Pakke-EOP-Pakke-EOP-Nulls

In [None]:
buffer = '0' * 320* 3  # 1-bit buffer
transmission = ''
prev_payload = ""

for _ in range(11):
    transmission += create_null(prev_payload)
    prev_payload = "110100"
print(f"Nulls: {transmission}")

p, control_flag, payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
print(f"FCT: {p + control_flag + payload}")
transmission += p + control_flag + payload
prev_payload = payload

data_packet_1 = create_data_packet("Hello", address="", initial_prev_payload=prev_payload)
transmission += data_packet_1
print(f"Data Packet 1: {data_packet_1}")
prev_payload = transmission[-2:]

data_packet_2 = create_data_packet("World", address="", initial_prev_payload=prev_payload)
transmission += data_packet_2
print(f"Data Packet 2: {data_packet_2}")
prev_payload = transmission[-2:]

for _ in range(10):
    transmission += create_null(prev_payload)
    prev_payload = "110100"

clk = create_clock(len(transmission))
strobe = create_strobe(transmission, clk)
reset = create_reset_signal(len(transmission), reset_position=1)

transmission = buffer + transmission
strobe = buffer + strobe
format_bitstream_for_vhdl(transmission, strobe)

## Buffer-Nulls-FCT-Pakke-EEP-Buffer-Nulls-FCT-Pakke-EOP

In [None]:
prev_payload = ""

### --- Segment 1: Nulls + FCT + Hello + EEP --- ###

# Nulls before FCT
nulls_1 = ''
for _ in range(5):
    nulls_1 += create_null(prev_payload)
    prev_payload = "110100"

# FCT 1
fct_1_p, fct_1_ctrl, fct_1_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_1 = fct_1_p + fct_1_ctrl + fct_1_payload
prev_payload = fct_1_payload

# Data Packet 1: "Hello"
data_packet_1 = create_data_packet("Hello", address="", initial_prev_payload=prev_payload)
data_packet_1 = data_packet_1[:-4] # Remove EOP to manually insert EEP
prev_payload = data_packet_1[-8:]

# Manual EEP after Hello
eep_1_p, eep_1_ctrl, eep_1_payload = create_character("EEP", is_control=True, prev_payload=prev_payload)
eep_1 = eep_1_p + eep_1_ctrl + eep_1_payload
prev_payload = eep_1_payload

# Extra null to ensure EEP is correctly processed
extra_null = create_null(prev_payload)
prev_payload = "110100"  # Reset to NULL payload

segment_1 = nulls_1 + fct_1 + data_packet_1 + eep_1 + extra_null
clk_1 = create_clock(len(segment_1))
strobe_1 = create_strobe(segment_1, clk_1)



### --- Segment 2: Nulls + FCT + World (with built-in EOP) --- ###

nulls_2 = ''
prev_payload = ""
for _ in range(11):
    nulls_2 += create_null(prev_payload)
    prev_payload = "110100"

# FCT 2
fct_2_p, fct_2_ctrl, fct_2_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_2 = fct_2_p + fct_2_ctrl + fct_2_payload
prev_payload = fct_2_payload

# Data Packet 2: "World" (auto EOP included)
data_packet_2 = create_data_packet("World", address="", initial_prev_payload=prev_payload)
prev_payload = data_packet_2[-2:]

segment_2 = nulls_2 + fct_2 + data_packet_2
clk_2 = create_clock(len(segment_2))
strobe_2 = create_strobe(segment_2, clk_2)

### --- Final Assembly --- ###

transmission = buffer + segment_1 + buffer + segment_2
strobe = buffer + strobe_1 + buffer + strobe_2

format_bitstream_for_vhdl(transmission, strobe)

## Buffer-Nulls-FCT-Pakke-EOP-Pakke-EOP-1 Null

In [None]:
transmission = ''
prev_payload = ""

# Nulls
for _ in range(5):
    transmission += create_null(prev_payload)
    prev_payload = "110100"

# FCT
p, control_flag, payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
transmission += p + control_flag + payload
prev_payload = payload

# First Data Packet
data_packet_1 = create_data_packet("Hello", address="", initial_prev_payload=prev_payload)
transmission += data_packet_1
prev_payload = transmission[-2:]

# Second Data Packet
data_packet_2 = create_data_packet("World", address="", initial_prev_payload=prev_payload)
transmission += data_packet_2
prev_payload = transmission[-2:]

extra_nulls = ''
# 5 Null character
for _ in range(5):
    extra_nulls += create_null(prev_payload)
    prev_payload = "110100"
transmission += extra_nulls


# Clock and Strobe
clk = create_clock(len(transmission))
strobe = create_strobe(transmission, clk)

# Add leading buffer
transmission = buffer + transmission
strobe = buffer + strobe

# Format for VHDL
format_bitstream_for_vhdl(transmission, strobe)

## Bufffer-Nulls-FCT-Pakke-EOP-Pakke-EEP-Buffer-Nulls-FCT-Pakke-EOP

In [None]:
transmission = ''
prev_payload = ""
nulls_1 = ""
null_2 = ""

# Nulls
for _ in range(3):
    nulls_1 += create_null(prev_payload)
    prev_payload = "110100"

# FCT
fct_1_p, fct_1_ctrl, fct_1_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_1 = fct_1_p + fct_1_ctrl + fct_1_payload
prev_payload = fct_1_payload

# Packet 1
data_packet_1 = create_data_packet("Hello", address="", initial_prev_payload=prev_payload)
prev_payload = data_packet_1[-2:]

# Packet 2 with EEP
data_packet_2 = create_data_packet("Midway", address="", initial_prev_payload=prev_payload)
data_packet_2 = data_packet_2[:-4]  # Remove 4-bit auto EOP
prev_payload = data_packet_2[-8:]

# EEP
eep_1_p, eep_1_ctrl, eep_1_payload = create_character("EEP", is_control=True, prev_payload=prev_payload)
eep_1 = eep_1_p + eep_1_ctrl + eep_1_payload
prev_payload = eep_1_payload

# Extra nulls to ensure EEP is correctly processed
extra_nulls = ""
for _ in range(3):
    extra_nulls += create_null(prev_payload)
    prev_payload = "110100"

segment_1 = (nulls_1 + fct_1 + data_packet_1 + data_packet_2 + eep_1 + extra_nulls)
print(f'null_1: {nulls_1}')
print(f'fct_1: {fct_1}')
print(f'data_packet_1: {data_packet_1}')
print(f'data_packet_2: {data_packet_2}')
print(f'eep_1: {eep_1}')
print(f'extra_nulls: {extra_nulls}')

prev_payload = ""
# Nulls
nulls_2 = ""
for _ in range(3):
    nulls_2 += create_null(prev_payload)
    prev_payload = "110100"

# FCT
fct_2_p, fct_2_ctrl, fct_2_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_2 = fct_2_p + fct_2_ctrl + fct_2_payload
prev_payload = fct_2_payload

# Packet 3: Final (includes EOP)
data_packet_3 = create_data_packet("Final", address="", initial_prev_payload=prev_payload)
prev_payload = data_packet_3[-2:]

# Extra nulls
extra_nulls = ""
for _ in range(3):
    extra_nulls += create_null(prev_payload)
    prev_payload = "110100"

segment_2 = (nulls_2 + fct_2 + data_packet_3 + extra_nulls)
print(f'null_2: {nulls_2}')
print(f'fct_2: {fct_2}')
print(f'data_packet_3: {data_packet_3}')
print(f'extra_nulls: {extra_nulls}')

clk_1 = create_clock(len(segment_1))
strobe_1 = create_strobe(segment_1, clk_1)

clk_2 = create_clock(len(segment_2))
strobe_2 = create_strobe(segment_2, clk_2)


transmission = buffer + segment_1 + buffer + segment_2
strobe = buffer + strobe_1 + buffer + strobe_2

format_bitstream_for_vhdl(transmission, strobe)

## Buffer-Nulls-FCT-Datategn(med paritetsfejl)-Buffer-Nulls-FCT-Pakke-EOP

In [None]:
prev_payload = ""
extra_buffer = '0'* 16
extra_buffer_2 = '0' * 20

### --- Segment 1: Nulls + FCT + 5 Data Characters (4th has parity error) --- ###

# Nulls before FCT
nulls_1 = ''
for _ in range(5):
    nulls_1 += create_null(prev_payload)
    prev_payload = "110100"

print(f"Nulls: {nulls_1}")

# FCT 1
fct_1_p, fct_1_ctrl, fct_1_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_1 = fct_1_p + fct_1_ctrl + fct_1_payload
prev_payload = fct_1_payload  # For control chars, prev_payload is last 2 bits

# 5 Data Characters: A B C D* E (D has parity error)
data_chars = []
char_sequence = "ABCDE"

for i, ch in enumerate(char_sequence):
    p, control_flag, payload = build_data_char(ch, prev_payload)
    data_char = p + control_flag + payload

    if i == 3:
        # Introduce parity error in the 4th character (D)
        flipped_parity = '0' if data_char[0] == '1' else '1'
        data_char = flipped_parity + data_char[1:]

    data_chars.append(data_char)
    prev_payload = payload 

segment_1 = nulls_1 + fct_1 + ''.join(data_chars)
clk_1 = create_clock(len(segment_1))
strobe_1 = create_strobe(segment_1, clk_1)

### --- Segment 2: Nulls + FCT + "World" Packet (auto EOP) --- ###

nulls_2 = ''
prev_payload = ''
for _ in range(5):
    nulls_2 += create_null(prev_payload)
    prev_payload = "110100"

print(f"Nulls: {nulls_2}")
# FCT 2
fct_2_p, fct_2_ctrl, fct_2_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_2 = fct_2_p + fct_2_ctrl + fct_2_payload
prev_payload = fct_2_payload

# Data Packet: "World" with auto EOP
data_packet_2 = create_data_packet("World", address="", initial_prev_payload=prev_payload)
prev_payload = data_packet_2[-8:]

segment_2 = nulls_2 + fct_2 + data_packet_2
clk_2 = create_clock(len(segment_2))
strobe_2 = create_strobe(segment_2, clk_2)

### --- Final Assembly --- ###

transmission = extra_buffer_2 + buffer + segment_1 + buffer + extra_buffer + segment_2
strobe = extra_buffer_2 + buffer + strobe_1 + buffer + extra_buffer + strobe_2

format_bitstream_for_vhdl(transmission, strobe)

## Buffer-Nulls-FCT-Pakke-EEP-Buffer-Nulls(18)-FCT-Buffer-Nulls-FCT-Pakke-EOP

In [None]:
prev_payload = ""

### --- Segment 1: Nulls + FCT + "Hello" (no EOP) + EEP --- ###

nulls_1 = ''
for _ in range(3):
    nulls_1 += create_null(prev_payload)
    prev_payload = "110100"

# FCT 1
fct_1_p, fct_1_ctrl, fct_1_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_1 = fct_1_p + fct_1_ctrl + fct_1_payload
prev_payload = fct_1_payload

# "Hello" (remove auto EOP manually)
data_packet_1 = create_data_packet("Hello", address="", initial_prev_payload=prev_payload)
data_packet_1 = data_packet_1[:-4]  # Trim off auto EOP
prev_payload = data_packet_1[-8:]

# EEP
eep_1_p, eep_1_ctrl, eep_1_payload = create_character("EEP", is_control=True, prev_payload=prev_payload)
eep_1 = eep_1_p + eep_1_ctrl + eep_1_payload
prev_payload = eep_1_payload

# Extra null to ensure EEP is correctly processed
extra_nulls = ''
for _ in range(3):
    extra_nulls += create_null(prev_payload)
    prev_payload = "110100"


segment_1 = nulls_1 + fct_1 + data_packet_1 + eep_1 + extra_nulls
clk_1 = create_clock(len(segment_1))
strobe_1 = create_strobe(segment_1, clk_1)

### --- Segment 2: 18 Nulls + FCT --- ###

nulls_2 = ''
prev_payload = ""
for _ in range(18):
    nulls_2 += create_null(prev_payload)
    prev_payload = "110100"

# FCT 2
fct_2_p, fct_2_ctrl, fct_2_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_2 = fct_2_p + fct_2_ctrl + fct_2_payload
prev_payload = fct_2_payload

segment_2 = nulls_2 + fct_2
clk_2 = create_clock(len(segment_2))
strobe_2 = create_strobe(segment_2, clk_2)

### --- Segment 3: Nulls + FCT + "World" (with EOP) --- ###
extra_buffer = '0' * 640
nulls_3 = ''
for _ in range(3):
    nulls_3 += create_null(prev_payload)
    prev_payload = "110100"

# FCT 3
fct_3_p, fct_3_ctrl, fct_3_payload = create_character("FCT", is_control=True, prev_payload=prev_payload)
fct_3 = fct_3_p + fct_3_ctrl + fct_3_payload
prev_payload = fct_3_payload

# "World" packet (with auto EOP)
data_packet_2 = create_data_packet("World", address="", initial_prev_payload=prev_payload)
prev_payload = data_packet_2[-8:]

segment_3 = nulls_3 + fct_3 + data_packet_2
clk_3 = create_clock(len(segment_3))
strobe_3 = create_strobe(segment_3, clk_3)

### --- Final Assembly --- ### 

transmission = buffer + segment_1 + buffer + segment_2 + buffer + extra_buffer + segment_3
strobe = buffer + strobe_1 + buffer + strobe_2 + buffer + extra_buffer + strobe_3

format_bitstream_for_vhdl(transmission, strobe)


In [None]:
# Create full packet bitstream
packet_bits = create_data_packet("Hello World!", address="X", initial_prev_payload="")

# Split into 10-bit characters
char_bits = [packet_bits[i:i+10] for i in range(0, len(packet_bits), 10)]

# Extract only valid 8-bit payloads from data characters
bit_matrix = []
nchar_flags = []

for char in char_bits:
    if char[1] == '0':  # data character (control_flag == 0)
        payload = [int(b) for b in char[2:]]  # 8-bit payload
        bit_matrix.append(payload)
        nchar_flags.append(1)

# Export to PCAP
export_spacewire_pcap_ethertype(bit_matrix, nchar_flags, "spacewire_addressX_hello_world_fixed.pcap")
