In [2]:
import json
import os
from typing import List, Tuple
import re

def tokenize(sentence: str) -> List[str]:
    """
    Split a sentence into words on runs of whitespace.

    Note: a later cell in this notebook defines another function with the same
    name (a parenthesis-aware tokenizer), which replaces this one once it runs.

    Args:
        sentence (str): Text to split.

    Returns:
        List[str]: The whitespace-separated words (empty list for blank input).
    """
    words = sentence.split()
    return words

def extract_labels(src: str, top_decoupled: str) -> List[str]:
    """
    Extracts labels for each token in the input sentence based on the TOP-DECOUPLED field.

    Every bracketed span matched in ``top_decoupled`` is split on whitespace, and
    every token of ``src`` that occurs in that span receives the span's tag.
    All occurrences of a repeated word are labelled, not only the first one.
    Tags are applied in the order of ``tag_patterns``, so a later tag overwrites
    an earlier one on the same token (e.g. NOT overrides TOPPING).

    Args:
        src (str): Input sentence.
        top_decoupled (str): Decoupled TOP structure.

    Returns:
        List[str]: One label per whitespace token of ``src``; tokens that match
        no span keep the label "NONE".
    """
    # Split on whitespace directly rather than through ``tokenize``: a later cell
    # rebinds that name to a parenthesis-aware tokenizer with different output.
    tokens = src.split()
    labels = ["NONE"] * len(tokens)

    # Map of tags to patterns
    tag_patterns = {
        "NUMBER": r"\(NUMBER [^)]+ \)",
        "SIZE": r"\(SIZE [^)]+ \)",
        "STYLE": r"\(STYLE [^)]+ \)",
        "TOPPING": r"\(TOPPING [^)]+ \)",
        "COMPLEX_TOPPING": r"\(COMPLEX_TOPPING .*?\)",
        "QUANTITY": r"\(QUANTITY [^)]+ \)",
        # Closing parentheses may be separated by whitespace ("... ) )"),
        # so allow optional whitespace between them.
        "NOT": r"\(NOT \(TOPPING [^)]+ \)\s*\)"
    }

    # Generate labels for tokens based on patterns
    for tag, pattern in tag_patterns.items():
        for match in re.finditer(pattern, top_decoupled):
            # The span still contains fragments such as "(NUMBER" and ")";
            # they only matter if the sentence itself contains such tokens.
            span_tokens = set(match.group(0).split())
            # Walk by position so repeated words are each labelled
            # (list.index would always return the first occurrence).
            for idx, token in enumerate(tokens):
                if token in span_tokens:
                    labels[idx] = tag

    return labels

def generate_training_data(input_file: str, output_file: str) -> None:
    """
    Generates training data for the BiLSTM model.

    Reads ``input_file`` as JSON Lines (one JSON object per line), labels each
    record's "train.SRC" sentence from its "train.TOP-DECOUPLED" field with
    ``extract_labels``, and writes one JSON object per line to ``output_file``
    with the keys "text" and "labels". ``output_file`` is overwritten.

    Args:
        input_file (str): Path to the input JSON file.
        output_file (str): Path to save the generated training data.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a record lacks "train.SRC" or "train.TOP-DECOUPLED".
    """
    # Explicit UTF-8 so the result does not depend on the platform's default encoding.
    with open(input_file, "r", encoding="utf-8") as infile, \
            open(output_file, "w", encoding="utf-8") as outfile:
        for line in infile:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            record = json.loads(line)
            src = record["train.SRC"]
            top_decoupled = record["train.TOP-DECOUPLED"]

            labels = extract_labels(src, top_decoupled)
            training_instance = {
                "text": src,
                "labels": labels
            }
            outfile.write(json.dumps(training_instance) + "\n")

# # File paths
# input_file = "../dataset/PIZZA_train.json"
# output_file = "../dataset/training_data_bilstm.json"

# # Generate and save the training data
# generate_training_data(input_file, output_file)

# print(f"Training data saved to {output_file}.")

### Named entity recognition

In [3]:
import json
import re

def tokenize(s):
    """
    Split a bracketed string into tokens.

    Each "(" and each ")" is its own token; every other token is a maximal run
    of characters that are neither whitespace nor parentheses.

    Note: this replaces the whitespace-only ``tokenize`` defined in an earlier cell.
    """
    return re.findall(r'\(|\)|[^\s()]+', s)

