In [1]:
# Colab Notebook Template for Multi-Label Text Classification Pipeline

# --- 1. Setup and Installations ---
!pip install transformers datasets scikit-learn pandas seaborn nltk



In [3]:
# --- 3. Generate Synthetic Data ---
def generate_synthetic_data():
    # Define the domain knowledge
    domain_knowledge = {
        "competitors": ["CompetitorX", "CompetitorY", "CompetitorZ"],
        "features": ["analytics", "AI engine", "data pipeline"],
        "pricing_keywords": ["discount", "renewal cost", "budget", "pricing model"]
    }

    # Save domain knowledge as JSON
    with open('domain_knowledge.json', 'w') as f:
        json.dump(domain_knowledge, f, indent=4)

    # Generate calls dataset
    data = []
    for i in range(1, 101):
        snippet = f"This is a sample call snippet discussing {np.random.choice(domain_knowledge['features'])} and {np.random.choice(domain_knowledge['pricing_keywords'])}."
        labels = np.random.choice(["Objection", "Pricing Discussion", "Security", "Competition"], size=np.random.randint(1, 4), replace=False)
        data.append({"id": i, "text_snippet": snippet, "labels": ",".join(labels)})

    # Convert to DataFrame and save as CSV
    df = pd.DataFrame(data)
    df.to_csv('calls_dataset.csv', index=False)
    return df

data = generate_synthetic_data()
data.head()

Unnamed: 0,id,text_snippet,labels
0,1,This is a sample call snippet discussing AI en...,Pricing Discussion
1,2,This is a sample call snippet discussing AI en...,Competition
2,3,This is a sample call snippet discussing data ...,"Security,Objection,Competition"
3,4,This is a sample call snippet discussing AI en...,"Objection,Pricing Discussion,Security"
4,5,This is a sample call snippet discussing AI en...,"Objection,Security,Competition"


In [4]:
# --- 4. Data Preprocessing with spaCy ---
!pip install spacy -q
!python -m spacy download en_core_web_sm

import spacy
from sklearn.model_selection import train_test_split

# Load spaCy's English model
nlp = spacy.load("en_core_web_sm")

# Preprocessing function using spaCy
def preprocess_text_spacy(text):
    doc = nlp(text.lower())
    # Filter tokens: alphabetic, not stopwords, and not punctuation
    words = [token.text for token in doc if token.is_alpha and not token.is_stop]
    return " ".join(words)

# Apply preprocessing
data['cleaned_text'] = data['text_snippet'].apply(preprocess_text_spacy)

# Display the first few rows
data.head()

# Split the dataset into training and testing sets
train_data, test_data = train_test_split(data, test_size=0.2, stratify=data['labels'])


Collecting en-core-web-sm==3.7.1
  Using cached https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


ValueError: The least populated class in y has only 1 member, which is too few. The minimum number of groups for any class cannot be less than 2.

In [None]:


import spacy
from sklearn.model_selection import train_test_split

# Load spaCy's English model
nlp = spacy.load("en_core_web_sm")

# Preprocessing function using spaCy
def preprocess_text_spacy(text):
    doc = nlp(text.lower())
    # Filter tokens: alphabetic, not stopwords, and not punctuation
    words = [token.text for token in doc if token.is_alpha and not token.is_stop]
    return " ".join(words)

# Apply preprocessing
data['cleaned_text'] = data['text_snippet'].apply(preprocess_text_spacy)

# Filter data to ensure each class has at least two instances
data['labels_split'] = data['labels'].str.split(',')  # Split multi-label column into lists
data = data.explode('labels_split')  # Expand multi-label rows
label_counts = data['labels_split'].value_counts()  # Count occurrences of each label
valid_labels = label_counts[label_counts > 1].index  # Keep only labels with >1 instance
data = data[data['labels_split'].isin(valid_labels)]  # Filter the dataset

# Recombine labels for multi-label format
data = data.groupby('id').agg({
    'text_snippet': 'first',
    'cleaned_text': 'first',
    'labels_split': lambda x: ','.join(sorted(set(x)))
}).reset_index()

# Split the dataset
train_data, test_data = train_test_split(data, test_size=0.2, stratify=data['labels_split'])


