Add relationships

In [None]:
import json

# Load merged matched nodes. JSON text is UTF-8, so the encoding is passed
# explicitly instead of being inherited from the platform's locale default.
with open('/content/final_matched_nodes.json', encoding="utf-8") as matched_nodes_file:
    matched_nodes_data = json.load(matched_nodes_file)

# Load environment descriptions
with open('/content/cleaned_environment_descriptions.json', encoding="utf-8") as env_desc_file:
    environment_description_data = json.load(env_desc_file)

# Actions to consider for new nodes
valid_actions = {"On the left", "On the right"}

# Process each instance
for instance in matched_nodes_data:
    instance_id = instance["Instance_id"]

    # Find the first environment description that carries the same instance id
    env_instance = next((e for e in environment_description_data if e["Instance_id"] == instance_id), None)
    if not env_instance:
        continue  # No environment description for this instance

    env_nodes = env_instance.get("Node Description", {})
    env_actions = env_instance.get("actions", [])

    # Collect all matched environment node IDs
    matched_env_ids = {match["Environment_Node"]["ID"] for match in instance["Matches"]}

    # NOTE(review): the following steps are described but not implemented in this cell:
    #   - find new environment nodes connected via valid actions (if the 'from'
    #     node is in matched_env_ids and the action is in valid_actions, add the
    #     'to' node)
    #   - add the new nodes to the instance
    # As written, valid_actions, env_nodes, env_actions and matched_env_ids are
    # computed but never used, so matched_nodes_data is written out unchanged.


# Save the matched nodes for the next cell
output_file_path = "/content/updated_matched_nodes.json"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    json.dump(matched_nodes_data, output_file, indent=4)

print(f"Updated matched nodes saved to: {output_file_path}")


Updated matched nodes saved to: /content/updated_matched_nodes.json


In [None]:
import json

# Load the matched nodes written by the previous cell (explicit UTF-8 so the
# result does not depend on the platform's default encoding)
with open('/content/updated_matched_nodes.json', encoding="utf-8") as matched_nodes_file:
    matched_nodes_data = json.load(matched_nodes_file)

# Rewrite each new node's "Action_From" from an environment node ID to the
# cognitive node ID it was matched with
for instance in matched_nodes_data:
    instance_id = instance["Instance_id"]

    # Step 1: Create a mapping from Environment Node ID → Cognitive Node ID
    env_to_cog_id_map = {
        match["Environment_Node"]["ID"]: match["Cognitive_Node"]["ID"]
        for match in instance["Matches"]
    }

    # Step 2: Update the "Action_From" field in "New_Environment_Nodes"
    # (instances without that key are skipped)
    for new_env_node in instance.get("New_Environment_Nodes", []):
        env_node = new_env_node["Environment_Node"]
        action_from = env_node["Action_From"]

        # Only IDs that have a match are replaced; others are left untouched
        if action_from in env_to_cog_id_map:
            env_node["Action_From"] = env_to_cog_id_map[action_from]

# Save the updated matched nodes
output_file_path = "/content/updated_nodes_id.json"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    json.dump(matched_nodes_data, output_file, indent=4)

print(f"Final updated matched nodes saved to: {output_file_path}")


Final updated matched nodes saved to: /content/updated_nodes_id.json


Organize

In [None]:
import json

# Load cognitive map (explicit UTF-8: JSON text is UTF-8 and the platform
# default encoding may differ)
with open('/content/dev_graph_lmm.json', encoding="utf-8") as cognitive_map_file:
    cognitive_map_data = json.load(cognitive_map_file)

# Load updated matched nodes (with new environment nodes)
with open('/content/updated_nodes_id.json', encoding="utf-8") as matched_nodes_file:
    matched_nodes_data = json.load(matched_nodes_file)

# Index the matched data by instance id for easy lookup
matched_lookup = {match["Instance_id"]: match for match in matched_nodes_data}

# Process each cognitive instance
updated_cognitive_map = []

