In [72]:
import pandas as pd
import numpy as np
import random
import math

df = pd.read_csv('test_data1.csv', sep=' ')
df

Unnamed: 0,任务编号,紧前任务,任务时间
0,1,,5
1,3,1.0,2
2,2,,4
3,5,23.0,6
4,6,3.0,3
5,7,56.0,2
6,8,57.0,5
7,4,2.0,1
8,9,48.0,2


In [73]:
df['任务时间'] = df['任务时间'].astype(str).str.extract('(\d+)').astype(float)
def process_predecessors(pred_str):
    if pd.isna(pred_str):
        return []
    else:
        return [int(x.strip()) for x in str(pred_str).split(',')]

predecessors = {task: process_predecessors(pred) for task, pred in zip(df['任务编号'], df['紧前任务'])}
successors = {task: [] for task in df['任务编号']}
for task, preds in predecessors.items():
    for pred in preds:
        successors[pred].append(task)

task_times = dict(zip(df['任务编号'], df['任务时间']))

 
def get_rpw(task_times, successors):
    all_successors = {task: [] for task in task_times.keys()}
    rpw_values = {}
    for task in task_times:
        if not successors[task]: 
            rpw_values[task] = task_times[task] 
    while len(rpw_values) < len(task_times):
        for task in task_times:
            if task not in rpw_values:  
                if all(succ in rpw_values for succ in successors[task]):
                    prev_all_successors = [all_successors[succ] for succ in successors[task]]
                    # print(prev_all_successors, successors[task])
                    all_successors[task] = list(set(np.concatenate((*prev_all_successors, successors[task]))))
                    # print(all_successors[task])
                    rpw_values[task] = task_times[task] + sum(task_times[succ] for succ in all_successors[task])
                    # print(f"task {task}, rpw {rpw_values[task]}")
                    del task
    return rpw_values, all_successors
rpw_values, all_successors = get_rpw(task_times, successors)
df['RPW'] = df['任务编号'].map(rpw_values)

df

Unnamed: 0,任务编号,紧前任务,任务时间,RPW
0,1,,5.0,25.0
1,3,1.0,2.0,20.0
2,2,,4.0,20.0
3,5,23.0,6.0,15.0
4,6,3.0,3.0,12.0
5,7,56.0,2.0,9.0
6,8,57.0,5.0,7.0
7,4,2.0,1.0,3.0
8,9,48.0,2.0,2.0


In [74]:
def allocate_workstations(df, predecessors, cycle_time=10):
    workstations = {1: []}  
    station_times = {1: 0}  
    allocated_tasks = set()
    
    sorted_tasks = df.sort_values('RPW', ascending=False)
    
    current_station = 1
    while len(allocated_tasks) < len(df):
        tasks_allocated_this_round = False
        
        for _, task in sorted_tasks.iterrows():
            task_id = task['任务编号']
            if task_id in allocated_tasks:
                continue
                
            cur_predecessors = predecessors[task_id]
            if not all(pred in allocated_tasks for pred in cur_predecessors):
                continue
                
            if station_times[current_station] + task['任务时间'] <= cycle_time:
                workstations[current_station].append(task_id)
                station_times[current_station] += task['任务时间']
                allocated_tasks.add(task_id)
                tasks_allocated_this_round = True
        
        if not tasks_allocated_this_round:
            current_station += 1
            workstations[current_station] = []
            station_times[current_station] = 0
            
    return workstations

# Use the function
workstation_assignments = allocate_workstations(df, predecessors)
for station, tasks in workstation_assignments.items():
    print(f"Workstation {station}: Tasks {tasks}")

Workstation 1: Tasks [1, 3, 6]
Workstation 2: Tasks [2, 5]
Workstation 3: Tasks [7, 8, 4, 9]


In [75]:
def find_available_tasks(predecessors, assigned_tasks):
    #  找到没分配且无前序任务
    available = []
    for task, preds in predecessors.items():
        if task not in assigned_tasks and all(p in assigned_tasks for p in preds):
            available.append(task)
    return available

def allocate_tasks(df, cycle_time):
    workstations = []
    current_station = []
    current_time = 0
    assigned_tasks = set()
    
    while len(assigned_tasks) < len(df):
        available = find_available_tasks(predecessors, assigned_tasks)
        if not available:
            break
            
        # 排序
        available.sort(key=lambda x: task_times[x], reverse=True)
        
        tasks_added = False
        for task in available:
            if current_time + task_times[task] <= cycle_time:
                current_station.append(task)
                current_time += task_times[task]
                assigned_tasks.add(task)
                tasks_added = True
        
        # 不能再添加任务了
        if not tasks_added and current_station:
            workstations.append(current_station)
            current_station = []
            current_time = 0
        
    if current_station:
        workstations.append(current_station)
        
    return workstations

