In [None]:
# Mount Google Drive at /content/drive so that later cells can read and write files stored there.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os
# Project folder on the mounted Drive. Relative paths used in later cells
# (e.g. 'tnt.csv', 'embeddings.pkl') are resolved against this working directory.
projectPath = '/content/drive/MyDrive/RAG_Chatbot_System'
os.chdir(projectPath)

In [None]:
# Required packages: sentence-transformers (embedding model) and chromadb (vector store),
# upgraded to the latest release (-U).
!pip install -U sentence-transformers chromadb

In [None]:
# Packages used below for the generation model and the Hugging Face login (-qqq keeps pip quiet).
!pip install transformers torch huggingface-hub -qqq

In [None]:
# NOTE(review): presumably required for the 4-bit BitsAndBytesConfig and device_map="auto"
# model loading configured further down — confirm before removing.
!pip install -q accelerate bitsandbytes

In [None]:
# Import Required packages
# import config  # Token Key
import pandas as pd
import numpy as np
import chromadb
import re
import logging
import torch
import textwrap
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, PreTrainedTokenizerBase
from typing import Dict, Any, List, Union, Optional
from huggingface_hub import login
import hashlib
import pickle

# Authenticate with the Hugging Face Hub without hard-coding the token in the notebook.
# `import config` is commented out in the import block, so referencing
# `config.huggingFaceToken` directly raised NameError on a fresh kernel.
# Lookup order:
#   1. `huggingFaceToken` from an optional local `config.py` (the original source)
#   2. the HF_TOKEN environment variable
try:
  import config  # optional local module holding the token
  _hf_token = getattr(config, "huggingFaceToken", None)
except ImportError:
  _hf_token = None

if not _hf_token:
  import os
  _hf_token = os.environ.get("HF_TOKEN")

# With token=None, login() falls back to its interactive prompt.
login(token=_hf_token)

# 4-bit NF4 quantization settings passed to the generation model when it is loaded.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16
)

# Module-wide logger used by the helper functions defined in later cells.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
# Load the CSV file
def load_csv(fileName: str = 'tnt.csv', encoding: Optional[str] = 'utf-8') -> pd.DataFrame:
  """Read a CSV file into a pandas DataFrame, logging the outcome.

  Args:
      fileName (str, optional): Path of the CSV file to read. Defaults to 'tnt.csv'.
      encoding (str, None, optional): Text encoding handed to ``pd.read_csv``.
          Defaults to 'utf-8'.

  Returns:
      pd.DataFrame: The parsed file contents.

  Raises:
      FileNotFoundError: The file does not exist.
      pd.errors.ParserError: The file could not be parsed as CSV.
      UnicodeDecodeError: The file could not be decoded with ``encoding``.
      Exception: Any other error raised while reading; it is logged and re-raised.
  """
  try:
    frame = pd.read_csv(fileName, encoding=encoding)
  except Exception as e:
    # Choose the log message by error type (same precedence as separate
    # except clauses would have), then re-raise the original exception.
    if isinstance(e, FileNotFoundError):
      message = f"Error: File '{fileName}' not found. Please ensure the file exists in the working directory."
    elif isinstance(e, pd.errors.ParserError):
      message = f"Error: Failed to parse CSV file '{fileName}'. The file may be malformed: {e}"
    elif isinstance(e, UnicodeDecodeError):
      message = f"Error: Unable to decode file '{fileName}' with encoding '{encoding}': {e}"
    else:
      message = f"Error: An unexpected error occurred while loading '{fileName}': {e}"
    logger.error(message)
    raise

  logger.info(f"Successfully loaded the file: {fileName}")
  return frame

