# Chapter-1 Agent And Control

###a. Representing Agents and Environments

In [None]:
# Body of the AI Agent
import random

class Agent(object):
  """Abstract base class for an agent that acts in an environment."""

  def __init__(self,env):
    """set up the agent

    env is the environment the agent acts in; it is stored as self.env
    """
    self.env=env

  def go(self,n):
    """acts for n time steps

    Abstract: subclasses must override this; the base version always
    raises NotImplementedError.
    """
    raise NotImplementedError("go") # abstract method


The environment implements a do(action) method where action is a variable-value dictionary. This returns a percept, which is also a variable-value dictionary. The use of dictionaries allows for structured actions and percepts.

Note that Environment is a subclass of Displayable so that it can use the
display method described in Section 1.7.1.


In [None]:
! pip install display



In [None]:
from display import Displayable
class Environment(Displayable):
    """Abstract interface for anything an agent (or a higher layer) acts on.

    Both methods below are placeholders: calling either one on this base
    class raises NotImplementedError, so subclasses must override them.
    """

    def initial_percepts(self):
        """Return the percepts available before any action has been done."""
        raise NotImplementedError("initial_percepts")  # abstract method

    def do(self, action):
        """Carry out action in the environment and return the next percept."""
        raise NotImplementedError("do")  # abstract method


### The Environments.py




The environment state is given in terms of the time and the amount of paper in
stock. It also remembers the in-stock history and the price history. The percepts
are the price and the amount of paper in stock. The action of the agent is the
number to buy.


Here we assume that the prices are obtained from the prices list plus a random integer in the range [0, max_price_addon) plus a linear "inflation". The agent
cannot access the price model; it just observes the prices and the amount in
stock.


In [None]:
class TP_env(Environment):
  """Paper-buying environment.

  The state is the current time step and the amount in stock; the
  histories of both the stock level and the price are remembered.
  """

  prices = [234, 234, 234, 234, 255, 255, 275, 275, 211, 211, 211,
            234, 234, 234, 234, 199, 199, 275, 275, 234, 234, 234, 234, 255,
            255, 260, 260, 265, 265, 265, 265, 270, 270, 255, 255, 260, 260,
            265, 265, 150, 150, 265, 265, 270, 270, 255, 255, 260, 260, 265,
            265, 265, 265, 270, 270, 211, 211, 255, 255, 260, 260, 265, 265,
            260, 265, 270, 270, 205, 255, 255, 260, 260, 265, 265, 265, 265,
            270, 270]
  max_price_addon = 20 # exclusive upper bound of the random amount added to a price

  def __init__(self):
    """start at time 0 with 20 in stock and empty histories"""
    self.stock_history = [] # stock level recorded at every step
    self.price_history = [] # price recorded at every step
    self.stock = 20
    self.time = 0

  def initial_percepts(self):
    """record and return the first percept: {'price':..., 'instock':...}"""
    self.stock_history.append(self.stock)
    first_price = self.prices[0] + random.randrange(self.max_price_addon)
    self.price_history.append(first_price)
    return {'price': first_price, 'instock': self.stock}

  def do(self, action):
    """apply action {'buy': number} and return the next percept

    A random amount (1 to 6) is consumed each step; the returned percept
    is {'price': new price, 'instock': new stock level}.
    """
    consumed = pick_from_dist({6:0.1, 5:0.1, 4:0.2, 3:0.3, 2:0.2, 1:0.1})
    purchased = action['buy']
    self.stock = self.stock + purchased - consumed
    self.stock_history.append(self.stock)
    self.time += 1

    base = self.prices[self.time % len(self.prices)] # repeating pattern
    noise = random.randrange(self.max_price_addon) # plus randomness
    inflation = self.time // 2 # plus inflation
    price = base + noise + inflation
    self.price_history.append(price)
    return {'price': price, 'instock': self.stock}

The **pick_from_dist** function takes an item:probability dictionary, and returns
one of the items in proportion to its probability.


In [None]:
def pick_from_dist(item_prob_dist):
  """ returns a value from a distribution.
  item_prob_dist is an item:probability dictionary, where the
  probabilities sum to 1.
  returns an item chosen in proportion to its probability
  raises RuntimeError if every item is passed over, i.e. the
  probabilities do not cover the random draw
  """
  ranreal = random.random()
  for (it,prob) in item_prob_dist.items():
    if ranreal < prob:
      return it
    # not this item: remove its share of the mass and try the next one
    ranreal -= prob
  # only reached after all items were tried without a match
  raise RuntimeError(str(item_prob_dist)+" is not a probability distribution")

###The Agent.py

In [None]:
class TP_agent(Agent):
  """Paper-buying agent that tracks a running average of the price."""

  def __init__(self, env):
    """env is the environment to act in; its initial percepts are read here"""
    self.env = env
    self.spent = 0 # total amount spent so far
    percepts = env.initial_percepts()
    self.ave = self.last_price = percepts['price']
    self.instock = percepts['instock']

  def go(self, n):
    """go for n time steps

    Each step: buy 48 if the last price is below 90% of the running
    average and fewer than 60 are in stock; otherwise buy 12 if fewer
    than 12 are in stock; otherwise buy nothing.
    """
    for i in range(n):
      if self.last_price < 0.9*self.ave and self.instock < 60:
        tobuy = 48
      elif self.instock < 12:
        tobuy = 12
      else:
        tobuy = 0
      self.spent += tobuy*self.last_price
      # act on the agent's own environment, not on a module-level global
      percepts = self.env.do({'buy': tobuy})
      self.last_price = percepts['price']
      # move the running average 5% of the way towards the new price
      self.ave = self.ave+(self.last_price-self.ave)*0.05
      self.instock = percepts['instock']
# Run the paper-buying agent for 90 time steps in a fresh environment.
env = TP_env()
ag = TP_agent(env)
ag.go(90)
ag.spent/env.time ## average spent per time period

##Plotting.py

In [None]:
import matplotlib.pyplot as plt

class Plot_prices(object):
  """Set up the plot for history of price and number in stock"""
  def __init__(self, ag,env):
    """ag is the agent and env the environment whose histories are plotted"""
    self.ag = ag
    self.env = env
    plt.ion()
    plt.xlabel("Time")
    plt.ylabel("Number in stock.Price.")

  def plot_run(self):
    """plot history of price and instock"""
    # read the histories from the environment given to __init__,
    # not from whatever global happens to be called env
    num = len(self.env.stock_history)
    plt.plot(range(num),self.env.stock_history,label="In stock")
    plt.plot(range(num),self.env.price_history,label="Price")
    #plt.legend(loc="upper left")
    plt.draw()

#pl = Plot_prices(ag,env)
#ag.go(90); pl.plot_run()


##b.Hierarchical Controller

To run the hierarchical controller, in folder "aipython", load
**"agentTop.py"**, using e.g., ipython -i **agentTop.py**, and copy and
paste the commands near the bottom of that file. This requires Python
3 with matplotlib.

In this implementation, each layer, including the top layer, implements the environment class, because each layer is seen as an environment from the layer
above.

We arbitrarily divide the environment and the body, so that the environment just defines the walls, and the body includes everything to do with the
agent. Note that the named locations are part of the (top-level of the) agent,
not part of the environment, although they could have been.

##Environment.py

The environment defines the walls.


In [None]:
import math
from agents import Environment
class Rob_env(Environment):
  """The robot's environment: it only defines the walls."""
  def __init__(self,walls = None):
    """walls is a set of line segments
    where each line segment is of the form ((x0,y0),(x1,y1))
    If walls is omitted, the environment has no walls.
    """
    # build a fresh empty set per instance instead of sharing one
    # mutable default object between all environments
    self.walls = set() if walls is None else walls
    

##Body.py

The body defines everything about the agent body

In [None]:
import math
from agents import Environment
import matplotlib.pyplot as plt
import time

class Rob_body(Environment):
  """The robot's body: position, heading, whisker sensor and crash state.

  It acts as the environment for the layer above it and uses env (the
  walls) as its own environment.
  """

  def __init__(self, env, init_pos=(0,0,90)):
    """ env is the current environment
    init_pos is a triple of (x-position, y-position, direction)
    direction is in degrees; 0 is to right, 90 is straight-up, etc
    """
    self.env = env
    self.rob_x, self.rob_y, self.rob_dir = init_pos
    self.turning_angle = 18 # degrees that a left makes
    self.whisker_length = 6 # length of the whisker
    self.whisker_angle = 30 # angle of whisker relative to robot
    self.crashed = False
    # The following control how it is plotted
    self.plotting = True # whether the trace is being plotted
    self.sleep_time = 0.05 # time between actions (for real-time plotting)
    # The following are data structures maintained:
    self.history = [(self.rob_x, self.rob_y)] # history of (x,y) positions
    self.wall_history = [] # history of hitting the wall

  def percepts(self):
    """returns the current percepts as a dictionary

    Note that computing the 'whisker' percept calls whisker(), which
    records (and optionally plots) a hit.
    """
    return {'rob_x_pos':self.rob_x, 'rob_y_pos':self.rob_y,
            'rob_dir':self.rob_dir, 'whisker':self.whisker(),
            'crashed':self.crashed}
  initial_percepts = percepts # use percept function for initial percepts too

  def do(self,action):
    """ action is {'steer':direction}
    direction is 'left', 'right' or 'straight'
    returns the percepts after the action; once crashed, the robot no
    longer moves and the current percepts are returned unchanged
    """
    if self.crashed:
      return self.percepts()
    direction = action['steer']
    compass_deriv = {'left':1,'straight':0,'right':-1}[direction]*self.turning_angle
    self.rob_dir = (self.rob_dir + compass_deriv +360)%360 # make in range [0,360)
    # one step of unit length in the new heading
    rob_x_new = self.rob_x + math.cos(self.rob_dir*math.pi/180)
    rob_y_new = self.rob_y + math.sin(self.rob_dir*math.pi/180)
    path = ((self.rob_x,self.rob_y),(rob_x_new,rob_y_new))
    if any(line_segments_intersect(path,wall) for wall in self.env.walls):
      self.crashed = True
      if self.plotting:
        plt.plot([self.rob_x],[self.rob_y],"r*",markersize=20.0)
        plt.draw()
    # the move and the returned percept happen on every step,
    # not only on the step that crashes
    self.rob_x, self.rob_y = rob_x_new, rob_y_new
    self.history.append((self.rob_x, self.rob_y))
    if self.plotting and not self.crashed:
      plt.plot([self.rob_x],[self.rob_y],"go")
      plt.draw()
      plt.pause(self.sleep_time)
    return self.percepts()

  def whisker(self):
    """returns true whenever the whisker sensor intersects with a wall
    """
    whisk_ang_world = (self.rob_dir-self.whisker_angle)*math.pi/180
    # angle in radians in world coordinates
    wx = self.rob_x + self.whisker_length * math.cos(whisk_ang_world)
    wy = self.rob_y + self.whisker_length * math.sin(whisk_ang_world)
    whisker_line = ((self.rob_x,self.rob_y),(wx,wy))
    hit = any(line_segments_intersect(whisker_line,wall)
              for wall in self.env.walls)
    if hit:
      self.wall_history.append((self.rob_x, self.rob_y))
      if self.plotting:
        plt.plot([self.rob_x],[self.rob_y],"ro")
        plt.draw()
    return hit # False (not None) when nothing was hit

def line_segments_intersect(linea,lineb):
  """returns true if the line segments, linea and lineb intersect.
  A line segment is represented as a pair of points.
  A point is represented as a (x,y) pair.
  Parallel segments are always reported as not intersecting.
  """
  ((x0a,y0a),(x1a,y1a)) = linea
  ((x0b,y0b),(x1b,y1b)) = lineb
  da, db = x1a-x0a, x1b-x0b
  ea, eb = y1a-y0a, y1b-y0b
  denom = db*ea-eb*da
  if denom==0: # line segments are parallel
    return False
  cb = (da*(y0b-y0a)-ea*(x0b-x0a))/denom # position along line b
  if cb<0 or cb>1:
    return False
  ca = (db*(y0b-y0a)-eb*(x0b-x0a))/denom # position along line a
  return 0<=ca<=1

# Test cases:
# assert line_segments_intersect(((0,0),(1,1)),((1,0),(0,1)))
# assert not line_segments_intersect(((0,0),(1,1)),((1,0),(0.6,0.4)))
# assert line_segments_intersect(((0,0),(1,1)),((1,0),(0.4,0.6)))


This detects if the whisker and the wall intersect. Its value is returned as a
percept.

##Middle Layer.py

The middle layer acts like both a controller (for the environment layer) and an
environment for the upper layer. It has to tell the environment how to steer.
Thus it calls **env.do(·)**. It also is told the position to go to and the timeout. Thus
it also has to implement do(·).


###AgentMiddle.py

In [None]:
from agents import Environment
import math
class Rob_middle_layer(Environment):
  """Middle layer: steers the layer below towards a target position.

  It is a controller for env (it calls env.do) and an environment for
  the layer above (it implements do).
  """
  def __init__(self,env):
    """env is the layer below; its initial percepts are read here"""
    self.env=env
    self.percepts = env.initial_percepts()
    self.straight_angle = 11 # angle that is close enough to straight ahead
    self.close_threshold = 2 # distance that is close enough to arrived
    self.close_threshold_squared = self.close_threshold**2 # just compute it once

  def initial_percepts(self):
    """this layer gives no initial percepts to the layer above"""
    return {}

  def do(self, action):
    """action is {'go_to':target_pos,'timeout':timeout}
    target_pos is (x,y) pair
    timeout is the number of steps to try
    returns {'arrived':True} when arrived is true
    or {'arrived':False} if it reached the timeout"""
    if 'timeout' in action:
      remaining = action['timeout']
    else:
      remaining = -1 # will never reach 0
    # the target and the initial arrival test are needed whether or not
    # a timeout was supplied
    target_pos = action['go_to']
    arrived = self.close_enough(target_pos)
    while not arrived and remaining != 0:
      self.percepts = self.env.do({"steer":self.steer(target_pos)})
      remaining -= 1
      arrived = self.close_enough(target_pos)
    return {'arrived':arrived}


In [None]:
def steer(self,target_pos):
  """returns 'left', 'right' or 'straight' to head towards target_pos

  target_pos is an (x,y) pair. When the whisker percept is on, it
  always steers left.
  """
  if self.percepts['whisker']:
    self.display(3,'whisker on', self.percepts)
    return "left"
  gx,gy = target_pos
  rx,ry = self.percepts['rob_x_pos'],self.percepts['rob_y_pos']
  # direction of the goal in degrees, in world coordinates
  goal_dir = math.acos((gx-rx)/math.sqrt((gx-rx)*(gx-rx)
                                         +(gy-ry)*(gy-ry)))*180/math.pi
  if ry>gy:
    goal_dir = -goal_dir
  # direction of the goal relative to the robot's heading
  goal_from_rob = (goal_dir - self.percepts['rob_dir']+540)%360-180
  # x%360-180 lies in [-180,180): -180 is reachable (goal directly behind)
  assert -180 <= goal_from_rob < 180
  if goal_from_rob > self.straight_angle:
    return "left"
  elif goal_from_rob < -self.straight_angle:
    return "right"
  else:
    return "straight"

def close_enough(self,target_pos):
  """is True when the robot is within close_threshold of target_pos"""
  gx,gy = target_pos
  rx,ry = self.percepts['rob_x_pos'],self.percepts['rob_y_pos']
  return (gx-rx)**2 + (gy-ry)**2 <= self.close_threshold_squared

# The class statement for Rob_middle_layer is in an earlier cell, so these
# two functions are bound to it here to make them methods of that class.
Rob_middle_layer.steer = steer
Rob_middle_layer.close_enough = close_enough

In [None]:
from agentMiddle import Rob_middle_layer
from agents import Environment
class Rob_top_layer(Environment):
 """Top layer: turns a list of named locations into go_to requests."""

 def __init__(self, middle, timeout=200, locations = {'mail':(-5,10),
    'o103':(50,10), 'o109':(100,10),'storage':(101,51)}):
    """Store the layer below and the map of named places.

    middle    -- the middle layer that carries out each go_to
    timeout   -- steps the middle layer may take per location
    locations -- dictionary from location name to (x,y) position
    """
    self.locations = locations
    self.timeout = timeout
    self.middle = middle

 def do(self,plan):
    """Visit every location named in plan['visit'], in order.

    plan has the form {'visit':list_of_locations}. The outcome the
    middle layer reports for each location is displayed.
    """
    for loc in plan['visit']:
      position = self.locations[loc]
      request = {'go_to':position, 'timeout':self.timeout}
      outcome = self.middle.do(request)
      self.display(1,"Arrived at",loc,outcome)


###Plotting.py

In [None]:
import matplotlib.pyplot as plt
class Plot_env(object):
  """Plots the walls, the named locations and the robot."""
  def __init__(self, body,top):
    """sets up the plot
    body is the robot body (its env supplies the walls)
    top is the top layer (it supplies the named locations)
    """
    self.body = body
    plt.ion()
    plt.clf()
    plt.axes().set_aspect('equal')

    for wall in body.env.walls:
      ((x0,y0),(x1,y1)) = wall
      plt.plot([x0,x1],[y0,y1],"-k",linewidth=3)

    for loc in top.locations:
      (x,y) = top.locations[loc]
      plt.plot([x],[y],"k<")
      plt.text(x+1.0,y+0.5,loc) # print the label above and to the right
    plt.plot([body.rob_x],[body.rob_y],"go")
    plt.draw()

  def plot_run(self):
    """plots the history after the agent has finished.
    This is typically only used if body.plotting==False
    """
    xs,ys = zip(*self.body.history)
    plt.plot(xs,ys,"go")
    # with no recorded wall hits, zip(*[]) yields nothing to unpack,
    # so the wall markers are only drawn when there are some
    if self.body.wall_history:
      wxs,wys = zip(*self.body.wall_history)
      plt.plot(wxs,wys,"ro")
    #plt.draw()

The following code plots the agent as it acts in the world:

In [None]:
from agentEnv import Rob_body, Rob_env
# Build the three-layer robot (body -> middle -> top) in an environment
# with two walls.
env = Rob_env({((20,0),(30,20)), ((70,-5),(70,25))})
body = Rob_body(env)
middle = Rob_middle_layer(body)
top = Rob_top_layer(middle)
# To plot the robot as it acts, try:
# pl=Plot_env(body,top)
# top.do({'visit':['o109','storage','o109','o103']})
# You can directly control the middle layer:
# middle.do({'go_to':(30,-10),"timeout":200})
# Robot Trap for which the current controller cannot escape:
trap_env = Rob_env({((10,-21),(10,0)), ((10,10),(10,31)),
((30,-10),(30,0)),
((30,10),(30,20)), ((50,-21),(50,31)),
((10,-21),(50,-21)),
((10,0),(30,0)), ((10,10),(30,10)), ((10,31),(50,31))})
trap_body = Rob_body(trap_env,init_pos=(-1,0,90))
trap_middle = Rob_middle_layer(trap_body)
trap_top = Rob_top_layer(trap_middle,locations={'goal':(71,0)})
# To run the trap example:
# pl=Plot_env(trap_body,trap_top)
# trap_top.do({'visit':['goal']})

#Chapter-2 Searching for Solutions

### a)Representing Search Problems


A search problem consists of:
* a start node
* a neighbors function that given a node, returns an enumeration of the
arcs from the node
* a specification of a goal in terms of a Boolean function that takes a node
and returns true if the node is a goal
* a (optional) heuristic function that, given a node, returns a non-negative
real number. The heuristic function defaults to zero.


As far as the searcher is concerned a node can be anything. If multiple-path
pruning is used, a node must be hashable. In the simple examples, it is a string,
but in more complicated examples (in later chapters) it can be a tuple, a frozen
set, or a Python object.

In the following code raise NotImplementedError() is a way to specify that
this is an abstract method that needs to be overridden to define an actual search
problem.


In [None]:
class Search_problem(object):
    """A search problem consists of:
    * a start node
    * a neighbors function that gives the neighbors of a node
    * a specification of a goal
    * a (optional) heuristic function"""

    def start_node(self):
        """returns start node"""
        raise NotImplementedError("start_node") # abstract method

    def is_goal(self,node):
        """is True if node is a goal"""
        raise NotImplementedError("is_goal") # abstract method

    def neighbors(self,node):
        """returns a list of the arcs for the neighbors of node"""
        raise NotImplementedError("neighbors") # abstract method

    def heuristic(self,n):
        """Gives the heuristic value of node n.
        Returns 0 if not overridden."""
        return 0

The neighbors is a list of arcs. A (directed) arc consists of a from_node node
and a to_node node. The arc is the pair ⟨from_node, to_node⟩, but can also contain
a non-negative cost (which defaults to 1) and can be labeled with an action.

###SearchProblem.py

In [None]:
class Arc(object):
    """A directed arc from from_node to to_node.

    It carries a non-negative cost and, optionally, an action label.
    """

    def __init__(self, from_node, to_node, cost=1, action=None):
        """Store the endpoints, cost and label; a negative cost is rejected
        by an assertion."""
        assert cost >= 0, "".join(
            ["Cost cannot be negative for", str(from_node), "->",
             str(to_node), ", cost:", str(cost)])
        self.from_node, self.to_node = from_node, to_node
        self.action, self.cost = action, cost

    def __repr__(self):
        """Render the arc as text, including the action label if it is set."""
        source, target = str(self.from_node), str(self.to_node)
        if not self.action:
            return source + " --> " + target
        return source + " --" + str(self.action) + "-->" + target

### 3.1.1 Explicit Representation of Search Graph

The first representation of a search problem is from an explicit graph (as opposed to one that is generated as needed).


Representing Search Problems 
* a list or set of nodes
* a list or set of arcs
* a start node
* a list or set of goal nodes
* (optionally) a dictionary that maps a node to a heuristic value for that
node

To define a search problem, we need to define the start node, the goal predicate,
the neighbors function and the heuristic function.

In [None]:
class Search_problem_from_explicit_graph(Search_problem):

  """A search problem consists of:
  * a list or set of nodes
  * a list or set of arcs
  * a start node
  * a list or set of goal nodes
  * a dictionary that maps each node into its heuristic value.
  * a dictionary that maps each node into its (x,y) position
  """

  def __init__(self, nodes, arcs, start=None, goals=None, hmap=None, positions=None):
    """builds the neighbor lists from arcs and stores the rest

    goals, hmap and positions default to empty containers, created
    fresh for each problem. Every arc's from_node must be in nodes.
    """
    self.neighs = {}
    self.nodes = nodes
    for node in nodes:
      self.neighs[node]=[]
    self.arcs = arcs
    for arc in arcs:
      self.neighs[arc.from_node].append(arc)
    # set after the loop so they exist even when arcs is empty
    self.start = start
    self.goals = set() if goals is None else goals
    self.hmap = {} if hmap is None else hmap
    self.positions = {} if positions is None else positions

  def start_node(self):
    """returns start node"""
    return self.start

  def is_goal(self,node):
    """is True if node is a goal"""
    return node in self.goals

  def neighbors(self,node):
    """returns the list of arcs leaving node"""
    return self.neighs[node]

  def heuristic(self,node):
    """Gives the heuristic value of node n.
    Returns 0 if not overridden in the hmap."""
    return self.hmap.get(node, 0)

  def __repr__(self):
    """returns a string representation of the search problem"""
    return "".join(str(arc)+". " for arc in self.arcs)

  def neighbor_nodes(self,node):
    """returns an iterator over the neighbors of node"""
    return (path.to_node for path in self.neighs[node])



###3.1.2 Paths

A searcher will return a path from the start node to a goal node. A Python list
is not a suitable representation for a path, as many search algorithms consider
multiple paths at once, and these paths should share initial parts of the path.
If we wanted to do this with Python lists, we would need to keep copying the
list, which can be expensive if the list is long. An alternative representation is
used here in terms of a recursive data structure that can share subparts

A path is either:
* a node (representing a path of length 0) or
*  a path, initial and an arc, where the from node of the arc is the node at the end of initial.

These cases are distinguished in the following code by having arc = None if the
path has length 0, in which case initial is the node of the path. Python yield is
used for enumerations only

###PathsSearch.py

In [None]:
class Path(object):
    """A path is a single node, or a shorter path extended by one arc."""

    def __init__(self, initial, arc=None):
        """Build a path.

        With arc=None, initial is the only node of a zero-cost path.
        Otherwise initial is the path being extended and arc is the
        arc appended to it.
        """
        self.initial = initial
        self.arc = arc
        self.cost = 0 if arc is None else initial.cost + arc.cost

    def end(self):
        """Return the last node of the path."""
        return self.initial if self.arc is None else self.arc.to_node

    def nodes(self):
        """Yield the nodes of the path from the end back to the start."""
        segment = self
        while segment.arc is not None:
            yield segment.arc.to_node
            segment = segment.initial
        yield segment.initial

    def initial_nodes(self):
        """Yield the nodes before the end node, from the end backwards."""
        if self.arc is None:
            return
        yield from self.initial.nodes()

    def __repr__(self):
        """Return the path as text, one arc appended per extension."""
        if self.arc is None:
            return str(self.initial)
        prefix, target = str(self.initial), str(self.arc.to_node)
        if self.arc.action:
            return prefix + "\n --" + str(self.arc.action) + "--> " + target
        return prefix + " --> " + target

In [None]:
# problem1: five nodes, start 'a', goal 'g', with (x,y) positions.
problem1 = Search_problem_from_explicit_graph(
{'a','b','c','d','g'},
[Arc('a','c',1), Arc('a','b',3), Arc('c','d',3), Arc('c','b',1), Arc('b','d',1), Arc('b','g',3), Arc('d','g',1)],
start = 'a',
goals = {'g'},
positions={'a': (0, 0), 'b': (1, 1), 'c': (0,1), 'd': (1,2), 'g': (2,2)})

