In [1]:
import numpy as np
import time, sys, math
from collections import defaultdict
# Students will submit their code in a file named <team-name>.py.
# Each student team class has to use Team as its parent class.

In [2]:
class TwoPlayersGameMonteCarloTreeSearchNode:
    """Search-tree node used to pick a batting action with Monte Carlo tree search.

    A node wraps a game ``state`` together with the batter and bowler features,
    which are forwarded unchanged to ``state.move``.  ``batting_action`` is the
    action that led from the parent to this node (``None`` for the root).

    The ``state`` object must provide ``get_legal_actions()``,
    ``move(feature_batter, feature_bowler, batting_action, bowling_action)``,
    ``is_game_over()`` and a ``game_result`` attribute.
    """

    # Bowling actions sampled uniformly for the opponent in expansion and rollout.
    _BOWLING_ACTIONS = [0, 1, 2]

    def __init__(self, state, feature_batter, feature_bowler, batting_action=None, parent=None):
        self.state = state
        self.parent = parent
        self.children = []
        # One slot per batting action (0, 1, 2, 3, 4, 6 -> slots 0..5); a node
        # only ever counts in the slot returned by ``_visit_slot``.
        self._number_of_visits = np.zeros(6)
        # Running sum of every rollout result backpropagated through this node.
        self._results = 0
        self._untried_actions = None
        self.feature_batter = feature_batter
        self.feature_bowler = feature_bowler
        self.batting_action = batting_action

    @property
    def untried_actions(self):
        """Legal batting actions not yet expanded from this node (built lazily)."""
        if self._untried_actions is None:
            self._untried_actions = self.state.get_legal_actions()
        return self._untried_actions

    @property
    def _visit_slot(self):
        """Index into ``_number_of_visits`` used by this node.

        The root has no batting action and uses slot 0.  Indexing the array
        with ``None`` directly would be interpreted by NumPy as a new axis and
        touch every slot, so the root case is handled explicitly.
        """
        if self.batting_action is None:
            return 0
        return 5 if self.batting_action == 6 else self.batting_action

    @property
    def q(self):
        """Total reward accumulated over all simulations through this node."""
        return self._results

    @property
    def n(self):
        """Number of simulations that passed through this node."""
        return self._number_of_visits[self._visit_slot]

    def expand(self):
        """Create, attach and return the child for one untried batting action.

        The bowling action used for the transition is sampled uniformly.
        """
        batting_action = self.untried_actions.pop()
        bowling_action = self.rollout_policy(self._BOWLING_ACTIONS)
        next_state = self.state.move(self.feature_batter, self.feature_bowler, batting_action, bowling_action)
        child_node = TwoPlayersGameMonteCarloTreeSearchNode(
            next_state, self.feature_batter, self.feature_bowler, batting_action, parent=self
        )
        self.children.append(child_node)
        return child_node

    def is_fully_expanded(self):
        """Return True once every legal batting action has a child node."""
        return len(self.untried_actions) == 0

    def best_child(self, c_param=1.4):
        """Return the child maximising mean reward plus the exploration bonus.

        ``c_param`` scales the exploration term; ``0`` selects purely on the
        mean reward ``q / n``.

        Raises:
            ValueError: if the node has no children to choose from.
        """
        if not self.children:
            raise ValueError("best_child() called on a node without children")
        choices_weights = [
            (c.q / c.n) + c_param * np.sqrt((2 * np.log(self.n) / c.n))
            for c in self.children
        ]
        return self.children[np.argmax(choices_weights)]

    def rollout_policy(self, possible_moves):
        """Pick one element of ``possible_moves`` uniformly at random."""
        return possible_moves[np.random.randint(len(possible_moves))]

    def is_terminal_node(self):
        """Return True when the wrapped state reports the game as over."""
        return self.state.is_game_over()

    def rollout(self):
        """Play uniformly random actions from this node's state until the game ends.

        Returns the ``game_result`` of the final state.
        """
        current_rollout_state = self.state
        while not current_rollout_state.is_game_over():
            possible_moves = current_rollout_state.get_legal_actions()
            batting_action = self.rollout_policy(possible_moves)
            bowling_action = self.rollout_policy(self._BOWLING_ACTIONS)
            current_rollout_state = current_rollout_state.move(
                self.feature_batter, self.feature_bowler, batting_action, bowling_action
            )
        return current_rollout_state.game_result

    def backpropagate(self, result):
        """Record one simulation ``result`` on this node and all its ancestors."""
        self._number_of_visits[self._visit_slot] += 1.
        # Accumulate rather than overwrite, so that q / n is the mean reward
        # over all simulations instead of "last reward / visit count".
        self._results += result
        if self.parent is not None:
            self.parent.backpropagate(result)

