# In-Browser Decoder Playground

This environment is hosted completely in the browser and can be used to experiment with the _decoder_ implementation. Note that many of these programs will fail to finish within the allotted iteration and input-length limits. This is expected (especially given WASM+Python, which introduces a performance hit of a few orders of magnitude). We need far fewer than 1% of the runs to complete in order to generate reasonable inputs.

In [None]:
import random

## Status

We start by providing the status codes that we use in the decoder. These are _complete_, _incomplete_, and _incorrect_.

In [None]:
import enum

In [None]:
class Status(enum.Enum):
    """Verdict a validator reports for a candidate input string."""
    # The validator accepted the input as a whole.
    Complete = 0
    # The validator wants more input (the string is treated as a usable prefix).
    Incomplete = 1
    # The validator rejected the input.
    Incorrect = -1

## Alphabets
Our algorithm relies on iterating through every symbol in the alphabet of the language. For convenience, we define the alphabet as the *printable* subset of ASCII characters.

In [None]:
import string
# Default alphabet: every character of string.printable.
SET_OF_BYTES = set(string.printable)

In [None]:
def new_byte(choices):
    """Return one element of the sequence ``choices``, picked with ``random.choice``."""
    return random.choice(choices)

## Logger

We provide a simple logger.

In [None]:
import sys

In [None]:
def logit(*v):
    """Print the given values to standard error, formatted exactly as ``print`` would."""
    print(*v, file=sys.stderr)

## Limits

We define a few limits to the algorithm. In particular, we do not go beyond `ITERATION_LIMIT` and we stop and discard the input if the input crosses `INPUT_LIMIT` without returning *complete*.

In [None]:
# Maximum number of loop iterations generate() performs before giving up.
ITERATION_LIMIT=1000
# Maximum length of the accepted prefix before generate() discards the attempt.
INPUT_LIMIT=100

In [None]:
import itertools

## Exceptions
We need a few exceptions first.

In [None]:
class NeedMoreException(Exception):
    """Declared for the decoder; not raised anywhere in the code shown here."""


class InvalidValueException(Exception):
    """Declared for the decoder; not raised anywhere in the code shown here."""


class InputLimitException(Exception):
    """Raised by generate() when the accepted prefix exceeds INPUT_LIMIT."""


class IterationLimitException(Exception):
    """Raised by generate() once ITERATION_LIMIT loop iterations are used up."""


class BacktrackLimitException(Exception):
    """Raised by backtrack() when there is no prefix left to undo."""

## The Decoder

### Choices

Normally, single alphabet symbols are sufficient as concatenation units for checking the validity of prefixes. But sometimes you need to produce longer concatenation units.

In [None]:
def till_n_length_choices(my_choices, rs):
    """Return every concatenation of 1..``rs`` symbols drawn from ``my_choices``.

    Units are grouped by length (shortest first); within each length group
    the order is randomised with ``random.shuffle``. For ``rs < 1`` the
    result is an empty list.
    """
    collected = []
    for width in range(1, rs + 1):
        # All strings of exactly `width` symbols, in product order before shuffling.
        units = list(map(''.join, itertools.product(my_choices, repeat=width)))
        random.shuffle(units)
        collected += units
    return collected

In [None]:
# Demo: the ten single-digit units, in shuffled order.
till_n_length_choices(string.digits, 1)

In [None]:
# Demo: all one-digit units followed by all two-digit units (each group shuffled).
till_n_length_choices(string.digits, 2)

### Backtracking

Sometimes you have to backtrack.

In [None]:
def backtrack(prev_bytes, all_choices, seen_at):
    """Undo accepted input until a position with an untried choice is found.

    Each step drops the last character of ``prev_bytes`` and pops the most
    recently saved "already tried" set from ``seen_at`` (mutating the
    caller's list). Steps repeat until some element of ``all_choices`` is
    not in the restored set.

    Args:
        prev_bytes: The prefix accepted so far (a string).
        all_choices: The candidate units available at each position.
        seen_at: Stack of sets, one saved per accepted unit.

    Returns:
        ``(seen, prev_bytes, choices)``: the restored set, the shortened
        prefix, and the non-empty list of choices not yet in ``seen``.

    Raises:
        BacktrackLimitException: If the prefix or the saved-state stack is
            exhausted before an untried choice is found.
    """
    while True:
        # The stack can be shorter than the prefix (e.g. when the caller
        # started from a non-empty prefix); running out of either one means
        # there is nothing left to undo. Previously an empty stack surfaced
        # as a stray IndexError.
        if not prev_bytes or not seen_at:
            raise BacktrackLimitException('Cant backtrack beyond zero index')
        # Always restore the most recently saved set, i.e. the one being popped.
        seen = seen_at.pop()
        last_byte = prev_bytes[-1]
        logit('backtracking %d %s' % (len(prev_bytes), last_byte))
        # NOTE(review): only one character is removed per step; if a
        # multi-character unit was accepted, the prefix and the stack can
        # drift apart. Confirm whether unit lengths should be tracked.
        prev_bytes = prev_bytes[:-1]
        choices = [i for i in all_choices if i not in seen]
        if choices:
            return seen, prev_bytes, choices

