# Graph Algorithms Cheat Sheet (Engineering-Grade)

## 0) Graph Theory Introduction

### Core objects
- **Graph** `G = (V, E)`
  - `|V| = n` vertices, `|E| = m` edges
- **Directed** vs **Undirected**
- **Weighted** vs **Unweighted**
- **Simple** graph vs **Multigraph** (parallel edges) vs **Self-loops**
- **Path**: sequence of edges; **simple path**: no repeated vertices
- **Cycle**: path that starts/ends same vertex
- **Connected components** (undirected) / **SCCs** (directed)
- **Degree**
  - Undirected: `deg(v)`
  - Directed: `outdeg(v)`, `indeg(v)`

### Representations (pick for asymptotics + cache locality)
- **Adjacency List**: best default for sparse graphs (`m ~ O(n)`)
  - Iteration over neighbors is `O(deg(v))`
  - Total traversal `O(n + m)`
- **Edge List**: best for algorithms scanning all edges repeatedly (Bellman-Ford, Kruskal)
  - Sequential memory access (good cache locality)
- **Adjacency Matrix**: `O(n^2)` memory; only for dense graphs or when `n` is small and constant-bounded
- **CSR (Compressed Sparse Row)**: adjacency list with contiguous neighbor arrays
  - Best cache locality; avoids pointer chasing in hot paths

### Engineering invariants
- Vertex IDs normalized to `[0, n-1]`
- Input sanity:
  - Bounds-check endpoints
  - Verify no unbounded reads; cap `n, m` per memory budget
  - Use checked arithmetic for `m` sizes, distance sums, `n*n` in matrices
- Determinism:
  - If output depends on traversal order, sort adjacency or preserve input order consistently

---

## 1) Problems in Graph Theory (Pattern → Tool)

| Problem Type | Typical Constraint | Tool |
|---|---:|---|
| Reachability / components | unweighted | DFS/BFS |
| Shortest path (unweighted) | edges weight = 1 | BFS |
| Shortest path (non-negative weights) | `w >= 0` | Dijkstra |
| Shortest path (negative edges) | no negative cycles | Bellman-Ford |
| All-pairs shortest path | `n <= ~500` typical | Floyd-Warshall |
| Topological order | DAG | Kahn / DFS topo |
| Longest path | DAG only (general is NP-hard) | DP on topo |
| Bridges / articulation points | undirected | Tarjan low-link |
| SCCs | directed | Tarjan SCC / Kosaraju |
| Eulerian path/circuit | degree constraints | Hierholzer |
| MST | undirected, connected | Prim / Kruskal |
| Max flow / matching | capacities | Dinic / Edmonds-Karp |
| TSP | `n <= ~20..25` for DP | Bitmask DP |

---

## 2) Depth First Search (DFS)

### Use when
- Reachability, connected components
- Cycle detection (directed/undirected variants)
- Topological sort (via postorder)
- Bridges / articulation points (as a framework)

### Complexity
- Time: `O(n + m)`
- Space: `O(n)` for stack/visited; recursion depth can be `O(n)` (risk stack overflow)

### Edge cases
- Disconnected graphs: must loop over all vertices
- Self-loops / parallel edges (especially for bridge logic)
- Deterministic output: neighbor iteration order matters

### Pseudocode (iterative to avoid recursion limits)
```text
// DFS_ITERATIVE(G, start):
//   // G adjacency list: for each vertex, contiguous neighbor list preferred (CSR)
//   // visited: boolean[n] initialized false
//   stack: stack of (v, next_index)
//   push (start, 0); visited[start] = true
//   while stack not empty:
//     (v, i) = top
//     if i == degree(v):
//       // finished v => postorder hook here if needed
//       pop
//       continue
//     u = adj[v][i]
//     top.next_index++
//     if not visited[u]:
//       visited[u] = true
//       push (u, 0)
//
// Notes:
//   - This form gives both preorder (when first discovered) and postorder (when popped).
//   - For directed graphs, "visited" prevents infinite loops.
//   - For cycle detection in directed graphs, add color/state: 0=unseen,1=active,2=done.
```

---

## 3) Breadth First Search (BFS)

### Use when
- Unweighted shortest paths (each edge cost = 1)
- Level-order traversal, minimum-edge paths
- Multi-source shortest paths (push multiple starts)

### Complexity
- Time: `O(n + m)`
- Space: `O(n)` queue

### Edge cases
- Disconnected: BFS from all sources or loop all nodes
- Large graphs: use ring-buffer queue for cache locality
- Deterministic parents: fixed neighbor order

### Pseudocode
```text
// BFS(G, sources):
//   dist: int[n], init INF
//   parent: int[n], init -1
//   queue: FIFO
//   for s in sources:
//     dist[s] = 0
//     push s
//   while queue not empty:
//     v = pop_front
//     for u in adj[v]:
//       if dist[u] == INF:
//         dist[u] = dist[v] + 1
//         parent[u] = v
//         push u
//
// Notes:
//   - dist is shortest number of edges from nearest source.
//   - parent reconstructs shortest path tree.
//   - Use checked_add for dist[v] + 1 if dist can approach integer max (rare but sanitize).
```

---

## 4) BFS Grid Shortest Path (4/8-neighbor)

### Use when
- Grid maze shortest path, uniform cost
- Obstacles + bounds checks

### Complexity
- Time: `O(R*C)` (each cell processed once)
- Space: `O(R*C)` dist/visited

### Edge cases
- Multiple starts/targets
- Walls, unreachable target
- Non-rectangular input (validate row lengths)
- Coordinate encoding: avoid overflow in `id = r*C + c` with checked multiplication

### Pseudocode (multi-source)
```text
// GRID_BFS(grid[R][C], starts, is_blocked):
//   dist: int[R][C] = INF
//   queue
//   for (sr, sc) in starts:
//     if not is_blocked(sr, sc):
//       dist[sr][sc] = 0
//       push (sr, sc)
//   dirs = [(+1,0),(-1,0),(0,+1),(0,-1)] // or add diagonals
//   while queue not empty:
//     (r, c) = pop_front
//     for (dr, dc) in dirs:
//       nr = r + dr; nc = c + dc
//       if nr,nc out of bounds: continue
//       if is_blocked(nr, nc): continue
//       if dist[nr][nc] == INF:
//         dist[nr][nc] = dist[r][c] + 1
//         push (nr, nc)
//
// Notes:
//   - Use integer bounds checks before indexing.
//   - For path reconstruction, store parent cell.
```

---

## 5) Topological Sort (DAG)

### Use when
- Dependency resolution (prereqs)
- Scheduling tasks with constraints
- Enables DP on DAG (shortest/longest paths)

### Preconditions
- Graph must be a DAG for a full topological ordering
- If cycle exists: detect and return error variant (not partial silently)

### Algorithm A: Kahn’s (in-degree queue)
- Time: `O(n + m)`
- Space: `O(n)`
- Deterministic: use min-heap / ordered queue if you need lexicographically smallest topo order (cost: `O((n+m) log n)`)

```text
// TOPO_KAHN(G):
//   indeg[n]=0
//   for v in 0..n-1:
//     for u in adj[v]: indeg[u]++
//   queue = all v with indeg[v]==0
//   order = empty list
//   while queue not empty:
//     v = pop_front
//     append v to order
//     for u in adj[v]:
//       indeg[u]--
//       if indeg[u]==0: push u
//   if len(order) != n:
//     // cycle exists => return Error(CYCLE_DETECTED)
//   return order
//
// Notes:
//   - indeg decrement must be consistent; parallel edges increment/decrement multiple times (correct).
```

### Algorithm B: DFS postorder
- Also `O(n + m)`
- Needs cycle detection with colors

---

## 6) Shortest / Longest Path on a DAG

### Use when
- Directed acyclic graph only
- Supports negative weights safely (no cycles)

### Complexity
- Time: `O(n + m)` after topo sort
- Space: `O(n)`

### Edge cases
- Unreachable nodes remain INF / -INF
- Longest path: initialize to `-INF`, careful with overflow when adding weights

### Pseudocode
```text
// DAG_SHORTEST_PATH(G, topo, source):
//   dist[n]=INF; dist[source]=0
//   for v in topo:
//     if dist[v]==INF: continue
//     for (u, w) in adj[v]:
//       // relax edge v->u
//       cand = dist[v] + w  // checked_add to prevent overflow
//       if cand < dist[u]: dist[u]=cand
//   return dist
//
// DAG_LONGEST_PATH(G, topo, source):
//   dist[n]=-INF; dist[source]=0
//   for v in topo:
//     if dist[v]==-INF: continue
//     for (u, w) in adj[v]:
//       cand = dist[v] + w  // checked_add
//       if cand > dist[u]: dist[u]=cand
//   return dist
//
// Notes:
//   - For longest path, requires DAG; otherwise NP-hard in general graphs.
```

---

## 7) Dijkstra’s Shortest Path

### Use when
- Weighted graph with **non-negative** edge weights `w >= 0`
- Single-source shortest paths

### Complexity (binary heap)
- Time: `O((n + m) log n)`; practically `O(m log n)`
- Space: `O(n + m)`
- Comparator costs: heap comparisons are frequent; keep heap keys simple (distance, vertex)

### Engineering notes
- Prefer adjacency in contiguous memory (CSR)
- Use 64-bit distances if weights sum can exceed 32-bit
- Avoid decrease-key complexity by pushing duplicates; skip stale heap entries by checking `if d != dist[v] continue`

### Edge cases
- Multiple edges, self-loops: safe
- Zero-weight edges: safe
- Negative edge => invalid; return Error(NEGATIVE_EDGE_FOUND)

### Pseudocode (source code equivalent, but pseudocode)
```text
// DIJKSTRA(G, source):
//   dist[n]=INF; dist[source]=0
//   parent[n]=-1
//   heap = min-heap of (dist, vertex)
//   push (0, source)
//   while heap not empty:
//     (d, v) = pop_min
//     if d != dist[v]:
//       // stale entry due to duplicate pushes
//       continue
//     for (u, w) in adj[v]:
//       if w < 0:
//         // must fail fast; Dijkstra assumes non-negative weights
//         return Error(NEGATIVE_EDGE_FOUND)
//       cand = d + w  // checked_add
//       if cand < dist[u]:
//         dist[u] = cand
//         parent[u] = v
//         push (cand, u)
//   return (dist, parent)
//
// Notes:
//   - For deterministic parent choice in ties, define tie-break: if cand==dist[u], keep smaller parent id.
//   - If you need actual path, reconstruct by following parent[] from target to source.
```

---

## 8) Bellman-Ford

### Use when
- Graph may have negative edges
- Need to detect negative cycles reachable from source

### Complexity
- Time: `O(n*m)` worst-case (only acceptable when bounded; otherwise too slow)
- Space: `O(n)`

### Edge cases
- Negative cycle not reachable from source: shouldn’t invalidate other distances
- For full-cycle marking: after detecting relaxation on nth iteration, propagate “-INF” effect along reachable edges

### Pseudocode
```text
// BELLMAN_FORD(edge_list, n, source):
//   dist[n]=INF; dist[source]=0
//   parent[n]=-1
//   // relax edges up to n-1 times
//   for i in 1..n-1:
//     changed = false
//     for each edge (a, b, w):
//       if dist[a]==INF: continue
//       cand = dist[a] + w  // checked_add
//       if cand < dist[b]:
//         dist[b]=cand
//         parent[b]=a
//         changed = true
//     if not changed: break
//
//   // detect negative cycle reachable from source
//   neg[n]=false
//   for each edge (a, b, w):
//     if dist[a]==INF: continue
//     cand = dist[a] + w
//     if cand < dist[b]:
//       neg[b]=true
//
//   // propagate neg-cycle influence (nodes whose shortest path is undefined => -INF)
//   // do n times to saturate reachability
//   for i in 1..n:
//     for each edge (a, b, w):
//       if neg[a]: neg[b]=true
//
//   return (dist, neg, parent)
//
// Notes:
//   - If neg[v]==true then "shortest" is -infinity (arbitrarily low).
//   - Must not silently output dist[v] in that case; return an explicit variant.
```

---

## 9) Floyd–Warshall (All-Pairs Shortest Paths)

### Use when
- Need all-pairs shortest paths and `n` is small (typical `n <= 400..800` depending on time limits)
- Dense graphs or when adjacency matrix is natural

### Complexity
- Time: `O(n^3)` (forbidden for unbounded datasets; only when `n` is explicitly small-bounded)
- Space: `O(n^2)`

### Edge cases
- Negative edges allowed
- Negative cycle detection: if `dist[i][i] < 0` after algorithm, negative cycle exists (affects paths through that cycle)

### Pseudocode
```text
// FLOYD_WARSHALL(n, dist):
//   // dist is n x n matrix:
//   // dist[i][j]=0 if i==j, weight(i->j) if edge exists, else INF
//   for k in 0..n-1:
//     for i in 0..n-1:
//       if dist[i][k]==INF: continue
//       for j in 0..n-1:
//         if dist[k][j]==INF: continue
//         cand = dist[i][k] + dist[k][j]  // checked_add
//         if cand < dist[i][j]:
//           dist[i][j]=cand
//   // negative cycle check:
//   for i in 0..n-1:
//     if dist[i][i] < 0: return Error(NEGATIVE_CYCLE)
//   return dist
//
// Notes:
//   - Loop order k-i-j is standard; keep dist rows contiguous for cache locality.
//   - Can store next[i][j] to reconstruct paths (update when relax).
```

---

## 10) Bridges and Articulation Points (Undirected)

### Use when
- Identify edges whose removal disconnects graph (**bridges**)
- Identify vertices whose removal disconnects graph (**articulation points**)

### Preconditions
- Undirected graph
- Must handle disconnected graphs: run DFS from all unvisited vertices

### Complexity
- Time: `O(n + m)`
- Space: `O(n)`

### Key idea (Tarjan low-link)
- `tin[v]`: discovery time
- `low[v]`: lowest `tin` reachable from `v` via:
  - zero or more tree edges + at most one back edge

### Edge cases
- Parallel edges: can invalidate naive “parent edge” skip logic
  - Must distinguish edges by unique edge-id, not just parent vertex
- Root articulation rule differs

### Pseudocode
```text
// BRIDGES_ARTICULATION(G):
//   timer=0
//   tin[n]=-1
//   low[n]=0
//   is_art[n]=false
//   bridges = empty list
//
//   DFS(v, parent_edge_id):
//     tin[v]=low[v]=timer; timer++
//     children=0
//     for each (to, edge_id) in adj[v]:
//       if edge_id == parent_edge_id: continue
//       if tin[to] != -1:
//         // back edge
//         low[v] = min(low[v], tin[to])
//       else:
//         children++
//         DFS(to, edge_id)
//         low[v] = min(low[v], low[to])
//         // bridge condition
//         if low[to] > tin[v]:
//           bridges.add(edge_id)
//         // articulation condition (non-root)
//         if parent_edge_id != -1 and low[to] >= tin[v]:
//           is_art[v]=true
//     // root articulation
//     if parent_edge_id == -1 and children >= 2:
//       is_art[v]=true
//
//   for v in 0..n-1:
//     if tin[v]==-1: DFS(v, -1)
//   return (bridges, is_art)
//
// Notes:
//   - Edge-id is mandatory to handle multiedges correctly.
//   - For self-loops: treated as back edge to itself; does not create bridges.
```

---

## 11) Tarjan’s Strongly Connected Components (SCC) (Directed)

### Use when
- Compress directed graph into SCC DAG
- Solve reachability/cycle structure questions
- 2-SAT, dominance of cycles, etc.

### Complexity
- Time: `O(n + m)`
- Space: `O(n)`

### Edge cases
- Disconnected directed graph: run from all nodes
- Deterministic SCC numbering depends on adjacency order

### Pseudocode
```text
// TARJAN_SCC(G):
//   timer=0
//   disc[n]=-1      // discovery index
//   low[n]=0
//   on_stack[n]=false
//   st = stack
//   scc_id[n]=-1
//   scc_count=0
//
//   DFS(v):
//     disc[v]=low[v]=timer; timer++
//     st.push(v); on_stack[v]=true
//     for to in adj[v]:
//       if disc[to]==-1:
//         DFS(to)
//         low[v]=min(low[v], low[to])
//       else if on_stack[to]:
//         low[v]=min(low[v], disc[to])
//     // root of SCC
//     if low[v]==disc[v]:
//       while true:
//         x=st.pop()
//         on_stack[x]=false
//         scc_id[x]=scc_count
//         if x==v: break
//       scc_count++
//
//   for v in 0..n-1:
//     if disc[v]==-1: DFS(v)
//   return (scc_id, scc_count)
//
// Notes:
//   - disc is strictly increasing; low tracks reachable earliest active node.
//   - Output SCC graph can be built by scanning edges and connecting scc_id[u] -> scc_id[v] when different.
```

---

## 12) Travelling Salesman Problem (TSP) via DP (Bitmask)

### Use when
- `n` small (typical `n <= 20..22` practical)
- Need exact optimal Hamiltonian cycle/path

### Complexity
- Time: `O(n^2 * 2^n)`
- Space: `O(n * 2^n)`
- This is exponential; only valid for explicitly small-bounded `n`

### Variants
- **Cycle**: return to start
- **Path**: end anywhere or fixed end

### Edge cases
- Disconnected graph: transitions may be INF
- Overflow: distances can exceed 32-bit

### Pseudocode
```text
// TSP_DP(dist, start):
//   // dist is n x n cost matrix; dist[i][j]=INF if no edge
//   // dp[mask][v] = min cost to start at 'start', visit exactly 'mask', and end at v
//   dp = map or array sized (2^n) x n initialized INF
//   dp[1<<start][start]=0
//   for mask in 0..(2^n - 1):
//     if (mask & (1<<start))==0: continue
//     for v in 0..n-1:
//       if dp[mask][v]==INF: continue
//       if (mask & (1<<v))==0: continue
//       for u in 0..n-1:
//         if (mask & (1<<u))!=0: continue
//         cand = dp[mask][v] + dist[v][u]  // checked_add
//         dp[mask | (1<<u)][u] = min(dp[mask | (1<<u)][u], cand)
//   full = (1<<n)-1
//   ans = INF
//   for v in 0..n-1:
//     cand = dp[full][v] + dist[v][start] // for cycle; omit for path variant
//     ans = min(ans, cand)
//   return ans
//
// Notes:
//   - Store parent pointers if reconstruction required.
//   - Use iterative loops to maximize locality; dp as flat array [mask*n + v].
```

---

## 13) Existence of Eulerian Paths and Circuits

### Eulerian trail facts
- **Undirected**
  - Eulerian **circuit** exists iff:
    - every vertex with nonzero degree is in the same connected component
    - all vertices have even degree
  - Eulerian **path** (not circuit) exists iff:
    - exactly 0 or 2 vertices have odd degree
    - all nonzero-degree vertices connected
