In [1]:
!pip install --upgrade pip
!pip install spacy
!pip install transformers
!pip install nltk
!pip install flask
!pip install rapidfuzz
!pip install fastapi uvicorn  # only if you want to test API code locally in Colab or you plan to run it on a VM

Collecting pip
  Downloading pip-25.0-py3-none-any.whl.metadata (3.7 kB)
Downloading pip-25.0-py3-none-any.whl (1.8 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.8/1.8 MB[0m [31m17.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pip
  Attempting uninstall: pip
    Found existing installation: pip 24.1.2
    Uninstalling pip-24.1.2:
      Successfully uninstalled pip-24.1.2
Successfully installed pip-25.0
Collecting rapidfuzz
  Downloading rapidfuzz-3.11.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Downloading rapidfuzz-3.11.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m31.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz
Successfully installed rapidfuzz-3.11.0
Collecting fastapi
  Downloading fastapi-0.115.7-py3-none-any.whl.metadata (27 kB)
Collecting uvicorn
  Downloading uv

In [10]:
!pip install transformers
!pip install optuna
!pip install torch
!pip install spacy
!pip install iterative-stratification
!pip install sentence-transformers
!python -m spacy download en_core_web_trf
!python -m spacy download en_core_web_sm

Collecting iterative-stratification
  Downloading iterative_stratification-0.1.9-py3-none-any.whl.metadata (1.3 kB)
Downloading iterative_stratification-0.1.9-py3-none-any.whl (8.5 kB)
Installing collected packages: iterative-stratification
Successfully installed iterative-stratification-0.1.9
Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m56.2 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [6]:
import pandas as pd
import random
import json
import re
import spacy
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from collections import Counter
from sklearn.utils import resample

# Download NLTK data if not already present
import nltk

# Download necessary resources
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')  # For WordNet lemmatizer
nltk.download('punkt_tab')  # Explicit download for punkt_tab

# Load spaCy model for additional preprocessing
nlp = spacy.load("en_core_web_sm")

# Define domain knowledge
domain_knowledge = {
    "competitors": [
        "CompetitorX", "CompetitorY", "AcmeCorp", "NextGenTech", "CloudSolutions",
        "DataInsights", "AutoMetrics", "HyperCloud", "InfoTech", "QuantumSoft"
    ],
    "features": [
        "real-time reporting", "automation suite", "advanced metrics", "AI engine",
        "data pipeline", "compliance dashboard", "enterprise-grade analytics",
        "scalable architecture", "cloud optimization"
    ],
    "pricing_keywords": [
        "discount", "promo code", "rebate", "cost reduction", "special rate",
        "bulk pricing", "early sign-up offer", "pricing model", "cost efficiency"
    ],
    "compliance": [
        "SOC2", "FedRAMP", "PCI-DSS", "ISO 27001", "HIPAA", "GDPR",
        "CCPA", "NIST compliance", "data privacy", "risk assessment"
    ]
}

# Define snippet templates
snippet_templates = [
    "We are impressed with {feature}, but {competitor} offers a better {pricing}.",
    "Does your solution handle {compliance}? {competitor} seems to.",
    "Our finance department is asking for a {pricing} or we'll stick with {competitor}.",
    "We are concerned about {compliance}.",
    "Can you provide a {pricing} if we commit early?",
    "How does your solution compare to {competitor} in terms of {feature}?",
    "We need better {feature} to handle enterprise demands.",
    "Does your automation suite support {compliance} and other requirements?",
    "Competitor {competitor} offers better pricing. How can you match it?",
    "We are worried about pricing. Can you provide a better {pricing}?"
]

# Generate realistic text snippets
def generate_snippet(template, domain_knowledge):
    return template.format(
        feature=random.choice(domain_knowledge["features"]),
        competitor=random.choice(domain_knowledge["competitors"]),
        pricing=random.choice(domain_knowledge["pricing_keywords"]),
        compliance=random.choice(domain_knowledge["compliance"])
    )

# Generate a diverse dataset
def create_dataset(num_samples=300):
    data = []
    labels_map = {
        "Positive": ["impressed", "love", "better feature"],
        "Negative": ["concerned", "worried", "stick with"],
        "Objection": ["match", "handle", "requirements"],
        "Pricing Discussion": ["discount", "pricing", "rebate", "promo code"],
        "Compliance": ["SOC2", "FedRAMP", "PCI-DSS", "GDPR"],
        "Competition": ["compare", "Competitor"]
    }

    for _ in range(num_samples):
        template = random.choice(snippet_templates)
        snippet = generate_snippet(template, domain_knowledge)

        # Assign labels based on keywords
        labels = [
            label for label, keywords in labels_map.items()
            if any(keyword in snippet for keyword in keywords)
        ]
        if not labels:
            labels = ["Uncategorized"]

        data.append({"text_snippet": snippet, "labels": ", ".join(labels)})

    return pd.DataFrame(data)

# Preprocess text: cleaning, lemmatization, and stopword removal
def preprocess_text(text):
    # Lowercase the text
    text = text.lower()

    # Remove special characters and digits
    text = re.sub(r"[^a-zA-Z\s]", "", text)

    # Tokenize text
    tokens = word_tokenize(text)

    # Remove stopwords
    stop_words = set(stopwords.words("english"))
    tokens = [token for token in tokens if token not in stop_words]

    # Lemmatize tokens
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(token) for token in tokens]

    return " ".join(tokens)

# Apply preprocessing to dataset
def preprocess_dataset(df):
    df["text_snippet_cleaned"] = df["text_snippet"].apply(preprocess_text)
    return df

# Handle label imbalance by oversampling
def balance_labels(df):
    label_counts = Counter(df["labels"])
    min_count = max(label_counts.values())  # Balance all labels to the max count

    balanced_df = pd.DataFrame()
    for label, count in label_counts.items():
        label_df = df[df["labels"] == label]
        if count < min_count:
            label_df = resample(label_df, replace=True, n_samples=min_count, random_state=42)
        balanced_df = pd.concat([balanced_df, label_df])

    return balanced_df

# Save domain knowledge to JSON
def save_domain_knowledge(domain_knowledge):
    with open("domain_knowledge.json", "w") as f:
        json.dump(domain_knowledge, f, indent=4)
    print("Domain knowledge JSON saved.")

# Main function to create, preprocess, and save the dataset
def main():
    # Generate dataset
    print("Generating dataset...")
    dataset = create_dataset()
    print(f"Generated {len(dataset)} samples.")

    # Preprocess dataset
    print("Preprocessing dataset...")
    dataset = preprocess_dataset(dataset)

    # Balance labels
    print("Balancing labels...")
    dataset = balance_labels(dataset)

    # Save dataset
    dataset.to_csv("calls_dataset.csv", index=False)
    print("Dataset saved as 'calls_dataset.csv'.")

    # Save domain knowledge
    save_domain_knowledge(domain_knowledge)

# Run the script
if __name__ == "__main__":
    main()

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


Generating dataset...
Generated 300 samples.
Preprocessing dataset...
Balancing labels...
Dataset saved as 'calls_dataset.csv'.
Domain knowledge JSON saved.


In [None]:
import pandas as pd
import numpy as np
import re
import nltk
import joblib
import optuna
from typing import List, Dict
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from torch.utils.data import Dataset, DataLoader
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from iterstrat.ml_stratifiers import MultilabelStratifiedKFold

# Preprocessor Class
class TextPreprocessor:
    def __init__(self):
        nltk.download('stopwords', quiet=True)
        nltk.download('wordnet', quiet=True)
        nltk.download('punkt', quiet=True)

    def preprocess(self, text: str) -> str:
        # Lowercase, remove special characters, and tokenize
        text = re.sub(r"[^\w\s]", "", text.lower())
        tokens = nltk.word_tokenize(text)
        return " ".join(tokens)

# Dataset Class
class MultiLabelDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )
        return {
            "input_ids": encoding["input_ids"].squeeze(),
            "attention_mask": encoding["attention_mask"].squeeze(),
            "labels": torch.tensor(label, dtype=torch.float)
        }

# Classifier Class
class TransformerMultiLabelClassifier:
    def __init__(self, model_name="distilbert-base-uncased", num_labels=6):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            model_name,
            num_labels=num_labels,
            problem_type="multi_label_classification"
        )

    def train_with_optuna(self, train_texts, train_labels, val_texts, val_labels):
        def objective(trial):
            # Suggest hyperparameters
            learning_rate = trial.suggest_loguniform("learning_rate", 1e-5, 5e-5)
            batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])

            # Create data loaders
            train_dataset = MultiLabelDataset(train_texts, train_labels, self.tokenizer)
            val_dataset = MultiLabelDataset(val_texts, val_labels, self.tokenizer)

            train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
            val_loader = DataLoader(val_dataset, batch_size=batch_size)

            # Optimizer and scheduler
            optimizer = torch.optim.AdamW(self.model.parameters(), lr=learning_rate)

            # Training
            self.model.train()
            for epoch in range(1):  # Reduced to 1 epoch for faster tuning
                for batch in train_loader:
                    optimizer.zero_grad()
                    outputs = self.model(
                        input_ids=batch["input_ids"],
                        attention_mask=batch["attention_mask"],
                        labels=batch["labels"]
                    )
                    loss = outputs.loss
                    loss.backward()
                    optimizer.step()

            # Validation
            self.model.eval()
            val_preds, val_labels_flat = [], []
            with torch.no_grad():
                for batch in val_loader:
                    outputs = self.model(
                        input_ids=batch["input_ids"],
                        attention_mask=batch["attention_mask"]
                    )
                    val_preds.extend(torch.sigmoid(outputs.logits).cpu().numpy())
                    val_labels_flat.extend(batch["labels"].cpu().numpy())

            # F1 Score
            val_f1 = f1_score(
                val_labels_flat,
                (np.array(val_preds) > 0.5),
                average="weighted"
            )
            return val_f1

        # Run Optuna optimization
        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=3)  # Reduced to 3 trials
        return study.best_params

    def save(self, path="transformer_multi_label_classifier.pkl"):
        joblib.dump(self.model, path)

