In [42]:
import pandas as pd
import numpy as np
import networkx as nx
from pathlib import Path
import matplotlib.pyplot as plt
import os
import uuid
import shutil
from datetime import datetime, timedelta, date

In [43]:
# --- constants ---
RUN_DATE = "10_8_25"

# Results are written to <parent of the current working directory>/Results/<RUN_DATE>.
# Note: this is derived from os.getcwd(), not from the notebook file's location.
BASE_DIR = os.path.dirname(os.getcwd())
RESULTS_DIR = os.path.join(BASE_DIR, "Results", RUN_DATE)
os.makedirs(RESULTS_DIR, exist_ok=True)  # no-op when the folder already exists

# Output workbook; the 8-hex-digit slice of a random UUID gives every run its own file name.
outname = os.path.join(RESULTS_DIR, f"timeline_{uuid.uuid4().hex[:8]}.xlsx")

In [44]:
# -----------------------------
# 1) Load cleaned Excel
# -----------------------------
# Only the first sheet of the workbook is read.
WORKBOOK_PATH = Path(f"../../data/SR8_{RUN_DATE}_v2.xlsx")
df = pd.read_excel(WORKBOOK_PATH, sheet_name=0)

# -----------------------------
# 2) Assign each assembly a unique integer ID
# -----------------------------
# IDs are handed out from 1 upward in the order pandas reports the distinct
# Assembly values; rows sharing an Assembly value share an ID.
unique_assys = df['Assembly'].unique().tolist()
assy_to_id = dict(zip(unique_assys, range(1, len(unique_assys) + 1)))
df['ID'] = df['Assembly'].map(assy_to_id)

# -----------------------------
# 3) Parse “Predecessors” and “Successors” into lists of integer IDs
# -----------------------------
def names_to_ids(cell):
    """Translate a comma-separated cell of assembly names into integer IDs.

    Parameters
    ----------
    cell : object
        Raw cell value. Missing values and the placeholders '', 'N/A' and '-'
        (after stripping whitespace) mean "no entries".

    Returns
    -------
    list[int]
        IDs looked up in the module-level ``assy_to_id`` mapping, in the
        order the names appear in the cell. Names that are not keys of
        ``assy_to_id`` are left out of the result.

    Warns
    -----
    UserWarning
        When at least one name in the cell has no entry in ``assy_to_id``.
        The name is still skipped, but no longer silently, because a skipped
        name means a dependency edge never gets added to the graph.
    """
    import warnings  # local import so the helper does not depend on the import cell

    if pd.isna(cell) or str(cell).strip() in ('', 'N/A', '-'):
        return []

    # Split on commas, trimming whitespace and discarding empty fragments
    # (e.g. from a trailing comma).
    names = [x.strip() for x in str(cell).split(',') if x.strip()]

    unknown = [n for n in names if n not in assy_to_id]
    if unknown:
        warnings.warn(
            f"Ignoring dependency name(s) with no matching Assembly: {unknown}"
        )

    return [assy_to_id[n] for n in names if n in assy_to_id]

# Convert the free-text dependency columns into lists of integer IDs.
df['PredecessorIDs'] = df['Predecessors'].apply(names_to_ids)
df['SuccessorIDs']   = df['Successors'].apply(names_to_ids)

# -----------------------------
# 4) Coerce weights to numeric and validate
# -----------------------------
# Non-numeric and blank cells become 0. A weight column that is absent from
# the sheet is created as all zeros: passing the missing column (None) to
# pd.to_numeric yields a scalar, and calling .fillna on it raises
# AttributeError.
for col in ['EdgeWeight', 'PartnerWeight']:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
    else:
        df[col] = 0.0

# Validate non-negative: any negative weight aborts the run and the
# offending rows are listed in the error message.
neg_edge = df.loc[df['EdgeWeight'] < 0, ['ID','Assembly','EdgeWeight']]
neg_partner = df.loc[df['PartnerWeight'] < 0, ['ID','Assembly','PartnerWeight']]
if not neg_edge.empty or not neg_partner.empty:
    raise ValueError(f"Negative weights detected:\n{neg_edge}\n{neg_partner}")

