<a href="https://colab.research.google.com/github/ac-26/Automated-Content-Tagging-Provision/blob/main/content_tagging_v6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [119]:
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification
import torch
import re
from typing import List, Dict, Tuple
from collections import Counter
import spacy

In [120]:
class TextEncoder:
    """Turn a piece of text into a single fixed-size embedding vector.

    Wraps a Hugging Face tokenizer/model pair loaded by name and mean-pools
    the model's last hidden state into one vector per input text.
    """

    def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2"):
        """Load the tokenizer and model for ``model_name``.

        Args:
            model_name: Checkpoint identifier passed to ``from_pretrained``.
        """
        # AutoModel is not among the names imported from transformers at the
        # top of the notebook, so import it here; otherwise a fresh kernel
        # fails with NameError on the from_pretrained call below.
        from transformers import AutoModel

        print(f"Initializing TextEncoder with model: {model_name}", flush=True)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        print("Tokenizer loaded", flush=True)
        self.model = AutoModel.from_pretrained(model_name)
        print("Model loaded", flush=True)
        # Inference only: put the module in evaluation mode.
        self.model.eval()
        print("Model Loaded Successfully", flush=True)

    def encode_text(self, text: str) -> np.ndarray:
        """Encode ``text`` and return its embedding as a flat NumPy array.

        The input is truncated to at most 512 tokens. The embedding is the
        mean of ``last_hidden_state`` over dimension 1, flattened to 1-D.
        """
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=512,
        )
        # No gradients are needed for a forward-only pass.
        with torch.no_grad():
            outputs = self.model(**inputs)
        pooled = outputs.last_hidden_state.mean(dim=1)
        return pooled.numpy().flatten()

### **This was my initial approach when brainstorming how to build this system. It is no longer used in the current system; IT IS KEPT ONLY TO DEMONSTRATE HOW THE PROJECT EVOLVED.**

In [121]:
# class TagVocabulary:
#   #initialization function
#   def __init__(self):
#     self.tags = [
#             # Content Creation
#             "Content Writing", "Copywriting", "Blog Writing", "Article Writing",
#             "Creative Writing", "Technical Writing", "Content Strategy",

#             # Marketing
#             "Social Media Marketing", "Digital Marketing", "Email Marketing",
#             "Marketing Strategy", "Brand Marketing", "Influencer Marketing",

#             # Social Media
#             "Social Media", "Facebook Marketing", "Instagram Marketing",
#             "Twitter Marketing", "LinkedIn Marketing", "TikTok Marketing",

#             # Analytics & Testing
#             "A/B Testing", "Analytics", "Performance Tracking", "Data Analysis",
#             "Audience Research", "Market Research",

#             # Advertising
#             "Online Advertising", "Social Media Ads", "Google Ads",
#             "Facebook Ads", "Digital Advertising",

#             # Skills & Techniques
#             "Communication Skills", "Writing Skills", "Creative Skills",
#             "Marketing Skills", "Design Skills",

#             # Strategy & Planning
#             "Content Planning", "Marketing Planning", "Campaign Strategy",
#             "Audience Targeting", "Customer Engagement"
#         ]

#     print(f"Tag vocabulary initialized with {len(self.tags)} tags")

#     #this will return list of tags in our vocabulary
#     def get_tags(self) -> List[str]:
#         return self.tags.copy()

#     #this will add a new tag in our vocabulary
#     def add_tag(self, new_tag: str):
#         if new_tag not in self.tags:
#             self.tags.append(new_tag)
#             print(f"Added new tag: {new_tag}")
#         else:
#             print(f"Tag '{new_tag}' already exists")

In [122]:
# class BasicTagger:
#   #initializer function
#     def __init__(self):
#       self.encoder = TextEncoder()
#       self.vocabulary = TagVocabulary()

#       self.tag_embeddings = self._encode_all_tags()

#     #function to encode all tags before hand
#     def _encode_all_tags(self) -> Dict[str, np.ndarray]:
#         tag_embeddings = {}
#         for tag in self.vocabulary.tags:
#             embedding = self.encoder.encode_text(tag)
#             tag_embeddings[tag] = embedding

