# 交通分析大作业

## 建立路网

![title](net.png)

## 参数取值

In [1]:
#路段行驶时间正比于路段长度
tm=10 #电动车初始电量最长行驶时间
#路段容量正比于路段长度
C=10000 #充电站的建设成本
a=0.0001 #时间价值转换参数

## Dijkstra 算法

In [2]:
from xml.etree.ElementTree import ElementTree,Element
import numpy as np
import pandas as pd

def read_xml(in_path):#读取xml网络文件
    tree = ElementTree()
    tree.parse(in_path)
    return tree
    
def find_nodes(tree, path):#根据路径查找节点
    return tree.findall(path)

def if_match(node, kv_map):#判断key是否匹配
    for key in kv_map:
        if node.get(key) != kv_map.get(key):
            return False
    return True

def get_node_by_keyvalue(nodelist, kv_map):#根据key查找节点
    result_nodes = []
    for node in nodelist:
        if if_match(node, kv_map):
            result_nodes.append(node)
    return result_nodes

def show_value_by_path(nodelist, tag, kv_map):#显示路径节点
    rlt = []
    for parent_node in nodelist:
        children = list(parent_node)
        if children[0].text == kv_map:
            for child in children:
                print(child.tag+"  "+child.text)

def Dijkstra(O,D,k,tm,pathset,pathtrace,pathlenth,pathrate):#最短路算法
    P=np.zeros(15) #永久性标号
    T=[float('inf')]*15 #试探性标号
    λ=np.zeros(15) #最短路中上一结点
    path=[] #最短路经过结点
    Stranding=np.zeros(15) #最短路中上一路段
    Strand=[] #最短路经过路段
    Lenthing=np.zeros(15) #最短路中上一路段长度
    Lenth=[] #最短路经过路段长度
    S=[O] #节点已用标记
    flag=True
    
    unode=O #上游节点
    tnode=D #下游/目标节点
    nodelist = find_nodes(tree, "nodes/node") #在路网中提取节点
    linklist = find_nodes(tree, "links/link") #在路网中提取路径
    
    while T!=[float('inf')]*15 or flag==True:
        flag=False
        result_nodes = get_node_by_keyvalue(nodelist, {"id": str(unode).zfill(2)}) #在节点中查找上游节点信息
        for parent_node in result_nodes:
            children = list(parent_node)
            for child in children:
                tnode=int(child.attrib.get('id')) #提取该上游节点所连结的节点
                if (tnode in S) == 0: #如果该点未标记已用
                    L=child.text #提取通往相邻节点的路径编号
                    #print(L)
                    result_link=get_node_by_keyvalue(linklist, {"id": L}) #在路网中提取该路径
                    t=float(result_link[0].attrib.get('t')) #提取路径路阻
                    w=t*pathrate[int(L)-1] #并乘上路阻的变化率
                    if T[tnode-1]>(P[unode-1]+w): #如果试探路段更耗时
                        T[tnode-1]=P[unode-1]+w #则更新试探路段标号
                        λ[tnode-1]=unode #同时更新其上一节点
                        Stranding[tnode-1]=int(L) #更新通过路段
                        Lenthing[tnode-1]=t #更新通过路段长度
            unode=T.index(min(T))+1 #查找试探路径中耗时最短的设置为下一次循环的上游
            tnode=T.index(min(T))+1
            P[tnode-1]=min(T) #并对应赋值给永久性标号
            S.append(tnode) #添加标记
            #print('P=',P)
            #print('T=',T)
            #print('λ=',λ)
            #print(unode,tnode)
            T[tnode-1]=float('inf') #将使用过的最短路节点重新设置为无穷
    #print("ok")
    i=D
    #print(Stranding)
    while i!=0: #反查
        path.append(i) #记录最短路经过节点
        Strand.append(int(Stranding[i-1])) #记录最短路经过路段
        Lenth.append(Lenthing[i-1])
        i=int(λ[i-1])
    Strand.reverse()
    Lenth.reverse()
    
    #print(Strand)
    path.reverse()
    if (path in pathset)==0:
        pathset.append(path) #记录最短路经过节点
        pathtrace.append(Strand) #记录最短路经过路段
        pathlenth.append(Lenth) #记录最短路经过路段长度
    else:
        pathset.append([0])
        pathtrace.append([0])
        pathlenth.append([0])
    for l in Strand:
        pathrate[l-1]=pathrate[l-1]+0.5 #增加路阻
    #print(path)
    #print(pathset)
    #print(pathrate)
        
    
                
