# Dependencies

In [None]:
# --- Core Python package tools ---
!pip install --upgrade pip

# --- Torch & GPU ---
!pip install torch
!pip install faiss-gpu

# --- Huggingface ecosystem (models, transformers, datasets, quant, etc.) ---
!pip install git+https://github.com/huggingface/transformers  # Latest transformers
!pip install transformers[agents]                             # Agents utilities
!pip install git+https://github.com/huggingface/peft.git      # Parameter Efficient Fine-Tuning
!pip install git+https://github.com/huggingface/accelerate.git
!pip install git+https://github.com/huggingface/trl.git       # Transformer Reinforcement Learning
!pip install -U sentence-transformers                         # Embeddings
!pip install datasets                                         # Datasets library
!pip install -i https://pypi.org/simple/ bitsandbytes         # 4-bit/8-bit quantization support

# --- LangChain (core + community + huggingface + chroma) ---
!pip install -U langchain                                     # Core
!pip install -U langchain-community                           # Community extensions
!pip install -U langchain-huggingface                         # HuggingFace connector
!pip install langchain-chroma                                 # Chroma DB integration
!pip install langchainhub                                     # LangChainHub for prompt/templates
!pip install -qU langchain-text-splitters                     # Text splitting

# --- Miscellaneous ---
!pip install jq                                               # JSON processor
!pip install -U pydantic                                      # Data models/validation
!pip install python-Levenshtein                               # Fuzzy string matching


In [None]:
# --- Core Python & System Utilities ---
import os
from typing import Optional, Type, List, Union, Dict, Tuple, Set

# --- PyTorch & Transformers (Huggingface) ---
import torch
import transformers
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    pipeline
)

# --- Huggingface PEFT (Parameter Efficient Fine-Tuning) ---
from peft import LoraConfig, PeftModel

# --- Huggingface Datasets ---
from datasets import load_dataset

# --- LangChain: Core & Community Modules ---
import pydantic

# Text splitters and document handling
from langchain.text_splitter import (
    CharacterTextSplitter,
    RecursiveCharacterTextSplitter,
    RecursiveJsonSplitter
)
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from langchain.schema.runnable import RunnablePassthrough

# Vectorstores and Embeddings
from langchain.vectorstores import FAISS
from langchain_community.vectorstores import Chroma
from langchain_community.document_loaders import JSONLoader
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFacePipeline

# LangChain Agents and Chains
from langchain.agents import AgentExecutor, create_structured_chat_agent
from langchain import hub

# LangChain advanced types (for tool/result parsing)
from langchain.schema import AgentAction, AgentFinish, OutputParserException


# meta-llama/Llama-3.2-3B-Instruct


In [None]:
def make_header(title, width=70):
    pad = (width - len(title) - 2) // 2
    line = "# " + "-" * width
    center = f"# {' ' * pad}{title}{' ' * pad}" + ("-" if (len(title) % 2 == 0) else " -")
    return f"{line}\n{center}\n{line}"

print(make_header("Utility Functions for Fromatting Date, Ticket Sales, and Halls"))

In [None]:
# GET THE HF TOKEN
from huggingface_hub import login
login(token=user_secrets.get_secret(hf_token))

# Load quantized LLama 3.2



In [None]:
# ----------------------------------------------------------------------
#                           Tokenizer Setup                            -
# ----------------------------------------------------------------------
import logging

logger = logging.getLogger(__name__)


model_name = "meta-llama/Llama-3.2-3B-Instruct"

# Load model directly
from transformers import AutoTokenizer, AutoModelForCausalLM

try:
  tokenizer = AutoTokenizer.from_pretrained(model_name, use_auth_token=True)
  logger.info("Loaded tokenizer for {model_name}")
except Exception as e:
  logger.error("Failed loading tokenizer for {model_name}", exc_info=e)
  raise

In [None]:
# ----------------------------------------------------------------------
#                  BitsAndBytes Quantization Settings                  -
# ----------------------------------------------------------------------

# Enable 4-bit quantization to reduce memory usage and improve efficiency
use_4bit = True

# Computation data type used during inference
bnb_4bit_compute_dtype = torch.bfloat16

# Quantization type "nf4" (Normalized Float 4) generally offers better accuracy than "fp4"
bnb_4bit_quant_type = "nf4"

# Disable nested quantization (double quantization), True only for extreme memory constraints
use_nested_quant = False

In [None]:
# ----------------------------------------------------------------------
#          Initialize BitsAndBytes 4-bit Quantization Config           -
# ----------------------------------------------------------------------

bnb_config = BitsAndBytesConfig(
    load_in_4bit=use_4bit,
    bnb_4bit_quant_type=bnb_4bit_quant_type,
    bnb_4bit_compute_dtype=bnb_4bit_compute_dtype,
    bnb_4bit_use_double_quant=use_nested_quant,
)

In [None]:
# ----------------------------------------------------------------------
#           Language Model: Quantized LLaMA 3.2 3B Instruct            -
# ----------------------------------------------------------------------

try:
  llama_model_original = AutoModelForCausalLM.from_pretrained(
      model_name,
      quantization_config=bnb_config,
      device_map="cuda"
  )
  logger.info(f"Loaded model {model_name}")
except Exception as e:
  logger.error(f"Failed loading model {model_name}")
  raise


print(f'Memory used by LLaMA model: {round(llama_model_original.get_memory_footprint()/1024/1024/1024, 2)} GB')

# Count number of trainable parameters

In [None]:
# ----------------------------------------------------------------------
#           Model Trainability Summary for Thea Chatbot App            -
# ----------------------------------------------------------------------
# This utility function calculates:
# - Total number of model parameters
# - Number of trainable (updatable) parameters
# - Percentage of parameters that are trainable
#
# It helps verify whether the model is fully fine-tuned or partially tuned

