# Connection

In [1]:
import os
from getpass import getpass
import pprint as pp

import requests
import urllib3
from requests.auth import HTTPBasicAuth

# TLS verification is disabled below (verify=False), so silence the warning
# urllib3 would otherwise print on every request.
urllib3.disable_warnings()

# Connection settings. Every value can be overridden through an environment
# variable so that credentials are never stored in the notebook itself.
host = os.environ.get('OPENSEARCH_HOST', 'localhost')
port = int(os.environ.get('OPENSEARCH_PORT', '9200'))
user = os.environ.get('OPENSEARCH_USER', 'admin')
# Falls back to an interactive prompt when OPENSEARCH_PASSWORD is not set.
password = os.environ.get('OPENSEARCH_PASSWORD') or getpass(f"OpenSearch password for '{user}': ")
index_name = user  # the index used throughout the notebook is named after the user

url = f"https://{host}:{port}"

# Smoke test: fetch the cluster information document from the root endpoint.
try:
    res = requests.get(url, auth=HTTPBasicAuth(user, password), verify=False, timeout=10)
    res.raise_for_status()  # surface HTTP errors (e.g. 401) as a failed request
    pp.pprint(res.json())
except requests.exceptions.RequestException as e:
    print("Request failed:", e)


{'cluster_name': 'docker-cluster',
 'cluster_uuid': 'N4D1rJQtQC-KCHXVHEExZw',
 'name': '370b9e54a331',
 'tagline': 'The OpenSearch Project: https://opensearch.org/',
 'version': {'build_date': '2025-02-27T01:16:47.726162386Z',
             'build_hash': '2e4741fb45d1b150aaeeadf66d41445b23ff5982',
             'build_snapshot': False,
             'build_type': 'tar',
             'distribution': 'opensearch',
             'lucene_version': '9.12.1',
             'minimum_index_compatibility_version': '7.0.0',
             'minimum_wire_compatibility_version': '7.10.0',
             'number': '2.19.1'}}


# Load Captions

In [None]:
import pandas as pd
from scripts.parse_activitynet import load_captions

from pathlib import Path

# Selection parameters.
KEYWORD_PATTERN = r'\bcar\b'  # whole-word match, applied case-insensitively
MIN_CAPTIONS = 8              # keep videos with strictly more captions than this
N_VIDEOS = 10                 # maximum number of videos to keep

# Resolve the captions file relative to the current working directory
# (assumes the notebook is started from the repository root).
repo_root = Path().resolve()
json_path = repo_root / "captions" / "train.json"

# Load all captions
data = load_captions(str(json_path))

df = pd.DataFrame(data)

# Step 1: Find videos with at least one caption matching the keyword
contains_car = df[df['caption'].str.contains(KEYWORD_PATTERN, case=False, na=False)]
car_video_ids = set(contains_car['video_id'])

# Step 2: Get videos with more than MIN_CAPTIONS captions
caption_counts = df['video_id'].value_counts()
videos_with_8plus = set(caption_counts[caption_counts > MIN_CAPTIONS].index)

# Step 3: Intersection → videos that meet both criteria.
# Sorted so that the selection is identical on every kernel restart
# (plain set iteration order of strings is not stable across processes).
valid_video_ids = sorted(car_video_ids & videos_with_8plus)

# Step 4: Keep at most N_VIDEOS videos from the intersection
selected_video_ids = valid_video_ids[:N_VIDEOS]
five_video_ids = selected_video_ids  # legacy name, kept in case other cells still use it

# Step 5: Get all captions for the selected videos
selected_df = df[df['video_id'].isin(selected_video_ids)]

# Confirm selection (report the real number of videos, which may be below N_VIDEOS)
print(f"Selected {len(selected_df)} captions from {selected_df['video_id'].nunique()} videos:")
print(selected_df.groupby("video_id").size())


# Open Index

In [None]:
import pprint as pp
from opensearchpy import OpenSearch
from opensearchpy import helpers

# Client for the cluster configured in the connection cell. TLS is used, but
# certificate and hostname verification are switched off.
client = OpenSearch(
    hosts=[{'host': host, 'port': port}],
    http_auth=(user, password),
    http_compress=True,  # gzip-compress request bodies
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
)

if not client.indices.exists(index_name):
    print("Index does not exist.")
else:
    # Open the index, then dump its settings, mappings and document count.
    resp = client.indices.open(index=index_name)
    print(resp)

    print('\n----------------------------------------------------------------------------------- INDEX SETTINGS')
    settings = client.indices.get_settings(index=index_name)
    pp.pprint(settings)

    print('\n----------------------------------------------------------------------------------- INDEX MAPPINGS')
    mappings = client.indices.get_mapping(index=index_name)
    pp.pprint(mappings)

    print('\n----------------------------------------------------------------------------------- INDEX #DOCs')
    print(client.count(index=index_name))


## Close Index

In [None]:
# Close the index and print the cluster's response.
resp = client.indices.close(index = index_name)
print(resp)

# Index creation

In [None]:
# Fields stored for every caption "moment". With "dynamic": "strict" below,
# documents carrying any other field are rejected.
moment_properties = {
    "video_id": {"type": "keyword"},
    "start": {"type": "float"},
    "end": {"type": "float"},
    "duration": {"type": "float"},
    "caption": {"type": "text", "analyzer": "standard"},
    "video_url": {"type": "keyword"},
}

