In [9]:
from typing import List, Tuple, Dict, Callable, Optional, Any
from environments.environment_abstract import Environment, State
import numpy as np
from heapq import heappush, heappop
from subprocess import Popen, PIPE

from argparse import ArgumentParser
import torch
from utils import env_utils, nnet_utils, search_utils, misc_utils, data_utils
import pickle
import time
import sys
import os
import socket
from torch.multiprocessing import Process

In [18]:
dataset_path = 'data/sokoban/test/data_0.pkl'

# Load the pickled test set through a context manager so the file handle is
# closed as soon as loading finishes instead of being left open.
# NOTE: pickle.load can execute arbitrary code; only load files you trust.
with open(dataset_path, "rb") as dataset_file:
    input_data = pickle.load(dataset_file)

# Sokoban environment obtained from the project's env_utils helper.
env: Environment = env_utils.get_environment('sokoban')

In [11]:
# Inspect the first loaded state: print its repr, then show its agent position
# together with the boxes, goals and walls arrays as the cell's output.
print(input_data['states'][0])
input_data['states'][0].agent,input_data['states'][0].boxes,input_data['states'][0].goals,input_data['states'][0].walls

<environments.sokoban.SokobanState object at 0x7fc7bc392f40>


(array([8, 5]),
 array([[False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False,  True, False,
         False],
        [False, False, False, False, False, False, False,  True, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False,  True, False, False,
         False],
        [False, False, False, False, False,  True, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False]]),
 array([[False, False, False, False, False, False, False, False, False,
         False],
   

In [5]:
import glob
import re
import os

# glob returns matches in arbitrary (filesystem-dependent) order; sort them so the
# conversion below visits the problem files in the same order on every run.
ipc_dataset = sorted(glob.glob('data/sokoban/ipc_dataset/*.pddl'))


In [6]:
def get_shape(positions):
    """Return ``(max_row, max_col)`` over a collection of position names.

    Each name is split on ``'-'``. Names that yield exactly three parts are read
    as ``<prefix>-<col>-<row>``; every other name is ignored. Both maxima start
    at 0, so an input without any usable name gives ``(0, 0)``.
    """
    fields = (name.split('-') for name in positions)
    coords = [(int(parts[1]), int(parts[2])) for parts in fields if len(parts) == 3]

    # Seed each maximum with 0 to mirror a running maximum that starts at zero.
    widest = max([0] + [col for col, _ in coords])
    tallest = max([0] + [row for _, row in coords])

    # With one-indexed positions this is the grid size as rows x columns.
    return tallest, widest

def find_matrix_shape(pddl_file):
    """Infer the grid size of a PDDL problem from its ``(:objects`` section.

    The text from ``(:objects`` up to the next ``)`` is scanned line by line.
    For every non-empty line containing ``' - '`` the part before it is taken as
    the object name, and the names containing ``'pos-'`` are passed to
    ``get_shape``.

    Returns the ``(max_row, max_col)`` tuple produced by ``get_shape``.
    """
    with open(pddl_file, 'r') as handle:
        text = handle.read()

    # Slice out everything between the section header and the first ')' after it.
    section_start = text.find("(:objects")
    section_end = text.find(")", section_start)
    section = text[section_start:section_end]

    position_names = []
    for raw_line in section.split('\n'):
        declaration = raw_line.strip()
        if not declaration or ' - ' not in declaration:
            continue
        # Keep only the object name that precedes the " - <type>" suffix.
        name = declaration.split(' - ')[0].strip()
        if 'pos-' in name:
            position_names.append(name)

    return get_shape(position_names)

def extract_init_predicates(pddl_file):
    """Collect the single-line predicates of a PDDL problem's ``(:init`` section.

    The section is the text from ``(:init`` up to (not including) ``(:goal``.
    Each of its lines is stripped, and a line is kept only when it starts with
    ``(`` and ends with ``)``.

    Returns the kept lines as a list of strings, in file order.
    """
    with open(pddl_file, 'r') as handle:
        text = handle.read()

    init_section = text[text.find("(:init"):text.find("(:goal")]

    predicates = []
    for raw_line in init_section.split('\n'):
        candidate = raw_line.strip()
        # An empty line fails startswith("("), so no separate emptiness test is needed.
        if candidate.startswith("(") and candidate.endswith(")"):
            predicates.append(candidate)
    return predicates


def get_agent_pos(predicates):
    """Return the player's zero-based position from the first ``(at player`` fact.

    The third space-separated token of that predicate is parsed as
    ``<prefix>-<first>-<second>)``; the result is
    ``np.array([second - 1, first - 1])``.

    Returns ``None`` when no predicate contains ``'(at player'``.
    """
    player_fact = next((pred for pred in predicates if '(at player' in pred), None)
    if player_fact is None:
        return None

    location = player_fact.split(' ')[2]
    first, second = [int(piece.strip(')')) for piece in location.split('-')[1:]]
    return np.array([second - 1, first - 1])

def generate_matrices(matrix_shape, predicates):
    """Build the stones, goals and walls grids described by ``predicates``.

    If either of the first two entries of ``matrix_shape`` exceeds 10, a message
    is printed and ``(None, None, None)`` is returned. Otherwise three 10x10
    boolean arrays are returned as ``(stones, goals, walls)``.

    Only predicates containing ``'(at stone'``, ``'(IS-GOAL '`` or ``'(clear '``
    are used. Their location token ``<prefix>-<first>-<second>`` is converted to
    the zero-based index ``[second - 1, first - 1]``.
    """
    if matrix_shape[0] > 10 or matrix_shape[1] > 10:
        print('Too large environment for DeepCubeA rep. - Matrix shape: ', matrix_shape)
        return None, None, None

    grid = (10, 10)
    stones_matrix = np.zeros(grid, dtype=bool)
    goals_matrix = np.zeros(grid, dtype=bool)
    # Every cell starts as a wall; cells named by a used predicate are opened below.
    walls_matrix = np.ones(grid, dtype=bool)

    for pred in predicates:
        has_stone = '(at stone' in pred
        if not (has_stone or '(IS-GOAL ' in pred or '(clear ' in pred):
            continue

        tokens = pred.split(' ')
        head = tokens[0].strip()
        # Stone facts carry the location as the third token, the others as the second.
        location = tokens[2] if has_stone else tokens[1]
        first, second = [int(piece.strip(')')) for piece in location.split('-')[1:]]

        # One-indexed pair -> zero-based array index.
        cell = (second - 1, first - 1)

        if head == '(at':
            stones_matrix[cell] = True
        if head == '(IS-GOAL':
            goals_matrix[cell] = True
        if head in ('(at', '(IS-GOAL', '(clear'):
            walls_matrix[cell] = False

    return stones_matrix, goals_matrix, walls_matrix


In [11]:
import copy
from environments import sokoban

# ipc_data = {"states": []}

# Convert every IPC problem that fits the 10x10 representation into its own pickle.
output_dir = 'ipc_data_pkl'
# open(..., 'wb') does not create missing parent directories, so make sure the
# target directory exists before the first write.
os.makedirs(output_dir, exist_ok=True)

for pddl_file in ipc_dataset:
    # Rebound on every iteration: each output pickle holds exactly one state.
    ipc_data = {"states": []}
    matrix_shape = find_matrix_shape(pddl_file)
    predicates = extract_init_predicates(pddl_file)
    agent_pos = get_agent_pos(predicates)
    stones_matrix, goals_matrix, walls_matrix = generate_matrices(matrix_shape, predicates)

    # generate_matrices returns None placeholders for boards it cannot represent.
    if stones_matrix is None:
        continue

    # get_agent_pos returns None when no '(at player ...' fact exists; indexing it
    # below would raise TypeError, so skip such problems explicitly.
    if agent_pos is None:
        print('No player position found in', pddl_file)
        continue

    # generate_matrices only opens cells named by stone / goal / clear facts, so the
    # player's own cell has to be marked as walkable here.
    walls_matrix[agent_pos[0], agent_pos[1]] = False
    new_state = sokoban.SokobanState(agent_pos, stones_matrix, walls_matrix, goals_matrix)

    ipc_data['states'].append(new_state)

    # <output_dir>/<problem name>.pkl
    stem = os.path.splitext(os.path.basename(pddl_file))[0]
    with open(os.path.join(output_dir, stem + '.pkl'), 'wb') as file:
        pickle.dump(ipc_data, file)
    




Too large environment for DeepCubeA rep. - Matrix shape:  (10, 12)
Too large environment for DeepCubeA rep. - Matrix shape:  (7, 15)
Too large environment for DeepCubeA rep. - Matrix shape:  (11, 11)
Too large environment for DeepCubeA rep. - Matrix shape:  (14, 9)
Too large environment for DeepCubeA rep. - Matrix shape:  (8, 15)
Too large environment for DeepCubeA rep. - Matrix shape:  (8, 11)
Too large environment for DeepCubeA rep. - Matrix shape:  (9, 15)
Too large environment for DeepCubeA rep. - Matrix shape:  (7, 12)
Too large environment for DeepCubeA rep. - Matrix shape:  (6, 12)
Too large environment for DeepCubeA rep. - Matrix shape:  (8, 11)
Too large environment for DeepCubeA rep. - Matrix shape:  (11, 7)
Too large environment for DeepCubeA rep. - Matrix shape:  (11, 11)
Too large environment for DeepCubeA rep. - Matrix shape:  (10, 12)
Too large environment for DeepCubeA rep. - Matrix shape:  (17, 29)


In [9]:
# Show the first state stored in `ipc_data`: agent position, then the boxes, goals
# and walls arrays. `ipc_data` is rebound inside the conversion loop, so this is the
# most recently assigned dict rather than an accumulation over all problems.
ipc_data['states'][0].agent,ipc_data['states'][0].boxes,ipc_data['states'][0].goals,ipc_data['states'][0].walls

(array([7, 3]),
 array([[False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False,  True, False,  True, False, False, False, False,
         False],
        [False, False, False,  True,  True, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False],
        [False, False, False, False, False, False, False, False, False,
         False]]),
 array([[False, False, False, False, False, False, False, False, False,
         False],
   

In [10]:
# Save the current `ipc_data` dict to ./ipc_data.pkl.
# NOTE(review): `ipc_data` is rebound on every iteration of the conversion loop, so
# this file holds only the most recently assigned dict — confirm that is intended.
with open('./ipc_data.pkl', 'wb') as f:
    pickle.dump(ipc_data, f)

In [28]:
import tkinter as tk

def draw_grid(canvas, agent_pos, boxes, goals, walls, cell_size=50):
    """Clear ``canvas`` and redraw the whole board.

    Parameters
    ----------
    canvas : object providing ``delete``, ``create_rectangle`` and ``create_oval``
        (a ``tk.Canvas`` in this notebook).
    agent_pos : sequence whose first two entries are the agent's (row, col).
    boxes, goals, walls : 2-D boolean arrays indexed ``[row, col]``. The board
        size is taken from ``walls.shape``; ``boxes`` and ``goals`` are assumed
        to have the same shape.
    cell_size : edge length of one cell in canvas units. The default of 50
        reproduces the original fixed layout.

    Walls are gray squares, boxes brown squares, goals gold circles and the
    agent a green circle; circles are inset by one fifth of the cell size.
    """
    n_rows, n_cols = walls.shape[:2]
    inset = cell_size // 5  # 10 at the default cell size

    canvas.delete("all")  # Clear the canvas
    for row in range(n_rows):
        top = row * cell_size
        for col in range(n_cols):
            left = col * cell_size
            if walls[row, col]:
                canvas.create_rectangle(left, top, left + cell_size, top + cell_size, fill="gray")
            if boxes[row, col]:
                canvas.create_rectangle(left, top, left + cell_size, top + cell_size, fill="brown")
            if goals[row, col]:
                canvas.create_oval(left + inset, top + inset,
                                   left + cell_size - inset, top + cell_size - inset, fill="gold")

    # The agent is drawn last so it stays visible on top of a goal marker.
    agent_top = agent_pos[0] * cell_size
    agent_left = agent_pos[1] * cell_size
    canvas.create_oval(agent_left + inset, agent_top + inset,
                       agent_left + cell_size - inset, agent_top + cell_size - inset, fill="green")

def key_pressed(event, canvas, agent_pos, boxes, goals, walls):
    """Handle one key press: move the agent, pushing a box if possible, then redraw.

    Parameters
    ----------
    event : object with a ``keysym`` attribute; ``'Up'``, ``'Down'``, ``'Left'``
        and ``'Right'`` move the agent, any other key leaves it where it is.
    canvas : passed through to ``draw_grid``.
    agent_pos : mutable (row, col) pair, updated in place.
    boxes : 2-D boolean grid indexed ``[row][col]``, updated in place on a push.
    goals : passed through to ``draw_grid``.
    walls : 2-D boolean grid indexed ``[row][col]``; its dimensions define the
        board bounds.

    A move is allowed when the target cell is on the board and not a wall. If
    the target holds a box, the move additionally requires the cell behind the
    box to be on the board, not a wall and not another box. The grid is redrawn
    after every key press, whether or not anything moved.
    """
    # (row delta, column delta) per arrow key.
    step_by_key = {'Up': (-1, 0), 'Down': (1, 0), 'Left': (0, -1), 'Right': (0, 1)}
    d_row, d_col = step_by_key.get(event.keysym, (0, 0))

    # Take the bounds from the board itself rather than assuming a 10x10 grid.
    n_rows, n_cols = len(walls), len(walls[0])

    def is_open(row, col):
        """True when (row, col) lies on the board and is not a wall."""
        return 0 <= row < n_rows and 0 <= col < n_cols and not walls[row][col]

    target_row, target_col = agent_pos[0] + d_row, agent_pos[1] + d_col

    if is_open(target_row, target_col):
        if boxes[target_row][target_col]:
            # Pushing: the box moves one further cell in the same direction.
            push_row, push_col = target_row + d_row, target_col + d_col
            if is_open(push_row, push_col) and not boxes[push_row][push_col]:
                boxes[target_row][target_col] = False
                boxes[push_row][push_col] = True
                agent_pos[0], agent_pos[1] = target_row, target_col
        else:
            # Plain step onto a free cell.
            agent_pos[0], agent_pos[1] = target_row, target_col

    # Redraw the grid
    draw_grid(canvas, agent_pos, boxes, goals, walls)


# Initialize Tkinter window
root = tk.Tk()
root.title("Sokoban")

# Initialize Canvas
canvas = tk.Canvas(root, width=500, height=500)
canvas.pack()

# Game state. key_pressed updates agent_pos and boxes in place, so play on copies
# to keep the loaded dataset unchanged; goals and walls are only read.
start_state = input_data['states'][0]
agent_pos = start_state.agent.copy()
boxes = start_state.boxes.copy()
goals = start_state.goals
walls = start_state.walls

draw_grid(canvas, agent_pos, boxes, goals, walls)

# Bind key events
root.bind('<KeyPress>', lambda event: key_pressed(event, canvas, agent_pos, boxes, goals, walls))

root.mainloop()