### The Decoder algorithm

In [None]:
def generate(validate, prev_bytes=None, tokens=set()):
    """Randomly grow a string until ``validate`` reports it as complete.

    Args:
        validate: Callable taking the candidate string and returning a
            3-tuple ``(status, n, s)``. ``status`` is a ``Status`` member;
            ``n`` is only consulted for ``Status.Incorrect`` (``None`` or
            ``-1`` means "just reject the last unit", any other value is
            used as an index to rewind to); ``s`` is not used here.
        prev_bytes: Optional prefix to start from; defaults to ``''``.
        tokens: Extra multi-character units added to ``SET_OF_BYTES``.
            The default set is only read, never mutated.

    Returns:
        The first candidate string for which ``validate`` returns
        ``Status.Complete``.

    Raises:
        InputLimitException: The accepted prefix is longer than INPUT_LIMIT.
        IterationLimitException: ITERATION_LIMIT iterations were used up.
        BacktrackLimitException: Propagated from ``backtrack``.
        Exception: ``validate`` returned a status that is not handled.
    """
    # seen_at holds one "already tried" set per accepted unit.
    seen_at = []
    alphabet = SET_OF_BYTES | tokens
    all_choices = alphabet
    if prev_bytes is None: prev_bytes = ''
    # Units already tried at the current position.
    seen = set()
    iter_limit = ITERATION_LIMIT
    while iter_limit:
        if len(prev_bytes) > INPUT_LIMIT:
            raise InputLimitException('Exhausted %d bytes' % INPUT_LIMIT)
        iter_limit -= 1
        choices = [i for i in all_choices if i not in seen]
        if not choices:
            # Everything was tried at this position: undo earlier input.
            seen, prev_bytes, choices = backtrack(prev_bytes, all_choices, seen_at)

        byte = new_byte(choices)
        cur_bytes = prev_bytes + byte
        l_cur_bytes = len(cur_bytes)  # not read afterwards

        logit('%s %s' % (repr(cur_bytes), len(cur_bytes)))

        rv, n,s = validate(cur_bytes)
        if rv == Status.Complete:
            return cur_bytes
        elif rv == Status.Incomplete:
            # Accept the unit: remember it as tried at this position, save
            # that set, and start a fresh set for the next position.
            seen.add(byte)  # do not explore this byte again
            prev_bytes = cur_bytes
            seen_at.append(seen)
            seen = set()

            # Restore the plain alphabet in case the Incorrect branch
            # replaced it with longer units.
            all_choices = list(alphabet)
        elif rv == Status.Incorrect:
            if n is None or n == -1:
                # No position hint: reject only the unit just tried.
                seen.add(byte)
                continue
            else:
                logit("-%s %s" % (len(choices), len(seen)))
                # Rewind to index n when saved state exists for it.
                if n < len(seen_at):
                    seen = seen_at[n]
                    seen_at = seen_at[:n]
                seen.add(byte)
                # Offer units of up to min(rs, 2) symbols at the rewound position.
                rs = len(cur_bytes) - n
                all_choices = till_n_length_choices(list(alphabet), min(rs, 2))
                prev_bytes = prev_bytes[:n]
        else:
            raise Exception(rv)
    raise IterationLimitException('Exhausted %d loops' % ITERATION_LIMIT)


## Example Hello

In [None]:
def conforming_hello(inputstr):
    """Validate ``inputstr`` against the fixed word ``HELLO``.

    Returns ``(Status, None, '')``: Incorrect at the first mismatching
    character among the first five, Incomplete when the input ends before
    five characters have matched, Complete otherwise.
    """
    try:
        for pos, want in enumerate('HELLO'):
            if inputstr[pos] != want:
                return Status.Incorrect, None, ''
    except IndexError:
        # Ran off the end of the input while it still matched.
        return Status.Incomplete, None, ''
    return Status.Complete, None, ''