def print_workstation_details(workstations, task_times):
    print("\n工位分配详情:")
    for i, station in enumerate(workstations, 1):
        total_time = sum(task_times[task] for task in station)
        print(f"\n工位 {i}:")
        print(f"任务: {station}")
        print(f"总时间: {total_time:.1f}")
        print(f"任务时间: {[task_times[task] for task in station]}")

cycle_time = 10.0
workstations = allocate_tasks(df, cycle_time)
print_workstation_details(workstations, task_times)


工位分配详情:

工位 1:
任务: [1, 2, 4]
总时间: 10.0
任务时间: [5.0, 4.0, 1.0]

工位 2:
任务: [3, 5]
总时间: 8.0
任务时间: [2.0, 6.0]

工位 3:
任务: [6, 7, 8]
总时间: 10.0
任务时间: [3.0, 2.0, 5.0]

工位 4:
任务: [9]
总时间: 2.0
任务时间: [2.0]


In [76]:
def check_cycle_time_constraint(workstation_tasks, task_times, cycle_time):
    """检查工位时间是否满足循环时间约束"""
    station_time = sum(task_times[t] for t in workstation_tasks)
    print(f"工位时间: {station_time}, 循环时间: {cycle_time}")
    if station_time > cycle_time:
        print(f"超出循环时间约束")
        return False
    return True

def check_predecessor_constraints(task, ws_idx, temp_workstations, predecessors_dict):
    """检查紧前任务约束"""
    print(f"检查任务 {task} 的紧前任务: {predecessors_dict[task]}")
    for pred in predecessors_dict[task]:
        pred_assigned = False
        print(f"检查紧前任务 {pred} 是否在工位 {ws_idx+1} 之前")
        for i in range(ws_idx + 1):
            if pred in temp_workstations[i]:
                print(f"任务 {pred} 在工位 {i+1} 中")
                pred_assigned = True
                break
        if not pred_assigned:
            print(f"任务 {task} 的紧前任务 {pred} 未满足约束")
            return False
    return True

def check_successor_constraints(task, ws_idx, temp_workstations, successors_dict):
    """检查紧后任务约束"""
    print(f"检查任务 {task} 的紧后任务: {successors_dict[task]}")
    for succ in successors_dict[task]:
        if succ in [t for ws in temp_workstations[:ws_idx] for t in ws]:
            print(f"任务 {task} 的紧后任务 {succ} 在工位 {ws_idx+1} 之前，违反约束")
            return False
        print(f"任务 {task} 的紧后任务 {succ} 满足约束")
    return True

def can_move_task(task, from_ws_idx, to_ws_idx, workstations, predecessors_dict, successors_dict, task_times, cycle_time):
    """检查任务是否可以从一个工位移动到另一个工位"""
    temp_workstations = [ws.copy() for ws in workstations]
    temp_workstations[from_ws_idx].remove(task)
    temp_workstations[to_ws_idx].append(task)
    print(f"移动后的临时工位分配: {temp_workstations}")
    
    # 检查循环时间约束
    if not check_cycle_time_constraint(temp_workstations[to_ws_idx], task_times, cycle_time):
        return False
        
    # 检查紧前任务约束
    if not check_predecessor_constraints(task, to_ws_idx, temp_workstations, predecessors_dict):
        return False
        
    # 检查紧后任务约束
    if not check_successor_constraints(task, to_ws_idx, temp_workstations, successors_dict):
        return False
    
    print("所有约束检查通过，任务可以移动")
    return True

def can_swap_tasks(task1, ws1_idx, task2, ws2_idx, workstations, predecessors_dict, successors_dict, task_times, cycle_time):
    """检查两个任务是否可以交换"""
    temp_workstations = [ws.copy() for ws in workstations]
    temp_workstations[ws1_idx].remove(task1)
    temp_workstations[ws2_idx].remove(task2)
    temp_workstations[ws1_idx].append(task2)
    temp_workstations[ws2_idx].append(task1)
    print(f"交换后的临时工位分配: {temp_workstations}")
    
    # 检查循环时间约束
    if not check_cycle_time_constraint(temp_workstations[ws1_idx], task_times, cycle_time):
        return False
    if not check_cycle_time_constraint(temp_workstations[ws2_idx], task_times, cycle_time):
        return False
    
    # 检查task1的约束
    if not check_predecessor_constraints(task1, ws2_idx, temp_workstations, predecessors_dict):
        return False
    if not check_successor_constraints(task1, ws2_idx, temp_workstations, successors_dict):
        return False
    
    # 检查task2的约束
    if not check_predecessor_constraints(task2, ws1_idx, temp_workstations, predecessors_dict):
        return False
    if not check_successor_constraints(task2, ws1_idx, temp_workstations, successors_dict):
        return False
    
    print("所有约束检查通过，任务可以交换")
    return True