# problem2: eight nodes, start 'a', goal 'g', with (x,y) positions.
problem2 = Search_problem_from_explicit_graph(
 {'a','b','c','d','e','g','h','j'},
 [Arc('a','b',1), Arc('b','c',3), Arc('b','d',1), Arc('d','e',3),
 Arc('d','g',1), Arc('a','h',3), Arc('h','j',1)],
 start = 'a',
 goals = {'g'},
 positions={'a': (0, 0), 'b': (0, 1), 'c': (0,4), 'd': (1,1), 'e': (1,4),
 'g': (2,1), 'h': (3,0), 'j': (3,1)})

# problem3: no arcs at all; the start node 'g' is itself a goal
# (the other goal, 'k', is not among the nodes).
problem3 = Search_problem_from_explicit_graph(
{'a','b','c','d','e','g','h','j'},[],start = 'g',goals = {'k','g'})

# Delivery problem with one-directional arcs only: start 'o103', goal 'r123',
# and a heuristic value (hmap) for every node.
acyclic_delivery_problem = Search_problem_from_explicit_graph(
{'mail','ts','o103','o109','o111','b1','b2','b3','b4','c1','c2','c3',
'o125','o123','o119','r123','storage'},
[Arc('ts','mail',6),
Arc('o103','ts',8),
Arc('o103','b3',4),
Arc('o103','o109',12),
Arc('o109','o119',16),
Arc('o109','o111',4),
Arc('b1','c2',3),
Arc('b1','b2',6),
Arc('b2','b4',3),
Arc('b3','b1',4),
Arc('b3','b4',7),
Arc('b4','o109',7),
Arc('c1','c3',8),
Arc('c2','c3',6),
Arc('c2','c1',4),
Arc('o123','o125',4),
Arc('o123','r123',4),
Arc('o119','o123',9),
Arc('o119','storage',7)],
start = 'o103',
goals = {'r123'},
hmap = {
'mail' : 26,
'ts' : 23,
'o103' : 21,
'o109' : 24,
'o111' : 27,
'o119' : 11,
'o123' : 4,
'o125' : 6,
'r123' : 0,
'b1' : 13,
'b2' : 15,
'b3' : 17,
'b4' : 18,
'c1' : 6,
'c2' : 10,
'c3' : 12,
'storage' : 12})

# Delivery problem in which most arcs also have a reverse arc, so the graph
# contains cycles: start 'o103', goal 'r123', heuristic values in hmap.
cyclic_delivery_problem = Search_problem_from_explicit_graph(
{'mail','ts','o103','o109','o111','b1','b2','b3','b4','c1','c2','c3',
'o125','o123','o119','r123','storage'},
[ Arc('ts','mail',6), Arc('mail','ts',6),
Arc('o103','ts',8), Arc('ts','o103',8),
Arc('o103','b3',4),
Arc('o103','o109',12), Arc('o109','o103',12),
Arc('o109','o119',16), Arc('o119','o109',16),
Arc('o109','o111',4), Arc('o111','o109',4),
Arc('b1','c2',3),
Arc('b1','b2',6), Arc('b2','b1',6),
Arc('b2','b4',3), Arc('b4','b2',3),
Arc('b3','b1',4), Arc('b1','b3',4),
Arc('b3','b4',7), Arc('b4','b3',7),
Arc('b4','o109',7),
Arc('c1','c3',8), Arc('c3','c1',8),
Arc('c2','c3',6), Arc('c3','c2',6),
Arc('c2','c1',4), Arc('c1','c2',4),
Arc('o123','o125',4), Arc('o125','o123',4),
Arc('o123','r123',4), Arc('r123','o123',4),
Arc('o119','o123',9), Arc('o123','o119',9),
Arc('o119','storage',7), Arc('storage','o119',7)],
start = 'o103',
goals = {'r123'},
hmap = {
'mail' : 26,
'ts' : 23,
'o103' : 21,
'o109' : 24,
'o111' : 27,
'o119' : 11,
'o123' : 4,
'o125' : 6,
'r123' : 0,
'b1' : 13,
'b2' : 15,
'b3' : 17,
'b4' : 18,
'c1' : 6,
'c2' : 10,
'c3' : 12,
'storage' : 12})

##3.2 Generic Searcher and Variants

To run the search demos, in folder **“aipython”**, load
**“searchGeneric.py”** , using e.g., ipython -i searchGeneric.py,
and copy and paste the example queries at the bottom of that file. This
requires Python 3.

###3.2.1 Searcher

A Searcher for a problem can be asked repeatedly for the next path. To solve a
problem, we can construct a Searcher object for the problem and then repeatedly
ask for the next path using search. If there are no more paths, None is returned.

###searchGeneric.py

In [None]:
# from display import Displayable, visualize
class Searcher(Displayable):
  """returns a searcher for a problem.
  Paths can be found by repeatedly calling search().
  This does depth-first search unless overridden
  """
  def __init__(self, problem):
    """creates a searcher from a problem
    """
    self.problem = problem
    self.initialize_frontier()
    self.num_expanded = 0
    self.add_to_frontier(Path(problem.start_node()))
    super().__init__()

  def initialize_frontier(self):
    """the frontier is a list used as a stack"""
    self.frontier = []

  def empty_frontier(self):
    """is True if there are no paths left on the frontier"""
    return self.frontier == []

  def add_to_frontier(self,path):
    """adds path to the top of the stack"""
    self.frontier.append(path)

  # NOTE(review): `visualize` is not imported in the visible code (the import
  # just above this class is commented out) -- confirm it is in scope.
  @visualize
  def search(self):
    """returns (next) path from the problem's start node
    to a goal node.
    Returns None if no path exists.
    """
    while not self.empty_frontier():
      path = self.frontier.pop()
      self.display(2, "Expanding:",path,"(cost:",path.cost,")")
      self.num_expanded += 1
      if self.problem.is_goal(path.end()): # solution found
        self.display(1, self.num_expanded, "paths have been expanded and",
                     len(self.frontier), "paths remain in the frontier")
        self.solution = path # store the solution found
        return path
      else:
        neighs = self.problem.neighbors(path.end())
        self.display(3,"Neighbors are", neighs)
        # reversed so that the first neighbor ends up on top of the stack
        for arc in reversed(list(neighs)):
          self.add_to_frontier(Path(path,arc))
        # report the frontier once per expansion, not once per neighbor
        self.display(3,"Frontier:",self.frontier)
    # only reached when the frontier is exhausted
    self.display(1,"No (more) solutions. Total of",
                 self.num_expanded,"paths expanded.")


Note that this reverses the neighbors so that it implements depth-first search in
an intuitive manner (expanding the first neighbor first), and list is needed if the
neighbors are generated. Reversing the neighbors might not be required for
other methods. The calls to reversed and list can be removed, and the algorithm
still implements depth-first search.

**Exercise 3.1** When it returns a path, the algorithm can be used to find another
path by calling search() again. However, it does not find other paths that go
through one goal node to another. Explain why, and change the code so that it
can find such paths when search() is called again.


###3.2.2 Frontier as a Priority Queue


In many of the search algorithms, such as *A∗* and other best-first searchers, the
frontier is implemented as a priority queue. Here we use Python's built-in
priority queue implementation, heapq.

In [None]:
import heapq # part of the Python standard library
from searchProblem import Path

class FrontierPQ(object):
  """A frontier consists of a priority queue (heap), frontierpq, of
  (value, index, path) triples, where
  * value is the value we want to minimize (e.g., path cost + h).
  * index is a unique index for each element
  * path is the path on the queue
  Note that the priority queue always returns the smallest element.
  """
  def __init__(self):
    """constructs the frontier, initially an empty priority queue
    """
    self.frontier_index = 0 # the number of items ever added to the frontier
    self.frontierpq = [] # the frontier priority queue

  def empty(self):
    """is True if the priority queue is empty"""
    return self.frontierpq == []

  def add(self, path, value):
    """add a path to the priority queue
    value is the value to be minimized"""
    self.frontier_index += 1 # get a new unique index
    # the index is stored negated, so among equal values the most
    # recently added path is popped first; being unique, it also means
    # the paths themselves are never compared
    heapq.heappush(self.frontierpq,(value, -self.frontier_index, path))

  def pop(self):
    """returns and removes the path of the frontier with minimum value.
    """
    (_,_,path) = heapq.heappop(self.frontierpq)
    return path


The following methods are used for finding and printing information about
the frontier.

###searchGeneric.py

In [None]:
def count(self,val):
  """returns the number of elements of the frontier with value=val"""
  return sum(1 for e in self.frontierpq if e[0]==val)


def __repr__(self):
  """string representation of the frontier"""
  return str([(n,c,str(p)) for (n,c,p) in self.frontierpq])

def __len__(self):
  """length of the frontier"""
  return len(self.frontierpq)

def __iter__(self):
  """iterate through the paths in the frontier"""
  for (_,_,path) in self.frontierpq:
    yield path

# The class statement for FrontierPQ is in an earlier cell, so the functions
# above are bound to it here; without this, len(), str() and iteration over a
# FrontierPQ would not use them.
FrontierPQ.count = count
FrontierPQ.__repr__ = __repr__
FrontierPQ.__len__ = __len__
FrontierPQ.__iter__ = __iter__


##3.2.3 A∗ Search

For an **A∗ Search** the frontier is implemented using the FrontierPQ class.

In [None]:
class AStarSearcher(Searcher):
  """A searcher whose frontier is a priority queue.

  Each path is queued under its cost plus the heuristic value of its
  end node. Call search() repeatedly to get successive paths.
  """

  def __init__(self, problem):
    """Create the searcher for problem."""
    super().__init__(problem)

  def initialize_frontier(self):
    """Start with an empty FrontierPQ."""
    self.frontier = FrontierPQ()

  def empty_frontier(self):
    """Report whether the priority queue has no paths left."""
    return self.frontier.empty()

  def add_to_frontier(self,path):
    """Queue path under cost-so-far plus heuristic of its end node."""
    priority = path.cost + self.problem.heuristic(path.end())
    self.frontier.add(path, priority)
    # Code should always be tested. The following provides a simple unit test,
    # using problem1 as the the default problem.
   
import searchProblem as searchProblem

def test(SearchClass, problem=searchProblem.problem1,
  solutions=[['g','d','b','c','a']] ):
  """Unit test for a searcher class.

  SearchClass -- class constructed from a problem and offering search()
  problem     -- the search problem to solve (problem1 by default)
  solutions   -- accepted answers, each a list of nodes from the goal
                 back to the start
  Fails an assertion if no path is found or if the path found is not
  one of solutions.
  """
  print("Testing problem 1:")
  found = SearchClass(problem).search()
  print("Path found:",found)
  assert found is not None, "No path is found in problem1"
  assert list(found.nodes()) in solutions, "Shortest path not found in problem1"
  print("Passed unit test")
# When run as a script, unit-test the A* searcher on the default problem.
if __name__ == "__main__":
  #test(Searcher)
  test(AStarSearcher)
  # example queries:
  # searcher1 = Searcher(searchProblem.acyclic_delivery_problem) # DFS
  # searcher1.search() # find first path
  # searcher1.search() # find next path
  # searcher2 = AStarSearcher(searchProblem.acyclic_delivery_problem) # A*
  # searcher2.search() # find first path
  # searcher2.search() # find next path
  # searcher3 = Searcher(searchProblem.cyclic_delivery_problem) # DFS
  # searcher3.search() # find first path with DFS. What do you expect to happen?
  # searcher4 = AStarSearcher(searchProblem.cyclic_delivery_problem) # A*
  # searcher4.search() # find first path

##3.2.4 Multiple Path Pruning


To run the multiple-path pruning demo, in folder **aipython**, load
**searchMPP.py** , using e.g., ipython -i searchMPP.py, and copy and
paste the example queries at the bottom of that file

The following implements A∗ with multiple-path pruning. It overrides search() in Searcher.


###searchMPP.py

In [None]:
class SearcherMPP(AStarSearcher):
  """returns a searcher for a problem that uses multiple-path pruning.
  Paths can be found by repeatedly calling search().

  The end node of every expanded path is remembered in self.explored; a
  path popped from the frontier whose end node is already there is
  discarded, so each node is expanded at most once.
  """
  def __init__(self, problem):
    """Set up the searcher for problem with an empty explored set."""
    super().__init__(problem)
    self.explored = set()  # end nodes of the paths expanded so far

  @visualize
  def search(self):
    """returns next path from an element of problem's start nodes
    to a goal node.
    Returns None if no path exists.
    """
    while not self.empty_frontier():
      path = self.frontier.pop()
      # Multiple-path pruning: everything below happens only the first
      # time a node is reached; later paths to it are simply dropped.
      if path.end() not in self.explored:
        self.display(2, "Expanding:",path,"(cost:",path.cost,")")
        self.explored.add(path.end())
        self.num_expanded += 1
        if self.problem.is_goal(path.end()):
          self.display(1, self.num_expanded, "paths have been expanded and",
                       len(self.frontier), "paths remain in the frontier")
          self.solution = path # store the solution found
          return path
        else:
          neighs = self.problem.neighbors(path.end())
          self.display(3,"Neighbors are", neighs)
          for arc in neighs:
            self.add_to_frontier(Path(path,arc))
          self.display(3,"Frontier:",self.frontier)
    # Reached only when the frontier is exhausted without finding a goal.
    self.display(1,"No (more) solutions. Total of",
                 self.num_expanded,"paths expanded.")

from searchGeneric import test
# When this file is executed as a script, run the imported unit test on the
# multiple-path-pruning searcher.
if __name__ == "__main__":
 test(SearcherMPP)

import searchProblem
 # searcherMPPcdp = SearcherMPP(searchProblem.cyclic_delivery_problem)
 # print(searcherMPPcdp.search()) # find first path

##3.3 Branch-and-bound Search


To run the demo, in folder “aipython”, load
**searchBranchAndBound.py**, and copy and paste the example queries
at the bottom of that file.


Depth-first search methods do not need a priority queue, but can use
a list as a stack. In this implementation of branch-and-bound search, we call
search to find an optimal solution with cost less than bound. This uses depth-first search to find a path to a goal that extends path with cost less than the
bound. Once a path to a goal has been found, that path is remembered as the
best path, the bound is reduced, and the search continues.

###searchBranchAndBound.py

In [None]:
from searchProblem import Path
from searchGeneric import Searcher
from display import Displayable, visualize
class DF_branch_and_bound(Searcher):
    """returns a branch and bound searcher for a problem.
    An optimal path with cost less than bound can be found by calling
    search()
    """
    def __init__(self, problem, bound=float("inf")):
        """creates a searcher than can be used with search() to find an optimal path.
        bound gives the initial bound. By default this is infinite - meaning there
        is no initial pruning due to depth bound
        """
        super().__init__(problem)
        self.best_path = None  # best goal path found so far (None until one is found)
        self.bound = bound     # only paths with cost+heuristic below this are expanded

    @visualize
    def search(self):
        """returns an optimal solution to a problem with cost less than bound.
        returns None if there is no solution with cost less than bound."""
        # The frontier is a plain list used as a stack (pop() takes the last element).
        self.frontier = [Path(self.problem.start_node())]
        self.num_expanded = 0
        while self.frontier:
            path = self.frontier.pop()
            if path.cost+self.problem.heuristic(path.end()) < self.bound:
                # if path.end() not in path.initial_nodes(): # for cycle pruning
                self.display(3,"Expanding:",path,"cost:",path.cost)
                self.num_expanded += 1
                if self.problem.is_goal(path.end()):
                    # A better solution: remember it and tighten the bound,
                    # then keep searching for something cheaper still.
                    self.best_path = path
                    self.bound = path.cost
                    self.display(2,"New best path:",path," cost:",path.cost)
                else:
                    neighs = self.problem.neighbors(path.end())
                    self.display(3,"Neighbors are", neighs)
                    # reversed so that the first neighbor ends up on top of the stack
                    for arc in reversed(list(neighs)):
                        self.add_to_frontier(Path(path, arc))
        # The whole space within the bound has been explored.
        self.display(1,"Number of paths expanded:",self.num_expanded,
                     "(optimal" if self.best_path else "(no", "solution found)")
        self.solution = self.best_path
        return self.best_path


Note that this code uses reversed in order to expand the neighbors of a node
in the left-to-right order one might expect. It does this because pop() removes
the rightmost element of the list. The call to list is there because reversed only
works on sequences such as lists and tuples, but the neighbors may be produced by a generator.

In [None]:
from searchGeneric import test
# When this file is executed as a script, run the imported unit test on the
# branch-and-bound searcher.
if __name__ == "__main__":
  test(DF_branch_and_bound)
  
# Example queries:
import searchProblem
# searcherb1 = DF_branch_and_bound(searchProblem.acyclic_delivery_problem)
# print(searcherb1.search()) # find optimal path
# searcherb2 = DF_branch_and_bound(searchProblem.cyclic_delivery_problem, bound=100)
# print(searcherb2.search()) # find optimal path

###searchTest.py 

In [None]:
from searchGeneric import Searcher, AStarSearcher
from searchBranchAndBound import DF_branch_and_bound
from searchMPP import SearcherMPP
# Set the class-level display level of the searchers to 1 before comparing them.
# NOTE(review): presumably this limits display() output to level-1 messages;
# confirm against Displayable in display.py.
DF_branch_and_bound.max_display_level = 1
Searcher.max_display_level = 1
def run(problem,name):
    """Solve problem with each searcher in turn and print what it finds.

    The searchers are A*, A* with multiple-path pruning, branch and bound
    (started once with a bound just above the A* solution cost and once
    with a much looser bound, each searched twice), and depth-first search.
    name is the label printed in the banner at the top of the output.
    """
    def report_branch_and_bound(heading, initial_bound):
        """Print heading and bound, then search twice with one B&B searcher."""
        print(heading, initial_bound,")")
        bb_searcher = DF_branch_and_bound(problem,initial_bound) # cheating!!!!
        first_path = bb_searcher.search()
        print("Path found:",first_path," cost=",bb_searcher.solution.cost)
        print("Rerunning B&B")
        print("Path found:",bb_searcher.search())

    print("\n\n*******",name)

    print("\nA*:")
    asearcher = AStarSearcher(problem)
    astar_path = asearcher.search()
    print("Path found:",astar_path," cost=",asearcher.solution.cost)
    astar_cost = asearcher.solution.cost
    print("there are",asearcher.frontier.count(astar_cost),"elements remaining on the queue with f-value=",astar_cost)

    print("\nA* with MPP:")
    msearcher = SearcherMPP(problem)
    mpp_path = msearcher.search()
    print("Path found:",mpp_path," cost=",msearcher.solution.cost)
    mpp_cost = msearcher.solution.cost
    print("there are",msearcher.frontier.count(mpp_cost),"elements remaining on the queue with f-value=",mpp_cost)

    # Both initial bounds are derived from the A* answer, hence "cheating".
    report_branch_and_bound("\nBranch and bound (with too-good initial bound of",
                            asearcher.solution.cost+0.01)
    report_branch_and_bound("\nBranch and bound (with not-very-good initial bound of",
                            asearcher.solution.cost*2+10)

    print("\nDepth-first search: (Use ˆC if it goes on forever)")
    tsearcher = Searcher(problem)
    dfs_path = tsearcher.search()
    print("Path found:",dfs_path," cost=",tsearcher.solution.cost)
import searchProblem
from searchTest import run
if __name__ == "__main__":
  # When this file is executed as a script, compare the searchers on problem1.
  run(searchProblem.problem1,"Problem 1")
  # run(searchProblem.acyclic_delivery_problem,"Acyclic Delivery")
  # run(searchProblem.cyclic_delivery_problem,"Cyclic Delivery")

  # also test some graphs with cycles, and some with multiple least-cost paths

#Chapter-4 Supervised Machine Learning

##7.1 Representations of Data and Predictions

###learnProblem.py

In [None]:
import math, random, statistics
import csv
from display import Displayable
from utilities import argmax

# The two Boolean values; assigned as the frange of the Boolean conditions
# that are generated from the input features.
boolean = [False, True]


When creating a data set, we partition the data into a training set (train) and
a test set (test). The target feature is the feature that we are making a prediction
of. A dataset ds has the following attributes:

* ds.train a list of the training examples
* ds.test a list of the test examples
* ds.target_index the index of the target
* ds.target the feature corresponding to the target (a function as described
above)
* ds.input_features a list of the input features

In [None]:
class Data_set(Displayable):
    """ A data set consists of a list of training data and a list of test data.

    After construction a data set ds provides:
      ds.train, ds.test   the training and test examples
      ds.target_index     the non-negative index of the target column
      ds.domains          for each column, the set of values seen in ds.train
      ds.target           the feature for the target column (None if no
                          column has index ds.target_index)
      ds.input_features   the features for all the other columns
    A feature is a function from an example to a value, carrying a __doc__
    label, an frange (list of its training values) and an ftype.
    """

    def __init__(self, train, test=None, prob_test=0.20, target_index=0,
                 header=None, target_type= None, seed=None): #12345):
        """A dataset for learning.
        train is a list of tuples representing the training examples
        test is the list of tuples representing the test examples
        if test is None, a test set is created by selecting each
        example with probability prob_test
        target_index is the index of the target.
        If negative, it counts from right.
        If target_index is larger than the number of properties,
        there is no target (for unsupervised learning)
        header is a list of names for the features
        target_type is either None for automatic detection of target type
        or one of "numeric", "boolean", "categorical"
        seed is for random number; None gives a different test set each time
        """
        if seed is not None: # given seed makes partition consistent from run-to-run
            random.seed(seed)
        if test is None:
            train,test = partition_data(train, prob_test)
        self.train = train
        self.test = test

        self.display(1,"Training set has",len(train),"examples. Number of columns: ",{len(e) for e in train})
        self.display(1,"Test set has",len(test),"examples. Number of columns: ",{len(e) for e in test})
        self.prob_test = prob_test
        self.num_properties = len(self.train[0])
        if target_index < 0: #allows for -1, -2, etc.
            self.target_index = self.num_properties + target_index
        else:
            self.target_index = target_index
        # header and domains are needed whichever way target_index was given
        self.header = header
        self.domains = [set() for _ in range(self.num_properties)]
        for example in self.train:
            for ind,val in enumerate(example):
                self.domains[ind].add(val)
        self.conditions_cache = {} # cache for computed conditions
        self.create_features()
        if target_type:
            self.target.ftype = target_type
        self.display(1,"There are",len(self.input_features),"input features")

    def __str__(self):
        """Summarize the sizes of the training and test sets (and the number
        of features when there is at least one training example)."""
        if self.train:
            return ("Data: "+str(len(self.train))+" training examples, " +str(len(self.test))+" test examples, "+str(len(self.train[0]))+" features.")
        else:
            return ("Data: "+str(len(self.train))+" training examples, "+str(len(self.test))+" test examples.")

    def create_features(self):
        """create the set of features

        Builds one feature per column and stores the one at target_index in
        self.target and all the others, in column order, in self.input_features.
        """
        self.target = None
        self.input_features = []
        for i in range(self.num_properties):
            # index=i freezes the column for this feature; a plain closure
            # over i would make every feature read the last column.
            def feat(e,index=i):
                return e[index]
            if self.header:
                feat.__doc__ = self.header[i]
            else:
                feat.__doc__ = "e["+str(i)+"]"
            feat.frange = list(self.domains[i])
            feat.ftype = self.infer_type(feat.frange)
            if i == self.target_index:
                self.target = feat
            else:
                self.input_features.append(feat)

    def infer_type(self,domain):
        """Infers the type of a feature with domain

        Returns "boolean" if every value equals True or False, otherwise
        "numeric" if every value is an int or float, otherwise "categorical".
        """
        if all(v in {True,False} for v in domain):
            return "boolean"
        if all(isinstance(v,(float,int)) for v in domain):
            return "numeric"
        else:
            return "categorical"
      

###7.1.1 Creating Boolean Conditions from Features

Some of the algorithms require Boolean input features or features with range
{0, 1}. In order to be able to use these algorithms on datasets that allow for
arbitrary domains of input variables, we construct Boolean conditions from
the attributes.

There are 3 cases:
*  When the range only has two values, we designate one to be the “true”
value.
* When the values are all numeric, we assume they are ordered (as opposed
to just being some classes that happen to be labelled with numbers) and
construct Boolean features for splits of the data. That is, the feature is
e[ind] < cut for some value cut. We choose a number of cut values, up to
a maximum number of cuts, given by max_num_cuts.
* When the values are not all numeric, we create an indicator function for
each value. An indicator function for a value returns true when that value
is given and false otherwise. Note that we can’t create an indicator function for values that appear in the test set but not in the training set because we haven’t seen the test set. For the examples in the test set with a
value that doesn’t appear in the training set for that feature, the indicator
functions all return false.