def print_number_of_trainable_model_parameters(model):
    trainable_model_params = 0
    all_model_params = 0

    # Iterate over each parameter tensor in the model
    for _, param in model.named_parameters():
        all_model_params += param.numel()  # count total parameters
        if param.requires_grad:
            trainable_model_params += param.numel()  # count only trainable ones

    # Return nicely formatted result
    return f"""
Trainability Report
-----------------------
Trainable parameters: {trainable_model_params:,}
Total model parameters: {all_model_params:,}
Trainable %: {100 * trainable_model_params / all_model_params:.2f}%
"""

# Call the function on your LLaMA model instance (change variable if needed)
print(print_number_of_trainable_model_parameters(llama_model_original))


# Build Custom Llama LLM Class

In [None]:
# --- Transformers: LLaMA model and tokenizer imports ---
from transformers.models.llama.modeling_llama import LlamaForCausalLM
from transformers.models.llama.tokenization_llama_fast import LlamaTokenizerFast
from transformers.tokenization_utils_fast import PreTrainedTokenizerFast

# --- LangChain: Base LLM and Callbacks ---
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun

# --- LangChain: Prompt engineering and agent utilities ---
from langchain.prompts import (
    ChatPromptTemplate,
    MessagesPlaceholder,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate
)
from langchain.agents import AgentExecutor, create_structured_chat_agent
from langchain.memory import ConversationBufferWindowMemory, ConversationBufferMemory

# --- Typing ---
from typing import Optional, List, Mapping, Any

# --- Custom LangChain-compatible wrapper for LLaMA ---
class CustomLLMLLaMA(LLM):
    model: LlamaForCausalLM
    tokenizer: PreTrainedTokenizerFast

    @property
    def _llm_type(self) -> str:
        return "custom"

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None
    ) -> str:
        # Prepare messages in chat format as expected by the LLaMA model
        messages = [{"role": "user", "content": prompt}]
        encodeds = self.tokenizer.apply_chat_template(messages, return_tensors="pt")
        model_inputs = encodeds.to(self.model.device)

        generated_ids = self.model.generate(
            model_inputs,
            max_new_tokens=1024,
            do_sample=True,
            pad_token_id=self.tokenizer.eos_token_id,
            top_k=4,
            temperature=0.1
        )
        decoded = self.tokenizer.batch_decode(generated_ids)
        raw = decoded[0]

        # Handle output extraction based on [INST] presence
        if "[/INST]" in raw:
            output = raw.split("[/INST]", 1)[1].replace("</s>", "").strip()
        else:
            output = raw.replace("</s>", "").strip()

        # Apply stop words, if any
        if stop is not None:
            for word in stop:
                output = output.split(word)[0].strip()

        # Pad with backticks if needed (for code block completion)
        while not output.endswith("```"):
            output += "`"

        return output

    @property
    def _identifying_params(self) -> Mapping[str, Any]:
        return {"model": self.model}

# --- Instantiate the wrapper with your model and tokenizer ---
llama_model = CustomLLMLLaMA(model=llama_model_original, tokenizer=tokenizer)

In [None]:
print(llama_model)

In [None]:
# ----------------------------------------------------------------------
#    Utility Functions for Fromatting Date, Ticket Sales, and Stages   -
# ----------------------------------------------------------------------

import re
from datetime import datetime
from typing import Tuple, Dict, List, Optional

TIME_SUFFIX_RE = re.compile(r"T00:00:00")

def strip_time_suffix(iso_date: str) -> str:
    """Removes the time component from ISO date strings (e.g. T00:00:00)."""
    return TIME_SUFFIX_RE.sub('', iso_date)