# Training Pipeline
def train_pipeline(data_path="calls_dataset.csv"):
    # Load dataset
    df = pd.read_csv(data_path)
    preprocessor = TextPreprocessor()
    df["cleaned_text"] = df["text_snippet"].apply(preprocessor.preprocess)

    # Split data
    texts = df["cleaned_text"].tolist()
    labels = df["labels"].str.get_dummies(sep=", ").values.tolist()
    train_texts, val_texts, train_labels, val_labels = train_test_split(
        texts, labels, test_size=0.2, random_state=42
    )

    # Initialize and train classifier
    classifier = TransformerMultiLabelClassifier(num_labels=len(labels[0]))
    best_params = classifier.train_with_optuna(train_texts, train_labels, val_texts, val_labels)
    print("Best Hyperparameters:", best_params)

    # Save the model
    classifier.save()
    print("Model saved as 'transformer_multi_label_classifier.pkl'")

# Run the pipeline
if __name__ == "__main__":
    train_pipeline()


Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight', 'pre_classifier.bias', 'pre_classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
[I 2025-01-26 15:58:28,789] A new study created in memory with name: no-name-4f170b9d-0c04-4ff3-9701-de9c3214150d
  learning_rate = trial.suggest_loguniform("learning_rate", 1e-5, 5e-5)
