## Artificial Intelligence - Nonogram solver
 - Alexandru Dochian

In [1]:
import json;
import math;
import time
import matplotlib.pyplot as pyplot
from functools import reduce;
from copy import deepcopy;
from heapq import heappush, heappop, heapify;

easy_test_files = ['test1.json', 'test2.json']
hard_test_files = ['test3.json']

def parse_json_input_file(filename):
    with open(filename) as fileDescriptior:
        data = json.load(fileDescriptior)
        return data

In [2]:
class GameHelper:
    """
    This method constructs a list with the lengths of the continuos items of an `array` that have `fill_value`

    """
    @staticmethod
    def get_list_of_continous_sequences(array, fill_value):
        list_of_continous_sequences = []

        current_searching_index = 0

        continous_sequence_found = False
        current_continuous_sequence_length = 0

        for index in range(0, len(array)):
            if array[index] == fill_value:
                continous_sequence_found = True
                current_continuous_sequence_length += 1
            else:
                if continous_sequence_found:
                    list_of_continous_sequences.append(current_continuous_sequence_length)
                continous_sequence_found = False
                current_continuous_sequence_length = 0

        if continous_sequence_found:
            list_of_continous_sequences.append(current_continuous_sequence_length)

        return list_of_continous_sequences
    
    @staticmethod
    def get_respected_constraints_ratio(final_constraints, actual_constraints):
        #constraint_pairs item (final, actual)
        FINAL = 0
        ACTUAL = 1

        if len(actual_constraints) > len(final_constraints):
            return 0

        constraints_pairs = list(zip(final_constraints, actual_constraints))

        respected_constraints = 0
        for constraint_pair in constraints_pairs:  
            if constraint_pair[FINAL] != constraint_pair[ACTUAL]:
                break
            respected_constraints += 1

        return respected_constraints / len(final_constraints)
    
    @staticmethod
    def get_number_of_continuous_columns_with_fully_respected_constraints(game):
        number_of_columns = game.nonogram_static_information.width
        number_of_rows = game.nonogram_static_information.height
        columns_constraints = game.nonogram_static_information.columns

        respected_constraints = 0
        for column_index in range(number_of_columns):
            column_values = game.get_values_on_column(column_index)
            column_constraints = columns_constraints[column_index]
            actual_constraints = GameHelper.get_list_of_continous_sequences(column_values, game.FILL)
            constraints_ratio_for_column = GameHelper.get_respected_constraints_ratio(column_constraints, actual_constraints)

            # we consider only the ratios of consecutive fully respected constraints
            if constraints_ratio_for_column < 1:
                break
            respected_constraints += constraints_ratio_for_column

        return respected_constraints
    
    @staticmethod
    def get_respected_ratio_for_column(game, column_index):
        #constraint_pairs item (final, actual)
        FINAL = 0
        ACTUAL = 1
        
        actual_constraints = GameHelper.get_list_of_continous_sequences(game.get_values_on_column(column_index), game.FILL)
        final_constraints = game.nonogram_static_information.columns[column_index]
        
        return GameHelper.get_respected_constraints_ratio(final_constraints, actual_constraints)


In [3]:
# tests for GameHelper.get_respected_constraints_ratio 
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [2, 2, 3]) == 0.75), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], []) == 0), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [1]) == 0), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [2, 2, 3, 4]) == 1), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [2, 5, 3]) == 0.25), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [2, 2, 5, 6]) == 0.5), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [1, 1, 1, 1, 1, 1, 1]) == 0), "Fix me"
assert(GameHelper.get_respected_constraints_ratio([2, 2, 3, 4], [2, 2, 3, 7, 5, 6]) == 0), "Fix me"

# tests for GameHelper.get_list_of_continous_sequences 
assert(GameHelper.get_list_of_continous_sequences([1, 1, 0, 1, 1, 1], 1) == [2, 3]), "Fix me"
assert(GameHelper.get_list_of_continous_sequences([0, 0], 1) == []), "Fix me"
assert(GameHelper.get_list_of_continous_sequences([], 1) == []), "Fix me"
assert(GameHelper.get_list_of_continous_sequences([1, 1, 1, 1, 1, 1], 1) == [6]), "Fix me"
assert(GameHelper.get_list_of_continous_sequences([1, 1, 1, 0, 1, 0], 1) == [3, 1]), "Fix me"
assert(GameHelper.get_list_of_continous_sequences([0, 1, 0, 1, 1, 1], 1) == [1, 3]), "Fix me"