def ordinal_suffix(day: int) -> str:
    """Returns the appropriate ordinal suffix for a day (e.g. 1 → 'st')."""
    if 11 <= day <= 13:
        return 'th'
    return {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')

def human_date(date_str: str, include_year: bool = True) -> str:
    """
    Converts a date string (YYYY-MM-DD) to a readable English format.
    e.g. '2024-04-16' → 'Tuesday 16th of April, 2024'
    """
    try:
        date = datetime.strptime(date_str, "%Y-%m-%d")
        suffix = ordinal_suffix(date.day)
        day_str = f"{date.strftime('%A')} {date.day}{suffix} of {date.strftime('%B')}"
        return f"{day_str}, {date.year}" if include_year else day_str
    except ValueError:
        return "Invalid date"

def human_date_range(date_range: Tuple[str, str]) -> str:
    """
    Formats a start/end ISO date pair to readable range.
    e.g. ('2024-05-01T00:00:00', '2024-05-10T00:00:00')
         → 'from Wednesday 1st of May to Friday 10th of May, 2024'
    """
    start, end = map(strip_time_suffix, date_range)
    return f"from {human_date(start, include_year=False)} to {human_date(end, include_year=True)}"


def pluralize(word: str, count: int) -> str:
    """Adds 's' to a word if count is not 1."""
    return f"{word}" if count == 1 else f"{word}s"

def tickets_sold_by_zone(
    data: Dict[str, Dict[str, List[str]]],
    zone: str = "afternoon"
) -> str:
    """
    Returns number of tickets sold for a given time zone (e.g. 'afternoon').
    """
    count = sum(len(slots.get(zone, [])) for slots in data.values())
    return f"{count} {pluralize('ticket', count)}"

def tickets_summary(data: Dict[str, Dict[str, List[str]]]) -> Dict[str, str]:
    """
    Returns a dictionary summarizing tickets sold per time slot.
    Useful for assistant summaries or admin dashboards.
    """
    zones = ['morning', 'afternoon', 'evening']
    return {zone: tickets_sold_by_zone(data, zone) for zone in zones if any(slots.get(zone) for slots in data.values())}


def stage_capacity(stage_name: str) -> str:
    """
    Returns the seat capacity of a known stage.
    Extend this as your venue expands.
    """
    capacities = {
        "Stage A": "40 seats",
        "Stage B": "46 seats",
    }
    return capacities.get(stage_name, "Capacity info not available")


def format_runtime(runtime_str: str) -> str:
    """
    Converts time formats like '1h 30min' to '1 hour and 30 minutes'.
    Handles partial values (e.g. '45min').
    """
    hours, minutes = 0, 0
    for part in runtime_str.lower().split():
        if 'h' in part:
            hours = int(part.replace('h', '').strip())
        elif 'min' in part:
            minutes = int(part.replace('min', '').strip())

    parts = []
    if hours:
        parts.append(f"{hours} {pluralize('hour', hours)}")
    if minutes:
        parts.append(f"{minutes} {pluralize('minute', minutes)}")

    return " and ".join(parts) if parts else "Duration not specified"


if __name__ == "__main__":
    # Mock dataset for previewing functions
    mock_data = {
        "2024-06-01": {
            "morning": ["A1"],
            "afternoon": ["B1", "B2"],
            "evening": []
        },
        "2024-06-02": {
            "afternoon": ["C1", "C2", "C3"],
            "evening": ["D1"]
        }
    }

In [None]:
def generate_play_description(doc: dict) -> str:
    """
    Generate a rich, assistive natural language description for a theater play.
    Designed for conversational delivery or accessibility-friendly UI display.
    """

    # Extract core info
    title = doc.get("title", "Unknown Title")
    playwriter = doc.get("playwriter", "Unknown Playwriter")
    headline = doc.get("headline", "")
    description = doc.get("description", "")
    genre = doc.get("genre", "Unknown Genre")
    runtime = format_runtime(doc.get("runtime", "Unknown"))
    cast = ", ".join(doc.get("cast", []))
    stage = doc.get("stage", "Unknown Stage")
    capacity = stage_capacity(stage)
    age_limit = doc.get("ageLimit", "All Ages")
    additional_info = doc.get("additionalInfo", "No additional info available.")
    afternoon_time = doc.get("afternoon", "Unknown")
    night_time = doc.get("night", "Unknown")
    regular_price = doc.get("regularTickets", {}).get("price", "?")
    special_price = doc.get("specialNeedsTickets", {}).get("price", "?")

    # Format date range
    available_dates = list(doc.get("availableDates", {}).keys())
    if len(available_dates) >= 2:
        date_range_str = human_date_range((available_dates[0], available_dates[-1]))
    else:
        date_range_str = "for a limited run"

    # Tickets sold info
    afternoon_sold = tickets_sold_by_zone(doc['availableDates'], 'afternoon')
    night_sold = tickets_sold_by_zone(doc['availableDates'], 'night')

    # Age limit phrase
    if age_limit == "All Ages":
        age_phrase = "suitable for all audience members."
    else:
        age_phrase = f"recommended for ages {age_limit.replace('+', '')} and above."

    # Construct description
    return f"""
**{title}** by *{playwriter}*
*{headline}*

{description}

**Cast**: {cast}
**Genre**: {genre}
**Runtime**: {runtime}
**Performances** take place {date_range_str}, with two daily showings:
  • Afternoon at **{afternoon_time}**
  • Evening at **{night_time}**

**Venue**: {stage} — capacity: {capacity}
  • Afternoon show: {afternoon_sold} sold
  • Night show: {night_sold} sold

**Tickets**:
  • Regular: €{regular_price}
  • Accessible: €{special_price}

**Age suitability**: {age_phrase}
**Additional Notes**: {additional_info}
"""

In [None]:
    print("Date Range Example:")
    print(human_date_range(("2024-06-01T00:00:00", "2024-06-10T00:00:00")))

    print("\nAfternoon Tickets Sold:")
    print(tickets_sold_by_zone(mock_data, "afternoon"))

    print("\nFull Ticket Summary:")
    print(tickets_summary(mock_data))

    print("\nStage A Info:")
    print(stage_capacity("Stage A"))

    print("\nRuntime Formatter:")
    print(format_runtime("1h 15min"))

In [None]:
def safe_nested_get(d: dict, keys: list, default=None):
    """Safely traverse nested dicts with a list of keys."""
    value = d
    try:
        for k in keys:
            value = value[k]
        return value
    except (KeyError, IndexError, TypeError):
        return default

def generate_play_metadata(play: dict) -> dict:
    """
    Generate structured metadata for a play: for RAG, vectorstore, UI cards, etc.
    """
    # Date range: get first and last date
    available_dates = list(play.get("availableDates", {}).keys())
    if available_dates:
        start = strip_time_suffix(available_dates[0])[:10]
        end = strip_time_suffix(available_dates[-1])[:10]
        date_range = f"{start} to {end}"
    else:
        date_range = "Date info unavailable"

    # Build metadata dictionary
    metadata = {
        "source": "Thea",
        "title": play.get("title", "Unknown Title"),
        "playwriter": play.get("playwriter", "Unknown Playwriter"),
        "genre": play.get("genre", "Unknown Genre"),
        "stage": play.get("stage", play.get("hall", "Unknown Stage")),
        "date_range": date_range,
        "age_limit": play.get("ageLimit", "All Ages"),
        "afternoon_time": play.get("afternoon", "Not scheduled"),
        "night_time": play.get("night", "Not scheduled"),
        "regular_price": f"{safe_nested_get(play, ['regularTickets', 'price'], '?')}€",
        "special_price": f"{safe_nested_get(play, ['specialNeedsTickets', 'price'], '?')}€",
        "cast_size": len(play.get("cast", [])),
        "cast": ", ".join(play.get("cast", [])),
    }

    return metadata


In [None]:
from typing import List, Dict, Tuple

def generate_play_descriptions_and_metadata(
    json_data: List[Dict]
) -> Tuple[List[str], List[Dict]]:
    """
    Transform a list of play dictionaries into:
      • Rich natural language summaries (for chat, screen readers, etc.)
      • Structured metadata dictionaries (for indexing, search, UI)

    Args:
        json_data: List of play dictionaries.

    Returns:
        Tuple containing:
          - documents: List of human-readable summaries for each play.
          - metadata: List of structured metadata dicts.
    """
    documents: List[str] = []
    metadata: List[Dict] = []

    for play in json_data:
        play_description = generate_play_description(play)
        play_metadata = generate_play_metadata(play)
        documents.append(play_description)
        metadata.append(play_metadata)

    return documents, metadata


# Load and Tranform Data

In [None]:
import json

try:
  # 1. Load
  with open('/content/data/plays.json', 'r') as f:
      data = json.load(f)

  # 2. Generate summaries + metadata
  summaries, metadata = generate_play_descriptions_and_metadata(data)

  # 3. Build a quick lookup by title (lowercased for matching)
  play_index = {
      md['title'].lower(): {
          'summary': doc,
          'metadata': md
      }
      for doc, md in zip(summaries, metadata)
  }
  logger.info("Successfully read JSON")
except FileNotFoundError:
  logger.error("JSON NOT FOUND")
except json.JSONDecodeError as e:
  logger.error("Failed parsing JSON", exc_info=e)
  raise

In [None]:
# Generate documents and metadata
info = generate_play_descriptions_and_metadata(data)
documents = info[0]
metadata = info[1]
print(len(documents), len(metadata))

In [None]:
def simple_chunk_text(text: str, chunk_size: int = 700) -> list:
    """
    Splits a single text string into chunks of up to 'chunk_size' characters.
    No overlap.
    """
    return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]

