In [2]:
import pandas as pd
import numpy as np
import random 

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [3]:
# Dimensions of the empty master grid, in tiles.
master_height = 40
master_width = 40

# Init master_map: every cell starts as None; each row is its own list object.
master_map = [[None for _ in range(master_width)] for _ in range(master_height)]


In [4]:
# Rebind master_map to a NumPy array so that `.shape` can be read from it later on.
master_map = np.array(master_map)

In [6]:
# Read each layout CSV without a header row and convert it to a NumPy array.
# The files are looked up relative to the current working directory.
layout_5 = pd.read_csv('layout_5.csv', header=None).to_numpy()
layout_6 = pd.read_csv('layout_6.csv', header=None).to_numpy()
layout_7 = pd.read_csv('layout_7.csv', header=None).to_numpy()
layout_8 = pd.read_csv('layout_8.csv', header=None).to_numpy()
# layout_7 is selected as the working submap.
submap = layout_7
# submap_3 = layout_7

In [7]:
# Row and column counts of the master grid and of the selected submap.
mastermap_height, mastermap_width = master_map.shape[0], master_map.shape[1]
submap_height, submap_width = submap.shape[0], submap.shape[1]

In [8]:
submap_height, submap_width

(7, 7)

In [142]:
# State names passed to Tile.update():
#   STATE_MERGE      - applied to the exit tiles that were just joined by a merge.
#   STATE_END_MERGES - applied to the whole map once no more submaps will be merged.
STATE_MERGE = "merge"
STATE_END_MERGES = "end_merges"

class Tile:
    """Base class for a single grid cell.

    Attributes:
        tile_type: Name of the tile kind; also its default string representation.
        position: Coordinates handed in by the creator of the tile.

    The overlap/update hooks are placeholders here and yield None; the
    concrete tile classes provide the real behaviour.
    """

    def __init__(self, tile_type, position):
        self.tile_type, self.position = tile_type, position

    def get_tile_str_repr(self):
        """Return the string that represents this tile in a string map."""
        return self.tile_type

    def can_overlap(self):
        """Placeholder hook; the base implementation returns None."""
        return None

    def resolve_overlap(self, tile):
        """Placeholder hook; the base implementation returns None."""
        return None

    def update(self, state):
        """Placeholder hook; the base implementation returns None."""
        return None
    
class Wall(Tile):
    """Tile that refuses to be overlapped and never changes on update."""

    def __init__(self, position):
        super().__init__("Wall", position)

    def can_overlap(self):
        """A wall can never be covered by an incoming tile."""
        return False

    def resolve_overlap(self, incoming_tile):
        """Keep the wall; the incoming tile is discarded."""
        return self

    def update(self, state):
        """Walls are unaffected by any state."""
        return self

class Space(Tile):
    """Tile that yields to whatever is placed on top of it."""

    def __init__(self, position):
        super().__init__("Space", position)

    def can_overlap(self):
        """A space may always be covered by an incoming tile."""
        return True

    def resolve_overlap(self, incoming_tile):
        """The incoming tile replaces this space."""
        return incoming_tile

    def update(self, state):
        """Spaces are unaffected by any state."""
        return self


class Exit(Tile):
    """Connection point of a map; merges are attempted at Exit tiles.

    An exit can be overlapped, and turns into a Space once it has been used
    for a merge or into a Wall when merging is over.
    """

    def __init__(self, position):
        super().__init__(tile_type="Exit", position=position)

    def can_overlap(self):
        """An exit may always be covered by an incoming tile."""
        return True

    def resolve_overlap(self, incoming_tile):
        """The incoming tile replaces this exit."""
        return incoming_tile

    def update(self, state):
        """Return the tile this exit becomes under `state`.

        Args:
            state: STATE_MERGE or STATE_END_MERGES.

        Returns:
            A Space at the same position for STATE_MERGE, a Wall at the same
            position for STATE_END_MERGES, and the exit itself for any other
            state.
        """
        if state == STATE_MERGE:
            return Space(position=self.position)

        if state == STATE_END_MERGES:
            return Wall(position=self.position)

        # Previously an unrecognised state fell through and returned None,
        # which callers store straight back into the grid (erasing the tile).
        return self

