# Spreading Activation Algorithm

This is my implementation of a spreading activation algorithm. It was a coding challenge sent to me as part of a job interview, so it follows the specs I received. I spent roughly four hours on it, and I think it showcases my current level with algorithms. I'm excited for future me to look at it and facepalm :)

In [32]:
class Node:
    """A typed, weighted graph vertex that keeps track of its weighted neighbors."""

    def __init__(self, node_id: str, node_type: str, node_weight: float = 1.0) -> None:
        """
        Create a node that starts out with no neighbors.

        Args:
        - node_id (str): Unique identifier for the node.
        - node_type (str): Type of the node, based on the schema.
        - node_weight (float, optional): Weight stored on the node itself. Defaults to 1.0.

        """
        self.node_id, self.node_type, self.node_weight = node_id, node_type, node_weight
        self.neighbors = dict()  # Maps neighbor Node -> weight of the edge leading to it

    def add_neighbor(self, neighbor, weight: float = 1.0) -> None:
        """
        Register a neighbor of this node, replacing the stored weight if it is already present.

        Only this node's mapping is touched; the neighbor is not linked back.

        Args:
        - neighbor (Node): Node object to add as a neighbor.
        - weight (float, optional): Weight of the edge to the neighbor. Defaults to 1.0.

        """
        self.neighbors.update({neighbor: weight})

    def get_neighbors(self) -> dict:
        """
        Give access to this node's neighbor mapping.

        Returns:
        - dict: The node's own dictionary (not a copy) where keys are neighbor
          nodes and values are edge weights.

        """
        neighbor_weights = self.neighbors
        return neighbor_weights


class Graph:
    """Undirected, weighted graph of typed nodes, looked up by node ID."""

    def __init__(self, schema: list):
        """
        Create a graph with no nodes, bound to the given schema.

        Args:
        - schema (list): List defining the schema or order of node types.

        """
        self.nodes = dict()  # node_id -> Node object
        self.schema = schema  # Order of node types used when propagating activation

    def add_node(self, node_id, node_type, node_weight=1):
        """
        Register a new node; an ID that is already present is left untouched.

        Args:
        - node_id (str): Unique identifier for the node.
        - node_type (str): Type of the node based on the schema.
        - node_weight (optional): Weight stored on the node. Defaults to 1.

        """
        if node_id in self.nodes:
            return  # First registration wins
        self.nodes[node_id] = Node(node_id, node_type, node_weight)

    def add_edge(self, node1_id, node2_id, weight=1.0):
        """
        Connect two existing nodes in both directions with the same weight.

        Nothing happens when either ID is not present in the graph.

        Args:
        - node1_id (str): ID of the first node.
        - node2_id (str): ID of the second node.
        - weight (float, optional): Weight of the edge. Defaults to 1.0.

        """
        if node1_id not in self.nodes or node2_id not in self.nodes:
            return  # Unknown endpoint: silently ignore the edge
        first, second = self.nodes[node1_id], self.nodes[node2_id]
        first.add_neighbor(second, weight)
        second.add_neighbor(first, weight)


