In [2]:
import os 
base_path = r"c:\Users\kau75421\AI startup\Datacollection\split_conversations"
file_name = "conversation_2031.txt"
file_path = os.path.join(base_path, file_name)

# Read the file
with open(file_path, 'r', encoding='utf-8') as f:
    conversation = f.read()



# Display the raw conversation
print("Raw conversation:")
print(conversation)
print("-" * 50)

Raw conversation:
Buyer: Hi, I'm interested in the apartment you listed.
Seller: Of course, do you have a budget in mind?
Buyer: Does it have parking?
Seller: Yes, it comes with one covered parking.
Buyer: What’s the price range?
Seller: It’s a 10-minute walk to the metro station.
Buyer: That sounds interesting, I’ll discuss with my family.
--------------------------------------------------


In [3]:
# Split conversation into messages
messages = conversation.strip().split('\n')

# Calculate engagement metrics
engagement_metrics = {
    'total_messages': len(messages),
    'buyer_messages': len([msg for msg in messages if msg.startswith('Buyer:')]),
    'seller_messages': len([msg for msg in messages if msg.startswith('Seller:')]),
    'response_rate': 0,
    'avg_message_length': sum(len(msg) for msg in messages) / len(messages),
    'questions_asked': len([msg for msg in messages if '?' in msg]),
}

# Calculate seller's response rate to buyer messages
buyer_messages_count = engagement_metrics['buyer_messages']
seller_responses = 0
previous_was_buyer = False

for message in messages:
    if message.startswith('Buyer:'):
        previous_was_buyer = True
    elif message.startswith('Seller:') and previous_was_buyer:
        seller_responses += 1
        previous_was_buyer = False
    else:
        previous_was_buyer = False

# Calculate response rate (seller responses / buyer messages that could have received responses)
if buyer_messages_count > 0:
    engagement_metrics['response_rate'] = seller_responses / buyer_messages_count

# Print analytics
print("Conversation Engagement Analytics:")
print("-" * 30)
print(f"Total Messages: {engagement_metrics['total_messages']}")
print(f"Buyer Messages: {engagement_metrics['buyer_messages']}")
print(f"Seller Messages: {engagement_metrics['seller_messages']}")
print(f"Seller Response Rate: {engagement_metrics['response_rate']:.2%}")
print(f"Direct Seller Responses: {seller_responses}")
print(f"Average Message Length: {engagement_metrics['avg_message_length']:.1f} characters")
print(f"Questions Asked: {engagement_metrics['questions_asked']}")

Conversation Engagement Analytics:
------------------------------
Total Messages: 7
Buyer Messages: 4
Seller Messages: 3
Seller Response Rate: 75.00%
Direct Seller Responses: 3
Average Message Length: 45.4 characters
Questions Asked: 3


In [4]:
# Split conversation into messages
messages = conversation.strip().split('\n')

# Separate buyer and seller messages
buyer_messages = [msg.replace('Buyer: ', '') for msg in messages if msg.startswith('Buyer:')]
seller_messages = [msg.replace('Seller: ', '') for msg in messages if msg.startswith('Seller:')]

# Calculate word lengths
message_word_metrics = {
    'buyer_avg_words': sum(len(msg.split()) for msg in buyer_messages) / len(buyer_messages) if buyer_messages else 0,
    'seller_avg_words': sum(len(msg.split()) for msg in seller_messages) / len(seller_messages) if seller_messages else 0,
    'overall_avg_words': sum(len(msg.split(': ')[1].split()) for msg in messages) / len(messages)
}

# Print analytics
print("Message Word Count Analytics:")
print("-" * 30)
print(f"Buyer Average Words per Message: {message_word_metrics['buyer_avg_words']:.1f} words")
print(f"Seller Average Words per Message: {message_word_metrics['seller_avg_words']:.1f} words")
print(f"Overall Average Words per Message: {message_word_metrics['overall_avg_words']:.1f} words")

# Detailed word counts
print("\nDetailed Word Counts:")
print("-" * 30)
print("\nBuyer Messages:")
for msg in buyer_messages:
    words = msg.split()
    print(f"Words: {len(words)} - Message: {msg}")

print("\nSeller Messages:")
for msg in seller_messages:
    words = msg.split()
    print(f"Words: {len(words)} - Message: {msg}")

Message Word Count Analytics:
------------------------------
Buyer Average Words per Message: 6.0 words
Seller Average Words per Message: 8.0 words
Overall Average Words per Message: 6.9 words

Detailed Word Counts:
------------------------------

Buyer Messages:
Words: 8 - Message: Hi, I'm interested in the apartment you listed.
Words: 4 - Message: Does it have parking?
Words: 4 - Message: What’s the price range?
Words: 8 - Message: That sounds interesting, I’ll discuss with my family.

Seller Messages:
Words: 9 - Message: Of course, do you have a budget in mind?
Words: 7 - Message: Yes, it comes with one covered parking.
Words: 8 - Message: It’s a 10-minute walk to the metro station.


In [17]:
# DATA preprocesing
# Lower casing 
# removing stopwords 
# lemmatization
# tokenization
# removing special characters and punctuation
# seprate buyer and seller messages into different lists
# save the preprocessed data into a csv file with columns 'role' and 'message'

In [18]:
print(conversation)

