In [1]:
!pip install -q datasets
!pip install rouge

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/485.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m485.4/485.4 kB[0m [31m24.8 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/116.3 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m9.8 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/143.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m143.5/143.5 kB[0m [31m10.4 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/194.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.8/194.8 kB[0m [31m16.7 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting rouge
  Downloading rouge-1.0.

In [2]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from datasets import load_dataset
import json
from typing import List, Dict, Any
import random
from collections import Counter

class PromptTester:
    def __init__(self, model_name: str, hf_token: str):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            token=hf_token,
            device_map="auto",
            torch_dtype=torch.bfloat16
        )
        self.dataset = load_dataset("n3Er/FinQA-Infix")
        self.examples = self.get_curated_examples()

    def categorize_operation(self, operation: str) -> str:
        """Categorize the type of operation"""
        if any(op in operation for op in ["max", "min", "sum", "average"]):
            return "table"
        elif any(op in operation for op in ["greater", "less", "equal"]):
            return "boolean"
        elif "power" in operation:
            return "exponential"
        else:
            return "arithmetic"

    def get_curated_examples(self) -> List[Dict]:
        """Get diverse examples covering all operation types and operators"""
        train_data = list(self.dataset['train'])
        # Filter for examples with valid operations and answers
        valid_examples = [
            ex for ex in train_data
            if ex['program_re'] and ex['answer'] is not None
        ]

        # Define operation subtypes we want to include
        operation_subtypes = {
            "arithmetic": {
                "addition": "add(",
                "subtraction": "subtract(",
                "multiplication": "multiply(",
                "division": "divide(",
                "percentage": "multiply( divide("
            },
            "table": {
                "sum": "sum(",
                "max": "max(",
                "min": "min(",
                "average": "average("
            },
            "boolean": {
                "greater": "greater(",
                "less": "less(",
                "equal": "equal("
            },
            "exponential": {
                "power": "exp("
            }
        }

        # Initialize collection of examples
        selected_examples = []

        # For each main operation type
        for op_type, subtypes in operation_subtypes.items():
            for subtype, operator in subtypes.items():
                # Find examples containing this operator
                matching_examples = [
                    ex for ex in valid_examples
                    if operator in ex['program_re']
                    and not any(operator in selected['program_re'] for selected in selected_examples)
                ]

                # Select the first matching example
                for example in matching_examples:
                    if len(selected_examples) < 8:  # Limit to 8 total examples
                        selected_examples.append(example)
                        break

        return selected_examples

    def format_table(self, table: List[List[Any]]) -> str:
        if not table:
            return ""
        return "\n".join(" | ".join(str(cell) for cell in row) for row in table)

    def format_context(self, example: Dict) -> str:
        """Format context consistently"""
        parts = []
        if example['pre_text']:
            parts.append("Context before:")
            parts.append(' '.join(example['pre_text']))

        if example['table']:
            parts.append("\nTable:")
            parts.append(self.format_table(example['table']))

        if example['post_text']:
            parts.append("\nContext after:")
            parts.append(' '.join(example['post_text']))

        return "\n".join(parts)

    def create_prompt(self, example: Dict) -> str:
        """Create prompt with pattern-learning approach"""
        demonstrations = []
        for demo in self.examples:
            demo_text = f"""Question: {demo['question']}
Context:
{self.format_context(demo)}
Operations: {demo['program_re']}
Result: {demo['answer']}
---"""
            demonstrations.append(demo_text)

        context = self.format_context(example)
        return f"""Study these examples of financial problems and their solutions.
Each example shows how to deduce the mathematical operation required to solve a problem from the context and question making up the problem. The operations are expressed using function notation like:
add(), subtract(), multiply(), divide(), greater(), less(), table_sum(), table_max(), table_average(), exp(), etc.
Examples:
{chr(10).join(demonstrations)}
Now solve this question:
Question: {example['question']}
Context:
{context}
Based on what you learned from the examples above, provide:
Operations: [mathematical operation]
Result: [result]"""

    def extract_response(self, response: str) -> Dict[str, str]:
        """Extract operations and result with better handling of variations"""
        operations = None
        answer = None

        # Clean up the response
        lines = [line.strip() for line in response.split('\n') if line.strip()]

        # Handle variations in format
        for line in lines:
            if "Operations:" in line:
                operations = line.split("Operations:")[-1].strip()
                operations = operations.replace('[mathematical operation]', '').strip()
                operations = operations.replace('[operations]', '').strip()
                if operations == 'None' or operations == '---':
                    operations = None
            elif "Result:" in line:
                answer = line.split("Result:")[-1].strip()
                answer = answer.replace('[result]', '').strip()
                if answer == 'None' or answer == '---' or answer == '(as a number)':
                    answer = None

        return {
            "operations": operations,
            "answer": answer
        }

    def evaluate_response(self, expected_ops: str, predicted_ops: str) -> Dict[str, float]:
        """Evaluate the response using ROUGE metrics"""
        try:
            from rouge import Rouge
        except ImportError:
            print("Rouge package not found. Installing rouge...")
            import subprocess
            import sys
            subprocess.check_call([sys.executable, "-m", "pip", "install", "rouge"])
            from rouge import Rouge

        metrics = {}

        # Initialize Rouge for operation comparison
        rouge = Rouge()

        # Evaluate operations using Rouge if both exist
        if expected_ops and predicted_ops:
            try:
                # Calculate Rouge scores
                rouge_scores = rouge.get_scores(predicted_ops, expected_ops)[0]
                metrics['rouge1_f'] = rouge_scores['rouge-1']['f']
                metrics['rouge2_f'] = rouge_scores['rouge-2']['f']
                metrics['rougeL_f'] = rouge_scores['rouge-l']['f']
            except Exception:
                metrics['rouge1_f'] = 0.0
                metrics['rouge2_f'] = 0.0
                metrics['rougeL_f'] = 0.0
        else:
            metrics['rouge1_f'] = 0.0
            metrics['rouge2_f'] = 0.0
            metrics['rougeL_f'] = 0.0

        return metrics

    def test_prompts(self, num_samples: int = 10, num_generations: int = 5) -> None:
        """Test with pattern matching and self-consistency"""
        test_samples = []
        example_ids = {ex['id'] for ex in self.examples}

        for sample in random.sample(list(self.dataset['train']), num_samples + len(example_ids)):
            if sample['id'] not in example_ids and len(test_samples) < num_samples:
                test_samples.append(sample)

        print("\nExample Operations Used in Prompt:")
        print("=" * 50)
        for ex in self.examples:
            print(f"\nOperation Type: {self.categorize_operation(ex['program_re'])}")
            print(f"Operations: {ex['program_re']}")

        print("\nTesting with pattern matching and self-consistency:")
        print("=" * 50)

        # Collect metrics across all samples
        all_metrics = []

        for idx, sample in enumerate(test_samples, 1):
            print(f"\nTest Sample {idx}")
            print("-" * 30)
            print(f"Operation Type: {self.categorize_operation(sample['program_re'])}")
            print(f"Question: {sample['question']}")
            print(f"Expected Operations: {sample['program_re']}")
            print(f"Expected Answer: {sample['answer']}")

            # Get multiple responses for self-consistency
            responses = []
            for _ in range(num_generations):
                prompt = self.create_prompt(sample)
                inputs = self.tokenizer(prompt, return_tensors="pt").to("cuda")

                with torch.no_grad():
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=200,
                        temperature=0.7,
                        top_p=0.95,
                        do_sample=True,
                        num_return_sequences=1,
                        pad_token_id=self.tokenizer.eos_token_id
                    )

                response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                extracted = self.extract_response(response)

                if extracted["operations"] and extracted["answer"]:
                    responses.append(extracted)

            # Get consensus from responses
            if responses:
                print("\nAll Generated Responses:")
                for i, response in enumerate(responses, 1):
                    print(f"\nGeneration {i}:")
                    print(f"Operations needed: {response['operations']}")
                    print(f"Answer: {response['answer']}")

                # Find most common operation and answer
                operations = [r["operations"] for r in responses]
                answers = [r["answer"] for r in responses]
                most_common_ops = Counter(operations).most_common(1)[0][0]
                most_common_ans = Counter(answers).most_common(1)[0][0]

                print("\nConsensus from multiple generations:")
                print(f"Operations needed: {most_common_ops}")
                print(f"Answer: {most_common_ans}")

                # Evaluate the consensus response
                metrics = self.evaluate_response(
                    sample['program_re'],
                    most_common_ops
                )
            else:
                metrics = {
                    'rouge1_f': 0.0,
                    'rouge2_f': 0.0,
                    'rougeL_f': 0.0
                }

            all_metrics.append(metrics)
            print("\nROUGE Scores:")
            print(f"ROUGE-1 F1: {metrics['rouge1_f']:.3f}")
            print(f"ROUGE-2 F1: {metrics['rouge2_f']:.3f}")
            print(f"ROUGE-L F1: {metrics['rougeL_f']:.3f}")

        # Calculate and display average metrics
        avg_metrics = {
            metric: sum(m[metric] for m in all_metrics) / len(all_metrics)
            for metric in all_metrics[0].keys()
        }

        print("\nOverall ROUGE Scores:")
        print("=" * 50)
        print(f"Average ROUGE-1 F1: {avg_metrics['rouge1_f']:.3f}")
        print(f"Average ROUGE-2 F1: {avg_metrics['rouge2_f']:.3f}")
        print(f"Average ROUGE-L F1: {avg_metrics['rougeL_f']:.3f}")

if __name__ == "__main__":
    tester = PromptTester(
        model_name="Qwen/Qwen2.5-7B-Instruct",
        hf_token="your_private_token"
    )
    tester.test_prompts(num_samples=10, num_generations=5)

tokenizer_config.json:   0%|          | 0.00/7.30k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/2.78M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/1.67M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/7.03M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/663 [00:00<?, ?B/s]

model.safetensors.index.json:   0%|          | 0.00/27.8k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/4 [00:00<?, ?it/s]

model-00001-of-00004.safetensors:   0%|          | 0.00/3.95G [00:00<?, ?B/s]

model-00002-of-00004.safetensors:   0%|          | 0.00/3.86G [00:00<?, ?B/s]

model-00003-of-00004.safetensors:   0%|          | 0.00/3.86G [00:00<?, ?B/s]

model-00004-of-00004.safetensors:   0%|          | 0.00/3.56G [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/243 [00:00<?, ?B/s]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/4.50k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/12.4M [00:00<?, ?B/s]

validation-00000-of-00001.parquet:   0%|          | 0.00/801k [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/1.19M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/6251 [00:00<?, ? examples/s]

Generating validation split:   0%|          | 0/883 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/1147 [00:00<?, ? examples/s]


Example Operations Used in Prompt:

Operation Type: arithmetic
Operations: add(75.0, 72.7), divide(#0, 20)

Operation Type: arithmetic
Operations: subtract(959.2, 991.1), divide(#0, 991.1)

Operation Type: boolean
Operations: multiply(607, 18.13), multiply(#0, const_1000), multiply(3.3, const_1000000), greater(#1, #2)

Operation Type: table
Operations: table_sum(cash flows on retained interests, none)

Operation Type: table
Operations: table_max(cumulative foreign currency translation, none)

Operation Type: table
Operations: table_min(expected volatility, none), table_max(expected volatility, none), subtract(#1, #0)

Operation Type: table
Operations: table_average(2016, none)

Operation Type: arithmetic
Operations: add(const_1, 2.0%), exp(#0, 7), multiply(397, #1)

Testing with pattern matching and self-consistency:

Test Sample 1
------------------------------
Operation Type: arithmetic
Question: what is the average purchase price of shares purchased during 1998?
Expected Operations