In [None]:
#Extract numeric amount for price or discount
def extract_numeric_amount(number: Union[str, float, int, None]) -> Optional[float]:
  """Convert a price/discount value to a float.

  Strings are stripped and lowercased, then every character other than digits,
  '.' and '-' is removed before conversion, so thousands separators, currency
  symbols and trailing words (e.g. '9% off') are tolerated. Float and string
  results are rounded to two decimals.

  Note: dots and hyphens in the surrounding text are kept by the cleaning
  step, so an input such as 'rs. 1,500' cleans to '.1500' (0.15).

  Args:
      number (str, float, int, None): A string (e.g. '1,500', '$1,500.99',
          '9% off'), a Python or NumPy numeric scalar, or None/NaN/pd.NA.

  Returns:
      Optional[float]: The numeric value, or None when the input is missing
      (None, NaN, pd.NA, '', 'none', 'nan') or does not clean to exactly one
      valid number (e.g. 'abc', '1.2.3', '10-20').

  Raises:
      TypeError: If the input is neither a string, a numeric scalar, nor a
          missing value. The error is logged before being re-raised.

  Examples:
      >>> extract_numeric_amount('1,500')
      1500.0
      >>> extract_numeric_amount('$1,500.99')
      1500.99
      >>> extract_numeric_amount(1500)
      1500.0
      >>> extract_numeric_amount('None') is None
      True
      >>> extract_numeric_amount('abc') is None
      True
      >>> extract_numeric_amount('9% off')
      9.0
  """
  try:
    # Handle None, NaN, or pd.NA
    if number is None or pd.isna(number):
      return None

    # Numeric inputs. NumPy scalars are accepted as well: np.int64 / np.float32
    # are not subclasses of int / float and previously fell through to TypeError.
    if isinstance(number, (float, np.floating)):
      return round(float(number), 2)
    if isinstance(number, (int, np.integer)):
      return float(number)

    if isinstance(number, str):
      number_str = number.strip().lower()
      if not number_str or number_str in ('none', 'nan'):
        return None

      # Remove non-numeric characters except dots and negative sign
      cleaned_number = re.sub(r'[^0-9.\-]', '', number_str)

      # Reject anything that cannot be a single number: nothing left, more
      # than one dot or minus, or a minus that is not the leading character.
      if (not cleaned_number
          or cleaned_number.count('.') > 1
          or cleaned_number.count('-') > 1
          or cleaned_number.find('-') > 0
          ):
        return None

      return round(float(cleaned_number), 2)

    # If other type, raise error
    raise TypeError(f"Unsupported type for number: {type(number)}")

  except ValueError as e:
    # e.g. a lone '-' or '.' survives cleaning but is not a number: treat as missing
    logger.error(f"Error: Cannot convert number '{number}' to float: {e}")
    return None

  except TypeError as e:
    logger.error(f"Type error with number '{number}': {e}")
    raise

  except Exception as e:
    logger.error(f"Error: Unexpected error while processing number '{number}': {e}")
    raise

In [None]:
def generate_embeddings(texts:
                        Union[str, List[str]],
                        model: SentenceTransformer
                        ) -> List[List[float]]:
    """Generate embeddings for one or more texts with a sentence transformer model.

    Args:
        texts (Union[str, List[str]]): Input text or list of texts to encode.
            A single string is treated as a one-element list.
        model (SentenceTransformer): Pretrained SentenceTransformer model.

    Returns:
        List[List[float]]: One embedding (list of floats) per input text, in
        input order. An empty list is returned for empty input ('' or []).

    Raises:
        Exception: Any error raised by ``model.encode`` is logged and re-raised.
    """

    if not texts:
        return []

    # Ensure texts is a list; if a single string, convert to list
    if isinstance(texts, str):
        texts = [texts]

    try:
        # Generate embeddings in batch
        embeddings: np.ndarray = model.encode(texts,
                                              show_progress_bar=True,
                                              convert_to_numpy=True
                                              )
        # Convert NumPy array to list for compatibility with ChromaDB
        return embeddings.tolist()
    except Exception as e:
        # Report through the module logger, like the other helpers in this notebook
        logger.error(f"Error generating embeddings: {e}")
        raise

In [None]:
# Create metadata for ChromaDB
def create_metadata(row: pd.Series) -> Dict[str, Any]:
    """Build the metadata dictionary stored alongside one catalog row.

    Each output key is read from the matching CSV column; a missing value
    (as judged by ``pd.notna``) is replaced by that field's fallback.

    Args:
        row (pd.Series): One catalog row with the columns 'Price',
            'Original Price', 'Discount', 'Sizes', 'Product Link', 'Image URLs'.

    Returns:
        Dict[str, Any]: Keys 'price', 'original_price', 'discount', 'sizes',
        'product_link' and 'image_urls'.
    """
    # (metadata key, source column, value used when the cell is missing)
    field_specs = (
        ('price', 'Price', None),
        ('original_price', 'Original Price', None),
        ('discount', 'Discount', 0.0),
        ('sizes', 'Sizes', ''),
        ('product_link', 'Product Link', ''),
        ('image_urls', 'Image URLs', ''),
    )

    def cell_or(column: str, fallback: Any) -> Any:
        cell = row[column]
        return cell if pd.notna(cell) else fallback

    return {key: cell_or(column, fallback) for key, column, fallback in field_specs}

