# Entity similarity

In [112]:
def jaccard_similarity(set1, set2) -> float:
    intersection = len(set1 & set2)
    union = len(set1 | set2)
    return (intersection + 1)/ (union + 1)

def attribute_similarity_score(attrs1, attrs2) -> float:
    set1 = set(get_lower_case_list(attrs1))
    set2 = set(get_lower_case_list(attrs2))
    return jaccard_similarity(set1, set2)

def get_lower_case_list(values):
    if type(values) is list:
        lowered = []
        for value in values:
            if type(value) is str:
                lowered.append(value.lower())
            elif type(value) is list:
                lowered.extend(get_lower_case_list(value))
        return lowered
    else:
        return values.lower().split()

def get_name(entity):
    val = entity.get('name')
    if isinstance(val, list):
        return ' '.join(val)
    return str(val)

parameters = {
    "A": 0.6,              # weight for type similarity
    "B": 0.05,             # weight for name similarity
    "C": 0.1,              # weight for attribute number similarity
    "D": 0.25,             # weight for attribute similarity
    "DIFFERENT_TYPE": 0.5, # penalty for different types
    "ALL_PAIRS": True     # set to True for option 2
}

In [113]:
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# load the pre-trained SBERT model
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

In [114]:
def get_name_similarity(name1, name2) -> float:
    #TODO: implement name similarity using SBERT embeddings and cosine similarity
    embedding1 = model.encode(make_list_into_string(name1))
    embedding2 = model.encode(make_list_into_string(name2))
    similarity = cosine_similarity([embedding1], [embedding2])
    #print(f"Name1: {make_list_into_string(name1)}, Name2: {make_list_into_string(name2)}, Similarity: {similarity[0][0]}")
    return similarity[0][0]

def make_list_into_string(value) -> str:
    # replace '_' with space and join list elements if value is a list
    if type(value) is list:
        return ' '.join([str(v).replace('_', ' ') for v in value])
    else:
        return str(value).replace('_', ' ')

In [115]:
def entity_similarity(e1, e2, parameters) -> float:
    '''Compute similarity between two entities e1 and e2.'''
    assert parameters['A'] + parameters['B'] + parameters['C'] + parameters['D'] == 1.0, "Weights must sum to 1.0"
    
    type_similarity = 1.0 if (e1['kind'] == e2['kind']) else parameters["DIFFERENT_TYPE"]
    e1_list = get_lower_case_list(e1['name'])
    e2_list = get_lower_case_list(e2['name'])
    #name_similarity = 0.5
    #name_similarity = get_name_similarity(e1_list, e2_list)
    name_similarity = jaccard_similarity(set(e1_list), set(e2_list))
    
    attribute_number_similarity = 1.0 - abs(len(e1['attributes']) - len(e2['attributes'])) / max(len(e1['attributes']), len(e2['attributes']), 1)
    attribute_similarity = attribute_similarity_score(e1['attributes'], e2['attributes'])
    overall_similarity = (parameters['A'] * type_similarity +
                          parameters['B'] * name_similarity +
                          parameters['C'] * attribute_number_similarity +
                          parameters['D'] * attribute_similarity)
    #print(overall_similarity)
    return overall_similarity

In [116]:
def all_entities_similarity(erd1, erd2, parameters) -> tuple[float, dict]:
    '''
    Compute overall similarity between two ERD diagrams by comparing all entities.
    Returns: (average_similarity_score, entity_matches_dictionary)
    '''
    similarities = []
    for e1 in erd1['entities']:
        for e2 in erd2['entities']:
            sim = entity_similarity(e1, e2, parameters)
            similarities.append((e1, e2, sim))
    
    similarities.sort(key=lambda x: x[2], reverse=True)
    
    e1_paired = {}
    e2_paired = {}
    pairs = []

    entity_matches = {}

    for e1, e2, sim in similarities:
        e1_name = get_name(e1)
        e2_name = get_name(e2)
        if e1_name not in e1_paired and e2_name not in e2_paired:
            e1_paired[e1_name] = e1
            e2_paired[e2_name] = e2
            pairs.append((e1, e2, sim))
            
            entity_matches[e1_name] = (e2_name, sim)
            

    if not parameters["ALL_PAIRS"]: 
        if not pairs: return 0.0, {}
        average_similarity = sum([sim for e1, e2, sim in pairs]) / len(pairs)
        return average_similarity, entity_matches

    for e1 in erd1['entities']:
        e1_name = get_name(e1)
        if e1_name not in e1_paired:
            pairs.append((e1, None, 0))
    for e2 in erd2['entities']:
        e2_name = get_name(e2)
        if e2_name not in e2_paired:
            pairs.append((None, e2, 0))
            
    if not pairs: return 0.0, {}
    average_similarity = sum([sim for e1, e2, sim in pairs]) / len(pairs)
    
    return average_similarity, entity_matches