In [None]:
# --- 5. Define Dataset Class for Transformers ---
class MultiLabelDataset(Dataset):
    def __init__(self, dataframe, tokenizer, label_map, max_length):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.label_map = label_map
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        text = self.data.iloc[index]['cleaned_text']
        labels = self.data.iloc[index]['labels'].split(',')
        label_vector = [0] * len(self.label_map)
        for label in labels:
            label_vector[self.label_map[label]] = 1

        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(label_vector, dtype=torch.float)
        }

In [None]:
# --- 6. Model Training ---
# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModelForSequenceClassification.from_pretrained('bert-base-uncased', num_labels=4, problem_type="multi_label_classification")

# Map labels to integers
unique_labels = sorted(set(",".join(data['labels']).split(',')))
label_map = {label: i for i, label in enumerate(unique_labels)}

# Prepare datasets
train_dataset = MultiLabelDataset(train_data, tokenizer, label_map, max_length=128)
test_dataset = MultiLabelDataset(test_data, tokenizer, label_map, max_length=128)

# Training arguments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=3,
    weight_decay=0.01,
    save_total_limit=2,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
)

# Define compute metrics
def compute_metrics(pred):
    from sklearn.metrics import f1_score, precision_score, recall_score
    logits, labels = pred
    predictions = (logits > 0.5).astype(int)
    precision = precision_score(labels, predictions, average='micro')
    recall = recall_score(labels, predictions, average='micro')
    f1 = f1_score(labels, predictions, average='micro')
    return {"precision": precision, "recall": recall, "f1": f1}

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics
)

# Train the model
trainer.train()

In [8]:
# --- 3. Generate Synthetic Data ---
def generate_synthetic_data():
    # Define the domain knowledge
    domain_knowledge = {
        "competitors": ["CompetitorX", "CompetitorY", "CompetitorZ"],
        "features": ["analytics", "AI engine", "data pipeline"],
        "pricing_keywords": ["discount", "renewal cost", "budget", "pricing model"]
    }

    # Save domain knowledge as JSON
    with open('domain_knowledge.json', 'w') as f:
        json.dump(domain_knowledge, f, indent=4)

    # Generate calls dataset
    data = []
    for i in range(1, 101):
        snippet = f"This is a sample call snippet discussing {np.random.choice(domain_knowledge['features'])} and {np.random.choice(domain_knowledge['pricing_keywords'])}."
        labels = np.random.choice(["Objection", "Pricing Discussion", "Security", "Competition"], size=np.random.randint(1, 4), replace=False)
        data.append({"id": i, "text_snippet": snippet, "labels": ",".join(labels)})

    # Convert to DataFrame and save as CSV
    df = pd.DataFrame(data)
    df.to_csv('calls_dataset.csv', index=False)
    return df

data = generate_synthetic_data()
data.head()

Unnamed: 0,id,text_snippet,labels
0,1,This is a sample call snippet discussing analy...,Pricing Discussion
1,2,This is a sample call snippet discussing data ...,"Security,Competition,Pricing Discussion"
2,3,This is a sample call snippet discussing analy...,"Security,Objection"
3,4,This is a sample call snippet discussing analy...,"Competition,Pricing Discussion"
4,5,This is a sample call snippet discussing analy...,"Security,Competition,Pricing Discussion"


In [9]:
# Download spaCy model
!python -m spacy download en_core_web_sm

