In [None]:
import os
import sys
# Use the current working directory to construct the src path, since __file__ is not defined in notebooks
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..", "src")))

import json
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass
import pandas as pd

from data.data_loader import DataLoader, DataLoaderConfig, QueryData
import logging

# Root-logger setup for the whole notebook: DEBUG and above, with
# tab-separated timestamp / logger name / level / message fields.
logging.basicConfig(
    format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
    level=logging.DEBUG,
)


## Find time pattern in query logic

In [None]:
from logic.nlp.find_time_pattern_in_query_logic import FindTimePatternInQueryLogic

# Create the detector in this cell so the call below works on a fresh kernel
# (Restart & Run All); the name is otherwise only assigned in a later cell.
find_time_pattern_in_query_logic = FindTimePatternInQueryLogic()
find_time_pattern_in_query_logic.forward("Ngưỡng Hiệu suất nâng cấp kết nối từng đối tác peering khác là bao nhiêu")

In [None]:
# Second spot check: a question about the Thermal Noise formula. The cell output
# is whatever forward() returns for this query.
find_time_pattern_in_query_logic.forward("Trong công thức tính Thermal Noise, k, T và B đại diện cho những gì?")

In [None]:
#!/usr/bin/env python3
"""
Batch Test Script for FindTimePatternInQueryLogic

This script:
1. Loads data from dataset.xlsx using DataLoader
2. Selects 200 queries for testing
3. Runs FindTimePatternInQueryLogic on all queries in parallel
4. Saves results to reports/test_result directory
5. Generates comprehensive analysis and statistics
"""

from logic.nlp.find_time_pattern_in_query_logic import FindTimePatternInQueryLogic

# Notebook-level detector instance used by the ad-hoc forward() cells.
# BatchTimePatternTester.setup_components() builds its own separate instance.
find_time_pattern_in_query_logic = FindTimePatternInQueryLogic()