In [None]:
def conditions(self, max_num_cuts=8, categorical_only = False):
    """returns a list of boolean conditions from the input features
    max_num_cuts is the maximum number of cuts for numerical features
    categorical_only is true if only categorical features are made
    binary

    Columns with fewer than two values, and the target column, give no
    conditions. Results are cached in self.conditions_cache per
    (max_num_cuts, categorical_only) pair.
    """
    # NOTE(review): this takes self and reads Data_set attributes, but it is
    # defined at module level in this cell; confirm it is attached to
    # Data_set before being called as dataset.conditions(...).
    cache_key = (max_num_cuts, categorical_only)
    if cache_key in self.conditions_cache:
        return self.conditions_cache[cache_key]
    conds = []

    def add_boolean(feat, doc):
        """Label feat as a Boolean condition and add it to conds."""
        feat.__doc__ = doc
        feat.frange = boolean
        feat.ftype = "boolean"
        conds.append(feat)

    for ind,frange in enumerate(self.domains):
        if ind == self.target_index or len(frange) <= 1:
            continue  # the target, or a column that cannot split the data
        name = self.header[ind] if self.header else f"e[{ind}]"
        if len(frange) == 2:
            # two values, the feature is equality to one of them.
            true_val = list(frange)[1] # choose one as true
            def feat(e, i=ind, tv=true_val):
                return e[i]==tv
            add_boolean(feat, f"{name}=={true_val}")
        elif all(isinstance(val,(int,float)) for val in frange):
            if categorical_only: # numerical, don't make cuts
                # pass the raw value through; it is not a Boolean condition,
                # so it gets no frange/ftype
                def feat(e, i=ind):
                    return e[i]
                feat.__doc__ = name
                conds.append(feat)
            else:
                # all numeric, create cuts of the data
                sorted_frange = sorted(frange)
                num_cuts = min(max_num_cuts,len(frange))
                cut_positions = [len(frange)*i//num_cuts
                                 for i in range(1,num_cuts)]
                for cut in cut_positions:
                    cutat = sorted_frange[cut]
                    def feat(e, ind_=ind, cutat=cutat):
                        return e[ind_] < cutat
                    add_boolean(feat, f"{name}<{cutat}")
        else:
            # create an indicator function for every value
            for val in frange:
                def feat(e, ind_=ind, val_=val):
                    return e[ind_] == val_
                add_boolean(feat, f"{name}=={val}")
    self.conditions_cache[cache_key] = conds
    return conds

##7.1.2 Evaluating Predictions

In [None]:
def evaluate_dataset(self, data, predictor, error_measure):
    """Return the mean error of predictor over the examples in data.

    predictor maps an example to a prediction for the target feature.
    error_measure(prediction, actual) gives the error for one example,
    where actual is self.target applied to that example.
    Returns nan when data is falsy (e.g. an empty list), and infinity
    when computing the mean raises a ValueError.
    """
    if not data:
        return math.nan # not a number
    try:
        errors = (error_measure(predictor(e),self.target(e)) for e in data)
        return statistics.mean(errors)
    except ValueError: # if error_measure gives an error
        return float("inf") # infinity


In [None]:
class Evaluate(object):
    """A container for the evaluation measures

    Each measure takes (prediction, actual) and is called through the class,
    e.g. Evaluate.squared_loss(p, a); none of them takes self.
    A prediction is either a number (a point prediction) or a list/dict that
    maps each possible value to a number (typically its probability).
    """
    # The one-line docstrings below are used as labels (they are read through
    # __doc__ elsewhere in this file), so their exact text is kept unchanged.

    def squared_loss(prediction, actual):
        "squared loss "
        if isinstance(prediction, (list,dict)):
            return (1-prediction[actual])**2 # the correct value is 1
        else:
            return (prediction-actual)**2

    def absolute_loss(prediction, actual):
        "absolute loss "
        if isinstance(prediction, (list,dict)):
            return abs(1-prediction[actual]) # the correct value is 1
        else:
            return abs(prediction-actual)

    def log_loss(prediction, actual):
        "log loss (bits)"
        try:
            if isinstance(prediction, (list,dict)):
                return -math.log2(prediction[actual])
            else:
                return -math.log2(prediction) if actual==1 else -math.log2(1-prediction)
        except ValueError: # log2 of 0 (or of a negative number)
            return float("inf") # infinity

    def accuracy(prediction, actual):
        "accuracy "
        # For a distribution: 1 when no value scores higher than the actual one.
        if isinstance(prediction, dict):
            prev_val = prediction[actual]
            return 1 if all(prev_val >= v for v in prediction.values()) else 0
        if isinstance(prediction, list):
            prev_val = prediction[actual]
            return 1 if all(prev_val >= v for v in prediction) else 0
        else:
            # For a point prediction: 1 when it is within 0.5 of the actual value.
            return 1 if abs(actual-prediction) <= 0.5 else 0

    all_criteria = [accuracy, absolute_loss, squared_loss, log_loss]

##7.1.3 Creating Test and Training Sets

The following method partitions the data into a training set and a test set. Note
that this does not guarantee that the test set will contain exactly a proportion of the data equal to prob_test.
[An alternative is to use random.sample() which can guarantee that the test
set will contain exactly a particular proportion of the data. However this would
require knowing how many elements are in the data set, which we may not
know, as data may just be a generator of the data (e.g., when reading the data
from a file).]


In [None]:
def partition_data(data, prob_test=0.30):
  """Split data into a (train, test) pair of lists.

  Each example goes to the test list when a fresh random number is below
  prob_test, and to the training list otherwise, so the sizes of the two
  lists vary from call to call. data may be any iterable of examples.
  """
  train, test = [], []
  for example in data:
    destination = test if random.random() < prob_test else train
    destination.append(example)
  return train, test


##7.1.4 Importing Data From File

In [None]:
class Data_from_file(Data_set):
    """A data set whose examples are read from a single text file."""

    def __init__(self, file_name, separator=',', num_train=None,prob_test=0.3,has_header=False, target_index=0, boolean_features=True,
                 categorical=[], target_type= None, include_only=None,seed=None): #seed=12345):
        """create a dataset from a file
        separator is the character that separates the attributes
        num_train is a number specifying the first num_train tuples are training, or None
        prob_test is the probability an example should in the test set (if num_train is None)
        has_header is True if the first line of file is a header
        target_index specifies which feature is the target
        boolean_features specifies whether we want to create Boolean features
        (if False, it uses the original features).
        categorical is a set (or list) of features that should be treated as categorical
        (not used by this constructor)
        target_type is either None for automatic detection of target type
        or one of "numeric", "boolean", "categorical"
        include_only is a list or set of indexes of columns to include
        seed is passed on for the random train/test split (if num_train is None)

        Lines with fewer than two fields are skipped. Asking for more
        training examples than the file holds raises an error.
        """
        self.boolean_features = boolean_features
        with open(file_name,'r',newline='') as csvfile:
            self.display(1,"Loading",file_name)
            # data_all = csv.reader(csvfile,delimiter=separator) # for more complicated CSV files
            data_all = (line.strip().split(separator) for line in csvfile)
            if include_only is not None:
                data_all = ([v for (i,v) in enumerate(line) if i in include_only]
                            for line in data_all)
            # the header (if any) is read after column selection, so it
            # lines up with the selected columns
            if has_header:
                header = next(data_all)
            else:
                header = None
            data_tuples = (interpret_elements(d) for d in data_all if len(d)>1)
            # data_tuples is lazy, so the data set must be built while the
            # file is still open.
            if num_train is not None:
                # training set is divided into training then test examples
                # the file is only read once, and the data is placed in appropriate list
                train = []
                for _ in range(num_train): # will give an error if insufficient examples
                    train.append(next(data_tuples))
                test = list(data_tuples)
                Data_set.__init__(self,train, test=test,target_index=target_index,
                                  header=header, target_type=target_type)
            else: # randomly assign training and test examples
                Data_set.__init__(self,data_tuples, test=None,prob_test=prob_test,
                                  target_index=target_index, header=header,seed=seed, target_type=target_type)


The following class is used for datasets where the training and test are in different files

In [None]:
class Data_from_files(Data_set):
    """A data set whose training and test examples come from two separate files."""

    def __init__(self, train_file_name, test_file_name, separator=',',
                 has_header=False, target_index=0, boolean_features=True,
                 categorical=[], target_type= None, include_only=None):
        """create a dataset from separate training and test files
        separator is the character that separates the attributes
        has_header is True if the first line of the training file is a header
        (the test file is assumed not to have one)
        target_index specifies which feature is the target
        boolean_features specifies whether we want to create Boolean features
        (if False, it uses the original features).
        categorical is a set (or list) of features that should be treated as categorical
        (not used by this constructor)
        target_type is either None for automatic detection of target type
        or one of "numeric", "boolean", "categorical"
        include_only is a list or set of indexes of columns to include

        Lines with fewer than two fields are skipped in both files.
        """
        self.boolean_features = boolean_features
        with open(train_file_name,'r',newline='') as train_file, \
             open(test_file_name,'r',newline='') as test_file:
            # data_all = csv.reader(csvfile,delimiter=separator) # for more complicated CSV files
            train_data = (line.strip().split(separator) for line in train_file)
            test_data = (line.strip().split(separator) for line in test_file)
            if include_only is not None:
                train_data = ([v for (i,v) in enumerate(line) if i in include_only] for line in train_data)
                test_data = ([v for (i,v) in enumerate(line) if i in include_only] for line in test_data)
            if has_header: # this assumes the training file has a header and the test file doesn't
                header = next(train_data)
            else:
                header = None
            # Read both files fully (while they are open), with or without a header.
            train_tuples = [interpret_elements(d) for d in train_data if len(d)>1]
            test_tuples = [interpret_elements(d) for d in test_data if len(d)>1]
        Data_set.__init__(self,train_tuples, test_tuples,target_index=target_index,
                          header=header, target_type=target_type)
          

When reading from a file, all of the values are strings. This next method
tries to convert each value into a number (an int or a float) or a Boolean, if it is
possible.

In [None]:
def interpret_elements(str_list):
  """make the elements of string list str_list numerical if possible.
  Otherwise remove initial and trailing spaces.

  Each string becomes an int if it parses as one, else a float if it parses
  as one, else True/False if (stripped) it is one of the spellings
  True/true/TRUE or False/false/FALSE, else the stripped string.
  Returns a new list with one entry per input string.
  """
  true_words = ("True","true","TRUE")
  false_words = ("False","false","FALSE")
  res = []
  for e in str_list:
    try:
      res.append(int(e))
    except ValueError:
      try:
        res.append(float(e))
      except ValueError:
        se = e.strip()
        # exactly one value is appended per input string
        if se in true_words:
          res.append(True)
        elif se in false_words:
          res.append(False)
        else:
          res.append(se)
  return res

###7.1.5 Augmented Features


Sometimes we want to augment the features with new features computed from
the old features (e.g., the product of features). Here we allow the creation of
a new dataset from an old dataset but with new features. Note that special
cases of these are **kernels** mapping the original feature space into a new space,
which allow a neat way to do learning in the augmented space (the “kernel
trick”). This is beyond the scope of AIPython; those interested should read
about support vector machines.

A feature is a function of examples. A unary feature constructor takes a feature and returns a new feature. A binary feature combiner takes two features
and returns a new feature

In [None]:
class Data_set_augmented(Data_set):
    """A data set with the same examples as another data set but with extra
    input features computed from that data set's input features."""

    def __init__(self, dataset, unary_functions=[], binary_functions=[],include_orig=True):
        """creates a dataset like dataset but with new features
        unary_functions is a list of unary feature constructors
        binary_functions is a list of binary feature combiners.
        include_orig specifies whether the original features should be
        included
        """
        # These must be set before Data_set.__init__ runs, because it calls
        # create_features(), which reads them.
        self.orig_dataset = dataset
        self.unary_functions = unary_functions
        self.binary_functions = binary_functions
        self.include_orig = include_orig
        self.target = dataset.target
        Data_set.__init__(self,dataset.train, test=dataset.test,
                          target_index = dataset.target_index)

    def create_features(self):
        """Build self.input_features: the original input features (if
        include_orig), then each unary function applied to each original
        feature, then each binary function applied to each ordered pair of
        distinct original features. The target is left as set in __init__.
        """
        if self.include_orig:
            self.input_features = self.orig_dataset.input_features.copy()
        else:
            self.input_features = []
        for u in self.unary_functions:
            for f in self.orig_dataset.input_features:
                self.input_features.append(u(f))
        # Binary features do not depend on the unary ones, so this loop is
        # separate from (not nested in) the loop above.
        for b in self.binary_functions:
            for f1 in self.orig_dataset.input_features:
                for f2 in self.orig_dataset.input_features:
                    if f1 != f2:
                        self.input_features.append(b(f1,f2))

def square(f):
  """Unary feature constructor: returns the feature whose value on an
  example is the square of f's value, labelled with f's label plus "**2".
  """
  def sq(e):
    base = f(e)
    return base**2
  sq.__doc__ = "".join((f.__doc__, "**2"))
  return sq

def power_feat(n):
  """Given n, returns a unary feature constructor that turns a feature into
  its nth power (labelled with the feature's label plus "**" and n).
  For example, power_feat(2) builds the same features as square.
  """
  def fn(f,n=n):
    def pow(e,n=n):
      base = f(e)
      return base**n
    pow.__doc__ = "".join((f.__doc__, "**", str(n)))
    return pow
  return fn

def prod_feat(f1,f2):
  """Binary feature combiner: returns the feature whose value is f1's value
  times f2's value, labelled with the two labels joined by "*".
  """
  def feat(e):
    left = f1(e)
    right = f2(e)
    return left*right
  feat.__doc__ = "".join((f1.__doc__, "*", f2.__doc__))
  return feat
def eq_feat(f1,f2):
  """Binary feature combiner: returns the feature that is 1 on examples
  where f1 and f2 give equal values and 0 elsewhere, labelled with the two
  labels joined by "==".
  """
  def feat(e):
    if f1(e)==f2(e):
      return 1
    return 0
  feat.__doc__ = "".join((f1.__doc__, "==", f2.__doc__))
  return feat
  
def neq_feat(f1,f2):
  """Binary feature combiner: returns the feature that is 1 on examples
  where f1 and f2 give different values and 0 elsewhere, labelled with the
  two labels joined by "!=".
  """
  def feat(e):
    if f1(e)!=f2(e):
      return 1
    return 0
  feat.__doc__ = "".join((f1.__doc__, "!=", f2.__doc__))
  return feat


In [None]:
 # from learnProblem import Data_set_augmented,prod_feat
# data = Data_from_file('data/holiday.csv', num_train=19, target_index=-1)
 # data = Data_from_file('data/iris.data', prob_test=1/3, target_index=-1)
 ## Data = Data_from_file('data/SPECT.csv', prob_test=0.5, target_index=0)
 # dataplus = Data_set_augmented(data,[],[prod_feat])
 # dataplus = Data_set_augmented(data,[],[prod_feat,neq_feat])


##7.2 Generic Learner Interface


A learner takes a dataset (and possibly other arguments specific to the method).
To get it to learn, we call the learn() method. This implements Displayable so
that we can display traces at multiple levels of detail (and perhaps with a GUI).

In [None]:
from display import Displayable
class Learner(Displayable):
  """Abstract interface for learners: a learner is constructed from a
  dataset and produces a predictor when learn() is called.
  Both methods must be overridden by subclasses.
  """
  def __init__(self, dataset):
    raise NotImplementedError("Learner.__init__") # abstract method

  def learn(self):
    """returns a predictor, a function from a tuple to a value for the
    target feature
    """
    raise NotImplementedError("learn") # abstract method


##7.3 Learning With No Input Features

If we make the same prediction for each example, what prediction should we
make? This can be used as a naive baseline; if a more sophisticated method
does not do better than this, it is not useful. This also provides the base case
for some methods, such as decision-tree learning.

###learnNoInputs.py

In [None]:
from learnProblem import Evaluate
import math, random, collections, statistics
import utilities # argmax for (element,value) pairs

class Predict(object):
    """The class of prediction methods for a list of values.
    Please make the doc strings the same length, because they are used in tables.
    Note that we don't need self argument, as we are creating Predict objects,
    To use call Predict.laplace(data) etc."""
    # The one-line docstrings of the methods are read through __doc__ as
    # table labels, so their exact text (including trailing spaces) is kept.
    # The domain default lists are never modified by these methods.

    ### The following return a distribution over values (for classification)
    def empirical(data, domain=[0,1], icount=0):
        "empirical dist "
        # returns a distribution over values
        # icount is the count every value of domain starts with
        counts = {v:icount for v in domain}
        for e in data:
            counts[e] += 1
        s = sum(counts.values())
        return {k:v/s for (k,v) in counts.items()}

    def bounded_empirical(data, domain=[0,1], bound=0.01):
        "bounded empirical"
        # the empirical distribution with each value clipped to [bound, 1-bound]
        return {k:min(max(v,bound),1-bound) for (k,v) in Predict.empirical(data, domain).items()}

    def laplace(data, domain=[0,1]):
        "Laplace " # for categorical data
        return Predict.empirical(data, domain, icount=1)

    def cmode(data, domain=[0,1]):
        "mode " # for categorical data
        md = statistics.mode(data)
        return {v: 1 if v==md else 0 for v in domain}

    def cmedian(data, domain=[0,1]):
        "median " # for categorical data
        md = statistics.median_low(data) # always return one of the values
        return {v: 1 if v==md else 0 for v in domain}

    ### The following return a single prediction (for regression). domain is ignored.

    def mean(data, domain=[0,1]):
        "mean "
        # returns a real number
        return statistics.mean(data)

    def rmean(data, domain=[0,1], mean0=0, pseudo_count=1):
        "regularized mean"
        # returns a real number.
        # mean0 is the mean to be used for 0 data points
        # With mean0=0.5, pseudo_count=2, same as laplace for [0,1] data
        # this works for enumerations as well as lists
        total = mean0 * pseudo_count
        count = pseudo_count
        for e in data:
            total += e
            count += 1
        return total/count

    def mode(data, domain=[0,1]):
        "mode "
        return statistics.mode(data)

    def median(data, domain=[0,1]):
        "median "
        return statistics.median(data)

    all = [empirical, mean, rmean, bounded_empirical, laplace, cmode, mode, median,cmedian]

    # The following suggests appropriate predictions as a function of the target type
    select = {
        "boolean": [empirical, bounded_empirical, laplace, cmode, cmedian],
        "categorical": [empirical, bounded_empirical, laplace, cmode, cmedian],
        "numeric": [mean, rmean, mode, median]}
  

##7.3.1 Evaluation

To evaluate a point prediction, we first generate some data from a simple (Bernoulli)
distribution, where there are two possible values, 0 and 1, for the target feature.
Given prob, a number in the range [0, 1], this generates some training and test
data where prob is the probability of each example being 1. To generate a 1 with
probability prob, we generate a random number in range [0,1] and return 1 if
that number is less than prob. A prediction is computed by applying the predictor to the training data, which is evaluated on the test set. This is repeated
num_samples times.

In [None]:
def test_no_inputs(error_measures = Evaluate.all_criteria,num_samples=10000, test_size=10,
                   train_sizes=(1,2,3,4,5,10,20,100,1000)):
    """Compare every predictor in Predict.all on randomly generated 0/1 data.

    For each training size in train_sizes, num_samples times: a probability
    prob is drawn at random, train_size training values and test_size test
    values are generated (each is 1 with probability prob), every predictor
    makes its prediction from the training values, and every error measure
    is averaged over the test values. A table of the errors, averaged over
    the samples, is printed for each training size.

    error_measures is the list of (prediction, actual) error functions whose
    __doc__ strings head the columns.
    """
    for train_size in train_sizes:
        # accumulated error for each (predictor, error measure) pair
        results = {predictor: {error_measure: 0 for error_measure in error_measures}
                   for predictor in Predict.all}
        for sample in range(num_samples):
            prob = random.random()
            training = [1 if random.random()<prob else 0 for i in range(train_size)]
            test = [1 if random.random()<prob else 0 for i in range(test_size)]
            for predictor in Predict.all:
                prediction = predictor(training)
                for error_measure in error_measures:
                    results[predictor][error_measure] += sum(error_measure(prediction,actual) for actual in test)/test_size
        print(f"For training size {train_size}:")
        print(" Predictor\t","\t".join(error_measure.__doc__ for
                                        error_measure in error_measures),sep="\t")
        for predictor in Predict.all:
            print(f" {predictor.__doc__}","\t".join("{:.7f}".format(results[predictor][error_measure]/num_samples) for error_measure in error_measures),sep="\t")

# When this file is executed as a script, run the comparison of predictors.
if __name__ == "__main__":
  test_no_inputs()

##7.4 Decision Tree Learning

The decision tree algorithm does binary splits, and assumes that all input
features are binary functions of the examples. It stops splitting if there are
no input features, the number of examples is less than a specified number of
examples or all of the examples agree on the target feature.

###learnDT.py

In [None]:
from learnProblem import Learner, Evaluate
from learnNoInputs import Predict
import math

class DT_learner(Learner):
    """Learner whose learn() method builds a decision tree with learn_tree."""

    def __init__(self,
                 dataset,
                 split_to_optimize=Evaluate.log_loss,
                 leaf_prediction=Predict.empirical,
                 train=None,
                 max_num_cuts=8,
                 gamma=1e-7,
                 min_child_weight=10):
        """Store the configuration of the learner.

        dataset supplies the target feature, the conditions and (by default)
                the training examples
        split_to_optimize is the error measure to minimize at each split
        leaf_prediction is what to use for the value at the leaves
        train is an alternative collection of training examples (used for
              cross validation); when None, dataset.train is used
        max_num_cuts is the maximum number of conditions to split a numerical
                     feature into
        gamma is the minimum improvement needed to expand a node
        min_child_weight is stored for use when selecting splits
        """
        # the data and what is being predicted
        self.dataset = dataset
        self.target = dataset.target
        self.train = self.dataset.train if train is None else train

        # how splits and leaves are chosen
        self.split_to_optimize = split_to_optimize
        self.leaf_prediction = leaf_prediction

        # limits on growing the tree
        self.max_num_cuts = max_num_cuts
        self.gamma = gamma
        self.min_child_weight = min_child_weight

    def learn(self, max_num_cuts=8):
        """learn a decision tree

        Note: the max_num_cuts argument is accepted but not read here; the
        value stored by the constructor (self.max_num_cuts) is what is
        passed to dataset.conditions.
        """
        candidate_conditions = self.dataset.conditions(self.max_num_cuts)
        return self.learn_tree(candidate_conditions, self.train)


The main recursive algorithm, takes in a set of input features and a set of
training data. It first decides whether to split. If it doesn’t split, it makes a point
prediction, ignoring the input features.


##It splits unless:

* there are no more input features,
* there are fewer examples than min number examples,
* all the examples agree on the value of the target, or
* the best split makes all examples in the same partition.

In [None]:
def learn_tree(self, conditions, data_subset):
    """Build a decision tree for data_subset and return it as a function.

    conditions is the collection of candidate conditions to split on
    data_subset is the examples used to build this (sub)tree

    The returned tree is a function from an example to a prediction of the
    target feature.  Its __doc__ is a textual form of the tree and its
    num_leaves attribute is the number of leaves.
    """
    self.display(2, f"learn_tree with {len(conditions)} features and {len(data_subset)} examples")
    split, partn = self.select_split(conditions, data_subset)

    if split is not None:  # a split succeeded: build both subtrees
        false_examples, true_examples = partn
        # the chosen condition is not offered again further down the tree
        rem_features = [cond for cond in conditions if cond != split]
        self.display(2, "Splitting on", split.__doc__, "with examples split",
                     len(true_examples), ":", len(false_examples))
        true_tree = self.learn_tree(rem_features, true_examples)
        false_tree = self.learn_tree(rem_features, false_examples)

        def fun(e):
            return true_tree(e) if split(e) else false_tree(e)

        fun.__doc__ = ("if " + split.__doc__ + " then (" + true_tree.__doc__
                       + ") else (" + false_tree.__doc__ + ")")
        fun.num_leaves = true_tree.num_leaves + false_tree.num_leaves
        return fun

    # no split: return a point prediction that ignores the example
    prediction = self.leaf_value(data_subset, self.target.frange)
    self.display(2, f"leaf prediction for {len(data_subset)} examples is {prediction}")

    def leaf_fun(e):
        return prediction

    leaf_fun.__doc__ = str(prediction)
    leaf_fun.num_leaves = 1
    return leaf_fun


In [None]:
def leaf_value(self, egs, domain):
    """Return the value to predict at a leaf containing the examples egs.

    The target values of egs (as a generator) and domain are passed to
    self.leaf_prediction, whose result is returned.
    """
    return self.leaf_prediction((self.target(e) for e in egs), domain)


def select_split(self, conditions, data_subset):
    """finds best feature to split on.

    conditions is a list of features.
    returns feature, partition
    where feature is an input feature with the smallest error as
    judged by split_to_optimize or
    feature==None if there are no splits that improve the error
    partition is a pair (false_examples, true_examples) if feature is not None

    A split is only considered when both sides of its partition contain at
    least self.min_child_weight examples, and it is only chosen when its
    error is below the no-split error minus self.gamma.
    """
    best_feat = None  # best feature
    # a split has to beat the error of not splitting by at least gamma
    best_error = self.sum_losses(data_subset) - self.gamma
    self.display(3, " no split has error=", best_error, "with", len(conditions), "conditions")
    best_partition = None
    for feat in conditions:
        false_examples, true_examples = partition(data_subset, feat)
        if min(len(false_examples), len(true_examples)) >= self.min_child_weight:
            err = self.sum_losses(false_examples) + self.sum_losses(true_examples)
            self.display(3, " split on", feat.__doc__, "has error=", err,
                         "splits into", len(true_examples), ":", len(false_examples),
                         "gamma=", self.gamma)
            # the comparison belongs inside the guard: err is only defined
            # (and only describes this feature) when the split is admissible
            if err < best_error:
                best_feat = feat
                best_error = err
                best_partition = false_examples, true_examples
    self.display(2, "best split is on", best_feat.__doc__, "with err=", best_error)

    return best_feat, best_partition


def sum_losses(self, data_subset):
    """returns sum of losses for dataset (with no more splits)

    There is a single prediction for all of data_subset, obtained from
    leaf_value.  It is evaluated against each example's target using
    split_to_optimize.
    """
    prediction = self.leaf_value(data_subset, self.target.frange)
    error = sum(self.split_to_optimize(prediction, self.target(e)) for e in data_subset)
    return error

def partition(data_subset,feature):
  """Split data_subset into the examples for which feature is false and
  those for which it is true.

  Returns the pair (false_examples, true_examples); each list keeps the
  examples in the order they occur in data_subset.
  """
  false_examples, true_examples = [], []
  for example in data_subset:
    bucket = true_examples if feature(example) else false_examples
    bucket.append(example)
  return false_examples, true_examples