In [None]:
# Run the decoder against the HELLO validator.
generate(conforming_hello)

## Example Paren

In [None]:
def parens(xs):
    """Parse nested parentheses containing binary digits into nested lists.

    Each ``(`` opens a new sub-list, each ``)`` closes the current one, and
    each ``0`` or ``1`` is appended to the current list as a one-character
    string. An empty input yields an empty list.

    Args:
        xs: The string to parse.

    Returns:
        The top-level list, e.g. ``parens('(01)') == [['0', '1']]``.

    Raises:
        Exception: With a message starting ``'error:'`` for an unmatched
            ``)`` or a character other than ``( ) 0 1``, and with a message
            starting ``'incomplete:'`` when a ``(`` is never closed.
    """
    # stack[-1] is always the list currently being filled.
    stack = [[]]
    # Iterating directly avoids the quadratic re-slicing of the input and
    # lets the empty string through without an IndexError.
    for x in xs:
        if x == '(':
            child = []
            stack[-1].append(child)
            stack.append(child)
        elif x == ')':
            stack.pop()
            if not stack:
                raise Exception('error: opening bracket is missing')
        elif x in '01':
            stack[-1].append(x)
        else:
            raise Exception('error: Only binary numbers')
    if len(stack) > 1:
        raise Exception('incomplete: closing bracket is missing')
    return stack.pop()

In [None]:
def conforming_parens(input_str):
    """Translate the outcome of ``parens`` into a decoder status tuple.

    Returns ``(Status.Complete, -1, "")`` when parsing succeeds,
    ``(Status.Incomplete, None, "")`` for messages starting with
    ``"incomplete:"`` and ``(Status.Incorrect, None, <last char>)`` for
    messages starting with ``"error"``. Any other exception is re-raised.
    """
    try:
        parens(input_str)
    except Exception as e:
        msg = str(e)
        if msg.startswith("incomplete:"):
            return Status.Incomplete, None, ""
        if msg.startswith("error"):
            return Status.Incorrect, None, input_str[-1]
        raise
    return Status.Complete, -1, ""

In [None]:
# Run the decoder against the parenthesis validator.
generate(conforming_parens)

## Example JSON

In [None]:
import json

In [None]:
# Multi-character literals whose proper prefixes are tracked in PREFIX.
JSON_TOKENS = ['true', 'false', 'null']

In [None]:
# Map each JSON literal to its proper, non-empty prefixes,
# e.g. 'true' -> ['t', 'tr', 'tru'].
PREFIX = {
    token: [token[:end] for end in range(1, len(token))]
    for token in JSON_TOKENS
}

In [None]:
# Display the prefix table.
PREFIX

In [None]:
def it_fits(input_str):
    """Return True when ``input_str`` is valid JSON or a prefix that merely ran out.

    A prefix "merely ran out" when ``json.loads`` fails with a message
    starting ``'Expecting'`` whose reported character offset is at or
    beyond the end of the input. Every other failure yields False.
    """
    try:
        json.loads(input_str)
        logit('*', repr(input_str))
        return True
    except Exception as err:
        text = str(err)
        if not text.startswith('Expecting'):
            return False
        # The message ends in "(char N)"; N is the failing offset.
        offset = int(text.rstrip(')').split()[-1])
        if offset < len(input_str):
            return False
        logit('+', repr(input_str))
        return True