[I 2025-01-26 16:07:51,087] Trial 0 finished with value: 0.7395352384213305 and parameters: {'learning_rate': 3.000117376949386e-05, 'batch_size': 32}. Best is trial 0 with value: 0.7395352384213305.
  learning_rate = trial.suggest_loguniform("learning_rate", 1e-5, 5e-5)
[I 2025-01-26 16:17:40,871] Trial 1 finished with value: 0.8248727163821503 and parameters: {'learning_rate': 1.6144628511646174e-05, 'batch_size': 32}. Best is trial 1 with value: 0.824872

In [28]:
import json
import re
import numpy as np
import pandas as pd
import spacy
from typing import Dict, List, Any
from rapidfuzz import process
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM
from sentence_transformers import SentenceTransformer


class AdvancedEntityExtractor:
    def __init__(self, domain_knowledge_path: str):
        # Load spaCy model
        self.nlp = spacy.load("en_core_web_trf")  # Transformer-based spaCy model
        # Load domain knowledge
        with open(domain_knowledge_path, "r") as f:
            self.domain_knowledge = json.load(f)
        # Sentence Transformer for semantic matching
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

    def semantic_matching(self, text: str, candidates: List[str], threshold: float = 0.7) -> List[str]:
        text_embedding = self.embedding_model.encode(text)
        candidate_embeddings = self.embedding_model.encode(candidates)
        similarities = [
            np.dot(text_embedding, candidate_emb) /
            (np.linalg.norm(text_embedding) * np.linalg.norm(candidate_emb))
            for candidate_emb in candidate_embeddings
        ]
        return [candidates[i] for i, sim in enumerate(similarities) if sim > threshold]

    def extract_entities(self, text: str) -> Dict[str, List[str]]:
        entities = {
            "competitors": [],
            "features": [],
            "pricing_keywords": [],
            "compliance": [],
            "ner_entities": []
        }

        # Exact Matching
        for category, terms in self.domain_knowledge.items():
            for term in terms:
                if term.lower() in text.lower():
                    entities[category].append(term)

        # Fuzzy Matching
        for category in ["competitors", "pricing_keywords"]:
            matches = process.extract(
                text, self.domain_knowledge.get(category, []), scorer=process.fuzz.partial_ratio
            )
            entities[category].extend([match[0] for match in matches if match[1] > 80])

        # Semantic Matching
        for category in ["competitors", "features"]:
            semantic_matches = self.semantic_matching(text, self.domain_knowledge.get(category, []))
            entities[category].extend(semantic_matches)

        # NER Extraction
        doc = self.nlp(text)
        entities["ner_entities"] = [
            ent.text for ent in doc.ents if ent.label_ in ["ORG", "PRODUCT", "GPE", "MONEY"]
        ]

        # Deduplicate and return
        for key in entities:
            entities[key] = list(set(entities[key]))
        return entities


