In [95]:
%reset

In [96]:
# 加载所需的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

In [97]:
def initialization(parameter):
    #产生随机的工作表
    joblist = pd.DataFrame(columns=['d_min','d_max','w_o','w_f','p','w','start','stop'],
                           dtype = int)
    #Trapezoidal Suitability Function，打分函数，列标题代表区块开始的时间点
    tsf = pd.DataFrame(0,
                       index=range(parameter['time_units']),
                       columns=range(parameter['n']))
    # 依次生成每一个任务的属性
    for i in range(parameter['n']):
        #时间段：w,d_min,d_max,d
        d_min = np.random.randint(parameter['d_min'],parameter['d_max']+1)
        d_max = np.random.randint(d_min,parameter['d_max']+1)
        w = np.random.randint(d_max,parameter['w_max']+1)
        #时间点：w_o,w_f,start,stop
        w_o = np.random.randint(0,parameter['time_units']-w)
        w_f = w_o + w
        #优先级：p
        p = np.random.randint(1,parameter['p_max']+1)
        # 更新joblist
        joblist.loc[i] = (d_min,d_max,w_o,w_f,p,w,0,0)

        # 更新打分函数
        s_i = tsf[i].copy()
        # 梯形4个顶点
        a = w_o
        b = int( w_o + np.ceil((w - d_max)/2) )
        c = b + d_max
        d = w_f
        #赋值1
        s_i[range(b,c)] = 1
        #赋值2
        if b > a:
            degree_1 = 1/(2*(b-a))
            n_1 = 1
            for j in range(a,b):
                s_i[j] = degree_1 * (2 * n_1 - 1)
                n_1 += 1
        #赋值3
        if d > c:
            degree_2 = 1/(2*(d-c))
            n_2 = 1
            for j in range(d-1,c-1,-1):
                s_i[j] = degree_2 * (2 * n_2 - 1)
                n_2 += 1
        #更新打分函数
        tsf[i] = s_i

    return tsf,joblist

In [125]:
def pd_sequencing(parameter,pd_jobindex,job):
    pd_job = job.copy()
    #总时间轴,1代表位置可用
    pd_timetable = pd.Series(1,index=range(parameter['time_units']))
    #初始化总时间轴，因为有可能已经有任务指定了时间
    for i in range(parameter['n']):
        pd_timetable.loc[range(pd_job.loc[i,'start'],pd_job.loc[i,'stop'])]=0
    #任务时间轴,1代表该时间在该任务的窗口中
    empty_timetable = pd.Series(0,index=range(parameter['time_units']))
    #建立空工作表，按顺序存放被选择的工作的序号
    chosen_job_list = list()

    #对每一个工作进行判断
    for i in range(len(pd_jobindex)):
        job_index = pd_jobindex[i]
        job_info = pd_job.loc[job_index,]

        d_min = job_info.loc['d_min']
        d_max = job_info.loc['d_max']
        w_o = job_info.loc['w_o']
        w_f = job_info.loc['w_f']
        p = job_info.loc['p']
        w = job_info.loc['w']
        #下序列用来存放每一个开始时间，长度为d_min的工作带来的收益
        position_value = pd.Series(0,index=range(parameter['time_units']))
        #假设没有可供使用的位置
        at_least_one = False
        #对每一个位置进行测试
        for j in range(w-d_min+1):
            test_start = w_o+j
            test_stop = test_start+d_min
            test_job = empty_timetable.copy()
            test_job.loc[range(test_start,test_stop)]=1
            #判断位置是否可用
            if np.sum(test_job*pd_timetable) == d_min:
                at_least_one = True
                position_value.loc[test_start] =\
                    np.sum(TSF.loc[range(test_start,test_stop),job_index])
        #如果有至少一个可用位置，则选择最大的那个
        if at_least_one:
            decide_start = \
                np.min(position_value.index[position_value == position_value.max()])
            decide_stop = decide_start+d_min
            #更新工作表
            pd_job.loc[job_index,'start'] = decide_start
            pd_job.loc[job_index,'stop'] = decide_stop
            #更新工作序号表
            chosen_job_list.append(job_index)
        else:
            decide_start = 0
            decide_stop = 0
        #刚刚被占用的位置不再可用
        pd_timetable.loc[range(decide_start,decide_stop)]=0

    return pd_job,chosen_job_list

In [126]:
def pd_timing(pd_chosen_jobindex,pd_job):
    test_pd_job = pd_job.copy()
    #对每一个工作进行左右延伸
    for i in range(len(pd_chosen_jobindex)):
        job_index = pd_chosen_jobindex[i]
        job_info = test_pd_job.loc[job_index,]

        d_min = job_info.loc['d_min']
        d_max = job_info.loc['d_max']
        w_o = job_info.loc['w_o']
        w_f = job_info.loc['w_f']
        p = job_info.loc['p']
        w = job_info.loc['w']
        start = job_info.loc['start']
        stop = job_info.loc['stop']
        #左右边界
        new_start = np.nanmax([test_pd_job.loc[test_pd_job.loc[:,'stop']<=start,'stop'].max(),
                               w_o,stop-d_max])
        new_stop = np.nanmin([test_pd_job.loc[test_pd_job.loc[:,'start']>=stop,'start'].min(),
                              w_f,new_start+d_max])
        #左右延伸
        test_pd_job.loc[job_index,'start'] = new_start
        test_pd_job.loc[job_index,'stop'] = new_stop
    return test_pd_job

In [127]:
def get_timetable(job,parameter):
    empty_timetable = pd.Series(-1,index=range(parameter['time_units']))
    for i in range(parameter['n']):
        empty_timetable.loc[range(job.loc[i,'start'],job.loc[i,'stop'])]=i
    return empty_timetable

