<h1> Assignment #2 </h1>

You have to move several cargos across a grid world into a common rectangular target area (the desirable area). Each move shifts one cargo by one cell, either horizontally (left or right) or vertically (up or down). The size of the world, the placement of the cargos, and the position of the desirable area may all vary. The game ends when all cargos are inside the desirable area and do not overlap.

Reward function: 
* for each cell of cargo placed in the desirable area at the end of the turn, the reward is $+1$
* for each cell where cargos overlap at the end of the turn, the reward is $-1$
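
The two rules above can be sketched as a small function. This is only an illustration under stated assumptions: the name `turn_reward` and the representation (a dict from cargo id to a set of `(row, col)` cells) are hypothetical, and each grid cell is assumed to be counted once even when two cargos share it, which is the reading that reproduces the `+4 - 1 = +3` step in the worked example further down.

```python
from typing import Dict, Set, Tuple

Cell = Tuple[int, int]


def turn_reward(cargos: Dict[int, Set[Cell]], area: Set[Cell]) -> int:
    """Reward at the end of a turn for the given cargo placement."""
    seen: Set[Cell] = set()      # cells occupied by at least one cargo
    overlaps: Set[Cell] = set()  # cells occupied by two or more cargos
    for cells in cargos.values():
        overlaps |= seen & cells
        seen |= cells
    # Assumption: a grid cell is counted once, even if two cargos share it.
    return len(seen & area) - len(overlaps)


# 3x3 desirable area in the middle of a 5x5 world.
area = {(r, c) for r in range(1, 4) for c in range(1, 4)}
# Cargo 1 is a vertical bar, cargo 2 a horizontal bar; they share cell (1, 3).
cargos = {1: {(1, 3), (2, 3), (3, 3)}, 2: {(1, 2), (1, 3)}}
print(turn_reward(cargos, area))  # 4 covered cells - 1 overlapping cell = 3
```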

---

Submit a `{name}_{surname}.zip` file containing:
* `{name}_{surname}.ipynb` with the code for training your model on your own implementation of the environment
* `{name}_{surname}.py` script with a `complete_task(path_to_infile, path_to_outfile)` function
* a report `{name}_{surname}.pdf` describing:
  * the environment representation used for model training
  * how the training environments are generated
  * the architecture of the neural network used for prediction

## Input of the script

An `{infile}.txt` file with the field description. Elements of the field are separated by spaces. For example:

```
0 0 2 2 0
0 r r r 0
0 r r r 1
0 r r r 1
0 0 0 0 1
```

* `0` - blank space; objects may be moved here
* `r` - desirable area; objects should be moved here
* `1`, `2`, ... - object shapes; a shape does not change and is moved as a solid object
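
As a rough illustration of the format, the field can be split into cargo cells grouped by id and a set of target cells. The helper name `parse_field` and the returned structures are assumptions made for this sketch, not part of the assignment.

```python
from collections import defaultdict
from typing import Dict, Set, Tuple

Cell = Tuple[int, int]


def parse_field(text: str) -> Tuple[Dict[int, Set[Cell]], Set[Cell]]:
    """Split a field description into cargo cells (by id) and target cells."""
    cargos: Dict[int, Set[Cell]] = defaultdict(set)
    area: Set[Cell] = set()
    for r, line in enumerate(text.strip().splitlines()):
        for c, token in enumerate(line.split()):
            if token == "r":
                area.add((r, c))
            elif token != "0":
                # Any other token is treated as an integer cargo id.
                cargos[int(token)].add((r, c))
    return dict(cargos), area


field = """
0 0 2 2 0
0 r r r 0
0 r r r 1
0 r r r 1
0 0 0 0 1
"""
cargos, area = parse_field(field)
print(sorted(cargos[2]))  # [(0, 2), (0, 3)]
print(len(area))          # 9
```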

In your output file, record one action per line in the form `{id} {D/U/R/L}`. For the given example, a possible sequence of steps is:
```
2 D
1 L
1 U
2 L
```

Here the rewards are: 
1. +2 for 2 cells of `2`
2. +4 for 2 cells of `2` and 2 cells of `1`
3. +3 (= +4 - 1) for 2 cells of `2` and 2 cells of `1` and -1 overlapping cell 
4. **The end**
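
The sequence above can be replayed with a few lines of code to confirm that only the last step reaches a terminal position. This is a sketch under assumptions: `apply_move` and `is_finished` are hypothetical helper names, cargos are stored as sets of `(row, col)` cells, and no boundary checks are performed.

```python
from typing import Dict, List, Set, Tuple

Cell = Tuple[int, int]
DELTAS = {"L": (0, -1), "R": (0, 1), "U": (-1, 0), "D": (1, 0)}


def apply_move(cargos: Dict[int, Set[Cell]], line: str) -> None:
    """Shift one cargo by one cell, e.g. line = '2 D' (no bounds check)."""
    cargo_id, direction = line.split()
    dr, dc = DELTAS[direction]
    key = int(cargo_id)
    cargos[key] = {(r + dr, c + dc) for r, c in cargos[key]}


def is_finished(cargos: Dict[int, Set[Cell]], area: Set[Cell]) -> bool:
    """True when every cargo cell lies in the area and no two cargos overlap."""
    cells = [cell for shape in cargos.values() for cell in shape]
    return set(cells) <= area and len(cells) == len(set(cells))


area = {(r, c) for r in range(1, 4) for c in range(1, 4)}
cargos = {1: {(2, 4), (3, 4), (4, 4)}, 2: {(0, 2), (0, 3)}}
history: List[bool] = []
for step in ["2 D", "1 L", "1 U", "2 L"]:
    apply_move(cargos, step)
    history.append(is_finished(cargos, area))
print(history)  # [False, False, False, True]
```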

## Output of the script

The sequence of actions, written to `{outfile}.txt`. Please rewrite the file after every 100th action and once more after you reach the final position.
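
One possible way to meet the rewriting requirement is sketched below. The function name `write_actions` and its `every` parameter are hypothetical, and the actions are assumed to arrive as an iterable of already formatted `{id} {D/U/R/L}` strings.

```python
import os
import tempfile
from pathlib import Path
from typing import Iterable, List


def write_actions(actions: Iterable[str], outfile: str, every: int = 100) -> List[str]:
    """Rewrite `outfile` after every `every`-th action and once more at the end."""
    done: List[str] = []
    for action in actions:
        done.append(action)
        if len(done) % every == 0:
            # Periodic rewrite with everything produced so far.
            Path(outfile).write_text("\n".join(done) + "\n")
    # Final rewrite once the last action has been produced.
    Path(outfile).write_text("\n".join(done) + "\n")
    return done


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "outfile.txt")
    write_actions(["2 D", "1 L", "1 U", "2 L"], path)
    print(Path(path).read_text().splitlines())  # ['2 D', '1 L', '1 U', '2 L']
```

Rewriting the whole file each time, rather than appending, keeps the file consistent with the in-memory list even if the process is interrupted between checkpoints.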

# Solution

In [1]:
from typing import List, Tuple, Dict
from functools import lru_cache

import numpy as np
import random as rnd

In [2]:
class GridWorld:        
    def __init__(self, n: int, m: int,
                 gamma: float = 0.9,
                 default_reward: float = -1.,
                 cargo: List[Tuple[int, int]] = None,
                 obstacles: List[Tuple[int, int]] = None,
                 terminate_states: List[Tuple[int, int]] = None,
                 ):
        # None defaults avoid sharing one mutable list between instances
        self.n = n # number of rows
        self.m = m # number of columns
        self.board = [(r, c) for r in range(n) for c in range(m)]
        self.gamma = gamma # discount factor used in compute_svf
        self.default_reward = default_reward
        self.cargo = list(cargo or []) # coordinates of all cargo cells
        self.obstacles = list(obstacles or []) # coordinates of all obstacle cells
        self.cargo_pivot = self.get_cargo_pivot() if self.cargo else None
        self.states = set(self.board) | set(self.cargo)
        self.svfs = {state: 0. for state in self.states} # state-value functions
        self.terminate_states = set(terminate_states or [])
        self.policy = {cell: '.' for cell in self.board}


    def train_agent(self):
        eps = 10e-3 # epsilon to finish computation

        def svfs_diff(prev, next):
            res = []
            for key, _ in prev.items():
                res.append(abs(prev[key] - next[key]))
            return res

        t = 0
        while True:
            t += 1
            prev_svfs = self.svfs.copy()
            self.update_svfs() # execute term 0
            diff = svfs_diff(prev_svfs, self.svfs)
            if all(map(lambda el: el < eps, diff)):
                break
        
        print(f"Finished at a time step #{t}")
        print(self)
        self.show_svfs()
        self.show_policy()

        def get_path(policy, pivot, terminates):
            """
            Walk from the pivot following the given policy.
            If the same state is entered two or more times,
            there is no path.
            """
            res = set()
            path = []
            actions = {
            "L": (0, -1),
            "R": (0, +1),
            "U": (-1, 0),
            "D": (+1, 0)
            }
            while True:
                if policy[pivot] == '.' and pivot not in terminates:
                    return ["No path"]
                if pivot in res:
                    return ["No path"]
                if pivot in terminates:
                    break
                res.add(pivot)

                r, c = pivot
                action = policy[pivot]
                r_d, c_d = actions[action]
                path.append(action)

                pivot = r + r_d, c + c_d
            
            return path
        
        self.path = get_path(self.policy, 
                             self.cargo_pivot,
                             self.terminate_states)


    def update_svfs(self):
        """
        Update state-value functions for current time step t.
        """
        new_svfs = self.svfs.copy()

        for state in self.states:
            if state in self.terminate_states:
                continue
            action_name, svf = self.compute_svf(state)
            new_svfs[state] = svf
            self.policy[state] = action_name

        self.svfs = new_svfs


    def compute_svf(self, state):
        """
        Compute state-value function for current time step t.
        """
        qvfs = []
        states = self.possible_actions(state) # new possible states
        for action_name, to_state in states:
            svf = self.default_reward + self.gamma * self.svfs[to_state]
            qvfs.append((action_name, svf))

        if not qvfs:
            return self.policy[state], self.svfs[state]
        return max(qvfs, key=lambda el: el[1])


    def possible_actions(self, from_state) -> List[Tuple[int, int]]:
        actions = {
            "L": (0, -1),
            "R": (0, +1),
            "U": (-1, 0),
            "D": (+1, 0)
            }
        r, c = from_state
        
        ret = []
        for name, action in actions.items():
            r_d, c_d = action
            to_state = r + r_d, c + c_d
            if self.is_action_possible(to_state):
                ret.append((name, to_state))
        
        return ret


    def is_action_possible(self, to_state) -> bool:
        """
        Check that the figure stays intact when moved to the given
        cell and that it does not leave the world boundaries.
        """
        r, c = self.cargo_pivot
        r_prime, c_prime = to_state
        r_d, c_d = r_prime - r, c_prime - c # displacement from the pivot
        cargo = [(r + r_d, c + c_d) for r, c in self.cargo] # new coordinates
                                                            # of every cargo
                                                            # cell

        in_boundaries = all(map(
            lambda el: 0 <= el[0] < self.n and 0 <= el[1] < self.m, cargo)
            )
        if not in_boundaries:
            return False
        
        for cell in cargo:
            # if cell in self.cargo and cell != self.cargo_pivot:
                # return False
            if cell in self.obstacles:
                return False
        
        return True


    def get_cargo_pivot(self):
        # pivot, *_ = sorted(
        #     self.cargo, key=lambda el: (el[0], el[1]), reverse=True
        #     ) # take the cell with the maximal row and maximal col as the pivot
        #       # (as if we carried the figure to a new point by its corner)
        pivot_r = sorted(self.cargo)[-1][0]
        pivot_c = sorted(self.cargo, key=lambda el: el[1])[-1][1]
        pivot = pivot_r, pivot_c

        return pivot


    def show_svfs(self):
        grid = np.zeros((self.n, self.m))
        for state, value in self.svfs.items():
            r, c = state
            grid[r, c] = value
        
        print(grid)
    

    def show_policy(self):
        grid = np.empty([self.n, self.m], dtype=str)
        for state, action in self.policy.items():
            r, c = state
            grid[r, c] = action
        
        print(grid)


    def __str__(self):
        grid = np.zeros((self.n, self.m))
        for r, c in self.cargo:
            grid[r, c] = 2
        for r, c in self.obstacles:
            grid[r, c] = 1
        return str(grid)
    

    def __repr__(self):
        return str(self)
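
The backup performed by `update_svfs` and `compute_svf` in the draft above reads as synchronous value iteration with deterministic moves and a constant per-step reward. Written out, with $r$ standing for `default_reward`, $\gamma$ for `gamma`, $A(s)$ for the moves returned by `possible_actions`, and $s'(s, a)$ for the state reached from $s$ by action $a$ (this notation is introduced here only for illustration):

$$
V_{t+1}(s) = \max_{a \in A(s)} \Big[ r + \gamma \, V_t\big(s'(s, a)\big) \Big],
\qquad
\pi_{t+1}(s) = \arg\max_{a \in A(s)} \Big[ r + \gamma \, V_t\big(s'(s, a)\big) \Big]
$$

Terminal states are skipped and keep $V(s) = 0$, and the loop in `train_agent` stops once $\max_s |V_{t+1}(s) - V_t(s)| < \varepsilon$. For example, with $r = -1$ and $\gamma = 0.9$, a state one move away from a terminal state converges to $-1 + 0.9 \cdot 0 = -1$, and a state two moves away converges to $-1 + 0.9 \cdot (-1) = -1.9$.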

In [3]:
class GridWorld:
    
    class Cargo:
        def __init__(self, name: int, coordinates: List[Tuple], world):
            self.name = name
            self.coordinates = coordinates
            self.world = world
        

        def get_cargo_boundaries(self) -> Tuple:
            """
            Return the top-left and bottom-right corners of the
            cargo's bounding rectangle.
            """
            top = min(r for r, _ in self.coordinates)
            left = min(c for _, c in self.coordinates)
            bottom = max(r for r, _ in self.coordinates)
            right = max(c for _, c in self.coordinates)
            return (top, left), (bottom, right)


        def move(self, action):
            """
            Move the cargo in one of the four directions, if possible.
            """
            actions = {
                "L": (0, -1),
                "R": (0, +1),
                "U": (-1, 0),
                "D": (+1, 0)
            }

            # if we cannot move there, stay in place
            if not self.is_move_possible(action):
                return self.coordinates

            dr, dc = actions[action]
            return [(r + dr, c + dc) for r, c in self.coordinates]


        def is_move_possible(self, action) -> bool:
            actions = {
                "L": (0, -1),
                "R": (0, +1),
                "U": (-1, 0),
                "D": (+1, 0)
            }
            rd, cd = actions[action]
            cargo = [(r + rd, c + cd) for r, c in self.coordinates]
            
            in_boundaries = all(
                map(lambda el: 0 <= el[0] < self.world.r and 0 <= el[1] < self.world.c, cargo)
            )
            if not in_boundaries:
                return False
            
            return True


    def __init__(self, n: int, m: int,
                 cargos: List[Tuple[int, int, int]],
                 desirable_area: List[Tuple[int, int]]):
        self.r = n # number of rows
        self.c = m # number of columns
        self.board = [(r, c) for r in range(n) for c in range(m)]
        self.cargos = self.__get_cargos(cargos)
        self.desirable_area = desirable_area


    def move_cargo(self, name, action):
        self.cargos[name].coordinates = self.cargos[name].move(action)


    def compute_cargo_reward(self, cargo_name):
        pass


    def check_terminate(self) -> bool:
        """
        Check whether all cargos lie inside the terminal area without overlapping.
        """
        area_lub, area_rdb = self.get_desirable_area_boundaries()
        for cargo in self.cargos.values():
            cells = cargo.coordinates
            in_boundaries = all(
                map(lambda cell: 
                    area_lub[0] <= cell[0] <= area_rdb[0] and 
                    area_lub[1] <= cell[1] <= area_rdb[1],
                    cells)
            )
            if not in_boundaries:
                return False
        
        intersections = set()
        for cargo in self.cargos.values():
            for cell in cargo.coordinates:
                if cell not in intersections:
                    intersections.add(cell)
                else:
                    return False

        return True


    @lru_cache()
    def get_desirable_area_boundaries(self) -> Tuple:
        """
        Return the top-left and bottom-right corners, which uniquely
        determine the position of the (rectangular) terminal area.
        """
        return min(self.desirable_area), max(self.desirable_area)


    def __get_cargos(self, cargos) -> Dict[int, Cargo]:
        """
        Build a dictionary of Cargo objects, keyed by cargo name,
        from the named coordinates.
        """
        names = set()
        for name, *_ in cargos:
            names.add(name)
        
        tf_cargos = {}
        for name in names:
            coordinates = list(
                map(lambda x: (x[1], x[2]), filter(lambda x: x[0] == name, cargos))
            )
            tf_cargos[name] = (self.Cargo(name, coordinates, self))

        return tf_cargos


    def __str__(self):
        board = np.full((self.r, self.c), '0', dtype=str)
        
        for cell in self.desirable_area:
            board[cell] = 'r'
        
        for name, cargo in self.cargos.items():
            for cell in cargo.coordinates:
                board[cell] = str(cargo.name)
        
        return str(board)


    def __repr__(self):
        return str(self)

In [5]:
# 0 0 2 2 0
# 0 r r r 0
# 0 r r r 1
# 0 r r r 1
# 0 0 0 0 1

cargos = [(2, 0, 2), (2, 0, 3), (1, 2, 4), (1, 3, 4), (1, 4, 4)]
desirable_area = [(2, 1), (2, 2), (2, 3), (1, 1), (1, 2), (1, 3), (3, 1), (3, 2), (3, 3)]

world = GridWorld(5, 5, cargos, desirable_area)
# world.move_cargo(2, 'R')
# print(world.get_desirable_area_boundaries())
# print(world.cargos[1].get_cargo_boundaries())
# world.move_cargo(1, 'L')
# world.move_cargo(1, 'U')
# world.move_cargo(2, 'L')
# world.move_cargo(2, 'D')
# print(world.check_terminate())
print(world)

[['0' '0' '2' '2' '0']
 ['0' 'r' 'r' 'r' '0']
 ['0' 'r' 'r' 'r' '1']
 ['0' 'r' 'r' 'r' '1']
 ['0' '0' '0' '0' '1']]


In [8]:
def load_world(n, m, grid) -> GridWorld:
    cargos = []
    desirable_area = []
    for i in range(n):
        for j in range(m):
            if grid[i][j] == 'r':
                desirable_area.append((i, j))
            elif grid[i][j] != '0':
                cargos.append((int(grid[i][j]), i, j))
    
    return GridWorld(n, m, cargos, desirable_area)

In [9]:
def complete_task(infile, outfile):
    grid = []
    with open(f'{infile}.txt', 'r') as f:
        for line in f:
            grid.append(line.split())
    
    n, m = len(grid), len(grid[0])
    world = load_world(n, m, grid)

    # res = ...
    # with open(f'{outfile}.txt', 'w') as f:
    #     pass


In [10]:
# complete_task('infile', 'outfile')
# with open('infile.txt', 'w') as f:
#     f.write("""0 0 2 2 0
# 0 r r r 0
# 0 r r r 1
# 0 r r r 1
# 0 0 0 0 1""")