In [None]:
def create_and_populate_chromadb(df,
                                 db_path: str = "/content/drive/MyDrive/RAG_Chatbot_System/chroma_db",
                                 collection_name: str = "TNT_Store",
                                 batch_size: int = 1000):
    """Create (or reopen) a persistent ChromaDB collection and load the catalog into it.

    Rows are written with ``upsert`` rather than ``add``: the collection is
    persistent and opened with ``get_or_create_collection``, so on a re-run the
    IDs "0".."n-1" already exist and ``add`` would leave the stored rows
    untouched. Writes are split into batches so that a large catalog does not
    exceed the client's maximum batch size.

    Note: rows from a previous, longer catalog (IDs >= len(df)) are not removed.

    Args:
        df: DataFrame with the columns 'text_to_embed', 'embeddings' and
            'metadata'. The positional row number is used as the record ID.
        db_path (str, optional): Directory of the persistent ChromaDB store.
        collection_name (str, optional): Name of the collection to create or reopen.
        batch_size (int, optional): Maximum number of rows per upsert call.

    Returns:
        The populated collection, or None if creating or populating it failed
        (the error is printed).
    """
    client = chromadb.PersistentClient(path=db_path)
    try:
        collection = client.get_or_create_collection(collection_name)
        print("Collection created successfully...")

        ids = [str(i) for i in range(len(df))]
        documents = df['text_to_embed'].tolist()
        embeddings = df['embeddings'].tolist()
        metadatas = df['metadata'].tolist()

        # Insert new IDs and overwrite existing ones, one batch at a time
        step = max(1, int(batch_size))
        for start in range(0, len(ids), step):
            stop = start + step
            collection.upsert(
                ids=ids[start:stop],
                documents=documents[start:stop],
                embeddings=embeddings[start:stop],
                metadatas=metadatas[start:stop]
            )
        print("Data added to collection successfully...")
        return collection
    except Exception as e:
        print(f"Error creating or populating collection: {e}")
        return None

In [None]:
def format_context(results, df):
  """Render ChromaDB query results as a text block for the LLM prompt.

  Args:
      results: Query result mapping with 'ids', 'metadatas' and 'distances',
          each a list holding one inner list per query; only the first query's
          results are used.
      df: Catalog DataFrame whose index labels are the integer form of the
          record IDs and which has 'Title' and 'Description' columns.

  Returns:
      str: One section per hit, joined by a '---' separator line; an empty
      string when there are no hits. Fields that are missing from the metadata
      or from ``df`` are shown as 'N/A' (discount defaults to 0.0).
  """
  context_lines = []
  for id_, metadata, distance in zip(results['ids'][0], results['metadatas'][0], results['distances'][0]):
    # A record may come back without metadata, or with keys missing; never fail on that
    metadata = metadata or {}

    row_label = int(id_)
    if row_label in df.index:
      product_name = df.loc[row_label, 'Title']
      description = df.loc[row_label, 'Description']
    else:
      product_name = description = 'N/A'

    original_price = metadata.get('original_price')
    price = metadata.get('price')
    discount = metadata.get('discount')
    if discount is None:
      discount = 0.0  # same default that create_metadata uses for a missing discount

    product_info = (
        f"\n Product ID: {id_}, Distance: {distance:.4f}\n"
        f"Product Name: {product_name}\n"
        f"Product Link: {metadata.get('product_link') or 'N/A'}\n"
        f"Original Price: {original_price if original_price is not None else 'N/A'}\n"
        f"Price: {price if price is not None else 'N/A'}\n"
        f"Discount: {discount}% off\n"
        f"Sizes: {metadata.get('sizes') or 'N/A'}\n"
        f"Description: {description}\n"
    )
    context_lines.append(product_info)
  return "\n---\n".join(context_lines)