class AdvancedTextSummarizer:
    def __init__(self, model_name: str = "facebook/bart-large-cnn"):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
        self.summarizer = pipeline("summarization", model=self.model, tokenizer=self.tokenizer, device=0)

    def dynamic_summarization(self, text: str, entities: Dict[str, List[str]], max_length: int = None) -> str:
        text_length = len(text.split())
        max_length = max_length or min(100, max(30, int(text_length * 0.3)))
        min_length = max(20, int(text_length * 0.1))
        try:
            summary = self.summarizer(
                text, max_length=max_length, min_length=min_length, do_sample=False
            )[0]["summary_text"]
        except Exception as e:
            summary = "Summary generation failed."

        entity_summary = " | ".join([f"{k}: {', '.join(v)}" for k, v in entities.items() if v])
        return f"{summary} | {entity_summary}"


def process_text(text: str, domain_knowledge_path: str) -> Dict[str, Any]:
    extractor = AdvancedEntityExtractor(domain_knowledge_path)
    summarizer = AdvancedTextSummarizer()
    entities = extractor.extract_entities(text)
    summary = summarizer.dynamic_summarization(text, entities)
    return {"original_text": text, "entities": entities, "summary": summary}


# Example usage
if __name__ == "__main__":
    domain_knowledge = {
        "competitors": ["CompetitorX", "CompetitorY", "AcmeCorp"],
        "features": ["real-time analytics", "automation suite", "advanced metrics"],
        "pricing_keywords": ["discount", "cost reduction", "pricing model"],
        "compliance": ["GDPR", "CCPA", "SOC2"]
    }

    with open("domain_knowledge.json", "w") as f:
        json.dump(domain_knowledge, f)

    text_snippet = (
        "CompetitorX offers advanced real-time analytics at a lower pricing model. "
        "We need cost reduction to match their capabilities."
    )
    result = process_text(text_snippet, "domain_knowledge.json")
    print(json.dumps(result, indent=2))

Augmented dataset saved to 'augmented_calls_dataset.csv'


In [29]:
import joblib
import json
import numpy as np
import torch
from typing import Dict, Any
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sentence_transformers import SentenceTransformer
from rapidfuzz import process
import spacy