# Relationship similarity

In [117]:
import numpy as np

def parse_cardinality(card_str):
    """
    Parses cardinality strings (e.g., '0M', '11') into Min and Max components.
    Returns tuple (min_card, max_card).
    'M' or 'N' is treated as 'Many'.
    """
    if not card_str or str(card_str).lower() == 'unknown':
        return None, None
    
    # Standardize 'N' to 'M' for comparison
    clean_str = str(card_str).upper().replace('N', 'M')
    
    # Assume format is MinMax (e.g., '0M' -> Min 0, Max Many)
    if len(clean_str) == 2:
        return clean_str[0], clean_str[1]
    return clean_str, clean_str

def compute_cardinality_similarity(c1, c2):
    """Simple binary match for cardinality components."""
    if c1 is None or c2 is None:
        return 0.0
    return 1.0 if c1 == c2 else 0.0

In [118]:
# Updated Cell 23
def relationship_similarity(r1, r2, entity_matches, parameters) -> float:
    """
    Computes similarity between two relationships r1 and r2.
    """
    # Helper to safe-string names that might be lists
    def _safe_name(n):
        if isinstance(n, list):
            return ' '.join(n).lower()
        return str(n).lower()

    # 1. Type Similarity (Weight T)
    type_sim = 1.0 if r1.get('kind') == r2.get('kind') else parameters.get('REL_DIFF_TYPE', 0.5)

    # 2. Arity Similarity (Weight U)
    len1 = len(r1['involved_entities'])
    len2 = len(r2['involved_entities'])
    arity_sim = 1.0 - abs(len1 - len2) / max(len1, len2, 1)

    # 3. Participating Entities Similarity (Weight V)
    matched_scores = []
    
    # Create a lookup for r2's involved entities using safe string names
    r2_involved_names = {_safe_name(e['name']): e for e in r2['involved_entities']}
    
    # Lists to store aligned cardinalities for later steps
    r1_cardinalities = []
    r2_cardinalities = []

    for ent1 in r1['involved_entities']:
        ent1_name = get_name(ent1) # Use get_name to ensure we match the keys in entity_matches
        
        match_info = entity_matches.get(ent1_name) 
        
        sim_score = 0.0
        r2_card = None
        
        if match_info:
            matched_ent2_name, match_score = match_info
            # Check if this globally matched entity is actually involved in relationship r2
            if matched_ent2_name and matched_ent2_name.lower() in r2_involved_names:
                sim_score = match_score
                r2_card = r2_involved_names[matched_ent2_name.lower()].get('cardinality')
            else:
                sim_score = 0.0
        
        matched_scores.append(sim_score)
        r1_cardinalities.append(ent1.get('cardinality'))
        r2_cardinalities.append(r2_card)

    entity_sim = sum(matched_scores) / len(matched_scores) if matched_scores else 0.0

    # 4. Attribute Similarity (Weight X)
    # Using simple Jaccard fallback to avoid more errors, or insert embedding code here if desired
    attr1 = set(get_lower_case_list(r1.get('attributes', [])))
    attr2 = set(get_lower_case_list(r2.get('attributes', [])))
    
    if not attr1 and not attr2:
        attr_sim = 1.0
    else:
        intersection = len(attr1 & attr2)
        union = len(attr1 | attr2)
        attr_sim = intersection / union if union > 0 else 0.0

    # 5. Cardinality Similarity (Weight Y and Z)
    max_card_scores = []
    min_card_scores = []

    for c1_str, c2_str in zip(r1_cardinalities, r2_cardinalities):
        min1, max1 = parse_cardinality(c1_str)
        min2, max2 = parse_cardinality(c2_str)
        
        max_card_scores.append(compute_cardinality_similarity(max1, max2))
        min_card_scores.append(compute_cardinality_similarity(min1, min2))

    max_card_sim = sum(max_card_scores) / len(max_card_scores) if max_card_scores else 0.0
    min_card_sim = sum(min_card_scores) / len(min_card_scores) if min_card_scores else 0.0

    # Weighted Sum
    total_sim = (parameters['T'] * type_sim +
                 parameters['U'] * arity_sim +
                 parameters['V'] * entity_sim +
                 parameters['X'] * attr_sim +
                 parameters['Y'] * max_card_sim +
                 parameters['Z'] * min_card_sim)
    
    return total_sim

