<a href="https://colab.research.google.com/github/Adeel777eng/TASK-No.5/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
#!/usr/bin/env python3
"""
auto_tag_tickets.py

Auto-tag support tickets:
- Zero-shot classification using Hugging Face zero-shot pipeline (bart-large-mnli)
- Optional few-shot using OpenAI ChatCompletion (requires OPENAI_API_KEY)
- Fine-tune DistilBERT classifier and compare performance
- Output top-3 tags per ticket for each method

Usage:
    python auto_tag_tickets.py --csv tickets.csv

Optional args:
    --tags tags.txt             # candidate tags (one per line). If omitted, tags are inferred.
    --do_finetune               # perform fine-tuning (can be slow)
    --use_openai_fewshot        # use OpenAI few-shot prompting (requires OPENAI_API_KEY env var)
    --epochs 3
    --batch_size 16
    --output_dir saved_model
"""

import os
import argparse
import math
from typing import List, Tuple, Dict, Optional

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from tqdm import tqdm

import torch
from transformers import (
    pipeline,
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
)
from datasets import Dataset, DatasetDict # Removed load_metric
import evaluate # Added evaluate library

# Optional OpenAI few-shot (only used if requested and openai installed)
try:
    import openai
    _HAS_OPENAI = True
except Exception:
    _HAS_OPENAI = False

# ---------------------------
# Helpers
# ---------------------------
def load_data(csv_path: str) -> pd.DataFrame:
    """Read the ticket CSV at ``csv_path`` into a DataFrame.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        The parsed DataFrame, unmodified.

    Raises:
        FileNotFoundError: If nothing exists at ``csv_path``.
        ValueError: If the parsed CSV has no ``text`` column.
    """
    csv_missing = not os.path.exists(csv_path)
    if csv_missing:
        raise FileNotFoundError(f"CSV not found: {csv_path}")

    frame = pd.read_csv(csv_path)
    if "text" in frame.columns:
        return frame
    raise ValueError("CSV must contain 'text' column.")

def load_tags(tags_path: Optional[str], df: pd.DataFrame) -> List[str]:
    """Resolve the candidate tag list from a tags file or from labeled data.

    Args:
        tags_path: Optional path to a text file with one tag per line. It is
            only used when it is non-empty and the file exists.
        df: Ticket DataFrame; its ``label`` column (if present) is the
            fallback source of tags.

    Returns:
        The non-blank, stripped lines of the tags file in file order, or
        otherwise the sorted unique non-null values of ``df["label"]``.

    Raises:
        ValueError: If the tags file exists but has no non-blank lines, or if
            no tags file is used and ``df`` has no non-null labels.
    """
    use_file = bool(tags_path) and os.path.exists(tags_path)
    if use_file:
        with open(tags_path, "r", encoding="utf-8") as handle:
            stripped = (raw.strip() for raw in handle)
            file_tags = [entry for entry in stripped if entry]
        if file_tags:
            return file_tags
        raise ValueError("tags.txt is empty.")

    # No usable tags file: fall back to the labels present in the data.
    has_labels = "label" in df.columns and not df["label"].isnull().all()
    if not has_labels:
        raise ValueError("No tags provided and no labeled data to infer tags from. Provide tags.txt or labeled CSV.")
    return sorted(df["label"].dropna().unique().tolist())

def prepare_datasets(df: pd.DataFrame, test_size=0.2, seed=42) -> Tuple[DatasetDict, List[str]]:
    """Build the HuggingFace datasets used by the rest of the script.

    Args:
        df: Ticket DataFrame with a ``text`` column and optionally ``label``.
        test_size: Fraction of the labeled rows held out as the ``test`` split.
        seed: ``random_state`` passed to both stratified splits.

    Returns:
        ``(datasets, labels)``. When ``df`` has at least one non-null label,
        ``datasets`` holds ``train`` / ``validation`` / ``test`` splits built
        from the labeled rows only (validation is 10% of the non-test rows)
        and ``labels`` is the sorted list of unique labels. Otherwise
        ``datasets`` holds a single ``infer`` split with the ``text`` column
        and ``labels`` is empty.
    """
    has_labels = "label" in df.columns and not df["label"].isnull().all()
    if not has_labels:
        # Nothing to supervise on: expose the raw texts for inference only.
        return DatasetDict({"infer": Dataset.from_pandas(df[["text"]])}), []

    # Rows without a label cannot take part in supervised training.
    labeled = df.dropna(subset=["label"]).reset_index(drop=True)

    fit_df, held_out_df = train_test_split(
        labeled, test_size=test_size, random_state=seed, stratify=labeled["label"]
    )
    fit_df, dev_df = train_test_split(
        fit_df, test_size=0.1, random_state=seed, stratify=fit_df["label"]
    )

    keep = ["text", "label"]
    frames = {"train": fit_df, "validation": dev_df, "test": held_out_df}
    splits = DatasetDict(
        {name: Dataset.from_pandas(frame[keep]) for name, frame in frames.items()}
    )
    return splits, sorted(labeled["label"].unique().tolist())

