In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Counterexample model

State space is [0,1]. There is only one action.  

In [None]:
# Defining the continuous model

def nextState(s):
    "Sampling successor given concrete state"
    if np.random.random()<s/2:
        return np.random.random()*s/2
    else:
        return 0
    
def cost(s):
    return s
    
def cost_region(corners):
    """
    average cost of region given by integral of cost(s,a) values
    
    Region given by its 2 corners 
    """
    return (1/2)*(corners[1]**2-corners[0]**2)/(corners[1]-corners[0])


In [None]:

# Defining the Discretization

def point_to_region(gran,s):
    return np.min((gran-1,int(np.floor(s*gran))))

def nextState_region(gran,r,alpha,transmatrix=None):
    """
    Sampling successor given current region defined by adversary alpha.
    region is given by index as returned by point_to_region
    
    transmatrix is only used for alpha='mean'
    """
    
    leftp=r/gran
    rightp=(1+r)/gran
    
    corners=np.array([leftp,rightp])
    
    if alpha=="min": 
        return point_to_region(gran,nextState(leftp)),leftp
    
    if alpha=="max": 
        return point_to_region(gran,nextState(rightp)),rightp
    
    if alpha=="sampled" or alpha=="sample":
        s=leftp+np.random.random()*(rightp-leftp)
        rpr=point_to_region(gran,nextState(s))
        return rpr,s
            
    if alpha=="mean":
        rpr=np.random.choice(range(gran),p=transmatrix[r,:])
        c=cost_region(corners)
        return rpr,c
    
def trans_prob(gran,r,rpr):
    """
    computes the transition probability (integral) from region r to rpr (rpr <= r)
    """
    leftp=r/gran
    rightp=(1+r)/gran
    leftrpr=rpr/gran
    rightrpr=(rpr+1)/gran
    
    # first determine the intersection of rpr with the 'target support' of T(r,.)
    leftint=np.max((2*leftrpr,leftp))
    rightint=np.min((2*rightrpr,rightp))
    
    if leftp ==0:
        return 1
    
    tp=0
    if leftint<rightint:
        tp+=rightint**2/4-leftrpr*rightint-leftint**2/4+leftrpr*leftint
    if rightint<rightp:
        tp+=(rightrpr-leftrpr)*(rightp-np.max((rightint,leftp)))
    # adding the pointmass:
    if rpr==0:
        tp+=rightp-rightp**2/4-leftp+leftp**2/4
    tp/=(rightp-leftp)    
    return tp
    
def trans_matrix(gran):
    tm=np.zeros((gran,gran))
    for r in range(gran):
        for rpr in range(r+1):
            tm[r,rpr]=trans_prob(gran,r,rpr)
    return tm

In [None]:
# Defining Q-learning

def qupdate(gran,Q,visits,r,cost,rprime):
    #print("Q before update: ", Q, "r: ", r, " a: ", a, "cost: ", cost, "rpr: ", rprime)
    lrate=1/np.sqrt(visits[r])
    #lrate=0.5
    visits[r]+=1
    Q[r]=(1-lrate)*Q[r]+ lrate*(cost+lbda*Q[rprime])
    #print("Q after update: ", Q)

def Q_learn(gran,lbda,numsteps,numepisodes,alpha,transmatrix=None,withtrace=False):
    """
    gran: granularity of partition
    lbda: the discount factor
    numsteps: number of steps per episode
    numepisodes: number of episodes to train on
    alpha: adversary that determines how next regions and costs are sampled
    transmatrix: for alpha="mean" the matrix with transition probabilities between regions
    withtrace: whether to return also the trace of Q-functions over all iterations
    """
    Q=np.zeros((gran))
    visits=np.ones((gran))

    if withtrace:
        Qtrace=np.zeros((numepisodes,numsteps,gran))
        
    for e in range(numepisodes):    
        r=point_to_region(gran,np.random.random())
        for i in range(numsteps):
            rpr,cst=nextState_region(gran,r,alpha=alpha,transmatrix=transmatrix)
            if withtrace:
                Qtrace[e,i,:]=Q
            qupdate(gran,Q,visits,r,cst,rpr)
            r=rpr
            
    if withtrace:
        return Q,Qtrace
    return Q


Illustrating traces of Q-values over training iterations

In [None]:
# Generate Q-traces
g=10
lbda=0.9
numsteps = 10
numepisodes = 1000
alpha=("mean","sampled")

Qtrace_for_alpha={}
for a in alpha:
    tm=None
    if a=="mean":
        tm=trans_matrix(g)
    Q,Qt=Q_learn(g,lbda,numsteps,numepisodes,a,transmatrix=tm,withtrace=True)
    Qtrace_for_alpha[a]=Qt
    


In [None]:

for r in range(g):
    for a in alpha:
        plt.plot(Qtrace_for_alpha[a][:,numsteps-1,r],alpha=0.5,label=a)

    plt.title("region " + str(r))    
    plt.legend()
    plt.show()

In [None]:
# varying granularities

granularities=(5,25,50,100)
lbda=0.9
numsteps = 10
numepisodes = 500
alpha=("min","max","sampled","mean")

Qs_for_alpha ={}
for a in alpha:
    
    Qs=[]
    for g in granularities:
        tm=None
        if a=="mean":
            tm=trans_matrix(g)
        steps_for_granularity=int(np.floor(numsteps*g/granularities[0]))
        print("granularity ", g, "steps: ", steps_for_granularity)
        Q=Q_learn(g,lbda,steps_for_granularity,numepisodes,a,transmatrix=tm)
        Qs.append(Q)
    Qs_for_alpha[a]=Qs

In [None]:
#plotting costs for given initial state as a function of granularity
#and adversary

state=0.25
statevals={}


for a in alpha:
    Qs=Qs_for_alpha[a]
    iv=np.zeros(len(Qs))
    for i in range(len(Qs)):
        r=point_to_region(granularities[i],state)
        iv[i]=(Qs[i][r])
    statevals[a]=iv

for a in alpha:
    plt.plot(granularities,statevals[a],label=a)
    
plt.legend()
# plt.xlim(40,100)
# plt.ylim(0.67,0.75)
plt.show()

In [None]:
# plotting cost as function of partition:
# index of the granularity

colors=('red','green','blue','purple')

for gidx,g in enumerate(granularities):
    xvals=np.arange(0,1+1/g,1/g)
    for a in alpha:
        Q=Qs_for_alpha[a][gidx]
        #duplicating the last value at the end:
        Q=np.hstack((Q,np.array(Q[-1])))
        plt.step(xvals,Q,where='post',label=a+"," + str(g),c=colors[gidx])

plt.legend()
plt.show()

In [None]:
# varying number of steps (focus on lambda=1 case!)
g=25
lbda=1.0
numsteps = (10,100,1000,10000)
numepisodes = 500
alpha=("min","max")

Qs_for_alpha={}
for a in alpha:
    Qs=[]
    for nst in numsteps:
        print("steps: ", nst)
        Q=Q_learn(g,lbda,nst,numepisodes,a)
        Qs.append(Q)
    Qs_for_alpha[a]=Qs

In [None]:
for a in alpha:
    for nidx,nst in enumerate(numsteps):
        xvals=np.arange(0,1+1/g,1/g)
        Q=Qs_for_alpha[a][nidx]
        #duplicating the last value at the end:
        Q=np.hstack((Q,np.array(Q[-1])))
        plt.step(xvals,Q,where='post',label=str(nst),c=colors[nidx])

    plt.legend()
    plt.title(a)
    plt.show()