In [None]:
# Mount Google Drive at /content/drive so the files under BASE_DIR can be read (Colab-only).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import logging
import random
import re

import gradio as gr
import joblib
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline

In [None]:
# Folder on the mounted Google Drive that holds every artifact this notebook reads
# (svd_model.pkl, the user_product_matrix*.npy arrays, Products.csv and faq.csv).
BASE_DIR = "/content/drive/MyDrive/Supermarket_chatbot"

In [None]:
# Load trained SVD model
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary code —
# only load artifacts that you produced yourself.
svd = joblib.load(f"{BASE_DIR}/svd_model.pkl")

# Load saved arrays
# NOTE(review): allow_pickle=True carries the same unpickling risk; presumably these are
# plain numeric arrays and the flag could be dropped — TODO confirm how they were saved.
product_ids = np.load(f"{BASE_DIR}/user_product_matrix_columns.npy", allow_pickle=True)
user_ids = np.load(f"{BASE_DIR}/user_product_matrix_index.npy", allow_pickle=True)
user_product_matrix = np.load(f"{BASE_DIR}/user_product_matrix.npy", allow_pickle=True)


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [None]:
# Normalise both ID arrays to flat 1-D integer arrays so they can be compared against
# int(user_id) and indexed positionally by the recommender.
product_ids = np.array(product_ids, dtype=int).ravel()
user_ids = np.array(user_ids, dtype=int).ravel()

In [None]:
# Product catalogue; the next cell requires the 'ProductID' and 'ProductName' columns.
products_df = pd.read_csv(f"{BASE_DIR}/Products.csv")

In [None]:
# Check if everything is ok
# Fail fast when the catalogue lacks the two columns the ID -> name lookup needs.
if "ProductID" not in products_df.columns or "ProductName" not in products_df.columns:
    raise ValueError("Products.csv must include columns: 'ProductID' and 'ProductName'.")

# Coerce to plain int / str so that lookups keyed by int(product_id) hit the dictionary.
products_df["ProductID"] = products_df["ProductID"].astype(int)
products_df["ProductName"] = products_df["ProductName"].astype(str)
# ProductID -> ProductName lookup used by get_product_name.
PRODUCT_NAME = dict(zip(products_df["ProductID"], products_df["ProductName"]))

# Sanity check: displays the length of each ID array and the rating-matrix shape.
len(product_ids), len(user_ids), user_product_matrix.shape

(50, 100, (100, 50))

In [None]:
def get_product_name(product_id: int) -> str:
    """Return the catalogue name for `product_id`, or "Product <id>" when it is unknown."""
    lookup_key = int(product_id)
    placeholder = f"Product {product_id}"
    return PRODUCT_NAME.get(lookup_key, placeholder)

In [None]:
def _random_product_names(count: int):
    """Return up to `count` distinct, randomly chosen product names (cold-start fallback)."""
    safe_count = max(0, min(int(count), len(product_ids)))
    sampled_ids = random.sample(list(map(int, product_ids)), safe_count)
    return [get_product_name(pid) for pid in sampled_ids]


def recommend_products_for_user(user_id: int, top_n: int = 5):
    """
    Return a list of product names recommended for the given user_id.

    Parameters
    ----------
    user_id : int
        Customer ID; it is converted with int() before being looked up in user_ids.
    top_n : int, default 5
        Maximum number of names to return. Values <= 0 yield an empty list
        (previously a negative value raised ValueError from random.sample for
        unknown users and produced a mis-sliced list for known users).

    Returns
    -------
    list[str]
        - Unknown user: random products.
        - Known user: the unrated products with the highest SVD-reconstructed
          scores, best first.
        - Known user with no unrated products: random products.
    """
    if top_n <= 0:
        return []

    # Locate the user's row directly; an empty match means the user is unknown.
    matching_rows = np.flatnonzero(user_ids == int(user_id))
    if matching_rows.size == 0:
        return _random_product_names(top_n)
    user_index = int(matching_rows[0])

    # Ratings vector for this user
    user_ratings = user_product_matrix[user_index]

    # Project into the latent space and reconstruct a score for every product
    user_transformed = svd.transform(user_ratings.reshape(1, -1))
    predicted_ratings = np.dot(user_transformed, svd.components_)

    # Exclude already-rated items by pushing their score to -inf
    already_rated_mask = user_ratings > 0
    preds = predicted_ratings[0].copy()
    preds[already_rated_mask] = -np.inf

    # How many can we actually recommend?
    can_recommend = int(np.isfinite(preds).sum())
    safe_n = min(top_n, can_recommend)
    if safe_n == 0:
        # The user rated everything: fall back to random products
        return _random_product_names(top_n)

    # Top-N indices, highest predicted score first
    top_idx = np.argsort(preds)[::-1][:safe_n]
    rec_product_ids = product_ids[top_idx]

    return [get_product_name(int(pid)) for pid in rec_product_ids]