def parse_tokens(tokens):
    """
    Build a nested list structure from a flat token sequence.

    "(" opens a new sub-list, ")" closes the current sub-list and appends it to
    its parent, and any other token is appended to the list currently open.

    Unbalanced input is not validated: a ")" with no matching "(" raises
    IndexError (pop from an empty stack), and if a "(" is never closed only the
    innermost list still open is returned.
    """
    parents = []   # lists that are waiting for their current child to close
    node = []      # list currently being filled
    for tok in tokens:
        if tok not in ('(', ')'):
            node.append(tok)
            continue
        if tok == '(':
            parents.append(node)
            node = []
        else:
            # Close the current group and attach it to its parent.
            child, node = node, parents.pop()
            node.append(child)
    return node

# Keys that indicate entities we want to label: the word tokens directly under
# one of these keys receive B-/I- tags in flatten_nested_structure.
# "VOLUME" is listed here, but flatten_nested_structure rewrites that key to
# "SIZE" before tagging, so volume words are tagged as SIZE.
ENTITY_KEYS = {
    "NUMBER", "SIZE", "STYLE", "TOPPING", "COMPLEX_TOPPING", "QUANTITY",
    "VOLUME", "DRINKTYPE", "CONTAINERTYPE"
}

# Keys that indicate higher-level structures (orders); each one opens a single
# pizza or drink order whose children are walked recursively.
ORDER_KEYS = {"PIZZAORDER", "DRINKORDER"}

def label_entity_tokens(values, entity_type, negated=False):
    """
    Tag the word tokens of one entity with B-/I- labels.

    The first token gets "B-<type>" and every following token "I-<type>".
    When ``negated`` is true, "<type>" is prefixed with "NEG_"
    (e.g. "B-NEG_TOPPING").

    values: list of tokens inside the entity
    entity_type: e.g. "NUMBER", "TOPPING", etc.
    negated: boolean, whether the entity sits inside a negation

    Returns a list of (token, label) pairs, in the order of ``values``.
    """
    prefix = "NEG_" if negated else ""
    return [
        (val, ("B-" if position == 0 else "I-") + prefix + entity_type)
        for position, val in enumerate(values)
    ]

def flatten_nested_structure(structure, order_context=None, negated=False):
    """
    Recursively descend through the parsed structure and produce (token, label) pairs.

    The pairs are emitted in the order the tokens appear in ``structure``.
    Structural keys (ORDER, PIZZAORDER, NUMBER, NOT, ...) are emitted as
    (key, "O") pairs; callers that only want words must filter them out.
    A "VOLUME" key is treated as "SIZE".

    structure: a token (str) or a nested list as built by parse_tokens
    order_context: the enclosing PIZZAORDER/DRINKORDER key, or None; it is
        threaded through the recursion but does not influence the labels
    negated: True while inside a NOT block; entity tags then carry "NEG_"

    Returns a list of (token, label) tuples.
    """
    if not isinstance(structure, list):
        # A bare token that is not the value of an entity => O
        return [(structure, "O")]

    if len(structure) == 0:
        return []

    first = structure[0]
    if isinstance(first, list):
        # No key at the start; just a sequence of nested groups.
        result = []
        for elem in structure:
            result.extend(flatten_nested_structure(elem, order_context=order_context, negated=negated))
        return result

    # first is a token (string): the key of this group
    key = first
    if key == "VOLUME":
        key = "SIZE"

    children = structure[1:]
    result = [(key, "O")]

    if key == "ORDER":
        # Top-level order: reset both the order context and negation.
        for elem in children:
            result.extend(flatten_nested_structure(elem, order_context=None, negated=False))
        return result

    if key in ORDER_KEYS:
        # Entering a pizza or drink order; remember which one.
        for elem in children:
            result.extend(flatten_nested_structure(elem, order_context=key, negated=negated))
        return result

    if key == "NOT":
        # Negation block: everything below is negated.
        for elem in children:
            result.extend(flatten_nested_structure(elem, order_context=order_context, negated=True))
        return result

    if key in ENTITY_KEYS:
        # Word tokens directly under the key are this entity's value and get
        # B-/I- tags (first direct token B-, the rest I-). Nested groups
        # (e.g. QUANTITY/TOPPING inside COMPLEX_TOPPING) label themselves.
        direct_values = [elem for elem in children if not isinstance(elem, list)]
        labeled_values = iter(label_entity_tokens(direct_values, key, negated=negated))
        # Emit children in their original order so the pairs stay aligned with
        # the source sentence even when words and nested groups are mixed.
        for elem in children:
            if isinstance(elem, list):
                result.extend(flatten_nested_structure(elem, order_context=order_context, negated=negated))
            else:
                result.append(next(labeled_values))
        return result

    # Unrecognized key: label it O and keep descending.
    for elem in children:
        result.extend(flatten_nested_structure(elem, order_context=order_context, negated=negated))
    return result