Buyer: Hi, I'm interested in the apartment you listed.
Seller: Of course, do you have a budget in mind?
Buyer: Does it have parking?
Seller: Yes, it comes with one covered parking.
Buyer: What’s the price range?
Seller: It’s a 10-minute walk to the metro station.
Buyer: That sounds interesting, I’ll discuss with my family.


In [5]:
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS
import re

# Stopwords
stop_words = set(ENGLISH_STOP_WORDS)

def separate_buyer_seller_from_string(conversation: str, deduplicate: bool = False):
    """
    Process conversation string into Buyer and Seller tokens (stopwords removed).
    
    Args:
        conversation: str with lines like 'Buyer: ... Seller: ...'
        deduplicate: if True, removes duplicate words
    
    Returns:
        buyer_tokens, seller_tokens
    """
    
    def clean_text(text: str):
        tokens = re.findall(r"\b\w+\b", text.lower())
        return [w for w in tokens if w not in stop_words and len(w) > 1]
    
    buyer_list, seller_list = [], []
    
    # Split conversation by role markers
    parts = re.split(r"(Buyer:|Seller:)", conversation)
    print(parts)
    
    # parts will look like ["", "Buyer:", "Hi I'm ...", "Seller:", "Of course...", ...]
    for i in range(1, len(parts), 2):
        role = parts[i].replace(":", "").strip().lower()
        text = parts[i+1].strip()
        cleaned = clean_text(text)
        
        if role == "buyer":
            buyer_list.extend(cleaned)
        elif role == "seller":
            seller_list.extend(cleaned)
    
    if deduplicate:
        buyer_list = list(set(buyer_list))
        seller_list = list(set(seller_list))
    
    return buyer_list, seller_list


# ---------------- Example Usage ----------------
buyer_tokens, seller_tokens = separate_buyer_seller_from_string(conversation, deduplicate=True)

print("Buyer tokens:", buyer_tokens)
print("Seller tokens:", seller_tokens)


['', 'Buyer:', " Hi, I'm interested in the apartment you listed.\n", 'Seller:', ' Of course, do you have a budget in mind?\n', 'Buyer:', ' Does it have parking?\n', 'Seller:', ' Yes, it comes with one covered parking.\n', 'Buyer:', ' What’s the price range?\n', 'Seller:', ' It’s a 10-minute walk to the metro station.\n', 'Buyer:', ' That sounds interesting, I’ll discuss with my family.']
Buyer tokens: ['interested', 'listed', 'price', 'parking', 'family', 'range', 'apartment', 'interesting', 'does', 'discuss', 'hi', 'sounds', 'll']
Seller tokens: ['course', 'covered', 'yes', 'parking', 'minute', 'station', 'budget', 'metro', 'walk', '10', 'mind', 'comes']


In [6]:
# ---- Buyer-only Noun Phrase Extractor ----
from collections import Counter
import re