@dataclass
class TestResult:
    """Outcome of running time-pattern detection on a single query."""
    query: str
    knowledge_domain: str
    workers: List[str]
    time_pattern_detected: bool
    confidence_score: float
    processing_time: float
    original_index: int

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dict (declaration order) for JSON/CSV output."""
        # Explicit key list keeps the serialized schema and key order fixed.
        field_names = (
            "query",
            "knowledge_domain",
            "workers",
            "time_pattern_detected",
            "confidence_score",
            "processing_time",
            "original_index",
        )
        return {name: getattr(self, name) for name in field_names}


class BatchTimePatternTester:
    """
    Batch tester for FindTimePatternInQueryLogic using DataLoader.

    ``run_full_test`` drives the whole pipeline: set up components, sample
    queries, run detection in a thread pool, write JSON/CSV results and a JSON
    summary under ``self.output_dir``, then print a short report.
    """

    def __init__(self, data_path: str = "../data/dataset.xlsx", max_workers: int = 8):
        """
        Initialize the batch tester.

        Creates the output directory as a side effect. The data loader and the
        detection logic are created later by ``setup_components``.

        Args:
            data_path: Path to the Excel dataset
            max_workers: Maximum number of parallel workers
        """
        self.data_path = Path(data_path)
        self.max_workers = max_workers
        self.data_loader = None
        self.time_logic = None
        self.test_results: List["TestResult"] = []

        # Setup output directory
        self.output_dir = Path("../reports/test_result")
        self.output_dir.mkdir(parents=True, exist_ok=True)

        logging.info(f"Initialized BatchTimePatternTester with data path: {data_path}")

    def setup_components(self):
        """Setup DataLoader and FindTimePatternInQueryLogic."""
        logging.info("Setting up components...")

        # Setup DataLoader
        config = DataLoaderConfig(
            file_path=self.data_path,
            max_workers=self.max_workers,
            validate_data=True,
            cache_enabled=True
        )
        self.data_loader = DataLoader(config)

        # Setup Time Pattern Logic
        self.time_logic = FindTimePatternInQueryLogic()

        logging.info("Components setup completed")

    def load_and_select_queries(self, num_queries: int = 200, random_seed: int = 42) -> List["QueryData"]:
        """
        Load data and select queries for testing.

        The sample is drawn from the loader's DataFrame view and then mapped
        back to QueryData objects through their ``index`` values, so the
        returned list follows the loader's order rather than the sample order.

        Args:
            num_queries: Number of queries to select
            random_seed: Random seed for reproducibility

        Returns:
            List[QueryData]: Selected queries for testing
        """
        logging.info(f"Loading data and selecting {num_queries} queries...")

        # Load all processed data
        all_data = self.data_loader.get_processed_data()

        # Convert to DataFrame for easier sampling
        df = self.data_loader.to_dataframe()

        if len(df) < num_queries:
            logging.warning(f"Dataset has only {len(df)} queries, using all available")
            num_queries = len(df)

        # Sample queries
        sampled_df = df.sample(n=num_queries, random_state=random_seed)

        # Get the corresponding QueryData objects
        sampled_indices = set(sampled_df['index'].tolist())
        selected_queries = [
            query_data for query_data in all_data
            if query_data.index in sampled_indices
        ]

        logging.info(f"Selected {len(selected_queries)} queries for testing")
        return selected_queries

    def run_single_test(self, query_data: "QueryData") -> "TestResult":
        """
        Run time pattern detection on a single query.

        Any exception raised by the detector is logged and reported as a
        non-detection with confidence 0.0, so one bad query does not abort the
        batch.

        Args:
            query_data: QueryData object to test

        Returns:
            TestResult: Test result for this query
        """
        # perf_counter is monotonic, which suits short elapsed-time measurements.
        start_time = time.perf_counter()

        try:
            # Run the detector and read the scalar score once.
            confidence = self.time_logic.forward(query_data.query).item()
            detected = confidence > 0.5
        except Exception as e:
            processing_time = time.perf_counter() - start_time
            logging.error(f"Error processing query {query_data.index}: {e}")
            detected, confidence = False, 0.0
        else:
            processing_time = time.perf_counter() - start_time

        return TestResult(
            query=query_data.query,
            knowledge_domain=query_data.knowledge_domain,
            workers=query_data.workers,
            time_pattern_detected=detected,
            confidence_score=confidence,
            processing_time=processing_time,
            original_index=query_data.index
        )

    def run_parallel_tests(self, queries: List["QueryData"]) -> List["TestResult"]:
        """
        Run time pattern detection on all queries in parallel.

        Results are returned in the same order as ``queries`` (not in thread
        completion order), so saved reports are stable between runs. Queries
        whose worker raised are logged and omitted from the result.

        Args:
            queries: List of QueryData objects to test

        Returns:
            List[TestResult]: List of test results
        """
        logging.info(f"Running parallel tests on {len(queries)} queries...")

        results_by_position: Dict[int, Any] = {}
        total_start_time = time.perf_counter()

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks, remembering each query's input position
            future_to_position = {
                executor.submit(self.run_single_test, query): position
                for position, query in enumerate(queries)
            }

            # Collect results as they complete
            completed = 0
            for future in as_completed(future_to_position):
                position = future_to_position[future]
                try:
                    results_by_position[position] = future.result()
                except Exception as e:
                    logging.error(f"Failed to get result for query {queries[position].index}: {e}")
                    continue

                completed += 1
                if completed % 50 == 0:
                    logging.info(f"Completed {completed}/{len(queries)} tests")

        total_time = time.perf_counter() - total_start_time
        logging.info(f"All tests completed in {total_time:.2f} seconds")

        return [results_by_position[position] for position in sorted(results_by_position)]

    def save_results(self, results: List["TestResult"], filename_prefix: str = "time_pattern_test_200"):
        """
        Save test results to files.

        Writes ``<prefix>.json`` and ``<prefix>.csv`` into ``self.output_dir``.

        Args:
            results: List of TestResult objects
            filename_prefix: Prefix for output filenames
        """
        logging.info(f"Saving results to {self.output_dir}...")

        # Convert results to dictionaries
        results_dict = [result.to_dict() for result in results]

        # Save as JSON
        json_path = self.output_dir / f"{filename_prefix}.json"
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(results_dict, f, ensure_ascii=False, indent=2)
        logging.info(f"Saved JSON results to {json_path}")

        # Save as CSV
        csv_path = self.output_dir / f"{filename_prefix}.csv"
        df = pd.DataFrame(results_dict)
        df.to_csv(csv_path, index=False, encoding='utf-8')
        logging.info(f"Saved CSV results to {csv_path}")

    def generate_summary(self, results: List["TestResult"]) -> Dict[str, Any]:
        """
        Generate comprehensive summary statistics.

        Args:
            results: List of TestResult objects

        Returns:
            Dict[str, Any]: Summary statistics, or ``{"error": ...}`` when
            ``results`` is empty
        """
        logging.info("Generating summary statistics...")

        if not results:
            return {"error": "No results to analyze"}

        # Basic statistics
        total_queries = len(results)
        detected_count = sum(1 for r in results if r.time_pattern_detected)
        detection_rate = detected_count / total_queries if total_queries > 0 else 0

        processing_times = [r.processing_time for r in results]
        avg_processing_time = sum(processing_times) / len(processing_times)
        max_processing_time = max(processing_times)
        min_processing_time = min(processing_times)

        # Confidence score statistics
        confidence_scores = [r.confidence_score for r in results]
        avg_confidence = sum(confidence_scores) / len(confidence_scores)
        high_confidence_count = sum(1 for r in results if r.confidence_score > 0.8)
        low_confidence_count = sum(1 for r in results if r.confidence_score < 0.2)

        # Domain analysis
        domain_stats = {}
        for result in results:
            domain = result.knowledge_domain
            if domain not in domain_stats:
                domain_stats[domain] = {
                    "total": 0,
                    "detected": 0,
                    "detection_rate": 0.0
                }
            domain_stats[domain]["total"] += 1
            if result.time_pattern_detected:
                domain_stats[domain]["detected"] += 1

        for domain in domain_stats:
            stats = domain_stats[domain]
            stats["detection_rate"] = stats["detected"] / stats["total"] if stats["total"] > 0 else 0

        # Sample results (first five of each kind; queries truncated to 100 chars)
        detected_samples = [r for r in results if r.time_pattern_detected][:5]
        not_detected_samples = [r for r in results if not r.time_pattern_detected][:5]

        summary = {
            "total_queries": total_queries,
            "detected_count": detected_count,
            "detection_rate": detection_rate,
            "processing_time_stats": {
                "average": avg_processing_time,
                "max": max_processing_time,
                "min": min_processing_time,
                "total": sum(processing_times)
            },
            "confidence_stats": {
                "average": avg_confidence,
                "high_confidence_count": high_confidence_count,
                "low_confidence_count": low_confidence_count
            },
            "domain_analysis": domain_stats,
            "sample_detected_queries": [
                {
                    "query": (r.query[:100] + "...") if len(r.query) > 100 else r.query,
                    "domain": r.knowledge_domain,
                    "confidence": r.confidence_score
                }
                for r in detected_samples
            ],
            "sample_not_detected_queries": [
                {
                    "query": (r.query[:100] + "...") if len(r.query) > 100 else r.query,
                    "domain": r.knowledge_domain,
                    "confidence": r.confidence_score
                }
                for r in not_detected_samples
            ]
        }

        return summary

    def save_summary(self, summary: Dict[str, Any], filename: str = "time_pattern_test_summary_200.json"):
        """Save summary statistics to a JSON file in ``self.output_dir``."""
        summary_path = self.output_dir / filename
        with open(summary_path, 'w', encoding='utf-8') as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)
        logging.info(f"Saved summary to {summary_path}")

    def run_full_test(self, num_queries: int = 200):
        """
        Run the complete test pipeline.

        Any exception is logged and re-raised.

        Args:
            num_queries: Number of queries to test
        """
        logging.info("Starting full batch time pattern test...")

        try:
            # Setup components
            self.setup_components()

            # Load and select queries
            selected_queries = self.load_and_select_queries(num_queries)

            # Run parallel tests
            test_results = self.run_parallel_tests(selected_queries)
            self.test_results = test_results

            # Save results
            self.save_results(test_results)

            # Generate and save summary
            summary = self.generate_summary(test_results)
            self.save_summary(summary)

            if "error" in summary:
                # generate_summary returns only {"error": ...} for an empty
                # result set; the report below would fail on the missing keys.
                print(f"\nNo summary available: {summary['error']}")
                return

            # Print summary
            print("\n" + "="*60)
            print("BATCH TIME PATTERN TEST RESULTS")
            print("="*60)
            print(f"Total queries tested: {summary['total_queries']}")
            print(f"Time patterns detected: {summary['detected_count']}")
            print(f"Detection rate: {summary['detection_rate']:.1%}")
            print(f"Average processing time: {summary['processing_time_stats']['average']:.4f}s")
            print(f"High confidence (>0.8): {summary['confidence_stats']['high_confidence_count']}")
            print(f"Low confidence (<0.2): {summary['confidence_stats']['low_confidence_count']}")
            print("\nTop domains by detection rate:")
            sorted_domains = sorted(
                summary['domain_analysis'].items(),
                key=lambda x: x[1]['detection_rate'],
                reverse=True
            )
            for domain, stats in sorted_domains[:5]:
                print(f"  {domain}: {stats['detection_rate']:.1%} ({stats['detected']}/{stats['total']})")
            print("\nResults saved to: reports/test_result/")

        except Exception as e:
            logging.error(f"Error in full test: {e}")
            raise


def main():
    """Entry point: build a tester with 8 workers and run it on 200 queries."""
    BatchTimePatternTester(max_workers=8).run_full_test(num_queries=200)


# In a notebook kernel __name__ is "__main__", so running this cell starts the batch.
if __name__ == "__main__":
    main()


## Detect syntax in query logic

In [None]:
#!/usr/bin/env python3
"""
Test script to demonstrate DetectSyntaxInQueryLogic functionality
"""
from logic.nlp.detect_syntax_in_query_logic import DetectSyntaxInQueryLogic

# Test the new 1x4 tensor output
logic = DetectSyntaxInQueryLogic()

# (query, expected flags) pairs; flag order is [remind, task, tasks, meeting]
test_cases = [
    ('/remind me to call mom', [1.0, 0.0, 0.0, 0.0]),      # Only remind
    ('please /task finish work', [0.0, 1.0, 0.0, 0.0]),     # Only task
    ('use /tasks to organize', [0.0, 0.0, 1.0, 0.0]),      # Only tasks
    ('schedule /meeting tomorrow', [0.0, 0.0, 0.0, 1.0]),  # Only meeting
    ('/remind and /task both', [1.0, 1.0, 0.0, 0.0]),      # Both remind and task
    ('no commands here', [0.0, 0.0, 0.0, 0.0]),            # None
    ('/remind /task /meeting all', [1.0, 1.0, 0.0, 1.0]),  # Multiple commands
]

print('Testing 1x4 tensor output format:')
print('=' * 60)
print('Format: [remind, task, tasks, meeting]')
print('-' * 60)

passed = 0
for query, expected in test_cases:
    result = logic.forward(query)
    actual = result.tolist()
    # A result of shape (1, 4) serialises as [[...]]; unwrap the single row so
    # it compares against the flat expected list. A flat result is left as is.
    if len(actual) == 1 and isinstance(actual[0], list):
        actual = actual[0]

    matches = actual == expected
    if matches:
        passed += 1

    status = '✓' if matches else '✗'
    print(f'{status} "{query}"')
    print(f'    Expected: {expected}')
    print(f'    Actual:   {actual}')
    print(f'    Tensor:   {result}')
    print()

# Report the tally so failures are visible without scanning every case.
print(f'Passed {passed}/{len(test_cases)} cases')
print('Test completed!')


## Detect Human Feature In Query

In [None]:
from logic.nlp.detect_human_feature_in_query_logic import DetectHumanFeatureInQueryLogic

# Shared instance for the forward() spot checks in the following cells.
detect_human_feature_in_query_logic = DetectHumanFeatureInQueryLogic()

In [None]:
# Spot check: a self-introduction that contains a full personal name.
detect_human_feature_in_query_logic.forward("Tôi là Nguyen Quang Tri. Tôi đang làm việc tại Viettel")

In [None]:
# Spot check: the same sentence with an account-style handle in place of the full name.
detect_human_feature_in_query_logic.forward("Tôi là trinq12. Tôi đang làm việc tại Viettel")