index_body = {
    "settings": {
        "index": {
            "number_of_replicas": 0,
            "number_of_shards": 1,
            "refresh_interval": "1s",
            "knn": "true",
        }
    },
    "mappings": {
        "dynamic": "strict",
        "properties": moment_properties,
    },
}

# Create the index only when it is missing; an existing index is left untouched.
if not client.indices.exists(index=index_name):
    response = client.indices.create(index_name, body=index_body)
    print('\nCreating index:')
    print(response)
else:
    print("Index already existed. Nothing to be done.")


## Check the index

In [None]:
# Settings update applied before the index state is printed.
index_settings = {"settings": {"index": {"refresh_interval": "1s"}}}

print('\n----------------------------------------------------------------------------------- INDEX SETTINGS')
pp.pprint(client.indices.get_alias(index_name))

# Apply the refresh interval, then read the settings back to confirm it.
client.indices.put_settings(index=index_name, body=index_settings)
settings = client.indices.get_settings(index=index_name)
pp.pprint(settings)

print('\n----------------------------------------------------------------------------------- INDEX MAPPINGS')
mappings = client.indices.get_mapping(index=index_name)
pp.pprint(mappings)

print('\n----------------------------------------------------------------------------------- INDEX #DOCs')
print(client.count(index=index_name))

## Index deletion

In [None]:
# Safety switch: deleting an index cannot be undone, so nothing is deleted
# unless this flag is deliberately changed to True before running the cell.
# (An explicit flag keeps "Restart & Run All" working, unlike a syntax error.)
CONFIRM_INDEX_DELETION = False

if not CONFIRM_INDEX_DELETION:
    print("Index deletion skipped: set CONFIRM_INDEX_DELETION = True to delete the index.")
elif client.indices.exists(index=index_name):
    # Delete the index.
    response = client.indices.delete(index=index_name)
    print('\nDeleting index:')
    print(response)

# Built-in document tokenizers and analyzers

In [None]:
# Run a sample sentence through the built-in "whitespace" analyzer; the
# analyzer response is the cell's displayed value.
anls = dict(analyzer="whitespace", text="The car speeds down the highway")
client.indices.analyze(index=index_name, body=anls)

In [None]:
# Same sentence through the "standard" analyzer (the one configured for the
# caption field in the index mapping), for comparison.
anls = dict(analyzer="standard", text="The car speeds down the highway")
client.indices.analyze(index=index_name, body=anls)


In [None]:
# Columns copied from each caption row into the indexed document.
moment_fields = ('video_id', 'caption', 'start', 'end', 'duration', 'video_url')

for idx, row in selected_df.iterrows():
    moment = {field: row[field] for field in moment_fields}

    # Document id is "<video_id>_<start>", so re-running the cell overwrites
    # the same documents instead of duplicating them.
    doc_id = f"{row['video_id']}_{row['start']}"
    response = client.index(index=index_name, id=doc_id, body=moment)
    print(response['result'])


## Deleting a single document

In [None]:
# Safety switch: nothing is deleted until a document id is filled in here.
# Ids written by the indexing cells follow the "<video_id>_<start>" pattern.
# (The previous code passed Python's builtin `id` function as the document id.)
doc_id_to_delete = None

if doc_id_to_delete is None:
    print("Document deletion skipped: set doc_id_to_delete to the id of the document to remove.")
else:
    response = client.delete(
        index=index_name,
        id=doc_id_to_delete
    )

    print('\nDeleting document:')
    print(response)

# Text-based search

In [None]:
qtxt = "fast car"

# Full-text query on the caption field, returning the five best hits and only
# the fields needed for the printout below.
returned_fields = ['caption', 'video_id', 'start', 'end', 'video_url']
query_bm25 = {
    'size': 5,
    '_source': returned_fields,
    'query': {
        'multi_match': {'query': qtxt, 'fields': ['caption']},
    },
}

response = client.search(index=index_name, body=query_bm25)

print('\nSearch results:')
hits = response['hits']['hits']
for hit in hits:
    source = hit['_source']
    print(f"- {source['caption']} (video: {source['video_id']}, time: {source['start']}s → {source['end']}s)")


## Term-based search

In [None]:
# Exact-match (term) query on the keyword field `video_id`: returns up to ten
# captions belonging to a single video.
term_clause = {"term": {"video_id": 'v_06ofnvq2Hjs'}}
query_bm25 = {
    'size': 10,
    '_source': ['caption'],
    'query': term_clause,
}

response = client.search(index=index_name, body=query_bm25)

print('\nSearch results:')
hits = response['hits']['hits']
for hit in hits:
    source = hit['_source']
    print(f"- {source['caption']}")


## Bool-based search

In [None]:
# Boolean query: the `must` clause restricts results to one video, the
# `should` clause adds a text match on the caption.
must_clauses = [{'term': {'video_id': 'v_06ofnvq2Hjs'}}]
should_clauses = [{'match': {'caption': 'car'}}]

query_bm25 = {
    'size': 10,
    '_source': ['caption'],
    'query': {
        'bool': {'must': must_clauses, 'should': should_clauses},
    },
}

response = client.search(index=index_name, body=query_bm25)

print('\nSearch results:')
hits = response['hits']['hits']
for hit in hits:
    source = hit['_source']
    print(f"- {source['caption']}")