for cognitive_instance in cognitive_map_data:
    instance_id = cognitive_instance["Instance_id"]

    # Get matched data for this instance
    matched_data = matched_lookup.get(instance_id, {})

    if not matched_data:
        updated_cognitive_map.append(cognitive_instance)  # No changes, add as-is
        continue

    # Extract cognitive nodes and edges (both are modified in place below)
    cognitive_nodes = cognitive_instance["Graph"]["Nodes"]
    cognitive_edges = cognitive_instance["Graph"]["Edges"]

    # Step 1: Append the matched environment description to each matched cognitive node
    for match in matched_data.get("Matches", []):
        cog_id = match["Cognitive_Node"]["ID"]
        env_desc = match["Environment_Node"]["Description"]
        match_type = match["Match_Type"]

        if "Semantic Match" in match_type:
            # The score is the text after the last ":" with trailing ")" and spaces removed
            score = match_type.split(":")[-1].strip(")").strip()
            cognitive_nodes[cog_id] += f"; Matched Scene: {env_desc} (Semantic Match: {score})"
        else:
            cognitive_nodes[cog_id] += f"; Matched Scene: {env_desc}"

    # Step 2: Add newly discovered environment nodes as cognitive nodes
    for new_node in matched_data.get("New_Environment_Nodes", []):
        env_id = new_node["Environment_Node"]["ID"]
        env_desc = new_node["Environment_Node"]["Description"]
        action_from = new_node["Environment_Node"]["Action_From"]
        action_type = new_node["Environment_Node"]["Action_Type"]

        # Avoid duplicate additions.
        # NOTE(review): this compares an environment node ID against cognitive
        # node IDs; if the two ID spaces can overlap, a genuinely new node would
        # be skipped here — confirm against the data.
        if env_id not in cognitive_nodes:
            cognitive_nodes[env_id] = f"{env_desc} (New Context Node)"
            cognitive_edges.append({"from": action_from, "to": env_id, "action": action_type})

    # Save updated cognitive instance (only these three keys are carried over)
    updated_cognitive_map.append({
        "Instance_id": instance_id,
        "Navigation_instruction": cognitive_instance["Navigation_instruction"],
        "Graph": {
            "Nodes": cognitive_nodes,
            "Edges": cognitive_edges
        }
    })

# Save the updated cognitive map
output_file_path = "/content/organiced_cognitive_map.json"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    json.dump(updated_cognitive_map, output_file, indent=4)

print(f"Updated cognitive map saved to: {output_file_path}")


Updated cognitive map saved to: /content/organiced_cognitive_map.json


In [None]:
import json

# Load updated cognitive map (explicit UTF-8 so the result does not depend on
# the platform's default encoding)
with open('/content/organiced_cognitive_map.json', encoding="utf-8") as cognitive_map_file:
    cognitive_map_data = json.load(cognitive_map_file)

# Renumber the nodes of every instance as "1", "2", ... and rewrite the edges accordingly
for cognitive_instance in cognitive_map_data:
    instance_id = cognitive_instance["Instance_id"]
    cognitive_nodes = cognitive_instance["Graph"]["Nodes"]
    cognitive_edges = cognitive_instance["Graph"]["Edges"]

    # Map each old node ID to its 1-based rank in numeric order.
    # key=int means every node ID must be convertible with int(); a
    # non-numeric ID raises ValueError here.
    new_node_order = {old_id: str(idx + 1) for idx, old_id in enumerate(sorted(cognitive_nodes.keys(), key=int))}

    # Step 1: Rename nodes to their new ordered IDs
    updated_nodes = {new_node_order[old_id]: desc for old_id, desc in cognitive_nodes.items()}

    # Step 2: Update edge `from` and `to` references
    updated_edges = []
    for edge in cognitive_edges:
        from_id = edge["from"]
        to_id = edge["to"]

        # IDs are compared as strings; an endpoint that is not a known node
        # keeps its original ID (as a string)
        updated_edges.append({
            "from": new_node_order.get(str(from_id), str(from_id)),  # Update 'from'
            "to": new_node_order.get(str(to_id), str(to_id)),        # Update 'to'
            "action": edge["action"]
        })

    # Save updated cognitive instance
    cognitive_instance["Graph"]["Nodes"] = updated_nodes
    cognitive_instance["Graph"]["Edges"] = updated_edges

# Save the final cognitive map with updated edge references
output_file_path = "/content/final_cognitive_map_sbert.json"
with open(output_file_path, "w", encoding="utf-8") as output_file:
    json.dump(cognitive_map_data, output_file, indent=4)

print(f"Final cognitive map saved to: {output_file_path}")


Final cognitive map saved to: /content/final_cognitive_map_sbert.json


Data cleaning

In [None]:
from os import replace
import json
import re  # Import regex module

# Load final cognitive map. The encoding is explicit because JSON text is UTF-8
# and the platform's default encoding is not guaranteed to be.
with open("/content/final_cognitive_map_sbert.json", "r", encoding="utf-8") as f:
    cognitive_map_data = json.load(f)

# Phrases that clean_description deletes from every node description.
# They are removed one after another in list order, so the order matters.
phrases_to_remove = [
    " (New Context Node)",
    "- Intersection type:",
    "Intersections type",
    "Intersection type",
    "Intersection type: ",
    "There is not environment description",
    ", environment descriptions:",
    "- Specific stores restaurant:",
    "Specific",
    "\u2764\ufe0f ",
    ": \u70e4\u8089\u5e97",
    "with ",
    "- :",
    "(",
    ")",
    " s,",
    "on the windows",
    "outdoor dining",
    " and",

]