class Undefined(Tile):
    """Placeholder tile whose final kind is decided when merging ends.

    Attributes:
        payload: Name of the tile kind to resolve to ('Wall' or 'Space').
        alt_mapping: Pre-built replacement tiles keyed by payload name.
    """

    def __init__(self, position, payload=None):
        super().__init__(tile_type="Undf", position=position)

        self.payload = payload

        self.alt_mapping = {
            'Wall': Wall(position),
            'Space': Space(position),
        }

    def get_tile_str_repr(self):
        """Return 'Undf(<payload>)' so the payload survives a string round trip."""
        return f"Undf({self.payload})"

    def can_overlap(self):
        """An undefined tile may always be covered by an incoming tile."""
        return True

    def resolve_overlap(self, incoming_tile):
        """The incoming tile replaces this tile."""
        return incoming_tile

    def update(self, state):
        """Return the tile this placeholder becomes under `state`.

        Args:
            state: STATE_MERGE or STATE_END_MERGES.

        Returns:
            The replacement tile named by `payload` for STATE_END_MERGES;
            the tile itself for every other state.

        Raises:
            KeyError: On STATE_END_MERGES when `payload` is not a key of
                `alt_mapping`.
        """
        if state == STATE_END_MERGES:
            try:
                return self.alt_mapping[self.payload]
            except KeyError:
                raise KeyError(
                    f"Undefined tile at {self.position} has payload {self.payload!r}; "
                    f"expected one of {sorted(self.alt_mapping)}"
                ) from None

        # STATE_MERGE keeps the placeholder. Any other state used to fall
        # through and return None, which callers store back into the grid.
        return self

class Map:
    """Grid held in two parallel forms.

    Attributes:
        map: 2-D grid of tile strings (e.g. 'Wall', 'Undf(Space)') or empty cells.
        obj_map: 2-D list of Tile objects (or None) built from `map`.
        height: Number of rows of the layout passed to the constructor.
        width: Number of columns of that layout (0 for an empty layout).
    """

    def __init__(self, layout_map):
        self.map = layout_map
        self.height = len(layout_map)
        # Guard the first-row lookup so an empty layout does not raise.
        self.width = len(layout_map[0]) if self.height else 0
        self.obj_map = self.get_obj_map()

    def get_obj_map(self):
        """Rebuild `obj_map` from `map`, store it on the instance and return it."""
        n_rows = len(self.map)
        n_cols = len(self.map[0]) if n_rows else 0

        obj_map = [
            [self._build_tile(self.map[row_idx][col_idx], (row_idx, col_idx))
             for col_idx in range(n_cols)]
            for row_idx in range(n_rows)
        ]

        self.obj_map = obj_map

        return obj_map

    def _build_tile(self, tile_str, position):
        """Instantiate the tile described by `tile_str`, or None if unrecognised/empty."""
        tile_class, payload = self.get_class_from_tile_str(tile_str)
        if tile_class is None:
            return None
        if payload is None:
            return tile_class(position=position)
        return tile_class(position=position, payload=payload)

    def get_exit_coords_list(self):
        """Collect the (row, col) of every 'Exit' string in `map`.

        The result is also stored in `self.exit_coords_list`.
        """
        _map = self.map
        # TODO: Handle Tile
        exits = [
            (row_idx, col_idx)
            for row_idx in range(len(_map))
            for col_idx in range(len(_map[0]))
            if _map[row_idx][col_idx] == 'Exit'
        ]

        self.exit_coords_list = exits

        return exits

    def get_class_from_tile_str(self, tile_str):
        """Parse a tile string into (tile class, payload).

        Args:
            tile_str: e.g. 'Wall' or 'Undf(Space)'. None and float NaN (what
                pandas yields for an empty CSV cell) both mean "empty cell".

        Returns:
            (class, payload) where payload is the text between the first '('
            and the first ')' or None; (None, None) for an empty cell; and
            (None, payload) when the tile type is not recognised.
        """
        # `x != x` is only true for NaN; checked here to avoid calling
        # str methods on a float.
        is_nan = isinstance(tile_str, float) and tile_str != tile_str
        if tile_str is None or is_nan:
            return None, None

        payload = None
        l_bracket, r_bracket = tile_str.find("("), tile_str.find(")")

        if l_bracket == -1 or r_bracket == -1:
            tile_type = tile_str
        else:
            tile_type = tile_str[:l_bracket]
            payload = tile_str[l_bracket+1:r_bracket]

        OBJ_MAPPING = {
            'Exit': Exit,
            'Space': Space,
            'Wall': Wall,
            'Undf': Undefined,
        }

        obj_class = OBJ_MAPPING.get(tile_type, None)

        return obj_class, payload

    def sync_map_using_obj_map(self):
        """Regenerate `map` from `obj_map`, store it on the instance and return it."""
        n_rows = len(self.obj_map)
        n_cols = len(self.obj_map[0]) if n_rows else 0

        _map = []
        for row_idx in range(n_rows):
            row = []
            for col_idx in range(n_cols):
                obj = self.obj_map[row_idx][col_idx]
                row.append(None if obj is None else obj.get_tile_str_repr())
            _map.append(row)

        self.map = _map

        return _map
    