#         return tag_embeddings

#     #finds the tags from our vocabulary that best match the input text
#     def find_matching_tags(self, input_text: str, top_k: int = 10) -> List[Tuple[str, float]]:
#         # Encode the input text
#         input_embedding = self.encoder.encode_text(input_text)

#         similarities = []

#         for tag_name, tag_embedding in self.tag_embeddings.items():
#             # Calculate cosine similarity
#             similarity = cosine_similarity(
#                 input_embedding.reshape(1, -1),
#                 tag_embedding.reshape(1, -1)
#             )[0][0]

#             similarities.append((tag_name, float(similarity)))

#         similarities.sort(key=lambda x: x[1], reverse=True)

#         return similarities[:top_k]

In [123]:
# # Test our basic tagger
# def test_basic_tagger():
#     tagger = BasicTagger()

#     test_text = """
#     Creating social media posts is a great way to hone your content writing skills.
#     Since posts are typically very short, snappy, and quick, you can easily try out
#     different styles of writing and see what people respond to. It's easy to change
#     direction and adapt if you need to tweak your writing style since social media
#     posts are typically fluid and changeable by nature. You can also practice A/B
#     testing with your social media ads—try writing two different posts and sending
#     it to similar demographics and see which one performs better.
#     """

#     print("Input text:")
#     print(test_text)

#     # Find matching tags
#     matching_tags = tagger.find_matching_tags(test_text, top_k=15)

#     print("Top matching tags:")
#     for i, (tag, score) in enumerate(matching_tags, 1):
#         print(f"{i:2d}. {tag:<25} (Score: {score:.3f})")

In [124]:
# if __name__ == "__main__":
#   test_basic_tagger()

### **Dynamic Tag Generation**

### **This is the approach currently in use. It addresses the problems and limitations we faced with the approach above.**