In [128]:
def score(JOB,TSF,parameter):
    s = 0
    for i in range(parameter['n']):
        start = JOB.loc[i,'start']
        stop = JOB.loc[i,'stop']
        p = JOB.loc[i,'p']

        s_i = 0
        for j in range(start,stop):
            s_i += TSF.loc[j,i]

        s += s_i * p
    return s

In [129]:
# 设置参数
parameter = {'n':40,
             'time_units' : 100,
             'w_max':25,
             'd_min':1,
             'd_max':25,
             'p_max':10}

In [130]:
#初始化
np.random.seed(1)
TSF,JOB = initialization(parameter)
job_index_ordered_by_p = JOB.sort_values(by='p',ascending = False).index

In [131]:
#PD算法：任务按照p值排序
pd_job = JOB.copy()

In [132]:
pd_job_after_sequencing,pd_job_index_sequencing_chosen = \
    pd_sequencing(parameter,job_index_ordered_by_p,pd_job)

In [133]:
pd_job_after_timing = \
    pd_timing(pd_job_index_sequencing_chosen,pd_job_after_sequencing)

In [134]:
pd_score = score(pd_job_after_timing,TSF,parameter)

In [143]:
pd_timetable = get_timetable(pd_job_after_timing,parameter)

In [145]:
def pd_get_score(parameter,job_index_ordered_by_p,pd_job,TSF):
    pd_job_after_sequencing,pd_job_index_sequencing_chosen = \
        pd_sequencing(parameter,job_index_ordered_by_p,pd_job)
    pd_job_after_timing = \
        pd_timing(pd_job_index_sequencing_chosen,pd_job_after_sequencing)
    pd_score = score(pd_job_after_timing,TSF,parameter)
    return pd_score

In [146]:
pd_get_score(parameter,job_index_ordered_by_p,pd_job,TSF)

686.8555555555556

In [152]:
def LA(job,job_index_ordered_by_p,parameter,TSF):
    LA_job = job.copy()

    #总时间轴,1代表位置可用
    LA_timetable = pd.Series(1,index=range(parameter['time_units']))
    #任务时间轴,1代表该时间在该任务的窗口中
    empty_timetable = pd.Series(0,index=range(parameter['time_units']))

    for i in range(parameter['n']):
        job_index = job_index_ordered_by_p[i]
        job_info = LA_job.loc[job_index,]

        d_min = job_info.loc['d_min']
        d_max = job_info.loc['d_max']
        w_o = job_info.loc['w_o']
        w_f = job_info.loc['w_f']
        p = job_info.loc['p']
        w = job_info.loc['w']

        test_score = 0
        best_start = 0
        best_stop = 0

        for start in range(w_o,w_f-d_min+1):
            for d in range(d_min,min(d_max,w_f-start)+1):
                stop = start+d
                test_timetable = empty_timetable.copy()
                test_timetable.loc[range(start,stop)]=1

                if np.sum(test_timetable*LA_timetable) == d:
                    test_LA_job = LA_job.copy()
                    test_LA_job.loc[job_index,'start'] = start
                    test_LA_job.loc[job_index,'stop'] = stop
                    test_job_index_ordered_by_p = job_index_ordered_by_p[(i+1):]

                    new_test_score = \
                        pd_get_score(parameter,test_job_index_ordered_by_p,test_LA_job,TSF)

                    test_score = max(test_score,new_test_score)
                    if new_test_score == test_score:
                        best_start = start
                        best_stop = stop

        LA_job.loc[job_index,'start'] = best_start
        LA_job.loc[job_index,'stop'] = best_stop

        LA_timetable.loc[range(best_start,best_stop)]=0

    return LA_job

In [153]:
LA_job = LA(JOB,job_index_ordered_by_p,parameter,TSF)

In [154]:
LA_score = score(LA_job,TSF,parameter)

In [155]:
LA_timetable = get_timetable(LA_job,parameter)

In [156]:
def PMX_crossover(parent1, parent2, seed):
    '''
    parent1 and parent2 are 1D np.array
    '''
    rng = np.random.default_rng(seed=seed)

    cutoff_1, cutoff_2 = np.sort(rng.choice(np.arange(len(parent1)+1), size=2, replace=False))

    def PMX_one_offspring(p1, p2):
        offspring = np.zeros(len(p1), dtype=p1.dtype)

        # Copy the mapping section (middle) from parent1
        offspring[cutoff_1:cutoff_2] = p1[cutoff_1:cutoff_2]

        # copy the rest from parent2 (provided it's not already there
        for i in np.concatenate([np.arange(0,cutoff_1), np.arange(cutoff_2,len(p1))]):
            candidate = p2[i]
            while candidate in p1[cutoff_1:cutoff_2]: # allows for several successive mappings
                print(f"Candidate {candidate} not valid in position {i}") # DEBUGONLY
                candidate = p2[np.where(p1 == candidate)[0][0]]
            offspring[i] = candidate
        return offspring

    offspring1 = PMX_one_offspring(parent1, parent2)
    offspring2 = PMX_one_offspring(parent2, parent1)

    return offspring1, offspring2


In [158]:
parent1 = np.array([1,2,3,4,5,6,7,8,9])
parent2 = np.array([5,4,6,7,2,1,3,9,8])
PMX_crossover(parent1, parent2, 1)


Candidate 5 not valid in position 0
Candidate 2 not valid in position 1


(array([2, 4, 6, 7, 5, 1, 3, 9, 8]), array([1, 5, 3, 4, 2, 6, 7, 8, 9]))