### 2.3 Embeddings Neighborhood

## Dense Vectors

In [None]:

# Vector field holding one 768-dimensional caption embedding per document,
# indexed with HNSW and compared by cosine similarity.
caption_vector_field = {
    "type": "knn_vector",
    "dimension": 768,
    "method": {
        "name": "hnsw",
        "engine": "nmslib",
        "space_type": "cosinesimil",
    },
}

index_body = {
    "settings": {
        "index": {
            "number_of_replicas": 0,
            "number_of_shards": 4,
            # "-1" disables the periodic refresh: newly indexed documents are
            # not searchable until the index is refreshed explicitly.
            "refresh_interval": "-1",
            "knn": "true",
        }
    },
    "mappings": {
        "dynamic": "strict",
        "properties": {
            "video_id": {"type": "keyword"},
            "start": {"type": "float"},
            "end": {"type": "float"},
            "duration": {"type": "float"},
            "caption": {"type": "text", "analyzer": "standard"},
            "video_url": {"type": "keyword"},
            "caption_vec": caption_vector_field,
        },
    },
}

# The mapping of an existing index cannot gain the vector field in place here,
# so any existing index is dropped (with all its documents) and recreated.
index_already_exists = client.indices.exists(index=index_name)
if index_already_exists:
    print(f"Index '{index_name}' already exists. Deleting it to apply new mappings and settings...")
    client.indices.delete(index=index_name)

response = client.indices.create(index=index_name, body=index_body)
print(f"\nIndex '{index_name}' created successfully.")
print(response)


## Dual-Encoders

In [None]:
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F

#Mean Pooling - Take average of all tokens
def mean_pooling(model_output, attention_mask):
    """Average the token embeddings of each sequence, ignoring masked positions.

    Args:
        model_output: object exposing `last_hidden_state`, the per-token embeddings.
        attention_mask: mask with one entry per token; it is broadcast over the
            embedding dimension and used as a 0/1 weight.

    Returns:
        One pooled vector per sequence: the mask-weighted sum of the token
        embeddings divided by the mask total (clamped to at least 1e-9 so an
        all-zero mask does not divide by zero).
    """
    hidden = model_output.last_hidden_state
    weights = attention_mask.unsqueeze(-1).expand(hidden.size()).float()
    weighted_sum = torch.sum(hidden * weights, 1)
    weight_total = torch.clamp(weights.sum(1), min=1e-9)
    return weighted_sum / weight_total


#Encode text
def encode(texts):
    """Embed `texts` with the module-level `tokenizer` and `model`.

    The texts are tokenized as one padded, truncated batch, run through the
    model without gradient tracking, mean-pooled over the unmasked tokens and
    L2-normalized.

    Note: `tokenizer` and `model` are looked up at call time, so rebinding
    those globals later in the notebook changes what this function uses.

    Args:
        texts: a string or a list of strings accepted by the tokenizer.

    Returns:
        Tensor with one unit-length embedding per input text.
    """
    batch = tokenizer(texts, padding=True, truncation=True, return_tensors='pt')

    # Inference only: no gradients are needed.
    with torch.no_grad():
        model_output = model(**batch, return_dict=True)

    pooled = mean_pooling(model_output, batch['attention_mask'])
    return F.normalize(pooled, p=2, dim=1)


# Tokenizer and encoder weights come from the same Hugging Face Hub checkpoint.
checkpoint_name = "sentence-transformers/msmarco-distilbert-base-v2"
tokenizer = AutoTokenizer.from_pretrained(checkpoint_name)
model = AutoModel.from_pretrained(checkpoint_name)

# One embedding per selected caption, in the row order of selected_df.
doc_emb = encode(selected_df['caption'].tolist())


## Indexing document embedding vectors

In [None]:
# doc_emb was computed from selected_df's captions in row order; make sure the
# two are still aligned before pairing row i with embedding i.
assert len(doc_emb) == len(selected_df), "doc_emb is out of sync with selected_df; re-run the encoding cell"

for i, (idx, row) in enumerate(selected_df.iterrows()):
    moment = {
        'video_id': row['video_id'],
        'caption': row['caption'],
        'start': row['start'],
        'end': row['end'],
        'duration': row['duration'],
        'video_url': row['video_url'],
        # Plain list of floats, same form as the query vector in the filtered search cell.
        'caption_vec': doc_emb[i].tolist()
    }

    response = client.index(index=index_name, id=f"{row['video_id']}_{row['start']}", body=moment)
    print(response['result'])

# The index was created with refresh_interval "-1", so the documents above
# stay invisible to searches until an explicit refresh.
client.indices.refresh(index=index_name)

## Embedding spaces search

In [None]:
# Embed the query with the same encoder used for the captions.
query = "fast car"
query_emb = encode(query)

# Nearest-neighbour clause on the caption vectors.
knn_clause = {
    "caption_vec": {
        "vector": query_emb[0].numpy(),
        "k": 2,
    }
}

query_denc = {
    'size': 5,
    '_source': ['caption'],
    "query": {"knn": knn_clause},
}

response = client.search(index=index_name, body=query_denc)

print('\nSearch results:')
pp.pprint(response)

## Search with boolean filters

In [None]:
query = "How many moments have the captions about cars?"
query_emb = encode(query)

