In [2]:
import torch
from transformers import AutoModel, AutoTokenizer
from transformers import TextClassificationPipeline
import pandas as pd
import numpy as np
import tqdm
from tqdm import tqdm
import sys, os
from transformers import Pipeline
from torch import Tensor 
import torch.nn as nn
from torch.utils.data import DataLoader
from datasets import load_dataset
import torch.nn as nn
import sentence_transformers

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

ModuleNotFoundError: No module named 'projection_helper'

### Load Data and Model

In [None]:
# Repository ids used by load_data() (passed to load_dataset) and
# load_model() (passed to AutoModel.from_pretrained).
DATASET_REPO = "shreyahavaldar/multilingual_politeness"
MODEL_REPO = "shreyahavaldar/xlm-roberta-politeness"

def load_data():
    """Load the dataset named by DATASET_REPO and switch it to torch format.

    Returns:
        The object returned by ``load_dataset(DATASET_REPO)`` after
        ``set_format(type='torch')`` has been applied to it.
    """
    politeness_data = load_dataset(DATASET_REPO)
    # set_format's return value is not used: the same object is handed back
    # to the caller once its output format has been changed to torch.
    politeness_data.set_format(type='torch')
    return politeness_data

def load_model():
    """Build the politeness model: pretrained encoder plus a 1-unit linear head.

    The encoder is loaded from MODEL_REPO; the ``nn.Linear`` head is created
    here (freshly initialised, not loaded from the repository) and maps the
    encoder's ``hidden_size`` to a single output.

    Returns:
        nn.Sequential: ``(encoder, linear_head)`` with every parameter moved
        to the module-level ``device``.
    """
    encoder = AutoModel.from_pretrained(MODEL_REPO)
    torch_model = nn.Sequential(encoder, nn.Linear(encoder.config.hidden_size, 1))
    # Move the *whole* stack. Moving only the encoder left the linear head on
    # the CPU, which causes a device mismatch as soon as CUDA is in use.
    torch_model.to(device)
    # NOTE(review): nn.Sequential feeds the encoder's return value straight
    # into nn.Linear. Confirm that value is a tensor for this model; if it is
    # a structured output object, a pooling/selection step is needed between
    # the two layers.
    return torch_model

### Define Alignment Metric

In [23]:
## Helper Functions