In [119]:
#4
def all_relationships_similarity(erd1, erd2, entity_matches, parameters) -> float:
    """
    Computes overall similarity between two ERDs based on their relationships.
    Matches relationships greedily similar to entities.
    """
    rels1 = erd1.get('relationships', [])
    rels2 = erd2.get('relationships', [])
    
    if not rels1 and not rels2:
        return 1.0
    if not rels1 or not rels2:
        return 0.0

    # Calculate all pair similarities
    all_pairs = []
    for i, r1 in enumerate(rels1):
        for j, r2 in enumerate(rels2):
            sim = relationship_similarity(r1, r2, entity_matches, parameters)
            all_pairs.append({'r1_idx': i, 'r2_idx': j, 'sim': sim})

    # Sort by similarity descending (Greedy Match)
    all_pairs.sort(key=lambda x: x['sim'], reverse=True)

    matched_r1 = set()
    matched_r2 = set()
    final_matches = []

    for pair in all_pairs:
        if pair['r1_idx'] not in matched_r1 and pair['r2_idx'] not in matched_r2:
            matched_r1.add(pair['r1_idx'])
            matched_r2.add(pair['r2_idx'])
            final_matches.append(pair['sim'])

    # Handle Unmatched Relationships (Option 1 vs Option 2)
    # Project Requirement: Option 1 averages all, Option 2 excludes NULLs[cite: 96].
    
    if parameters.get('REL_ALL_PAIRS', True): # Option 1 (Default)
        # Add 0s for unmatched relationships from r1
        unmatched_count_1 = len(rels1) - len(matched_r1)
        final_matches.extend([0.0] * unmatched_count_1)
        
        # Add 0s for unmatched relationships from r2
        unmatched_count_2 = len(rels2) - len(matched_r2)
        final_matches.extend([0.0] * unmatched_count_2)

    return sum(final_matches) / len(final_matches) if final_matches else 0.0

In [120]:
# FOR SETUP AND TESTING

parameters.update({
    "T": 0.1,  # Relationship: Type weight
    "U": 0.3,  # Relationship: Arity weight 
    "V": 0.4,  # Relationship: Entity participation weight
    "X": 0.1,  # Relationship: Attribute weight
    "Y": 0.05, # Relationship: Max Cardinality weight
    "Z": 0.05, # Relationship: Min Cardinality weight
    "REL_DIFF_TYPE": 0.1, # Penalty for different relationship types
    "REL_ALL_PAIRS": True # True = Option 1 (Average all), False = Option 2 (Exclude NULLs)
})

# Example Usage for Testing
# You must provide the 'entity_matches' from Step 2.
# Structure: {'EntityIn1': ('BestMatchIn2', SimilarityScore)}
example_matches = {
    'student': ('pupil', 0.9), 
    'course': ('class', 0.85)
}

print("Parameters defined successfully:", parameters)

Parameters defined successfully: {'A': 0.6, 'B': 0.05, 'C': 0.1, 'D': 0.25, 'DIFFERENT_TYPE': 0.5, 'ALL_PAIRS': True, 'T': 0.1, 'U': 0.3, 'V': 0.4, 'X': 0.1, 'Y': 0.05, 'Z': 0.05, 'REL_DIFF_TYPE': 0.1, 'REL_ALL_PAIRS': True}


# Other similarities

In [121]:
def number_of_entities(erd):
    return len([entity for entity in erd['entities'] if entity['kind'] == 'entity'])
def number_of_weak_entities(erd):
    return len([entity for entity in erd['entities'] if entity['kind'] == 'weak_entity'])
def avg_number_of_relationship_attributes(erd):
    attribute_counts = [len(relationship['attributes']) for relationship in erd['relationships']]
    return sum(attribute_counts) / len(attribute_counts)
def num_binary_relationships(erd):
    return len([relationship for relationship in erd['relationships'] if relationship['kind'] == 'relationship' and len(relationship['involved_entities']) == 2])
def num_binary_identifying_relationships(erd):
    return len([relationship for relationship in erd['relationships'] if relationship['kind'] == 'identifying_relationship' and len(relationship['involved_entities']) == 2])