class BuyerNounExtractor:
    """
    Extracts ONLY nouns and noun phrases (multi-word like 'credit card') from Buyer messages.
    Ignores Seller completely.
    Excludes filler words: 'is', 'am', 'are'.
    Uses spaCy noun_chunks if available; falls back to NLTK chunking if not.
    Also returns a light tone summary for Buyer.
    """
    EXCLUDE_TOKENS = {"is", "am", "are"}

    def __init__(self, conversation_text: str):
        self.raw = conversation_text
        self.buyer_text = self._extract_buyer_text(conversation_text)
        self._nlp = None
        self._use_spacy = False

        # basic tone words
        self._positive = {"good", "great", "thanks", "thank", "excellent", "happy", "pleased", "fine", "perfect"}
        self._negative = {"bad", "problem", "issue", "angry", "disappointed", "no", "not", "worst", "never"}

    # ----------------- Message Parsing -----------------
    def _extract_buyer_text(self, text):
        """Extract all Buyer messages (lines starting with 'Buyer:') and join them."""
        lines = text.splitlines()
        buyer_msgs = []
        for ln in lines:
            if ln.strip().startswith("Buyer:"):
                msg = ln.split("Buyer:", 1)[1].strip()
                if msg:
                    buyer_msgs.append(msg)
        return "\n".join(buyer_msgs).strip()

    # ----------------- NLP Setup -----------------
    def _init_spacy(self):
        """Try to load spaCy model; fallback to NLTK if not available."""
        if self._nlp is not None:
            return
        try:
            import spacy
            try:
                self._nlp = spacy.load("en_core_web_sm")
            except Exception:
                from spacy.cli import download
                download("en_core_web_sm")
                self._nlp = spacy.load("en_core_web_sm")
            self._use_spacy = True
        except Exception:
            self._use_spacy = False
            self._nlp = None

    # ----------------- Noun Phrase Extraction -----------------
    def _extract_with_spacy(self, text):
        """Extract noun phrases using spaCy noun_chunks and single-word NOUN/PROPN tokens."""
        doc = self._nlp(text)
        phrases = []

        # multi-word noun chunks
        for chunk in doc.noun_chunks:
            tokens = [tok.lemma_.lower() for tok in chunk if tok.lemma_.lower() not in self.EXCLUDE_TOKENS]
            phrase = " ".join(tokens).strip()
            if phrase:
                phrases.append(phrase)

        # single-word nouns not already captured
        for tok in doc:
            if tok.pos_ in {"NOUN", "PROPN"}:
                lemma = tok.lemma_.lower().strip()
                if lemma not in self.EXCLUDE_TOKENS:
                    phrases.append(lemma)
        return phrases

    def _extract_with_nltk(self, text):
        """Fallback noun phrase extraction using NLTK."""
        try:
            import nltk
            nltk.download("punkt", quiet=True)
            nltk.download("averaged_perceptron_tagger", quiet=True)
            from nltk import word_tokenize, pos_tag, RegexpParser

            tokens = word_tokenize(text)
            pos_tags = pos_tag(tokens)
            grammar = r"NP: {<DT>?<JJ>*<NN.*>+}"
            chunk_parser = RegexpParser(grammar)
            tree = chunk_parser.parse(pos_tags)

            phrases = []
            for subtree in tree:
                if hasattr(subtree, "label") and subtree.label() == "NP":
                    phrase = " ".join(w.lower() for w, _ in subtree.leaves())
                    if phrase not in self.EXCLUDE_TOKENS:
                        phrases.append(phrase)
            return phrases
        except Exception:
            # fallback to simple word extraction
            return re.findall(r"\b[a-zA-Z]{2,}\b", text.lower())

    # ----------------- Main Logic -----------------
    def extract_buyer_nouns(self, top_k=30):
        """Return top noun phrases used by Buyer."""
        if not self.buyer_text:
            return Counter()
        self._init_spacy()

        if self._use_spacy:
            phrases = self._extract_with_spacy(self.buyer_text)
        else:
            phrases = self._extract_with_nltk(self.buyer_text)

        phrases = [p for p in phrases if p and p not in self.EXCLUDE_TOKENS]
        counter = Counter(phrases)
        return counter.most_common(top_k)

    # ----------------- Tone Analysis -----------------
    def analyze_tone(self):
        """Basic tone based on positive/negative words."""
        txt = self.buyer_text.lower()
        words = re.findall(r"\b[a-zA-Z]+\b", txt)
        total = max(1, len(words))
        pos = sum(1 for w in words if w in self._positive)
        neg = sum(1 for w in words if w in self._negative)
        score = (pos - neg) / total
        tone = "positive" if score > 0.03 else "negative" if score < -0.03 else "neutral"
        return {"tone": tone, "score": round(score, 3)}

    def run(self, top_k=30):
        """Run full analysis: buyer-only noun extraction + tone."""
        nouns = self.extract_buyer_nouns(top_k=top_k)
        tone = self.analyze_tone()
        return {"buyer_nouns": nouns, "tone": tone}

# ---------------- Example Usage ----------------
analyzer = BuyerNounExtractor(conversation)
result = analyzer.run(top_k=40)

print("Top Nouns/Noun-Phrases used by Buyer:")
for phrase, count in result["buyer_nouns"]:
    print(f"{phrase}: {count}")

print("\nBuyer Tone:")
for k, v in result["tone"].items():
    print(f"{k}: {v}")

# ---------------- Complexity ----------------
# Time: O(N) for tokenization/POS tagging, O(M log M) for sorting noun frequencies
# Space: O(N) for token storage


Top Nouns/Noun-Phrases used by Buyer:
hi: 1
the apartment: 1
the price range: 1
ll discuss: 1
family: 1

Buyer Tone:
tone: neutral
score: 0.0


In [7]:
# ---- Seller performance analysis (text-only, Buyer/Seller format) ----
from collections import Counter
import re