In [3]:
class MonteCarloTreeSearch(object):
    """Time-budgeted Monte Carlo tree search driver.

    ``node`` is the root of the search tree.  It must offer
    ``is_terminal_node()``, ``is_fully_expanded()``, ``expand()``,
    ``best_child(c_param=...)``, ``rollout()``, ``backpropagate(result)``,
    plus ``children`` and (on child nodes) ``batting_action``.
    """

    def __init__(self, node):
        self.root = node

    def best_action(self, simulations_time=None):
        """Run simulations for ``simulations_time`` seconds and return the best action.

        At least one simulation is always performed, so a zero or very small
        budget still leaves the root with a child to choose from.

        Raises:
            TypeError: if ``simulations_time`` is None (no budget supplied).
            ValueError: if the root ends up without children (e.g. the root
                state is already terminal), so no action can be recommended.
        """
        if simulations_time is None:
            raise TypeError("simulations_time must be a number of seconds, not None")
        end_time = time.time() + simulations_time
        while True:
            v = self._tree_policy()
            reward = v.rollout()
            v.backpropagate(reward)
            if time.time() >= end_time:
                break
        if not self.root.children:
            raise ValueError("cannot choose an action: the root node has no children")
        # to select best child go for exploitation only
        return self.root.best_child(c_param=0.).batting_action

    def _tree_policy(self):
        """Select the node to simulate from.

        Descends through fully expanded nodes via ``best_child`` and returns a
        freshly expanded child of the first node that still has untried
        actions, or the terminal node reached.
        """
        current_node = self.root
        while not current_node.is_terminal_node():
            if not current_node.is_fully_expanded():
                return current_node.expand()
            current_node = current_node.best_child()
        return current_node

In [4]:
class batting_state:
  """Immutable-style snapshot of an innings as seen by the batting side.

  Attributes:
    ball: number of balls already bowled.
    wickets_left: wickets the batting side can still lose.
    total_runs: runs scored so far.
    score_to_chase: target; the game is over once total_runs reaches it.
    environment: object whose ``get_outcome`` resolves one ball.
    max_balls: ball count at which the innings ends (defaults to 60).
  """

  def __init__(self, ball, wickets_left, total_runs, score_to_chase, environment, max_balls=60):
    self.ball = ball
    self.wickets_left = wickets_left
    self.total_runs = total_runs
    self.score_to_chase = score_to_chase
    self.environment = environment
    self.max_balls = max_balls

  @property
  def game_result(self):
    """Reward of this state: the runs scored so far."""
    return self.total_runs

  def is_game_over(self):
    """Return True when balls or wickets have run out, or the target is reached."""
    still_playing = (
      self.ball < self.max_balls
      and self.wickets_left > 0
      and self.score_to_chase > self.total_runs
    )
    return not still_playing

  def move(self, feature_batter, feature_bowler, batting_action, bowling_action):
    """Resolve one ball and return the resulting state.

    The current state is left untouched: tree nodes keep a reference to it,
    so updating it in place would silently advance a node's state every time
    it is expanded or used as the start of a rollout.
    """
    wicket, runs = self.environment.get_outcome(feature_batter, feature_bowler, batting_action, bowling_action)
    return batting_state(
      self.ball + 1,
      self.wickets_left - wicket,
      self.total_runs + runs,
      self.score_to_chase,
      self.environment,
      self.max_balls,
    )

  def get_legal_actions(self):
    """Return a fresh list of the batting actions available on every ball."""
    return [0,1,2,3,4,6]