class MasterMap(Map):
    """Large grid into which submaps are placed and then merged at their exits.

    Build it either empty (`height`/`width`) or from an existing string grid
    (`layout_map`). Exit lookup uses the inherited `get_exit_coords_list`.
    """

    def __init__(self, height=None, width=None, layout_map=None):
        if layout_map is None:
            layout_map = self._init_map(height, width)

        super().__init__(layout_map)

        self.exit_coords_list = self.get_exit_coords_list()

    def _init_map(self, height, width):
        """Return a height x width grid of None; each row is a distinct list."""
        return [[None for _ in range(width)] for _ in range(height)]

    def init_submap_in_map(self, submap_obj):
        """Place the first submap at a random position that fits inside the map.

        Args:
            submap_obj: Map whose `obj_map` tiles are copied (by reference)
                into this map.

        Raises:
            ValueError: If the submap is larger than the map, or if any target
                cell is already occupied. Nothing is written in either case.
        """
        submap = submap_obj.obj_map

        submap_height = submap_obj.height
        submap_width = submap_obj.width

        max_start_row = self.height - submap_height
        max_start_col = self.width - submap_width
        if max_start_row < 0 or max_start_col < 0:
            raise ValueError(
                f"Submap of size {submap_height}x{submap_width} does not fit in "
                f"map of size {self.height}x{self.width}"
            )

        # Randomly pick valid coordinate to place submap
        start_row_idx = random.randint(0, max_start_row)
        start_col_idx = random.randint(0, max_start_col)

        # Check the whole target region first so a failure cannot leave the
        # map partially written.
        for row_idx in range(submap_height):
            for col_idx in range(submap_width):
                if self.obj_map[row_idx + start_row_idx][col_idx + start_col_idx] is not None:
                    raise ValueError(f"Map has already been initialized")

        # Insert submap into mastermap
        for row_idx in range(submap_height):
            for col_idx in range(submap_width):
                self.obj_map[row_idx + start_row_idx][col_idx + start_col_idx] = submap[row_idx][col_idx]

        self.sync_map_using_obj_map()

    def _can_merge(self, submap_obj, start_coords):
        """Tell whether the submap fits with its top-left corner at `start_coords`.

        Every submap cell must land inside the map, on a cell that is either
        empty or whose tile reports `can_overlap()`.
        """
        submap = submap_obj.obj_map

        for r in range(len(submap)):
            for c in range(len(submap[0])):
                row_idx = start_coords[0]+r
                col_idx = start_coords[1]+c

                # Bounds must be checked BEFORE indexing: a too-large index
                # raises IndexError and a negative one silently wraps around.
                if not (0 <= row_idx < self.height and 0 <= col_idx < self.width):
                    return False

                mastermap_tile = self.obj_map[row_idx][col_idx]
                if mastermap_tile is not None and not mastermap_tile.can_overlap():
                    return False

        return True

    def merge_maps(self, submap_obj, start_coords):
        """Write the submap's tiles into `obj_map` starting at `start_coords`.

        Empty cells take the submap tile; occupied cells are resolved through
        the existing tile's `resolve_overlap`. No bounds or overlap checks are
        done here; callers are expected to have run `_can_merge` first.
        """
        # NOTE(review): tiles copied from the submap keep the `position` they
        # were created with (submap-local coordinates) - confirm whether
        # `position` is meant to track master-map coordinates.
        submap = submap_obj.obj_map

        for r in range(len(submap)):
            for c in range(len(submap[0])):
                row_idx = start_coords[0]+r
                col_idx = start_coords[1]+c
                mastermap_tile = self.obj_map[row_idx][col_idx]
                submap_tile = submap[r][c]

                if mastermap_tile is None:
                    self.obj_map[row_idx][col_idx] = submap_tile
                else:
                    self.obj_map[row_idx][col_idx] = mastermap_tile.resolve_overlap(submap_tile)

    def left_right_up_down_merge(self, submap_obj):
        """Try to attach the submap at any pair of (master exit, submap exit).

        For each pair the submap exit is placed on the master exit, then one
        cell to its left, right, above and below. The first placement accepted
        by `_can_merge` is applied and both joined exit cells are updated with
        STATE_MERGE.

        Returns:
            True if a merge was performed, False otherwise.
        """
        directions = [(0, 0), (0, -1), (0, 1), (-1, 0), (1, 0)]
        mastermap_exits_coords_list = self.get_exit_coords_list()
        submap_exits_coords_list = submap_obj.get_exit_coords_list()

        for mastermap_coords in mastermap_exits_coords_list:
            for submap_coords in submap_exits_coords_list:

                # Attempt overlap, left, right, up, down, merges wrt master map
                for i, direction in enumerate(directions):
                    print(i, direction)
                    # Top-left corner that puts the submap exit at
                    # (master exit + direction).
                    start_coords = (mastermap_coords[0]-submap_coords[0]+direction[0], mastermap_coords[1]-submap_coords[1]+direction[1])

                    if not self._can_merge(submap_obj, start_coords):
                        continue

                    print("mergeable")
                    self.merge_maps(submap_obj, start_coords)

                    mastermap_tile_r, mastermap_tile_c = mastermap_coords[0], mastermap_coords[1]
                    submap_merge_tile_r, submap_merge_tile_c = mastermap_coords[0]+direction[0], mastermap_coords[1]+direction[1]
                    # Both tiles are read before either cell is updated.
                    mastermap_tile = self.obj_map[mastermap_tile_r][mastermap_tile_c]
                    submap_merge_tile = self.obj_map[submap_merge_tile_r][submap_merge_tile_c]

                    # Update the mastermap_tile
                    if mastermap_tile is not None:
                        self.obj_map[mastermap_tile_r][mastermap_tile_c] = mastermap_tile.update(state=STATE_MERGE)

                    # Non-Overlapping merge: also need to update the submap tile
                    if mastermap_tile_r != submap_merge_tile_r or mastermap_tile_c != submap_merge_tile_c:
                        if submap_merge_tile is not None:
                            self.obj_map[submap_merge_tile_r][submap_merge_tile_c] = submap_merge_tile.update(state=STATE_MERGE)

                    print(f"Master coords: {mastermap_coords}, submap_coords: {submap_coords}, Successful start_coords: {start_coords}, index {i}")
                    return True

        return False

    def attempt_to_merge_maps(self, submap_obj, rotation=True):
        """Merge the submap, optionally trying rotated copies of it.

        Args:
            submap_obj: SubMap to attach.
            rotation: When True the submap is tried unrotated and then rotated
                by 90, 180 and 270 degrees; when False only unrotated.

        Returns:
            True if a merge was performed (the string map is re-synced),
            False otherwise.
        """
        # Four orientations cover 0, 90, 180 and 270 degrees. The previous
        # count of 3 never reached the 270-degree orientation.
        num_orientations = 4 if rotation else 1
        candidate = submap_obj

        for attempt in range(num_orientations):
            if attempt > 0:
                candidate = SubMap.rotate(candidate)

            if self.left_right_up_down_merge(candidate):
                self.sync_map_using_obj_map()
                return True

        print(f"No valid merge rotation found!")
        return False

    def update_map_tiles(self, state):
        """Replace every tile by the result of its `update(state)`.

        Empty cells become Wall tiles when `state` is STATE_END_MERGES and
        stay empty otherwise. The string map is re-synced afterwards.
        """
        # Run each tile object's update function with a given state
        obj_map = self.obj_map
        for row_idx in range(len(obj_map)):
            for col_idx in range(len(obj_map[0])):
                tile = obj_map[row_idx][col_idx]
                if tile is not None:
                    obj_map[row_idx][col_idx] = tile.update(state=state)
                elif state == STATE_END_MERGES:
                    # Turn all None tiles into Wall tiles if it is the end of all merging
                    obj_map[row_idx][col_idx] = Wall(position=(row_idx, col_idx))

        self.sync_map_using_obj_map()

    def to_csv(self, filepath):
        """Re-sync the string map and write it to `filepath` without header or index."""
        self.sync_map_using_obj_map()
        return pd.DataFrame(np.array(self.map)).to_csv(filepath, index=False, header=False)

    @classmethod
    def from_csv(cls, filepath):
        """Build a MasterMap from a header-less CSV of tile strings."""
        rows = pd.read_csv(filepath, header=None).to_numpy().tolist()
        # pandas reads empty cells as NaN; `cell != cell` is only true for
        # NaN, so those become None (the map's notion of an empty cell).
        layout_map = [[None if cell != cell else cell for cell in row] for row in rows]
        # Must be passed by keyword: the first positional parameter is `height`.
        return cls(layout_map=layout_map)
    
    