- **Directed**
  - Eulerian **circuit** exists iff:
    - for all v: `indeg(v) == outdeg(v)`
    - all vertices with nonzero degree are in one SCC of the underlying graph (usually check connectivity in “edge-present” subgraph)
  - Eulerian **path** exists iff:
    - one start with `outdeg = indeg + 1`
    - one end with `indeg = outdeg + 1`
    - all others `indeg == outdeg`
    - connectivity condition holds

### Edge cases
- Graph with zero edges: trivially Eulerian (path/circuit depends on definition; treat as circuit of length 0)
- Multiple edges and self-loops are allowed; algorithm must track edge usage by edge-id

---

## 14) Eulerian Path Algorithm (Hierholzer)

### Use when
- Need actual Eulerian path/circuit (uses each edge exactly once)

### Complexity
- Time: `O(n + m)`
- Space: `O(n + m)`

### Pseudocode (works with edge-ids; directed/undirected)
```text
// HIERHOLZER(G, start):
//   // G adjacency: list of (to, edge_id)
//   // used[edge_id]=false
//   // it[v] = current index into adjacency list (for O(m) total scanning)
//   it[n]=0
//   st = stack of vertices
//   path = empty list
//   st.push(start)
//   while st not empty:
//     v = st.top()
//     // advance iterator to next unused edge
//     while it[v] < degree(v) and used[ adj[v][it[v]].edge_id ]:
//       it[v]++
//     if it[v] == degree(v):
//       // dead end => add to output
//       path.append(v)
//       st.pop()
//     else:
//       (to, id) = adj[v][it[v]]
//       used[id]=true
//       st.push(to)
//   // path is in reverse
//   reverse(path)
//   // validate: path length should be m+1 (for connected edge-set)
//   if len(path) != m+1: return Error(NOT_EULERIAN_OR_DISCONNECTED)
//   return path
//
// Notes:
//   - For undirected graphs, each undirected edge must appear twice in adjacency but share one edge_id.
//   - Connectivity requirement should be validated beforehand to avoid partial outputs.
```

---

## 15) Prim’s Minimum Spanning Tree (MST)

### Use when
- Undirected weighted graph
- Want MST (or MSF if disconnected)
- Prim is strong for dense graphs with adjacency matrix; also fine with heap for sparse

### Complexity (binary heap)
- Time: `O(m log n)`
- Space: `O(n + m)`

### Edge cases
- Disconnected graph: returns minimum spanning forest (or explicit error if MST requires connected)
- Negative weights: allowed (MST still well-defined)
- Parallel edges: ok, picks cheapest

### Pseudocode (lazy Prim)
```text
// PRIM_LAZY(G):
//   in_mst[n]=false
//   mst_edges = empty
//   total=0
//   for each component root s:
//     if in_mst[s]: continue
//     heap = min-heap of (w, from, to)
//     push all edges from s
//     in_mst[s]=true
//     while heap not empty:
//       (w, a, b) = pop_min
//       if in_mst[b]: continue
//       // add edge to MST
//       in_mst[b]=true
//       mst_edges.add(a,b,w)
//       total += w
//       for (to, w2) in adj[b]:
//         if not in_mst[to]: push(w2, b, to)
//   return (mst_edges, total)
//
// Notes:
//   - Lazy Prim pushes many edges; eager Prim is tighter and often faster.
```

---

## 16) Eager Prim’s MST

### Use when
- Need fewer heap operations than lazy Prim
- Preferable for large sparse graphs

### Complexity
- With binary heap + decrease-key via “push duplicates and skip stale”: `O(m log n)`
- True decrease-key heap: `O(m log n)` but with lower constants

### Pseudocode (eager with best-known edge per vertex)
```text
// PRIM_EAGER(G):
//   key[n]=INF            // best edge weight to connect vertex into MST
//   parent[n]=-1
//   in_mst[n]=false
//   heap = min-heap of (key, v)
//
//   for each component root s:
//     if in_mst[s]: continue
//     key[s]=0
//     push(0, s)
//     while heap not empty:
//       (k, v) = pop_min
//       if in_mst[v]: continue
//       if k != key[v]: continue   // stale
//       in_mst[v]=true
//       for (to, w) in adj[v]:
//         if in_mst[to]: continue
//         if w < key[to]:
//           key[to]=w
//           parent[to]=v
//           push(key[to], to)
//
//   // edges are (parent[v], v) where parent[v]!=-1
//   return (parent, key)
//
// Notes:
//   - key[] + parent[] defines the MST forest.
//   - Determinism in ties: if w==key[to], pick smaller parent id.
```

---

## 17) Max Flow (Ford–Fulkerson Framework)

### Use when
- Compute max `s->t` flow in capacitated directed graph
- Also solves matching and many allocation problems via reductions

### Core residual graph concept
- Each edge has:
  - capacity `cap`
  - flow `f`
  - residual forward capacity: `cap - f`
  - residual backward capacity: `f`
- Augment along an `s->t` path in residual graph

### Pitfalls
- Naive DFS augmenting can be exponential if capacities are irrational or path choices are bad
- For integer capacities, terminates, but can still be too slow
- Prefer Edmonds–Karp, Dinic, or Capacity Scaling in practice

---

## 18) Unweighted Bipartite Matching via Flow

### Use when
- Bipartite graph `L`–`R`, unweighted, maximize matches

### Reduction
- Source `S` to each `u in L` capacity 1
- Each original edge `u->v` capacity 1
- Each `v in R` to sink `T` capacity 1
- Max flow equals max matching

### Complexity choice
- For unweighted bipartite matching, **Hopcroft–Karp** is specialized (`O(m sqrt n)`), but if constrained to “network flow section,” Dinic on unit networks is very fast.

### Edge cases
- Isolated vertices
- Multiple edges (safe; redundant)

---

## 19) “Mice and Owls” via Network Flow (Canonical Reduction)

### Typical structure (general pattern)
- Entities that must be assigned to resources with constraints:
  - mice → holes (capacity per hole)
  - mouse can go to certain holes if reachable within time/distance
- Build bipartite graph (mouse → hole) edges when feasible
- Then run max flow / matching

### Engineering checklist
- Precompute feasibility edges efficiently:
  - If geometric distance: compute squared distances to avoid floating error; compare against squared threshold
  - If grid distance with obstacles: run BFS from holes (multi-source) or from each mouse depending on counts (choose asymptotically)

---

## 20) “Elementary Math” via Network Flow (Canonical Reduction)

### Typical structure (general pattern)
- Assign each pair `(a,b)` one operation result among `{a+b, a-b, a*b}` such that all results are distinct.
- Build bipartite:
  - Left: each pair index `i`
  - Right: each possible result value node
  - Edge if pair i can produce that result via some operation
- Find perfect matching (flow) to assign unique results.

### Edge cases
- Duplicate results across different operations/pairs
- Large result values: compress coordinate values to `[0..K-1]` to keep memory bounded

---

## 21) Edmonds–Karp (Max Flow)

### Use when
- Need a simple, deterministic max-flow
- Graph sizes moderate

### Algorithm
- Ford–Fulkerson where augmenting path is found by BFS in residual graph (shortest in edges)

### Complexity
- Time: `O(n * m^2)` worst-case
- Space: `O(n + m)`
- Too slow for large dense graphs; use Dinic

### Pseudocode
```text
// EDMONDS_KARP(N, edges, s, t):
//   // adjacency stores indices of residual edges; each edge has (to, rev, cap)
//   flow=0
//   while true:
//     parent_v[N]=-1
//     parent_e[N]=-1
//     queue
//     parent_v[s]=s
//     push s
//     while queue not empty and parent_v[t]==-1:
//       v=pop_front
//       for ei in adj[v]:
//         e = edges[ei]
//         if parent_v[e.to]==-1 and e.cap > 0:
//           parent_v[e.to]=v
//           parent_e[e.to]=ei
//           push e.to
//     if parent_v[t]==-1: break // no augmenting path
//     // find bottleneck
//     add=INF
//     v=t
//     while v!=s:
//       ei=parent_e[v]
//       add=min(add, edges[ei].cap)
//       v=parent_v[v]
//     // augment
//     v=t
//     while v!=s:
//       ei=parent_e[v]
//       edges[ei].cap -= add
//       edges[edges[ei].rev].cap += add
//       v=parent_v[v]
//     flow += add
//   return flow
//
// Notes:
//   - Use 64-bit capacities if totals can exceed 32-bit.
//   - Graph must be built with explicit reverse edges for O(1) residual updates.
```

---

## 22) Capacity Scaling (Max Flow)

### Use when
- Large integer capacities; want fewer augmentations than plain FF/EK
- Works well when capacities vary widely

### Idea
- Maintain threshold `Δ` (power of 2)
- Only consider residual edges with capacity ≥ `Δ`
- Find augmenting paths under this restriction; then halve `Δ`

### Complexity (typical statement)
- `O(m^2 log U)` in some analyses for DFS-style; depends on implementation details
- In practice: often speeds up Ford–Fulkerson substantially on large capacities

### Pseudocode (high level)
```text
// CAPACITY_SCALING_MAXFLOW(G, s, t):
//   U = max capacity in network
//   Delta = highest power of 2 <= U
//   flow=0
//   while Delta >= 1:
//     while true:
//       // find any s->t path using only edges with cap >= Delta
//       path = DFS_or_BFS_with_threshold(Delta)
//       if no path: break
//       add = bottleneck_on_path
//       augment(add)
//       flow += add
//     Delta /= 2
//   return flow
//
// Notes:
//   - Determinism: fix adjacency scan order.
//   - Still inferior to Dinic on many benchmarks, but a solid intermediate.
```

---

## 23) Dinic’s Algorithm (Max Flow)

### Use when
- High-performance max flow in general graphs
- Standard competitive + production choice for integral capacities

### Core components
1. **Level graph** via BFS from `s` using residual edges with cap > 0
2. **Blocking flow** via DFS sending flow along level-respecting edges
3. Use `ptr[v]` current-edge pointer to ensure `O(m)` per BFS phase in practice

### Complexity
- General: `O(m n^2)` worst-case (rarely tight in practice)
- Unit networks / bipartite matching: significantly faster (often near `O(m sqrt n)` behavior in practice with structure)
- Space: `O(n + m)`

### Engineering notes
- Use contiguous arrays for edges: `(to, cap, next/rev)` to minimize pointer chasing
- Avoid recursion depth issues in DFS (can be iterative, but recursive often acceptable with bounded stack; sanitize)
- Explicitly return error on overflow in `cap` arithmetic

### Pseudocode
```text
// DINIC(N, s, t):
//   flow=0
//   while BFS_LEVELS():
//     ptr[N]=0
//     while true:
//       pushed = DFS_BLOCKING(s, INF)
//       if pushed==0: break
//       flow += pushed
//   return flow
//
// BFS_LEVELS():
//   level[N]=-1
//   queue
//   level[s]=0; push s
//   while queue not empty:
//     v=pop_front
//     for ei in adj[v]:
//       e=edges[ei]
//       if e.cap>0 and level[e.to]==-1:
//         level[e.to]=level[v]+1
//         push e.to
//   return level[t]!=-1
//
// DFS_BLOCKING(v, pushed):
//   if pushed==0: return 0
//   if v==t: return pushed
//   for i from ptr[v] to degree(v)-1:
//     ptr[v]=i
//     ei=adj[v][i]
//     e=edges[ei]
//     if e.cap>0 and level[e.to]==level[v]+1:
//       tr = DFS_BLOCKING(e.to, min(pushed, e.cap))
//       if tr>0:
//         e.cap -= tr
//         edges[e.rev].cap += tr
//         return tr
//   return 0
//
// Notes:
//   - ptr[v] prevents re-scanning dead edges, critical for performance.
//   - For determinism: adjacency order fixed; stable iteration yields stable flows.
```

---

## 24) Practical Selection Guide (What to Use When)

- **Reachability / components**
  - Use: BFS/DFS
  - If recursion risk: iterative DFS
- **Unweighted shortest path**
  - Use: BFS (multi-source supported)
- **Weighted shortest path**
  - `w >= 0`: Dijkstra
  - Negative edges: Bellman-Ford (and detect cycles)
  - DAG: topo + DP (fastest, supports negative)
- **All-pairs shortest path**
  - Small `n`: Floyd–Warshall
  - Larger sparse: run Dijkstra from each node if `w>=0` (`O(n*m log n)`)
- **DAG ordering / scheduling**
  - Kahn topo (cycle detection built-in)
- **Cycle structure**
  - Directed SCC: Tarjan SCC
  - Undirected bridges/AP: low-link
- **MST**
  - Sparse: Prim eager or Kruskal (not listed, but valid alternative)
  - Dense: Prim with matrix `O(n^2)` only if `n` is small-bounded
- **Max flow / matching**
  - Default: Dinic
  - Educational/simple: Edmonds–Karp (but beware `O(n*m^2)`)

---

## 25) Cross-Cutting Edge Cases & Correctness Invariants

- **Disconnected graphs**
  - Many algorithms require looping all nodes (DFS/BFS/bridges/SCC/topo for full coverage)
- **Parallel edges**
  - Bridges/articulation/Euler need **edge-id** to avoid “parent vertex” ambiguity
- **Self-loops**
  - Shortest paths: harmless
  - Bridges: never a bridge
  - Euler: contributes 2 to degree in undirected? (implementation-defined; treat carefully)
- **Overflow**
  - Distances: use 64-bit; checked add when summing weights
  - Matrices: `n*n` memory sizing must be checked
- **Negative cycles**
  - Only Bellman-Ford / Floyd–Warshall can detect reliably; Dijkstra must reject negative edges
- **Deterministic behavior**
  - Fix adjacency order; define tie-breaks in relaxations; avoid hash-map iteration order for outputs
- **Complexity discipline**
  - Avoid `O(n^2)` unless `n` explicitly bounded (same for `O(n^3)` Floyd, `O(2^n)` TSP)
  - Prefer contiguous memory (CSR / flat edge arrays) for cache locality

---

# Graph Theory - Complete Pseudocode Reference