# ---------------------------
# Zero-shot classification
# ---------------------------
def run_zero_shot(texts: List[str], candidate_labels: List[str], hypothesis_template: str = "This example is {}.") -> List[List[Tuple[str, float]]]:
    """Score every text against every candidate label with a zero-shot pipeline.

    Args:
        texts: Ticket texts to classify; each is sent to the pipeline on its own.
        candidate_labels: Labels the pipeline scores each text against.
        hypothesis_template: Template forwarded to the pipeline unchanged.

    Returns:
        One list per text of ``(label, score)`` tuples, pairing the pipeline's
        ``labels`` and ``scores`` outputs in the order the pipeline returns
        them. All candidate labels are returned; callers slice for top-k.
    """
    # Device index 0 is the first GPU; -1 selects the CPU.
    device = -1
    if torch.cuda.is_available():
        device = 0
    classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli", device=device)

    def _score(ticket: str) -> List[Tuple[str, float]]:
        prediction = classifier(ticket, candidate_labels, hypothesis_template=hypothesis_template)
        return list(zip(prediction["labels"], prediction["scores"]))

    return [_score(ticket) for ticket in tqdm(texts, desc="zero-shot")]

# ---------------------------
# Optional few-shot using OpenAI (chat)
# ---------------------------
def run_openai_fewshot(texts: List[str], candidate_labels: List[str], examples: List[Tuple[str, str]], top_k=3) -> List[List[Tuple[str, float]]]:
    """Tag tickets by few-shot prompting an OpenAI chat model.

    One chat request is sent per ticket. The prompt lists the candidate tags
    and the exemplars, then asks for the top ``top_k`` tags as a JSON list.

    Args:
        texts: Ticket texts to classify.
        candidate_labels: Tags the model may choose from; must be non-empty.
        examples: ``(text, label)`` exemplars included in every prompt.
        top_k: Maximum number of predictions kept per ticket.

    Returns:
        One list per ticket of ``(tag, score)`` tuples. Scores are whatever
        the model wrote in its JSON answer (0.5 when absent or unparsable,
        0.9 when a tag was only found by substring match), so they are not
        calibrated probabilities. If a request fails, that ticket gets the
        single fallback prediction ``(candidate_labels[0], 1.0)``.

    Raises:
        RuntimeError: If the ``openai`` package or ``OPENAI_API_KEY`` is missing.
        ValueError: If ``candidate_labels`` is empty.
    """
    import json
    import re

    if not _HAS_OPENAI:
        raise RuntimeError("openai package not installed. Install openai to use few-shot mode.")
    if "OPENAI_API_KEY" not in os.environ:
        raise RuntimeError("OPENAI_API_KEY not found in environment for OpenAI few-shot.")
    if not candidate_labels:
        # The per-ticket fallback below needs at least one label to exist.
        raise ValueError("candidate_labels must not be empty.")

    api_key = os.environ["OPENAI_API_KEY"]
    system_prompt = "You are an assistant that assigns a single tag from the provided candidate tags to each support ticket. Return a JSON list of top tags with confidences."

    # Everything except the ticket itself is identical for every request,
    # so build it once and append the ticket text inside the loop.
    example_block = "".join(
        f"Ticket: {ex_text}\nTag: {ex_label}\n\n" for ex_text, ex_label in examples
    )
    candidate_block = "Candidate tags: " + ", ".join(candidate_labels) + "\n\n"
    prompt_prefix = (
        f"{candidate_block}Examples:\n{example_block}"
        f"Now, label the following ticket. Provide the top {top_k} tags in JSON as "
        "[{\"tag\": <tag>, \"score\": <0-1>}].\n\nTicket: "
    )

    # openai>=1.0 removed openai.ChatCompletion in favour of a client object;
    # support both so the call does not silently fail on newer installs.
    if hasattr(openai, "OpenAI"):
        client = openai.OpenAI(api_key=api_key)

        def _complete(messages):
            resp = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages,
                temperature=0.0,
                max_tokens=200,
            )
            return resp.choices[0].message.content
    else:
        openai.api_key = api_key

        def _complete(messages):
            resp = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=messages,
                temperature=0.0,
                max_tokens=200,
            )
            return resp["choices"][0]["message"]["content"]

    json_list_re = re.compile(r"\[.*\]", re.S)

    def _mentioned_tags(content: str) -> list:
        # Crude fallback: keep candidate tags that literally appear in the answer.
        lowered = content.lower()
        return [{"tag": tag, "score": 0.9} for tag in candidate_labels if tag.lower() in lowered]

    def _parse(content: str) -> List[Tuple[str, float]]:
        # Prefer the JSON list the model was asked for; otherwise scan for tags.
        parsed = None
        match = json_list_re.search(content)
        if match:
            try:
                parsed = json.loads(match.group(0))
            except ValueError:
                parsed = None
        if not isinstance(parsed, list):
            parsed = _mentioned_tags(content)

        ranked = []
        for item in parsed[:top_k]:
            if isinstance(item, dict):
                tag = item.get("tag")
                if tag is None:
                    continue
                try:
                    score = float(item.get("score", 0.5))
                except (TypeError, ValueError):
                    score = 0.5
            else:
                tag, score = item, 0.5
            ranked.append((str(tag), score))
        return ranked

    results = []
    for text in tqdm(texts, desc="openai-fewshot"):
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt_prefix + str(text) + "\n\nAnswer:"},
        ]
        try:
            content = _complete(messages)
        except Exception as exc:
            # Best-effort: one failed request must not abort the whole run,
            # but it should be visible rather than silently swallowed.
            print(f"OpenAI few-shot request failed ({exc!r}); using fallback tag.")
            results.append([(candidate_labels[0], 1.0)])
            continue
        results.append(_parse((content or "").strip()))
    return results


