In [1]:
import pandas as pd
import numpy as np
import re
import pickle
import time
import os
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.neural_network import MLPRegressor
from sklearn.svm import SVR
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import traceback

In [None]:
# Read the base export first, then the three numbered exports, and stack
# everything into a single frame with a fresh 0..n-1 index.
df_0 = pd.read_csv("chaotic_categories_data.csv")
lst = [df_0]

for i in (1, 2, 3):
    lst.append(pd.read_csv(f"chaotic_categories_dataset ({i}).csv"))

df = pd.concat(lst, ignore_index=True)

In [None]:
# Preview the first five rows of the combined frame.
df.head(n=5)

Unnamed: 0,category,answer,relevance_score,creativity_score
0,Things You Can Use as a Hat (that are not hats),A watermelon half,0.95,0.78
1,Things You Can Use as a Hat (that are not hats),A colander,0.92,0.65
2,Things You Can Use as a Hat (that are not hats),A flower pot,0.88,0.72
3,Things You Can Use as a Hat (that are not hats),A bowl,0.85,0.45
4,Things You Can Use as a Hat (that are not hats),A traffic cone,0.91,0.82


In [None]:
# Optional dependency: the embedding-based pipeline is only usable when the
# sentence-transformers package can be imported; the flag records the outcome.
try:
    from sentence_transformers import SentenceTransformer
except ImportError:
    SENTENCE_TRANSFORMERS_AVAILABLE = False
    print("sentence-transformers not available.")
else:
    SENTENCE_TRANSFORMERS_AVAILABLE = True