def num_ternary_relationships(erd):
    return len([relationship for relationship in erd['relationships'] if relationship['kind'] == 'relationship' and len(relationship['involved_entities']) == 3])
def num_ternary_identifying_relationships(erd):
    return len([relationship for relationship in erd['relationships'] if relationship['kind'] == 'identifying_relationship' and len(relationship['involved_entities']) == 3])
def num_other_relationships(erd):
    return len([[relationship for relationship in erd['relationships'] if relationship['kind'] == 'relationship' and len(relationship['involved_entities']) > 3]])
def num_other_identifying_relationships(erd):
    return len([relationship for relationship in erd['relationships'] if relationship['kind'] == 'identifying_relationship' and len(relationship['involved_entities']) > 3])

def other_similarity(erd1, erd2) -> float:
    weights = [1, 1, 1, 1, 1]
    applier = [
        number_of_entities,
        number_of_weak_entities,
        avg_number_of_relationship_attributes,
        num_binary_relationships,
        num_binary_identifying_relationships,
        num_ternary_relationships,
        num_ternary_identifying_relationships,
        num_other_relationships,
        num_other_identifying_relationships
    ]
    erd1_vector = [a(erd1) for a in applier]
    erd2_vector = [a(erd2) for a in applier]
    for i in range(len(weights)):
        erd1_vector[i] *= weights[i]
        erd2_vector[i] *= weights[i]
    dotted = sum(v1 * v2 for v1, v2 in zip(erd1_vector, erd2_vector))
    m1 = sum(v1 * v1 for v1, v1 in zip(erd1_vector, erd1_vector))**0.5
    m2 = sum(v2 * v2 for v2, v2 in zip(erd1_vector, erd1_vector))**0.5
    return dotted / m1 / m2




# Tying it all together

In [122]:
import os
import json
from typing import Tuple, Dict

def build_train_test_sets(root_dir: str = ".",
                          dataset1_dir: str = "Dataset1",
                          dataset2_dir: str = "for_testing/Dataset1") -> Tuple[Dict[str, dict], Dict[str, dict]]:
    """
    Load JSON files from Dataset1 (train) and Dataset2 (test) subfolders of `root_dir`.
    Returns (train_set, test_set) where each is a dict: {ERD_number_as_str: parsed_json_object}.
    """
    def load_dir(dir_path: str) -> Dict[str, dict]:
        out: Dict[str, dict] = {}
        if not os.path.isdir(dir_path):
            return out
        for fname in sorted(os.listdir(dir_path)):
            if not fname.lower().endswith(".json"):
                continue
            base = os.path.splitext(fname)[0]
            erd_part = base.split("_", 1)[0]
            erd_key = str(erd_part)
            file_path = os.path.join(dir_path, fname)
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    obj = json.load(f)
                out[erd_key] = obj
            except json.JSONDecodeError as e:
                # skip malformed JSON but you can log or collect errors if desired
                print(f"Warning: JSON decode error in {file_path}: {e}")
            except Exception as e:
                print(f"Warning: Failed to read {file_path}: {e}")
        return out

    d1 = os.path.join(root_dir, dataset1_dir)
    d2 = os.path.join(root_dir, dataset2_dir)

    train_set = load_dir(d1)
    test_set = load_dir(d2)

    return train_set, test_set

In [123]:
import re
from typing import Dict, Iterable, Optional, Any
from nltk.stem import PorterStemmer

_stemmer = PorterStemmer()

def _split_camel_case(s: str) -> str:
    s = re.sub(r'([a-z0-9])([A-Z])', r'\1 \2', s)
    s = re.sub(r'([A-Z])([A-Z][a-z])', r'\1 \2', s)
    return s

def _normalize_string_to_tokens(s: str, use_stemming: bool = True) -> list:
    if not isinstance(s, str):
        return []
    s = _split_camel_case(s)
    s = s.replace('-', ' ').replace('_', ' ').replace('/', ' ').replace('\\', ' ')
    s = s.lower()
    # keep letters and spaces only (follow Stage_2_Approach1 normalization)
    s = re.sub(r'[^a-z\s]', ' ', s)
    s = re.sub(r'\s+', ' ', s).strip()
    # safe whole-word replacements
    s = re.sub(r'\bnum\b', 'number', s)
    s = re.sub(r'\bid\b', 'identifier', s)
    s = re.sub(r'\bno\b', 'number', s)
    tokens = [t for t in s.split() if t]
    if use_stemming and tokens:
        tokens = [_stemmer.stem(t) for t in tokens]
    return tokens