In [45]:
# -----------------------------
# 5) Initialize directed graph with source and sink
# -----------------------------
# The source takes ID 0 and the sink takes the ID one past the largest
# assembly ID; both are zero-duration nodes with an empty department.
G = nx.DiGraph()
source_id = 0
sink_id = max(assy_to_id.values()) + 1
G.add_nodes_from([
    (source_id, {'duration': 0.0, 'description': 'Source', 'department': ''}),
    (sink_id,   {'duration': 0.0, 'description': 'Sink',   'department': ''}),
])

# -----------------------------
# 6) Add task and partner nodes
# -----------------------------
# One graph node per row, keyed by the integer task ID. A row with a positive
# PartnerWeight and a non-blank Partners value also gets a partner node
# (string key "<partner>_<task id>") wired as source -> partner -> task.
task_nodes = set()
for _, row in df.iterrows():
    tid = int(row['ID'])
    task_nodes.add(tid)

    dur = float(row['EdgeWeight']) if pd.notna(row['EdgeWeight']) else 0.0

    # A blank Department cell is NaN; str(NaN) would store the literal text
    # 'nan' as a department name, so missing values map to '' instead.
    raw_dept = row.get('Department', '')
    dept = '' if pd.isna(raw_dept) else str(raw_dept).strip()
    desc = row.get('Assembly', '')

    G.add_node(tid, duration=dur, description=desc, department=dept)

    pw = float(row['PartnerWeight']) if pd.notna(row['PartnerWeight']) else 0.0
    p  = str(row.get('Partners', '')).strip()
    # 'nan' is what str() yields for a blank Partners cell, so it is treated as "no partner".
    if pw > 0 and p and p.lower() != 'nan':
        pid = f"{p}_{tid}"  # partner node id (string)
        G.add_node(pid, duration=pw, description=f"{p} for {tid}", department=dept)
        G.add_edge(source_id, pid, weight=0.0)
        G.add_edge(pid, tid,      weight=pw)

# -----------------------------
# 7) Add predecessor edges
# -----------------------------
# Build the ID -> EdgeWeight lookup once instead of filtering the whole
# DataFrame for every edge. setdefault keeps the first row seen for an ID,
# matching the previous `.iat[0]` lookup when several rows share an ID.
edge_weight_by_id = {}
for task_id, task_weight in zip(df['ID'], df['EdgeWeight']):
    edge_weight_by_id.setdefault(int(task_id), float(task_weight))

for _, row in df.iterrows():
    u = int(row['ID'])
    for v in row['PredecessorIDs']:
        # Edge runs predecessor -> task and carries the predecessor's weight
        # (0.0 if the predecessor ID has no row). Existing edges are kept as-is.
        if not G.has_edge(v, u):
            G.add_edge(v, u, weight=edge_weight_by_id.get(v, 0.0))

# -----------------------------
# 8) Add successor edges
# -----------------------------
# Each listed successor gets an edge task -> successor weighted with the
# task's own EdgeWeight, unless that edge is already in the graph.
for _, row in df.iterrows():
    u = int(row['ID'])
    own_weight = row.get('EdgeWeight', 0.0)
    w = 0.0 if pd.isna(own_weight) else float(own_weight)
    for v in row['SuccessorIDs']:
        if G.has_edge(u, v):
            continue
        G.add_edge(u, v, weight=w)

# -----------------------------
# 9) Connect isolated tasks to source/sink
# -----------------------------
# Tasks without incoming edges hang off the source (weight 0); tasks without
# outgoing edges feed the sink, with the task's own duration as edge weight.
for n in task_nodes:
    if not G.in_degree(n):
        G.add_edge(source_id, n, weight=0.0)
    if not G.out_degree(n):
        tail_weight = float(G.nodes[n].get('duration', 0.0) or 0.0)
        G.add_edge(n, sink_id, weight=tail_weight)

# -----------------------------
# Check cycles
# -----------------------------
# Reverse lookup (ID -> assembly name) so cycles can be printed by name.
id_to_assy = {assy_id: assy_name for assy_name, assy_id in assy_to_id.items()}
cycles = list(nx.simple_cycles(G))
if not cycles:
    print("No cycles detected.")