In [4]:
class NonogramStaticInformation:
    def __init__(self, data):
        self.name = data['name']
        self.height = data['height']
        self.width = data['width']
        self.rows = data['rows']
        self.columns = data['columns']

In [5]:
 class Action:
    def __init__(self, cells_count):
        self.cells_count = cells_count;
        self.applied_from_index = None;

    def get_first_index(self):
        if self.applied_from_index is None:
            return None
        return self.applied_from_index

    def get_second_index(self):
        if self.applied_from_index is None:
            return None
        return self.applied_from_index + self.cells_count
    
    def __str__(self):
        return "cells_count={}, applied_from_index={}".format(self.cells_count, self.applied_from_index);
    
    def __eq__ (self, other):
        return self.cells_count == other.cells_count and self.applied_from_index == other.applied_from_index    

In [6]:
class Game:
    BLANK = 0
    FILL = 1
    
    def __init__(self, data):
        self.nonogram_static_information = NonogramStaticInformation(data);
        self.rows_actions = list(map(lambda row: list(map(lambda cells_count: Action(cells_count), row)), self.nonogram_static_information.rows))
    
    def get_name(self):
        return self.nonogram_static_information.name
    
    def get_matrix(self):
        matrix = [[0 for x in range(self.nonogram_static_information.width)] for y in range(self.nonogram_static_information.height)]
        
        row_index = 0
        for row_actions in self.rows_actions:
            for row_action in row_actions:
                if row_action.applied_from_index != None:
                    for column_index in range(row_action.get_first_index(), row_action.get_second_index()):
                        matrix[row_index][column_index] = self.FILL;
            row_index += 1
        return matrix
    
    def draw_matrix(self):
        pyplot.imshow(self.get_matrix(), cmap='Greys', interpolation='nearest');
    
    def print_matrix(self):
        matrix = self.get_matrix()
        
        for i in range(0, len(matrix)):
            for j in range(0, len(matrix[i])):
                print(matrix[i][j], end = " ")
            print()
        print()

    def apply_next_action_on_row(self, row_index, applied_from_index):
        for row_action in self.rows_actions[row_index]:
            if row_action.applied_from_index == None:
                row_action.applied_from_index = applied_from_index
                break
    
    
    def get_values_on_column(self, column_index):
        column_values = [0] * self.nonogram_static_information.width 

        row_index = -1
        for row_actions in self.rows_actions:
            row_index += 1
            applied_actions = list(filter(lambda action : action.applied_from_index != None, row_actions))            
            for row_action in applied_actions:
                if column_index in range(row_action.get_first_index(), row_action.get_second_index()):
                    column_values[row_index] = self.FILL

        return column_values
    
    def get_possible_actions(self):
        # [(row_index, [possible_starting_index_for_next_action])]
        possible_actions = []
    
        row_index = -1
        for row_actions in self.rows_actions:
            row_index += 1
            # filter actions in two categories: applied and not_applied
            applied_actions = list(filter(lambda action : action.applied_from_index != None, row_actions))            
            not_applied_actions = list(filter(lambda action : action.applied_from_index == None, row_actions))

            # first possible index of the next action on the current row
            possible_start_index = next(map(lambda action: action.get_second_index() + 1, reversed(applied_actions)), 0)            
            minimum_length_of_remaining_actions = reduce(lambda accumulator, current: current.cells_count + accumulator, not_applied_actions, 0) + len(not_applied_actions) - 1
            
            if minimum_length_of_remaining_actions > 0:
                possible_end_index = self.nonogram_static_information.width -  minimum_length_of_remaining_actions;
                if (possible_start_index <= possible_end_index):
                    possible_actions.append((row_index, [x for x in range(possible_start_index, possible_end_index + 1)]))
        return possible_actions

    def is_final(self):
        # verifications on rows
        for row_index in range(self.nonogram_static_information.height):
            necessary_filled_cells = reduce(lambda accumulator, current: accumulator + current, self.nonogram_static_information.rows[row_index], 0)
            actual_filled_cells = 0
            for row_action in self.rows_actions[row_index]:
                if (row_action.applied_from_index != None):
                    actual_filled_cells += row_action.cells_count;
            if necessary_filled_cells != actual_filled_cells:
                return False
        
        # verifications on columns
        for column_index in range(self.nonogram_static_information.width):
            necessary_filled_cells = reduce(lambda accumulator, current: accumulator + current, self.nonogram_static_information.columns[column_index], 0)
            
            actual_filled_cells = 0
            for row_actions in self.rows_actions:
                for row_action in row_actions:
                    if (row_action.applied_from_index != None):
                        if (column_index in range(row_action.get_first_index(), row_action.get_second_index())):
                            actual_filled_cells += 1
            if necessary_filled_cells != actual_filled_cells:
                return False

        # all verifications are ok
        return True
    
    def __eq__(self, other):
        for i in range(0, len(self.rows_actions)):
            for j in range(0, len(self.rows_actions[i])):
                if self.rows_actions[i][j] != other.rows_actions[i][j]:
                    return False
        return True
    
    def __lt__(self, other):
        return False    