###results.py

In [None]:
from learnProblem import Data_set, Data_from_file

def testDT(data, print_tree=True, selections = None, **tree_args):
  """Prints errors and the trees for various evaluation criteria and ways to select leaves.

  data is the data set to learn from; its test examples are used for evaluation
  print_tree specifies whether the learned tree (its __doc__) is printed
  selections is the collection of leaf-prediction functions to try; when None,
             Predict.select[data.target.ftype] is used
  tree_args are passed on to DT_learner

  One tab-separated row is printed per (split criterion, leaf choice) pair,
  giving the number of leaves and the test error under every criterion in
  Evaluate.all_criteria.
  """
  if selections is None: # use selections suitable for target type
    selections = Predict.select[data.target.ftype]
  evaluation_criteria = Evaluate.all_criteria
  print("Split Choice","Leaf Choice\t","#leaves",
        '\t'.join(ecrit.__doc__ for ecrit in evaluation_criteria),sep="\t")
  for crit in evaluation_criteria:
    for leaf in selections:
      tree = DT_learner(data, split_to_optimize=crit, leaf_prediction=leaf, **tree_args).learn()
      print(crit.__doc__, leaf.__doc__, tree.num_leaves,
            "\t".join("{:.7f}".format(data.evaluate_dataset(data.test, tree, ecrit))
                      for ecrit in evaluation_criteria),sep="\t")
      if print_tree:
        print(tree.__doc__)

 #DT_learner.max_display_level = 4
# When run as a script, load one of the example data files into `data`.
# NOTE(review): only the data set is loaded here; testDT(data) is not
# called in this cell -- confirm whether that call was intended.
if __name__ == "__main__":
 # Choose one of the data files
 #data=Data_from_file('data/SPECT.csv', target_index=0); print("SPECT.csv")
 #data=Data_from_file('data/iris.data', target_index=-1); print("iris.data")
 data = Data_from_file('data/carbool.csv', target_index=-1, seed=123)
 #data = Data_from_file('data/mail_reading.csv', target_index=-1); print("mail_reading.csv")
 #data = Data_from_file('data/holiday.csv', num_train=19, target_index=-1); print("holiday.csv")

Note that different runs may provide different values as they split the training and test sets differently. So if you have a hypothesis about what works
better, make sure it is true for different runs.


##Cross Validation and Parameter Tuning


One way to determine whether the prediction is overfitting is by cross validation. The code below implements
k-fold cross validation, which can be used to choose the value of parameters to best fit the training data. If we want to use parameter tuning to improve
predictions on a particular data set, we can only use the training data (and not
the test data) to tune the parameter.


In k-fold cross validation, we partition the training set into k approximately
equal-sized folds (each fold is an enumeration of examples). For each fold, we
train on the other examples, and determine the error of the prediction on that
fold. For example, if there are 10 folds, we train on 90% of the data, and then
test on remaining 10% of the data. We do this 10 times, so that each example
gets used as a test set once, and in the training set 9 times.


The code below creates one copy of the data, and multiple views of the data.
For each fold, fold enumerates the examples in the fold, and fold complement
enumerates the examples not in the fold.

##learnCrossValidation.py