class SellerPerformanceAnalyzer:
    """
    Analyze seller performance from a Buyer/Seller style conversation (text only).
    Provides:
      - counts & averages
      - seller response rate (seller replies following buyer messages)
      - estimated answer rate to buyer questions
      - politeness/helpfulness/resolution heuristics
      - suggested improvements
    Note: Without timestamps we cannot compute true response times; we use message order heuristics.
    """

    # small lexicons for heuristics
    POLITE_MARKERS = {"please", "thank you", "thanks", "regards", "sorry", "appreciate", "kindly"}
    NEGATIVE_MARKERS = {"can't", "cannot", "won't", "unable", "not available", "no stock", "out of stock", "delay"}
    RESOLUTION_MARKERS = {"shipped", "sent", "delivered", "resolved", "fixed", "done", "completed", "refund", "replaced", "invoice", "paid", "booked"}
    ACTION_OFFER_MARKERS = {"i can", "i will", "i'll", "we can", "we will", "we'll", "send", "share", "provide", "arrange", "schedule", "call you", "call"}

    def __init__(self, conversation_text: str):
        self.raw = conversation_text
        self.messages = self._split_messages(conversation_text)  # list[(speaker, text)]
        self.seller_messages = [t for s, t in self.messages if s == "Seller"]
        self.buyer_messages = [t for s, t in self.messages if s == "Buyer"]

    # ------------------ parsing ------------------
    def _split_messages(self, text):
        """
        Split conversation into (speaker, text) blocks using exact 'Buyer:' and 'Seller:' prefixes.
        Preserves message order.
        """
        lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        messages = []
        current_speaker = None
        current_lines = []
        prefixes = ("Buyer:", "Seller:")

        for ln in lines:
            stripped = ln.strip()
            if not stripped:
                if current_speaker is not None:
                    current_lines.append("")
                continue

            matched = False
            for p in prefixes:
                if stripped.startswith(p):
                    if current_speaker is not None:
                        messages.append((current_speaker, "\n".join(current_lines).strip()))
                    current_speaker = p[:-1]  # 'Buyer' or 'Seller'
                    remainder = stripped[len(p):].strip()
                    current_lines = [remainder] if remainder else []
                    matched = True
                    break
            if not matched:
                if current_speaker is None:
                    # unknown speaker until first prefix - attach as Unknown (ignored later)
                    current_speaker = "Unknown"
                    current_lines = [stripped]
                else:
                    current_lines.append(stripped)

        if current_speaker is not None:
            messages.append((current_speaker, "\n".join(current_lines).strip()))
        return messages

    # ------------------ utilities ------------------
    @staticmethod
    def _word_count(text):
        return len(re.findall(r"\w+", text))

    @staticmethod
    def _contains_any(text, keywords):
        t = text.lower()
        return any(k in t for k in keywords)

    # ------------------ metrics ------------------
    def basic_activity_metrics(self):
        total_msgs = len(self.messages)
        seller_count = len(self.seller_messages)
        buyer_count = len(self.buyer_messages)
        avg_seller_len = (sum(self._word_count(m) for m in self.seller_messages) / seller_count) if seller_count else 0
        avg_buyer_len = (sum(self._word_count(m) for m in self.buyer_messages) / buyer_count) if buyer_count else 0

        return {
            "total_messages": total_msgs,
            "seller_messages": seller_count,
            "buyer_messages": buyer_count,
            "avg_seller_words": round(avg_seller_len, 1),
            "avg_buyer_words": round(avg_buyer_len, 1)
        }

    def seller_response_rate(self):
        """
        Response rate heuristic:
        Count instances where a Seller message immediately follows a Buyer message.
        response_rate = seller_responses_after_buyer / total_buyer_messages_that_could_receive_response
        """
        seller_responses = 0
        buyer_msgs_that_could_get_response = 0
        prev_is_buyer = False

        for spk, txt in self.messages:
            if spk == "Buyer":
                prev_is_buyer = True
                buyer_msgs_that_could_get_response += 1
            elif spk == "Seller":
                if prev_is_buyer:
                    seller_responses += 1
                prev_is_buyer = False
            else:
                prev_is_buyer = False

        rate = (seller_responses / buyer_msgs_that_could_get_response) if buyer_msgs_that_could_get_response else 0.0
        return {"seller_direct_responses": seller_responses,
                "buyer_msgs_that_could_get_response": buyer_msgs_that_could_get_response,
                "response_rate": round(rate, 3)}

    def answer_rate_to_questions(self):
        """
        Estimate how many buyer questions were answered.
        - Identify buyer messages that contain a question mark (or 'how', 'what', 'when', etc.)
        - For each such buyer question, inspect the next Seller message (if any) and see if it
          contains an apparent answer (resolution markers, action offers, or absence of clarifying question).
        This is heuristic: may be conservative.
        """
        q_markers = {"how", "what", "when", "where", "why", "which", "who", "can", "could", "would"}
        buyer_questions = []
        # collect indices of buyer question messages
        for idx, (spk, txt) in enumerate(self.messages):
            if spk == "Buyer":
                low = txt.lower()
                if "?" in txt or any(re.search(r"\b" + qm + r"\b", low) for qm in q_markers):
                    buyer_questions.append(idx)

        answered = 0
        unresolved = 0
        for idx in buyer_questions:
            # look for next seller message after this buyer question
            next_seller_text = None
            for j in range(idx + 1, len(self.messages)):
                if self.messages[j][0] == "Seller":
                    next_seller_text = self.messages[j][1]
                    break
                elif self.messages[j][0] == "Buyer":
                    # another buyer message before any seller reply -> likely unanswered
                    break
            if next_seller_text:
                low = next_seller_text.lower()
                # heuristics indicating answer/help:
                if (self._contains_any(low, self.RESOLUTION_MARKERS)
                        or self._contains_any(low, self.ACTION_OFFER_MARKERS)
                        or not re.search(r"\b(can you|could you|please provide|pls provide|please let me)\b", low)):
                    answered += 1
                else:
                    unresolved += 1
            else:
                unresolved += 1

        total_q = len(buyer_questions)
        answer_rate = (answered / total_q) if total_q else None
        return {"buyer_questions": total_q, "answered": answered, "unanswered": unresolved, "answer_rate": round(answer_rate, 3) if answer_rate is not None else None}

    def politeness_and_helpfulness(self):
        """
        Heuristic scoring for politeness and helpfulness:
          - politeness_score: fraction of seller messages containing polite markers
          - helpfulness_score: fraction containing action offers or resolution markers
        """
        seller_count = len(self.seller_messages)
        if seller_count == 0:
            return {"politeness_score": 0.0, "helpfulness_score": 0.0, "examples_polite": [], "examples_helpful": []}

        polite_hits = 0
        helpful_hits = 0
        polite_examples = []
        helpful_examples = []

        for msg in self.seller_messages:
            low = msg.lower()
            is_polite = self._contains_any(low, self.POLITE_MARKERS)
            is_helpful = (self._contains_any(low, self.ACTION_OFFER_MARKERS) or self._contains_any(low, self.RESOLUTION_MARKERS))
            if is_polite:
                polite_hits += 1
                if len(polite_examples) < 3:
                    polite_examples.append(msg[:200])
            if is_helpful:
                helpful_hits += 1
                if len(helpful_examples) < 3:
                    helpful_examples.append(msg[:200])

        return {
            "politeness_score": round(polite_hits / seller_count, 3),
            "helpfulness_score": round(helpful_hits / seller_count, 3),
            "examples_polite": polite_examples,
            "examples_helpful": helpful_examples
        }

    def escalation_and_negative_signals(self):
        """
        Look for negative / escalation signals in seller replies:
          - use of negative markers, refusal language, many unanswered buyer questions, or repeated 'not available' phrases.
        """
        neg_count = sum(1 for m in self.seller_messages if self._contains_any(m, self.NEGATIVE_MARKERS))
        caps_shouting = sum(1 for m in self.seller_messages if re.search(r"\b[A-Z]{3,}\b", m))
        return {"neg_count": neg_count, "caps_shouting": caps_shouting}

    # ------------------ overall assessment ------------------
    def performance_summary(self):
        basic = self.basic_activity_metrics()
        resp = self.seller_response_rate()
        answer = self.answer_rate_to_questions()
        tone_help = self.politeness_and_helpfulness()
        neg = self.escalation_and_negative_signals()

        # simple rule-based judgment
        score_components = []
        # response rate contributes (weight 0.4)
        score_components.append((resp["response_rate"] or 0) * 0.4)
        # answer rate contributes (0.3) - if None, assume 0.15 neutral
        answer_rate_val = answer["answer_rate"] if answer["answer_rate"] is not None else 0.15
        score_components.append(answer_rate_val * 0.3)
        # helpfulness (0.2)
        score_components.append(tone_help["helpfulness_score"] * 0.2)
        # politeness (0.1)
        score_components.append(tone_help["politeness_score"] * 0.1)

        overall_score = sum(score_components)  # range roughly 0..1
        overall_label = "Excellent" if overall_score >= 0.75 else "Good" if overall_score >= 0.5 else "Needs Improvement"

        suggestions = []
        # heuristics for improvement suggestions
        if resp["response_rate"] < 0.8:
            suggestions.append("Increase responsiveness: try to reply directly after buyer messages more often.")
        if answer["answer_rate"] is None or answer["answer_rate"] < 0.7:
            suggestions.append("Improve question answering: when buyer asks, provide direct answers or clear next steps.")
        if tone_help["politeness_score"] < 0.6:
            suggestions.append("Use more polite phrases (please, thank you) to improve rapport.")
        if tone_help["helpfulness_score"] < 0.5:
            suggestions.append("Offer clear actions (send link, arrange shipment, provide ETA) rather than short/deflecting replies.")
        if neg["neg_count"] > 0:
            suggestions.append("Avoid negative phrasing when possible; provide alternatives instead of blunt 'not available' answers.")
        if neg["caps_shouting"] > 0:
            suggestions.append("Avoid all-caps words; they may be perceived as shouting.")

        summary = {
            "basic_metrics": basic,
            "response_metrics": resp,
            "question_answering": answer,
            "politeness_helpfulness": tone_help,
            "neg_signals": neg,
            "overall_score": round(overall_score, 3),
            "overall_label": overall_label,
            "suggestions": suggestions
        }
        return summary

    # ------------------ run ------------------
    def run(self):
        return self.performance_summary()

