# Conversation Processing Pipeline

This notebook implements a three-stage conversation analysis pipeline:

1. **Stage 1**: Load and parse conversations from CSV → save to JSON
2. **Stage 2**: Run problem detection → update conversations with problems  
3. **Stage 3**: Run UX and intent analysis with Groq → update conversations with UX and intent data

Each stage skips work that is already stored in the JSON checkpoint, so the notebook can be re-run safely; Stage 3 additionally tracks progress and can resume an interrupted run.

In [None]:
import sys
import os
import json
import pandas as pd
from datetime import datetime
from typing import List
from tqdm import tqdm

# Add utils to path
sys.path.append('utils')

# Import custom modules
from utils.conv.parser import ConversationParser
from utils.conv.conversation import Conversation, ConversationMap, RequestCategory, ProblemDetection, UX
from utils.problem_eda import create_problem_detection_for_conversation
from utils.qroq.groq_processor import GroqProcessor

from config import settings

# Configuration: raw input CSV, the JSON checkpoint shared by every stage, and the Groq credential.
CSV_FILE = "data/data.csv"
CONVERSATIONS_FILE = "conversations_parsed.json"
GROQ_API_KEY = settings.api_key  # read from the project's `config.settings`

## Stage 1: Load and Parse Conversations

In [2]:
def save_conversations_to_json(conversations: List[Conversation], filename: str):
    """Save conversations to a JSON file with a metadata header.

    The document has two top-level keys: ``metadata`` (count, creation time,
    source CSV path and format version) and ``conversations`` (one
    ``model_dump()`` dict per conversation). Values ``json`` cannot encode
    natively (e.g. ``datetime``) are written via ``str()``.

    The JSON is first written to ``<filename>.tmp`` and then moved over
    ``filename`` with ``os.replace``. Stage 1 skips parsing whenever this file
    exists, so an interrupted write must never leave a truncated file behind.

    Args:
        conversations: Conversation models to persist.
        filename: Destination path of the JSON file.
    """
    data = {
        'metadata': {
            'total_conversations': len(conversations),
            'created_at': datetime.now().isoformat(),
            'source_file': CSV_FILE,
            'format_version': '2.0'
        },
        'conversations': [conv.model_dump() for conv in conversations]
    }

    tmp_filename = f"{filename}.tmp"
    try:
        with open(tmp_filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2, default=str)
        # Replace the previous checkpoint only after the new one is fully written
        # (the rename is atomic on POSIX).
        os.replace(tmp_filename, filename)
    finally:
        # After a successful replace the temp file is gone; after a failure,
        # remove the partial temp file and leave any existing checkpoint intact.
        if os.path.exists(tmp_filename):
            os.remove(tmp_filename)

    print(f"Saved {len(conversations)} conversations to {filename}")

def load_conversations_from_json(filename: str) -> List[Conversation]:
    """Load conversations from a JSON file produced by ``save_conversations_to_json``.

    ``start_time`` and ``end_time`` are stored as strings and are parsed back
    into ``datetime`` objects before each ``Conversation`` is constructed.
    Non-string values (e.g. ``null``) and missing keys are passed through
    unchanged, leaving the decision to ``Conversation``'s own validation.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        The reconstructed ``Conversation`` objects, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If the document has no ``conversations`` key.
    """
    with open(filename, 'r', encoding='utf-8') as f:
        data = json.load(f)

    conversations = []
    for conv_data in data['conversations']:
        # Only parse real strings; fromisoformat(None) would raise TypeError.
        for field in ('start_time', 'end_time'):
            value = conv_data.get(field)
            if isinstance(value, str):
                conv_data[field] = datetime.fromisoformat(value)

        # Nested dicts (e.g. analysis) are passed as-is; Conversation (a pydantic
        # model, judging by model_dump/model_copy) is expected to rebuild them.
        conversations.append(Conversation(**conv_data))

    print(f"Loaded {len(conversations)} conversations from {filename}")
    return conversations

# Stage 1: reuse the JSON checkpoint if it exists; otherwise parse the CSV and write it.
if os.path.exists(CONVERSATIONS_FILE):
    print(f"Conversations file {CONVERSATIONS_FILE} already exists. Loading existing data...")
    conversations = load_conversations_from_json(CONVERSATIONS_FILE)
