# In [None]:
# ============================================
# CIS 583 Term Project - Coffee Chain Chatbot
# LSTM + Transformer + GRU
# ============================================

from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import re
import random

from tensorflow import keras
from tensorflow.keras.layers import Embedding, LSTM, GRU, Dense, Dropout, Input
from tensorflow.keras.models import Model



def load_sales_data(file_path):
    """
    Reads the sales CSV at `file_path` into a DataFrame.

    Column names are trimmed of surrounding whitespace and their inner
    spaces are replaced with underscores. If the file does not exist, an
    error line is printed and None is returned instead of raising.
    """
    try:
        frame = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: The file '{file_path}' was not found.")
        return None

    # Normalise headers such as " Area Code " to "Area_Code".
    frame.columns = [name.strip().replace(' ', '_') for name in frame.columns]
    return frame

# Load the dataset once; every chatbot below shares this DataFrame.
df_sales = load_sales_data("/content/Coffee_Chain_Sales .csv")

# load_sales_data returns None when the CSV is missing, so stop the script here.
if df_sales is None:
    raise SystemExit("Exiting due to missing data file.")

# Quick preview: first rows, then the column/dtype summary.
print("Loaded Sales Data:", df_sales.head(), "\nDataFrame Info:", sep="\n")
df_sales.info()




class SalesChatbot:
    """
    A simple rule-based chatbot for sales analytics queries
    based on the coffee chain data.

    Queries are routed by keyword matching only; no ML is involved. The
    wrapped DataFrame must provide the 'Sales', 'Product' and 'Market'
    columns that the get_* methods read.
    """

    # Characters trimmed from both ends of a product name pulled out of a query.
    _PRODUCT_TRIM_CHARS = " \t\n?.!,;:'\""

    def __init__(self, data_frame):
        """Stores the sales DataFrame and the canned greeting/fallback texts."""
        self.df = data_frame
        self.greetings = [
            "Hello! How can I help you with sales analytics today?",
            "Hi there! I'm ready to answer your sales questions.",
            "Welcome! What sales data can I analyze for you?"
        ]
        self.default_responses = [
            "I'm not sure I understand that query. Can you please rephrase it?",
            "Sorry, I can't process that request right now.",
            "I need more information to answer that question."
        ]

    def get_greeting(self):
        """Returns a random greeting."""
        return random.choice(self.greetings)

    @staticmethod
    def _extract_product(query_lower):
        """
        Pulls the product name that follows 'sales for' out of a lower-cased query.

        A leading 'the' and surrounding punctuation are dropped, and the whole
        remainder is kept so multi-word products such as 'caffe mocha' survive.
        Returns the title-cased name, or None when nothing usable follows.
        """
        match = re.search(r'sales for\s+(?:the\s+)?(.+)', query_lower)
        if not match:
            return None
        product = match.group(1).strip(SalesChatbot._PRODUCT_TRIM_CHARS)
        return product.title() if product else None

    def get_response(self, query):
        """
        Processes a user query and returns a response based on keywords.
        (Purely rule-based, no ML here.)

        Checks run in a fixed order: total sales, best seller, sales by
        market/region, then sales for a named product. Anything else gets a
        random default response.
        """
        query_lower = query.lower()
        # Match "best-selling" the same way as "best selling". Only the keyword
        # checks use this form; product extraction keeps hyphens intact.
        keyword_text = query_lower.replace('-', ' ')

        if 'total sales' in keyword_text or 'total revenue' in keyword_text:
            return self.get_total_sales()

        if 'best selling' in keyword_text or 'most popular' in keyword_text:
            return self.get_best_selling_product()

        # The data column is called 'Market', so accept that wording as well
        # as the original 'region' phrasing.
        market_phrases = ('sales by region', 'region sales',
                          'sales by market', 'market sales')
        if any(phrase in keyword_text for phrase in market_phrases):
            return self.get_sales_by_market()

        if 'sales for' in query_lower:
            product = self._extract_product(query_lower)
            if product:
                return self.get_sales_for_product(product)
            return "Please specify a product name, e.g., 'sales for Espresso'."

        return random.choice(self.default_responses)

    def get_total_sales(self):
        """Calculates and returns the total sales from the 'Sales' column."""
        total_sales = self.df['Sales'].sum()
        return f"The total sales for the period is: ${total_sales:,.2f}"

    def get_best_selling_product(self):
        """Identifies and returns the best-selling product based on total sales."""
        # idxmax raises on an empty series, so answer explicitly when there are no rows.
        if self.df.empty:
            return "No sales data is available to rank products."
        best_product = self.df.groupby('Product')['Sales'].sum().idxmax()
        return f"The best-selling product is: {best_product}."

    def get_sales_by_market(self):
        """Calculates and returns sales data by market from the 'Market' column."""
        regional_sales = (
            self.df.groupby('Market')['Sales']
            .sum()
            .sort_values(ascending=False)
        )
        lines = ["Total sales by market:"]
        lines.extend(
            f"- {region}: ${sales:,.2f}" for region, sales in regional_sales.items()
        )
        return "\n".join(lines)

    def get_sales_for_product(self, product):
        """
        Calculates and returns total sales for a specific product.

        The product name is compared case-insensitively against the 'Product'
        column. "No data" is reported only when no row matches; a product whose
        sales sum to zero or less still gets its total reported.
        """
        matching_rows = self.df[self.df['Product'].str.lower() == product.lower()]
        if matching_rows.empty:
            return f"No sales data found for the product '{product}'."
        product_sales = matching_rows['Sales'].sum()
        return f"The total sales for {product} is: ${product_sales:,.2f}"