# Boolean query: a nearest-neighbour clause on the caption vectors in `must`,
# plus a text match on the caption in `should`.
knn_clause = {
    "knn": {
        "caption_vec": {
            "vector": query_emb[0].tolist(),
            "k": 5,
        }
    }
}
text_clause = {'match': {'caption': 'car'}}

query_bm25 = {
    'size': 5,
    '_source': ['caption'],
    'query': {
        'bool': {'must': [knn_clause], 'should': [text_clause]},
    },
}

response = client.search(index=index_name, body=query_bm25)

print('\nSearch results:')
pp.pprint(response)

# 2.5 Contextual Embeddings and Self-Attention

## Imports and definitions

In [None]:
import numpy as np
import pprint
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
import torch
import transformers
from transformers import AutoTokenizer, AutoConfig, AutoModelForSequenceClassification
from bertviz import model_view, head_view

# Get the interactive Tools for Matplotlib
#%matplotlib notebook
#%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

In [None]:
def call_html():
  """Display an HTML snippet that loads require.js and registers the paths
  for `base`, `d3` and `jquery` with requirejs.

  Called before the bertviz views in this notebook. Relies on the `display`
  function that IPython provides inside notebooks.
  """
  import IPython
  requirejs_setup = '''
        <script src="/static/components/requirejs/require.js"></script>
        <script>
          requirejs.config({
            paths: {
              base: '/static/base',
              "d3": "https://cdnjs.cloudflare.com/ajax/libs/d3/5.7.0/d3.min",
              jquery: '//ajax.googleapis.com/ajax/libs/jquery/2.0.0/jquery.min',
            },
          });
        </script>
        '''
  display(IPython.core.display.HTML(requirejs_setup))

## Bidirectional Encoder Representations Transformers

In [None]:
# Checkpoint used for the sentence-pair experiments below.
# Alternative checkpoint (swap in to try it): 'cross-encoder/ms-marco-MiniLM-L-12-v2'
model_path = 'nboost/pt-bert-base-uncased-msmarco'
# Special tokens as they appear in decoded BERT input sequences.
CLS_token = "[CLS]"
SEP_token = "[SEP]"

In [None]:
transformers.logging.set_verbosity_warning()

# Ask the model to return every layer's hidden states and attention maps.
config = AutoConfig.from_pretrained(model_path, output_hidden_states=True, output_attentions=True)

# NOTE: this rebinds the global `tokenizer` and `model` that encode() reads,
# so encode() no longer uses the dual-encoder checkpoint after this cell.
tokenizer = AutoTokenizer.from_pretrained(model_path)
model = AutoModelForSequenceClassification.from_pretrained(model_path, config=config)

In [None]:
# Display the loaded model's module structure.
model

## Next Sentence Prediction 

In [None]:
query = "How many moments involve a car?"
captions = selected_df['caption'].astype(str).tolist()[:5]

# Pair the same query with each of the first five captions.
sentence_a = [query for _ in captions]
sentence_b = captions

# Tokenize the pairs as one padded batch, truncated to at most 512 tokens.
inputs = tokenizer(
    sentence_a,
    sentence_b,
    return_tensors='pt',
    add_special_tokens=True,
    max_length=512,
    padding=True,
    truncation=True,
)

pprint.pprint(inputs)

In [None]:
# Decode every tokenized (query, caption) pair back to text. Iterating over
# the batch avoids an IndexError when fewer than five captions were selected.
for pair_ids in inputs["input_ids"]:
    print(tokenizer.decode(pair_ids.tolist()))

In [None]:
input_ids = inputs['input_ids']
# Batch index 0: later cells read the hidden states and word ids of batch
# index 0, so the token list must come from the same pair for the token
# labels in the scatter plot to match the plotted embeddings.
input_id_list = input_ids[0].tolist()
pprint.pprint(input_id_list)

In [None]:
# Map the selected token ids back to their token strings.
input_tokens_list = tokenizer.convert_ids_to_tokens(input_id_list)
pprint.pprint(input_tokens_list)

In [None]:
# Variant that also returns character offsets (kept for reference):
#inputs = tokenizer(sentence_a, sentence_b, return_offsets_mapping = True, return_tensors='pt', add_special_tokens=True, max_length = 512, padding=True, truncation = True)
# Tokenizes the same pairs with the same arguments as the earlier tokenization cell.
inputs = tokenizer(sentence_a, sentence_b, return_tensors='pt', add_special_tokens=True, max_length = 512, padding=True, truncation = True)

In [None]:
# Display the tokenizer output for the batch.
inputs

In [None]:
# One "<id> <tab> <token>" line per position of the selected input sequence.
id_token_lines = [f"{x} \t {y}" for x, y in zip(input_id_list, input_tokens_list)]
res = "\n".join(id_token_lines)
print(res)

## Model Inference

In [None]:
# Forward pass over the whole batch; no gradients are needed for inspection.
with torch.no_grad():
    outputs = model(**inputs)

In [None]:
# List the fields returned by the model.
outputs.keys()

## Hidden layer embeddings

In [None]:
# Total number of hidden-state tensors returned by the model
len(outputs['hidden_states'])

In [None]:
# The hidden states are indexed as follows:
# outputs['hidden_states'][layer_m][0][token_n]
# where the middle index (0) selects the first sequence of the batch.
layer_m = 12
token_n = 1  # not used in this cell
# Get all the token embeddings of one layer for the first sequence of the batch:
output_embeddings = outputs['hidden_states'][layer_m][0]
output_embeddings.shape