In [5]:
class Team:
  """Base class for a team: chooses batters, bowlers and per-ball actions.

  Args:
    self_features: per-player feature rows of this team; columns 0:2 are read
      as batter features and columns 2:4 as bowler features.
    opponent_features: the same layout for the opposing team.
    environment: object whose ``get_outcome`` resolves one ball.
  """

  # Seconds of tree search spent on each batting decision.  The match in this
  # notebook is configured with a 10-second action timeout, so this has to
  # stay below that value.
  SIMULATION_SECONDS = 9

  def __init__(self,self_features,opponent_features, environment):
    self.self_features = self_features
    self.opponent_features = opponent_features
    self.explore_wicket = 0
    self.explore_runs = 0
    # Default order 0..4 keeps the team playable even if explore() is never
    # called; entries are integer player indices.
    self.batting_order = np.arange(5)
    self.environment = environment

  def get_next_batter(self):
    """Remove and return the next player index from the batting order."""
    next_batter = int(self.batting_order[0])
    self.batting_order = np.delete(self.batting_order, 0)
    return next_batter

  def get_next_bowler(self):
    """Return a uniformly random bowler index in 0..4."""
    return np.random.randint(0,5)

  def get_batting_action(self,ball,total_runs,wickets_left,score_to_chase,feature_batter,feature_bowler):
    """Choose a batting action for the current ball with Monte Carlo tree search."""
    state = batting_state(ball, wickets_left, total_runs, score_to_chase,self.environment)
    root = TwoPlayersGameMonteCarloTreeSearchNode(state, feature_batter, feature_bowler)
    mcts = MonteCarloTreeSearch(root)
    batting_action = mcts.best_action(self.SIMULATION_SECONDS)
    return batting_action

  def get_bowling_action(self,ball,total_runs,wickets_left,score_to_chase,feature_batter,feature_bowler):
    """Return a uniformly random bowling action in 0..2."""
    bowling_action = np.random.randint(0,3)
    return bowling_action

  def explore(self, explore_num_balls):
    """Estimate each of this team's batters and derive the batting order.

    Every batter faces ``explore_num_balls // 5`` simulated balls with random
    actions against randomly chosen opposing bowlers.  Batters are ranked by
    runs per dismissal (with one added to the dismissals to avoid division by
    zero), best first.

    Returns the new batting order, which is also stored on the instance.
    """
    runs_by_players = np.zeros(5)
    wickets_of_players = np.zeros(5)
    for i in range(5):
      # The order is used to index this team's own players, so the batter
      # being evaluated must come from self_features, not the opponent's.
      feature_batter = self.self_features[i,0:2]
      for j in range(explore_num_balls//5):
        bowler = self.get_next_bowler()
        feature_bowler = self.opponent_features[bowler,2:4]
        batting_action = np.random.choice([0,1,2,3,4,6])
        bowling_action = np.random.randint(0,3)
        wicket, runs = self.environment.get_outcome(feature_batter, feature_bowler, batting_action, bowling_action)
        runs_by_players[i] += runs
        wickets_of_players[i] += wicket
    players_batting_score = runs_by_players/(wickets_of_players+1)
    # Highest score first; a stable sort keeps the lower index first on ties.
    self.batting_order = np.argsort(-players_batting_score, kind="stable")
    return self.batting_order

class Australia(Team):
  """Australia side: a Team subclass that adds no behaviour of its own."""

class India(Team):
  """India side: a Team subclass that adds no behaviour of its own."""


In [6]:
class Environment:
    """Stochastic model that resolves a single ball.

    The lookup tables are class-level constants: ``get_outcome`` is called for
    every explored ball and every rollout step, so they are built once instead
    of on each call.
    """

    ### Probability of getting out given a particular feature
    P_OUT_BOWL_ACT = {0: 0.3, 1: 0.6, 2: 0.9}
    P_OUT_BAT_ACT = {0: 0.3, 1: 0.4, 2: 0.5, 3: 0.6, 4: 0.7, 6: 0.8}
    P_OUT_AVG_BAT = {1: 0.5, 2: 0.6, 3: 0.7, 4: 0.8, 5: 0.9}
    P_OUT_AVG_BOWL = {1: 0.9, 2: 0.8, 3: 0.7, 4: 0.6, 5: 0.5}

    ### Probability of scoring runs
    P_RUNS_BOWL_ACT = {0: 0.7, 1: 0.8, 2: 0.9}
    P_RUNS_STR_RATE = {1: 0.95, 2: 0.9, 3: 0.85, 4: 0.8, 5: 0.75}
    P_RUNS_BOWL_ECO = {1: 0.75, 2: 0.8, 3: 0.85, 4: 0.9, 5: 0.95}

    def get_outcome(self,feature_batter, feature_bowler, batting_action, bowling_action):
        """Resolve one ball and return ``(wicket, runs)``.

        Args:
          feature_batter: indexable; element 0 keys the batter's dismissal
            factor and element 1 the strike-rate factor (values 1..5).
          feature_bowler: indexable; element 0 keys the bowler's dismissal
            factor and element 1 the economy factor (values 1..5).
          batting_action: one of 0, 1, 2, 3, 4, 6.
          bowling_action: one of 0, 1, 2.

        Returns:
          ``(1, 0)`` when the batter is out, otherwise ``(0, runs)`` where
          ``runs`` is the batting action scaled by the scoring probability
          (hence fractional scores).

        Raises:
          KeyError: if an action or feature value is not in the tables.
        """
        p_out = (
            self.P_OUT_BOWL_ACT[bowling_action]
            * self.P_OUT_BAT_ACT[batting_action]
            * self.P_OUT_AVG_BAT[feature_batter[0]]
            * self.P_OUT_AVG_BOWL[feature_bowler[0]]
        )
        p_runs = (
            self.P_RUNS_BOWL_ACT[bowling_action]
            * self.P_RUNS_STR_RATE[feature_batter[1]]
            * self.P_RUNS_BOWL_ECO[feature_bowler[1]]
        )

        out = np.random.random_sample()
        if out <= p_out:
            return 1, 0
        return 0, batting_action*p_runs

class Match:
  """Runs the exploration phase and the two innings between two team classes.

  Args:
    num_balls: balls per innings.
    explore_num_balls: balls each team may simulate while exploring.
    action_timeout: time limit, in seconds, for a single batting/bowling action.
    explore_timeout: time limit, in seconds, for a team's exploration phase.
    TeamOne: class of the team batting first; instantiated as
      ``TeamOne(own_features, opponent_features, environment)``.
    TeamTwo: class of the team batting second; instantiated the same way.

  Team names shown in the printed messages are the names of the team classes.
  """

  def __init__(self,num_balls, explore_num_balls,action_timeout, explore_timeout, TeamOne, TeamTwo):
    self.environment = Environment()
    self.num_balls = num_balls
    self.explore_num_balls = explore_num_balls
    self.action_timeout = action_timeout          # time limit for any given act
    self.explore_timeout = explore_timeout        # time limit for exploration phase
    # feature[0] : batting average, feature[1]: strike-rate, feature[2]: bowling average, feature[3]: economy
    self.team_one_features = self.team_features()
    self.team_two_features = self.team_features()
    self.team_one = TeamOne(self.team_one_features,self.team_two_features,self.environment)
    self.team_two = TeamTwo(self.team_two_features,self.team_one_features,self.environment)
    # A coordinate is set to 0 when that batter gets out, e.g. [1,1,0,1,1] once the third player is out.
    self.current_batters_list = np.array([1,1,1,1,1])
    # Overs left per bowler, e.g. [2,2,2,1,2] after the 4th bowler bowls the first over.
    self.current_bowlers_list = np.array([2,2,2,2,2])
    self.num_miss_team_batting = 0
    self.num_miss_team_bowling = 0

  @staticmethod
  def _team_name(team):
    """Return the display name of a team: the name of its class."""
    return type(team).__name__

  def team_features(self):
    """Return a 5x4 integer array; each column is a random permutation of 1..5."""
    features = []
    for i in range(4):
      feat = np.arange(5)+1
      np.random.shuffle(feat)
      features.append(feat)
    features = np.array(features).T
    return features

  def explore_phase_team(self,team_id):    #timed phase
    """Let team 1 (``team_id == 1``) or team 2 explore; return its batting order.

    A message is printed when the exploration exceeds ``explore_timeout``.
    """
    if (team_id == 1):
      team = self.team_one
    else:
      team = self.team_two
    start_time = time.time()
    batting_order = team.explore(self.explore_num_balls)
    end_time = time.time()
    if(end_time - start_time > self.explore_timeout):
      print("Timing Violation During Exploration Phase")
    return batting_order

  def explore_phase(self):     #allow each of the teams to explore
    """Run the exploration phase for both teams and print their batting orders."""
    batting_order = self.explore_phase_team(1)
    print(f"{self._team_name(self.team_one)} Batting Order: {batting_order}")
    batting_order = self.explore_phase_team(2)
    print(f"{self._team_name(self.team_two)} Batting Order: {batting_order}")

  def get_valid_bowler(self,next_bowler):
    """Return ``next_bowler``, or a random bowler with overs left if it has none."""
    next_bowler = int(next_bowler)  # tolerate float / NumPy scalar indices
    if (self.current_bowlers_list[next_bowler]==0):
      bowlers_with_overs_left = np.where(self.current_bowlers_list>0)[0]
      next_bowler = np.random.choice(bowlers_with_overs_left)
    return next_bowler

  def get_valid_batter(self,next_batter):
    """Return ``next_batter``, or a random batter who is not out if it already is."""
    next_batter = int(next_batter)  # tolerate float / NumPy scalar indices
    if (self.current_batters_list[next_batter]==0):
      batters_not_out = np.where(self.current_batters_list>0)[0]
      next_batter = np.random.choice(batters_not_out)
    return next_batter

  def next_batter(self,team_batting):
    """Ask the batting team for its next batter; return ``(index, batter features)``."""
    next_batter = team_batting.get_next_batter()
    next_batter = self.get_valid_batter(next_batter)
    feature_batter = team_batting.self_features[next_batter,0:2]
    return next_batter,feature_batter

  def next_bowler(self,team_bowling):
    """Ask the bowling team for its next bowler; return ``(index, bowler features)``."""
    next_bowler = team_bowling.get_next_bowler()
    next_bowler = self.get_valid_bowler(next_bowler)
    feature_bowler = team_bowling.self_features[next_bowler,2:4]
    return next_bowler, feature_bowler

  def get_team_batting_action(self,team_batting,ball,total_runs,wickets_left,runs_to_chase,feature_batter,feature_bowler):
    """Return the batting team's action, or 0 if it exceeded ``action_timeout``."""
    start_time      = time.time()
    batting_action  = team_batting.get_batting_action(ball,total_runs,wickets_left,runs_to_chase,feature_batter,feature_bowler)
    end_time        = time.time()
    if(end_time - start_time > self.action_timeout):
      batting_action = 0 #this is the default option, we have to fix the penalisation strategy
      self.num_miss_team_batting = self.num_miss_team_batting + 1
    return batting_action

  def get_team_bowling_action(self,team_bowling,ball,total_runs,wickets_left,runs_to_chase,feature_batter,feature_bowler):
    """Return the bowling team's action, or 0 if it exceeded ``action_timeout``."""
    start_time      = time.time()
    bowling_action  = team_bowling.get_bowling_action(ball,total_runs,wickets_left,runs_to_chase,feature_batter,feature_bowler)
    end_time        = time.time()
    if(end_time - start_time > self.action_timeout):
      bowling_action = 0 #this is the default option, we have to fix the penalisation strategy
      self.num_miss_team_bowling = self.num_miss_team_bowling + 1
    return bowling_action

  def innings(self,innings_id,runs_to_chase):
    """Play one innings and print a running score and a summary.

    Team one bats when ``innings_id == 1``; otherwise team two bats.  The
    innings ends when the balls run out, all five batters are out, or the
    score exceeds ``runs_to_chase``.

    Returns:
      ``(total_runs, current_batters_list, current_bowlers_list,
      num_miss_team_batting, num_miss_team_bowling)``.
    """
    print("Note: Score is in decimals because of the probability of scoring runs for an action.")
    print()
    total_runs = 0
    wickets_left = 5
    self.current_batters_list =np.array([1,1,1,1,1])
    self.current_bowlers_list =np.array([2,2,2,2,2])
    self.num_miss_team_batting = 0
    self.num_miss_team_bowling = 0
    if (innings_id == 1 ):
      team_batting = self.team_one
      team_bowling = self.team_two
    else:
      team_batting = self.team_two
      team_bowling = self.team_one
    batting_name = self._team_name(team_batting)
    bowling_name = self._team_name(team_bowling)
    # Initialising the first batter and first bowler
    batter, feature_batter = self.next_batter(team_batting)
    bowler, feature_bowler = self.next_bowler(team_bowling)

    for ball in range(self.num_balls):
      # Nothing more can happen once every batter is out.
      if np.sum(self.current_batters_list) == 0:
        break

      batting_action = self.get_team_batting_action(team_batting,ball,total_runs,wickets_left,runs_to_chase,feature_batter,feature_bowler)
      bowling_action = self.get_team_bowling_action(team_bowling,ball,total_runs,wickets_left,runs_to_chase,feature_batter,feature_bowler)
      wicket, runs   = self.environment.get_outcome(feature_batter, feature_bowler, batting_action, bowling_action)
      total_runs     = round(total_runs+runs,2)

      if (wicket > 0):
        wickets_left -= 1
        self.current_batters_list[batter] = 0
        if(np.sum(self.current_batters_list) > 0 ):
          batter,feature_batter = self.next_batter(team_batting)

      print("\r", f"Batting_action:{batting_action}, Score: {total_runs}/{5-wickets_left}, Overs:{(ball+1)//6}.{(ball+1)%6}", end="")

      # End of an over: charge it to the bowler and bring on the next one.
      if ((ball+1)%6 ==0 ):
        self.current_bowlers_list[bowler] = self.current_bowlers_list[bowler]-1
        if(np.sum(self.current_bowlers_list) > 0 ) :
          bowler, feature_bowler = self.next_bowler(team_bowling)
      if (runs_to_chase<total_runs):
        break

    print()
    print("---"*20)
    if (innings_id == 1):
      print(f"First innings over! {batting_name} scored {total_runs}. Wickets Left: {wickets_left}")
      print(f"Target for {bowling_name} is {math.floor(total_runs)+1}")
    else:
      print(f"Second innings over! {batting_name} scored {total_runs}. Wickets Left: {wickets_left}")
      if runs_to_chase > total_runs :
        # Rounded so float subtraction noise does not leak into the message.
        print(f"{bowling_name} won by {round(runs_to_chase - total_runs, 2)} runs!!!")
      elif runs_to_chase < total_runs :
        print(f"{batting_name} won by {wickets_left} wickets!!!")
      else:
        print("Match is draw!!!")
    print("---"*20)
    print()

    return total_runs, self.current_batters_list, self.current_bowlers_list, self.num_miss_team_batting, self.num_miss_team_bowling

In [7]:
# Time limits (seconds) that Match enforces on the teams
explore_timeout = 10   # whole exploration phase of one team
action_timeout = 10    # a single batting or bowling decision

# Match size
num_balls = 60
explore_num_balls = 175000

match = Match(
    num_balls,
    explore_num_balls,
    action_timeout,
    explore_timeout,
    Australia,
    India,
)
match.explore_phase()

Australia Batting Order: [3 4 0 2 1]
India Batting Order: [4 3 0 2 1]


In [8]:
# First innings: team one bats with no target, so the chase value is infinite.
(
    first_innings_score,
    batters_list,
    bowlers_list,
    num_miss_team_batting,
    num_miss_team_bowling,
) = match.innings(1, float('inf'))

Note: Score is in decimals beacuse of the probability of scoring runs for an action.

 Batting_action:4, Score: 56.0/5, Overs:5.3
------------------------------------------------------------
First innings over! Australia scored 56.0. Wickets Left: 0
Target for India is 57
------------------------------------------------------------



In [9]:
# Second innings: team two bats, chasing the first-innings score.
(
    second_innings_score,
    batters_list,
    bowlers_list,
    num_miss_team_batting,
    num_miss_team_bowling,
) = match.innings(2, first_innings_score)

Note: Score is in decimals beacuse of the probability of scoring runs for an action.

 Batting_action:6, Score: 58.45/4, Overs:7.5
------------------------------------------------------------
Second innings over! India scored 58.45. Wickets Left: 1
Inida won by 1 wickets!!!
------------------------------------------------------------



Experiment with the probabilities of getting out and of scoring runs to explore different game scenarios.
