# Assignment 3: Local Association Matrix 

**Student names**: Matiss Podins <br>
**Group number**: 39 <br>
**Date**: _Submission Date_

## Important notes
Please read and follow these rules. Submissions that do not fulfill them may be returned.
1. You may work in groups of maximum 2 students.
2. Submit in **.ipynb** format only.
3. The assignment must be typed. Handwritten answers are not accepted.

**Due date**: 12.10.2025 23:59

### What you will do 
- Build a **local association matrix** from the Cranfield collection.
- Compute the **normalized association matrix**.
- Use the normalized matrix to **identify neighborhood terms** for expansion for given queries.


---
## Dataset

You will use the **Cranfield** dataset, provided in this file:

- `cran.all.1400`: The document collection (1400 documents)

**The parsing code is provided. Update the cran file path to match your own file location, and use the `docs` variable in your code to access the parsed documents.**


### Load and parse documents (provided)

Run the cell to parse the Cranfield documents. Update the path so it points to your `cran.all.1400` file.

In [31]:
# Read 'cran.all.1400' and parse the documents into a suitable data structure

CRAN_PATH = r"cran.all.1400"  # <-- change this!

def parse_cranfield(path):
    docs = {}
    current_id = None
    current_field = None
    buffers = {"T": [], "A": [], "B": [], "W": []}
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for line in f:
            line = line.rstrip("\n")
            if line.startswith(".I "):
                if current_id is not None:
                    docs[current_id] = {
                        "id": current_id,
                        "title": " ".join(buffers["T"]).strip(),
                        "abstract": " ".join(buffers["W"]).strip()
                    }
                current_id = int(line.split()[1])
                buffers = {k: [] for k in buffers}
                current_field = None
            elif line.startswith("."):
                tag = line[1:].strip()
                current_field = tag if tag in buffers else None
            else:
                if current_field is not None:
                    buffers[current_field].append(line)
    if current_id is not None:
        docs[current_id] = {
            "id": current_id,
            "title": " ".join(buffers["T"]).strip(),
            "abstract": " ".join(buffers["W"]).strip()
        }
    print(f"Parsed {len(docs)} documents.")
    return docs

docs = parse_cranfield(CRAN_PATH)

Parsed 1400 documents.


## 3.1  Local association matrix

For the Cranfield document collection in `cran.all.1400`, construct a local association matrix to identify association clusters. Use the `docs` variable, which holds the parsed documents. Exclude the words in the `STOPWORDS` list given below from the vocabulary.
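
As a small illustration of the preprocessing step, the sketch below lowercases a text, replaces punctuation with spaces, and drops stopwords. The function name `simple_tokenize` and the four-word `DEMO_STOPWORDS` set are placeholders for this example only; the assignment itself uses the full `STOPWORDS` list.

```python
import string

# Tiny stand-in for the STOPWORDS list; an assumption for this example only.
DEMO_STOPWORDS = {"the", "of", "in", "a"}

def simple_tokenize(text, stopwords):
    """Lowercase, replace punctuation with spaces, split, and drop stopwords."""
    table = str.maketrans(string.punctuation, " " * len(string.punctuation))
    tokens = text.lower().translate(table).split()
    return [t for t in tokens if t not in stopwords]

demo_tokens = simple_tokenize("The flow of gas in a slip-stream.", DEMO_STOPWORDS)
print(demo_tokens)  # ['flow', 'gas', 'slip', 'stream']
```

Note that replacing the hyphen with a space splits "slip-stream" into two tokens; whether that is desirable depends on how the vocabulary should treat compound words.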


The correlation factors $c_{u,v}$ between any pair of terms $w_u$ and $w_v$ are defined as  
$c_{u,v} = \sum_{d_j \in D} f_{u,j} \cdot f_{v,j}$  

$f_{u,j}$ is the raw term frequency of $w_u$ in document $d_j$.
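
To see the formula at work, here is a minimal sketch on a made-up collection of three terms and three documents. The matrix `F` and its counts are invented for illustration. If `F[u, j]` holds $f_{u,j}$, then the sum over documents is exactly the matrix product `F @ F.T`.

```python
import numpy as np

# Toy term-document frequency matrix (rows = terms, columns = documents).
# The terms and counts are made up for illustration.
toy_terms = ["flow", "pressure", "wing"]
F = np.array([
    [2, 0, 1],   # flow
    [1, 3, 0],   # pressure
    [0, 1, 1],   # wing
])

# c_{u,v} = sum_j f_{u,j} * f_{v,j} is the (u, v) entry of F @ F.T
C_toy = F @ F.T
print(C_toy)
# [[ 5  2  1]
#  [ 2 10  3]
#  [ 1  3  2]]
```

For example, the entry for (flow, pressure) is $2 \cdot 1 + 0 \cdot 3 + 1 \cdot 0 = 2$. The matrix is symmetric, and each diagonal entry $c_{u,u}$ is the sum of squared frequencies of $w_u$.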

### Weighting variants: **scalar** and **metric**

Add two alternative weighting schemes for the matrix (only the formula for assigning the matrix cell value changes):

- **Metric weighting**:
Let $w_u(n,j)$ and $w_v(m,j)$ denote the $n$-th and $m$-th occurrences of terms $w_u$ and $w_v$ in document $d_j$.  
Define a distance function $r(w_u(n,j), w_v(m,j))$ between the two occurrences (e.g., $r(i,k) = 1 + |i - k|$, where $i$ and $k$ are their token positions in $d_j$).  
Then:

$$
c_{u,v} = \sum_{d_j \in D} \sum_n \sum_m \frac{1}{r(w_u(n,j), w_v(m,j))}
$$


- **Scalar weighting**:
Let $\vec{s}_u = \langle c_{u,x_1}, c_{u,x_2}, \dots, c_{u,x_n} \rangle$ be the neighborhood vector of term $w_u$, and similarly for $w_v$.  
Then:

$$
c_{u,v} = \frac{\vec{s}_u \cdot \vec{s}_v}{|\vec{s}_u| \cdot |\vec{s}_v|}
$$
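
Both variants can be checked by hand on tiny inputs. The sketch below assumes the example distance $r(i,k) = 1 + |i - k|$ for the metric variant and treats each row of an unnormalized matrix as the neighborhood vector for the scalar variant. The helper names `metric_pair` and `cosine_matrix` and all numbers are made up for this illustration.

```python
import numpy as np

def metric_pair(pos_u, pos_v):
    """Sum of 1 / r over all occurrence pairs, with r(i, k) = 1 + |i - k|."""
    pos_u = np.asarray(pos_u)
    pos_v = np.asarray(pos_v)
    r = 1 + np.abs(pos_u[:, None] - pos_v[None, :])
    return float(np.sum(1.0 / r))

def cosine_matrix(C):
    """Cosine similarity between every pair of rows of C."""
    C = np.asarray(C, dtype=float)
    norms = np.linalg.norm(C, axis=1)
    denom = np.outer(norms, norms)
    out = np.zeros(denom.shape)
    # Divide only where both rows are non-zero; other cells stay 0.
    np.divide(C @ C.T, denom, out=out, where=denom > 0)
    return out

# One toy document: term u at positions 0 and 4, term v at position 1.
m = metric_pair([0, 4], [1])
print(m)  # 1/(1+1) + 1/(1+3) = 0.75

# Toy unnormalized matrix (made-up values).
C_demo = np.array([[5, 2, 1], [2, 10, 3], [1, 3, 2]])
S_demo = cosine_matrix(C_demo)
print(np.round(S_demo, 3))
```

In the scalar variant, two terms score high when their rows look alike, that is, when they co-occur with the same other terms, even if they rarely appear together themselves.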

In [32]:
# TODO: Construct a local association matrix for the cranfield collection. Use both weighting variants.

STOPWORDS = set("""a about above after again against all am an and any are aren't as at be because been
before being below between both but by can't cannot could couldn't did didn't do does doesn't doing don't down
during each few for from further had hadn't has hasn't have haven't having he he'd he'll he's her here here's hers
herself him himself his how how's i i'd i'll i'm i've if in into is isn't it it's its itself let's me more most
mustn't my myself no nor not of off on once only or other ought our ours ourselves out over own same shan't she
she'd she'll she's should shouldn't so some such than that that's the their theirs them themselves then there there's
these they they'd they'll they're they've this those through to too under until up very was wasn't we we'd we'll we're
we've were weren't what what's when when's where where's which while who who's whom why why's with won't would wouldn't
you you'd you'll you're you've your yours yourself yourselves""".split())

# Your code here
import string
import numpy as np
from collections import defaultdict
from typing import Dict, List, Tuple, Set


# Map every punctuation character to a space (built once, reused for all documents)
PUNCT_TO_SPACE = str.maketrans(string.punctuation, ' ' * len(string.punctuation))


def tokenization(docs):
    """Return one list of lowercase, stopword-free tokens per document."""
    result = []
    
    for doc in docs.values():
        # Combine title and abstract, then lowercase
        text = (doc["title"] + " " + doc["abstract"]).lower()
        
        # Replace punctuation with spaces and split on whitespace
        tokens = text.translate(PUNCT_TO_SPACE).split()
        
        # Remove stopwords. Apostrophes were replaced above, so contractions
        # such as "don't" arrive here as fragments ("don", "t") and are not
        # matched by the contraction entries in STOPWORDS.
        filtered_tokens = [token for token in tokens if token not in STOPWORDS]
        
        result.append(filtered_tokens)
    
    return result

tokenized_documents = tokenization(docs)


def build_vocabulary(tokenized_documents: List[List[str]]) -> Dict[str, int]:
    """Build a vocabulary mapping terms to indices."""
    vocab = set()
    for tokens in tokenized_documents:
        vocab.update(tokens)
    return {term: idx for idx, term in enumerate(sorted(vocab))}

def compute_term_frequencies(tokenized_documents: List[List[str]], 
                            vocab: Dict[str, int]) -> List[Dict[int, int]]:
    """Compute raw term frequencies for each document."""
    doc_term_freqs = []
    for tokens in tokenized_documents:
        freq = defaultdict(int)
        for token in tokens:
            if token in vocab:
                freq[vocab[token]] += 1
        doc_term_freqs.append(dict(freq))
    return doc_term_freqs

def compute_term_positions(tokenized_documents: List[List[str]], 
                          vocab: Dict[str, int]) -> List[Dict[int, List[int]]]:
    """Compute positions of each term in each document."""
    doc_term_positions = []
    for tokens in tokenized_documents:
        positions = defaultdict(list)
        for pos, token in enumerate(tokens):
            if token in vocab:
                positions[vocab[token]].append(pos)
        doc_term_positions.append(dict(positions))
    return doc_term_positions

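def basic_correlation_matrix(doc_term_freqs: List[Dict[int, int]], 
                             vocab_size: int) -> np.ndarray:
    """Compute basic correlation matrix using raw term frequencies."""
    C = np.zeros((vocab_size, vocab_size))
    
    for freq_dict in doc_term_freqs:
        if not freq_dict:
            continue  # a document with no tokens contributes nothing
        terms = np.fromiter(freq_dict.keys(), dtype=np.int64, count=len(freq_dict))
        freqs = np.fromiter(freq_dict.values(), dtype=np.float64, count=len(freq_dict))
        # Add f_u * f_v for every pair of terms in this document in one step.
        # Term indices are unique within a document, so += on the sub-block is safe.
        C[np.ix_(terms, terms)] += np.outer(freqs, freqs)
    
    return C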

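def metric_correlation_matrix(doc_term_positions: List[Dict[int, List[int]]], 
                              vocab_size: int) -> np.ndarray:
    """Compute metric-weighted correlation matrix with r(i, k) = 1 + |i - k|."""
    C = np.zeros((vocab_size, vocab_size))
    
    for pos_dict in doc_term_positions:
        if not pos_dict:
            continue  # a document with no tokens contributes nothing
        # Flatten to parallel arrays: term index and position of every occurrence
        ids = np.array([u for u, ps in pos_dict.items() for _ in ps], dtype=np.int64)
        pos = np.array([p for ps in pos_dict.values() for p in ps], dtype=np.int64)
        # 1 / r for every pair of occurrences in this document
        weights = 1.0 / (1 + np.abs(pos[:, None] - pos[None, :]))
        # Accumulate each pair's weight into the cell of its two terms;
        # np.add.at handles repeated (u, v) index pairs correctly
        np.add.at(C, (ids[:, None], ids[None, :]), weights)
    
    return C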

def scalar_correlation_matrix(C_basic: np.ndarray) -> np.ndarray:
    """Compute scalar-weighted correlation matrix using cosine similarity.

    Row u of C_basic is the neighborhood vector of term u. The result is
    computed with one matrix product instead of a Python loop over all
    term pairs, which does not finish in reasonable time for a full vocabulary.
    """
    norms = np.linalg.norm(C_basic, axis=1)
    
    # Scale each row to unit length; all-zero rows stay zero (similarity 0)
    unit_rows = np.zeros(C_basic.shape, dtype=float)
    np.divide(C_basic, norms[:, None], out=unit_rows, where=norms[:, None] > 0)
    
    # Dot products of unit vectors are the cosine similarities
    return unit_rows @ unit_rows.T

def build_association_matrices(tokenized_documents: List[List[str]]) -> Tuple:
    """Build all three types of association matrices."""
    vocab = build_vocabulary(tokenized_documents)
    vocab_size = len(vocab)
    
    doc_term_freqs = compute_term_frequencies(tokenized_documents, vocab)
    C_basic = basic_correlation_matrix(doc_term_freqs, vocab_size)
    
    doc_term_positions = compute_term_positions(tokenized_documents, vocab)
    C_metric = metric_correlation_matrix(doc_term_positions, vocab_size)
    
    C_scalar = scalar_correlation_matrix(C_basic)
    
    return vocab, C_basic, C_metric, C_scalar


def find_top_associations(C: np.ndarray, vocab: Dict[str, int], 
                         term: str, k: int = 10) -> List[Tuple[str, float]]:
    """Find top k associated terms for a given term."""
    if term not in vocab:
        return []
    
    idx = vocab[term]
    correlations = C[idx, :]
    
    # Reverse lookup: matrix index -> term
    idx_to_term = {i: t for t, i in vocab.items()}
    top_indices = np.argsort(correlations)[::-1]
    
    results = []
    for i in top_indices:
        if len(results) >= k:
            break  # stop scanning once k neighbors are collected
        if i != idx:
            results.append((idx_to_term[i], correlations[i]))
    
    return results

# Build all three association matrices
vocab, C_basic, C_metric, C_scalar = build_association_matrices(tokenized_documents)


KeyboardInterrupt: 

## 3.2 Normalized association matrix

Compute the normalized association matrix from the unnormalized matrix computed above. 

To normalize the matrix, use the following formula: <br>
$c'_{u,v} = \frac{c_{u,v}}{c_{u,u} + c_{v,v} - c_{u,v}}$  
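
A quick hand check of the formula on a made-up 3×3 matrix may help before applying it to the full vocabulary. The helper `normalized_entry` and the values in `C_demo` are assumptions for this illustration only.

```python
import numpy as np

# Toy unnormalized matrix for three terms (made-up values).
C_demo = np.array([
    [5.0, 2.0, 1.0],
    [2.0, 10.0, 3.0],
    [1.0, 3.0, 2.0],
])

def normalized_entry(C, u, v):
    """c'_{u,v} = c_{u,v} / (c_{u,u} + c_{v,v} - c_{u,v}); 0 if the denominator is not positive."""
    denom = C[u, u] + C[v, v] - C[u, v]
    return C[u, v] / denom if denom > 0 else 0.0

print(normalized_entry(C_demo, 0, 1))  # 2 / (5 + 10 - 2) = 2/13
print(normalized_entry(C_demo, 1, 2))  # 3 / (10 + 2 - 3) = 1/3
print(normalized_entry(C_demo, 0, 0))  # diagonal entries normalize to 1.0
```

The normalization penalizes terms that are frequent overall: a large $c_{v,v}$ inflates the denominator, so a high raw co-occurrence count with a very common term no longer dominates the ranking.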


In [None]:
#TODO: Compute the normalized association matrix 

# Your code here
def normalize_association_matrix(C: np.ndarray) -> np.ndarray:
    """Normalize an association matrix with the Jaccard-style (Tanimoto) formula.

    c'_{u,v} = c_{u,v} / (c_{u,u} + c_{v,v} - c_{u,v})
    """
    diag = np.diag(C)
    
    # denominator[u, v] = c_{u,u} + c_{v,v} - c_{u,v}, built by broadcasting
    denominator = diag[:, None] + diag[None, :] - C
    
    # Divide only where the denominator is positive; other cells stay 0
    C_normalized = np.zeros(C.shape, dtype=float)
    np.divide(C, denominator, out=C_normalized, where=denominator > 0)
    
    return C_normalized

# Normalize the association matrices
C_basic_normalized = normalize_association_matrix(C_basic)
C_metric_normalized = normalize_association_matrix(C_metric)
C_scalar_normalized = normalize_association_matrix(C_scalar)

NameError: name 'C_basic' is not defined

## 3.3 Neighborhood terms

With the help of the normalized local association matrix, identify the neighborhood terms that should be used for expansion for the following queries (queries_assignment3):


In [None]:
# Do not change this code
queries_assignment3 = [
  "gas pressure",
  "structural aeroelastic flight high speed aircraft",
  "heat conduction composite slabs",
  "boundary layer control",
  "compressible flow nozzle",
  "combustion chamber injection",
  "laminar turbulent transition",
  "fatigue crack growth",
  "wing tip vortices",
  "propulsion efficiency"
]
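
Before running this on the Cranfield matrices, the selection step can be illustrated on a toy example: for a query term, take its row of the normalized matrix, sort the scores in descending order, skip the term itself, and keep the first `k` entries. The vocabulary `toy_vocab`, the scores in `toy_norm`, and the helper `top_neighbors` are all made up for this sketch.

```python
import numpy as np

# Toy vocabulary and normalized association matrix (made-up scores).
toy_vocab = {"flow": 0, "pressure": 1, "wing": 2, "gas": 3}
toy_norm = np.array([
    [1.00, 0.15, 0.17, 0.40],
    [0.15, 1.00, 0.33, 0.60],
    [0.17, 0.33, 1.00, 0.05],
    [0.40, 0.60, 0.05, 1.00],
])

def top_neighbors(term, C_norm, vocab, k=2):
    """Return the k terms with the highest normalized association to `term`."""
    if term not in vocab:
        return []
    idx = vocab[term]
    index_to_term = {i: t for t, i in vocab.items()}
    order = np.argsort(C_norm[idx])[::-1]  # highest score first
    return [index_to_term[int(i)] for i in order if i != idx][:k]

print(top_neighbors("gas", toy_norm, toy_vocab))       # ['pressure', 'flow']
print(top_neighbors("pressure", toy_norm, toy_vocab))  # ['gas', 'wing']
```

For a multi-term query, one reasonable choice is to take the union of the neighbors of each query term as the expansion set; terms missing from the vocabulary simply contribute no neighbors.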

In [None]:
#TODO: Identify neighborhood terms for queries_assignment3

# Your code here

def get_query_neighborhood_terms(query: str, 
                                 C_normalized: np.ndarray, 
                                 vocab: Dict[str, int], 
                                 k: int = 5,
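                                 threshold: float = 0.0) -> Dict[str, List[Tuple[str, float]]]:
    """Return, for each query term in the vocabulary, its top-k neighbors.

    Neighbors are ranked by normalized association score; terms scoring below
    `threshold` are skipped. Query terms not in the vocabulary are ignored.
    """
    query_terms = query.lower().split()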
    idx_to_term = {idx: term for term, idx in vocab.items()}
    
    neighborhood = {}
    
    for term in query_terms:
        if term in vocab:
            idx = vocab[term]
            correlations = C_normalized[idx, :]
            
            # Get top k terms excluding the term itself
            top_indices = np.argsort(correlations)[::-1]
            
            neighbors = []
            for i in top_indices:
                if i != idx and correlations[i] >= threshold:
                    neighbors.append((idx_to_term[i], correlations[i]))
                    if len(neighbors) >= k:
                        break
            
            neighborhood[term] = neighbors
    
    return neighborhood

def get_expanded_query_terms(query: str,
                             C_normalized: np.ndarray,
                             vocab: Dict[str, int],
                             k: int = 5,
                             threshold: float = 0.0) -> Set[str]:
    neighborhood = get_query_neighborhood_terms(query, C_normalized, vocab, k, threshold)
    
    expansion_terms = set()
    for neighbors in neighborhood.values():
        expansion_terms.update([term for term, score in neighbors])
    
    return expansion_terms

def expand_queries(queries: List[str],
                   C_normalized: np.ndarray,
                   vocab: Dict[str, int],
                   k: int = 5,
                   threshold: float = 0.0) -> Dict[str, Dict]:
    results = {}
    
    for query in queries:
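        neighborhood = get_query_neighborhood_terms(query, C_normalized, vocab, k, threshold)
        # Collect the distinct neighbor terms from the neighborhood computed above
        # (avoids ranking the same rows a second time)
        expansion_terms = {term for neighbors in neighborhood.values() for term, _ in neighbors}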
        
        results[query] = {
            'original_terms': query.lower().split(),
            'neighborhood': neighborhood,
            'expansion_terms': sorted(expansion_terms)
        }
    
    return results

def print_query_expansion(query: str, 
                         result: Dict,
                         show_scores: bool = True):
    """Pretty print query expansion results."""
    print(f"\nQuery: '{query}'")
    print("=" * 60)
    
    for term, neighbors in result['neighborhood'].items():
        print(f"\n  '{term}' → neighborhood terms:")
        for neighbor, score in neighbors:
            if show_scores:
                print(f"    - {neighbor} ({score:.4f})")
            else:
                print(f"    - {neighbor}")
    
    print(f"\n  All expansion terms: {', '.join(result['expansion_terms'])}")
    print()

for query in queries_assignment3:
    print(f"\n'{query}':")
    
    neighborhood = get_query_neighborhood_terms(query, C_basic_normalized, vocab, k=5)
    
    for term, neighbors in neighborhood.items():
        neighbor_terms = [n[0] for n in neighbors]
        print(f"  {term} → {', '.join(neighbor_terms)}")
    
    expansion_terms = get_expanded_query_terms(query, C_basic_normalized, vocab, k=5)
    print(f"  → Expansion terms: {', '.join(sorted(expansion_terms))}")




'gas pressure':


NameError: name 'C_basic_normalized' is not defined