In [None]:
# Token positions to inspect.
# NOTE(review): the names "throat"/"cancer" do not relate to the car query
# used in this notebook; they look like leftovers from another example — confirm.
token_throat = 2
token_cancer = 3  # not used in this cell

# Get the embedding of one particular token in one particular layer
throat_output_embedding = outputs['hidden_states'][layer_m][0][token_throat]
throat_output_embedding.shape

In [None]:
# Shape of the selected layer's embeddings for the first sequence of the batch.
output_embeddings.shape

In [None]:
def display_scatterplot(data, words):
    """Scatter-plot one labelled point per row of `data`.

    Args:
        data: 2-D array with one row per word. Arrays that do not already
            have exactly two columns are projected onto their first two
            principal components; the PCA is fitted on `data` itself rather
            than on a notebook global.
        words: labels, one per row of `data`, drawn next to each point.

    Returns:
        None. The figure is drawn on a new Matplotlib figure.
    """
    if data.shape[1] == 2:
        twodim = data
    else:
        twodim = PCA().fit_transform(data)[:, :2]

    plt.style.use('default') # https://matplotlib.org/3.5.1/gallery/style_sheets/style_sheets_reference.html
    plt.figure(figsize=(6,6))
    plt.scatter(twodim[:,0], twodim[:,1], edgecolors='k', c='r')
    for word, (x,y) in zip(words, twodim):
        # Small offset so the label does not sit on top of the marker.
        plt.text(x+0.05, y+0.05, word)

    return

display_scatterplot(output_embeddings.detach().numpy(), input_tokens_list)

In [None]:
import re

def get_word_idx(sent: str, word: str):
    """Return the position of `word` among the pieces of `sent`.

    `sent` is split on single spaces, '?' and '.'; the index of the first
    piece equal to `word` is returned.

    Args:
        sent: the sentence to search (previously ignored in favour of the
            global `sentence_a`).
        word: the exact piece to look for.

    Raises:
        ValueError: if `word` is not one of the pieces.
    """
    tmp_lst = re.split(r' |\?|\.', sent)
    return tmp_lst.index(word)

def get_word_vector(inputs, outputs, idx, layer):
    """Average the hidden states of all tokens whose word id equals `idx`.

    Args:
        inputs: tokenizer output exposing `word_ids()`.
        outputs: model output exposing `hidden_states`.
        idx: word id to select.
        layer: index into `outputs.hidden_states`.

    Returns:
        Mean over the selected token vectors, taken from the first sequence
        of the batch at the given layer.

    Prints the word ids and the selected token positions as a debugging aid.
    NOTE(review): for sentence pairs the word ids presumably restart in the
    second sentence, so tokens from both sentences may be averaged — confirm.
    """
    word_id_per_token = inputs.word_ids()

    # positions of every token that belongs to the word of interest
    selected_positions = np.where(np.array(word_id_per_token) == idx)
    print(word_id_per_token)

    first_sequence_states = outputs.hidden_states[layer][0]
    selected_vectors = first_sequence_states[selected_positions]
    print(selected_positions)
    return selected_vectors.mean(dim=0)

# Decode the first input sequence back into a space-delimited string and drop
# the "[CLS] " / " [SEP]" markers, so that the position of a given word in the
# BERT input sequence can be looked up.
decoded_first_sequence = tokenizer.decode(inputs["input_ids"][0].tolist())
sentence_a = decoded_first_sequence.replace("[CLS] ", '').replace(" [SEP]", '')

word = "car"
idx = get_word_idx(sentence_a, word)
print(idx)
print("Input sequence:", sentence_a)
print("The word \"", word, "\" occurs in position", idx, "of the BERT input sequence.")

# Averaged embedding of that word at hidden-state index 4.
word_embedding = get_word_vector(inputs, outputs, idx, 4)

# Attention and Embeddings Visualization

## Imports and definitions

In [None]:
import torch

from transformers import AutoTokenizer, AutoModel, AutoConfig
from transformers import logging
logging.set_verbosity_warning()

import numpy as np
import pprint

from bertviz import model_view, head_view

# Get the interactive Tools for Matplotlib
%matplotlib notebook
%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

# 
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

In [None]:
def call_html():
  """Display an HTML snippet that loads require.js and registers the paths
  for `base`, `d3` and `jquery` with requirejs.

  Redefines the function of the same name from the earlier section with the
  same body. Relies on the `display` function IPython provides in notebooks.
  """
  import IPython
  requirejs_setup = '''
        <script src="/static/components/requirejs/require.js"></script>
        <script>
          requirejs.config({
            paths: {
              base: '/static/base',
              "d3": "https://cdnjs.cloudflare.com/ajax/libs/d3/5.7.0/d3.min",
              jquery: '//ajax.googleapis.com/ajax/libs/jquery/2.0.0/jquery.min',
            },
          });
        </script>
        '''
  display(IPython.core.display.HTML(requirejs_setup))
    