else:
    print("Warning: cycles detected!")
    for cycle in cycles:
        names = []
        for node in cycle:
            # Integer nodes with a known assembly print by name; everything
            # else (partner nodes, source, sink) prints as its raw ID.
            if isinstance(node, int) and node in id_to_assy:
                names.append(id_to_assy[node])
            else:
                names.append(str(node))
        # Repeat the first node at the end to show the loop closing.
        print("Cycle: " + " -> ".join(names + [names[0]]))

No cycles detected.


In [46]:
# -----------------------------
# 10) Compute ES and EF (robust to NaN durations)
# -----------------------------
def node_dur(n):
    """Return the duration of graph node ``n`` as a float.

    Reads the 'duration' attribute from the module-level graph ``G``.
    A missing attribute, or a value pandas treats as missing, gives 0.0.
    """
    raw = G.nodes[n].get('duration', 0.0)
    if pd.isna(raw):
        return 0.0
    return float(raw)

# Forward pass in topological order: a node starts when its latest
# predecessor finishes (0.0 when it has no predecessors).
topo = list(nx.topological_sort(G))
ES = dict.fromkeys(topo, 0.0)
EF = {}
for n in topo:
    pred_finishes = [EF[p] for p in G.predecessors(n)]
    ES[n] = max(pred_finishes, default=0.0)
    EF[n] = ES[n] + node_dur(n)

# -----------------------------
# 11) Compute LS and LF (robust to missing successors)
# -----------------------------
def safe_min(iterable, default):
    """Return the smallest non-missing item of ``iterable``.

    Items for which ``pd.notna`` is false (None, NaN, ...) are ignored.
    If nothing is left, ``default`` is returned instead.
    """
    present = list(filter(pd.notna, iterable))
    if not present:
        return default
    return min(present)

# Backward pass in reverse topological order. The sink's latest finish is its
# earliest finish; every other node must finish by the earliest "latest
# start" among its successors (falling back to its own EF when none is known).
LS, LF = {}, {}
for n in reversed(topo):
    LF[n] = EF[n] if n == sink_id else safe_min(
        (LS.get(s) for s in G.successors(n)),
        default=EF[n],
    )
    LS[n] = LF[n] - node_dur(n)

# -----------------------------
# 12) Helper: format week label (no dates)
# -----------------------------
def fmt_week(w, base=0, prefix='W'):
    """Format a numeric week offset as a label such as 'W3'.

    ``base`` is added to ``w`` and the sum is truncated with ``int`` before
    being appended to ``prefix``. A missing ``w`` (None/NaN) yields None.
    """
    if pd.isna(w):
        return None
    week_number = int(w + base)
    return f"{prefix}{week_number}"

# -----------------------------
# 13) Build schedule DataFrame
# -----------------------------
# One row per graph node (tasks, partner nodes, source and sink), in
# topological order.
rows = []
for n in topo:
    slack = LS[n] - ES[n]
    rows.append({
      'ID':          n,
      'Description': G.nodes[n].get('description', ''),
      'Department':  G.nodes[n].get('department', ''),
      'Duration':    node_dur(n),
      'ES':          ES[n],
      'ES_Week':     fmt_week(ES[n]),   # week label
      'EF':          EF[n],
      'EF_Week':     fmt_week(EF[n]),
      'LS':          LS[n],
      'LS_Week':     fmt_week(LS[n]),
      'LF':          LF[n],
      'LF_Week':     fmt_week(LF[n]),
      'Slack':       slack,
      # Slack comes from float additions and subtractions, so an exact == 0
      # test can miss critical tasks when durations are fractional. Use the
      # same tolerance-based comparison as find_critical.
      'Critical':    bool(np.isclose(slack, 0.0))
    })
res_df = pd.DataFrame(rows)