# ------------------ Example usage ------------------
analyzer = SellerPerformanceAnalyzer(conversation)
report = analyzer.run()

# Nicely print a human-friendly summary
print("=== Seller Performance Summary ===")
bm = report["basic_metrics"]
print(f"Total messages: {bm['total_messages']} | Buyer: {bm['buyer_messages']} | Seller: {bm['seller_messages']}")
print(f"Average seller words per message: {bm['avg_seller_words']}")
print()
rm = report["response_metrics"]
print(f"Seller direct responses following buyer messages: {rm['seller_direct_responses']} / {rm['buyer_msgs_that_could_get_response']} (rate: {rm['response_rate']*100:.1f}%)")
ans = report["question_answering"]
if ans["answer_rate"] is not None:
    print(f"Buyer questions: {ans['buyer_questions']} | Answered (estimated): {ans['answered']} | Answer rate: {ans['answer_rate']*100:.1f}%")
else:
    print(f"Buyer questions: {ans['buyer_questions']} | Answered (estimated): {ans['answered']}")
print()
ph = report["politeness_helpfulness"]
print(f"Politeness score (fraction of seller messages with polite markers): {ph['politeness_score']}")
print(f"Helpfulness score (fraction offering action/resolution): {ph['helpfulness_score']}")
if ph["examples_polite"]:
    print("\nExample polite seller messages:")
    for ex in ph["examples_polite"]:
        print(" -", ex)
if ph["examples_helpful"]:
    print("\nExample helpful seller messages:")
    for ex in ph["examples_helpful"]:
        print(" -", ex)
