In [None]:
import sys
import os
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
from queue import PriorityQueue
import csv
import numpy as np
from numpy import inf
import matplotlib.pyplot as plt

import cv2
# from google.colab.patches import cv2_imshow
def cv2_imshow(img):
    img = np.concatenate((img[:,:,2:3], img[:,:,1:2], img[:,:,0:1]), axis=2)
    plt.imshow(img)
    plt.show()

# !pip3 install rich
from rich.progress import Progress

from utils import get_abs_path

In [None]:
def load_images(path, size):
  data = []
  f = []

  for (dirpath, dirnames, filenames) in os.walk(path):
      f.extend(filenames)
      break

  with Progress() as progress:
    task = progress.add_task('[yellow]Loading images...', total=size)

    for i in range(size):
      img_path = path + f[i]
      img = cv2.imread(img_path)
      data.append(img)
      progress.update(task, advance=1)

  data = np.array(data)
  # print(data.shape)
  data = np.reshape(data, (-1, 120, 120, 3))
  return data

def get_points(image):
  image = np.copy(image)
  points = []
  width = image.shape[0]
  height = image.shape[1]

  for y in range(height):
    for x in range(width):
      value = image[y, x, 1]
      if value == 255:
        x_p = x + 2
        y_p = y + 2
        points.append(x_p)
        points.append(y_p)
        for i in range(y, y+5):
          for j in range(x, x+5):
            image[i, j, 1] = 0

  return points

def get_coordinates(images, size):

  rows = []
  with Progress() as progress:
    task = progress.add_task('[yellow]Loading coordinates...', total=size)

    for i in range(size):
      points = get_points(images[i])
      rows.append(points)
      progress.update(task, advance=1)

  return rows

def load_data(path, size):

  images = load_images(path, size)
  coordinates = get_coordinates(images, size)
  return images, coordinates

In [None]:
project_path = get_abs_path(1)

dataset_path = project_path + '/data/maps/'
results_path = project_path + '/data/planned_maps/'
results_filename = 'paths.json'

samples_count = 10
dataset, coordinates = load_data(dataset_path, samples_count)

In [None]:
for i in range(1):
  cv2_imshow(dataset[i, :, :, :])

## A* planning algorithm

In [None]:
def calculate_cost(point1, point2):
  cost = np.sqrt( (point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2 )
  return np.round(cost, 2)

def find_neighbours(map, point, free_space, occupied_space, layer=0):
  size_x, size_y = map.shape[0], map.shape[1]
  occ_map = map[:, :, layer]

  nieghbours = []
  for i in range(-1, 2):
    for j in range(-1, 2):
      if not(point[0] + i < 1 or point[0] + i > size_y - 1 or point[1] + j < 0 or point[1] + j > size_x - 1 or (i == 0 and j == 0)):
        # print(point[0] + i, [point[1] + j], " - ", occ_map[14 + i][72 + j])
        if occ_map[point[1] + j][point[0] + i] != 255:
          neighbour_x = point[0] + i
          neighbour_y = point[1] + j
          cost = calculate_cost(point, (neighbour_x, neighbour_y))
          nieghbours.append( (cost, (neighbour_x, neighbour_y)) )

  return nieghbours

In [None]:
def find_path(oryg_map, oryg_coordinates):
  map = np.copy(oryg_map)
  coordinates = oryg_coordinates.copy()

  free_space = (0, 0, 0)
  occupied_space = (255, 0, 0)

  width, height, layers = map.shape
  pixel_count = width * height

  index = 0
  start_point = ( int(coordinates[0]), int(coordinates[1]) )
  goal_point  = ( int(coordinates[2]), int(coordinates[3]) )
  # print('start point: ', start_point, ' goal point: ', goal_point)

  cost_map = np.array( [[inf for x in range(width)] for y in range(height)] )
  cost_map[start_point[0]][start_point[1]] = 0

  visited = set()

  parent_list = []
  for x in range(width):
    for y in range(height):
      parent_list.append( (x,y) )

  parent = {n: None for n in parent_list}

  q = PriorityQueue()
  q.put( (0, start_point) )

  while not q.empty():
    _, curr_node = q.get(q)
    if curr_node in visited:
      continue

    visited.add(curr_node)
    if curr_node == goal_point:
      break

    for distance, curr_neighbour in find_neighbours(map, curr_node, free_space, occupied_space):
      if curr_neighbour in visited:
        continue

      curr_node_x, curr_node_y = curr_node
      curr_neighbour_x, curr_neighbour_y = curr_neighbour

      old_cost = cost_map[curr_neighbour_x][curr_neighbour_y]
      new_cost = cost_map[curr_node_x][curr_node_y] + distance

      if parent[curr_node] != None:
        first_coords = np.subtract(parent[curr_node], curr_node)
        second_coords = np.subtract(curr_node, curr_neighbour)

        if not(first_coords[0] == second_coords[0] and first_coords[1] == second_coords[1]) :
          new_cost += 1

      if new_cost < old_cost:
        cost_map[curr_neighbour_x][curr_neighbour_y] = new_cost
        parent[curr_neighbour] = curr_node
        priority = new_cost + calculate_cost(goal_point, curr_neighbour)
        q.put( (priority, curr_neighbour) )

  path = []
  curr_node = goal_point
  while curr_node is not None:
    path.append(curr_node)
    curr_node = parent[curr_node]
  path.reverse()

  for x, y in path:
    map[y, x, 1] = 255
    # cv2_imshow(map)

  return map, path

In [None]:
def mark_path_points(oryg_map, oryg_path):
  map = np.copy(oryg_map)
  path = oryg_path.copy()
  start_point = path[0]
  goal_point = path[-1]
  path = path[1:-1]

  # y, x
  directions_dict = {
    (-1,-1): 'NW',
    (-1, 0): 'N',
    (-1, 1): 'NE',
    ( 0, 1): 'E',
    ( 1, 1): 'SE',
    ( 1, 0): 'S',
    ( 1,-1): 'SW',
    ( 0,-1): 'W'
  }

  mark_points = []
  last_x = start_point[0]
  last_y = start_point[1]
  last_dir = None

  for curr_x, curr_y in path:
    x_change = curr_x - last_x
    y_change = curr_y - last_y
    dir = directions_dict[(y_change, x_change)]

    if dir is not last_dir:
      mark_points.append( (last_x, last_y) )

    last_x = curr_x
    last_y = curr_y
    last_dir = dir

  mark_points.append(goal_point)

  for x, y in mark_points:
    map[y, x, 1] = 0
    map[y, x, 2] = 255
  return map, mark_points

In [None]:
def generate_point_paths(dataset, coordinates):
  size = dataset.shape[0]
  maps = []
  paths = {}

  with Progress() as progress:
    task = progress.add_task('[yellow]Loading images...', total=size)

    for i in range(size):
      map = dataset[i]
      coor = coordinates[i]
      name = 'map_' + str(i)
      map, path = find_path(map, coor)
      map, points = mark_path_points(map, path)
      maps.append(map)
      paths[name] = points
      # cv2_imshow(map)

      progress.update(task, advance=1)

  return maps, paths

In [None]:
maps, paths = generate_point_paths(dataset, coordinates)

for i in range(5):
  cv2_imshow(maps[i])

In [None]:
import json

with open(results_path + results_filename, 'w') as f:
    json.dump(paths, f)