In [None]:
def conforming_json(input_str):
    """Classify ``input_str`` as complete, incomplete or incorrect JSON.

    The verdict comes from ``json.loads``: success is Complete; otherwise
    the decoder's error message and character offset decide.

    Args:
        input_str: The candidate string.

    Returns:
        A tuple ``(status, n, s)``:
        * ``(Status.Complete, -1, '')`` when the input parses.
        * ``(Status.Incomplete, n, s)`` when the error lies at or past the
          end of the input, for an unterminated string, for a trailing
          ``.`` after a digit, or when the rest is a proper prefix of
          ``true``/``false``/``null`` that would fit.
        * ``(Status.Incorrect, None, s)`` otherwise, where ``s`` is the
          character at the error offset (``''`` if the offset is past the
          end).

    Raises:
        json.JSONDecodeError: For decoder messages not recognised here.
    """
    try:
        json.loads(input_str)
        logit('*', repr(input_str))
        return Status.Complete, -1, ''
    except json.JSONDecodeError as e:
        # Use the structured fields instead of parsing "(char N)" back out
        # of the rendered message.
        msg, n = e.msg, e.pos
        at_error = input_str[n] if n < len(input_str) else ''

        if msg.startswith('Expecting'):
            # If the error is 'outside' the string, it can still be valid.
            if n >= len(input_str):
                logit('+', repr(input_str))
                return Status.Incomplete, n, ''
            if len(input_str) > 1 and input_str[-1] == '.' and input_str[-2].isdigit():
                # JSON returns incorrect for [3. rather than incomplete.
                return Status.Incomplete, n, ''
            logit('X', repr(input_str))
            remaining = input_str[n:]
            for word in JSON_TOKENS:
                # A partial literal is acceptable only if completing it fits.
                if remaining in PREFIX[word] and it_fits(input_str[:n] + word):
                    return Status.Incomplete, n, at_error
            return Status.Incorrect, None, at_error

        if msg.startswith('Unterminated'):
            # e.g. "Unterminated string starting at": the string may still close.
            logit('+', repr(input_str))
            return Status.Incomplete, n, at_error

        # 'Illegal' covers the trailing-comma messages of newer Python
        # releases, which would otherwise be re-raised below.
        if msg.startswith(('Extra data', 'Invalid ', 'Illegal ')):
            logit('X', repr(input_str))
            return Status.Incorrect, None, at_error

        raise

In [None]:
# Generate a JSON document and keep it for the next cell.
(js_ex := generate(conforming_json))

In [None]:
# Round-trip the generated document through the json module and pretty-print it.
print(json.dumps(json.loads(js_ex), indent=4))

## Example Imprecise Hello

In [None]:
def conforming_ihello(inputstr):
    """Validate ``HELLO`` with positional feedback, checking ``LO`` as one unit.

    The first three characters are checked one by one and a mismatch
    reports its own index. The last two characters are compared together,
    so a mismatch in either reports index 3. Incomplete results carry the
    current input length as the second element.
    """
    try:
        for pos, want in enumerate('HEL'):
            if inputstr[pos] != want:
                return Status.Incorrect, pos, ''
    except IndexError:
        return Status.Incomplete, len(inputstr), ''
    tail = inputstr[3:5]
    if len(tail) != 2:
        # Fewer than two characters are available for the final pair.
        return Status.Incomplete, len(inputstr), ''
    if tail != 'LO':
        return Status.Incorrect, 3, ''
    return Status.Complete, None, ''

Unfortunately WASM+Python imposes a really huge overhead. So, we limit our alphabet to have any hope to finish in time.

In [None]:
# Restrict the alphabet to the upper-case ASCII letters.
SET_OF_BYTES = set(string.ascii_uppercase)

In [None]:
# Run the decoder against the imprecise HELLO validator.
generate(conforming_ihello)

## Example MathExpr