In [7]:
# tests for Game.__eq__
data = parse_json_input_file(easy_test_files[0])
game1 = Game(data)
game2 = Game(data)
assert(game1 == game2), "Fix me"

game1.apply_next_action_on_row(0, 0)
game2.apply_next_action_on_row(0, 0)
assert(game1 == game2), "Fix me"

game1.apply_next_action_on_row(1, 0)
game2.apply_next_action_on_row(2, 0)
assert(game1 != game2), "Fix me"

In [8]:
def get_total_of_next_possible_actions(rows_actions):
    count = 0
    for row_actions in rows_actions:
        for row_action_index in row_actions[1]:
            count += 1
    return count

def dfs(game, current_depth = 0, max_depth = math.inf):
    increase_nodes_expanded(1)
    
    rows_actions = game.get_possible_actions()
    if len(rows_actions) == 0 or current_depth == max_depth:
        if game.is_final():
            return game;
        return None

    rows_actions.sort(key = lambda item : len(item[1]))
    increase_nodes_generated(get_total_of_next_possible_actions(rows_actions))
    for row_actions in rows_actions:
        row_index = row_actions[0]
        for row_action_index in row_actions[1]:
            
            new_game = deepcopy(game)
            new_game.apply_next_action_on_row(row_index, row_action_index)
            final_game = dfs(new_game, current_depth + 1, max_depth)
            
            if final_game is not None:
                return final_game

In [9]:
def bfs(game):
    queue = []
    queue.append(game)
 
    while queue:
        current_game = queue.pop(0)
        increase_nodes_expanded(1)
        if current_game.is_final():
            return current_game
        for row_actions in current_game.get_possible_actions():
            row_index = row_actions[0]
            for index_to_apply in row_actions[1]:
                increase_nodes_generated(1)
                new_game = deepcopy(current_game)
                new_game.apply_next_action_on_row(row_index, index_to_apply)
                queue.append(new_game)

In [10]:
def ids(game):
    final_game = None
    depth = 0
    while depth < math.inf:
        final_game = dfs(game, 0, depth)
        if final_game is not None:
            break
        depth += 1
    return final_game