def normalize_erds(json_map: Dict[str, dict],
                   keys_to_normalize: Optional[Iterable[str]] = None,
                   use_stemming: bool = True,
                   as_tokens: bool = False) -> Dict[str, dict]:
    """
    Normalize ERD JSON objects in-place.

    Args:
      json_map: mapping ERD_key (string) -> parsed JSON object (dict).
      keys_to_normalize: iterable of key names to normalize (defaults to ('name','primary_keys','attributes')).
      use_stemming: whether to apply Porter stemming (default True).
      as_tokens: if True store lists of tokens; if False store normalized strings (joined tokens).

    Behavior:
      - Recurses each ERD dict and normalizes any value for keys in `keys_to_normalize`.
      - If a value is a string -> replaced with normalized string (or list of tokens if as_tokens=True).
      - If a value is a list of strings -> each string is normalized.
      - Does NOT modify other keys (e.g., 'kind' or 'cardinality').

    Returns:
      The same `json_map` (modified in-place).
    """
    if keys_to_normalize is None:
        keys_to_normalize = ('name', 'primary_keys', 'attributes')
    keys_set = set(keys_to_normalize)

    def _process_value(val: Any) -> Any:
        if isinstance(val, str):
            toks = _normalize_string_to_tokens(val, use_stemming=use_stemming)
            return toks if as_tokens else " ".join(toks)
        if isinstance(val, list):
            # list of strings -> normalize each string
            if all(isinstance(x, str) for x in val):
                out = []
                for x in val:
                    toks = _normalize_string_to_tokens(x, use_stemming=use_stemming)
                    out.append(toks if as_tokens else " ".join(toks))
                return out
            # heterogeneous list -> recurse elements (e.g. list of dicts)
            return [_recurse(x) for x in val]
        # leave other types unchanged
        return val

    def _recurse(obj: Any) -> Any:
        if isinstance(obj, dict):
            for k, v in list(obj.items()):
                if k in keys_set:
                    # only normalize the specified keys
                    obj[k] = _process_value(v)
                else:
                    # recurse into nested structures to find target keys (e.g., involved_entities -> dicts with 'name')
                    if isinstance(v, (dict, list)):
                        obj[k] = _recurse(v)
            return obj
        if isinstance(obj, list):
            return [_recurse(x) for x in obj]
        return obj

    for key, erd in json_map.items():
        json_map[key] = _recurse(erd)

    return json_map

In [124]:
#### ESTIMATES GRADES GIVEN:
### training set
### training set's grades
### test set
### k for neighbors, similarity threshold
### alpha, beta, and gamma

import math
import random


def knn_grade_estimates(
    train_set: Dict[str, dict],
    test_set: Dict[str, dict],
    grade_map: Dict[str, float],
    k: int = 4,
    sigma: float = 0.9,
    alpha: float = 0.4,
    beta: float = 0.4,
    gamma: float = 0.2,
) -> Dict[str, float]:
    """
    For each ERD in test_set, find k nearest neighbors among train_set with similarity >= sigma.
    Similarity = alpha * ent_sim + beta * rel_sim + gamma * oth_sim.
    """
    # Compute mean grade of dataset1 ERDs (only for keys present in train_set and in grade_map)
    train_keys_with_grade = [k for k in train_set.keys() if k in grade_map and grade_map[k] is not None]
    if train_keys_with_grade:
        mean_grade = sum(grade_map[k] for k in train_keys_with_grade) / len(train_keys_with_grade)
    else:
        mean_grade = 0.0

    predictions: Dict[str, float] = {}

    for test_key, test_erd in test_set.items():
        sims = []  # list of (similarity, train_key)

        for train_key, train_erd in train_set.items():
            # skip train ERDs without a dataset1 grade
            if train_key not in grade_map or grade_map[train_key] is None:
                continue

            # 1. Calculate Entity Similarity AND get the match dictionary
            ent_sim, entity_matches = all_entities_similarity(train_erd, test_erd, parameters)

            # 2. Calculate Relationship Similarity using those matches
            rel_sim = all_relationships_similarity(train_erd, test_erd, entity_matches, parameters)

            # 3. Calculate Other Similarity
            oth_sim = other_similarity(train_erd, test_erd)

            # Weighted Sum
            sim = alpha * ent_sim + beta * rel_sim + gamma * oth_sim

            if sim >= sigma:
                sims.append((sim, train_key))

        # If there are at least k neighbors, use top-k by similarity, otherwise fallback to mean grade
        if len(sims) >= k:
            sims.sort(key=lambda x: x[0], reverse=True)
            topk = sims[:k]
            neighbor_grades = [grade_map[tk] for _, tk in topk]
            pred = sum(neighbor_grades) / len(neighbor_grades)
        else:
            pred = mean_grade
        predictions[test_key] = pred #+ round(random.uniform(0, 2), 4)

    return predictions

