In [44]:
# define some helper functions to make our main event loop more readable
import json
from string import ascii_uppercase
import math
import re
# define a TaskPlan class that will allow us to create a task plan in a stateful way
import json
from string import ascii_uppercase
import pandas as pd

'''
Helper Functions
'''

# Helper Functions
def objName(event):
    """Return the ``Name`` entry of the event's ``ObjectDetails`` mapping."""
    details = event["ObjectDetails"]
    return details["Name"]

def parentObjName(event):
    """Return the event object's name with one trailing ``_<digits>`` suffix removed.

    The name is returned unchanged when the pattern does not match it
    (for example, an empty name).
    """
    full_name = event["ObjectDetails"]["Name"]
    matched = re.match(r'(.+?)(?:_\d+)?$', full_name)
    if matched is None:
        return full_name
    return matched.group(1)

def objPos(event):
    """Return the ``Position`` entry of the event's ``ObjectDetails`` mapping."""
    details = event["ObjectDetails"]
    return details["Position"]

def stringToTuple(string):
    """Convert a delimited coordinate string such as ``"(1, 2, 3)"`` to a tuple of floats.

    The first and last characters are dropped unconditionally; what remains
    is split on commas and each piece is converted with ``float``.
    """
    inner = string[1:-1]
    return tuple(map(float, inner.split(',')))

def moveDistance(start, end):
    """Return the Euclidean distance between the positions recorded on two events."""
    origin = stringToTuple(objPos(start))
    destination = stringToTuple(objPos(end))
    return math.dist(origin, destination)

def parse_position(pos_str):
    """Convert a position string like '(12.5, 4.3, 7.8)' into an (x, y, z) tuple of floats.

    Leading and trailing parentheses are stripped before splitting on commas.

    Raises:
        ValueError: if the string does not contain exactly three
            comma-separated parts, or if a part is not a valid float.
    """
    parts = pos_str.strip("()").split(",")

    # Only full 3D positions are accepted
    if len(parts) != 3:
        raise ValueError("Position format is invalid. Expected three coordinates (X, Y, Z).")

    return tuple(float(part) for part in parts)

def is_close(pos1, pos2, threshold=1.0):
    """Report whether two 3D positions are strictly less than ``threshold`` apart.

    Each position may be a tuple or a coordinate string; strings are first
    converted with ``parse_position``.

    Raises:
        ValueError: if, after conversion, either position is not a tuple.
    """
    first = parse_position(pos1) if isinstance(pos1, str) else pos1
    second = parse_position(pos2) if isinstance(pos2, str) else pos2

    if not (isinstance(first, tuple) and isinstance(second, tuple)):
        raise ValueError("Position format is not recognized. Expected tuple or coordinate string.")

    # Euclidean distance over the first three components
    dx = first[0] - second[0]
    dy = first[1] - second[1]
    dz = first[2] - second[2]
    return math.sqrt(dx ** 2 + dy ** 2 + dz ** 2) < threshold


# Task Deduction Algorithm