In [None]:
class Parser:
    """Recursive-descent evaluator for simple arithmetic expressions.

    Handles ``+ - * /``, unary minus, parentheses and decimal numbers.
    Variable lookup exists (``parseVariable``) but ``parseValue`` does not
    call it, so only numbers are accepted as operands. Failures are
    reported as plain ``Exception`` instances with fixed message prefixes.
    """

    def __init__(self, string, vars=None):
        """Prepare to parse ``string``.

        Args:
            string: The expression text.
            vars: Optional mapping of extra variable names to values.

        Raises:
            Exception: If ``vars`` tries to redefine an existing variable
                (``pi`` and ``e`` are predefined).
        """
        self.string = string
        self.index = 0
        self.vars = {
            'pi': 3.141592653589793,
            'e': 2.718281828459045
        }
        if vars:
            for var, value in vars.items():
                # getVarValue defaults to None for unknown names; the
                # original call omitted the default and raised TypeError.
                if self.getVarValue(var) is not None:
                    raise Exception("Cannot redefine the value of " + var)
                self.vars[var] = value

    def hasVar(self, v):
        """Return True when a variable named ``v`` is defined."""
        return v in self.vars

    def getVarValue(self, v, default=None):
        """Return the value of variable ``v``, or ``default`` if undefined."""
        return self.vars.get(v, default)

    def getValue(self):
        """Evaluate the whole input and return the result.

        Raises:
            Exception: If anything other than whitespace is left over.
        """
        value = self.parseExpression()
        self.skipWhitespace()
        if self.hasNext():
            raise Exception(
                "Unexpected character found: '" +
                self.peek() +
                "' at index " +
                str(self.index))
        return value

    def peek(self):
        """Return the current character, or ``''`` at end of input."""
        return self.string[self.index:self.index + 1]

    def hasNext(self):
        """Return True while unread input remains."""
        return self.string[self.index:] != ''

    def skipWhitespace(self):
        """Advance past spaces, tabs, newlines and carriage returns."""
        while self.hasNext():
            if self.peek() in ' \t\n\r':
                self.index += 1
            else:
                return

    def parseExpression(self):
        """Parse a full expression (currently an additive expression)."""
        return self.parseAddition()

    def parseAddition(self):
        """Parse terms joined by ``+`` / ``-`` and return their sum."""
        values = [self.parseMultiplication()]
        while True:
            self.skipWhitespace()
            char = self.peek()
            if char == '+':
                self.index += 1
                values.append(self.parseMultiplication())
            elif char == '-':
                self.index += 1
                values.append(-1 * self.parseMultiplication())
            else:
                break
        return sum(values)

    def parseMultiplication(self):
        """Parse factors joined by ``*`` / ``/`` and return their product.

        Raises:
            Exception: On division by zero.
        """
        values = [self.parseParenthesis()]
        while True:
            self.skipWhitespace()
            char = self.peek()
            if char == '*':
                self.index += 1
                values.append(self.parseParenthesis())
            elif char == '/':
                div_index = self.index
                self.index += 1
                denominator = self.parseParenthesis()
                if denominator == 0:
                    raise Exception(
                        "Division by 0 kills baby whales (occured at index " +
                        str(div_index) +
                        ")")
                # Division is folded into the product as a reciprocal.
                values.append(1.0 / denominator)
            else:
                break
        value = 1.0
        for factor in values:
            value *= factor
        return value

    def parseParenthesis(self):
        """Parse a parenthesised expression, or fall through to a signed value.

        Raises:
            Exception: ``'Only numbers'`` if something other than ``)``
                follows the inner expression; ``'No closing parenthesis
                found ...'`` if the input ends first.
        """
        self.skipWhitespace()
        char = self.peek()
        if char != '(':
            return self.parseNegative()
        self.index += 1
        value = self.parseExpression()
        self.skipWhitespace()
        c = self.peek()
        if c != ')':
            if c:
                # A stray character where ')' was expected.
                raise Exception('Only numbers')
            raise Exception(
                "No closing parenthesis found at character "
                + str(self.index))
        self.index += 1
        return value

    def parseNegative(self):
        """Parse an optional unary minus followed by an operand."""
        self.skipWhitespace()
        char = self.peek()
        if char == '-':
            self.index += 1
            return -1 * self.parseParenthesis()
        else:
            return self.parseValue()

    def parseValue(self):
        """Parse a number; anything else raises ``'Only numbers'``."""
        self.skipWhitespace()
        char = self.peek()
        # At end of input peek() is '' and '' is "in" every string, so the
        # empty case deliberately reaches parseNumber, which reports
        # "Unexpected end found".
        if char in '0123456789.':
            return self.parseNumber()
        else:
            raise Exception('Only numbers')

    def parseVariable(self):
        """Parse an identifier and return its value as a float.

        Raises:
            Exception: If the identifier is not a defined variable.
        """
        self.skipWhitespace()
        var = ''
        while self.hasNext():
            char = self.peek()
            if char.lower() in '_abcdefghijklmnopqrstuvwxyz0123456789':
                var += char
                self.index += 1
            else:
                break

        value = self.getVarValue(var, None)
        if value is None:
            raise Exception( "Unrecognized variable: '" + var + "'")
        return float(value)

    def parseNumber(self):
        """Parse a decimal number and return it as a float.

        Raises:
            Exception: On a second decimal point, on end of input, or when
                no digits are found.
            ValueError: From ``float`` when the collected text is not a
                number (for example a lone ``'.'``).
        """
        self.skipWhitespace()
        strValue = ''
        decimal_found = False
        char = None

        while self.hasNext():
            char = self.peek()
            if char == '.':
                if decimal_found:
                    raise Exception(
                        "Found an extra period in a number at character " +
                        str(self.index) +
                        ". Are you European?")
                decimal_found = True
                strValue += '.'
            elif char in '0123456789':
                strValue += char
            else:
                break
            self.index += 1

        if len(strValue) == 0:
            if char == '' or char is None:
                raise Exception("Unexpected end found")
            else:
                raise Exception(
                    "I was expecting to find a number at character " +
                    str(self.index) +
                    " but instead I found a '" +
                    str(char) +
                    "'. What's up with that?")

        return float(strValue)