# Training examples for the intent classifiers, kept as (query text, intent ID)
# pairs so each label sits next to the sentence it describes.
labeled_queries = [
    ("What is the total sales?", 0),
    ("Show me the sales by market.", 1),
    ("Which product is the best-selling?", 2),
    ("Show me sales for the Caffe Mocha.", 3),
    ("Tell me about the total revenue.", 0),
    ("What are the sales by region?", 1),
    ("How much was sold for espresso?", 3),
    ("What's the best product?", 2),
]
queries = [text for text, _ in labeled_queries]

# Intent IDs: 0, 1, 2, 3
intents = [intent_id for _, intent_id in labeled_queries]
intent_map = dict(enumerate([
    "total_sales",
    "sales_by_market",
    "best_selling_product",
    "sales_for_product",
]))

# Turn each query into a sequence of word indices, then right-pad with zeros
# so all sequences share the length of the longest query.
tokenizer = keras.preprocessing.text.Tokenizer(num_words=100)
tokenizer.fit_on_texts(queries)
sequences = tokenizer.texts_to_sequences(queries)
padded_sequences = keras.preprocessing.sequence.pad_sequences(sequences, padding='post')
labels = np.array(intents)

print("\n--- NLP Model Data Preparation ---")
print("Padded Sequences:", padded_sequences, "Labels:", labels, sep="\n")


# Build the LSTM model
def build_lstm_model(input_shape, vocab_size, embedding_dim, num_classes):
    """
    Builds and compiles a small LSTM intent classifier.

    Architecture: Embedding -> LSTM(128, last state only) -> Dropout(0.5)
    -> softmax Dense over `num_classes`. `input_shape` is the padded sequence
    length. The model is compiled with Adam and sparse categorical
    cross-entropy, so labels are plain integer class IDs.
    """
    token_ids = Input(shape=(input_shape,))
    embedded = Embedding(vocab_size, embedding_dim)(token_ids)
    encoded = LSTM(128, return_sequences=False)(embedded)
    regularized = Dropout(0.5)(encoded)
    class_probs = Dense(num_classes, activation='softmax')(regularized)

    classifier = Model(inputs=token_ids, outputs=class_probs, name="LSTM_Model")
    classifier.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return classifier

# Build the Transformer model
def build_transformer_model(input_shape, vocab_size, embedding_dim, num_classes, head_size, num_heads):
    """
    Builds and compiles a single-block Transformer intent classifier.

    Architecture: Embedding -> one encoder block (self-attention plus a
    position-wise feed-forward network, each with dropout, layer
    normalization and a residual connection) -> average over tokens ->
    Dropout(0.5) -> softmax Dense over `num_classes`.

    Args:
        input_shape: Padded sequence length (number of token IDs per query).
        vocab_size: Size of the embedding vocabulary.
        embedding_dim: Width of each token embedding.
        num_classes: Number of intent classes to predict.
        head_size: `key_dim` of each attention head; the feed-forward hidden
            layer is `head_size * 2` wide.
        num_heads: Number of attention heads.

    Returns:
        A model compiled with Adam and sparse categorical cross-entropy
        (integer class labels).
    """
    def transformer_encoder(inputs, head_size, num_heads):
        # Self-attention sub-layer: queries and keys/values are the same tensor.
        x = keras.layers.MultiHeadAttention(
            key_dim=head_size, num_heads=num_heads, dropout=0.5
        )(inputs, inputs)
        x = keras.layers.Dropout(0.5)(x)
        x = keras.layers.LayerNormalization(epsilon=1e-6)(x)
        res = x + inputs

        # Feed-forward sub-layer, projected back to the embedding width so the
        # residual addition below is shape-compatible.
        x = keras.layers.Dense(head_size * 2, activation="relu")(res)
        x = keras.layers.Dropout(0.5)(x)
        x = keras.layers.Dense(inputs.shape[-1])(x)
        x = keras.layers.LayerNormalization(epsilon=1e-6)(x)
        return x + res

    inputs = Input(shape=(input_shape,))
    x = Embedding(vocab_size, embedding_dim)(inputs)
    x = transformer_encoder(x, head_size=head_size, num_heads=num_heads)
    # The encoder output is (batch, steps, features), i.e. "channels_last".
    # Pooling must average over the token axis to give one embedding_dim-wide
    # vector per query; "channels_first" would instead average away the
    # embedding axis and leave one scalar per token position.
    x = keras.layers.GlobalAveragePooling1D(data_format="channels_last")(x)
    x = Dropout(0.5)(x)
    outputs = Dense(num_classes, activation='softmax')(x)

    model = Model(inputs=inputs, outputs=outputs, name="Transformer_Model")
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

