# In-Browser Decoder Playground

This environment is hosted completely in the browser, and can be used to experiment with the _decoder_ implementation. First, we import `random` and provide a deterministic seed.

In [None]:
import random
random.seed(0)

## Status

We start by providing the status codes that we use in decoder. These are _complete_, _incomplete_, and _incorrect_.

In [None]:
import enum

In [None]:
class Status(enum.Enum):
    Complete = 0
    Incomplete = 1
    Incorrect = -1

## Alphabets
Our algorithm relies on iterating through all possible alphabets of the language; For convenience, we define it as the *printable* subset of ASCII letters.

In [None]:
import string
SET_OF_BYTES = {c for c in string.printable}

In [None]:
def new_byte(choices):
    v = random.choice(choices)
    return v

## Logger

We provide a simple logger.

In [None]:
import sys

In [None]:
def logit(*v):
    print(*v, file=sys.stderr)
    return

## Limits

We define a few limits to the algorithm. In particular, we do not go beyond `ITERATION_LIMIT` and we stop and discard the input if the input crosses `INPUT_LIMIT` without returning *complete*.

In [None]:
ITERATION_LIMIT=10000
INPUT_LIMIT=1000

In [None]:
import itertools

## Exceptions
We need a few exceptions first.

In [None]:
class NeedMoreException(Exception): ...
class InvalidValueException(Exception): ...
class InputLimitException(Exception): ...
class IterationLimitException(Exception): ...
class BacktrackLimitException(Exception): ...

## The Decoder

### Choices

Normally, alphabets are sufficient as concat units for checking validity of prefixes. But sometimes, you need to produce longer concate units.

In [None]:
def till_n_length_choices(my_choices, rs):
    all_choices = []
    for r in range(1, rs+1):
        v = [''.join(i) for i in itertools.product(my_choices, repeat=r)]
        random.shuffle(v)
        all_choices.extend(v)
    return all_choices

In [None]:
till_n_length_choices(string.digits, 1)

In [None]:
till_n_length_choices(string.digits, 2)

### Backtracking

Sometimes you have to backtrack.

In [None]:
def backtrack(prev_bytes, all_choices, seen_at):
    if not prev_bytes:
        raise BacktrackLimitException('Cant backtrack beyond zero index')
    # backtrack one byte
    seen = seen_at[len(prev_bytes)-1]
    seen_at.pop()
    last_byte = prev_bytes[-1]
    logit('backtracking %d %s' % (len(prev_bytes), last_byte))
    #assert (last_byte,) in seen
    prev_bytes = prev_bytes[:-1]
    choices = [i for i in all_choices if i not in seen]
    if not choices:
        return backtrack(prev_bytes, all_choices, seen_at)
    return seen, prev_bytes, choices

### The Decoder algorithm

In [None]:
def generate(validate, prev_bytes=None):
    seen_at = []
    all_choices = SET_OF_BYTES
    if prev_bytes is None: prev_bytes = ''
    seen = set()
    iter_limit = ITERATION_LIMIT
    while iter_limit:
        if len(prev_bytes) > INPUT_LIMIT:
            raise InputLimitException('Exhausted %d bytes' % INPUT_LIMIT)
        iter_limit -= 1
        choices = [i for i in all_choices if i not in seen]
        if not choices:
            seen, prev_bytes, choices = backtrack(prev_bytes, all_choices, seen_at)

        byte = new_byte(choices)
        cur_bytes = prev_bytes + byte
        l_cur_bytes = len(cur_bytes)

        logit('%s %s' % (cur_bytes, len(cur_bytes)))

        rv, n,s = validate(cur_bytes)
        if rv == Status.Complete:
            return cur_bytes
        elif rv == Status.Incomplete:
            seen.add(byte)  # dont explore this byte again
            prev_bytes = cur_bytes
            seen_at.append(seen)
            seen = set()

            # reset this if it was modified by incorrect
            all_choices = SET_OF_BYTES
        elif rv == Status.Incorrect:
            if n is None or n == -1:
                seen.add(byte)
                continue
            else:
                logit("-%s %s" % (len(choices), len(seen)))
                if n < len(seen_at):
                    seen = seen_at[n]
                    seen_at = seen_at[:n]
                seen.add(byte)
                rs = len(cur_bytes) - n
                all_choices = till_n_length_choices(SET_OF_BYTES, min(rs, 2))
                prev_bytes = prev_bytes[:n]
        else:
            raise Exception(rv)
    raise IterationLimitException('Exhausted %d loops' % ITERATION_LIMIT)


## Example Hello

In [None]:
def validate_hello(inputstr):
    try:
        if inputstr[0] != 'H':
            return Status.Incorrect, None, ''
        if inputstr[1] != 'E':
            return Status.Incorrect, None, ''
        if inputstr[2] != 'L':
            return Status.Incorrect, None, ''
        if inputstr[3] != 'L':
            return Status.Incorrect, None, ''
        if inputstr[4] != 'O':
            return Status.Incorrect, None, ''
        return Status.Complete, None, ''
    except IndexError:
        return Status.Incomplete, None, ''