In [2]:
class RelevanceChecker:
    """Regression-based scorer for how well an answer fits a category (0-1).

    The feature pipeline is selected by ``approach``:

    * ``'basic'`` - TF-IDF cosine similarity plus word/character overlap and
      length statistics.
    * ``'sentence_transformer'`` - sentence-embedding similarity statistics
      plus a slice of the element-wise embedding interactions.
    * ``'auto'`` - resolved to ``'basic'`` when ``train`` runs.

    ``train`` fits several scikit-learn regressors and keeps the one with the
    highest R² on a held-out 20% split.
    """

    # Checkpoint used by the 'sentence_transformer' approach. ``load_model``
    # overrides it per instance when the saved file records a name.
    sentence_model_name = 'all-MiniLM-L6-v2'

    # Number of leading embedding dimensions of the element-wise product and
    # of the absolute difference that are exposed as individual features.
    _EMBEDDING_SLICE = 50

    # Columns every input CSV has to provide.
    _REQUIRED_COLUMNS = ('category', 'answer', 'relevance_score')

    # Ordered (pattern, replacement) pairs for the typo fixes in clean_text.
    _TYPO_FIXES = (
        (r'\b(teh|hte)\b', 'the'),
        (r'\b(adn|nad)\b', 'and'),
        (r'\b(taht|htat)\b', 'that'),
        (r'\b(ot)\b', 'to'),
        (r'\b(fo)\b', 'of'),
        (r'\b(cna)\b', 'can'),
        (r'\b(tiem)\b', 'time'),
    )

    def __init__(self, approach='auto'):
        """Create an untrained checker.

        Args:
            approach: 'basic', 'sentence_transformer' or 'auto'.
        """
        self.approach = approach
        self.model = None
        # Set by train()/load_model(); initialised here so the attribute
        # always exists.
        self.model_name = None
        self.vectorizer = None
        self.sentence_model = None
        self.scaler = None
        self.feature_names = []
        self.training_stats = {}
        # Substrings identifying candidate models whose inputs are
        # standardised before fitting (see _needs_scaling).
        self.scaling_required_models = ['neural', 'svm', 'linear']

    def clean_text(self, text):
        """Normalise a raw category/answer string.

        Lower-cases, collapses whitespace, fixes a handful of common typos,
        collapses repeated ``!``/``?``/``.`` and replaces every character
        other than word characters, whitespace and ``. ! ? , -`` by a space.

        Returns:
            The cleaned string, or ``""`` when ``text`` is missing (NaN/None).
        """
        if pd.isna(text):
            return ""

        text = str(text).lower().strip()
        text = re.sub(r'\s+', ' ', text)
        for pattern, replacement in self._TYPO_FIXES:
            text = re.sub(pattern, replacement, text)
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        text = re.sub(r'[.]{2,}', '...', text)
        text = re.sub(r'[^\w\s\.\!\?\,\-]', ' ', text)
        # Stripping characters may have re-introduced double spaces.
        text = re.sub(r'\s+', ' ', text).strip()

        return text

    @staticmethod
    def _word_overlap(category, answer):
        """Return (shared unique words, Jaccard similarity) for two strings."""
        cat_words = set(category.split())
        ans_words = set(answer.split())
        common_words = len(cat_words & ans_words)
        total_words = len(cat_words | ans_words)
        jaccard = common_words / total_words if total_words > 0 else 0
        return common_words, jaccard

    def _needs_scaling(self, model_name):
        """True when ``model_name`` contains a tag from scaling_required_models."""
        return any(tag in model_name for tag in self.scaling_required_models)

    def extract_basic_features(self, categories, answers):
        """Build the 8 lexical features for each (category, answer) pair.

        The TF-IDF vectorizer is fitted on the given texts the first time this
        is called with ``self.vectorizer`` unset and is reused afterwards.

        Args:
            categories: sequence of cleaned category strings.
            answers: sequence of cleaned answer strings, same length.

        Returns:
            ``np.ndarray`` with one row per pair; columns follow
            ``self.feature_names``.

        Raises:
            ValueError: if the two sequences differ in length.
        """
        categories = list(categories)
        answers = list(answers)
        if len(categories) != len(answers):
            raise ValueError(
                f"categories and answers must have the same length "
                f"({len(categories)} != {len(answers)})"
            )

        if self.vectorizer is None:
            self.vectorizer = TfidfVectorizer(
                max_features=2000,
                ngram_range=(1, 2),
                stop_words='english',
                min_df=2,
                max_df=0.8,
                lowercase=True
            )
            self.vectorizer.fit(categories + answers)

        category_vectors = self.vectorizer.transform(categories)
        answer_vectors = self.vectorizer.transform(answers)

        features = []

        for i, (category, answer) in enumerate(zip(categories, answers)):
            tfidf_similarity = cosine_similarity(
                category_vectors[i:i+1],
                answer_vectors[i:i+1]
            )[0][0]

            common_words, jaccard_similarity = self._word_overlap(category, answer)

            answer_length = len(answer.split())
            category_length = len(category.split())
            length_ratio = answer_length / category_length if category_length > 0 else 0

            # Number of distinct non-space characters the two strings share.
            char_overlap = len(set(category.replace(' ', '')) & set(answer.replace(' ', '')))

            features.append([
                tfidf_similarity,
                jaccard_similarity,
                common_words,
                answer_length,
                category_length,
                length_ratio,
                char_overlap,
                abs(answer_length - category_length),  # Length difference
            ])

        self.feature_names = [
            'tfidf_similarity', 'jaccard_similarity', 'common_words',
            'answer_length', 'category_length', 'length_ratio',
            'char_overlap', 'length_diff'
        ]

        return np.array(features)

    def extract_sentence_transformer_features(self, categories, answers):
        """Build embedding-based features for each (category, answer) pair.

        Each row holds 10 summary features (cosine similarity, normalised
        Euclidean distance, mean/std of the element-wise product and absolute
        difference, Jaccard similarity, word counts, shared words) followed by
        the first ``_EMBEDDING_SLICE`` entries of the element-wise product and
        of the absolute difference.

        Raises:
            ImportError: if sentence-transformers could not be imported.
            ValueError: if the two sequences differ in length.
        """
        if not SENTENCE_TRANSFORMERS_AVAILABLE:
            raise ImportError("sentence-transformers library not available")

        categories = list(categories)
        answers = list(answers)
        if len(categories) != len(answers):
            raise ValueError(
                f"categories and answers must have the same length "
                f"({len(categories)} != {len(answers)})"
            )

        if self.sentence_model is None:
            self.sentence_model = SentenceTransformer(self.sentence_model_name)

        batch_size = 32
        # A progress bar is only useful when there is more than one batch;
        # single-pair predictions would otherwise print two bars per call.
        show_progress = len(categories) > batch_size

        category_embeddings = self.sentence_model.encode(
            categories,
            batch_size=batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )

        answer_embeddings = self.sentence_model.encode(
            answers,
            batch_size=batch_size,
            show_progress_bar=show_progress,
            convert_to_numpy=True
        )

        features = []
        embedding_dim = category_embeddings.shape[1]
        # Keep names and values aligned even if the embedding is shorter than
        # the configured slice.
        slice_len = min(self._EMBEDDING_SLICE, embedding_dim)

        for i, (category, answer) in enumerate(zip(categories, answers)):
            cat_emb = category_embeddings[i]
            ans_emb = answer_embeddings[i]

            norm_product = np.linalg.norm(cat_emb) * np.linalg.norm(ans_emb)
            # A zero-norm embedding would otherwise yield NaN and poison the
            # downstream regressors.
            cosine_sim = np.dot(cat_emb, ans_emb) / norm_product if norm_product > 0 else 0.0

            euclidean_dist = np.linalg.norm(cat_emb - ans_emb)
            euclidean_normalized = euclidean_dist / np.sqrt(embedding_dim)
            element_product = cat_emb * ans_emb
            abs_diff = np.abs(cat_emb - ans_emb)

            common_words, jaccard_similarity = self._word_overlap(category, answer)

            feature_vector = np.concatenate([
                [cosine_sim, euclidean_normalized,
                 np.mean(element_product), np.std(element_product),
                 np.mean(abs_diff), np.std(abs_diff),
                 jaccard_similarity, len(answer.split()),
                 len(category.split()), common_words],
                element_product[:slice_len],
                abs_diff[:slice_len]
            ])

            features.append(feature_vector)

        self.feature_names = (['cosine_sim', 'euclidean_dist', 'product_mean', 'product_std',
                               'diff_mean', 'diff_std', 'jaccard_sim', 'ans_len', 'cat_len', 'common_words'] +
                              [f'element_product_{i}' for i in range(slice_len)] +
                              [f'abs_diff_{i}' for i in range(slice_len)])

        return np.array(features)

    def load_and_preprocess_data(self, file_paths):
        """Load CSV files, clean the text columns and drop unusable rows.

        Missing or unreadable files are reported and skipped. Rows are kept
        only when both cleaned texts are non-empty and ``relevance_score``
        parses to a number within [0, 1].

        Args:
            file_paths: iterable of CSV paths.

        Returns:
            A new DataFrame with the original columns plus ``category_clean``
            and ``answer_clean``; ``relevance_score`` is numeric. The index of
            the concatenated frame is preserved for surviving rows.

        Raises:
            ValueError: if none of the files could be loaded.
            KeyError: if a required column is missing.
        """
        all_data = []
        for file_path in file_paths:
            if os.path.exists(file_path):
                try:
                    all_data.append(pd.read_csv(file_path))
                except Exception as e:
                    print(f"Error loading {file_path}: {e}")
            else:
                print(f"File not found: {file_path}")

        if not all_data:
            raise ValueError("No valid data files found!")

        df = pd.concat(all_data, ignore_index=True)

        missing = [col for col in self._REQUIRED_COLUMNS if col not in df.columns]
        if missing:
            raise KeyError(f"Missing required column(s): {missing}")

        df['category_clean'] = df['category'].apply(self.clean_text)
        df['answer_clean'] = df['answer'].apply(self.clean_text)
        df['relevance_score'] = pd.to_numeric(df['relevance_score'], errors='coerce')

        # between() is inclusive on both ends and False for NaN, so it also
        # removes scores that failed to parse.
        keep = (
            (df['category_clean'] != '')
            & (df['answer_clean'] != '')
            & df['relevance_score'].between(0, 1)
        )
        # Return an independent frame rather than a view of the filtered slice.
        return df[keep].copy()

    def train(self, df):
        """Extract features, fit all candidate regressors and keep the best.

        Any previously fitted vectorizer/scaler is discarded first so that
        calling ``train`` again refits them on the new data.

        Args:
            df: DataFrame with ``category_clean``, ``answer_clean`` and
                ``relevance_score`` columns.

        Returns:
            ``self.training_stats`` - metrics of the selected model plus the
            metrics of every candidate under ``'all_results'``.

        Raises:
            ValueError: for an unknown approach or if every model fails.
        """
        categories = df['category_clean'].tolist()
        answers = df['answer_clean'].tolist()
        y = df['relevance_score'].values

        # Start from a clean slate: a vectorizer left over from an earlier
        # train()/load_model() would otherwise be reused without refitting.
        self.vectorizer = None
        self.scaler = None

        start_time = time.time()

        if self.approach in ('basic', 'auto'):
            try:
                X = self.extract_basic_features(categories, answers)
            except Exception as e:
                print(f"Basic features extraction failed: {e}")
                raise
            # 'auto' resolves to the basic pipeline.
            self.approach = 'basic'

        elif self.approach == 'sentence_transformer':
            try:
                X = self.extract_sentence_transformer_features(categories, answers)
            except Exception as e:
                print(f"Sentence transformer features failed: {e}")
                print("Falling back to basic features...")
                X = self.extract_basic_features(categories, answers)
                self.approach = 'basic'

        else:
            raise ValueError(f"Unknown approach: {self.approach}")

        feature_extraction_time = time.time() - start_time
        print(f"Feature extraction completed: {X.shape[1]} features in {feature_extraction_time:.2f}s")

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=None
        )

        models_to_try = {
            'random_forest': RandomForestRegressor(
                n_estimators=100,
                max_depth=10,
                random_state=42
            ),
            'gradient_boosting': GradientBoostingRegressor(
                n_estimators=200,
                learning_rate=0.1,
                max_depth=6,
                random_state=42
            ),
            'linear_regression': LinearRegression(),
            'neural_network': MLPRegressor(
                hidden_layer_sizes=(100, 50),
                max_iter=500,
                random_state=42
            )
        }

        results = {}
        # Scaler fitted for each scaled candidate, kept apart from `results`
        # so the reported stats stay plain numbers.
        fitted_scalers = {}

        for model_name, model in models_to_try.items():
            try:
                print(f"\nTraining {model_name}")

                needs_scaling = self._needs_scaling(model_name)

                if needs_scaling:
                    scaler = StandardScaler()
                    X_fit = scaler.fit_transform(X_train)
                    X_eval = scaler.transform(X_test)
                else:
                    scaler = None
                    X_fit, X_eval = X_train, X_test

                model.fit(X_fit, y_train)
                # Scores live in [0, 1]; clip before measuring error.
                y_pred = np.clip(model.predict(X_eval), 0, 1)

                mse = mean_squared_error(y_test, y_pred)
                mae = mean_absolute_error(y_test, y_pred)
                r2 = r2_score(y_test, y_pred)

                results[model_name] = {
                    'model': model,
                    'mse': mse,
                    'mae': mae,
                    'r2': r2,
                    'needs_scaling': needs_scaling
                }
                fitted_scalers[model_name] = scaler

                print(f"   MSE: {mse:.4f} | MAE: {mae:.4f} | R²: {r2:.4f}")

            except Exception as e:
                print(f"   Failed to train {model_name}: {e}")

        if not results:
            raise ValueError("All models failed to train!")

        best_model_name = max(results.keys(), key=lambda k: results[k]['r2'])
        best_result = results[best_model_name]

        self.model = best_result['model']
        self.model_name = best_model_name
        # None for models that were trained on unscaled features.
        self.scaler = fitted_scalers[best_model_name]

        training_time = time.time() - start_time

        self.training_stats = {
            'approach': self.approach,
            'model_name': best_model_name,
            'training_samples': len(df),
            'feature_count': X.shape[1],
            'training_time': training_time,
            'feature_extraction_time': feature_extraction_time,
            'needs_scaling': best_result['needs_scaling'],
            'mse': best_result['mse'],
            'mae': best_result['mae'],
            'r2': best_result['r2'],
            'all_results': {k: {kk: vv for kk, vv in v.items() if kk != 'model'}
                            for k, v in results.items()}
        }

        print(f"\nBest model: {best_model_name}")
        print(f"Training completed in {training_time:.2f}s")
        print(f"Performance: R² = {best_result['r2']:.4f}, MAE = {best_result['mae']:.4f}")

        return self.training_stats

    def _featurize(self, categories_clean, answers_clean):
        """Run the active feature pipeline and apply the scaler if one is set."""
        if self.approach == 'basic':
            features = self.extract_basic_features(categories_clean, answers_clean)
        elif self.approach == 'sentence_transformer':
            features = self.extract_sentence_transformer_features(categories_clean, answers_clean)
        else:
            raise ValueError(f"Unknown approach: {self.approach}")

        if self.scaler is not None:
            features = self.scaler.transform(features)

        return features

    def predict(self, category, answer):
        """Predict the relevance of one answer to one category.

        Returns:
            The model output clipped to [0, 1].

        Raises:
            ValueError: if no model is trained/loaded or the approach is unknown.
        """
        if self.model is None:
            raise ValueError("Model not trained yet! Call train() first.")

        features = self._featurize([self.clean_text(category)], [self.clean_text(answer)])
        prediction = self.model.predict(features)[0]
        return np.clip(prediction, 0, 1)

    def predict_batch(self, categories, answers):
        """Predict relevance for parallel sequences of categories and answers.

        Returns:
            Array of model outputs clipped to [0, 1], one per pair.

        Raises:
            ValueError: if no model is trained/loaded, the sequences differ in
                length, or the approach is unknown.
        """
        if self.model is None:
            raise ValueError("Model not trained yet! Call train() first.")

        categories_clean = [self.clean_text(cat) for cat in categories]
        answers_clean = [self.clean_text(ans) for ans in answers]

        if len(categories_clean) != len(answers_clean):
            raise ValueError(
                f"categories and answers must have the same length "
                f"({len(categories_clean)} != {len(answers_clean)})"
            )

        predictions = self.model.predict(self._featurize(categories_clean, answers_clean))
        return np.clip(predictions, 0, 1)

    def save_model(self, filepath):
        """Pickle the trained model and its preprocessing state to ``filepath``.

        The parent directory is created when the path has one.

        Raises:
            ValueError: if no model has been trained or loaded.
        """
        if self.model is None:
            raise ValueError("No model to save! Train the model first.")

        model_data = {
            'approach': self.approach,
            'model_name': self.model_name,
            'model': self.model,
            'vectorizer': self.vectorizer,
            'scaler': self.scaler,
            'feature_names': self.feature_names,
            'training_stats': self.training_stats,
            'sentence_model_name': self.sentence_model_name if self.approach == 'sentence_transformer' else None
        }

        # dirname is '' for a bare filename, and os.makedirs('') raises.
        directory = os.path.dirname(filepath)
        if directory:
            os.makedirs(directory, exist_ok=True)

        with open(filepath, 'wb') as f:
            pickle.dump(model_data, f)

        print(f"Model saved to {filepath}")
        print(f"Approach: {self.approach}")
        print(f"Model: {self.model_name}")
        print(f"Features: {len(self.feature_names)}")

    def load_model(self, filepath):
        """Restore state written by ``save_model``.

        SECURITY: unpickling executes arbitrary code embedded in the file;
        only load files that come from a trusted source.
        """
        with open(filepath, 'rb') as f:
            model_data = pickle.load(f)

        self.approach = model_data['approach']
        self.model_name = model_data['model_name']
        self.model = model_data['model']
        self.vectorizer = model_data['vectorizer']
        self.scaler = model_data['scaler']
        self.feature_names = model_data['feature_names']
        self.training_stats = model_data['training_stats']

        if self.approach == 'sentence_transformer':
            # Prefer the checkpoint recorded at save time over the default.
            saved_name = model_data.get('sentence_model_name')
            if saved_name:
                self.sentence_model_name = saved_name

            if SENTENCE_TRANSFORMERS_AVAILABLE:
                print("Loading sentence transformer model...")
                self.sentence_model = SentenceTransformer(self.sentence_model_name)
            else:
                print("Warning: sentence-transformers not available; "
                      "predictions will fail until it is installed.")

        print(f"Model loaded from {filepath}")
        print(f"Approach: {self.approach}")
        print(f"Model: {self.model_name}")
        print(f"Performance: R² = {self.training_stats['r2']:.4f}")