# Hyper-parameters shared by the intent models; sizes come from the prepared data.
_, max_sequence_length = padded_sequences.shape
vocab_size = len(tokenizer.word_index) + 1  # +1 because index 0 is the padding value
embedding_dim = 64
num_classes = np.unique(labels).size
head_size = 128
num_heads = 2

# LSTM: build, then fit silently on the full set of example queries.
print("\n--- Training the LSTM Model ---")
lstm_model = build_lstm_model(
    input_shape=max_sequence_length,
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    num_classes=num_classes,
)
lstm_history = lstm_model.fit(x=padded_sequences, y=labels, epochs=10, verbose=0)
print("LSTM Model training finished.")

# Transformer: same data and schedule as the LSTM.
print("\n--- Training the Transformer Model ---")
transformer_model = build_transformer_model(
    input_shape=max_sequence_length,
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    num_classes=num_classes,
    head_size=head_size,
    num_heads=num_heads,
)
transformer_history = transformer_model.fit(
    x=padded_sequences, y=labels, epochs=10, verbose=0
)
print("Transformer Model training finished.")




def predict_intent(model, query):
    """
    Predicts the intent of a user query using a given model.

    The query is tokenized with the module-level `tokenizer`, right-padded to
    `max_sequence_length`, and the class with the highest predicted
    probability is mapped to its name through `intent_map`.

    Returns the intent name, or "unknown_intent" when the query contains no
    word the tokenizer knows (or the predicted ID is missing from the map).
    """
    new_sequence = tokenizer.texts_to_sequences([query])
    # The tokenizer drops out-of-vocabulary words. With nothing left, the model
    # would only see padding and still emit some arbitrary class.
    if not new_sequence[0]:
        return "unknown_intent"

    new_padded_sequence = keras.preprocessing.sequence.pad_sequences(
        new_sequence, maxlen=max_sequence_length, padding='post'
    )
    prediction = model.predict(new_padded_sequence, verbose=0)[0]
    predicted_intent = int(np.argmax(prediction))
    return intent_map.get(predicted_intent, "unknown_intent")

print("\n--- Demonstrating NLP Model Prediction ---")

# One sample query through the LSTM classifier.
test_query = "What is the total revenue?"
predicted_intent_lstm = predict_intent(lstm_model, test_query)
print(
    f"User query: '{test_query}'",
    f"LSTM model predicts intent: '{predicted_intent_lstm}'",
    sep="\n",
)

# A different sample query through the Transformer classifier.
test_query_2 = "What are the sales for espresso?"
predicted_intent_trans = predict_intent(transformer_model, test_query_2)
print(
    f"User query: '{test_query_2}'",
    f"Transformer model predicts intent: '{predicted_intent_trans}'",
    sep="\n",
)



# Section banner printed before the GRU intent model is built and trained.
print("\n===  GRU INTENT MODEL ===")

def build_gru_model(input_length, vocab_size, embedding_dim, num_classes):
    """
    Builds and compiles a GRU-based intent classifier.

    Token IDs of length `input_length` pass through an Embedding, a
    128-unit GRU (final state only), Dropout(0.5) and a softmax Dense layer
    with `num_classes` outputs. Compiled with Adam and sparse categorical
    cross-entropy.
    """
    token_ids = Input(shape=(input_length,))

    # The network is a straight pipeline, so apply the layers in order.
    layer_stack = (
        Embedding(vocab_size, embedding_dim),
        GRU(128, return_sequences=False),
        Dropout(0.5),
        Dense(num_classes, activation='softmax'),
    )
    features = token_ids
    for layer in layer_stack:
        features = layer(features)

    gru_classifier = Model(inputs=token_ids, outputs=features, name="GRU_Intent_Model")
    gru_classifier.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return gru_classifier

# Build the GRU classifier with the same hyper-parameters as the other models.
gru_model = build_gru_model(max_sequence_length, vocab_size, embedding_dim, num_classes)

# Train with per-epoch progress output (verbose=1).
gru_history = gru_model.fit(x=padded_sequences, y=labels, epochs=15, verbose=1)

print(" Finished training GRU model.")




