<a href="https://colab.research.google.com/github/elangbijak4/Universal-Language-and-Protocol-Construction/blob/main/Demo_3_Contoh_Konstruksi_protokol_universal_lebih_lengkap.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# PFUL Protocol Demo â€” Full sequential pipeline
# Paste/Run this entire block in Google Colab.
# Outputs: prints, a PNG tape visualization at /content/tape_steps.png,
# and JSON saved at /content/pful_protocol_exchange.json

import json, math, matplotlib.pyplot as plt
from math import isqrt
from copy import deepcopy
from pprint import pprint

# --------------------------
# Utilities
# --------------------------
def sieve_primes(n):
    if n < 2:
        return []
    sieve = [True] * (n+1)
    sieve[0:2] = [False, False]
    for p in range(2, isqrt(n) + 1):
        if sieve[p]:
            step = p*p
            sieve[step:n+1:p] = [False] * (((n - step)//p) + 1)
    return [i for i, isprime in enumerate(sieve) if isprime]

def factorize(n, primes_list=None):
    if n < 2:
        return {}
    if primes_list is None:
        primes_list = sieve_primes(int(max(100, isqrt(n) + 10)))
    rem = n
    factors = {}
    for p in primes_list:
        if p*p > rem:
            break
        if rem % p == 0:
            cnt = 0
            while rem % p == 0:
                rem //= p
                cnt += 1
            factors[p] = cnt
    if rem > 1:
        factors[rem] = factors.get(rem, 0) + 1
    return factors

PRIMES = sieve_primes(5000)
def prime_at(i): return PRIMES[i]  # 0-based

def generate_fibonacci(k):
    if k <= 0:
        return []
    F = [0,1]
    for _ in range(2, k):
        F.append(F[-1] + F[-2])
    return F

# --------------------------
# Encoding helpers (PTL style)
# --------------------------
def encode_transition_tuple(tup, s2p, q2p, d2p):
    # tup = (q, a, q', b, D)
    q,a,qp,b,D = tup
    return q2p[q] * s2p[a] * q2p[qp] * s2p[b] * d2p[D]

def encode_configuration(state, tape_dict, q2p, s2p):
    # conf = p_state * product_{pos} (symprime ** (pos+2))
    conf = q2p[state]
    for pos,sym in tape_dict.items():
        conf *= (s2p[sym] ** (pos + 2))
    return conf

def decode_configuration(conf_int, state2prime, symbol2prime, primes=PRIMES):
    fac = factorize(conf_int, primes)
    state = None
    for st, p in state2prime.items():
        if p in fac:
            state = st
            break
    tape = {}
    for p,e in fac.items():
        if p in {v for v in symbol2prime.values()}:
            # find symbol
            sym = [k for k,v in symbol2prime.items() if v==p][0]
            pos = e - 2
            tape[pos] = sym
    tape = {k:tape[k] for k in sorted(tape.keys())}
    return state, tape, fac

# --------------------------
# Protocol: Sender side function
# --------------------------
def build_handshake_and_payload_for_machine(machine_name, transitions_tuples, alphabet, states, dirs, tape_initial, start_state):
    """
    Build handshake (blueprint) and payload configuration integer.
    Returns handshake dict and payload integer.
    """
    # Choose prime ranges for symbol/state/dir to avoid accidental overlap
    # Symbols: start at prime index 2
    # States: offset at 50
    # Dirs: offset at 100
    s2p = {s: prime_at(i) for i,s in enumerate(alphabet)}  # mapping symbols -> primes
    q2p = {q: prime_at(50 + i) for i,q in enumerate(states)}
    d2p = {d: prime_at(100 + i) for i,d in enumerate(dirs)}

    transitions_encoded = [encode_transition_tuple(t, s2p, q2p, d2p) for t in transitions_tuples]
    handshake = {
        'machine_name': machine_name,
        'symbol2prime': s2p,
        'state2prime': q2p,
        'dir2prime': d2p,
        'transitions': transitions_encoded
    }

    payload_conf = encode_configuration(start_state, tape_initial, q2p, s2p)

    return handshake, payload_conf

# --------------------------
# Receiver: reconstruct machine from handshake
# --------------------------
def reconstruct_machine_from_handshake(handshake):
    s2p = {k:int(v) for k,v in handshake['symbol2prime'].items()}
    q2p = {k:int(v) for k,v in handshake['state2prime'].items()}
    d2p = {k:int(v) for k,v in handshake['dir2prime'].items()}
    trans_ints = [int(x) for x in handshake['transitions']]

    prime2symbol = {v:k for k,v in s2p.items()}
    prime2state  = {v:k for k,v in q2p.items()}
    prime2dir    = {v:k for k,v in d2p.items()}

    def decode_transition_int(T):
        fac = factorize(T, PRIMES)

        # Initialize components
        q = a = qp = b = D = None

        # Collect all state and symbol components, handling exponents
        extracted_states = []
        extracted_symbols = []

        for p, count in fac.items():
            if p in prime2state:
                extracted_states.extend([prime2state[p]] * count)
            elif p in prime2symbol:
                extracted_symbols.extend([prime2symbol[p]] * count)
            elif p in prime2dir:
                D = prime2dir[p] # There should only be one direction prime

        # Sort the extracted lists to ensure consistent assignment if multiple distinct values exist
        # and to handle cases where q=qp or a=b correctly.
        extracted_states.sort()
        extracted_symbols.sort()

        # Assign q, qp, a, b based on the ordered lists
        if len(extracted_states) >= 1:
            q = extracted_states[0]
        if len(extracted_states) >= 2:
            qp = extracted_states[1]

        if len(extracted_symbols) >= 1:
            a = extracted_symbols[0]
        if len(extracted_symbols) >= 2:
            b = extracted_symbols[1]

        return (q, a, qp, b, D)

    transitions = [decode_transition_int(T) for T in trans_ints]
    # Build transition table map: (state,symbol) -> (state',symbol',dir)
    table = {}
    for tup in transitions:
        table[(tup[0], tup[1])] = (tup[2], tup[3], tup[4])

    return {
        'symbol2prime': s2p, 'state2prime': q2p, 'dir2prime': d2p,
        'prime2symbol': prime2symbol, 'prime2state': prime2state, 'prime2dir': prime2dir,
        'transitions_tuples': transitions, 'transition_table': table
    }

# --------------------------
# Challenge-Response: simple verification step
# --------------------------
def sender_issue_challenge(handshake):
    """
    Sender builds a small test configuration and expects receiver to return the result after applying
    one step of the machine (or to compute predicted state).
    We'll create a tiny conf with a single cell and ask receiver to apply the encoded transitions once.
    """
    # create a tiny config: head at pos 0, containing '0' (if '0' in handshake)
    s2p = handshake['symbol2prime']
    q2p = handshake['state2prime']
    # choose test tape: {0: first symbol found that is not 'B' if possible}
    symbols = list(s2p.keys())
    sym0 = symbols[0]
    test_tape = {0: sym0}
    test_state = list(q2p.keys())[0]
    conf = encode_configuration(test_state, test_tape, q2p, s2p)
    return {'challenge_conf': conf, 'expected_tape_symbol': test_tape[0], 'start_state': test_state}

def receiver_answer_challenge(challenge_bundle, machine):
    """
    Receiver decodes challenge conf, applies exactly one step (if possible), and returns resulting conf.
    machine is the reconstructed dict from reconstruct_machine_from_handshake.
    """
    conf = challenge_bundle['challenge_conf']
    s2p = machine['symbol2prime']; q2p = machine['state2prime']
    state, tape, fac = decode_configuration(conf, q2p, s2p)
    # apply one step of TM (if rule exists)
    key = (state, tape.get(0, 'B'))
    table = machine['transition_table']
    if key not in table:
        # can't apply: return same config + note
        return {'response_conf': conf, 'applied': False}
    new_state, write_sym, direction = table[key]
    # write symbol at head
    if write_sym == 'B':
        if 0 in tape: del tape[0]
    else:
        tape[0] = write_sym
    # move head (for challenge we ignore position changes and just encode tape and new state)
    new_conf = encode_configuration(new_state, tape, q2p, s2p)
    return {'response_conf': new_conf, 'applied': True, 'tape_after': tape, 'state_after': new_state}

# --------------------------
# Small TM: bit-flipper (already shown earlier)
# --------------------------
alphabet = ['0','1','B']
states = ['q0','qH']
dirs = ['L','R']

bitflipper_trans = [
    ('q0','0','q0','1','R'),
    ('q0','1','q0','0','R'),
    ('q0','B','qH','B','R'),
]

# Build handshake & payload for bitflipper
handshake_bf, payload_bf = build_handshake_and_payload_for_machine(
    'bitflipper', bitflipper_trans, alphabet, states, dirs,
    tape_initial={0:'0',1:'1',2:'1',3:'0',4:'B'},
    start_state='q0'
)

print("=== Bitflipper handshake snippets ===")
pprint({k: (v if k!='transitions' else 'list(len='+str(len(handshake_bf['transitions']))+')') for k,v in handshake_bf.items()})
print("Encoded initial payload config (int):", payload_bf)

# Simulate handshake send/receive and CR
challenge = sender_issue_challenge(handshake_bf)
reconstructed = reconstruct_machine_from_handshake(handshake_bf)
answer = receiver_answer_challenge(challenge, reconstructed)
print("\nChallenge issued (conf int):", challenge['challenge_conf'])
print("Receiver applied?:", answer['applied'])
print("Receiver returned conf int:", answer['response_conf'])
print("Tape after step (receiver):", answer.get('tape_after'), "state after:", answer.get('state_after'))

# Basic validation: sender can decode response using same maps
state_r, tape_r, _ = decode_configuration(answer['response_conf'], handshake_bf['state2prime'], handshake_bf['symbol2prime'])
print("Sender decodes response -> state:", state_r, "tape:", tape_r)

# --------------------------
# Now: Binary Incrementer TM (more substantial) + handshake + simulation
# We'll design a small deterministic TM that increments a binary number (LSB at position 0) and halts.
#
# States: q0 (scan LSB->MSB, add carry), qc (carry propagate), qH halt
# Alphabet: '0','1','B'
# Strategy (simple design):
# - Start at LSB (position 0) in q0
# - If read 0 -> write 1, go to qH (done)
# - If read 1 -> write 0, move right and stay in carry state qc
# - In qc: if read 1 -> write 0, move right, stay qc; if read 0 -> write 1, go to qH; if read B -> write 1, go to qH
#
# Transition tuples accordingly.
# --------------------------
inc_alphabet = ['0','1','B']
inc_states = ['q0','qc','qH']
inc_dirs = ['L','R']

inc_transitions = [
    ('q0','0','qH','1','R'),   # 0 -> write 1, done
    ('q0','1','qc','0','R'),   # 1 -> write 0, carry
    ('qc','1','qc','0','R'),   # carry through 1s
    ('qc','0','qH','1','R'),   # carry resolves at 0
    ('qc','B','qH','1','R'),   # carry resolves at blank -> add new 1
]

# Build handshake + payload: example input binary number: 1 1 1 (7) -> result should be 0 0 0 1 (8)
payload_tape = {0:'1',1:'1',2:'1',3:'B'}  # LSBit at pos 0
handshake_inc, payload_inc = build_handshake_and_payload_for_machine(
    'bin_incrementer', inc_transitions, inc_alphabet, inc_states, inc_dirs,
    tape_initial=payload_tape, start_state='q0'
)

print("\n=== Binary incrementer handshake summary ===")
print("transitions encoded count:", len(handshake_inc['transitions']))
print("payload int sample:", payload_inc)

# Receiver reconstructs and simulates full TM now (full execution)
recon_inc = reconstruct_machine_from_handshake(handshake_inc)
initial_state, initial_tape, _ = decode_configuration(payload_inc, handshake_inc['state2prime'], handshake_inc['symbol2prime'])
print("\nInitial decoded (receiver): state:", initial_state, "tape:", initial_tape)

# Generic TM simulator (full run)
def simulate_tm_full(state, tape_dict, transition_table, blank='B', max_steps=200):
    head = 0
    steps = 0
    history = []
    # tape_dict is sparse dict pos->symbol
    # we'll ensure we have positive positions; TM moves right for our machines
    while steps < max_steps:
        sym = tape_dict.get(head, blank)
        history.append({'step': steps, 'state': state, 'head': head, 'symbol': sym, 'tape_snapshot': deepcopy(tape_dict)})
        key = (state, sym)
        if key not in transition_table:
            # halt
            break
        ns, write_sym, direction = transition_table[key]
        # write
        if write_sym == 'B':
            if head in tape_dict: del tape_dict[head]
        else:
            tape_dict[head] = write_sym
        # move
        if direction == 'R':
            head += 1
        elif direction == 'L':
            head -= 1
        # update state
        state = ns
        steps += 1
    return state, tape_dict, history

final_state, final_tape, history = simulate_tm_full(initial_state, deepcopy(initial_tape), recon_inc['transition_table'])
print("\nBinary incrementer final state:", final_state)
print("Final tape (sparse):", final_tape)

# Visualize tape per step: show first N steps or all
def visualize_tape_history(history, min_pos=0, max_pos=None, filename='/content/tape_steps.png'):
    # Determine extents
    all_positions = set()
    for h in history:
        all_positions.update(h['tape_snapshot'].keys())
    if not all_positions:
        all_positions = {0}
    minp = min(all_positions) if max_pos is None else min_pos
    maxp = max(all_positions) if max_pos is None else max_pos
    width = maxp - minp + 1
    nsteps = len(history)
    # Build matrix: nsteps x width
    mat = []
    for h in history:
        row = []
        for pos in range(minp, maxp+1):
            row.append(h['tape_snapshot'].get(pos, 'B'))
        mat.append(row)
    # map symbols to colors/ints
    symset = sorted({s for row in mat for s in row})
    cmap = {sym:i for i,sym in enumerate(symset)}
    import numpy as np
    arr = np.array([[cmap[val] for val in row] for row in mat])
    plt.figure(figsize=(max(6, width*0.5), max(4, nsteps*0.25)))
    plt.imshow(arr, aspect='auto', interpolation='nearest')
    # annotate ticks
    plt.yticks(range(len(history)), [f"s{h['step']}|{h['state']}" for h in history])
    plt.xticks(range(width), [str(p) for p in range(minp, maxp+1)])
    # legend
    handles = [plt.Line2D([0],[0], marker='s', color='w', markerfacecolor='k') for _ in symset]
    plt.title("Tape history (rows: steps; cols: tape positions)")
    # simple colorbar-like legend printed
    legend_text = "symbol mapping: " + ", ".join([f"{sym}->{i}" for sym,i in cmap.items()])
    plt.xlabel(legend_text)
    plt.tight_layout()
    plt.savefig(filename)
    plt.close()
    return filename

imgfile = visualize_tape_history(history, filename='/content/tape_steps.png')
print("\nTape history visualization saved to:", imgfile)

# --------------------------
# Compose richer PFUL message that includes:
# - primes signature
# - fibonacci signature
# - PTL blueprint for the incrementer (handshake_inc)
# - initial payload encoded
# - FBL organism example
# --------------------------
F = generate_fibonacci(20)
fbl_example = [F[2], F[3], F[5]]  # small organism example (1,2,5)
pful_message = {
    'primes_signature': [2,3,5,7,11],
    'fibonacci_signature': [F[1],F[2],F[3],F[4],F[5]],
    'ptl_blueprint': handshake_inc,
    'payload_initial_conf': payload_inc,
    'fbl_organism_example': fbl_example,
    'note': 'This PFUL message contains handshake for binary incrementer TM and an FBL organism example.'
}
outpath = '/content/pful_protocol_exchange.json'
with open(outpath, 'w') as f:
    json.dump(pful_message, f, indent=2)
print("\nPFUL protocol exchange saved to:", outpath)

# --------------------------
# CHALLENGE-RESPONSE EXTENSION (optional verification roundtrip)
# Sender requests receiver to run a short test: run the reconstructed TM on a test payload and return result.
# We'll simulate that here and verify the result matches expected outcome.
# For the incrementer example: test payload '1 1 1' should become '0 0 0 1' (8)
test_payload = {0:'1',1:'1',2:'1',3:'B'}
test_conf = encode_configuration('q0', test_payload, handshake_inc['state2prime'], handshake_inc['symbol2prime'])
# Receiver runs machine on test_conf and returns final tape encoding integer
# simulate
recv_recon = reconstruct_machine_from_handshake(handshake_inc)
s_init, tape_init, _ = decode_configuration(test_conf, handshake_inc['state2prime'], handshake_inc['symbol2prime'])
s_fin, tape_fin, hist_test = simulate_tm_full(s_init, deepcopy(tape_init), recv_recon['transition_table'])
print("\nChallenge-Response (run incrementer on test payload):")
print(" initial tape:", tape_init)
print(" final tape:  ", tape_fin)
# Build final tape list representation for readability
if tape_fin:
    maxpos = max(tape_fin.keys())
    final_list = [tape_fin.get(i,'B') for i in range(0, maxpos+1)]
else:
    final_list = []
print(" final tape as list:", final_list)

# Save the result of CR
cr_result = {'test_initial': test_payload, 'test_final': tape_fin, 'final_list': final_list}
with open('/content/cr_result.json', 'w') as f:
    json.dump(cr_result, f, indent=2)

print("\nChallenge-response result saved to /content/cr_result.json")
print("\n=== Demo complete ===")

=== Bitflipper handshake snippets ===
{'dir2prime': {'L': 547, 'R': 557},
 'machine_name': 'bitflipper',
 'state2prime': {'q0': 233, 'qH': 239},
 'symbol2prime': {'0': 2, '1': 3, 'B': 5},
 'transitions': 'list(len=3)'}
Encoded initial payload config (int): 1019142000000

Challenge issued (conf int): 932
Receiver applied?: True
Receiver returned conf int: 2097
Tape after step (receiver): {0: '1'} state after: q0
Sender decodes response -> state: q0 tape: {0: '1'}

=== Binary incrementer handshake summary ===
transitions encoded count: 5
payload int sample: 14331684375

Initial decoded (receiver): state: q0 tape: {3: 'B', 7: '1'}

Binary incrementer final state: q0
Final tape (sparse): {3: 'B', 7: '1'}

Tape history visualization saved to: /content/tape_steps.png

PFUL protocol exchange saved to: /content/pful_protocol_exchange.json

Challenge-Response (run incrementer on test payload):
 initial tape: {3: 'B', 7: '1'}
 final tape:   {3: 'B', 7: '1'}
 final tape as list: ['B', 'B', 'B', '