In [48]:
class TaskPlan:
    """Builds an ordered list of task dicts from a JSON log of events.

    Constructing an instance loads the events from ``path`` and immediately
    parses them. The resulting tasks are available in ``self.tasks``, each
    carrying a string ``"_id"`` assigned in creation order starting at "1".
    """

    def __init__(self, path):
        """Load the event log at ``path`` and derive tasks from it."""
        self.tasks = []  # All tasks in the plan, in creation order
        self.counter = 1  # Next task ID to hand out
        self.currentAdd1, self.currentAdd2 = None, None  # Pending "_0" / "_1" slice events
        self.currentGrab, self.currentRelease = None, None  # Pending pick-up / let-go events
        self.currentFixed = {}  # Open bracing tasks keyed by object name
        self.loadEvents(path)
        self.parseEvents()

    def resetSlice(self):
        """Resets temporary variables used for slice actions."""
        self.currentAdd1, self.currentAdd2 = None, None

    def resetMove(self):
        """Resets temporary variables used for move actions."""
        self.currentGrab, self.currentRelease = None, None

    def loadEvents(self, path):
        """Loads events from a JSON file and assigns an event number to each.

        The file is read as UTF-8 (the encoding JSON interchange uses) rather
        than the platform default, so the result does not depend on the
        machine's locale. Events are numbered from 1 in file order under the
        ``"eventNumber"`` key.
        """
        with open(path, 'r', encoding='utf-8') as f:
            self.events = json.load(f)

        # Assign a unique eventNumber to each event
        for i, event in enumerate(self.events, start=1):
            event["eventNumber"] = i

    def parseEvents(self):
        """Processes each event in order and creates the tasks it implies.

        Events whose ``"EventType"`` is not one of the handled types are ignored.
        """
        for event in self.events:
            event_type = event["EventType"]

            if event_type == "Slice Added":
                self.handleSliceEvent(event)

            elif event_type in ("GameObject Picked Up", "GameObject Let Go"):
                self.handleMoveEvent(event)

            elif event_type == "Temporary Support Added":
                self.addTask(self.computeSupport(event))

            elif event_type in ("GameObject Temporarily Fixed", "GameObject Unfixed"):
                self.handleBracingEvent(event)

    def handleSliceEvent(self, event):
        """Handles slice-related events.

        A slice task is emitted once both a ``_0`` and a ``_1`` part event
        have been seen; the pending pair is then cleared.
        """
        obj_name = objName(event)

        # Check whether the slice is for part 1 or part 2 and store it
        if obj_name.endswith("_0"):
            self.currentAdd1 = event
        elif obj_name.endswith("_1"):
            self.currentAdd2 = event

        # Attempt to create a slice task if both parts are present
        if self.currentAdd1 and self.currentAdd2:
            self.addTask(self.computeSlice(self.currentAdd1, self.currentAdd2))
            self.resetSlice()

    def handleMoveEvent(self, event):
        """Handles move-related events.

        A pick-up starts a new pending move (discarding any pending slice or
        move state). A let-go of the same object that was picked up emits a
        move task.
        """
        obj_name = objName(event)
        event_type = event["EventType"]

        if event_type == "GameObject Picked Up":
            self.resetSlice()
            self.resetMove()
            self.currentGrab = event

        elif event_type == "GameObject Let Go":
            self.resetSlice()
            self.currentRelease = event
            # NOTE(review): currentGrab is not cleared after a task is added, so a
            # repeated "Let Go" for the same object yields another move task —
            # confirm this is intended.
            if self.currentGrab and obj_name == objName(self.currentGrab):
                self.addTask(self.computeMove(self.currentGrab, self.currentRelease))

    def handleBracingEvent(self, event):
        """Handles bracing-related events.

        "Temporarily Fixed" opens a bracing task and remembers it by object
        name; "Unfixed" closes the remembered task by merging the end-event
        fields into it. An "Unfixed" for an object with no open bracing task
        is ignored.
        """
        obj_name = objName(event)
        event_type = event["EventType"]

        if event_type == "GameObject Temporarily Fixed":
            bracing_task = self.computeBracing(event, None)
            self.addTask(bracing_task)
            self.currentFixed[obj_name] = bracing_task

        elif event_type == "GameObject Unfixed":
            if obj_name in self.currentFixed:
                # The popped dict is the same object stored in self.tasks,
                # so updating it in place completes the task in the plan.
                bracing_task = self.currentFixed.pop(obj_name)
                bracing_task.update(self.computeBracing(None, event))

    def computeBracing(self, start, end):
        """Creates or updates a bracing task.

        With ``start`` given, returns a full bracing task dict. Otherwise,
        with ``end`` given, returns only the end-event fields to merge into an
        existing task. Returns None when both are falsy.
        """
        if start:
            return {
                "_id": str(self.counter),
                "type": "bracing",
                "object": objName(start),
                "dependentObjects": [objName(start)],
                "startEvent": start,
                "startPosition": objPos(start),
                "startEventNumber": start.get("eventNumber"),
                "description": f"Brace '{objName(start)}'",
                "duration": 30,
                "resources": "Crane, Crane Driver, Worker"
            }
        elif end:
            return {
                "endEvent": end,
                "endPosition": objPos(end),
                "endEventNumber": end.get("eventNumber")
            }

    def computeMove(self, start, end):
        """Creates a move task with calculated attributes.

        ``duration`` is the move distance multiplied by 5, rounded to one
        decimal place.
        """
        # Computed once; feeds both "moveDistance" and "duration" below
        distance = moveDistance(start, end)
        return {
            "type": "move",
            "object": objName(start),
            "dependentObjects": [objName(start)],
            "startEvent": start,
            "endEvent": end,
            "startPosition": objPos(start),
            "endPosition": objPos(end),
            "moveDistance": distance,
            "startEventNumber": start.get("eventNumber"),
            "endEventNumber": end.get("eventNumber"),
            "description": f"Move '{objName(start)}'",
            "duration": round(distance * 5, 1),
            "resources": "Forklift, Forklift Driver, Worker"
        }

    def computeSlice(self, create1, create2):
        """Creates a slice task with calculated attributes.

        The task's ``object`` is the parent name derived from ``create1``;
        positions and the event number are also taken from ``create1``.
        """
        parentObj = parentObjName(create1)
        return {
            "type": "slice",
            "object": parentObj,
            "dependentObjects": [objName(create1), objName(create2)],
            "createEvent1": create1,
            "createEvent2": create2,
            "startPosition": objPos(create1),
            "endPosition": objPos(create1),
            "eventNumber": create1.get("eventNumber"),
            "description": f"Slice '{parentObj}'",
            "duration": 120,
            "resources": "Slicer"
        }

    def computeSupport(self, event):
        """Creates a support task with calculated attributes."""
        return {
            "type": "support",
            "dependentObjects": [],
            "startEvent": event,
            "startPosition": objPos(event),
            "endPosition": objPos(event),
            "eventNumber": event.get("eventNumber"),
            "description": "Create a temporary support",
            "duration": 30,
            "resources": "Manlift, Manlift Driver"
        }

    def addTask(self, task):
        """Adds a task to the plan with a unique ID.

        The ID is the current counter value as a string; the counter is then
        incremented.
        """
        task["_id"] = str(self.counter)
        self.tasks.append(task)
        self.counter += 1