if __name__ == "__main__":
    
    #读取xml格式路网
    tree = read_xml("net.xml")
    
    ps=[[0 for x in range(15)] for y in range(15)] #记录经过节点
    pt=[[0 for x in range(15)] for y in range(15)] #记录经过路段
    pl=[[0 for x in range(15)] for y in range(15)] #记录经过路段长度
    #print(p)
    
    for O in range(1,16):#起点
        for D in range(1,16):#终点
            pathset=[] #该OD对的4条备选路径经过节点
            pathtrace=[] #该OD对的4条备选路径经过路段
            pathlenth=[] #该OD对的4条备选路径经过路段的长度
            pathrate=[1]*46 #路径的路阻增加率
            for k in range(4): #计算最短路，记录节点、路段、长度
                Dijkstra(O,D,k,tm,pathset,pathtrace,pathlenth,pathrate)
            #print(pathset)
            #print(O,D)
            ps[O-1][D-1]=pathset
            pt[O-1][D-1]=pathtrace
            pl[O-1][D-1]=pathlenth
    #print(ps)
    #print(pt)
    #print(pl)

## 建立双层模型

### 上层模型

$最小系统总成本 min\ Scost=a*T+Ccost\\
充电桩建设成本 Ccost=cnum*C$

### 下层模型

$求最小总体出行时间 min\ T=\sum(xt)\\
某OD间所有路径交通量总和为该od间需求 \sum f=q\\
经过某路段的交通量等于所有OD经过该路段的交通量之和小于路段容量
x=\sum_{od}\sum f\le Ca\\
路径交通量大于零 f\ge 0\\
经过某路段的所有交通量小于路段容量 x\le Ca\\
电动车最大行驶里程约束：子路径长度小于初始电量对应的最大行驶里程 l\le L$

## 遗传算法

In [27]:
Y=[4,6,7,10] #充电站初始位置节点
y=[1,1,1,1] #充电站初始位置基因组
yy=[1,1,1,1] #充电站位置交换基因组
yyy=[] #子代种群
Scost0=float('inf') #父代评价
Scost1=0 #子代评价
a=17
b=13
c=4
S=88
S=(a*S+b)%41
cnum=int(S/10)+1#初始随机充电桩个数
print(cnum)
for i in range(4-cnum):#取消充电站位置
    S=(a*S+b)%41
    cloc=int(S/10)
    y[cloc]=0
for i in range(4):
    if y[i]==0:
        Y[i]=0
print(y,Y)

matrix=[] #最终矩阵
matrix0=[] #目标函数矩阵
matrix1=[] #OD需求矩阵
matrix2=[] #路段容量矩阵
matrix3=[] #路径置零矩阵

def var():#个体变异
    global S
    global Y
    global y
    S=(a*S+b)%41
    cloc=int(S/10)
    if y[cloc]==0:
        y[cloc]=1
    else:
        y[cloc]=0

def her():#交叉遗传
    global S
    global Y
    global y
    global yy
    global yyy
    S=(a*S+b)%41
    cloc=int(S/10)
    next=[1,1,1,1]
    for i in range(4):
        if i <cloc:
            next[i]=y[i]
        else:
            next[i]=yy[i]
    yyy.append(next)
    
def evo():#开始演化
    global S
    global Y
    global y
    global yy
    global yyy
    global Scost0
    global Scost1
    S=(a*S+b)%41
    Scost0=mtrx()
    if S<=2:#3/41变异概率
        var()
    while (Scost1<Scost0):
        Scost0=Scost1
        for i in range(10):#子代生成
            her()
        next=[]
        for d in yyy:#子代评价生成
            Y=[4,6,7,10] #充电站初始位置节点
            for i in 4:
                if d[i]==0:
                    Y[i]=0
            next.append(mtrx())
        Scost1=min(next)
        yy=y
        y=yyy[next.index(Scost1)]
    return(Scost0)

4
[1, 1, 1, 1] [4, 6, 7, 10]


In [19]:
def mtrx():
    global S
    global Y
    global y
    global yy
    global yyy
    global Scost0
    global Scost1
    cnum=0
    for i in Y:
        if i==1:
            cnum=cnum+1
    aim()
    st13()
    st2()
    Scost1=a*M()+cnum*C
    return Scost1

In [34]:
def aim():#目标函数
    global matrix0 #目标函数矩阵
    matrix0=[]
    target=[0]*(15*14*4+15*14+46)#变量+人工变量+松弛变量
    for O in range(1,16):#起点
        for D in range(1,16):#终点
            if O!=D:
                for k in range(4):#4条备选路径
                    Lenth=sum(pl[O-1][D-1][k])
                    if O>D:
                        target[56*(O-1)+4*(D-1)+(k)]=-Lenth
                    if O<D:
                        target[56*(O-1)+4*(D-2)+(k)]=-Lenth
                if O>D:
                    target[15*14*4+14*(O-1)+(D-1)]=-1000000
                if O<D:
                    target[15*14*4+14*(O-1)+(D-2)]=-1000000
    target.append(0)
    matrix0.append(target)
    #print(matrix0)

