Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/engine/stdlib/src/py/dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pytest == "6.2.2"
black == "20.8b1"
pylint == "2.7.2"
pytest == 6.2.2
black == 20.8b1
pylint == 2.7.2
mypy==0.812
7 changes: 7 additions & 0 deletions packages/engine/stdlib/src/py/hstd/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,16 @@ def generate_agent_id():
@dataclass
class AgentState:
agent_id: str = field(default_factory=generate_agent_id)
agent_name: Optional[str] = None
position: Optional[List[float]] = None
direction: Optional[List[float]] = None

def __setitem__(self, key, value):
setattr(self, key, value)

def __getitem__(self, key):
return getattr(self, key)


class AgentFieldError(Exception):
def __init__(self, agent_id: str, field: str, msg: str = ""):
Expand Down
9 changes: 9 additions & 0 deletions packages/engine/stdlib/src/py/hstd/context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dataclasses import dataclass, field
from typing import Optional, List


@dataclass
class Topology:
x_bounds: List[float]
y_bounds: List[float]
z_bounds: Optional[List[float]] = field(default_factory=lambda: [0.0, 0.0])
130 changes: 130 additions & 0 deletions packages/engine/stdlib/src/py/hstd/init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""
Initialization utility functions.
"""
import math
import random
from copy import deepcopy
from typing import Dict, List, Union, Callable, Mapping

from .agent import AgentState
from .context import Topology

# AgentTemplate can be an AgentState, or function which returns an AgentState
AgentFunction = Callable[[], AgentState]
Comment thread
nshlapo marked this conversation as resolved.
AgentTemplate = Union[AgentState, AgentFunction]


def create_agent(template: AgentTemplate) -> AgentState:
if callable(template):
return template()
else:
return deepcopy(template)


def scatter(count: int, topology: Topology, template: AgentTemplate) -> List[AgentState]:
Comment thread
nshlapo marked this conversation as resolved.
"""
Generate `count` agents using the `template`, assigning them random positions within
the `topology` bounds.

Args:
count: the number of agents to generate.
topology: the `context.globals()["topology"]` value.
template: an agent definition, or a function which returns an agent definition.
"""
x_bounds = topology.x_bounds
y_bounds = topology.y_bounds

width = x_bounds[1] - x_bounds[0]
height = y_bounds[1] - y_bounds[0]

def assign_random_position() -> AgentState:
x = random.uniform(0, width) + x_bounds[0]
y = random.uniform(0, height) + y_bounds[0]

agent = create_agent(template)
agent["position"] = [x, y]

return agent

agents = [assign_random_position() for i in range(count)]

return agents


def stack(count: int, template: AgentTemplate) -> List[AgentState]:
"""
Generate `count` agents using the `template`.

Args:
count: the number of agents to generate.
template: an agent definition, or a function which returns an agent definition.
"""
agents = [create_agent(template) for i in range(count)]

return agents


def grid(topology: Topology, template: AgentTemplate) -> List[AgentState]:
"""
Generate agents on every integer location within the `topology` bounds.

Args:
topology: the `context.globals()["topology"]` value.
template: an agent definition, or a function which returns an agent definition.
"""
x_bounds = topology.x_bounds
y_bounds = topology.y_bounds

width = x_bounds[1] - x_bounds[0]
height = y_bounds[1] - y_bounds[0]
count = width * height

def assign_grid_position(ind: int) -> AgentState:
x = (ind % width) + x_bounds[0]
y = math.floor(ind / width) + y_bounds[0]

agent = create_agent(template)
agent["position"] = [x, y]

return agent

agents = [assign_grid_position(i) for i in range(int(count))]

return agents


def create_layout(
layout: List[List[str]], templates: Mapping[str, AgentState], offset: List[float] = [0, 0, 0]
) -> List[AgentState]:
"""
Generate agents with positions based on a `layout`, and definitions
based on the `templates`.