In [125]:
import csv
from typing import Dict, Optional

### Returns a dictionary of erd_number : grade (dataset1)

def build_grade_map(csv_path: str = "ERD_grades.csv", dataset: int = 1) -> Dict[str, Optional[float]]:
    """
    Read ERD grades from CSV and return a map {ERD_number_str: grade_or_None}.

    Args:
      csv_path: path to CSV file (default "ERD_grades.csv" in repo root).
      dataset: which dataset column to use (1 -> "dataset1_grade", 2 -> "dataset2_grade").

    Returns:
      Dict where keys are ERD numbers as strings and values are floats or None (if missing/invalid).
    """
    grade_col = f"dataset{dataset}_grade"
    out: Dict[str, Optional[float]] = {}
    try:
        with open(csv_path, newline="", encoding="utf-8") as fh:
            reader = csv.DictReader(fh)
            for row in reader:
                # tolerate different header casing/whitespace
                erd_key_raw = row.get("ERD_No") or row.get("ERD_No".lower()) or row.get("ERD_No".upper()) or row.get("ERD No") or row.get("ERD No".lower())
                if erd_key_raw is None:
                    # try to find a column that looks like ERD number
                    for k in row.keys():
                        if k.strip().lower().replace(" ", "_") in ("erd_no", "erdno", "erd"):
                            erd_key_raw = row[k]
                            break
                if erd_key_raw is None:
                    continue
                erd_key = str(erd_key_raw).strip()
                if not erd_key:
                    continue

                # find the grade column with tolerant matching
                grade_val = None
                if grade_col in row:
                    grade_val = row[grade_col]
                else:
                    # try tolerant lookup
                    for k in row.keys():
                        if k.strip().lower() == grade_col.lower():
                            grade_val = row[k]
                            break

                if grade_val is None:
                    out[erd_key] = None
                    continue

                grade_val = grade_val.strip()
                if grade_val == "":
                    out[erd_key] = None
                    continue

                try:
                    out[erd_key] = float(grade_val)
                except ValueError:
                    # if parsing fails, store None so callers can detect missing values
                    out[erd_key] = None
    except FileNotFoundError:
        # file missing -> return empty map
        print(f"Warning: CSV file {csv_path} not found.")
        return {}
    return out

In [126]:
import math
from typing import Dict, Optional

### CALCULATE MEAN SQUARED ERROR

def compute_rmse(predictions: Dict[str, float],
                 csv_path: str = "ERD_grades.csv",
                 dataset: int = 2) -> Optional[float]:
    """
    Compute RMSE between `predictions` and actual grades from `csv_path`.
    - predictions: {ERD_number_str: predicted_grade}
    - dataset: which dataset column to compare against (1 -> dataset1_grade, 2 -> dataset2_grade)

    Returns:
      RMSE as float, or None if there are no matching actual grades to compare.
    """
    actual_map = build_grade_map(csv_path=csv_path, dataset=dataset)
    sq_sum = 0.0
    count = 0

    for erd_key, pred in predictions.items():
        if pred is None:
            continue
        actual = actual_map.get(erd_key)
        if actual is None:
            continue
        try:
            err = float(pred) - float(actual)
        except (TypeError, ValueError):
            continue
        sq_sum += err * err
        count += 1

    if count == 0:
        return None

    mse = sq_sum / count
    return math.sqrt(mse)

In [127]:
root = "."
train_set, test_set = build_train_test_sets(root)

print("Train count:", len(train_set))
print("Test count:", len(test_set))


# assuming train_set, test_set were built with build_train_test_sets and contain parsed JSON dicts
normalize_erds(train_set)               # default: join tokens into normalized strings
normalize_erds(test_set, as_tokens=True) # store token lists for test_set

grade_map = build_grade_map()