# Regular expression matching "Semantic Match: n" (n is an integer or decimal).
# The pattern used to start with "\S", which the regex engine reads as "any
# non-whitespace character" rather than a literal "S"; it now starts with a
# plain "S". The surrounding parentheses are not part of the pattern because
# "(" and ")" are already in phrases_to_remove.
semantic_match_pattern = r"Semantic Match: \d+(\.\d+)?"

# Remove unnecessary expressions and normalize the wording of a description
def clean_description(description: str) -> str:
    """Return `description` with boilerplate phrases removed and wording normalized.

    Steps, in order:
      1. Strip leading/trailing whitespace.
      2. For each entry of the module-level `phrases_to_remove`: delete every
         occurrence of the entry, strip, then apply a fixed chain of literal
         `str.replace` substitutions.
      3. Delete every match of the module-level `semantic_match_pattern`
         and strip again.

    The substitution chain sits inside the loop, so it is applied once per
    phrase (and not at all if `phrases_to_remove` is empty).
    """
    desc_cleaned = description.strip()  # Remove leading/trailing spaces

    # Remove predefined phrases
    for phrase in phrases_to_remove:
        desc_cleaned = desc_cleaned.replace(phrase, "").strip()
        # Normalize wording, punctuation and capitalization with literal
        # replacements. They are applied left to right and several of them
        # build on earlier ones (e.g. the "Chase" variants), so the order
        # must be kept.
        desc_cleaned = desc_cleaned.replace("STOP", "Stop-sign").replace("intersection", "Intersection").replace("; Matched Scene:",",")\
                                    .replace(";",",").replace(", ",",").replace(",",", ").replace("(Intersection with crosswalk)" , "crosswalk")\
                                    .replace("(4-way Intersection)" , "4-way Intersection").replace("  "," ")\
                                    .replace("(T-Intersection)" , "T-Intersection").replace("Intersections" , "Intersection")\
                                    .replace("Intersection crosswalk" , "Intersection, crosswalk").replace("stops","stop")\
                                    .replace("Intersection 4-way Intersection" , "Intersection, 4-way Intersection")\
                                    .replace("on buildings", "Buildings").replace(" (Target)", ", Target")\
                                    .replace("PARKING", "parking").replace("on the wall", "sign on the Wall")\
                                    .replace("on awnings", "sign on Awnings").replace("Restaurant", "restaurant")\
                                    .replace("on building windows","sign on Building windows ").replace("TD,","TD Bank,")\
                                    .replace("TD bank","TD Bank").replace("Chase Bank","Chase").replace("Chase bank","Chase")\
                                    .replace("Chase","Chase Bank").replace("Target, Target,","Target").replace("Laight","Light")\
                                    .replace("Starting Point, Starting Point,","Starting Point").replace("LAUNDROMAT", "Laundromat")\
                                    .replace("Bike Rental, Bike Rental","Bike Rental").strip()


    # Remove semantic match using regex
    desc_cleaned = re.sub(semantic_match_pattern, "", desc_cleaned).strip()

    return desc_cleaned  # No blanket case change here; only the listed words are re-cased

# Apply the cleaning function to all nodes, then title-case the result.
# NOTE(review): str.title() re-cases every word, so casing chosen by
# clean_description is altered afterwards (e.g. "TD Bank" becomes "Td Bank",
# "4-way" becomes "4-Way").
for instance in cognitive_map_data:
    nodes = instance["Graph"]["Nodes"]
    for node_id in nodes:
        cleaned_description = clean_description(nodes[node_id])
        node_description = cleaned_description.title()
        # Apply the formatted description back to the node
        nodes[node_id] = node_description

# Save the refined cognitive map (explicit UTF-8 so the file does not depend
# on the platform's default encoding)
output_path = "/content/standardized_cognitive_map.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(cognitive_map_data, f, indent=4)

print(f"Updated cognitive map saved to: {output_path}")


Updated cognitive map saved to: /content/standardized_cognitive_map.json


In [None]:
import json

# Load the standardized cognitive map. The encoding is explicit because JSON
# text is UTF-8 and the platform's default encoding is not guaranteed to be.
file_path = "/content/standardized_cognitive_map.json"
with open(file_path, "r", encoding="utf-8") as f:
    cognitive_map_data = json.load(f)

# Intersection types in priority order: the lower the index, the higher the priority
intersection_priority = [
    "T-Intersection",
    "4-Way Intersection",
    "Intersection",
    "Traffic Light"

]