def post_process_complex_and_neg(result):
    """
    Post-processing hook for the labeled (token, label) pairs.

    Negation is already encoded while the structure is flattened (the "NEG_"
    tag prefix), and sub-entities of COMPLEX_TOPPING label themselves, so there
    is nothing left to adjust here: ``result`` is handed back unchanged.
    """
    # Deliberate no-op; kept as a place to add adjustments later.
    return result

def label_input(text):
    """
    Turn a bracketed TOP-style string into (word, label) pairs.

    The string is tokenized, parsed into nested lists, flattened into
    (token, label) pairs, and finally stripped of the structural keys so that
    only the words of the sentence remain, in order.

    text: bracketed string such as "(ORDER i need (PIZZAORDER (NUMBER a ) ... ) )"

    Returns a list of (word, label) tuples.
    """
    tokens = tokenize(text)
    parsed = parse_tokens(tokens)

    # Flatten and label
    labeled = []
    for elem in parsed:
        labeled.extend(flatten_nested_structure(elem, order_context=None, negated=False))

    # Post-process if needed
    labeled = post_process_complex_and_neg(labeled)

    # Drop the pairs that stand for structural keys rather than words. "NOT" must
    # be dropped too: it is emitted as ("NOT", "O") by the flattening step and would
    # otherwise add a label with no matching word in the sentence.
    labeled = [
        x for x in labeled
        if x[0] not in ENTITY_KEYS
        and x[0] not in ORDER_KEYS
        and x[0] != "ORDER"
        and x[0] != "NOT"
    ]

    return labeled

# Example: label a bracketed order containing one pizza order and one drink order.
# `labeled_output` is reused by a later cell, and the bare expression on the
# last line displays it as the cell output.
input_str = "(ORDER i need (PIZZAORDER (NUMBER a ) (SIZE medium ) (TOPPING ham ) and (TOPPING pineapple ) pizza ) and (DRINKORDER (NUMBER a ) (VOLUME small ) (DRINKTYPE iced tea ) ) )"

labeled_output = label_input(input_str)
labeled_output


[('i', 'O'),
 ('need', 'O'),
 ('a', 'B-NUMBER'),
 ('medium', 'B-SIZE'),
 ('ham', 'B-TOPPING'),
 ('and', 'O'),
 ('pineapple', 'B-TOPPING'),
 ('pizza', 'O'),
 ('and', 'O'),
 ('a', 'B-NUMBER'),
 ('small', 'B-SIZE'),
 ('iced', 'B-DRINKTYPE'),
 ('tea', 'I-DRINKTYPE')]

In [4]:
# get the number of tokens in the longest sentence in the training set
# max_len = 0
# with open("../dataset/PIZZA_train.json", "r") as f:
#     for line in f:
#         record = json.loads(line)
#         src = record["train.SRC"]
#         tokens = tokenize(src)
#         if len(tokens) > max_len:
#             max_len = len(tokens)
            
# print(max_len)

### Compare the input to the output and map them

In [5]:
from collections import defaultdict

input_text = "i would like a pizza with onions and ham and tuna please"

# Group the labels in labeled_output by lowercased token, keeping them in order
# of appearance: token -> [label, label, ...]
# NOTE: labeled_output was computed for a different sentence (the example above),
# so words that only occur in input_text end up with 'O'.
label_dict = defaultdict(list)
for tok, lbl in labeled_output:
    label_dict[tok.lower()].append(lbl)

input_tokens = input_text.split()
labeled_input = []

# Per-token count of labels already consumed, so the k-th occurrence of a word
# in the input receives the k-th label recorded for that word
used_labels = defaultdict(int)

for token in input_tokens:
    token_lower = token.lower()
    if token_lower in label_dict:
        # Get the next unused label for this token
        label_index = used_labels[token_lower]
        if label_index < len(label_dict[token_lower]):
            token_label = label_dict[token_lower][label_index]
            used_labels[token_lower] += 1  # Mark this label as used
        else:
            token_label = 'O'  # Default if all labels are used
    else:
        token_label = 'O'  # Default for unmatched tokens

    labeled_input.append((token, token_label))

print(labeled_input)


[('i', 'O'), ('would', 'O'), ('like', 'O'), ('a', 'B-NUMBER'), ('pizza', 'O'), ('with', 'O'), ('onions', 'O'), ('and', 'O'), ('ham', 'B-TOPPING'), ('and', 'O'), ('tuna', 'O'), ('please', 'O')]