grade_estimates = knn_grade_estimates(
    train_set, test_set, grade_map,
    k=4, sigma=0.5,
    alpha=0.3,beta=0.2, gamma=0.5
)

print(grade_estimates)
print(compute_rmse(grade_estimates))

Train count: 100
Test count: 30
{'104': 80.0, '105': 76.3325, '106': 76.0, '107': 79.0, '108': 79.0, '109': 81.6675, '110': 72.33500000000001, '111': 77.6675, '112': 75.3325, '114': 74.6675, '115': 75.6675, '116': 76.3325, '117': 76.3325, '118': 71.6675, '119': 78.6675, '121': 79.6675, '122': 74.3325, '123': 74.0, '124': 74.3325, '125': 71.6675, '127': 71.6675, '128': 77.6675, '129': 79.0, '130': 78.66499999999999, '131': 77.0, '132': 72.33500000000001, '133': 77.33500000000001, '134': 76.6675, '135': 75.6675, '136': 75.3325}
None


In [128]:
root = "."
train_set, test_set = build_train_test_sets(root, dataset1_dir="Dataset2", dataset2_dir="for_testing/Dataset2")

print("Train count:", len(train_set))
print("Test count:", len(test_set))


# assuming train_set, test_set were built with build_train_test_sets and contain parsed JSON dicts
normalize_erds(train_set)               # default: join tokens into normalized strings
normalize_erds(test_set, as_tokens=True) # store token lists for test_set

grade_map_2 = build_grade_map(dataset=2)

grade_estimates_2 = knn_grade_estimates(
    train_set, test_set, grade_map_2,
    k=4, sigma=0.5,
    alpha=0.3,beta=0.2, gamma=0.5
)

print(grade_estimates_2)
print(compute_rmse(grade_estimates_2, dataset=2))

for erd_key, est_grade in grade_estimates.items():
    alt_grade = grade_estimates_2.get(erd_key)
    grade_estimates[erd_key] = [est_grade, alt_grade]
    
# i have modified grade_estimates to have [dataset1_grade dataset2]

print(grade_estimates)

Train count: 101
Test count: 29
{'104': 87.0, '105': 83.0, '106': 83.0, '107': 83.0, '108': 89.0, '109': 81.0, '110': 78.0, '111': 90.0, '112': 90.0, '114': 78.0, '115': 86.0, '116': 83.0, '117': 88.0, '118': 90.0, '119': 86.0, '121': 84.0, '122': 90.0, '123': 87.0, '124': 84.0, '125': 95.0, '128': 83.0, '129': 90.0, '130': 83.0, '131': 83.0, '132': 83.0, '133': 91.0, '134': 81.0, '135': 85.0, '136': 89.0}
None
{'104': [80.0, 87.0], '105': [76.3325, 83.0], '106': [76.0, 83.0], '107': [79.0, 83.0], '108': [79.0, 89.0], '109': [81.6675, 81.0], '110': [72.33500000000001, 78.0], '111': [77.6675, 90.0], '112': [75.3325, 90.0], '114': [74.6675, 78.0], '115': [75.6675, 86.0], '116': [76.3325, 83.0], '117': [76.3325, 88.0], '118': [71.6675, 90.0], '119': [78.6675, 86.0], '121': [79.6675, 84.0], '122': [74.3325, 90.0], '123': [74.0, 87.0], '124': [74.3325, 84.0], '125': [71.6675, 95.0], '127': [71.6675, None], '128': [77.6675, 83.0], '129': [79.0, 90.0], '130': [78.66499999999999, 83.0], '131':

In [129]:
# take the dict grade_estimates and write to CSV
import csv
def write_grade_estimates_to_csv(grade_estimates: Dict[str, float], output_csv_path: str = "a4_custom_graph.csv"):
    """
    Write the grade estimates to a CSV file with columns: ERD_No, Estimated_Grade.
    """
    with open(output_csv_path, mode='w', newline='', encoding='utf-8') as csvfile:
        fieldnames = ['ERD_No', 'dataset1_grade', 'dataset2_grade']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

        writer.writeheader()
        for erd_no, estimated_grade in grade_estimates.items():
            writer.writerow({'ERD_No': erd_no, 'dataset1_grade': estimated_grade[0], 'dataset2_grade': estimated_grade[1]})
    print(f"Grade estimates written to {output_csv_path}")
write_grade_estimates_to_csv(grade_estimates)

Grade estimates written to a4_custom_graph.csv
