Serial Schedule Generation Scheme (SGS)

The solver iteratively schedules jobs that are ready (all predecessors completed). For each ready job, it determines the earliest feasible start time by considering:

* Precedence Constraints: Must start after all predecessors finish.
* Resource Constraints: Must start when sufficient resources are available for its entire duration.

The algorithm maintains a timeline to track resource usage over time and incrementally finds the first available time slot that satisfies all constraints.

Heuristics Employed:

* Job Prioritization: When multiple jobs are ready, the solver prioritizes jobs based on their Critical Path length (longest path to the project end). Jobs with longer critical paths are scheduled first. This is implemented by sorting the unscheduled list based on the pre-calculated memo_path values in descending order.
* Earliest Feasible Start Time: The solver searches for the absolute earliest time a job can begin by incrementally checking time steps (t) starting from its earliest possible start time (e_start) until resource availability is confirmed for the job's entire duration.
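* Mode Selection: The solver does not optimize over modes. Each job runs in the mode named by its 1-based mode field, read as job.get('mode', 1), so a job without an explicit mode uses its first mode. If the requested mode number exceeds the number of modes the job defines, the solver also falls back to the first mode (job['modes'][0]).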

In [1]:
import json
import time
from typing import Dict, Any

def _select_mode(job: Dict) -> Dict:
    """Return the mode dict a job is executed in.

    The job's ``mode`` field is a 1-based index into ``job['modes']`` and
    defaults to 1. Any index outside the list (including zero or negative
    values, which would otherwise wrap around to the end of the list) falls
    back to the first mode.
    """
    modes = job['modes']
    mode_idx = job.get('mode', 1) - 1
    if 0 <= mode_idx < len(modes):
        return modes[mode_idx]
    return modes[0]


def _longest_path_lengths(durations: Dict, successors: Dict) -> Dict:
    """Return, per job, its duration plus the longest successor path length.

    Uses an explicit stack rather than recursion so long precedence chains
    cannot exhaust the interpreter's recursion limit.

    Raises:
        RuntimeError: if the successor graph contains a cycle.
    """
    path_len = {}
    on_stack = set()  # jobs on the current DFS path; revisiting one means a cycle
    for root in durations:
        if root in path_len:
            continue
        on_stack.add(root)
        stack = [(root, iter(successors[root]))]
        while stack:
            node, pending = stack[-1]
            for child in pending:
                if child in path_len:
                    continue
                if child in on_stack:
                    raise RuntimeError("Logic Error: Circular dependency detected in project jobs.")
                on_stack.add(child)
                stack.append((child, iter(successors[child])))
                break
            else:
                # Every successor is resolved, so this job can be finalized.
                tail = max((path_len[sid] for sid in successors[node]), default=0)
                path_len[node] = durations[node] + tail
                on_stack.discard(node)
                stack.pop()
    return path_len


def solve_project(json_data: Any) -> Dict:
    """Build a schedule with a serial schedule generation scheme.

    Jobs are taken one at a time in descending order of longest remaining
    path length (among jobs whose predecessors are all scheduled) and placed
    at the earliest integer time at which every predecessor has finished and
    every required resource has spare capacity for the job's whole duration.

    Args:
        json_data: Either a JSON string or an already-parsed dict. The dict
            may contain ``'resources'`` (a list of dicts with ``'id'`` and
            ``'capacity'``) and ``'jobs'`` (a list of dicts with ``'id'``,
            optional ``'successors'``, optional 1-based ``'mode'``, and
            ``'modes'``, each mode having optional ``'duration'`` and
            ``'resources_required'``). Resource ids are compared as strings.

    Returns:
        A dict with ``'schedule'`` (job id -> start time, in scheduling
        order) and ``'metrics'`` (``'makespan'`` and a formatted
        ``'execution_time'`` string in milliseconds).

    Raises:
        ValueError: if a job requires an undefined resource or more of a
            resource than its total capacity.
        RuntimeError: if the jobs contain a circular dependency, or no
            feasible slot is found before the safety horizon.
    """
    start_perf = time.perf_counter()

    data = json.loads(json_data) if isinstance(json_data, str) else json_data

    # Resource ids are forced to strings to prevent "1" vs 1 mismatches.
    resource_capacities = {
        str(r.get('id')): r.get('capacity', 0) for r in data.get('resources', [])
    }

    job_map = {j['id']: j for j in data.get('jobs', [])}

    # Successors that do not name a known job are ignored.
    successors = {
        jid: [sid for sid in job.get('successors', []) if sid in job_map]
        for jid, job in job_map.items()
    }
    predecessors = {jid: [] for jid in job_map}
    for jid, succ_ids in successors.items():
        for succ_id in succ_ids:
            predecessors[succ_id].append(jid)

    # Resolve each job's mode once; it is used for both priority and scheduling.
    mode_of = {jid: _select_mode(job) for jid, job in job_map.items()}
    durations = {jid: mode.get('duration', 0) for jid, mode in mode_of.items()}

    # Priority rule: longest path to the project end; also detects cycles.
    memo_path = _longest_path_lengths(durations, successors)

    scheduled_starts = {}
    finished_at = {}
    resource_usage = {}  # time step -> {resource id -> amount in use}
    unscheduled = sorted(job_map, key=lambda jid: memo_path[jid], reverse=True)

    # Safety limit to prevent infinite loops
    max_horizon = sum(memo_path.values()) + 1000

    while unscheduled:
        # Highest-priority job whose predecessors have all been scheduled.
        jid = next(
            (cand for cand in unscheduled
             if all(p_id in scheduled_starts for p_id in predecessors[cand])),
            None,
        )
        if jid is None:
            raise RuntimeError("Logic Error: Circular dependency detected in project jobs.")

        preds = predecessors[jid]
        e_start = max(finished_at[p_id] for p_id in preds) if preds else 0
        mode_data = mode_of[jid]
        duration = durations[jid]

        # Normalize activity demands: convert keys to strings
        demands = {str(k): v for k, v in mode_data.get('resources_required', {}).items()}

        # Validation: the resource must exist and be large enough on its own.
        for res_id, req_amount in demands.items():
            if res_id not in resource_capacities:
                raise ValueError(f"Data Error: Activity {jid} requires resource '{res_id}', "
                                 f"but '{res_id}' is not defined in the resources list.")
            if req_amount > resource_capacities[res_id]:
                raise ValueError(f"Capacity Error: Activity {jid} requires {req_amount} of resource '{res_id}', "
                                 f"but max capacity is only {resource_capacities[res_id]}.")

        # Scan forward from the precedence-feasible start for a resource-feasible slot.
        t = e_start
        found_slot = False
        while t < max_horizon:
            # An empty range (duration <= 0) is trivially feasible.
            is_feasible = all(
                resource_usage.get(check_t, {}).get(res_id, 0) + req_amount
                <= resource_capacities[res_id]
                for check_t in range(t, t + duration)
                for res_id, req_amount in demands.items()
            )
            if is_feasible:
                found_slot = True
                break
            t += 1

        if not found_slot:
            raise RuntimeError(f"Scheduling Error: No time slot found for {jid}.")

        scheduled_starts[jid] = t
        finished_at[jid] = t + duration
        for alloc_t in range(t, t + duration):
            slot_usage = resource_usage.setdefault(alloc_t, {})
            for res_id, req_amount in demands.items():
                slot_usage[res_id] = slot_usage.get(res_id, 0) + req_amount
        unscheduled.remove(jid)

    end_perf = time.perf_counter()
    return {
        "schedule": scheduled_starts,
        "metrics": {
            "makespan": max(finished_at.values()) if finished_at else 0,
            "execution_time": f"{(end_perf - start_perf)*1000:.4f}ms"
        }
    }

In [2]:
# Load the PSPLIB instance file from disk (hard-coded local path).
with open("C:/Users/Admin/Desktop/PSPLIB_j301_1.json", 'r') as instance_file:
    dataset = json.load(instance_file)

# Solve the instance, then pretty-print the schedule and metrics as JSON.
result = solve_project(dataset)
print(
    json.dumps(result, indent=2)
)

{
  "schedule": {
    "1": 0,
    "3": 0,
    "4": 0,
    "8": 4,
    "2": 4,
    "10": 6,
    "13": 4,
    "9": 10,
    "12": 13,
    "16": 13,
    "11": 12,
    "14": 15,
    "17": 23,
    "18": 10,
    "7": 4,
    "5": 12,
    "15": 12,
    "20": 21,
    "22": 29,
    "27": 15,
    "6": 29,
    "19": 18,
    "26": 21,
    "21": 37,
    "23": 36,
    "29": 28,
    "24": 38,
    "25": 28,
    "28": 41,
    "30": 44,
    "31": 44,
    "32": 46
  },
  "metrics": {
    "makespan": 46,
    "execution_time": "0.4389ms"
  }
}