def project_points_onto_axes(points, x_point1, x_point2, y_point1, y_point2):
    """Express points in a 2-D frame spanned by two (possibly non-orthogonal) axes.

    Each axis is defined by a pair of endpoints. Every point is reduced to a
    scalar coordinate per axis, corrected for the angle between the axes,
    shifted so that the intersection of the two endpoint lines is the origin,
    and rescaled separately on the negative and positive side of each axis.

    Args:
        points: iterable of vectors; each one is combined with the endpoints
            by subtraction and ``np.dot`` (presumably 1-D numpy arrays with
            the same dimension as the endpoints — TODO confirm).
        x_point1, x_point2: endpoints defining the x-axis (direction is
            x_point1 -> x_point2).
        y_point1, y_point2: endpoints defining the y-axis (direction is
            y_point1 -> y_point2).

    Returns:
        Tuple of four numpy arrays with one entry per point:
        rescaled x coordinates, rescaled y coordinates, ``x_dist`` and
        ``y_dist`` (norms of the point minus its scaled axis vector).

    Raises:
        Exception: propagated from ``line_intersection`` when the two
            endpoint lines do not intersect in the corrected coordinates.
    """
    # Direction of each axis; the division by 2 is cancelled by the
    # normalisation on the next two lines.
    x_axis_vector = (x_point2 - x_point1) / 2
    y_axis_vector = (y_point2 - y_point1) / 2
    x_axis_vector = x_axis_vector / np.linalg.norm(x_axis_vector, ord=2)
    y_axis_vector = y_axis_vector / np.linalg.norm(y_axis_vector, ord=2)
    # Both axis vectors now have unit length, so their dot product is the
    # cosine of the angle between the axes.
    cos = np.dot(x_axis_vector,y_axis_vector)
    # Per-point results, filled in by the loop at the end.
    x_projection = []
    y_projection = []
    x_dist = []
    y_dist = []
    # Midpoint of each endpoint pair; scalar projections are measured from it.
    x_middle = (x_point1 + x_point2) / 2
    y_middle = (y_point1 + y_point2) / 2
    # Scalar projection of every endpoint on every axis. Naming: "<endpoint><axis>",
    # e.g. x1y is x_point1 projected on the y-axis (relative to y_middle).
    x1x = np.dot(x_point1 - x_middle , x_axis_vector) 
    x2x = np.dot(x_point2 - x_middle, x_axis_vector) 
    y1y = np.dot(y_point1 - y_middle, y_axis_vector) 
    y2y = np.dot(y_point2 - y_middle, y_axis_vector) 
    x1y = np.dot(x_point1 - y_middle, y_axis_vector) 
    x2y = np.dot(x_point2 - y_middle, y_axis_vector) 
    y1x = np.dot(y_point1 - x_middle, x_axis_vector) 
    y2x = np.dot(y_point2 - x_middle, x_axis_vector) 
    # Angle-corrected coordinates: subtract the other axis' projection times cos.
    x1xtrue = x1x - x1y*cos
    x1ytrue = x1y - x1x*cos
    x2xtrue = x2x - x2y*cos
    x2ytrue = x2y - x2x*cos
    y1xtrue = y1x - y1y*cos
    y1ytrue = y1y - y1x*cos
    y2xtrue = y2x - y2y*cos
    y2ytrue = y2y - y2x*cos
    # Origin of the frame: intersection of the line through the two x endpoints
    # with the line through the two y endpoints, in corrected coordinates.
    xorigin, yorigin = line_intersection((x1xtrue,x2xtrue,y1xtrue,y2xtrue), (x1ytrue,x2ytrue,y1ytrue,y2ytrue))
    # Distance from the origin to each endpoint along its own axis coordinate.
    # NOTE(review): nothing guards against a scale of 0 (endpoint coordinate
    # equal to the origin), which would make the divisions below degenerate.
    x_negative_scale = np.abs(x1xtrue - xorigin)
    x_positive_scale = np.abs(x2xtrue - xorigin)
    y_negative_scale = np.abs(y1ytrue - yorigin)
    y_positive_scale = np.abs(y2ytrue - yorigin)
    for point in points:
        # Same projection + angle correction as for the endpoints, then shift
        # by the origin.
        x_proj = np.dot(point - x_middle , x_axis_vector)  
        y_proj = np.dot(point - y_middle , y_axis_vector) 
        true_y = (y_proj - x_proj*cos) - yorigin
        true_x = (x_proj - y_proj*cos) - xorigin
        # Negative values are divided by the point1-side scale, all others by
        # the point2-side scale.
        if true_x < 0:
            true_x = true_x / x_negative_scale
        else:
            true_x = true_x / x_positive_scale
        if true_y < 0:
            true_y = true_y / y_negative_scale
        else:
            true_y = true_y / y_positive_scale
        x_projection.append(true_x)
        y_projection.append(true_y)
        # NOTE(review): x_proj / y_proj are measured from the axis midpoints,
        # but the midpoints are not subtracted from `point` here — confirm
        # whether the perpendicular distance to the axis line was intended.
        x_dist.append(np.linalg.norm(point - x_proj*x_axis_vector, ord=2))
        y_dist.append(np.linalg.norm(point - y_proj*y_axis_vector, ord=2))

    # Convert the per-point lists to numpy arrays before returning.
    return np.array(x_projection), np.array(y_projection), np.array(x_dist), np.array(y_dist)

def line_intersection(x_pts, y_pts):
    """Intersect two infinite 2-D lines, each given by two points.

    Args:
        x_pts: four x-coordinates; indices 0 and 1 belong to the first line,
            indices 2 and 3 to the second.
        y_pts: the four matching y-coordinates, in the same order.

    Returns:
        Tuple ``(x, y)`` with the coordinates of the intersection point.

    Raises:
        ValueError: if the determinant of the direction differences is
            exactly zero, i.e. the lines are parallel/coincident or one of
            them is degenerate (its two points coincide). ValueError is a
            subclass of Exception, so callers catching Exception still work.
    """
    first_a, first_b = (x_pts[0], y_pts[0]), (x_pts[1], y_pts[1])
    second_a, second_b = (x_pts[2], y_pts[2]), (x_pts[3], y_pts[3])

    # Per-line coordinate differences (start minus end).
    xdiff = (first_a[0] - first_b[0], second_a[0] - second_b[0])
    ydiff = (first_a[1] - first_b[1], second_a[1] - second_b[1])

    def det(a, b):
        # 2x2 determinant of the matrix with rows a and b.
        return a[0] * b[1] - a[1] * b[0]

    div = det(xdiff, ydiff)
    if div == 0:
        raise ValueError('lines do not intersect')

    # Cramer's rule on the two line equations.
    d = (det(first_a, first_b), det(second_a, second_b))
    x = det(d, xdiff) / div
    y = det(d, ydiff) / div
    return x, y