In [None]:
def generate_response(query: str,
                      tokenizer: AutoTokenizer,
                      model: AutoModelForCausalLM,
                      context: str = "",
                      extra: bool = False,
                      max_new_tokens: int = 200,
                      temperature: float = 0.3,
                      top_p: float = 0.8
                      ) -> str:
  """Generate a reply with the language model for the given query.

  One of three prompts is built:
    * ``context`` is non-blank: answer the query from the listed catalog products;
    * otherwise, ``extra`` is True: politely decline an out-of-scope question;
    * otherwise: greet the user and offer help.

  Args:
      query (str): The user's message.
      tokenizer (AutoTokenizer): Tokenizer matching ``model``.
      model (AutoModelForCausalLM): Causal language model used for generation.
      context (str, optional): Formatted product context. Defaults to "".
      extra (bool, optional): Selects the out-of-scope prompt when there is no
          context. Defaults to False.
      max_new_tokens (int, optional): Upper bound on generated tokens.
      temperature (float, optional): Sampling temperature.
      top_p (float, optional): Nucleus sampling threshold.

  Returns:
      str: The generated text only (the prompt is excluded), stripped of
      surrounding whitespace.
  """

  if context.strip():
    prompt = (
        f"You are a helpful and concise shopping assistant for an online clothing store. "
        f"The user asked: '{query}'. "
        f"Here are relevant products from the catalog:\n{context}\n\n"
        f"Reply briefly and naturally. "
        f"If the user asks about a product category or availability (e.g., 'Do you have sweaters?'), "
        f"respond with a short yes/no and mention some matching items. "
        f"If the user asks about a specific product, give a short description with key details like material, sizes, or price. "
        f"Only include product links if directly relevant. "
        f"Keep it friendly and to the point.\n\n"
        f"Response:"
        )

  elif extra:
    prompt = (
        f"You are a helpful assistant for a clothing store.\n"
        f"The user asked: {query}\n"
        f"Politely respond with something like: "
        f"'Sorry, we’re unable to answer your question as it falls outside the scope of our clothing store services.'\n"
        f"Also remind the user they can type 'exit' to end the conversation.\n"
        f"Response:"
        )

  else:
    prompt = (
        f"You are a helpful assistant for a clothing store.\n"
        f"The user asked: {query}\n"
        f"Start with a friendly greeting, ask what they're looking for, and offer help finding clothing items.\n"
        f"Response:"
        )

  inputs = tokenizer(prompt, return_tensors="pt", padding=True, truncation=True, return_attention_mask=True)

  # Move the encoded prompt to the device the model lives on
  inputs = {key: value.to(model.device) for key, value in inputs.items()}

  with torch.inference_mode():
    outputs = model.generate(
        input_ids=inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=top_p,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
        num_return_sequences=1
        )

  # For a causal LM the output sequence starts with the prompt tokens. Slice them
  # off by length instead of searching the decoded text for "Response:", which
  # picks the wrong position when the query or context contains that string and
  # returns garbage when the marker is absent.
  prompt_length = inputs["input_ids"].shape[-1]
  generated_tokens = outputs[0][prompt_length:]
  return tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()

In [None]:
def truncate_context(context: str, tokenizer: PreTrainedTokenizerBase, max_tokens: int = 1024) -> str:
  """Cut ``context`` down to at most ``max_tokens`` tokens.

  The text is encoded with truncation enabled and decoded back to a string
  with special tokens skipped.

  Args:
      context (str): Text to shorten.
      tokenizer (PreTrainedTokenizerBase): Tokenizer used to count and cut tokens.
      max_tokens (int, optional): Maximum encoded length. Defaults to 1024.

  Returns:
      str: The decoded, possibly shortened text.
  """
  encode_options = {"truncation": True, "max_length": max_tokens}
  token_ids = tokenizer.encode(context, **encode_options)
  shortened = tokenizer.decode(token_ids, skip_special_tokens=True)
  return shortened

In [None]:
def hash_file(filepath: str, chunk_size: int = 1 << 20) -> str:
  """Generate the SHA256 hash of a file.

  The file is read in chunks so that large files are never held in memory
  in full.

  Args:
      filepath (str): Path of the file to hash.
      chunk_size (int, optional): Bytes read per iteration. Defaults to 1 MiB.

  Returns:
      str: Hexadecimal SHA256 digest of the file contents.
  """
  digest = hashlib.sha256()
  with open(filepath, "rb") as f:
    # iter(callable, sentinel) stops at the first empty read (end of file)
    for chunk in iter(lambda: f.read(chunk_size), b""):
      digest.update(chunk)
  return digest.hexdigest()