In [None]:
def conforming_mathexpr(s):
    """Classify ``s`` by evaluating it with ``Parser``.

    Returns ``(Status, None, '')``. Parser messages are grouped by prefix:
    running out of input or a missing ``)`` is Incomplete; unexpected or
    non-numeric content is Incorrect; purely semantic failures (bad float
    text, variable redefinition, division by zero) still count as Complete.
    Any other exception is re-raised.
    """
    incomplete = ('Unexpected end', 'No closing parenthesis found')
    incorrect = (
        'Unrecognized variable:',
        'Unexpected character found',
        'Only numbers',
    )
    # Syntactically fine, only the evaluation failed.
    semantic = (
        'could not convert string to float:',
        'Cannot redefine the value of ',
        'Division by 0',
    )
    try:
        Parser(s).getValue()
        return Status.Complete, None, ''
    except Exception as e:
        text = str(e)
        if text.startswith(incomplete):
            return Status.Incomplete, None, ''
        if text.startswith(incorrect):
            return Status.Incorrect, None, ''
        if text.startswith(semantic):
            return Status.Complete, None, ''
        raise

We reset the alphabet first.

In [None]:
# Back to the full printable alphabet.
SET_OF_BYTES = set(string.printable)

In [None]:
# Generate an arithmetic expression accepted by the math-expression validator.
(v := generate(conforming_mathexpr))

## PyParsing

In [None]:
import pyparsing
from pyparsing import *

Pyparsing can have numerous alternative parses. So, we limit our iteration, so that our browser does not hang.

In [None]:
# Iteration budget for the pyparsing examples (read by generate() on each call).
ITERATION_LIMIT=1000

In [None]:
# Maximum accepted prefix length for the pyparsing examples.
INPUT_LIMIT=100

With pyparsing, the parser provides the required information directly. We do not have to do much work.

In [None]:
def conforming_pyparse(expr, s):
    """Classify ``s`` by parsing it with the pyparsing expression ``expr``.

    Args:
        expr: A pyparsing parser element.
        s: The candidate string.

    Returns:
        ``(Status.Complete, None, '')`` when ``expr.parseString(s)``
        succeeds; ``(Status.Incorrect, None, '')`` when the reported
        failure location lies inside ``s``; ``(Status.Incomplete, None, '')``
        when it lies at or beyond the end of ``s``.
    """
    try:
        expr.parseString(s)
    except ParseException as e:
        # A failure inside the text means a bad character; a failure at
        # the end means the parser simply wanted more input.
        if e.loc < len(s):
            return Status.Incorrect, None, ''
        return Status.Incomplete, None, ''
    return Status.Complete, None, ''

Pyparsing returns incorrect results when escape sequences are involved, so we filter them out of the alphabet.

In [None]:
# A word of letters followed by an exclamation mark; used by the probe below.
aword = Word(alphas) + "!"

In [None]:
# Print the failure location pyparsing reports for inputs containing a
# plain letter, a tab, a newline and a backslash after the first letter.
for my_string in ['ab', 'a\tb', 'a\nb', 'a\\b']:
    try:
        aword.parseString(my_string)
    except ParseException as e:
        print(e.loc, repr(e.pstr))

In [None]:
# Printable characters minus newline, tab, carriage return, vertical tab,
# form feed and backslash.
SET_OF_BYTES = set(string.printable) - set("\n\t\r\x0b\x0c\\")

### Hello World

In [None]:
# Grammar: <letters> "," <letters> "!"
greet = Word(alphas) + "," + Word(alphas) + "!"

In [None]:
def conforming_greet(s):
    """Classify ``s`` against the ``greet`` grammar via ``conforming_pyparse``."""
    return conforming_pyparse(greet, s)

In [None]:
# Run the decoder against the greeting grammar.
generate(conforming_greet)

### IP Address

With IP addresses, we limit ourselves to IPv4.

In [None]:
# Sample addresses with remarks; not referenced by the code visible in this notebook.
tests="""#
127.0.0.1                       # The "localhost" IPv4 address
127.0.0.1:80                    # The "localhost" IPv4 address, with a specified port (80)
192.168.0.1                     # private
256.0.0.0                       # invalid, octet > 255 (currently not detected)
"""

def join(args):
    """Parse action: collapse all tokens in ``args`` into one concatenated string, in place."""
    args[0]="".join(args)
    del args[1:]

def replace(val):
    """Return a parse action that replaces all tokens with the single value ``val``."""
    def lambda_replace(args):
        # Overwrite the first token and drop the rest, in place.
        args[0]=val
        del args[1:]
    return lambda_replace
 
