In [1]:
from typing import List, Union, Optional, Type
from dataclasses import dataclass
from pydantic import BaseModel, Field
import os
import re

os.environ["PYTHONPATH"] = "/home/aksha/Workbench/Research/Labs/e-lab/parser/constrain"

In [2]:
from constrain.grammars.gnbf import GNBF

In [3]:
class ThoughtState(BaseModel):
    thought: str
    goal: str
    tool: str = Field(
        ...,
        description="Choose one of ['Web_QA', 'Web_Search', 'Web_Scraping', 'Web_Automation', 'Web_Research']",
    )
    action: str = Field(
        ..., description="Choose one of ['Create', 'Update', 'Delete', 'Read']"
    )
    action_input: str = Field(..., description="The input data for the action")
    thought_id: list = Field(..., description="The unique identifier for the thought")


print(ThoughtState.schema())
print("-------------------")
converter = GNBF(ThoughtState.schema())
grammar = converter.generate_grammar()
converter.verify_grammar(grammar)

{'title': 'ThoughtState', 'type': 'object', 'properties': {'thought': {'title': 'Thought', 'type': 'string'}, 'goal': {'title': 'Goal', 'type': 'string'}, 'tool': {'title': 'Tool', 'description': "Choose one of ['Web_QA', 'Web_Search', 'Web_Scraping', 'Web_Automation', 'Web_Research']", 'type': 'string'}, 'action': {'title': 'Action', 'description': "Choose one of ['Create', 'Update', 'Delete', 'Read']", 'type': 'string'}, 'action_input': {'title': 'Action Input', 'description': 'The input data for the action', 'type': 'string'}, 'thought_id': {'title': 'Thought Id', 'description': 'The unique identifier for the thought', 'type': 'array', 'items': {}}}, 'required': ['thought', 'goal', 'tool', 'action', 'action_input', 'thought_id']}
-------------------
['string', 'number', 'bool', 'none']


