# Evaluating Embeddings of Pragmatic Markers

### Load Model, Tokenizer

In [86]:
from transformers import BertModel, BertTokenizer

model_name = "bert-base-uncased"

model = BertModel.from_pretrained(model_name)
tokenizer = BertTokenizer.from_pretrained(model_name)
MAX_TOKENS = 512

In [3]:
# sample_text = """WELL, one THING ABOUT -- WELL, I mean, William and Harry are OBVIOUSLY our main concern."""

# encoded_input = tokenizer(sample_text, return_tensors = 'pt')

# output = model(**encoded_input)

# print(output[0][:10])

tensor([[[ 0.2017,  0.1938, -0.0742,  ..., -0.0308,  0.4658,  0.6078],
         [ 0.6253, -0.1001,  0.3567,  ...,  0.8521,  0.7447,  0.1688],
         [-0.0330, -0.3104,  0.5029,  ..., -0.0079,  0.2011,  0.3840],
         ...,
         [ 0.0347,  0.3426, -0.2987,  ..., -0.2210, -0.2680, -0.7982],
         [ 0.8444,  0.1637, -0.2949,  ...,  0.1564, -0.2948, -0.1899],
         [ 0.7730,  0.3879,  0.1304,  ...,  0.4094, -0.3863, -0.2330]]],
       grad_fn=<SliceBackward0>)


### Flag All Pragmatic Markers to Evaluate

In [87]:
import json
from collections import Counter

# Load your JSON data
with open('data/filtered_utterances_ft_data.json', 'r') as file:
    data = json.load(file)

# Initialize a dictionary to hold the count of each value for each term
term_counts = {}

# Iterate through each item in the JSON data
for item in data:
    matched_terms = item['matched_terms']
    for term, value in matched_terms.items():
        if term not in term_counts:
            term_counts[term] = Counter()
        term_counts[term][value] += 1

# Output the results
for term, counts in term_counts.items():
    if len(counts)>1:
        print(f"Term: {term}")
        for value, count in counts.items():
            print(f"  {value}: {count}")

Term: know
  hedge: 2
  none: 2
Term: believe
  hedge: 1
  none: 1
Term: sure
  none: 1
  hedge: 1
Term: tend
  hedge: 1
  none: 2


### Get all "Hedges", all "Authority", and all words that are flagged as 2+

In [108]:
# Define a function to get all hedges:
import json
from collections import Counter

# Define the get_hedge function
def get_category(data, category):
    samples = []
    for item in data:
        matched_terms = item['matched_terms']
        hedge_terms = {term: value for term, value in matched_terms.items() if value == category}
        if hedge_terms:
            hedged_item = item.copy()  # Copy the item to avoid modifying the original
            hedged_item['matched_terms'] = hedge_terms
            samples.append(hedged_item)
    print(f"Category: {category} - {len(samples)} sample sentences found.")
    return samples

def get_dual_matches(data):
    # Initialize a dictionary to hold the count of each value for each term
    term_counts = {}

    # Iterate through each item in the JSON data to count the values
    for item in data:
        matched_terms = item['matched_terms']
        for term, value in matched_terms.items():
            if term not in term_counts:
                term_counts[term] = Counter()
            term_counts[term][value] += 1

    # Identify terms with 2+ distinct values
    dual_terms = {term for term, counts in term_counts.items() if len(counts) > 1}

    # Filter the original data to include only those terms with 2+ distinct values
    dual_match_items = []
    for item in data:
        matched_terms = item['matched_terms']
        dual_matched_terms = {term: value for term, value in matched_terms.items() if term in dual_terms}
        if dual_matched_terms:
            dual_item = item.copy()  # Copy the item to avoid modifying the original
            dual_item['matched_terms'] = dual_matched_terms
            dual_match_items.append(dual_item)
    print(f"Dual Category Word Count: {len(dual_match_items)}.")
    return dual_match_items

# Load your JSON data
with open('data/filtered_utterances_ft_data.json', 'r') as file:
    data = json.load(file)

# Get all items where value is "hedge"
hedge_items = get_category(data, "hedge")
# for item in hedge_items[:10]:
#     print(item['matched_terms'])
    
