In [1]:
#TEST DATASET
# Test Query Generator - Create queries without movement + separate ground truth
# For testing FinQuest on new 2024-12-20 to 2025-03-31 data

import pandas as pd
import json
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import logging

# Configure the root logger at INFO level so the progress messages logged
# by the generator below are actually emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by every function and method in this file.
logger = logging.getLogger(__name__)

class TestQueryGenerator:
    """
    Generate test queries without movement labels and a separate ground truth file.

    For every company with more than ``HISTORY_DAYS`` rows, one query is built
    per trading day that has a full history window before it. The query holds
    only the history (dates and adjusted closes); the matching ground truth
    entry holds the realised movement, return and closes for the query day.

    The input frame is expected to provide the columns ``ticker``, ``date``,
    ``adj_close`` and ``Returns`` (these are the columns this class reads).
    """

    # Despite the "Test" prefix this is not a pytest test class; opt out of
    # test collection so importing it into a test module is harmless.
    __test__ = False

    # Number of past trading days included in each query.
    HISTORY_DAYS = 10
    # Movement thresholds, in percent. They are intentionally asymmetric: the
    # original code states this is the same logic as training, so confirm
    # against the training pipeline before changing either value.
    RISE_THRESHOLD_PCT = 0.55
    FALL_THRESHOLD_PCT = -0.5
    # Date format used in query ids and date lists.
    DATE_FORMAT = '%Y-%m-%d'

    def __init__(self, test_processed_data_path, output_dir="test_queries"):
        """
        Load the processed test data and make sure the output directory exists.

        Args:
            test_processed_data_path: Path to a JSON Lines file of processed rows.
            output_dir: Directory that will receive the generated files; it is
                created (including parents) if missing.
        """
        self.test_processed_data_path = test_processed_data_path
        self.output_dir = output_dir
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        # Load test data
        self.df = pd.read_json(test_processed_data_path, lines=True)
        self.df['date'] = pd.to_datetime(self.df['date'])

        logger.info(f"Loaded {len(self.df)} rows, {self.df['ticker'].nunique()} companies")

    @staticmethod
    def _to_float(value, default=0.0):
        """Return ``value`` as a float, or ``default`` when it is missing (NaN/None)."""
        return float(value) if pd.notna(value) else default

    @staticmethod
    def _write_jsonl(path, records):
        """Write ``records`` to ``path`` as JSON Lines (one JSON object per line)."""
        with open(path, 'w', encoding='utf-8') as f:
            f.writelines(json.dumps(record) + '\n' for record in records)

    def generate_test_queries_and_ground_truth(self):
        """
        Generate test queries (without movement) and the matching ground truth.

        Returns:
            A ``(test_queries, ground_truth)`` tuple of equally long lists of
            dicts. Entries at the same position share ``query_id`` and
            ``data_index``.
        """
        test_queries = []
        ground_truth = []
        window = self.HISTORY_DAYS

        # One pass over the frame; sort=False keeps first-appearance ticker order.
        for ticker, group in self.df.groupby('ticker', sort=False):
            company_data = group.sort_values('date').reset_index(drop=True)

            # Need a full history window plus at least one day to predict.
            if len(company_data) <= window:
                continue

            # Pull the columns into plain lists once instead of doing a
            # label lookup per cell inside the loop.
            dates = [d.strftime(self.DATE_FORMAT) for d in company_data['date']]
            # Missing closes become 0.0 everywhere (queries and ground truth)
            # so the written files never contain a bare NaN token, which is
            # not valid JSON.
            closes = [self._to_float(v) for v in company_data['adj_close']]
            returns = company_data['Returns'].tolist()

            for i in range(window, len(company_data)):
                query_id = f"{ticker}_{dates[i]}"

                # Test query: history only, no movement information.
                test_queries.append({
                    'query_id': query_id,
                    'query_stock': ticker,
                    'query_date': dates[i],
                    'recent_date_list': dates[i - window:i],
                    'adjusted_close_list': [round(c, 4) for c in closes[i - window:i]],
                    'data_index': len(test_queries),
                })

                # Ground truth: what actually happened on the query day.
                ground_truth.append({
                    'query_id': query_id,
                    'query_stock': ticker,
                    'query_date': dates[i],
                    'actual_movement': self._get_movement(company_data.iloc[i]),
                    'actual_return': self._to_float(returns[i]) * 100,
                    'actual_close': closes[i],
                    # i >= window >= 1 here, so a previous row always exists.
                    'previous_close': closes[i - 1],
                    'data_index': len(ground_truth),
                })

        logger.info(f"Generated {len(test_queries)} test queries")
        return test_queries, ground_truth

    def _get_movement(self, row):
        """
        Classify a row's return as 'rise', 'fall' or 'freeze'.

        Args:
            row: Mapping-like object (e.g. a pandas Series) with an optional
                ``Returns`` entry holding the fractional return.

        Returns:
            'rise' when the return in percent exceeds ``RISE_THRESHOLD_PCT``,
            'fall' when it is below ``FALL_THRESHOLD_PCT``, otherwise 'freeze'.
            A missing return (NaN or None) is treated as 'freeze'.
        """
        raw_return = row.get('Returns', 0)
        # Check for missing values before multiplying so None does not raise.
        if pd.isna(raw_return):
            return 'freeze'

        return_pct = raw_return * 100
        if return_pct > self.RISE_THRESHOLD_PCT:
            return 'rise'
        if return_pct < self.FALL_THRESHOLD_PCT:
            return 'fall'
        return 'freeze'

    def save_test_files(self, test_queries, ground_truth):
        """
        Save test queries, ground truth and summary metadata to separate files.

        Args:
            test_queries: Query dicts to write to ``test_queries.json``.
            ground_truth: Ground truth dicts to write to ``ground_truth.json``.

        Returns:
            ``(test_queries_file, ground_truth_file, metadata_file)`` as Paths
            inside ``self.output_dir``.
        """
        out_dir = Path(self.output_dir)

        # Queries are the model input, ground truth is kept apart for evaluation.
        test_queries_file = out_dir / "test_queries.json"
        self._write_jsonl(test_queries_file, test_queries)

        ground_truth_file = out_dir / "ground_truth.json"
        self._write_jsonl(ground_truth_file, ground_truth)

        metadata = {
            'total_queries': len(test_queries),
            'total_companies': len({q['query_stock'] for q in test_queries}),
            'movement_distribution': self._analyze_movement_distribution(ground_truth),
            'created_at': datetime.now().isoformat()
        }

        metadata_file = out_dir / "test_metadata.json"
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2)

        logger.info(f"Saved files to: {self.output_dir}")
        return test_queries_file, ground_truth_file, metadata_file

    def _analyze_movement_distribution(self, ground_truth):
        """Count 'rise', 'fall' and 'freeze' labels (plus the total) in ``ground_truth``."""
        movements = [gt['actual_movement'] for gt in ground_truth]
        return {
            'rise': movements.count('rise'),
            'fall': movements.count('fall'),
            'freeze': movements.count('freeze'),
            'total': len(movements)
        }

    def create_filtered_test_queries(self, exclude_freeze=True):
        """
        Create test queries, optionally dropping 'freeze' movements.

        The queries are regenerated from the loaded data on every call. When
        ``exclude_freeze`` is true, only rise/fall pairs are kept, their
        ``data_index`` is renumbered from 0, and both lists are written to
        ``*_rise_fall_only.json`` files in the output directory.

        Returns:
            ``(queries, ground_truth, queries_file, ground_truth_file)``. The
            two file entries are ``None`` when ``exclude_freeze`` is false,
            because nothing is written in that case.
        """
        test_queries, ground_truth = self.generate_test_queries_and_ground_truth()

        if not exclude_freeze:
            return test_queries, ground_truth, None, None

        filtered_test_queries = []
        filtered_ground_truth = []
        for query, gt in zip(test_queries, ground_truth):
            if gt['actual_movement'] not in ('rise', 'fall'):
                continue
            # Renumber so indices stay contiguous after filtering.
            new_index = len(filtered_test_queries)
            query['data_index'] = new_index
            gt['data_index'] = new_index
            filtered_test_queries.append(query)
            filtered_ground_truth.append(gt)

        out_dir = Path(self.output_dir)
        filtered_queries_file = out_dir / "test_queries_rise_fall_only.json"
        self._write_jsonl(filtered_queries_file, filtered_test_queries)

        filtered_gt_file = out_dir / "ground_truth_rise_fall_only.json"
        self._write_jsonl(filtered_gt_file, filtered_ground_truth)

        logger.info(f"Filtered to {len(filtered_test_queries)} rise/fall queries")
        return filtered_test_queries, filtered_ground_truth, filtered_queries_file, filtered_gt_file