from_string grammar:
root ::= thoughtstate ws 
thoughtstate ::= [{] ws ["] [t] [h] [o] [u] [g] [h] [t] ["] [:] ws string [,] ws ["] [g] [o] [a] [l] ["] [:] ws string [,] ws ["] [t] [o] [o] [l] ["] [:] ws string [,] ws ["] [a] [c] [t] [i] [o] [n] ["] [:] ws string [,] ws ["] [a] [c] [t] [i] [o] [n] [-] [i] [n] [p] [u] [t] ["] [:] ws string [,] ws ["] [t] [h] [o] [u] [g] [h] [t] [-] [i] [d] ["] [:] ws array [}] ws 
ws ::= ws_5 
string ::= ["] string_6 ["] ws 
array ::= [[] ws array_27 []] ws 
ws_5 ::= [ <U+0009><U+000A>] ws_5 | 
string_6 ::= string_7 
string_7 ::= [^"] string_7 | 
number ::= number_9 number_15 number_19 ws 
number_9 ::= number_10 number_11 
number_10 ::= [-] | 
number_11 ::= [0-9] | [1-9] number_12 
number_12 ::= [0-9] number_12 | 
number_13 ::= [.] number_14 
number_14 ::= [0-9] number_14 | [0-9] 
number_15 ::= number_13 | 
number_16 ::= [ee] number_17 number_18 
number_17 ::= [-+] | 
number_18 ::= [0-9] number_18 | [0-9] 
number_19 ::= number_16 | 
bool ::= bool_21 ws 

<llama_cpp.llama_grammar.LlamaGrammar at 0x7fab1946a3a0>

In [4]:
print(grammar)

root ::= thoughtstate ws
thoughtstate ::= "{" ws "\"thought\":" ws string "," ws "\"goal\":" ws string "," ws "\"tool\":" ws string "," ws "\"action\":" ws string "," ws "\"action-input\":" ws string "," ws "\"thought-id\":" ws array "}" ws
ws ::= [ \t\n]*
string ::= "\""   ([^"]*)   "\"" ws
number ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? ([ee] [-+]? [0-9]+)? ws
bool ::= ("true" | "false") ws
none ::= "none" ws
array ::= "[" ws (
                thought-id-value
                ("," ws thought-id-value)*
            )? "]" ws
thought-id-value ::= string | number | bool | none


In [133]:
import re


class GrammarParser:
    def __init__(self, grammar):
        self.grammar = grammar
        self.rules = {}
        self.parse_grammar()

    def parse_grammar(self):
        # Split the grammar into lines and parse each rule
        lines = self.grammar.strip().split("\n")
        for line in lines:
            non_terminal, expression = line.split("::=")
            non_terminal = non_terminal.strip()
            expression = expression.strip()
            self.rules[non_terminal] = self.parse_grammar_rule(expression)

    def parse_grammar_rule(self, expression):
        elements = []

        # Handle grouping and repetition symbols
        expression = self.process_repetitions_and_grouping(expression)

        # Split the expression into tokens while handling escape sequences correctly
        tokens = re.split(r"(?<!\\)\s+", expression)

        for token in tokens:
            if token.startswith('"') and token.endswith('"'):
                # Remove escape characters for quotes
                token = token[1:-1].replace('\\"', '"')
            elements.append(token)

        return elements

    def process_repetitions_and_grouping(self, expression):
        # This is a placeholder for processing repetition symbols and grouping
        # You would expand this method to handle *, +, ?, and () appropriately
        # For simplicity, this example does not implement the full logic
        print(expression)
        return expression.replace("*", " *").replace("+", " +").replace("?", " ?")


# Example usage
grammar = r"""
root ::= thoughtstate ws
thoughtstate ::= "{" ws "\"thought\":" ws string "," ws "\"goal\":" ws string "," ws "\"tool\":" ws string "," ws "\"action\":" ws string "," ws "\"action-input\":" ws string "," ws "\"thought-id\":" ws string "}" ws
ws ::= [ \t\n]*
string ::= "\""   ([^"]*)   "\"" ws
"""

parser = GrammarParser(grammar)
print(parser.rules)

thoughtstate ws
"{" ws "\"thought\":" ws string "," ws "\"goal\":" ws string "," ws "\"tool\":" ws string "," ws "\"action\":" ws string "," ws "\"action-input\":" ws string "," ws "\"thought-id\":" ws string "}" ws
[ \t\n]*
"\""   ([^"]*)   "\"" ws
{'root': ['thoughtstate', 'ws'], 'thoughtstate': ['{', 'ws', '"thought":', 'ws', 'string', ',', 'ws', '"goal":', 'ws', 'string', ',', 'ws', '"tool":', 'ws', 'string', ',', 'ws', '"action":', 'ws', 'string', ',', 'ws', '"action-input":', 'ws', 'string', ',', 'ws', '"thought-id":', 'ws', 'string', '}', 'ws'], 'ws': ['[', '\\t\\n]', '*'], 'string': ['"', '([^"]', '*)', '"', 'ws']}


In [134]:
class Token:
    def __init__(
        self,
        value,
        token_type,
        repetition=None,
        enclosing_char=None,
        assignment_char=None,
    ):
        self.value = value
        self.token_type = token_type  # 'terminal', 'non-terminal', 'literal'
        self.repetition = repetition  # '*', '+', '?'
        self.enclosing_char = enclosing_char  # '"', etc.
        self.assignment_char = assignment_char  # ':', '='

    def __repr__(self):
        return f"Token({self.value}, {self.token_type}, {self.repetition}, {self.enclosing_char}, {self.assignment_char})"


class GrammarParser:
    def __init__(self, grammar):
        self.grammar = grammar
        self.rules = {}
        self.parse_grammar()

    def parse_grammar_rule(self, expression):
        elements = []

        # Updated to handle tokens
        tokens = re.split(r"(?<!\\)\s+", expression)
        carry = None
        for token in tokens:
            if carry:
                token = carry + " " + token
                carry = None
            parsed_token = self.parse_token(token)
            if type(parsed_token) == str:
                carry = parsed_token
            if type(parsed_token) == list:
                elements.extend(parsed_token)
            if parsed_token and not carry:  # Ignore None returned for grouping symbols
                elements.append(parsed_token)

        return elements

    def parse_grammar(self):
        lines = self.grammar.strip().split("\n")
        for line in lines:
            non_terminal, expression = line.split("::=")
            non_terminal = non_terminal.strip()
            self.rules[non_terminal] = {}

        for line in lines:
            non_terminal, expression = line.split("::=")
            non_terminal = non_terminal.strip()
            expression = expression.strip()

            self.rules[non_terminal] = self.parse_grammar_rule(expression)

    def parse_token(self, element):
        if len(element) == 1:
            return element

        repetition = None
        if element.endswith(("*", "+", "?")):
            repetition = element[-1]
            element = element[:-1]

        # Check for literal enclosed in quotes
        if element.startswith('"') and element.endswith('"'):
            assignment_char = None
            if element[:-1].endswith(":"):
                assignment_char = ":"
            elif element[:-1].endswith("="):
                assignment_char = "="

            element = element[1:-1].replace(r"\"", '"')
            if assignment_char:
                element = element.replace(f"{assignment_char}", "")

            if ("(" in element and ")" in element) or (
                element.endswith(("*", "+", "?"))
            ):
                return self.parse_token(element.replace(")", "").replace("(", ""))

            if element in "{}./.,><!@#$%&":
                return Token(element, "terminal", repetition=repetition)

            if len(element) > 1:
                element = element.replace('"', "")
            else:
                element
            return Token(
                element,
                "literal",
                repetition=repetition,
                enclosing_char='"',
                assignment_char=assignment_char,
            )

        if element in self.rules.keys():
            return Token(element, "non-terminal", repetition=repetition)

        if element.startswith("[") and element.endswith("]"):
            return Token(element, "literal", repetition=repetition)
        elif element in ("(", ")"):
            print("ye", element)

        return Token(element, "terminal", repetition=repetition)


grammar = r"""
root ::= thoughtstate ws
thoughtstate ::= "{" ws "\"thought\":" ws string "," ws "\"goal\":" ws string "," ws "\"tool\":" ws string "," ws "\"action\":" ws string "," ws "\"action-input\":" ws string "," ws "\"thought-id\":" ws string "}" ws
ws ::= [ \t\n]+
string ::= "\"" ( [^"\\] | "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) )* "\""
"""

parser = GrammarParser(grammar)
for key, value in parser.rules.items():
    print(f"{key}: {value}")

ye )
root: [Token(thoughtstate, non-terminal, None, None, None), Token(ws, non-terminal, None, None, None)]
thoughtstate: [Token({, terminal, None, None, None), Token(ws, non-terminal, None, None, None), Token(thought, literal, None, ", :), Token(ws, non-terminal, None, None, None), Token(string, non-terminal, None, None, None), Token(,, terminal, None, None, None), Token(ws, non-terminal, None, None, None), Token(goal, literal, None, ", :), Token(ws, non-terminal, None, None, None), Token(string, non-terminal, None, None, None), Token(,, terminal, None, None, None), Token(ws, non-terminal, None, None, None), Token(tool, literal, None, ", :), Token(ws, non-terminal, None, None, None), Token(string, non-terminal, None, None, None), Token(,, terminal, None, None, None), Token(ws, non-terminal, None, None, None), Token(action, literal, None, ", :), Token(ws, non-terminal, None, None, None), Token(string, non-terminal, None, None, None), Token(,, terminal, None, None, None), Token(ws, non-

In [75]:
class GrammarStack:
    def __init__(self, grammar_rules):
        self.grammar_rules = grammar_rules
        self.stack = []
        self.initialize_stack("root")

    def initialize_stack(self, rule_name):
        """Recursively traverse grammar rules to initialize the stack."""
        rule_tokens = self.grammar_rules[rule_name]
        token = rule_tokens[0]
        idx = 0
        if token.token_type == "non-terminal":
            self.stack.append((rule_name, idx))
            self.initialize_stack(token.value)
        else:
            self.stack.append((rule_name, idx))

    def push_next_token_or_rule(self, rule_name, idx):
        """Pushes the next token or rule based on the current index."""
        print("pushing", rule_name, idx)
        if idx < len(self.grammar_rules[rule_name]) - 1:
            self.stack.pop()  # Pop the completed rule
            self.stack.append((rule_name, idx + 1))
            print("pushed", rule_name, idx + 1)

            if self.grammar_rules[rule_name][idx + 1].token_type == "non-terminal":
                self.push_next_token_or_rule(
                    self.grammar_rules[rule_name][idx + 1].value, 0
                )
        elif rule_name not in [x[0] for x in self.stack]:
            print("pushed", rule_name, idx)
            self.stack.append((rule_name, idx))
        else:
            # If it was the last token, pop the current rule and update the parent rule
            self.stack.pop()  # Pop the completed rule
            if self.stack:
                self.push_next_token_or_rule(*self.stack[-1])  # Update parent rule

    def update_stack_with_valid_substring(self, valid_substring):
        if not self.stack:
            return

        current_rule, idx = self.stack[-1]
        current_tokens = self.grammar_rules[current_rule]
        current_token = current_tokens[idx]

        # Check if the valid_substring matches the current token's value
        if valid_substring == current_token.value:
            # If match found, move to the next token in the current rule
            self.push_next_token_or_rule(current_rule, idx)
        else:
            # Handle the case where a closing token might complete a non-terminal
            # And needs to return to the parent rule
            for i in range(len(self.stack) - 1, -1, -1):
                rule_name, token_idx = self.stack[i]
                tokens = self.grammar_rules[rule_name]

                if (
                    token_idx < len(tokens)
                    and tokens[token_idx].value == valid_substring
                ):
                    self.push_next_token_or_rule(rule_name, token_idx)
                    break

        print("stack shit", self.stack)

    def next_expected_token(self):
        if self.stack:
            current_rule, idx = self.stack[-1]
            return self.grammar_rules[current_rule][idx]
        return None

In [76]:
stack = GrammarStack(parser.rules)

In [77]:
stack.stack

[('root', 0), ('thoughtstate', 0)]

In [185]:
class GrammarParser:
    def __init__(self, grammar_stack, input_string):
        """
        Initializes the parser with a grammar stack and the input string.

        Args:
            grammar_stack (GrammarStack): An instance of GrammarStack managing the parsing context.
            input_string (str): The string to be parsed.
        """
        self.grammar_stack = grammar_stack
        self.input_string = input_string

    def convert_grammar_to_regex(self, token):
        """
        Converts a grammar token's value to a regex pattern, adding support for 
        enclosing characters and assignment characters.

        Args:
            token (Token): The grammar token to be converted to a regex pattern.

        Returns:
            str: A regex pattern derived from the token's characteristics.
        """

        # Start with the token's value, assuming it might be a character class or literal
        regex_pattern = token.value

        # If the token represents a pattern (e.g., whitespace, character class)

        # If an enclosing character is specified, ensure it is present around the token
        if token.enclosing_char:
            # Escape the enclosing character if it's a special regex character
            escaped_enclosing_char = token.enclosing_char
            regex_pattern = f"{escaped_enclosing_char}{regex_pattern.replace(
                escaped_enclosing_char, '')}{escaped_enclosing_char}"
        if token.token_type == 'literal':
            if token.repetition:
                regex_pattern += token.repetition
        # If an assignment character is specified, ensure it is present (with an optional space after)
        if token.assignment_char:
            # Escape the assignment character if it's a special regex character
            escaped_assignment_char = re.escape(token.assignment_char)
            regex_pattern += f"{escaped_assignment_char}\\s?"

        return regex_pattern

    def parse_input(self):
        """
        Iterates through the input string, matches tokens against the grammar stack,
        and updates the stack based on the parsing progression. Handles mismatches
        and enforces grammar rules as needed.
        """
        current_idx = 0
        expected_token = self.grammar_stack.next_expected_token()
        while current_idx < len(self.input_string):
            print('expected_token ', expected_token)

            self.grammar_stack.push_next_token_or_rule(
                *self.grammar_stack.stack[-1])
            next_expected_token = self.grammar_stack.next_expected_token()
            print('next_expected_token ', next_expected_token)

            if not next_expected_token:
                end_idx = len(self.input_string)
            else:
                # From the current index, keep incrementing the end index to find a matching token with next_expected_token using regex
                end_idx = current_idx + 1
                print(current_idx, end_idx)
                while end_idx <= len(self.input_string):
                    print('endidx', end_idx)
                    actual_substring = self.input_string[current_idx:end_idx]
                    print(f'whle checking |{actual_substring}|')
                    match, match_end_obj = self.match_token(
                        next_expected_token, actual_substring)
                    if match:
                        if type(match_end_obj) == re.Match:
                            end_idx = current_idx + match_end_obj.start()
                        else:
                            end_idx = current_idx + match_end_obj
                        break
                    else:
                        end_idx += 1
                print(current_idx, end_idx)

            # Determine the end index for the current token match attempt
            actual_substring = self.input_string[current_idx:end_idx]
            print(f'actual_substring |{actual_substring}|')
            match, end_obj = self.match_token(expected_token, actual_substring)

            if match:
                print('matched ', self.input_string[current_idx:end_idx])
                current_idx += end_obj.end() if type(end_obj) == re.Match else end_obj
                expected_token = self.grammar_stack.next_expected_token()
                # self.grammar_stack.push_next_token_or_rule(*self.grammar_stack.stack[-1])
            else:
                # Handle mismatch, including potential enforcement of missing tokens
                self.handle_mismatch(expected_token, actual_substring)
                break

            current_idx = end_idx

            expected_token = next_expected_token
            print('DONE WITH ', actual_substring)
            print('-----------------------------')
            print('stack ', self.grammar_stack.stack)

    def match_token(self, expected_token, actual_substring) -> (bool, int):
        """
        Checks if the actual substring matches the expected token and returns
        the index up to which it matches.

        Args:
            expected_token (Token): The token expected based on the current grammar context.
            actual_substring (str): The substring from the input string being examined.

        Returns:
            (bool, int): A tuple where the first element is True if the substring matches the expected token, 
                        False otherwise, and the second element is the index of the string up to which 
                        it can match or the index of mismatch.
        """

        if expected_token.token_type == 'terminal':
            match = expected_token.value == actual_substring[-1]
            idx = len(actual_substring) - 2 if match else 0
            if idx < 0:
                idx = 0
            print('matching terminal')
            if match:
                print('matched')
            return (match, idx)

        elif expected_token.token_type == 'literal':
            # Use regex for literals, particularly for pattern tokens
            pattern = self.convert_grammar_to_regex(expected_token)
            if '\' ' in pattern:
                pattern = re.escape(pattern)
            print('pattern', repr(pattern))
            print(f'matching |{actual_substring}|')
            match_obj = re.search(pattern, actual_substring)

            if match_obj:
                print('worked')
                # If there's a match, return True and the end index of the match
                return (True, match_obj)
            else:
                # Find the first character that doesn't match the pattern
                for idx, char in enumerate(actual_substring):
                    if not re.match(pattern, char):
                        return (False, idx)
                return (False, len(actual_substring))

    def handle_mismatch(self, expected_token, actual_substring):
        print('handling mismatch', expected_token, '|', actual_substring)

In [198]:
input_string = '{ "thought": "AI parsing example", }'

# Define grammar rules based on the provided structure
grammar_rules = {
    "root": [Token("thoughtstate", "non-terminal")],
    "thoughtstate": [
        Token("{", "terminal"),
        Token("ws", "non-terminal"),
        Token('"thought"', "literal", None, '"', ":"),
        Token("ws", "non-terminal"),
        Token("string", "non-terminal"),
        Token(",", "terminal"),
        Token("}", "terminal"),
    ],
    "ws": [Token("[ \t\n]", "literal", "+", None, None)],
    "string": [Token(r"\"[0-9a-fA-F]+\"", "literal", "+", None, None)],
}

# Initialize the GrammarStack with the defined grammar rules
grammar_stack = GrammarStack(grammar_rules)

# Initialize the GrammarParser with the grammar stack and the input string
parse = GrammarParser(grammar_stack, input_string)

# Run the parser
parse.parse_input()

expected_token  Token({, terminal, None, None, None)
pushing thoughtstate 0
pushed thoughtstate 1
pushing ws 0
pushed ws 0
next_expected_token  Token([ 	
], literal, +, None, None)
0 1
endidx 1
whle checking |{|
pattern '[ \t\n]+'
matching |{|
endidx 2
whle checking |{ |
pattern '[ \t\n]+'
matching |{ |
worked
0 1
actual_substring |{|
matching terminal
matched
matched  {
DONE WITH  {
-----------------------------
stack  [('root', 0), ('thoughtstate', 1), ('ws', 0)]
expected_token  Token([ 	
], literal, +, None, None)
pushing ws 0
pushing thoughtstate 1
pushed thoughtstate 2
next_expected_token  Token("thought", literal, None, ", :)
1 2
endidx 2
whle checking | |
pattern '"thought":\\s?'
matching | |
endidx 3
whle checking | "|
pattern '"thought":\\s?'
matching | "|
endidx 4
whle checking | "t|
pattern '"thought":\\s?'
matching | "t|
endidx 5
whle checking | "th|
pattern '"thought":\\s?'
matching | "th|
endidx 6
whle checking | "tho|
pattern '"thought":\\s?'
matching | "tho|
endidx 7
wh

In [196]:
re.search(' "thought"', '"thought":')

In [14]:
def calculateMismatchProbability(expected_token, input_string, current_idx, grammar_rules):
    """
    Calculate the mismatch probability between the expected grammar structure and the input string.

    Args:
    expected_token (Token): The current token being considered.
    input_string (str): The complete input string being parsed.
    current_idx (int): The current index in the input string under consideration.
    grammar_rules (dict): The grammar rules defining valid structures.

    Returns:
    float: A probability score indicating the likelihood of a mismatch.
    """

    # Initialize mismatch probability as a float between 0 and 1.
    mismatch_probability = 0.0

    # Check for the next expected structural token based on the current token type
    if expected_token.token_type == 'terminal':
        # For terminals, directly check for their presence at the current index
        if input_string[current_idx:current_idx+len(expected_token.value)] != expected_token.value:
            mismatch_probability += 0.5  # Increase mismatch probability for missing terminal

    elif expected_token.token_type == 'non-terminal':
        # For non-terminals, look ahead to see if the next expected structure matches
        # This requires parsing the next part of the input based on the non-terminal's definition

        # next_structure = grammar_rules[expected_token.value][0]  # Simplification: consider the first token of the rule
        lookahead_idx, found_mismatch = lookahead_and_evaluate_mismatch(
            input_string, current_idx, grammar_rules)
        if found_mismatch:
            # Increase mismatch probability for missing non-terminal structure
            mismatch_probability += 0.5

    # Adjust probability based on specifics, like missing enclosing or assignment chars
    if expected_token.enclosing_char and (input_string[current_idx] != expected_token.enclosing_char):
        mismatch_probability += 0.25  # Adjust for missing enclosing char

    if expected_token.assignment_char:
        # Look for the assignment character near the current position
        if input_string[current_idx:current_idx+2].find(expected_token.assignment_char) == -1:
            mismatch_probability += 0.25  # Adjust for missing assignment char

    # Cap the probability at 1
    mismatch_probability = min(mismatch_probability, 1.0)

    return mismatch_probability


def lookahead_and_evaluate_mismatch(input_string, current_idx, grammar_stack):
    expected_token = grammar_stack.next_expected_token()
    lookahead_idx = current_idx
    found_mismatch = False

    while lookahead_idx < len(input_string) and expected_token:
        lookahead_segment = input_string[current_idx:lookahead_idx+1]
        if expected_token.value in lookahead_segment:
            # Found the expected token value in the lookahead segment
            return lookahead_idx, False  # Return the index and mismatch status

        lookahead_idx += 1

    # If the loop completes without finding the expected token,
    # it indicates a potential mismatch.
    found_mismatch = True

    # Evaluate mismatch probability (simplified for demonstration)
    # In practice, this would involve more complex logic based on grammar rules
    # and potentially correcting the mismatch by inserting or skipping tokens.
    if found_mismatch:
        # Placeholder for mismatch handling logic
        print(f"Mismatch found at index {
              lookahead_idx}. Expected token: {expected_token.value}")

    return lookahead_idx, found_mismatch

In [15]:
grammar_rules = {
    "root": [Token("thoughtstate", "non-terminal")],
    "thoughtstate": [
        Token("{", "terminal"),
        Token("ws", "non-terminal"),
        Token("string", "non-terminal"),
        Token(":", "terminal"),
        Token("string", "non-terminal"),
        Token("}", "terminal"),
    ],
    "ws": [Token("[ \t\n]", "literal", "*")],
    # Simplified regex-like representation for demonstration
    "string": [Token('"[^"]*"', "literal")],
}

# Sample input string
input_string = '{"key": "value"}'

# Assume we're starting at the beginning of the string, looking for the thoughtstate structure
current_idx = 1

# The expected token at the beginning of a thoughtstate (assuming we're parsing a thoughtstate structure)
# This should be the "{" token
expected_token = grammar_rules["thoughtstate"][1]
print(expected_token)

Token(ws, non-terminal, None, None, None)


In [16]:
# Call the function with the sample fields
mismatch_probability = calculateMismatchProbability(
    expected_token, input_string, current_idx, stack
)

# Print the mismatch probability
print(f"Mismatch Probability: {mismatch_probability}")

Mismatch found at index 16. Expected token: {
Mismatch Probability: 0.5


In [17]:
from transformers import AutoTokenizer
import transformers
import torch

model = "mistralai/Mixtral-8x7B-Instruct-v0.1"

tokenizer = AutoTokenizer.from_pretrained(model)

In [18]:
text = """ 
```
{
"ThoughtState": {
"thought": "Vladimir Putin is the current President of Russia.",
"goal": "To provide information about Vladimir Putin.",
"tool": "Web_Search",
"action": "Read",
"action_input": "Vladimir Putin biography",
"thought_id": "12345"
    }
}
```
"""

In [19]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch


tokenizer = AutoTokenizer.from_pretrained(
    "microsoft/DialoGPT-medium", padding_side="left"
)
model = AutoModelForCausalLM.from_pretrained("microsoft/DialoGPT-medium")

# source: https://huggingface.co/microsoft/DialoGPT-medium

# encode the new user input, add the eos_token and return a tensor in Pytorch
new_user_input_ids = tokenizer.encode(
    "Can you generate a simple JSON object?" + tokenizer.eos_token, return_tensors="pt"
)

# append the new user input tokens to the chat history
bot_input_ids = new_user_input_ids

# generated a response while limiting the total chat history to 1000 tokens,
chat_history_ids = model.generate(
    bot_input_ids, max_length=1000, pad_token_id=tokenizer.eos_token_id
)

# pretty print last output tokens from bot
print(
    "DialoGPT: {}".format(
        tokenizer.decode(
            chat_history_ids[:, bot_input_ids.shape[-1] :][0], skip_special_tokens=True
        )
    )
)

  return self.fget.__get__(instance, owner)()
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.


KeyboardInterrupt: 

In [None]:
terminal_tokens_ids = [
    tokenizer.encode(token)[0] for token in ["{", "}", '"', ":", ","]
]

In [None]:
content_boost_factor = 1.1  # Slightly boost content token probabilities
structure_boost_factor = 5  # Significantly boost terminal token probabilities

In [None]:
%pip install accelerate

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer

model = AutoModelForCausalLM.from_pretrained(
    "mistralai/Mixtral-8x7B-v0.1", device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained("mistralai/Mixtral-8x7B-v0.1")

prompt = "My favourite condiment is"

model_inputs = tokenizer([prompt], return_tensors="pt").to("cuda")
model.to(device)

generated_ids = model.generate(**model_inputs, max_new_tokens=100, do_sample=True)
tokenizer.batch_decode(generated_ids)[0]

Downloading shards:   0%|          | 0/19 [00:00<?, ?it/s]

model-00001-of-00019.safetensors:   0%|          | 0.00/4.89G [00:00<?, ?B/s]

In [None]:
json_string = r'{"key": "value", "array": [1, 2, 3], "bro": {"ayo": "huh"}, "nested": {"another_key": "another_value"}} ohhhh'
JSON.parse(json_string)

{'key': 'value',
 'array': [1, 2, 3],
 'bro': {'ayo': 'huh'},
 'nested': {'another_key': 'another_value'}}

In [None]:
toml_string = """
[products]
name = "Hammer"
sku = 738594937

[products]

[products]
name = "Nail"
sku = 284758393
color = "gray"

[products.sub]
hey = "yo"
"""
print(TOML.parse(toml_string))

{'products': [{'name': 'Hammer', 'sku': 738594937}, {}, {'name': 'Nail', 'sku': 284758393, 'color': 'gray'}], 'products.sub': [{'hey': 'yo'}]}


In [None]:
text = """ 
<studentsList>
    <student id="1">
        <firstName>Greg</firstName>
        <lastName>Dean</lastName>
        <certificate>True</certificate>
        <scores>
            <module1>70</module1>
            <module12>80</module12>
            <module3>90</module3>
        </scores>
    </student>
    <student ind="2">
        <firstName>Wirt</firstName>
        <lastName>Wood</lastName>
        <certificate>True</certificate>
    </student>
</studentsList>
"""

text = """ 
<studentsList>
    <student ind="2">
        <firstName>Wirt</firstName>
        <lastName>Wood</lastName>
        <certificate>True</certificate>
    </student>
</studentsList>
"""

print(json.dumps(parse_xml(text), indent=4))

{
    "studentsList": {
        "student": {
            "attributes": {
                "ind": "2"
            },
            "firstName": {
                "value": "Wirt"
            },
            "lastName": {
                "value": "Wood"
            },
            "certificate": {
                "value": "True"
            }
        }
    }
}


In [None]:
class Complex(BaseModel):
    real: float
    imaginary: float


print(XML.make_format([{"model": Complex}], "single")[0])
print(TOML.make_format([{"model": Complex}], "single")[0])
print(JSON.make_format([{"model": Complex}], "single")[0])

Complex:
```
<Complex>
<real> #float# </real>
<imaginary> #float# </imaginary>
</Complex>
```


Complex:
```
[Complex]
real = # Type: float
imaginary = # Type: float
```



```
{
"Complex": {
real = # Type: float
imaginary = # Type: float
    }
}
```




In [None]:
class TOML:
    @staticmethod
    def make_format(grammars: List[dict], return_sequence: str) -> str:
        grammar, instruct = "", []
        for task in grammars:
            model = task.get("model")
            command = task.get("task_name", "")
            if isinstance(model, list):
                name = "_".join([m.__name__ for m in model])
            else:
                name = model.__name__
                model = [model]
            instruct.append(name)

            fields = ModelParser.extract_fields_with_descriptions(model)
            forma = TOML.generate_prompt_from_fields(fields, nested=True)
            grammar += f"{name}:\n```\n{forma}\n```\n\n"

        return grammar, instruct

    @staticmethod
    def generate_prompt_from_fields(fields_info: dict, nested: bool = False) -> str:
        prompt_lines = []
        for model_name, fields in fields_info.items():
            if nested:
                prompt_lines.append(f"[{model_name}]")
            for var_name, details in fields.items():
                line = f"{var_name} = "
                if details.get("description"):
                    line += f'"{details["description"]}"'
                line += f'# Type: {details["type"]}'
                if str(details.get("default")) not in ["PydanticUndefined", "None"]:
                    line += f', Default: "{details["default"]}"'
                prompt_lines.append(line)
        return "\n".join(prompt_lines)

    @staticmethod
    def _generate_single_model_prompt(
        fields: dict, model_name: str, nested: bool = False
    ) -> dict:
        model_data = {}
        for var_name, details in fields.items():
            model_data[var_name] = {
                "description": details["description"],
                "type": details["type"],
                "default": (
                    details["default"]
                    if str(details.get("default")) != "PydanticUndefined"
                    else None
                ),
            }
        return model_data

    @staticmethod
    def parse_toml(toml_string):
        def parse_section(toml_string, i):
            start = i
            while toml_string[i] != "]":
                i += 1
            key = toml_string[start:i]
            i = skip_whitespace(toml_string, i + 1)
            section = {key: {}}
            while i < len(toml_string) and toml_string[i] not in "[":
                subkey, i = parse_key(toml_string, i)
                i = skip_whitespace(toml_string, i)
                if toml_string[i] == "=":
                    i = skip_whitespace(toml_string, i + 1)
                    value, i = parse_value(toml_string, i)
                    section[key][subkey.replace("\n", "")] = value
                i = skip_whitespace(toml_string, i)
            return section, i

        def parse_key(toml_string, i):
            start = i
            while toml_string[i] not in "=":
                i += 1
            return toml_string[start:i], i

        def parse_value(toml_string, i):
            if toml_string[i] == '"':
                print("starting string")
                return parse_string(toml_string, i + 1)
            elif toml_string[i] == "[":
                print("starting array")
                return parse_array(toml_string, i)
            else:
                return parse_number(toml_string, i)

        def parse_string(toml_string, i):
            start = i
            while toml_string[i] != '"':
                print("char", toml_string[i])
                i += 1
            print("string", toml_string[start:i])
            return toml_string[start:i], i

        def parse_number(toml_string, i):
            start = i
            while toml_string[i] in "0123456789.-":
                i += 1

            val = toml_string[start:i]
            try:
                return int(val), i
            except ValueError:
                return "", i

        def parse_array(toml_string, i):
            array = []
            i = skip_whitespace(toml_string, i + 1)
            while toml_string[i] != "]":
                print("array", toml_string[:i])
                value, i = parse_value(toml_string, i)
                array.append(value)
                i = skip_whitespace(toml_string, i + 1)
            array = [x for x in array if x]
            return array, i + 1

        def skip_whitespace(toml_string, i):
            while i < len(toml_string) and toml_string[i] in " \t\n\r":
                print("skipped IN CONSIDERED", toml_string[i])
                i += 1
            print("skipped", toml_string[:i])
            return i

        i = 0
        storage = {}
        while i < len(toml_string):
            i = skip_whitespace(toml_string, i)
            if i < len(toml_string) and toml_string[i] == "[":
                section, i = parse_section(toml_string, i + 1)
                key = list(section.keys())[0]
                if key in storage:
                    storage[key].append(section[key])
                else:
                    storage[key] = [section[key]]
            else:
                break
        return storage

    @staticmethod
    def parse(text):
        return TOML.parse_toml(text)

In [None]:
txt = """ 
Man
[BudgetPlan]
total_cost = 500
all_items = ["cake", "balloons", "roses", "ice cream"]
[EventSchedule]
start_time = "12:00 PM"
end_time = "4:00 PM"
activities = ["cake cutting", "balloon decoration", "rose gifting", "ice cream party"]
"""

print(TOML.parse(txt))

skipped IN CONSIDERED  
skipped IN CONSIDERED 

skipped  

{}


In [None]:
from transformers import AutoTokenizer


def tokenize_and_count_hf(text, model_name="bert-base-uncased"):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokens = tokenizer.tokenize(text)
    return len(tokens)


# Example usage:
# tokens, count = tokenize_and_count_hf(, 'Hello, world!')
# print(f'Tokens: {tokens}, Count: {count}')

In [None]:
tokens = """ 
```
{
"ThoughtState": {
"thought": "Vladimir Putin is the current President of Russia.",
"goal": "To provide information about Vladimir Putin.",
"tool": "Web_Search",
"action": "Read",
"action_input": "Vladimir Putin biography",
"thought_id": "12345"
    }
}
```
"""

expected_tokens = """ 
{
"ThoughtState": {
"thought": "",
"goal": "",
"tool": "",
"action": "",
"action_input": "",
"thought_id": ""
    }
}
"""

In [None]:
class Decoder: 
    def __init__(self, expected): 
        self.encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
        self.expected_tokens  = self.tokenize(expected)
    
    def tokenize(self, text):
        return self.encoding.encode(text)
        
    def get_next_states(current_state, tokens, expected_tokens):
    # Initialize probabilities for each token in the input string
    token_probabilities = [0] * len(tokens)  # Placeholder for actual probability calculation

    # Determine the type of the next expected token (terminal or non-terminal)
    next_expected_token_type = get_token_type(expected_tokens, current_state)

    # Calculate probabilities for each token in the input string
    for i, token in enumerate(tokens):
        token_type = 'terminal' if token in expected_tokens else 'non-terminal'
        if token_type == next_expected_token_type:
            # Assign higher probability if the token matches the expected type
            token_probabilities[i] += calculate_probability(token, i, tokens, expected_tokens)

    # Choose the most probable next token
    next_token_index = token_probabilities.index(max(token_probabilities))
    next_token = tokens[next_token_index]

    return next_token
    
    def get_next_states(self, current_state, tokens):
        token_probabilities = [self.calculate_probability(token, i, tokens) for i, token in enumerate(tokens)]
        # Choose the token with the highest probability score as the most probable next state
        next_token_index = token_probabilities.index(max(token_probabilities))
        next_token = tokens[next_token_index]
        return next_token, next_token_index

    def decode(self, input_string):
        tokens = self.tokenize(input_string)
        path = []
        current_state = 'root'  # Starting state
        token_index = 0

        while token_index < len(tokens):
            current_token = tokens[token_index]
            next_token, next_token_index = self.get_next_states(current_state, tokens[token_index:])
            path.append(next_token)
            token_index += next_token_index + 1  # Move to the index of the next token

            # Update current_state based on your state transition logic
            # Placeholder: current_state = next_token
            current_state = next_token

            if current_state == 'complete':
                break  # Terminate if the decoding process is complete

        return ' '.join(path)

    def calculate_probability(token, index, tokens, expected_tokens):
        """
        Calculates a probability score for a token based on its match with expected type,
        its closeness to the expected position, and its fit within the expected sequence.
        
        :param token: The current token being evaluated.
        :param index: The index of the current token in the input tokens list.
        :param tokens: The list of all tokens in the input string.
        :param expected_tokens: The list of tokens in the expected format, including placeholders for non-terminal tokens.
        :return: A probability score for the token.
        """
        
        # Constants for weighting different factors
        TYPE_WEIGHT = 0.5
        POSITION_WEIGHT = 0.3
        SEQUENCE_WEIGHT = 0.2

        # Determine if the token is terminal or non-terminal
        is_terminal = token in expected_tokens
        type_score = TYPE_WEIGHT if is_terminal else 0

        # Calculate positional closeness score
        # Assuming expected position is based on the order in expected_tokens
        if token in expected_tokens:
            expected_index = expected_tokens.index(token)
            position_diff = abs(index - expected_index)
        else:
            # If it's a non-terminal, we find the closest terminal token's expected position
            closest_terminal_index = min([i for i, t in enumerate(tokens) if t in expected_tokens], key=lambda x: abs(x - index))
            expected_index = expected_tokens.index(tokens[closest_terminal_index])
            position_diff = abs(index - expected_index)
        
        # Normalize position_diff to a score between 0 and 1, assuming max diff is the length of tokens
        max_diff = len(tokens)
        positional_score = POSITION_WEIGHT * (1 - (position_diff / max_diff))

        # Calculate sequence fit score
        # This simplistic approach checks if the next expected token matches the next actual token
        # More complex logic could involve checking subsequences
        if index + 1 < len(tokens) and tokens[index + 1] in expected_tokens:
            next_expected_token = expected_tokens[expected_tokens.index(token) + 1] if token in expected_tokens else None
            sequence_fit_score = SEQUENCE_WEIGHT if tokens[index + 1] == next_expected_token else 0
        else:
            sequence_fit_score = 0

        # Combine scores for final probability
        probability_score = type_score + positional_score + sequence_fit_score
        return probability_score

    def get_token_type(expected_tokens, current_state):
        if current_state in expected_tokens:
            return 'terminal'
        else:
            return 'non-terminal'

In [None]:
decoder = Decoder(expected_tokens)
decoder.decode(tokens)

TypeError: 'int' object is not iterable

In [None]:
expected = tokenize_and_count_tiktoken(expected_tokens)
real = tokenize_and_count_tiktoken(tokens)

In [None]:
get_next_states(0, real, expected)

72

In [None]:
real[2].decode([72])

'i'

In [None]:
real[0]

[720,
 14196,
 4077,
 517,
 1,
 85269,
 1423,
 794,
 341,
 1,
 61665,
 794,
 330,
 53,
 18599,
 31204,
 21810,
 374,
 279,
 1510,
 4900,
 315,
 8524,
 10560,
 1,
 35039,
 794,
 330,
 1271,
 3493,
 2038,
 922,
 36011,
 21810,
 10560,
 1,
 14506,
 794,
 330,
 6109,
 67013,
 761,
 1,
 1335,
 794,
 330,
 4518,
 761,
 1,
 1335,
 6022,
 794,
 330,
 53,
 18599,
 31204,
 21810,
 48345,
 761,
 1,
 61665,
 851,
 794,
 330,
 4513,
 1774,
 702,
 262,
 457,
 534,
 14196,
 4077]