def trade_and_transfer(workstations, predecessors_dict, successors_dict, task_times, cycle_time):
    """执行Trade and Transfer阶段均衡分析"""
    improved = True
    iteration = 0
    workstations = [ws.copy() for ws in workstations]  # 深拷贝工位分配
    while improved:
        iteration += 1
        print(f"\n迭代 {iteration}:")
        improved = False
        
        # 计算每个工位的工作时间
        station_times = [sum(task_times[task] for task in station) for station in workstations]
        print(f"工位时间: {station_times}")
        # 步骤1: 确定最大时间工位和最小时间工位
        max_time = max(station_times)
        min_time = min(station_times)
        max_ws_candidates = [idx for idx, time in enumerate(station_times) if time == max_time]
        min_ws_candidates = [idx for idx, time in enumerate(station_times) if time == min_time]
        max_ws_idx = random.choice(max_ws_candidates)
        min_ws_idx = random.choice(min_ws_candidates)
        
        print(f"最大时间工位: {max_ws_idx+1}, 时间: {max_time}")
        print(f"最小时间工位: {min_ws_idx+1}, 时间: {min_time}")
        
        # 步骤2: 计算目标值G
        G = (max_time - min_time) / 2
        print(f"目标值G: {G}")
        
        if G <= 0.01:  # 如果差距很小，认为已经均衡
            print("工位已经基本均衡，算法终止")
            break
        
        # 步骤3: 初始化候选集合C
        candidates = []
        
        # 步骤4: Transfer - 考虑从最大工位转移任务到最小工位
        for task in workstations[max_ws_idx]:
            # 检查任务时间是否小于2G
            if task_times[task] <= 2 * G:
                # 检查是否可行  
                print("-"*30)
                print(f"检查任务 {task} 从工位 {max_ws_idx+1} 转移到工位 {min_ws_idx+1}")
                if can_move_task(task, max_ws_idx, min_ws_idx, workstations, predecessors_dict, successors_dict, task_times, cycle_time):
                    print(f"任务 {task} 可以转移")
                    # 计算转移后的工位时间
                    new_max_time = max_time - task_times[task]
                    new_min_time = min_time + task_times[task]
                    # 计算不平衡度 (用最大工位和最小工位时间差评估)
                    imbalance = abs(new_max_time - new_min_time)
                    candidates.append({
                        'type': 'transfer',
                        'task': task,
                        'from_ws': max_ws_idx,
                        'to_ws': min_ws_idx,
                        'imbalance': imbalance
                    })
        
        # 步骤5: Trade - 考虑交换任务
        for task1 in workstations[max_ws_idx]:
            for task2 in workstations[min_ws_idx]:
                # 检查交换后工位时间的变化是否满足条件
                time_decrease = task_times[task1] - task_times[task2]
                if time_decrease <= 2 * G:  # 确保最大工位时间减少且减少量不超过2G
                    print("-"*30)
                    print(f"检查任务 {task1} (工位 {max_ws_idx+1}) 与任务 {task2} (工位 {min_ws_idx+1}) 交换")
                    # 检查交换是否可行
                    if can_swap_tasks(task1, max_ws_idx, task2, min_ws_idx, workstations, predecessors_dict, successors_dict, task_times, cycle_time):
                        # 计算交换后的工位时间
                        new_max_time = max_time - time_decrease
                        new_min_time = min_time + time_decrease
                        # 计算不平衡度
                        imbalance = abs(new_max_time - new_min_time)
                        candidates.append({
                            'type': 'trade',
                            'task1': task1,
                            'ws1': max_ws_idx,
                            'task2': task2,
                            'ws2': min_ws_idx,
                            'imbalance': imbalance
                        })
        
        # 步骤6: 执行最优的移动或交换
        if candidates:
            # 按不平衡度排序，选择使工位最平衡的操作
            best_move = min(candidates, key=lambda x: x['imbalance'])
            print("-"*30)
            if best_move['type'] == 'transfer':
                task = best_move['task']
                from_ws = best_move['from_ws']
                to_ws = best_move['to_ws']
                workstations[from_ws].remove(task)
                workstations[to_ws].append(task)
                print(f"执行Transfer: 任务 {task} 从工位 {from_ws+1} 移动到工位 {to_ws+1}")
                improved = True
            else:  # trade
                task1 = best_move['task1']
                ws1 = best_move['ws1']
                task2 = best_move['task2']
                ws2 = best_move['ws2']
                workstations[ws1].remove(task1)
                workstations[ws2].remove(task2)
                workstations[ws1].append(task2)
                workstations[ws2].append(task1)
                print(f"执行Trade: 任务 {task1} (工位 {ws1+1}) 与任务 {task2} (工位 {ws2+1}) 交换")
                improved = True
        else:
            print("无可行的移动或交换，算法终止")
            break
    
    return workstations

