In [2]:

import pandas as pd
import numpy as np
from collections import defaultdict
import json
import os
import gc
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class MobilityTextProcessor:
    """
    A memory-efficient processor for generating natural language descriptions
    from mobility trajectory data, designed for large-scale analysis.
    """
    
    def __init__(self, max_users=1000, batch_size=50):
        self.max_users = max_users
        self.batch_size = batch_size
        self.base_path = 'UserMobilityTexts'
        
        # Activity mapping for functional categories
        self.activity_mapping = {
            'food_dining': 'dining',
            'retail_shopping': 'shopping', 
            'entertainment': 'entertainment',
            'professional': 'work',
            'healthcare': 'medical appointment',
            'education': 'learning',
            'transportation': 'commuting',
            'residential': 'home activities'
        }
        
    def load_mobility_datasets(self):
        """Load all mobility datasets and merge them - works with your actual files"""
        try:
            # Load your actual CSV files
            base_df = pd.read_csv("scientific_mobility_base_dataset.csv")
            functional_df = pd.read_csv("scientific_mobility_functional_dataset.csv")
            logger.info(f"Loaded base dataset: {base_df.shape}")
            logger.info(f"Loaded functional dataset: {functional_df.shape}")
        except FileNotFoundError as e:
            logger.error(f"CSV files not found: {e}")
            logger.info("Please ensure these files are uploaded:")
            logger.info("- scientific_mobility_base_dataset.csv")
            logger.info("- scientific_mobility_functional_dataset.csv")
            return None
        
        # Merge base and functional datasets
        merged_df = base_df.merge(
            functional_df[['user_id', 'day', 'time_slot', 'functional_category']], 
            on=['user_id', 'day', 'time_slot'], 
            how='left'
        )
        
        logger.info(f"Merged dataset shape: {merged_df.shape}")
        logger.info(f"Columns available: {list(merged_df.columns)}")
        
        return merged_df
    
    @staticmethod
    def get_period_type(day):
        """Classify day into period type for temporal analysis."""
        if day <= 60:
            return 'normal'
        elif day <= 75:
            return 'emergency'
        else:
            return 'post_emergency'
    
    def generate_time_context(self, row):
        """Generate natural language time context from available data."""
        # Extract time information from available columns
        time_slot = row.get('time_slot', 0)
        day = row.get('day', 1)
        
        # Convert time_slot to hour (assuming time_slot represents hours or time periods)
        if 'hour' in row:
            hour = row['hour']
        else:
            # Estimate hour from time_slot (adjust based on your data structure)
            hour = int(time_slot) % 24 if time_slot else 12
            
        # Get minute if available, otherwise use 0
        minute = row.get('minute', 0)
        
        # Get day name if available
        if 'day_name' in row:
            day_name = row['day_name']
        else:
            # Generate day name from day number
            days = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
            day_name = days[day % 7]
        
        # Check weekend status
        if 'is_weekend' in row:
            is_weekend = row['is_weekend'] == 'True' or row['is_weekend'] == True
        else:
            is_weekend = (day % 7) >= 5  # Assume Saturday and Sunday are weekends
        
        time_str = f"{hour:02d}:{minute:02d}"
        
        if hour < 6:
            period = "early morning"
        elif hour < 12:
            period = "morning"
        elif hour < 17:
            period = "afternoon"
        elif hour < 21:
            period = "evening"
        else:
            period = "night"
        
        weekend_context = "weekend" if is_weekend else "weekday"
        
        return {
            'time_str': time_str,
            'period': period,
            'day_context': f"{day_name} {weekend_context}",
            'full_context': f"At {time_str} on {day_name} {period}"
        }
    
    def generate_location_context(self, row):
        """Generate natural language location context from available data."""
        # Get location information from available columns
        location_category = row.get('location_category', 'Unknown Location')
        location_function = row.get('location_function', 'general')
        poi_density = row.get('poi_density', 500)
        category_diversity = row.get('category_diversity', 10)
        distance_from_center = row.get('distance_from_center', 50)
        distance_quartile = row.get('distance_quartile', 'Medium')
        
        # Density classification
        if poi_density > 1500:
            density_desc = "high-density urban area"
        elif poi_density > 800:
            density_desc = "moderate-density area"
        else:
            density_desc = "low-density area"
        
        # Diversity classification
        if category_diversity > 25:
            diversity_desc = "with diverse amenities"
        elif category_diversity > 15:
            diversity_desc = "with several amenities"
        else:
            diversity_desc = "with limited amenities"
        
        center_context = f"{distance_quartile.lower()} from city center"
        
        return {
            'primary_location': location_category,
            'function': location_function,
            'density_desc': density_desc,
            'diversity_desc': diversity_desc,
            'center_context': center_context,
            'full_description': f"{location_category} in {density_desc} {center_context} {diversity_desc}"
        }
    
    def generate_activity_context(self, row, time_period):
        """Generate activity context based on functional category and time."""
        functional_category = row.get('functional_category', None)
        location_function = row.get('location_function', 'general')
        
        if functional_category and functional_category in self.activity_mapping:
            activity = self.activity_mapping[functional_category]
        else:
            activity = 'general activities'
        
        # Time-based activity refinement
        if time_period == 'morning' and activity == 'work':
            activity = 'commuting to work'
        elif time_period == 'evening' and 'residential' in str(functional_category):
            activity = 'returning home'
        
        return {
            'inferred_activity': activity,
            'functional_category': functional_category,
            'context': f"engaged in {activity}"
        }
    
    def create_mobility_sentence(self, row):
        """Generate natural language sentence for mobility record using available columns."""
        
        # Generate contexts using available data
        time_ctx = self.generate_time_context(row)
        location_ctx = self.generate_location_context(row)
        activity_ctx = self.generate_activity_context(row, time_ctx['period'])
        
        # Get grid coordinates if available
        grid_x = row.get('grid_x', 0)
        grid_y = row.get('grid_y', 0)
        day = row.get('day', 1)
        time_slot = row.get('time_slot', 1)
        
        # Generate sentences for required styles
        sentences = {
            'medium': f"At {time_ctx['time_str']}, user was at {location_ctx['primary_location']} in {location_ctx['density_desc']} {activity_ctx['context']}.",
            'detailed': f"{time_ctx['full_context']}, the user visited {location_ctx['full_description']} and {activity_ctx['context']}."
        }
        
        return {
            'sentences': sentences,
            'contexts': {
                'time': time_ctx,
                'location': location_ctx,
                'activity': activity_ctx
            },
            'metadata': {
                'grid_coords': (grid_x, grid_y),
                'timestamp': f"Day_{day}_Slot_{time_slot}",
                'poi_density': location_ctx.get('poi_density', 500),
                'distance_from_center': location_ctx.get('distance_from_center', 50)
            }
        }
    
    def create_daily_narrative(self, user_data, narrative_style='medium'):
        """Create daily narrative from user mobility data."""
        
        # Sort by time slot if available
        if 'time_slot' in user_data.columns:
            user_data_sorted = user_data.sort_values('time_slot')
        else:
            user_data_sorted = user_data
        
        daily_sentences = []
        contexts = []
        
        for _, row in user_data_sorted.iterrows():
            sentence_data = self.create_mobility_sentence(row)
            daily_sentences.append(sentence_data['sentences'][narrative_style])
            contexts.append(sentence_data)
        
        # Create coherent narrative flow
        narrative_parts = []
        connectors = ["Then,", "Next,", "Later,", "Subsequently,"]
        
        for i, sentence in enumerate(daily_sentences):
            if i == 0:
                narrative_parts.append(f"The day began: {sentence}")
            elif i == len(daily_sentences) - 1:
                narrative_parts.append(f"Finally, {sentence}")
            else:
                connector = connectors[i % len(connectors)]
                narrative_parts.append(f"{connector} {sentence}")
        
        return {
            'full_narrative': ' '.join(narrative_parts),
            'individual_sentences': daily_sentences,
            'sentence_contexts': contexts,
            'total_visits': len(daily_sentences)
        }
    
    def process_users_in_batches(self, df, style):
        """Process users in memory-efficient batches."""
        
        logger.info(f"Processing {style} style for up to {self.max_users} users in batches of {self.batch_size}")
        
        if 'user_id' not in df.columns:
            logger.error("user_id column not found")
            return {}
        
        # Select users to process
        all_users = df['user_id'].unique()
        users_to_process = all_users[:self.max_users]
        
        logger.info(f"Found {len(all_users)} total users, processing {len(users_to_process)}")
        
        mobility_corpus = {
            'normal_period': {},
            'emergency_period': {}
        }
        
        # Process in batches
        for batch_start in range(0, len(users_to_process), self.batch_size):
            batch_end = min(batch_start + self.batch_size, len(users_to_process))
            batch_users = users_to_process[batch_start:batch_end]
            
            logger.info(f"Processing batch {batch_start//self.batch_size + 1}: users {batch_start+1}-{batch_end}")
            
            for user_id in batch_users:
                user_data = df[df['user_id'] == user_id]
                
                # Initialize user data for both periods
                mobility_corpus['normal_period'][f'user_{user_id}'] = {}
                mobility_corpus['emergency_period'][f'user_{user_id}'] = {}
                
                # Process each day
                for day in user_data['day'].unique():
                    day_data = user_data[user_data['day'] == day]
                    
                    if len(day_data) > 0:
                        period_type = self.get_period_type(day)
                        
                        if period_type in ['normal', 'emergency']:
                            period_key = f'{period_type}_period'
                            narrative = self.create_daily_narrative(day_data, style)
                            mobility_corpus[period_key][f'user_{user_id}'][f'day_{day}'] = narrative
            
            # Memory management
            gc.collect()
            logger.info(f"Batch {batch_start//self.batch_size + 1} completed")
        
        return mobility_corpus
    
    def save_corpus_files(self, mobility_corpus, style):
        """Save processed corpus to files."""
        
        style_path = os.path.join(self.base_path, f'{style}_style')
        
        # Create directories
        for period in ['normal_period', 'emergency_period']:
            period_path = os.path.join(style_path, period)
            os.makedirs(period_path, exist_ok=True)
        
        logger.info(f"Saving {style} style files")
        
        for period_type, period_data in mobility_corpus.items():
            period_folder = os.path.join(style_path, period_type)
            user_count = 0
            
            for user_id, user_data in period_data.items():
                if user_data:
                    # Save JSON format
                    user_file = os.path.join(period_folder, f'{user_id}_{period_type}.json')
                    with open(user_file, 'w') as f:
                        json.dump(user_data, f, indent=2)
                    
                    # Save readable text format
                    text_file = os.path.join(period_folder, f'{user_id}_{period_type}_readable.txt')
                    with open(text_file, 'w') as f:
                        f.write(f"User: {user_id} - Period: {period_type} - Style: {style}\n")
                        f.write("=" * 80 + "\n\n")
                        
                        for day_id, day_data in user_data.items():
                            f.write(f"Day: {day_id}\n")
                            f.write("-" * 40 + "\n")
                            f.write(f"Narrative: {day_data['full_narrative']}\n\n")
                            f.write("Individual Sentences:\n")
                            for i, sentence in enumerate(day_data['individual_sentences'], 1):
                                f.write(f"{i}. {sentence}\n")
                            f.write(f"\nTotal visits: {day_data['total_visits']}\n")
                            f.write("=" * 80 + "\n\n")
                    
                    user_count += 1
            
            logger.info(f"Saved {user_count} users for {period_type}")
    
    def export_training_data(self, mobility_corpus, style):
        """Export data in formats suitable for machine learning training."""
        
        style_path = os.path.join(self.base_path, f'{style}_style')
        
        for period_type, period_data in mobility_corpus.items():
            
            # JSONL format for language model training
            jsonl_file = os.path.join(style_path, period_type, f'training_data_{period_type}_{style}.jsonl')
            
            with open(jsonl_file, 'w') as f:
                for user_id, user_data in period_data.items():
                    for day_id, day_data in user_data.items():
                        sample = {
                            'user_id': user_id,
                            'day': day_id,
                            'period_type': period_type,
                            'style': style,
                            'text': day_data['full_narrative'],
                            'sentences': day_data['individual_sentences'],
                            'metadata': {
                                'total_visits': day_data['total_visits'],
                                'period': period_type,
                                'style': style
                            }
                        }
                        f.write(json.dumps(sample) + '\n')
            
            # Plain text format
            txt_file = os.path.join(style_path, period_type, f'gpt_training_{period_type}_{style}.txt')
            
            with open(txt_file, 'w') as f:
                f.write(f"Mobility Text Corpus - {period_type.title()} Period - {style.title()} Style\n")
                f.write("=" * 80 + "\n\n")
                
                for user_data in period_data.values():
                    for day_data in user_data.values():
                        f.write(day_data['full_narrative'] + '\n\n')
    
    def generate_processing_statistics(self, mobility_df):
        """Generate comprehensive processing statistics."""
        
        stats_file = os.path.join(self.base_path, 'continuation_processing_statistics.txt')
        
        with open(stats_file, 'w') as f:
            f.write("Mobility Text Generation - Continuation Processing Statistics\n")
            f.write("IEEE Transactions on Knowledge and Data Engineering (TKDE)\n")
            f.write("=" * 80 + "\n\n")
            
            f.write("Processing Configuration:\n")
            f.write(f"  Narrative Styles: Medium, Detailed\n")
            f.write(f"  Maximum Users per Style: {self.max_users}\n")
            f.write(f"  Batch Size: {self.batch_size} users\n")
            f.write(f"  Memory Optimization: Enabled\n\n")
            
            f.write("Dataset Statistics:\n")
            f.write(f"  Total Records: {len(mobility_df):,}\n")
            f.write(f"  Dataset Columns: {list(mobility_df.columns)}\n\n")
            
            if 'user_id' in mobility_df.columns:
                f.write(f"  Total Users Available: {mobility_df['user_id'].nunique():,}\n")
                f.write(f"  Users Processed: {min(self.max_users, mobility_df['user_id'].nunique()):,}\n")
            
            if 'day' in mobility_df.columns:
                total_days = mobility_df['day'].nunique()
                normal_days = mobility_df[mobility_df['day'] <= 60]['day'].nunique()
                emergency_days = mobility_df[(mobility_df['day'] > 60) & (mobility_df['day'] <= 75)]['day'].nunique()
                
                f.write(f"  Total Days: {total_days}\n")
                f.write(f"  Normal Period Days (1-60): {normal_days}\n")
                f.write(f"  Emergency Period Days (61-75): {emergency_days}\n\n")
                
                f.write("Period Analysis:\n")
                normal_records = len(mobility_df[mobility_df['day'] <= 60])
                emergency_records = len(mobility_df[(mobility_df['day'] > 60) & (mobility_df['day'] <= 75)])
                
                f.write(f"  Normal Period Records: {normal_records:,}\n")
                f.write(f"  Emergency Period Records: {emergency_records:,}\n")
                
                if normal_records > 0 and emergency_records > 0:
                    f.write(f"  Record Distribution Ratio: {normal_records/emergency_records:.2f}:1\n")
        
        logger.info(f"Statistics saved to {stats_file}")