In [None]:
def has_csv_changed(csv_path: str, hash_path: str = "data_hash.txt") -> bool:
  """Report whether the CSV differs from the hash recorded on the previous call.

  The file's current hash is compared with the one stored in ``hash_path``.
  When no hash has been stored yet, or the stored one differs, the stored
  hash is overwritten with the current one and True is returned.

  Args:
      csv_path (str): Path of the CSV file to check.
      hash_path (str, optional): File holding the previously recorded hash.

  Returns:
      bool: True if the file is new or changed, False if the hash is unchanged.
  """
  current_hash = hash_file(csv_path)

  # No stored hash is treated the same as a mismatching one
  previous_hash = None
  if os.path.exists(hash_path):
    with open(hash_path, "r") as stored:
      previous_hash = stored.read().strip()

  if previous_hash == current_hash:
    return False

  # Record the new hash so the next call compares against it
  with open(hash_path, "w") as stored:
    stored.write(current_hash)
  return True

In [None]:
def extract_email(text: str) -> Optional[str]:
  """Return the first email-looking substring of ``text``, or None if there is none."""
  found = re.search(r"[\w\.-]+@[\w\.-]+\.\w+", text)
  return found.group(0) if found else None

def is_valid_email(email: str) -> bool:
  """Check that the whole string has the shape ``local@domain.tld``.

  The pattern must match the entire input and no part may contain '@' or
  whitespace. The previous ``re.match`` was only anchored at the start, so
  inputs such as 'a@b.com@evil.org' or 'a@b.com and more' were accepted.

  Args:
      email (str): Candidate email address.

  Returns:
      bool: True if the string looks like a single email address.
  """
  return re.fullmatch(r"[^@\s]+@[^@\s]+\.[^@\s]+", email) is not None

def save_email_to_csv(email: str, query: str, file_path: str = "interested_customers.csv"):
  """Append an (email, query) pair to the interested-customers CSV.

  The file is created with the columns 'Email' and 'Query' when it does not
  exist. An existing but empty file is treated like a missing one instead of
  failing with ``pd.errors.EmptyDataError``.

  Args:
      email (str): Customer email address.
      query (str): The query the customer wants to be notified about.
      file_path (str, optional): CSV file to update.
  """
  new_entry = pd.DataFrame([[email, query]], columns = ["Email", "Query"])

  try:
    existing_data = pd.read_csv(file_path)
  except (FileNotFoundError, pd.errors.EmptyDataError):
    # Nothing usable on disk yet: start the file with this entry
    updated_data = new_entry
  else:
    updated_data = pd.concat([existing_data, new_entry], ignore_index=True)

  updated_data.to_csv(file_path, index=False)

# !pip install email-validator

# from email-validator import validate_email, EmailNotValidError

# def is_valid_email(email: str) -> bool:
#   try:
#     validate_email(email)
#     return True
#   except EmailNotValidError:
#     return False

In [None]:
def is_product_query(query: str) -> bool:
  """Heuristically decide whether a query is about buying or finding a product.

  The query and the keywords are lowercased and curly apostrophes are
  normalized to straight ones. A keyword matches only where it starts a word,
  so inflected forms still match ('discounts', 'buying') but unrelated words
  that merely contain a keyword do not ('together' for 'get', 'behave' for
  'have').

  Args:
      query (str): The user's message.

  Returns:
      bool: True if any keyword or phrase occurs at the start of a word.
  """
  keywords = [
      # Basic intent
      "buy", "purchase", "order", "have", "sell", "selling", "looking for", "need", "want",
      "available", "product", "item", "in stock", "get", "shop", "price", "cost", "quote",
      "how much", "find", "availability", "stock", "deal", "offer", "discount", "checkout",
      "cart", "wishlist", "restock", "can I get", "do you have", "i'm after", "pick up",
      "deliver", "shipping", "acquire", "procure",

      # Slang & casual speak
      "hook me up", "got some", "snag one", "cop", "grab one", "where’s it at",
      "buying vibes", "hit me with", "send me the link", "link to buy"
  ]

  def normalize(text: str) -> str:
    # Same normalization for query and keywords so they are comparable
    return text.lower().replace("’", "'")

  normalized_query = normalize(query)
  return any(
      re.search(r"\b" + re.escape(normalize(keyword)), normalized_query) is not None
      for keyword in keywords
  )

