# Convert a TS file into a file of BBFRAMES

In [None]:
ts_fname = 'ts/QPSK-1-4-3.6588.ts'
bbf_fname = 'bbf/QPSK-1-4-3.6588.bbf'
ts_frame_length = 188
bbframe_length_bits = 3072    # K(bch) from Table 5b in ETSI EN 302 307 (DVB-S2)
bbframe_payload_length = (bbframe_length_bits - 80) // 8 # in bytes

In [None]:
# Compute CRC-8 according to 5.1.4 of DVB-S2 Spec
def crc8_dvb_s2(crc, a):
    crc ^= a
    for _ in range(8):
        if crc & 0x80:
            crc = ((crc << 1) ^ 0xD5) % 256
        else:
            crc = (crc << 1) % 256
    return crc

def compute_crc8(buf: bytes) -> bytes:
    crc = 0
    for b in buf:
        crc = crc8_dvb_s2(crc, b)
    return crc.to_bytes(1, "big")

In [None]:
# Read the TS file and yield up each TS packet as a bytes
# Keeping some statistics along the way.

def ts_file_reader(fn):
    count = 0
    stuffing = 0
    payload_units = 0
    pids_seen = set()

    with open(fn, 'rb') as f:
        print(f"File {fn}")
        while tsframe := f.read(ts_frame_length):
            count += 1
            pu = bool(tsframe[1] & 0x40)
            if pu:
                payload_units += 1
            pid = (tsframe[1] & 0x1f) << 8 + tsframe[2]
            pids_seen.add(pid)
            if pid == 0x1fff:
                stuffing += 1
            #if count < 10:
            #    print(f'{chr(tsframe[0])} {pid} {tsframe[3]:x} {(" ", "*")[pu]}')
            yield tsframe
        print(f'{count} packets = {count*ts_frame_length} bytes read.')
        print(f'including {stuffing} stuffing packets')
        print(f'{payload_units} payload units')
        print(f'PIDs seen: {pids_seen}')

In [None]:
# Combine TS packets into BBFRAME payloads and yield them as bytes
# TS packets are sliced between two consecutive payloads when they don't fit.
#
# We return a tuple containing the payload and the offset into the payload where
# the first full TS packet starts, as required for the BBHeader SYNCD field.

def bb_payload_builder(fn: str, payload_length: int):
    ts_packet = ts_file_reader(fn)
    bbframe_count = 0

    remaining_bytes = b''
    payload = b''
    syncd = 0
    previous_crc = b'\00'

    for tsp in ts_packet:
        assert tsp[0] == 0x47, "Out of sync with TS packets"
        payload += previous_crc
        payload += tsp[1:]      # Previous TS packet's CRC replaces the first (sync) byte
        if len(payload) >= payload_length:
            bbframe_count += 1
            yield (payload[:payload_length], syncd)
            remaining_bytes = payload[payload_length:]
            syncd = len(remaining_bytes)
            payload = remaining_bytes
        previous_crc = compute_crc8(tsp[1:])    # CRC omits the first (sync) byte per 5.1.4 text

    if len(payload) > 0:
        payload += b'\00' * (payload_length - len(payload))   # pad last payload to length
        bbframe_count += 1
        yield (payload, syncd)  # SYNCD points to the padding if we have no whole TS packets
    
    print(f'{bbframe_count} BBFRAME payloads generated')

In [None]:
# Create BBHeader according to Table 4 of DVB-S2 Spec

# The DFL field for the last BBFRAME of a broadcast is poorly specified.
# Table 4 implies it should still be set to Kbch-80, but the semantics
# suggest that value should be reduced by the length of the padding.
# But 5.2.1 says "no padding shall be applied" for Broadcast Service
# applications, probably because they assume broadcasts never end.

def make_bbheader(Kbch: int, rolloff: int, syncd: int) -> bytes:
    header = bytearray(10)

    header[0] = 0xf0 | (rolloff & 0x03) # TS, Single, CCM, no ISSY, no NPD
    header[1] = 0   # not used
    header[2] = (188 * 8) >> 8      # UPL in bits
    header[3] = (188 * 8) & 0xff
    header[4] = (Kbch - 80) >> 8    # DFL in bits
    header[5] = (Kbch - 80) & 0xff
    header[6] = 0x47                # Sync word "copied" from the TS packet
    header[7] = (syncd * 8) >> 8    # SYNCD in bits
    header[8] = (syncd * 8) & 0xff
    header[9] = compute_crc8(header[:9])[0]

    return header

In [None]:
dummy_mode_word = b'\xaa\xbb\xcd\xef'

with open(bbf_fname, 'wb') as f:
    payloads = bb_payload_builder(ts_fname, bbframe_payload_length)

    for (payload, syncd) in payloads:
        f.write(dummy_mode_word)
        header = make_bbheader(bbframe_length_bits, 0x0, syncd)
        f.write(header)
        f.write(payload)

print(f'Wrote {bbf_fname}')

Todo:
* code review


### Notes

The output of this notebook has been compared to the output of GNU Radio's BBheader block, and it matches for the cases tested.