def display_scatterplot(model, words):
    """Scatter-plot one labelled point per row of `model`.

    Redefines (and therefore shadows) the earlier function of the same name.

    Args:
        model: 2-D array of vectors, one row per word (despite the name, this
            is the data to plot, not a neural model). Arrays without exactly
            two columns are reduced to their first two principal components.
        words: labels, one per row, drawn next to each point.
    """
    already_two_dimensional = model.shape[1] == 2
    twodim = model if already_two_dimensional else PCA().fit_transform(model)[:,:2]

    plt.style.use('ggplot')
    plt.figure(figsize=(6,6))
    xs, ys = twodim[:,0], twodim[:,1]
    plt.scatter(xs, ys, edgecolors='k', c='r')
    for label, (x, y) in zip(words, twodim):
        plt.text(x+0.05, y+0.05, label)

# Model Explainability with BERT

In [None]:
# Checkpoint used for the explainability section.
model_path = 'deepset/roberta-base-squad2'

In [None]:
# Rebinds the global tokenizer/model to the checkpoint in model_path; the
# config asks the model to return all hidden states and attention maps.
tokenizer = AutoTokenizer.from_pretrained(model_path)
config = AutoConfig.from_pretrained(model_path,  output_hidden_states=True, output_attentions=True)  
model = AutoModel.from_pretrained(model_path, config=config)

In [None]:
query = "How many moments involve a car?"
captions = selected_df['caption'].astype(str).tolist()[:1]

# A single (query, caption) pair: the query paired with the first caption.
sentence_a = [query for _ in captions]
sentence_b = captions
inputs = tokenizer(
    sentence_a,
    sentence_b,
    return_tensors='pt',
    add_special_tokens=True,
    max_length=512,
    truncation=True,
    padding=True,
)

# Token strings of the first (and only) sequence, used as plot labels later.
input_ids = inputs['input_ids']
input_id_list = input_ids[0].tolist()
tokens = tokenizer.convert_ids_to_tokens(input_id_list)

In [None]:
# Forward pass without gradients. output_attentions=True repeats what the
# config passed at load time already requests.
with torch.no_grad():
    outputs = model(**inputs, output_attentions=True)

# Keep the attention maps and hidden states for the plots below.
attention = outputs.attentions
hidden_states = outputs.hidden_states

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# One panel per hidden state, laid out on a rows x cols grid.
rows = 3
cols = 4
fig, ax_full = plt.subplots(rows, cols)
fig.set_figheight(rows*4)
fig.set_figwidth(cols*4+3)

# Style and font size are set once instead of on every loop iteration.
plt.style.use('default') # https://matplotlib.org/3.5.1/gallery/style_sheets/style_sheets_reference.html
plt.rcParams.update({'font.size': 10})

for layer, ax in enumerate(ax_full.flat):
    # Models with fewer hidden states than panels leave the extra panels blank.
    if layer >= len(hidden_states):
        ax.axis('off')
        continue

    # Token embeddings of the first sequence of the batch at this layer.
    current_hidden_state = hidden_states[layer][0].detach().numpy()

    if current_hidden_state.shape[1] == 2:
        twodim = current_hidden_state
    else:
        # Project onto the first two principal components of this layer.
        twodim = PCA().fit_transform(current_hidden_state)[:,:2]

    im = ax.scatter(twodim[:,0], twodim[:,1], edgecolors='k', c='r')
    for token, (x,y) in zip(tokens, twodim):
        # Drop only the leading 'Ġ' word-boundary marker; tokens without it
        # (e.g. '<s>') keep their first character.
        label = token[1:] if token.startswith('Ġ') else token
        ax.text(x+0.05, y+0.05, label)

    ax.set_title("Layer " + str(layer))

fig.suptitle("Visualization of all output embeddings from all layers")
plt.show()

## Token-specific visualization of self-attention

In [None]:
# Load the JavaScript dependencies, then render bertviz's head view for the
# attention maps and tokens computed above.
call_html()
head_view(attention, tokens)

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

layer = 1  # index into `attention` of the layer to plot

rows = 3
cols = 4
fig, ax_full = plt.subplots(rows, cols)
fig.set_figheight(rows*6)
fig.set_figwidth(cols*6+4)
plt.rcParams.update({'font.size': 10})

n_heads = attention[layer].shape[1]
# pcolormesh draws cell i between i and i+1, so labels go at the cell centres.
tick_positions = np.arange(len(tokens)) + 0.5

for j, ax in enumerate(ax_full.flat):
    r, c = divmod(j, cols)

    # Leave surplus panels blank when the model has fewer heads than panels.
    if j >= n_heads:
        ax.axis('off')
        continue

    # Attention map of head j for the first sequence of the batch, flipped
    # vertically so that the first query token ends up in the top row.
    sattention = np.flip(attention[layer][0][j].numpy(), 0)
    im = ax.pcolormesh(sattention, cmap='gnuplot')

    ax.set_title("Head " + str(j))

    # Query labels only on the first column.
    if c == 0:
        ax.set_yticks(tick_positions)
        ax.set_yticklabels(list(reversed(tokens)))
        ax.set_ylabel("Queries")
    else:
        ax.set_yticks([])

    # Key labels only on the bottom row.
    if r == rows-1:
        ax.set_xticks(tick_positions)
        ax.set_xticklabels(tokens)
        ax.set_xlabel("Keys")

        # Rotate the tick labels and set their alignment.
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
                 rotation_mode="anchor")
    else:
        ax.set_xticks([])