# Keep only the highest-priority intersection type of a description, plus all other details
def refine_description(desc):
    """Collapse several intersection labels in `desc` into the single best one.

    `desc` is split on commas and each piece is stripped. Pieces that exactly
    equal an entry of `intersection_priority` are intersection labels; only the
    one with the lowest index is kept and it is moved to the front. All other
    non-empty pieces follow in their original order, joined with ", ".

    If no piece is an intersection label, `desc` is returned unchanged.
    """
    best_rank = None
    landscape = []

    for piece in desc.split(","):
        piece = piece.strip()
        if piece not in intersection_priority:
            landscape.append(piece)  # Not an intersection label: keep as landscape detail
            continue
        rank = intersection_priority.index(piece)
        if best_rank is None or rank < best_rank:
            best_rank = rank

    # Nothing to prioritize: hand back the input exactly as received
    if best_rank is None:
        return desc

    kept = [intersection_priority[best_rank]] + landscape
    return ", ".join(piece for piece in kept if piece).strip(", ")

# Process each instance
for instance in cognitive_map_data:
    nodes = instance["Graph"]["Nodes"]

    # Update node descriptions while keeping landscape details
    # (only existing keys are reassigned, so iterating over items() is safe)
    for node_id, desc in nodes.items():
        nodes[node_id] = refine_description(desc)

# Save the refined cognitive map (explicit UTF-8 so the file does not depend
# on the platform's default encoding)
output_path = "/content/final_refined_intersections.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(cognitive_map_data, f, indent=4)

print(f"Updated cognitive map saved to: {output_path}")


Updated cognitive map saved to: /content/final_refined_intersections.json


In [None]:
import json

# Load the cognitive map data. The encoding is explicit because JSON text is
# UTF-8 and the platform's default encoding is not guaranteed to be.
file_path = "/content/final_refined_intersections.json"
with open(file_path, "r", encoding="utf-8") as f:
    cognitive_map_data = json.load(f)

# Drop repeated ", "-separated items from a description, keeping first occurrences
def remove_duplicates(description):
    """Return `description` with repeated items removed.

    Items are the pieces obtained by splitting on ", ". The first occurrence
    of each item is kept, in its original position; comparison is exact
    (case-sensitive).
    """
    # dict keys are unique and remember insertion order, which gives
    # order-preserving de-duplication in one step
    first_occurrences = dict.fromkeys(description.split(", "))
    return ", ".join(first_occurrences)

# Renumber an instance's nodes so that each node is followed by its edge targets
def organize_nodes(instance):
    """Renumber the nodes of `instance` in place and rewrite its edges to match.

    Nodes are visited in ascending numeric order of their current ID. A node
    that has no number yet receives the next one (starting at 1); then each
    target of its outgoing edges that is a known node and has no number yet
    receives the next one. Every description passes through
    `remove_duplicates`.

    Afterwards `instance["Graph"]["Nodes"]` maps the new integer numbers to
    descriptions, and `instance["Graph"]["Edges"]` holds the edges with
    renumbered endpoints, grouped by their new 'from' value in order of first
    appearance. Endpoints that are not known nodes keep their original value.
    Returns None.
    """
    graph = instance["Graph"]
    edges = graph["Edges"]
    nodes = graph["Nodes"]

    # Outgoing edges per source node, in original edge order
    outgoing = {}
    for edge in edges:
        outgoing.setdefault(edge["from"], []).append(edge)

    renumbered = {}     # old node ID -> new sequence number
    ordered_nodes = {}  # new sequence number -> de-duplicated description

    def _place(node_id):
        # Give node_id the next free sequence number and store its description
        position = len(renumbered) + 1
        renumbered[node_id] = position
        ordered_nodes[position] = remove_duplicates(nodes[node_id])

    for node_id in sorted(nodes.keys(), key=int):
        if node_id not in renumbered:
            _place(node_id)

        # Pull this node's direct successors right behind it
        for edge in outgoing.get(node_id, ()):
            target = edge["to"]
            if target not in renumbered and target in nodes:
                _place(target)

    # Rewrite the edge endpoints and bucket the edges by their new 'from'
    edges_by_source = {}
    for edge in edges:
        source = renumbered.get(edge["from"], edge["from"])
        target = renumbered.get(edge["to"], edge["to"])
        rewritten = {"from": source, "to": target, "action": edge["action"]}
        edges_by_source.setdefault(source, []).append(rewritten)

    graph["Nodes"] = ordered_nodes
    graph["Edges"] = [edge for bucket in edges_by_source.values() for edge in bucket]

# Reorder the nodes of every instance and de-duplicate their descriptions
for instance in cognitive_map_data:
    organize_nodes(instance)

# Save the cleaned cognitive map (explicit UTF-8 so the file does not depend
# on the platform's default encoding)
output_path = "/content/cleaned_cognitive_map_cost.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(cognitive_map_data, f, indent=4)

print(f"Updated cognitive map saved to: {output_path}")


Updated cognitive map saved to: /content/cleaned_cognitive_map_cost.json
