In [1]:
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import json

class CoffeeKiosk:
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained("huggingface/llama-3.2-3B")
        self.model = AutoModelForCausalLM.from_pretrained("huggingface/llama-3.2-3B")
        self.order_items = []  # List to hold each order item as a unique entry
        self.previous_inputs = []  # List to hold previous user inputs

    def get_console_order_summary(self):
        if not self.order_items:
            return "현재 주문 내역이 없습니다."
        summary = "현재 주문하신 내용은 다음과 같습니다:\n"
        for item in self.order_items:
            details = f"{item['temperature']} {item['name']} {item['size']} {item['quantity']}잔"
            if item['add_ons'] != "None":
                add_ons_details = ', '.join([f"{k}: {v}번" for k, v in item['add_ons'].items()])
                details += f" (추가: {add_ons_details})"
            summary += details + "\n"
        return summary.strip()

    def get_llama_order_summary(self):
        if not self.order_items:
            return "None."
        summary = {
            "current_orders": {
                "drinks": [
                    {
                        "index": item["index"],
                        "name": item["name"],
                        "size": item["size"],
                        "temperature": item["temperature"],
                        "add_ons": item["add_ons"],
                        "quantity": item["quantity"],
                        "quantity_indexes": item["quantity_indexes"]
                    }
                    for item in self.order_items
                ]
            }
        }
        return json.dumps(summary, ensure_ascii=False)

    def add_item(self, drink, size, temperature, quantity, add_ons):
        # Check if the new item matches any existing items
        for item in self.order_items:
            if (item["name"] == drink and
                item["size"] == size and
                item["temperature"] == temperature and
                item["add_ons"] == add_ons):
                # Merge into existing item: update quantity and quantity_indexes
                current_quantity = item["quantity"]
                for i in range(quantity):
                    new_index = f"{item['index']}-{current_quantity + i}"
                    item["quantity_indexes"].append(new_index)
                item["quantity"] += quantity
                return  # Exit after merging

        # If no matching item, add a new item
        new_index = len(self.order_items)  # Assign the next available index
        quantity_indexes = [f"{new_index}-{i}" for i in range(quantity)]
        new_item = {
            "index": new_index,
            "name": drink,
            "size": size,
            "temperature": temperature,
            "add_ons": add_ons if add_ons else "None",
            "quantity": quantity,
            "quantity_indexes": quantity_indexes
        }
        self.order_items.append(new_item)

    def remove_item(self, drink_index, quantity_indexes):
        item = next((item for item in self.order_items if item["index"] == drink_index), None)
        if not item:
            return "해당 음료를 찾을 수 없습니다."
        # Remove specified quantity indexes
        for q_index in quantity_indexes:
            if q_index in item["quantity_indexes"]:
                item["quantity_indexes"].remove(q_index)
        # Update quantity or remove item entirely if all quantities are removed
        item["quantity"] = len(item["quantity_indexes"])
        if item["quantity"] == 0:
            self.order_items = [i for i in self.order_items if i["index"] != drink_index]
        return "선택된 음료가 삭제되었습니다."

    def update_item(self, drink_index, quantity_indexes, updates):
        item = next((item for item in self.order_items if item["index"] == drink_index), None)
        if not item:
            return "해당 음료를 찾을 수 없습니다."
        # Handle updates only for specified quantity indexes
        if "new_name" in updates:
            item["name"] = updates["new_name"]
        if "new_size" in updates:
            item["size"] = updates["new_size"]
        if "new_temperature" in updates:
            item["temperature"] = updates["new_temperature"]
        if "new_quantity" in updates:
            item["quantity"] = updates["new_quantity"]
            item["quantity_indexes"] = [f"{drink_index}-{i}" for i in range(updates["new_quantity"])]
        if "new_add_ons" in updates:
            item["add_ons"] = updates["new_add_ons"]
        return "선택된 음료가 업데이트되었습니다."

    def cancel_order(self):
        self.order_items = []  # Clear the order
        return "주문이 취소되었습니다. 감사합니다!"

    def respond_to_input(self, user_input):
        """
        Processes user input, generates a model response, parses the actions, and executes them.
        """
        # Store the current user input
        self.previous_inputs.append(user_input)

        # Construct the prompt for LLaMA
        order_summary = self.get_llama_order_summary()
        previous_inputs_summary = " ".join([f"Input {i}: {input_}" for i, input_ in enumerate(self.previous_inputs)])
        prompt = (
            f"### Instruction:\n"
            f"When\n"
            f"Current Order Details:\n{order_summary}\n"
            f"Previous customer inputs:\n{previous_inputs_summary}\n"
            f"What would be the response in the correct format for the input provided?\n"
            f"### Input:\n{user_input}\n"
            f"### Response:"
        )

        # Encode and generate a response from the model
        inputs = self.tokenizer(prompt, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model.generate(**inputs, max_length=150)

        # Decode the response from the model
        response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)

        # Parse the response and execute actions
        try:
            actions = response.strip().split("\n")  # Split multiple actions
            for action in actions:
                self._process_action(action.strip())
        except Exception as e:
            return {"error": str(e), "response": response}

        # Generate the final response
        return {
            "response": "명령이 성공적으로 처리되었습니다.",
            "order_summary": self.get_console_order_summary()
        }

    def _process_action(self, action):
        """
        Parses a single action string and executes the corresponding function.
        """
        try:
            # Split the action into command and details
            parts = action.split(" ", 1)
            command = parts[0]
            details = self._parse_action_details(parts[1])

            # Execute the corresponding function
            if command == "add_item":
                self.add_item(
                    details["drink_name"],
                    details["size"],
                    details["temperature"],
                    details["quantity"],
                    details.get("add_ons", {})
                )
            elif command == "remove_item":
                self.remove_item(details["quantity_indexes"])
            elif command == "update_item":
                self.update_item(details["quantity_indexes"], details["updates"])
            elif command == "cancel_order":
                self.cancel_order()
        except Exception as e:
            raise ValueError(f"Failed to process action: {action}. Error: {e}")

    def _parse_action_details(self, action_details):
        """
        Parses the details string for an action and returns a dictionary.
        """
        details = {}
        # Split the key-value pairs
        key_values = action_details.split(" ")
        for kv in key_values:
            key, value = kv.split(":")
            key = key.strip()
            value = value.strip().strip("[]").replace("(", "").replace(")", "")
            if key == "quantity_indexes":
                details[key] = value.split(",")  # Parse indexes as a list
            elif key == "add_ons":
                # Parse add-ons into a dictionary
                add_ons = {}
                if value:
                    for add_on in value.split(","):
                        add_on_key, add_on_val = add_on.split(":")
                        add_ons[add_on_key.strip()] = int(add_on_val.strip())
                details[key] = add_ons
            else:
                details[key] = value
        return details


# Example Usage
kiosk = CoffeeKiosk()
print(kiosk.respond_to_input("아메리카노 2잔 추가해 주세요."))  # Adds two Americanos
print(kiosk.respond_to_input("카푸치노 2잔 추가해 주세요."))    # Adds two Cappuccinos
print(kiosk.respond_to_input("아메리카노 한 잔에 샷 하나 추가해 주세요."))  # Updates one Americano with extra shot


KeyboardInterrupt: 