# Recommendation System Project 
AD315 - Project 2
Amy Ramsay 

In [None]:
import csv 
import time 
from typing import List, Any, Optional 

# Preview the first few data rows of the CSV
# Reads at most `rows` rows after the header, so the cost is O(rows) no matter how long the file is
def preview_csv(path="user_item_data.csv", rows=5):
    """Print up to ``rows`` data rows of the CSV at ``path``, skipping the header.

    Args:
        path: Location of the CSV file to preview.
        rows: Maximum number of data rows to print (default 5).

    Files that are empty, or that hold fewer than ``rows`` data rows, are
    handled quietly: whatever rows exist are printed and nothing is raised.
    """
    # newline="" is what the csv module asks for so quoted fields that
    # contain line breaks are parsed correctly.
    with open(path, "r", newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header; the default avoids StopIteration on an empty file
        # range() is the first argument to zip, so iteration stops after
        # `rows` rows without pulling an extra row from the reader.
        for _, row in zip(range(rows), reader):
            print(row)

preview_csv("user_item_data.csv")


['user6', 'Ghost of Tsushima']
['user4', 'Fallout']
['user6', 'Bloodborne']
['user1', 'League of Legends']
['user5', 'Super Mario Bros']


In [None]:
## HashTable (Base Class)
from typing import Any

class HashTable:
    """Hash table that maps a key to a list of distinct items.

    Two collision strategies are supported, chosen at construction time:

    * ``"separate_chaining"``: every slot holds a bucket (a list) of
      ``[key, items_list]`` pairs.
    * ``"double_hashing"``: open addressing over a flat array whose slots
      are either ``None`` or a ``(key, items_list)`` tuple; the probe step
      depends on the key.

    Attributes:
        size: Number of slots in the table.
        collision_avoidance: Name of the strategy in use.
        table: The underlying slot array.
        collisions: For separate chaining, the number of new keys added to
            a bucket that already held another key. For double hashing,
            the number of slots occupied by a different key that inserts
            had to probe past.
    """

    TECHNIQUES = ("separate_chaining", "double_hashing")

    def __init__(self, size: int, collision_avoidance: str = "separate_chaining"):
        """Create an empty table.

        Args:
            size: Number of slots; must be at least 1.
            collision_avoidance: ``"separate_chaining"`` or
                ``"double_hashing"``.

        Raises:
            ValueError: If ``size`` is not positive or the technique name
                is not recognised. Without this check an unknown name
                would make ``insert`` and ``retrieve`` silently do nothing.
        """
        if size < 1:
            raise ValueError(f"size must be a positive integer, got {size!r}")
        if collision_avoidance not in self.TECHNIQUES:
            raise ValueError(
                f"collision_avoidance must be one of {self.TECHNIQUES}, "
                f"got {collision_avoidance!r}"
            )

        self.size = size
        self.collision_avoidance = collision_avoidance

        if collision_avoidance == "separate_chaining":
            # One bucket per slot; each bucket is a list of [key, items_list]
            self.table = [[] for _ in range(size)]
        else:
            # Double hashing: flat array of entries
            self.table = [None] * size

        # Stats
        self.collisions = 0

    def hash(self, key: Any) -> int:
        """Return the home slot index for ``key``."""
        return hash(key) % self.size

    def second_hash(self, key: Any) -> int:
        """Return the double-hashing probe step for ``key``.

        The step lies in ``[1, size - 1]`` (or is 1 for a one-slot table)
        and is always coprime with ``size``. Coprimality is what makes the
        probe sequence visit every slot; a step sharing a factor with
        ``size`` would cycle through a subset and report the table as full
        while empty slots remain.
        """
        from math import gcd

        if self.size == 1:
            return 1  # avoids the modulo-by-zero below; only one slot exists

        step = 1 + (hash(key) % (self.size - 1))
        # size - 1 is always coprime with size, so this loop ends at or
        # before step == size - 1.
        while gcd(step, self.size) != 1:
            step += 1
        return step

    def _probe(self, key: Any):
        """Yield the ``size`` slot indices of ``key``'s double-hashing probe sequence."""
        start = self.hash(key)
        step = self.second_hash(key)
        for attempt in range(self.size):
            yield (start + attempt * step) % self.size

    def insert(self, key: Any, item: Any):
        """Record ``item`` under ``key``; duplicates of an item are ignored.

        Args:
            key: User ID (e.g. ``"user42"``).
            item: Single item (e.g. ``"item23"``).

        Raises:
            RuntimeError: With double hashing, when every slot is occupied
                by a different key.

        Average case is O(1) for both strategies, plus a linear scan of
        the key's existing item list for the duplicate check.
        """
        if self.collision_avoidance == "separate_chaining":
            bucket = self.table[self.hash(key)]

            # Existing key: extend its item list
            for stored_key, items_list in bucket:
                if stored_key == key:
                    if item not in items_list:
                        items_list.append(item)
                    return

            # New key landing in a non-empty bucket counts as one collision
            if bucket:
                self.collisions += 1
            bucket.append([key, [item]])
            return

        # Double hashing
        for index in self._probe(key):
            entry = self.table[index]

            # Empty slot --> create new entry
            if entry is None:
                self.table[index] = (key, [item])
                return

            # Same key --> append item if new (the list is mutated in place)
            if entry[0] == key:
                if item not in entry[1]:
                    entry[1].append(item)
                return

            # Slot taken by another key --> count it and keep probing
            self.collisions += 1

        raise RuntimeError("Hash table is full")

    def retrieve(self, key: Any):
        """Return the item list stored for ``key``, or ``None`` if absent.

        The returned list is the table's own list, not a copy.
        Average case is O(1) for both strategies.
        """
        if self.collision_avoidance == "separate_chaining":
            for stored_key, items_list in self.table[self.hash(key)]:
                if stored_key == key:
                    return items_list
            return None

        # Double hashing: follow the same probe sequence insert used
        for index in self._probe(key):
            entry = self.table[index]
            if entry is None:
                return None  # an empty slot ends the chain: key not found
            if entry[0] == key:
                return entry[1]
        return None
                

# Quick check of separate chaining: two items for one user, one for another
ht_chain = HashTable(10, "separate_chaining")
ht_chain.insert(key="user1", item="apple")
ht_chain.insert(key="user1", item="banana")
ht_chain.insert(key="user2", item="carrot")

print(f"SC user1: {ht_chain.retrieve('user1')}")
print(f"SC user2: {ht_chain.retrieve('user2')}")
print(f"SC collisions: {ht_chain.collisions}")
print(f"SC table: {ht_chain.table}")

# Same check for double hashing, with a third user added
ht_double = HashTable(11, "double_hashing")
ht_double.insert(key="user1", item="apple")
ht_double.insert(key="user1", item="banana")
ht_double.insert(key="user2", item="carrot")
ht_double.insert(key="user3", item="donut")

print(f"DH user1: {ht_double.retrieve('user1')}")
print(f"DH user2: {ht_double.retrieve('user2')}")
print(f"DH user3: {ht_double.retrieve('user3')}")
print(f"DH collisions: {ht_double.collisions}")
print(f"DH table: {ht_double.table}")

        
    

SC user1: ['apple', 'banana']
SC user2: ['carrot']
SC collisions: 0
SC table: [[], [], [], [['user1', ['apple', 'banana']]], [], [], [], [['user2', ['carrot']]], [], []]
DH user1: ['apple', 'banana']
DH user2: ['carrot']
DH user3: ['donut']
DH collisions: 1
DH table: [None, None, None, ('user2', ['carrot']), None, None, ('user3', ['donut']), None, None, None, ('user1', ['apple', 'banana'])]


In [None]:
## MaxHeap

class MaxHeap:
    """Array-backed binary max-heap of ``(priority, item)`` tuples.

    Ordering looks only at the priority (element 0 of each tuple); the
    entry with the largest priority sits at index 0 of ``data``.
    """

    def __init__(self):
        # Heap array of (priority, item) tuples
        self.data = []

    # Index arithmetic for the implicit binary tree

    def parent(self, index):
        """Return the index of the parent of ``index``."""
        return (index - 1) // 2

    def left_child(self, index):
        """Return the index where the left child of ``index`` would live."""
        return index * 2 + 1

    def right_child(self, index):
        """Return the index where the right child of ``index`` would live."""
        return index * 2 + 2

    def has_left(self, index):
        """Return True when ``index`` has a left child inside the array."""
        return len(self.data) > self.left_child(index)

    def has_right(self, index):
        """Return True when ``index`` has a right child inside the array."""
        return len(self.data) > self.right_child(index)

    def swap(self, i, j):
        """Exchange the entries at positions ``i`` and ``j``."""
        entries = self.data
        entries[i], entries[j] = entries[j], entries[i]

    def percolate_up(self, index):
        """Move the entry at ``index`` up while it outranks its parent.

        O(log n): at most one swap per level of the tree.
        """
        position = index
        while position > 0:
            above = self.parent(position)
            if not (self.data[position][0] > self.data[above][0]):
                return
            self.swap(position, above)
            position = above

    def percolate_down(self, index):
        """Move the entry at ``index`` down while a child outranks it.

        O(log n): at most one swap per level of the tree.
        """
        position = index
        while self.has_left(position):
            # Start from the left child; switch to the right one only if
            # it exists and has a strictly larger priority.
            candidate = self.left_child(position)
            if self.has_right(position):
                right = self.right_child(position)
                if self.data[right][0] > self.data[candidate][0]:
                    candidate = right

            if not (self.data[position][0] < self.data[candidate][0]):
                return
            self.swap(position, candidate)
            position = candidate

    def push(self, priority, item):
        """Add ``item`` with the given ``priority`` and restore heap order."""
        new_slot = len(self.data)
        self.data.append((priority, item))
        self.percolate_up(new_slot)

    def pop(self):
        """Remove and return the highest-priority ``(priority, item)`` tuple.

        Returns ``None`` when the heap is empty.
        """
        if not self.data:
            return None
        # Move the root to the end so it can be removed cheaply, then sift
        # the entry that took its place back down.
        last_slot = len(self.data) - 1
        self.swap(0, last_slot)
        removed = self.data.pop()
        self.percolate_down(0)
        return removed

    def top_n(self, n):
        """Return up to ``n`` highest-priority tuples without changing this heap.

        Works on a copy of the array (O(len(data))) and pops from the copy,
        each pop costing O(log len(data)).
        """
        scratch = MaxHeap()
        scratch.data = list(self.data)
        wanted = min(n, len(self.data))
        return [scratch.pop() for _ in range(wanted)]
    
# Test MaxHeap implementation
heap = MaxHeap()

# Push four scored items in arbitrary order
heap.push(priority=0.5, item="itemA")
heap.push(priority=0.9, item="itemB")
heap.push(priority=0.7, item="itemC")
heap.push(priority=0.2, item="itemD")

print(f"Heap internal array: {heap.data}")
print(f"Top 2: {heap.top_n(2)}")

# top_n works on a copy, so the two largest entries are still here to pop
print(f"Pop: {heap.pop()}")
print(f"Pop: {heap.pop()}")
print(f"Remaining heap: {heap.data}")


Heap internal array: [(0.9, 'itemB'), (0.5, 'itemA'), (0.7, 'itemC'), (0.2, 'itemD')]
Top 2: [(0.9, 'itemB'), (0.7, 'itemC')]
Pop: (0.9, 'itemB')
Pop: (0.7, 'itemC')
Remaining heap: [(0.5, 'itemA'), (0.2, 'itemD')]


In [None]:
class RecommendationSystem:
    """Item recommender based on Jaccard similarity between users.

    User/item pairs are read from a CSV file, indexed in a ``HashTable``
    built with a chosen collision technique, and scored items are ranked
    with a ``MaxHeap``.

    Attributes:
        user_item_file: Path of the CSV holding ``user,item`` rows.
        user_items: Maps each user ID to the set of that user's items.
        user_ids: User IDs in the order they first appear in the file.
        hash_tables: Maps a technique name to the most recent table built
            for it by ``build_recommendation_system``.
    """

    def __init__(self, user_item_file: str):
        self.user_item_file = user_item_file
        self.user_items: dict[str, set[str]] = {}
        self.user_ids: list[str] = []
        self.hash_tables: dict[str, "HashTable"] = {}

    def load_data(self):
        """Read the CSV and populate ``user_items`` and ``user_ids``.

        The first row is treated as a header. Rows with fewer than two
        columns (including blank lines) are skipped. An empty file leaves
        the recommender with no users instead of raising.
        """
        # newline="" is what the csv module asks for so quoted fields that
        # contain line breaks are parsed correctly.
        with open(self.user_item_file, "r", newline="") as f:
            reader = csv.reader(f)
            next(reader, None)  # skip header; the default avoids StopIteration on an empty file
            for row in reader:
                if len(row) < 2:
                    continue
                user, item = row[0], row[1]
                self.user_items.setdefault(user, set()).add(item)

        self.user_ids = list(self.user_items)

    @staticmethod
    def _next_prime(lower_bound: int) -> int:
        """Return the smallest prime that is >= ``lower_bound``."""
        candidate = max(2, lower_bound)
        while True:
            divisor = 2
            while divisor * divisor <= candidate and candidate % divisor != 0:
                divisor += 1
            if divisor * divisor > candidate:
                return candidate
            candidate += 1

    def build_recommendation_system(self, technique: str) -> "HashTable":
        """Build, store and return a hash table of every user's items.

        Args:
            technique: Collision technique name passed on to ``HashTable``.

        The table gets at least twice as many slots as there are users
        (and at least 11) to keep the load factor low. The count is then
        rounded up to a prime: with a prime size no double-hashing step
        can share a factor with it, so probing can reach every slot.
        """
        num_users = len(self.user_ids)
        table_size = self._next_prime(max(11, 2 * num_users))
        hash_table = HashTable(size=table_size, collision_avoidance=technique)

        for user, items in self.user_items.items():
            for item in items:
                hash_table.insert(user, item)

        self.hash_tables[technique] = hash_table
        return hash_table

    def _items_for(self, user: str, technique: str) -> set[str]:
        """Return ``user``'s items, read through the hash table for ``technique`` when one exists."""
        table = self.hash_tables.get(technique)
        if table is not None:
            stored = table.retrieve(user)
            if stored is not None:
                return set(stored)
        # No table built for this technique (or the user is not in it):
        # fall back to the data loaded from the CSV.
        return self.user_items.get(user, set())

    def recommend_items(self, target_user: str, technique: str, top_n: int = 5) -> list[str]:
        """Recommend up to ``top_n`` items that ``target_user`` does not have yet.

        Steps:
        - Compute Jaccard similarity between the target and every other user.
        - Give each unseen item a score equal to the summed similarity of
          the users who have it.
        - Push the scored items into a MaxHeap and take the top ``top_n``.

        Args:
            target_user: User ID to recommend for.
            technique: Collision technique whose hash table should serve
                the item lookups. If no table was built for it, the data
                loaded from the CSV is used directly. A table reflects the
                data at the time it was built, so rebuild after reloading.
            top_n: Maximum number of items to return.

        Returns:
            Item names, best score first. Items with equal scores are
            ordered by name (later names first) so results are
            reproducible between runs. An empty list is returned when the
            user is unknown (a message is printed), has no items, or
            nothing can be recommended.
        """
        if target_user not in self.user_items:
            print(f"User '{target_user}' not found in data.")
            return []

        target_items = self._items_for(target_user, technique)
        if not target_items:
            return []

        # User-to-user Jaccard similarity: O(number of users) set operations.
        # Only users sharing at least one item with the target are kept,
        # which is exactly the set of users with similarity > 0.
        neighbours: list[tuple[float, set[str]]] = []
        for other_user in self.user_ids:
            if other_user == target_user:
                continue
            other_items = self._items_for(other_user, technique)
            shared = len(target_items & other_items)
            if shared == 0:
                continue
            neighbours.append((shared / len(target_items | other_items), other_items))

        # Convert user similarity into item scores. The work is proportional
        # to the total number of items held by the similar users.
        item_scores: dict[str, float] = {}
        for sim, other_items in neighbours:
            for item in other_items - target_items:
                item_scores[item] = item_scores.get(item, 0.0) + sim

        if not item_scores:
            return []

        # Push items into MaxHeap by score. The priority is (score, item)
        # so equal scores are ordered by name rather than by set iteration
        # order, which varies between runs for strings.
        heap = MaxHeap()
        for item, score in item_scores.items():
            heap.push((score, item), item)

        # top_n yields (priority, item) tuples --> we only want the items
        return [item for _, item in heap.top_n(top_n)]
    
rs = RecommendationSystem(user_item_file="user_item_data.csv")
rs.load_data()

print(f"Number of users: {len(rs.user_ids)}")
print(f"First few users: {rs.user_ids[:5]}")

# Separate chaining: build the table, then report its size and collisions
ht_sc = rs.build_recommendation_system(technique="separate_chaining")
print(f"HashTable size (SC): {ht_sc.size}")
print(f"SC collisions: {ht_sc.collisions}")

# Use the first user in the file as the example
example_user = rs.user_ids[0]
print(f"Example user: {example_user}")
print(f"Items for example user: {rs.user_items[example_user]}")

recs_sc = rs.recommend_items(example_user, "separate_chaining", top_n=5)
print(f"Recommendations for {example_user} (SC): {recs_sc}")

# Double hashing: same steps for comparison
ht_dh = rs.build_recommendation_system(technique="double_hashing")
print(f"HashTable size (DH): {ht_dh.size}")
print(f"DH collisions: {ht_dh.collisions}")

recs_dh = rs.recommend_items(example_user, "double_hashing", top_n=5)
print(f"Recommendations for {example_user} (DH): {recs_dh}")



Number of users: 7
First few users: ['user6', 'user4', 'user1', 'user5', 'user2']
HashTable size (SC): 14
SC collisions: 1
Example user: user6
Items for example user: {'Gears of War', 'Far Cry', 'League of Legends', 'Battlefield', 'Bayonetta', 'Bloodborne', 'The Last of Us', 'Super Mario Bros', 'Sekiro', "Assassin's Creed", 'Ghost of Tsushima', 'Horizon Zero Dawn'}
Recommendations for user6 (SC): ['Red Dead Redemption 2', 'Dragon Age', 'Metal Gear Solid', 'World of Warcraft', 'The Legend of Zelda']
HashTable size (DH): 14
DH collisions: 10
Recommendations for user6 (DH): ['Red Dead Redemption 2', 'Dragon Age', 'Metal Gear Solid', 'World of Warcraft', 'The Legend of Zelda']


In [None]:
def main():
    """Build the recommender with each collision technique and report results.

    For every technique this times the hash-table build, then times
    ``recommend_items`` for each user, printing the recommendations, both
    timings and the table's collision count.
    """
    rs = RecommendationSystem("user_item_data.csv")
    rs.load_data()

    techniques = ["separate_chaining", "double_hashing"]
    banner = "=" * 80

    for tech in techniques:
        print(banner)
        print(f"Using collision avoidance technique: {tech}\n")

        # Build HashTable and time it.
        # perf_counter is the clock intended for measuring short
        # intervals; time.time() follows the wall clock, which can be
        # adjusted and is coarse on some platforms.
        start_insert = time.perf_counter()
        ht = rs.build_recommendation_system(tech)
        insertion_time = time.perf_counter() - start_insert

        collisions = ht.collisions

        # Recommend items for each user and time it
        for user in rs.user_ids:
            start_retrieve = time.perf_counter()
            recs = rs.recommend_items(user, tech, top_n=5)
            retrieve_time = time.perf_counter() - start_retrieve

            print(f"Recommendations for {user}: {recs}")
            print(f"Insertion Time: {insertion_time:.6f} seconds")
            print(f"Retrieval Time: {retrieve_time:.6f} seconds")
            print(f"Collisions: {collisions}\n")

        print(banner)
        print("\n")

# Run main when this is executed as a script or notebook cell, but not when imported
if __name__ == "__main__":
    main()


Using collision avoidance technique: separate_chaining

Recommendations for user6: ['Red Dead Redemption 2', 'Dragon Age', 'Metal Gear Solid', 'World of Warcraft', 'The Legend of Zelda']
Insertion Time: 0.000044 seconds
Retrieval Time: 0.000108 seconds
Collisions: 1

Recommendations for user4: ['Silent Hill', 'Half-Life', 'Splatoon', 'Dota 2', 'Battlefield']
Insertion Time: 0.000044 seconds
Retrieval Time: 0.000068 seconds
Collisions: 1

Recommendations for user1: ['Battlefield', 'The Last of Us', 'Gears of War', 'Splatoon', 'Bloodborne']
Insertion Time: 0.000044 seconds
Retrieval Time: 0.000054 seconds
Collisions: 1

Recommendations for user5: ['Bayonetta', 'Resident Evil', 'Half-Life', 'Fire Emblem', 'Portal']
Insertion Time: 0.000044 seconds
Retrieval Time: 0.000059 seconds
Collisions: 1

Recommendations for user2: ['Bayonetta', 'Splatoon', 'Bloodborne', 'Gears of War', 'Cyberpunk 2077']
Insertion Time: 0.000044 seconds
Retrieval Time: 0.000053 seconds
Collisions: 1

Recommendations

## Collision Technique Analysis 

In my experiment, **separate chaining clearly had fewer collisions than double hashing**:
- Separate Chaining Collisions: **1**
- Double Hashing Collisions: **10**

This result makes sense for this data set and table size. With separate chaining, each bucket can hold multiple keys in a small list, so as soon as there's a collision we don't need to search for a new index; we just append to the list. Also, the number of users is relatively small and the table is large, so the overall number of collisions stays low. 

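With double hashing, every time two keys hash to the same initial index the algorithm has to jump around the table using the secondary hash function until it finds an empty spot, and each of these jumps counts as a collision. These probes add up quickly. The table size in this run (14) is also not prime, so a step size that shares a factor with 14 keeps revisiting the same few slots, which inflates the count further. 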

## Was MaxHeap a good choice? 

Using a MaxHeap for recommendations is reasonable in this project. It allows you to take the top five values numerically without having to sort the list every time. 

In a data set this small, it would also have worked to just sort the `item_scores` dictionary by score and take the first N items. However, the MaxHeap approach should scale better when only a few top items are needed from a large number of scored items. 