else:
    print("=== Stage 1: Parsing Conversations from CSV ===")

    # time_threshold_minutes presumably sets the idle gap that starts a new
    # conversation -- confirm against ConversationParser.
    parser = ConversationParser(csv_file_path=CSV_FILE, time_threshold_minutes=30)

    print("Parsing conversations from CSV...")
    conversations = parser.parse_conversations()

    print(f"Parsed {len(conversations)} conversations")
    distinct_users = {c.user_id for c in conversations}
    print(f"Total users: {len(distinct_users)}")

    stats = parser.get_conversation_stats(conversations)
    print("\n=== Conversation Statistics ===")
    for stat_name, stat_value in stats.items():
        print(f"{stat_name}: {stat_value}")

    # Persist so later runs and later stages start from the checkpoint.
    save_conversations_to_json(conversations, CONVERSATIONS_FILE)

print(f"\n✅ Stage 1 Complete: {len(conversations)} conversations ready for analysis")

Conversations file conversations_parsed.json already exists. Loading existing data...
Loaded 2055 conversations from conversations_parsed.json

✅ Stage 1 Complete: 2055 conversations ready for analysis


## Stage 2: Problem Detection

In [3]:
def run_problem_detection(conversations: List[Conversation]) -> List[Conversation]:
    """Run problem detection on every conversation and return updated copies.

    Each conversation is deep-copied before its ``analysis.problems`` is set, so
    neither the input list nor its nested ``analysis`` objects are modified.
    A conversation without an ``analysis`` receives a new ``ConversationMap``
    with empty request categories/intents and a placeholder UX block whose
    sentiment is "neutral" (the Stage 3 completeness check treats that as
    not yet analysed).

    Args:
        conversations: Conversations to analyse.

    Returns:
        A new list of updated copies, in the same order as the input.
    """
    print("=== Stage 2: Problem Detection ===")

    updated_conversations = []
    problem_stats = {}

    print(f"Analyzing {len(conversations)} conversations for problems...")

    for conv in tqdm(conversations, desc="Detecting problems"):
        problem_detection = create_problem_detection_for_conversation(conv)

        # Deep copy: a shallow model_copy() shares the nested `analysis` object,
        # so assigning `.problems` below would silently mutate the caller's input.
        conv_copy = conv.model_copy(deep=True)

        if conv_copy.analysis is None:
            conv_copy.analysis = ConversationMap(
                request=RequestCategory(
                    category=[],
                    intent=[]
                ),
                problems=problem_detection,
                ux=UX(
                    sentiment="neutral",
                    sentiment_confidence=0.5,
                    emotions=[],
                    feedback=[],
                    suggestions=[],
                    is_successful=True
                )
            )
        else:
            conv_copy.analysis.problems = problem_detection

        updated_conversations.append(conv_copy)

        # Count occurrences of each problem type across all conversations.
        for problem in problem_detection.problems:
            problem_stats[problem.value] = problem_stats.get(problem.value, 0) + 1

    total = len(conversations)
    print("\n=== Problem Detection Results ===")
    if total == 0:
        # Avoid dividing by zero in the percentage report.
        print("No conversations to analyze.")
        return updated_conversations

    total_with_problems = sum(1 for conv in updated_conversations if conv.analysis.problems.problems)
    print(f"Conversations with problems: {total_with_problems}/{total} ({total_with_problems / total * 100:.1f}%)")

    print("\nProblem type breakdown:")
    for problem_type, count in sorted(problem_stats.items(), key=lambda item: item[1], reverse=True):
        print(f"  {problem_type}: {count} ({count / total * 100:.1f}%)")

    return updated_conversations

# Stage 2 gate: treat detection as done if ANY stored conversation already has
# detected problems. Scanning the whole list (not only the first 10) avoids
# re-running detection just because the leading conversations are problem-free.
# NOTE: a dataset with zero detected problems will still be re-analysed each run.
problems_detected = any(
    conv.analysis and conv.analysis.problems and conv.analysis.problems.problems
    for conv in conversations
)