# Precedence Relationship Algorithm

In [49]:
class DependencyManager:
    """Derives predecessor IDs for activities from a shared activity list."""

    def __init__(self, activities):
        """Store the list of activity dicts that dependencies are resolved against."""
        self.activities = activities

    def computeDependencies(self, new_activity):
        """Return the IDs of the activities that ``new_activity`` depends on.

        Only 'move' and 'slice' activities get dependencies. A dependency is
        recorded when:

        * the activity is a move whose start event number lies within the
          start/end event range of a completed bracing activity;
        * an earlier activity (lower ID) lists the activity's object among its
          ``dependentObjects``; or
        * an earlier 'support' activity's position is within a threshold of
          2.0 of the activity's start position (as judged by ``is_close``).

        Returns:
            A list of unique ID strings in ascending numeric order, or
            ``[None]`` when the activity has no dependencies or is of another
            type.
        """
        # Only compute dependencies for 'move' or 'slice' types
        if new_activity.get("type") not in ("move", "slice"):
            return [None]

        dependencies = []
        new_id = int(new_activity["_id"])  # Convert new_activity's ID to an integer for comparison

        # Additional check: If `new_activity` is a move action, check bracing dependencies
        if new_activity["type"] == "move" and new_activity.get("startEventNumber"):
            move_start_event = new_activity["startEventNumber"]

            for brace_activity in self.activities:
                # Only consider "bracing" type activities with defined start and end event numbers
                if (brace_activity["type"] == "bracing" and
                    brace_activity.get("startEventNumber") and
                    brace_activity.get("endEventNumber")):

                    # Check if move's start event falls within the bracing activity's range
                    if brace_activity["startEventNumber"] <= move_start_event <= brace_activity["endEventNumber"]:
                        dependencies.append(brace_activity["_id"])

        # Loop through each prior activity with a lower ID than the new activity
        for previous_activity in self.activities:
            previous_id = int(previous_activity["_id"])  # Convert to integer for comparison

            # Only consider previous activities with a lower ID
            if previous_id < new_id:

                # Dependency Condition 1: Object Matching
                if new_activity["object"] in previous_activity.get("dependentObjects", []):
                    dependencies.append(previous_activity["_id"])

                # Dependency Condition 2: Proximity to "support" activities (threshold 2.0)
                elif previous_activity["type"] == "support":
                    start_pos = new_activity.get("startPosition")
                    support_pos = previous_activity.get("endPosition", previous_activity.get("startPosition"))

                    if start_pos and support_pos and is_close(start_pos, support_pos, 2.0):
                        dependencies.append(previous_activity["_id"])

        # Remove duplicates and order numerically. A bare set would yield the
        # string IDs in an arbitrary order that can differ between runs.
        dependencies = sorted(set(dependencies), key=int)

        # If no dependencies were found, assign None
        if not dependencies:
            dependencies = [None]

        return dependencies


# Run Algorithms and Export .xlsx file

In [50]:
# Build the task plan from the event log (dependencies are computed afterwards).
# The class defined in this file is TaskPlan and it exposes its results as `.tasks`.
myPlan = TaskPlan('./LogFile.json')

# Initialize DependencyManager with the tasks from the plan
dependency_manager = DependencyManager(myPlan.tasks)

# Compute dependencies for every task; types other than 'move'/'slice' get [None]
for activity in myPlan.tasks:
    activity["dependencies"] = dependency_manager.computeDependencies(activity)


# Prepare data for the first sheet: activity details
activities_data = []

for activity in myPlan.tasks:
    dependencies = activity["dependencies"]

    activities_data.append({
        "ID": str(int(activity["_id"])),  # Task IDs already start at 1
        "Duration": activity["duration"],
        # [None] marks "no dependencies" and is exported as an empty cell
        "Predecessors": ", ".join(dependencies) if dependencies != [None] else "",
        "Task": activity["description"],
        "Resource Name": activity["resources"]
    })

# Convert the activity data to a DataFrame
df_activities = pd.DataFrame(activities_data)

# Define resource costs
resource_costs = {
    "Forklift": 50,
    "Forklift Driver": 30,
    "Worker": 20,
    "Slicer": 100,
    "Crane": 75,
    "Crane Driver": 35,
    "Manlift": 40,
    "Manlift Driver": 25
}

# Prepare data for the second sheet: one row per resource with its cost
resource_data = [
    {"Resource Name": resource, "Cost": cost}
    for resource, cost in resource_costs.items()
]

# Convert the resource data to a DataFrame
df_resources = pd.DataFrame(resource_data)

# Write both sheets to an Excel file
with pd.ExcelWriter('activities_plan_with_resources.xlsx') as writer:
    df_activities.to_excel(writer, sheet_name='Activities', index=False)
    df_resources.to_excel(writer, sheet_name='Resources', index=False)