In [125]:
# Extracts candidate key phrases from text by combining spaCy named entities,
# noun chunks, a fixed list of compound terms and part-of-speech (POS) patterns.
class KeyPhraseExtractor:
    """Collect candidate tag phrases from raw text.

    Candidates come from four sources, applied in this order: named entities,
    multi-word noun chunks, known compound terms, and token windows whose POS
    sequence matches one of ``phrase_patterns``.
    """

    def __init__(self):
        """Load the spaCy pipeline and set up POS patterns and compound terms.

        Raises:
            OSError: re-raised from ``spacy.load`` (after printing an install
                hint) when the ``en_core_web_sm`` model cannot be loaded.
        """
        try:
            self.nlp = spacy.load("en_core_web_sm")
        except OSError:
            # spaCy signals a missing/unloadable model package with OSError.
            print("Please install spacy model: python -m spacy download en_core_web_sm")
            raise

        # POS sequences that qualify a token window as a candidate phrase.
        # Verb-containing patterns (e.g. NOUN VERB NOUN, single gerunds) are
        # intentionally not enabled.
        self.phrase_patterns = [
            # 1-word phrases
            ["NOUN"],
            ["PROPN"],

            # 2-word phrases
            ["ADJ", "NOUN"],
            ["ADJ", "PROPN"],
            ["NOUN", "NOUN"],
            ["PROPN", "NOUN"],
            ["NOUN", "PROPN"],

            # 3-word phrases
            ["ADJ", "NOUN", "NOUN"],
            ["NOUN", "NOUN", "NOUN"],
            ["PROPN", "PROPN", "PROPN"],
            ["ADJ", "ADJ", "NOUN"],

            # 4-word phrases
            ["ADJ", "NOUN", "NOUN", "NOUN"],
            ["NOUN", "NOUN", "NOUN", "NOUN"],
            ["ADJ", "ADJ", "NOUN", "NOUN"],
        ]

        # Common compound terms that should stay together
        self.compound_terms = {
            "machine learning", "deep learning", "natural language processing",
            "neural network", "data science", "artificial intelligence",
            "computer vision", "big data", "real time", "decision making",
            "supply chain", "customer relationship", "human resources",
            "business process", "electronic health", "patient care"
        }

        print("KeyPhraseExtractor initialized successfully")

    def extract_named_entities(self, text: str) -> List[Tuple[str, str, int]]:
        """Extract named entities from ``text`` using spaCy's NER.

        Returns:
            List of ``(entity_text, entity_label, frequency)``. Occurrences
            are counted case-insensitively; the text and label reported are
            those of the first occurrence.
        """
        return self._count_entities(self.nlp(text))

    def extract_noun_chunks(self, text: str) -> List[Tuple[str, int]]:
        """Extract multi-word noun chunks from ``text`` using spaCy.

        Returns:
            List of ``(lower-cased chunk, frequency)`` for chunks of 2+ words.
        """
        return self._count_noun_chunks(self.nlp(text))

    @staticmethod
    def _count_entities(doc) -> List[Tuple[str, str, int]]:
        """Count the entities of an already-parsed doc (see extract_named_entities)."""
        counts = Counter()
        first_seen = {}  # normalized text -> (original text, label)

        for ent in doc.ents:
            # Skip very short entities unless they are of a type where short
            # names are plausible (e.g. "UN").
            if len(ent.text) < 3 and ent.label_ not in ("ORG", "GPE", "PERSON"):
                continue

            key = ent.text.lower().strip()
            counts[key] += 1
            first_seen.setdefault(key, (ent.text, ent.label_))

        return [(*first_seen[key], freq) for key, freq in counts.items()]

    @staticmethod
    def _count_noun_chunks(doc) -> List[Tuple[str, int]]:
        """Count the 2+ word noun chunks of an already-parsed doc."""
        counts = Counter()
        for chunk in doc.noun_chunks:
            chunk_text = chunk.text.strip()
            if len(chunk_text.split()) >= 2:
                counts[chunk_text.lower()] += 1
        return list(counts.items())

    @staticmethod
    def _match_pattern(tokens, start_idx, pattern):
        """Return the phrase at ``start_idx`` if it matches ``pattern``, else None."""
        end_idx = start_idx + len(pattern)
        if end_idx > len(tokens):
            return None

        span = tokens[start_idx:end_idx]
        if [token.pos_ for token in span] != pattern:
            return None

        # Extra filters apply only to single-token candidates.
        if len(pattern) == 1:
            token = span[0]
            if token.is_stop:
                return None
            # Only relevant if a single-VERB pattern is enabled: keep gerunds only.
            if token.pos_ == "VERB" and not token.text.endswith("ing"):
                return None
            if len(token.text) < 3:
                return None

        phrase = re.sub(r'\s+', ' ', " ".join(token.text for token in span)).strip()
        return phrase or None

    def extract_phrases_with_metadata(self, text: str, min_freq: int = 1) -> List[Tuple[str, int, dict]]:
        """Extract candidate phrases from ``text`` together with their origin.

        Args:
            text: Input text.
            min_freq: Minimum frequency a phrase needs to be returned.

        Returns:
            List of ``(phrase, frequency, metadata)`` sorted by frequency and
            then word count, both descending. ``metadata`` has the keys
            ``'source'`` (``'ner'``, ``'noun_chunk'`` or ``'pos'``) and
            ``'entity_type'`` (the NER label, or ``None``).
        """
        text_lower = text.lower()
        # POS patterns are matched on the lower-cased text; NER and noun
        # chunks use the original casing. The original-case text is parsed
        # once and shared by both instead of being re-parsed for each.
        lowered_doc = self.nlp(text_lower)
        cased_doc = self.nlp(text)

        # phrase -> {'freq', 'source', 'entity_type', 'original_text'}
        phrase_data = {}

        # 1) Named entities (spaCy NER).
        for entity_text, entity_label, freq in self._count_entities(cased_doc):
            key = entity_text.lower()
            entry = phrase_data.get(key)
            if entry is None:
                phrase_data[key] = {
                    'freq': freq,
                    'source': 'ner',
                    'entity_type': entity_label,
                    'original_text': entity_text,
                }
            elif freq > entry['freq']:
                # Already present: only a higher count overrides it.
                entry['freq'] = freq
                entry['entity_type'] = entity_label

        # 2) Multi-word noun chunks not already covered.
        for chunk_text, freq in self._count_noun_chunks(cased_doc):
            if chunk_text not in phrase_data:
                phrase_data[chunk_text] = {
                    'freq': freq,
                    'source': 'noun_chunk',
                    'entity_type': None,
                    'original_text': chunk_text,
                }

        # 3) Known compound terms, found by plain substring counting.
        # Iterate in sorted order so insertion order (and therefore the
        # ordering of ties in the result) does not depend on set hashing.
        for compound in sorted(self.compound_terms):
            count = text_lower.count(compound)
            if count <= 0:
                continue
            if compound not in phrase_data:
                phrase_data[compound] = {
                    'freq': count,
                    'source': 'pos',
                    'entity_type': None,
                    'original_text': compound,
                }
            else:
                phrase_data[compound]['freq'] = max(phrase_data[compound]['freq'], count)

        # 4) Token windows matching one of the POS patterns.
        for sentence in lowered_doc.sents:
            tokens = list(sentence)
            for start_idx in range(len(tokens)):
                for pattern in self.phrase_patterns:
                    phrase = self._match_pattern(tokens, start_idx, pattern)
                    if not phrase:
                        continue

                    # Drop candidates that are a strict substring of a phrase
                    # that has already been collected.
                    if any(phrase != existing and phrase in existing for existing in phrase_data):
                        continue

                    if phrase in phrase_data:
                        phrase_data[phrase]['freq'] += 1
                    else:
                        phrase_data[phrase] = {
                            'freq': 1,
                            'source': 'pos',
                            'entity_type': None,
                            'original_text': phrase,
                        }

        # Convert to the output format.
        phrases_with_metadata = [
            (
                data.get('original_text', phrase),
                data['freq'],
                {'source': data['source'], 'entity_type': data['entity_type']},
            )
            for phrase, data in phrase_data.items()
            if data['freq'] >= min_freq
        ]
        phrases_with_metadata.sort(key=lambda item: (item[1], len(item[0].split())), reverse=True)

        return phrases_with_metadata