Args:
layout: the locations of agents, typically uploaded as a csv dataset
templates: the definitions for each type of agent refernced in the layout
offset: optional offset specifying the position of the bottom right corner of the `layout`
"""

height = len(layout)
agents: Dict[str, List[AgentState]] = {}

for pos_y, row in enumerate(layout):
for pos_x, template_type in enumerate(row):
if template_type in templates:
if template_type not in agents:
agents[template_type] = []

agent_name = (templates[template_type].agent_name or template_type) + str(
len(agents[template_type])
)

agent = templates[template_type]
agent["agent_name"] = agent_name
agent["position"] = [pos_x + offset[0], height - pos_y + offset[1], offset[2]]

agents[template_type].append(agent)

agent_list = [agent for sublist in agents.values() for agent in sublist]

return agent_list
131 changes: 131 additions & 0 deletions packages/engine/stdlib/src/py/hstd/neighbor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Neighbor utility functions.
"""
from typing import List

from .spatial import distance_between
from .agent import AgentFieldError, AgentState


def neighbors_on_position(agent: AgentState, neighbors: List[AgentState]) -> List[AgentState]:
"""
Returns all `neighbors` whose position is identical to the `agent`.
"""
if agent.position is None:
raise AgentFieldError(agent.agent_id, "position", "cannot be None")

return [n for n in neighbors if n.position == agent.position]


def neighbors_in_radius(
agent: AgentState,
neighbors: List[AgentState],
max_radius: float = 1,
min_radius: float = 0,
distance_function: str = "euclidean",
z_axis: bool = False,
) -> List[AgentState]:
"""
Returns all neighbors within a certain vision radius of an agent.
Default is 2D (`z_axis` set to false). Set `z_axis` to true for 3D positions.

Args:
agent: central agent
neighbors: context.neighbors() array, or an array of agents
max_radius: minimum radius for valid neighbors
min_radius: maximum radius for valid neighbors
distance_function: type of distance function to use
z_axis: include z-axis in distance calculations
"""

if agent.position is None:
raise AgentFieldError(agent.agent_id, "position", "cannot be None")

def in_radius(neighbor: AgentState) -> bool:
if neighbor.position is None:
return False

d = distance_between(neighbor, agent, distance_function, z_axis)
if d is None:
return False

return (d <= max_radius) and (d >= min_radius)

return [n for n in neighbors if in_radius(n)]


def difference_vector(vec1: List[float], vec2: List[float]):
"""
Calculate the difference vector `vec2` - `vec1`.
"""
return [vec2[ind] - vec1[ind] for ind in range(len(vec1))]


def in_front_planar(agent: AgentState, neighbor: AgentState) -> bool:
"""
Return True if a neighbor is anywhere in front of the agent.
"""

a_dir = agent["direction"]
Comment thread
eadanfahey marked this conversation as resolved.

[dx, dy, dz] = difference_vector(agent["position"], neighbor["position"])
D = a_dir[0] * dx + a_dir[1] * dy + a_dir[2] * dz

return D > 0


def is_linear(agent: AgentState, neighbor: AgentState, front: bool) -> bool:
"""
Check if a neighbor lies along the direction vector of the agent, and is
in front of or behind the agent, based on `front`.
"""
[dx, dy, dz] = difference_vector(agent["position"], neighbor["position"])
[ax, ay, az] = agent["direction"]

cross_product = [dy * az - dz * ay, dx * az - dz * ax, dx * ay - dy * ax]

if cross_product != [0, 0, 0]:
return False

# check if same direction
same_dir = (ax * dx > 0) or (ay * dy > 0) or (az * dz > 0)

return same_dir is front


def neighbors_in_front(
agent: AgentState, neighbors: List[AgentState], colinear: bool = False
) -> List[AgentState]:
"""
Return all `neighbors` in front of the `agent`. If `colinear` is True
check that the neighbor lies along the agent's direction vector.
"""

if agent.position is None:
raise AgentFieldError(agent.agent_id, "position", "cannot be None")
if agent.direction is None:
raise AgentFieldError(agent.agent_id, "direction", "cannot be None")

if colinear:
return [n for n in neighbors if is_linear(agent, n, True)]
else:
return [n for n in neighbors if in_front_planar(agent, n)]


