# GENSPARK

## Optimized Hierarchical Text Classifier v3

## Overview
Streamlined multi-label hierarchical text classifier focused on key evaluation metrics.

## Architecture
```
Input Text
  ↓
[TF-IDF Vectorization]
  ↓
[Parent Classifier] → Parent Labels
  ↓
[Text Features + Parent Labels]
  ↓
[Child Classifier] → Child Labels
  ↓
[Post-processing: Filter invalid parent-child combinations]
```
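
The final post-processing step can be sketched in plain Python. This is a minimal illustration, assuming a `child_to_parents` mapping learned from the training data; the helper name `filter_children` and the sample hierarchy are hypothetical and not part of the classifier itself.

```python
from typing import Dict, List


def filter_children(
    predicted_parents: List[str],
    predicted_children: List[str],
    child_to_parents: Dict[str, List[str]],
) -> Dict[str, List[str]]:
    """Keep only children whose known parent was also predicted."""
    result: Dict[str, List[str]] = {}
    for parent in predicted_parents:
        # A predicted parent is kept even if no valid child remains.
        result[parent] = [
            child
            for child in predicted_children
            if parent in child_to_parents.get(child, [])
        ]
    return result


# Hypothetical hierarchy and raw predictions, for illustration only.
child_to_parents = {"Physics": ["Science"], "Math": ["Science"], "AI": ["Technology"]}
filtered = filter_children(["Science"], ["Physics", "AI"], child_to_parents)
print(filtered)  # {'Science': ['Physics']}
```

Here `AI` is dropped because its only known parent, `Technology`, was not predicted.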

## Key Features
- **Streamlined Evaluation**: Returns only essential metrics
- **Two Evaluation Approaches**: strict parent-child matching ("Your approach") vs. ancestor expansion with partial credit ("HiClass approach")
- **Minimal Verbose Output**: Focus on core functionality
- **Hierarchical Architecture**: Maintained parent-child relationship learning

## Usage

```python
from optimized_hierarchical_classifier import HierarchicalTextClassifier

# Initialize
classifier = HierarchicalTextClassifier(max_features=1000, random_state=42)

# Training data format
training_data = [
    {
        "text": "quantum mechanics theoretical physics", 
        "categories": {"Science": ["Physics"]}
    },
    {
        "text": "machine learning artificial intelligence", 
        "categories": {"Technology": ["AI"]}
    }
]

# Train
classifier.fit(training_data)

# Predict
predictions = classifier.predict(["advanced physics concepts"])
print(predictions)  # e.g. [{"Science": ["Physics"]}] (actual output depends on the fitted model)

# Evaluate both approaches
test_data = [{"text": "physics text", "categories": {"Science": ["Math"]}}]
results = classifier.compare_approaches(test_data)
```
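
Before training, each record's nested `categories` dictionary is split into a parent label list and a flat child label list, and a parent-to-children mapping is collected along the way. The sketch below shows that flattening on its own; `flatten_categories` is a hypothetical helper written for illustration, not a method of the classifier.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def flatten_categories(
    data: List[dict],
) -> Tuple[List[List[str]], List[List[str]], Dict[str, List[str]]]:
    """Split nested categories into parent labels, child labels, and a hierarchy map."""
    parent_labels: List[List[str]] = []
    child_labels: List[List[str]] = []
    parent_to_children = defaultdict(set)

    for item in data:
        categories = item["categories"]
        parent_labels.append(list(categories.keys()))
        child_labels.append([c for children in categories.values() for c in children])
        for parent, children in categories.items():
            parent_to_children[parent].update(children)

    # Sort children so the mapping is deterministic.
    hierarchy = {p: sorted(c) for p, c in parent_to_children.items()}
    return parent_labels, child_labels, hierarchy


sample = [
    {"text": "quantum mechanics theoretical physics", "categories": {"Science": ["Physics"]}},
    {"text": "calculus differential equations", "categories": {"Science": ["Math"]}},
]
parents, children, hierarchy = flatten_categories(sample)
print(parents)    # [['Science'], ['Science']]
print(children)   # [['Physics'], ['Math']]
print(hierarchy)  # {'Science': ['Math', 'Physics']}
```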

## Evaluation Metrics

### Your Approach
Returns hierarchical metrics with strict parent-child matching:
```python
{
    'f1_macro_parent': 0.850,
    'f1_macro_per_parent': {
        'Science': 0.750,
        'Technology': 0.800,
        'Arts': 0.600
    },
    'f1_macro_children_overall': 0.717
}
```
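
The per-parent score is a macro F1 restricted to the child columns that belong to one parent. The following pure-Python sketch illustrates that idea under the assumption that every child is scored as its own binary label and that an undefined F1 counts as 0; the function names and the toy label matrix are hypothetical.

```python
from typing import List


def binary_f1(y_true: List[int], y_pred: List[int]) -> float:
    """F1 for one binary label; returns 0.0 when the score is undefined."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0


def macro_f1_for_parent(
    true_rows: List[List[int]],
    pred_rows: List[List[int]],
    child_names: List[str],
    parent_children: List[str],
) -> float:
    """Average the per-child F1 over the children of a single parent."""
    indices = [child_names.index(c) for c in parent_children]
    scores = [
        binary_f1([row[i] for row in true_rows], [row[i] for row in pred_rows])
        for i in indices
    ]
    return sum(scores) / len(scores) if scores else 0.0


# Toy data: columns are AI, Math, Physics; rows are two test samples.
child_names = ["AI", "Math", "Physics"]
y_true = [[0, 1, 0], [0, 0, 1]]
y_pred = [[0, 0, 1], [0, 0, 1]]
score = macro_f1_for_parent(y_true, y_pred, child_names, ["Math", "Physics"])
print(round(score, 3))  # 0.333
```

In this toy case `Math` scores 0.0 and `Physics` scores 2/3, so the macro average is 1/3. A library call on a single-column slice may be handled differently from this sketch, which could explain the 1.0 values reported for single-child parents in the outputs later in this document.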

### HiClass Approach  
Returns metrics with ancestor expansion (partial credit):
```python
{
    'hierarchical_precision': 0.667,
    'hierarchical_recall': 0.667,
    'hierarchical_f1': 0.667
}
```
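
In the implementation shown later in this document, these three values are computed by summing set sizes over all test samples before dividing. Writing $T_i$ and $P_i$ for the ancestor-expanded true and predicted label sets of sample $i$ (symbols introduced here for illustration only):

$$
hP = \frac{\sum_i |T_i \cap P_i|}{\sum_i |P_i|}, \qquad
hR = \frac{\sum_i |T_i \cap P_i|}{\sum_i |T_i|}, \qquad
hF_1 = \frac{2 \, hP \, hR}{hP + hR}
$$

Each ratio is taken to be 0 when its denominator is 0.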

## Key Difference Example

**Test Case**: Science → Physics (predicted) vs Science → Math (true)

- **Your Approach**:
  - f1_macro_parent = 1.0 (Science correct)
  - f1_macro_per_parent['Science'] = 0.0 (Physics ≠ Math)

- **HiClass Approach**:
  - Expands: TRUE: {ROOT, ROOT→Science, ROOT→Science→Math}
  - Expands: PRED: {ROOT, ROOT→Science, ROOT→Science→Physics}
  - Intersection: {ROOT, ROOT→Science}, so precision = recall = 2/3 = 0.667
  - **Gives partial credit for hierarchy levels**
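
The arithmetic in this example can be reproduced with a few lines of Python. This is a standalone sketch: `expand_with_ancestors` here is a free function that mirrors the expansion described above, not the classifier's own method.

```python
from typing import Dict, List, Set


def expand_with_ancestors(sample: Dict[str, List[str]]) -> Set[str]:
    """Turn a nested label dict into the set of all root-to-node paths."""
    expanded: Set[str] = set()
    for parent, children in sample.items():
        expanded.add("ROOT")
        expanded.add(f"ROOT→{parent}")
        for child in children:
            expanded.add(f"ROOT→{parent}→{child}")
    return expanded


true_set = expand_with_ancestors({"Science": ["Math"]})
pred_set = expand_with_ancestors({"Science": ["Physics"]})
overlap = true_set & pred_set

precision = len(overlap) / len(pred_set)
recall = len(overlap) / len(true_set)
f1 = 2 * precision * recall / (precision + recall)

print(sorted(overlap))  # ['ROOT', 'ROOT→Science']
print(round(precision, 3), round(recall, 3), round(f1, 3))  # 0.667 0.667 0.667
```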

## Files Generated
- `optimized_hierarchical_classifier.py` - Complete optimized classifier
- `README.md` - This documentation

## Dependencies
- scikit-learn
- numpy
- pandas
- pickle (Python standard library; no installation needed)

In [7]:
# Optimized Multi-Label Hierarchical Text Classifier - v3 (Streamlined)
"""
Optimized version focusing only on key evaluation metrics:
- Your approach: {'f1_macro_parent': float, 'f1_macro_per_parent': {...}, 'f1_macro_children_overall': float}  
- HiClass approach: {'hierarchical_precision': float, 'hierarchical_recall': float, 'hierarchical_f1': float}

Architecture: Text → Parent Classifier → [Text + Parent] → Child Classifier
"""

import pandas as pd
import numpy as np
import json
import pickle
from pathlib import Path
from typing import List, Tuple, Union, Dict, Any, Optional
from collections import defaultdict

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.multioutput import MultiOutputClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import hamming_loss, jaccard_score, f1_score, precision_score, recall_score
import warnings
warnings.filterwarnings('ignore')

class HierarchicalTextClassifier:
    """
    Optimized Multi-Label Hierarchical Text Classifier

    Architecture:
    Text → Parent Classifier → Parent Labels
    [Text + Parent Labels] → Child Classifier → Child Labels

    Key Features:
    - Streamlined evaluation with focused metrics
    - Minimal verbose output
    - Core hierarchical architecture maintained
    """

    def __init__(self, 
                 max_features: int = 5000,
                 ngram_range: Tuple[int, int] = (1, 2),
                 base_classifier = None,
                 random_state: int = 42):
        """Initialize hierarchical classifier"""
        self.max_features = max_features
        self.ngram_range = ngram_range
        self.random_state = random_state

        # Initialize vectorizer
        self.vectorizer = TfidfVectorizer(
            max_features=max_features,
            ngram_range=ngram_range,
            stop_words='english'
        )

        # Initialize base classifier
        if base_classifier is None:
            base_classifier = RandomForestClassifier(
                n_estimators=100, 
                random_state=random_state,
                n_jobs=-1
            )

        # Hierarchical classifiers
        self.parent_classifier = MultiOutputClassifier(base_classifier)
        self.child_classifier = MultiOutputClassifier(base_classifier)

        # Label encoders
        self.mlb_parent = MultiLabelBinarizer()
        self.mlb_child = MultiLabelBinarizer()

        # Hierarchy mapping
        self.parent_to_children = {}
        self.child_to_parents = {}
        self.all_parent_categories = set()
        self.all_child_categories = set()

        self.is_fitted = False
        self.feature_names = None

    def _build_hierarchy_mapping(self, data: List[Dict]):
        """Build parent-child mapping from data"""
        self.parent_to_children = defaultdict(set)
        self.child_to_parents = defaultdict(set)
        self.all_parent_categories = set()
        self.all_child_categories = set()

        for item in data:
            categories = item['categories']
            for parent, children in categories.items():
                self.all_parent_categories.add(parent)

                for child in children:
                    self.all_child_categories.add(child)
                    self.parent_to_children[parent].add(child)
                    self.child_to_parents[child].add(parent)

        # Convert sets to lists
        self.parent_to_children = {k: list(v) for k, v in self.parent_to_children.items()}
        self.child_to_parents = {k: list(v) for k, v in self.child_to_parents.items()}

    def _extract_labels_from_data(self, data: List[Dict]) -> Tuple[List[str], List[List[str]], List[List[str]]]:
        """Extract texts and labels from nested data"""
        texts = []
        parent_labels = []
        child_labels = []

        for item in data:
            texts.append(item['text'])

            # Extract parent labels
            parents = list(item['categories'].keys())
            parent_labels.append(parents)

            # Extract all child labels
            children = []
            for parent, child_list in item['categories'].items():
                children.extend(child_list)
            child_labels.append(children)

        return texts, parent_labels, child_labels

    def fit(self, data: List[Dict], validation_split: float = 0.0):
        """
        Fit the hierarchical classifier

        Architecture:
        1. Train Parent Classifier: Text → Parent Labels
        2. Train Child Classifier: [Text + Parent Labels] → Child Labels
        """
        # Build hierarchy mapping
        self._build_hierarchy_mapping(data)

        # Extract labels
        texts, parent_labels, child_labels = self._extract_labels_from_data(data)

        # Vectorize texts
        X = self.vectorizer.fit_transform(texts).toarray()
        self.feature_names = self.vectorizer.get_feature_names_out()

        # Encode labels
        y_parent = self.mlb_parent.fit_transform(parent_labels)
        y_child = self.mlb_child.fit_transform(child_labels)

        # Split data for validation
        if validation_split > 0:
            X_train, X_val, y_p_train, y_p_val, y_c_train, y_c_val = train_test_split(
                X, y_parent, y_child, 
                test_size=validation_split, 
                random_state=self.random_state
            )
        else:
            X_train = X
            y_p_train = y_parent
            y_c_train = y_child

        # Step 1: Train Parent classifier
        self.parent_classifier.fit(X_train, y_p_train)

        # Step 2: Train Child classifier with parent information
        parent_pred_train = self.parent_classifier.predict(X_train)
        X_hierarchical = np.hstack([X_train, parent_pred_train])
        self.child_classifier.fit(X_hierarchical, y_c_train)

        self.is_fitted = True

    def _predict_binary(self, X):
        """Internal method for binary predictions using hierarchical approach"""
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")

        # Step 1: Predict parents
        pred_parent = self.parent_classifier.predict(X)

        # Step 2: Predict children with parent context
        X_hierarchical = np.hstack([X, pred_parent])
        pred_child = self.child_classifier.predict(X_hierarchical)

        return pred_parent, pred_child

    def predict(self, texts: List[str]) -> List[Dict[str, List[str]]]:
        """
        Predict categories for new texts using hierarchical approach
        """
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first. Call fit() first.")

        # Vectorize texts
        X = self.vectorizer.transform(texts).toarray()

        # Get binary predictions
        pred_parent, pred_child = self._predict_binary(X)

        # Decode to labels
        parent_labels = self.mlb_parent.inverse_transform(pred_parent)
        child_labels = self.mlb_child.inverse_transform(pred_child)

        # Format results in nested structure with hierarchy filtering
        results = []
        for i in range(len(texts)):
            predicted_parents = list(parent_labels[i])
            predicted_children = list(child_labels[i])

            # Create nested structure with hierarchy constraints
            categories = {}

            # Filter children based on predicted parents and hierarchy
            for parent in predicted_parents:
                valid_children = []
                for child in predicted_children:
                    if parent in self.child_to_parents.get(child, []):
                        valid_children.append(child)

                # Keep every predicted parent, even when no valid child remains
                categories[parent] = valid_children

            results.append(categories)

        return results

    # =============================================================================
    # CORE EVALUATION METHODS - STREAMLINED
    # =============================================================================

    def evaluate_your_approach(self, test_data: List[Dict]) -> Dict:
        """
        Your Approach Evaluation - Returns only key metrics

        Returns:
            {
                'f1_macro_parent': float,
                'f1_macro_per_parent': {parent: f1_score},
                'f1_macro_children_overall': float
            }
        """
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")

        # Extract test data
        texts, parent_labels, child_labels = self._extract_labels_from_data(test_data)

        # Vectorize and get true labels
        X_test = self.vectorizer.transform(texts).toarray()
        y_true_parent = self.mlb_parent.transform(parent_labels)
        y_true_child = self.mlb_child.transform(child_labels)

        # Get predictions
        pred_parent, pred_child = self._predict_binary(X_test)

        # 1. F1 macro parent
        f1_macro_parent = f1_score(y_true_parent, pred_parent, average='macro', zero_division=0)

        # 2. F1 macro per parent (for children of each parent)
        f1_macro_per_parent = {}

        for parent_name in self.parent_to_children.keys():
            parent_children = self.parent_to_children[parent_name]

            if len(parent_children) == 0:
                f1_macro_per_parent[parent_name] = 0.0
                continue

            # Get indices of children for this parent
            child_indices = []
            for child in parent_children:
                if child in self.mlb_child.classes_:
                    idx = list(self.mlb_child.classes_).index(child)
                    child_indices.append(idx)

            if len(child_indices) == 0:
                f1_macro_per_parent[parent_name] = 0.0
                continue

            # Extract predictions for children of this parent
            y_true_parent_children = y_true_child[:, child_indices]
            y_pred_parent_children = pred_child[:, child_indices]

            # Calculate F1 macro for these children
            if y_true_parent_children.shape[1] > 0:
                f1_macro_per_parent[parent_name] = f1_score(
                    y_true_parent_children, 
                    y_pred_parent_children, 
                    average='macro', 
                    zero_division=0
                )
            else:
                f1_macro_per_parent[parent_name] = 0.0

        # 3. Overall children F1 (macro average of per-parent F1s)
        f1_macro_children_overall = np.mean(list(f1_macro_per_parent.values())) if f1_macro_per_parent else 0.0

        return {
            'f1_macro_parent': f1_macro_parent,
            'f1_macro_per_parent': f1_macro_per_parent,
            'f1_macro_children_overall': f1_macro_children_overall
        }

    def _expand_with_ancestors(self, nested_sample: Dict) -> set:
        """
        Expand nested sample with ancestors following HiClass approach

        Input: {"Science": ["Physics"]}
        Output: {"ROOT", "ROOT→Science", "ROOT→Science→Physics"}
        """
        expanded = set()

        for parent, children in nested_sample.items():
            # Add ROOT
            expanded.add("ROOT")

            # Add ROOT → Parent path
            expanded.add(f"ROOT→{parent}")

            # Add ROOT → Parent → Child paths
            for child in children:
                expanded.add(f"ROOT→{parent}→{child}")

        return expanded

    def evaluate_hiclass_approach(self, test_data: List[Dict]) -> Dict:
        """
        HiClass Approach Evaluation - Returns only key metrics

        Returns:
            {
                'hierarchical_precision': float,
                'hierarchical_recall': float, 
                'hierarchical_f1': float
            }
        """
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")

        # Get predictions
        texts = [item['text'] for item in test_data]
        predictions = self.predict(texts)

        # Convert to expanded ancestor sets
        total_intersection = 0
        total_predicted = 0
        total_true = 0

        for true_sample, pred_sample in zip(test_data, predictions):
            true_categories = true_sample['categories']

            # Expand with ancestors
            true_expanded = self._expand_with_ancestors(true_categories)
            pred_expanded = self._expand_with_ancestors(pred_sample)

            # Calculate intersection
            intersection = true_expanded & pred_expanded

            # Accumulate for overall metrics
            total_intersection += len(intersection)
            total_predicted += len(pred_expanded)
            total_true += len(true_expanded)

        # Calculate hierarchical metrics
        h_precision = total_intersection / total_predicted if total_predicted > 0 else 0
        h_recall = total_intersection / total_true if total_true > 0 else 0
        h_f1 = 2 * h_precision * h_recall / (h_precision + h_recall) if (h_precision + h_recall) > 0 else 0

        return {
            'hierarchical_precision': h_precision,
            'hierarchical_recall': h_recall,
            'hierarchical_f1': h_f1
        }

    def compare_approaches(self, test_data: List[Dict]) -> Dict:
        """
        Compare both evaluation approaches

        Returns:
            {
                'your_approach': {...},
                'hiclass_approach': {...}
            }
        """
        your_metrics = self.evaluate_your_approach(test_data)
        hiclass_metrics = self.evaluate_hiclass_approach(test_data)

        return {
            'your_approach': your_metrics,
            'hiclass_approach': hiclass_metrics
        }

    # Utility methods
    def save_model(self, file_path: str):
        """Save the trained model to file"""
        with open(file_path, 'wb') as f:
            pickle.dump(self, f)

    @classmethod
    def load_model(cls, file_path: str) -> 'HierarchicalTextClassifier':
        """Load a trained model from file"""
        with open(file_path, 'rb') as f:
            model = pickle.load(f)
        return model


# =============================================================================
# USAGE EXAMPLE
# =============================================================================

def demo_example():
    """
    Single focused example showing both evaluation approaches
    """
    # Initialize classifier
    classifier = HierarchicalTextClassifier(max_features=1000, random_state=42)

    # Training data
    training_data = [
        {"text": "quantum mechanics theoretical physics", "categories": {"Science": ["Physics"]}},
        {"text": "calculus differential equations mathematics", "categories": {"Science": ["Math"]}},
        {"text": "machine learning artificial intelligence", "categories": {"Technology": ["AI"]}},
        {"text": "poetry creative writing literature", "categories": {"Arts": ["Poetry"]}},
        {"text": "business strategy management", "categories": {"Business": ["Strategy"]}},
    ]

    # Train
    classifier.fit(training_data)

    # Test case: Science → Physics vs Science → Math
    test_case = [
        {
            "text": "advanced physics concepts and quantum theory", 
            "categories": {"Science": ["Math"]}  # True label is Math; the text resembles the Physics training sample
        }
    ]

    # Compare approaches
    results = classifier.compare_approaches(test_case)

    print("Your Approach:")
    print(results['your_approach'])
    print("\nHiClass Approach:")
    print(results['hiclass_approach'])

    return classifier, results

if __name__ == "__main__":
    demo_example()

Your Approach:
{'f1_macro_parent': 0.0, 'f1_macro_per_parent': {'Science': 0.0, 'Technology': 1.0, 'Arts': 1.0, 'Business': 1.0}, 'f1_macro_children_overall': np.float64(0.75)}

HiClass Approach:
{'hierarchical_precision': 0, 'hierarchical_recall': 0.0, 'hierarchical_f1': 0}
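
A hierarchical precision, recall, and F1 of 0 is what the expansion logic produces when the model predicts no parent at all: an empty prediction expands to an empty set, because `ROOT` is only added inside the loop over predicted parents. Whether that is what happened in this run is an inference from the numbers, not something the output states. A minimal sketch of the effect, using a simplified stand-in (`expand`, a hypothetical name) for the expansion helper:

```python
def expand(sample):
    """Simplified ancestor expansion: ROOT is added only per predicted parent."""
    expanded = set()
    for parent, children in sample.items():
        expanded.update({"ROOT", f"ROOT→{parent}"})
        expanded.update(f"ROOT→{parent}→{child}" for child in children)
    return expanded


true_set = expand({"Science": ["Math"]})
pred_set = expand({})  # assumption: the model predicted no parent

overlap = true_set & pred_set
precision = len(overlap) / len(pred_set) if pred_set else 0
recall = len(overlap) / len(true_set) if true_set else 0

print(len(pred_set), precision, recall)  # 0 0 0.0
```

If partial credit for `ROOT` is wanted even for empty predictions, one option would be to add `ROOT` before the loop; that is a design choice, not something the code above does.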


In [9]:
#!/usr/bin/env python3
# Simple test script for the optimized classifier

# from optimized_hierarchical_classifier import HierarchicalTextClassifier

def main():
    print("Testing Optimized Hierarchical Text Classifier")
    print("=" * 50)

    # Initialize
    classifier = HierarchicalTextClassifier(max_features=500, random_state=42)

    # Sample data
    data = [
        {"text": "quantum physics theory", "categories": {"Science": ["Physics"]}},
        {"text": "calculus mathematics", "categories": {"Science": ["Math"]}},
        {"text": "artificial intelligence", "categories": {"Technology": ["AI"]}},
    ]

    # Train
    classifier.fit(data)

    # Test
    test_case = [{"text": "physics concepts", "categories": {"Science": ["Math"]}}]
    results = classifier.compare_approaches(test_case)

    print("Results:")
    print("Your Approach:", results['your_approach'])
    print("HiClass Approach:", results['hiclass_approach'])

if __name__ == "__main__":
    main()

Testing Optimized Hierarchical Text Classifier
Results:
Your Approach: {'f1_macro_parent': 0.5, 'f1_macro_per_parent': {'Science': 0.0, 'Technology': 1.0}, 'f1_macro_children_overall': np.float64(0.5)}
HiClass Approach: {'hierarchical_precision': 1.0, 'hierarchical_recall': 0.6666666666666666, 'hierarchical_f1': 0.8}
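
These HiClass-style numbers are consistent with a prediction of `{"Science": []}`, that is, the correct parent with no child. This is an inference from the values; the output does not show the prediction itself. Under that assumption the prediction expands to {ROOT, ROOT→Science}, the true label expands to {ROOT, ROOT→Science, ROOT→Science→Math}, and the two sets share 2 elements:

$$
hP = \frac{2}{2} = 1.0, \qquad
hR = \frac{2}{3} \approx 0.667, \qquad
hF_1 = \frac{2 \cdot 1 \cdot \tfrac{2}{3}}{1 + \tfrac{2}{3}} = \frac{4/3}{5/3} = 0.8
$$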


# GPT 

In [1]:
# multilabel_hierarchical_classifier_v3_min.py
"""
Multi-Label Hierarchical Text Classifier (Hierarchical Only, Minimal Returns)

Pipeline:
Input Text
  ↓
[TF-IDF Vectorization]
  ↓
[Parent Classifier] → Parent Labels
  ↓
[Text Features + Parent Labels]
  ↓
[Child Classifier] → Child Labels
  ↓
[Post-processing: Filter invalid parent-child combinations]

Returned metrics (minimal):
- Your approach:
    { 'f1_macro_parent': float,
      'f1_macro_per_parent': Dict[str, float],
      'f1_macro_children_overall': float }
- HiClass:
    { 'hierarchical_precision': float,
      'hierarchical_recall': float,
      'hierarchical_f1': float }
- example_detail: a single example only (expanded sets + the calculation)
"""

import json
import pickle
import warnings
from typing import List, Tuple, Dict, Any
from collections import defaultdict

import numpy as np
import pandas as pd
from pathlib import Path

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import RandomForestClassifier
from sklearn.multioutput import MultiOutputClassifier
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    hamming_loss, jaccard_score, f1_score, precision_score, recall_score
)

warnings.filterwarnings('ignore')


class HierarchicalTextClassifier:
    """
    Multi-Label Hierarchical Text Classifier (Hierarchical Approach Only)

    Data Format:
    {
        "text": "student analyzes business data",
        "categories": {
            "Business": ["Economics", "Finance"],
            "Science": ["DataAnalysis"]
        }
    }
    """

    def __init__(
        self,
        max_features: int = 5000,
        ngram_range: Tuple[int, int] = (1, 2),
        base_classifier=None,
        random_state: int = 42,
    ):
        self.max_features = max_features
        self.ngram_range = ngram_range
        self.random_state = random_state

        self.vectorizer = TfidfVectorizer(
            max_features=max_features, ngram_range=ngram_range, stop_words="english"
        )

        if base_classifier is None:
            base_classifier = RandomForestClassifier(
                n_estimators=100, random_state=random_state, n_jobs=-1
            )

        self.parent_classifier = MultiOutputClassifier(base_classifier)
        self.child_classifier = MultiOutputClassifier(base_classifier)

        self.mlb_parent = MultiLabelBinarizer()
        self.mlb_child = MultiLabelBinarizer()

        self.parent_to_children: Dict[str, List[str]] = {}
        self.child_to_parents: Dict[str, List[str]] = {}
        self.all_parent_categories = set()
        self.all_child_categories = set()

        self.is_fitted = False
        self.feature_names = None

    # -------------------------
    # Data helpers
    # -------------------------
    def create_sample_data(self, save_to: str = None) -> List[Dict]:
        sample_data = [
            {
                "text": "student analyzes business data and economic market trends",
                "categories": {"Business": ["Economics", "Finance"], "Science": ["DataAnalysis"]},
            },
            {
                "text": "student studies mathematics programming and computer algorithms",
                "categories": {"Science": ["Math"], "Technology": ["Programming", "ComputerScience"]},
            },
            {
                "text": "student writes creative poetry and literature analysis",
                "categories": {"Arts": ["Poetry", "Literature", "Writing"]},
            },
            {
                "text": "student researches biology chemistry and environmental science",
                "categories": {"Science": ["Biology", "Chemistry", "Environment"]},
            },
            {
                "text": "student creates digital art and user interface design",
                "categories": {"Arts": ["DigitalArt", "Design"], "Technology": ["UserInterface"]},
            },
            {
                "text": "student develops artificial intelligence and machine learning models",
                "categories": {"Technology": ["AI", "MachineLearning", "Programming"], "Science": ["Research"]},
            },
            {
                "text": "student studies quantum physics and theoretical research",
                "categories": {"Science": ["Physics", "QuantumMechanics", "Research"]},
            },
            {
                "text": "student writes historical analysis and cultural studies papers",
                "categories": {"Humanities": ["History", "Culture", "Writing"]},
            },
            {
                "text": "student builds web applications and mobile development projects",
                "categories": {"Technology": ["WebDevelopment", "MobileDev", "Programming"]},
            },
            {
                "text": "student composes music and studies audio engineering technology",
                "categories": {"Arts": ["Music", "Composition"], "Technology": ["AudioEngineering"]},
            },
            {
                "text": "student investigates psychology neuroscience and cognitive research",
                "categories": {"Science": ["Psychology", "Neuroscience", "Research"]},
            },
            {
                "text": "student practices business management and entrepreneurship strategies",
                "categories": {"Business": ["Management", "Entrepreneurship", "Strategy"]},
            },
            {
                "text": "student explores philosophy ethics and critical thinking methods",
                "categories": {"Humanities": ["Philosophy", "Ethics", "CriticalThinking"]},
            },
            {
                "text": "student designs mechanical engineering and robotics systems",
                "categories": {"Technology": ["Engineering", "Robotics", "Design"], "Science": ["Engineering"]},
            },
            {
                "text": "student teaches education pedagogy and curriculum development",
                "categories": {"Education": ["Pedagogy", "Curriculum", "Teaching"]},
            },
        ]
        if save_to:
            with open(save_to, "w", encoding="utf-8") as f:
                json.dump(sample_data, f, indent=2, ensure_ascii=False)
        return sample_data

    def load_data_from_json(self, file_path: str) -> List[Dict]:
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def save_data_to_json(self, data: List[Dict], file_path: str):
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    def load_data_from_csv(self, file_path: str, text_col: str = "text", categories_col: str = "categories") -> List[Dict]:
        df = pd.read_csv(file_path)
        data = []
        for _, row in df.iterrows():
            text = row[text_col]
            categories_str = row[categories_col]
            categories = json.loads(categories_str) if isinstance(categories_str, str) else {}
            data.append({"text": text, "categories": categories})
        return data

    def convert_to_csv(self, data: List[Dict], file_path: str):
        csv_data = [{"text": item["text"], "categories": json.dumps(item["categories"])} for item in data]
        pd.DataFrame(csv_data).to_csv(file_path, index=False)

    # -------------------------
    # Internal utilities
    # -------------------------
    def _build_hierarchy_mapping(self, data: List[Dict]):
        self.parent_to_children = defaultdict(set)
        self.child_to_parents = defaultdict(set)
        self.all_parent_categories = set()
        self.all_child_categories = set()

        for item in data:
            for parent, children in item["categories"].items():
                self.all_parent_categories.add(parent)
                for child in children:
                    self.all_child_categories.add(child)
                    self.parent_to_children[parent].add(child)
                    self.child_to_parents[child].add(parent)

        self.parent_to_children = {k: sorted(list(v)) for k, v in self.parent_to_children.items()}
        self.child_to_parents = {k: sorted(list(v)) for k, v in self.child_to_parents.items()}

    def _extract_labels_from_data(self, data: List[Dict]):
        texts, parent_labels, child_labels = [], [], []
        for item in data:
            texts.append(item["text"])
            parents = list(item["categories"].keys())
            parent_labels.append(parents)
            children = []
            for _, child_list in item["categories"].items():
                children.extend(child_list)
            child_labels.append(children)
        return texts, parent_labels, child_labels

    def _predict_binary(self, X):
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")
        pred_parent = self.parent_classifier.predict(X)
        X_hier = np.hstack([X, pred_parent])
        pred_child = self.child_classifier.predict(X_hier)
        return pred_parent, pred_child

    def _calculate_consistency(self, pred_parent, pred_child):
        consistent_predictions, total_child_predictions = 0, 0
        for i in range(len(pred_parent)):
            pred_parents_idx = np.where(pred_parent[i])[0]
            pred_children_idx = np.where(pred_child[i])[0]
            pred_parent_names = [self.mlb_parent.classes_[idx] for idx in pred_parents_idx]
            for child_idx in pred_children_idx:
                child_name = self.mlb_child.classes_[child_idx]
                expected_parents = self.child_to_parents.get(child_name, [])
                has_valid_parent = any(parent in pred_parent_names for parent in expected_parents)
                if has_valid_parent:
                    consistent_predictions += 1
                total_child_predictions += 1
        return consistent_predictions / max(1, total_child_predictions)

    def _expand_with_ancestors(self, nested_sample: Dict) -> set:
        expanded = set()
        for parent, children in nested_sample.items():
            expanded.add("ROOT")
            expanded.add(f"ROOT→{parent}")
            for child in children:
                expanded.add(f"ROOT→{parent}→{child}")
        return expanded

    # -------------------------
    # Core API
    # -------------------------
    def fit(self, data: List[Dict], validation_split: float = 0.0):
        self._build_hierarchy_mapping(data)
        texts, parent_labels, child_labels = self._extract_labels_from_data(data)

        X = self.vectorizer.fit_transform(texts).toarray()
        self.feature_names = self.vectorizer.get_feature_names_out()

        y_parent = self.mlb_parent.fit_transform(parent_labels)
        y_child = self.mlb_child.fit_transform(child_labels)

        if validation_split > 0:
            X_train, X_val, y_p_train, y_p_val, y_c_train, y_c_val = train_test_split(
                X, y_parent, y_child, test_size=validation_split, random_state=self.random_state
            )
        else:
            X_train, y_p_train, y_c_train = X, y_parent, y_child

        self.parent_classifier.fit(X_train, y_p_train)
        parent_pred_train = self.parent_classifier.predict(X_train)
        X_hierarchical = np.hstack([X_train, parent_pred_train])
        self.child_classifier.fit(X_hierarchical, y_c_train)

        self.is_fitted = True
        return self

    def predict(self, texts: List[str]) -> List[Dict[str, List[str]]]:
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first. Call fit() first.")
        X = self.vectorizer.transform(texts).toarray()
        pred_parent, pred_child = self._predict_binary(X)

        parent_labels = self.mlb_parent.inverse_transform(pred_parent)
        child_labels = self.mlb_child.inverse_transform(pred_child)

        results = []
        for i in range(len(texts)):
            predicted_parents = list(parent_labels[i])
            predicted_children = list(child_labels[i])
            categories = {}
            for parent in predicted_parents:
                valid_children = [c for c in predicted_children if parent in self.child_to_parents.get(c, [])]
                categories[parent] = valid_children  # a parent may have no children
            results.append(categories)
        return results

    def evaluate(self, test_data: List[Dict]) -> Dict:
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")

        texts, parent_labels, child_labels = self._extract_labels_from_data(test_data)
        X_test = self.vectorizer.transform(texts).toarray()
        y_true_parent = self.mlb_parent.transform(parent_labels)
        y_true_child = self.mlb_child.transform(child_labels)

        pred_parent, pred_child = self._predict_binary(X_test)

        f1_macro_parent = f1_score(y_true_parent, pred_parent, average="macro", zero_division=0)

        f1_macro_per_parent: Dict[str, float] = {}
        for parent_name in self.parent_to_children.keys():
            parent_children = self.parent_to_children[parent_name]
            if len(parent_children) == 0:
                f1_macro_per_parent[parent_name] = 0.0
                continue
            child_indices = []
            for child in parent_children:
                if child in self.mlb_child.classes_:
                    idx = list(self.mlb_child.classes_).index(child)
                    child_indices.append(idx)
            if len(child_indices) == 0:
                f1_macro_per_parent[parent_name] = 0.0
                continue
            y_true_parent_children = y_true_child[:, child_indices]
            y_pred_parent_children = pred_child[:, child_indices]
            if y_true_parent_children.shape[1] > 0:
                f1_macro_per_parent[parent_name] = f1_score(
                    y_true_parent_children, y_pred_parent_children, average="macro", zero_division=0
                )
            else:
                f1_macro_per_parent[parent_name] = 0.0

        consistency_rate = self._calculate_consistency(pred_parent, pred_child)
        f1_macro_children_overall = np.mean(list(f1_macro_per_parent.values())) if f1_macro_per_parent else 0.0

        return {
            "f1_macro_parent": f1_macro_parent,
            "f1_macro_per_parent": f1_macro_per_parent,
            "f1_macro_children_overall": f1_macro_children_overall,
            "consistency_rate": consistency_rate,  # kept in case it is needed for internal debugging
        }

    def calculate_hiclass_metrics(self, test_data: List[Dict]) -> Dict:
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")

        texts = [item["text"] for item in test_data]
        predictions = self.predict(texts)

        total_intersection = 0
        total_predicted = 0
        total_true = 0
        expansion_details = []

        for i, (true_sample, pred_sample) in enumerate(zip(test_data, predictions)):
            true_categories = true_sample["categories"]
            true_expanded = self._expand_with_ancestors(true_categories)
            pred_expanded = self._expand_with_ancestors(pred_sample)
            intersection = true_expanded & pred_expanded

            expansion_details.append(
                {
                    "sample_index": i,
                    "text": (true_sample["text"][:50] + "...") if len(true_sample["text"]) > 50 else true_sample["text"],
                    "true_categories": true_categories,
                    "pred_categories": pred_sample,
                    "true_expanded": sorted(true_expanded),
                    "pred_expanded": sorted(pred_expanded),
                    "intersection": sorted(intersection),
                    "intersection_size": len(intersection),
                    "true_size": len(true_expanded),
                    "pred_size": len(pred_expanded),
                }
            )

            total_intersection += len(intersection)
            total_predicted += len(pred_expanded)
            total_true += len(true_expanded)

        h_precision = total_intersection / total_predicted if total_predicted > 0 else 0.0
        h_recall = total_intersection / total_true if total_true > 0 else 0.0
        h_f1 = 2 * h_precision * h_recall / (h_precision + h_recall) if (h_precision + h_recall) > 0 else 0.0

        return {
            "hierarchical_precision": h_precision,
            "hierarchical_recall": h_recall,
            "hierarchical_f1": h_f1,
            "expansion_details": expansion_details,
        }

    def evaluate_minimal_with_example(self, test_data: List[Dict], example_index: int = 0) -> Dict[str, Any]:
        """
        Return only the metrics that are needed:
          - Your approach: f1_macro_parent, f1_macro_per_parent, f1_macro_children_overall
          - HiClass: hierarchical_precision, hierarchical_recall, hierarchical_f1
          - example_detail: a single example (ancestor expansion plus the calculation)
        """
        if not self.is_fitted:
            raise ValueError("Classifier must be fitted first")

        your_metrics = self.evaluate(test_data)
        your_block = {
            "f1_macro_parent": your_metrics["f1_macro_parent"],
            "f1_macro_per_parent": your_metrics["f1_macro_per_parent"],
            "f1_macro_children_overall": your_metrics["f1_macro_children_overall"],
        }

        hiclass = self.calculate_hiclass_metrics(test_data)
        hiclass_block = {
            "hierarchical_precision": hiclass["hierarchical_precision"],
            "hierarchical_recall": hiclass["hierarchical_recall"],
            "hierarchical_f1": hiclass["hierarchical_f1"],
        }

        details = hiclass.get("expansion_details", [])
        if not details:
            example = None
        else:
            idx = max(0, min(example_index, len(details) - 1))
            d = details[idx]
            sp = d["intersection_size"] / d["pred_size"] if d["pred_size"] > 0 else 0.0
            sr = d["intersection_size"] / d["true_size"] if d["true_size"] > 0 else 0.0
            sf1 = 2 * sp * sr / (sp + sr) if (sp + sr) > 0 else 0.0
            example = {
                "text": d["text"],
                "true_categories": d["true_categories"],
                "pred_categories": d["pred_categories"],
                "true_expanded": d["true_expanded"],
                "pred_expanded": d["pred_expanded"],
                "intersection": d["intersection"],
                "precision_calc": f"{d['intersection_size']}/{d['pred_size']}",
                "recall_calc": f"{d['intersection_size']}/{d['true_size']}",
                "sample_precision": sp,
                "sample_recall": sr,
                "sample_f1": sf1,
            }

        return {"your_approach": your_block, "hiclass_approach": hiclass_block, "example_detail": example}

    # -------------------------
    # Model I/O
    # -------------------------
    def save(self, file_path: str):
        with open(file_path, "wb") as f:
            pickle.dump(self, f)

    @classmethod
    def load(cls, file_path: str) -> "HierarchicalTextClassifier":
        """Load a pickled model. Only load files you trust: unpickling can execute arbitrary code."""
        with open(file_path, "rb") as f:
            model = pickle.load(f)
        return model


# -------------------------
# Minimal runnable demo
# -------------------------
def create_your_data_template():
    """Create a template to fill in with your own data (optional)."""
    template = [
        {
            "text": "Replace this with your first text document",
            "categories": {
                "YourParentCategory1": ["ChildCategory1", "ChildCategory2"],
                "YourParentCategory2": ["ChildCategory3"],
            },
        },
        {
            "text": "Replace this with your second text document",
            "categories": {
                "YourParentCategory1": ["ChildCategory1"],
                "YourParentCategory3": ["ChildCategory4", "ChildCategory5"],
            },
        },
    ]
    with open("your_data_template.json", "w", encoding="utf-8") as f:
        json.dump(template, f, indent=2, ensure_ascii=False)


if __name__ == "__main__":
    # 1) Quick training run on the sample data
    clf = HierarchicalTextClassifier(max_features=1000, ngram_range=(1, 2))
    train_data = clf.create_sample_data()
    clf.fit(train_data, validation_split=0.0)

    # 2) A SINGLE test example (Science → Math); the model may predict something else (e.g. Physics)
    test_data = [
        {
            "text": "advanced physics and theoretical concepts",
            "categories": {"Science": ["Math"]},
        }
    ]

    # 3) Get the minimal report plus one detailed example
    report = clf.evaluate_minimal_with_example(test_data, example_index=0)

    # 4) Print the report as JSON (you can drop this print in production)
    print(json.dumps(report, indent=2, ensure_ascii=False))

    # (Optional) Create your own data template
    # create_your_data_template()


{
  "your_approach": {
    "f1_macro_parent": 0.0,
    "f1_macro_per_parent": {
      "Business": 0.0,
      "Science": 0.0,
      "Technology": 0.0,
      "Arts": 0.0,
      "Humanities": 0.0,
      "Education": 0.0
    },
    "f1_macro_children_overall": 0.0
  },
  "hiclass_approach": {
    "hierarchical_precision": 0.0,
    "hierarchical_recall": 0.0,
    "hierarchical_f1": 0.0
  },
  "example_detail": {
    "text": "advanced physics and theoretical concepts",
    "true_categories": {
      "Science": [
        "Math"
      ]
    },
    "pred_categories": {},
    "true_expanded": [
      "ROOT",
      "ROOT→Science",
      "ROOT→Science→Math"
    ],
    "pred_expanded": [],
    "intersection": [],
    "precision_calc": "0/0",
    "recall_calc": "0/3",
    "sample_precision": 0.0,
    "sample_recall": 0.0,
    "sample_f1": 0.0
  }
}
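
The `0/0` and `0/3` entries in the report above follow directly from the ancestor expansion: the model predicted no categories, so the predicted set is empty while the true set has three paths. The sketch below reproduces that arithmetic outside the class. It is a minimal illustration, not the classifier's own code path: `expand_with_ancestors` and `sample_hierarchical_prf` are hypothetical standalone names that mirror `_expand_with_ancestors` and the per-sample calculation shown earlier, and the `Science → Physics` prediction is an assumed input for comparison, not an actual model output.

```python
from typing import Dict, List, Set, Tuple


def expand_with_ancestors(nested: Dict[str, List[str]]) -> Set[str]:
    """Standalone mirror of the _expand_with_ancestors logic (hypothetical helper)."""
    expanded: Set[str] = set()
    for parent, children in nested.items():
        expanded.add("ROOT")
        expanded.add(f"ROOT→{parent}")
        for child in children:
            expanded.add(f"ROOT→{parent}→{child}")
    return expanded


def sample_hierarchical_prf(
    true_cats: Dict[str, List[str]], pred_cats: Dict[str, List[str]]
) -> Tuple[float, float, float]:
    """Per-sample hierarchical precision, recall, and F1 over expanded path sets."""
    true_set = expand_with_ancestors(true_cats)
    pred_set = expand_with_ancestors(pred_cats)
    overlap = len(true_set & pred_set)
    # Empty denominators fall back to 0.0, as in the report above.
    precision = overlap / len(pred_set) if pred_set else 0.0
    recall = overlap / len(true_set) if true_set else 0.0
    total = precision + recall
    f1 = 2 * precision * recall / total if total > 0 else 0.0
    return precision, recall, f1


true_cats = {"Science": ["Math"]}

# Case 1: empty prediction, as in the report above (0/0 precision, 0/3 recall).
empty = sample_hierarchical_prf(true_cats, {})

# Case 2 (assumed input): right parent, wrong child. ROOT and ROOT→Science overlap.
near = sample_hierarchical_prf(true_cats, {"Science": ["Physics"]})

print("empty prediction:", empty)
print("Science→Physics :", tuple(round(v, 3) for v in near))
```

In the second case two of the three paths on each side overlap, so precision, recall, and F1 all come out at roughly 0.667. This is the partial credit that the HiClass-style metric gives and that strict parent-child matching does not.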