In [None]:
from learnProblem import Data_set, Data_from_file, Evaluate
from learnNoInputs import Predict
from learnDT import DT_learner
import matplotlib.pyplot as plt
import random
class K_fold_dataset(object):
  """A shuffled copy of a training set together with num_folds views of it.

  fold_boundaries[i] is the index in self.data at which fold i starts;
  fold i ends just before fold_boundaries[i+1].
  """

  def __init__(self, training_set, num_folds):
    """training_set provides train, target, input_features and conditions
    num_folds is the number of folds the examples are divided into
    """
    self.num_folds = num_folds
    self.target = training_set.target
    self.input_features = training_set.input_features
    self.conditions = training_set.conditions
    # shuffle a copy so that training_set.train itself is left untouched
    shuffled = training_set.train.copy()
    random.shuffle(shuffled)
    self.data = shuffled
    size = len(shuffled)
    self.fold_boundaries = [(size*k)//num_folds for k in range(num_folds+1)]

  def fold(self, fold_num):
    """enumerates the examples in fold number fold_num"""
    start = self.fold_boundaries[fold_num]
    stop = self.fold_boundaries[fold_num+1]
    yield from self.data[start:stop]

  def fold_complement(self, fold_num):
    """enumerates the examples that are not in fold number fold_num"""
    start = self.fold_boundaries[fold_num]
    stop = self.fold_boundaries[fold_num+1]
    yield from self.data[:start]
    yield from self.data[stop:]


The validation error is the average error for each example, where we test on
each fold, and learn on the other folds

In [None]:
def validation_error(self, learner, error_measure, **other_params):
  """Return the average error per example, where each fold is predicted by
  a model learned from the examples outside that fold.

  learner is called as learner(self, train=..., **other_params); the
          result's learn() method must return a predictor
  error_measure is a function (prediction, actual) -> error
  other_params are passed on to learner

  Returns infinity if a ValueError is raised while learning or evaluating.
  """
  total_error = 0
  try:
    for fold_num in range(self.num_folds):
      training_examples = list(self.fold_complement(fold_num))
      model = learner(self, train=training_examples, **other_params)
      predictor = model.learn()
      fold_error = sum(error_measure(predictor(e), self.target(e))
                       for e in self.fold(fold_num))
      total_error += fold_error
  except ValueError:
    return float("inf") #infinity
  return total_error/len(self.data)

def plot_error(data, criterion=Evaluate.squared_loss,leaf_prediction=Predict.empirical,num_folds=5, maxx=None, xscale='linear'):
  """Plots the error on the validation set and the test set
  with respect to settings of the minimum number of examples.

  data is the data set; its training examples are folded for validation and
       its test examples are used for the test-set curve
  criterion is the error measure, used both for splitting and for evaluation
  leaf_prediction is the leaf-prediction function given to DT_learner
  num_folds is the number of folds used for cross validation
  maxx is the (exclusive) upper bound on min_child_weight;
       when None, len(data.train)//2+1 is used
  xscale should be 'log' or 'linear'
  """
  plt.ion()
  plt.xscale(xscale) # change between log and linear scale
  plt.xlabel("min_child_weight")
  plt.ylabel("average "+criterion.__doc__)
  folded_data = K_fold_dataset(data, num_folds)
  if maxx is None:
    maxx = len(data.train)//2+1
  verrors = [] # validation errors
  terrors = [] # test set errors
  for mcw in range(maxx):
    verrors.append(folded_data.validation_error(DT_learner, criterion,
                                                leaf_prediction=leaf_prediction,
                                                min_child_weight=mcw))
    tree = DT_learner(data, criterion, leaf_prediction=leaf_prediction, min_child_weight=mcw).learn()
    terrors.append(data.evaluate_dataset(data.test,tree,criterion))
  plt.plot(range(maxx), verrors, ls='-',color='k',
           label="validation for "+criterion.__doc__)
  plt.plot(range(maxx), terrors, ls='--',color='k',
           label="test set for "+criterion.__doc__)
  plt.legend()
  plt.draw()
# The following produces Figure 7.15 of Poole and Mackworth [2017]
# data = Data_from_file('data/SPECT.csv',target_index=0, seed=123)
# plot_error(data) # warning, may take a long time depending on the dataset

#also try:
# data = Data_from_file('data/mail_reading.csv', target_index=-1)
# data = Data_from_file('data/carbool.csv', target_index=-1, seed=123)
# plot_error(data, criterion=Evaluate.log_loss,leaf_prediction=Predict.laplace) # warning, may take a long time depending on the dataset


##7.6 Linear Regression and Classification


In [None]:
from learnProblem import Learner
import random, math

class Linear_learner(Learner):
    """Learner for a (possibly squashed) linear function, trained by gradient descent."""

    def __init__(self, dataset, train=None,
                 learning_rate=0.1, max_init = 0.2,
                 squashed=True):
        """Creates a gradient descent searcher for a linear classifier.
        The main learning is carried out by learn()

        dataset provides the target and the input features
        train provides a subset of the training data to use
              (dataset.train is used when it is None)
        learning_rate is the gradient descent step size
        max_init is the maximum absolute value of the initial weights
        squashed specifies whether the output is a squashed linear function
        """
        self.dataset = dataset
        self.target = dataset.target
        if train is None:
            self.train = self.dataset.train
        else:
            self.train = train
        self.learning_rate = learning_rate
        self.squashed = squashed
        self.input_features = [one]+dataset.input_features # one is defined below
        # one weight per feature, initialised uniformly in [-max_init, max_init]
        self.weights = {feat: random.uniform(-max_init, max_init)
                        for feat in self.input_features}

    def predictor(self,e):
        """returns the prediction of the learner on example e"""
        linpred = sum(w*f(e) for f,w in self.weights.items())
        if self.squashed:
            return sigmoid(linpred)
        else:
            return linpred

    def predictor_string(self, sig_dig=3):
        """returns the doc string for the current prediction function
        sig_dig is the number of digits each weight is rounded to
        (it is passed as the second argument of round)"""
        doc = "+".join(str(round(val,sig_dig))+"*"+feat.__doc__
                       for feat,val in self.weights.items())
        if self.squashed:
            return "sigmoid("+ doc+")"
        else:
            return doc

    def learn(self,num_iter=100):
        """Carries out num_iter sweeps through the training examples,
        updating the weights after each example.
        Returns the predictor method.
        """
        for it in range(num_iter):
            self.display(2,"prediction=",self.predictor_string())
            for e in self.train:
                predicted = self.predictor(e)
                error = self.target(e) - predicted
                update = self.learning_rate*error
                for feat in self.weights:
                    self.weights[feat] += update*feat(e)
        # returned only after all num_iter sweeps have been carried out
        return self.predictor


In [None]:
def softmax(xs,domain=None):
  """Return the softmax of the values xs.

  xs is a list of values
  domain is a list of the same length as xs, or None

  If domain is given (and non-empty) the result is a dict mapping each
  element of domain to the softmax of the corresponding value; otherwise
  the result is a list in the same order as xs.
  """
  largest = max(xs) # subtracting the largest value prevents overflow (and all values underflowing)
  exps = [math.exp(x-largest) for x in xs]
  total = sum(exps)
  if not domain:
    return [v/total for v in exps]
  distribution = {}
  for d,v in zip(domain,exps):
    distribution[d] = v/total
  return distribution

def indicator(v, domain):
  """Return a list with one entry per element of domain:
  1 where the element equals v and 0 elsewhere.
  """
  flags = []
  for dv in domain:
    if v==dv:
      flags.append(1)
    else:
      flags.append(0)
  return flags


The following tests the learner on a data set. Uncomment the other data
sets for different examples.


In [None]:
from learnProblem import Data_set, Data_from_file, Evaluate
from learnProblem import Evaluate
import matplotlib.pyplot as plt
def test(**args):
 """Learn a linear predictor for the SPECT data and print what was learned,
 followed by the average test-set error under every criterion in
 Evaluate.all_criteria.

 args are passed on to Linear_learner.
 """
 # other data sets to try:
 # data = Data_from_file('data/mail_reading.csv', target_index=-1)
 # data = Data_from_file('data/carbool.csv', target_index=-1)
 data = Data_from_file('data/SPECT.csv', target_index=0)
 learner = Linear_learner(data,**args)
 learner.learn()
 learned_description = learner.predictor_string()
 print("function learned is", learned_description)
 for ecrit in Evaluate.all_criteria:
  average_error = data.evaluate_dataset(data.test, learner.predictor, ecrit)
  print(" Average", ecrit.__doc__, "is", average_error)


The following plots the errors on the training and test sets as a function of
the number of steps of gradient descent.

In [None]:
def plot_steps(learner=None,
               data = None,
               criterion=Evaluate.squared_loss,
               step=1,
               num_steps=1000,
               log_scale=True,
               legend_label=""):
    """
    plots the training and test error for a learner.

    learner is the learner to train; when None, a Linear_learner on data is created
    data is the data set used for training and evaluation;
         when None, 'data/holiday.csv' is loaded
    criterion gives the evaluation criterion plotted on the y-axis
    step specifies how many steps are run for each point on the plot
    num_steps is the number of steps of learning that are plotted
    log_scale specifies whether the x-axis uses a log scale
    legend_label is prepended to the labels "training" and "test" in the legend
    """
    if legend_label != "": legend_label+=" "
    plt.ion()
    plt.xlabel("step")
    plt.ylabel("Average "+criterion.__doc__)
    if log_scale:
        plt.xscale('log') #plt.semilogx() #Makes a log scale
    else:
        plt.xscale('linear')
    if data is None:
        data = Data_from_file('data/holiday.csv', num_train=19,target_index=-1)
        #data = Data_from_file('data/SPECT.csv', target_index=0)
        # data = Data_from_file('data/mail_reading.csv', target_index=-1)
        # data = Data_from_file('data/carbool.csv', target_index=-1)
    #random.seed(None) # reset seed
    if learner is None:
        learner = Linear_learner(data)
    train_errors = []
    test_errors = []
    for i in range(1,num_steps+1,step):
        # record the errors before carrying out the next `step` steps
        test_errors.append(data.evaluate_dataset(data.test,learner.predictor, criterion))
        train_errors.append(data.evaluate_dataset(data.train,learner.predictor, criterion))
        learner.display(2, "Train error:",train_errors[-1],"Test error:",test_errors[-1])
        learner.learn(num_iter=step)
    plt.plot(range(1,num_steps+1,step),train_errors,ls='-',label=legend_label+"training")
    plt.plot(range(1,num_steps+1,step),test_errors,ls='--',label=legend_label+"test")
    plt.legend()
    plt.draw()
    # report the last recorded errors; this uses local variables, so it has
    # to be part of the function body
    learner.display(1, "Train error:",train_errors[-1],"Test error:",test_errors[-1])
# Run the linear-learner test (defined above as test) only when this is
# executed as a script.
if __name__ == "__main__":
  test()
# This generates the figure
# from learnProblem import Data_set_augmented,prod_feat
# data = Data_from_file('data/SPECT.csv', prob_test=0.5, target_index=0)
# dataplus = Data_set_augmented(data,[],[prod_feat])
# plot_steps(data=data,num_steps=1000)
# plot_steps(data=dataplus,num_steps=1000) # warning very slow


The following plots the prediction as a function of the number of steps of gradient descent. We first define a version of range that allows
for real numbers (integers and floats).

In [None]:
def arange(start,stop,step):
 """Generate start, start+step, start+2*step, ... for as long as the value
 is less than stop.

 Works like the built-in range(start,stop,step) for a positive step, but
 also accepts floats.  Each value is obtained by adding step to the previous
 one, so rounding errors are expected with real numbers (or use numpy.arange).
 """
 current = start
 while current < stop:
  yield current
  current = current + step

def plot_prediction(data,
                    learner = None,
                    minx = 0,
                    maxx = 5,
                    step_size = 0.01, # for plotting
                    label = "function"):
 """Train a learner on data and plot its predictions with the training data.

 data is the data set; the first and last elements of each training
      example are plotted as x and y
 learner is the learner to train; when None, an unsquashed Linear_learner
         on data is created
 minx, maxx and step_size give the x values at which the prediction is plotted
 label is the legend label for the learned function
 """
 plt.ion()
 plt.xlabel("x")
 plt.ylabel("y")
 if learner is None:
    learner = Linear_learner(data, squashed=False)
 # learn in three stages, each with a smaller learning rate and more iterations
 schedule = ((0.001, 100), (0.0001, 1000), (0.00001, 10000))
 for rate, iterations in schedule:
    learner.learning_rate = rate
    learner.learn(iterations)
 description = learner.predictor_string()
 train_error = data.evaluate_dataset(data.train, learner.predictor,Evaluate.squared_loss)
 learner.display(1,"function learned is", description,"error=",train_error)
 data_xs = [e[0] for e in data.train]
 data_ys = [e[-1] for e in data.train]
 plt.plot(data_xs,data_ys,"bo",label="data")
 xs = list(arange(minx,maxx,step_size))
 predictions = [learner.predictor([x]) for x in xs]
 plt.plot(xs,predictions,label=label)
 plt.legend()


###learnLinear.py

In [None]:
from learnProblem import Data_set_augmented, power_feat
def plot_polynomials(data, learner_class = Linear_learner,
                     max_degree = 5,
                     minx = 0,
                     maxx = 5,
                     num_iter = 100000,
                     learning_rate = 0.0001,
                     step_size = 0.01, # for plotting
                     ):
    """Plot the training data together with one learned function for each
    degree in range(max_degree).

    data is the data set; the first and last elements of each training
         example are plotted as x and y
    learner_class is called as learner_class(dataset, squashed=False) to
                  create the learner for each degree
    max_degree is the (exclusive) upper bound on the degrees tried
    minx, maxx and step_size give the x values at which each function is plotted
    num_iter is the number of iterations each learner is trained for
    learning_rate is the learning rate given to each learner
    """
    plt.ion()
    plt.xlabel("x")
    plt.ylabel("y")
    plt.plot([e[0] for e in data.train],[e[-1] for e in data.train],"ko",label="data")
    x_values = list(arange(minx,maxx,step_size))
    line_styles = ['-','--','-.',':']
    colors = ['0.5','k','k','k','k']
    for degree in range(max_degree):
        # everything below is done once per degree, so it all belongs in the loop
        data_aug = Data_set_augmented(data,[power_feat(n) for n in range(1,degree+1)],
                                      include_orig=False)
        learner = learner_class(data_aug,squashed=False)
        learner.learning_rate = learning_rate
        learner.learn(num_iter)
        learner.display(1,"For degree",degree, "function learned is",
                        learner.predictor_string(), "error=",
                        data.evaluate_dataset(data.train, learner.predictor, Evaluate.squared_loss))
        ls = line_styles[degree % len(line_styles)]
        col = colors[degree % len(colors)]
        plt.plot(x_values,[learner.predictor([x]) for x in x_values],linestyle=ls, color=col,
                 label="degree="+str(degree))
        plt.legend(loc='upper left')
        plt.draw()

# Try:
# data0 = Data_from_file('data/simp_regr.csv', prob_test=0, boolean_features=False, target_index=-1)
# plot_prediction(data0)
# plot_polynomials(data0)
#datam = Data_from_file('data/mail_reading.csv', target_index=-1)
#plot_prediction(datam)


##7.6.1 Batched Stochastic Gradient Descent

This implements batched stochastic gradient descent. If the batch size is 1, it
can be simplified by not storing the differences in d, but applying them directly;
this would then be equivalent to the original code!


This overrides the learner_Linear learner. Note that the comparison with
regular gradient descent is unfair as the number of updates per step is not the
same. (How could it be made more fair?)


###learnLinearBSGD.py 

In [None]:
from learnLinear import Linear_learner
import random, math

class Linear_learner_bsgd(Linear_learner):
 """Linear_learner whose learn() uses batched stochastic gradient descent:
 the weight changes for a random batch of examples are accumulated and
 applied together.
 """
 def __init__(self, *args, batch_size=10, **kargs):
    """batch_size is the number of examples sampled for each update;
    all other arguments are passed on to Linear_learner.
    """
    Linear_learner.__init__(self, *args, **kargs)
    self.batch_size = batch_size

 def learn(self,num_iter=None):
    """Carries out num_iter batched updates and returns the predictor method.

    When num_iter is None, self.number_iterations is used if it has been
    set, and otherwise 100 (the default of Linear_learner.learn).
    """
    if num_iter is None:
      # Linear_learner.__init__ does not define number_iterations, so do
      # not assume the attribute exists
      num_iter = getattr(self, "number_iterations", 100)
    batch_size = min(self.batch_size, len(self.train))
    d = {feat:0 for feat in self.weights} # accumulated change for each weight
    for it in range(num_iter):
        self.display(2,"prediction=",self.predictor_string())
        for e in random.sample(self.train, batch_size):
            predicted = self.predictor(e)
            error = self.target(e) - predicted
            update = self.learning_rate*error
            for feat in self.weights:
              d[feat] += update*feat(e)
        # apply the accumulated changes, then reset them for the next batch
        for feat in self.weights:
          self.weights[feat] += d[feat]
          d[feat]=0
    return self.predictor

 # from learnLinear import plot_steps
 # from learnProblem import Data_from_file
 # data = Data_from_file('data/holiday.csv', target_index=-1)
 # learner = Linear_learner_bsgd(data) # plot_steps(learner = learner, data=data)

 # to plot polynomials with batching (compare to SGD)
 # from learnLinear import plot_polynomials
 # plot_polynomials(learner_class = Linear_learner_bsgd)

###7.7 Boosting

A Boosted dataset is created from a base dataset by subtracting the prediction of the offset function from each example. This does not save the new
dataset, but generates it as needed. The amount of space used is constant, independent of the size of the data set.


##learnBoosting.py

In [None]:
from learnProblem import Data_set, Learner, Evaluate
from learnNoInputs import Predict
from learnLinear import sigmoid
import statistics
import random

class Boosted_dataset(Data_set):
 """A view of a base dataset in which the target of each example has the
 prediction of an offset function subtracted from it.
 """
 def __init__(self, base_dataset, offset_fun, subsample=1.0):
    """new dataset which is like base_dataset,
    but offset_fun(e) is subtracted from the target of each example e

    subsample is the fraction of base_dataset.train that is randomly
    sampled to form the training examples of this dataset
    """
    self.base_dataset = base_dataset
    self.offset_fun = offset_fun
    sample_size = int(subsample*len(base_dataset.train))
    self.train = random.sample(base_dataset.train, sample_size)
    self.test = base_dataset.test
    # Data_set.__init__ is not called here (the call is commented out in the
    # original), so the features are set up directly below rather than in a
    # separate create_features method.

    # same input features as the base dataset, but a new target
    self.input_features = self.base_dataset.input_features

    def newout(e):
      return self.base_dataset.target(e) - self.offset_fun(e)

    newout.frange = self.base_dataset.target.frange
    newout.ftype = self.infer_type(newout.frange)
    self.target = newout

 def conditions(self, *args, colsample_bytree=0.5, **nargs):
    """Return a random sample of the base dataset's conditions.

    colsample_bytree is the fraction of the conditions that is kept;
    the remaining arguments are passed on to base_dataset.conditions.
    """
    all_conditions = self.base_dataset.conditions(*args, **nargs)
    number_kept = int(colsample_bytree*len(all_conditions))
    return random.sample(all_conditions, number_kept)

class Boosting_learner(Learner):
  """Learner that builds up a predictor as the sum of a constant (the mean
  of the training targets) and a sequence of offsets learned by base learners.
  """
  def __init__(self, dataset, base_learner_class, subsample=0.8):
    """dataset is the data set to learn from
    base_learner_class is called with a Boosted_dataset and must return an
                       object whose learn() method returns a predictor
    subsample is the fraction of the training examples given to each base learner
    """
    self.dataset = dataset
    self.base_learner_class = base_learner_class
    self.subsample = subsample
    mean = sum(self.dataset.target(e) for e in self.dataset.train)/len(self.dataset.train)
    self.predictor = lambda e:mean # function that returns mean for each example
    self.predictor.__doc__ = "lambda e:"+str(mean)
    self.offsets = [self.predictor] # list of base learners
    self.predictors = [self.predictor] # list of predictors
    # evaluate on this learner's own dataset, not on a global named data
    self.errors = [self.dataset.evaluate_dataset(self.dataset.test, self.predictor,
                                                 Evaluate.squared_loss)]
    self.display(1,"Predict mean test set mean squared loss=",
                 self.errors[0] )

  def learn(self, num_ensembles=10):
    """adds num_ensemble learners to the ensemble.
    returns a new predictor.

    After each learner is added, the test-set squared loss of the new
    predictor is appended to self.errors.
    """
    for i in range(num_ensembles):
        train_subset = Boosted_dataset(self.dataset, self.predictor,subsample=self.subsample)
        learner = self.base_learner_class(train_subset)
        new_offset = learner.learn()
        self.offsets.append(new_offset)
        # the defaults bind the current predictor and offset, so that later
        # iterations do not change what this function refers to
        def new_pred(e, old_pred=self.predictor, off=new_offset):
          return old_pred(e)+off(e)
        self.predictor = new_pred
        self.predictors.append(new_pred)
        self.errors.append(self.dataset.evaluate_dataset(self.dataset.test,
                                                         self.predictor, Evaluate.squared_loss))
        self.display(1,f"Iteration {len(self.offsets)-1},treesize = {new_offset.num_leaves}. mean squared loss={self.errors[-1]}")
    return self.predictor


###Testing

In [None]:
from learnDT import DT_learner
from learnProblem import Data_set, Data_from_file

def sp_DT_learner(split_to_optimize=Evaluate.squared_loss,leaf_prediction=Predict.mean,**nargs):
    """Return a function that, given a dataset, creates a DT_learner for it
    using split_to_optimize, leaf_prediction and the keyword arguments nargs.
    """
    def new_learner(dataset):
      learner_args = dict(split_to_optimize=split_to_optimize,
                          leaf_prediction=leaf_prediction)
      return DT_learner(dataset, **learner_args, **nargs)
    return new_learner

# Data set used by the boosting examples below; the commented-out lines are
# alternative data sets and example learners.
#data = Data_from_file('data/car.csv', target_index=-1) regression
data = Data_from_file('data/student/student-mat-nq.csv', separator=';',has_header=True,target_index=-1,seed=13,include_only=list(range(30))+[32]) #2.0537973790924946
#data = Data_from_file('data/SPECT.csv', target_index=0, seed=62) #123)
#data = Data_from_file('data/mail_reading.csv', target_index=-1)
#data = Data_from_file('data/holiday.csv', num_train=19, target_index=-1)
#learner10 = Boosting_learner(data,
#                sp_DT_learner(split_to_optimize=Evaluate.squared_loss,
#                              leaf_prediction=Predict.mean, min_child_weight=10))
#learner7 = Boosting_learner(data, sp_DT_learner(0.7))
#learner5 = Boosting_learner(data, sp_DT_learner(0.5))
#predictor9 =learner9.learn(10)
#for i in learner9.offsets: print(i.__doc__)
import matplotlib.pyplot as plt

def plot_boosting_trees(data, steps=10, mcws=[30,20,20,10], gammas=[100,200,300,500]):
    """Plot the test-set squared loss of boosted decision trees against the
    number of trees, with one curve for each combination of min_child_weight
    and gamma.

    data is the data set to learn from
    steps is the number of trees added to each ensemble
    mcws is the list of min_child_weight values to try
    gammas is the list of gamma values to try
    """
    # to reduce clutter uncomment one of following two lines
    #mcws=[10]
    #gammas=[200]
    learners = [(mcw, gamma, Boosting_learner(data,
                                              sp_DT_learner(min_child_weight=mcw, gamma=gamma)))
                for gamma in gammas for mcw in mcws]
    plt.ion()
    plt.xscale('linear') # change between log and linear scale
    plt.xlabel("number of trees")
    plt.ylabel("mean squared loss")
    # a different line style / colour combination for each curve
    markers = (m+c for c in ['k','g','r','b','m','c','y'] for m in ['-','--','-.',':'])
    for (mcw,gamma,learner) in learners:
        data.display(1,f"min_child_weight={mcw}, gamma={gamma}")
        # every learner is trained and plotted, so both calls are in the loop
        learner.learn(steps)
        plt.plot(range(steps+1), learner.errors, next(markers),
                 label=f"min_child_weight={mcw}, gamma={gamma}")
    plt.legend()
    plt.draw()

###7.7.1 Gradient Tree Boosting


The following implements gradient Boosted trees for classification. If you want
to use this gradient tree boosting for a real problem, we recommend using
XGBoost [Chen and Guestrin, 2016].

GTB_learner subclasses DT-learner. The method learn_tree is used unchanged. DT-learner assumes that the value at the leaf is the prediction of the
leaf, thus leaf_value needs to be overridden. It also assumes that all nodes
at a leaf have the same prediction, but in GBT the elements of a leaf can have
different values, depending on the previous trees. Thus sum_losses also needs
to be overridden.

###learnBoosting.py

In [None]:
class GTB_learner(DT_learner):
  """Gradient tree boosting for classification: a sequence of trees, learned
  with DT_learner.learn_tree, whose summed outputs are passed through sigmoid.
  """
  def __init__(self, dataset, number_trees, lambda_reg=1, gamma=0, **dtargs):
    """dataset is the data set to learn from
    number_trees is the number of trees learned by learn()
    lambda_reg is added to the denominator of the leaf values
    gamma is added to the loss returned by sum_losses
    dtargs are passed on to DT_learner
    """
    DT_learner.__init__(self, dataset, split_to_optimize=Evaluate.log_loss, **dtargs)
    self.trees = []
    self.number_trees = number_trees
    self.lambda_reg = lambda_reg
    self.gamma = gamma

  def learn(self):
    """Learn number_trees trees, reporting the training and test log loss
    after each one. Returns the gtb_predictor method.
    """
    for i in range(self.number_trees):
      candidate_conditions = self.dataset.conditions(self.max_num_cuts)
      tree = self.learn_tree(candidate_conditions, self.train)
      self.trees.append(tree)
      train_loss = self.dataset.evaluate_dataset(self.dataset.train, self.gtb_predictor, Evaluate.log_loss)
      test_loss = self.dataset.evaluate_dataset(self.dataset.test, self.gtb_predictor, Evaluate.log_loss)
      self.display(1, f"Iteration {i} treesize = {tree.num_leaves} train logloss={train_loss} test logloss={test_loss}")
    return self.gtb_predictor

  def gtb_predictor(self, example, extra=0):
    """prediction for example: sigmoid of the sum of the outputs of the
    trees learned so far plus extra, where extra is an additional
    contribution for this example being considered
    """
    tree_total = sum(t(example) for t in self.trees)
    return sigmoid(tree_total+extra)

  def leaf_value(self, egs, domain=[0,1]):
    """value at the leaves for examples egs; the domain argument is ignored"""
    pred_acts = [(self.gtb_predictor(e),self.target(e)) for e in egs]
    numerator = sum(actual-pred for (pred,actual) in pred_acts)
    denominator = sum(pred*(1-pred) for (pred,actual) in pred_acts)+self.lambda_reg
    return numerator/denominator

  def sum_losses(self, data_subset):
    """returns the sum of the log losses for data_subset, assuming a leaf is
    formed from it with no more splits, plus gamma
    """
    leaf_val = self.leaf_value(data_subset)
    total_loss = sum(Evaluate.log_loss(self.gtb_predictor(e,leaf_val), self.target(e))
                     for e in data_subset)
    return total_loss + self.gamma
    # data = Data_from_file('data/carbool.csv', target_index=-1, seed=123)
    # gtb_learner = GTB_learner(data, 10)
    # gtb_learner.learn()

#Chapter-4 Neural Networks and Deep Learning


##8.1 Layers

In [None]:
from learnProblem import Learner, Data_set, Data_from_file,Data_from_files, Evaluate
from learnLinear import sigmoid, one, softmax, indicator
import random, math, time
class Layer(object):
    """Abstract base class for a layer of a neural network."""

    def __init__(self, nn, num_outputs=None):
      """Given a list of inputs, outputs will produce a list of length
      num_outputs.
      nn is the neural network this layer is part of
      num_outputs is the number of outputs for this layer; when it is not
      given (or is 0) the layer has as many outputs as inputs.
      """
      self.nn = nn
      self.num_inputs = nn.num_outputs # output of nn is the input to this layer
      if num_outputs:
        self.num_outputs = num_outputs
      else:
        self.num_outputs = nn.num_outputs # same as the inputs

    def output_values(self,input_values, training=False):
      """Return the outputs for this layer for the given input values.
      input_values is a list of the inputs to this layer (of length
      num_inputs)
      returns a list of length self.num_outputs.
      It can act differently when training and when predicting.
      """
      raise NotImplementedError("output_values") # abstract method

    def backprop(self,errors):
      """Backpropagate the errors on the outputs
      errors is a list of errors for the outputs (of length
      self.num_outputs).
      Returns the errors for the inputs to this layer (of length
      self.num_inputs).

      You can assume that this is only called after corresponding
      output_values,
      which can remember information required for the
      back-propagation.
      """
      # this has to be inside the method: at class level it would be raised
      # as soon as the class is defined
      raise NotImplementedError("backprop") # abstract method

    def update(self):
      """updates parameters after a batch.
      overridden by layers that have parameters
      """
      pass

A linear layer maintains an array of weights. self.weights[o][i] is the weight
between input i and output o. A 1 is added to the end of the inputs. The default
initialization is the Glorot uniform initializer [Glorot and Bengio, 2010], which
is the default in Keras. An alternative is to provide a limit, in which case the
values are selected uniformly in the range [−limit, limit]. Keras treats the bias
separately, and defaults to zero.


##learnNN.py

In [None]:
class Linear_complete_layer(Layer):
    """a completely connected layer"""

    def __init__(self, nn, num_outputs, limit=None):
        """A completely connected linear layer.

        nn is a neural network that the inputs come from
        num_outputs is the number of outputs
        limit bounds the random initialization: each weight is drawn
          uniformly from [-limit, limit]. When limit is None it is set to
          sqrt(6/(num_inputs+num_outputs)).
        """
        Layer.__init__(self, nn, num_outputs)
        if limit is None:
            limit = math.sqrt(6 / (self.num_inputs + self.num_outputs))
        # self.weights[o][i] is the weight between input i and output o.
        # The extra last entry of each row is the bias; it starts at 0.
        self.weights = [[random.uniform(-limit, limit) if inf < self.num_inputs else 0
                         for inf in range(self.num_inputs + 1)]
                        for outf in range(self.num_outputs)]
        # self.delta accumulates the gradient of each weight over a batch
        self.delta = [[0 for inf in range(self.num_inputs + 1)]
                      for outf in range(self.num_outputs)]

    def output_values(self, input_values, training=False):
        """Returns the outputs for the input values.
        It remembers the values for the backprop.

        Note in self.weights there is a weight list for every output,
        so wts in self.weights loops over the outputs.
        The bias is the *last* value of each list in self.weights.
        """
        self.inputs = input_values + [1]  # the constant 1 multiplies the bias
        return [sum(w * val for (w, val) in zip(wts, self.inputs))
                for wts in self.weights]

    def backprop(self, errors):
        """Backpropagate the errors.

        Accumulates the gradient for every weight in self.delta (the
        weights themselves only change in update) and returns the error
        for each of this layer's inputs.
        """
        input_errors = [0] * (self.num_inputs + 1)
        for out in range(self.num_outputs):
            for inp in range(self.num_inputs + 1):
                input_errors[inp] += self.weights[out][inp] * errors[out]
                self.delta[out][inp] += self.inputs[inp] * errors[out]
        # return only after every (output, input) pair has been processed
        return input_errors[:-1]  # remove the error for the "1"

    def update(self):
        """updates parameters after a batch"""
        batch_step_size = self.nn.learning_rate / self.nn.batch_size
        for out in range(self.num_outputs):
            for inp in range(self.num_inputs + 1):
                self.weights[out][inp] -= batch_step_size * self.delta[out][inp]
                self.delta[out][inp] = 0  # start the next batch from zero


##learnNN.py

In [None]:
class ReLU_layer(Layer):
    """Rectified linear unit (ReLU) layer: f(z) = max(0, z).

    The layer has as many outputs as inputs; each output is the ReLU of
    the corresponding input.
    """

    def __init__(self, nn):
        Layer.__init__(self, nn)

    def output_values(self, input_values, training=False):
        """Return max(0, z) for every input z.

        The input values are remembered because backprop needs them.
        """
        self.input_values = input_values
        rectified = []
        for z in input_values:
            rectified.append(max(0, z))
        self.outputs = rectified
        return rectified

    def backprop(self, errors):
        """Return the errors on the inputs given the errors on the outputs.

        An error is passed through where the remembered input was
        positive and replaced by 0 elsewhere.
        """
        passed_back = []
        for err, z in zip(errors, self.input_values):
            passed_back.append(err if z > 0 else 0)
        return passed_back

One of the old standards for the activation function for hidden layers is the
sigmoid. It is included here to experiment with.

##learnNN.py

In [None]:
class Sigmoid_layer(Layer):
    """Layer that applies the sigmoid to each of its inputs.

    The layer has as many outputs as inputs; output i is sigmoid(input i).
    """

    def __init__(self, nn):
        Layer.__init__(self, nn)

    def output_values(self, input_values, training=False):
        """Return the sigmoid of every input value.

        The outputs are remembered because backprop needs them.
        """
        squashed = []
        for z in input_values:
            squashed.append(sigmoid(z))
        self.outputs = squashed
        return squashed

    def backprop(self, errors):
        """Return the errors on the inputs given the errors on the outputs.

        Each error is multiplied by out*(1-out), computed from the
        remembered output.
        """
        passed_back = []
        for err, out in zip(errors, self.outputs):
            passed_back.append(err * out * (1 - out))
        return passed_back


##8.2 Feedforward Networks


##LearnNN.py

In [None]:
class NN(Learner):
    """A feed-forward neural network built as a sequence of layers."""

    def __init__(self, dataset, validation_proportion = 0.1,learning_rate=0.001):
        """Creates a neural network for a dataset.

        dataset supplies the training examples, the input features and
          the target feature
        validation_proportion is the fraction of the training examples
          that is held out as a validation set
        learning_rate is stored for the layers to use in their updates

        The network starts with no layers; add them with add_layer.
        """
        self.dataset = dataset
        self.output_type = dataset.target.ftype
        self.learning_rate = learning_rate
        self.input_features = dataset.input_features
        # With no layers yet, the "outputs" of the network are its inputs.
        self.num_outputs = len(self.input_features)
        validation_num = int(len(self.dataset.train)*validation_proportion)
        if validation_num > 0:
            random.shuffle(self.dataset.train)  # shuffles dataset.train in place
            self.validation_set = self.dataset.train[-validation_num:]
            self.training_set = self.dataset.train[:-validation_num]
        else:
            self.validation_set = []
            self.training_set = self.dataset.train
        # These must be initialized whether or not there is a validation set.
        self.layers = []
        self.bn = 0 # number of batches run

    def add_layer(self,layer):
        """add a layer to the network.
        Each layer gets number of inputs from the previous layers outputs.
        """
        self.layers.append(layer)
        self.num_outputs = layer.num_outputs

    def predictor(self,ex):
        """Predicts the value for example ex.

        The example's input features are passed through every layer in
        order. The result is sigmoid of the first output for a "boolean"
        target, softmax over the outputs for a "categorical" target, and
        the raw first output otherwise.
        """
        values = [f(ex) for f in self.input_features]
        for layer in self.layers:
            values = layer.output_values(values)
        # only interpret the values once they have been through all layers
        if self.output_type == "boolean":
            return sigmoid(values[0])
        if self.output_type == "categorical":
            return softmax(values, self.dataset.target.frange)
        return values[0]

    def predictor_string(self):
        return "not implemented"



In [None]:
def learn(self, epochs=5, batch_size=32, num_iter = None,report_each=10):
    """Learns parameters for a neural network using stochastic gradient descent.

    epochs is the number of times through the data (on average)
    batch_size is the maximum size of each batch
    num_iter is the number of iterations over the batches
      - overrides epochs if provided (allows for fractions of epochs)
    report_each means give the errors after each multiple of that many
      iterations

    Written as a function whose first argument is the network; presumably
    it is meant to be used as the learn method of NN (confirm).
    """
    examples = self.training_set
    # don't have batches bigger than training size
    self.batch_size = min(batch_size, len(examples))
    if num_iter is None:
        num_iter = (epochs * len(examples)) // self.batch_size
    for iteration in range(1, num_iter + 1):
        for example in random.sample(examples, self.batch_size):
            # forward pass: compute all outputs
            values = [feature(example) for feature in self.input_features]
            for layer in self.layers:
                values = layer.output_values(values, training=True)
            # turn the raw outputs into predictions for the target type
            if self.output_type == "boolean":
                predicted = [sigmoid(v) for v in values]
            elif self.output_type == "categorical":
                predicted = softmax(values)
            else:
                predicted = values
            if self.output_type == "categorical":
                actuals = indicator(self.dataset.target(example), self.dataset.target.frange)
            else:
                actuals = [self.dataset.target(example)]
            # backward pass: push the errors back through the layers
            errors = [pred - obsd for (obsd, pred) in zip(actuals, predicted)]
            for layer in reversed(self.layers):
                errors = layer.backprop(errors)
        # update all parameters once per batch
        for layer in self.layers:
            layer.update()
        self.bn += 1
        if iteration % report_each == 0:
            scores = "\t\t".join(
                "{:.4f}".format(self.dataset.evaluate_dataset(self.validation_set, self.predictor, criterion))
                for criterion in Evaluate.all_criteria)
            self.display(0, self.bn, "\t", scores, sep="")


##8.3 Improved Optimization

###8.3.1 Momentum

###learnNN.py

In [None]:
class Linear_complete_layer_momentum(Linear_complete_layer):
    """A completely connected linear layer whose updates use momentum."""

    def __init__(self, nn, num_outputs, limit=None, alpha=0.9, epsilon =1e-07, vel0=0):
        """A completely connected linear layer.

        nn is a neural network that the inputs come from
        num_outputs is the number of outputs
        limit is passed on to Linear_complete_layer for the random
          initialization of the weights
        alpha is the factor by which the previous velocity is multiplied
          at every update
        epsilon is stored on the layer; update does not read it
        vel0 is the initial velocity for each parameter
        """
        Linear_complete_layer.__init__(self, nn, num_outputs, limit=limit)
        row_length = self.num_inputs + 1
        # self.velocity[o][i] is the velocity of self.weights[o][i]
        self.velocity = [[vel0] * row_length for _ in range(self.num_outputs)]
        self.alpha = alpha
        self.epsilon = epsilon

    def update(self):
        """updates parameters after a batch"""
        batch_step_size = self.nn.learning_rate / self.nn.batch_size
        for out in range(self.num_outputs):
            vel = self.velocity[out]
            wts = self.weights[out]
            grad = self.delta[out]
            for inp in range(self.num_inputs + 1):
                vel[inp] = self.alpha * vel[inp] - batch_step_size * grad[inp]
                wts[inp] += vel[inp]
                grad[inp] = 0  # start the next batch from zero


##8.3.2 RMS-Prop


In [None]:
class Linear_complete_layer_RMS_Prop(Linear_complete_layer):
    """A completely connected linear layer whose updates use RMS-Prop."""

    def __init__(self, nn, num_outputs, limit=None, rho=0.9, epsilon = 1e-07):
        """A completely connected linear layer.

        nn is a neural network that the inputs come from
        num_outputs is the number of outputs
        limit is passed on to Linear_complete_layer for the random
          initialization of the weights
        rho is the weight given to the old mean square at every update
        epsilon is added to the mean square before the square root is taken
        """
        Linear_complete_layer.__init__(self, nn, num_outputs, limit=limit)
        row_length = self.num_inputs + 1
        # self.ms[o][i] is the running mean square of the gradient of
        # self.weights[o][i]
        self.ms = [[0] * row_length for _ in range(self.num_outputs)]
        self.rho = rho
        self.epsilon = epsilon

    def update(self):
        """updates parameters after a batch"""
        for out in range(self.num_outputs):
            mean_sq = self.ms[out]
            wts = self.weights[out]
            acc = self.delta[out]
            for inp in range(self.num_inputs + 1):
                gradient = acc[inp] / self.nn.batch_size
                mean_sq[inp] = self.rho * mean_sq[inp] + (1 - self.rho) * gradient**2
                scale = self.nn.learning_rate / (mean_sq[inp] + self.epsilon)**0.5
                wts[inp] -= scale * gradient
                acc[inp] = 0  # start the next batch from zero


##8.4 Dropout


In [None]:
from utilities import flip
class Dropout_layer(Layer):
    """Dropout layer that remembers which units it dropped."""

    def __init__(self, nn, rate=0):
        """
        rate is fraction of the input units to drop. 0 =< rate < 1
        """
        self.rate = rate
        Layer.__init__(self, nn)

    def output_values(self, input_values, training=False):
        """Returns the outputs for the input values.

        When training, each input is kept or dropped according to a call
        of flip(self.rate); kept values are scaled by 1/(1-rate) and the
        keep/drop mask is remembered for backprop. When not training, the
        inputs are returned unchanged.
        """
        if not training:
            return input_values
        scaling = 1 / (1 - self.rate)
        mask = []
        for _ in input_values:
            mask.append(0 if flip(self.rate) else 1)
        self.mask = mask
        self.outputs = [value * keep * scaling
                        for (value, keep) in zip(input_values, mask)]
        return self.outputs

    def backprop(self, errors):
        """Returns the errors with the dropped positions set to 0.

        Uses the mask remembered by the last training call of
        output_values.
        """
        # NOTE(review): the forward pass also multiplies by 1/(1-rate) but
        # the errors are not rescaled here -- confirm this is intended.
        return [err * keep for (err, keep) in zip(errors, self.mask)]

class Dropout_layer_0(Layer):
    """Dropout layer that does not remember which units it dropped."""

    def __init__(self, nn, rate=0):
        """rate is fraction of the input units to drop. 0 =< rate < 1"""
        self.rate = rate
        Layer.__init__(self, nn)

    def output_values(self, input_values, training=False):
        """Returns the outputs for the input values.

        When training, each input is replaced by 0 when flip(self.rate)
        is true and scaled by 1/(1-rate) otherwise. When not training,
        the inputs are returned unchanged.
        """
        if not training:
            return input_values
        scaling = 1 / (1 - self.rate)
        dropped = []
        for inp in input_values:
            # make 0 with probability rate
            dropped.append(0 if flip(self.rate) else inp * scaling)
        self.outputs = dropped
        return dropped

    def backprop(self, errors):
        """Returns the errors unchanged."""
        return errors


## 8.4.1 Examples

The following constructs neural networks with one hidden layer. The hidden
layer uses a ReLU activation function (its width is the second argument of the
first linear layer). For a Boolean target, the output layer uses a sigmoid.

In [None]:
# Choose the dataset: exactly one of the following lines should be active.
#data = Data_from_file('data/mail_reading.csv', target_index=-1)
#data = Data_from_file('data/mail_reading_consis.csv', target_index=-1)
data = Data_from_file('data/SPECT.csv', prob_test=0.5, target_index=0)
#data = Data_from_file('data/iris.data', prob_test=0.2, target_index=-1) #
# 150 examples: approx 120 train + 30 test
#data = Data_from_file('data/if_x_then_y_else_z.csv', num_train=8, target_index=-1) # not linearly sep
#data = Data_from_file('data/holiday.csv', target_index=-1) #, num_train=19)
#data = Data_from_file('data/processed.cleveland.data', target_index=-1)
random.seed(None)

# nn1: hidden linear layer of width 3, ReLU, linear output of width 1
nn1 = NN(data)
nn1.add_layer(Linear_complete_layer(nn1,3))
#nn1.add_layer(Sigmoid_layer(nn1)) # comment this or the next
nn1.add_layer(ReLU_layer(nn1))
#nn1.add_layer(Linear_complete_layer(nn1,1)) # when using output_type="boolean"
nn1.add_layer(Linear_complete_layer(nn1,1)) # when using output_type="categorical"
#nn1.learn(epochs = 100)

# nn1do: same shape as nn1, with a dropout layer (rate 0.5) after the ReLU
nn1do = NN(data)
nn1do.add_layer(Linear_complete_layer(nn1do,3))
#nn1do.add_layer(Sigmoid_layer(nn1do)) # comment this or the next
nn1do.add_layer(ReLU_layer(nn1do))
nn1do.add_layer(Dropout_layer(nn1do, rate=0.5))
#nn1do.add_layer(Linear_complete_layer(nn1do,1)) # when using output_type="boolean"
nn1do.add_layer(Linear_complete_layer(nn1do,1)) # when using output_type="categorical"
#nn1do.learn(epochs = 100)
# nn_r1: same shape as nn1, with RMS-Prop linear layers
nn_r1 = NN(data)
nn_r1.add_layer(Linear_complete_layer_RMS_Prop(nn_r1,3))
#nn_r1.add_layer(Sigmoid_layer(nn_r1)) # comment this or the next
nn_r1.add_layer(ReLU_layer(nn_r1))
#nn_r1.add_layer(Linear_complete_layer(nn_r1,1)) # when using output_type="boolean"
nn_r1.add_layer(Linear_complete_layer_RMS_Prop(nn_r1,1)) # when using output_type="categorical"
#nn_r1.learn(epochs = 100)
# nnm1: same shape as nn1, with momentum linear layers
nnm1 = NN(data)
nnm1.add_layer(Linear_complete_layer_momentum(nnm1,3))
#nnm1.add_layer(Sigmoid_layer(nnm1)) # comment this or the next
nnm1.add_layer(ReLU_layer(nnm1))
#nnm1.add_layer(Linear_complete_layer(nnm1,1)) # when using output_type="boolean"
nnm1.add_layer(Linear_complete_layer_momentum(nnm1,1)) # when using output_type="categorical"
#nnm1.learn(epochs = 100)
# nn2: RMS-Prop network with a hidden layer of width 2
nn2 = NN(data) #"boolean") #
nn2.add_layer(Linear_complete_layer_RMS_Prop(nn2,2))
nn2.add_layer(ReLU_layer(nn2))
nn2.add_layer(Linear_complete_layer_RMS_Prop(nn2,1)) # when using output_type="categorical"
# nn3: RMS-Prop network with a hidden layer of width 5
nn3 = NN(data) #"boolean") #
nn3.add_layer(Linear_complete_layer_RMS_Prop(nn3,5))
nn3.add_layer(ReLU_layer(nn3))
nn3.add_layer(Linear_complete_layer_RMS_Prop(nn3,1)) # when using output_type="categorical"
# nn0: no hidden layer, a single linear layer with a larger learning rate
nn0 = NN(data,learning_rate=0.05)
nn0.add_layer(Linear_complete_layer(nn0,1)) # categorical linear regression
#nn0.add_layer(Linear_complete_layer_RMS_Prop(nn0,1)) # categorical linear regression

##learnNN.py

In [None]:
from learnLinear import plot_steps
from learnProblem import Evaluate

# The statements below are examples to run by hand; uncomment the ones you want.

# To show plots:
# plot_steps(learner = nn1, data = data, criterion=Evaluate.log_loss, num_steps=10000, log_scale=False, legend_label="nn1")
# plot_steps(learner = nn2, data = data, criterion=Evaluate.log_loss, num_steps=10000, log_scale=False, legend_label="nn2")
# plot_steps(learner = nn3, data = data, criterion=Evaluate.log_loss, num_steps=100000, log_scale=False, legend_label="nn3")
# plot_steps(learner = nn0, data = data, criterion=Evaluate.log_loss, num_steps=10000, log_scale=False, legend_label="nn0")

# plot_steps(learner = nn0, data = data, criterion=Evaluate.accuracy, num_steps=10000, log_scale=False, legend_label="nn0")
# plot_steps(learner = nn1, data = data, criterion=Evaluate.accuracy, num_steps=10000, log_scale=False, legend_label="nn1")
# plot_steps(learner = nn2, data = data, criterion=Evaluate.accuracy, num_steps=10000, log_scale=False, legend_label="nn2")
# plot_steps(learner = nn3, data = data, criterion=Evaluate.accuracy, num_steps=10000, log_scale=False, legend_label="nn3")


# Print some training examples
#for eg in random.sample(data.train,10): print(eg,nn1.predictor(eg))

# Print some test examples
#for eg in random.sample(data.test,10): print(eg,nn1.predictor(eg))

# To see the weights learned in linear layers
# nn1.layers[0].weights
# nn1.layers[2].weights

# Print test:
# for e in data.train: print(e,nn0.predictor(e))

def test(data, hidden_widths=(5,), epochs=100,
         optimizers=(Linear_complete_layer,
                     Linear_complete_layer_momentum,
                     Linear_complete_layer_RMS_Prop)):
    """Train one network per optimizer on data and report the errors.

    data is the dataset to learn from; its target must be Boolean
    hidden_widths gives the width of each hidden layer, in order; every
      hidden layer is followed by a ReLU layer
    epochs is passed to each network's learn
    optimizers are the linear-layer classes to compare; each one is used
      for all of the linear layers of its own network

    Raises NotImplementedError when the target is not Boolean.
    """
    # header line for the rows that learn displays
    data.display(0,"Batch\t","\t".join(criterion.__doc__ for criterion in Evaluate.all_criteria))
    for optimizer in optimizers:
        nn = NN(data)
        for width in hidden_widths:
            nn.add_layer(optimizer(nn,width))
            nn.add_layer(ReLU_layer(nn))
        if data.target.ftype == "boolean":
            nn.add_layer(optimizer(nn,1))
        else:
            # the target type lives on data.target.ftype
            raise NotImplementedError(f"Not implemented: {data.target.ftype}")
        nn.learn(epochs)

The following tests on MNIST. The original files are from http://yann.lecun.com/exdb/mnist/.
This code assumes you use the csv files from https://pjreddie.com/projects/mnist-in-csv/,
and put them in the directory ../MNIST/. Note that this is very inefficient; you would be
better to use Keras or PyTorch. There are 28 × 28 = 784 input units and 512 hidden units,
which makes 784 × 512 = 401,408 weights for the lowest linear layer. So don't be surprised when it takes many hours.

In [None]:
# The statements below are examples to run by hand; uncomment the ones you want.

# Simplified version: (6000 training instances)
# data_mnist = Data_from_file('../MNIST/mnist_train.csv', prob_test=0.9, target_index=0, boolean_features=False, target_type="categorical")

# Full version:
# data_mnist = Data_from_files('../MNIST/mnist_train.csv', '../MNIST/mnist_test.csv', target_index=0, boolean_features=False, target_type="categorical")

# nn_mnist = NN(data_mnist, validation_proportion = 0.02, learning_rate=0.001) #validation set = 1200
# nn_mnist.add_layer(Linear_complete_layer_RMS_Prop(nn_mnist,512)); nn_mnist.add_layer(ReLU_layer(nn_mnist)); nn_mnist.add_layer(Linear_complete_layer_RMS_Prop(nn_mnist,10))
# start_time = time.perf_counter();nn_mnist.learn(epochs=1, batch_size=128);end_time = time.perf_counter();print("Time:", end_time - start_time,"seconds") #1 epoch
# determine test error:
# data_mnist.evaluate_dataset(data_mnist.test, nn_mnist.predictor, Evaluate.accuracy)
# Print some random predictions:
# for eg in random.sample(data_mnist.test,10): print(data_mnist.target(eg),nn_mnist.predictor(eg),nn_mnist.predictor(eg)[data_mnist.target(eg)])


# Chapter-5 Multiagent System

##12.1 Minimax


In [None]:
from display import Displayable

class Node(Displayable):
    """A node in a search tree. It has a
    name a string
    isMax is True if it is a maximizing node, otherwise it is minimizing node
    children is the list of children (None for a leaf)
    value is what it evaluates to if it is a leaf.
    """

    def __init__(self, name, isMax, value, children):
        self.name = name
        self.isMax = isMax
        self.value = value
        self.allchildren = children

    def isLeaf(self):
        """returns true if this is a leaf node"""
        return self.allchildren is None

    def children(self):
        """returns the list of all children."""
        return self.allchildren

    def evaluate(self):
        """returns the evaluation for this node if it is a leaf"""
        return self.value


##masProblem.py

In [None]:
# Example game tree. The root "a" is a maximizing node and the levels
# alternate between maximizing and minimizing down to the leaves.
# NOTE(review): 888 presumably marks leaves whose value should not affect
# the result -- confirm against the figure this tree is taken from.
fig10_5 = Node("a", True, None, [
    Node("b", False, None, [
        Node("d", True, None, [
            Node("h", False, None, [
                Node("h1", True, 7, None),
                Node("h2", True, 9, None)]),
            Node("i", False, None, [
                Node("i1", True, 6, None),
                Node("i2", True, 888, None)])]),
        Node("e", True, None, [
            Node("j", False, None, [
                Node("j1", True, 11, None),
                Node("j2", True, 12, None)]),
            Node("k", False, None, [
                Node("k1", True, 888, None),
                Node("k2", True, 888, None)])])]),
    Node("c", False, None, [
        Node("f", True, None, [
            Node("l", False, None, [
                Node("l1", True, 5, None),
                Node("l2", True, 888, None)]),
            Node("m", False, None, [
                Node("m1", True, 4, None),
                Node("m2", True, 888, None)])]),
        Node("g", True, None, [
            Node("n", False, None, [
                Node("n1", True, 888, None),
                Node("n2", True, 888, None)]),
            Node("o", False, None, [
                Node("o1", True, 888, None),
                Node("o2", True, 888, None)])])])])


The following is a representation of a magic-sum game, where players take
turns picking a number in the range [1, 9], and the first player to have 3 numbers that sum to 15 wins. Note that this is a syntactic variant of tic-tac-toe or
naughts and crosses. To see this, consider the numbers on a magic square (Figure 12.1); 3 numbers that add to 15 correspond exactly to the winning positions of tic-tac-toe played on the magic square.

## masProblem.py

In [None]:
class Magic_sum(Node):
    """A node in the search tree of the magic-sum game."""

    def __init__(self, xmove=True, last_move=None,
                 available=None, x=None, o=None):
        """This is a node in the search for the magic-sum game.
        xmove is True if the next move belongs to X.
        last_move is the number selected in the last move
        available is the list of numbers that are available to be chosen
          (defaults to 1..9)
        x is the list of numbers already chosen by x (defaults to none)
        o is the list of numbers already chosen by o (defaults to none)
        """
        self.isMax = self.xmove = xmove
        self.last_move = last_move
        # Build fresh lists for the defaults so that no list object is
        # shared between separately constructed start nodes.
        self.available = [1,2,3,4,5,6,7,8,9] if available is None else available
        self.x = [] if x is None else x
        self.o = [] if o is None else o
        self.allchildren = None #computed on demand
        lm = str(last_move)
        self.name = "start" if not last_move else "o="+lm if xmove else "x="+lm

    def children(self):
        """returns the list of child nodes, one per available number.

        The list is computed on the first call and remembered.
        """
        if self.allchildren is None:
            if self.xmove:
                self.allchildren = [
                    Magic_sum(xmove = not self.xmove,
                              last_move = sel,
                              available = [e for e in self.available if e != sel],
                              x = self.x+[sel],
                              o = self.o)
                    for sel in self.available]
            else:
                self.allchildren = [
                    Magic_sum(xmove = not self.xmove,
                              last_move = sel,
                              available = [e for e in self.available if e != sel],
                              x = self.x,
                              o = self.o+[sel])
                    for sel in self.available]
        # return on every path, whoever moves and whether or not cached
        return self.allchildren

    def isLeaf(self):
        """A leaf has no numbers available or is a win for one of the
        players.
        We only need to check for a win for o if it is currently x's turn,
        and only check for a win for x if it is o's turn (otherwise it would
        have been a win earlier).
        """
        return (self.available == [] or
                (sum_to_15(self.last_move,self.o)
                 if self.xmove
                 else sum_to_15(self.last_move,self.x)))

    def evaluate(self):
        """returns -1 if o has just won, 1 if x has just won, and 0 otherwise"""
        if self.xmove and sum_to_15(self.last_move,self.o):
            return -1
        elif not self.xmove and sum_to_15(self.last_move,self.x):
            return 1
        else:
            return 0


def sum_to_15(last,selected):
    """is true if last, together with two other elements of selected, sums to 15.
    """
    # Module-level function: the Magic_sum methods call it by its bare name.
    return any(last+a+b == 15
               for a in selected if a != last
               for b in selected if b != last and b != a)


##12.1.2 Minimax and α-β Pruning

This is a naive depth-first minimax algorithm:

In [None]:
def minimax(node,depth):
    """returns the value of node, and a best path for the agents

    node must provide isLeaf(), evaluate(), children(), isMax and name.
    depth is the depth of node in the search; it is only passed on to the
      recursive calls.
    The path is None for a leaf; otherwise it is a pair
    (child name, path from that child).
    """
    if node.isLeaf():
        return node.evaluate(),None
    elif node.isMax:
        max_score = float("-inf")
        max_path = None
        for C in node.children():
            score,path = minimax(C,depth+1)
            if score > max_score:
                max_score = score
                max_path = C.name,path
        # only return once every child has been considered
        return max_score,max_path
    else:
        min_score = float("inf")
        min_path = None
        for C in node.children():
            score,path = minimax(C,depth+1)
            if score < min_score:
                min_score = score
                min_path = C.name,path
        # only return once every child has been considered
        return min_score,min_path
def minimax_alpha_beta(node,alpha,beta,depth=0):
    """node is a Node, alpha and beta are cutoffs, depth is the depth
    returns value, path
    where path is a sequence of nodes that results in the value

    When a node is pruned, the score that caused the pruning is returned
    with a path of None.
    """
    node.display(2," "*depth,"minimax_alpha_beta(",node.name,", ",alpha, ", ", beta,")")
    best=None # only used if it will be pruned
    if node.isLeaf():
        node.display(2," "*depth,"returning leaf value",node.evaluate())
        return node.evaluate(),None
    elif node.isMax:
        for C in node.children():
            score,path = minimax_alpha_beta(C,alpha,beta,depth+1)
            if score >= beta: # beta pruning
                node.display(2," "*depth,"pruned due to beta=",beta,"C=",C.name)
                return score, None
            if score > alpha:
                alpha = score
                best = C.name, path
        # every child has been examined without pruning
        node.display(2," "*depth,"returning max alpha",alpha,"best",best)
        return alpha,best
    else:
        for C in node.children():
            score,path = minimax_alpha_beta(C,alpha,beta,depth+1)
            if score <= alpha: # alpha pruning
                node.display(2," "*depth,"pruned due to alpha=",alpha,"C=",C.name)
                return score, None
            if score < beta:
                beta=score
                best = C.name,path
        # every child has been examined without pruning
        node.display(2," "*depth,"returning min beta",beta,"best=",best)
        return beta,best

In [None]:
from masProblem import fig10_5, Magic_sum, Node

# Node.max_display_level=2 # print detailed trace
 # minimax_alpha_beta(fig10_5, -9999, 9999,0)
 # minimax_alpha_beta(Magic_sum(), -9999, 9999,0)

 #To see how much time alpha-beta pruning can save over minimax, uncomment the following:

## import timeit
 ## timeit.Timer("minimax(Magic_sum(),0)",setup="from __main__ import minimax, Magic_sum"
 ## ).timeit(number=1)
 ## trace=False
 ## timeit.Timer("minimax_alpha_beta(Magic_sum(), -9999, 9999,0)",
 ## setup="from __main__ import minimax_alpha_beta, Magic_sum"
 ## ).timeit(number=1)

# Chapter-6 Reinforcement Learning


###13.1 Representing Agents and Environments


When the learning agent does an action in the environment, it observes a (state,reward)
pair from the environment. The state is the world state; this is the fully observable assumption.

An RL environment implements a do(action) method that returns a (state,reward)
pair.

##rlProblem.py

In [None]:
import random
from display import Displayable
from utilities import flip
class RL_env(Displayable):
    """Base class for reinforcement-learning environments."""

    def __init__(self,actions,state):
        """actions is the set of actions available to the agent
        state is the initial state of the environment
        """
        self.actions, self.state = actions, state

    def do(self, action):
        """Carry out action in the environment.

        Returns a (state, reward) pair.
        Abstract: concrete environments must override this method.
        """
        raise NotImplementedError("RL_env.do") # abstract method

Here is the definition of the simple 2-state, 2-action party/relax decision

In [None]:
class Healthy_env(RL_env):
    """The 2-state ("healthy"/"sick"), 2-action ("party"/"relax") environment."""

    def __init__(self):
        RL_env.__init__(self,["party","relax"], "healthy")

    def do(self, action):
        """updates the state based on the agent doing action.
        returns state,reward

        Any action other than "party" is treated as "relax", and any
        state other than "healthy" is treated as "sick".
        """
        partying = action=="party"
        # (probability of being healthy next, reward) for this state and action
        if self.state=="healthy":
            healthy_prob, reward = (0.7, 10) if partying else (0.95, 7)
        else:
            healthy_prob, reward = (0.1, 2) if partying else (0.5, 0)
        self.state = "healthy" if flip(healthy_prob) else "sick"
        return self.state,reward

###13.1.1 Simulating an environment from an MDP


Note that the MDP does not contain enough information to simulate a system, because it loses any dependency between the rewards and the resulting
state; here we assume the agent always received the average reward for the
state and action.


##rlProblem.py

In [None]:
class Env_from_MDP(RL_env):
    """An RL environment that simulates an MDP.

    The reward returned for a state and action is mdp.reward for that
    pair; it does not depend on the resulting state.
    """

    def __init__(self, mdp):
        """mdp provides states, actions, trans and reward.
        The initial state is the first of mdp.states.
        """
        initial_state = mdp.states[0]
        RL_env.__init__(self,mdp.actions, initial_state)
        self.mdp = mdp
        # maps from an action or state to its position in the MDP's lists
        self.action_index = {action:index for (index,action) in enumerate(mdp.actions)}
        self.state_index = {state:index for (index,state) in enumerate(mdp.states)}

    def do(self, action):
        """updates the state based on the agent doing action.
        returns state,reward
        """
        action_ind = self.action_index[action]
        state_ind = self.state_index[self.state]
        # the next state is sampled from the transition distribution
        self.state = pick_from_dist(self.mdp.trans[state_ind][action_ind], self.mdp.states)
        # the reward is indexed by the state the action was done in
        reward = self.mdp.reward[state_ind][action_ind]
        return self.state, reward
def pick_from_dist(dist,values):
    """Pick one of values at random, where values[i] has probability dist[i].

    e.g. pick_from_dist([0.3,0.5,0.2],['a','b','c']) should pick 'a' with
    probability 0.3, etc.

    If the entries of dist add up to less than the random number drawn
    (for example because of floating-point rounding), the last value is
    returned.
    """
    ran = random.random()
    last = len(dist) - 1
    i=0
    # Stop at the last entry so that rounding in dist cannot push i out of range.
    while i < last and ran>dist[i]:
        ran -= dist[i]
        i += 1
    return values[i]

##13.1.2 Simple Game


In [None]:
import random
from utilities import flip
from rlProblem import RL_env
class Simple_game_env(RL_env):
    """A simple grid game.

    The agent moves around an xdim by ydim grid. A state is the tuple
    (x, y, damaged, prize), where prize is the location of the current
    prize, or None if there is no prize.
    """
    xdim = 5
    ydim = 5
    vwalls = [(0,3), (0,4), (1,4)] # vertical walls right of these locations
    hwalls = [] # not implemented
    crashed_reward = -1
    prize_locs = [(0,0), (0,4), (4,0), (4,4)]
    prize_apears_prob = 0.3
    prize_reward = 10
    monster_locs = [(0,1), (1,1), (2,3), (3,1), (4,2)]
    monster_appears_prob = 0.4
    monster_reward_when_damaged = -10
    repair_stations = [(1,4)]
    actions = ["up","down","left","right"]

    def __init__(self):
        # State:
        self.x = 2
        self.y = 2
        self.damaged = False
        self.prize = None
        # Statistics
        self.number_steps = 0
        self.total_reward = 0
        self.min_reward = 0
        self.min_step = 0
        self.zero_crossing = 0
        RL_env.__init__(self, Simple_game_env.actions,
                        (self.x, self.y, self.damaged, self.prize))
        self.display(2,"","Step","Tot Rew","Ave Rew",sep="\t")

    def do(self,action):
        """updates the state based on the agent doing action.
        returns state,reward
        """
        reward = 0.0
        # A prize can appear:
        if self.prize is None and flip(self.prize_apears_prob):
            self.prize = random.choice(self.prize_locs)
        # Actions can be noisy
        if flip(0.4):
            actual_direction = random.choice(self.actions)
        else:
            actual_direction = action
        # Modeling the actions given the actual direction
        if actual_direction == "right":
            if self.x==self.xdim-1 or (self.x,self.y) in self.vwalls:
                reward += self.crashed_reward
            else:
                self.x += 1
        elif actual_direction == "left":
            # a wall listed for the location to the left blocks this move
            if self.x==0 or (self.x-1,self.y) in self.vwalls:
                reward += self.crashed_reward
            else:
                self.x += -1
        elif actual_direction == "up":
            if self.y==self.ydim-1:
                reward += self.crashed_reward
            else:
                self.y += 1
        elif actual_direction == "down":
            if self.y==0:
                reward += self.crashed_reward
            else:
                self.y += -1
        else:
            raise RuntimeError("unknown_direction "+str(actual_direction))
        # Monsters
        if (self.x,self.y) in self.monster_locs and flip(self.monster_appears_prob):
            if self.damaged:
                reward += self.monster_reward_when_damaged
            else:
                self.damaged = True
        if (self.x,self.y) in self.repair_stations:
            self.damaged = False
        # Prizes
        if (self.x,self.y) == self.prize:
            reward += self.prize_reward
            self.prize = None

        # Statistics
        self.number_steps += 1
        self.total_reward += reward
        if self.total_reward < self.min_reward:
            self.min_reward = self.total_reward
            self.min_step = self.number_steps
        # reward > total_reward means the total was negative before this step
        if self.total_reward>0 and reward>self.total_reward:
            self.zero_crossing = self.number_steps
        self.display(2,"",self.number_steps,self.total_reward,self.total_reward/self.number_steps,sep="\t")
        return (self.x, self.y, self.damaged, self.prize), reward


##13.1.3 Evaluation and Plotting


In [None]:
import matplotlib.pyplot as plt

def plot_rl(ag, label=None, yplot='Total', step_size=None,
            steps_explore=1000, steps_exploit=1000, xscale='linear'):
    """
    plots the agent ag
    label is the label for the plot
    yplot is 'Average' or 'Total'
    step_size is the number of steps between each point plotted
    steps_explore is the number of steps the agent spends exploring
    steps_exploit is the number of steps the agent spends exploiting
    xscale is 'log' or 'linear'

    The agent is restarted, run for steps_explore steps, and then run for
    steps_exploit steps with ag.explore set to 0. Its explore and
    max_display_level settings are restored before returning.

    returns total reward when exploring, total reward when exploiting
    """
    assert yplot in ['Average','Total']
    if step_size is None:
        step_size = max(1,(steps_explore+steps_exploit)//500)
    if label is None:
        label = ag.label
    ag.max_display_level,old_mdl = 1,ag.max_display_level
    plt.ion()
    plt.xscale(xscale)
    plt.xlabel("step")
    plt.ylabel(yplot+" reward")
    steps = [] # steps
    rewards = [] # return
    ag.restart()

    def run_until(step, limit):
        """Run ag in chunks of step_size until step reaches limit; returns the new step."""
        while step < limit:
            ag.do(step_size)
            step += step_size
            steps.append(step)
            if yplot == "Average":
                rewards.append(ag.acc_rewards/step)
            else:
                rewards.append(ag.acc_rewards)
        return step

    step = run_until(0, steps_explore)
    acc_rewards_exploring = ag.acc_rewards
    ag.explore,explore_save = 0,ag.explore  # exploit only from here on
    run_until(step, steps_explore+steps_exploit)
    plt.plot(steps,rewards,label=label)
    plt.legend(loc="upper left")
    plt.draw()
    ag.max_display_level = old_mdl
    ag.explore=explore_save
    return acc_rewards_exploring, ag.acc_rewards-acc_rewards_exploring

##13.2 Q Learning


In [None]:
import random
from display import Displayable
from utilities import argmaxe, flip
class RL_agent(Displayable):
    """Base class for the reinforcement-learning agents in this file.

    An RL_agent has percepts (s, r) for some state s and real reward r.
    This class adds no behaviour of its own beyond what it inherits from
    Displayable.
    """


##rlQLearner.py

In [None]:
class Q_learner(RL_agent):
 """A Q-learning agent.

 Its belief-state consists of:
  state        the previous state
  q            a {(state,action):value} dict
  visits       a {(state,action):n} dict, where n is how many times
               action was done in state
  acc_rewards  the accumulated reward

 It observes (s, r) for some world-state s and real reward r.
 """

In [None]:
def __init__(self, env, discount, explore=0.1, fixed_alpha=True, alpha=0.2,
             alpha_fun=lambda k:1/k, qinit=0, label="Q_learner"):
    """Set up a Q-learning agent and put it in its initial (unlearned) state.

    env          the environment to interact with (supplies env.actions)
    discount     the discount factor
    explore      the proportion of time the agent will explore
    fixed_alpha  whether alpha is fixed or varies with the number of visits
    alpha        the weight of new experiences compared to old experiences
    alpha_fun    a function that computes alpha from the number of visits
    qinit        the initial value of the Q's
    label        the label for plotting
    """
    RL_agent.__init__(self)
    # the world the agent acts in
    self.env, self.actions = env, env.actions
    # learning-rate configuration
    self.fixed_alpha, self.alpha, self.alpha_fun = fixed_alpha, alpha, alpha_fun
    # remaining hyper-parameters
    self.discount, self.explore, self.qinit = discount, explore, qinit
    self.label = label
    # start with empty tables and zero accumulated reward
    self.restart()


restart is used to make the learner relearn everything. This is used by the plotter to create new plots.

In [None]:
def restart(self):
    """Forget everything learned so far.

    Zeroes the accumulated reward, re-reads the current state from the
    environment and replaces the q and visits tables with empty ones.
    """
    self.acc_rewards = 0
    self.state = self.env.state
    self.q, self.visits = {}, {}


do takes in the number of steps.

##rlQLearner.py

In [None]:
def do(self,num_steps=100):
    """do num_steps of interaction with the environment

    Each step selects an action, carries it out in the environment and moves
    Q(state, action) towards reward + discount * max_a' Q(next_state, a').
    The step size is self.alpha when self.fixed_alpha is true; otherwise it
    is self.alpha_fun(k), where k is the number of visits to (state, action).
    """
    self.display(2,"s\ta\tr\ts'\tQ")
    alpha = self.alpha
    for _ in range(num_steps):
        action = self.select_action(self.state)
        next_state, reward = self.env.do(action)
        sa = (self.state, action)
        if not self.fixed_alpha:
            # visit counts are only maintained when alpha depends on them
            k = self.visits[sa] = self.visits.get(sa, 0) + 1
            alpha = self.alpha_fun(k)
        # The update must run for fixed alpha too; it used to be nested under
        # the "if" above, so a fixed-alpha agent never learned anything.
        best_next = max(self.q.get((next_state, next_act), self.qinit)
                        for next_act in self.actions)
        self.q[sa] = ((1-alpha) * self.q.get(sa, self.qinit)
                      + alpha * (reward + self.discount * best_next))
        self.display(2, self.state, action, reward, next_state,
                     self.q[sa], sep='\t')
        self.state = next_state
        self.acc_rewards += reward


In [None]:
def select_action(self, state):
    """Choose the action to carry out in state, using the current q-function.

    flip(self.explore) decides whether to explore; when it does, the action
    is drawn with random.choice, otherwise argmaxe picks an action from the
    (action, q-value) pairs, with unseen pairs valued at self.qinit.
    """
    if not flip(self.explore):
        return argmaxe((act, self.q.get((state, act), self.qinit))
                       for act in self.actions)
    return random.choice(self.actions)

##13.2.1 Testing Q-learning

In [None]:
from rlProblem import Healthy_env
from rlQLearner import Q_learner
from rlPlot import plot_rl
# --- Q-learners on the Healthy_env environment ---------------------------
env = Healthy_env()
ag = Q_learner(env, 0.7)
ag_opt = Q_learner(env, 0.7, qinit=100, label="optimistic")  # optimistic agent
ag_exp_l = Q_learner(env, 0.7, explore=0.01, label="less explore")
ag_exp_m = Q_learner(env, 0.7, explore=0.5, label="more explore")
ag_disc = Q_learner(env, 0.9, qinit=100, label="disc 0.9")
ag_va = Q_learner(env, 0.7, qinit=100, fixed_alpha=False,
                  alpha_fun=lambda k: 10/(9+k), label="alpha=10/(9+k)")
# Things to try (uncomment):
# ag.max_display_level = 2
# ag.do(20)
# ag.q # get the learned q-values
# ag.max_display_level = 1
# ag.do(1000)
# ag.q # get the learned q-values
# plot_rl(ag,yplot="Average")
# plot_rl(ag_opt,yplot="Average")
# plot_rl(ag_exp_l,yplot="Average")
# plot_rl(ag_exp_m,yplot="Average")
# plot_rl(ag_disc,yplot="Average")
# plot_rl(ag_va,yplot="Average")

# --- Q-learner on an environment built from the tiny MDP ----------------
from mdpExamples import MDPtiny
from rlProblem import Env_from_MDP
envt = Env_from_MDP(MDPtiny())
agt = Q_learner(envt, 0.8)
# agt.do(20)

# --- Q-learners on the simple game, with three step-size schedules ------
from rlSimpleEnv import Simple_game_env
senv = Simple_game_env()
sag1 = Q_learner(senv, 0.9, explore=0.2, fixed_alpha=True, alpha=0.1)
# plot_rl(sag1,steps_explore=100000,steps_exploit=100000,label="alpha="+str(sag1.alpha))
sag2 = Q_learner(senv, 0.9, explore=0.2, fixed_alpha=False)
# plot_rl(sag2,steps_explore=100000,steps_exploit=100000,label="alpha=1/k")
sag3 = Q_learner(senv, 0.9, explore=0.2, fixed_alpha=False,
                 alpha_fun=lambda k: 10/(9+k))
# plot_rl(sag3,steps_explore=100000,steps_exploit=100000,label="alpha=10/(9+k)")


##13.3 Q-learning with Experience Replay


##rlQExperienceReplay.py

In [None]:
from rlQLearner import Q_learner
from utilities import flip
import random
class BoundedBuffer(object):
    """A store of at most buffer_size experiences.

    While there is room, experiences fill the buffer in order. Once it is
    full, a new experience replaces a randomly chosen slot with probability
    buffer_size/number_added, and is otherwise dropped.
    """
    def __init__(self, buffer_size=1000):
        self.number_added = 0               # experiences offered so far
        self.buffer_size = buffer_size      # capacity
        self.buffer = [0] * buffer_size     # pre-sized storage

    def add(self,experience):
        """offer experience to the buffer; it may or may not be stored"""
        slot = None
        if self.number_added < self.buffer_size:
            slot = self.number_added        # still filling up
        elif flip(self.buffer_size/self.number_added):
            slot = random.randrange(self.buffer_size)   # overwrite a random slot
        if slot is not None:
            self.buffer[slot] = experience
        self.number_added += 1

    def get(self):
        """return an experience drawn uniformly from the filled slots"""
        filled = min(self.number_added, self.buffer_size)
        return self.buffer[random.randrange(filled)]

class Q_AR_learner(Q_learner):
    """A Q-learner with experience (action) replay.

    After each real step, once more than burn_in experiences have been
    recorded, it re-applies the Q update to num_updates_per_action
    experiences drawn from a BoundedBuffer.
    """
    def __init__(self, env, discount, explore=0.1, fixed_alpha=True, alpha=0.2,
                 alpha_fun=lambda k:1/k, qinit=0, label="Q_AR_learner",
                 max_buffer_size=5000,
                 num_updates_per_action=5, burn_in=1000 ):
        """The first eight arguments are passed on to Q_learner.__init__.

        max_buffer_size is the capacity of the experience buffer
        num_updates_per_action is the number of replayed updates per real step
        burn_in is the number of experiences to collect before replay starts
        """
        Q_learner.__init__(self, env, discount, explore, fixed_alpha, alpha, alpha_fun, qinit, label)
        self.experience_buffer = BoundedBuffer(max_buffer_size)
        self.num_updates_per_action = num_updates_per_action
        self.burn_in = burn_in

    def do(self,num_steps=100):
        """do num_steps of interaction with the environment"""
        self.display(2,"s\ta\tr\ts'\tQ")
        alpha = self.alpha
        for _ in range(num_steps):
            action = self.select_action(self.state)
            next_state, reward = self.env.do(action)
            # remember experience
            self.experience_buffer.add((self.state, action, reward, next_state))
            sa = (self.state, action)
            if not self.fixed_alpha:
                k = self.visits[sa] = self.visits.get(sa, 0) + 1
                alpha = self.alpha_fun(k)
            # This update belongs inside the step loop and must also run for
            # fixed alpha; both were lost to mis-indentation.
            self.q[sa] = ((1-alpha) * self.q.get(sa, self.qinit)
                          + alpha * (reward + self.discount
                                     * max(self.q.get((next_state, next_act), self.qinit)
                                           for next_act in self.actions)))
            self.display(2, self.state, action, reward, next_state,
                         self.q[sa], sep='\t')
            self.state = next_state
            self.acc_rewards += reward
            # do some updates from the experience buffer
            if self.experience_buffer.number_added > self.burn_in:
                for _ in range(self.num_updates_per_action):
                    (s, a, r, ns) = self.experience_buffer.get()
                    if not self.fixed_alpha:
                        # .get: restart() clears visits/q but not the buffer,
                        # so a replayed pair may be missing from the tables
                        k = self.visits.get((s, a), 1)
                        alpha = self.alpha_fun(k)
                    # use the replayed reward r, not the latest real reward
                    self.q[(s, a)] = ((1-alpha) * self.q.get((s, a), self.qinit)
                                      + alpha * (r + self.discount
                                                 * max(self.q.get((ns, na), self.qinit)
                                                       for na in self.actions)))

In [None]:
from rlSimpleEnv import Simple_game_env
from rlQTest import sag1, sag2, sag3
from rlPlot import plot_rl
# Q-learners with experience replay on the simple game, one per step-size schedule.
# The commented-out plot_rl calls had been wrapped onto a second, uncommented line.
senv = Simple_game_env()
sag1ar = Q_AR_learner(senv,0.9,explore=0.2,fixed_alpha=True,alpha=0.1)
# plot_rl(sag1ar,steps_explore=100000,steps_exploit=100000,label="AR alpha="+str(sag1ar.alpha))
sag2ar = Q_AR_learner(senv,0.9,explore=0.2,fixed_alpha=False)
# plot_rl(sag2ar,steps_explore=100000,steps_exploit=100000,label="AR alpha=1/k")
sag3ar = Q_AR_learner(senv,0.9,explore=0.2,fixed_alpha=False,alpha_fun=lambda k:10/(9+k))
# plot_rl(sag3ar,steps_explore=100000,steps_exploit=100000,label="AR alpha=10/(9+k)")

#13.4 Model-based Reinforcement Learner

A model-based reinforcement learner builds a Markov decision process model
of the domain, simultaneously learns the model and plans with that model.

###rlModelLearner.py —

In [None]:
import random
from rlQLearner import RL_agent
from display import Displayable
from utilities import argmaxe, flip
class Model_based_reinforcement_learner(RL_agent):
    """A Model-based reinforcement learner

    It records how often each (state, action) led to each next state and the
    average reward of each (state, action), and after every action uses those
    estimates for a number of asynchronous value iteration (AVI) updates of q.
    """
    def __init__(self, env, discount, explore=0.1, qinit=0,
                 updates_per_step=10, label="MBR_learner"):
        """env is the environment to interact with.
        discount is the discount factor
        explore is the proportion of time the agent will explore
        qinit is the initial value of the Q's
        updates_per_step is the number of AVI updates per action
        label is the label for plotting
        """
        RL_agent.__init__(self)
        self.env = env
        self.actions = env.actions
        self.discount = discount
        self.explore = explore
        self.qinit = qinit
        self.updates_per_step = updates_per_step
        self.label = label
        self.restart()

    def restart(self):
        """make the agent relearn, and reset the accumulated rewards
        """
        self.acc_rewards = 0
        self.state = self.env.state
        self.q = {} # {(st,action):q_value} map
        self.r = {} # {(st,action):reward} map
        self.t = {} # {(st,action,st_next):count} map
        self.visits = {} # {(st,action):count} map
        self.res_states = {} # {(st,action):set_of_states} map
        self.visits_list = [] # list of (st,action)
        self.previous_action = None

    def do(self,num_steps=100):
        """do num_steps of interaction with the environment
        for each action, do updates_per_step iterations of asynchronous
        value iteration
        """
        # The previous body here was a copy of the experience-replay learner
        # and used attributes this class never sets (alpha, experience_buffer,
        # burn_in); this is the model-based update.
        for step in range(num_steps):
            pst = self.state # previous state
            action = self.select_action(pst)
            self.state, reward = self.env.do(action)
            self.acc_rewards += reward
            # update the model: transition counts, running-average reward
            self.t[(pst,action,self.state)] = self.t.get((pst,action,self.state),0)+1
            if (pst,action) in self.visits:
                self.visits[(pst,action)] += 1
                self.r[(pst,action)] += (reward-self.r[(pst,action)])/self.visits[(pst,action)]
                self.res_states[(pst,action)].add(self.state)
            else:
                self.visits[(pst,action)] = 1
                self.r[(pst,action)] = reward
                self.res_states[(pst,action)] = {self.state}
                self.visits_list.append((pst,action))
            st,act = pst,action # initial state-action pair for AVI
            for update in range(self.updates_per_step):
                self.q[(st,act)] = self.r[(st,act)] + self.discount*(
                    sum(self.t[st,act,rst]/self.visits[st,act]
                        * max(self.q.get((rst,nact),self.qinit) for nact in self.actions)
                        for rst in self.res_states[(st,act)]))
                # continue AVI from a random previously visited pair
                st,act = random.choice(self.visits_list)

    def select_action(self, state):
        """returns an action to carry out for the current agent
        given the state, and the q-function
        """
        if flip(self.explore):
            return random.choice(self.actions)
        else:
            return argmaxe((next_act, self.q.get((state, next_act),self.qinit))
                           for next_act in self.actions)

In [None]:
from rlQTest import senv # simple game environment
# model-based learners on the simple game: 10 AVI updates per step versus 1
mbl1 = Model_based_reinforcement_learner(senv, 0.9, updates_per_step=10)
#plot_rl(mbl1,steps_explore=100000,steps_exploit=100000,label="model-based(10)")
mbl2 = Model_based_reinforcement_learner(senv, 0.9, updates_per_step=1)
#plot_rl(mbl2,steps_explore=100000,steps_exploit=100000,label="model-based(1)")

In [None]:
# The first commented-out call had been wrapped onto a second, uncommented line.
# plot_rl(sag1ar,steps_explore=100000,steps_exploit=100000,label="AR alpha="+str(sag1ar.alpha))
sag2ar = Q_AR_learner(senv,0.9,explore=0.2,fixed_alpha=False)
# plot_rl(sag2ar,steps_explore=100000,steps_exploit=100000,label="AR alpha=1/k")
sag3ar = Q_AR_learner(senv,0.9,explore=0.2,fixed_alpha=False,alpha_fun=lambda k:10/(9+k))
# plot_rl(sag3ar,steps_explore=100000,steps_exploit=100000,label="AR alpha=10/(9+k)")


##13.4 Model-based Reinforcement Learner

In [None]:
import random
from rlQLearner import RL_agent
from display import Displayable
from utilities import argmaxe, flip
class Model_based_reinforcement_learner(RL_agent):
    """A Model-based reinforcement learner

    It records how often each (state, action) led to each next state and the
    average reward of each (state, action), and after every action uses those
    estimates for a number of asynchronous value iteration (AVI) updates of q.
    """
    def __init__(self, env, discount, explore=0.1, qinit=0,
                 updates_per_step=10, label="MBR_learner"):
        """env is the environment to interact with.
        discount is the discount factor
        explore is the proportion of time the agent will explore
        qinit is the initial value of the Q's
        updates_per_step is the number of AVI updates per action
        label is the label for plotting
        """
        RL_agent.__init__(self)
        self.env = env
        self.actions = env.actions
        self.discount = discount
        self.explore = explore
        self.qinit = qinit
        self.updates_per_step = updates_per_step
        self.label = label
        self.restart()

    def restart(self):
        """make the agent relearn, and reset the accumulated rewards
        """
        self.acc_rewards = 0
        self.state = self.env.state
        self.q = {} # {(st,action):q_value} map
        self.r = {} # {(st,action):reward} map
        self.t = {} # {(st,action,st_next):count} map
        self.visits = {} # {(st,action):count} map
        self.res_states = {} # {(st,action):set_of_states} map
        self.visits_list = [] # list of (st,action)
        self.previous_action = None

    def do(self,num_steps=100):
        """do num_steps of interaction with the environment
        for each action, do updates_per_step iterations of asynchronous
        value iteration
        """
        for step in range(num_steps):
            pst = self.state # previous state
            action = self.select_action(pst)
            self.state, reward = self.env.do(action)
            self.acc_rewards += reward
            # update the model: transition counts, running-average reward
            self.t[(pst,action,self.state)] = self.t.get((pst,action,self.state),0)+1
            if (pst,action) in self.visits:
                self.visits[(pst,action)] += 1
                # incremental mean of the rewards seen for (pst, action)
                self.r[(pst,action)] += (reward-self.r[(pst,action)])/self.visits[(pst,action)]
                self.res_states[(pst,action)].add(self.state)
            else:
                self.visits[(pst,action)] = 1
                self.r[(pst,action)] = reward
                self.res_states[(pst,action)] = {self.state}
                self.visits_list.append((pst,action))
            st,act = pst,action # initial state-action pair for AVI
            for update in range(self.updates_per_step):
                self.q[(st,act)] = self.r[(st,act)] + self.discount*(
                    sum(self.t[st,act,rst]/self.visits[st,act]
                        * max(self.q.get((rst,nact),self.qinit) for nact in self.actions)
                        for rst in self.res_states[(st,act)]))
                # continue AVI from a random previously visited pair
                st,act = random.choice(self.visits_list)

    def select_action(self, state):
        """returns an action to carry out for the current agent
        given the state, and the q-function
        """
        if flip(self.explore):
            return random.choice(self.actions)
        else:
            return argmaxe((next_act, self.q.get((state, next_act),self.qinit))
                           for next_act in self.actions)

from rlQTest import senv # simple game environment
# model-based learners: 10 AVI updates per step versus 1
# (the stray leading space on the two assignments was an IndentationError)
mbl1 = Model_based_reinforcement_learner(senv,0.9,updates_per_step=10)
#plot_rl(mbl1,steps_explore=100000,steps_exploit=100000,label="model-based(10)")
mbl2 = Model_based_reinforcement_learner(senv,0.9,updates_per_step=1)
# plot_rl(mbl2,steps_explore=100000,steps_exploit=100000,label="model-based(1)")


##13.5 Reinforcement Learning with Features


# 13.5.1 Representing Features
A feature is a function from state and action. To construct the features for a
domain, we construct a function that takes a state and an action and returns the
list of all feature values for that state and action. This feature set is redesigned
for each problem.

###rlSimpleGameFeatures.py

In [None]:
from rlSimpleEnv import Simple_game_env
from rlProblem import RL_env
def get_features(state,action):
    """returns the list of feature values for the state-action pair

    state is an (x, y, damaged, prize) tuple and action must be one of
    Simple_game_env.actions. The result is a constant 1, nine hand-built
    features, four position features for each entry of
    Simple_game_env.prize_locs+[None], and one extra feature for the prize
    at (0,4).
    """
    assert action in Simple_game_env.actions
    (x,y,d,p) = state
    # f1: would go to a monster
    f1 = monster_ahead(x,y,action)
    # f2: would crash into wall
    f2 = wall_ahead(x,y,action)
    # f3: action is towards a prize
    f3 = towards_prize(x,y,action,p)
    # f4: damaged and action is toward repair station
    f4 = towards_repair(x,y,action) if d else 0
    # f5: damaged and towards monster
    f5 = 1 if d and f1 else 0
    # f6: damaged
    f6 = 1 if d else 0
    # f7: not damaged
    f7 = 1-f6
    # f8: damaged and prize ahead
    f8 = 1 if d and f3 else 0
    # f9: not damaged and prize ahead
    f9 = 1 if not d and f3 else 0
    features = [1,f1,f2,f3,f4,f5,f6,f7,f8,f9]
    # the next features are 4 distances from the outside in all directions,
    # one group per prize location (and one for "no prize");
    # only the group matching the current prize is non-zero
    for pr in Simple_game_env.prize_locs+[None]:
        if p==pr:
            features += [x, 4-x, y, 4-y]
        else:
            features += [0, 0, 0, 0]
    # fp04 feature for y when prize is at 0,4
    # this knows about the wall to the right of the prize
    if p==(0,4):
        if x==0:
            fp04 = y
        elif y<3:
            fp04 = y
        else:
            fp04 = 4-y
    else:
        fp04 = 0
    # appended for every state, so the feature list always has the same length
    features.append(fp04)
    return features

def monster_ahead(x,y,action):
    """returns 1 if the location expected to get to by doing
    action from (x,y) can contain a monster, otherwise 0.
    """
    if action == "right" and (x+1,y) in Simple_game_env.monster_locs:
        return 1
    elif action == "left" and (x-1,y) in Simple_game_env.monster_locs:
        return 1
    elif action == "up" and (x,y+1) in Simple_game_env.monster_locs:
        return 1
    elif action == "down" and (x,y-1) in Simple_game_env.monster_locs:
        return 1
    else:
        return 0

def wall_ahead(x,y,action):
    """returns 1 if there is a wall in the direction of action from (x,y),
    otherwise 0.
    This is complicated by the internal walls.
    """
    if action == "right" and (x==Simple_game_env.xdim-1 or (x,y) in Simple_game_env.vwalls):
        return 1
    elif action == "left" and (x==0 or (x-1,y) in Simple_game_env.vwalls):
        return 1
    elif action == "up" and y==Simple_game_env.ydim-1:
        return 1
    elif action == "down" and y==0:
        return 1
    else:
        return 0

def towards_prize(x,y,action,p):
    """returns 1 if action goes in the direction of the prize p from (x,y),
    otherwise 0. p is the prize location (px,py), or None if there is no prize.
    """
    if p is None:
        return 0
    elif p==(0,4): # take into account the wall near the top-left prize
        if action == "left" and (x>1 or x==1 and y<3):
            return 1
        elif action == "down" and (x>0 and y>2):
            return 1
        elif action == "up" and (x==0 or y<2):
            return 1
        else:
            return 0
    else:
        px,py = p
        if p==(4,4) and x==0:
            # special case: prize at (4,4) while the agent is in column 0
            if (action=="right" and y<3) or (action=="down" and y>2) or (action=="up" and y<2):
                return 1
            else:
                return 0
        # general case: this used to be nested under the special case above,
        # where it was unreachable, so every other prize always gave 0
        if (action == "up" and y<py) or (action == "down" and py<y):
            return 1
        elif (action == "left" and px<x) or (action == "right" and x<px):
            return 1
        else:
            return 0
def towards_repair(x,y,action):
    """Say whether doing action at (x,y) heads for the repair station.

    Returns 1 when it does and 0 otherwise (including for an unrecognised
    action).
    """
    if action == "up":
        heading_there = (x > 0 and y < 4) or (x == 0 and y < 2)
    elif action == "left":
        heading_there = x > 1
    elif action == "right":
        heading_there = x == 0 and y < 3
    elif action == "down":
        heading_there = x == 0 and y > 2
    else:
        heading_there = False
    return 1 if heading_there else 0



In [None]:
def simp_features(state,action):
    """Return a short feature list for the state-action pair.

    The list is a constant 1 followed by the monster-ahead, wall-ahead and
    towards-prize indicators for doing action in state.
    """
    assert action in Simple_game_env.actions
    x, y, _damaged, prize = state
    return [
        1,
        monster_ahead(x, y, action),         # f1: would go to a monster
        wall_ahead(x, y, action),            # f2: would crash into wall
        towards_prize(x, y, action, prize),  # f3: action is towards a prize
    ]


#13.5.2 Feature-based RL learner

This learns a linear function approximation of the Q-values. It requires a
function `get_features` that, given a state and an action, returns a list of values for
all of the features. Each environment requires this function to be provided.

In [None]:
import random
from rlQLearner import RL_agent
from display import Displayable
from utilities import argmaxe, flip
class SARSA_LFA_learner(RL_agent):
    """A SARSA learner with linear function approximation (LFA).

    Its belief-state consists of:
      state        the previous state
      action       the action selected for that state
      features     the feature values of the initial state (used only to
                   size the weight vector)
      weights      one weight per feature; Q(s,a) is estimated as the dot
                   product of weights and get_features(s,a)
      acc_rewards  the accumulated reward
    It observes (s, r) for some world-state s and real reward r.
    """
    def __init__(self, env, get_features, discount, explore=0.2, step_size=0.01,
                 winit=0, label="SARSA_LFA"):
        """env is the feature environment to interact with
        get_features is a function get_features(state,action) that returns the list of feature values
        discount is the discount factor
        explore is the proportion of time the agent will explore
        step_size is gradient descent step size
        winit is the initial value of the weights
        label is the label for plotting
        """
        RL_agent.__init__(self)
        self.env = env
        self.get_features = get_features
        self.actions = env.actions
        self.discount = discount
        self.explore = explore
        self.step_size = step_size
        self.winit = winit
        self.label = label
        self.restart()

    def restart(self):
        """make the agent relearn, and reset the accumulated rewards
        """
        self.acc_rewards = 0
        self.state = self.env.state
        # any action will do here: the features only fix the number of weights
        self.features = self.get_features(self.state, list(self.env.actions)[0])
        self.weights = [self.winit for f in self.features]
        self.action = self.select_action(self.state)

    def do(self,num_steps=100):
        """do num_steps of interaction with the environment"""
        self.display(2,"s\ta\tr\ts'\tQ\tdelta")
        for _ in range(num_steps):
            next_state, reward = self.env.do(self.action)
            self.acc_rewards += reward
            next_action = self.select_action(next_state)
            feature_values = self.get_features(self.state,self.action)
            oldQ = dot_product(self.weights, feature_values)
            nextQ = dot_product(self.weights, self.get_features(next_state,next_action))
            delta = reward + self.discount * nextQ - oldQ   # TD error
            # The weight update and the state/action hand-over belong to each
            # step; they had dropped out of the step loop.
            for j in range(len(self.weights)):
                self.weights[j] += self.step_size * delta * feature_values[j]
            self.display(2,self.state, self.action, reward, next_state,
                         dot_product(self.weights, feature_values), delta, sep='\t')
            self.state = next_state
            self.action = next_action

    def select_action(self, state):
        """returns an action to carry out for the current agent
        given the state, and the q-function.
        This implements an epsilon-greedy approach
        where self.explore is the probability of exploring."""
        if flip(self.explore):
            return random.choice(self.actions)
        else:
            return argmaxe((next_act, dot_product(self.weights, self.get_features(state,next_act)))
                           for next_act in self.actions)

    def show_actions(self,state=None):
        """prints the value for each action in a state.
        This may be useful for debugging.
        """
        if state is None:
            state = self.state
        for next_act in self.actions:
            print(next_act,dot_product(self.weights, self.get_features(state,next_act)))

# Module-level helper: the methods above call it as a plain function, so it
# must not be nested inside the class.
def dot_product(l1,l2):
    """returns the sum of the pairwise products of l1 and l2
    (zip stops at the shorter of the two)"""
    return sum(e1*e2 for (e1,e2) in zip(l1,l2))

##Test code:


In [None]:
from rlQTest import senv # simple game environment
from rlSimpleGameFeatures import get_features, simp_features
from rlPlot import plot_rl
# SARSA with linear function approximation on the simple game:
# once with the full feature set ...
fa1 = SARSA_LFA_learner(senv, get_features, 0.9, step_size=0.01)
#fa1.max_display_level = 2
#fa1.do(20)
#plot_rl(fa1,steps_explore=10000,steps_exploit=10000,label="SARSA_LFA(0.01)")
# ... and once with the simplified features
fas1 = SARSA_LFA_learner(senv, simp_features, 0.9, step_size=0.01)
#plot_rl(fas1,steps_explore=10000,steps_exploit=10000,label="SARSA_LFA(simp)")

##13.5.3 Experience Replay


In [None]:
from rlFeatures import SARSA_LFA_learner, dot_product
from utilities import flip
import random
class SARSA_LFA_AR_learner(SARSA_LFA_learner):
    """A SARSA_LFA learner with experience (action) replay.

    Experiences (state, action, reward, next_state) are kept in a bounded
    buffer. After each real step, once more than burn_in experiences have
    been added, num_updates_per_action stored experiences are replayed.
    """
    def __init__(self, env, get_features, discount, explore=0.2, step_size=0.01,
                 winit=0, label="SARSA_LFA-AR", max_buffer_size=500,
                 num_updates_per_action=5, burn_in=100 ):
        """The first seven arguments are passed on to SARSA_LFA_learner.__init__.

        max_buffer_size is the capacity of the experience buffer
        num_updates_per_action is the number of replayed updates per real step
        burn_in is the number of experiences to collect before replay starts
        """
        SARSA_LFA_learner.__init__(self, env, get_features, discount, explore, step_size, winit, label)
        self.max_buffer_size = max_buffer_size
        self.action_buffer = [0]*max_buffer_size
        self.number_added = 0
        self.num_updates_per_action = num_updates_per_action
        self.burn_in = burn_in

    def add_to_buffer(self,experience):
        """store experience while there is room; once the buffer is full,
        overwrite a random slot with probability max_buffer_size/number_added"""
        if self.number_added < self.max_buffer_size:
            self.action_buffer[self.number_added] = experience
        else:
            if flip(self.max_buffer_size/self.number_added):
                position = random.randrange(self.max_buffer_size)
                self.action_buffer[position] = experience
        self.number_added += 1

    def _update_weights(self, state, action, reward, next_state, next_action):
        """one SARSA gradient step on the weights for a single experience;
        returns (feature_values, delta) for display purposes"""
        feature_values = self.get_features(state, action)
        oldQ = dot_product(self.weights, feature_values)
        nextQ = dot_product(self.weights, self.get_features(next_state, next_action))
        delta = reward + self.discount * nextQ - oldQ   # TD error
        for j in range(len(self.weights)):
            self.weights[j] += self.step_size * delta * feature_values[j]
        return feature_values, delta

    def do(self,num_steps=100):
        """do num_steps of interaction with the environment"""
        self.display(2,"s\ta\tr\ts'\tQ\tdelta")
        for _ in range(num_steps):
            next_state, reward = self.env.do(self.action)
            self.add_to_buffer((self.state,self.action,reward,next_state)) #remember experience
            self.acc_rewards += reward
            next_action = self.select_action(next_state)
            feature_values, delta = self._update_weights(
                self.state, self.action, reward, next_state, next_action)
            self.display(2,self.state, self.action, reward, next_state,
                         dot_product(self.weights, feature_values), delta, sep='\t')
            self.state = next_state
            self.action = next_action
            # replay stored experiences once enough have been collected
            if self.number_added > self.burn_in:
                for _ in range(self.num_updates_per_action):
                    (s,a,r,ns) = self.action_buffer[
                        random.randrange(min(self.number_added, self.max_buffer_size))]
                    na = self.select_action(ns)
                    # use the replayed reward r, not the latest real reward
                    self._update_weights(s, a, r, ns, na)

from rlQTest import senv # simple game environment
from rlSimpleGameFeatures import get_features, simp_features
from rlPlot import plot_rl

# SARSA-LFA with experience replay on the simple game:
# once with the full feature set ...
fa1 = SARSA_LFA_AR_learner(senv, get_features, 0.9, step_size=0.01)
#fa1.max_display_level = 2
#fa1.do(20)
#plot_rl(fa1,steps_explore=10000,steps_exploit=10000,label="SARSA_LFA_AR(0.01)")
# ... and once with the simplified features
fas1 = SARSA_LFA_AR_learner(senv, simp_features, 0.9, step_size=0.01)
#plot_rl(fas1,steps_explore=10000,steps_exploit=10000,label="SARSA_LFA_AR(simp)")


##13.6 Multiagent Learning

###masLearn.py

In [None]:
from display import Displayable
import utilities # argmaxall for (element,value) pairs
import matplotlib.pyplot as plt
import random
class GameAgent(Displayable):
    """Base class for the agents that play a repeated game.

    Each agent gets a unique id, keeps an unnormalized distribution over its
    actions in self.dist and the sum of the rewards it has received in
    self.total_score. This base class simply acts randomly.
    """
    next_id=0   # id that the next agent created will get

    def __init__(self, actions):
        """
        Actions is the set of actions the agent can do. It needs to be told that!
        """
        self.actions = actions
        self.id = GameAgent.next_id
        GameAgent.next_id += 1
        self.display(2,f"Agent {self.id} has actions {actions}")
        self.dist = {act:1 for act in actions} # unnormalized distribution
        self.total_score = 0

    def init_action(self):
        """ The initial action.
        Act randomly initially
        Could be overridden (but I'm not sure why you would).
        """
        self.act = random.choice(self.actions)
        return self.act

    def select_action(self, reward):
        """
        Select the action given the reward.
        This implements "Act randomly" and should be overridden!
        """
        self.total_score += reward
        self.act = random.choice(self.actions)
        return self.act


##masLearn.py

In [None]:
class SimpleCountingAgent(GameAgent):
    """This agent just counts the number of times (it thinks) it has won and does the
    actions it thinks is most likely to win.
    """
    def __init__(self, actions, prior_count=1):
        """
        Actions is the set of actions the agent can do. It needs to be told that!
        prior_count is the initial count given to every action
        """
        GameAgent.__init__(self, actions)
        self.prior_count = prior_count
        self.dist = {a: prior_count for a in self.actions} # unnormalized distribution
        self.averew = 0     # running average of the rewards received
        self.num_steps = 0

    def select_action(self, reward):
        """Update the action counts from reward, then sample the next action.

        A reward above the running average counts as a win for the action just
        done; otherwise every other action gets a share of a count.
        """
        self.total_score += reward
        self.num_steps += 1
        self.display(2,f"The reward for agent {self.id} was {reward}")
        self.averew = self.averew+(reward-self.averew)/self.num_steps
        if reward>self.averew:
            self.dist[self.act] += 1
        else:
            for otheract in self.actions:
                if otheract != self.act:
                    self.dist[otheract] += 1/(len(self.actions))
        # Selecting and returning an action must happen on every call, not
        # only inside the "else" branch, where mis-indentation had put it.
        self.display(2,f"Distribution for agent {self.id} is {normalize(self.dist)}")
        self.act = select_from_dist(self.dist)
        self.display(2,f"Agent {self.id} did {self.act}")
        return self.act

class SimpleQAgent(GameAgent):
    """This agent maintains the Q-function for each state.
    (Or just the average reward as the future state is all the same).
    It keeps a probability distribution over actions, shifts probability
    towards the actions with the best Q-value, and samples its action from
    that distribution.
    """
    def __init__(self, actions, q_init=100, alpha=0.1, prob_step_size=0.001, min_prob=0.01):
        """
        Actions is the set of actions the agent can do. It needs to be told that!
        q_init is the initial q-values
        alpha is the step size for action estimate
        prob_step_size is the step size for probability change
        min_prob is the minimum a probability should become
        """
        GameAgent.__init__(self, actions)
        self.Q = {a:q_init for a in self.actions}
        # start with random dist but not too close to zero
        self.dist = normalize({a:0.7+random.random() for a in self.actions})
        self.alpha = alpha
        self.prob_step_size = prob_step_size
        self.min_prob = min_prob
        self.num_steps = 1 # (1 because it is only used after initial step)

    def select_action(self, reward):
        """Update Q and the action distribution from reward, then sample the
        next action from the distribution."""
        self.total_score += reward
        self.display(2,f"The reward for agent {self.id} was {reward}")
        self.Q[self.act] += self.alpha*(reward-self.Q[self.act])
        a_best = utilities.argmaxall(self.Q.items())
        for a in self.actions:
            if a in a_best:
                self.dist[a] += self.prob_step_size
            else:
                self.dist[a] -= min(self.dist[a], self.prob_step_size)
                self.dist[a] = max(self.dist[a],self.min_prob)
        # normalise and sample once, after every action has been adjusted
        self.dist = normalize(self.dist)
        self.display(2,f"Distribution for agent {self.id} is {self.dist}")
        self.act = select_from_dist(self.dist)
        self.display(2,f"Agent {self.id} did {self.act}")
        return self.act

# Module-level helpers: they are called as plain functions throughout this
# file, so they must not be nested inside the class.
def normalize(dist):
    """dist is a {value:number} dictionary, where the numbers are all non-negative
    returns dict where the numbers sum to one
    """
    tot = sum(dist.values())
    return {var:val/tot for (var,val) in dist.items()}

def select_from_dist(dist):
    """returns a key of dist chosen at random, with probability proportional
    to its number"""
    rand = random.random()
    act = None
    for (act,prob) in normalize(dist).items():
        rand -= prob
        if rand < 0:
            return act
    # floating-point round-off can leave rand >= 0 after the last entry;
    # fall back to that last action instead of returning None
    return act



In [None]:
class SimulateGame(Displayable):
    """Repeatedly plays a game with a list of agents and records the history."""

    def __init__(self, game, agents):
        """game is the game to play; it is used through game.play(actions),
        game.num_agents and game.actions
        agents is a list of agents; each is asked for its first action here
        """
        self.game = game
        self.agents = agents  # list of agents
        self.action_history = []
        self.reward_history = []
        self.dist_history = []
        self.actions = tuple(ag.init_action() for ag in self.agents)
        self.num_steps = 0

    def go(self, steps):
        """Play the game for `steps` more rounds, then print average rewards.

        Each round plays the current joint action, gives every agent its own
        reward, and records the rewards, the agents' next actions and their
        (normalized) action distributions.
        """
        for _ in range(steps):
            self.num_steps += 1
            self.rewards = self.game.play(self.actions)
            self.reward_history.append(self.rewards)
            self.actions = tuple(self.agents[i].select_action(self.rewards[i])
                                 for i in range(self.game.num_agents))
            self.action_history.append(self.actions)
            self.dist_history.append([normalize(ag.dist) for ag in self.agents])
        if self.num_steps:  # no average to report before the first round
            print("Scores:", ' '.join(f"Agent {ag.id} average reward={ag.total_score/self.num_steps}" for ag in self.agents))

    def action_dist(self, which_actions=(1, 1)):
        """which_actions is [a0,a1]
        returns the empirical distribution of actions for agents,
        where ai specifies the index of the action for agent i

        The result is, for each agent i, the proportion of recorded rounds in
        which it played self.game.actions[i][ai] (0.0 when nothing has been
        recorded yet).
        """
        num_rounds = len(self.action_history)
        if not num_rounds:
            return [0.0 for _ in which_actions]
        # Use this simulation's own history and game, not module-level globals.
        return [sum(1 for played in self.action_history
                    if played[i] == self.game.actions[i][which_actions[i]]) / num_rounds
                for i in range(len(which_actions))]

    def plot_dynamics(self, x_action=0, y_action=0):
        """Plot the recorded probability of agent 0 doing its action number
        x_action against the probability of agent 1 doing its action number
        y_action, one point per recorded round.
        """
        plt.ion()  # make it interactive
        agents = self.agents
        x_act = self.game.actions[0][x_action]
        y_act = self.game.actions[1][y_action]
        plt.xlabel(f"Action {agents[0].actions[x_action]} for Agent {agents[0].id}")
        plt.ylabel(f"Action {agents[1].actions[y_action]} for Agent {agents[1].id}")
        plt.plot([dists[0][x_act] for dists in self.dist_history],
                 [dists[1][y_act] for dists in self.dist_history])


###masLearn.py

In [None]:
class ShoppingGame(Displayable):
    """Two-agent game where each agent chooses 'shopping' or 'football'."""

    def __init__(self):
        """Declare the two agents and their (shared) list of actions."""
        options = ['shopping', 'football']
        self.num_agents = 2
        self.actions = [options, options]  # the same list object for both agents

    def play(self, actions):
        """Return the pair of rewards (agent 0, agent 1) for the joint action.

        actions is a tuple (action of agent 0, action of agent 1);
        an unknown combination raises KeyError.
        """
        payoffs = {
            ('football', 'football'): (2, 1),
            ('football', 'shopping'): (0, 0),
            ('shopping', 'football'): (0, 0),
            ('shopping', 'shopping'): (1, 2),
        }
        return payoffs[actions]

class SoccerGame(Displayable):
    """Two-agent game where each agent chooses 'left' or 'right'."""

    def __init__(self):
        """Declare the two agents and their (shared) list of actions."""
        options = ['left', 'right']
        self.num_agents = 2
        self.actions = [options, options]  # the same list object for both agents

    def play(self, actions):
        """Return the pair of rewards (agent 0, agent 1) for the joint action.

        actions is a tuple (action of agent 0, action of agent 1);
        an unknown combination raises KeyError.
        """
        payoffs = {
            ('left', 'left'): (0.6, 0.4),
            ('left', 'right'): (0.2, 0.8),
            ('right', 'left'): (0.3, 0.7),
            ('right', 'right'): (0.9, 0.1),
        }
        return payoffs[actions]

class GameShow(Displayable):
    """Two-agent game where each agent chooses 'take' or 'give'."""

    def __init__(self):
        """Declare the two agents and their (shared) list of actions."""
        options = ['take', 'give']
        self.num_agents = 2
        self.actions = [options, options]  # the same list object for both agents

    def play(self, actions):
        """Return the pair of rewards (agent 0, agent 1) for the joint action.

        actions is a tuple (action of agent 0, action of agent 1);
        an unknown combination raises KeyError.
        """
        payoffs = {
            ('take', 'take'): (100, 100),
            ('take', 'give'): (1100, 0),
            ('give', 'take'): (0, 1100),
            ('give', 'give'): (1000, 1000),
        }
        return payoffs[actions]

class UniqueNEGameExample(Displayable):
    """Two-agent game where agent 0 chooses among 'a1', 'b1', 'c1' and
    agent 1 chooses among 'd2', 'e2', 'f2'.
    """

    def __init__(self):
        """Declare the two agents and the list of actions of each agent."""
        self.num_agents = 2
        self.actions = [['a1', 'b1', 'c1'], ['d2', 'e2', 'f2']]

    # play must be a method of the class (not a function nested in __init__),
    # otherwise the simulator cannot call game.play(actions).
    def play(self, actions):
        """Return the pair of rewards (agent 0, agent 1) for the joint action.

        actions is a tuple (action of agent 0, action of agent 1);
        an unknown combination raises KeyError.
        """
        return {('a1', 'd2'): (3, 5),
                ('a1', 'e2'): (5, 1),
                ('a1', 'f2'): (1, 2),
                ('b1', 'd2'): (1, 1),
                ('b1', 'e2'): (2, 9),
                ('b1', 'f2'): (6, 4),
                ('c1', 'd2'): (2, 6),
                ('c1', 'e2'): (4, 7),
                ('c1', 'f2'): (0, 8)
                }[actions]
# Choose one:
# gm = ShoppingGame()
# gm = SoccerGame()
# gm = GameShow()
# gm = UniqueNEGameExample()

# Choose one:
# sim=SimulateGame(gm,[SimpleQAgent(gm.actions[0]), SimpleQAgent(gm.actions[1])]); sim.go(10000)
# sim= SimulateGame(gm,[SimpleCountingAgent(gm.actions[0]), SimpleCountingAgent(gm.actions[1])]); sim.go(10000)
# sim=SimulateGame(gm,[SimpleCountingAgent(gm.actions[0]), SimpleQAgent(gm.actions[1])]); sim.go(10000)
# sim.plot_dynamics()
# empirical proportion that agents did their action at index 1:
# sim.action_dist([1,1])
# learned distribution for agent 0
# sim.agents[0].dist


#Chapter-7 Relational Learning

##relnCollFilt.py

In [None]:
# Dependencies of the collaborative-filtering code below
# (listing line numbers from the extracted text removed).
import random
import matplotlib.pyplot as plt
import urllib.request
from learnProblem import Learner
from display import Displayable

class CF_learner(Learner):
    """Collaborative-filtering learner: every user and every item gets a bias
    and a list of num_properties hidden property values, which are fitted to
    the training ratings.
    """

    def __init__(self,
                 rating_set,             # a Rating_set object
                 rating_subset=None,     # subset of ratings to be used as training ratings
                 test_subset=None,       # subset of ratings to be used as test ratings
                 step_size=0.01,         # gradient descent step size
                 reglz=1.0,              # the weight for the regularization terms
                 num_properties=10,      # number of hidden properties
                 property_range=0.02     # properties are initialized to be between
                                         # -property_range and property_range
                 ):
        """Store the settings and initialize the parameters.

        Ratings are (user, item, rating, timestamp) tuples. Biases start at 0
        and properties at random values in [-property_range, property_range].
        """
        self.rating_set = rating_set
        self.ratings = rating_subset or rating_set.training_ratings  # whichever is not empty
        if test_subset is None:
            self.test_ratings = self.rating_set.test_ratings
        else:
            self.test_ratings = test_subset
        self.step_size = step_size
        self.reglz = reglz
        self.num_properties = num_properties
        self.num_ratings = len(self.ratings)
        self.ave_rating = (sum(r for (u, i, r, t) in self.ratings)
                           / self.num_ratings)
        self.users = {u for (u, i, r, t) in self.ratings}
        self.items = {i for (u, i, r, t) in self.ratings}
        self.user_bias = {u: 0 for u in self.users}
        self.item_bias = {i: 0 for i in self.items}
        self.user_prop = {u: [random.uniform(-property_range, property_range)
                              for p in range(num_properties)]
                          for u in self.users}
        self.item_prop = {i: [random.uniform(-property_range, property_range)
                              for p in range(num_properties)]
                          for i in self.items}
        # property values used for users/items that have no entry of their own
        self.zeros = [0 for p in range(num_properties)]
        self.iter = 0

    def stats(self):
        """Display the average squared error of always predicting the mean
        training rating, and the current error of the learner, on both the
        training and the test ratings.
        """
        self.display(1, "ave sumsq error of mean for training=",
                     sum((self.ave_rating - rating)**2
                         for (user, item, rating, timestamp) in self.ratings)
                     / len(self.ratings))
        self.display(1, "ave sumsq error of mean for test=",
                     sum((self.ave_rating - rating)**2
                         for (user, item, rating, timestamp) in self.test_ratings)
                     / len(self.test_ratings))
        self.display(1, "error on training set",
                     self.evaluate(self.ratings))
        self.display(1, "error on test set",
                     self.evaluate(self.test_ratings))


###relnCollFilt.py

In [None]:
def prediction(self,user,item):
    """Returns prediction for this user on this item.

    The prediction is the average rating plus the user bias plus the item
    bias plus the sum, over the hidden properties, of the product of the
    user's and the item's property values.
    The use of .get() is to handle users or items not in the training set:
    they contribute a bias of 0 and the all-zero property list self.zeros.
    """
    user_props = self.user_prop.get(user, self.zeros)
    item_props = self.item_prop.get(item, self.zeros)
    return (self.ave_rating
            + self.user_bias.get(user, 0)
            + self.item_bias.get(item, 0)
            + sum([user_props[p] * item_props[p]
                   for p in range(self.num_properties)]))

def learn(self, num_iter = 50):
    """Do num_iter iterations of gradient descent.

    Each iteration visits every training rating once, in random order, and
    moves the biases and properties of its user and item against the
    prediction error. Then every bias and property is shrunk by
    step_size*reglz times its value (regularization), and the current
    training and test errors are displayed.
    """
    for _ in range(num_iter):
        self.iter += 1
        # random.sample of the full length yields a random permutation
        for (user, item, rating, timestamp) in random.sample(self.ratings, len(self.ratings)):
            error = self.prediction(user, item) - rating
            self.user_bias[user] -= self.step_size * error
            self.item_bias[item] -= self.step_size * error
            for p in range(self.num_properties):
                # the item update deliberately sees the already updated user value
                self.user_prop[user][p] -= self.step_size * error * self.item_prop[item][p]
                self.item_prop[item][p] -= self.step_size * error * self.user_prop[user][p]
        for user in self.users:
            self.user_bias[user] -= self.step_size * self.reglz * self.user_bias[user]
            for p in range(self.num_properties):
                self.user_prop[user][p] -= self.step_size * self.reglz * self.user_prop[user][p]
        for item in self.items:
            self.item_bias[item] -= self.step_size * self.reglz * self.item_bias[item]
            for p in range(self.num_properties):
                self.item_prop[item][p] -= self.step_size * self.reglz * self.item_prop[item][p]
        self.display(1, "Iteration", self.iter,
                     "(Ave Abs,AveSumSq) training =", self.evaluate(self.ratings),
                     "test =", self.evaluate(self.test_ratings))


###relnCollFilt.py —

In [None]:
def evaluate(self,ratings):
    """Returns (average_absolute_error, average_sum_squares_error) for ratings.

    ratings is a collection of (user, item, rating, timestamp) tuples;
    (0,0) is returned when it is empty.
    """
    if not ratings:
        return (0,0)
    abs_error = 0
    sumsq_error = 0
    for (user, item, rating, timestamp) in ratings:
        error = self.prediction(user, item) - rating
        abs_error += abs(error)
        sumsq_error += error * error
    return abs_error/len(ratings), sumsq_error/len(ratings)

###14.1.1 Alternative Formulation


An alternative formulation is to regularize after each update.

###14.1.2 Plotting

###relnCollFilt.py

In [None]:
def plot_predictions(self, examples="test"):
    """Plot, for each rating value 1..5, the cumulative distribution of the
    predictions made for the examples that have this actual rating.

    examples is either "test" or "training" or the actual examples
    (a collection of (user, item, rating, timestamp) tuples)
    """
    if examples == "test":
        examples = self.test_ratings
    elif examples == "training":
        examples = self.ratings
    plt.ion()
    plt.xlabel("prediction")
    plt.ylabel("cumulative proportion")
    # self.actuals[r] collects the predictions for examples with actual rating r
    self.actuals = [[] for r in range(0,6)]
    for (user, item, rating, timestamp) in examples:
        self.actuals[rating].append(self.prediction(user, item))
    for rating in range(1,6):
        self.actuals[rating].sort()
        numrat = len(self.actuals[rating])
        yvals = [i/numrat for i in range(numrat)]
        plt.plot(self.actuals[rating], yvals, label="rating="+str(rating))
    plt.legend()
    plt.draw()


def plot_property(self,
                  p,               # property
                  plot_all=False,  # true if all points should be plotted
                  num_points=200   # number of random points plotted if not all
                  ):
    """plot some of the user-movie ratings, if plot_all is true
    num_points is the number of points selected at random plotted.
    the plot has the users on the x-axis sorted by their value on property p and
    with the items on the y-axis sorted by their value on property p and
    the ratings plotted at the corresponding x-y position.
    """
    plt.ion()
    plt.xlabel("users")
    plt.ylabel("items")
    user_vals = [self.user_prop[u][p] for u in self.users]
    item_vals = [self.item_prop[i][p] for i in self.items]
    plt.axis([min(user_vals)-0.02, max(user_vals)+0.05, min(item_vals)-0.02, max(item_vals)+0.05])
    if plot_all:
        for (u, i, r, t) in self.ratings:
            plt.text(self.user_prop[u][p], self.item_prop[i][p], str(r))
    else:
        for _ in range(num_points):  # counter kept separate from the item id i
            (u, i, r, t) = random.choice(self.ratings)
            plt.text(self.user_prop[u][p], self.item_prop[i][p], str(r))
    # show the figure once, after all points were added (in either mode)
    plt.show()

###14.1.3 Creating Rating Sets

In [None]:
class Rating_set(Displayable):
    """A set of ratings, split by timestamp into training and test ratings."""

    def __init__(self, date_split=892000000, local_file=False,
                 url="http://files.grouplens.org/datasets/movielens/ml-100k/u.data",
                 file_name="u.data"):
        """Read the ratings and split them into training and test ratings.

        date_split: ratings with a timestamp before it are training ratings,
            all other ratings are test ratings
        local_file: if true, read file_name; otherwise download url
        url: where the ratings are downloaded from
        file_name: local file holding the ratings

        Every non-blank line holds tab-separated integers
        user, item, rating, timestamp; ratings are expected to be in 1..5.
        """
        self.display(1, "reading...")
        # Read everything inside a with-block so the file/connection is closed.
        if local_file:
            with open(file_name, 'r') as source:
                lines = source.readlines()
        else:
            with urllib.request.urlopen(url) as response:
                lines = [line.decode('utf-8') for line in response]
        all_ratings = (tuple(int(e) for e in line.strip().split('\t'))
                       for line in lines
                       if line.strip())  # skip blank lines (e.g. at end of file)
        self.training_ratings = []
        self.training_stats = {1:0, 2:0, 3:0, 4:0, 5:0}
        self.test_ratings = []
        self.test_stats = {1:0, 2:0, 3:0, 4:0, 5:0}
        for rate in all_ratings:
            if rate[3] < date_split:  # rate[3] is timestamp
                self.training_ratings.append(rate)
                self.training_stats[rate[2]] += 1
            else:
                self.test_ratings.append(rate)
                self.test_stats[rate[2]] += 1
        # The summary is reported once, after all ratings have been read.
        self.display(1, "...read:", len(self.training_ratings), "training ratings and", len(self.test_ratings), "test ratings")
        tr_users = {user for (user, item, rating, timestamp) in self.training_ratings}
        test_users = {user for (user, item, rating, timestamp) in self.test_ratings}
        self.display(1, "users:", len(tr_users), "training,", len(test_users), "test,", len(tr_users & test_users), "in common")
        tr_items = {item for (user, item, rating, timestamp) in self.training_ratings}
        test_items = {item for (user, item, rating, timestamp) in self.test_ratings}
        self.display(1, "items:", len(tr_items), "training,", len(test_items), "test,", len(tr_items & test_items), "in common")
        self.display(1, "Rating statistics for training set", self.training_stats)
        self.display(1, "Rating statistics for test set: ", self.test_stats)


Sometimes it is useful to plot a property for all (user, item, rating) triples.
There are too many such triples in the data set. The method `create_top_subset`
creates a much smaller dataset where this makes sense. It picks the most rated
items, then picks the users who have the most ratings on these items. It is
designed for depicting the meaning of properties, and may not be useful for
other purposes.

###relnCollFilt.py

In [None]:
def create_top_subset(self, num_items = 30, num_users = 30):
    """Returns a subset of the ratings by picking the most rated items,
    and then the users that have most ratings on these, and then all of the
    ratings that involve these users and items.

    num_items is the number of most rated items to keep
    num_users is the number of users (with most ratings on these items) to keep
    Ties are broken in favour of the larger item / user identifier.
    The result is a list of (user, item, rating, timestamp) tuples in the
    order of self.training_ratings; it is empty if either number is 0.
    """
    item_counts = {}
    for (user, item, rating, timestamp) in self.training_ratings:
        item_counts[item] = item_counts.get(item, 0) + 1
    items_sorted = sorted((count, item) for (item, count) in item_counts.items())
    # Explicit start index: a plain [-num_items:] would select *everything*
    # for num_items == 0, because -0 == 0.
    top_items = items_sorted[max(len(items_sorted) - num_items, 0):]
    set_top_items = {item for (count, item) in top_items}

    # every user is a candidate, including those with no rating on a top item
    user_counts = {user: 0 for (user, item, rating, timestamp) in self.training_ratings}
    for (user, item, rating, timestamp) in self.training_ratings:
        if item in set_top_items:
            user_counts[user] += 1
    users_sorted = sorted((count, user) for (user, count) in user_counts.items())
    top_users = users_sorted[max(len(users_sorted) - num_users, 0):]
    set_top_users = {user for (count, user) in top_users}

    used_ratings = [(user, item, rating, timestamp)
                    for (user, item, rating, timestamp) in self.training_ratings
                    if user in set_top_users and item in set_top_items]
    return used_ratings

# Example use: load the MovieLens ratings (downloads the data set) and create
# a learner with a single hidden property. Uncomment lines below to train/plot.
movielens = Rating_set()
learner1 = CF_learner(movielens, num_properties = 1)
#learner1.learn(50)
# learner1.plot_predictions(examples = "training")
# learner1.plot_predictions(examples = "test")
#learner1.plot_property(0)
#movielens_subset = movielens.create_top_subset(num_items = 20, num_users = 20)
#learner_s = CF_learner(movielens, rating_subset=movielens_subset, test_subset=[], num_properties=1)
#learner_s.learn(1000)
#learner_s.plot_property(0,plot_all=True)