In [33]:
def st13():#OD需求 & 路径置零约束
    global matrix1 #OD需求矩阵
    matrix1=[]
    global matrix3 #路径置零矩阵
    matrix3=[]
    f=[[[1 for k in range(4)] for D in range(15)]for O in range(15)]

    ODlist = find_nodes(tree, "ODs/OD") #在路网中提取节点
    #print(ODlist)

    #某OD间所有路径交通量总和为该od间需求 sum(f)=q checked
    for O in range(1,16):#起点
        result_ODs = get_node_by_keyvalue(ODlist, {"O": str(O).zfill(2)}) #在OD需求表中查找
        #print(result_ODs)

        for parent_node in result_ODs:
            Q=parent_node.text.split()
        for D in range(1,16):#终点
            if O!=D:
                constraints=[0]*(15*14*4+15*14+46)#变量+人工变量+松弛变量
                Lenth=0
                lenth1=0
                lenth2=0
                for k in range(4):#4条备选路径
                    zero=[0]*(15*14*4+15*14+46+1)#变量+人工变量+松弛变量+置零
                    Lenth=sum(pl[O-1][D-1][k])
                    flag=[]
                    for p in ps[O-1][D-1][k]:
                        if p in Y:
                            flag.append(ps[O-1][D-1][k].index(p))
                    #print(flag)
                    if Lenth>tm and len(flag)>0:#检测双边长度
                        lenth1=sum(pl[O-1][D-1][k][:flag[0]+1])
                        lenth2=sum(pl[O-1][D-1][k][flag[-1]+1:])
                        #print(Lenth,lenth1,lenth2)
                        if lenth1>tm or lenth2>tm:#超出初始里程的路径直接置零
                            f[O-1][D-1][k]=0
                            if O>D:
                                zero[56*(O-1)+4*(D-1)+(k)]=1
                            if O<D:
                                zero[56*(O-1)+4*(D-2)+(k)]=1
                    if Lenth>tm and len(flag)==0:#超出初始里程的路径直接置零
                        f[O-1][D-1][k]=0
                    if O>D:
                        constraints[56*(O-1)+4*(D-1)+(k)]=f[O-1][D-1][k]
                    if O<D:
                        constraints[56*(O-1)+4*(D-2)+(k)]=f[O-1][D-1][k]
                    if 1 in zero:
                        matrix3.append(zero)
                constraints[15*14*4+14*(O-1)+(D-1)]=1
                constraints.append(float(Q[D-1]))
                #print(constraints)
                matrix1.append(constraints)
                if f[O-1][D-1]==[0]*4:
                    print("error: max travel time",O,D) #OD间所有路径最大里程超出初始里程
    print("max travel time checked")
    #print(matrix1)


In [39]:
def st2():#路段容量约束
    global matrix2 #路段容量矩阵
    matrix2=[]
    #经过某路段的交通量等于所有OD经过该路段的交通量之和小于路段容量 x=sum_od(sum(f))<=Ca
    linklist = find_nodes(tree, "links/link") #在路网中提取路径
    #print(linklist)

    for i in range(1,47):#路段编号
        result_link=get_node_by_keyvalue(linklist, {"id": str(i).zfill(2)}) #在路网中提取该路径
        #print(result_link)
        t=float(result_link[0].attrib.get('t')) #提取路径路阻
        constraints=[0]*(15*14*4+15*14+46)#变量+人工变量+松弛变量
        for O in range(1,16):#起点
            for D in range(1,16):#终点
                if O!=D:
                    for k in range(4):#4条备选路径
                        if i in pt[O-1][D-1][k]:
                            if O>D:
                                constraints[56*(O-1)+4*(D-1)+(k)]=1
                            if O<D:
                                constraints[56*(O-1)+4*(D-2)+(k)]=1
        constraints[i-47]=1
        constraints.append(500*t) #添加正比于路段长度的路段容量限制
        matrix2.append(constraints)
    #print(matrix2)

In [24]:
def pivot():
    l = list(matrix[0][:-1])
    #print(l)
    jnum = l.index(max(l)) #转入下标
    #print("转入下标",jnum)
    m = []
    for i in range(bn):
        if matrix[i][jnum] == 0:
            m.append(float('inf'))
        else:
            m.append(matrix[i][-1]/matrix[i][jnum])
    #print("m",m)
    θ=[x for x in m[1:]]
    for i in range(bn-1):
        if θ[i]<=0:
            θ[i]=float('inf')
    #print("theta",θ)
    if min(θ)==float('inf'):#数据缺失的部分直接设为负数
        matrix[0][jnum]=-float('inf')
    else:
        inum = m.index(min(θ))  #转出下标
        #print("转出下标",inum)
        s[inum-1] = jnum
        #print(s)
        r = matrix[inum][jnum]
        matrix[inum] /= r
        for i in [x for x in range(bn) if x !=inum]:
            r = matrix[i][jnum]
            matrix[i] -= r * matrix[inum]
        
