In [None]:
from gensim.models import KeyedVectors
from gensim.models import Word2Vec
from collections import Counter
from time import perf_counter
from typing import Callable
import numpy as np
import pickle
import re

In [3]:
def top_k_unigrams(tweets: list[str], stop_words: list[str], k: int) -> dict[str, int]:
    """Count lowercased unigrams across all tweets and keep the k most frequent.

    Each tweet is split on whitespace. A token is counted when, after
    lowercasing, it starts with a letter a-z or '#' and is not a stop word.
    Filtering happens on the lowercased token so that capitalized words
    ("Covid") are counted together with their lowercase form instead of being
    dropped, and mixed-case stop words ("tHE") are still excluded.

    Args:
        tweets: Tweet strings to tokenize.
        stop_words: Tokens to ignore; compared against the lowercased token.
        k: Number of most frequent unigrams to return, or -1 for all of them.

    Returns:
        When k == -1, the full Counter of unigram counts. Otherwise a plain
        dict of the k most common unigrams, ordered most frequent first.
    """
    starts_with_letter_or_hash = re.compile(r"^[a-z#]")
    ignored = set(stop_words)  # set membership is O(1)

    counts = Counter()
    for tweet in tweets:
        for raw_token in tweet.split():
            token = raw_token.lower()
            if starts_with_letter_or_hash.match(token) and token not in ignored:
                counts[token] += 1

    return counts if k == -1 else dict(counts.most_common(k))


def context_word_frequencies(tweets: list[str], stop_words: list[str], context_size: int, frequent_unigrams) -> dict[tuple[str, str], int]:
    """Count (word, context_word) co-occurrences within a symmetric window.

    Every lowercased, whitespace-separated token of a tweet acts as the center
    word. A token within context_size positions on either side is counted as a
    context word when it belongs to frequent_unigrams and differs from the
    center word (so repeated occurrences of the center word inside its own
    window are not counted).

    Args:
        tweets: Tweet strings to tokenize.
        stop_words: Unused by this function; kept for interface compatibility.
        context_size: Number of tokens considered on each side of the center.
        frequent_unigrams: Allowed context words. Any iterable of strings
            works; for a dict/Counter its keys are used.

    Returns:
        Counter mapping (center_word, context_word) tuples of plain str to
        their co-occurrence counts.
    """
    # Iterating a dict yields its keys, so one set() call covers lists, dicts,
    # Counters, sets and tuples alike.
    allowed_context = set(frequent_unigrams)
    pair_counts = Counter()

    for tweet in tweets:
        # A plain list keeps tokens as built-in str (no numpy scalar keys in
        # the resulting Counter or its pickle).
        tokens = tweet.lower().split()
        for position, center in enumerate(tokens):
            window_start = max(0, position - context_size)
            window = tokens[window_start:position + context_size + 1]
            for neighbor in window:
                if neighbor != center and neighbor in allowed_context:
                    pair_counts[(center, neighbor)] += 1

    return pair_counts


def pmi(word1: str, word2: str, unigram_counter: dict[str, int], context_counter: dict[(str, str), int]) -> float:
    """Return the base-2 pointwise mutual information of word1 and word2.

    Probabilities are relative frequencies: unigram counts divided by the sum
    of all unigram counts, and the (word1, word2) pair count divided by the
    sum of all pair counts. Any count that is missing from its counter is
    replaced by a pseudo-count of 1.
    """
    unigram_total = float(sum(unigram_counter.values()))
    pair_total = float(sum(context_counter.values()))

    def smoothed(counter, key):
        # Unobserved events get a pseudo-count of 1.
        return float(counter.get(key, 1))

    prob_first = smoothed(unigram_counter, word1) / unigram_total
    prob_second = smoothed(unigram_counter, word2) / unigram_total
    prob_joint = smoothed(context_counter, (word1, word2)) / pair_total

    return np.log2(prob_joint / (prob_first * prob_second))