In [11]:
def f(game):
    number_of_columns = game.nonogram_static_information.width
    number_of_continuous_columns_with_fully_respected_constraints = GameHelper.get_number_of_continuous_columns_with_fully_respected_constraints(game) 
    first_column_index_with_not_respected_constraints = int(number_of_continuous_columns_with_fully_respected_constraints)
    
    rough_result = number_of_columns - number_of_continuous_columns_with_fully_respected_constraints
    
    #1 next column has more filled cells
    if first_column_index_with_not_respected_constraints != number_of_columns:
        first_column_index_with_not_respected_constraints_values = game.get_values_on_column(first_column_index_with_not_respected_constraints)
        necessary_filled_cells = reduce(lambda accumulator, current: accumulator + current, game.nonogram_static_information.columns[first_column_index_with_not_respected_constraints], 0)
        actually_filled_cells = reduce(lambda accumulator, current: accumulator + current, first_column_index_with_not_respected_constraints_values, 0)

        if actually_filled_cells > necessary_filled_cells:
            return math.inf
    
    #2 other rows have been placed farther than the next column to complete
    row_index = -1
    for row_actions in game.rows_actions:
        row_index += 1
        # filter actions in two categories: applied and not_applied
        applied_actions = list(filter(lambda action : action.applied_from_index != None, row_actions))            
        
        for applied_action in applied_actions:
            if applied_action.get_first_index() > first_column_index_with_not_respected_constraints:
                return math.inf

    #3 partial solve for next column to complete 
    if first_column_index_with_not_respected_constraints != number_of_columns: 
        delicate_result = GameHelper.get_respected_ratio_for_column(game, first_column_index_with_not_respected_constraints)
    else:
        delicate_result = 0
            
    return rough_result - delicate_result

def a_star(initial_game):
    # open_set item = (score, id, game)
    SCORE = 0
    GAME = 1
    
    open_set = []
    closed_set_length = 0
    heappush(open_set, (math.inf, initial_game))
    
    index = 0
    while len(open_set) != 0:
        current_game = heappop(open_set)[GAME]
        if current_game.is_final():
            return current_game
        
        increase_nodes_expanded(1)

        number_of_columns_with_met_constraints = GameHelper.get_number_of_continuous_columns_with_fully_respected_constraints(current_game)
        score = f(current_game)
        rows_actions = current_game.get_possible_actions()
        for row_actions in rows_actions:
            row_index = row_actions[0]
            for index_to_apply in row_actions[1]:
                if index_to_apply < number_of_columns_with_met_constraints:
                    continue
                
                next_game = deepcopy(current_game)
                next_game.apply_next_action_on_row(row_index, index_to_apply)
                
                score_for_next = f(next_game)
                if score_for_next != math.inf:
                    increase_nodes_generated(1)
                    heappush(open_set, (score_for_next, next_game))
        index += 1
    return None


In [12]:
nodes_generated = 0
nodes_expanded = 0

def reset_nodes_generated_and_expanded():
    global nodes_generated
    nodes_generated = 0

    global nodes_expanded
    nodes_expanded = 0

def increase_nodes_generated(count):
    global nodes_generated    
    nodes_generated += count

def increase_nodes_expanded(count):
    global nodes_expanded
    nodes_expanded += count

NAME = 'name'
STRATEGY = 'strategy'
NODES_GENERATED = 'nodes_generated'
NODES_EXPANDED = 'nodes_expanded'
TIME = 'time'
SOLUTION = 'solution'

def get_execution_result(strategy, time, solution, name):
    execution_result = {}
    execution_result[NAME] = name
    execution_result[STRATEGY] = strategy
    execution_result[NODES_GENERATED] = nodes_generated
    execution_result[NODES_EXPANDED] = nodes_expanded
    execution_result[TIME] = time
    execution_result[SOLUTION] = solution
    return execution_result


In [13]:
def execute_for_analytics(game, solver):
    reset_nodes_generated_and_expanded()
    
    start_time = time.time()
    final_game = solver(game)
    end_time = time.time()

    return get_execution_result(solver.__name__, end_time - start_time, final_game.get_matrix(), final_game.get_name())
    
    
result_data = []

# run the easy tests
for test_file in easy_test_files:
    data = parse_json_input_file(test_file)
    initial_game = Game(data)
    for solver in [dfs, bfs, ids]:
        execution_result = execute_for_analytics(initial_game, solver)
        result_data.append(execution_result)

# run the easy + hard tests
for test_file in easy_test_files + hard_test_files:
    data = parse_json_input_file(test_file)
    initial_game = Game(data)
    for solver in [a_star]:
        execution_result = execute_for_analytics(initial_game, solver)
        result_data.append(execution_result)
        
# write results to disk
with open('execution_result.json', 'w+') as outfile:
    json.dump(result_data, outfile, indent = 2)