In [None]:
generate(validate_hello)

## Example Paren

In [None]:
def parens(xs):
    stack = [[]]
    while True:
        x, xs = xs[0], xs[1:]
        if x == '(':
            stack[-1].append([])
            stack.append(stack[-1][-1])
        elif x == ')':
            stack.pop()
            if not stack:
                raise Exception('error: opening bracket is missing')
                #raise ValueError('error: opening bracket is missing')
        elif x in '01':
            stack[-1].append(x)
        else:
            raise Exception('error: Only binary numbers')
        if xs == '':
            break
    if len(stack) > 1:
        raise Exception('incomplete: closing bracket is missing')
        #raise ValueError('error: closing bracket is missing')
    return stack.pop()

In [None]:
def validate_parens(input_str):
    try:
        parens(input_str)
        return Status.Complete,-1,""
    except Exception as e:
        msg = str(e)
        if msg.startswith("incomplete:"):
            n = len(msg)
            return Status.Incomplete, None, ""
        elif msg.startswith("error"):
            return Status.Incorrect,None, input_str[-1]
        else:
            raise e

In [None]:
generate(validate_parens)

## Example JSON

In [None]:
import json

In [None]:
JSON_TOKENS = ['true', 'false', 'null']

In [None]:
PREFIX = {}
for token in JSON_TOKENS:
    PREFIX[token] = [token[0:i+1] for i in range(len(token)-1)]

In [None]:
PREFIX

In [None]:
def it_fits(input_str):
    try:
        json.loads(input_str)
        logit('*', repr(input_str))
        return True
    except Exception as e:
        msg = str(e)
        if msg.startswith('Expecting'):
            # Expecting value: line 1 column 4 (char 3)
            n = int(msg.rstrip(')').split()[-1])
            if n >= len(input_str):
                logit('+', repr(input_str))
                return True
        return False

In [None]:
def validate_json(input_str):
    try:
        json.loads(input_str)
        logit('*', repr(input_str))
        return Status.Complete, -1, ''
    except Exception as e:
        msg = str(e)
        if msg.startswith('Expecting'):
            # Expecting value: line 1 column 4 (char 3)
            n = int(msg.rstrip(')').split()[-1])
            # If the error is 'outside' the string, it can still be valid
            if n >= len(input_str):
                logit('+', repr(input_str))
                return Status.Incomplete, n, ''
            elif len(input_str) > 1 and input_str[-1] == '.' and input_str[-2].isdigit():
                # JSON returns incorrect for [3. rather than incomplete.
                return Status.Incomplete, n, ''
            else:
                logit('X', repr(input_str))
                remaining = input_str[n:]
                for word in JSON_TOKENS:
                    if remaining in PREFIX[word]:
                        # check if it fits first.
                        if it_fits(input_str[:n] + word):
                            return Status.Incomplete, n, input_str[n]
                    return Status.Incorrect, None, input_str[n]
                return Status.Incorrect, None, input_str[n]
        elif msg.startswith('Unterminated'):
            # Unterminated string starting at: line 1 column 1 (char 0)
            n = int(msg.rstrip(')').split()[-1])
            if n >= len(input_str):
                logit('+', repr(input_str))
                return Status.Incomplete, n, ''
            else:
                logit('+', repr(input_str))
                return Status.Incomplete, n, input_str[n]
        elif msg.startswith('Extra data'):
            n = int(msg.rstrip(')').split()[-1])
            if n >= len(input_str):
                logit('X', repr(input_str))
                return Status.Incorrect, None, ''
            else:
                logit('X', repr(input_str))
                return Status.Incorrect, None, input_str[n]
        elif msg.startswith('Invalid '):
            idx = msg.find('(char ')
            eidx = msg.find(')')
            s = msg[idx + 6:eidx]
            n = int(s)
            logit('X', repr(input_str))
            return Status.Incorrect, None, input_str[n]
        else:
            raise e

In [None]:
(js_ex := generate(validate_json))

In [None]:
print(json.dumps(json.loads(js_ex), indent=4))

## Example Imprecise Hello

In [None]:
def validate_bthello(inputstr):
    try:
        if inputstr[0] != 'H':
            return Status.Incorrect, 0, ''
        if inputstr[1] != 'E':                                                   
            return Status.Incorrect, 1, ''
        if inputstr[2] != 'L':
            return Status.Incorrect, 2, ''
        v = inputstr[3:5]
        if len(v) != 2: raise IndexError
        if v != 'LO':
            return Status.Incorrect, 3, ''
        return Status.Complete, None, ''
    except IndexError:
        return Status.Incomplete, len(inputstr), ''

In [None]:
SET_OF_BYTES = {c for c in string.ascii_uppercase}

In [None]:
generate(validate_bthello)