def main():
    """Main execution function for continuation processing."""
    
    logger.info("Starting mobility text generation continuation processing")
    logger.info("Target: IEEE Transactions on Knowledge and Data Engineering (TKDE)")
    
    # Initialize processor
    processor = MobilityTextProcessor(max_users=1000, batch_size=50)
    
    # Create base directory
    os.makedirs(processor.base_path, exist_ok=True)
    
    try:
        # Load datasets
        mobility_df = processor.load_mobility_datasets()
        
        if mobility_df is None:
            logger.error("Failed to load mobility datasets")
            return
        
        logger.info(f"Processing {len(mobility_df):,} mobility records")
        
        # Process medium style
        logger.info("Processing medium narrative style")
        medium_corpus = processor.process_users_in_batches(mobility_df, 'medium')
        processor.save_corpus_files(medium_corpus, 'medium')
        processor.export_training_data(medium_corpus, 'medium')
        
        # Clear memory
        del medium_corpus
        gc.collect()
        logger.info("Medium style processing completed")
        
        # Process detailed style
        logger.info("Processing detailed narrative style")
        detailed_corpus = processor.process_users_in_batches(mobility_df, 'detailed')
        processor.save_corpus_files(detailed_corpus, 'detailed')
        processor.export_training_data(detailed_corpus, 'detailed')
        
        # Clear memory
        del detailed_corpus
        gc.collect()
        logger.info("Detailed style processing completed")
        
        # Generate statistics
        processor.generate_processing_statistics(mobility_df)
        
        logger.info("Continuation processing completed successfully")
        logger.info(f"Output files saved in: {processor.base_path}")
        
    except Exception as e:
        logger.error(f"Error in main processing: {e}")
        raise

if __name__ == "__main__":
    main()

2025-08-06 16:19:40,390 - INFO - Starting mobility text generation continuation processing
2025-08-06 16:19:40,391 - INFO - Target: IEEE Transactions on Knowledge and Data Engineering (TKDE)
2025-08-06 16:20:36,511 - INFO - Loaded base dataset: (12247358, 21)
2025-08-06 16:20:36,511 - INFO - Loaded functional dataset: (12247358, 22)
2025-08-06 16:20:42,507 - INFO - Merged dataset shape: (12247358, 22)
2025-08-06 16:20:42,508 - INFO - Columns available: ['user_id', 'day', 'time_slot', 'grid_x', 'grid_y', 'location_category', 'location_function', 'poi_density', 'poi_proportion', 'category_diversity', 'functional_diversity', 'hour', 'time_period_detailed', 'day_of_week', 'day_name', 'is_weekend', 'is_weekday', 'distance_from_center', 'distance_quartile', 'grid_quadrant_name', 'displacement', 'functional_category']
2025-08-06 16:20:42,816 - INFO - Processing 12,247,358 mobility records
2025-08-06 16:20:42,818 - INFO - Processing medium narrative style
2025-08-06 16:20:42,818 - INFO - Proce