def build_word_vector(word1: str, frequent_unigrams, unigram_counter: dict[str, int], context_counter: dict[tuple[str, str], int]) -> dict[str, float]:
    """Build a sparse PMI vector for word1 over the frequent-unigram dimensions.

    Args:
        word1: The word whose vector is built.
        frequent_unigrams: Words that form the vector's dimensions. Any
            iterable of strings works; for a dict/Counter its keys are used.
        unigram_counter: Unigram counts, forwarded to pmi().
        context_counter: (word, context_word) co-occurrence counts.

    Returns:
        Dict with one entry per frequent unigram: the PMI of (word1, unigram)
        when that pair was observed in context_counter, otherwise 0.0.
        Dimensions follow the iteration order of frequent_unigrams.
    """
    # dict.fromkeys de-duplicates while preserving input order, so the vector's
    # dimension order is deterministic for ordered inputs.
    dimensions = dict.fromkeys(frequent_unigrams)

    # Membership is tested on context_counter directly; copying all of its
    # keys into a set on every call is unnecessary.
    return {
        word2: pmi(word1, word2, unigram_counter, context_counter)
        if (word1, word2) in context_counter
        else 0.0
        for word2 in dimensions
    }


def get_top_k_dimensions(word1_vector, k):
    """Return the k entries of word1_vector with the largest values.

    The result is a dict ordered from the largest value downwards; entries
    with equal values keep their relative order from word1_vector.
    """
    def weight(entry):
        # entry is a (dimension, value) pair
        return entry[1]

    ranked = sorted(word1_vector.items(), key=weight, reverse=True)
    return dict(ranked[:k])

def get_cosine_similarity(word1_vector: dict[str, float], word2_vector: dict[str, float]) -> float:
    """Return the cosine similarity of two sparse word vectors.

    Both dicts map a dimension name to its weight. The vectors are aligned on
    a common list of dimensions before comparison, so the result depends
    neither on the key order of either dict nor on both dicts having the same
    keys; a dimension missing from one vector counts as 0.

    Returns:
        The cosine of the angle between the vectors, or 0.0 when either
        vector has zero norm (including an empty dict).
    """
    # Shared, de-duplicated dimension list in a deterministic order.
    dimensions = list(dict.fromkeys([*word1_vector, *word2_vector]))

    vec1 = np.array([word1_vector.get(dim, 0.0) for dim in dimensions], dtype=float)
    vec2 = np.array([word2_vector.get(dim, 0.0) for dim in dimensions], dtype=float)

    norm1 = np.linalg.norm(vec1)
    norm2 = np.linalg.norm(vec2)
    if norm1 == 0 or norm2 == 0:
        return 0.0

    return float(np.dot(vec1, vec2) / (norm1 * norm2))

In [4]:
def get_most_similar(word2vec: KeyedVectors, word : str, k : int) -> list[(str, float)]:
    """Return the k nearest neighbours of word as (word, similarity) pairs.

    An empty list is returned when word is not in the model's vocabulary.
    """
    in_vocabulary = word in word2vec.key_to_index
    # The lookup is delegated to gensim's own most_similar rather than looping
    # over the vocabulary with get_cosine_similarity().
    return word2vec.most_similar(word, topn=k) if in_vocabulary else []
    

def word_analogy(word2vec: KeyedVectors, word1: str, word2: str, word3: str) -> tuple[str, float]:
    """Solve the analogy "word1 is to word2 as word3 is to ?".

    Returns the best (word, similarity) candidate, or ("", 0.0) when any of
    the three input words is missing from the model's vocabulary.
    """
    vocabulary = word2vec.key_to_index
    for term in (word1, word2, word3):
        if term not in vocabulary:
            return ("", 0.0)

    # word2 and word3 pull the query towards the answer, word1 pushes away
    # from it; gensim's most_similar is called directly rather than going
    # through get_most_similar().
    candidates = word2vec.most_similar(positive=[word2, word3], negative=[word1], topn=1)
    return candidates[0]