print()
ns = report["neg_signals"]
print(f"Negative/limiting replies: {ns['neg_count']} | All-caps tokens (possible raised tone): {ns['caps_shouting']}")
print()
print("Overall assessment:", report["overall_label"], f"(score: {report['overall_score']})")
print("\nSuggestions to improve seller performance:")
for s in report["suggestions"]:
    print(" -", s)

# ------------------ Complexity notes ------------------
# Time: O(N) to scan messages and simple pattern checks where N = number of messages / tokens
# Space: O(N) for storing split messages


=== Seller Performance Summary ===
Total messages: 7 | Buyer: 4 | Seller: 3
Average seller words per message: 8.7

Seller direct responses following buyer messages: 3 / 4 (rate: 75.0%)
Buyer questions: 2 | Answered (estimated): 2 | Answer rate: 100.0%

Politeness score (fraction of seller messages with polite markers): 0.0
Helpfulness score (fraction offering action/resolution): 0.0

Negative/limiting replies: 0 | All-caps tokens (possible raised tone): 0

Overall assessment: Good (score: 0.6)

Suggestions to improve seller performance:
 - Increase responsiveness: try to reply directly after buyer messages more often.
 - Use more polite phrases (please, thank you) to improve rapport.
 - Offer clear actions (send link, arrange shipment, provide ETA) rather than short/deflecting replies.


In [8]:
import re
from collections import defaultdict