chunks = []  # Holds all the chunks from all documents

for doc in documents:
    doc_chunks = simple_chunk_text(doc, chunk_size=700)
    chunks.extend(doc_chunks)

print(len(chunks))  # Total number of chunks produced
print(chunks)       # List of chunk strings


In [None]:
print(chunks[0] + chunks[1])

In [None]:
def extract_titles_from_metadata(metadata_list):
    """Extracts all titles from a list of metadata dictionaries."""
    return [md.get('title', 'Unknown Title') for md in metadata_list]

titles = extract_titles_from_metadata(metadata)
print(titles, len(titles))


In [None]:
# Install fuzzywuzzy if needed
!pip install fuzzywuzzy

from fuzzywuzzy import fuzz

def find_play_titles(text, titles=titles, threshold=70):
    """
    Find titles in the user's text using fuzzy matching and substring checks.

    Args:
        text (str): User input text.
        titles (list): List of known play titles.
        threshold (int): Fuzzy match score threshold (0-100).

    Returns:
        List of matching titles (may be empty).
    """
    found_titles = []
    text_lower = text.lower()  # Normalize case

    for title in titles:
        title_lower = title.lower()
        # Check for direct substring match
        if title_lower in text_lower:
            found_titles.append(title)
        else:
            # Check fuzzy match for each word in the input text
            for word in text_lower.split():
                ratio = fuzz.ratio(word, title_lower)
                if ratio >= threshold:
                    found_titles.append(title)
                    break  # Only add the same title once

    return found_titles

# Example usage:
# user_input = "I want info about Hamlet"
# print(find_play_titles(user_input, titles))


In [None]:
for i in range (0, len(metadata)+5, 2):
  metadata.insert(i+1, metadata[i])
print(len(metadata))

In [None]:
def custom_sort_documents(id_list, document_list):
    """
    Sorts document chunks based on their ID (e.g., 'id1a', 'id1b').
    Orders by numeric ID first, then suffix alphabetically.
    Returns a list of documents in sorted order.
    """
    # Dynamically build base_order based on unique bases in id_list
    unique_bases = sorted({id[:-1] for id in id_list})
    base_order = {base: i for i, base in enumerate(unique_bases)}

    # Custom sort: by base order, then by suffix (alphabetical)
    def sort_key(item):
        doc_id = item[0]
        base = doc_id[:-1]
        suffix = doc_id[-1]
        return (base_order.get(base, 0), suffix)

    # Zip, sort, unzip
    sorted_items = sorted(zip(id_list, document_list), key=sort_key)
    # If sorted_items is empty, zip(*) fails. Handle that:
    if not sorted_items:
        return []

    sorted_ids, sorted_documents = zip(*sorted_items)
    return list(sorted_documents)

# Example usage:
# ids = ['id2b', 'id1a', 'id1b', 'id2a']
# docs = ['D', 'A', 'B', 'C']
# print(custom_sort_documents(ids, docs))  # Output: ['A', 'B', 'C', 'D']


# Set Up Chroma Index

In [None]:
import chromadb
chroma_client = chromadb.Client()

In [None]:
try:
    if plays_collection:
        try:
            chroma_client.delete_collection(name="plays")
        except Exception as e:
            print(f"Warning: Could not delete collection 'plays'. Reason: {e}")
        try:
            del plays_collection
        except Exception as e:
            print(f"Warning: Could not delete variable 'plays_collection'. Reason: {e}")
except Exception as e:
    print(f"General error while handling plays_collection: {e}")