class Metric(nn.Module):
    """Scores how well groups of words align with politeness-lexicon categories.

    For every language a lexicon CSV (columns ``CATEGORY`` and ``word``) is
    read, each category's words are embedded with a sentence-transformers
    model, and the mean embedding is stored as that category's centroid.
    A group's alignment is the highest, over categories, of the average
    cosine similarity between the group's word embeddings and the centroid.

    Args:
        model_name: name passed to ``sentence_transformers.SentenceTransformer``.
        lexica_dir: directory containing ``<language>_politelex.csv`` files.
            Defaults to the path that was previously hard-coded.
        languages: languages whose lexica are loaded. Defaults to the four
            languages that were previously hard-coded.
    """

    def __init__(
        self,
        model_name: str = "distiluse-base-multilingual-cased",
        lexica_dir: str = "../src/exlib/utils/politeness_lexica",
        languages=("english", "spanish", "chinese", "japanese"),
    ):
        super().__init__()
        self.model = sentence_transformers.SentenceTransformer(model_name)
        # Must be set before get_centroids(), which reads both attributes.
        self.lexica_dir = lexica_dir
        self.languages = tuple(languages)
        self.centroids = self.get_centroids()

    def get_centroids(self):
        """Compute one centroid embedding per lexicon category and language.

        Returns:
            dict mapping language -> {category -> mean embedding of the
            category's words}.
        """
        # Read every lexicon file first so a missing file fails before any
        # embedding work is done.
        lexica = {}
        for language in self.languages:
            filepath = os.path.join(self.lexica_dir, f"{language}_politelex.csv")
            lexica[language] = pd.read_csv(filepath)

        all_centroids = {}
        for language in self.languages:
            lexicon = lexica[language]
            categories = lexicon["CATEGORY"].unique()
            centroids = {}
            for category in categories:
                words = lexicon[lexicon["CATEGORY"] == category]["word"].tolist()
                embeddings = self.model.encode(words)
                centroids[category] = np.mean(embeddings, axis=0)
            assert len(categories) == len(centroids.keys())
            all_centroids[language] = centroids
            print(f"Centroids for {language} created.")
        return all_centroids

    def calculate_single_group_alignment(self, group: list, language: str = "english"):
        """Return the best average cosine similarity between a group and any category.

        Args:
            group: list of words (strings) forming one group.
            language: key into ``self.centroids`` selecting the lexicon.

        Returns:
            The maximum, over the language's categories, of the mean cosine
            similarity between each word embedding and the category centroid.

        Raises:
            KeyError: if ``language`` has no centroids.
        """
        centroids = self.centroids[language]
        # Embeddings and their norms do not depend on the category, so compute
        # them once per word instead of once per (category, word) pair. Words
        # are still encoded one at a time, exactly as before.
        word_embs = [self.model.encode(word) for word in group]
        word_norms = [np.linalg.norm(word_emb) for word_emb in word_embs]

        category_similarities = {}
        for category, centroid_emb in centroids.items():
            centroid_norm = np.linalg.norm(centroid_emb)
            cos_sim = [
                np.dot(word_emb, centroid_emb) / (word_norm * centroid_norm)
                for word_emb, word_norm in zip(word_embs, word_norms)
            ]
            category_similarities[category] = np.mean(cos_sim)
        # Only the best-matching category's score is reported.
        return max(category_similarities.values())

    def calculate_group_alignment(self, groups: list, language: str = "english"):
        """Score every group with ``calculate_single_group_alignment``.

        Args:
            groups: list of word groups (each a list of strings).
            language: lexicon language applied to all groups.

        Returns:
            List with one alignment score per group, in input order.
        """
        return [
            self.calculate_single_group_alignment(group, language)
            for group in groups
        ]

### Example Group Alignment Calculation

In [24]:
# Instantiate the metric with its defaults; the constructor computes the
# per-language centroids.
metric = Metric()

# Hand-picked word groups, one list per group.
sample_groups = [
    ["dog", "cat", "fish"],
    ["hello", "goodbye", "please"],
    ["computer", "laptop", "phone"],
    ["idiot", "stupid", "dumb"],
    ["thank you", "grateful", "thanks"],
]

# One score per group, using the default language.
alignments = metric.calculate_group_alignment(sample_groups)

# Report each group next to its score.
for group, alignment in zip(sample_groups, alignments):
    print(f"Group: {group}, Alignment: {alignment}")

Centroids for english created.
Centroids for spanish created.
Centroids for chinese created.
Centroids for japanese created.
Group: ['dog', 'cat', 'fish'], Alignment: 0.5292773842811584
Group: ['hello', 'goodbye', 'please'], Alignment: 0.7011184692382812
Group: ['computer', 'laptop', 'phone'], Alignment: 0.4826013147830963
Group: ['idiot', 'stupid', 'dumb'], Alignment: 0.7102837562561035
Group: ['thank you', 'grateful', 'thanks'], Alignment: 0.9256609082221985
