# Australian Political Sentiment Analysis Configuration

This notebook provides a comprehensive sentiment analysis system for Australian political discourse on X (formerly Twitter).

## 📊 **Configuration Options**
You can easily customize the analysis by modifying the `AnalysisConfig` class parameters:

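- `MAX_TWEETS_PER_QUERY`: Number of tweets to collect per party (default: 10, kept low for the Free tier)
- `DAYS_BACK`: Analysis period in days (default: 6; capped at 6 to stay inside the recent-search window)
- `LANG`: Tweet language filter (default: "en")
- `API_BATCH_SIZE`: API request batch size (default: 10)
- `MIN_TEXT_LENGTH`: Minimum tweet length in characters (default: 10)
- `RATE_LIMIT_WAIT`: Seconds to wait after a rate-limit error (default: 900)
- `INTER_QUERY_DELAY`: Seconds to wait between party queries (default: 960)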
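
As a rough illustration of overriding these parameters, the sketch below uses a stand-in dataclass (`DemoConfig`, a hypothetical name that mirrors a few `AnalysisConfig` fields with the defaults from the configuration cell) so that it runs on its own. With the real class, the same keyword arguments should apply.

```python
from dataclasses import dataclass, replace


@dataclass
class DemoConfig:
    """Hypothetical stand-in mirroring a few AnalysisConfig fields."""
    MAX_TWEETS_PER_QUERY: int = 10
    DAYS_BACK: int = 6
    LANG: str = "en"
    API_BATCH_SIZE: int = 10
    MIN_TEXT_LENGTH: int = 10


# Override selected fields at construction time; the rest keep their defaults
custom = DemoConfig(MAX_TWEETS_PER_QUERY=50, DAYS_BACK=3)

# Derive a variant from an existing config without mutating the original
longer_texts = replace(custom, MIN_TEXT_LENGTH=25)

print(custom)
print(longer_texts.MIN_TEXT_LENGTH)
```

Deriving variants with `dataclasses.replace` keeps the original configuration untouched, which makes it easier to compare runs with different settings.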

## 🏛️ **Political Party Focus**
The system focuses on Australian federal politics. The default keyword lists in `AnalysisConfig` are trimmed to the highest-signal terms to suit the Free tier; pass your own `LABOR_TERMS` or `LIBERAL_TERMS` to broaden them, for example with state branches (NSW Labor, Victorian Labor, QLD Labor) or Coalition partners (The Nationals, LNP Queensland).

**Labor Party**: The Australian Labor Party spells its name **"Labor"** (not "Labour"). Default terms include:
- Party names and handles: Australian Labor Party, ALP, @AustralianLabor
- Key figures: Anthony Albanese, Jim Chalmers, Penny Wong

**Liberal/Coalition**: Default terms include:
- Party names and handles: Liberal Party of Australia, Coalition, @LiberalAus
- Key figures: Peter Dutton, Sussan Ley, Angus Taylor
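
To show how keyword sets like these can become a search query, here is a minimal sketch. It is an alternative to the notebook's own `build_query`, which joins all terms with OR: this version requires a party term and a context term to co-occur. The helper names `quote_term` and `build_scoped_query` are hypothetical, and query length limits (which vary by API tier) are not checked.

```python
from typing import List


def quote_term(term: str) -> str:
    """Wrap multi-word phrases in double quotes so they match as exact phrases."""
    return f'"{term}"' if " " in term else term


def build_scoped_query(party_terms: List[str], context_terms: List[str], lang: str = "en") -> str:
    """Hypothetical helper: (any party term) AND (any context term), retweets excluded."""
    if not party_terms or not context_terms:
        raise ValueError("party_terms and context_terms must be non-empty")
    party_block = " OR ".join(quote_term(t) for t in party_terms)
    context_block = " OR ".join(quote_term(t) for t in context_terms)
    # A space between parenthesised groups acts as a logical AND in X search syntax
    return f"({party_block}) ({context_block}) lang:{lang} -is:retweet"


query = build_scoped_query(["Anthony Albanese", "ALP"], ["auspol", "Australian politics"])
print(query)
```

Requiring a context term narrows results to Australian political discussion, at the cost of missing posts that name a politician without any context hashtag.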

## 🚀 **Architecture Overview**
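- `TwitterDataFetcher`: Paginated API calls with rate-limit waits and error handling
- `DataCollector`: Runs one query per party and returns a DataFrame for each
- `DataProcessor`: Text cleaning, party classification, and data validation
- `SentimentAnalyzer`: VADER-based scoring, labelling, and summary statistics
- `ReportGenerator`: Matplotlib comparison charts (sentiment, engagement, party classification)
- `AnalysisOrchestrator`: Coordinates collection and sentiment analysis end to end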

In [None]:
# Install required packages (run once per environment)

!pip -q install tweepy python-dotenv pandas vaderSentiment matplotlib

In [None]:
import os
import re
import math
import time
import unicodedata
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from dotenv import load_dotenv
import tweepy

# Load env
load_dotenv()
BEARER = os.getenv("X_BEARER_TOKEN")
if not BEARER:
    raise RuntimeError("X_BEARER_TOKEN not found. Please create a .env file with X_BEARER_TOKEN.")
client = tweepy.Client(bearer_token=BEARER, wait_on_rate_limit=True)

In [None]:
# Configuration and Custom Exceptions
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class AnalysisConfig:
    """Configuration class for political sentiment analysis"""
    MAX_TWEETS_PER_QUERY: int = 10  # Reduced from 100 - Free tier has severe limits
    DAYS_BACK: int = 6  # Reduced from 7 to avoid edge cases with API timing
    LANG: str = "en"
    RATE_LIMIT_WAIT: int = 900  # 15 minutes wait for Free tier
    API_BATCH_SIZE: int = 10  # Much smaller batches for Free tier
    MIN_TEXT_LENGTH: int = 10
    INTER_QUERY_DELAY: int = 960  # 16 minutes between different party queries for Free tier
    
    # Australian political context terms
    AUS_CONTEXT: Optional[List[str]] = None
    LABOR_TERMS: Optional[List[str]] = None
    LIBERAL_TERMS: Optional[List[str]] = None
    
    def __post_init__(self):
        # Broad Australian-politics context seen on X
        if self.AUS_CONTEXT is None:
            self.AUS_CONTEXT = [
                # high-signal hashtags/terms - reduced for Free tier efficiency
                "auspol", "AusPol", "auspolitics",
                "Australian politics", "Australian Government",
                "Prime Minister", "Opposition"
            ]

        # Australian Labor Party (federal) — focused on highest-signal terms for Free tier
        if self.LABOR_TERMS is None:
            self.LABOR_TERMS = [
                # Most important party identifiers
                "Australian Labor Party", "ALP", "Labor Australia",
                "@AustralianLabor",
                # Key figures most likely to generate discussion
                "Anthony Albanese", "Albo", "@AlboMP",
                "Jim Chalmers", "@JEChalmers",
                "Penny Wong", "@SenatorWong"
            ]

        # Liberal / Coalition context - focused on highest-signal terms
        if self.LIBERAL_TERMS is None:
            self.LIBERAL_TERMS = [
                # Most important party identifiers
                "Liberal Party of Australia", "Liberal Australia", "Coalition",
                "@LiberalAus",
                # Key figures most likely to generate discussion  
                "Peter Dutton", "@PeterDutton_MP",
                "Sussan Ley", "@sussanley",
                "Angus Taylor", "@AngusTaylorMP"
            ]
    
    @property
    def since_date(self) -> str:
        """Calculate the start date for tweet collection"""
        # Use a timezone-aware UTC time and stay inside Twitter's 7-day recent-search limit
        import datetime
        max_days_back = min(self.DAYS_BACK, 6)  # Conservative limit for API stability
        start_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=max_days_back)
        formatted_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        logger.info(f"Using start_time: {formatted_time} ({max_days_back} days back)")
        # The tier is not detected at runtime; this configuration simply assumes the Free tier
        logger.warning("🚨 FREE TIER ASSUMED: Only 1 API request per 15 minutes allowed!")
        logger.info("💡 Consider upgrading to Basic Tier ($200/month) for 60 requests per 15 minutes")
        return formatted_time

# Custom Exception Classes
class TwitterAPIError(Exception):
    """Raised when Twitter API encounters an error"""
    pass

class RateLimitError(Exception):
    """Raised when rate limit is exceeded"""
    pass

class DataProcessingError(Exception):
    """Raised when data processing fails"""
    pass

class SentimentAnalysisError(Exception):
    """Raised when sentiment analysis fails"""
    pass

# Initialize configuration
config = AnalysisConfig()

def build_query(terms: List[str], config: AnalysisConfig) -> str:
    """Build search query from terms and context - optimized for Free tier"""
    try:
        # Ensure we have non-null terms and context
        if not terms or not config.AUS_CONTEXT:
            raise DataProcessingError("Terms or AUS_CONTEXT cannot be empty")
        
        # For Free tier, use fewer terms to avoid complex queries
        combined_terms = terms + config.AUS_CONTEXT
        # Keep only the first 8 terms. Party terms come first, so with the default
        # lists (10 or more party terms) the slice drops every AUS_CONTEXT term.
        top_terms = combined_terms[:8]
        
        or_block = " OR ".join([f'"{t}"' if " " in t else t for t in top_terms])
        query = f"({or_block}) lang:{config.LANG} -is:retweet"
        logger.info(f"Built Free-tier optimized query: {query[:100]}...")
        return query
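    except DataProcessingError:
        # Already a domain error (for example empty terms): re-raise without wrapping it twice
        raise
    except Exception as e:
        logger.error(f"Error building query: {e}")
        raise DataProcessingError(f"Failed to build query: {e}") from e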

# Build queries using configuration - with Free tier warnings
QUERIES = {
    "Labor": build_query(config.LABOR_TERMS or [], config),
    "Liberal": build_query(config.LIBERAL_TERMS or [], config),
}

logger.warning("🚨 X API FREE TIER LIMITATIONS:")
logger.warning("   • Only 1 request per 15 minutes")
logger.warning("   • 100 tweet cap per month")  
logger.warning(f"   • Analysis will take at least {config.INTER_QUERY_DELAY // 60} minutes for both parties (one inter-query wait)")
logger.warning("💡 For production use, consider Basic Tier ($200/month)")
logger.info(f"Configuration initialized: {config.MAX_TWEETS_PER_QUERY} tweets per query, {config.DAYS_BACK} days back")

QUERIES

# Fetch recent posts with expansions and metrics
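
With the `author_id` expansion, the X API v2 returns user objects in a separate `includes` section rather than inside each post, so they have to be joined back by ID. The sketch below shows that join on plain dictionaries with made-up values. It is an assumption-laden simplification: tweepy returns response objects rather than raw dicts, and `attach_authors` is a hypothetical helper name.

```python
from typing import Any, Dict, List


def attach_authors(payload: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Merge author details from includes.users into each post by author_id."""
    users = {u["id"]: u for u in payload.get("includes", {}).get("users", [])}
    rows = []
    for post in payload.get("data", []):
        row = {"id": post["id"], "text": post["text"], "author_id": post["author_id"]}
        # Flatten engagement counts so each metric becomes its own key
        row.update(post.get("public_metrics", {}))
        author = users.get(post["author_id"])
        if author is not None:
            row["username"] = author["username"]
        rows.append(row)
    return rows


# Illustrative payload with made-up values, shaped like a recent-search response
sample = {
    "data": [
        {"id": "1", "text": "Budget night #auspol", "author_id": "10",
         "public_metrics": {"like_count": 3, "retweet_count": 1}},
        {"id": "2", "text": "Question time", "author_id": "99",
         "public_metrics": {"like_count": 0, "retweet_count": 0}},
    ],
    "includes": {"users": [{"id": "10", "username": "example_user"}]},
}

rows = attach_authors(sample)
print(rows[0]["username"], rows[0]["like_count"])
print("username" in rows[1])
```

Posts whose author is missing from `includes` are kept without a `username`, which mirrors how the fetcher below only adds user fields when a match exists.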

In [None]:
class TwitterDataFetcher:
    """Handles Twitter data fetching with rate limiting and error handling"""
    
    def __init__(self, client, config: AnalysisConfig):
        self.client = client
        self.config = config
        self.logger = logging.getLogger(__name__)
    
    def _handle_rate_limit(self) -> bool:
        """Handle rate limiting with interruptible wait"""
        self.logger.warning(f"Rate limited! Waiting {self.config.RATE_LIMIT_WAIT}s...")
        try:
            for i in range(self.config.RATE_LIMIT_WAIT, 0, -1):
                print(f"\r⏳ {i:3d}s remaining", end="", flush=True)
                time.sleep(1)
            print("\r✅ Resuming...           ")
            return True
        except KeyboardInterrupt:
            self.logger.info("Interrupted during rate limit wait")
            return False
    
    def _process_tweet_batch(self, resp) -> List[Dict[str, Any]]:
        """Process a batch of tweets from API response"""
        if not resp or not resp.data:
            return []
        
        tweets = []
        users_dict = {}
        
        # Build user lookup dictionary
        if resp.includes and 'users' in resp.includes:
            users_dict = {user.id: user for user in resp.includes['users']}
        
        # Process each tweet
        for tweet in resp.data:
            tweet_data = {
                'id': tweet.id,
                'text': tweet.text,
                'created_at': tweet.created_at.isoformat() if tweet.created_at else None,
                'lang': tweet.lang,
                'public_metrics': tweet.public_metrics,
                'author_id': tweet.author_id
            }
            
            # Add user data if available
            if tweet.author_id in users_dict:
                user = users_dict[tweet.author_id]
                tweet_data.update({
                    'username': user.username,
                    'name': user.name,
                    'verified': user.verified
                })
            
            tweets.append(tweet_data)
        
        return tweets
    
    def _make_api_call(self, query: str, batch_size: int, since_iso: str, next_token: Optional[str] = None):
        """Make a single API call with proper error handling"""
        try:
            kwargs = {
                'query': query,
                # Recent search accepts max_results between 10 and 100, so clamp the batch size
                'max_results': max(10, min(batch_size, 100)),
                'start_time': since_iso,
                'tweet_fields': ["id", "text", "lang", "created_at", "public_metrics", "possibly_sensitive", "source"],
                'user_fields': ["username", "name", "public_metrics", "verified"],
                'expansions': ["author_id"]
            }
            
            # Only add next_token if it's not None
            if next_token is not None:
                kwargs['next_token'] = next_token
            
            return self.client.search_recent_tweets(**kwargs)
            
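        except tweepy.TooManyRequests as e:
            # Rarely reached when the client is created with wait_on_rate_limit=True,
            # because tweepy then sleeps and retries on its own
            raise RateLimitError("API rate limit exceeded") from e
        except tweepy.TwitterServerError as e:
            raise TwitterAPIError(f"Twitter server error: {e}") from e
        except Exception as e:
            raise TwitterAPIError(f"API call failed: {e}") from e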
    
    def fetch_recent_safe(self, query: str, max_results: int, since_iso: str) -> List[Dict[str, Any]]:
        """Fetch recent tweets with safe error handling and progress tracking"""
        results = []
        per_call = min(self.config.API_BATCH_SIZE, max_results)
        next_token: Optional[str] = None
        fetched = 0
        
        self.logger.info(f"Fetching: '{query[:50]}...' (target: {max_results} tweets)")
        
        try:
            while fetched < max_results:
                remaining = max_results - fetched
                current_batch = min(per_call, remaining)
                
                print(f"⏳ Progress: {fetched}/{max_results} tweets", end="\r")
                
                try:
                    resp = self._make_api_call(query, current_batch, since_iso, next_token)
                    
                    if resp and resp.data:
                        tweets = self._process_tweet_batch(resp)
                        results.extend(tweets)
                        fetched += len(tweets)
                        
                        # Check for next token - handle both None and missing attribute cases
                        if hasattr(resp, 'meta') and resp.meta and 'next_token' in resp.meta:
                            next_token = resp.meta['next_token']
                        else:
                            next_token = None
                            self.logger.info("No more tweets available")
                            break
                    else:
                        self.logger.info("No data returned from API")
                        break
                
                except RateLimitError:
                    if not self._handle_rate_limit():
                        break
                
                except TwitterAPIError as e:
                    self.logger.error(f"API error: {e}")
                    break
                    
        except KeyboardInterrupt:
            self.logger.info(f"Fetch interrupted. Returning {len(results)} tweets collected so far")
        
        print(f"\n✅ Complete: {len(results)} tweets collected")
        self.logger.info(f"Successfully fetched {len(results)} tweets")
        return results

def fetch_recent_safe(query, max_results, since_iso, client):
    """Legacy wrapper function for backward compatibility"""
    fetcher = TwitterDataFetcher(client, config)
    return fetcher.fetch_recent_safe(query, max_results, since_iso)

# Data Processing & Analysis Functions
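
Term matching for party classification is sensitive to short terms such as "ALP", which can appear inside unrelated words. The sketch below contrasts plain substring matching with whole-word matching on a made-up sentence. Both helper names are hypothetical and neither is taken from the notebook's classes.

```python
import re
from typing import Iterable


def count_substring_hits(text: str, terms: Iterable[str]) -> int:
    """Naive approach: a term counts if it appears anywhere in the text."""
    lowered = text.lower()
    return sum(1 for t in terms if t.lower().lstrip("@") in lowered)


def count_word_hits(text: str, terms: Iterable[str]) -> int:
    """Whole-word approach: a term counts only when it sits on word boundaries."""
    lowered = text.lower()
    return sum(
        1 for t in terms
        if re.search(rf"\b{re.escape(t.lower().lstrip('@'))}\b", lowered)
    )


terms = ["ALP", "Anthony Albanese", "@AlboMP"]
text = "A palpable shift in the polls this week"

# "alp" occurs inside "palpable", so the substring count reports a false hit
print(count_substring_hits(text, terms))
print(count_word_hits(text, terms))
```

Whole-word matching costs a regular-expression search per term, which is negligible at Free tier volumes.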

In [None]:
class DataCollector:
    """Handles collection and organization of Twitter data"""
    
    def __init__(self, client, config: AnalysisConfig):
        self.client = client
        self.config = config
        self.fetcher = TwitterDataFetcher(client, config)
        self.logger = logging.getLogger(__name__)
    
    def collect_all_data(self) -> Dict[str, pd.DataFrame]:
        """Collect tweets for all political parties - optimized for Free tier"""
        all_data = {}
        
        self.logger.warning("🚨 FREE TIER: Each party query requires 15+ minute wait")
        self.logger.info("💡 For faster analysis, consider X API Basic Tier ($200/month)")
        
        for i, (party, query) in enumerate(QUERIES.items()):
            self.logger.info(f"Collecting data for {party}...")
            print(f"\n📊 Collecting data for {party}...")
            
            if i > 0:  # Add delay between different party queries for Free tier
                wait_time = self.config.INTER_QUERY_DELAY
                self.logger.info(f"⏳ FREE TIER: Waiting {wait_time//60} minutes before next party query...")
                print(f"⏳ FREE TIER: Waiting {wait_time//60} minutes before next party query...")
                
                try:
                    for remaining in range(wait_time, 0, -30):  # Update every 30 seconds
                        mins, secs = divmod(remaining, 60)
                        print(f"\r⏳ Wait time remaining: {mins:02d}:{secs:02d}", end="", flush=True)
                        time.sleep(30)
                    print("\n✅ Wait complete, proceeding with next query...")
                except KeyboardInterrupt:
                    self.logger.info("Wait interrupted by user")
                    print("\n🛑 Wait interrupted by user")
                    break
            
            try:
                tweets = self.fetcher.fetch_recent_safe(
                    query, 
                    self.config.MAX_TWEETS_PER_QUERY, 
                    self.config.since_date
                )
                
                # Convert to DataFrame
                df = pd.DataFrame(tweets)
                if not df.empty and 'public_metrics' in df.columns:
                    # Flatten public metrics - convert Series to list first
                    metrics_list = df['public_metrics'].tolist()
                    
                    # Only normalize if we have valid data
                    if metrics_list and all(isinstance(item, dict) for item in metrics_list if item is not None):
                        try:
                            metrics_df = pd.json_normalize(metrics_list)
                            df = pd.concat([df.drop('public_metrics', axis=1), metrics_df], axis=1)
                        except Exception as e:
                            self.logger.warning(f"Could not normalize public_metrics for {party}: {e}")
                    
                    df['party_query'] = party
                elif not df.empty:
                    # If no public_metrics column, just add party_query
                    df['party_query'] = party
                
                all_data[party] = df
                self.logger.info(f"✅ {party}: {len(df)} tweets collected")
                print(f"✅ {party}: {len(df)} tweets collected")
                
                # No additional pause needed here - the fetcher handles rate limiting
                
            except Exception as e:
                self.logger.error(f"Error collecting data for {party}: {e}")
                all_data[party] = pd.DataFrame()
        
        total_tweets = sum(len(df) for df in all_data.values())
        self.logger.info(f"🏁 Collection complete: {total_tweets} total tweets from all parties")
        
        if total_tweets == 0:
            self.logger.warning("⚠️ No tweets collected. This could be due to:")
            self.logger.warning("   • Free tier API limits (only 1 request per 15 minutes)")
            self.logger.warning(f"   • Low political activity in the past {min(self.config.DAYS_BACK, 6)} days")
            self.logger.warning("   • Query terms not matching recent tweets")
            self.logger.warning("💡 Try running again later or consider API upgrade")
        
        return all_data

class DataProcessor:
    """Handles data cleaning and preprocessing"""
    
    def __init__(self, config: AnalysisConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
    
    def clean_text(self, text: str) -> str:
        """Clean tweet text for sentiment analysis"""
        if pd.isna(text) or not text:
            return ""
        
        try:
            # Remove URLs (http://, https://, and bare www. links)
            text = re.sub(r'https?://\S+|www\.\S+', '', text)

            # Drop user mentions entirely; strip the '#' from hashtags but keep the word
            text = re.sub(r'@\w+', '', text)
            text = re.sub(r'#(\w+)', r'\1', text)
            
            # Normalize unicode characters
            text = unicodedata.normalize('NFKD', text)
            
            # Remove extra whitespace
            text = ' '.join(text.split())
            
            return text.strip()
        
        except Exception as e:
            self.logger.error(f"Error cleaning text: {e}")
            return ""
    
    def classify_party_sentiment(self, text: str) -> str:
        """Classify which party a tweet is more likely about"""
        if not text:
            return 'Neutral'
        
        try:
            text_lower = text.lower()
            
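            def count_hits(terms: Optional[List[str]]) -> int:
                # Whole-word match so short terms such as "ALP" do not fire inside longer words
                return sum(
                    1 for term in (terms or [])
                    if re.search(rf"\b{re.escape(term.lower().replace('@', ''))}\b", text_lower)
                )

            labor_score = count_hits(self.config.LABOR_TERMS)
            liberal_score = count_hits(self.config.LIBERAL_TERMS)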
            
            if labor_score > liberal_score:
                return 'Labor'
            elif liberal_score > labor_score:
                return 'Liberal'
            else:
                return 'Neutral'
        
        except Exception as e:
            self.logger.error(f"Error classifying party sentiment: {e}")
            return 'Neutral'
    
    def validate_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Validate and filter data quality"""
        if df.empty:
            return df
        
        original_count = len(df)
        
        # Remove tweets that are too short
        if 'text' in df.columns:
            df = df[df['text'].str.len() >= self.config.MIN_TEXT_LENGTH]
        
        # Remove duplicates
        if 'id' in df.columns:
            df = df.drop_duplicates(subset=['id'])
        
        self.logger.info(f"Data validation: {original_count} -> {len(df)} tweets")
        return df

# Legacy wrapper functions for backward compatibility
def collect_all_data():
    """Legacy wrapper function"""
    collector = DataCollector(client, config)
    return collector.collect_all_data()

def clean_text(text):
    """Legacy wrapper function"""
    processor = DataProcessor(config)
    return processor.clean_text(text)

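def classify_party_sentiment(text, labor_terms=None, liberal_terms=None):
    """Legacy wrapper function; uses the given term lists, or the global config's when omitted"""
    legacy_config = AnalysisConfig(
        LABOR_TERMS=labor_terms or config.LABOR_TERMS,
        LIBERAL_TERMS=liberal_terms or config.LIBERAL_TERMS,
    )
    processor = DataProcessor(legacy_config)
    return processor.classify_party_sentiment(text)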

# Sentiment Analysis with VADER
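
VADER reports a compound score between -1 and 1, and this notebook labels a post Positive at 0.05 or above and Negative at -0.05 or below. The sketch below applies those cutoffs to a handful of made-up scores so it runs without `vaderSentiment` installed. The function names are hypothetical.

```python
from collections import Counter
from typing import Iterable


def label_from_compound(score: float, threshold: float = 0.05) -> str:
    """Map a compound score to a label using symmetric cutoffs."""
    if score >= threshold:
        return "Positive"
    if score <= -threshold:
        return "Negative"
    return "Neutral"


def sentiment_distribution(scores: Iterable[float]) -> Counter:
    """Count how many scores fall under each label."""
    return Counter(label_from_compound(s) for s in scores)


# Made-up compound scores for illustration only
scores = [0.62, -0.31, 0.0, 0.04, -0.05]
dist = sentiment_distribution(scores)
print(dict(dist))
```

Scores exactly on a cutoff (such as -0.05 here) take the polar label, so the Neutral band is the open interval between the two thresholds.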

In [None]:
class SentimentAnalyzer:
    """Handles sentiment analysis using VADER sentiment analyzer"""
    
    def __init__(self, config: AnalysisConfig):
        self.config = config
        self.analyzer = SentimentIntensityAnalyzer()
        self.processor = DataProcessor(config)
        self.logger = logging.getLogger(__name__)
    
    def _classify_sentiment(self, compound_score: float) -> str:
        """Classify sentiment based on compound score"""
        if compound_score >= 0.05:
            return 'Positive'
        elif compound_score <= -0.05:
            return 'Negative'
        else:
            return 'Neutral'
    
    def analyze_sentiment_batch(self, df: pd.DataFrame) -> pd.DataFrame:
        """Perform sentiment analysis on a DataFrame of tweets"""
        if df.empty:
            self.logger.warning("Empty DataFrame provided for sentiment analysis")
            return df
        
        try:
            # Validate and clean data first
            df = self.processor.validate_data(df.copy())
            
            if df.empty:
                self.logger.warning("No valid data after validation")
                return df
            
            # Clean text
            df['cleaned_text'] = df['text'].apply(self.processor.clean_text)
            
            # Apply VADER sentiment analysis
            sentiment_scores = df['cleaned_text'].apply(
                lambda x: self.analyzer.polarity_scores(x) if x else {'compound': 0, 'pos': 0, 'neg': 0, 'neu': 1}
            )
            
            # Extract sentiment components
            df['sentiment_compound'] = sentiment_scores.apply(lambda x: x['compound'])
            df['sentiment_positive'] = sentiment_scores.apply(lambda x: x['pos'])
            df['sentiment_negative'] = sentiment_scores.apply(lambda x: x['neg'])
            df['sentiment_neutral'] = sentiment_scores.apply(lambda x: x['neu'])
            
            # Classify overall sentiment
            df['sentiment_label'] = df['sentiment_compound'].apply(self._classify_sentiment)
            
            # Classify party relevance
            df['party_classification'] = df['text'].apply(self.processor.classify_party_sentiment)
            
            self.logger.info(f"Sentiment analysis completed for {len(df)} tweets")
            return df
        
        except Exception as e:
            self.logger.error(f"Error in sentiment analysis: {e}")
            raise SentimentAnalysisError(f"Sentiment analysis failed: {e}")
    
    def get_sentiment_summary(self, df: pd.DataFrame, party_name: str) -> Dict[str, Any]:
        """Generate comprehensive sentiment summary statistics"""
        if df.empty:
            return {'party': party_name, 'error': 'No data available'}
        
        try:
            summary = {
                'party': party_name,
                'total_tweets': len(df),
                'avg_compound_score': df['sentiment_compound'].mean() if 'sentiment_compound' in df.columns else 0,
                'sentiment_distribution': {},
                'party_classification': {},
                'avg_engagement': {},
                'quality_metrics': {
                    'avg_text_length': df['text'].str.len().mean() if 'text' in df.columns else 0,
                    'verified_users': df['verified'].sum() if 'verified' in df.columns else 0
                }
            }
            
            # Sentiment distribution
            if 'sentiment_label' in df.columns:
                summary['sentiment_distribution'] = df['sentiment_label'].value_counts().to_dict()
            
            # Party classification
            if 'party_classification' in df.columns:
                summary['party_classification'] = df['party_classification'].value_counts().to_dict()
            
            # Engagement metrics
            engagement_cols = ['like_count', 'retweet_count', 'reply_count', 'quote_count']
            for col in engagement_cols:
                if col in df.columns:
                    summary['avg_engagement'][col] = df[col].mean()
            
            self.logger.info(f"Summary generated for {party_name}: {summary['total_tweets']} tweets")
            return summary
        
        except Exception as e:
            self.logger.error(f"Error generating summary for {party_name}: {e}")
            return {'party': party_name, 'error': str(e)}

class AnalysisOrchestrator:
    """Main orchestrator for the complete analysis pipeline"""
    
    def __init__(self, client, config: AnalysisConfig):
        self.client = client
        self.config = config
        self.collector = DataCollector(client, config)
        self.analyzer = SentimentAnalyzer(config)
        self.logger = logging.getLogger(__name__)
    
    def run_complete_analysis(self) -> tuple[Dict[str, pd.DataFrame], Dict[str, Dict[str, Any]]]:
        """Run the complete analysis pipeline"""
        try:
            self.logger.info("Starting complete analysis pipeline")
            
            # Step 1: Collect data
            self.logger.info("Step 1: Data collection")
            data_dict = self.collector.collect_all_data()
            
            # Check if we have any data
            total_collected = sum(len(df) for df in data_dict.values())
            if total_collected == 0:
                raise DataProcessingError("No data collected. Check API credentials and network connection.")
            
            self.logger.info(f"Data collection complete! Total tweets: {total_collected}")
            
            # Step 2: Analyze sentiment
            self.logger.info("Step 2: Sentiment analysis")
            analyzed_data = {}
            summaries = {}
            
            for party, df in data_dict.items():
                if not df.empty:
                    self.logger.info(f"Analyzing sentiment for {party}...")
                    analyzed_df = self.analyzer.analyze_sentiment_batch(df)
                    analyzed_data[party] = analyzed_df
                    summaries[party] = self.analyzer.get_sentiment_summary(analyzed_df, party)
                else:
                    self.logger.warning(f"No data for {party} - skipping")
                    analyzed_data[party] = df
            
            self.logger.info("Analysis pipeline completed successfully")
            return analyzed_data, summaries
        
        except Exception as e:
            self.logger.error(f"Error in analysis pipeline: {e}")
            raise

# Legacy wrapper functions for backward compatibility
def analyze_sentiment_batch(df):
    """Legacy wrapper function"""
    analyzer = SentimentAnalyzer(config)
    return analyzer.analyze_sentiment_batch(df)

def get_sentiment_summary(df, party_name):
    """Legacy wrapper function"""
    analyzer = SentimentAnalyzer(config)
    return analyzer.get_sentiment_summary(df, party_name)

# Data Visualization & Analysis
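
The grouped bar charts in this section offset each party's bars by `width * i` and centre the tick labels at `width / 2`, which lines up when exactly two parties are plotted. As a sketch for any number of groups, the hypothetical helper below computes bar positions centred on integer ticks. It is pure Python, so it makes no assumptions about the plotting backend.

```python
from typing import List


def grouped_bar_positions(n_categories: int, n_groups: int,
                          total_width: float = 0.7) -> List[List[float]]:
    """Return x positions per group so each cluster is centred on ticks 0..n_categories-1."""
    if n_categories < 1 or n_groups < 1:
        raise ValueError("n_categories and n_groups must be at least 1")
    bar_width = total_width / n_groups
    positions = []
    for g in range(n_groups):
        # Shift each group so the offsets are symmetric around zero
        offset = (g - (n_groups - 1) / 2) * bar_width
        positions.append([c + offset for c in range(n_categories)])
    return positions


# Three sentiment categories, two parties
positions = grouped_bar_positions(n_categories=3, n_groups=2)
for group in positions:
    print([round(x, 3) for x in group])
```

Each inner list can be passed as the x argument of `ax.bar(...)` with a bar width of `total_width / n_groups`, and the tick positions are then simply `range(n_categories)`.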

In [None]:
class ReportGenerator:
    """Handles visualization and report generation"""
    
    def __init__(self, config: AnalysisConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
    
    def _plot_sentiment_distribution(self, ax, data_dict, parties, colors):
        """Create sentiment distribution comparison plot"""
        sentiment_data = {}
        for party in parties:
            df = data_dict[party]
            if not df.empty and 'sentiment_label' in df.columns:
                sentiment_counts = df['sentiment_label'].value_counts()
                sentiment_data[party] = sentiment_counts
        
        # Create grouped bar chart
        sentiments = ['Positive', 'Negative', 'Neutral']
        x_pos = range(len(sentiments))
        width = 0.35
        
        for i, party in enumerate(parties):
            if party in sentiment_data:
                values = [sentiment_data[party].get(s, 0) for s in sentiments]
                ax.bar([x + width*i for x in x_pos], values, width, 
                       label=party, color=colors[i % len(colors)], alpha=0.8)
        
        ax.set_xlabel('Sentiment')
        ax.set_ylabel('Number of Tweets')
        ax.set_title('Sentiment Distribution by Party')
        ax.set_xticks([x + width/2 for x in x_pos])
        ax.set_xticklabels(sentiments)
        ax.legend()
        ax.grid(axis='y', alpha=0.3)
    
    def _plot_average_sentiment(self, ax, data_dict, parties, colors):
        """Create average sentiment scores plot"""
        avg_scores = []
        party_names = []
        for party in parties:
            df = data_dict[party]
            if not df.empty and 'sentiment_compound' in df.columns:
                avg_scores.append(df['sentiment_compound'].mean())
                party_names.append(party)
        
        if avg_scores:
            bars = ax.bar(party_names, avg_scores, color=colors[:len(party_names)], alpha=0.8)
            ax.set_ylabel('Average Sentiment Score')
            ax.set_title('Average Sentiment Score by Party')
            ax.set_ylim(-0.5, 0.5)
            ax.axhline(y=0, color='black', linestyle='-', alpha=0.3)
            ax.grid(axis='y', alpha=0.3)
            
            # Add value labels on bars
            for bar, score in zip(bars, avg_scores):
                height = bar.get_height()
                ax.text(bar.get_x() + bar.get_width()/2., height + 0.01 if height > 0 else height - 0.03,
                        f'{score:.3f}', ha='center', va='bottom' if height > 0 else 'top')
    
    def _plot_engagement_metrics(self, ax, data_dict, parties, colors):
        """Create engagement metrics comparison plot"""
        engagement_metrics = ['like_count', 'retweet_count', 'reply_count']
        width = 0.35
        
        for i, party in enumerate(parties):
            df = data_dict[party]
            if not df.empty and all(col in df.columns for col in engagement_metrics):
                values = [df[col].mean() for col in engagement_metrics]
                ax.bar([x + width*i for x in range(len(engagement_metrics))], values, 
                       width, label=party, color=colors[i % len(colors)], alpha=0.8)
        
        ax.set_xlabel('Engagement Type')
        ax.set_ylabel('Average Count')
        ax.set_title('Average Engagement by Party')
        ax.set_xticks([x + width/2 for x in range(len(engagement_metrics))])
        ax.set_xticklabels(['Likes', 'Retweets', 'Replies'])
        ax.legend()
        ax.grid(axis='y', alpha=0.3)
    
    def _plot_party_classification(self, ax, data_dict, parties):
        """Create party classification distribution plot"""
        classification_data = {}
        for party in parties:
            df = data_dict[party]
            if not df.empty and 'party_classification' in df.columns:
                class_counts = df['party_classification'].value_counts()
                classification_data[party] = class_counts
        
        # Stacked bar chart: keep a running total per party as the base of the next layer
        classifications = ['Labor', 'Liberal', 'Neutral']
        bottom_values = [0] * len(parties)
        
        for class_type in classifications:
            values = [
                classification_data[party].get(class_type, 0) if party in classification_data else 0
                for party in parties
            ]
            ax.bar(parties, values, label=class_type, bottom=bottom_values, alpha=0.8)
            bottom_values = [b + v for b, v in zip(bottom_values, values)]
        
        ax.set_ylabel('Number of Tweets')
        ax.set_title('Party Classification Distribution')
        ax.legend()
        ax.grid(axis='y', alpha=0.3)
    
    def plot_sentiment_comparison(self, data_dict: Dict[str, pd.DataFrame]):
        """Create comprehensive visualization comparing sentiment across parties"""
        try:
            fig, axes = plt.subplots(2, 2, figsize=(15, 12))
            fig.suptitle('Australian Political Sentiment Analysis', fontsize=16, fontweight='bold')
            
            # Prepare data for plotting
            parties = list(data_dict.keys())
            colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#FFA07A']  # Expandable color palette
            
            # 1. Sentiment Distribution Comparison
            self._plot_sentiment_distribution(axes[0, 0], data_dict, parties, colors)
            
            # 2. Average Sentiment Scores
            self._plot_average_sentiment(axes[0, 1], data_dict, parties, colors)
            
            # 3. Engagement Metrics
            self._plot_engagement_metrics(axes[1, 0], data_dict, parties, colors)
            
            # 4. Party Classification Distribution
            self._plot_party_classification(axes[1, 1], data_dict, parties)
            
            plt.tight_layout()
            self.logger.info("Sentiment comparison visualization created successfully")
            return fig
        
        except Exception as e:
            self.logger.error(f"Error creating visualization: {e}")
            # Chain the original exception so its traceback is preserved
            raise DataProcessingError(f"Visualization failed: {e}") from e
    
    def print_analysis_summary(self, data_dict: Dict[str, pd.DataFrame], summaries: Dict[str, Dict[str, Any]]):
        """Print comprehensive analysis summary"""
        try:
            print("=" * 80)
            print("🇦🇺 AUSTRALIAN POLITICAL SENTIMENT ANALYSIS REPORT")
            print("=" * 80)
            print(f"📅 Analysis Period: Last {self.config.DAYS_BACK} days")
            print(f"🔍 Search Language: {self.config.LANG}")
            print(f"📊 Max Tweets per Query: {self.config.MAX_TWEETS_PER_QUERY}")
            print()
            
            total_tweets = sum(len(df) for df in data_dict.values() if not df.empty)
            print(f"📈 Total Tweets Analyzed: {total_tweets}")
            print()
            
            for party, summary in summaries.items():
                if 'error' in summary:
                    print(f"⚠️  {party.upper()} PARTY: {summary['error']}")
                    continue
                
                print(f"🏛️  {party.upper()} PARTY ANALYSIS")
                print("-" * 50)
                print(f"Total Tweets: {summary['total_tweets']}")
                print(f"Average Sentiment Score: {summary['avg_compound_score']:.3f}")
                print()
                
                # Sentiment Distribution
                if summary.get('sentiment_distribution'):
                    print("Sentiment Distribution:")
                    for sentiment, count in summary['sentiment_distribution'].items():
                        percentage = (count / summary['total_tweets']) * 100
                        print(f"  • {sentiment}: {count} tweets ({percentage:.1f}%)")
                    print()
                
                # Party Classification
                if summary.get('party_classification'):
                    print("Party Classification:")
                    for classification, count in summary['party_classification'].items():
                        percentage = (count / summary['total_tweets']) * 100
                        print(f"  • {classification}: {count} tweets ({percentage:.1f}%)")
                    print()
                
                # Average Engagement
                if summary.get('avg_engagement'):
                    print("Average Engagement:")
                    for metric, value in summary['avg_engagement'].items():
                        if pd.notna(value):
                            print(f"  • {metric.replace('_', ' ').title()}: {value:.1f}")
                    print()
            
            # Overall comparison
            self._print_comparative_analysis(summaries)
            
            print("=" * 80)
            self.logger.info("Analysis summary printed successfully")
        
        except Exception as e:
            self.logger.error(f"Error printing summary: {e}")
            print(f"❌ Error generating summary: {e}")
    
    def _print_comparative_analysis(self, summaries: Dict[str, Dict[str, Any]]):
        """Print comparative analysis between parties"""
        valid_summaries = {k: v for k, v in summaries.items() if 'error' not in v}
        
        if len(valid_summaries) >= 2:
            parties = list(valid_summaries.keys())
            scores = [valid_summaries[party]['avg_compound_score'] for party in parties]
            
            print("📊 COMPARATIVE ANALYSIS")
            print("-" * 50)
            
            # Find party with highest sentiment
            max_score_idx = scores.index(max(scores))
            winner = parties[max_score_idx]
            diff = max(scores) - min(scores)
            
            print(f"🏆 Most Positive Sentiment: {winner}")
            print(f"📈 Sentiment Difference: {diff:.3f}")
            print()
            
            # Overall statistics
            # A summary may lack a distribution, so fall back to an empty dict
            total_positive = sum(
                (summary.get('sentiment_distribution') or {}).get('Positive', 0) 
                for summary in valid_summaries.values()
            )
            total_negative = sum(
                (summary.get('sentiment_distribution') or {}).get('Negative', 0) 
                for summary in valid_summaries.values()
            )
            
            print(f"🟢 Overall Positive Tweets: {total_positive}")
            print(f"🔴 Overall Negative Tweets: {total_negative}")
            
            if total_negative > 0:
                ratio = total_positive / total_negative
                print(f"📊 Positive/Negative Ratio: {ratio:.2f}")
            elif total_positive > 0:
                print("📊 No negative tweets found; ratio is undefined")
            else:
                print("📊 No positive or negative tweets found")

# Legacy wrapper functions for backward compatibility.
# Both expect a module-level `config` to exist when they are called.
def plot_sentiment_comparison(data_dict):
    """Legacy wrapper around ReportGenerator.plot_sentiment_comparison"""
    generator = ReportGenerator(config)
    return generator.plot_sentiment_comparison(data_dict)

def print_analysis_summary(data_dict, summaries):
    """Legacy wrapper around ReportGenerator.print_analysis_summary"""
    generator = ReportGenerator(config)
    generator.print_analysis_summary(data_dict, summaries)
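
Grouped bar charts such as the engagement plot above place each party's bars side by side by shifting them a fraction of the slot width. The following is a minimal sketch of that layout arithmetic, assuming a hypothetical helper `grouped_bar_positions` and a total group width of 0.8; neither is part of the notebook.

```python
from typing import List, Tuple


def grouped_bar_positions(
    n_groups: int, n_series: int, group_width: float = 0.8
) -> Tuple[float, List[List[float]], List[float]]:
    """Hypothetical helper: x positions for a grouped bar chart.

    Returns (bar_width, positions, tick_centers), where positions[s][g] is the
    x coordinate of series s in group g. These are bar centers, which is what
    ax.bar expects with its default alignment.
    """
    if n_groups < 1 or n_series < 1:
        raise ValueError("n_groups and n_series must be at least 1")
    bar_width = group_width / n_series
    positions = [[g + s * bar_width for g in range(n_groups)] for s in range(n_series)]
    # The middle of a group lies halfway between its first and last bar centers
    tick_centers = [g + bar_width * (n_series - 1) / 2 for g in range(n_groups)]
    return bar_width, positions, tick_centers


# Three engagement metrics (groups) compared across two parties (series)
bar_width, positions, tick_centers = grouped_bar_positions(n_groups=3, n_series=2)
print(bar_width)
print(tick_centers)
```

Deriving the bar width from the number of series keeps the bars inside their slot when a third or fourth party is added, which a fixed width cannot guarantee.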

# Main Execution - Run Complete Analysis

In [None]:
class FileManager:
    """Handles file operations and data persistence"""
    
    def __init__(self, config: AnalysisConfig):
        self.config = config
        self.logger = logging.getLogger(__name__)
    
    def save_results(self, analyzed_data: Dict[str, pd.DataFrame], summaries: Dict[str, Dict[str, Any]]) -> tuple[str, str]:
        """Save analysis results to files"""
        try:
            timestamp = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
            
            # Save combined data
            all_results = [df for df in analyzed_data.values() if not df.empty]
            
            if all_results:
                combined_df = pd.concat(all_results, ignore_index=True)
                filename = f"aus_political_sentiment_{timestamp}.csv"
                combined_df.to_csv(filename, index=False)
                self.logger.info(f"Results saved to: {filename}")
                
                # Save summary as JSON
                import json
                summary_filename = f"aus_political_summary_{timestamp}.json"
                
                # Convert numpy/pandas scalars to native Python types for JSON serialization.
                # Counts stay integers, nested values are handled, and NaN/NA become null.
                def to_native(value):
                    if isinstance(value, dict):
                        return {str(k): to_native(v) for k, v in value.items()}
                    if isinstance(value, (list, tuple)):
                        return [to_native(v) for v in value]
                    if pd.api.types.is_scalar(value) and pd.isna(value):
                        return None  # NaN/NA are not valid JSON
                    if getattr(value, 'ndim', None) == 0 and hasattr(value, 'item'):
                        return value.item()  # numpy scalar -> native int/float/bool
                    return value
                
                json_summaries = {party: to_native(summary) for party, summary in summaries.items()}
                
                with open(summary_filename, 'w') as f:
                    json.dump(json_summaries, f, indent=2, default=str)
                
                self.logger.info(f"Summary saved to: {summary_filename}")
                return filename, summary_filename
            else:
                self.logger.warning("No results to save")
                return "", ""
        
        except Exception as e:
            self.logger.error(f"Error saving results: {e}")
            return "", ""

def main():
    """Main execution function - orchestrates the complete analysis pipeline"""
    try:
        logger.info("🚀 Starting Australian Political Sentiment Analysis")
        print("🚀 Starting Australian Political Sentiment Analysis")
        print("=" * 60)
        
        # Initialize components
        orchestrator = AnalysisOrchestrator(client, config)
        report_generator = ReportGenerator(config)
        file_manager = FileManager(config)
        
        # Step 1: Run complete analysis
        print("\n🧠 ANALYSIS PIPELINE")
        print("-" * 30)
        analyzed_data, summaries = orchestrator.run_complete_analysis()
        
        # Check if we have any data
        total_collected = sum(len(df) for df in analyzed_data.values() if not df.empty)
        if total_collected == 0:
            print("❌ No data collected. Check your API credentials and network connection.")
            return None
        
        print(f"\n✅ Analysis complete! Total tweets processed: {total_collected}")
        
        # Step 2: Print comprehensive summary
        print("\n📊 ANALYSIS RESULTS")
        print("-" * 30)
        report_generator.print_analysis_summary(analyzed_data, summaries)
        
        # Step 3: Create visualizations
        print("\n📈 CREATING VISUALIZATIONS")
        print("-" * 30)
        
        # Only create plots if we have data for at least one party
        parties_with_data = [party for party, df in analyzed_data.items() if not df.empty]
        if parties_with_data:
            try:
                fig = report_generator.plot_sentiment_comparison(analyzed_data)
                plt.show()
                print("✅ Visualizations created")
            except Exception as e:
                logger.error(f"Visualization error: {e}")
                print(f"⚠️  Visualization error: {e}")
        else:
            print("⚠️  No data available for visualization")
        
        # Step 4: Save results
        print("\n💾 SAVING RESULTS")
        print("-" * 30)
        
        csv_file, json_file = file_manager.save_results(analyzed_data, summaries)
        if csv_file:
            print(f"✅ Results saved to: {csv_file}")
        if json_file:
            print(f"✅ Summary saved to: {json_file}")
        
        if not csv_file and not json_file:
            print("⚠️  No results were saved (no data, or a write error; see the log)")
        
        print("\n🎉 Analysis Complete!")
        print("=" * 60)
        
        logger.info("Analysis pipeline completed successfully")
        return analyzed_data, summaries
        
    except Exception as e:
        error_msg = f"Error during analysis: {str(e)}"
        logger.exception(error_msg)  # logs at ERROR level and records the traceback
        print(f"\n❌ {error_msg}")
        print("If this is an API error, check your credentials and try again.")
        return None

# Run the pipeline; `results` is None on failure, else (analyzed_data, summaries)
if __name__ == "__main__":
    results = main()
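
The files written by `save_results` carry a `%Y%m%d_%H%M%S` timestamp, so their names sort chronologically. As a rough sketch of how a later session might reload the most recent summary, assuming a hypothetical helper `load_latest_summary` (not part of the notebook) and that the files sit in the working directory:

```python
import json
from pathlib import Path
from typing import Any, Dict, Optional


def load_latest_summary(directory: str = ".") -> Optional[Dict[str, Any]]:
    """Hypothetical helper: load the newest aus_political_summary_*.json, if any."""
    # The timestamp format sorts chronologically as plain text,
    # so the last path after sorting is the most recent run.
    candidates = sorted(Path(directory).glob("aus_political_summary_*.json"))
    if not candidates:
        return None
    with open(candidates[-1], encoding="utf-8") as f:
        return json.load(f)


summary = load_latest_summary()
if summary is None:
    print("No saved summary found")
else:
    for party, stats in summary.items():
        # 'avg_compound_score' is absent when a party's summary holds an error
        print(party, stats.get("avg_compound_score"))
```

Sorting by name avoids relying on file modification times, which can change when files are copied between machines.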