In [126]:
# Scores and filters the extracted phrases to identify the best tags for the input text.
class PhraseScorer:
    """Assign a quality score to candidate phrases and drop redundant ones."""

    # Raw length score by word count; any other length scores 0.4.
    _LENGTH_SCORES = {1: 0.65, 2: 0.85, 3: 0.90, 4: 1.0}
    # Bonus by NER label; any other label gets 0.02.
    _ENTITY_BONUSES = {'PERSON': 0.05, 'ORG': 0.05, 'GPE': 0.05, 'DATE': 0.03}
    # A phrase starting or ending with one of these is treated as incomplete.
    _INCOMPLETE_MARKERS = frozenset(
        {'of', 'to', 'for', 'with', 'and', 'or', 'but', 'the', 'a', 'an'}
    )

    def __init__(self):
        # Common/generic words that make poor tags
        self.generic_words = {
            'way', 'ways', 'thing', 'things', 'people', 'person', 'time', 'times',
            'place', 'places', 'day', 'days', 'year', 'years', 'good', 'bad',
            'great', 'nice', 'sure', 'certain', 'different', 'same', 'other',
            'new', 'old', 'high', 'low', 'large', 'small', 'long', 'short',
            'easy', 'hard', 'simple', 'complex', 'nature', 'type', 'types',
            'kind', 'kinds', 'lot', 'lots', 'direction', 'need', 'needs'
        }

        # Domain vocabulary. NOTE(review): not read by any method of this class.
        self.domain_indicators = {
            'analysis', 'strategy', 'marketing', 'development', 'management',
            'design', 'research', 'optimization', 'system', 'process', 'method',
            'technique', 'approach', 'framework', 'model', 'algorithm', 'data',
            'content', 'digital', 'social', 'media', 'online', 'software',
            'testing', 'planning', 'writing', 'creative', 'technical',
            'learning', 'training', 'network', 'neural', 'artificial',
            'intelligence', 'language', 'processing', 'natural', 'automated'
        }

        print("PhraseScorer initialized successfully")

    def calculate_phrase_scores_with_ner(self, phrases_with_metadata: List[Tuple[str, int, dict]],
                                    text_length: int) -> List[Tuple[str, float, dict]]:
        """Score each phrase and return the results sorted best-first.

        The score is the sum of five components: relative frequency (x0.3),
        share of non-generic words (x0.25), a word-count based length score
        (x0.15), a completeness score (x0.05) and a small bonus for phrases
        whose metadata marks them as named entities.

        Args:
            phrases_with_metadata: ``(phrase, frequency, metadata)`` tuples.
            text_length: Accepted for interface compatibility; not used.

        Returns:
            ``(phrase, score, metadata)`` tuples in descending score order.
            Phrases containing no words (empty or whitespace-only) are
            skipped, since they cannot be scored and are useless as tags.
        """
        # Tokenize once and discard phrases without any word; they would
        # otherwise cause a division by zero below.
        candidates = []
        for phrase, freq, metadata in phrases_with_metadata:
            words = phrase.lower().split()
            if words:
                candidates.append((phrase, freq, metadata, words))

        # Highest frequency, used for normalization; never allowed to be 0.
        max_freq = max((freq for _, freq, _, _ in candidates), default=1) or 1

        scored_phrases = []
        for phrase, freq, metadata, words in candidates:
            frequency = min(freq / max_freq, 1.0) * 0.3

            generic_count = sum(1 for word in words if word in self.generic_words)
            specificity = (1 - generic_count / len(words)) * 0.25

            length = self._LENGTH_SCORES.get(len(words), 0.4) * 0.15

            is_complete = (words[0] not in self._INCOMPLETE_MARKERS and
                           words[-1] not in self._INCOMPLETE_MARKERS)
            completeness = (1.0 if is_complete else 0.5) * 0.05

            entity_bonus = 0.0
            if metadata.get('source') == 'ner' and metadata.get('entity_type'):
                entity_bonus = self._ENTITY_BONUSES.get(metadata['entity_type'], 0.02)

            total_score = sum((frequency, specificity, length, completeness, entity_bonus))
            scored_phrases.append((phrase, total_score, metadata))

        scored_phrases.sort(key=lambda item: item[1], reverse=True)

        return scored_phrases

    def filter_similar_phrases_with_metadata(self, scored_phrases: List[Tuple[str, float, dict]],
                                        similarity_threshold: float = 0.5) -> List[Tuple[str, float, dict]]:
        """Remove redundant phrases, keeping the most meaningful of each group.

        Two phrases are considered redundant when they are equal ignoring
        case, or when the word set of one is a subset of the other's. For a
        subset pair, the longer phrase wins if the scores differ by less than
        0.15 and the word counts differ; otherwise the higher score wins
        (the already-kept phrase wins ties).

        Args:
            scored_phrases: ``(phrase, score, metadata)`` tuples, processed in
                the given order.
            similarity_threshold: Accepted for interface compatibility; not used.

        Returns:
            The surviving ``(phrase, score, metadata)`` tuples.
        """
        if not scored_phrases:
            return []

        filtered = []

        for phrase, score, metadata in scored_phrases:
            phrase_lower = phrase.lower()
            words = set(phrase_lower.split())

            should_keep = True
            superseded = []  # indices in `filtered` that this phrase beats

            for idx, (kept_phrase, kept_score, _kept_metadata) in enumerate(filtered):
                kept_lower = kept_phrase.lower()

                # Exact duplicate: the earlier one stays.
                if phrase_lower == kept_lower:
                    should_keep = False
                    break

                kept_words = set(kept_lower.split())
                if not (words.issubset(kept_words) or kept_words.issubset(words)):
                    continue

                scores_close = abs(score - kept_score) < 0.15
                if scores_close and len(words) != len(kept_words):
                    candidate_wins = len(words) > len(kept_words)
                else:
                    candidate_wins = score > kept_score

                if candidate_wins:
                    superseded.append(idx)
                else:
                    should_keep = False
                    break

            if not should_keep:
                # A rejected candidate must not evict phrases it happened to
                # beat before it was rejected itself.
                continue

            # Pop from the end so earlier indices stay valid.
            for idx in reversed(superseded):
                filtered.pop(idx)
            filtered.append((phrase, score, metadata))

        return filtered