# In-place token converters used as parse actions.
# atoi: decimal text -> int; itohex2: int -> two-digit lower-case hex text.
def atoi(args): args[0]=int(args[0])
def itohex2(args): args[0]="%02x"%args[0]

# hextoi: hex text -> int; itohex4: int -> four-digit lower-case hex text.
def hextoi(args): args[0]=int(args[0], 16)
def itohex4(args): args[0]="%04x"%args[0]
 
def assert_in_range(lwb, upb):
    """Build a parse action meant to check ``lwb <= args[0] <= upb``.

    The check is currently switched off: the returned action returns
    immediately, so the ValueError branches below are never reached.
    """
    def range_check(args):
        return # turn range checking off
        if args[0] < lwb:
            raise ValueError("value %d < %d"%(args[0], lwb))
        if args[0] > upb:
            raise ValueError("value %d > %d"%(args[0], upb))
    return range_check
 
# Separators are matched but suppressed from the parse results.
dot = Literal(".").suppress()("dot"); colon = Literal(":").suppress()("colon")
# An octet is a run of digits: converted to int, passed through the
# (disabled) range check, then rendered as two hex digits.
octet = Word(nums).setParseAction(atoi,assert_in_range(0,255),itohex2)("octet");

# A port is a run of digits converted to int (range check disabled as well).
port = Word(nums).setParseAction(atoi,assert_in_range(0,256*256-1))("port")
# Four dot-separated octets, joined into one string by the parse action.
ipv4 = (octet + (dot+octet)*3)("addr")
ipv4.setParseAction(join) #,hextoi)

ipv4_port = ipv4+colon.suppress()+port

# Accept an address with or without a port, followed by end of line.
ip_fmt = (
           (ipv4_port|ipv4)("ipv4")
         ) + LineEnd()


In [None]:
def conforming_ipaddress(s):
    """Classify ``s`` against the ``ip_fmt`` grammar via ``conforming_pyparse``."""
    return conforming_pyparse(ip_fmt, s)

In [None]:
# Run the decoder against the IPv4 grammar.
generate(conforming_ipaddress)

### SSN

In [None]:
dash = '-'

# Grammar: exactly 3 digits, dash, exactly 2 digits, dash, exactly 4 digits,
# wrapped in Combine.
ssn_parser = Combine(
  Word(nums, exact=3)
  + dash
  + Word(nums, exact=2)
  + dash
  + Word(nums, exact=4)
)

In [None]:
def conforming_ssn(s):
    """Classify ``s`` against the ``ssn_parser`` grammar via ``conforming_pyparse``."""
    return conforming_pyparse(ssn_parser, s)

In [None]:
# Run the decoder against the SSN grammar.
generate(conforming_ssn)

### BNF

In [None]:
import math
import operator

# Module-level stack that the BNF parse actions append tokens to.
exprStack = []


def push_first(toks):
    """Parse action: append the first matched token to ``exprStack``."""
    exprStack.append(toks[0])


def push_unary_minus(toks):
    """Parse action: append one "unary -" marker to ``exprStack`` per leading "-" token."""
    for tok in toks:
        if tok != "-":
            # Only the run of minus signs at the front counts.
            return
        exprStack.append("unary -")


# Cache for the grammar built by BNF(); filled on first call.
bnf = None