class AdvancedInferencePipeline:
    def __init__(self,
                 classifier_path: str,
                 domain_knowledge_path: str,
                 summary_model: str = "facebook/bart-large-cnn"):
        # Optimized model loading
        self.classifier = joblib.load(classifier_path)
        self.tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

        # Advanced embedding model
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        # Load domain knowledge with caching
        with open(domain_knowledge_path, "r") as f:
            self.domain_knowledge = json.load(f)

        # Precompute embeddings for faster semantic matching
        self.precomputed_embeddings = {
            category: self.embedding_model.encode(terms)
            for category, terms in self.domain_knowledge.items()
            if isinstance(terms, list)
        }

        # Lightweight spaCy model
        self.nlp = spacy.load("en_core_web_sm", disable=['parser', 'tagger'])

    def predict_labels(self, text: str, threshold: float = 0.5) -> Dict[str, float]:
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=128
        )

        with torch.no_grad():
            outputs = self.classifier(**inputs)
            probabilities = torch.sigmoid(outputs.logits).numpy()[0]

        return {
            f"Label_{i}": float(prob)
            for i, prob in enumerate(probabilities)
            if prob > threshold
        }

    def extract_entities(self, text: str) -> Dict[str, Any]:
        entities = {
            "competitors": [],
            "features": [],
            "pricing_keywords": [],
            "compliance": [],
            "ner_entities": []
        }

        # Concurrent extraction methods
        text_lower = text.lower()

        # 1. Exact Matching (Fast)
        for category, terms in self.domain_knowledge.items():
            if isinstance(terms, list):
                entities[category].extend([
                    term for term in terms
                    if term.lower() in text_lower
                ])

        # 2. Fuzzy Matching
        fuzzy_categories = ["competitors", "pricing_keywords"]
        for category in fuzzy_categories:
            matches = process.extract(
                text,
                self.domain_knowledge.get(category, []),
                scorer=process.fuzz.partial_ratio,
                limit=3
            )
            entities[category].extend([match[0] for match in matches if match[1] > 80])

        # 3. Semantic Matching (Optimized)
        semantic_categories = ["competitors", "features"]
        text_embedding = self.embedding_model.encode(text)

        for category in semantic_categories:
            candidates = self.domain_knowledge.get(category, [])
            candidate_embeddings = self.precomputed_embeddings.get(category, [])

            similarities = [
                np.dot(text_embedding, candidate_emb) /
                (np.linalg.norm(text_embedding) * np.linalg.norm(candidate_emb))
                for candidate_emb in candidate_embeddings
            ]

            entities[category].extend([
                candidates[i] for i, sim in enumerate(similarities) if sim > 0.7
            ])

        # 4. NER Extraction
        doc = self.nlp(text)
        entities["ner_entities"] = [
            ent.text for ent in doc.ents
            if ent.label_ in ["ORG", "PRODUCT", "GPE", "MONEY"]
        ]

        # Deduplicate results
        return {k: list(set(v)) for k, v in entities.items()}

    def summarize_text(self, text: str, entities: Dict[str, Any]) -> str:
        # Dynamic length summarization
        words = text.split()
        max_length = min(100, max(30, int(len(words) * 0.3)))
        min_length = max(20, int(len(words) * 0.1))

        # Simplified summary generation
        summary = " ".join(words[:max_length])

        # Enrich with entities
        entity_summary = " | ".join([
            f"{k.capitalize()}: {', '.join(v)}"
            for k, v in entities.items() if v
        ])

        return f"{summary} | {entity_summary}"

    def process(self, text: str) -> Dict[str, Any]:
        predicted_labels = self.predict_labels(text)
        extracted_entities = self.extract_entities(text)
        summary = self.summarize_text(text, extracted_entities)

        return {
            "original_text": text,
            "predicted_labels": predicted_labels,
            "extracted_entities": extracted_entities,
            "summary": summary
        }

def main():
    pipeline = AdvancedInferencePipeline(
        classifier_path="transformer_multi_label_classifier.pkl",
        domain_knowledge_path="domain_knowledge.json"
    )

    text_snippet = (
        "CompetitorX offers advanced real-time analytics at a lower pricing model. "
        "We need cost reduction to match their capabilities."
    )

    result = pipeline.process(text_snippet)
    print(json.dumps(result, indent=2))

if __name__ == "__main__":
    main()