In [None]:
# Test with a known user
test_user = 1
print(f"\nRecommendations for known user {test_user}:")
print(recommend_products_for_user(test_user, top_n=5))

# Test with an unknown user
# NOTE(review): the unknown-user path samples with the unseeded `random` module,
# so this output differs from run to run.
unknown_user = 10000
print(f"\nRecommendations for unknown user {unknown_user}:")
print(recommend_products_for_user(unknown_user, top_n=5))



Recommendations for known user 1:
['Strawberries', 'Cream', 'Coca Cola', 'Nuts Mixed', 'Eggplant']

Recommendations for unknown user 99999999:
['Potatoes', 'Iced Coffee', 'Butter', 'Green Tea', 'Crackers']


In [None]:
# Load Dataset
faq = pd.read_csv(f"{BASE_DIR}/faq.csv")
# NOTE(review): Products.csv was already read into products_df earlier, and PRODUCT_NAME
# below rebuilds the same ProductID -> ProductName mapping from this second copy.
products = pd.read_csv(f"{BASE_DIR}/Products.csv")

PRODUCT_NAME = dict(zip(products['ProductID'].astype(int), products['ProductName'].astype(str)))

In [None]:
# FAQ General
# TF-IDF index over the FAQ questions; get_faq_answer compares user questions against faq_X.
faq_vectorizer = TfidfVectorizer(stop_words="english")
faq_X = faq_vectorizer.fit_transform(faq['question'])

def get_faq_answer(user_question, min_score=0.2):
    """
    Return the answer of the FAQ entry most similar to `user_question`.

    Parameters
    ----------
    user_question : str
        Raw user message.
    min_score : float, default 0.2
        Minimum TF-IDF cosine similarity required to accept the best match.

    Returns
    -------
    The matching value from faq['answer'], or None when no FAQ question
    reaches `min_score`.
    """
    user_vec = faq_vectorizer.transform([user_question])
    similarities = cosine_similarity(user_vec, faq_X)
    # similarities has a single row, so the flat argmax is the FAQ row position
    best_match_idx = int(similarities.argmax())
    best_score = similarities[0, best_match_idx]
    if best_score < min_score:
        return None
    # Positional lookup: faq_X rows follow the row order of `faq`, whereas faq['answer'][i]
    # is label-based and only coincides with the position for a default RangeIndex.
    return faq['answer'].iloc[best_match_idx]

In [None]:
# FAQ Product
# TF-IDF index over the product names; get_product_answer matches user questions against it.
product_vectorizer = TfidfVectorizer(stop_words="english")
product_X = product_vectorizer.fit_transform(products['ProductName'].astype(str))

# State variables
# NOTE(review): in the code visible here the dialogue is driven by chat_state;
# waiting_for_reco_consent is only ever written and waiting_for_id is never read —
# presumably leftovers from an earlier design, confirm before removing.
waiting_for_reco_consent = False   # True after we showed a product and asked user whether they want recommendations
waiting_for_id = False             # True after user answered 'yes' and we are waiting for ID
last_product_shown = None          # store last product