In [14]:
def cos_sim(A: np.ndarray, B: np.ndarray) -> float:
    """Return the cosine similarity of vectors A and B.

    The result is 0.0 when either vector has zero norm.
    """
    inner = np.dot(A, B)
    length_a, length_b = np.linalg.norm(A), np.linalg.norm(B)

    if length_a == 0 or length_b == 0:
        return 0.0
    return inner / (length_a * length_b)


def get_cos_sim_different_models(word: str, model1: Word2Vec, model2: Word2Vec, cos_sim_function: Callable[[np.ndarray, np.ndarray], float]) -> float:
    """Compare the embeddings of one word taken from two different models.

    Looks word up in model1.wv and model2.wv and returns whatever
    cos_sim_function computes for that pair of vectors.
    """
    embedding_pair = (model1.wv[word], model2.wv[word])
    return cos_sim_function(*embedding_pair)


def get_average_cos_sim(word: str, neighbors: list[str], model: Word2Vec, cos_sim_function: Callable[[np.ndarray, np.ndarray], float]) -> float:
    """Average the similarity between word and each of its neighbors.

    Vectors come from model.wv. A neighbor whose lookup or comparison raises
    KeyError is skipped. Returns 0.0 when no neighbor could be scored.
    """
    anchor = model.wv[word]
    scores = []

    for neighbor in neighbors:
        try:
            scores.append(cos_sim_function(anchor, model.wv[neighbor]))
        except KeyError:
            # Skip this neighbor and keep going.
            pass

    if not scores:
        return 0.0
    return np.mean(scores)

In [6]:
def load_or_compute_variables():
    """Load the corpus and its derived counters from cache, or rebuild them.

    First tries to unpickle 'twitter_analysis_data.pkl'. If that fails for any
    reason (missing file, unreadable pickle, missing key), the tweets and stop
    words are re-read from the text files under data/, the counters are
    recomputed, and the cache file is rewritten.

    Returns:
        Tuple (tweets, stop_words, frequent_unigrams, unigram_counter,
        context_counter), where frequent_unigrams holds the 1000 most common
        unigrams and context_counter uses a context window of 3.
    """
    tic = perf_counter()
    try:
        # NOTE: pickle.load can execute arbitrary code while deserializing;
        # only use a cache file that this notebook wrote itself.
        with open('twitter_analysis_data.pkl', 'rb') as f:
            data = pickle.load(f)
        print(f"Loaded {len(data['tweets'])} tweets and {len(data['frequent_unigrams'])} frequent unigrams in {perf_counter() - tic:.2f}s")
        return (data['tweets'], data['stop_words'], data['frequent_unigrams'],
                data['unigram_counter'], data['context_counter'])
    except Exception as e:
        # Deliberate best effort: any cache problem falls through to a rebuild.
        print(f"Cache invalid, recomputing: {str(e)}")

    print("Computing fresh values...")
    with open('data/covid-tweets-2020-08-10-2020-08-21.tokenized.txt', 'r', encoding='utf-8') as f:
        tweets = [line.strip() for line in f]
    with open('data/stop_words.txt', 'r', encoding='utf-8') as f:
        stop_words = [line.strip() for line in f]

    # Count the corpus once and derive the top 1000 from the full counter,
    # instead of tokenizing and counting every tweet a second time.
    unigram_counter = top_k_unigrams(tweets, stop_words, -1)
    frequent_unigrams = dict(Counter(unigram_counter).most_common(1000))
    context_counter = context_word_frequencies(tweets, stop_words, 3, frequent_unigrams)

    with open('twitter_analysis_data.pkl', 'wb') as f:
        pickle.dump({'tweets': tweets, 'stop_words': stop_words,
                     'frequent_unigrams': frequent_unigrams, 'unigram_counter': unigram_counter,
                     'context_counter': context_counter}, f)

    print(f"Time taken: {perf_counter() - tic:.2f}s")
    return tweets, stop_words, frequent_unigrams, unigram_counter, context_counter