In [127]:
# Complete dynamic tagging pipeline: extract phrases, score, filter, rank.
class DynamicTagger:
    """Generate tags for a text from phrases found in the text itself.

    Combines a ``KeyPhraseExtractor`` (candidate phrases), a ``PhraseScorer``
    (quality score and redundancy filter) and a ``TextEncoder`` (semantic
    similarity between each phrase and the whole text).
    """

    def __init__(self, encoder_model="sentence-transformers/all-MiniLM-L6-v2"):
        """Build the encoder, extractor and scorer.

        Args:
            encoder_model: Model name forwarded to ``TextEncoder``.
        """
        print("Initializing DynamicTagger...", flush=True)
        self.encoder = TextEncoder(encoder_model)
        print("TextEncoder initialized", flush=True)
        self.extractor = KeyPhraseExtractor()
        print("KeyPhraseExtractor initialized", flush=True)
        self.scorer = PhraseScorer()
        print("PhraseScorer initialized", flush=True)
        print("DynamicTagger ready!", flush=True)

    def generate_tags(self, text: str, max_tags: int = 10, min_score: float = 0.15) -> List[Tuple[str, float]]:
        """Return up to ``max_tags`` ``(tag, score)`` pairs for ``text``, best first.

        Args:
            text: Input text to tag.
            max_tags: Maximum number of tags returned.
            min_score: Tags must score strictly above this value, unless
                fewer than 5 do and at least 5 candidates exist, in which
                case the top 5 candidates are used instead.

        Returns:
            ``(tag, combined_score)`` pairs with plain Python floats, where
            the combined score is 70% phrase quality and 30% cosine
            similarity between the phrase and text embeddings. An empty list
            when no phrases are extracted.
        """
        # Extract key phrases with metadata (includes NER entities)
        phrases_with_metadata = self.extractor.extract_phrases_with_metadata(text)
        if not phrases_with_metadata:
            return []

        # Score phrases, then drop redundant ones (metadata is preserved)
        word_count = len(text.split())
        scored_phrases = self.scorer.calculate_phrase_scores_with_ner(phrases_with_metadata, word_count)
        filtered_phrases = self.scorer.filter_similar_phrases_with_metadata(scored_phrases)

        # Embed the full text once; it is compared against every phrase.
        text_vector = self.encoder.encode_text(text).reshape(1, -1)

        final_scores = []
        for phrase, quality_score, _metadata in filtered_phrases:
            phrase_vector = self.encoder.encode_text(phrase).reshape(1, -1)

            # Convert to a built-in float so the returned scores match the
            # annotation and are not NumPy scalars (np.float32 results are
            # not JSON-serializable, for example).
            semantic_score = float(cosine_similarity(text_vector, phrase_vector)[0][0])

            # Combine scores (70% quality, 30% semantic)
            combined_score = (quality_score * 0.7) + (semantic_score * 0.3)
            final_scores.append((phrase, combined_score))

        final_scores.sort(key=lambda item: item[1], reverse=True)

        # Apply quality threshold
        quality_tags = [(tag, score) for tag, score in final_scores if score > min_score]

        # Ensure a minimum number of tags when enough candidates exist
        if len(quality_tags) < 5 and len(final_scores) >= 5:
            quality_tags = final_scores[:5]

        return quality_tags[:max_tags]

    def tag_text(self, text: str, max_tags: int = 7) -> List[str]:
        """Return only the tag strings for ``text`` (at most ``max_tags``)."""
        return [tag for tag, _ in self.generate_tags(text, max_tags)]

    def tag_text_with_scores(self, text: str, max_tags: int = 7) -> List[Tuple[str, float]]:
        """Return ``(tag, score)`` pairs for ``text`` (at most ``max_tags``)."""
        return self.generate_tags(text, max_tags)


# **Testing**