def get_product_answer(user_question):
    """
    Describe the product whose name best matches `user_question`.

    Returns the product summary followed by the recommendation prompt, or None
    when the best TF-IDF cosine similarity is below 0.7. On a match it also
    records the product name in `last_product_shown` and sets
    `waiting_for_reco_consent` to True.
    """
    global last_product_shown, waiting_for_reco_consent

    query_vec = product_vectorizer.transform([user_question])
    scores = cosine_similarity(query_vec, product_X)
    top_position = scores.argmax()
    if scores[0, top_position] < 0.7:
        return None

    row = products.iloc[top_position]
    last_product_shown = row['ProductName']
    # Flag that the next user message belongs to the recommendation flow
    waiting_for_reco_consent = True

    summary = (f"🛒 {row['ProductName']} ({row['Category']}) costs {row['Price']}.\n"
               f"Description: {row['Description']}\n\n"
               "Would you like personalized recommendations based on your history? (yes/no)")
    return summary

In [None]:
# Greeting
greetings = ["hello", "hi", "hey", "good morning", "good evening", "how are you"]

def get_greeting_answer(user_question):
    """
    Return a canned greeting when the message contains a greeting word or phrase.

    Matching is case-insensitive and anchored on word boundaries, so "hi" no
    longer fires inside words such as "this" or "chicken" (and "hey" inside
    "they"). Because greetings are checked before the FAQ and product lookups,
    those substring hits used to hijack real questions.

    Returns None when no greeting is found.
    """
    text = user_question.lower()
    for greet in greetings:
        # \b...\b keeps the match to whole words / whole phrases
        if re.search(rf"\b{re.escape(greet)}\b", text):
            return "Hello! 👋 How can I help you today?"
    return None


In [None]:
# Recommendation format
def format_recommendations(user_id):
    """
    Build the recommendation message for `user_id`.

    Calls recommend_products_for_user(user_id, top_n=5) and renders each result
    as "- name (category) — $price" followed by the description. Items may be
    product names or numeric product IDs (int, or a string of decimal digits).
    Items that cannot be found in `products` are listed by name only; an
    unknown numeric ID is shown as "Product <id>" instead of "None".

    Returns
    -------
    str
        The formatted message, an apology containing the error text if the
        recommender raises, or a "no recommendations" notice when the
        recommender returns nothing.
    """
    try:
        recs = recommend_products_for_user(user_id, top_n=5)  # <-- uses your SVD recommender
    except Exception as e:
        return f"Sorry, I couldn't fetch recommendations due to an internal error: {e}"

    # Loop-invariant: lower-cased catalogue names for case-insensitive matching
    catalogue_names = products['ProductName'].astype(str).str.lower()

    formatted = []
    for item in recs:
        # isdecimal() (unlike isdigit()) only accepts characters that int() can parse
        is_numeric_id = isinstance(item, (int, np.integer)) or (
            isinstance(item, str) and item.isdecimal()
        )
        if is_numeric_id:
            pid = int(item)
            name = PRODUCT_NAME.get(pid, f"Product {pid}")
            matches = products[products['ProductID'] == pid]
        else:
            name = str(item)
            matches = products[catalogue_names == name.lower()]

        if not matches.empty:
            row = matches.iloc[0]
            formatted.append(f"- {row['ProductName']} ({row['Category']}) — ${row['Price']}\n  {row['Description']}")
        else:
            formatted.append(f"- {name}")

    if not formatted:
        return "No recommendations available right now."
    return "Based on your history, we recommend:\n" + "\n".join(formatted)


In [None]:
# Text2text-generation pipeline (google/flan-t5-large) that ask_model calls.
# device_map="auto" and torch_dtype="auto" leave device placement and precision to the library.
model_pipeline = pipeline(
    "text2text-generation",
    model="google/flan-t5-large",
    device_map="auto",
    torch_dtype="auto"
)

def ask_model(question):
    """
    Generate a free-text reply to `question` with `model_pipeline`.

    Sampling is enabled (temperature 0.4, at most 100 new tokens), so the
    reply can differ between calls. Returns the generated text of the first
    candidate.
    """
    generation_options = dict(max_new_tokens=100, do_sample=True, temperature=0.4)
    candidates = model_pipeline(question, **generation_options)
    first_candidate = candidates[0]
    return first_candidate["generated_text"]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/662 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.13G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