In [17]:
# Same entity keys as the ENTITY_KEYS set defined with the labeling functions in an
# earlier cell; re-assigned here with identical contents.
ENTITY_KEYS = {
    "NUMBER", "SIZE", "STYLE", "TOPPING", "COMPLEX_TOPPING", "QUANTITY",
    "VOLUME", "DRINKTYPE", "CONTAINERTYPE"
}

# Numerical id for every B-/I- tag plus 'O' (ids 1..19).
LABEL_MAP = {
    'B-DRINKTYPE': 1, 'I-DRINKTYPE': 2,
    'B-SIZE': 3, 'I-SIZE': 4,
    'B-NUMBER': 5, 'I-NUMBER': 6,
    'B-CONTAINERTYPE': 7, 'I-CONTAINERTYPE': 8,
    'B-COMPLEX_TOPPING': 9, 'I-COMPLEX_TOPPING': 10,
    'B-VOLUME': 11, 'I-VOLUME': 12,
    'B-TOPPING': 13, 'I-TOPPING': 14,
    'B-QUANTITY': 15, 'I-QUANTITY': 16,
    'B-STYLE': 17, 'I-STYLE': 18,
    'O': 19,
}

def transform_to_labels(labeled_output):
    """
    Convert (token, label) pairs into a list of numerical label ids.

    Labels that are not keys of LABEL_MAP are mapped to the id of 'O'. This
    includes the "NEG_"-prefixed tags (e.g. "B-NEG_TOPPING"), which have no
    entry in LABEL_MAP.
    """
    ids = []
    for _token, tag in labeled_output:
        ids.append(LABEL_MAP.get(tag, LABEL_MAP["O"]))
    return ids


def process_pizza_train(input_file, output_file):
    """
    Build a numerically labeled training file from a JSON Lines dataset.

    For each record, the bracketed "train.TOP" string is labeled with
    ``label_input`` and the labels are converted to ids with
    ``transform_to_labels``. One JSON object per line is written to
    ``output_file`` with the keys "text" (the record's "train.SRC") and
    "labels" (the list of ids). ``output_file`` is overwritten.

    input_file: path to the JSON Lines input
    output_file: path of the file to write

    Raises json.JSONDecodeError for a non-blank line that is not valid JSON and
    KeyError for a record without "train.SRC" or "train.TOP".
    """
    # Explicit UTF-8 so the result does not depend on the platform's default encoding.
    with open(input_file, 'r', encoding='utf-8') as infile, \
            open(output_file, 'w', encoding='utf-8') as outfile:
        for line in infile:
            # Tolerate blank lines (e.g. a trailing newline at end of file).
            if not line.strip():
                continue
            record = json.loads(line)
            src_text = record["train.SRC"]
            src_top = record["train.TOP"]
            labeled_output = label_input(src_top)
            numerical_labels = transform_to_labels(labeled_output)
            training_instance = {
                "text": src_text,
                "labels": numerical_labels
            }
            outfile.write(json.dumps(training_instance) + "\n")

# Paths to the input and output files (relative paths)
input_file = "../dataset/PIZZA_train.json"
output_file = "../dataset/PIZZA_train_model2.json"

# Process the data. This runs every time the cell is executed and overwrites
# output_file, which process_pizza_train opens in write mode.
process_pizza_train(input_file, output_file)

{'B-DRINKTYPE': 1, 'I-DRINKTYPE': 2, 'B-SIZE': 3, 'I-SIZE': 4, 'B-NUMBER': 5, 'I-NUMBER': 6, 'B-CONTAINERTYPE': 7, 'I-CONTAINERTYPE': 8, 'B-COMPLEX_TOPPING': 9, 'I-COMPLEX_TOPPING': 10, 'B-VOLUME': 11, 'I-VOLUME': 12, 'B-TOPPING': 13, 'I-TOPPING': 14, 'B-QUANTITY': 15, 'I-QUANTITY': 16, 'B-STYLE': 17, 'I-STYLE': 18, 'O': 19}


### Define orders

In [7]:
def extract_orders(structure, order_index=1):
    """
    Find every order block in a parsed structure and list the words it contains.

    structure: nested lists as produced by parse_tokens (a non-list or an empty
        list yields no results)
    order_index: sequence number to give to the next order that is found

    Returns a tuple ``(results, order_index)``:
      * results: list of (word, order_type, order_sequence_number) tuples, one
        per word collected from inside an order block, where order_type is
        "PIZZAORDER" or "DRINKORDER";
      * order_index: the sequence number the next order would receive.

    Words outside any order block are not reported. The contents of an order are
    gathered with collect_tokens, so the search does not descend into an order
    to look for further orders.
    """
    if not isinstance(structure, list) or len(structure) == 0:
        return [], order_index

    head = structure[0]

    if isinstance(head, str) and head in ORDER_KEYS:
        # A new order starts here; it takes the current sequence number.
        order_type = "PIZZAORDER" if head == "PIZZAORDER" else "DRINKORDER"
        sequence = order_index

        # Words of this order, with structural keys left out.
        words = []
        for child in structure[1:]:
            words.extend(collect_tokens(child))

        return [(word, order_type, sequence) for word in words], order_index + 1

    # Either a list of groups or a group with a non-order key: keep searching
    # every element, carrying the running sequence number along.
    found = []
    next_index = order_index
    for child in structure:
        child_found, next_index = extract_orders(child, next_index)
        found.extend(child_found)
    return found, next_index