In [None]:
# Using default embedding model: 'all-MiniLM-L6-v2'
plays_collection = chroma_client.create_collection(name="plays")

plays_collection.add(
    documents = chunks,
    metadatas = metadata,
    ids = ["id1a", "id1b", "id2a", "id2b", "id3a", "id3b", "id4a", "id4b", "id5a", "id5b", "id6a", "id6b"]
)

plays_collection.count()

# Funntion Calling - Tools

In [None]:
# Import things that are needed generically
from langchain.pydantic_v1 import BaseModel, Field
from pydantic import ValidationError
from langchain.tools import BaseTool, StructuredTool, tool

from typing import Optional, Type

from langchain_core.callbacks import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)

In [None]:
inputs={'input': ""}

In [None]:
class GetPlayInformation(BaseTool):
    name: str = "getPlayInformation"
    description: str = (
        "The user wants information about a play or wants to ask a general question regarding the details of a play "
        "(such as ticket costs, age limit, ticket availability, etc.)."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """Finds all the relevant info to the user's prompt."""
        query = inputs.get('input', '')
        print(f"Received input: {query}")

        try:
            print("INSIDE GetPlayInformation TOOL:")

            # Attempt to identify the play(s) mentioned (catch misspellings)
            playFound = find_play_titles(query)  # List of close matches
            print(f"Plays found: {playFound}")

            # Query ChromaDB/collection using the play title if found
            if playFound:
                # Use the play title for more relevant results
                retrieved_plays_info = plays_collection.query(
                    query_texts=[query],
                    n_results=2,
                    where={"title": str(playFound[0])}
                )
            else:
                # No specific play found; search more broadly
                retrieved_plays_info = plays_collection.query(
                    query_texts=[query],
                    n_results=8
                )

            # Extract and sort documents if results exist
            if retrieved_plays_info and len(retrieved_plays_info['documents']) > 0:
                retrieved_documents_sorted = custom_sort_documents(
                    retrieved_plays_info['ids'][0],
                    retrieved_plays_info['documents'][0]
                )
                print(f"Retrieved and sorted {len(retrieved_documents_sorted)} documents.")
                context = "\n\n".join(retrieved_documents_sorted)
            else:
                context = ""

            # Build response with navigation info if a play was recognized
            if playFound:
                return (
                    "USER_WANTS_TO_GET_PLAY_INFO-" + playFound[0]
                    + "=> " + context
                )

            # Otherwise, just return the context without a play
            return (
                "USER_WANTS_TO_GET_PLAY_INFO=> " + context
            )

        except Exception as e:
            return f"Exception during execution of 'GetPlayInformation' tool: {str(e)}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async")


get_play_info = GetPlayInformation()


In [None]:
class PlayChooser(BaseTool):
    name: str = "choosePlay"
    description: str = "The user wants to choose a play to book a ticket for."

    def _to_args_and_kwargs(self, *args, **kwargs):
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        play_query = inputs.get('input', '')
        try:
            # Attempt to find close-matching play titles (robust to typos)
            play_found = find_play_titles(play_query)

            # If no play is found, prompt user with available titles
            if not play_found:
                available = ", ".join(titles)
                return (
                    "USER_CHOSE_INVALID_PLAY=> "
                    "Sorry, I couldn't find that play. Please provide a valid play title. "
                    f"Currently available plays: {available}."
                )

            # If found, confirm selection and prompt for next booking slot(s)
            play_title = play_found[0]
            return (
                f"USER_CHOSE_THE_PLAY-{play_title}=> "
                f"You selected **{play_title}**.\n"
                "I can't directly book a ticket for you due to Thea's theater policy. \n\nBy pressing the button below, I can redirect you to the screen for booking a ticket."
            )

        except Exception as e:
            # Catch any unexpected errors for robust operation
            return f"Exception during execution of 'PlayChooser' tool: {str(e)}"

choose_play = PlayChooser()