# -----------------------------
# 14) Department schedule
# -----------------------------
# Keep only rows for integer task IDs (drops partner/source/sink rows), pick
# the scheduling columns, and order by department, then start, then ID.
task_nodes_int = [n for n in task_nodes if isinstance(n, int)]
dept_columns = [
    'Department', 'ID', 'Description',
    'ES', 'ES_Week', 'EF', 'EF_Week', 'LS', 'LS_Week', 'Duration',
]
is_task_row = res_df['ID'].isin(task_nodes_int)
dept_df = res_df.loc[is_task_row, dept_columns].sort_values(['Department', 'ES', 'ID'])

# -----------------------------
# 15) Edge list
# -----------------------------
# One record per graph edge: the ID pair, the description pair of its
# endpoints, and the stored edge weight (0.0 when the edge has none).
edge_records = []
for tail, head, attrs in G.edges(data=True):
    tail_desc = G.nodes[tail].get('description', '')
    head_desc = G.nodes[head].get('description', '')
    edge_records.append({
        'Edge': f"{tail}->{head}",
        'Description': f"{tail_desc}->{head_desc}",
        'Weight': attrs.get('weight', 0.0),
    })
edge_data = pd.DataFrame(edge_records)

# -----------------------------
# 16) Critical path finder
# -----------------------------
def find_critical(graph, src, sink):
    """Walk backwards from ``sink`` towards ``src`` and return the visited path.

    At every step the predecessors of the current node are inspected using
    the module-level ``EF`` and ``LF`` dictionaries:

    * predecessors whose EF is (numerically) equal to ``LF[cur]`` are
      preferred; among them the one with the largest EF is taken;
    * if there is no such predecessor, the one with the largest EF overall
      is taken.

    The walk stops on reaching ``src`` or a node without predecessors.

    Returns
    -------
    list
        Node IDs ordered from the start of the walk's end point to ``sink``.
    """
    # NOTE(review): EF[p] is compared with LF[cur], not ES[cur]. Since a
    # predecessor cannot finish after cur starts, this looks like it can only
    # match when cur has zero duration; otherwise the max-EF fallback decides.
    # Confirm whether ES[cur] was intended.
    def finish_of(node):
        # Nodes missing from EF sort below every real finish time.
        return EF.get(node, -1e18)

    walked = [sink]
    cur = sink
    while cur != src:
        preds = list(graph.predecessors(cur))
        if not preds:
            break
        tight = [p for p in preds if np.isclose(LF[cur], finish_of(p))]
        cur = max(tight or preds, key=finish_of)
        walked.append(cur)

    walked.reverse()
    return walked

crit = find_critical(G, source_id, sink_id)

# -----------------------------
# 17) All simple paths (excluding trivial)
# -----------------------------
# Keep every source -> sink simple path that has more than two nodes.
allp = []
for candidate in nx.all_simple_paths(G, source_id, sink_id):
    if len(candidate) > 2:
        allp.append(candidate)

# -----------------------------
# 18) Paths DataFrame
# -----------------------------
def mk_paths(paths, critical_path):
    """Summarise each path as one row of a DataFrame.

    Parameters
    ----------
    paths : iterable of list
        Node sequences. Only nodes contained in the module-level
        ``task_nodes`` set contribute to a row.
    critical_path : list
        A row is flagged critical when its path equals this list.

    Returns
    -------
    pandas.DataFrame
        Columns: Path_ID (fresh random UUID string), Task_IDs
        (comma-joined), Descriptions (" -> "-joined labels or 'No tasks'),
        Total_Duration (sum of task durations) and Is_Critical.
    """
    def summarise(path):
        task_ids = [node for node in path if node in task_nodes]
        labels = [
            f"{G.nodes[node]['description']} ({node_dur(node)}w, {fmt_week(ES[node])})"
            for node in task_ids
        ]
        return {
          'Path_ID': str(uuid.uuid4()),
          'Task_IDs': ','.join(str(node) for node in task_ids),
          'Descriptions': ' -> '.join(labels) if labels else 'No tasks',
          'Total_Duration': sum(map(node_dur, task_ids)),
          'Is_Critical': (path == critical_path)
        }

    return pd.DataFrame([summarise(p) for p in paths])