Device set to use cuda:0


In [None]:
# Load sentiment model
# Model and revision are pinned to the checkpoint the library previously selected by
# default, so a future change of that default cannot silently alter the labels/scores
# get_sentiment_response relies on (and the "No model was supplied" warning goes away).
sentiment_pipeline = pipeline(
    "sentiment-analysis",
    model="distilbert/distilbert-base-uncased-finetuned-sst-2-english",
    revision="714eb0f",
)

No model was supplied, defaulted to distilbert/distilbert-base-uncased-finetuned-sst-2-english and revision 714eb0f (https://huggingface.co/distilbert/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.


config.json:   0%|          | 0.00/629 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

Device set to use cuda:0


In [None]:
# Canned replies; get_sentiment_response picks one at random per review.
positive_responses = [
    "Thank you so much for the kind words!",
    "We’re glad you’re satisfied with our service!",
    "Your positive feedback means a lot to us!",
]

negative_responses = [
    "We’re sorry to hear that. We'll work on improving!",
    "Thanks for being honest, we’ll take your feedback seriously.",
    "We’ll do our best to make your experience better next time.",
]

In [None]:
def get_sentiment_response(user_q):
    """
    Pick a reply that matches the sentiment of a customer review.

    Runs `sentiment_pipeline` on `user_q`. When the prediction score is above
    0.9, a random entry from `positive_responses` or `negative_responses` is
    returned according to the predicted label. In every other case —
    low-confidence prediction, unexpected label, or a failure while classifying —
    the generic thank-you message is returned.

    Classification stays best-effort, but failures are now logged instead of
    being swallowed silently, and only `Exception` is caught so that
    KeyboardInterrupt / SystemExit still propagate.
    """
    try:
        result = sentiment_pipeline(user_q)[0]
        label = result["label"].lower()
        score = result["score"]

        # Only trust confident predictions
        if score > 0.9:
            if "positive" in label:
                return random.choice(positive_responses)
            elif "negative" in label:
                return random.choice(negative_responses)
    except Exception:
        logging.getLogger(__name__).warning(
            "Sentiment analysis failed; falling back to the generic reply.",
            exc_info=True,
        )

    return "Thank you for your review! Your feedback is valuable to us."

In [None]:
def get_review_prompt():
    """Return the question that invites the user to review the service."""
    prompt = "Would you like to review our service? (yes/no)"
    return prompt

In [None]:
def process_review_consent(user_q):
    """
    Handle the yes/no answer to the review invitation.

    The message is split into lowercase words and compared against whole-word
    yes/no vocabularies. The previous substring test treated "I don't know",
    "not now" or "another" as "no" and "yesterday" as "yes". When both a yes
    word and a no word are present, yes takes precedence, as it did before.

    Returns
    -------
    tuple[str, str | None]
        (reply, next_state) where next_state is "review_text" after a yes,
        None after a no, and "review_consent" when the answer is unclear.
    """
    # Apostrophes stay inside tokens so "don't" is a single word
    words = set(re.findall(r"[a-z']+", user_q.lower()))

    if words & {"yes", "y", "yeah", "yep"}:
        return "Great! Please type your review below.", "review_text"
    elif words & {"no", "n", "nope", "nah"}:
        return "No worries! Thanks for chatting with us. 😊", None
    else:
        return "Would you like to leave a short review of your experience? (yes/no)", "review_consent"

In [None]:
def process_review_text(user_q):
    """Turn the user's review text into a reply by delegating to get_sentiment_response."""
    reply = get_sentiment_response(user_q)
    return reply

In [None]:
# Chatbot
# NOTE(review): the conversation state lives in module-level globals, so everyone
# talking to the same running app shares one state machine and one counter.
counter = 0        # answered turns since the last review invitation
chat_state = None  # None | "review_consent" | "review_text" | "reco_consent" | "reco_id"

def chatbot_response(user_question):
    """
    Produce the bot's reply to one user message and advance the dialogue state.

    While `chat_state` is set, the message is interpreted as part of that
    sub-dialogue:
      - "review_consent" / "review_text": review invitation and review body.
      - "reco_consent": yes/no to personalised recommendations; a customer ID
        may be given in the same message (e.g. "yes 1234").
      - "reco_id": waiting for a numeric customer ID.

    Otherwise the message goes through greeting -> FAQ -> product lookup ->
    language model, and the first non-empty answer wins. Every answered turn
    increments `counter`; once it reaches 5 and no sub-dialogue is pending,
    the review invitation is appended and the counter is reset.

    Changes compared with the previous version:
      - numeric checks use str.isdecimal(), so inputs such as "²" (accepted by
        isdigit() but rejected by int()) no longer raise;
      - an empty / whitespace-only message gets the standard "couldn't
        understand" reply instead of being sent to the language model;
      - in the consent step, "no thanks" style answers are understood, and the
        customer ID may appear anywhere after the "yes".

    Returns
    -------
    str
        The reply to show to the user.
    """
    global counter, chat_state

    user_q = (user_question or "").strip()

    # review flow
    if chat_state == "review_consent":
        response, next_state = process_review_consent(user_q)
        chat_state = next_state
        return response

    elif chat_state == "review_text":
        response = process_review_text(user_q)
        chat_state = None
        return response

    # recommendation flow
    elif chat_state == "reco_consent":
        low = user_q.lower()
        # Words with surrounding punctuation removed: "yes," -> "yes", "1234." -> "1234"
        words = [w.strip(".,!?;:") for w in low.split()]
        first_word = words[0] if words else ""
        if low.startswith("yes"):
            customer_id = next((w for w in words[1:] if w.isdecimal()), None)
            if customer_id is not None:
                chat_state = None
                return format_recommendations(int(customer_id))
            else:
                chat_state = "reco_id"
                return "Great, please enter your customer ID (numbers only)."
        elif first_word in ("no", "n", "nope", "nah"):
            chat_state = None
            return "Alright, let me know if you need anything else."
        else:
            return "Please answer with 'yes' or 'no' (e.g., 'yes 1234')."

    elif chat_state == "reco_id":
        if user_q.isdecimal():
            chat_state = None
            return format_recommendations(int(user_q))
        else:
            return "Please enter a valid numeric customer ID."

    # Nothing to match or generate from
    if not user_q:
        return "Sorry, I couldn't understand that."

    # Flow
    # 1. Greeting, 2. FAQ
    answer = get_greeting_answer(user_q)
    if not answer:
        answer = get_faq_answer(user_q)

    # 3. Product
    if not answer:
        answer = get_product_answer(user_q)
        if answer:
            chat_state = "reco_consent"  # Set for recommendation flow

    # 4. LLM model
    if not answer:
        answer = ask_model(user_q)

    # Count every answered turn, whichever stage produced the answer
    if answer:
        counter += 1

    # ask a review
    if counter >= 5 and chat_state is None:
        counter = 0
        chat_state = "review_consent"
        if answer:
            return f"{answer}\n\n{get_review_prompt()}"
        else:
            return get_review_prompt()

    if answer:
        return answer
    else:
        return "Sorry, I couldn't understand that."

In [None]:
# Gradio UI
def gradio_chatbot_response(message, history):
    """Callback for gr.ChatInterface: answer `message` via chatbot_response; `history` is not used."""
    reply = chatbot_response(message)
    return reply

In [None]:
# Create the Gradio interface
# Chat UI that routes each user message through gradio_chatbot_response.
demo = gr.ChatInterface(
    fn=gradio_chatbot_response,
    title="🛒 Supermarket Chatbot",
    type="messages",
    theme="soft"
)

In [None]:
# share=True serves the app through a temporary public URL (see the launch output),
# so anyone with the link can reach the chatbot while it is running.
if __name__ == "__main__":
    demo.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://92cedc86fbe80ef686.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)
