In [1]:
"""
News article processing pipeline for multilingual text data.
Filters by geographic relevance, then cleans and tokenizes English and Arabic news articles.
"""

import pandas as pd
import re
import pickle
import torch
import logging
from pathlib import Path
from typing import List, Optional, Dict, Set, Tuple
from transformers import pipeline, Pipeline
from tqdm.auto import tqdm

# Root logging setup: INFO level, "<timestamp> - <level> - <message>" records.
# basicConfig is a no-op when the root logger already has handlers attached.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)


class NewsArticleProcessor:
    """Processes and cleans news articles in multiple languages with geographic filtering.

    The pipeline (``run_pipeline``) reads two CSV files from
    ``<data_dir>/01_raw``, optionally keeps only the articles in which the NER
    model reports a ``LOC`` entity whose lower-cased text is a key of the
    location alias lookup, strips HTML/URLs from the ``body`` column, splits
    the cleaned text into sentences and pickles both dataframes into
    ``<data_dir>/02_processed``.
    """

    # Regex patterns as class constants for reusability
    HTML_PATTERN = re.compile(r'<.*?>')
    URL_PATTERN = re.compile(r'https?://\S+|www\.\S+')
    WHITESPACE_PATTERN = re.compile(r'\s+')
    # Split on whitespace that follows a sentence terminator: Latin ". ! ?",
    # the Arabic full stop (U+06D4) and the Arabic question mark (U+061F).
    # Written with escapes so the character class is unambiguous in editors
    # that reorder right-to-left characters.
    SENTENCE_SPLIT_PATTERN = re.compile(r'(?<=[.!?\u06D4\u061F])\s+')
    LETTER_PATTERN = re.compile(r'[a-zA-Zء-ي]')

    # Number of NER batches handed to the pipeline per call; each call is one
    # tick of the NER progress bar.
    NER_CHUNK_BATCHES = 8

    def __init__(self, data_dir: str = '../data',
                 filter_by_geography: bool = True,
                 max_text_length: int = 2000,
                 batch_size: int = 128,
                 ner_model: str = "Babelscape/wikineural-multilingual-ner"):
        """
        Initialize the processor with data directory path and filtering options.

        Creates ``<data_dir>/02_processed`` if it does not exist. No model or
        location data is loaded here; that happens in ``load_data``.

        Args:
            data_dir: Root directory containing raw and processed data folders
            filter_by_geography: Whether to apply geographic filtering
            max_text_length: Maximum number of characters of each article body
                passed to the NER model
            batch_size: Batch size for NER processing
            ner_model: Model name for NER pipeline
        """
        self.data_dir = Path(data_dir)
        self.raw_dir = self.data_dir / '01_raw'
        self.processed_dir = self.data_dir / '02_processed'

        # Geographic filtering settings
        self.filter_by_geography = filter_by_geography
        self.max_text_length = max_text_length
        self.batch_size = batch_size
        self.ner_model = ner_model

        # Initialize components
        # Maps lower-cased location alias -> location id.
        self.location_lookup: Dict[str, str] = {}
        self.ner_pipeline: Optional[Pipeline] = None
        self.device = self._get_device()

        # Ensure processed directory exists
        self.processed_dir.mkdir(parents=True, exist_ok=True)

    def _get_device(self) -> int:
        """Return the device index for the NER pipeline: 0 (first GPU) or -1 (CPU)."""
        if torch.cuda.is_available():
            logger.info(f"GPU available: {torch.cuda.get_device_name(0)}")
            return 0
        logger.info("No GPU found, using CPU")
        return -1

    def _load_locations(self) -> None:
        """
        Load location dictionaries for geographic filtering.

        Reads the English and Arabic alias pickles from the raw directory and
        fills ``self.location_lookup`` with ``alias.lower() -> location id``.
        Does nothing when geographic filtering is disabled.

        Raises:
            FileNotFoundError: If either location pickle is missing
        """
        if not self.filter_by_geography:
            return

        eng_path = self.raw_dir / 'id_english_location_name.pkl'
        ara_path = self.raw_dir / 'id_arabic_location_name.pkl'

        try:
            # NOTE(security): pickle.load can execute arbitrary code; only
            # load location files that come from a trusted source.
            with open(eng_path, 'rb') as f:
                eng_locations = pickle.load(f)
            with open(ara_path, 'rb') as f:
                ara_locations = pickle.load(f)
        except FileNotFoundError as e:
            logger.error(f"Location file not found: {e}")
            raise

        # Build lookup dictionary
        # assumes each pickle is {location_id: iterable of alias strings} — TODO confirm
        for location_dict in (eng_locations, ara_locations):
            for loc_id, names in location_dict.items():
                for name in names:
                    self.location_lookup[name.lower()] = loc_id

        logger.info(f"Loaded {len(self.location_lookup):,} unique location aliases")

    def _initialize_ner(self) -> None:
        """
        Initialize NER pipeline for geographic filtering.

        Does nothing when geographic filtering is disabled. Any exception
        raised while loading the model is logged and re-raised.
        """
        if not self.filter_by_geography:
            return

        logger.info(f"Loading NER model: {self.ner_model}")

        try:
            self.ner_pipeline = pipeline(
                "ner",
                model=self.ner_model,
                aggregation_strategy="simple",
                device=self.device
            )
            logger.info("NER pipeline initialized successfully")
        except Exception as e:
            logger.error(f"Failed to load NER model: {e}")
            raise

    def _extract_entities(self, texts: List[str]) -> List[List[dict]]:
        """
        Run the NER pipeline over ``texts`` in chunks, showing progress.

        Args:
            texts: Article texts, already truncated for the model

        Returns:
            One list of entity dicts per input text, in input order

        Raises:
            RuntimeError: If the pipeline returns a different number of
                results than texts were supplied
        """
        chunk_size = max(1, self.batch_size) * self.NER_CHUNK_BATCHES
        all_entities: List[List[dict]] = []

        for start in tqdm(range(0, len(texts), chunk_size),
                          desc="Running NER", unit="chunk"):
            chunk = texts[start:start + chunk_size]
            chunk_entities = self.ner_pipeline(chunk, batch_size=self.batch_size)

            # Defensive: if a one-element input comes back as a flat list of
            # entity dicts (rather than a list of lists), re-wrap it so the
            # result stays aligned with the input.
            if len(chunk) == 1 and (not chunk_entities
                                    or isinstance(chunk_entities[0], dict)):
                chunk_entities = [chunk_entities]

            all_entities.extend(chunk_entities)

        # The caller applies the results positionally as a row mask, so a
        # length mismatch must never pass silently.
        if len(all_entities) != len(texts):
            raise RuntimeError(
                f"NER returned {len(all_entities)} results for {len(texts)} texts"
            )
        return all_entities

    def _filter_by_geography(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Filter articles containing target geographic locations.

        Adds a ``matched_locations`` column to ``df`` (in place) holding, for
        each article, the ``LOC`` entity strings found in the location lookup.
        When filtering is enabled, a copy containing only the rows with at
        least one match is returned; when disabled, ``df`` itself is returned
        with empty match lists.

        Args:
            df: Combined dataframe of articles with a 'body' column

        Returns:
            Filtered dataframe with location metadata
        """
        if not self.filter_by_geography:
            logger.info("Geographic filtering disabled, using all articles")
            df['matched_locations'] = [[] for _ in range(len(df))]
            return df

        # Nothing to classify: skip the model entirely and avoid dividing by
        # zero in the retention-rate log line below.
        if df.empty:
            logger.info("No articles to filter; skipping NER")
            df['matched_locations'] = []
            return df.copy()

        # Allow calling this method directly, without going through load_data.
        if not self.location_lookup:
            self._load_locations()
        if self.ner_pipeline is None:
            self._initialize_ner()

        # Prepare texts for NER; missing or non-string bodies become ''
        # (the same tolerance clean_text applies).
        limit = self.max_text_length
        texts_to_process = [
            body[:limit] if isinstance(body, str) else ''
            for body in df['body'].tolist()
        ]

        logger.info(f"Running NER on {len(texts_to_process):,} articles...")

        # Extract entities
        all_entities = self._extract_entities(texts_to_process)

        # Check for relevant locations
        lookup = self.location_lookup
        location_matches = [
            [
                entity['word']
                for entity in article_entities
                if entity.get('entity_group') == 'LOC'
                and entity.get('word', '').lower() in lookup
            ]
            for article_entities in tqdm(all_entities,
                                         desc="Checking location relevance")
        ]
        relevance_mask = [bool(matches) for matches in location_matches]

        # Add metadata and filter
        df['matched_locations'] = location_matches
        df_filtered = df[relevance_mask].copy()

        logger.info("Geographic filtering complete:")
        logger.info(f"  - Original articles: {len(df):,}")
        logger.info(f"  - Relevant articles: {len(df_filtered):,}")
        logger.info(f"  - Retention rate: {len(df_filtered)/len(df)*100:.1f}%")

        return df_filtered

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Load English and Arabic news datasets.

        Both CSVs are concatenated, geographically filtered as one frame when
        filtering is enabled, then split back by language. The
        ``matched_locations`` column is present only when filtering is
        enabled.

        Returns:
            Tuple of (english_df, arabic_df)

        Raises:
            FileNotFoundError: If dataset files don't exist
        """
        eng_path = self.raw_dir / 'news-articles-eng.csv'
        ara_path = self.raw_dir / 'news-articles-ara.csv'

        try:
            df_eng = pd.read_csv(eng_path)
            df_ara = pd.read_csv(ara_path)

            # Add language column for tracking
            df_eng['language'] = 'english'
            df_ara['language'] = 'arabic'

            logger.info(f"Loaded {len(df_eng)} English and {len(df_ara)} Arabic articles")

            # Combine for geographic filtering
            df_combined = pd.concat([df_eng, df_ara], ignore_index=True)

            # Apply geographic filtering if enabled
            if self.filter_by_geography:
                self._load_locations()
                self._initialize_ner()
                df_combined = self._filter_by_geography(df_combined)

            # Split back into language-specific dataframes and drop the
            # helper column, which is no longer needed.
            is_english = df_combined['language'] == 'english'
            is_arabic = df_combined['language'] == 'arabic'
            df_eng_filtered = df_combined[is_english].drop('language', axis=1)
            df_ara_filtered = df_combined[is_arabic].drop('language', axis=1)

            return df_eng_filtered, df_ara_filtered

        except FileNotFoundError as e:
            logger.error(f"Dataset file not found: {e}")
            raise
        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise

    @staticmethod
    def clean_text(text: Optional[str]) -> str:
        """
        Clean raw text by removing HTML tags, URLs, and normalizing whitespace.

        Args:
            text: Raw text string to clean

        Returns:
            Cleaned text string; "" for any non-string input (e.g. NaN)
        """
        if not isinstance(text, str):
            return ""

        # Remove HTML tags
        text = NewsArticleProcessor.HTML_PATTERN.sub('', text)
        # Remove URLs
        text = NewsArticleProcessor.URL_PATTERN.sub('', text)
        # Normalize whitespace
        text = NewsArticleProcessor.WHITESPACE_PATTERN.sub(' ', text).strip()

        return text

    @staticmethod
    def tokenize_sentences(text: Optional[str]) -> List[str]:
        """
        Split text into sentences using regex and filter non-textual results.

        Sentences are split at whitespace following ``.``, ``!``, ``?``, the
        Arabic full stop or the Arabic question mark. Pieces that contain no
        Latin or Arabic letter are dropped.

        Args:
            text: Text to tokenize

        Returns:
            List of sentence strings
        """
        if not isinstance(text, str) or not text:
            return []

        # Split on sentence-ending punctuation
        sentences = NewsArticleProcessor.SENTENCE_SPLIT_PATTERN.split(text)

        # Filter empty strings and non-textual content
        has_letter = NewsArticleProcessor.LETTER_PATTERN.search
        return [s for s in sentences if s and has_letter(s)]

    def process_dataframe(self, df: pd.DataFrame, language: str) -> pd.DataFrame:
        """
        Apply cleaning and tokenization to a dataframe.

        Adds ``body_cleaned`` and ``sentences`` columns to ``df`` in place and
        returns the same object.

        Args:
            df: DataFrame with 'body' column containing article text
            language: Language identifier for logging

        Returns:
            Processed DataFrame with cleaned text and sentences
        """
        logger.info(f"Processing {language} articles...")

        # Clean text
        df['body_cleaned'] = df['body'].apply(self.clean_text)
        logger.info(f"Cleaned {len(df)} {language} articles")

        # Tokenize into sentences
        df['sentences'] = df['body_cleaned'].apply(self.tokenize_sentences)

        # Calculate statistics
        sentence_counts = df['sentences'].apply(len)
        logger.info(f"Tokenized {language} articles: "
                   f"avg {sentence_counts.mean():.1f} sentences per article")

        return df

    def save_processed_data(self, df_eng: pd.DataFrame, df_ara: pd.DataFrame) -> None:
        """
        Save processed dataframes to pickle files.

        Any exception raised while writing is logged and re-raised.

        Args:
            df_eng: Processed English dataframe
            df_ara: Processed Arabic dataframe
        """
        eng_path = self.processed_dir / 'news_eng_processed.pkl'
        ara_path = self.processed_dir / 'news_ara_processed.pkl'

        try:
            df_eng.to_pickle(eng_path)
            df_ara.to_pickle(ara_path)
            logger.info(f"Saved English data to: {eng_path}")
            logger.info(f"Saved Arabic data to: {ara_path}")
        except Exception as e:
            logger.error(f"Error saving processed data: {e}")
            raise

    def display_sample(self, df: pd.DataFrame, num_sentences: int = 3) -> None:
        """
        Display sample processed sentences for verification.

        Prints the leading sentences of the first row and, when present and
        non-empty, that row's matched locations.

        Args:
            df: Processed dataframe
            num_sentences: Number of sentences to display
        """
        if df.empty or 'sentences' not in df.columns:
            logger.warning("No sentences to display")
            return

        first_article = df.iloc[0]
        sentences = first_article['sentences']

        print("\n--- Sample Tokenization Results ---")
        print(f"Article split into {len(sentences)} sentences")
        print(f"First {min(num_sentences, len(sentences))} sentences:")

        for i, sentence in enumerate(sentences[:num_sentences], 1):
            print(f"  {i}. {sentence}")

        # Show matched locations if available
        if 'matched_locations' in df.columns and first_article['matched_locations']:
            print(f"\nMatched locations: {', '.join(first_article['matched_locations'])}")

    def run_pipeline(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Execute the complete processing pipeline.

        Steps: load (and optionally filter) the data, clean and tokenize each
        language, print a sample from the English set, save both dataframes.

        Returns:
            Tuple of processed (english_df, arabic_df)
        """
        logger.info("Starting news article processing pipeline")

        if self.filter_by_geography:
            logger.info("Geographic filtering is ENABLED")
        else:
            logger.info("Geographic filtering is DISABLED")

        # Load data (with geographic filtering if enabled)
        df_eng, df_ara = self.load_data()

        # Process each dataset
        df_eng = self.process_dataframe(df_eng, "English")
        df_ara = self.process_dataframe(df_ara, "Arabic")

        # Display sample for verification
        self.display_sample(df_eng)

        # Save processed data
        self.save_processed_data(df_eng, df_ara)

        logger.info("Pipeline completed successfully")
        return df_eng, df_ara


# Example run. Geographic filtering is on by default; pass
# filter_by_geography=False to keep every article.
processor = NewsArticleProcessor(
    batch_size=128,
    max_text_length=2000,
    filter_by_geography=True,
    data_dir='../data',
)
df_eng_processed, df_ara_processed = processor.run_pipeline()

# Report how many articles each language ended up with
print(
    f"\nProcessed {len(df_eng_processed)} English articles",
    f"Processed {len(df_ara_processed)} Arabic articles",
    sep="\n",
)

# The matched_locations column only exists when location data was attached
if 'matched_locations' in df_eng_processed.columns:
    total_locations = sum(map(len, df_eng_processed['matched_locations']))
    print(f"Total location matches in English articles: {total_locations}")

2025-09-13 10:04:50,946 - INFO - GPU available: NVIDIA L4
2025-09-13 10:04:50,948 - INFO - Starting news article processing pipeline
2025-09-13 10:04:50,949 - INFO - Geographic filtering is ENABLED
2025-09-13 10:05:02,855 - INFO - Loaded 86660 English and 85511 Arabic articles
2025-09-13 10:05:02,895 - INFO - Loaded 918 unique location aliases
2025-09-13 10:05:02,897 - INFO - Loading NER model: Babelscape/wikineural-multilingual-ner
Device set to use cuda:0
2025-09-13 10:05:03,792 - INFO - NER pipeline initialized successfully
2025-09-13 10:05:04,069 - INFO - Running NER on 172,171 articles...


KeyboardInterrupt: 