In [1]:
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy import stats
from scipy.spatial.distance import cosine
from typing import List, Dict, Tuple

In [2]:
def load_vocab_dict(path: str) -> Tuple[List[str], Dict[str, int]]:
    """
    Reads a vocabulary list from a file and creates a dictionary mapping each word to its index.

    The file is decoded as UTF-8 and is expected to hold one word per line. Leading and
    trailing whitespace of the whole file is stripped before splitting on newlines, so a
    trailing newline does not produce an empty last entry. If a word occurs more than once,
    the dictionary keeps the index of its last occurrence.

    Args:
        path (str): The file path to the vocabulary list.

    Returns:
        Tuple[List[str], Dict[str, int]]: A tuple containing the list of vocabulary words and a dictionary
                                          mapping each word to its index.
    """
    # Use a context manager so the handle is closed deterministically, and pin the
    # encoding so the result does not depend on the platform's default locale.
    with open(path, encoding='utf-8') as f:
        vocab = f.read().strip().split('\n')
    return vocab, {word: idx for idx, word in enumerate(vocab)}

In [3]:
def read_corpus(path: str) -> List[str]:
    """Reads the corpus from a file, excluding the last empty entry if the file ends with a newline.

    The file is decoded as UTF-8. Leading and trailing whitespace of the whole file is
    stripped before the content is split on newlines.

    Args:
        path (str): The file path to the corpus.

    Returns:
        List[str]: A list of strings, each representing a line from the file.
    """
    # Context manager closes the handle even if reading fails; explicit encoding keeps
    # the result independent of the platform's default locale.
    with open(path, encoding='utf-8') as f:
        return f.read().strip().split('\n')