def collect_tokens(node):
    """
    Gather the content words found anywhere inside ``node``.

    ``node`` is either a token or a (possibly nested) list of tokens. Parenthesis
    tokens and structural keys (see is_structural_key) are left out; everything
    else is returned in order of appearance.
    """
    if not isinstance(node, list):
        # Leaf: keep it only if it is an actual word.
        if node in ["(", ")"] or is_structural_key(node):
            return []
        return [node]

    gathered = []
    for child in node:
        gathered.extend(collect_tokens(child))
    return gathered

def is_structural_key(token):
    """Return True if ``token`` is one of the bracket-structure keys rather than a content word."""
    structural_keys = (
        "ORDER", "PIZZAORDER", "DRINKORDER",
        "NUMBER", "SIZE", "STYLE",
        "TOPPING", "COMPLEX_TOPPING", "QUANTITY",
        "VOLUME", "DRINKTYPE", "CONTAINERTYPE",
        "NOT",
    )
    return token in structural_keys

# Example input: one pizza order followed by one drink order
input_str = "(ORDER i need (PIZZAORDER (NUMBER a ) (SIZE medium ) (TOPPING ham ) and (TOPPING pineapple ) pizza ) and (DRINKORDER (NUMBER a ) (VOLUME small ) (DRINKTYPE iced tea ) ) )"

tokens = tokenize(input_str)
parsed = parse_tokens(tokens)

# extract_orders returns (results, next_order_index); only the results are kept
order_info, _ = extract_orders(parsed)
print(order_info)
# The result holds (word, order_type, sequence) tuples for the words inside each
# order block only, with order_type "PIZZAORDER" or "DRINKORDER". Words that sit
# directly under ORDER ('i', 'need', and the 'and' between the two orders) are
# not included.


[('a', 'PIZZAORDER', 1), ('medium', 'PIZZAORDER', 1), ('ham', 'PIZZAORDER', 1), ('and', 'PIZZAORDER', 1), ('pineapple', 'PIZZAORDER', 1), ('pizza', 'PIZZAORDER', 1), ('a', 'DRINKORDER', 2), ('small', 'DRINKORDER', 2), ('iced', 'DRINKORDER', 2), ('tea', 'DRINKORDER', 2)]


In [8]:
from collections import defaultdict

input_text = "i need a medium ham and pineapple pizza and a small iced tea"

# Group the entries of order_info by lowercased token, keeping them in order of
# appearance: token -> [(order_type, order_number), ...]
label_dict = defaultdict(list)
for tok, lbl, num in order_info:
    label_dict[tok.lower()].append((lbl, num))

input_tokens = input_text.split()
labeled_input = []

# Per-token count of entries already consumed, so the k-th occurrence of a word
# in the input receives the k-th (order_type, order_number) recorded for that word
used_labels = defaultdict(int)

for token in input_tokens:
    token_lower = token.lower()
    if token_lower in label_dict:
        # Get the next unused label for this token
        label_index = used_labels[token_lower]
        if label_index < len(label_dict[token_lower]):
            token_label, sequence_number = label_dict[token_lower][label_index]
            used_labels[token_lower] += 1  # Mark this label as used
        else:
            token_label, sequence_number = 'O', None  # Default if all labels are used
    else:
        token_label, sequence_number = 'O', None  # Default for unmatched tokens

    labeled_input.append((token, token_label, sequence_number))

print(labeled_input)


[('i', 'O', None), ('need', 'O', None), ('a', 'PIZZAORDER', 1), ('medium', 'PIZZAORDER', 1), ('ham', 'PIZZAORDER', 1), ('and', 'PIZZAORDER', 1), ('pineapple', 'PIZZAORDER', 1), ('pizza', 'PIZZAORDER', 1), ('and', 'O', None), ('a', 'DRINKORDER', 2), ('small', 'DRINKORDER', 2), ('iced', 'DRINKORDER', 2), ('tea', 'DRINKORDER', 2)]