Collecting en-core-web-sm==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.7.1/en_core_web_sm-3.7.1-py3-none-any.whl (12.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.8/12.8 MB[0m [31m44.1 MB/s[0m eta [36m0:00:00[0m
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_sm')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


In [14]:
# --- 4. Data Preprocessing ---
# Load spaCy model
nlp = spacy.load('en_core_web_sm')

# Preprocessing function
def preprocess_text(text):
    doc = nlp(text.lower())  # Process the text with spaCy
    # Remove stopwords and non-alphanumeric characters
    words = [token.text for token in doc if not token.is_stop and token.is_alpha]
    return " ".join(words)

# Apply preprocessing
data['cleaned_text'] = data['text_snippet'].apply(preprocess_text)

# Ensure each label has at least two samples
label_counts = data['labels'].value_counts()
valid_labels = label_counts[label_counts > 1].index
filtered_data = data[data['labels'].isin(valid_labels)]

# Validate test_size dynamically
min_test_samples = len(filtered_data['labels'].unique())  # Minimum samples needed for stratification
if len(filtered_data) < min_test_samples * 2:  # Check if the dataset is too small
    raise ValueError(
        f"Not enough samples to split: {len(filtered_data)} samples for {min_test_samples} classes. "
        "Ensure at least two samples per class."
    )

# Set test_size as a proportion, ensuring the test set has enough samples
test_size = max(0.2, min(0.5, min_test_samples / len(filtered_data)))

# Split the filtered dataset
train_data, test_data = train_test_split(
    filtered_data,
    test_size=test_size,
    stratify=filtered_data['labels']
)

print(f"Training samples: {len(train_data)}, Test samples: {len(test_data)}")


Training samples: 67, Test samples: 23


In [15]:
# --- 5. Define Dataset Class for Transformers ---
class MultiLabelDataset(Dataset):
    def __init__(self, dataframe, tokenizer, label_map, max_length):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.label_map = label_map
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        text = self.data.iloc[index]['cleaned_text']
        labels = self.data.iloc[index]['labels'].split(',')
        label_vector = [0] * len(self.label_map)
        for label in labels:
            label_vector[self.label_map[label]] = 1

        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        return {
            'input_ids': encoding['input_ids'].squeeze(),
            'attention_mask': encoding['attention_mask'].squeeze(),
            'labels': torch.tensor(label_vector, dtype=torch.float)
        }

In [21]:
# --- 6. Model Training ---

import warnings
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer
from sklearn.metrics import f1_score, precision_score, recall_score
import numpy as np

# Suppress the Hugging Face warning related to the API key
warnings.filterwarnings("ignore", message=".*does not exist in your Colab secrets.*")

# Initialize tokenizer and model
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModelForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=4,
    problem_type="multi_label_classification"
)

# Map labels to integers
unique_labels = sorted(set(",".join(data['labels']).split(',')))
label_map = {label: i for i, label in enumerate(unique_labels)}

# Prepare datasets
train_dataset = MultiLabelDataset(train_data, tokenizer, label_map, max_length=128)
test_dataset = MultiLabelDataset(test_data, tokenizer, label_map, max_length=128)

# Training arguments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",  # Evaluate at the end of each epoch
    save_strategy="epoch",  # Save the model at the end of each epoch
    learning_rate=5e-5,  # Increased learning rate for faster learning
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    num_train_epochs=10,  # Increased epochs for better convergence
    weight_decay=0.01,
    save_total_limit=2,  # Keep only the 2 most recent saved models
    load_best_model_at_end=True,  # Load the best model based on evaluation metric
    metric_for_best_model="f1",  # Use F1 score to identify the best model
    lr_scheduler_type="linear",  # Linear learning rate scheduler
    warmup_steps=500,  # Gradual increase in learning rate
    report_to="none",  # Disable W&B logging
)

def compute_metrics(pred):
    logits, labels = pred

    # Apply sigmoid activation to scale logits to probabilities
    probabilities = 1 / (1 + np.exp(-logits))  # Sigmoid function

    # Use a threshold of 0.5 for multi-label classification
    predictions = (probabilities > 0.5).astype(int)

    # Debugging information
    print("Logits Distribution (mean ± std):", np.mean(logits), "±", np.std(logits))
    print("Logits (First 5 Rows):\n", logits[:5])
    print("Probabilities (First 5 Rows):\n", probabilities[:5])  # Display probabilities
    print("Predictions (First 5 Rows):\n", predictions[:5])
    print("Labels (First 5 Rows):\n", labels[:5])

    # Calculate metrics, handling undefined precision/recall with zero_division
    precision = precision_score(labels, predictions, average='micro', zero_division=0)
    recall = recall_score(labels, predictions, average='micro', zero_division=0)
    f1 = f1_score(labels, predictions, average='micro', zero_division=0)
    return {"precision": precision, "recall": recall, "f1": f1}


# Loss function with pos_weight (for handling class imbalance if needed)
# Uncomment if you want to handle class imbalance.
# from torch.nn import BCEWithLogitsLoss
# class_weights = torch.tensor([1.5, 1.0, 2.0, 1.2])  # Example weights (adjust based on your dataset)
# model.config.loss = BCEWithLogitsLoss(pos_weight=class_weights)

# Initialize the Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics
)

# Train the model
trainer.train()


Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Precision,Recall,F1
1,No log,0.717995,0.456522,0.5,0.477273
2,No log,0.714642,0.456522,0.5,0.477273
3,No log,0.70989,0.456522,0.5,0.477273
4,No log,0.703801,0.428571,0.357143,0.38961
5,No log,0.699993,0.441176,0.357143,0.394737
6,No log,0.701756,0.454545,0.357143,0.4
7,No log,0.70924,0.44,0.261905,0.328358
8,No log,0.722249,0.357143,0.238095,0.285714
9,No log,0.725901,0.391304,0.428571,0.409091
10,No log,0.725112,0.435897,0.404762,0.419753


Logits Distribution (mean ± std): 0.03221993 ± 0.40502295
Logits (First 5 Rows):
 [[ 0.558419   -0.23012756 -0.42872444  0.19110355]
 [ 0.558419   -0.23012756 -0.42872444  0.19110355]
 [ 0.59202385 -0.2959283  -0.43018317  0.24730423]
 [ 0.5644435  -0.2671002  -0.46082866  0.32615334]
 [ 0.58744705 -0.28118682 -0.4484228   0.21871197]]
Probabilities (First 5 Rows):
 [[0.6360867  0.44272068 0.39443097 0.547631  ]
 [0.6360867  0.44272068 0.39443097 0.547631  ]
 [0.64382935 0.42655316 0.3940826  0.5615129 ]
 [0.6374801  0.4336191  0.38678926 0.5808231 ]
 [0.64277923 0.43016288 0.38973582 0.55446106]]
Predictions (First 5 Rows):
 [[1 0 0 1]
 [1 0 0 1]
 [1 0 0 1]
 [1 0 0 1]
 [1 0 0 1]]
Labels (First 5 Rows):
 [[0. 1. 0. 0.]
 [0. 0. 1. 1.]
 [1. 1. 0. 0.]
 [1. 0. 1. 0.]
 [1. 0. 1. 1.]]
Logits Distribution (mean ± std): 0.0113190105 ± 0.3820246
Logits (First 5 Rows):
 [[ 0.53230166 -0.25471294 -0.39467826  0.10742597]
 [ 0.53230166 -0.25471294 -0.39467826  0.10742597]
 [ 0.5658939  -0.30524576

TrainOutput(global_step=90, training_loss=0.6932483673095703, metrics={'train_runtime': 1280.6582, 'train_samples_per_second': 0.523, 'train_steps_per_second': 0.07, 'total_flos': 44071893166080.0, 'train_loss': 0.6932483673095703, 'epoch': 10.0})

In [23]:
# --- 7. Evaluation ---
eval_results = trainer.evaluate()
print(eval_results)

# Generate classification report
all_preds = trainer.predict(test_dataset).predictions
all_labels = [item['labels'].tolist() for item in test_dataset]
all_preds = (all_preds > 0.5).astype(int)
print(classification_report(all_labels, all_preds, target_names=unique_labels))



# --- 8. Save Model & Artifacts ---
model.save_pretrained("./saved_model")
tokenizer.save_pretrained("./saved_model")

Logits Distribution (mean ± std): 0.03221993 ± 0.40502295
Logits (First 5 Rows):
 [[ 0.558419   -0.23012756 -0.42872444  0.19110355]
 [ 0.558419   -0.23012756 -0.42872444  0.19110355]
 [ 0.59202385 -0.2959283  -0.43018317  0.24730423]
 [ 0.5644435  -0.2671002  -0.46082866  0.32615334]
 [ 0.58744705 -0.28118682 -0.4484228   0.21871197]]
Probabilities (First 5 Rows):
 [[0.6360867  0.44272068 0.39443097 0.547631  ]
 [0.6360867  0.44272068 0.39443097 0.547631  ]
 [0.64382935 0.42655316 0.3940826  0.5615129 ]
 [0.6374801  0.4336191  0.38678926 0.5808231 ]
 [0.64277923 0.43016288 0.38973582 0.55446106]]
Predictions (First 5 Rows):
 [[1 0 0 1]
 [1 0 0 1]
 [1 0 0 1]
 [1 0 0 1]
 [1 0 0 1]]
Labels (First 5 Rows):
 [[0. 1. 0. 0.]
 [0. 0. 1. 1.]
 [1. 1. 0. 0.]
 [1. 0. 1. 0.]
 [1. 0. 1. 1.]]
{'eval_loss': 0.717995285987854, 'eval_precision': 0.45652173913043476, 'eval_recall': 0.5, 'eval_f1': 0.4772727272727273, 'eval_runtime': 9.6461, 'eval_samples_per_second': 2.384, 'eval_steps_per_second': 0.31

  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


('./saved_model/tokenizer_config.json',
 './saved_model/special_tokens_map.json',
 './saved_model/vocab.txt',
 './saved_model/added_tokens.json',
 './saved_model/tokenizer.json')

In [26]:
import json
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from typing import List

# Load the trained model and tokenizer (Task 1 model)
model_path = "/content/saved_model"  # Replace with your actual model path
model = AutoModelForSequenceClassification.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Define the labels used during training
labels = ["Objection", "General Query", "Competition", "Pricing Discussion"]

# Load domain knowledge JSON
with open("domain_knowledge.json", "r") as f:
    domain_knowledge = json.load(f)

# Function to classify text (supports single-label classification)
def classify_text(text: str, tokenizer, model, labels: List[str]) -> List[str]:
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=128)
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=1).item()  # For single-label classification
    return [labels[predicted_class]]

# Function to clean text
def clean_text(text: str) -> str:
    return text.lower()

# Function to extract entities
def extract_entities(text: str, domain_knowledge: dict, predicted_labels: List[str]) -> dict:
    entities = {}
    for label in predicted_labels:
        if label in domain_knowledge:
            keywords = domain_knowledge[label]
            matched_keywords = [keyword for keyword in keywords if keyword in text]
            print(f"Label: {label}, Matched Keywords: {matched_keywords}")  # Debugging step
            entities[f"{label}_keywords"] = matched_keywords
    return entities

# Function to process text data
def process_text_data(input_file: str, output_file: str):
    # Read input text file
    with open(input_file, "r") as f:
        lines = f.readlines()

    results = []
    for line in lines:
        # Clean the input text
        cleaned = clean_text(line.strip())

        # Classify the text
        predicted_labels = classify_text(cleaned, tokenizer, model, labels)

        # Extract entities based on the predicted labels
        extracted_entities = extract_entities(cleaned, domain_knowledge, predicted_labels)

        # Create the result object
        result = {
            "text": line.strip(),
            "cleaned_text": cleaned,
            "predicted_labels": predicted_labels,
            "extracted_entities": extracted_entities,
        }
        results.append(result)

    # Write the results to a JSON file
    with open(output_file, "w") as f:
        json.dump(results, f, indent=4)

# Example usage
input_file = "/content/sample_data/input.txt"  # Replace with your input .txt file path
output_file = "/content/sample_data/output.json"  # Replace with your desired output file path
process_text_data(input_file, output_file)


Label: Objection, Matched Keywords: ['concern', 'pricing']
Label: Objection, Matched Keywords: []


In [27]:
import json
import re
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch

# --- Load Domain Knowledge ---
with open('domain_knowledge.json', 'r') as f:
    domain_knowledge = json.load(f)

# --- 5. Load Pretrained Model ---
model_name = "/content/saved_model"  # Replace with your model path or name
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name)

# --- 6. Text Preprocessing ---
def preprocess_text(text):
    # Add any text preprocessing steps here like lowercasing, removing special characters, etc.
    return text.lower()

# --- 7. Entity Extraction ---
def extract_entities(text, domain_knowledge):
    entities = {
        'competitors': [],
        'features': [],
        'pricing_keywords': [],
        'priority': 'General extraction'  # Default to 'General extraction'
    }

    # Extract competitors
    for competitor in domain_knowledge['competitors']:
        if re.search(r'\b' + re.escape(competitor.lower()) + r'\b', text):
            entities['competitors'].append(competitor)

    # Extract features
    for feature in domain_knowledge['features']:
        if re.search(r'\b' + re.escape(feature.lower()) + r'\b', text):
            entities['features'].append(feature)

    # Extract pricing-related terms
    for pricing_term in domain_knowledge['pricing_keywords']:
        if re.search(r'\b' + re.escape(pricing_term.lower()) + r'\b', text):
            entities['pricing_keywords'].append(pricing_term)

    return entities

# --- 8. Model Prediction ---
def predict_labels(text):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    predicted_class = torch.argmax(logits, dim=1).item()
    return predicted_class  # Return the predicted class index (or you can map it to label)

# --- 9. Example Text Data ---
input_text = "Your competitor CompetitorX offers better services with AI engine at a lower cost. The discount they offer is better than ours."

# --- Preprocessing ---
cleaned_text = preprocess_text(input_text)

# --- Entity Extraction ---
extracted_entities = extract_entities(cleaned_text, domain_knowledge)

# --- Prediction ---
predicted_label_index = predict_labels(cleaned_text)

# Map predicted index to actual label (you need to define the mapping)
label_mapping = {0: "Objection", 1: "General Query"}  # Update with your actual label mapping
predicted_labels = [label_mapping.get(predicted_label_index, "Unknown")]

# --- Output ---
output = {
    'text': input_text,
    'cleaned_text': cleaned_text,
    'predicted_labels': predicted_labels,  # Using the model's prediction
    'extracted_entities': extracted_entities
}

print(output)


{'text': 'Your competitor CompetitorX offers better services with AI engine at a lower cost. The discount they offer is better than ours.', 'cleaned_text': 'your competitor competitorx offers better services with ai engine at a lower cost. the discount they offer is better than ours.', 'predicted_labels': ['Objection'], 'extracted_entities': {'competitors': ['CompetitorX'], 'features': ['AI engine'], 'pricing_keywords': ['discount'], 'priority': 'General extraction'}}


In [28]:
!pip install fastapi[all] uvicorn pyngrok transformers torch


Collecting uvicorn
  Downloading uvicorn-0.34.0-py3-none-any.whl.metadata (6.5 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.3-py3-none-any.whl.metadata (8.7 kB)
Collecting fastapi[all]
  Downloading fastapi-0.115.7-py3-none-any.whl.metadata (27 kB)
Collecting starlette<0.46.0,>=0.40.0 (from fastapi[all])
  Downloading starlette-0.45.3-py3-none-any.whl.metadata (6.3 kB)
Collecting fastapi-cli>=0.0.5 (from fastapi-cli[standard]>=0.0.5; extra == "all"->fastapi[all])
  Downloading fastapi_cli-0.0.7-py3-none-any.whl.metadata (6.2 kB)
Collecting python-multipart>=0.0.18 (from fastapi[all])
  Downloading python_multipart-0.0.20-py3-none-any.whl.metadata (1.8 kB)
Collecting ujson!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0,>=4.0.1 (from fastapi[all])
  Downloading ujson-5.10.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.3 kB)
Collecting email-validator>=2.0.0 (from fastapi[all])
  Downloading email_validator-2.2.0-py3-none-any.whl.metadata (25 kB)
Collecting p

In [29]:
from fastapi import FastAPI
from pydantic import BaseModel
import json
import re
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Load model (Task 1 model path)
def load_model(model_path="./saved_model"):
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForSequenceClassification.from_pretrained(model_path)
    return tokenizer, model

# Initialize FastAPI app
app = FastAPI()

# Load the model and tokenizer
tokenizer, model = load_model()

# Load domain knowledge
with open('domain_knowledge.json', 'r') as f:
    domain_knowledge = json.load(f)

# Text input model
class TextInput(BaseModel):
    text: str

# Preprocessing function
def preprocess_text(text):
    return text.lower()

# Entity extraction function
def extract_entities(text, domain_knowledge):
    entities = {
        'competitors': [],
        'features': [],
        'pricing_keywords': [],
        'priority': 'General extraction'
    }

    for competitor in domain_knowledge['competitors']:
        if re.search(r'\b' + re.escape(competitor.lower()) + r'\b', text):
            entities['competitors'].append(competitor)

    for feature in domain_knowledge['features']:
        if re.search(r'\b' + re.escape(feature.lower()) + r'\b', text):
            entities['features'].append(feature)

    for pricing_term in domain_knowledge['pricing_keywords']:
        if re.search(r'\b' + re.escape(pricing_term.lower()) + r'\b', text):
            entities['pricing_keywords'].append(pricing_term)

    return entities

# Define the FastAPI endpoint
@app.post("/process_text/")
async def process_text(input_data: TextInput):
    input_text = input_data.text

    # Preprocess the text
    cleaned_text = preprocess_text(input_text)

    # Extract entities
    extracted_entities = extract_entities(cleaned_text, domain_knowledge)

    # Generate summary (simple version)
    summary = f"Competitor offers better services with {', '.join(extracted_entities['features'])}."

    # Predicted labels (dummy for now, replace with actual model prediction)
    predicted_labels = ["Objection"]  # Replace this with the actual model prediction

    return {
        "text": input_text,
        "cleaned_text": cleaned_text,
        "predicted_labels": predicted_labels,
        "extracted_entities": extracted_entities,
        "summary": summary
    }


In [31]:
!pip install flask




In [32]:
!pip install fastapi uvicorn




In [None]:
from flask import Flask, jsonify, request
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import json
import re

# --- Load Domain Knowledge ---
with open('domain_knowledge.json', 'r') as f:
    domain_knowledge = json.load(f)

# --- Load Task 1 Model ---
def load_model(model_path="/content/saved_model"):
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForSequenceClassification.from_pretrained(model_path)
    return tokenizer, model

# Load model
tokenizer, model = load_model()

# --- Flask App Setup ---
app = Flask(__name__)

# --- Text Preprocessing Function ---
def preprocess_text(text):
    return text.lower()

# --- Entity Extraction Function ---
def extract_entities(text, domain_knowledge):
    entities = {
        'competitors': [],
        'features': [],
        'pricing_keywords': [],
        'priority': 'General extraction'  # Default to 'General extraction'
    }

    # Extract competitors
    for competitor in domain_knowledge['competitors']:
        if re.search(r'\b' + re.escape(competitor.lower()) + r'\b', text):
            entities['competitors'].append(competitor)

    # Extract features
    for feature in domain_knowledge['features']:
        if re.search(r'\b' + re.escape(feature.lower()) + r'\b', text):
            entities['features'].append(feature)

    # Extract pricing-related terms
    for pricing_term in domain_knowledge['pricing_keywords']:
        if re.search(r'\b' + re.escape(pricing_term.lower()) + r'\b', text):
            entities['pricing_keywords'].append(pricing_term)

    return entities

# --- Prediction Function ---
def predict_labels(text):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    with torch.no_grad():
        outputs = model(**inputs)
        predictions = torch.sigmoid(outputs.logits).numpy()

    # You can customize the threshold for multi-label classification
    predicted_labels = ['Objection' if p[0] > 0.5 else 'General Query' for p in predictions]
    return predicted_labels

# --- Define the API Endpoint ---
@app.route("/predict", methods=["POST"])
def predict():
    # Get the input JSON data
    data = request.get_json()
    text = data['text']  # Assuming input JSON has a 'text' field

    # --- Preprocess the Text ---
    cleaned_text = preprocess_text(text)

    # --- Entity Extraction ---
    extracted_entities = extract_entities(cleaned_text, domain_knowledge)

    # --- Predicted Labels ---
    predicted_labels = predict_labels(cleaned_text)

    # --- Summary (dummy implementation) ---
    summary = f"Text contains references to competitors {extracted_entities['competitors']} with features {extracted_entities['features']}."

    # --- Return the JSON response ---
    response = {
        'text': text,
        'cleaned_text': cleaned_text,
        'predicted_labels': predicted_labels,
        'extracted_entities': extracted_entities,
        'summary': summary
    }
    return jsonify(response)

# --- Run the Flask App ---
if __name__ == "__main__":
    from google.colab.output import eval_js

    # Start the Flask app
    app.run(host='0.0.0.0', port=8000)

    # Display the API endpoint within Colab using an iframe
    eval_js('google.colab.output.iframe("http://127.0.0.1:8000", width=800, height=600)')


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8000
 * Running on http://172.28.0.12:8000
INFO:werkzeug:[33mPress CTRL+C to quit[0m


In [30]:
from pyngrok import ngrok

# Set up a tunnel to the FastAPI app (default port 8000)
public_url = ngrok.connect(8000)
print(f"FastAPI app is running at: {public_url}")

# Run the FastAPI app with Uvicorn
!uvicorn app:app --host 0.0.0.0 --port 8000 --reload &




ERROR:pyngrok.process.ngrok:t=2025-01-26T19:31:12+0000 lvl=eror msg="failed to reconnect session" obj=tunnels.session err="authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n"
ERROR:pyngrok.process.ngrok:t=2025-01-26T19:31:12+0000 lvl=eror msg="session closing" obj=tunnels.session err="authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n"
ERROR:pyngrok.process.ngrok:t=2025-01-26T19:31:12+0000 lvl=eror msg="terminating with error" obj=app err="authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your aut

PyngrokNgrokError: The ngrok process errored on start: authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n.

In [24]:
import torch
import pandas as pd
import json
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Load Task 1 model
def load_model(model_path="./saved_model"):
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = AutoModelForSequenceClassification.from_pretrained(model_path)
    return tokenizer, model

# Classify text using Task 1 model
def classify_text(text, tokenizer, model, labels):
    encoding = tokenizer(
        text,
        max_length=128,
        padding="max_length",
        truncation=True,
        return_tensors="pt"
    )
    model.eval()
    with torch.no_grad():
        outputs = model(**encoding)
        logits = outputs.logits
        probabilities = torch.sigmoid(logits)
        predictions = (probabilities > 0.5).squeeze().numpy()
    return [labels[i] for i, p in enumerate(predictions) if p == 1]

# Extract entities from text using domain knowledge
def extract_entities(text, domain_knowledge, context_labels):
    extracted_entities = {category: [] for category in domain_knowledge.keys()}

    # Extract keywords using domain knowledge
    for category, keywords in domain_knowledge.items():
        for keyword in keywords:
            if keyword.lower() in text.lower():
                extracted_entities[category].append(keyword)

    # Add context-based prioritization
    if "Pricing Discussion" in context_labels:
        extracted_entities["priority"] = "Focus on pricing-related terms"
    elif "Competition" in context_labels:
        extracted_entities["priority"] = "Focus on competitor-related terms"
    else:
        extracted_entities["priority"] = "General extraction"

    return extracted_entities

# Main Task 2 Pipeline
def task2_pipeline(data_path, domain_knowledge_path, model_path):
    # Step 1: Load Data
    df = pd.read_csv(data_path)
    df["cleaned_text"] = df["text"].str.strip().str.lower()

    # Step 2: Load Task 1 model and domain knowledge
    tokenizer, model = load_model(model_path)
    with open(domain_knowledge_path, 'r') as f:
        domain_knowledge = json.load(f)

    # Define labels for Task 1 model (used during training)
    labels = ["Objection", "Pricing Discussion", "Competition", "General Query"]

    # Step 3: Process each text snippet
    df["predicted_labels"] = df["cleaned_text"].apply(lambda x: classify_text(x, tokenizer, model, labels))
    df["extracted_entities"] = df.apply(
        lambda row: extract_entities(row["cleaned_text"], domain_knowledge, row["predicted_labels"]),
        axis=1
    )

    return df

# Evaluate precision and recall (if ground truth is available)
def evaluate_extraction(predicted_entities, true_entities):
    from sklearn.metrics import precision_score, recall_score
    y_true = true_entities
    y_pred = predicted_entities
    precision = precision_score(y_true, y_pred, average="micro")
    recall = recall_score(y_true, y_pred, average="micro")
    return {"precision": precision, "recall": recall}

# Save Results
def save_results(df, output_path):
    df.to_csv(output_path, index=False)
    print(f"Results saved to {output_path}")

# Run Task 2 Pipeline
if __name__ == "__main__":
    data_path = "/content/sample_data/data.csv"  # Input file containing text snippets
    domain_knowledge_path = "/content/domain_knowledge.json"  # Domain-specific keywords
    model_path = "/content/saved_model"  # Path to the Task 1 trained model
    output_path = "/content/sample_data/task2_results.csv"  # Path to save the results

    # Run the pipeline
    results_df = task2_pipeline(data_path, domain_knowledge_path, model_path)
    save_results(results_df, output_path)


Results saved to /content/sample_data/task2_results.csv
