In [3]:
import pandas as pd
import numpy as np
import json
from datetime import datetime, timedelta
import hashlib
import sqlite3
from typing import Dict, List, Tuple, Optional
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import seaborn as sns
from textblob import TextBlob
import warnings
warnings.filterwarnings('ignore')

In [4]:
class DataPipeline:
    """
    Handles ingestion, normalization, and deduplication of customer support data.

    Data is persisted to the SQLite database at ``db_path`` in two tables:
    ``interactions`` (one row per tweet) and ``conversations`` (one row per
    customer/brand thread with running aggregates). The schema is created on
    construction if it does not already exist.
    """

    def __init__(self, db_path: str = "customer_support.db"):
        self.db_path = db_path
        self.setup_database()

    def setup_database(self):
        """Create tables and indexes if they are missing; safe to call repeatedly."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Main interactions table. The FOREIGN KEY is declarative only: this
            # code never enables PRAGMA foreign_keys, so SQLite does not enforce it.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS interactions (
                    id TEXT PRIMARY KEY,
                    customer_id TEXT NOT NULL,
                    brand TEXT NOT NULL,
                    tweet_id TEXT UNIQUE,
                    author_id TEXT,
                    inbound BOOLEAN,
                    created_at TIMESTAMP,
                    text TEXT,
                    response_tweet_id TEXT,
                    in_response_to_tweet_id TEXT,
                    conversation_thread TEXT,
                    processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (conversation_thread) REFERENCES conversations(thread_id)
                )
            ''')

            # Conversation tracking table (aggregates maintained by ingest_cst_data)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS conversations (
                    thread_id TEXT PRIMARY KEY,
                    customer_id TEXT NOT NULL,
                    brand TEXT NOT NULL,
                    status TEXT DEFAULT 'open',
                    created_at TIMESTAMP,
                    last_updated TIMESTAMP,
                    total_interactions INTEGER DEFAULT 0,
                    customer_sentiment REAL,
                    issue_category TEXT,
                    resolution_status TEXT
                )
            ''')

            # Create indexes for performance
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_customer_id ON interactions(customer_id)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_thread ON interactions(conversation_thread)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON interactions(created_at)')

            conn.commit()
        finally:
            # Always release the handle, even if a DDL statement fails.
            conn.close()

    def generate_interaction_id(self, record: Dict) -> str:
        """Generate a unique, deterministic ID for deduplication.

        The ID is the MD5 hex digest of tweet_id + author_id + created_at.
        NOTE: the fields are concatenated without a separator, so distinct
        combinations can in principle collide. The scheme is kept as-is because
        changing it would re-key rows already stored and defeat re-ingest dedup.
        """
        key_fields = f"{record.get('tweet_id', '')}{record.get('author_id', '')}{record.get('created_at', '')}"
        return hashlib.md5(key_fields.encode()).hexdigest()

    def ingest_cst_data(self, df: pd.DataFrame) -> int:
        """
        Ingest Customer Support Twitter data with idempotent processing.

        Rows whose generated ID is already stored are skipped. Rows whose
        ``tweet_id`` is already stored are also skipped, because the column is
        UNIQUE. For every newly inserted row, the owning conversation's
        ``total_interactions``, ``created_at`` (earliest) and ``last_updated``
        (latest) are updated.

        Args:
            df: one row per tweet. The columns read are tweet_id, author_id,
                brand, inbound, created_at, text, response_tweet_id and
                in_response_to_tweet_id. Missing columns fall back to defaults.

        Returns:
            Number of new interaction rows inserted.
        """
        conn = sqlite3.connect(self.db_path)
        new_records = 0
        try:
            cursor = conn.cursor()
            for _, row in df.iterrows():
                interaction_id = self.generate_interaction_id(row.to_dict())

                # Cheap primary-key probe: skip rows ingested on a previous run
                # before doing any thread bookkeeping.
                cursor.execute("SELECT 1 FROM interactions WHERE id = ?", (interaction_id,))
                if cursor.fetchone() is not None:
                    continue

                # Determine conversation thread
                thread_id = self._get_or_create_thread(row, conn)

                # OR IGNORE: a tweet_id already stored under a different id is
                # treated as a duplicate instead of aborting the whole batch with
                # an IntegrityError.
                cursor.execute('''
                    INSERT OR IGNORE INTO interactions
                    (id, customer_id, brand, tweet_id, author_id, inbound,
                     created_at, text, response_tweet_id, in_response_to_tweet_id,
                     conversation_thread)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    interaction_id,
                    row.get('author_id', ''),
                    row.get('brand', ''),
                    row.get('tweet_id', ''),
                    row.get('author_id', ''),
                    row.get('inbound', True),
                    row.get('created_at', ''),
                    row.get('text', ''),
                    row.get('response_tweet_id', ''),
                    row.get('in_response_to_tweet_id', ''),
                    thread_id
                ))
                if cursor.rowcount != 1:
                    continue  # ignored as a duplicate

                self._touch_thread(cursor, thread_id, row.get('created_at', ''))
                new_records += 1

            conn.commit()
        finally:
            # On error the uncommitted batch is rolled back when the handle closes.
            conn.close()
        return new_records

    def _get_or_create_thread(self, row: Dict, conn) -> str:
        """Return the thread ID for the row's (author_id, brand) pair.

        The conversation row is inserted if it does not exist yet. Thread IDs are
        the first 16 hex chars of MD5("<author_id>_<brand>").
        """
        # Simple thread logic - could be enhanced with more sophisticated grouping
        thread_key = f"{row.get('author_id', '')}_{row.get('brand', '')}"
        thread_id = hashlib.md5(thread_key.encode()).hexdigest()[:16]

        cursor = conn.cursor()
        cursor.execute('SELECT thread_id FROM conversations WHERE thread_id = ?', [thread_id])

        if not cursor.fetchone():
            cursor.execute('''
                INSERT INTO conversations (thread_id, customer_id, brand, created_at, last_updated)
                VALUES (?, ?, ?, ?, ?)
            ''', (thread_id, row.get('author_id', ''), row.get('brand', ''),
                  row.get('created_at', ''), row.get('created_at', '')))

        return thread_id

    @staticmethod
    def _touch_thread(cursor, thread_id: str, created_at) -> None:
        """Fold one newly inserted interaction into its conversation's aggregates.

        Increments total_interactions. When ``created_at`` is present, it also
        widens the thread's [created_at, last_updated] window. Timestamps are
        compared as text; this assumes a single consistent ISO-8601 format, so
        that lexical order equals chronological order.
        """
        cursor.execute(
            'UPDATE conversations SET total_interactions = total_interactions + 1 WHERE thread_id = ?',
            (thread_id,)
        )
        if created_at is None or created_at == '' or pd.isna(created_at):
            return  # no timestamp to widen the window with

        # NULLIF/COALESCE: treat a stored '' or NULL as "unset" so it is replaced
        # rather than winning MIN() or poisoning MAX().
        cursor.execute('''
            UPDATE conversations
            SET created_at = MIN(COALESCE(NULLIF(created_at, ''), ?), ?),
                last_updated = MAX(COALESCE(NULLIF(last_updated, ''), ?), ?)
            WHERE thread_id = ?
        ''', (created_at, created_at, created_at, created_at, thread_id))

In [5]:
class UserBehaviorAnalyzer:
    """
    Analyzes user behavior patterns and creates customer cohorts.

    Reads the ``conversations`` and ``interactions`` tables from the SQLite
    database at ``db_path``.
    """

    def __init__(self, db_path: str = "customer_support.db"):
        self.db_path = db_path
        # NOTE(review): the two attributes below are initialised but not read or
        # written anywhere else in this class.
        self.customer_segments = {}
        self.conversation_flows = {}

    def analyze_conversation_patterns(self) -> pd.DataFrame:
        """Build one row of behavioral features per conversation thread.

        Returns:
            A DataFrame with the SQL columns (thread_id, customer_id, brand,
            interaction_count, customer_msg_ratio, conversation_text, created_at,
            days_since_last_update). It also has the engineered columns
            sentiment_score, urgency_keywords, question_count,
            avg_message_length, issue_category and is_resolved (bool). Threads
            with no interactions are excluded by the HAVING clause.
        """
        query = '''
            SELECT 
                c.thread_id,
                c.customer_id,
                c.brand,
                COUNT(i.id) as interaction_count,
                AVG(CASE WHEN i.inbound THEN 1 ELSE 0 END) as customer_msg_ratio,
                GROUP_CONCAT(i.text, ' | ') as conversation_text,
                c.created_at,
                julianday('now') - julianday(c.last_updated) as days_since_last_update
            FROM conversations c
            LEFT JOIN interactions i ON c.thread_id = i.conversation_thread
            GROUP BY c.thread_id
            HAVING interaction_count > 0
        '''

        conn = sqlite3.connect(self.db_path)
        try:
            df = pd.read_sql_query(query, conn)
        finally:
            conn.close()

        # GROUP_CONCAT yields NULL when every text in a thread is NULL; normalise
        # so the text helpers below only ever see strings.
        df['conversation_text'] = df['conversation_text'].fillna('')
        text = df['conversation_text']

        # Feature engineering
        df['sentiment_score'] = text.apply(self._get_sentiment)
        df['urgency_keywords'] = text.apply(self._count_urgency_keywords)
        df['question_count'] = text.apply(lambda x: x.count('?') if x else 0)
        df['avg_message_length'] = text.apply(self._avg_message_length)

        # Determine issue categories
        df['issue_category'] = text.apply(self._categorize_issue)

        # Determine resolution status. apply(axis=1) on an empty frame returns a
        # DataFrame rather than a Series, so handle that case explicitly.
        if df.empty:
            df['is_resolved'] = pd.Series(dtype=bool)
        else:
            df['is_resolved'] = df.apply(self._determine_resolution_status, axis=1).astype(bool)

        return df

    @staticmethod
    def _avg_message_length(text: str) -> float:
        """Mean stripped length of the '|'-separated messages in ``text`` (0.0 if none)."""
        lengths = [len(msg.strip()) for msg in text.split('|') if msg.strip()] if text else []
        # Avoid np.mean([]) -> NaN (+ RuntimeWarning) for text that is only separators.
        return float(np.mean(lengths)) if lengths else 0.0

    def _get_sentiment(self, text: str) -> float:
        """Return TextBlob polarity of ``text``, or 0.0 for empty text or on failure."""
        if not text:
            return 0.0
        try:
            return TextBlob(text).sentiment.polarity
        except Exception:
            # Best-effort feature: a failed analysis degrades to neutral sentiment.
            return 0.0

    def _count_urgency_keywords(self, text: str) -> int:
        """Count how many distinct urgency phrases occur (as substrings) in ``text``."""
        if not text:
            return 0

        urgency_words = [
            'urgent', 'emergency', 'asap', 'immediately', 'critical', 
            'broken', 'not working', 'error', 'issue', 'problem',
            'help', 'stuck', 'frustrated', 'angry'
        ]

        text_lower = text.lower()
        return sum(1 for word in urgency_words if word in text_lower)

    def _categorize_issue(self, text: str) -> str:
        """Categorize the support request by the first matching keyword group.

        Groups are checked in priority order (authentication, billing,
        technical, product_inquiry, complaint). Matching is case-insensitive
        substring search. Empty text gives 'unknown'; no match gives 'general'.
        """
        if not text:
            return 'unknown'

        text_lower = text.lower()

        # Login/Authentication issues
        if any(word in text_lower for word in ['login', 'password', 'account', 'authenticate', 'access']):
            return 'authentication'

        # Billing issues
        elif any(word in text_lower for word in ['bill', 'payment', 'charge', 'refund', 'subscription']):
            return 'billing'

        # Technical issues
        elif any(word in text_lower for word in ['bug', 'error', 'broken', 'crash', 'not working']):
            return 'technical'

        # Product inquiries
        elif any(word in text_lower for word in ['how to', 'feature', 'usage', 'tutorial']):
            return 'product_inquiry'

        # Complaints
        elif any(word in text_lower for word in ['complaint', 'dissatisfied', 'angry', 'terrible']):
            return 'complaint'

        else:
            return 'general'

    def _determine_resolution_status(self, row) -> bool:
        """Heuristically decide whether a conversation looks resolved.

        A conversation counts as resolved if it contains a resolution phrase. It
        also counts as resolved if it has been idle for more than 7 days and has
        more than 2 interactions.
        """
        text = row['conversation_text']
        if not isinstance(text, str):
            text = ''  # None/NaN from the DB must not reach .lower()
        days_since_update = row['days_since_last_update']

        # Resolution indicators
        resolution_phrases = [
            'thank you', 'thanks', 'solved', 'resolved', 'fixed', 
            'working now', 'that helps', 'perfect', 'great'
        ]

        has_resolution_phrase = any(phrase in text.lower() for phrase in resolution_phrases)
        # Missing last_updated yields NULL/NaN; `None > 7` would raise, so guard.
        no_recent_activity = pd.notna(days_since_update) and days_since_update > 7

        return bool(has_resolution_phrase or (no_recent_activity and row['interaction_count'] > 2))

    def create_customer_cohorts(self, df: pd.DataFrame, n_clusters: int = 5) -> pd.DataFrame:
        """Cluster conversations into behavioral cohorts.

        Args:
            df: output of analyze_conversation_patterns. It is not modified.
            n_clusters: desired number of KMeans clusters. This is capped at the
                number of rows.

        Returns:
            A copy of ``df`` with the six clustering feature columns NaN-filled
            with 0. It also has ``customer_cohort`` (cluster label) and
            ``cohort_name`` (descriptive name derived from each cluster's
            profile; see _name_cohorts).
        """
        feature_cols = ['interaction_count', 'sentiment_score', 'urgency_keywords', 
                        'question_count', 'avg_message_length', 'days_since_last_update']

        cohorts = df.copy()  # don't mutate the caller's frame
        cohorts[feature_cols] = cohorts[feature_cols].fillna(0)

        if cohorts.empty:
            cohorts['customer_cohort'] = pd.Series(dtype=int)
            cohorts['cohort_name'] = pd.Series(dtype=object)
            return cohorts

        scaled_features = StandardScaler().fit_transform(cohorts[feature_cols])

        # KMeans requires n_samples >= n_clusters. n_init is pinned because the
        # default changed across scikit-learn versions.
        kmeans = KMeans(n_clusters=min(n_clusters, len(cohorts)), random_state=42, n_init=10)
        cohorts['customer_cohort'] = kmeans.fit_predict(scaled_features)

        names = self._name_cohorts(cohorts, 'customer_cohort')
        cohorts['cohort_name'] = cohorts['customer_cohort'].map(names)

        return cohorts

    @staticmethod
    def _name_cohorts(df: pd.DataFrame, label_col: str = 'customer_cohort') -> Dict:
        """Map cluster labels to cohort names from each cluster's mean profile.

        KMeans label ids carry no meaning, so names are assigned greedily among
        the still-unnamed clusters, in this order:

        - lowest mean sentiment -> 'frustrated_customers'
        - highest days_since_last_update -> 'dormant_cases'
        - highest interaction_count -> 'complex_issues'
        - highest question_count -> 'high_engagement'

        Naming stops once a single cluster remains. Every remaining cluster
        becomes 'quick_resolvers'. Ties resolve to the lowest label.
        """
        profile = df.groupby(label_col)[
            ['sentiment_score', 'days_since_last_update', 'interaction_count', 'question_count']
        ].mean()
        remaining = list(profile.index)
        naming_rules = [
            ('frustrated_customers', 'sentiment_score', True),
            ('dormant_cases', 'days_since_last_update', False),
            ('complex_issues', 'interaction_count', False),
            ('high_engagement', 'question_count', False),
        ]

        names = {}
        for name, column, pick_lowest in naming_rules:
            if len(remaining) <= 1:
                break
            values = profile.loc[remaining, column]
            label = values.idxmin() if pick_lowest else values.idxmax()
            names[label] = name
            remaining.remove(label)
        for label in remaining:
            names[label] = 'quick_resolvers'
        return names

    def visualize_patterns(self, df: pd.DataFrame):
        """Draw a 2x2 overview (sentiment, issue mix, resolution rate, interactions) and return the figure."""
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))

        # Sentiment distribution by cohort
        sns.boxplot(data=df, x='cohort_name', y='sentiment_score', ax=axes[0,0])
        axes[0,0].set_title('Sentiment Score by Customer Cohort')
        axes[0,0].tick_params(axis='x', rotation=45)

        # Issue category distribution
        issue_counts = df['issue_category'].value_counts()
        axes[0,1].pie(issue_counts.values, labels=issue_counts.index, autopct='%1.1f%%')
        axes[0,1].set_title('Distribution of Issue Categories')

        # Resolution status by cohort
        resolution_df = df.groupby('cohort_name')['is_resolved'].mean().reset_index()
        sns.barplot(data=resolution_df, x='cohort_name', y='is_resolved', ax=axes[1,0])
        axes[1,0].set_title('Resolution Rate by Cohort')
        axes[1,0].tick_params(axis='x', rotation=45)

        # Interaction count distribution
        sns.histplot(data=df, x='interaction_count', bins=20, ax=axes[1,1])
        axes[1,1].set_title('Distribution of Interaction Counts')

        plt.tight_layout()
        plt.show()

        return fig

In [6]:
class NextBestActionEngine:
    """
    NBA engine that determines the next action for a customer interaction.

    Decisions are rule-based:
    - the channel comes from urgency, sentiment and issue context;
    - the send time comes from per-cohort delays;
    - the output includes a templated message and a human-readable reasoning string.
    """

    def __init__(self, db_path: str = "customer_support.db"):
        # NOTE(review): db_path is stored but not used by any method of this class.
        self.db_path = db_path
        self.action_rules = self._define_action_rules()

    def _define_action_rules(self) -> Dict:
        """Per-cohort rule table keyed by cohort name.

        Only 'timing_delay_hours' is currently read (by _calculate_send_time).
        'primary_channel' and 'escalation_threshold' are not consulted anywhere.
        """
        return {
            'quick_resolvers': {
                'primary_channel': 'twitter_dm_reply',
                'timing_delay_hours': 2,
                'escalation_threshold': 2
            },
            'high_engagement': {
                'primary_channel': 'twitter_dm_reply',
                'timing_delay_hours': 1,
                'escalation_threshold': 3
            },
            'frustrated_customers': {
                'primary_channel': 'scheduling_phone_call',
                'timing_delay_hours': 0.5,
                'escalation_threshold': 1
            },
            'dormant_cases': {
                'primary_channel': 'email_reply',
                'timing_delay_hours': 24,
                'escalation_threshold': 1
            },
            'complex_issues': {
                'primary_channel': 'email_reply',
                'timing_delay_hours': 4,
                'escalation_threshold': 2
            }
        }

    def predict_next_action(self, customer_data: Dict) -> Dict:
        """
        Predict the next best action for one customer conversation.

        Args:
            customer_data: feature dict. The keys read are cohort_name,
                customer_id, sentiment_score, urgency_keywords,
                interaction_count and issue_category. Unknown or missing cohorts
                use the 'quick_resolvers' rules.

        Returns:
            Dict with customer_id, channel, send_time (UTC ISO-8601 with 'Z'),
            message and reasoning.
        """
        cohort = customer_data.get('cohort_name', 'quick_resolvers')
        rules = self.action_rules.get(cohort, self.action_rules['quick_resolvers'])

        channel = self._select_channel(customer_data, rules)
        send_time = self._calculate_send_time(customer_data, rules)
        message = self._generate_message(customer_data, channel)
        reasoning = self._generate_reasoning(customer_data, channel, cohort)

        return {
            'customer_id': customer_data.get('customer_id', ''),
            'channel': channel,
            'send_time': send_time,
            'message': message,
            'reasoning': reasoning
        }

    def _select_channel(self, customer_data: Dict, rules: Dict) -> str:
        """Select the communication channel from urgency, sentiment and issue context."""
        # NOTE(review): `rules` (incl. rules['primary_channel']) is not used here.
        sentiment = customer_data.get('sentiment_score', 0)
        urgency = customer_data.get('urgency_keywords', 0)
        interaction_count = customer_data.get('interaction_count', 0)
        issue_category = customer_data.get('issue_category', 'general')

        # High urgency or negative sentiment -> phone call
        if urgency >= 3 or sentiment <= -0.5:
            return 'scheduling_phone_call'

        # Complex billing/technical issues with multiple interactions -> email
        elif issue_category in ['billing', 'technical'] and interaction_count >= 3:
            return 'email_reply'

        # Default to a Twitter DM for quick, direct resolution
        else:
            return 'twitter_dm_reply'

    def _calculate_send_time(self, customer_data: Dict, rules: Dict) -> str:
        """Return the scheduled send time as a UTC ISO-8601 string ending in 'Z'.

        The delay is rules['timing_delay_hours'], capped at 1 hour when
        urgency_keywords >= 2.
        """
        from datetime import timezone

        delay_hours = rules['timing_delay_hours']

        urgency = customer_data.get('urgency_keywords', 0)
        if urgency >= 2:
            delay_hours = min(delay_hours, 1)  # Cap at 1 hour for urgent issues

        # The 'Z' suffix declares UTC, so the clock must actually be UTC (a naive
        # datetime.now() would be local time mislabelled as UTC).
        send_time = datetime.now(timezone.utc) + timedelta(hours=delay_hours)
        return send_time.replace(tzinfo=None).isoformat() + 'Z'

    def _generate_message(self, customer_data: Dict, channel: str) -> str:
        """Build a templated customer-facing message for the channel, sentiment and issue type."""
        issue_category = customer_data.get('issue_category', 'general')
        sentiment = customer_data.get('sentiment_score', 0)
        # Customer-facing text: render e.g. 'product_inquiry' as 'product inquiry'.
        issue_label = str(issue_category).replace('_', ' ')

        # Base templates by channel
        if channel == 'scheduling_phone_call':
            template = "Hi! I'd like to schedule a call to personally help resolve your {issue_type} concern. "
        elif channel == 'email_reply':
            template = "Thank you for reaching out about your {issue_type} issue. I'm sending detailed information to help resolve this. "
        else:  # twitter_dm_reply
            template = "Hi! I'm here to help with your {issue_type} question. "
        message = template.format(issue_type=issue_label)

        # Customize based on sentiment
        if sentiment <= -0.3:
            message += "I understand your frustration and want to make this right. "
        elif sentiment >= 0.3:
            message += "I appreciate your patience as we work through this together. "

        # Add specific guidance based on issue type
        issue_guidance = {
            'authentication': "Let me help you regain access to your account safely.",
            'billing': "I'll review your account and explain any charges or help with refunds.",
            'technical': "Let me troubleshoot this technical issue step by step.",
            'product_inquiry': "I'm happy to explain how our features work.",
            'complaint': "Your feedback is valuable and I want to address your concerns directly.",
            'general': "I'm here to assist with whatever you need."
        }

        return message + issue_guidance.get(issue_category, issue_guidance['general'])

    def _generate_reasoning(self, customer_data: Dict, channel: str, cohort: str) -> str:
        """Explain the NBA decision as a '; '-joined list of reasons."""
        reasoning_parts = []

        # Channel reasoning
        if channel == 'scheduling_phone_call':
            reasoning_parts.append("Phone call selected due to high urgency/negative sentiment requiring personal touch")
        elif channel == 'email_reply':
            reasoning_parts.append("Email selected for complex issue requiring detailed explanation")
        else:
            # DMs are private, so don't describe them as public.
            reasoning_parts.append("Twitter DM selected for quick, direct resolution")

        # Cohort reasoning
        reasoning_parts.append(f"Customer belongs to '{cohort}' segment")

        # Context factors
        sentiment = customer_data.get('sentiment_score', 0)
        if sentiment <= -0.3:
            reasoning_parts.append("negative sentiment detected")

        urgency = customer_data.get('urgency_keywords', 0)
        if urgency >= 2:
            reasoning_parts.append("high urgency indicators present")

        return "; ".join(reasoning_parts)

In [7]:
class NBAEvaluator:
    """
    Evaluates the NBA system and produces a per-case results table.
    """

    # Columns of each prediction row. This is also the schema of the empty
    # frame returned when there are no open cases, so metrics/export never hit
    # a missing column.
    RESULT_COLUMNS = [
        'customer_id', 'channel', 'send_time', 'message', 'reasoning',
        'chat_log', 'issue_status', 'current_sentiment', 'issue_category', 'cohort_name'
    ]

    # Forward-reference annotations: this cell does not require the other
    # classes' cells to have been executed first.
    def __init__(self, pipeline: "DataPipeline", analyzer: "UserBehaviorAnalyzer", engine: "NextBestActionEngine"):
        self.pipeline = pipeline
        self.analyzer = analyzer
        self.engine = engine

    def run_end_to_end_evaluation(self, sample_size: int = 1000) -> pd.DataFrame:
        """Analyze behavior, build cohorts, then predict the NBA for open cases.

        At most ``sample_size`` open (unresolved) cases are scored; the sample
        uses random_state=42. The summary metrics are printed.

        Returns:
            One row per scored case. The frame is empty, with RESULT_COLUMNS,
            when nothing is open.
        """
        # Step 1: Analyze customer behavior
        print("Analyzing customer behavior patterns...")
        behavior_df = self.analyzer.analyze_conversation_patterns()
        cohort_df = self.analyzer.create_customer_cohorts(behavior_df)

        # Step 2: Filter resolved cases. astype(bool) guards against an object
        # column, where `~True` would be -2 rather than False.
        open_cases = cohort_df[~cohort_df['is_resolved'].astype(bool)].copy()
        resolved_count = len(cohort_df) - len(open_cases)

        print(f"Total cases: {len(cohort_df)}")
        print(f"Already resolved: {resolved_count}")
        print(f"Open cases for NBA: {len(open_cases)}")

        # Step 3: Sample for evaluation
        if len(open_cases) > sample_size:
            eval_cases = open_cases.sample(n=sample_size, random_state=42)
        else:
            eval_cases = open_cases.copy()

        # Step 4: Generate NBA predictions
        print(f"Generating NBA predictions for {len(eval_cases)} cases...")
        predictions = []

        for _, row in eval_cases.iterrows():
            prediction = self.engine.predict_next_action(row.to_dict())
            prediction.update({
                'chat_log': self._format_chat_log(row),
                'issue_status': self._predict_issue_status(row, prediction),
                'current_sentiment': row['sentiment_score'],
                'issue_category': row['issue_category'],
                'cohort_name': row['cohort_name']
            })
            predictions.append(prediction)

        # pd.DataFrame([]) has no columns at all; keep a stable schema instead.
        if predictions:
            results_df = pd.DataFrame(predictions)
        else:
            results_df = pd.DataFrame(columns=self.RESULT_COLUMNS)

        # Step 5: Calculate success metrics
        self._calculate_success_metrics(results_df)

        return results_df

    def _format_chat_log(self, row, max_messages: int = 10) -> str:
        """Render the ' | '-joined conversation text as 'Role: message' lines.

        At most ``max_messages`` messages are shown.
        NOTE(review): roles are assigned by alternating position (even =
        Customer), not from the stored `inbound` flag, and GROUP_CONCAT order is
        not guaranteed to be chronological, so labels may be wrong.
        """
        conversation_text = row.get('conversation_text', '')
        if not isinstance(conversation_text, str) or not conversation_text:
            return "No conversation history available"

        messages = conversation_text.split(' | ')
        formatted_log = []

        for i, msg in enumerate(messages[:max_messages]):
            if msg.strip():
                role = "Customer" if i % 2 == 0 else "Support_agent"
                formatted_log.append(f"{role}: {msg.strip()}")

        return "\n".join(formatted_log)

    def _predict_issue_status(self, row, prediction) -> str:
        """Heuristically predict the issue status after the chosen action.

        Returns one of 'resolved', 'escalated' or 'pending_customer_reply',
        based on the channel, sentiment, issue category and cohort.
        """
        channel = prediction['channel']
        sentiment = row.get('sentiment_score', 0)
        cohort = row.get('cohort_name', '')
        issue_category = row.get('issue_category', '')

        if channel == 'scheduling_phone_call':
            if sentiment <= -0.5:
                return 'escalated'
            else:
                return 'resolved'

        elif channel == 'email_reply':
            if issue_category in ['billing', 'technical']:
                return 'pending_customer_reply'
            else:
                return 'resolved'

        else:  # twitter_dm_reply
            if cohort == 'quick_resolvers':
                return 'resolved'
            else:
                return 'pending_customer_reply'

    def _calculate_success_metrics(self, results_df: pd.DataFrame):
        """Print the channel mix, the predicted status mix and the resolution rate per cohort."""
        total_cases = len(results_df)
        if total_cases == 0:
            print("\nNo open cases to evaluate; skipping success metrics.")
            return

        # Channel distribution
        channel_dist = results_df['channel'].value_counts()
        print("\nChannel Distribution:")
        for channel, count in channel_dist.items():
            print(f"  {channel}: {count} ({count/total_cases*100:.1f}%)")

        # Predicted resolution rates
        resolution_dist = results_df['issue_status'].value_counts()
        print("\nPredicted Issue Status Distribution:")
        for status, count in resolution_dist.items():
            print(f"  {status}: {count} ({count/total_cases*100:.1f}%)")

        # Success prediction by cohort
        cohort_success = results_df.groupby('cohort_name')['issue_status'].apply(
            lambda x: (x == 'resolved').sum() / len(x) * 100
        ).round(1)

        print("\nPredicted Resolution Rate by Cohort:")
        for cohort, rate in cohort_success.items():
            print(f"  {cohort}: {rate}%")

    def export_results_csv(self, results_df: pd.DataFrame, filename: str = "nba_results.csv"):
        """Write the submission columns of ``results_df`` to ``filename`` and return that subset."""
        output_columns = [
            'customer_id', 'channel', 'send_time', 'message', 'reasoning',
            'chat_log', 'issue_status'
        ]

        export_df = results_df[output_columns].copy()
        export_df.to_csv(filename, index=False)
        print(f"\nResults exported to {filename}")

        return export_df

In [8]:
# Demo usage and testing
def create_sample_data(n_records: int = 2000, n_users: int = 500, seed: int = 42) -> pd.DataFrame:
    """Create synthetic Customer-Support-Twitter-style records for the demo.

    With the default arguments, the output is identical to the original
    2000-record / 500-user dataset.

    Args:
        n_records: number of tweets to generate.
        n_users: number of distinct customers; ``author_id`` cycles through
            ``user_0 .. user_{n_users-1}``.
        seed: seed for numpy's *global* RNG. The global state is kept (rather
            than switching to a Generator) so that the default output stays
            byte-identical.

    Returns:
        DataFrame with tweet_id, author_id, brand, inbound, created_at, text,
        response_tweet_id and in_response_to_tweet_id.

    Raises:
        ValueError: if ``n_users`` < 1.
    """
    if n_users < 1:
        raise ValueError("n_users must be >= 1")

    np.random.seed(seed)

    brands = ['Nike', 'Starbucks', 'Apple', 'Amazon', 'Tesla']
    issue_types = ['login', 'billing', 'technical', 'product inquiry', 'complaint']

    # The order of np.random calls inside the dict literal defines the dataset;
    # do not reorder the keys.
    sample_data = []
    for i in range(n_records):
        sample_data.append({
            'tweet_id': f"tweet_{i}",
            'author_id': f"user_{i % n_users}",
            'brand': np.random.choice(brands),
            'inbound': np.random.choice([True, False], p=[0.6, 0.4]),
            'created_at': f"2024-{np.random.randint(1,13):02d}-{np.random.randint(1,29):02d}T{np.random.randint(0,24):02d}:00:00Z",
            'text': f"Sample {np.random.choice(issue_types)} message {i}",
            'response_tweet_id': f"response_{i}" if np.random.random() > 0.5 else None,
            # Short-circuit: no RNG draw for i == 0.
            'in_response_to_tweet_id': f"tweet_{max(0, i-1)}" if i > 0 and np.random.random() > 0.7 else None
        })

    return pd.DataFrame(sample_data)

In [None]:
def main():
    """Run the demo end to end.

    The demo ingests synthetic data, scores the open cases and exports a CSV.

    Returns:
        The DataFrame of NBA predictions produced by the evaluator.
    """
    print("=== Next-Best-Action System Demo ===\n")

    # All three components default to the same SQLite file.
    pipeline, analyzer, engine = DataPipeline(), UserBehaviorAnalyzer(), NextBestActionEngine()
    evaluator = NBAEvaluator(pipeline, analyzer, engine)

    print("1. Creating sample data...")
    ingested_count = pipeline.ingest_cst_data(create_sample_data())
    print(f"   Ingested {ingested_count} new records")

    print("\n2. Running end-to-end evaluation...")
    results_df = evaluator.run_end_to_end_evaluation(sample_size=1000)

    print("\n3. Exporting results...")
    evaluator.export_results_csv(results_df)

    for summary_line in (
        "\n=== Demo Complete ===",
        f"Processed {len(results_df)} customer cases",
        "Check 'nba_results.csv' for detailed results",
    ):
        print(summary_line)

    return results_df

# Entry point. NOTE(review): inside an IPython/Jupyter kernel __name__ is
# normally "__main__", so executing this cell runs the full demo. That writes
# customer_support.db and nba_results.csv to the working directory.
if __name__ == "__main__":
    results = main()