class ConversationCategorizer:
    """
    Classify a Buyer/Seller conversation into:
      - 'black_listed' (not interested / cut call / refused)
      - 'good_call' (showed interest / worth follow-up)
      - 'highly_important' (strong interest / requested more info or next steps)
    Heuristics are rule-based and operate on Buyer messages and call-related lines.
    """

    # Signals / phrases
    CALL_INDICATORS = [
        "picked up", "picked the call", "call connected", "on call", "in a call",
        "connected on call", "we called", "you called", "called you", "call me",
        "hung up", "hang up", "cut the call", "cut the line", "disconnected", "call dropped"
    ]
    NEGATIVE_MARKERS = [
        "not interested", "no interested", "no thanks", "no thank you", "nope", "not now",
        "don't call", "dont call", "unsubscribe", "not looking", "no need", "no thank",
        "no interest", "cancel", "stop calling"
    ]
    SHORT_REFUSAL_PATTERNS = [
        r"^\s*no\s*\.?$", r"^\s*nah\s*\.?$", r"^\s*not interested\s*\.?$", r"^\s*no thanks\s*\.?$",
        r"^\s*bye\s*\.?$", r"^\s*ok\s*\.?$"
    ]
    INTEREST_MARKERS = [
        "interested", "tell me more", "send details", "send me", "send info", "send information",
        "pricing", "price", "how much", "quote", "demo", "meeting", "schedule", "when can", "call me back",
        "i want", "we want", "can i", "can we", "order", "buy", "purchase", "sign up", "subscribe", "trial",
        "more info", "more information", "details", "contact me", "email me", "send proposal", "proposal"
    ]
    FOLLOWUP_REQUESTS = [
        "send details", "send me", "email me", "call me", "schedule", "book", "meeting", "demo", "proposal"
    ]
    QUESTION_WORDS = {"how", "what", "when", "where", "why", "which", "who", "can", "could", "would"}

    def __init__(self, conversation_text: str):
        self.raw = conversation_text
        self.messages = self._split_messages(conversation_text)  # list of (speaker, text)
        self.buyer_messages = [(i, t) for i, (s, t) in enumerate(self.messages) if s == "Buyer"]
        # Precompute lower versions
        self._buyer_lower = [(i, t, t.lower()) for i, t in self.buyer_messages]

    def _split_messages(self, text):
        """Split into (speaker, text) using 'Buyer:' / 'Seller:' prefixes preserving order."""
        lines = text.replace("\r\n", "\n").replace("\r", "\n").split("\n")
        messages = []
        current_speaker = None
        current_lines = []
        prefixes = ("Buyer:", "Seller:")

        for ln in lines:
            stripped = ln.strip()
            if not stripped:
                if current_speaker is not None:
                    current_lines.append("")
                continue
            matched = False
            for p in prefixes:
                if stripped.startswith(p):
                    if current_speaker is not None:
                        messages.append((current_speaker, "\n".join(current_lines).strip()))
                    current_speaker = p[:-1]  # Buyer or Seller
                    remainder = stripped[len(p):].strip()
                    current_lines = [remainder] if remainder else []
                    matched = True
                    break
            if not matched:
                if current_speaker is None:
                    current_speaker = "Unknown"
                    current_lines = [stripped]
                else:
                    current_lines.append(stripped)
        if current_speaker is not None:
            messages.append((current_speaker, "\n".join(current_lines).strip()))
        return messages

    def _contains_any(self, text_lower, keywords):
        """Return True if any keyword appears in the lower-cased text."""
        return any(k in text_lower for k in keywords)

    def _matches_short_refusal(self, text):
        for pat in self.SHORT_REFUSAL_PATTERNS:
            if re.search(pat, text.strip(), flags=re.IGNORECASE):
                return True
        return False

    def categorize(self):
        """
        Main categorization logic. Returns dict:
          { 'category': str, 'score': float, 'signals': {...}, 'recommendation': str, 'evidence': [...] }
        """
        signals = defaultdict(int)
        evidence = []

        # Scan buyer messages for call events nearby and signals
        for idx, text, low in self._buyer_lower:
            # call indicator present in same buyer line
            if self._contains_any(low, self.CALL_INDICATORS):
                signals['call_mentions'] += 1
                evidence.append(("call_mention", text))

            # negative markers
            if self._contains_any(low, self.NEGATIVE_MARKERS) or self._matches_short_refusal(text):
                signals['negative_phrases'] += 1
                evidence.append(("negative", text))

            # interest markers
            if self._contains_any(low, self.INTEREST_MARKERS):
                signals['interest_phrases'] += 1
                evidence.append(("interest", text))

            # explicit follow-up requests (strong signal)
            if self._contains_any(low, self.FOLLOWUP_REQUESTS):
                signals['followup_requests'] += 1
                evidence.append(("followup", text))

            # questions
            if ("?" in text) or any(re.search(r"\b" + qw + r"\b", low) for qw in self.QUESTION_WORDS):
                signals['questions'] += 1
                evidence.append(("question", text))

            # very short buyer messages (possible cut / hung up)
            word_count = len(re.findall(r"\b\w+\b", text))
            if word_count <= 3 and (self._matches_short_refusal(text) or self._contains_any(low, ["hung up", "hang up", "cut", "bye"])):
                signals['short_hangups'] += 1
                evidence.append(("short_hangup", text))
            elif word_count <= 2 and low in {"no", "nah", "nope"}:
                signals['short_hangups'] += 1
                evidence.append(("short_refusal", text))

        # Additional heuristic: check buyer message immediately after a call-indicator (which might be in seller message)
        # Find indices where seller says 'picked up' or 'call connected' etc, then inspect following buyer message
        for i, (spk, txt) in enumerate(self.messages):
            if spk == "Seller" and self._contains_any(txt.lower(), self.CALL_INDICATORS):
                # look at next buyer message (if exists)
                for j in range(i+1, len(self.messages)):
                    if self.messages[j][0] == "Buyer":
                        btxt = self.messages[j][1]
                        blow = btxt.lower()
                        wc = len(re.findall(r"\b\w+\b", btxt))
                        if self._contains_any(blow, self.NEGATIVE_MARKERS) or self._matches_short_refusal(btxt) or wc <= 3:
                            signals['short_hangups'] += 1
                            evidence.append(("post_call_short", btxt))
                        elif self._contains_any(blow, self.INTEREST_MARKERS) or self._contains_any(blow, self.FOLLOWUP_REQUESTS):
                            signals['interest_phrases'] += 1
                            evidence.append(("post_call_interest", btxt))
                        break

        # Compute a simple interest score
        # positive contributions
        score = 0.0
        score += signals['interest_phrases'] * 1.0
        score += signals['questions'] * 0.8
        score += signals['followup_requests'] * 1.8
        # negative contributions
        score -= signals['negative_phrases'] * 1.5
        score -= signals['short_hangups'] * 2.5

        # Determine category by rules (ordered)
        category = "black_listed"
        reason = []
        # Rule 1: strong negative / immediate hangup -> black listed
        if signals['short_hangups'] > 0 and signals['negative_phrases'] >= 1:
            category = "black_listed"
            reason.append("Short hangup / direct negative phrase detected right after call.")
        elif signals['short_hangups'] >= 2 and signals['interest_phrases'] == 0:
            category = "black_listed"
            reason.append("Multiple short hangups / refusals; likely not interested.")
        # Rule 2: highly important if follow-up requested or many interest signals
        elif signals['followup_requests'] > 0 or (signals['interest_phrases'] >= 2 and signals['questions'] >= 1):
            category = "highly_important"
            reason.append("Buyer requested follow-up or showed strong multi-signal interest.")
        # Rule 3: good call if at least some interest/questions and not dominated by negatives
        elif score >= 1.0:
            category = "good_call"
            reason.append("Buyer showed interest or asked questions — worthy of follow-up.")
        else:
            # fallback: if there are explicit negative phrases and no interest -> black_listed
            if signals['negative_phrases'] > 0 and signals['interest_phrases'] == 0:
                category = "black_listed"
                reason.append("Negative phrase(s) found with no interest signals.")
            else:
                # otherwise treat as good_call if any mild signals, else blacklisted
                if signals['interest_phrases'] > 0 or signals['questions'] > 0:
                    category = "good_call"
                    reason.append("Mild interest found.")
                else:
                    category = "black_listed"
                    reason.append("No clear interest signals; treat as not interested.")

        # Recommendation based on category
        if category == "black_listed":
            recommendation = "Blacklist / Do not prioritize. Optionally attempt one polite re-connect later (low priority)."
        elif category == "good_call":
            recommendation = "Follow up: schedule another call or send requested info. Medium priority."
        else:  # highly_important
            recommendation = "Immediate follow-up: assign high priority sales outreach; send proposal/arrange meeting."

        result = {
            "category": category,
            "score": round(score, 2),
            "signals": dict(signals),
            "reason": reason,
            "recommendation": recommendation,
            "evidence_snippets": evidence[:10]  # up to 10 supporting snippets
        }
        return result

# ---------------- Example usage ----------------
categorizer = ConversationCategorizer(conversation)
report = categorizer.categorize()