fig.suptitle("Layer " + str(layer) + " Multi-head Self-attentions")
# The colour bar is built from the last drawn head; each panel is colour-scaled
# independently, so colours are not strictly comparable across panels.
cbar = fig.colorbar(im, ax=ax_full, location='right', shrink=0.5)
cbar.ax.set_ylabel("Self-attention", rotation=-90, va="bottom")
plt.show()

In [None]:
# Render bertviz's model view for the attention maps computed above.
model_view(attention, tokens)

## Positional Embeddings

In [None]:
# Same checkpoint as in the explainability section above.
model_path = 'deepset/roberta-base-squad2'

In [None]:
# Reloads tokenizer/config/model from model_path with hidden states and
# attention maps enabled.
tokenizer = AutoTokenizer.from_pretrained(model_path)
config = AutoConfig.from_pretrained(model_path,  output_hidden_states=True, output_attentions=True)  
model = AutoModel.from_pretrained(model_path, config=config)

In [None]:
# The same word repeated 20 times: the token is identical at every position,
# so only its position in the sequence differs.
word = "car"
sentence = " ".join(word for _ in range(20))

captions = selected_df['caption'].astype(str).tolist()[:1]

# Pair the repeated-word sentence with the first caption.
sentence_a = [sentence for _ in captions]
sentence_b = captions
inputs = tokenizer(
    sentence_a,
    sentence_b,
    return_tensors='pt',
    add_special_tokens=True,
    max_length=512,
    truncation=True,
    padding=True,
)

# Token strings of the first (and only) sequence, used as plot labels.
input_ids = inputs['input_ids']
input_id_list = input_ids[0].tolist()
tokens = tokenizer.convert_ids_to_tokens(input_id_list)

In [None]:
# Forward pass without gradients. output_attentions=True repeats what the
# config passed at load time already requests.
with torch.no_grad():
    outputs = model(**inputs, output_attentions=True)

# Keep the attention maps and hidden states for the plot below.
attention = outputs.attentions
hidden_states = outputs.hidden_states

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# One panel per hidden state, laid out on a rows x cols grid.
rows = 3
cols = 4
fig, ax_full = plt.subplots(rows, cols)
fig.set_figheight(rows*4)
fig.set_figwidth(cols*4+3)

# Style and font size are set once instead of on every loop iteration.
plt.style.use('default') # https://matplotlib.org/3.5.1/gallery/style_sheets/style_sheets_reference.html
plt.rcParams.update({'font.size': 10})

for layer, ax in enumerate(ax_full.flat):
    # Models with fewer hidden states than panels leave the extra panels blank.
    if layer >= len(hidden_states):
        ax.axis('off')
        continue

    # Token embeddings of the first sequence of the batch at this layer.
    current_hidden_state = hidden_states[layer][0].detach().numpy()

    if current_hidden_state.shape[1] == 2:
        twodim = current_hidden_state
    else:
        # Project onto the first two principal components of this layer.
        twodim = PCA().fit_transform(current_hidden_state)[:,:2]

    im = ax.scatter(twodim[:,0], twodim[:,1], edgecolors='k', c='r')
    # `token` (not `word`) so the global `word` set in the tokenization cell
    # is not overwritten by the loop.
    for token, (x,y) in zip(tokens, twodim):
        # Drop only the leading 'Ġ' word-boundary marker; tokens without it
        # (e.g. '<s>') keep their first character.
        label = token[1:] if token.startswith('Ġ') else token
        ax.text(x+0.05, y+0.05, label)

    ax.set_title("Layer " + str(layer))

fig.suptitle("Visualization of all output embeddings from all layers")
plt.show()

## Dual Encoders

In [None]:
# Checkpoint used for the dual-encoder visualizations.
model_path = "bert-base-uncased"

In [None]:
# Rebinds the global tokenizer/model to the checkpoint in model_path, with
# hidden states and attention maps enabled.
tokenizer = AutoTokenizer.from_pretrained(model_path)
config = AutoConfig.from_pretrained(model_path,  output_hidden_states=True, output_attentions=True)  
model = AutoModel.from_pretrained(model_path, config=config)

In [None]:
# Encode the first caption on its own (no query), as a dual encoder would.
captions = selected_df['caption'].astype(str).tolist()[:1]
inputs = tokenizer(captions, return_tensors='pt', truncation=True, padding=True)

# Token strings of the first (and only) sequence, used as plot labels.
input_ids = inputs['input_ids']
input_id_list = input_ids[0].tolist()
tokens = tokenizer.convert_ids_to_tokens(input_id_list)

# Forward pass without gradients.
with torch.no_grad():
    outputs = model(**inputs, output_attentions=True)

hidden_states = outputs.hidden_states
attention = outputs.attentions

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

# One panel per hidden state, laid out on a rows x cols grid.
rows = 3
cols = 4
fig, ax_full = plt.subplots(rows, cols)
fig.set_figheight(rows*4)
fig.set_figwidth(cols*4+3)

# Style and font size are set once instead of on every loop iteration.
plt.style.use('default') # https://matplotlib.org/3.5.1/gallery/style_sheets/style_sheets_reference.html
plt.rcParams.update({'font.size': 10})