def calculate_smoothness_index(workstations, task_times):
    """
    计算平滑指数(SI)
    参数:
        workstations: 工位分配方案
        task_times: 任务时间字典
    返回:
        smoothness_index: 平滑指数
    """
    # 计算每个工位的时间
    station_times = [sum(task_times[task] for task in station) for station in workstations]
    
    # 找出最大工位时间
    max_station_time = max(station_times)
    
    # 计算平滑指数
    squared_diff_sum = sum((max_station_time - time) ** 2 for time in station_times)
    smoothness_index = math.sqrt(squared_diff_sum)
    
    return smoothness_index

# 在初始分配之后执行Trade and Transfer阶段
print("\n开始执行Trade and Transfer阶段均衡分析...")
balanced_workstations = trade_and_transfer(workstations, predecessors, successors, task_times, cycle_time)

# 显示优化后的结果
print("\n优化后的工位分配:")
for i, station in enumerate(balanced_workstations):
    station_time = sum(task_times[task] for task in station)
    print(f"工位 {i+1}:")
    print(f"  任务: {station}")
    print(f"  总时间: {station_time}")
    print(f"  闲置时间: {cycle_time - station_time}")

# 在显示优化结果时添加平滑指数计算
print("\n计算优化前的平滑指数...")
si = calculate_smoothness_index(workstations, task_times)
print(f"平滑指数(SI): {si:.2f}")
print("\n计算优化后的平滑指数...")
si = calculate_smoothness_index(balanced_workstations, task_times)
print(f"平滑指数(SI): {si:.2f}")


开始执行Trade and Transfer阶段均衡分析...

迭代 1:
工位时间: [10.0, 8.0, 10.0, 2.0]
最大时间工位: 3, 时间: 10.0
最小时间工位: 4, 时间: 2.0
目标值G: 4.0
------------------------------
检查任务 6 从工位 3 转移到工位 4
移动后的临时工位分配: [[1, 2, 4], [3, 5], [7, 8], [9, 6]]
工位时间: 5.0, 循环时间: 10.0
检查任务 6 的紧前任务: [3]
检查紧前任务 3 是否在工位 4 之前
任务 3 在工位 2 中
检查任务 6 的紧后任务: [7]
任务 6 的紧后任务 7 在工位 4 之前，违反约束
------------------------------
检查任务 7 从工位 3 转移到工位 4
移动后的临时工位分配: [[1, 2, 4], [3, 5], [6, 8], [9, 7]]
工位时间: 4.0, 循环时间: 10.0
检查任务 7 的紧前任务: [5, 6]
检查紧前任务 5 是否在工位 4 之前
任务 5 在工位 2 中
检查紧前任务 6 是否在工位 4 之前
任务 6 在工位 3 中
检查任务 7 的紧后任务: [8]
任务 7 的紧后任务 8 在工位 4 之前，违反约束
------------------------------
检查任务 8 从工位 3 转移到工位 4
移动后的临时工位分配: [[1, 2, 4], [3, 5], [6, 7], [9, 8]]
工位时间: 7.0, 循环时间: 10.0
检查任务 8 的紧前任务: [5, 7]
检查紧前任务 5 是否在工位 4 之前
任务 5 在工位 2 中
检查紧前任务 7 是否在工位 4 之前
任务 7 在工位 3 中
检查任务 8 的紧后任务: [9]
任务 8 的紧后任务 9 满足约束
所有约束检查通过，任务可以移动
任务 8 可以转移
------------------------------
检查任务 6 (工位 3) 与任务 9 (工位 4) 交换
交换后的临时工位分配: [[1, 2, 4], [3, 5], [7, 8, 9], [6]]
工位时间: 9.0, 循环时间: 10.0
工位时间: 3.