if not conversations:
    print("No conversations loaded. Skipping Stage 2.")
elif not problems_detected:
    conversations = run_problem_detection(conversations)

    # Save updated conversations
    save_conversations_to_json(conversations, CONVERSATIONS_FILE)
    print(f"Updated conversations with problems saved to {CONVERSATIONS_FILE}")
else:
    print("Problem detection already completed. Skipping Stage 2.")

print("\n✅ Stage 2 Complete: Problem detection finished")

Problem detection already completed. Skipping Stage 2.

✅ Stage 2 Complete: Problem detection finished


## Stage 3: UX Analysis with Groq

In [None]:
# Stage 3: UX and Intent Analysis with Groq
import asyncio
import time

async def run_groq_ux_and_intent_analysis():
    """Run Stage 3: UX and intent analysis of the conversations with Groq.

    Uses the module-level ``conversations``, ``GROQ_API_KEY`` and
    ``CONVERSATIONS_FILE``. May prompt twice via ``input()``: once to resume
    from saved progress (when the processor reports any) and once to confirm
    the run. After a completed run the processor's progress file is cleaned up.

    Returns:
        The list returned by ``processor.process_ux_and_intent_analysis`` when
        the analysis runs; otherwise the module-level ``conversations`` unchanged
        (already analysed, nothing left to process, or cancelled by the user).
    """
    print("=== Stage 3: Enhanced UX and Intent Analysis with Groq ===")

    processor = GroqProcessor(
        api_key=GROQ_API_KEY,
        model_name='llama3-8b-8192',  # Fast, good quality model
        max_concurrent_requests=3,
        batch_size=25,
        conversations_file=CONVERSATIONS_FILE,
    )
    total = len(conversations)

    # Ask the processor whether an earlier run left progress behind.
    can_resume, start_index = processor.get_resume_info(total)

    # Heuristic completeness check on conversations around the resume point.
    # Stage 2 seeds missing analyses with sentiment "neutral" and no intents, so
    # a non-neutral sentiment plus a non-empty intent list indicates Groq output.
    if start_index > 0:
        sample_convs = conversations[max(0, start_index - 5):start_index + 5]
    else:
        sample_convs = conversations[:10]
    analysis_completed = any(
        conv.analysis and conv.analysis.ux and conv.analysis.request and
        hasattr(conv.analysis.ux, 'sentiment') and
        conv.analysis.ux.sentiment != "neutral" and
        conv.analysis.request.intent
        for conv in sample_convs
    )

    if analysis_completed and start_index >= total:
        print("Enhanced UX and Intent analysis already completed for all conversations.")
        print("The analysis includes:")
        print("  - UX Analysis: sentiment, emotions, feedback, suggestions")
        print("  - Intent Analysis: user intent classification and categorization")
        return conversations

    if can_resume and start_index > 0:
        print(f"Found existing progress: {start_index}/{total} conversations processed")
        should_resume = input("Resume from where you left off? (y/n): ").strip().lower()
        if should_resume != 'y':
            start_index = 0
            processor.cleanup_progress()

    remaining_count = total - start_index
    if remaining_count <= 0:
        # Nothing to send; also prevents a division by zero in the timing report.
        print("No conversations left to process. Skipping Groq analysis.")
        return conversations

    print(f"\nAbout to process {remaining_count} conversations with enhanced Groq analysis:")
    print("  • UX Analysis: sentiment, emotions, feedback, suggestions")
    print("  • Intent Analysis: user intent classification and request categorization")
    print("  • Problem Detection: technical issues, user confusion, etc.")
    print(f"  • Model: {processor.model_name}")
    print(f"  • Concurrent requests: {processor.max_concurrent_requests}")
    print(f"  • Batch size: {processor.batch_size}")

    process_all = input("\nProceed with processing? (y/n): ").strip().lower()
    if process_all != 'y':
        print("Enhanced analysis cancelled")
        return conversations

    print(f"Starting enhanced Groq analysis from index {start_index}...")

    # perf_counter is monotonic and intended for measuring elapsed time.
    start_time = time.perf_counter()
    updated_conversations = await processor.process_ux_and_intent_analysis(start_index)
    elapsed = time.perf_counter() - start_time

    print("\n=== Enhanced Analysis Complete ===")
    print(f"Provider: Groq ({processor.model_name})")
    print(f"Total time: {elapsed:.2f} seconds")
    print(f"Average time per conversation: {elapsed / remaining_count:.2f} seconds")
    print(f"Conversations analyzed: {remaining_count}")

    # The run finished, so the saved progress is no longer needed.
    processor.cleanup_progress()

    return updated_conversations

