In [1]:
from transformers import pipeline, AutoModelForTokenClassification, AutoTokenizer

def load_pipeline(model_dir='models/pizza_model_fine_tuned', tokenizer_dir='models/pizza_tokenizer'):
    """
    Builds a Hugging Face 'ner' pipeline from a saved model and tokenizer.

    Args:
        model_dir (str): Directory (or identifier) passed to
            AutoModelForTokenClassification.from_pretrained.
        tokenizer_dir (str): Directory (or identifier) passed to
            AutoTokenizer.from_pretrained.

    Returns:
        The pipeline object created with aggregation_strategy="none".
    """
    # The tokenizer keyword comes first so it is still loaded before the model.
    return pipeline(
        'ner',
        tokenizer=AutoTokenizer.from_pretrained(tokenizer_dir),
        model=AutoModelForTokenClassification.from_pretrained(model_dir),
        aggregation_strategy="none",
    )





  torch.utils._pytree._register_pytree_node(


In [2]:
def _merge_word_pieces(words, labels):
    """Merges tokenizer pieces into whole words, keeping exactly one label per merged word."""
    merged_words = []
    merged_labels = []
    j = 0
    while j < len(words):
        piece = words[j]
        if piece.startswith('#'):
            # '##' marks a subword continuation; a single leading '#' is treated the same way.
            suffix = piece[2:] if piece.startswith('##') else piece[1:]
            if merged_words:
                merged_words[-1] += suffix
                # The merged word takes the label of its latest piece.
                merged_labels[-1] = labels[j]
            else:
                # Edge case: continuation piece with nothing before it.
                merged_words.append(suffix)
                merged_labels.append(labels[j])
            j += 1
        elif piece == "'" and merged_words and (j + 1) < len(words):
            # Glue "<prev>'<next>" into one word labelled like <next>.
            merged_words[-1] += ("'" + words[j + 1])
            merged_labels[-1] = labels[j + 1]
            j += 2
        else:
            # Regular word, or an apostrophe that cannot be merged.
            merged_words.append(piece)
            merged_labels.append(labels[j])
            j += 1
    return merged_words, merged_labels


def postprocess_entities(ner_results):
    """
    Applies custom post-processing to NER predictions, handling subword tokens and merging them correctly.

    Subword pieces ('##xyz' / '#xyz') and apostrophe contractions are merged into
    single words, and any PIZZA_INTERMEDIATE / DRINK_INTERMEDIATE label that does
    not directly continue an entity of the same kind is promoted to the
    corresponding *_BEGIN label.

    The repair step tracks only the entity kind that is currently open. Any label
    other than BEGIN/INTERMEDIATE of that kind closes it, so an orphaned
    INTERMEDIATE after "BEGIN, OTHER" or after repeated BEGINs is promoted too.

    Args:
        ner_results (list of dict): Raw NER predictions from the pipeline; each
            item must provide the 'entity' and 'word' keys.

    Returns:
        list of str: Post-processed entity labels corresponding to each word in the sentence.
    """
    predicted_labels = [entity['entity'] for entity in ner_results]
    words = [entity['word'] for entity in ner_results]

    _, connected_labels = _merge_word_pieces(words, predicted_labels)

    open_kind = None  # 'PIZZA', 'DRINK', or None when no entity is open
    for idx, label in enumerate(connected_labels):
        for kind in ('PIZZA', 'DRINK'):
            if label == kind + '_BEGIN':
                open_kind = kind
                break
            if label == kind + '_INTERMEDIATE':
                if open_kind != kind:
                    # Nothing of this kind to continue: this word starts the entity.
                    connected_labels[idx] = kind + '_BEGIN'
                open_kind = kind
                break
        else:
            # Any other label (e.g. "OTHER") ends the current entity.
            open_kind = None

    return connected_labels


In [3]:
def process_sentence_detailed(sentence, ner_pipeline_instance):
    """
    Processes a sentence and returns the post-processed NER labels along with words.

    The raw pipeline pieces are merged into words with the same rules that
    postprocess_entities applies to the labels, so that word k lines up with
    label k in the returned pairs.

    Args:
        sentence (str): The input sentence to process.
        ner_pipeline_instance (transformers.pipeline): The NER pipeline.

    Returns:
        list of tuples: Each tuple contains (word, label).
    """
    # Run NER pipeline
    ner_results = ner_pipeline_instance(sentence)
    pieces = [entity['word'] for entity in ner_results]

    # Apply post-processing (one label per merged word)
    processed_labels = postprocess_entities(ner_results)

    # Rebuild the merged words so they can be paired with those labels.
    connected_words = []
    j = 0
    while j < len(pieces):
        piece = pieces[j]
        if piece.startswith('#'):
            # '##' marks a subword continuation; a single leading '#' is treated the same way.
            suffix = piece[2:] if piece.startswith('##') else piece[1:]
            if connected_words:
                connected_words[-1] += suffix
            else:
                # postprocess_entities emits a label for a leading continuation
                # piece, so the word must be kept as well; dropping it would
                # shift every following (word, label) pair by one.
                connected_words.append(suffix)
            j += 1
        elif piece == "'" and connected_words and (j + 1) < len(pieces):
            # Merge apostrophe and the following piece into the previous word
            connected_words[-1] += ("'" + pieces[j + 1])
            j += 2
        else:
            # Regular word, or an apostrophe that cannot be merged
            connected_words.append(piece)
            j += 1

    # Pair connected words with their processed labels
    return list(zip(connected_words, processed_labels))


In [4]:
def get_postprocessed_labels(sentence, ner_pipeline_instance):
    """
    Returns only the post-processed labels for a sentence, dropping the words.

    Args:
        sentence (str): The input sentence to process.
        ner_pipeline_instance (transformers.pipeline): The NER pipeline.

    Returns:
        list of str: The label half of every (word, label) pair produced by
        process_sentence_detailed, in the same order.
    """
    word_label_pairs = process_sentence_detailed(sentence, ner_pipeline_instance)
    return [pair[1] for pair in word_label_pairs]


In [5]:
def segment_orders(sentence, ner_pipeline_instance):
    """
    Splits a sentence into pizza-order and drink-order text segments using NER labels.

    A *_BEGIN label starts a new segment (closing any open one), a matching
    *_INTERMEDIATE label extends the open segment of the same kind, and any other
    label closes the open segment without contributing its word.

    Args:
        sentence (str): The input sentence to process.
        ner_pipeline_instance (transformers.pipeline): The NER pipeline.

    Returns:
        list of tuples: (segment_text, is_pizza) pairs in sentence order, where
        segment_text is the space-joined words of the segment and is_pizza is
        True for a pizza order and False for a drink order.
    """
    # label -> (is_pizza, starts_new_segment)
    label_roles = {
        'PIZZA_BEGIN': (True, True),
        'PIZZA_INTERMEDIATE': (True, False),
        'DRINK_BEGIN': (False, True),
        'DRINK_INTERMEDIATE': (False, False),
    }

    segments = []
    open_is_pizza = None   # None: no open segment; True/False: kind of the open one
    buffered_words = []

    for word, label in process_sentence_detailed(sentence, ner_pipeline_instance):
        is_pizza, starts_segment = label_roles.get(label, (None, False))

        continues_open = (
            is_pizza is not None
            and not starts_segment
            and open_is_pizza is is_pizza
        )
        if continues_open:
            buffered_words.append(word)
            continue

        # Anything else ends the currently open segment, if there is one.
        if open_is_pizza is not None:
            segments.append((' '.join(buffered_words), open_is_pizza))

        if starts_segment:
            open_is_pizza, buffered_words = is_pizza, [word]
        else:
            open_is_pizza, buffered_words = None, []

    # Flush a segment that runs to the end of the sentence.
    if open_is_pizza is not None:
        segments.append((' '.join(buffered_words), open_is_pizza))

    return segments


In [13]:
def generate_top_decoupled(sentence, labels, is_pizza_order):
    """
    Builds a bracketed TOP-DECOUPLED string from a whitespace-split sentence and its labels.

    Tokens labelled "OTHER" are skipped. Consecutive tokens sharing a label are
    grouped as "(LABEL tok tok ) ". A label of the form "PARENT-CHILD" opens a
    nested "(PARENT ... ) " group covering every following token whose label
    also starts with "PARENT-"; the group body is produced recursively from the
    CHILD parts.

    Args:
        sentence (str): Text of one order; split on whitespace into tokens.
        labels (list of str): One label per token. If the lengths differ, only
            the first min(len(tokens), len(labels)) positions are used instead
            of raising IndexError.
        is_pizza_order (int): 0 wraps the result in "(PIZZAORDER ...)", -1
            returns the bare body (used for nested groups), any other value
            wraps it in "(DRINKORDER ...)".

    Returns:
        str: The generated bracketed string.
    """
    tokens = sentence.split()
    # A whitespace split and the NER word merge can disagree on the number of
    # items; never index past the shorter of the two sequences.
    limit = min(len(tokens), len(labels))

    pieces = []
    i = 0
    while i < limit:
        label = labels[i]
        if label == "OTHER":
            i += 1
            continue

        if '-' in label:
            parent_identifier = label[:label.rfind('-')]
            # Require the separator too, so an unrelated label that merely
            # shares the leading characters is not pulled into the group.
            prefix = parent_identifier + '-'
            sub_tokens = []
            sub_labels = []
            while i < limit and labels[i].startswith(prefix):
                sub_tokens.append(tokens[i])
                sub_labels.append(labels[i][len(prefix):])
                i += 1
            nested_part_string = generate_top_decoupled(' '.join(sub_tokens), sub_labels, -1)
            pieces.append('(' + parent_identifier + ' ' + nested_part_string + ') ')
            continue

        # Group the run of consecutive tokens that share this label.
        start = i
        while i < limit and labels[i] == label:
            i += 1
        pieces.append('(' + label + ' ' + ' '.join(tokens[start:i]) + ' ) ')

    output_sentence = ''.join(pieces)

    if is_pizza_order == -1:
        return output_sentence

    identifier = '(PIZZAORDER ' if is_pizza_order == 0 else '(DRINKORDER '
    return identifier + output_sentence + ')'

In [14]:
# Load both pipelines: `isa` is passed to segment_orders to split the sentence
# into order segments, `parser` labels the tokens inside each segment.
isa = load_pipeline(model_dir='models/ISA_model', tokenizer_dir='models/pizza_tokenizer')
parser = load_pipeline(model_dir='models/order_parser', tokenizer_dir='models/pizza_tokenizer')
sentence = "i need a medium ham and pineapple pizza and a small iced tea"

print("Input Order: " + sentence)
print("\n----------------------------------------------------\n")

# Segment orders
orders = segment_orders(sentence, isa)
top_decoupled = ''
for order, is_pizza in orders:
    labels = get_postprocessed_labels(order, parser)

    # generate_top_decoupled expects 0 for a pizza order; any other value except
    # -1 yields a drink order. A dedicated name is used so the builtin `id` is
    # not rebound for the rest of the notebook.
    order_kind = 0 if is_pizza else 1

    top_decoupled += (generate_top_decoupled(order, labels, order_kind) + ' ')

# Drop the trailing separator added after the last order.
print("Top Decoupled: " + top_decoupled[:-1])

Input Order: i need a medium ham and pineapple pizza and a small iced tea

----------------------------------------------------

Top Decoupled: (PIZZAORDER (NUMBER a ) (SIZE medium ) (TOPPING ham ) (TOPPING pineapple ) ) (DRINKORDER (NUMBER a ) (SIZE small ) (DRINKTYPE iced tea ) )


In [9]:
import re

class Node:
    """
    One labelled bracket group of the hierarchical structure.

    A node carries a label plus either child nodes or, for a leaf, a text string.
    """
    def __init__(self, label):
        self.label = label
        self.children = []  # child Node instances, in order
        self.text = None    # text content, used only when there are no children

    def to_string(self):
        """
        Serializes this node (recursively) to the bracketed string format.

        Children take precedence over text; a node with neither renders as
        "(LABEL )".
        """
        if self.children:
            inner = ' '.join(child.to_string() for child in self.children)
        elif self.text:
            inner = self.text
        else:
            # Neither children nor text
            return f"({self.label} )"
        return f"({self.label} {inner} )"

def tokenize(top_string):
    """
    Breaks a TOP string into a flat token list.

    Each '(' and ')' becomes its own token; every other maximal run of
    characters that are neither whitespace nor parentheses becomes a word token.
    """
    token_pattern = re.compile(r'\(|\)|[^\s()]+')
    return token_pattern.findall(top_string)

def parse(tokens, index):
    """
    Recursively parses tokens to build the hierarchical structure.

    Text tokens are kept only for nodes without child groups: once a node has a
    child, its own text tokens are ignored, and the collected text is assigned
    only when the node's closing ')' is reached.

    Parameters:
    - tokens: List of tokens from the TOP string.
    - index: Current position in the token list.

    Returns:
    - node: The parsed Node object, or None when there is no group to parse at
      `index` (out of range, not a '(', or a '(' with no label after it).
    - index: Updated position after parsing.
    """
    if index >= len(tokens) or tokens[index] != '(':
        # Nothing to parse here (also covers an empty token list).
        return None, index

    if index + 1 >= len(tokens):
        # Dangling '(' at the very end: consume it so a calling loop always
        # makes progress instead of seeing the same token again.
        return None, index + 1

    node = Node(tokens[index + 1])
    index += 2  # Move past '(' and label

    texts = []  # Text tokens seen before any child group

    while index < len(tokens):
        token = tokens[index]
        if token == '(':
            # Child group; the recursive call advances the index past it.
            child, index = parse(tokens, index)
            if child is not None:
                node.children.append(child)
        elif token == ')':
            if not node.children and texts:
                # Leaf node: keep its text content.
                node.text = ' '.join(texts)
            return node, index + 1  # Move past ')'
        else:
            if not node.children:
                # Only collect text while no child group has been found.
                texts.append(token)
            index += 1

    # Input ended without a closing ')': return what was built so far.
    return node, index

def generate_top_decoupled_from_top(top_string):
    """
    Converts a TOP string into a TOP-DECOUPLED string by removing redundant tokens.

    The string is tokenized, parsed into a Node tree starting at the first
    token, and serialized again with Node.to_string.

    Parameters:
    - top_string (str): The input TOP string.

    Returns:
    - decoupled_str (str): The resulting TOP-DECOUPLED string, or "" when the
      input is empty/blank or does not start with a '(' group.
    """
    # A missing or blank TOP (e.g. a record without the field) has nothing to
    # parse; return early instead of indexing into an empty token list.
    if not top_string or not top_string.strip():
        return ""

    tokens = tokenize(top_string)
    node, _ = parse(tokens, 0)
    return node.to_string() if node else ""


# Smoke check for generate_top_decoupled_from_top: the free text directly under
# ORDER ("can i have") should disappear while the nested groups are kept.
top = "(ORDER can i have (PIZZAORDER (NUMBER a ) (SIZE large ) (TOPPING bbq pulled pork ) ) )"
expected = "(ORDER (PIZZAORDER (NUMBER a ) (SIZE large ) (TOPPING bbq pulled pork ) ) )"
decoupled = generate_top_decoupled_from_top(top)
# Print input, actual and expected side by side, then whether they are equal.
print(f"Original TOP: {top}")
print(f"Generated TOP-DECOUPLED: {decoupled}")
print(f"Expected TOP-DECOUPLED: {expected}")
print(f"Match: {decoupled == expected}\n")


Original TOP: (ORDER can i have (PIZZAORDER (NUMBER a ) (SIZE large ) (TOPPING bbq pulled pork ) ) )
Generated TOP-DECOUPLED: (ORDER (PIZZAORDER (NUMBER a ) (SIZE large ) (TOPPING bbq pulled pork ) ) )
Expected TOP-DECOUPLED: (ORDER (PIZZAORDER (NUMBER a ) (SIZE large ) (TOPPING bbq pulled pork ) ) )
Match: True



In [16]:
import json
# Exact-match evaluation on the first 100 records of the test file: each line
# is one JSON object with the source sentence ("test.SRC") and its TOP
# annotation ("test.TOP"). Mismatches are printed with the running correct count.
correct_count = 0
i = 0
with open("dataset/PIZZA_test.json", 'r', encoding='utf-8') as infile:
    for line in infile:
        if i == 100: break
        if not line.strip():
            # Blank lines would make json.loads raise; they are skipped and
            # do not count towards the 100 records.
            continue
        instance = json.loads(line)
        input_sentence = instance.get("test.SRC", "")
        top = instance.get("test.TOP", "")
        true_top_decoupled = generate_top_decoupled_from_top(top)

        orders = segment_orders(input_sentence, isa)
        top_decoupled = ''
        for order, is_pizza in orders:
            labels = get_postprocessed_labels(order, parser)

            # 0 selects PIZZAORDER, 1 selects DRINKORDER in generate_top_decoupled.
            # A dedicated name keeps the builtin `id` intact.
            order_kind = 0 if is_pizza else 1

            top_decoupled += (generate_top_decoupled(order, labels, order_kind) + ' ')

        top_decoupled = "(ORDER " + top_decoupled + ')'

        if top_decoupled == true_top_decoupled: correct_count += 1
        else:
            print(f"Correct Count: {correct_count}")
            print("True Output: " + true_top_decoupled)
            print("Predicted Output: " + top_decoupled)
            print("----------------------------------------------------------------------")

        i += 1

Correct Count: 4
True Output: (ORDER (PIZZAORDER (NUMBER one ) (SIZE medium ) (TOPPING sausage ) (TOPPING mushrooms ) (NOT (TOPPING ham ) ) ) )
Predicted Output: (ORDER (PIZZAORDER (NUMBER one ) (SIZE medium ) (TOPPING sausage mushrooms ) (NOT (TOPPING ham ) ) ) )
----------------------------------------------------------------------
Correct Count: 5
True Output: (ORDER (PIZZAORDER (NUMBER one ) (TOPPING sausage ) (TOPPING olives ) (TOPPING pineapple ) ) )
Predicted Output: (ORDER (PIZZAORDER (NUMBER one ) (TOPPING sausage olives ) (TOPPING pineapple ) ) )
----------------------------------------------------------------------
Correct Count: 5
True Output: (ORDER (PIZZAORDER (NUMBER a ) (SIZE small ) (TOPPING mushrooms ) (TOPPING bacon ) (TOPPING pepperoni ) ) )
Predicted Output: (ORDER (PIZZAORDER (NUMBER a ) (SIZE small ) (TOPPING mushrooms ) (TOPPING bacon pepperoni ) ) )
----------------------------------------------------------------------
Correct Count: 7
True Output: (ORDER (PIZZ