In [None]:
def is_greeting_query(query: str) -> bool:
    """Heuristically decide whether a query is a greeting.

    The query and every keyword are normalized the same way (non-letters
    removed, lowercased, whitespace collapsed) and a keyword matches only as a
    whole word or phrase. Previously only the query was stripped of
    punctuation, so keywords containing apostrophes ("what's up", "g'day")
    could never match, and plain substring matching made 'hi' fire on 'which'
    or 'shirt' and 'yo' on 'you'.

    Args:
        query (str): The user's message.

    Returns:
        bool: True if any greeting keyword occurs as a whole word/phrase.
    """
    keywords = [
        "hi", "hello", "hey", "greetings", "good morning", "good afternoon",
        "good evening", "howdy", "what's up", "how's it going", "yo", "sup",
        "what's good", "salutations", "hey there", "yo yo", "wassup", "hiya",
        "what's crackin'", "sup dude", "what's popping", "g'day", "mornin'",
        "heeeey", "wuddup", "cheers", "hail", "wazzup", "yello", "heyyy",
        "what's happening", "look who's here", "ahoy", "how do", "how's everything",
        "what’s new", "what’s up with you", "long time no see", "all good?",
        "what's the word", "what's going on", "good to see you", "hello there",
        "how are things", "what’s cracking", "what’s the haps", "sup fam", "hey hey",
        "yo, what's up", "how’s life", "what’s up with that", "what’s shaking",
        "how’s your day", "everything alright?", "what’s cooking", "howdy partner",
        "what’s the deal", "how goes it", "what’s the buzz", "what’s going down",
        "what’s the scoop"
    ]

    def normalize(text: str) -> str:
        # Drop everything but letters and whitespace, lowercase, collapse runs of whitespace
        return ' '.join(re.sub(r'[^a-zA-Z\s]', '', text).lower().split())

    query_cleaned = normalize(query)
    if not query_cleaned:
        return False

    # Surrounding spaces turn the substring test into a whole-word/phrase test
    padded_query = f" {query_cleaned} "
    return any(f" {normalize(word)} " in padded_query for word in keywords)

In [None]:
# Initialize the sentence transformer model
try:
  # Embedding model passed to generate_embeddings for the catalog text and user queries
  embeddingModel = SentenceTransformer('sentence-transformers/paraphrase-multilingual-mpnet-base-v2')
except Exception as e:
  # Report the failure, then re-raise so the cell stops instead of continuing without a model
  print(f"Error loading model: {e}")
  raise

# Initialize the generation model
model_id = "mistralai/Mistral-7B-Instruct-v0.3"
generationTokenizer = AutoTokenizer.from_pretrained(model_id)

# Use the EOS token as padding token when the tokenizer defines none
# (generate_response tokenizes with padding=True)
if generationTokenizer.pad_token is None:
  generationTokenizer.pad_token = generationTokenizer.eos_token

# Load the model with the quantization settings defined in bnb_config
generationModel = AutoModelForCausalLM.from_pretrained(
    model_id,
    device_map="auto",  # put model on GPU automatically
    quantization_config=bnb_config
    )

In [None]:
csv_file = 'tnt.csv'

pkl_path = "embeddings.pkl"

# Cache check, currently disabled: embeddings are regenerated on every run.
# Re-enable this expression (and remove the override below) to reuse the pickle
# while the CSV is unchanged.
# need_to_generate = (
#     has_csv_changed(csv_file)
#     or not os.path.exists(pkl_path)
#     or os.path.getsize(pkl_path) == 0
# )

need_to_generate = True

# Load and clean the catalog on BOTH paths. Only the embeddings are expensive to
# rebuild; the cleaned columns and 'text_to_embed' are needed downstream
# (create_and_populate_chromadb reads 'text_to_embed') and were previously
# missing when the cached branch ran.
df = load_csv(csv_file)

# Clean and normalize data
df['Price'] = df['Price'].apply(extract_numeric_amount)
df['Original Price'] = df['Original Price'].apply(extract_numeric_amount)
df['Discount'] = df['Discount'].apply(extract_numeric_amount)
df['Title'] = df['Title'].fillna('').str.lower().str.strip()
df['Variants'] = df['Variants'].fillna('').str.lower().str.strip()
df['Description'] = df['Description'].fillna('').str.lower().str.strip()
df['Sizes'] = df['Sizes'].fillna('').str.lower().str.strip()