# ---------------------------
# Fine-tune classifier (DistilBERT)
# ---------------------------
def finetune_classifier(ds: DatasetDict, label_list: List[str], output_dir: str, epochs=3, batch_size=16, lr=2e-5) -> Tuple[str, AutoTokenizer, AutoModelForSequenceClassification]:
    """Fine-tune DistilBERT on ``ds["train"]``, validating on ``ds["validation"]``.

    Args:
        ds: DatasetDict whose splits carry ``text`` and ``label`` columns.
        label_list: All label values; a label's position becomes its class id.
        output_dir: Directory for checkpoints and the final model/tokenizer.
        epochs: Number of training epochs.
        batch_size: Per-device batch size for both training and evaluation.
        lr: Learning rate.

    Returns:
        ``(output_dir, tokenizer, model)`` after training and saving.

    Raises:
        KeyError: If a row's label is not present in ``label_list``.
    """
    import inspect

    model_name = "distilbert-base-uncased"
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    label2id = {label: idx for idx, label in enumerate(label_list)}
    # Stored on the model config so the saved model reports real tag names
    # at inference time instead of generic LABEL_<n> strings.
    id2label = {idx: str(label) for label, idx in label2id.items()}

    def encode(batch):
        # Tokenize and attach integer class ids in a single pass, while the
        # original "label" column is still available.
        encoded = tokenizer(batch["text"], truncation=True, max_length=128)
        encoded["labels"] = [label2id[label] for label in batch["label"]]
        return encoded

    # Drop the raw columns only after `encode` has read them. Padding is left
    # to the data collator, which pads each batch to its longest sequence.
    tokenized = ds.map(encode, batched=True, remove_columns=ds["train"].column_names)

    model = AutoModelForSequenceClassification.from_pretrained(
        model_name,
        num_labels=len(label_list),
        id2label=id2label,
        label2id={name: idx for idx, name in id2label.items()},
    )

    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

    # Newer transformers releases renamed `evaluation_strategy` to
    # `eval_strategy`; use whichever name the installed version accepts.
    ta_params = inspect.signature(TrainingArguments.__init__).parameters
    eval_key = "eval_strategy" if "eval_strategy" in ta_params else "evaluation_strategy"
    training_args = TrainingArguments(
        output_dir=output_dir,
        num_train_epochs=epochs,
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        learning_rate=lr,
        save_strategy="epoch",
        load_best_model_at_end=True,
        # Must match a key returned by compute_metrics below.
        metric_for_best_model="f1_macro",
        greater_is_better=True,
        save_total_limit=2,
        logging_steps=50,
        push_to_hub=False,
        **{eval_key: "epoch"},
    )

    def compute_metrics(eval_pred):
        # Uses the sklearn metrics already imported by this file, so no
        # metric script has to be downloaded at training time.
        logits, labels = eval_pred
        preds = np.argmax(logits, axis=-1)
        return {
            "accuracy": float(accuracy_score(labels, preds)),
            "f1_macro": float(f1_score(labels, preds, average="macro")),
        }

    # Likewise, Trainer's `tokenizer` argument was renamed `processing_class`.
    trainer_params = inspect.signature(Trainer.__init__).parameters
    tokenizer_key = "processing_class" if "processing_class" in trainer_params else "tokenizer"
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized["train"],
        eval_dataset=tokenized["validation"],
        data_collator=data_collator,
        compute_metrics=compute_metrics,
        **{tokenizer_key: tokenizer},
    )

    trainer.train()
    trainer.save_model(output_dir)
    tokenizer.save_pretrained(output_dir)
    return output_dir, tokenizer, model