In [None]:
# CSV exports to train on. The numbered files use the "name (N).csv" pattern
# read by the notebook's initial data-loading cell; load_and_preprocess_data
# only prints a message for a missing file, so a mismatched name would
# silently shrink the training set.
data_files = [
    'chaotic_categories_data.csv',
    'chaotic_categories_dataset (1).csv',
    'chaotic_categories_dataset (2).csv',
    'chaotic_categories_dataset (3).csv'
]

# Hand-picked (category, answer, expected relevance) spot checks.
test_cases = [
    ("Things You Can Use as a Hat (that are not hats)", "A watermelon half", 0.95),
    ("Things You Can Use as a Hat (that are not hats)", "A colander", 0.92),
    ("Things You Can Use as a Hat (that are not hats)", "Eat rice", 0.05),
    ("Ways to Avoid Paying a Debt", "Move to another country", 0.85),
    ("Ways to Avoid Paying a Debt", "Change identity to an alpaca", 0.65),
    ("Ways to Avoid Paying a Debt", "Buy groceries", 0.10),
    ("Unconvincing Superpowers", "The ability to fly very slowly", 0.90),
    ("Unconvincing Superpowers", "Make toast appear", 0.20),
]

# The embedding pipeline is only exercised when its dependency imported.
approaches = ['basic']
if SENTENCE_TRANSFORMERS_AVAILABLE:
    approaches.append('sentence_transformer')