def create_sample_queries_for_quick_test(test_queries_file, num_samples=50, output_file=None, seed=None):
    """
    Create a small sample of test queries for quick testing.

    The sample is spread across stocks: each stock contributes evenly spaced
    queries, and any shortfall is topped up with randomly chosen queries. The
    result never contains more than ``num_samples`` queries.

    Args:
        test_queries_file: Path to a JSON Lines file of query dicts; each must
            have a ``query_stock`` key. Blank lines are ignored.
        num_samples: Maximum number of queries in the sample (must be >= 0).
        output_file: Where to write the sample. Defaults to
            ``test_queries_sample.json`` next to ``test_queries_file``.
        seed: Optional seed for the random top-up, for reproducible samples.
            ``None`` keeps the draw non-deterministic.

    Returns:
        ``(sample_queries, output_file)``; each sampled query has its
        ``data_index`` renumbered from 0.

    Raises:
        ValueError: If ``num_samples`` is negative.
    """
    import random

    if num_samples < 0:
        raise ValueError(f"num_samples must be non-negative, got {num_samples}")

    # Load test queries, tolerating blank lines (e.g. a trailing newline).
    with open(test_queries_file, 'r', encoding='utf-8') as f:
        test_queries = [json.loads(line) for line in f if line.strip()]

    if len(test_queries) <= num_samples:
        sample_queries = test_queries
    else:
        # Group query positions by stock in one pass. A dict keeps
        # first-appearance order, so the selection is stable across runs.
        positions_by_stock = {}
        for position, query in enumerate(test_queries):
            positions_by_stock.setdefault(query['query_stock'], []).append(position)

        queries_per_stock = max(1, num_samples // len(positions_by_stock))

        chosen = []
        for positions in positions_by_stock.values():
            # Take evenly spaced queries from this stock.
            step = max(1, len(positions) // queries_per_stock)
            chosen.extend(positions[::step][:queries_per_stock])

        # With more stocks than requested samples, one-per-stock overshoots;
        # cap so the caller gets at most num_samples queries.
        chosen = chosen[:num_samples]

        # If we still need more, add random ones from the unused positions.
        shortfall = num_samples - len(chosen)
        if shortfall > 0:
            taken = set(chosen)
            remaining = [p for p in range(len(test_queries)) if p not in taken]
            rng = random.Random(seed)
            chosen.extend(rng.sample(remaining, min(shortfall, len(remaining))))

        sample_queries = [test_queries[p] for p in chosen]

    # Update indices so the sample is numbered contiguously.
    for i, query in enumerate(sample_queries):
        query['data_index'] = i

    # Save sample
    if output_file is None:
        output_file = str(Path(test_queries_file).parent / "test_queries_sample.json")

    with open(output_file, 'w', encoding='utf-8') as f:
        f.writelines(json.dumps(query) + '\n' for query in sample_queries)

    logger.info(f"Created sample with {len(sample_queries)} queries")
    return sample_queries, output_file

def main():
    """
    Run the full pipeline: all queries, rise/fall-only queries, and a quick sample.

    Writes the complete query and ground truth files, then the filtered
    rise/fall-only files, then a 50-query sample taken from the filtered file
    when one was written (otherwise from the complete file), and finally logs
    a summary of the counts and the movement distribution.
    """
    # Configuration - UPDATE THESE PATHS for your environment
    data_path = '/root/nfs/AJ FinRag/Test Data/Company Processed Test Data/all_companies_processed.json'  # Your new test data
    results_dir = '/root/nfs/AJ FinRag/Evaluation Results/Test Queries'
    config = {
        'test_processed_data_path': data_path,
        'output_dir': results_dir,
    }

    generator = TestQueryGenerator(
        test_processed_data_path=config['test_processed_data_path'],
        output_dir=config['output_dir'],
    )

    # Step 1: complete set of queries plus ground truth, written to disk.
    test_queries, ground_truth = generator.generate_test_queries_and_ground_truth()
    saved_paths = generator.save_test_files(test_queries, ground_truth)
    test_file = saved_paths[0]

    # Step 2: rise/fall-only variant (freeze movements dropped).
    filtered_result = generator.create_filtered_test_queries(exclude_freeze=True)
    filtered_queries = filtered_result[0]
    filtered_test_file = filtered_result[2]

    # Step 3: small sample for quick testing, preferring the filtered file.
    sample_source = filtered_test_file if filtered_test_file else test_file
    sample_queries, _sample_file = create_sample_queries_for_quick_test(sample_source, num_samples=50)

    # Step 4: summary
    movement_dist = generator._analyze_movement_distribution(ground_truth)
    logger.info(f"Complete: {len(test_queries)} total, {len(filtered_queries) if filtered_queries else 0} rise/fall, {len(sample_queries)} sample")
    logger.info(f"Distribution: Rise={movement_dist['rise']}, Fall={movement_dist['fall']}, Freeze={movement_dist['freeze']}")

# Run the generation pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

INFO:__main__:Loaded 4000 rows, 50 companies
INFO:__main__:Generated 3500 test queries
INFO:__main__:Saved files to: /root/nfs/AJ FinRag/Evaluation/Test Queries
INFO:__main__:Generated 3500 test queries
INFO:__main__:Filtered to 2642 rise/fall queries
INFO:__main__:Created sample with 50 queries
INFO:__main__:Complete: 3500 total, 2642 rise/fall, 50 sample
INFO:__main__:Distribution: Rise=1382, Fall=1260, Freeze=858