def predict_finetuned(model_dir: str, texts: List[str], top_k=3) -> List[List[Tuple[str, float]]]:
    """Score texts with a saved fine-tuned classifier.

    Args:
        model_dir: Directory holding the saved model and tokenizer.
        texts: Ticket texts to classify.
        top_k: Number of highest-scoring labels kept per text.

    Returns:
        One list per text with up to ``top_k`` ``(label, score)`` tuples,
        sorted by descending score.
    """
    device = 0 if torch.cuda.is_available() else -1
    # top_k=None asks for every label's score; it replaces the deprecated
    # return_all_scores=True.
    classifier = pipeline("text-classification", model=model_dir, top_k=None, device=device)
    results = []
    for t in tqdm(texts, desc="finetuned-predict"):
        # A one-element list keeps the output shape the same across
        # transformers versions (one result per input). Truncation stops an
        # over-long ticket from exceeding the model's maximum input length.
        out = classifier([t], truncation=True)[0]
        if isinstance(out, dict):
            out = [out]
        ranked = sorted(
            ((d["label"], d["score"]) for d in out),
            key=lambda pair: pair[1],
            reverse=True,
        )
        results.append(ranked[:top_k])
    return results

# ---------------------------
# Utilities: evaluate top-1 accuracy & macro F1 for predicted lists
# ---------------------------
def evaluate_top1(preds_topk: List[List[Tuple[str, float]]], golds: List[str]) -> Dict[str, float]:
    """Compute top-1 accuracy and macro F1 for ranked predictions.

    Args:
        preds_topk: One ranked list of ``(label, score)`` tuples per example.
        golds: Gold label per example, aligned with ``preds_topk``.

    Returns:
        Dict with ``accuracy`` and ``f1_macro``. An example whose prediction
        list is empty is scored as predicting the empty string.
    """
    best_labels = []
    for ranked in preds_topk:
        if ranked:
            best_labels.append(ranked[0][0])
        else:
            best_labels.append("")
    return {
        "accuracy": accuracy_score(golds, best_labels),
        "f1_macro": f1_score(golds, best_labels, average="macro"),
    }