# Notebook-level globals: the tweet corpus, stop words and derived counters,
# loaded from the pickle cache when possible and recomputed otherwise.
tweets, stop_words, frequent_unigrams, unigram_counter, context_counter = load_or_compute_variables()

Loaded 312877 tweets and 1000 frequent unigrams in 7.13s


In [7]:
"""Exploring Word2Vec"""    
# Path to the gzipped, binary word2vec-format embedding file.
EMBEDDING_FILE = 'data/GoogleNews-vectors-negative300.bin.gz'
# Time the load so the cost of this cell is visible in its output.
tic = perf_counter()
word2vec = KeyedVectors.load_word2vec_format(EMBEDDING_FILE, binary=True)
toc = perf_counter()
print(f"Time taken to load Word2Vec model: {toc - tic:.2f} seconds")

Time taken to load Word2Vec model: 31.20 seconds


In [8]:
# Nearest neighbours - the 3 words most similar to 'ventilator'
similar_words =  get_most_similar(word2vec, 'ventilator', 3)
print(similar_words)
# Expected output:
# [('respirator', 0.7864563465118408), ('mechanical_ventilator', 0.7063839435577393), ('intensive_care', 0.6809945702552795)]

# Word analogy - Tokyo is to Japan as Paris is to what?
print(word_analogy(word2vec, 'Tokyo', 'Japan', 'Paris'))
# Expected output:
# ('France', 0.7889978885650635)

[('respirator', 0.7864563465118408), ('mechanical_ventilator', 0.7063839435577393), ('intensive_care', 0.6809945702552795)]
('France', 0.7889978885650635)


In [9]:
# Comparing 40-60 year olds in the 1910s and 40-60 year olds in the 2000s:
# model_t1 is the 1910s model, model_t2 is the 2000s model.
model_t1 = Word2Vec.load('data/1910s_50yos.model')
model_t2 = Word2Vec.load('data/2000s_50yos.model')

In [None]:
# Cosine similarity function for vector inputs (sanity check on two small vectors)
vector_1 = np.array([1,2,3,4])
vector_2 = np.array([3,5,4,2])
cos_similarity = cos_sim(vector_1, vector_2)
print(cos_similarity)
# Expected output: 0.8198915917499229

# Similarity between embeddings of the same word from different times
major_cos_similarity = get_cos_sim_different_models("major", model_t1, model_t2, cos_sim)
print(major_cos_similarity)
# Expected output: 0.19302374124526978
# NOTE(review): the recorded output prints 0.19302373 - presumably the same
# value shown at lower (float32) precision; confirm the embedding dtype.

0.8198915917499229
0.19302373


In [15]:
# Average cosine similarity to neighborhood of words
# neighbors_old: military-rank words; neighbors_new: significance words.
neighbors_old = ['brigadier', 'colonel', 'lieutenant', 'brevet', 'outrank']
neighbors_new = ['significant', 'key', 'big', 'biggest', 'huge']
# 1910s model (model_t1) against each neighborhood
print(get_average_cos_sim("major", neighbors_old, model_t1, cos_sim))
# 0.6957747220993042
print(get_average_cos_sim("major", neighbors_new, model_t1, cos_sim))
# 0.27042335271835327
# 2000s model (model_t2) against each neighborhood
print(get_average_cos_sim("major", neighbors_old, model_t2, cos_sim))
# 0.2626224756240845
print(get_average_cos_sim("major", neighbors_new, model_t2, cos_sim))
# 0.6279034614562988

### The takeaway -- When comparing word embeddings of "major" from 40-60 year olds in the 1910s and 2000s,
###                 (i) cosine similarity to the neighborhood of words related to military ranks goes down (0.70 -> 0.26);
###                 (ii) cosine similarity to the neighborhood of words related to significance goes up (0.27 -> 0.63).

0.69577473
0.27042335
0.26262248
0.62790346