paths_df     = mk_paths([crit], crit)
all_paths_df = mk_paths(allp,  crit)

# -----------------------------
# 19) Export to Excel
# -----------------------------
# Sheet name -> frame, written in this order into a single workbook.
sheets = {
    'Raw_Tasks':           df,
    'Edges':               edge_data,
    'Scheduled_Results':   res_df,
    'Critical_Path':       paths_df,
    'All_Paths':           all_paths_df,
    'Department_Schedule': dept_df,
}
with pd.ExcelWriter(outname, engine='xlsxwriter') as w:
    for sheet_name, frame in sheets.items():
        frame.to_excel(w, sheet_name=sheet_name, index=False)

print(f"Wrote output to {outname}")

Wrote output to /Users/jimmy/Projects/SunswiftTimeline/python/Results/10_8_25/timeline_062fc6b6.xlsx


In [47]:
# 20) Gantt charts
# Charts go into a "gantt_charts" folder beside the output workbook. Any
# existing folder of that name is removed first, so each run starts empty.
gantt_dir = os.path.join(os.path.split(outname)[0], "gantt_charts")
shutil.rmtree(gantt_dir, ignore_errors=True)
os.makedirs(gantt_dir, exist_ok=True)

def create_gantt(path_nodes, df_in, color, fname, title):
    """Draw a horizontal-bar Gantt chart and save it as an image.

    Parameters
    ----------
    path_nodes : iterable
        Node IDs to chart. Only integer IDs present in the module-level
        ``task_nodes`` set are drawn.
    df_in : pandas.DataFrame
        Schedule rows; must provide ID, Description, ES, ES_Week, EF_Week
        and Duration columns.
    color : str
        Bar fill colour.
    fname : str
        File name, created inside the module-level ``gantt_dir``.
    title : str
        Chart title.

    Nothing is drawn or written when no row of ``df_in`` matches.
    """
    wanted = [n for n in path_nodes if isinstance(n, int) and n in task_nodes]
    chart_rows = df_in[df_in['ID'].isin(wanted)].sort_values('ES')
    if chart_rows.empty:
        return

    # Half a unit of height per bar plus one unit of padding, at least 1.0.
    fig_height = max(1.0, len(chart_rows) * 0.5 + 1)
    fig, ax = plt.subplots(figsize=(10, fig_height))

    for _, task in chart_rows.iterrows():
        # Bars are labelled with week labels rather than calendar dates.
        bar_label = f"{task['Description']}\n{task['ES_Week']} to {task['EF_Week']}"
        ax.barh(bar_label, task['Duration'], left=task['ES'],
                color=color, edgecolor='black')

    ax.set_xlabel('Weeks')
    ax.set_title(title)
    ax.invert_yaxis()  # earliest-starting task at the top
    fig.tight_layout()
    fig.savefig(os.path.join(gantt_dir, fname), dpi=300, bbox_inches='tight')
    plt.close(fig)

# Critical path Gantt
# The title shows the summed duration of the integer-ID nodes on the path,
# leaving out the source and sink.
critical_task_ids = [
    n for n in crit
    if isinstance(n, int) and n != source_id and n != sink_id
]
crit_duration = sum(G.nodes[n]['duration'] for n in critical_task_ids)
create_gantt(
    path_nodes=crit,
    df_in=res_df,
    color='red',
    fname='critical_path.png',
    title=f"Critical Path ({crit_duration}w)",
)

# Department Gantts
# One chart per non-blank department, saved as dept_<name>.png.
for dept in dept_df['Department'].dropna().unique():
    if not str(dept).strip():
        continue
    subs = dept_df[dept_df['Department'] == dept]
    # Lower-case the name and replace spaces with underscores. Path
    # separators are replaced too: a name such as "Mech/Aero" would otherwise
    # point into a sub-folder that does not exist and the save would fail.
    safe_name = (
        str(dept).lower()
        .replace(' ', '_')
        .replace('/', '_')
        .replace('\\', '_')
    )
    create_gantt(
        subs['ID'].tolist(),
        subs,
        'blue',
        f"dept_{safe_name}.png",
        f"{dept} Department"
    )