print("=== Conversation Categorization ===")
print("Category:", report["category"])
print("Score:", report["score"])
print("Recommendation:", report["recommendation"])
print("Signals:", report["signals"])
print("Reason:", report["reason"])
print("\nEvidence snippets (sample):")
for tag, snippet in report["evidence_snippets"]:
    print(f"- [{tag}] {snippet}")


=== Conversation Categorization ===
Category: highly_important
Score: 3.6
Recommendation: Immediate follow-up: assign high priority sales outreach; send proposal/arrange meeting.
Signals: {'interest_phrases': 2, 'questions': 2, 'followup_requests': 0, 'negative_phrases': 0, 'short_hangups': 0}
Reason: ['Buyer requested follow-up or showed strong multi-signal interest.']

Evidence snippets (sample):
- [interest] Hi, I'm interested in the apartment you listed.
- [question] Does it have parking?
- [interest] What’s the price range?
- [question] What’s the price range?


In [10]:
# ---- Final summary: Buyer Intent + Follow-up detection ----
import re

class BuyerIntentFollowupSummary:
    """
    Produces a clean final summary:
      - 'buyer_wants': what the buyer is looking for (intent)
      - 'follow_up_happened': True/False
    Works for good_call or highly_important conversations.
    """

    INTEREST_PATTERNS = {
        "apartment": ["apartment", "flat", "property", "house", "villa", "room", "accommodation"],
        "pricing": ["price", "pricing", "cost", "rate", "quote", "budget"],
        "demo": ["demo", "show", "visit", "inspection", "schedule", "book a visit"],
        "purchase": ["buy", "purchase", "order", "booking"],
        "subscription": ["subscribe", "plan", "membership", "package"],
        "followup": ["details", "information", "proposal", "brochure", "more info", "send info"],
        "job": ["job", "hiring", "position", "role", "career"],
        "service": ["service", "support", "maintenance", "repair", "installation"]
    }

    FOLLOWUP_CUES = [
        "call back", "call me", "send", "share", "email", "arrange", "schedule",
        "book", "meeting", "demo", "visit", "proposal", "quote", "any update", "follow up", "follow-up"
    ]

    def __init__(self, conversation_text: str, category: str):
        self.raw = conversation_text
        self.category = category.lower().strip() if category else "unknown"
        self.messages = self._split(conversation_text)

    def _split(self, text):
        lines = text.split("\n")
        msgs, current, buf = [], None, []
        for ln in lines:
            ln = ln.strip()
            if ln.startswith("Buyer:"):
                if current:
                    msgs.append((current, " ".join(buf).strip()))
                current = "Buyer"
                buf = [ln.split("Buyer:", 1)[1].strip()]
            elif ln.startswith("Seller:"):
                if current:
                    msgs.append((current, " ".join(buf).strip()))
                current = "Seller"
                buf = [ln.split("Seller:", 1)[1].strip()]
            else:
                buf.append(ln)
        if current:
            msgs.append((current, " ".join(buf).strip()))
        return msgs

    def detect_intent(self):
        """Infer what the buyer wants based on keywords in Buyer messages."""
        text = " ".join([t for s, t in self.messages if s == "Buyer"]).lower()
        intents = []
        for key, keywords in self.INTEREST_PATTERNS.items():
            if any(k in text for k in keywords):
                intents.append(key)
        if not intents:
            # fallback if no clear intent words, return a default guess
            return "general inquiry"
        # prioritize key domain intents if multiple matched
        if "apartment" in intents:
            return "looking for an apartment or property"
        if "purchase" in intents:
            return "interested in purchasing"
        if "demo" in intents:
            return "interested in scheduling a visit or demo"
        if "pricing" in intents:
            return "inquiring about pricing or quotation"
        if "subscription" in intents:
            return "interested in a subscription plan"
        if "job" in intents:
            return "inquiring about a job or position"
        if "service" in intents:
            return "interested in services or maintenance"
        return "general inquiry"

    def detect_followup(self):
        """Check if any follow-up conversation occurred after buyer interest."""
        followup_detected = False
        for spk, msg in self.messages:
            low = msg.lower()
            if spk == "Seller" and any(k in low for k in self.FOLLOWUP_CUES):
                followup_detected = True
                break
            if spk == "Buyer" and any(k in low for k in ["any update", "follow up", "follow-up", "did you", "when"]):
                followup_detected = True
                break
        return followup_detected

    def summarize(self):
        if self.category not in {"good_call", "highly_important"}:
            return {
                "category": self.category,
                "buyer_wants": "Not applicable (buyer not interested)",
                "follow_up_happened": False
            }

        intent = self.detect_intent()
        followup = self.detect_followup()

        return {
            "category": self.category,
            "buyer_wants": intent,
            "follow_up_happened": followup
        }

# ---------------- Example usage ----------------
# Assume you already have `conv_category` from ConversationCategorizer
conv_category = "highly_important"  # example
summary = BuyerIntentFollowupSummary(conversation, conv_category)
result = summary.summarize()

print("=== Final Summary ===")
print("Category:", result["category"])
print("Buyer wants:", result["buyer_wants"])
print("Follow-up conversation happened?:", "Yes" if result["follow_up_happened"] else "No")


=== Final Summary ===
Category: highly_important
Buyer wants: looking for an apartment or property
Follow-up conversation happened?: No