authority_items = get_category(data, "authority")
# for item in authority_items[:10]:
#     print(item['matched_terms'])
dual_items = get_dual_matches(data)

Category: hedge - 32 sample sentences found.
Category: authority - 6 sample sentences found.
Dual Category Word Count: 10.


In [52]:
# item = hedge_items[0]
# string = f'''"{item["previous_statement"]}"\n"{item["statement"]}'''
# # Remove '<' and '>' characters from the string
# string = string.replace("<", "")
# string = string.replace(">", "")
# print(string)
# matched_terms = item['matched_terms']
# encoded_input = tokenizer(string, return_tensors='pt')

# # Extract input ids and find the indices for "know"
# input_ids = encoded_input['input_ids'][0]
# tokens = tokenizer.convert_ids_to_tokens(input_ids)

# output = model(**encoded_input)

"Right. He can't read them all."
"Let's get away from the diplomatic wrangling here for a moment. Let's talk ABOUT the military front. Tommy Franks, head of the Central Command, yesterday at the Pentagon says the military is ready to go, is in position now, if GIVEN the order from the White House. To the Pentagon from yesterday, back again today, Barbara Starr for more on this -- Barbara, good morning to you. Turkey, we KNOW, not granting that permission to stage ABOUT 60,000 troops on its territory, which has the Pentagon scrambling right now. We are hearing that some movement, some aircraft carriers in the eastern Mediterranean COULD be on the move. What are we learning ABOUT this?
['[CLS]', '"', 'right', '.', 'he', 'can', "'", 't', 'read', 'them', 'all', '.', '"', '"', 'let', "'", 's', 'get', 'away', 'from', 'the', 'diplomatic', 'wr', '##ang', '##ling', 'here', 'for', 'a', 'moment', '.', 'let', "'", 's', 'talk', 'about', 'the', 'military', 'front', '.', 'tommy', 'franks', ',', 'head

In [113]:
def find_phrase_indices(tokens, phrase):
    """
    Find the indices of the phrase in the token list.
    """
    phrase_tokens = tokenizer.tokenize(phrase)
    phrase_length = len(phrase_tokens)
    
    for i in range(len(tokens) - phrase_length + 1):
        if tokens[i:i + phrase_length] == phrase_tokens:
            return list(range(i, i + phrase_length))
    return []

def get_phrase_vector(phrase, tokens, output):
    """
    Get the vector for a multi-word phrase by averaging the embeddings of each word and its subwords in the phrase.
    """
    words = phrase.split()
    token_indices = []
    
    for word in words:
        word_indices = []
        for i, token in enumerate(tokens):
            # Check for exact match or subword match
            if token == word or token.lstrip("##") == word:
                word_indices.append(i)
        token_indices.extend(word_indices)
    
    if not token_indices:
        return None
    
    # Extract embeddings for specific word indices and average them
    token_embeddings = output.last_hidden_state[0, token_indices, :]
    phrase_vector = token_embeddings.mean(dim=0).detach()
    return phrase_vector
    
    
def get_relevant_segment(tokens, target_indices, max_length=512):
    """
    Get the relevant segment of tokens centered around the target indices within the max_length.
    """
    start = max(0, min(target_indices) - (max_length // 2))
    end = start + max_length
    if end > len(tokens):
        end = len(tokens)
        start = max(0, end - max_length)
    return tokens[start:end], start, end

def get_vectors(item, category, tokenizer=tokenizer, model=model, vectors = []) -> list:
    string = f'''"{item["previous_statement"]}"\n"{item["statement"]}"'''
    string = string.replace("<", "").replace(">", "")
    
    # First tokenize without truncation to get the full token list
    full_encoded_input = tokenizer(string, return_tensors='pt')
    full_tokens = tokenizer.convert_ids_to_tokens(full_encoded_input['input_ids'][0])
    # if len(full_tokens)>MAX_TOKENS:
    #     print(f"{len(full_tokens)} tokens. string: {string}")
    
    matched_terms = item['matched_terms']
    
    for term, value in matched_terms.items():
        if value == category:
            # Find the indices of the target term or phrase in the full token list
            if " " in term:  # It's a phrase
                term_indices = find_phrase_indices(full_tokens, term)
            else:  # It's a single word or subword
                term_indices = [i for i, token in enumerate(full_tokens) if token == term or token.lstrip("##") == term]
            # if len(full_tokens)>MAX_TOKENS:
            #     print(term_indices)

            if term_indices:
                # Get the relevant segment of tokens
                relevant_tokens, start, end = get_relevant_segment(full_tokens, term_indices)
                # if len(full_tokens)>MAX_TOKENS:
                #     print("Relevant Token Length:",len(relevant_tokens), "start/end;", start, end)
                
                # Retokenize the relevant segment
                relevant_string = tokenizer.convert_tokens_to_string(relevant_tokens)
                encoded_input = tokenizer(relevant_string, return_tensors='pt', truncation=True, max_length=MAX_TOKENS)
                
                tokens = tokenizer.convert_ids_to_tokens(encoded_input['input_ids'][0])
                output = model(**encoded_input)
                
                if " " in term:  # It's a phrase
                    phrase_vector = get_phrase_vector(term, tokens, output)
                    if phrase_vector is not None:
                        vectors.append((phrase_vector, term, category))
                    else:
                        vectors.append((None, term, category))
                else:  # It's a single word or subword
                    word_indices = [i for i, token in enumerate(tokens) if token == term or token.lstrip("##") == term]
                    if word_indices:
                        word_embeddings = output.last_hidden_state[0, word_indices, :]
                        word_vector = word_embeddings.mean(dim=0).detach()
                        vectors.append((word_vector, term, category))
                    else:
                        vectors.append((None, term, category))
    return vectors

def get_category_vectors(data, category, vectors = []):
    for item in data:
        # print("Analyzing:", item['matched_terms'])
        vectors = get_vectors(item, category, vectors = vectors)
    # for vector in vectors:
    #     print(vector[1], vector[2], vector[0].shape)
    return vectors

hedge_vectors = get_category_vectors(hedge_items, "hedge")        
authority_vectors = get_category_vectors(authority_items, "authority")        
print("Hedge Vectors:", len(hedge_vectors))
print("Authority Vectors:", len(authority_vectors))

string: "It's important say over and over and over and over and over again on this day, we wish the president well and we wish him a speedy recovery but it is just a fact, it is just a fact that his behavior, the way he has conducted himself in recent days and weeks is directly contrary to the advice of his own experts."
"We're ALL praying for the president, praying for the First Lady and any of other folks that MAY end up testing POSITIVE as WELL but it is also true that the president in many ways with his behavior and his rhetoric opened the door to the White House to this virus and remember the White House is both a place of business, it's a workplace and it's also a residence, right? So if you THINK ABOUT those people who come to work EVERY day, they were left very vulnerable because it was a president who didn't really like the look of masks. He thought mask were essentially ugly and didn't want folks AROUND him wearing them and so that is the KIND OF environment that was created.

Hedge Vectors: 64
Authority Vectors: 64


### Get Vectors for Hedge Words

In [23]:
import json

# Load your JSON data
with open('data/filtered_utterances_ft_data.json', 'r') as file:
    data = json.load(file)

word = "know"
# Filter data to include entries where 'matched_terms' has 'know' as a key
filtered_data = [item for item in data if word in item['matched_terms']]
statements = []
# Optionally, print the filtered data to see the result
for item in filtered_data:
    # Concatenate strings properly
    string = item["previous_statement"] + " statement: " + item["statement"]
    category = item["matched_terms"]["know"]
    # Remove '<' and '>' characters from the string
    string = string.replace("<", "")
    string = string.replace(">", "")
    statements.append((string, category, word))
for item in statements:
    print(item)

("You know, Salena, as someone who covers the president's base so extensively and so well, answer this for us. Release the memo has become a battle cry for many in the president's base and for many in the right wing. Is there any way that the president does not, cannot release the memo at this point in time, despite the pleas of his own handpicked FBI chief not to do so? Is he stuck? statement: You KNOW, I don't -- I -- because he's such a different KIND OF president and he navigates the presidency in a way, you KNOW, that's not ALWAYS predictable, I mean I COULD see a scenario where, you KNOW, he says, OK, Wray, let's sit down and talk ABOUT this. You KNOW, what's he -- what are the things that you find that are dangerous or COULD put people in jeopardy? What SHOULD we redact? Then I'll release the memo. I mean there is the possibility that that happens. I don't THINK there's any political -- not any. There's ALWAYS some, right? But I don't THINK there's a large political problem if t

In [14]:
import numpy as np

def cosine_similarity(vec1, vec2):
    dot_product = np.dot(vec1, vec2)
    norm_vec1 = np.linalg.norm(vec1)
    norm_vec2 = np.linalg.norm(vec2)
    return dot_product / (norm_vec1 * norm_vec2)

def identify_similar_parts(vectors, threshold=0.5):
    num_vectors = len(vectors)
    num_dimensions = len(vectors[0])
    
    # Initialize a list to hold the similarity for each dimension
    dimension_similarities = np.zeros(num_dimensions)
    
    # Compute the cosine similarity for each dimension across all pairs of vectors
    for i in range(num_vectors):
        for j in range(i + 1, num_vectors):
            vec1, vec2 = vectors[i], vectors[j]
            for k in range(num_dimensions):
                dim_vec1 = np.zeros(num_dimensions)
                dim_vec2 = np.zeros(num_dimensions)
                dim_vec1[k] = vec1[k]
                dim_vec2[k] = vec2[k]
                dimension_similarities[k] += cosine_similarity(dim_vec1, dim_vec2)
    
    # Average the similarity scores for each dimension
    dimension_similarities /= (num_vectors * (num_vectors - 1) / 2)
    
    # Identify dimensions with similarity above the threshold
    similar_dimensions = np.where(dimension_similarities > threshold)[0]
    
    return similar_dimensions, dimension_similarities

In [13]:
from transformers import BertModel, BertTokenizer
import torch
from scipy.spatial.distance import cosine

# Load the pretrained BERT base uncased model and tokenizer
model_name = "bert-base-uncased"
model = BertModel.from_pretrained(model_name)
tokenizer = BertTokenizer.from_pretrained(model_name)

vectors = []
for statement in statements:
    sample_text = statement[0]
    classification = statement[1]
    word = statement[2]
    # Encode the text
    encoded_input = tokenizer(sample_text, return_tensors='pt')

    # Extract input ids and find the indices for "know"
    input_ids = encoded_input['input_ids'][0]
    tokens = tokenizer.convert_ids_to_tokens(input_ids)
    token_indices = [i for i, token in enumerate(tokens) if token == 'know']

    # Get the model output
    output = model(**encoded_input)

    # Extract embeddings for "know"
    token_embeddings = output.last_hidden_state[0, token_indices, :] if token_indices else None
    if token_embeddings is not None:
        vectors.append((token_embeddings.mean(dim=0).detach(),classification, word))  # Average if multiple "know" tokens and detach
    else:
        vectors.append((None, None, None))  # No "know" found in this sample

# Compute cosine similarity between all pairs of vectors
if len(vectors) > 1:
    for i in range(len(vectors)):
        for j in range(i + 1, len(vectors)):
            if vectors[i] is not None and vectors[j] is not None:
                cat1 = vectors[i][1]
                cat2 = vectors[j][1]
                word = vectors[i][2]
                similarity = 1 - cosine(vectors[i][0].numpy(), vectors[j][0].numpy())
                print(f"{word} {i+1},{j+1}: {cat1}/{cat2} Cosine similarity: {similarity:.4f}")
            else:
                print(f"One or both of the texts {i+1} and {j+1} do not contain the token 'know'.")

know 1,2: hedge/hedge Cosine similarity: 0.8926
know 1,3: hedge/none Cosine similarity: 0.7740
know 1,4: hedge/none Cosine similarity: 0.3465
know 2,3: hedge/none Cosine similarity: 0.8332
know 2,4: hedge/none Cosine similarity: 0.4173
know 3,4: none/none Cosine similarity: 0.3577