# Run enhanced UX and Intent analysis
conversations = await run_groq_ux_and_intent_analysis()

print(f"\n✅ Stage 3 Complete: Enhanced UX and Intent analysis finished")

=== Stage 3: Enhanced UX and Intent Analysis with Groq ===
No existing progress found, starting fresh

About to process 2055 conversations with enhanced Groq analysis:
  • UX Analysis: sentiment, emotions, feedback, suggestions
  • Intent Analysis: user intent classification and request categorization
  • Problem Detection: technical issues, user confusion, etc.
  • Model: llama3-8b-8192
  • Concurrent requests: 3
  • Batch size: 25
Starting enhanced Groq analysis from index 0...
Starting Groq UX and Intent processing from index 0
Total conversations to process: 2055
Successfully initialized Groq client with llama3-8b-8192
\nProcessing batch 1: conversations 1-25
  Completed 5/25 in batch
Error parsing UX analysis: 1 validation error for UX
suggestions.0
  Input should be a valid string [type=string_type, input_value={'text': 'It would be hel...completing these tasks'}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.11/v/string_type
  Completed 10/25 i

In [None]:
## Analysis Summary and Validation

def validate_analysis_completeness():
    """Count how many conversations carry each part of the analysis and print a summary.

    Reads the module-level ``conversations`` list. A conversation is complete
    when its ``analysis`` has a truthy ``ux`` with a ``sentiment`` attribute,
    a truthy ``request`` with a non-empty ``intent``, and a truthy ``problems``.
    Conversations without an ``analysis`` count toward none of the totals.
    Percentages are shown as "n/a" when there are no conversations.

    Returns:
        Tuple ``(complete, ux, intent, problem)`` of counts.
    """
    print("=== Analysis Validation ===")

    total_conversations = len(conversations)
    complete_analysis_count = 0
    ux_analysis_count = 0
    intent_analysis_count = 0
    problem_analysis_count = 0

    for conv in conversations:
        analysis = conv.analysis
        if not analysis:
            continue

        has_ux = bool(analysis.ux) and hasattr(analysis.ux, 'sentiment')
        has_intent = bool(analysis.request) and bool(analysis.request.intent)
        has_problems = bool(analysis.problems)

        ux_analysis_count += int(has_ux)
        intent_analysis_count += int(has_intent)
        problem_analysis_count += int(has_problems)
        if has_ux and has_intent and has_problems:
            complete_analysis_count += 1

    def _with_pct(count):
        # Guard against ZeroDivisionError when no conversations are loaded.
        if not total_conversations:
            return f"{count} (n/a)"
        return f"{count} ({count / total_conversations * 100:.1f}%)"

    print(f"Total conversations: {total_conversations}")
    print(f"Complete analysis: {_with_pct(complete_analysis_count)}")
    print(f"UX analysis: {_with_pct(ux_analysis_count)}")
    print(f"Intent analysis: {_with_pct(intent_analysis_count)}")
    print(f"Problem analysis: {_with_pct(problem_analysis_count)}")

    return complete_analysis_count, ux_analysis_count, intent_analysis_count, problem_analysis_count

def show_analysis_samples(max_samples: int = 3):
    """Print up to ``max_samples`` conversations that have UX and intent analysis.

    Reads the module-level ``conversations`` list. A conversation qualifies when
    its ``analysis`` has a truthy ``ux``, a truthy ``request`` and a non-empty
    ``request.intent``. Enum-like fields are printed via their ``.value``.

    Args:
        max_samples: Maximum number of conversations to print (default 3).
    """
    print("\n=== Sample Analysis Results ===")

    complete_conversations = [
        conv for conv in conversations
        if conv.analysis and conv.analysis.ux and conv.analysis.request and conv.analysis.request.intent
    ]

    if not complete_conversations:
        print("No conversations with complete analysis found.")
        return

    for sample_no, conv in enumerate(complete_conversations[:max_samples], start=1):
        analysis = conv.analysis
        ux = analysis.ux
        request = analysis.request

        print(f"\n--- Sample {sample_no}: Conversation {conv.dialogue_id} ---")
        print(f"Duration: {conv.duration_minutes} minutes, Messages: {conv.message_count}")

        print("UX Analysis:")
        print(f"  Sentiment: {ux.sentiment.value} (confidence: {ux.sentiment_confidence:.2f})")
        print(f"  Emotions: {[e.value for e in ux.emotions]}")
        print(f"  Success: {ux.is_successful}")
        if ux.feedback:
            print(f"  Feedback: {ux.feedback[:1]}")  # first feedback item only

        print("Intent Analysis:")
        print(f"  Category: {[c.value for c in request.category]}")
        print(f"  Intent: {[it.value for it in request.intent]}")

        # The filter above does not require `problems`, so it may be None here.
        detected = analysis.problems.problems if analysis.problems else []
        if detected:
            print(f"Problems: {[p.value for p in detected]}")
        else:
            print("Problems: None detected")

def generate_final_statistics():
    """Print aggregate UX, emotion, intent and problem statistics.

    Reads the module-level ``conversations`` list. Only conversations with an
    ``analysis`` are counted; all percentages are relative to that number.
    Enum-like labels are aggregated by their ``.value``. Sentiments are listed
    in full; emotions, categories, intents and problems show the top 5.
    """
    from collections import Counter

    print("\n=== Final Pipeline Statistics ===")

    ux_sentiments = Counter()
    ux_emotions = Counter()
    success_count = 0
    intent_categories = Counter()
    intent_types = Counter()
    problem_types = Counter()
    analyzed_count = 0

    for conv in conversations:
        analysis = conv.analysis
        if not analysis:
            continue
        analyzed_count += 1

        if analysis.ux:
            ux_sentiments[analysis.ux.sentiment.value] += 1
            if analysis.ux.is_successful:
                success_count += 1
            ux_emotions.update(emotion.value for emotion in analysis.ux.emotions)

        if analysis.request:
            intent_categories.update(category.value for category in analysis.request.category)
            intent_types.update(intent.value for intent in analysis.request.intent)

        if analysis.problems:
            problem_types.update(problem.value for problem in analysis.problems.problems)

    print(f"Analyzed conversations: {analyzed_count}/{len(conversations)}")
    if analyzed_count > 0:
        print(f"Success rate: {success_count}/{analyzed_count} ({success_count/analyzed_count*100:.1f}%)")
    else:
        print("Success rate: N/A")

    def _print_top(title, counter, limit=None):
        # A non-empty counter implies analyzed_count > 0, so the division is safe.
        # most_common() orders by count descending, keeping first-seen order on ties.
        print(f"\n{title}:")
        for label, count in counter.most_common(limit):
            print(f"  {label}: {count} ({count/analyzed_count*100:.1f}%)")

    _print_top("Top UX Sentiments", ux_sentiments)
    # Emotions were previously tallied but never reported.
    _print_top("Top UX Emotions", ux_emotions, 5)
    _print_top("Top Intent Categories", intent_categories, 5)
    _print_top("Top Intent Types", intent_types, 5)
    _print_top("Top Problem Types", problem_types, 5)

# Run validation and show results. The closing banner reflects whether every
# conversation actually has complete analysis instead of always claiming success.
complete_count, _ux_count, _intent_count, _problem_count = validate_analysis_completeness()
show_analysis_samples()
generate_final_statistics()

total_count = len(conversations)
print("\n🎉 Conversation Analysis Pipeline Complete!")
if total_count and complete_count == total_count:
    print("✅ All 3 stages completed successfully")
else:
    print(f"⚠️ Complete analysis for {complete_count}/{total_count} conversations; "
          "see the validation summary above")
print("📊 Enhanced analysis includes UX, Intent, and Problem detection")
print(f"💾 Results saved to: {CONVERSATIONS_FILE}")