# Fill missing Original Price with Price
df['Original Price'] = df['Original Price'].fillna(df['Price'])

# Create a combined text field for embedding
df['text_to_embed'] = (df['Title'] + ' ' +
                      df['Variants'] + ' ' +
                      df['Description'] + ' ' +
                      'Sizes: ' + df['Sizes']).str.strip()

# Create metadata
df['metadata'] = df.apply(create_metadata, axis=1)

if need_to_generate:
  print("Generating embeddings and saving...")

  # Generate embeddings
  try:
    df['embeddings'] = generate_embeddings(df['text_to_embed'].tolist(), embeddingModel)
  except Exception as e:
    print(f"Error generating embedding: {e}")
    raise

  # Save embeddings and metadata (same pickle layout as before)
  with open(pkl_path, "wb") as f:
    pickle.dump(df[['embeddings', 'metadata']], f)

  print("Embeddings saved.")

else:
  print("No CSV change: loading cached embeddings...")
  # NOTE: pickle.load can execute arbitrary code; only load a cache file that
  # this notebook wrote itself.
  with open(pkl_path, "rb") as f:
    loaded = pickle.load(f)

  # A cache with a different number of rows would be silently misaligned with the CSV
  if len(loaded) != len(df):
    raise ValueError(
        f"Cached embeddings in '{pkl_path}' have {len(loaded)} rows but '{csv_file}' has {len(df)}; "
        "regenerate the embeddings."
    )
  df['embeddings'] = loaded['embeddings']


In [None]:
# Store the catalog rows in the ChromaDB collection.
# create_and_populate_chromadb returns None when creating or populating fails.
collection = create_and_populate_chromadb(df)

In [None]:
# Distance above which the best catalog hit is treated as "no relevant product".
# Defined once instead of being re-assigned on every loop iteration.
threshold = 8.0

# Interactive chat loop: retrieve catalog context for each query and answer with the LLM.
while True:
  # Strip so that inputs like 'exit ' are still recognised
  query = input("Hello, How can I help you:\n").strip()
  if not query:
    # Nothing to embed or answer: ask again
    continue
  if query.lower() == 'exit':
    break

  queryEmbedding = generate_embeddings([query], embeddingModel)[0]
  context = collection.query(query_embeddings=[queryEmbedding], n_results=10)

  # An empty result list must not raise IndexError; treat it as "nothing relevant"
  distances = context['distances'][0] if context.get('distances') else []
  topDistance = distances[0] if distances else float('inf')

  if topDistance > threshold:
    greeting = is_greeting_query(query)
    unknownProduct = is_product_query(query)

    if unknownProduct:
      # Product request without a close catalog match: offer an email notification
      print("Sorry, we couldn't find any products related to your query.")
      email_input = input("Would you like to be notified when such products become available? Please enter your email, or type 'no' to skip:\n")
      if email_input.lower() in ['no', 'n']:
        print("No problem. Thank you for visiting!")
      else:
        email = extract_email(email_input)
        if email is not None and is_valid_email(email):
          save_email_to_csv(email, query)
          print(f"Thank you! We'll notify you at {email} as soon as relevant products are available.")
        else:
          print("That doesn't seem like a valid email address. Please try again next time.")
      continue

    elif greeting:
      response = generate_response(query, generationTokenizer, generationModel)

    else:
      # Neither a product request nor a greeting: use the out-of-scope prompt
      response = generate_response(query, generationTokenizer, generationModel, extra=True)

  else:
    # Close match found: answer from the retrieved products
    retrievedContext = format_context(context, df)
    truncatedContext = truncate_context(retrievedContext, generationTokenizer, max_tokens=1024)
    response = generate_response(query, generationTokenizer, generationModel, context=truncatedContext)

  print("Answer: \n")
  wrapped = textwrap.fill(response, width=100)
  print(wrapped)

In [None]:
# # Debugging
# while(True):
#   query = input("What's up:")
#   if query.lower() == 'exit':
#     break
#   queryEmbedding = generate_embeddings([query], embeddingModel)[0]
#   context = collection.query(query_embeddings=[queryEmbedding], n_results = 5)
#   results = context["documents"]
#   distance = context["distances"]
#   print(context)
#   print(results)
#   print("\n",distance)