Classification Report:
                    precision    recall  f1-score   support

       Competition       0.77      0.21      0.33        47
          Negative       0.71      0.35      0.47        48
         Objection       0.47      0.20      0.28        40
          Positive       0.77      0.49      0.60        41
Pricing Discussion       0.78      0.54      0.64        59
          Security       0.53      0.21      0.30        39

         micro avg       0.70      0.35      0.46       274
         macro avg       0.67      0.33      0.44       274
      weighted avg       0.68      0.35      0.45       274
       samples avg       0.45      0.36      0.37       274

Model, vectorizer, and label binarizer saved!


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [37]:
import argparse
import json
import sys
from typing import Dict, Any
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor

# Advanced Logging Setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s: %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('nlp_pipeline.log')
    ]
)

class AdvancedCLI:
    def __init__(self, inference_pipeline):
        self.pipeline = inference_pipeline

    def execute_task(self, task: str, text: str) -> Dict[str, Any]:
        """Execute specific NLP tasks with error handling and logging."""
        try:
            with ThreadPoolExecutor() as executor:
                if task == "classify":
                    future = executor.submit(self.pipeline.predict_labels, text)
                    result = {"predicted_labels": future.result()}
                elif task == "extract":
                    future = executor.submit(self.pipeline.extract_entities, text)
                    result = {"extracted_entities": future.result()}
                elif task == "summarize":
                    entities = self.pipeline.extract_entities(text)
                    future = executor.submit(self.pipeline.summarize_text, text, entities)
                    result = {"summary": future.result()}
                elif task == "all":
                    future = executor.submit(self.pipeline.process, text)
                    result = future.result()
                else:
                    raise ValueError(f"Invalid task: {task}")

            logging.info(f"Task '{task}' completed successfully")
            return result
        except Exception as e:
            logging.error(f"Error in task '{task}': {e}")
            return {"error": str(e)}

    @classmethod
    def create_parser(cls) -> argparse.ArgumentParser:
        """Create advanced argument parser with rich configuration."""
        parser = argparse.ArgumentParser(
            description="Advanced NLP Pipeline CLI",
            epilog="Process text with multi-modal NLP techniques"
        )

        parser.add_argument(
            "--task",
            choices=["classify", "extract", "summarize", "all"],
            required=True,
            help="Select NLP processing task"
        )
        parser.add_argument(
            "--text",
            type=str,
            required=True,
            help="Text snippet for processing"
        )
        parser.add_argument(
            "--input_file",
            type=str,
            help="Process texts from input file (JSON/CSV)"
        )
        parser.add_argument(
            "--output_file",
            type=str,
            help="Save results to output file"
        )
        parser.add_argument(
            "--verbose",
            action="store_true",
            help="Enable detailed logging"
        )

        return parser

    def process_batch(self, input_file: str, task: str) -> list:
        """Process multiple texts from input file."""
        try:
            with open(input_file, 'r') as f:
                texts = json.load(f)

            results = []
            with ThreadPoolExecutor() as executor:
                task_func = partial(self.execute_task, task)
                results = list(executor.map(task_func, texts))

            return results
        except Exception as e:
            logging.error(f"Batch processing error: {e}")
            return []

    def save_results(self, results, output_file: str):
        """Save processing results to file."""
        try:
            with open(output_file, 'w') as f:
                json.dump(results, f, indent=2)
            logging.info(f"Results saved to {output_file}")
        except Exception as e:
            logging.error(f"Error saving results: {e}")

def main():
    from inference_pipeline import AdvancedInferencePipeline

    pipeline = AdvancedInferencePipeline(
        classifier_path="transformer_multi_label_classifier.pkl",
        domain_knowledge_path="domain_knowledge.json"
    )

    cli = AdvancedCLI(pipeline)
    parser = cli.create_parser()
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    if args.input_file:
        results = cli.process_batch(args.input_file, args.task)
        if args.output_file:
            cli.save_results(results, args.output_file)
        else:
            print(json.dumps(results, indent=2))
    else:
        result = cli.execute_task(args.task, args.text)
        print(json.dumps(result, indent=2))

if __name__ == "__main__":
    main()

Updated entity extraction completed and saved.


In [None]:
!git clone https://github.com/username/NLP-Project.git
%cd NLP-Project