def solve():
    flag = True
    while flag:
        if max(list(matrix[0][:-1])) <= 0: #直至所有系数小于等于0
            flag = False
        else:
            pivot()

In [25]:
def M():
    #求最小总体出行时间 min T=sum(xt)
    matrix=np.array(matrix0+matrix1+matrix2+matrix3)

    #print(matrix[0])
    #print(matrix.shape)
    (bn,cn)=matrix.shape
    s=list(range(15*14*4,cn-1)) #记录基变量
    print(s)
    for j in range(cn-1,cn):#大M法目标值预处理
        S=0
        for i in range(1,bn):
            S=S+matrix[i][j]*matrix[0][cn-bn+i-1]
        #print(matrix[0][j],S)
        matrix[0][j]=matrix[0][j]-S

    for j in range(cn-1):#大M法目标函数预处理
        S=0
        for i in range(1,bn):
            S=S+matrix[i][j]*matrix[0][cn-bn+i-1]
        #print(matrix[0][j],S)
        matrix[0][j]=matrix[0][j]-S

    #print(matrix[0])
    solve()

    for i in range(cn - 1):#计算结果追踪
        if i in s:
            print("x"+str(i+1)+"=%.2f" % matrix[s.index(i)+1][-1])
        else:
            print("x"+str(i+1)+"=0.00")
    #print("objective is %.2f"%(-matrix[0][-1]))
    return -matrix[0][-1]

In [40]:
evo()

max travel time checked
[840, 841, 842, 843, 844, 845, 846, 847, 848, 849, 850, 851, 852, 853, 854, 855, 856, 857, 858, 859, 860, 861, 862, 863, 864, 865, 866, 867, 868, 869, 870, 871, 872, 873, 874, 875, 876, 877, 878, 879, 880, 881, 882, 883, 884, 885, 886, 887, 888, 889, 890, 891, 892, 893, 894, 895, 896, 897, 898, 899, 900, 901, 902, 903, 904, 905, 906, 907, 908, 909, 910, 911, 912, 913, 914, 915, 916, 917, 918, 919, 920, 921, 922, 923, 924, 925, 926, 927, 928, 929, 930, 931, 932, 933, 934, 935, 936, 937, 938, 939, 940, 941, 942, 943, 944, 945, 946, 947, 948, 949, 950, 951, 952, 953, 954, 955, 956, 957, 958, 959, 960, 961, 962, 963, 964, 965, 966, 967, 968, 969, 970, 971, 972, 973, 974, 975, 976, 977, 978, 979, 980, 981, 982, 983, 984, 985, 986, 987, 988, 989, 990, 991, 992, 993, 994, 995, 996, 997, 998, 999, 1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023, 1024, 1025, 1026, 1027, 1028, 1

-1561110236215.0

In [38]:
ps

[[[[1], [0], [0], [0]],
  [[1, 2], [0], [0], [0]],
  [[1, 2, 3], [1, 2, 4, 3], [1, 5, 6, 7, 8, 3], [0]],
  [[1, 2, 4], [1, 5, 6, 4], [0], [0]],
  [[1, 5], [0], [0], [0]],
  [[1, 5, 6], [0], [0], [1, 2, 4, 6]],
  [[1, 5, 6, 7], [1, 5, 9, 10, 7], [0], [1, 2, 4, 8, 7]],
  [[1, 2, 4, 8], [1, 5, 6, 7, 8], [1, 2, 3, 8], [1, 5, 9, 10, 11, 12, 8]],
  [[1, 5, 9], [0], [0], [0]],
  [[1, 5, 9, 10], [1, 5, 6, 10], [0], [0]],
  [[1, 5, 9, 10, 11],
   [1, 5, 6, 10, 11],
   [1, 2, 4, 8, 12, 11],
   [1, 5, 9, 13, 14, 15, 11]],
  [[1, 5, 9, 10, 11, 12],
   [1, 2, 4, 8, 12],
   [1, 5, 6, 7, 8, 12],
   [1, 2, 3, 8, 12]],
  [[1, 5, 9, 13], [0], [0], [1, 5, 6, 10, 14, 13]],
  [[1, 5, 9, 13, 14], [1, 5, 6, 10, 14], [1, 5, 9, 10, 14], [0]],
  [[1, 5, 9, 13, 14, 15],
   [1, 5, 6, 10, 11, 15],
   [1, 2, 4, 8, 12, 11, 15],
   [1, 5, 9, 10, 14, 15]]],
 [[[2, 1], [0], [0], [0]],
  [[2], [0], [0], [0]],
  [[2, 3], [2, 4, 3], [0], [0]],
  [[2, 4], [0], [0], [0]],
  [[2, 1, 5], [0], [0], [2, 4, 6, 5]],
  [[2, 4, 6],