def spreading_activation(graph, start_node_id, decay_factor=0.5, activation_threshold=0.2, max_steps=10, initial_activation=1.0, aspect_self_activation=True):
    """
    Perform spreading activation from a starting node in the graph.

    Activation is propagated breadth-first. From the starting node it may reach
    neighbors whose type sits directly before or after the start type in the
    schema (wrapping around at both ends). On later steps it only moves to one
    adjacent type: the next type when the current type lies after the start
    type and is not the last schema entry, otherwise the previous type (no
    wrap-around). A neighbor receives
    ``activation * decay_factor * edge_weight * neighbor.node_weight`` and is
    only updated (and expanded further) when that value exceeds what it
    already holds.

    Args:
    - graph (Graph): Graph object representing the network of nodes.
    - start_node_id (str): ID of the node from which activation starts.
    - decay_factor (float, optional): Factor by which activation decays in each step. Defaults to 0.5.
    - activation_threshold (float, optional): A node only spreads activation when its own
      level is strictly above this value. Defaults to 0.2.
    - max_steps (int, optional): Maximum number of steps to propagate activation. Defaults to 10.
    - initial_activation (float, optional): Initial activation level at the starting node. Defaults to 1.0.
    - aspect_self_activation (bool, optional): When True, nodes of the starting node's type
      (including the starting node itself) may also receive activation. Defaults to True.

    Returns:
    - dict: Dictionary mapping node IDs to their activation levels after spreading activation.
      Nodes that were never reached keep the value 0.

    Raises:
    - ValueError: If start_node_id is not in the graph, or if the starting node's type
      is not part of the graph's schema.

    """
    from collections import deque  # Local import: the file has no import section to extend

    if start_node_id not in graph.nodes:
        raise ValueError(f"Node ID {start_node_id} not found in graph.")

    schema = graph.schema
    schema_size = len(schema)

    # Position of each type in the schema; setdefault keeps the FIRST occurrence,
    # which is what list.index() would return.
    type_index = {}
    for position, type_name in enumerate(schema):
        type_index.setdefault(type_name, position)

    start_node = graph.nodes[start_node_id]
    start_node_type = start_node.node_type
    if start_node_type not in type_index:
        raise ValueError(
            f"Node type {start_node_type} of node {start_node_id} not found in schema."
        )
    start_node_index = type_index[start_node_type]

    activation = {node_id: 0 for node_id in graph.nodes}  # Activation level per node
    activation[start_node_id] = initial_activation

    # FIFO frontier of (node, activation_level, step); deque gives O(1) pops from the left
    frontier = deque([(start_node, initial_activation, 0)])

    while frontier:
        current_node, current_activation, current_step = frontier.popleft()
        if current_step >= max_steps:
            continue
        if current_activation <= activation_threshold:
            continue  # Too weak to spread to any neighbor

        current_type_index = type_index.get(current_node.node_type)
        if current_type_index is None:
            continue  # Nodes whose type is outside the schema do not propagate

        # Determine which node types may be activated from here
        if current_step == 0:
            # Initial step: both adjacent types, wrapping around at either end
            next_types = [
                schema[current_type_index - 1],
                schema[(current_type_index + 1) % schema_size],
            ]
        elif start_node_index < current_type_index < schema_size - 1:
            # Past the start type and not at the end: keep moving forward
            next_types = [schema[current_type_index + 1]]
        elif current_type_index > 0:
            # Otherwise step towards the beginning of the schema (no wrap-around)
            next_types = [schema[current_type_index - 1]]
        else:
            next_types = []

        if aspect_self_activation:
            next_types.append(start_node_type)  # Also consider nodes of the starting type

        for neighbor, edge_weight in current_node.get_neighbors().items():
            if neighbor.node_type not in next_types:
                continue
            if neighbor.node_id == start_node_id and not aspect_self_activation:
                continue
            spread_activation = current_activation * decay_factor * edge_weight * neighbor.node_weight
            # Only a strictly stronger activation overrides (and re-expands) a node
            if spread_activation > activation[neighbor.node_id]:
                activation[neighbor.node_id] = spread_activation
                frontier.append((neighbor, spread_activation, current_step + 1))

    return activation







# Demo: build a small typed graph and spread activation outwards from node "A"
schema = ["partnership", "feature", "value_prop", "customer_seg", "channel", "market"]

g = Graph(schema)

# Each entry is (node_id, node_type[, node_weight]); when the weight is left out,
# add_node falls back to its own default.
for _node_spec in (
    ("A", "value_prop"),
    ("B", "feature", 3),
    ("C", "customer_seg"),
    ("D", "market"),
    ("E", "channel"),
    ("F", "partnership"),
    ("G", "value_prop"),
):
    g.add_node(*_node_spec)

# Each entry is (node1_id, node2_id, weight); edges are undirected and the list is
# not exhaustive with respect to the schema.
for _edge_spec in (
    ("A", "B", 1.0),  # value_prop <-> feature
    ("A", "C", 0.9),  # value_prop <-> customer_seg
    ("B", "D", 0.5),  # feature <-> market
    ("B", "F", 0.5),  # feature <-> partnership
    ("C", "E", 0.5),  # customer_seg <-> channel
    ("A", "G", 1.1),  # value_prop <-> value_prop
    ("E", "D", 0.2),  # channel <-> market
):
    g.add_edge(*_edge_spec)

# Run the algorithm from "A" without letting other value_prop nodes be activated
activation_result = spreading_activation(g, "A", aspect_self_activation=False)

# Report the final activation level of every node, in insertion order
for node_id in activation_result:
    activation_level = activation_result[node_id]
    print(f"Node {node_id}: Activation = {activation_level}")


Node A: Activation = 1.0
Node B: Activation = 1.5
Node C: Activation = 0.45
Node D: Activation = 0
Node E: Activation = 0.1125
Node F: Activation = 0.375
Node G: Activation = 0