def predict_intent_gru(model, query):
    """
    Use Germaine's GRU model to predict the intent of a query.
    Returns an intent name like 'total_sales', 'sales_by_market', etc.

    Returns "unknown_intent" when the query has no word in the tokenizer's
    vocabulary, so callers can fall back to other handling instead of acting
    on a prediction made from padding alone.
    """
    seq = tokenizer.texts_to_sequences([query])
    # Out-of-vocabulary words are dropped by the tokenizer; an empty sequence
    # means the model has nothing to classify.
    if not seq[0]:
        return "unknown_intent"

    pad = keras.preprocessing.sequence.pad_sequences(
        seq, maxlen=max_sequence_length, padding='post'
    )
    probs = model.predict(pad, verbose=0)[0]
    pred_id = int(np.argmax(probs))
    return intent_map.get(pred_id, "unknown_intent")

# Sample questions, one per intent, reused by the GRU demos.
test_queries = [
    "What is the total revenue?",
    "Show me the sales by region.",
    "Which product sells the most?",
    "What are the sales for espresso?",
]

print("\n--- GRU Model Predictions (Germaine) ---")
for q in test_queries:
    # One line per query: "<question> -> <predicted intent name>".
    print(f"{q} -> {predict_intent_gru(gru_model, q)}")



class GRUSalesAssistant:
    """
    Chatbot that uses:
    - Germaine's GRU model to classify intent
    - SalesChatbot to compute actual answers from df_sales
    """

    # Product name follows "sales for" / "sold for"; a leading "the" is skipped.
    _PRODUCT_PATTERN = re.compile(r'(?:sales|sold)\s+for\s+(?:the\s+)?(.+)')
    # Characters trimmed from both ends of the captured product name.
    _PRODUCT_TRIM_CHARS = " \t\n?.!,;:'\""

    def __init__(self, data_frame, model):
        """Keeps the intent model and wraps the data in a SalesChatbot."""
        self.model = model
        self.rule_bot = SalesChatbot(data_frame)

    @staticmethod
    def _extract_product(query):
        """
        Returns the title-cased product name mentioned in `query`, or None.

        Trailing punctuation is removed so "sales for espresso?" yields
        "Espresso" rather than "Espresso?", which would never match the data.
        """
        match = GRUSalesAssistant._PRODUCT_PATTERN.search(query.lower())
        if not match:
            return None
        product = match.group(1).strip(GRUSalesAssistant._PRODUCT_TRIM_CHARS)
        return product.title() if product else None

    def get_response(self, query):
        """
        Classifies `query` with the GRU model and answers via the rule bot.

        Unrecognized intent names are handed to the rule-based chatbot's own
        keyword matching as a fallback.
        """
        intent_name = predict_intent_gru(self.model, query)

        if intent_name == "total_sales":
            return self.rule_bot.get_total_sales()
        if intent_name == "best_selling_product":
            return self.rule_bot.get_best_selling_product()
        if intent_name == "sales_by_market":
            return self.rule_bot.get_sales_by_market()
        if intent_name == "sales_for_product":
            # try to extract product name
            product = self._extract_product(query)
            if product:
                return self.rule_bot.get_sales_for_product(product)
            return "I think you're asking about sales for a product, but I couldn't detect the product name. Try 'sales for Espresso'."

        # fallback: use original rule-based chatbot
        return self.rule_bot.get_response(query)

# Wire the trained GRU classifier to the sales data.
gru_assistant = GRUSalesAssistant(df_sales, gru_model)

print("\n=== GRU Sales Assistant Demo (Germaine) ===")
for q in test_queries:
    # Echo the question first, then the assistant's answer, as a short dialogue.
    print(f"\nYou: {q}")
    print(f"Bot: {gru_assistant.get_response(q)}")




if __name__ == '__main__':
    # Interactive console loop for the rule-based chatbot.
    print("\n" + "="*50)
    print("= Running Rule-Based Chatbot Simulation =")
    print("="*50)
    bot = SalesChatbot(df_sales)
    print(bot.get_greeting())
    print("\nTry asking me things like:")
    print("- 'What is the total sales?'")
    print("- 'What is the best selling product?'")
    print("- 'Show me sales by market.'")
    print("- 'What are the sales for Decaf?'")
    print("-" * 50)

    exit_commands = {'exit', 'quit', 'bye'}

    while True:
        try:
            # Strip so that "exit " or " quit" still end the session.
            user_query = input("You: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C: leave cleanly instead of with a traceback.
            print("\nChatbot: Goodbye!")
            break

        if not user_query:
            # Nothing typed; prompt again rather than sending a blank query to the bot.
            continue

        if user_query.lower() in exit_commands:
            print("Chatbot: Goodbye!")
            break

        response = bot.get_response(user_query)
        print("Chatbot:", response)