## Table of Contents
1. [Graph Theory Introduction](#1-graph-theory-introduction)
2. [Depth First Search (DFS)](#2-depth-first-search-dfs)
3. [Breadth First Search (BFS)](#3-breadth-first-search-bfs)
4. [BFS Grid Shortest Path](#4-bfs-grid-shortest-path)
5. [Topological Sort](#5-topological-sort)
6. [Shortest/Longest Path on DAG](#6-shortestlongest-path-on-dag)
7. [Dijkstra's Algorithm](#7-dijkstras-algorithm)
8. [Bellman-Ford Algorithm](#8-bellman-ford-algorithm)
9. [Floyd-Warshall Algorithm](#9-floyd-warshall-algorithm)
10. [Bridges and Articulation Points](#10-bridges-and-articulation-points)
11. [Tarjan's SCC Algorithm](#11-tarjans-scc-algorithm)
12. [Travelling Salesman Problem](#12-travelling-salesman-problem-dp)
13. [Eulerian Paths and Circuits](#13-eulerian-paths-and-circuits)
14. [Prim's MST Algorithm](#14-prims-mst-algorithm)
15. [Max Flow Ford-Fulkerson](#15-max-flow-ford-fulkerson)
16. [Edmonds-Karp Algorithm](#16-edmonds-karp-algorithm)
17. [Capacity Scaling](#17-capacity-scaling)
18. [Dinic's Algorithm](#18-dinics-algorithm)
19. [Bipartite Matching](#19-bipartite-matching)

---

## 1. Graph Theory Introduction

### Graph Representation Types

```
// ADJACENCY MATRIX - O(V²) space
// Best for: Dense graphs, frequent edge existence queries
// Edge lookup: O(1), Space: O(V²)
STRUCTURE AdjacencyMatrix:
    matrix[V][V] ← 2D array of size V×V
    
    FUNCTION addEdge(u, v, weight):
        matrix[u][v] ← weight
        IF undirected THEN matrix[v][u] ← weight
    
    FUNCTION hasEdge(u, v):
        RETURN matrix[u][v] ≠ 0

// ADJACENCY LIST - O(V + E) space
// Best for: Sparse graphs, iteration over neighbors
// Edge lookup: O(degree), Space: O(V + E)
STRUCTURE AdjacencyList:
    adj[V] ← array of V empty lists
    
    FUNCTION addEdge(u, v, weight):
        adj[u].append((v, weight))
        IF undirected THEN adj[v].append((u, weight))
    
    FUNCTION getNeighbors(u):
        RETURN adj[u]

// EDGE LIST - O(E) space
// Best for: Bellman-Ford, Kruskal's MST
STRUCTURE EdgeList:
    edges ← empty list of (u, v, weight)
    
    FUNCTION addEdge(u, v, weight):
        edges.append((u, v, weight))
```

### Graph Types Classification

```
// DIRECTED vs UNDIRECTED
// Directed: edges have direction (u → v)
// Undirected: edges are bidirectional (u ↔ v)

// WEIGHTED vs UNWEIGHTED  
// Weighted: edges have associated costs/distances
// Unweighted: all edges have equal cost (typically 1)

// CYCLIC vs ACYCLIC
// Cyclic: contains at least one cycle
// Acyclic: no cycles exist (DAG for directed)

// CONNECTED vs DISCONNECTED
// Connected: path exists between any two vertices
// Disconnected: some vertices unreachable from others

// DENSE vs SPARSE
// Dense: E ≈ V², use adjacency matrix
// Sparse: E << V², use adjacency list
```

---

## 2. Depth First Search (DFS)

### Time Complexity: O(V + E) | Space Complexity: O(V)

```
// DFS RECURSIVE VERSION
// Use case: Path finding, cycle detection, topological sort
// Edge case: Handle disconnected components
FUNCTION DFS_Recursive(graph, start):
    visited ← array of size V, initialized to FALSE
    result ← empty list
    
    FUNCTION explore(node):
        IF visited[node] THEN RETURN
        visited[node] ← TRUE
        result.append(node)
        
        FOR each neighbor IN graph.getNeighbors(node):
            IF NOT visited[neighbor]:
                explore(neighbor)
    
    explore(start)
    RETURN result

// DFS ITERATIVE VERSION (Stack-based)
// Advantage: No stack overflow for deep graphs
FUNCTION DFS_Iterative(graph, start):
    visited ← array of size V, initialized to FALSE
    stack ← empty stack
    result ← empty list
    
    stack.push(start)
    
    WHILE stack is NOT empty:
        node ← stack.pop()
        
        IF visited[node] THEN CONTINUE
        visited[node] ← TRUE
        result.append(node)
        
        // Push neighbors in reverse for correct order
        FOR each neighbor IN reverse(graph.getNeighbors(node)):
            IF NOT visited[neighbor]:
                stack.push(neighbor)
    
    RETURN result

// DFS FOR ALL COMPONENTS
// Handles disconnected graphs
FUNCTION DFS_AllComponents(graph):
    visited ← array of size V, initialized to FALSE
    components ← empty list
    
    FOR each vertex v IN graph:
        IF NOT visited[v]:
            component ← DFS_Recursive(graph, v)
            components.append(component)
    
    RETURN components
```

### DFS Applications

```
// CYCLE DETECTION (Directed Graph)
FUNCTION hasCycle_Directed(graph):
    WHITE, GRAY, BLACK ← 0, 1, 2
    color ← array of size V, initialized to WHITE
    
    FUNCTION dfs(node):
        color[node] ← GRAY
        
        FOR each neighbor IN graph.getNeighbors(node):
            IF color[neighbor] = GRAY:
                RETURN TRUE  // Back edge found = cycle
            IF color[neighbor] = WHITE AND dfs(neighbor):
                RETURN TRUE
        
        color[node] ← BLACK
        RETURN FALSE
    
    FOR each vertex v IN graph:
        IF color[v] = WHITE AND dfs(v):
            RETURN TRUE
    RETURN FALSE

// CYCLE DETECTION (Undirected Graph)
FUNCTION hasCycle_Undirected(graph):
    visited ← array of size V, initialized to FALSE
    
    FUNCTION dfs(node, parent):
        visited[node] ← TRUE
        
        FOR each neighbor IN graph.getNeighbors(node):
            IF NOT visited[neighbor]:
                IF dfs(neighbor, node):
                    RETURN TRUE
            ELSE IF neighbor ≠ parent:
                RETURN TRUE  // Back edge to non-parent = cycle
        
        RETURN FALSE
    
    FOR each vertex v IN graph:
        IF NOT visited[v] AND dfs(v, -1):
            RETURN TRUE
    RETURN FALSE
```

---

## 3. Breadth First Search (BFS)

### Time Complexity: O(V + E) | Space Complexity: O(V)

```
// BFS STANDARD IMPLEMENTATION
// Use case: Shortest path (unweighted), level-order traversal
// Guarantees: Visits nodes in order of distance from source
FUNCTION BFS(graph, start):
    visited ← array of size V, initialized to FALSE
    distance ← array of size V, initialized to INFINITY
    parent ← array of size V, initialized to -1
    queue ← empty queue
    
    visited[start] ← TRUE
    distance[start] ← 0
    queue.enqueue(start)
    
    WHILE queue is NOT empty:
        node ← queue.dequeue()
        
        FOR each neighbor IN graph.getNeighbors(node):
            IF NOT visited[neighbor]:
                visited[neighbor] ← TRUE
                distance[neighbor] ← distance[node] + 1
                parent[neighbor] ← node
                queue.enqueue(neighbor)
    
    RETURN distance, parent

// PATH RECONSTRUCTION
FUNCTION reconstructPath(parent, start, end):
    IF parent[end] = -1 AND end ≠ start:
        RETURN NULL  // No path exists
    
    path ← empty list
    current ← end
    
    WHILE current ≠ -1:
        path.prepend(current)
        current ← parent[current]
    
    RETURN path

// BFS SHORTEST PATH (Unweighted)
FUNCTION shortestPath_BFS(graph, start, end):
    distance, parent ← BFS(graph, start)
    
    IF distance[end] = INFINITY:
        RETURN NULL, INFINITY
    
    path ← reconstructPath(parent, start, end)
    RETURN path, distance[end]
```

---

## 4. BFS Grid Shortest Path

### Time Complexity: O(R × C) | Space Complexity: O(R × C)

```
// GRID BFS - 4 DIRECTIONAL MOVEMENT
// Direction vectors for up, down, left, right
CONSTANT DIRECTIONS ← [(−1,0), (1,0), (0,−1), (0,1)]

FUNCTION gridBFS(grid, startRow, startCol, endRow, endCol):
    rows ← grid.rows
    cols ← grid.cols
    
    // Validation
    IF grid[startRow][startCol] = BLOCKED OR
       grid[endRow][endCol] = BLOCKED:
        RETURN -1
    
    visited ← 2D array [rows][cols], initialized to FALSE
    queue ← empty queue
    
    queue.enqueue((startRow, startCol, 0))  // (row, col, distance)
    visited[startRow][startCol] ← TRUE
    
    WHILE queue is NOT empty:
        row, col, dist ← queue.dequeue()
        
        // Goal check
        IF row = endRow AND col = endCol:
            RETURN dist
        
        // Explore neighbors
        FOR each (dr, dc) IN DIRECTIONS:
            newRow ← row + dr
            newCol ← col + dc
            
            // Boundary and validity check
            IF newRow >= 0 AND newRow < rows AND
               newCol >= 0 AND newCol < cols AND
               NOT visited[newRow][newCol] AND
               grid[newRow][newCol] ≠ BLOCKED:
                
                visited[newRow][newCol] ← TRUE
                queue.enqueue((newRow, newCol, dist + 1))
    
    RETURN -1  // No path found

// GRID BFS - 8 DIRECTIONAL MOVEMENT
CONSTANT DIRECTIONS_8 ← [
    (−1,0), (1,0), (0,−1), (0,1),     // Cardinal
    (−1,−1), (−1,1), (1,−1), (1,1)    // Diagonal
]

// Same algorithm, use DIRECTIONS_8 instead
```

---

## 5. Topological Sort

### Time Complexity: O(V + E) | Space Complexity: O(V)

```
// TOPOLOGICAL SORT - DFS (Kahn's Alternative Below)
// Precondition: Graph must be a DAG (Directed Acyclic Graph)
// Use case: Task scheduling, dependency resolution
FUNCTION topologicalSort_DFS(graph):
    visited ← array of size V, initialized to FALSE
    stack ← empty stack
    
    FUNCTION dfs(node):
        visited[node] ← TRUE
        
        FOR each neighbor IN graph.getNeighbors(node):
            IF NOT visited[neighbor]:
                dfs(neighbor)
        
        stack.push(node)  // Post-order insertion
    
    FOR each vertex v IN graph:
        IF NOT visited[v]:
            dfs(v)
    
    // Reverse stack to get topological order
    result ← empty list
    WHILE stack is NOT empty:
        result.append(stack.pop())
    
    RETURN result

// TOPOLOGICAL SORT - KAHN'S ALGORITHM (BFS-based)
// Advantage: Detects cycles, easier to understand
FUNCTION topologicalSort_Kahn(graph):
    inDegree ← array of size V, initialized to 0
    
    // Calculate in-degrees
    FOR each vertex u IN graph:
        FOR each neighbor v IN graph.getNeighbors(u):
            inDegree[v] ← inDegree[v] + 1
    
    queue ← empty queue
    FOR each vertex v IN graph:
        IF inDegree[v] = 0:
            queue.enqueue(v)
    
    result ← empty list
    
    WHILE queue is NOT empty:
        node ← queue.dequeue()
        result.append(node)
        
        FOR each neighbor IN graph.getNeighbors(node):
            inDegree[neighbor] ← inDegree[neighbor] − 1
            IF inDegree[neighbor] = 0:
                queue.enqueue(neighbor)
    
    // Cycle detection
    IF result.size ≠ V:
        RETURN NULL  // Cycle exists, no valid ordering
    
    RETURN result
```

---

## 6. Shortest/Longest Path on DAG

### Time Complexity: O(V + E) | Space Complexity: O(V)

```
// SHORTEST PATH ON DAG
// Advantage over Dijkstra: O(V+E) vs O((V+E)logV), handles negative weights
FUNCTION shortestPathDAG(graph, source):
    topOrder ← topologicalSort_DFS(graph)
    
    IF topOrder = NULL:
        ERROR "Graph contains cycle"
    
    dist ← array of size V, initialized to INFINITY
    parent ← array of size V, initialized to -1
    dist[source] ← 0
    
    FOR each node IN topOrder:
        IF dist[node] ≠ INFINITY:
            FOR each (neighbor, weight) IN graph.getNeighbors(node):
                IF dist[node] + weight < dist[neighbor]:
                    dist[neighbor] ← dist[node] + weight
                    parent[neighbor] ← node
    
    RETURN dist, parent

// LONGEST PATH ON DAG
// Method: Negate all weights, find shortest path, negate result
// Alternative: Modify relaxation condition
FUNCTION longestPathDAG(graph, source):
    topOrder ← topologicalSort_DFS(graph)
    
    dist ← array of size V, initialized to -INFINITY
    parent ← array of size V, initialized to -1
    dist[source] ← 0
    
    FOR each node IN topOrder:
        IF dist[node] ≠ -INFINITY:
            FOR each (neighbor, weight) IN graph.getNeighbors(node):
                // Maximize instead of minimize
                IF dist[node] + weight > dist[neighbor]:
                    dist[neighbor] ← dist[node] + weight
                    parent[neighbor] ← node
    
    RETURN dist, parent

// CRITICAL PATH METHOD (Project Scheduling)
FUNCTION criticalPath(tasks, dependencies):
    // Build DAG from dependencies
    graph ← buildDAG(tasks, dependencies)
    
    // Find longest path from virtual start to virtual end
    longestPath ← longestPathDAG(graph, virtualStart)
    
    RETURN longestPath[virtualEnd]
```

---

## 7. Dijkstra's Algorithm

### Time Complexity: O((V + E) log V) | Space Complexity: O(V)

```
// DIJKSTRA'S ALGORITHM - Priority Queue Implementation
// Precondition: No negative edge weights
// Use case: Single-source shortest path in weighted graphs
FUNCTION dijkstra(graph, source):
    dist ← array of size V, initialized to INFINITY
    parent ← array of size V, initialized to -1
    visited ← array of size V, initialized to FALSE
    
    // Min-heap: (distance, vertex)
    pq ← empty min priority queue
    
    dist[source] ← 0
    pq.insert((0, source))
    
    WHILE pq is NOT empty:
        currentDist, u ← pq.extractMin()
        
        // Skip if already processed with better distance
        IF visited[u]:
            CONTINUE
        visited[u] ← TRUE
        
        // Early termination optimization (if target known)
        // IF u = target: RETURN dist[u]
        
        FOR each (v, weight) IN graph.getNeighbors(u):
            IF NOT visited[v]:
                newDist ← dist[u] + weight
                
                IF newDist < dist[v]:
                    dist[v] ← newDist
                    parent[v] ← u
                    pq.insert((newDist, v))
    
    RETURN dist, parent

// DIJKSTRA WITH DECREASE-KEY (Indexed Priority Queue)
// More memory efficient, avoids duplicate entries
FUNCTION dijkstra_IPQ(graph, source):
    dist ← array of size V, initialized to INFINITY
    parent ← array of size V, initialized to -1
    
    ipq ← empty indexed min priority queue
    
    dist[source] ← 0
    ipq.insert(source, 0)
    
    WHILE ipq is NOT empty:
        u ← ipq.extractMinIndex()
        
        FOR each (v, weight) IN graph.getNeighbors(u):
            newDist ← dist[u] + weight
            
            IF newDist < dist[v]:
                dist[v] ← newDist
                parent[v] ← u
                
                IF ipq.contains(v):
                    ipq.decreaseKey(v, newDist)
                ELSE:
                    ipq.insert(v, newDist)
    
    RETURN dist, parent
```

---

## 8. Bellman-Ford Algorithm

### Time Complexity: O(V × E) | Space Complexity: O(V)

```
// BELLMAN-FORD ALGORITHM
// Advantage: Handles negative weights, detects negative cycles
// Use case: Arbitrage detection, routing protocols
FUNCTION bellmanFord(graph, source):
    dist ← array of size V, initialized to INFINITY
    parent ← array of size V, initialized to -1
    
    dist[source] ← 0
    
    // Relax all edges V-1 times
    FOR i FROM 1 TO V-1:
        FOR each edge (u, v, weight) IN graph.edges:
            IF dist[u] ≠ INFINITY AND dist[u] + weight < dist[v]:
                dist[v] ← dist[u] + weight
                parent[v] ← u
    
    // Detect negative cycle (V-th iteration)
    FOR each edge (u, v, weight) IN graph.edges:
        IF dist[u] ≠ INFINITY AND dist[u] + weight < dist[v]:
            RETURN NULL, "Negative cycle detected"
    
    RETURN dist, parent

// BELLMAN-FORD WITH NEGATIVE CYCLE IDENTIFICATION
FUNCTION bellmanFord_FindCycle(graph, source):
    dist ← array of size V, initialized to INFINITY
    parent ← array of size V, initialized to -1
    
    dist[source] ← 0
    
    FOR i FROM 1 TO V-1:
        FOR each edge (u, v, weight) IN graph.edges:
            IF dist[u] ≠ INFINITY AND dist[u] + weight < dist[v]:
                dist[v] ← dist[u] + weight
                parent[v] ← u
    
    // Find vertex in negative cycle
    cycleVertex ← -1
    FOR each edge (u, v, weight) IN graph.edges:
        IF dist[u] ≠ INFINITY AND dist[u] + weight < dist[v]:
            cycleVertex ← v
            BREAK
    
    IF cycleVertex = -1:
        RETURN dist, parent, NULL
    
    // Trace back to find cycle
    // Go back V times to ensure we're in the cycle
    FOR i FROM 1 TO V:
        cycleVertex ← parent[cycleVertex]
    
    cycle ← empty list
    current ← cycleVertex
    REPEAT:
        cycle.prepend(current)
        current ← parent[current]
    UNTIL current = cycleVertex
    cycle.prepend(cycleVertex)
    
    RETURN dist, parent, cycle
```

---

## 9. Floyd-Warshall Algorithm

### Time Complexity: O(V³) | Space Complexity: O(V²)

```
// FLOYD-WARSHALL ALL-PAIRS SHORTEST PATH
// Use case: Dense graphs, all pairs needed, small V
// Advantage: Simple, handles negative weights
FUNCTION floydWarshall(graph):
    n ← graph.vertexCount
    
    // Initialize distance matrix
    dist ← 2D array [n][n]
    next ← 2D array [n][n], initialized to -1
    
    FOR i FROM 0 TO n-1:
        FOR j FROM 0 TO n-1:
            IF i = j:
                dist[i][j] ← 0
            ELSE IF edge(i,j) exists:
                dist[i][j] ← weight(i, j)
                next[i][j] ← j
            ELSE:
                dist[i][j] ← INFINITY
    
    // Main algorithm - triple nested loop
    FOR k FROM 0 TO n-1:        // Intermediate vertex
        FOR i FROM 0 TO n-1:     // Source
            FOR j FROM 0 TO n-1: // Destination
                IF dist[i][k] ≠ INFINITY AND
                   dist[k][j] ≠ INFINITY AND
                   dist[i][k] + dist[k][j] < dist[i][j]:
                    dist[i][j] ← dist[i][k] + dist[k][j]
                    next[i][j] ← next[i][k]
    
    // Detect negative cycles
    FOR i FROM 0 TO n-1:
        IF dist[i][i] < 0:
            RETURN NULL, "Negative cycle detected"
    
    RETURN dist, next

// PATH RECONSTRUCTION
FUNCTION getPath_FloydWarshall(next, u, v):
    IF next[u][v] = -1:
        RETURN NULL  // No path
    
    path ← [u]
    WHILE u ≠ v:
        u ← next[u][v]
        path.append(u)
    
    RETURN path

// TRANSITIVE CLOSURE (Warshall's Algorithm)
FUNCTION transitiveClosure(graph):
    n ← graph.vertexCount
    reach ← 2D boolean array [n][n]
    
    // Initialize
    FOR i FROM 0 TO n-1:
        FOR j FROM 0 TO n-1:
            reach[i][j] ← (i = j) OR edge(i,j) exists
    
    FOR k FROM 0 TO n-1:
        FOR i FROM 0 TO n-1:
            FOR j FROM 0 TO n-1:
                reach[i][j] ← reach[i][j] OR
                              (reach[i][k] AND reach[k][j])
    
    RETURN reach
```

---

## 10. Bridges and Articulation Points

### Time Complexity: O(V + E) | Space Complexity: O(V)

```
// BRIDGES - Edges whose removal disconnects graph
// ARTICULATION POINTS - Vertices whose removal disconnects graph
// Based on Tarjan's DFS with discovery and low-link values

FUNCTION findBridgesAndArticulationPoints(graph):
    n ← graph.vertexCount
    visited ← array of size n, initialized to FALSE
    disc ← array of size n      // Discovery time
    low ← array of size n       // Lowest reachable
    parent ← array of size n, initialized to -1
    timer ← 0
    
    bridges ← empty list
    articulationPoints ← empty set
    
    FUNCTION dfs(u):
        timer ← timer + 1
        disc[u] ← timer
        low[u] ← timer
        visited[u] ← TRUE
        childCount ← 0
        
        FOR each neighbor v IN graph.getNeighbors(u):
            IF NOT visited[v]:
                childCount ← childCount + 1
                parent[v] ← u
                dfs(v)
                
                // Update low value
                low[u] ← min(low[u], low[v])
                
                // BRIDGE CONDITION
                // If low[v] > disc[u], edge (u,v) is a bridge
                IF low[v] > disc[u]:
                    bridges.append((u, v))
                
                // ARTICULATION POINT CONDITIONS
                // Case 1: u is root with 2+ children
                IF parent[u] = -1 AND childCount >= 2:
                    articulationPoints.add(u)
                
                // Case 2: u is not root and low[v] >= disc[u]
                IF parent[u] ≠ -1 AND low[v] >= disc[u]:
                    articulationPoints.add(u)
            
            ELSE IF v ≠ parent[u]:
                // Back edge - update low value
                low[u] ← min(low[u], disc[v])
    
    // Handle disconnected components
    FOR each vertex v IN graph:
        IF NOT visited[v]:
            dfs(v)
    
    RETURN bridges, articulationPoints
```

---

## 11. Tarjan's SCC Algorithm

### Time Complexity: O(V + E) | Space Complexity: O(V)

```
// TARJAN'S STRONGLY CONNECTED COMPONENTS
// SCC: Maximal set where every vertex is reachable from every other
// Use case: 2-SAT, dependency analysis, graph condensation

FUNCTION tarjanSCC(graph):
    n ← graph.vertexCount
    disc ← array of size n, initialized to -1
    low ← array of size n
    onStack ← array of size n, initialized to FALSE
    stack ← empty stack
    timer ← 0
    
    sccs ← empty list  // List of SCCs
    
    FUNCTION dfs(u):
        timer ← timer + 1
        disc[u] ← timer
        low[u] ← timer
        stack.push(u)
        onStack[u] ← TRUE
        
        FOR each neighbor v IN graph.getNeighbors(u):
            IF disc[v] = -1:  // Not visited
                dfs(v)
                low[u] ← min(low[u], low[v])
            ELSE IF onStack[v]:  // Back edge to stack vertex
                low[u] ← min(low[u], disc[v])
        
        // If u is root of SCC
        IF low[u] = disc[u]:
            scc ← empty list
            REPEAT:
                v ← stack.pop()
                onStack[v] ← FALSE
                scc.append(v)
            UNTIL v = u
            sccs.append(scc)
    
    FOR each vertex v IN graph:
        IF disc[v] = -1:
            dfs(v)
    
    RETURN sccs

// KOSARAJU'S ALGORITHM (Alternative)
FUNCTION kosarajuSCC(graph):
    // Step 1: Get finish order via DFS
    visited ← array of size V, initialized to FALSE
    finishOrder ← empty stack
    
    FUNCTION dfs1(u):
        visited[u] ← TRUE
        FOR each v IN graph.getNeighbors(u):
            IF NOT visited[v]:
                dfs1(v)
        finishOrder.push(u)
    
    FOR each v IN graph:
        IF NOT visited[v]:
            dfs1(v)
    
    // Step 2: Transpose graph
    transposed ← transpose(graph)
    
    // Step 3: DFS on transposed in reverse finish order
    visited ← reset to FALSE
    sccs ← empty list
    
    FUNCTION dfs2(u, scc):
        visited[u] ← TRUE
        scc.append(u)
        FOR each v IN transposed.getNeighbors(u):
            IF NOT visited[v]:
                dfs2(v, scc)
    
    WHILE finishOrder NOT empty:
        u ← finishOrder.pop()
        IF NOT visited[u]:
            scc ← empty list
            dfs2(u, scc)
            sccs.append(scc)
    
    RETURN sccs
```

---

## 12. Travelling Salesman Problem (DP)

### Time Complexity: O(n² × 2ⁿ) | Space Complexity: O(n × 2ⁿ)

```
// TSP WITH DYNAMIC PROGRAMMING (Held-Karp Algorithm)
// Problem: Find minimum cost Hamiltonian cycle
// Precondition: n ≤ 20 (exponential complexity)

FUNCTION tsp_DP(dist):
    n ← dist.rows
    FULL_MASK ← (1 << n) - 1  // All cities visited
    
    // dp[mask][i] = min cost to reach city i having visited cities in mask
    dp ← 2D array [2^n][n], initialized to INFINITY
    parent ← 2D array [2^n][n], initialized to -1
    
    // Base case: start at city 0
    dp[1][0] ← 0
    
    FOR mask FROM 1 TO FULL_MASK:
        FOR last FROM 0 TO n-1:
            // Check if last city is in mask
            IF (mask & (1 << last)) = 0:
                CONTINUE
            IF dp[mask][last] = INFINITY:
                CONTINUE
            
            // Try extending to unvisited cities
            FOR next FROM 0 TO n-1:
                IF (mask & (1 << next)) ≠ 0:
                    CONTINUE  // Already visited
                
                newMask ← mask | (1 << next)
                newCost ← dp[mask][last] + dist[last][next]
                
                IF newCost < dp[newMask][next]:
                    dp[newMask][next] ← newCost
                    parent[newMask][next] ← last
    
    // Find minimum cost to complete cycle
    minCost ← INFINITY
    lastCity ← -1
    
    FOR i FROM 1 TO n-1:
        cost ← dp[FULL_MASK][i] + dist[i][0]
        IF cost < minCost:
            minCost ← cost
            lastCity ← i
    
    // Reconstruct path
    path ← reconstructTSPPath(parent, FULL_MASK, lastCity)
    path.append(0)
    
    RETURN minCost, path

FUNCTION reconstructTSPPath(parent, mask, last):
    path ← empty list
    
    WHILE mask ≠ 0:
        path.prepend(last)
        prevLast ← parent[mask][last]
        mask ← mask XOR (1 << last)
        last ← prevLast
    
    RETURN path
```

---

## 13. Eulerian Paths and Circuits

### Time Complexity: O(E) | Space Complexity: O(V + E)

```
// EULERIAN PATH: Visits every EDGE exactly once
// EULERIAN CIRCUIT: Eulerian path that starts and ends at same vertex

// EXISTENCE CONDITIONS
FUNCTION checkEulerianExistence(graph, isDirected):
    IF isDirected:
        inDegree ← array of size V, initialized to 0
        outDegree ← array of size V, initialized to 0
        
        FOR each edge (u, v) IN graph:
            outDegree[u] ← outDegree[u] + 1
            inDegree[v] ← inDegree[v] + 1
        
        startNodes ← 0  // out - in = 1
        endNodes ← 0    // in - out = 1
        
        FOR each vertex v:
            diff ← outDegree[v] - inDegree[v]
            IF diff > 1 OR diff < -1:
                RETURN "NONE"
            IF diff = 1:
                startNodes ← startNodes + 1
            IF diff = -1:
                endNodes ← endNodes + 1
        
        IF startNodes = 0 AND endNodes = 0:
            RETURN "CIRCUIT"
        IF startNodes = 1 AND endNodes = 1:
            RETURN "PATH"
        RETURN "NONE"
    
    ELSE:  // Undirected
        oddDegree ← 0
        FOR each vertex v:
            IF degree(v) is ODD:
                oddDegree ← oddDegree + 1
        
        IF oddDegree = 0:
            RETURN "CIRCUIT"
        IF oddDegree = 2:
            RETURN "PATH"
        RETURN "NONE"

// HIERHOLZER'S ALGORITHM - Find Eulerian Path/Circuit
FUNCTION findEulerianPath(graph, isDirected):
    // Step 1: Check existence and find start vertex
    eulerType ← checkEulerianExistence(graph, isDirected)
    
    IF eulerType = "NONE":
        RETURN NULL
    
    // Find start vertex
    IF eulerType = "PATH":
        IF isDirected:
            start ← vertex with outDegree - inDegree = 1
        ELSE:
            start ← vertex with odd degree
    ELSE:
        start ← any vertex with degree > 0
    
    // Step 2: Hierholzer's algorithm
    edgeCount ← copy of adjacency list sizes
    path ← empty list
    stack ← empty stack
    stack.push(start)
    
    WHILE stack NOT empty:
        u ← stack.top()
        
        IF edgeCount[u] = 0:
            path.prepend(u)
            stack.pop()
        ELSE:
            v ← graph.adj[u][edgeCount[u] - 1]
            edgeCount[u] ← edgeCount[u] - 1
            
            IF NOT isDirected:
                // Remove reverse edge for undirected
                removeEdge(edgeCount, v, u)
            
            stack.push(v)
    
    RETURN path
```

---

## 14. Prim's MST Algorithm

### Time Complexity: O((V + E) log V) | Space Complexity: O(V)

```
// PRIM'S MST - LAZY IMPLEMENTATION
// Use case: Finding minimum spanning tree
// Works for: Connected, undirected, weighted graphs

FUNCTION primMST_Lazy(graph, start):
    n ← graph.vertexCount
    visited ← array of size n, initialized to FALSE
    mstEdges ← empty list
    mstCost ← 0
    
    // Min-heap: (weight, from, to)
    pq ← empty min priority queue
    
    FUNCTION addEdges(node):
        visited[node] ← TRUE
        FOR each (neighbor, weight) IN graph.getNeighbors(node):
            IF NOT visited[neighbor]:
                pq.insert((weight, node, neighbor))
    
    addEdges(start)
    edgeCount ← 0
    
    WHILE pq NOT empty AND edgeCount < n - 1:
        weight, from, to ← pq.extractMin()
        
        IF visited[to]:
            CONTINUE  // Skip stale edges
        
        mstEdges.append((from, to, weight))
        mstCost ← mstCost + weight
        edgeCount ← edgeCount + 1
        
        addEdges(to)
    
    IF edgeCount ≠ n - 1:
        RETURN NULL, "Graph is not connected"
    
    RETURN mstEdges, mstCost

// PRIM'S MST - EAGER IMPLEMENTATION (Indexed Priority Queue)
// More efficient: O((V + E) log V) with better constants

FUNCTION primMST_Eager(graph, start):
    n ← graph.vertexCount
    visited ← array of size n, initialized to FALSE
    minEdge ← array of size n, initialized to (INFINITY, -1)
    
    ipq ← empty indexed min priority queue
    mstEdges ← empty list
    mstCost ← 0
    
    minEdge[start] ← (0, -1)
    ipq.insert(start, 0)
    
    WHILE ipq NOT empty:
        u ← ipq.extractMinIndex()
        weight, from ← minEdge[u]
        visited[u] ← TRUE
        
        IF from ≠ -1:
            mstEdges.append((from, u, weight))
            mstCost ← mstCost + weight
        
        FOR each (v, w) IN graph.getNeighbors(u):
            IF visited[v]:
                CONTINUE
            
            IF w < minEdge[v].weight:
                minEdge[v] ← (w, u)
                
                IF ipq.contains(v):
                    ipq.decreaseKey(v, w)
                ELSE:
                    ipq.insert(v, w)
    
    RETURN mstEdges, mstCost

// KRUSKAL'S MST (Alternative using Union-Find)
FUNCTION kruskalMST(graph):
    edges ← sort all edges by weight ascending
    uf ← UnionFind(V)
    mstEdges ← empty list
    mstCost ← 0
    
    FOR each (u, v, weight) IN edges:
        IF uf.find(u) ≠ uf.find(v):
            uf.union(u, v)
            mstEdges.append((u, v, weight))
            mstCost ← mstCost + weight
            
            IF mstEdges.size = V - 1:
                BREAK
    
    RETURN mstEdges, mstCost
```

---

## 15. Max Flow Ford-Fulkerson

### Time Complexity: O(E × maxFlow) | Space Complexity: O(V²)

```
// FORD-FULKERSON METHOD (DFS-based)
// Use case: Maximum flow in a network
// Note: May not terminate with irrational capacities

FUNCTION fordFulkerson(graph, source, sink):
    n ← graph.vertexCount
    
    // Residual graph: capacity[u][v] = remaining capacity
    capacity ← copy of graph capacity matrix
    
    maxFlow ← 0
    
    // Find augmenting path using DFS
    FUNCTION findPath(s, t, parent):
        visited ← array of size n, initialized to FALSE
        visited[s] ← TRUE
        stack ← [(s, INFINITY)]
        
        WHILE stack NOT empty:
            u, flow ← stack.pop()
            
            FOR v FROM 0 TO n-1:
                IF NOT visited[v] AND capacity[u][v] > 0:
                    visited[v] ← TRUE
                    parent[v] ← u
                    newFlow ← min(flow, capacity[u][v])
                    
                    IF v = t:
                        RETURN newFlow
                    
                    stack.push((v, newFlow))
        
        RETURN 0
    
    parent ← array of size n
    
    WHILE TRUE:
        pathFlow ← findPath(source, sink, parent)
        
        IF pathFlow = 0:
            BREAK
        
        maxFlow ← maxFlow + pathFlow
        
        // Update residual capacities
        v ← sink
        WHILE v ≠ source:
            u ← parent[v]
            capacity[u][v] ← capacity[u][v] - pathFlow
            capacity[v][u] ← capacity[v][u] + pathFlow  // Reverse edge
            v ← u
    
    RETURN maxFlow
```

---

## 16. Edmonds-Karp Algorithm

### Time Complexity: O(V × E²) | Space Complexity: O(V²)

```
// EDMONDS-KARP (BFS-based Ford-Fulkerson)
// Guaranteed polynomial time, finds shortest augmenting paths

FUNCTION edmondsKarp(graph, source, sink):
    n ← graph.vertexCount
    capacity ← copy of graph capacity matrix
    maxFlow ← 0
    
    FUNCTION bfs(s, t, parent):
        visited ← array of size n, initialized to FALSE
        visited[s] ← TRUE
        queue ← empty queue
        queue.enqueue((s, INFINITY))
        
        WHILE queue NOT empty:
            u, flow ← queue.dequeue()
            
            FOR v FROM 0 TO n-1:
                IF NOT visited[v] AND capacity[u][v] > 0:
                    visited[v] ← TRUE
                    parent[v] ← u
                    newFlow ← min(flow, capacity[u][v])
                    
                    IF v = t:
                        RETURN newFlow
                    
                    queue.enqueue((v, newFlow))
        
        RETURN 0
    
    parent ← array of size n
    
    WHILE TRUE:
        pathFlow ← bfs(source, sink, parent)
        
        IF pathFlow = 0:
            BREAK
        
        maxFlow ← maxFlow + pathFlow
        
        v ← sink
        WHILE v ≠ source:
            u ← parent[v]
            capacity[u][v] ← capacity[u][v] - pathFlow
            capacity[v][u] ← capacity[v][u] + pathFlow
            v ← u
    
    RETURN maxFlow

// MIN-CUT FROM MAX-FLOW
FUNCTION findMinCut(graph, source, sink):
    maxFlow ← edmondsKarp(graph, source, sink)
    
    // Find reachable vertices from source in residual graph
    visited ← BFS from source in residual graph
    
    minCutEdges ← empty list
    FOR each edge (u, v) IN original graph:
        IF visited[u] AND NOT visited[v]:
            minCutEdges.append((u, v))
    
    RETURN minCutEdges, maxFlow
```

---

## 17. Capacity Scaling

### Time Complexity: O(E² log C) | Space Complexity: O(V²)

```
// CAPACITY SCALING - Improved Ford-Fulkerson
// C = maximum edge capacity
// Only considers edges with capacity >= delta

FUNCTION capacityScaling(graph, source, sink):
    n ← graph.vertexCount
    capacity ← copy of graph capacity matrix
    maxFlow ← 0
    
    // Find initial delta (largest power of 2 <= max capacity)
    maxCap ← maximum edge capacity in graph
    delta ← 1
    WHILE delta * 2 <= maxCap:
        delta ← delta * 2
    
    FUNCTION findPath(s, t, parent, minCap):
        visited ← array of size n, initialized to FALSE
        visited[s] ← TRUE
        queue ← empty queue
        queue.enqueue((s, INFINITY))
        
        WHILE queue NOT empty:
            u, flow ← queue.dequeue()
            
            FOR v FROM 0 TO n-1:
                // Only use edges with capacity >= minCap
                IF NOT visited[v] AND capacity[u][v] >= minCap:
                    visited[v] ← TRUE
                    parent[v] ← u
                    newFlow ← min(flow, capacity[u][v])
                    
                    IF v = t:
                        RETURN newFlow
                    
                    queue.enqueue((v, newFlow))
        
        RETURN 0
    
    parent ← array of size n
    
    WHILE delta >= 1:
        WHILE TRUE:
            pathFlow ← findPath(source, sink, parent, delta)
            
            IF pathFlow = 0:
                BREAK
            
            maxFlow ← maxFlow + pathFlow
            
            v ← sink
            WHILE v ≠ source:
                u ← parent[v]
                capacity[u][v] ← capacity[u][v] - pathFlow
                capacity[v][u] ← capacity[v][u] + pathFlow
                v ← u
        
        delta ← delta / 2
    
    RETURN maxFlow
```

---

## 18. Dinic's Algorithm

### Time Complexity: O(V² × E) | Space Complexity: O(V²)

```
// DINIC'S ALGORITHM - Blocking Flow Method
// Key insight: Use level graph to find blocking flows
// Best for: Unit capacity graphs O(E√V), bipartite matching O(E√V)

FUNCTION dinic(graph, source, sink):
    n ← graph.vertexCount
    capacity ← build residual graph from edges
    maxFlow ← 0
    
    // Build level graph using BFS
    FUNCTION buildLevelGraph():
        level ← array of size n, initialized to -1
        level[source] ← 0
        queue ← [source]
        
        WHILE queue NOT empty:
            u ← queue.dequeue()
            FOR each v with capacity[u][v] > 0:
                IF level[v] = -1:
                    level[v] ← level[u] + 1
                    queue.enqueue(v)
        
        RETURN level[sink] ≠ -1
    
    // Send flow using DFS with pruning
    FUNCTION sendFlow(u, pushed):
        IF u = sink:
            RETURN pushed
        
        WHILE iter[u] < n:
            v ← iter[u]
            
            IF level[v] = level[u] + 1 AND capacity[u][v] > 0:
                d ← sendFlow(v, min(pushed, capacity[u][v]))
                
                IF d > 0:
                    capacity[u][v] ← capacity[u][v] - d
                    capacity[v][u] ← capacity[v][u] + d
                    RETURN d
            
            iter[u] ← iter[u] + 1
        
        RETURN 0
    
    WHILE buildLevelGraph():
        iter ← array of size n, initialized to 0
        
        WHILE TRUE:
            pushed ← sendFlow(source, INFINITY)
            IF pushed = 0:
                BREAK
            maxFlow ← maxFlow + pushed
    
    RETURN maxFlow

// DINIC WITH EDGE LIST (Memory Efficient)
STRUCTURE Edge:
    to, rev, cap, flow
    
FUNCTION dinic_EdgeList(n, edges, source, sink):
    adj ← array of n empty lists
    
    FUNCTION addEdge(from, to, cap):
        adj[from].append(Edge(to, adj[to].size, cap, 0))
        adj[to].append(Edge(from, adj[from].size - 1, 0, 0))
    
    // Add all edges
    FOR each (u, v, c) IN edges:
        addEdge(u, v, c)
    
    // Rest of Dinic's algorithm...
    // Use edge.cap - edge.flow for residual capacity
```

---

## 19. Bipartite Matching

### Time Complexity: O(V × E) or O(E√V) with Hopcroft-Karp | Space: O(V)

```
// BIPARTITE MATCHING - Hungarian Algorithm / Kuhn's Algorithm
// Problem: Maximum matching in bipartite graph

FUNCTION maxBipartiteMatching(graph, leftSize, rightSize):
    match ← array of size rightSize, initialized to -1
    result ← 0
    
    // Try to find augmenting path from each left vertex
    FUNCTION tryKuhn(v, visited):
        FOR each u IN graph.getNeighbors(v):
            IF NOT visited[u]:
                visited[u] ← TRUE
                
                // If right vertex is unmatched or can find alternative
                IF match[u] = -1 OR tryKuhn(match[u], visited):
                    match[u] ← v
                    RETURN TRUE
        
        RETURN FALSE
    
    FOR v FROM 0 TO leftSize - 1:
        visited ← array of size rightSize, initialized to FALSE
        IF tryKuhn(v, visited):
            result ← result + 1
    
    RETURN result, match

// HOPCROFT-KARP ALGORITHM - O(E√V)
FUNCTION hopcroftKarp(graph, leftSize, rightSize):
    matchL ← array of size leftSize, initialized to -1
    matchR ← array of size rightSize, initialized to -1
    dist ← array of size leftSize + 1
    
    FUNCTION bfs():
        queue ← empty queue
        
        FOR u FROM 0 TO leftSize - 1:
            IF matchL[u] = -1:
                dist[u] ← 0
                queue.enqueue(u)
            ELSE:
                dist[u] ← INFINITY
        
        dist[NIL] ← INFINITY
        
        WHILE queue NOT empty:
            u ← queue.dequeue()
            
            IF dist[u] < dist[NIL]:
                FOR each v IN graph.getNeighbors(u):
                    IF dist[matchR[v] ?? NIL] = INFINITY:
                        dist[matchR[v] ?? NIL] ← dist[u] + 1
                        IF matchR[v] ≠ -1:
                            queue.enqueue(matchR[v])
        
        RETURN dist[NIL] ≠ INFINITY
    
    FUNCTION dfs(u):
        IF u = NIL:
            RETURN TRUE
        
        FOR each v IN graph.getNeighbors(u):
            IF dist[matchR[v] ?? NIL] = dist[u] + 1:
                IF dfs(matchR[v] ?? NIL):
                    matchR[v] ← u
                    matchL[u] ← v
                    RETURN TRUE
        
        dist[u] ← INFINITY
        RETURN FALSE
    
    matching ← 0
    
    WHILE bfs():
        FOR u FROM 0 TO leftSize - 1:
            IF matchL[u] = -1 AND dfs(u):
                matching ← matching + 1
    
    RETURN matching

// MINIMUM VERTEX COVER (König's Theorem)
// In bipartite graph: min vertex cover = max matching
FUNCTION minVertexCover(graph, leftSize, rightSize):
    matching, match ← maxBipartiteMatching(graph, leftSize, rightSize)
    
    // Apply König's theorem to find cover
    // ... (BFS from unmatched left vertices)
    
    RETURN cover
```

---

## Quick Reference Cheat Sheet

| Algorithm | Time | Space | Use When |
|-----------|------|-------|----------|
| DFS | O(V+E) | O(V) | Path finding, cycle detection, connected components |
| BFS | O(V+E) | O(V) | Shortest path (unweighted), level traversal |
| Topological Sort | O(V+E) | O(V) | Task scheduling, DAG ordering |
| Dijkstra | O((V+E)logV) | O(V) | Shortest path, non-negative weights |
| Bellman-Ford | O(VE) | O(V) | Negative weights, cycle detection |
| Floyd-Warshall | O(V³) | O(V²) | All-pairs shortest path, small graphs |
| Tarjan SCC | O(V+E) | O(V) | Strongly connected components |
| Bridges/AP | O(V+E) | O(V) | Network reliability, cut vertices |
| TSP (DP) | O(n²2ⁿ) | O(n2ⁿ) | Optimal tour, n ≤ 20 |
| Prim/Kruskal | O((V+E)logV) | O(V) | Minimum spanning tree |
| Edmonds-Karp | O(VE²) | O(V²) | Maximum flow, general networks |
| Dinic | O(V²E) | O(V²) | Max flow, unit capacity O(E√V) |
| Hopcroft-Karp | O(E√V) | O(V) | Bipartite matching |

---

## Edge Cases Checklist

```
// ALWAYS HANDLE THESE EDGE CASES:

□ Empty graph (V = 0 or E = 0)
□ Single vertex graph
□ Disconnected components
□ Self-loops
□ Parallel edges (multigraph)
□ Negative edge weights
□ Negative cycles
□ Source equals destination
□ No path exists
□ Integer overflow in distance calculations
□ Graph with only one edge
□ Complete graph (E = V²)
□ Tree (E = V - 1)
□ Bipartite vs non-bipartite
□ Directed vs undirected edge interpretation
```


# Adaptive Directed Acyclic Graph Execution Engine (ADAGE)
## First-of-Kind Background Task Pipeline Architecture

---

# TABLE OF CONTENTS


===============================================================
### SECTION 0: DOCUMENT STRUCTURE
===============================================================

This README defines a revolutionary background task orchestration system
that transcends current SOTA solutions (Airflow, Temporal, Prefect, Dagster)
by introducing adaptive runtime optimization, speculative execution, and
self-healing flow graphs with predictive resource allocation.

Key Innovation: Compile-Time + Runtime Hybrid Optimization with
Criticality-Weighted Priority Scheduling and Speculative Dependency Resolution
=============================================================================


1. [Core Philosophy & Innovation](#core-philosophy--innovation)
2. [System Architecture Overview](#system-architecture-overview)
3. [Task Classification Taxonomy](#task-classification-taxonomy)
4. [Flow Graph Data Structures](#flow-graph-data-structures)
5. [Compilation Phase Algorithms](#compilation-phase-algorithms)
6. [Runtime Execution Engine](#runtime-execution-engine)
7. [Speculative Execution Framework](#speculative-execution-framework)
8. [Adaptive Resource Allocation](#adaptive-resource-allocation)
9. [Fault Tolerance & Self-Healing](#fault-tolerance--self-healing)
10. [Backpressure & Flow Control](#backpressure--flow-control)
11. [Observability & Metrics](#observability--metrics)
12. [Pseudo Algorithm Specifications](#pseudo-algorithm-specifications)
13. [Complexity Analysis](#complexity-analysis)
14. [Deployment Topology](#deployment-topology)

---

# CORE PHILOSOPHY & INNOVATION


=============================================================================
INNOVATION THESIS: Beyond Static DAG Execution
=============================================================================
Current SOTA Limitations:
- Airflow: Static DAG, no runtime adaptation, heavy scheduler overhead
- Celery: No native dependency graph, manual chaining required
- Temporal: Workflow-centric, not optimized for heterogeneous task graphs
- Prefect: Dynamic but lacks speculative execution and criticality analysis

ADAGE Innovation Pillars:
1. CRITICALITY-WEIGHTED PRIORITY SCHEDULING (CWPS)
   - Tasks scored by downstream impact, not just dependency order
   - Critical path tasks preemptively allocated premium resources

2. SPECULATIVE DEPENDENCY RESOLUTION (SDR)
   - Begin dependent tasks before dependencies complete using predicted outputs
   - Rollback mechanism if prediction incorrect

3. ADAPTIVE TOPOLOGY REORDERING (ATR)
   - Runtime DAG restructuring based on observed execution characteristics
   - Hot path optimization through execution history analysis

4. PREDICTIVE RESOURCE ALLOCATION (PRA)
   - ML-driven resource prediction based on task signatures
   - Elastic worker pool with task-type affinity

5. CHECKPOINT-DELTA PROPAGATION (CDP)
   - Incremental state transfer between dependent tasks
   - Reduce serialization overhead through delta encoding
=============================================================================


## Design Principles

<!--
PRINCIPLE 1: Zero-Wait Maximization
- No task should wait for resources if runnable
- Speculative execution fills pipeline bubbles

PRINCIPLE 2: Failure Isolation
- Task failure isolated to minimal blast radius
- Dependent tasks notified via poison pill, not cascade

PRINCIPLE 3: Resource Elasticity
- Worker pools scale with queue depth and task criticality
- Heterogeneous workers match task requirements

PRINCIPLE 4: Observability by Default
- Every state transition emits structured event
- Distributed tracing spans entire task graph execution
-->

---

# SYSTEM ARCHITECTURE OVERVIEW

<!--
=============================================================================
MULTI-LAYER ARCHITECTURE
=============================================================================

┌─────────────────────────────────────────────────────────────────────────┐
│                         CLIENT LAYER                                     │
│  [Task Definitions] [Dependency Declarations] [Execution Policies]      │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      COMPILATION LAYER                                   │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐   │
│  │   PARSER     │ │  OPTIMIZER   │ │   ANALYZER   │ │  SCHEDULER   │   │
│  │              │ │              │ │              │ │   GENERATOR  │   │
│  │ - Validate   │ │ - Fusion     │ │ - Critical   │ │              │   │
│  │ - Normalize  │ │ - Hoisting   │ │   Path       │ │ - Priority   │   │
│  │ - Type Check │ │ - Parallelize│ │ - Resource   │ │   Queues     │   │
│  └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                     ORCHESTRATION LAYER                                  │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                    EXECUTION COORDINATOR                         │   │
│  │  - State Machine Manager    - Speculative Executor               │   │
│  │  - Checkpoint Controller    - Rollback Handler                   │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐   │
│  │   DISPATCH   │ │   RESOURCE   │ │   RESULT     │ │   FAILURE    │   │
│  │   QUEUE      │ │   ALLOCATOR  │ │   AGGREGATOR │ │   HANDLER    │   │
│  └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                       EXECUTION LAYER                                    │
│  ┌────────────────────────────────────────────────────────────────┐    │
│  │                    WORKER POOL MANAGER                          │    │
│  └────────────────────────────────────────────────────────────────┘    │
│  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐     │
│  │ FAST     │ │ STANDARD │ │ HEAVY    │ │ GPU      │ │ MEMORY   │     │
│  │ WORKER   │ │ WORKER   │ │ WORKER   │ │ WORKER   │ │ WORKER   │     │
│  │ POOL     │ │ POOL     │ │ POOL     │ │ POOL     │ │ POOL     │     │
│  │          │ │          │ │          │ │          │ │          │     │
│  │ <10ms    │ │ 10-500ms │ │ >500ms   │ │ GPU Bound│ │ RAM >4GB │     │
│  └──────────┘ └──────────┘ └──────────┘ └──────────┘ └──────────┘     │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                     PERSISTENCE LAYER                                    │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐   │
│  │   STATE      │ │   RESULT     │ │   CHECKPOINT │ │   DEAD       │   │
│  │   STORE      │ │   CACHE      │ │   STORE      │ │   LETTER Q   │   │
│  │   (Redis)    │ │   (Redis)    │ │   (S3/Minio) │ │   (Kafka)    │   │
│  └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

=============================================================================
-->

---

# TASK CLASSIFICATION TAXONOMY

<!--
=============================================================================
TASK CLASSIFICATION MATRIX
=============================================================================
Every task T_i classified along 4 orthogonal dimensions:

DIMENSION 1: EXECUTION WEIGHT
┌────────────────┬───────────────┬─────────────────────────────────────┐
│ Classification │ Latency Range │ Resource Characteristics            │
├────────────────┼───────────────┼─────────────────────────────────────┤
│ NANO           │ < 1ms         │ In-memory, no I/O, pure computation │
│ MICRO          │ 1ms - 10ms    │ Cache hits, simple DB lookups       │
│ LIGHT          │ 10ms - 100ms  │ Single DB query, API call           │
│ MEDIUM         │ 100ms - 1s    │ Multiple queries, file processing   │
│ HEAVY          │ 1s - 60s      │ Large data, ML inference            │
│ BATCH          │ > 60s         │ ETL, report generation              │
└────────────────┴───────────────┴─────────────────────────────────────┘

DIMENSION 2: DEPENDENCY PATTERN
┌────────────────┬─────────────────────────────────────────────────────┐
│ Pattern        │ Description                                         │
├────────────────┼─────────────────────────────────────────────────────┤
│ INDEPENDENT    │ No dependencies, can execute immediately            │
│ SEQUENTIAL     │ Single predecessor must complete                    │
│ FAN-IN         │ Multiple predecessors must all complete (AND-join)  │
│ FAN-OUT        │ Single task triggers multiple successors            │
│ CONDITIONAL    │ Dependency satisfied based on predecessor result    │
│ DYNAMIC        │ Dependencies determined at runtime                  │
└────────────────┴─────────────────────────────────────────────────────┘

DIMENSION 3: IDEMPOTENCY CLASS
┌────────────────┬─────────────────────────────────────────────────────┐
│ Class          │ Retry Semantics                                     │
├────────────────┼─────────────────────────────────────────────────────┤
│ PURE           │ Safe to retry unlimited times, no side effects      │
│ IDEMPOTENT     │ Safe to retry, same result guaranteed               │
│ AT-MOST-ONCE   │ Execute maximum once, failure = skip                │
│ AT-LEAST-ONCE  │ Must complete, duplicates acceptable                │
│ EXACTLY-ONCE   │ Requires distributed transaction / outbox pattern   │
└────────────────┴─────────────────────────────────────────────────────┘

DIMENSION 4: CRITICALITY LEVEL
┌────────────────┬─────────────────────────────────────────────────────┐
│ Level          │ Impact Assessment                                   │
├────────────────┼─────────────────────────────────────────────────────┤
│ CRITICAL       │ On critical path, delays cascade to completion      │
│ HIGH           │ Significant downstream impact, many dependents      │
│ MEDIUM         │ Moderate impact, parallel alternatives exist        │
│ LOW            │ Minimal impact, optional optimization task          │
│ BACKGROUND     │ No immediate impact, can be deferred indefinitely   │
└────────────────┴─────────────────────────────────────────────────────┘

COMPOSITE TASK SIGNATURE:
TaskSignature = (Weight, DependencyPattern, IdempotencyClass, Criticality)

Example: T_payment = (LIGHT, SEQUENTIAL, EXACTLY-ONCE, CRITICAL)
Example: T_thumbnail = (HEAVY, FAN-OUT, PURE, LOW)
=============================================================================
-->

---

# FLOW GRAPH DATA STRUCTURES

<!--
=============================================================================
CORE DATA STRUCTURES
=============================================================================

STRUCTURE 1: TASK NODE
────────────────────────────────────────────────────────────────────────────
TaskNode {
    id:                 UUID                    // Unique task identifier
    name:               String                  // Human-readable name
    signature:          TaskSignature           // Classification tuple
    
    // Execution Metadata
    handler:            FunctionPointer         // Actual task implementation
    timeout:            Duration                // Maximum execution time
    retry_policy:       RetryPolicy             // Backoff configuration
    
    // Resource Requirements
    cpu_units:          Float                   // CPU cores required (0.1 - 16)
    memory_mb:          Integer                 // Memory requirement
    gpu_required:       Boolean                 // GPU acceleration needed
    
    // Runtime State
    state:              TaskState               // Current execution state
    attempts:           Integer                 // Retry count
    checkpoints:        List<Checkpoint>        // Recovery points
    
    // Dependency Tracking
    predecessors:       Set<TaskNode>           // Upstream dependencies
    successors:         Set<TaskNode>           // Downstream dependents
    input_mapping:      Map<TaskNode, Selector> // Output-to-input mapping
    
    // Metrics
    estimated_duration: Duration                // Predicted execution time
    actual_duration:    Duration                // Observed execution time
    criticality_score:  Float                   // Computed priority score
}

STRUCTURE 2: EXECUTION GRAPH
────────────────────────────────────────────────────────────────────────────
ExecutionGraph {
    id:                 UUID                    // Graph instance ID
    nodes:              Map<UUID, TaskNode>     // All tasks in graph
    edges:              List<Edge>              // Dependency edges
    
    // Computed Properties
    topological_order:  List<UUID>              // Valid execution order
    critical_path:      List<UUID>              // Longest duration path
    parallel_stages:    List<Set<UUID>>         // Parallelizable groups
    
    // Optimization Metadata
    fusion_groups:      List<FusionGroup>       // Merged task clusters
    speculative_hints:  Map<UUID, Prediction>   // Output predictions
    
    // Execution State
    status:             GraphStatus             // PENDING|RUNNING|COMPLETE|FAILED
    start_time:         Timestamp               // Execution start
    completion_time:    Timestamp               // Execution end
}

STRUCTURE 3: EDGE (DEPENDENCY)
────────────────────────────────────────────────────────────────────────────
Edge {
    source:             UUID                    // Predecessor task
    target:             UUID                    // Successor task
    type:               EdgeType                // DATA|CONTROL|TEMPORAL
    
    // Data Flow
    selector:           OutputSelector          // Which output field to pass
    transformer:        TransformFunction       // Optional data transformation
    
    // Conditional Execution
    condition:          PredicateFunction       // When to activate edge
    
    // Edge Weights (for optimization)
    data_size_estimate: Bytes                   // Expected data transfer size
    latency_overhead:   Duration                // Serialization/transfer time
}

STRUCTURE 4: TASK STATE MACHINE
────────────────────────────────────────────────────────────────────────────
TaskState Enum:
    PENDING         → Initial state, dependencies not met
    READY           → Dependencies satisfied, awaiting dispatch
    SPECULATIVE     → Started speculatively before dependencies complete
    DISPATCHED      → Assigned to worker, in queue
    RUNNING         → Actively executing on worker
    CHECKPOINTED    → Paused with state saved
    SUCCEEDED       → Completed successfully
    FAILED          → Failed, may retry
    DEAD_LETTERED   → Permanently failed, moved to DLQ
    ROLLED_BACK     → Speculative execution invalidated
    SKIPPED         → Bypassed due to conditional logic
    CANCELLED       → Externally terminated

State Transition Graph:
    PENDING ──[deps_met]──→ READY
    PENDING ──[speculate]──→ SPECULATIVE
    READY ──[dispatch]──→ DISPATCHED
    DISPATCHED ──[worker_ack]──→ RUNNING
    RUNNING ──[success]──→ SUCCEEDED
    RUNNING ──[failure]──→ FAILED
    RUNNING ──[checkpoint]──→ CHECKPOINTED
    FAILED ──[retry]──→ DISPATCHED
    FAILED ──[max_retries]──→ DEAD_LETTERED
    SPECULATIVE ──[validated]──→ SUCCEEDED
    SPECULATIVE ──[invalidated]──→ ROLLED_BACK
    ROLLED_BACK ──[re-enqueue]──→ READY

STRUCTURE 5: PRIORITY QUEUE ENTRY
────────────────────────────────────────────────────────────────────────────
QueueEntry {
    task_id:            UUID
    priority_score:     Float                   // Higher = more urgent
    enqueue_time:       Timestamp
    deadline:           Timestamp               // Optional hard deadline
    affinity_hints:     List<WorkerType>        // Preferred worker pools
}

Priority Score Computation:
    score = (criticality_weight * criticality_score) +
            (deadline_weight * deadline_urgency) +
            (wait_weight * time_in_queue) +
            (dependency_weight * blocked_task_count) -
            (speculation_penalty if speculative)

=============================================================================
-->

---

# COMPILATION PHASE ALGORITHMS

<!--
=============================================================================
PHASE 1: GRAPH CONSTRUCTION & VALIDATION
=============================================================================

ALGORITHM: BuildExecutionGraph
────────────────────────────────────────────────────────────────────────────
INPUT:  task_definitions: List<TaskDefinition>
        dependency_declarations: List<DependencyDeclaration>
OUTPUT: validated_graph: ExecutionGraph

PROCEDURE:
    1. INITIALIZATION
       graph ← new ExecutionGraph()
       node_map ← empty Map<String, TaskNode>
    
    2. NODE CREATION
       FOR EACH task_def IN task_definitions:
           node ← CreateTaskNode(task_def)
           node.signature ← ClassifyTask(task_def)
           node_map[task_def.name] ← node
           graph.nodes.add(node)
    
    3. EDGE CONSTRUCTION
       FOR EACH dep IN dependency_declarations:
           source ← node_map[dep.predecessor]
           target ← node_map[dep.successor]
           
           VALIDATE source EXISTS AND target EXISTS
           
           edge ← CreateEdge(source, target, dep)
           graph.edges.add(edge)
           
           source.successors.add(target)
           target.predecessors.add(source)
    
    4. CYCLE DETECTION (Tarjan's Algorithm)
       sccs ← TarjanSCC(graph)
       IF any scc.size > 1:
           RAISE CyclicDependencyError(scc)
    
    5. ORPHAN DETECTION
       FOR EACH node IN graph.nodes:
           IF node.predecessors.empty AND node.successors.empty:
               WARN "Isolated task detected: " + node.name
    
    6. TYPE COMPATIBILITY VALIDATION
       FOR EACH edge IN graph.edges:
           source_output_type ← edge.source.output_schema
           target_input_type ← edge.target.input_schema[edge.selector]
           IF NOT compatible(source_output_type, target_input_type):
               RAISE TypeMismatchError(edge)
    
    RETURN graph

COMPLEXITY: O(V + E) where V = tasks, E = dependencies
────────────────────────────────────────────────────────────────────────────

=============================================================================
PHASE 2: TOPOLOGICAL ORDERING WITH PRIORITY LAYERS
=============================================================================

ALGORITHM: ComputeExecutionOrder
────────────────────────────────────────────────────────────────────────────
INPUT:  graph: ExecutionGraph
OUTPUT: ordered_stages: List<Set<TaskNode>>

PROCEDURE:
    1. COMPUTE IN-DEGREES
       in_degree ← Map<TaskNode, Integer>
       FOR EACH node IN graph.nodes:
           in_degree[node] ← node.predecessors.size
    
    2. INITIALIZE READY SET
       ready ← Set containing all nodes where in_degree[node] = 0
       ordered_stages ← empty List
       stage_index ← 0
    
    3. WAVEFRONT PROPAGATION (Kahn's Algorithm Extended)
       WHILE ready NOT EMPTY:
           current_stage ← copy of ready
           ready ← empty Set
           
           // Sort within stage by criticality
           current_stage.sort_descending(by: node.criticality_score)
           ordered_stages.append(current_stage)
           
           FOR EACH node IN current_stage:
               FOR EACH successor IN node.successors:
                   in_degree[successor] ← in_degree[successor] - 1
                   IF in_degree[successor] = 0:
                       ready.add(successor)
           
           stage_index ← stage_index + 1
    
    4. VALIDATE COMPLETE ORDERING
       total_ordered ← sum of stage sizes
       IF total_ordered ≠ graph.nodes.size:
           RAISE GraphNotFullyOrderableError()
    
    graph.parallel_stages ← ordered_stages
    RETURN ordered_stages

COMPLEXITY: O(V + E)
────────────────────────────────────────────────────────────────────────────

=============================================================================
PHASE 3: CRITICAL PATH ANALYSIS
=============================================================================

ALGORITHM: ComputeCriticalPath
────────────────────────────────────────────────────────────────────────────
INPUT:  graph: ExecutionGraph
OUTPUT: critical_path: List<TaskNode>
        criticality_scores: Map<TaskNode, Float>

PROCEDURE:
    1. FORWARD PASS - Earliest Start Time (EST)
       est ← Map<TaskNode, Duration>
       
       FOR EACH node IN graph.topological_order:
           IF node.predecessors.empty:
               est[node] ← 0
           ELSE:
               est[node] ← MAX(
                   est[pred] + pred.estimated_duration
                   FOR pred IN node.predecessors
               )
    
    2. COMPUTE TOTAL DURATION
       terminal_nodes ← nodes with no successors
       total_duration ← MAX(est[n] + n.estimated_duration FOR n IN terminal_nodes)
    
    3. BACKWARD PASS - Latest Start Time (LST)
       lst ← Map<TaskNode, Duration>
       
       FOR EACH node IN REVERSE(graph.topological_order):
           IF node.successors.empty:
               lst[node] ← total_duration - node.estimated_duration
           ELSE:
               lst[node] ← MIN(lst[succ] FOR succ IN node.successors)
                           - node.estimated_duration
    
    4. COMPUTE SLACK (Float)
       slack ← Map<TaskNode, Duration>
       FOR EACH node IN graph.nodes:
           slack[node] ← lst[node] - est[node]
    
    5. IDENTIFY CRITICAL PATH (slack = 0)
       critical_nodes ← nodes WHERE slack[node] = 0
       critical_path ← order critical_nodes by est ascending
    
    6. COMPUTE CRITICALITY SCORES
       max_downstream ← MAX(successor_count(n) FOR n IN graph.nodes)
       
       FOR EACH node IN graph.nodes:
           // Normalized score 0.0 - 1.0
           slack_factor ← 1.0 - (slack[node] / total_duration)
           downstream_factor ← downstream_count(node) / max_downstream
           duration_factor ← node.estimated_duration / total_duration
           
           criticality_scores[node] ←
               0.5 * slack_factor +
               0.3 * downstream_factor +
               0.2 * duration_factor
    
    graph.critical_path ← critical_path
    RETURN critical_path, criticality_scores

COMPLEXITY: O(V + E)
────────────────────────────────────────────────────────────────────────────

=============================================================================
PHASE 4: TASK FUSION OPTIMIZATION
=============================================================================

ALGORITHM: FuseCompatibleTasks
────────────────────────────────────────────────────────────────────────────
INPUT:  graph: ExecutionGraph
OUTPUT: optimized_graph: ExecutionGraph with fusion groups

PROCEDURE:
    1. IDENTIFY FUSION CANDIDATES
       // Tasks that can be merged into single execution unit
       
       fusion_candidates ← empty List<Pair<TaskNode, TaskNode>>
       
       FOR EACH edge IN graph.edges:
           source ← edge.source
           target ← edge.target
           
           // Fusion criteria
           can_fuse ← ALL OF:
               - source.successors.size = 1                    // Single output
               - target.predecessors.size = 1                  // Single input
               - source.weight + target.weight ≤ LIGHT         // Combined still fast
               - source.worker_affinity = target.worker_affinity
               - source.idempotency_class = target.idempotency_class
               - edge.data_size_estimate < FUSION_DATA_THRESHOLD
               - NOT source.checkpoint_required
               - NOT target.checkpoint_required
           
           IF can_fuse:
               fusion_candidates.append((source, target))
    
    2. GREEDY FUSION (avoid conflicts)
       fused ← empty Set<TaskNode>
       fusion_groups ← empty List<FusionGroup>
       
       FOR EACH (source, target) IN fusion_candidates:
           IF source NOT IN fused AND target NOT IN fused:
               group ← new FusionGroup(source, target)
               fusion_groups.append(group)
               fused.add(source)
               fused.add(target)
    
    3. EXTEND FUSION CHAINS
       // Merge consecutive fuseable tasks into longer chains
       
       changed ← true
       WHILE changed:
           changed ← false
           FOR EACH group IN fusion_groups:
               last_task ← group.tasks.last()
               IF last_task.successors.size = 1:
                   next_task ← last_task.successors.first()
                   IF can_fuse(last_task, next_task) AND next_task NOT IN fused:
                       group.tasks.append(next_task)
                       fused.add(next_task)
                       changed ← true
    
    4. CREATE FUSED NODES
       FOR EACH group IN fusion_groups:
           fused_node ← MergeTasks(group.tasks)
           fused_node.estimated_duration ← SUM(t.estimated_duration FOR t IN group.tasks)
           fused_node.handler ← CreatePipeline(group.tasks)
           
           // Update graph edges
           ReplaceInGraph(graph, group.tasks, fused_node)
    
    RETURN graph

BENEFIT: Reduces scheduling overhead, eliminates serialization between fused tasks
COMPLEXITY: O(E) for candidate identification, O(V) for fusion
────────────────────────────────────────────────────────────────────────────

=============================================================================
PHASE 5: SPECULATION CANDIDATE IDENTIFICATION
=============================================================================

ALGORITHM: IdentifySpeculativeTasks
────────────────────────────────────────────────────────────────────────────
INPUT:  graph: ExecutionGraph
        historical_data: ExecutionHistory
OUTPUT: speculation_plan: Map<TaskNode, SpeculationConfig>

PROCEDURE:
    1. ANALYZE HISTORICAL PATTERNS
       FOR EACH task IN graph.nodes:
           task.success_rate ← ComputeSuccessRate(historical_data, task)
           task.output_predictability ← ComputeOutputVariance(historical_data, task)
           task.avg_duration ← ComputeAvgDuration(historical_data, task)
    
    2. IDENTIFY SPECULATION OPPORTUNITIES
       speculation_plan ← empty Map
       
       FOR EACH task IN graph.nodes WHERE task.predecessors NOT EMPTY:
           // Score speculative potential
           pred_durations ← [p.avg_duration FOR p IN task.predecessors]
           max_pred_duration ← MAX(pred_durations)
           own_duration ← task.avg_duration
           
           // Speculation worthwhile if predecessor takes long
           // and own task is fast or on critical path
           
           speculation_score ←
               (max_pred_duration / own_duration) *    // Wait ratio
               task.criticality_score *                  // Importance
               MIN(p.output_predictability FOR p IN task.predecessors)
           
           IF speculation_score > SPECULATION_THRESHOLD:
               config ← new SpeculationConfig()
               config.trigger_point ← 0.8 * max_pred_duration  // Start at 80% expected
               config.prediction_model ← SelectPredictionModel(task.predecessors)
               config.rollback_cost ← EstimateRollbackCost(task)
               speculation_plan[task] ← config
    
    3. VALIDATE SPECULATION SAFETY
       FOR EACH (task, config) IN speculation_plan:
           IF task.idempotency_class NOT IN [PURE, IDEMPOTENT]:
               // Non-idempotent tasks need special handling
               config.requires_shadow_execution ← true
               config.commit_on_validation ← true
    
    graph.speculative_hints ← speculation_plan
    RETURN speculation_plan

COMPLEXITY: O(V) with historical data lookup
────────────────────────────────────────────────────────────────────────────
-->

---

# RUNTIME EXECUTION ENGINE

<!--
=============================================================================
CORE EXECUTION LOOP
=============================================================================

ALGORITHM: ExecuteGraph
────────────────────────────────────────────────────────────────────────────
INPUT:  graph: ExecutionGraph
        input_data: InitialInputs
OUTPUT: execution_result: GraphExecutionResult

PROCEDURE:
    1. INITIALIZE EXECUTION CONTEXT
       context ← new ExecutionContext()
       context.graph ← graph
       context.results ← empty Map<TaskNode, TaskResult>
       context.states ← Map(all nodes → PENDING)
       context.start_time ← now()
       
       // Initialize priority queues
       ready_queue ← new PriorityQueue(by: criticality_score DESC)
       speculative_queue ← new PriorityQueue(by: speculation_score DESC)
       
       // Event channels
       completion_channel ← new Channel<TaskCompletion>()
       failure_channel ← new Channel<TaskFailure>()
    
    2. SEED INITIAL TASKS
       FOR EACH node IN graph.nodes WHERE node.predecessors.empty:
           context.states[node] ← READY
           ready_queue.enqueue(CreateQueueEntry(node))
           EmitEvent(TaskReady, node)
    
    3. MAIN EXECUTION LOOP
       pending_count ← graph.nodes.size
       active_tasks ← empty Set<TaskNode>
       
       WHILE pending_count > 0:
           
           // 3A. DISPATCH READY TASKS
           WHILE ready_queue.not_empty AND ResourcesAvailable():
               entry ← ready_queue.dequeue()
               task ← graph.nodes[entry.task_id]
               
               IF context.states[task] ≠ READY:
                   CONTINUE  // State changed, skip
               
               worker ← AllocateWorker(task)
               DispatchToWorker(worker, task, context.results)
               context.states[task] ← DISPATCHED
               active_tasks.add(task)
               EmitEvent(TaskDispatched, task, worker)
           
           // 3B. TRIGGER SPECULATIVE EXECUTION
           IF speculation_enabled:
               CheckAndTriggerSpeculation(context, speculative_queue)
           
           // 3C. WAIT FOR COMPLETION EVENTS
           SELECT:
               CASE completion ← completion_channel.receive():
                   HandleTaskCompletion(context, completion, ready_queue)
                   pending_count ← pending_count - 1
                   active_tasks.remove(completion.task)
               
               CASE failure ← failure_channel.receive():
                   HandleTaskFailure(context, failure, ready_queue)
                   // pending_count adjusted in handler
                   active_tasks.remove(failure.task)
               
               CASE TIMEOUT after HEARTBEAT_INTERVAL:
                   CheckStaleTasks(active_tasks)
                   AdjustResourceAllocation(context)
    
    4. FINALIZE RESULTS
       execution_result ← new GraphExecutionResult()
       execution_result.status ← DetermineOverallStatus(context)
       execution_result.outputs ← CollectTerminalOutputs(context)
       execution_result.duration ← now() - context.start_time
       execution_result.metrics ← CollectMetrics(context)
       
       RETURN execution_result

────────────────────────────────────────────────────────────────────────────

ALGORITHM: HandleTaskCompletion
────────────────────────────────────────────────────────────────────────────
INPUT:  context: ExecutionContext
        completion: TaskCompletion
        ready_queue: PriorityQueue

PROCEDURE:
    task ← completion.task
    result ← completion.result
    
    1. STORE RESULT
       context.results[task] ← result
       context.states[task] ← SUCCEEDED
       EmitEvent(TaskCompleted, task, result.duration)
    
    2. VALIDATE SPECULATIVE DEPENDENTS
       FOR EACH speculative_task IN GetSpeculativeDependents(task):
           actual_input ← result.output
           predicted_input ← speculative_task.predicted_input
           
           IF InputsMatch(actual_input, predicted_input):
               // Speculation successful, validate execution
               context.states[speculative_task] ← SUCCEEDED
               EmitEvent(SpeculationValidated, speculative_task)
           ELSE:
               // Speculation failed, rollback and re-execute
               RollbackTask(speculative_task, context)
               context.states[speculative_task] ← READY
               ready_queue.enqueue(CreateQueueEntry(speculative_task))
               EmitEvent(SpeculationRolledBack, speculative_task)
    
    3. ACTIVATE SUCCESSORS
       FOR EACH successor IN task.successors:
           IF AllPredecessorsSatisfied(successor, context):
               IF context.states[successor] = PENDING:
                   context.states[successor] ← READY
                   ready_queue.enqueue(CreateQueueEntry(successor))
                   EmitEvent(TaskReady, successor)
               ELIF context.states[successor] = SPECULATIVE:
                   // Already running speculatively, will be validated
                   PASS
    
    4. CACHE RESULT (if configured)
       IF task.cacheable:
           cache_key ← ComputeCacheKey(task, task.inputs)
           ResultCache.store(cache_key, result, task.cache_ttl)
    
    5. UPDATE METRICS
       UpdateDurationHistogram(task.name, result.duration)
       UpdateResourceUtilization(result.resource_usage)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: HandleTaskFailure
────────────────────────────────────────────────────────────────────────────
INPUT:  context: ExecutionContext
        failure: TaskFailure
        ready_queue: PriorityQueue

PROCEDURE:
    task ← failure.task
    error ← failure.error
    attempt ← context.attempts[task]
    
    1. DETERMINE FAILURE ACTION
       retry_policy ← task.retry_policy
       
       action ← CASE error.type OF:
           TRANSIENT_ERROR:
               IF attempt < retry_policy.max_retries:
                   RETRY_WITH_BACKOFF
               ELSE:
                   DEAD_LETTER
           
           RESOURCE_EXHAUSTED:
               IF attempt < 2:
                   RETRY_WITH_SCALED_RESOURCES
               ELSE:
                   DEAD_LETTER
           
           VALIDATION_ERROR:
               DEAD_LETTER  // Don't retry bad input
           
           TIMEOUT:
               IF attempt < retry_policy.max_retries:
                   RETRY_WITH_EXTENDED_TIMEOUT
               ELSE:
                   DEAD_LETTER
           
           FATAL_ERROR:
               DEAD_LETTER
    
    2. EXECUTE FAILURE ACTION
       CASE action OF:
           RETRY_WITH_BACKOFF:
               delay ← ComputeExponentialBackoff(attempt, retry_policy)
               context.attempts[task] ← attempt + 1
               ScheduleDelayedEnqueue(task, delay, ready_queue)
               context.states[task] ← PENDING
               EmitEvent(TaskRetryScheduled, task, delay)
           
           RETRY_WITH_SCALED_RESOURCES:
               task.cpu_units ← task.cpu_units * 1.5
               task.memory_mb ← task.memory_mb * 1.5
               context.attempts[task] ← attempt + 1
               ready_queue.enqueue(CreateQueueEntry(task))
               context.states[task] ← READY
               EmitEvent(TaskRetryWithResources, task)
           
           DEAD_LETTER:
               context.states[task] ← DEAD_LETTERED
               DeadLetterQueue.enqueue(task, error, context)
               PropagateFailureToSuccessors(task, context)
               EmitEvent(TaskDeadLettered, task, error)
    
    3. PROPAGATE FAILURE (if not retrying)
       IF action = DEAD_LETTER:
           FOR EACH successor IN GetAllDownstreamTasks(task):
               IF task.failure_mode = FAIL_FAST:
                   context.states[successor] ← SKIPPED
                   EmitEvent(TaskSkipped, successor, "Upstream failed")
               ELIF task.failure_mode = CONTINUE:
                   // Mark dependency as failed but allow task if other paths exist
                   MarkDependencyFailed(successor, task)

────────────────────────────────────────────────────────────────────────────
-->

---

# SPECULATIVE EXECUTION FRAMEWORK

<!--
=============================================================================
SPECULATIVE EXECUTION ARCHITECTURE
=============================================================================

CONCEPT: Begin executing a task BEFORE its dependencies complete,
         using predicted outputs. Validate or rollback upon completion.

Benefits:
- Reduces critical path latency by overlapping execution
- Utilizes idle resources during long-running dependencies
- Particularly effective for predictable predecessor outputs

Risks:
- Wasted computation if prediction incorrect
- Resource contention with actual work
- Complexity in rollback for non-idempotent tasks

=============================================================================

ALGORITHM: SpeculativeExecutionController
────────────────────────────────────────────────────────────────────────────
BACKGROUND PROCESS running continuously during graph execution

PROCEDURE:
    LOOP:
        SLEEP(SPECULATION_CHECK_INTERVAL)
        
        FOR EACH running_task IN GetRunningTasks():
            progress ← EstimateProgress(running_task)
            remaining ← EstimateRemaining(running_task)
            
            FOR EACH successor IN running_task.successors:
                spec_config ← speculation_plan.get(successor)
                
                IF spec_config IS NULL:
                    CONTINUE
                
                IF ShouldSpeculate(running_task, successor, progress, spec_config):
                    TriggerSpeculativeExecution(successor, running_task)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: ShouldSpeculate
────────────────────────────────────────────────────────────────────────────
INPUT:  predecessor: TaskNode (running)
        successor: TaskNode (pending)
        predecessor_progress: Float (0.0 - 1.0)
        spec_config: SpeculationConfig

OUTPUT: Boolean

PROCEDURE:
    // Check preconditions
    IF successor.state ≠ PENDING:
        RETURN false
    
    IF NOT ResourcesAvailableForSpeculation():
        RETURN false
    
    // Check if we're past the trigger point
    IF predecessor_progress < spec_config.trigger_point:
        RETURN false
    
    // Check if all other dependencies are satisfied
    FOR EACH other_pred IN successor.predecessors:
        IF other_pred ≠ predecessor AND other_pred.state ≠ SUCCEEDED:
            RETURN false
    
    // Compute speculation ROI
    expected_wait ← (1.0 - predecessor_progress) * predecessor.estimated_duration
    successor_duration ← successor.estimated_duration
    rollback_cost ← spec_config.rollback_cost
    prediction_accuracy ← spec_config.prediction_model.accuracy
    
    // Expected value of speculation
    expected_gain ← expected_wait * prediction_accuracy
    expected_loss ← rollback_cost * (1.0 - prediction_accuracy)
    
    IF expected_gain > expected_loss * SPECULATION_RISK_FACTOR:
        RETURN true
    
    RETURN false

────────────────────────────────────────────────────────────────────────────

ALGORITHM: TriggerSpeculativeExecution
────────────────────────────────────────────────────────────────────────────
INPUT:  task: TaskNode
        pending_predecessor: TaskNode

PROCEDURE:
    1. GENERATE PREDICTED INPUT
       prediction_model ← speculation_plan[task].prediction_model
       
       // Use historical data and partial predecessor results
       predicted_output ← prediction_model.predict(
           predecessor: pending_predecessor,
           partial_state: GetPartialState(pending_predecessor),
           historical: GetHistoricalOutputs(pending_predecessor)
       )
       
       task.predicted_input ← predicted_output
       task.speculation_source ← pending_predecessor
    
    2. PREPARE SPECULATIVE CONTEXT
       spec_context ← new SpeculativeContext()
       spec_context.original_state ← CaptureState(task)
       spec_context.shadow_mode ← task.requires_shadow_execution
       
       IF spec_context.shadow_mode:
           // Create isolated execution environment
           spec_context.sandbox ← CreateSandbox(task)
    
    3. DISPATCH SPECULATIVE EXECUTION
       context.states[task] ← SPECULATIVE
       
       worker ← AllocateWorker(task, priority: LOW)  // Lower priority than confirmed work
       DispatchToWorker(worker, task, spec_context)
       
       EmitEvent(SpeculativeExecutionStarted, task, pending_predecessor)
    
    4. REGISTER VALIDATION CALLBACK
       OnComplete(pending_predecessor, () => {
           ValidateSpeculation(task, pending_predecessor.result)
       })

────────────────────────────────────────────────────────────────────────────

ALGORITHM: ValidateSpeculation
────────────────────────────────────────────────────────────────────────────
INPUT:  speculative_task: TaskNode
        actual_predecessor_output: Output

PROCEDURE:
    predicted_input ← speculative_task.predicted_input
    
    1. COMPARE INPUTS
       match_result ← CompareInputs(predicted_input, actual_predecessor_output)
       
       CASE match_result OF:
           EXACT_MATCH:
               // Perfect prediction, commit results
               CommitSpeculativeResult(speculative_task)
               context.states[speculative_task] ← SUCCEEDED
               EmitEvent(SpeculationValidated, speculative_task, "exact")
           
           SEMANTIC_MATCH:
               // Output structurally same but values differ
               // Check if differences affect computation
               IF OutputDifferencesImmaterial(speculative_task, predicted_input, actual_input):
                   CommitSpeculativeResult(speculative_task)
                   context.states[speculative_task] ← SUCCEEDED
                   EmitEvent(SpeculationValidated, speculative_task, "semantic")
               ELSE:
                   RollbackAndReexecute(speculative_task, actual_predecessor_output)
           
           MISMATCH:
               RollbackAndReexecute(speculative_task, actual_predecessor_output)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: RollbackAndReexecute
────────────────────────────────────────────────────────────────────────────
INPUT:  task: TaskNode
        correct_input: Input

PROCEDURE:
    1. CANCEL IN-PROGRESS EXECUTION
       IF task.state = RUNNING:
           CancelWorkerExecution(task)
       
       EmitEvent(SpeculationRolledBack, task)
    
    2. REVERT SIDE EFFECTS
       IF task.has_side_effects:
           spec_context ← GetSpeculativeContext(task)
           
           IF spec_context.shadow_mode:
               // Discard sandbox, no cleanup needed
               DestroySandbox(spec_context.sandbox)
           ELSE:
               // Execute compensation logic
               ExecuteCompensation(task, spec_context.original_state)
    
    3. PREPARE CORRECT EXECUTION
       task.predicted_input ← NULL
       task.speculation_source ← NULL
       task.inputs ← CollectActualInputs(task)
       context.states[task] ← READY
    
    4. RE-ENQUEUE WITH PRIORITY BOOST
       // Boosted priority since we wasted time
       entry ← CreateQueueEntry(task)
       entry.priority_score ← entry.priority_score * ROLLBACK_PRIORITY_BOOST
       ready_queue.enqueue(entry)
    
    5. UPDATE PREDICTION MODEL
       // Feedback loop to improve future predictions
       prediction_model ← speculation_plan[task].prediction_model
       prediction_model.recordFailure(
           predicted: task.predicted_input,
           actual: correct_input
       )

────────────────────────────────────────────────────────────────────────────
-->

---

# ADAPTIVE RESOURCE ALLOCATION

<!--
=============================================================================
DYNAMIC WORKER POOL MANAGEMENT
=============================================================================

ARCHITECTURE: Heterogeneous worker pools with task affinity

WORKER POOL TYPES:
┌─────────────────────────────────────────────────────────────────────────┐
│  POOL TYPE      │ CAPACITY │ CHARACTERISTICS                           │
├─────────────────────────────────────────────────────────────────────────┤
│  NANO_POOL      │ 100+     │ In-process, <1ms tasks, zero overhead    │
│  MICRO_POOL     │ 50       │ Thread-based, 1-10ms, shared memory      │
│  STANDARD_POOL  │ 20       │ Process-based, 10ms-1s, isolated         │
│  HEAVY_POOL     │ 5        │ Container-based, >1s, resource-isolated  │
│  GPU_POOL       │ 2        │ GPU-enabled, ML inference, matrix ops    │
│  MEMORY_POOL    │ 3        │ High-RAM, >4GB tasks, data processing    │
└─────────────────────────────────────────────────────────────────────────┘

=============================================================================

ALGORITHM: AdaptiveResourceAllocator
────────────────────────────────────────────────────────────────────────────
BACKGROUND PROCESS: Continuously adjusts pool sizes based on demand

STATE:
    pool_sizes: Map<PoolType, Integer>
    queue_depths: Map<PoolType, Integer>
    utilization: Map<PoolType, Float>
    historical_demand: TimeSeries<Map<PoolType, Integer>>

PROCEDURE:
    LOOP every ALLOCATION_INTERVAL:
        
        1. MEASURE CURRENT STATE
           FOR EACH pool_type IN PoolTypes:
               queue_depths[pool_type] ← GetQueueDepth(pool_type)
               utilization[pool_type] ← GetPoolUtilization(pool_type)
        
        2. PREDICT NEAR-TERM DEMAND
           FOR EACH pool_type IN PoolTypes:
               // Analyze pending tasks in graph
               pending_demand ← CountPendingTasksOfType(pool_type)
               
               // Time-series prediction for bursty workloads
               predicted_burst ← PredictDemandSpike(historical_demand[pool_type])
               
               target_capacity[pool_type] ← MAX(
                   current_demand: queue_depths[pool_type] + pending_demand,
                   predicted_demand: predicted_burst,
                   minimum: MIN_POOL_SIZE[pool_type]
               )
        
        3. COMPUTE SCALING DECISIONS
           FOR EACH pool_type IN PoolTypes:
               current ← pool_sizes[pool_type]
               target ← target_capacity[pool_type]
               
               IF target > current * SCALE_UP_THRESHOLD:
                   // Scale up aggressively
                   new_size ← MIN(target * 1.2, MAX_POOL_SIZE[pool_type])
                   ScalePool(pool_type, new_size)
               
               ELIF target < current * SCALE_DOWN_THRESHOLD:
                   // Scale down conservatively
                   IF utilization[pool_type] < 0.3 FOR last 5 minutes:
                       new_size ← MAX(target, MIN_POOL_SIZE[pool_type])
                       ScalePool(pool_type, new_size)
        
        4. REBALANCE ACROSS POOLS
           // Move workers between compatible pools if imbalanced
           FOR EACH (over_pool, under_pool) IN FindImbalancedPairs():
               IF AreCompatible(over_pool, under_pool):
                   TransferWorker(over_pool, under_pool)
        
        5. RECORD METRICS
           historical_demand.append(now(), queue_depths)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: AllocateWorker
────────────────────────────────────────────────────────────────────────────
INPUT:  task: TaskNode
        priority: Priority (HIGH|NORMAL|LOW)

OUTPUT: worker: Worker

PROCEDURE:
    1. DETERMINE OPTIMAL POOL
       candidate_pools ← []
       
       // Primary selection by task characteristics
       IF task.estimated_duration < 1ms:
           candidate_pools.append(NANO_POOL)
       ELIF task.estimated_duration < 10ms:
           candidate_pools.append(MICRO_POOL)
       ELIF task.gpu_required:
           candidate_pools.append(GPU_POOL)
       ELIF task.memory_mb > 4096:
           candidate_pools.append(MEMORY_POOL)
       ELIF task.estimated_duration > 1s:
           candidate_pools.append(HEAVY_POOL)
       ELSE:
           candidate_pools.append(STANDARD_POOL)
       
       // Fallback options
       candidate_pools.extend(GetFallbackPools(candidate_pools[0]))
    
    2. SELECT BEST AVAILABLE WORKER
       FOR EACH pool_type IN candidate_pools:
           pool ← GetPool(pool_type)
           
           // Check for task affinity (warm cache, loaded libraries)
           affinity_worker ← pool.findWorkerWithAffinity(task.type)
           IF affinity_worker AND affinity_worker.available:
               RETURN affinity_worker
           
           // Check general availability
           available_worker ← pool.getAvailableWorker()
           IF available_worker:
               RETURN available_worker
       
       // No workers available - queue or scale
       IF priority = HIGH:
           // Emergency scale-up
           new_worker ← SpawnWorker(candidate_pools[0])
           RETURN new_worker
       ELSE:
           // Wait for availability
           RETURN pool.waitForAvailableWorker(timeout: ALLOCATION_TIMEOUT)
    
    3. HANDLE ALLOCATION FAILURE
       IF no worker obtained:
           IF task.is_preemptible:
               // Preempt lower priority task
               victim ← FindPreemptibleTask(candidate_pools[0])
               IF victim:
                   PreemptTask(victim)
                   RETURN victim.worker
           
           RAISE ResourceExhaustedError(task)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: PredictResourceRequirements
────────────────────────────────────────────────────────────────────────────
INPUT:  task: TaskNode
OUTPUT: requirements: ResourceRequirements

PROCEDURE:
    // Use ML model trained on historical task executions
    
    1. EXTRACT FEATURES
       features ← {
           task_type: task.type,
           input_size: EstimateInputSize(task.inputs),
           historical_avg_cpu: GetHistoricalAvg(task.type, "cpu"),
           historical_avg_mem: GetHistoricalAvg(task.type, "memory"),
           historical_p99_duration: GetHistoricalP99(task.type, "duration"),
           input_complexity: EstimateInputComplexity(task.inputs),
           time_of_day: CurrentHour(),  // Capture load patterns
           concurrent_tasks: CountConcurrentTasks()
       }
    
    2. PREDICT WITH SAFETY MARGIN
       base_prediction ← ResourcePredictor.predict(features)
       
       // Add safety margins based on variance
       variance ← GetHistoricalVariance(task.type)
       
       requirements ← {
           cpu: base_prediction.cpu * (1 + variance.cpu_factor),
           memory: base_prediction.memory * (1 + variance.memory_factor),
           duration: base_prediction.duration * (1 + variance.duration_factor),
           gpu_probability: base_prediction.gpu_probability
       }
       
       // Apply minimum guarantees
       requirements.cpu ← MAX(requirements.cpu, MIN_CPU_ALLOCATION)
       requirements.memory ← MAX(requirements.memory, MIN_MEMORY_ALLOCATION)
    
    RETURN requirements

────────────────────────────────────────────────────────────────────────────
-->

---

# FAULT TOLERANCE & SELF-HEALING

<!--
=============================================================================
COMPREHENSIVE FAILURE HANDLING FRAMEWORK
=============================================================================

FAILURE TAXONOMY:
┌─────────────────────────────────────────────────────────────────────────┐
│ FAILURE TYPE        │ DETECTION METHOD    │ RECOVERY STRATEGY          │
├─────────────────────────────────────────────────────────────────────────┤
│ TRANSIENT           │ Exception type      │ Exponential backoff retry  │
│ RESOURCE_EXHAUSTED  │ OOM/CPU signals     │ Scale resources + retry    │
│ TIMEOUT             │ Deadline exceeded   │ Extend timeout + retry     │
│ VALIDATION          │ Input validation    │ Dead-letter, no retry      │
│ DEPENDENCY_FAILED   │ Upstream failure    │ Skip or alternative path   │
│ WORKER_CRASHED      │ Heartbeat timeout   │ Reassign to new worker     │
│ NETWORK_PARTITION   │ Connection timeout  │ Wait + retry with backoff  │
│ DATA_CORRUPTION     │ Checksum mismatch   │ Restore from checkpoint    │
│ POISON_PILL         │ Repeated failure    │ Quarantine + alert         │
└─────────────────────────────────────────────────────────────────────────┘

=============================================================================

ALGORITHM: ExponentialBackoffWithJitter
────────────────────────────────────────────────────────────────────────────
INPUT:  attempt: Integer
        retry_policy: RetryPolicy

OUTPUT: delay: Duration

PROCEDURE:
    base_delay ← retry_policy.initial_delay
    max_delay ← retry_policy.max_delay
    
    // Exponential component
    exponential_delay ← base_delay * (2 ^ attempt)
    
    // Cap at maximum
    capped_delay ← MIN(exponential_delay, max_delay)
    
    // Add jitter to prevent thundering herd
    jitter_range ← capped_delay * retry_policy.jitter_factor
    jitter ← random(-jitter_range, +jitter_range)
    
    final_delay ← capped_delay + jitter
    
    RETURN MAX(final_delay, base_delay)  // Never below base

────────────────────────────────────────────────────────────────────────────

ALGORITHM: CheckpointController
────────────────────────────────────────────────────────────────────────────
PURPOSE: Enable resumable execution for long-running tasks

STATE:
    checkpoint_store: DistributedStore (S3/Minio)
    checkpoint_interval: Duration
    checkpoint_registry: Map<TaskId, List<Checkpoint>>

PROCEDURE OnTaskStart(task):
    IF task.checkpoint_enabled:
        // Register checkpoint hooks
        task.registerProgressHook(OnProgress)
        
        // Check for existing checkpoint
        latest_checkpoint ← GetLatestCheckpoint(task.id)
        IF latest_checkpoint AND latest_checkpoint.valid:
            RETURN RestoreFromCheckpoint(task, latest_checkpoint)
    
    RETURN null  // Start fresh

PROCEDURE OnProgress(task, progress, state):
    IF ShouldCheckpoint(task, progress):
        checkpoint ← CreateCheckpoint(task, state)
        SaveCheckpoint(checkpoint)
        PruneOldCheckpoints(task.id)

PROCEDURE ShouldCheckpoint(task, progress):
    last_checkpoint ← GetLastCheckpointTime(task.id)
    time_since_last ← now() - last_checkpoint
    
    // Checkpoint based on time interval
    IF time_since_last > checkpoint_interval:
        RETURN true
    
    // Checkpoint at significant progress milestones
    IF progress IN [0.25, 0.50, 0.75]:
        RETURN true
    
    // Checkpoint before risky operations
    IF task.next_operation.is_risky:
        RETURN true
    
    RETURN false

PROCEDURE CreateCheckpoint(task, state):
    checkpoint ← {
        id: GenerateCheckpointId(),
        task_id: task.id,
        timestamp: now(),
        progress: task.current_progress,
        state: SerializeState(state),
        state_hash: ComputeHash(state),
        inputs_hash: ComputeHash(task.inputs),
        version: CHECKPOINT_VERSION
    }
    
    RETURN checkpoint

PROCEDURE RestoreFromCheckpoint(task, checkpoint):
    // Validate checkpoint integrity
    IF ComputeHash(checkpoint.state) ≠ checkpoint.state_hash:
        WARN "Checkpoint corrupted"
        RETURN null
    
    // Validate input compatibility
    IF ComputeHash(task.inputs) ≠ checkpoint.inputs_hash:
        WARN "Inputs changed since checkpoint"
        RETURN null
    
    restored_state ← DeserializeState(checkpoint.state)
    task.current_progress ← checkpoint.progress
    
    EmitEvent(CheckpointRestored, task, checkpoint)
    
    RETURN restored_state

────────────────────────────────────────────────────────────────────────────

ALGORITHM: DeadLetterQueueHandler
────────────────────────────────────────────────────────────────────────────
PURPOSE: Quarantine permanently failed tasks for analysis

STATE:
    dlq: MessageQueue (Kafka topic: "tasks.dead-letter")
    quarantine_store: Database
    alert_thresholds: Map<TaskType, Integer>
    failure_counts: Map<TaskType, Integer>

PROCEDURE EnqueueToDeadLetter(task, error, context):
    1. CREATE DLQ ENTRY
       entry ← {
           task_id: task.id,
           task_type: task.type,
           task_definition: SerializeTaskDefinition(task),
           inputs: task.inputs,
           error: {
               type: error.type,
               message: error.message,
               stacktrace: error.stacktrace,
               timestamp: now()
           },
           execution_context: {
               attempt_count: context.attempts[task],
               graph_id: context.graph.id,
               worker_id: context.last_worker,
               duration: task.actual_duration
           },
           metadata: {
               created_at: now(),
               expires_at: now() + DLQ_RETENTION_PERIOD,
               reprocessable: task.idempotency_class IN [PURE, IDEMPOTENT]
           }
       }
    
    2. PERSIST TO DLQ
       dlq.publish(entry)
       quarantine_store.insert(entry)
    
    3. UPDATE FAILURE COUNTS
       failure_counts[task.type] ← failure_counts[task.type] + 1
       
       IF failure_counts[task.type] > alert_thresholds[task.type]:
           TriggerAlert(
               severity: HIGH,
               message: "Task type exceeds failure threshold",
               task_type: task.type,
               count: failure_counts[task.type]
           )
    
    4. PATTERN ANALYSIS
       // Background process to detect systemic issues
       AnalyzeFailurePatterns(task.type, error.type)

PROCEDURE ReprocessFromDeadLetter(entry_id):
    entry ← quarantine_store.get(entry_id)
    
    IF NOT entry.reprocessable:
        RAISE CannotReprocessError("Non-idempotent task")
    
    // Recreate task from stored definition
    task ← DeserializeTaskDefinition(entry.task_definition)
    task.inputs ← entry.inputs
    task.retry_count ← 0  // Reset retry count
    
    // Mark as reprocessing
    quarantine_store.update(entry_id, status: REPROCESSING)
    
    // Resubmit to queue with context
    ready_queue.enqueue(CreateQueueEntry(task), context: "reprocessed_from_dlq")
    
    EmitEvent(TaskReprocessed, task, entry_id)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: CircuitBreaker
────────────────────────────────────────────────────────────────────────────
PURPOSE: Prevent cascade failures by isolating failing components

STATE per component:
    state: CLOSED | OPEN | HALF_OPEN
    failure_count: Integer
    success_count: Integer
    last_failure_time: Timestamp
    threshold: Integer
    timeout: Duration
    half_open_requests: Integer

PROCEDURE Execute(component, operation):
    circuit ← GetCircuit(component)
    
    SWITCH circuit.state:
        CASE CLOSED:
            TRY:
                result ← operation()
                circuit.failure_count ← 0
                RETURN result
            CATCH error:
                circuit.failure_count ← circuit.failure_count + 1
                circuit.last_failure_time ← now()
                
                IF circuit.failure_count >= circuit.threshold:
                    TransitionTo(circuit, OPEN)
                
                RAISE error
        
        CASE OPEN:
            IF now() - circuit.last_failure_time > circuit.timeout:
                TransitionTo(circuit, HALF_OPEN)
                // Fall through to HALF_OPEN
            ELSE:
                RAISE CircuitOpenError(component)
        
        CASE HALF_OPEN:
            IF circuit.half_open_requests >= HALF_OPEN_LIMIT:
                RAISE CircuitOpenError(component)
            
            circuit.half_open_requests ← circuit.half_open_requests + 1
            
            TRY:
                result ← operation()
                circuit.success_count ← circuit.success_count + 1
                
                IF circuit.success_count >= SUCCESS_THRESHOLD:
                    TransitionTo(circuit, CLOSED)
                
                RETURN result
            CATCH error:
                TransitionTo(circuit, OPEN)
                RAISE error

PROCEDURE TransitionTo(circuit, new_state):
    old_state ← circuit.state
    circuit.state ← new_state
    
    SWITCH new_state:
        CASE CLOSED:
            circuit.failure_count ← 0
            circuit.success_count ← 0
        CASE OPEN:
            circuit.half_open_requests ← 0
        CASE HALF_OPEN:
            circuit.success_count ← 0
            circuit.half_open_requests ← 0
    
    EmitEvent(CircuitStateChange, circuit.component, old_state, new_state)

────────────────────────────────────────────────────────────────────────────
-->

---

# BACKPRESSURE & FLOW CONTROL

<!--
=============================================================================
ADAPTIVE BACKPRESSURE MECHANISM
=============================================================================

PROBLEM: Without backpressure, fast producers overwhelm slow consumers,
         causing memory exhaustion, increased latency, and cascade failures.

SOLUTION: Multi-level backpressure with adaptive flow control

=============================================================================

ALGORITHM: BackpressureController
────────────────────────────────────────────────────────────────────────────
STATE:
    queue_depths: Map<QueueId, Integer>
    processing_rates: Map<QueueId, Float>  // tasks/second
    arrival_rates: Map<QueueId, Float>     // tasks/second
    pressure_levels: Map<QueueId, PressureLevel>
    
    PRESSURE_LEVELS: NORMAL | ELEVATED | HIGH | CRITICAL

PROCEDURE MonitorAndApply():
    LOOP every PRESSURE_CHECK_INTERVAL:
        FOR EACH queue_id IN monitored_queues:
            depth ← queue_depths[queue_id]
            processing_rate ← processing_rates[queue_id]
            arrival_rate ← arrival_rates[queue_id]
            
            // Compute pressure metrics
            drain_time ← depth / processing_rate IF processing_rate > 0 ELSE INFINITY
            growth_rate ← arrival_rate - processing_rate
            
            // Determine pressure level
            pressure ← CASE:
                depth < LOW_THRESHOLD AND growth_rate <= 0:
                    NORMAL
                depth < MEDIUM_THRESHOLD OR (depth < HIGH_THRESHOLD AND growth_rate <= 0):
                    ELEVATED
                depth < HIGH_THRESHOLD OR drain_time < CRITICAL_DRAIN_TIME:
                    HIGH
                ELSE:
                    CRITICAL
            
            IF pressure ≠ pressure_levels[queue_id]:
                ApplyBackpressure(queue_id, pressure)
                pressure_levels[queue_id] ← pressure

PROCEDURE ApplyBackpressure(queue_id, level):
    SWITCH level:
        CASE NORMAL:
            // Full speed
            SetIngestionRate(queue_id, UNLIMITED)
            SetBatchSize(queue_id, NORMAL_BATCH_SIZE)
            EnableSpeculativeExecution(queue_id)
        
        CASE ELEVATED:
            // Slight throttling
            SetIngestionRate(queue_id, 0.8 * MAX_RATE)
            SetBatchSize(queue_id, NORMAL_BATCH_SIZE)
            EnableSpeculativeExecution(queue_id)
        
        CASE HIGH:
            // Significant throttling
            SetIngestionRate(queue_id, 0.5 * MAX_RATE)
            SetBatchSize(queue_id, SMALL_BATCH_SIZE)
            DisableSpeculativeExecution(queue_id)
            ScaleUpWorkers(queue_id)
        
        CASE CRITICAL:
            // Emergency measures
            SetIngestionRate(queue_id, 0.1 * MAX_RATE)
            SetBatchSize(queue_id, MINIMUM_BATCH_SIZE)
            DisableSpeculativeExecution(queue_id)
            ScaleUpWorkersAggressive(queue_id)
            RejectNewGraphs(queue_id)
            TriggerAlert(CRITICAL, "Queue backpressure critical", queue_id)
    
    EmitEvent(BackpressureApplied, queue_id, level)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: AdaptiveLoadShedding
────────────────────────────────────────────────────────────────────────────
PURPOSE: Gracefully degrade service under extreme load

PROCEDURE EvaluateAndShed():
    current_load ← ComputeSystemLoad()
    capacity ← ComputeSystemCapacity()
    
    IF current_load > capacity * SHED_THRESHOLD:
        overload_factor ← current_load / capacity
        
        // Determine shed strategy based on overload severity
        CASE:
            overload_factor < 1.2:
                // Shed only BACKGROUND criticality tasks
                ShedTasks(criticality: BACKGROUND)
            
            overload_factor < 1.5:
                // Shed BACKGROUND and LOW criticality
                ShedTasks(criticality: [BACKGROUND, LOW])
            
            overload_factor < 2.0:
                // Shed up to MEDIUM, keep CRITICAL and HIGH
                ShedTasks(criticality: [BACKGROUND, LOW, MEDIUM])
            
            ELSE:
                // Extreme overload - shed all but CRITICAL
                ShedTasks(criticality: [BACKGROUND, LOW, MEDIUM, HIGH])
                TriggerAlert(CRITICAL, "Extreme load shedding active")

PROCEDURE ShedTasks(criticality_levels):
    FOR EACH task IN GetPendingTasks():
        IF task.criticality IN criticality_levels:
            // Don't shed if already running
            IF task.state = PENDING OR task.state = READY:
                task.state ← SHED
                NotifyGraph(task.graph_id, TaskShed, task)
                EmitEvent(TaskShed, task, "load_shedding")
                
                // Schedule for later if recoverable
                IF task.shed_policy = DEFER:
                    DeferredQueue.enqueue(task, delay: SHED_DEFER_DELAY)

────────────────────────────────────────────────────────────────────────────

ALGORITHM: TokenBucketRateLimiter
────────────────────────────────────────────────────────────────────────────
PURPOSE: Smooth bursty traffic while allowing short bursts

STATE per limiter:
    tokens: Float
    capacity: Float
    refill_rate: Float  // tokens per second
    last_refill: Timestamp

PROCEDURE Acquire(limiter, requested_tokens):
    RefillTokens(limiter)
    
    IF limiter.tokens >= requested_tokens:
        limiter.tokens ← limiter.tokens - requested_tokens
        RETURN ACQUIRED
    
    ELIF limiter.allow_partial:
        available ← limiter.tokens
        limiter.tokens ← 0
        RETURN PARTIAL(available)
    
    ELSE:
        wait_time ← (requested_tokens - limiter.tokens) / limiter.refill_rate
        RETURN WAIT(wait_time)

PROCEDURE RefillTokens(limiter):
    now_ts ← now()
    elapsed ← now_ts - limiter.last_refill
    
    refill_amount ← elapsed.seconds * limiter.refill_rate
    limiter.tokens ← MIN(limiter.tokens + refill_amount, limiter.capacity)
    limiter.last_refill ← now_ts

────────────────────────────────────────────────────────────────────────────
-->

---

# OBSERVABILITY & METRICS

<!--
=============================================================================
COMPREHENSIVE OBSERVABILITY FRAMEWORK
=============================================================================

THREE PILLARS:
1. METRICS - Aggregated numerical measurements
2. LOGS - Structured event records
3. TRACES - Distributed request flows

=============================================================================

METRICS SPECIFICATION
────────────────────────────────────────────────────────────────────────────

COUNTER METRICS (monotonically increasing):
    adage_tasks_total{status, task_type, graph_id}
    adage_tasks_retried_total{task_type, error_type}
    adage_speculative_executions_total{outcome}  // validated|rolled_back
    adage_checkpoints_created_total{task_type}
    adage_dead_letters_total{task_type, error_type}

GAUGE METRICS (current values):
    adage_queue_depth{queue_name, priority}
    adage_active_tasks{state, pool_type}
    adage_worker_pool_size{pool_type}
    adage_worker_utilization{pool_type}
    adage_memory_usage_bytes{component}
    adage_circuit_breaker_state{component}  // 0=closed, 1=open, 2=half_open

HISTOGRAM METRICS (distributions):
    adage_task_duration_seconds{task_type, status}
    adage_queue_wait_time_seconds{queue_name}
    adage_speculation_lead_time_seconds{task_type}
    adage_checkpoint_size_bytes{task_type}
    adage_graph_completion_time_seconds{graph_type}

SUMMARY METRICS (percentiles):
    adage_task_latency_summary{task_type, quantile}

────────────────────────────────────────────────────────────────────────────

STRUCTURED LOGGING SCHEMA
────────────────────────────────────────────────────────────────────────────

LogEvent {
    timestamp:      ISO8601
    level:          TRACE|DEBUG|INFO|WARN|ERROR|FATAL
    event_type:     EventType enum
    
    // Context
    trace_id:       UUID
    span_id:        UUID
    graph_id:       UUID
    task_id:        UUID (optional)
    worker_id:      String (optional)
    
    // Event details
    message:        String
    attributes:     Map<String, Any>
    
    // Error context (if applicable)
    error: {
        type:       String
        message:    String
        stacktrace: String
        cause:      Error (nested)
    }
}

EVENT TYPES:
    // Graph lifecycle
    GRAPH_SUBMITTED, GRAPH_COMPILED, GRAPH_STARTED
    GRAPH_COMPLETED, GRAPH_FAILED, GRAPH_CANCELLED
    
    // Task lifecycle
    TASK_READY, TASK_DISPATCHED, TASK_STARTED
    TASK_COMPLETED, TASK_FAILED, TASK_RETRIED
    TASK_CHECKPOINTED, TASK_RESTORED
    TASK_SPECULATION_STARTED, TASK_SPECULATION_VALIDATED
    TASK_SPECULATION_ROLLED_BACK, TASK_DEAD_LETTERED
    
    // System events
    WORKER_STARTED, WORKER_STOPPED, WORKER_HEALTH_CHECK
    POOL_SCALED, CIRCUIT_STATE_CHANGE
    BACKPRESSURE_APPLIED, LOAD_SHED_ACTIVATED

────────────────────────────────────────────────────────────────────────────

DISTRIBUTED TRACING
────────────────────────────────────────────────────────────────────────────

Trace Structure:
    
    GraphExecution (root span)
    ├── Compilation
    │   ├── GraphValidation
    │   ├── TopologicalSort
    │   ├── CriticalPathAnalysis
    │   └── OptimizationPass
    │
    ├── Stage[0] (parallel span group)
    │   ├── Task[T1] (span)
    │   │   ├── Dispatch
    │   │   ├── Execution
    │   │   └── ResultStore
    │   └── Task[T2] (span)
    │       ├── Dispatch
    │       ├── Execution
    │       └── ResultStore
    │
    ├── Stage[1]
    │   ├── Task[T3]
    │   │   ├── Dispatch
    │   │   ├── Speculation (child span if speculative)
    │   │   ├── Execution
    │   │   └── ResultStore
    │   └── Task[T4]
    │
    └── Finalization
        ├── ResultAggregation
        └── Cleanup

Span Attributes:
    task.name, task.type, task.criticality
    worker.id, worker.pool_type
    execution.attempt, execution.duration_ms
    speculation.enabled, speculation.outcome
    checkpoint.count, checkpoint.total_size

────────────────────────────────────────────────────────────────────────────

HEALTH CHECK ENDPOINTS
────────────────────────────────────────────────────────────────────────────

/health/live
    Purpose: Kubernetes liveness probe
    Response: 200 if process running
    
/health/ready
    Purpose: Kubernetes readiness probe
    Response: 200 if accepting work, 503 otherwise
    Checks: Queue connectivity, worker pool health, storage availability
    
/health/detailed
    Purpose: Detailed system status
    Response: JSON with component-level health
    {
        status: "healthy" | "degraded" | "unhealthy",
        components: {
            scheduler: { status, latency_ms, queue_depth },
            workers: { status, pool_sizes, utilization },
            storage: { status, latency_ms },
            messaging: { status, lag }
        },
        checks_passed: 8,
        checks_total: 10
    }

────────────────────────────────────────────────────────────────────────────
-->

---

# PSEUDO ALGORITHM SPECIFICATIONS

<!--
=============================================================================
COMPLETE SYSTEM INITIALIZATION
=============================================================================

ALGORITHM: SystemBootstrap
────────────────────────────────────────────────────────────────────────────
PROCEDURE:
    1. LOAD CONFIGURATION
       config ← LoadConfig(environment: ENV)
       ValidateConfig(config)
    
    2. INITIALIZE PERSISTENCE
       state_store ← InitializeRedis(config.redis)
       result_cache ← InitializeRedis(config.redis_cache)
       checkpoint_store ← InitializeS3(config.s3)
       dlq ← InitializeKafka(config.kafka, topic: "dead-letter")
    
    3. INITIALIZE WORKER POOLS
       FOR EACH pool_config IN config.worker_pools:
           pool ← CreateWorkerPool(pool_config)
           RegisterPool(pool)
           StartHealthMonitor(pool)
    
    4. INITIALIZE SCHEDULERS
       compilation_scheduler ← StartCompilationScheduler(config)
       execution_scheduler ← StartExecutionScheduler(config)
       speculation_controller ← StartSpeculationController(config)
    
    5. INITIALIZE OBSERVABILITY
       metrics_exporter ← StartPrometheusExporter(config.metrics_port)
       trace_exporter ← StartJaegerExporter(config.jaeger_endpoint)
       log_shipper ← StartFluentBit(config.log_endpoint)
    
    6. INITIALIZE CONTROLLERS
       backpressure_controller ← StartBackpressureController(config)
       resource_allocator ← StartResourceAllocator(config)
       circuit_breaker_manager ← StartCircuitBreakerManager(config)
    
    7. START API SERVER
       api ← StartAPIServer(config.api_port)
       RegisterEndpoints(api, handlers)
    
    8. SIGNAL READY
       MarkSystemReady()
       EmitEvent(SystemStarted, config.instance_id)

────────────────────────────────────────────────────────────────────────────

=============================================================================
END-TO-END EXECUTION FLOW
=============================================================================

ALGORITHM: CompleteExecutionPipeline
────────────────────────────────────────────────────────────────────────────
INPUT:  graph_definition: GraphDefinition
        input_data: Map<String, Any>
        options: ExecutionOptions

OUTPUT: result: ExecutionResult

PROCEDURE:
    
    // PHASE 1: ADMISSION
    1.1 VALIDATE INPUT
        ValidateGraphDefinition(graph_definition)
        ValidateInputData(input_data, graph_definition.input_schema)
    
    1.2 CHECK CAPACITY
        IF BackpressureLevel() = CRITICAL:
            IF NOT options.bypass_backpressure:
                RAISE SystemOverloadedError()
    
    1.3 CREATE EXECUTION CONTEXT
        execution_id ← GenerateExecutionId()
        context ← CreateExecutionContext(execution_id, options)
    
    // PHASE 2: COMPILATION
    2.1 BUILD GRAPH
        graph ← BuildExecutionGraph(graph_definition)
    
    2.2 VALIDATE GRAPH
        ValidateDependencies(graph)
        DetectCycles(graph)
        ValidateTypes(graph)
    
    2.3 OPTIMIZE GRAPH
        ComputeTopologicalOrder(graph)
        ComputeCriticalPath(graph)
        FuseCompatibleTasks(graph)
        IdentifySpeculativeTasks(graph)
    
    2.4 GENERATE SCHEDULE
        schedule ← GenerateExecutionSchedule(graph)
        AllocateResourceEstimates(schedule)
    
    // PHASE 3: EXECUTION
    3.1 INITIALIZE STATE
        InitializeTaskStates(context, graph)
        StoreInputData(context, input_data)
    
    3.2 SEED READY TASKS
        ready_tasks ← GetTasksWithNoPredecsors(graph)
        FOR EACH task IN ready_tasks:
            EnqueueTask(task, context)
    
    3.3 EXECUTE UNTIL COMPLETE
        WHILE NOT