# approach name -> trained checker, its training stats and the saved file.
results = {}

for approach in approaches:
    print(f"\n{'=' * 60}")
    print(f"TESTING APPROACH: {approach.upper()}")
    print(f"{'=' * 60}")

    try:
        checker = RelevanceChecker(approach=approach)
        df = checker.load_and_preprocess_data(data_files)
        training_stats = checker.train(df)
        model_path = f"models/relevance_checker_{approach}.pkl"
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        checker.save_model(model_path)
        print("\nPrediction Results:")
        print("-" * 60)

        for category, answer, expected in test_cases:
            try:
                prediction = checker.predict(category, answer)
                diff = abs(prediction - expected)
                # Absolute error below 0.3 passes, below 0.5 warns, else fails.
                if diff < 0.3:
                    status = "PASS"
                elif diff < 0.5:
                    status = "WARNING"
                else:
                    status = "FAIL"

                print(f"{status} - Category: {category[:50]}")
                print(f"  Answer: {answer}")
                print(f"  Predicted: {prediction:.3f} | Expected: {expected:.3f} | Difference: {diff:.3f}\n")

            except Exception as e:
                print(f"ERROR - Failed to predict for '{answer}' in category '{category}': {e}")

        results[approach] = {
            'checker': checker,
            'stats': training_stats,
            'model_path': model_path
        }

    except Exception as e:
        # One failing approach must not prevent the others from running.
        print(f"ERROR - Approach '{approach}' encountered an exception:\n{e}")
        traceback.print_exc()

if results:
    print(f"\n{'=' * 60}")
    print("FINAL SUMMARY")
    print(f"{'=' * 60}")

    for approach, result in results.items():
        stats = result['stats']
        print(f"\nApproach: {approach.upper()}")
        print(f"  Model Name       : {stats.get('model_name', 'N/A')}")
        print(f"  R² Score         : {stats.get('r2', 0.0):.4f}")
        print(f"  MAE              : {stats.get('mae', 0.0):.4f}")
        print(f"  Feature Count    : {stats.get('feature_count', 'N/A')}")
        print(f"  Training Duration: {stats.get('training_time', 0.0):.2f} seconds")
        print(f"  Model File Path  : {result['model_path']}")

    # Recommend the approach whose selected model has the highest R².
    best_approach = max(results.keys(), key=lambda k: results[k]['stats'].get('r2', 0))
    print(f"\nrecommended approach: {best_approach.upper()}")
    print(f"  Best R² Score: {results[best_approach]['stats']['r2']:.4f}")