# ---------------------------
# Main CLI
# ---------------------------
def main(argv: Optional[List[str]] = None):
    """Command-line entry point: tag every ticket in a CSV and save the results.

    Runs zero-shot classification, optionally OpenAI few-shot prompting and
    DistilBERT fine-tuning, prints sample predictions and top-1 metrics, and
    writes the top-3 predictions per ticket to ``predictions_output.csv``.

    Args:
        argv: Argument list to parse. ``None`` (the default) parses the
            process arguments. Unknown arguments are ignored so the function
            also works when started from a notebook kernel.

    Raises:
        ValueError: If no candidate tags can be determined, or if fine-tuning
            is requested without labeled data.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--csv", type=str, default="tickets.csv", help="CSV with 'text' and optionally 'label'")
    parser.add_argument("--tags", type=str, default=None, help="Optional tags.txt file")
    parser.add_argument("--do_finetune", action="store_true", help="Perform fine-tuning")
    parser.add_argument("--use_openai_fewshot", action="store_true", help="Use OpenAI few-shot")
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--batch_size", type=int, default=16)
    parser.add_argument("--output_dir", type=str, default="saved_model")
    # parse_known_args() ignores extra arguments injected by the Colab kernel.
    args, _unknown = parser.parse_known_args(argv)

    # Report a missing CSV with a friendly message instead of a traceback.
    if not os.path.exists(args.csv):
        print(f"CSV file not found: {args.csv}. Please ensure the CSV file is in the correct directory.")
        return

    df = load_data(args.csv)
    print(f"Loaded {len(df)} rows from {args.csv}")

    # Prepare datasets (train/validation/test only when labels exist).
    ds_dict, inferred_labels = prepare_datasets(df)
    # load_tags raises a clear error when there is neither a tags file nor
    # labeled data, so we fail before loading any model with no labels.
    tags = load_tags(args.tags, df)
    print("Candidate tags:", tags)

    # Texts to predict, in CSV order.
    texts = df["text"].astype(str).tolist()

    # Metrics are computed on labeled rows only; scoring an unlabeled row
    # against an empty gold label would count every prediction as wrong.
    has_gold = "label" in df.columns and not df["label"].isnull().all()
    labeled_mask = df["label"].notna().tolist() if has_gold else []
    golds = [g for g, keep in zip(df["label"].tolist(), labeled_mask) if keep] if has_gold else []

    def _labeled_only(preds):
        # Keep the predictions that belong to rows with a gold label.
        return [p for p, keep in zip(preds, labeled_mask) if keep]

    def _format_top3(res):
        # Serialise up to three (label, score) pairs as "label|0.123;...".
        return ";".join(f"{l}|{s:.3f}" for l, s in res[:3])

    # 1) Zero-shot
    print("\n=== Running zero-shot classification ===")
    zs_results = run_zero_shot(texts, tags)
    for txt, res in zip(texts[:5], zs_results[:5]):
        print(f"\nTicket: {txt}\nZero-shot top-3:")
        for label, score in res[:3]:
            print(f"  {label}: {score:.3f}")

    if has_gold:
        zs_eval = evaluate_top1(_labeled_only(zs_results), golds)
        print("\nZero-shot evaluation (top-1):", zs_eval)

    # 2) Optional few-shot via OpenAI
    fs_results = None
    if args.use_openai_fewshot:
        if not _HAS_OPENAI or "OPENAI_API_KEY" not in os.environ:
            print("OpenAI few-shot requested but openai package or OPENAI_API_KEY not available. Skipping.")
        else:
            if has_gold:
                # Two labeled exemplars per tag.
                sampled = df.dropna(subset=["label"]).groupby("label").head(2)
                examples = list(zip(sampled["text"].tolist(), sampled["label"].tolist()))
            else:
                # Without labels there are no trustworthy exemplars; sending
                # none is better than inventing tags for arbitrary tickets.
                examples = []
            print("\n=== Running OpenAI few-shot classification (may consume credits) ===")
            fs_results = run_openai_fewshot(texts, tags, examples, top_k=3)
            for txt, res in zip(texts[:5], fs_results[:5]):
                print(f"\nTicket: {txt}\nFew-shot top predictions: {res}")

            if has_gold:
                fs_eval = evaluate_top1(_labeled_only(fs_results), golds)
                print("\nFew-shot evaluation (top-1):", fs_eval)

    # 3) Fine-tune (optional)
    ft_results = None
    if args.do_finetune:
        if not inferred_labels:
            raise ValueError("Fine-tuning requires labeled data (a 'label' column).")
        os.makedirs(args.output_dir, exist_ok=True)
        print("\n=== Fine-tuning classifier ===")
        model_dir, tokenizer, model = finetune_classifier(ds_dict, inferred_labels, output_dir=args.output_dir, epochs=args.epochs, batch_size=args.batch_size)
        print("Fine-tuned model saved to:", model_dir)

        ft_results = predict_finetuned(model_dir, texts, top_k=3)
        for txt, res in zip(texts[:5], ft_results[:5]):
            print(f"\nTicket: {txt}\nFine-tuned top-3:")
            for label, score in res:
                print(f"  {label}: {score:.3f}")

        if has_gold:
            # NOTE: this covers all labeled rows, including those the model
            # was trained on, so it is optimistic compared with zero-shot.
            ft_eval = evaluate_top1(_labeled_only(ft_results), golds)
            print("\nFine-tuned evaluation (top-1):", ft_eval)

    # Save combined outputs to CSV
    print("\nSaving top-3 predictions to 'predictions_output.csv' ...")
    out = df.copy()
    out["zs_top3"] = [_format_top3(res) for res in zs_results]
    if fs_results is not None:
        out["fs_top3"] = [_format_top3(res) for res in fs_results]
    if ft_results is not None:
        out["ft_top3"] = [_format_top3(res) for res in ft_results]
    else:
        out["ft_top3"] = ""
    out.to_csv("predictions_output.csv", index=False)
    print("Saved predictions_output.csv")

if __name__ == "__main__":
    # main() builds its own argument parser and uses parse_known_args(), so
    # extra arguments injected by the Colab kernel are ignored there. Parsing
    # a second time here would only produce values that are never used.
    main()

ModuleNotFoundError: No module named 'evaluate'