def BNF():
    """Build the arithmetic-expression grammar once and return it.

    The grammar is cached in the module-level ``bnf``; later calls return
    the cached object. Parse actions attached here push tokens onto
    ``exprStack``.

    expop   :: '^'
    multop  :: '*' | '/'
    addop   :: '+' | '-'
    integer :: ['+' | '-'] '0'..'9'+
    atom    :: PI | E | real | fn '(' expr ')' | '(' expr ')'
    factor  :: atom [ expop factor ]*
    term    :: factor [ multop factor ]*
    expr    :: term [ addop term ]*
    """
    global bnf
    if not bnf:
        # use CaselessKeyword for e and pi, to avoid accidentally matching
        # functions that start with 'e' or 'pi' (such as 'exp'); Keyword
        # and CaselessKeyword only match whole words
        e = CaselessKeyword("E")
        pi = CaselessKeyword("PI")
        # fnumber = Combine(Word("+-"+nums, nums) +
        #                    Optional("." + Optional(Word(nums))) +
        #                    Optional(e + Word("+-"+nums, nums)))
        # or use provided pyparsing_common.number, but convert back to str:
        # fnumber = ppc.number().addParseAction(lambda t: str(t[0]))
        # Optional sign, digits, optional fraction, optional exponent.
        fnumber = Regex(r"[+-]?\d+(?:\.\d*)?(?:[eE][+-]?\d+)?")
        ident = Word(alphas, alphanums + "_$")

        plus, minus, mult, div = map(Literal, "+-*/")
        lpar, rpar = map(Suppress, "()")
        addop = plus | minus
        multop = mult | div
        expop = Literal("^")

        # expr is recursive, so it is declared first and defined at the end.
        expr = Forward()
        expr_list = delimitedList(Group(expr))
        # add parse action that replaces the function identifier with a (name, number of args) tuple
        def insert_fn_argcount_tuple(t):
            fn = t.pop(0)
            num_args = len(t[0])
            t.insert(0, (fn, num_args))

        fn_call = (ident + lpar - Group(expr_list) + rpar).setParseAction(
            insert_fn_argcount_tuple
        )
        # An atom is any number of leading signs followed by an operand or
        # a parenthesised expression.
        atom = (
            addop[...]
            + (
                (fn_call | pi | e | fnumber | ident).setParseAction(push_first)
                | Group(lpar + expr + rpar)
            )
        ).setParseAction(push_unary_minus)

        # by defining exponentiation as "atom [ ^ factor ]..." instead of "atom [ ^ atom ]...", we get right-to-left
        # exponents, instead of left-to-right that is, 2^3^2 = 2^(3^2), not (2^3)^2.
        factor = Forward()
        factor <<= atom + (expop + factor).setParseAction(push_first)[...]
        term = factor + (multop + factor).setParseAction(push_first)[...]
        expr <<= term + (addop + term).setParseAction(push_first)[...]
        bnf = expr
    return bnf


In [None]:
def conforming_bnf(s):
    """Classify ``s`` against the grammar returned by ``BNF()`` via ``conforming_pyparse``."""
    return conforming_pyparse(BNF(), s)

In [None]:
# Run the decoder against the arithmetic-expression grammar.
generate(conforming_bnf)

### URL Parser

In [None]:
# Characters allowed inside URL components in this grammar.
url_chars = alphanums + '-_.~%+'

# "#fragment" with the "#" dropped from the result.
fragment  = Combine((Suppress('#') + Word(url_chars)))('fragment')

scheme = oneOf(['http', 'https', 'ftp', 'file'])('scheme')
host = Combine(delimitedList(Word(url_chars), '.'))('host')
# Rebinds the name `port` used earlier by the IP-address grammar;
# ipv4_port was already built from the earlier object.
port = Suppress(':') + Word(nums)('port')
# "username:password@"
user_info = (
  Word(url_chars)('username')
  + Suppress(':')
  + Word(url_chars)('password')
  + Suppress('@')
)

# "?key=value&key=value..."
query_pair = Group(Word(url_chars) + Suppress('=') + Word(url_chars))
query = Group(Suppress('?') + delimitedList(query_pair, '&'))('query')

# "/" followed by path text, stopping where a query would start.
path = Combine(
  Suppress('/')
  + OneOrMore(~query + Word(url_chars + '/'))
)('path')

# scheme "://" [user_info] host [port] [path] [query] [fragment]
url_parser = (
  scheme
  + Suppress('://')
  + Optional(user_info)
  + host
  + Optional(port)
  + Optional(path)
  + Optional(query)
  + Optional(fragment)
)

In [None]:
def conforming_urls(s):
    """Classify ``s`` against the ``url_parser`` grammar via ``conforming_pyparse``."""
    return conforming_pyparse(url_parser, s)

Pyparsing is bad at correctly accounting for spaces.

In [None]:
# Probe: print the failure location and message, if any, for a URL with a
# space before "://".
try:
    url_parser.parseString('http ://')
except ParseException as e:
    print(e.loc)
    print(str(e))

In [None]:
# Printable characters minus newline, tab, carriage return, vertical tab,
# form feed, backslash and space.
SET_OF_BYTES = set(string.printable) - set("\n\t\r\x0b\x0c\\ ")

In [None]:
# Generate a URL; scheme names and "://" are offered as single units.
(v := generate(conforming_urls, tokens={'http', 'https', 'ftp', 'file', '://'}))

In [None]:
# Continue generating, starting from the previous result as the prefix.
(v1 := generate(conforming_urls, prev_bytes=v, tokens={'http', 'https', 'ftp', 'file', '://'}))

In [None]:
# Continue once more, starting from v1 as the prefix.
(v2 := generate(conforming_urls,  prev_bytes=v1, tokens={'http', 'https', 'ftp', 'file', '://'}))

## End