class SubMap(Map):
    """Small layout that can be rotated and merged into a MasterMap.

    Attributes:
        weight: Optional value stored as given and carried over by `rotate`.
        exit_coords_list: (row, col) of every 'Exit' string in the layout,
            computed by the inherited `get_exit_coords_list`.
    """

    def __init__(self, layout_map, weight=None):
        super().__init__(layout_map)
        self.weight = weight
        self.exit_coords_list = self.get_exit_coords_list()

    @classmethod
    def rotate(cls, submap):
        """Return a new SubMap whose layout is `submap.map` passed through np.rot90.

        The input submap is not modified; `weight` is carried over.
        """
        # dtype=object keeps None/NaN cells as they are instead of letting
        # NumPy coerce a mixed grid to strings.
        new_map = np.rot90(np.array(submap.map, dtype=object)).tolist()
        return cls(new_map, weight=submap.weight)

    @classmethod
    def from_csv(cls, filepath, weight=None):
        """Build a SubMap from a header-less CSV of tile strings."""
        rows = pd.read_csv(filepath, header=None).to_numpy().tolist()
        # pandas reads empty cells as NaN; `cell != cell` is only true for
        # NaN, so those become None (the map's notion of an empty cell).
        layout_map = [[None if cell != cell else cell for cell in row] for row in rows]
        return cls(layout_map, weight=weight)