In [4]:
def counting(corpus: List[str], V: List[str], V_C: List[str], V_set: Dict[str, int], V_C_set: Dict[str, int],
             window_size: int = 5) -> np.ndarray:
    """
    Generates a co-occurrence (counting) matrix from the given corpus, considering specified vocabularies and a window size.

    Each line of the corpus is split on single spaces and wrapped in '<s>' / '</s>' markers.
    For every token that is in ``V_set``, every other token at most ``window_size`` positions
    away (excluding the two boundary markers) that is in ``V_C_set`` contributes
    ``1 / distance`` to the corresponding cell, so closer context words weigh more.

    Args:
        corpus (List[str]): The corpus as a list of sentences.
        V (List[str]): The list of vocabulary words.
        V_C (List[str]): The list of context vocabulary words.
        V_set (Dict[str, int]): A dictionary mapping vocabulary words to their indices.
        V_C_set (Dict[str, int]): A dictionary mapping context vocabulary words to their indices.
        window_size (int): Maximum distance, in tokens, between a word and a context word
            on either side. Defaults to 5.

    Returns:
        np.ndarray: A 2D NumPy array representing the co-occurrence matrix with dimensions (len(V), len(V_C)).

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    # Initialize the matrix to hold word vectors
    C = np.zeros((len(V), len(V_C)), dtype=float)

    for line in tqdm(corpus):  # Iterate over each sentence (line) of the corpus
        # Append start and end tokens to the sentence
        words = ['<s>'] + line.split(' ') + ['</s>']
        length = len(words)

        for idx, word in enumerate(words):  # Iterate over each word in the current sentence
            # Skip '<s>' and '</s>', as they are not real words
            if idx > 0 and idx < length - 1 and word in V_set:
                ### BEGIN SOLUTION
                word_idx = V_set[word]

                # Clamp the window so that the boundary markers at positions 0 and
                # length - 1 are never used as context.
                start = max(1, idx - window_size)
                end = min(length - 1, idx + window_size + 1)

                for context_pos in range(start, end):
                    if context_pos == idx:  # Skip the current word itself
                        continue
                    context_idx = V_C_set.get(words[context_pos])
                    if context_idx is not None:
                        # Weight the co-occurrence by the inverse token distance
                        C[word_idx, context_idx] += 1.0 / abs(context_pos - idx)
                ### END SOLUTION
    return C

In [5]:
def eval_word_similarity(C: np.ndarray, V_set: Dict[str, int], path: str) -> float:
    """
    Scores word vectors against a tab-separated word-similarity file.

    The first line of the file is skipped. Every following line that splits into exactly
    three tab-separated fields (word, word, score) and whose two words are both in
    ``V_set`` contributes one pair: the cosine similarity of the two rows of ``C``
    (0.0 if either row has zero norm) and the score parsed as a float.

    Args:
        C (np.ndarray): Matrix whose rows are indexed by the values of ``V_set``.
        V_set (Dict[str, int]): A dictionary mapping words to their row indices in ``C``.
        path (str): The file path to the similarity dataset (read as UTF-8).

    Returns:
        float: The Spearman rank correlation between the computed similarities and the
               file's scores, or 0.0 when no pair is usable or the correlation is NaN.
    """
    with open(path, encoding='utf-8') as handle:
        rows = handle.readlines()

    similarities = []
    gold_scores = []
    for row in rows[1:]:
        fields = row.strip().split('\t')
        if len(fields) != 3:
            continue
        first, second, raw_score = fields
        if first not in V_set or second not in V_set:
            continue

        gold = float(raw_score)
        vec_a = C[V_set[first]]
        vec_b = C[V_set[second]]
        # Cosine distance is undefined for a zero vector; fall back to similarity 0.0.
        if np.linalg.norm(vec_a) > 0 and np.linalg.norm(vec_b) > 0:
            similarity = 1 - cosine(vec_a, vec_b)
        else:
            similarity = 0.0
        similarities.append(similarity)
        gold_scores.append(gold)

    if len(similarities) == 0:
        return 0.0

    rho = stats.spearmanr(similarities, gold_scores).correlation
    if np.isnan(rho):
        return 0.0
    return float(rho)

In [6]:
# Load the main vocabulary: V is the list of words and V_set maps each word to its
# index in V (used as the row index of the co-occurrence matrix).
V, V_set = load_vocab_dict('./data/main_words.txt')

# Load the context vocabulary the same way: V_C is the list of context words and
# V_C_set maps each context word to its index (used as the column index).
V_C, V_C_set = load_vocab_dict('./data/context_words.txt')

# Read the corpus from a text file; each list item is one line of the file.
corpus = read_corpus('./data/corpus.txt')

# Build the co-occurrence (counting) matrix from the corpus using both vocabularies.
# C and V_set are reused by the evaluation cells below. The recorded progress bar shows
# this pass over 997,898 lines taking about five minutes.
C = counting(corpus, V, V_C, V_set, V_C_set)

100%|██████████| 997898/997898 [05:00<00:00, 3322.09it/s]


In [7]:
### BEGIN PUBLIC TESTS
# Spearman correlation between cosine similarities of rows of C and the scores in men.txt.
eval_word_similarity(C, V_set, './data/men.txt')
# NOTE(review): the marker below repeats BEGIN; it presumably should read
# "END PUBLIC TESTS" -- confirm what the grader expects before changing it.
### BEGIN PUBLIC TESTS

0.23035753802145398

In [8]:
### BEGIN PUBLIC TESTS
# Spearman correlation between cosine similarities of rows of C and the scores in simlex-999.txt.
eval_word_similarity(C, V_set, './data/simlex-999.txt')
# NOTE(review): the marker below repeats BEGIN; it presumably should read
# "END PUBLIC TESTS" -- confirm what the grader expects before changing it.
### BEGIN PUBLIC TESTS

0.058947013073989286

In [9]:
def improve_C(C: np.ndarray, corpus: List[str], V_C: List[str], V_C_set: Dict[str, int], k: int = 100) -> np.ndarray:
    """
    Turns a raw co-occurrence matrix into dense, row-normalised word vectors.

    Steps:
        1. Re-weight the counts with Positive Pointwise Mutual Information (PPMI):
           ``max(0, log2(P(w, c) / (P(w) * P(c))))`` for cells with a positive count,
           0 elsewhere.
        2. Reduce the dimensionality with a truncated SVD, keeping ``U_k * s_k``.
        3. Scale every non-zero row to unit L2 norm.

    Args:
        C (np.ndarray): The co-occurrence matrix with shape (len(V), len(V_C)).
        corpus (List[str]): Unused by this implementation; kept for the expected signature.
        V_C (List[str]): Unused by this implementation; kept for the expected signature.
        V_C_set (Dict[str, int]): Unused by this implementation; kept for the expected signature.
        k (int): Number of singular directions to keep. Defaults to 100; if the matrix has
            fewer singular values, all of them are kept.

    Returns:
        np.ndarray: Array of shape (C.shape[0], min(k, min(C.shape))) whose rows are unit
                    vectors, or all-zero for words whose PPMI row is entirely zero.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    ### BEGIN SOLUTION
    if k < 1:
        raise ValueError("k must be a positive integer")

    # Work in floating point so integer count matrices do not truncate PMI values.
    counts = np.asarray(C, dtype=float)
    total = counts.sum()

    # Positive Pointwise Mutual Information, computed only where a count was observed.
    ppmi = np.zeros(counts.shape, dtype=float)
    if total > 0:
        word_probs = counts.sum(axis=1) / total
        context_probs = counts.sum(axis=0) / total
        observed = counts > 0
        expected = np.outer(word_probs, context_probs)
        pmi = np.log2((counts[observed] / total) / expected[observed])
        ppmi[observed] = np.maximum(pmi, 0.0)

    # Apply SVD for dimensionality reduction; scaling the columns of U by s is
    # equivalent to U @ diag(s) without building the diagonal matrix.
    U, s, _ = np.linalg.svd(ppmi, full_matrices=False)
    reduced = U[:, :k] * s[:k]

    # Rows with no PPMI mass must stay exactly zero; otherwise floating-point noise
    # left by the SVD would be blown up into an arbitrary unit vector below.
    reduced[~ppmi.any(axis=1)] = 0.0

    # Normalize rows to unit length
    norms = np.linalg.norm(reduced, axis=1, keepdims=True)
    norms[norms == 0] = 1  # Avoid division by zero
    return reduced / norms
    ### END SOLUTION

### <span style="color:red; font-size:small;">Part #1: {"men": 0.5388839800291707, "simlex-999": 0.2346222684809021}</span>
