In [None]:
# Install dependencies
!pip install -r installments.txt --quiet

In [None]:
# Download and Install SpaCy's LLM
import spacy
!python -m spacy download en_core_web_sm --quiet

In [None]:
import html
import re

import ftfy
import matplotlib as mpl
import matplotlib.cm
import numpy as np
import pandas as pd
import spacy
import torch
from captum.attr import IntegratedGradients
from IPython.display import HTML, display
from torch.utils.data import TensorDataset, DataLoader
from tqdm.auto import tqdm
from transformers import AutoTokenizer, AutoModelForSequenceClassification

# Pick the compute device once: CUDA when PyTorch can see a GPU, CPU otherwise,
# and report the choice.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print('There are %d GPU(s) available.' % torch.cuda.device_count())
    print('GPU is:', torch.cuda.get_device_name(0))
else:
    print('No GPU available, using the CPU instead.')

In [30]:
class AgencyIG:
    """Integrated-gradients explainer for a single-logit sequence classifier.

    The class loads a Hugging Face tokenizer/model pair and a spaCy pipeline,
    computes per-token Integrated Gradients attributions for sentences
    (``extract_ig``) and renders them as colour-highlighted HTML or LaTeX
    (``render_ig``).
    """

    # Token ids whose embeddings are copied into the IG baseline instead of
    # being zeroed (presumably <s> and </s> of the RoBERTa vocabulary --
    # confirm if the checkpoint is changed).
    _SPECIAL_TOKEN_IDS = (0, 2)

    # Token strings dropped before sub-word tokens are aligned with spaCy tokens.
    _SKIPPED_TOKENS = ('<pad>', '<s>', '</s>')

    # Characters with a special meaning in LaTeX and their escaped forms.
    _TEX_ESCAPES = {
        '\\': r'\textbackslash{}',
        '{': r'\{',
        '}': r'\}',
        '%': r'\%',
        '&': r'\&',
        '$': r'\$',
        '#': r'\#',
        '_': r'\_',
        '~': r'\textasciitilde{}',
        '^': r'\textasciicircum{}',
    }

    def __init__(
        self,
        model_name="EnchantedStardust/bertagent-best",
        revision="5bae55efbd95dd51759d275410cea36c81109227",
        spacy_model="en_core_web_sm",
        device=None,
    ):
        """Initialize the model, tokenizer, and SpaCy NLP pipeline.

        Args:
            model_name: Hugging Face hub id of the sequence-classification model.
            revision: Hub revision (commit hash) used for both tokenizer and model.
            spacy_model: Name of the spaCy pipeline passed to ``spacy.load``.
            device: ``torch.device`` or device string. When ``None`` CUDA is
                used if available, otherwise the CPU.
        """
        self.model_name = model_name
        self.revision = revision

        # Resolve the device here so the class does not depend on a
        # notebook-level global variable.
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device if isinstance(device, torch.device) else torch.device(device)

        # Initialize tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name, do_lower_case=True, revision=self.revision
        )

        # Initialize model (one output logit)
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_name, num_labels=1, revision=self.revision
        ).to(self.device)
        self.model.eval()

        # Initialize SpaCy
        self.nlp = spacy.load(spacy_model)

        # Filled by extract_ig(); render_ig() reads it.
        self.df = None

    def extract_ig(self, sentences, max_length=128, n_steps=300, internal_batch_size=128):
        """Extract integrated gradients for a list of sentences.

        Args:
            sentences: A sentence or a collection of sentences (a single string
                is treated as one sentence).
            max_length: Length every sentence is padded/truncated to by the tokenizer.
            n_steps: Number of interpolation steps used by Integrated Gradients.
            internal_batch_size: Number of interpolated inputs Captum evaluates
                per forward pass.

        Returns:
            pandas.DataFrame with one row per sentence and the columns
            ``text`` (input as given), ``agency`` (model logit), ``baseline``
            (logit of the baseline input), ``token`` (spaCy tokens) and
            ``attribution`` (one summed attribution per spaCy token).
            The frame is also stored on ``self.df``.
        """
        columns = ['text', 'agency', 'baseline', 'token', 'attribution']

        # The input is traversed more than once, so normalise a bare string and
        # one-shot iterables up front.
        if isinstance(sentences, str):
            sentences = [sentences]
        elif not hasattr(sentences, '__len__'):
            sentences = list(sentences)

        if len(sentences) == 0:
            self.df = pd.DataFrame(columns=columns)
            return self.df

        # Clean sentences
        cleaned_sentences = self._clean_sentences(sentences)

        # Tokenize sentences (offsets are needed to align with spaCy tokens)
        tokenized = self.tokenizer(
            cleaned_sentences,
            add_special_tokens=True,
            padding="max_length",
            truncation=True,
            max_length=max_length,
            return_attention_mask=True,
            return_tensors='pt',
            return_token_type_ids=False,
            return_offsets_mapping=True,
        )

        # batch_size=1 without shuffling keeps batch_index aligned with
        # cleaned_sentences.
        dataset = TensorDataset(
            tokenized['input_ids'], tokenized['attention_mask'], tokenized['offset_mapping']
        )
        dataloader = DataLoader(dataset, batch_size=1)

        self.model.eval()
        results = []  # Store results for all sentences

        for batch_index, batch in enumerate(tqdm(dataloader, desc="Evaluating")):
            input_ids = batch[0].to(self.device)
            attention_mask = batch[1].to(self.device)
            offset_mapping = batch[2]
            current_text = cleaned_sentences[batch_index]

            # Compute embeddings and baseline embeddings
            embeddings = self.model.roberta.embeddings.word_embeddings(input_ids).detach()
            baseline_embeddings = self._baseline_zero_embed(input_ids, embeddings)

            # Forward pass on embeddings for integrated gradients
            def model_forward(inputs):
                # Captum evaluates several interpolated inputs per call, so the
                # single-row mask is expanded explicitly to that batch size.
                outputs = self.model.roberta(
                    inputs_embeds=inputs,
                    attention_mask=attention_mask.expand(inputs.shape[0], -1),
                )
                return self.model.classifier(outputs.last_hidden_state)

            # Compute attributions using integrated gradients
            embeddings.requires_grad_(True)
            ig = IntegratedGradients(model_forward)
            attributions = ig.attribute(
                inputs=embeddings,
                baselines=baseline_embeddings,
                internal_batch_size=internal_batch_size,
                target=0,
                n_steps=n_steps,
                return_convergence_delta=False,
            )
            # Sum over the embedding dimension -> one value per sub-word token
            attributions = attributions.sum(dim=2).squeeze(0).detach().cpu().numpy()

            # Clear CUDA cache (only meaningful when running on a GPU)
            if self.device.type == "cuda":
                torch.cuda.empty_cache()

            # Map RoBERTa tokens to SpaCy tokens and compute scores
            token_results = self._spacy_map(
                current_text, input_ids[0].cpu(), offset_mapping[0], attributions
            )

            # Compute agency and baseline scores; no gradients are needed here
            with torch.no_grad():
                agency_score = self.model(
                    inputs_embeds=embeddings, attention_mask=attention_mask
                )['logits'].detach().cpu().item()
                baseline_score = self.model(
                    inputs_embeds=baseline_embeddings, attention_mask=attention_mask
                )['logits'].detach().cpu().item()

            token_results['agency'] = agency_score
            token_results['baseline'] = baseline_score
            results.append(token_results)

        # Construct DataFrame
        df = pd.DataFrame(
            data={
                'text': sentences,
                'agency': [res['agency'] for res in results],
                'baseline': [res['baseline'] for res in results],
                'token': [res['tokens'] for res in results],
                'attribution': [res['attributions'] for res in results],
            }
        )

        self.df = df
        return df

    def _clean_sentences(self, sentences):
        """Clean and normalize input sentences.

        Runs of whitespace are collapsed to one space, the ends are stripped,
        and the result is passed through ``ftfy.fix_text``.
        """
        return [ftfy.fix_text(re.sub(r"\s\s+", " ", sent).strip()) for sent in sentences]

    def _baseline_zero_embed(self, input_ids, embeddings):
        """Create baseline embeddings with zero values except for special tokens.

        Args:
            input_ids: Integer tensor of token ids, indexed ``[row, position]``.
            embeddings: Embedding tensor indexed ``[row, position, feature]``.

        Returns:
            Tensor shaped like ``embeddings`` (same device and dtype) that is
            zero everywhere except at positions whose id is in
            ``_SPECIAL_TOKEN_IDS``, where the original embedding is kept.
        """
        special = torch.zeros_like(input_ids, dtype=torch.bool)
        for token_id in self._SPECIAL_TOKEN_IDS:
            special |= input_ids == token_id

        baseline = torch.zeros_like(embeddings)
        baseline[special] = embeddings[special]
        return baseline

    def _spacy_map(self, text, token_ids, offsets, attributions):
        """Map RoBERTa tokens and attributions to SpaCy tokens.

        Args:
            text: The (cleaned) sentence that was tokenized.
            token_ids: 1-D tensor of token ids for the sentence.
            offsets: Per-token ``(start, end)`` character offsets into ``text``.
            attributions: One attribution value per entry of ``token_ids``.

        Returns:
            dict with ``tokens`` (spaCy token texts), ``attributions`` (summed
            attribution per spaCy token), ``f_tokens`` and ``f_attributions``
            (sub-word tokens/attributions after dropping special tokens).
        """
        roberta_tokens = self.tokenizer.convert_ids_to_tokens(token_ids.tolist())
        # Plain Python numbers make the comparisons in the loop below cheap.
        if hasattr(offsets, "tolist"):
            offsets = offsets.tolist()

        kept = [
            (token.replace('Ġ', ''), attribution, offset)
            for token, attribution, offset in zip(roberta_tokens, attributions, offsets)
            if token not in self._SKIPPED_TOKENS
        ]
        if kept:
            filtered_tokens, filtered_attributions, filtered_offsets = zip(*kept)
        else:
            # Empty sentence: nothing but special/padding tokens.
            filtered_tokens, filtered_attributions, filtered_offsets = (), (), ()

        doc = self.nlp(text)
        spacy_tokens = [token.text for token in doc]
        spacy_offsets = [(token.idx, token.idx + len(token.text)) for token in doc]

        # Two-pointer walk over both tokenizations: a sub-word token contributes
        # to the spaCy token in which it starts.
        spacy_attributions = np.zeros(len(spacy_tokens))
        spacy_index, roberta_index = 0, 0
        while spacy_index < len(spacy_offsets) and roberta_index < len(filtered_offsets):
            spacy_start, spacy_end = spacy_offsets[spacy_index]
            roberta_start, roberta_end = filtered_offsets[roberta_index]
            if spacy_start <= roberta_start < spacy_end:
                spacy_attributions[spacy_index] += filtered_attributions[roberta_index]
                if roberta_end <= spacy_end:
                    roberta_index += 1
                else:
                    spacy_index += 1
            elif roberta_start < spacy_start:
                roberta_index += 1
            else:
                spacy_index += 1

        return {
            'tokens': spacy_tokens,
            'attributions': spacy_attributions,
            'f_tokens': np.array(filtered_tokens),
            'f_attributions': np.array(filtered_attributions),
        }

    def render_ig(self, sentence_index, latex=False):
        """Render integrated gradients for a specific sentence in HTML or LaTeX.

        Args:
            sentence_index: Positional row index into the DataFrame produced by
                ``extract_ig``.
            latex: When true, print LaTeX ``\\colorbox`` markup; otherwise
                display highlighted HTML.

        Raises:
            RuntimeError: If ``extract_ig`` has not been called yet.
        """
        if getattr(self, "df", None) is None:
            raise RuntimeError("No attributions available: call extract_ig() before render_ig().")

        row = self.df.iloc[sentence_index]
        text, agency, tokens, attributions = (
            row['text'], row['agency'], row['token'], row['attribution']
        )
        print(f"index: {sentence_index}\n")

        # Attributions were aligned to the *cleaned* sentence, so the negation
        # grouping must parse the same cleaned text to get matching token indices.
        clean_text = self._clean_sentences([text])[0]
        scores = self._map_ag(attributions, agency, clean_text)

        if latex:
            # print() emits the raw markup; display() would show the string's
            # repr with quotes and doubled backslashes.
            print(self._display_tex(tokens, scores))
        else:
            display(self._display_html(tokens, scores))

    def _map_ag(self, attributions, agency, text, show=False):
        """Map and normalize attributions with agency scores.

        The attributions are shifted uniformly so they sum to ``agency``,
        negation groups are merged, values with the opposite sign of
        ``agency`` are clipped away, and the result is rescaled so its extreme
        value equals ``agency``.

        Returns:
            numpy array with one score per token (empty for empty input, all
            zeros when nothing is left to scale).
        """
        attributions = np.asarray(attributions, dtype=float)
        if attributions.size == 0:
            return attributions.copy()

        adjusted = attributions + (agency - attributions.sum()) / len(attributions)
        adjusted = self._group_negations(text, adjusted, show=show)

        if agency > 0:
            adjusted = np.clip(adjusted, 0, 1)
            extreme = adjusted.max()
        else:
            adjusted = np.clip(adjusted, -1, 0)
            extreme = adjusted.min()

        # Avoid 0/0 -> NaN when every value was clipped to zero.
        if extreme == 0:
            return np.zeros_like(adjusted)
        return adjusted / extreme * agency

    def _group_negations(self, text, attributions, show=False):
        """Group negations to adjust attributions.

        For every token with dependency label ``neg``, the token, its head,
        the head's ``acomp``/``prt`` children and (when the head is an
        ``auxpass``) the head's own head all receive the sum of their
        attributions.

        Returns:
            A new numpy array; the input array is left untouched.
        """
        attributions = np.array(attributions, dtype=float)  # work on a copy
        doc = self.nlp(text)
        for token in doc:
            if token.dep_ != "neg":
                continue
            related_tokens = [token.i, token.head.i]
            related_tokens.extend(
                child.i for child in token.head.children if child.dep_ in ("acomp", "prt")
            )
            if token.head.dep_ == 'auxpass':
                related_tokens.append(token.head.head.i)
            if show:
                # Look tokens up one by one instead of indexing the doc with a list.
                print([doc[index] for index in related_tokens])
            attributions[related_tokens] = attributions[related_tokens].sum()
        return attributions

    def _hlstr(self, string, color='white'):
        """Highlight a string with a background color.

        The token text is HTML-escaped so characters such as ``<`` or ``&``
        are shown literally instead of being interpreted as markup.
        """
        return f"<mark style=background-color:{color}>{html.escape(string)} </mark>"

    def _colorize(self, attrs, cmap='PiYG'):
        """Colorize attributions for visualization.

        Values are normalised from [-1, 1] onto the colormap and returned as a
        list of hex colour strings.
        """
        norm = mpl.colors.Normalize(vmin=-1, vmax=1)
        cmap = matplotlib.colormaps.get_cmap(cmap)
        return list(map(lambda x: mpl.colors.rgb2hex(cmap(norm(x))), attrs))

    def _display_html(self, tokens, attrs, scale_factor=1):
        """Display tokens with HTML highlighting."""
        return HTML(
            "".join(map(self._hlstr, tokens, self._colorize(scale_factor * attrs)))
        )

    def _texstr(self, string, color='white'):
        """Render a string with a LaTeX colorbox.

        Args:
            string: Token text; LaTeX special characters are escaped.
            color: ``#rrggbb`` hex string, or any colour matplotlib can convert
                to hex (e.g. ``'white'``).
        """
        if color.startswith('#'):
            hex_color = color[1:]
        else:
            hex_color = mpl.colors.to_hex(color)[1:]
        escaped = "".join(self._TEX_ESCAPES.get(char, char) for char in string)
        return f"\\colorbox[HTML]{{{hex_color}}}{{{escaped}}}"

    def _display_tex(self, tokens, attrs, scale_factor=1):
        """Display tokens with LaTeX highlighting."""
        return " ".join(map(self._texstr, tokens, self._colorize(scale_factor * attrs)))


In [3]:
# Sentences to analyse; replace these examples with your own text.
sentences = [
    "I'm nothing but the least not lazy person.",
    "I'm not motivated.",
    "I'm in no way motivated.",
    "It is not true that I'm nothing but the least not lazy person.",
    "I'm one of the least lazy people you'll ever meet.",
    "I have never been unmotivated!",
    "These people are not lazy at all!",
]

In [None]:
# Instantiate the AgencyIG explainer; its constructor loads the tokenizer,
# the classification model and the spaCy pipeline.
ai = AgencyIG()

In [None]:
# Extract IG attributions for every sentence. The call returns a DataFrame
# (also kept on `ai.df`); as the cell's last expression it is displayed.
ai.extract_ig(sentences)

In [None]:
# Show the result: render every sentence with its tokens highlighted
# according to their attribution
for i in range(len(sentences)):
    ai.render_ig(i)

In [None]:
# Show the result in LaTeX format (one \colorbox per token)
for i in range(len(sentences)):
    ai.render_ig(i,latex=True)