for layer, ax in enumerate(ax_full.flat):
    # Models with fewer hidden states than panels leave the extra panels blank.
    if layer >= len(hidden_states):
        ax.axis('off')
        continue

    # Token embeddings of the first sequence of the batch at this layer.
    current_hidden_state = hidden_states[layer][0].detach().numpy()

    if current_hidden_state.shape[1] == 2:
        twodim = current_hidden_state
    else:
        # Project onto the first two principal components of this layer.
        twodim = PCA().fit_transform(current_hidden_state)[:,:2]

    im = ax.scatter(twodim[:,0], twodim[:,1], edgecolors='k', c='r')
    # `token` (not `word`) so the global `word` is not overwritten by the loop.
    for token, (x,y) in zip(tokens, twodim):
        ax.text(x+0.05, y+0.05, token)

    ax.set_title("Layer " + str(layer))

fig.suptitle("Visualization of all output embeddings from all layers")
plt.show()

In [None]:
# Load the JavaScript dependencies, then render bertviz's head view for the
# attention maps and tokens computed above.
call_html()
head_view(attention, tokens)

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

layer = 1  # index into `attention` of the layer to plot

rows = 3
cols = 4
fig, ax_full = plt.subplots(rows, cols)
fig.set_figheight(rows*6)
fig.set_figwidth(cols*6+4)
plt.rcParams.update({'font.size': 10})

n_heads = attention[layer].shape[1]
# pcolormesh draws cell i between i and i+1, so labels go at the cell centres.
tick_positions = np.arange(len(tokens)) + 0.5

for j, ax in enumerate(ax_full.flat):
    r, c = divmod(j, cols)

    # Leave surplus panels blank when the model has fewer heads than panels.
    if j >= n_heads:
        ax.axis('off')
        continue

    # Attention map of head j for the first sequence of the batch, flipped
    # vertically so that the first query token ends up in the top row.
    sattention = np.flip(attention[layer][0][j].numpy(), 0)
    im = ax.pcolormesh(sattention, cmap='gnuplot')

    ax.set_title("Head " + str(j))

    # Query labels only on the first column.
    if c == 0:
        ax.set_yticks(tick_positions)
        ax.set_yticklabels(list(reversed(tokens)))
        ax.set_ylabel("Queries")
    else:
        ax.set_yticks([])

    # Key labels only on the bottom row.
    if r == rows-1:
        ax.set_xticks(tick_positions)
        ax.set_xticklabels(tokens)
        ax.set_xlabel("Keys")

        # Rotate the tick labels and set their alignment.
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
                 rotation_mode="anchor")
    else:
        ax.set_xticks([])

fig.suptitle("Layer " + str(layer) + " Multi-head Self-attentions")
# The colour bar is built from the last drawn head; each panel is colour-scaled
# independently, so colours are not strictly comparable across panels.
cbar = fig.colorbar(im, ax=ax_full, location='right', shrink=0.5)
cbar.ax.set_ylabel("Self-attention", rotation=-90, va="bottom")
plt.show()

In [None]:
# Render bertviz's model view for the caption's attention maps.
model_view(attention, tokens)

In [None]:
# Encode the query on its own with the model loaded above.
query = "How many moments involve a car?"
inputs = tokenizer(query, return_tensors='pt', truncation=True, padding=True)

# Forward pass without gradients; attention maps are returned because the
# config passed at load time enables them.
with torch.no_grad():
    outputs = model(**inputs)

first_sequence_ids = inputs['input_ids'][0]
tokens = tokenizer.convert_ids_to_tokens(first_sequence_ids)
attention = outputs.attentions

head_view(attention, tokens)

In [None]:
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

layer = 1  # index into `attention` of the layer to plot

rows = 3
cols = 4
fig, ax_full = plt.subplots(rows, cols)
fig.set_figheight(rows*6)
fig.set_figwidth(cols*6+4)
plt.rcParams.update({'font.size': 10})

n_heads = attention[layer].shape[1]
# pcolormesh draws cell i between i and i+1, so labels go at the cell centres.
tick_positions = np.arange(len(tokens)) + 0.5

for j, ax in enumerate(ax_full.flat):
    r, c = divmod(j, cols)

    # Leave surplus panels blank when the model has fewer heads than panels.
    if j >= n_heads:
        ax.axis('off')
        continue

    # Attention map of head j for the first sequence of the batch, flipped
    # vertically so that the first query token ends up in the top row.
    sattention = np.flip(attention[layer][0][j].numpy(), 0)
    im = ax.pcolormesh(sattention, cmap='gnuplot')

    ax.set_title("Head " + str(j))

    # Query labels only on the first column.
    if c == 0:
        ax.set_yticks(tick_positions)
        ax.set_yticklabels(list(reversed(tokens)))
        ax.set_ylabel("Queries")
    else:
        ax.set_yticks([])

    # Key labels only on the bottom row.
    if r == rows-1:
        ax.set_xticks(tick_positions)
        ax.set_xticklabels(tokens)
        ax.set_xlabel("Keys")

        # Rotate the tick labels and set their alignment.
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
                 rotation_mode="anchor")
    else:
        ax.set_xticks([])

fig.suptitle("Layer " + str(layer) + " Multi-head Self-attentions")
# The colour bar is built from the last drawn head; each panel is colour-scaled
# independently, so colours are not strictly comparable across panels.
cbar = fig.colorbar(im, ax=ax_full, location='right', shrink=0.5)
cbar.ax.set_ylabel("Self-attention", rotation=-90, va="bottom")
plt.show()

In [None]:
# Render bertviz's model view for the query's attention maps.
model_view(attention, tokens)