In [None]:
class HumanContact(BaseTool):
    name: str = "contactHuman"
    description: str = (
        "Doesn't take arguments. The user wants to contact a human at Thea. "
        "Provide the official contact details—phone number, website, Instagram—and encourage the user to reach out via their preferred method. "
        "Use a warm, polite, and professional tone. Do NOT make up any information."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Respond to the user with official, up-to-date contact options for Thea's human support.
        """
        try:
            print("INSIDE HUMAN CONTACT TOOL")
            return (
                "USER_WANTS_TO_CONTACT_A_HUMAN=> "
                "If you would like to speak with a member of our team, you can contact Thea directly by phone at +30 210 123 4567, "
                "visit our website at https://www.thea.com, or reach out via Instagram at https://www.instagram.com/thea. "
                "We're always happy to help—please choose whichever method is most convenient for you!"
                "\n\n"
                "By pressing the button below, I can redirect you to the screen for contacting a human employee of Thea."
            )
        except Exception as e:
            return f"Exception during execution of \"HumanContact\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

# Instantiate
contact_human = HumanContact()


In [None]:
class GetDirections(BaseTool):
    name: str = "getTheaterDirections"
    description: str = (
        "Doesn't take arguments. The user wants to know how to get to Thea or the official location. "
        "Respond with the official address, coordinates (LatLng), postal code, nearby landmarks, and provide contact info."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Provide official address, coordinates, postal code, and nearby landmark information for Thea Theater in Athens.
        """
        try:
            print("INSIDE GET DIRECTIONS TOOL:")
            return (
                "USER_WANTS_TO_GET_DIRECTIONS=> "
                "Thea Theater is located in central Athens, Greece, near Omonia Square (Plateia Omonias), which is a major commercial and transportation hub. "
                "Our official coordinates are **37.9838° N, 23.7275° E** (postal code: 10551). "
                "Nearby streets include Pireos Street and Stoa Lykourgou, making Thea easy to reach by metro or bus.\n\n"
                "For directions or map, please visit our website: https://www.thea.com\n"
                "You can also contact us by phone at +30 210 123 4567 or via Instagram at https://www.instagram.com/thea.\n"
                "If you need specific travel advice or accessibility information, just let us know!"
                "\n\n"
                "By pressing the button below, I can redirect you to the screen for getting directions to Thea Theater."
            )
        except Exception as e:
            return f"Exception during execution of \"GetDirections\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

get_directions = GetDirections()


In [None]:
class GetTheaterInformation(BaseTool):
    name: str = "getTheaterInformation"
    description: str = (
        "Doesn't take arguments. The user wants to learn more about Thea theater, either general or specific information."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments expected for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Provide general or specific information about Thea Theater.
        """
        try:
            print("INSIDE GET THEATER INFORMATION TOOL")
            return (
                "USER_WANTS_TO_GET_THEATER_INFO=> "
                "Thea is a modern theater located in the heart of Athens, Greece, near Omonia Square (LatLng: 37.9838° N, 23.7275° E). "
                "We feature two stages—Stage A and Stage B—each hosting a different play every day, with both afternoon and evening performances. "
                "Thea is fully wheelchair accessible, and select shows offer sign language interpreters and supertitles for the hearing impaired. "
                "Tickets can be purchased online (https://www.thea.com), through our mobile app, or directly at the counter. "
                "For more details about our facilities, current program, accessibility, or anything else, please ask! "
                "You can also visit our website at https://www.thea.com or call us at +30 210 123 4567."
                "\n\n"
                "By pressing the button below, I can redirect you to the screen for getting more information about Thea Theater."
            )
        except Exception as e:
            return f"Exception during execution of \"GetTheaterInformation\" tool: {e}"

# Instantiate for use
get_theater_info = GetTheaterInformation()


In [None]:
class ComplaintMaker(BaseTool):
    name: str = "makeComplaint"
    description: str = (
        "Doesn't take arguments. The user wants to make a complaint about the theater. "
        "Apologize for the inconvenience and just say that they can fill out a complaint submission form."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Respond to a user who wants to make a complaint about the theater.
        """
        try:
            print("INSIDE COMPLAINT MAKER TOOL: ")
            return (
                "USER_WANTS_TO_SUBMIT_A_COMPLAINT=> "
                "We sincerely apologize for any inconvenience you have experienced at Thea. "
                "You may submit a complaint using the form available in our app, and our team will review your feedback as soon as possible. "
                "If you need any assistance or would prefer to speak to someone, please contact us directly at +30 210 123 4567 or via our website at https://www.thea.com. "
                "Thank you for helping us improve your theater experience."
                "\n\n"
                "By pressing the button below, I can redirect you to the screen for making a complaint."
            )
        except Exception as e:
            return f"Exception during execution of \"ComplaintMaker\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

# Instantiate for use
make_complaint = ComplaintMaker()


In [None]:
class TicketCanceler(BaseTool):
    name: str = "cancelTicket"
    description: str = (
        "Doesn't take arguments. The user wants or needs to cancel their booking/ticket for a play(s) they have booked. "
        "Respond by saying that it is a shame they won't be able to come to the play and tell them that you will redirect them to the appropriate screen so they can cancel their e-ticket."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Respond when a user wants to cancel their booking/ticket.
        """
        try:
            print("INSIDE TICKET CANCELER TOOL")
            return (
                "USER_CANCELS_TICKET=> "
                "We're sorry you won't be able to attend your selected performance at Thea. "
                "Your ticket was successfully deleted."
                "If you need any help with the cancellation process, please contact us at +30 210 123 4567 or visit https://www.thea.com for assistance."
                "\n\n"
                "By pressing the button below, I can redirect you to the screen for canceling your ticket."

            )
        except Exception as e:
            return f"Exception during execution of \"TicketCanceler\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

# Instantiate for use
cancel_ticket = TicketCanceler()


In [None]:
class ShowPurchasedTickets(BaseTool):
    name: str = "showTickets"
    description: str = (
        "Doesn't take arguments. The user wants to see all their purchased/booked tickets. "
        "Respond by telling them that you will redirect them to the 'My e-Tickets' screen, where they can view, cancel, or download their e-tickets."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Respond to a user who wants to view their purchased tickets.
        """
        try:
            print("INSIDE SHOW PURCHASED TICKETS TOOL")
            return (
                "USER_SEES_PURCHASED_TICKETS=> "
                "You can view all of your booked tickets in the 'My e-Tickets' section of the app. "
                "From there, you can also cancel or download your tickets as needed. "
                "Would you like to go to the 'My e-Tickets' screen now?"
                "\n\n"
                "By pressing the button below, I can redirect you to My e-Tickets' screen to see your tickets."
            )
        except Exception as e:
            return f"Exception during execution of \"ShowPurchasedTickets\" tool: {e}"

show_purchased_tickets = ShowPurchasedTickets()


In [None]:
class CannotUnderstand(BaseTool):
    """
    Tool for handling user input that is gibberish, incoherent, or in an unsupported language.
    Politely informs the user and prompts them to try again.
    """
    name: str = "cannotUnderstand"
    description: str = (
        "Doesn't take arguments. The user input is gibberish, incoherent, or in a language other than English. "
        "Say that you cannot understand, and prompt the user to try again."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Respond when user input is not understandable.
        """
        try:
            print("INSIDE CANNOT UNDERSTAND TOOL")
            return (
                "USER_INPUT_NOT_UNDERSTANDABLE=> "
                "I'm sorry, but I didn't understand your message. Could you please rephrase or try again?"
            )
        except Exception as e:
            return f"Exception during execution of \"CannotUnderstand\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

# Instantiate for use
cannot_understand = CannotUnderstand()


In [None]:
class UnrelatedInput(BaseTool):
    """
    Tool for handling input unrelated to the Thea theater.
    Politely informs the user about the assistant's domain.
    """
    name: str = "unrelatedInput"
    description: str = (
        "Doesn't take arguments. The user said something not related to Thea theater in any way. "
        "Inform the user that only questions related to Thea (such as plays, ticket booking or cancellation, etc.) can be answered."
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Respond when the user's input is unrelated to the theater.
        """
        try:
            print("INSIDE UNRELATED INPUT TOOL")
            return (
                "USER_INPUT_UNRELATED_TO_THEATER=> "
                "I'm here to help with questions related to Thea theater, such as information about our plays, ticket booking or cancellation, directions, and other theater-related topics. "
                "Please ask something related to Thea so I can assist you!"
            )
        except Exception as e:
            return f"Exception during execution of \"UnrelatedInput\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

# Instantiate for use
unrelated_input = UnrelatedInput()


In [None]:
class Other(BaseTool):
    name: str = "other"
    description: str = (
        "Doesn't take arguments. Use this tool if none of the others are a good fit. "
        "Don't hesitate to use this tool!"
    )

    def _to_args_and_kwargs(self, *args, **kwargs):
        # No arguments for this tool
        return (), {}

    def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
        """
        Default fallback when no other tool is appropriate.
        """
        try:
            print("INSIDE TOOL OTHER")
            return (
                "OTHER=>This question doesn't match any specific category, but I'm here to help! "
                "Please clarify your request or let me know how I can assist you further."
            )
        except Exception as e:
            return f"Exception during execution of \"Other\" tool: {e}"

    def _arun(self, *args, **kwargs):
        raise NotImplementedError("This tool does not support async.")

# Instantiate the tool
other = Other()


In [None]:
tools = [cannot_understand, unrelated_input, choose_play, make_complaint, contact_human, get_directions, cancel_ticket, show_purchased_tickets, get_theater_info, get_play_info, other]

# Create Prompts and Chains

In [None]:
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import render_text_description_and_args, render_text_description
from langchain.memory import ConversationBufferWindowMemory,ConversationBufferMemory

from operator import itemgetter
from langchain_core.runnables import RunnableLambda

In [None]:
# Instantiate ConversationBufferMemory
memory = ConversationBufferWindowMemory(
 memory_key='chat_history', return_messages=True, k=4
)

loaded_memory = RunnablePassthrough.assign(
    chat_history=RunnableLambda(memory.load_memory_variables) | itemgetter("chat_history"),
)

In [None]:
rendered_tools = render_text_description(tools)
print(render_text_description_and_args(tools))

In [None]:
system_prompt = f"""
You are a **polite, warm, and professional AI assistant** for Thea, a theater in Athens, Greece.

**Thea - Information:**
- **Location:** Omonia StreetAthens, Greece (LatLng: 37.9838, 23.7275)
- **Website:** https://www.thea.com
- **Phone:** +30 210 123 4567
- **Instagram:** https://www.instagram.com/thea

**Hours of Operation:**
- Monday to Friday: 10:00 – 24:00
- Saturday & Sunday: 15:00 – 24:00

**Ticketing:** Tickets can be purchased online (https://www.thea.com), through the app, or at the counter desk.

**Accessibility:** The theater has a wheelchair accessible entrance and seating.

**Common User Queries:**
- "What's playing this weekend?"
- "How can I buy tickets?"
- "How can I cancel my ticket?" or "I want a refund for my ticket"
- "Where is the theatre located?"
- "What hours does the theatre work?"

**Important Response Guidelines:**
- Always use a warm, polite, professional, and informative tone.
- Only cite information you have (do **not** make up or "hallucinate" answers).
- If you do not know the answer, apologize and politely redirect or suggest the user contact the theater.
- Always provide exact URLs (e.g., https://www.thea.com) and phone numbers (+30 210 123 4567) when requested or relevant.
- Never include information that is not explicitly provided here.

**You have access to the following set of tools. Here are the tool names and descriptions:**
{rendered_tools}

**Instructions:**
- Given the user’s input, choose the tool that best fits their request.
- Return your answer strictly as a JSON object with **'name'** and **'arguments'** keys.
- The **arguments** value should be a dictionary, with argument names as keys and their values filled in appropriately.
- **Do not provide explanations or any text outside the JSON.**
"""

tool_prompt = ChatPromptTemplate.from_messages(
    [("system", system_prompt), MessagesPlaceholder("chat_history"), ("user", "{input}")]
)
print(tool_prompt)


In [None]:
system_prompt2 = """
You are a **polite, warm, and professional AI assistant** for Thea, a theater in Athens, Greece.

**Thea - Information:**
- **Location:** Omonia StreetAthens, Greece (LatLng: 37.9838, 23.7275)
- **Website:** https://www.thea.com
- **Phone:** +30 210 123 4567
- **Instagram:** https://www.instagram.com/thea

**Hours of Operation:**
- Monday to Friday: 10:00 – 24:00
- Saturday & Sunday: 15:00 – 24:00

**Ticketing:** Tickets can be purchased online (https://www.thea.com), through the app, or at the counter desk.

**Accessibility:** The theater has a wheelchair accessible entrance and seating.

**Common User Queries:**
- "What's playing this weekend?"
- "How can I buy tickets?"
- "How can I cancel my ticket?" or "I want a refund for my ticket"
- "Where is the theatre located?"
- "What hours does the theatre work?"

**Important Response Guidelines:**
- Always use a warm, polite, professional, and informative tone.
- Only cite information you have (do **not** make up or "hallucinate" answers).
- If you do not know the answer, apologize and politely redirect or suggest the user contact the theater.
- Always provide exact URLs (e.g., https://www.thea.com) and phone numbers (+30 210 123 4567) when requested or relevant.
- Never include information that is not explicitly provided here.

**Instructions:**
- Given the user’s input, choose the tool that best fits their request.
- Return your answer strictly as a JSON object with **'name'** and **'arguments'** keys.
- The **arguments** value should be a dictionary, with argument names as keys and their values filled in appropriately.
- **Do not provide explanations or any text outside the JSON.**
---

**Previous Conversations:**
{chat_history}

**The user has asked this:**
{input}

**You have already used a tool that returned this:**
"""

generation_prompt = ChatPromptTemplate.from_messages(
    [("system", system_prompt2),
     ("user", "{output}")]
)
print(generation_prompt)


In [None]:
from typing import Any, Dict, Optional, TypedDict

from langchain_core.runnables import RunnableConfig

class ToolCallRequest(TypedDict):
    """A typed dict that shows the inputs into the invoke_tool function."""

    name: str
    arguments: Dict[str, Any]


def invoke_tool(tool_call_request: ToolCallRequest, config: Optional[RunnableConfig] = None):
    """A function that we can use the perform a tool invocation.

    Args:
        tool_call_request: a dict that contains the keys name and arguments.
            The name must match the name of a tool that exists.
            The arguments are the arguments to that tool.
        config: This is configuration information that LangChain uses that contains
            things like callbacks, metadata, etc.See LCEL documentation about RunnableConfig.

    Returns:
        output from the requested tool
    """
    tool_name_to_tool = {tool.name: tool for tool in tools}
    name = tool_call_request["name"]
    requested_tool = tool_name_to_tool[name]
    return requested_tool.invoke(tool_call_request["arguments"], config=config)

In [None]:
import re

def extract_json_only(text):
    """
    Extract only the first JSON object found in a string.
    Useful for isolating JSON output from LLMs with extra text/tokens.
    """
    match = re.search(r"\{.*?\}", text, re.DOTALL)
    if match:
        return match.group(0)
    else:
        return text  # Fallback: return original if JSON not found

def chat(prompt):
    """
    Main chat pipeline:
    - Passes prompt through tool selection chain (with JSON extraction/parsing)
    - Invokes selected tool
    - Generates a final natural language answer using the output of the tool
    - Saves chat context in memory (unless unrelated/gibberish)
    """
    print(prompt)
    try:
        # Tool selection pipeline: prompt → system → LLM → extract JSON → parse JSON → call tool
        tool_chain = (
            loaded_memory
            | tool_prompt
            | llama_model
            | RunnableLambda(extract_json_only)
            | JsonOutputParser()
            | invoke_tool
        )

        # Tool result: expected "CODE=> output"
        tool_result = tool_chain.invoke(prompt).split("=>", 1)
        code = tool_result[0].strip()
        tool_output = {'output': tool_result[1].strip()} if len(tool_result) > 1 else {'output': ''}
        tool_output['input'] = prompt.get('input', '')

        # Generation chain: uses tool output for final answer
        generation_chain = loaded_memory | generation_prompt | llama_model
        generated_text = generation_chain.invoke(tool_output).replace('`','').replace("AI:", "")

        # Debugging
        print("Code:", code)
        print("Tool Output:", tool_output)
        print("Generated:", generated_text)

        outputs = {'output': generated_text}

        # Save chat only if it's a valid exchange
        if code not in ["USER_INPUT_UNRELATED_TO_THEATER", "USER_INPUT_NOT_UNDERSTANDABLE"]:
            memory.save_context(prompt, outputs)

        return code, generated_text

    except Exception as e:
        print("CAUGHT_EXCEPTION:", str(e))
        return (
            "USER_INPUT_NOT_UNDERSTANDABLE",
            "I am sorry, I am afraid I do not understand what you want me to do. Please try again."
        )


In [None]:
def show_history(memory):
    print("Chat History is as follows: \n")
    chat_history = memory.buffer
    for i in range(0, len(chat_history), 2):
        user_message = chat_history[i].content
        ai_message = chat_history[i + 1].content
        print(f"User Prompt: {user_message}")
        print(f"AI: {ai_message}")

In [None]:
def clear_history(memory):
    memory.clear()
    print("Chat history was cleared successfully!")

In [None]:
show_history(memory)

In [None]:
clear_history(memory)

In [None]:
inputs = {'input': "I want to see Romeo and Juliet"}
response = chat(inputs)
print(f"========> {response}")

# Connection with Thea App

In [None]:
!pip install pyngrok

In [None]:
from flask import Flask
from pyngrok import ngrok
from flask import request, jsonify

In [None]:
app = Flask(__name__)
ngrok.set_auth_token(user_secrets.get_secret("ngrok_token"))
public_url = ngrok.connect(5000).public_url

@app.route("/")
def home():
    return "Hello, World!"

@app.route('/send_message', methods=['POST'])
def send_message():
    data = request.get_json()
    message = data.get('message', '')
    global memory

    if message == "CLEAR_HISTORY":

        clear_history(memory)
        show_history(memory)

    else:
        message = {'input': message}

        global inputs
        inputs = message
        print("inputs: ", inputs)

        if message:
            response_message = chat(message)

            print("response_message:", response_message)

            return jsonify({'code': response_message[0], 'response': response_message[1]})
        else:
            return jsonify({'error': 'No message provided'}), 400

print(f"To access the global link please click {public_url}")

app.run(port=5000)