In [143]:
# Build an empty 40x40 master map and load the two layouts to combine.
mmap = MasterMap(height=40, width=40)
smap_7 = SubMap.from_csv("layout_7.csv")
smap_8 = SubMap.from_csv("layout_8.csv")

# Place the first layout at a random position (no seed is set, so the
# coordinates differ between runs).
mmap.init_submap_in_map(smap_7)
# mmap.to_csv("test_layout.csv")

# Attach the second layout at an exit, then finalise the tiles and export the result.
mmap.attempt_to_merge_maps(smap_8, rotation=True)
mmap.update_map_tiles(state=STATE_END_MERGES)
mmap.to_csv("test_layout.csv")


0 (0, 0)
mergeable
Master coords: (30, 28), submap_coords: (6, 3), Successful start_coords: (24, 25), index 0


## Todo:
1. Build a simple tool that lets other contributors create maps (graphical editor -> Google Sheets -> tool -> map).
2. Make the Google Sheets template clearly show which objects are valid to use.
3. Add new objects with physics (push blocks, portals, doors, etc.) -> Brainstorm / Kshitij's project presentation.

In [135]:
# Same pipeline as the layout_7/layout_8 run, applied to layouts 9 and 10.
mmap = MasterMap(height=40, width=40)
smap_9 = SubMap.from_csv("layout_9.csv")
smap_10 = SubMap.from_csv("layout_10.csv")