def neighbors_behind(
agent: AgentState, neighbors: List[AgentState], colinear: bool = False
) -> List[AgentState]:
"""
Return all `neighbors` behind the `agent`. If `colinear` is True
check that the neighbor lies along the agent's direction vector.
"""

if agent.position is None:
raise AgentFieldError(agent.agent_id, "position", "cannot be None")
if agent.direction is None:
raise AgentFieldError(agent.agent_id, "direction", "cannot be None")

if colinear:
Comment thread
nshlapo marked this conversation as resolved.
return [n for n in neighbors if is_linear(agent, n, False)]
else:
return [n for n in neighbors if not in_front_planar(agent, n)]
15 changes: 15 additions & 0 deletions packages/engine/stdlib/src/py/hstd/rand.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
Comment thread
eadanfahey marked this conversation as resolved.
Random utility functions.
"""

import random as rand


def set_seed(s: str):
""" Set the random seed for Python's random library """
rand.seed(s)


def random():
""" Returns a random number between 0 and 1 """
return rand.random()
37 changes: 16 additions & 21 deletions packages/engine/stdlib/src/py/hstd/spatial.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,40 @@
"""
import math
import random
from dataclasses import dataclass
from typing import Optional, List

from .agent import AgentState, AgentFieldError
from .context import Topology


@dataclass
class Topology:
x_bounds: List[float]
y_bounds: List[float]
z_bounds: Optional[List[float]]


def manhattan_distance(p1: List[float], p2: List[float]) -> float:
def manhattan_distance(p1: List[float], p2: List[float], z_axis: bool = True) -> float:
dx = abs(p1[0] - p2[0])
dy = abs(p1[1] - p2[1])
dz = abs(p1[2] - p2[2])
return dx + dy + dz
return dx + dy + (dz if z_axis else 0)


def euclidean_squared_distance(p1: List[float], p2: List[float]) -> float:
def euclidean_squared_distance(p1: List[float], p2: List[float], z_axis: bool = True) -> float:
dx = p1[0] - p2[0]
dy = p1[1] - p2[1]
dz = p1[2] - p2[2]
return dx * dx + dy * dy + dz * dz
return dx * dx + dy * dy + (dz * dz if z_axis else 0)


def euclidean_distance(p1: List[float], p2: List[float]) -> float:
return math.sqrt(euclidean_squared_distance(p1, p2))
def euclidean_distance(p1: List[float], p2: List[float], z_axis: bool = True) -> float:
return math.sqrt(euclidean_squared_distance(p1, p2, z_axis))


def chebyshev_distance(p1: List[float], p2: List[float]) -> float:
def chebyshev_distance(p1: List[float], p2: List[float], z_axis: bool = True) -> float:
dx = abs(p1[0] - p2[0])
dy = abs(p1[1] - p2[1])
dz = abs(p1[2] - p2[2])
return max(dx, dy, dz)
return max(dx, dy, (dz if z_axis else 0))


def distance_between(a: AgentState, b: AgentState, distance="euclidean") -> Optional[float]:
def distance_between(
a: AgentState, b: AgentState, distance="euclidean", z_axis=True
) -> Optional[float]:
"""
Returns the specified distance between two agents. The parameter `distance` must be one
of 'euclidean', 'euclidean_sq', 'manhattan' or 'chebyshev'.
Expand All @@ -52,13 +47,13 @@ def distance_between(a: AgentState, b: AgentState, distance="euclidean") -> Opti
raise AgentFieldError(b.agent_id, "position", "cannot be None")

if distance == "euclidean":
return euclidean_distance(a.position, b.position)
return euclidean_distance(a.position, b.position, z_axis)
elif distance == "euclidean_sq":
return euclidean_squared_distance(a.position, b.position)
return euclidean_squared_distance(a.position, b.position, z_axis)
elif distance == "manhattan":
return manhattan_distance(a.position, b.position)
return manhattan_distance(a.position, b.position, z_axis)
elif distance == "chebyshev":
return chebyshev_distance(a.position, b.position)
return chebyshev_distance(a.position, b.position, z_axis)

raise ValueError(
"distance must be one of 'euclidean', 'euclidean_sq', 'manhattan' or 'chebyshev'"
Expand Down
Loading