# Random placement of the first layout (unseeded).
mmap.init_submap_in_map(smap_9)

mmap.attempt_to_merge_maps(smap_10, rotation=True)
# Use the shared constant rather than repeating the "end_merges" literal.
mmap.update_map_tiles(state=STATE_END_MERGES)
mmap.to_csv("test_layout.csv")

0 (0, 0)
1 (0, -1)
2 (0, 1)
3 (-1, 0)
mergeable
Master coords: (15, 14), submap_coords: (6, 3), Successful start_coords: (8, 11), index 3


In [23]:
# Write the module-level `master_map` grid to CSV without header or index.
# NOTE(review): this targets the same file that the MasterMap cells write - confirm which export is intended.
pd.DataFrame(np.array(master_map)).to_csv('test_layout.csv', index=False, header=False)

In [528]:
# Make all None or remaining Exits into Wall
# Make all None or remaining Exits into Wall
for row_idx, row_cells in enumerate(master_map):
    for col_idx, cell in enumerate(row_cells):
        if not (cell is not None and cell != 'Exit'):
            master_map[row_idx][col_idx] = 'Wall'

In [529]:
# Export `master_map` again, this time after empty cells and exits were turned into walls.
pd.DataFrame(np.array(master_map)).to_csv('test_layout.csv', index=False, header=False)

In [7]:
import numpy as np

# Candidate indices 0..19, with weights 10..29 so higher indices are more likely.
vec = list(range(20))
weights = np.array(vec) + 10
# Normalise the weights into a probability vector.
probabilities = weights / weights.sum()
# TODO: Handle different weights at different stages
# Draw 5 distinct indices (unseeded, so the draw differs between runs).
index_choices = np.random.choice(vec, size=5, replace=False, p=probabilities)

In [None]:
# Pick the layouts at the sampled indices.
# NOTE(review): `layouts` is not defined in any visible cell and this cell has
# no execution count - confirm where it should come from before running.
chosen_layouts = [layouts[index] for index in index_choices]

In [9]:
from pathlib import Path

# Templates live two directory levels above the current working directory.
TEMPLATES_DIR = Path.cwd().parent.parent / 'templates'
MAP_SAVE_PATH = TEMPLATES_DIR / 'master_layout.csv'
# Note : These layouts are for testing purposes
# Join the complete file name onto the directory instead of concatenating text
# onto a partial path; the resulting strings are unchanged.
layout_paths = [str(TEMPLATES_DIR / f'layout_{i}.csv') for i in range(1, 8 + 1)]

In [10]:
layout_paths

['/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_1.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_2.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_3.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_4.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_5.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_6.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_7.csv',
 '/Users/jonathanlim/workspace/mila/open-ada/Minigrid/templates/layout_8.csv']