In [3]:
import time

import numpy as np
from numba import jit, cuda
import holoviews as hv
#from holoviews.operation.datashader import datashade, dynspread


hv.extension("bokeh")


N = 1000
MIN = 0.01

class Simulation:
    def __init__(self, n, seed=2**8, tau=10.0, mass=1.0):
        i = np.arange(0, n)
        rng = np.random.default_rng(seed=seed)
        self.r = rng.random(size=(n, 6)).sum(axis=1)
        self.r = np.abs((self.r / 3.0 - 1.0))

        a = rng.random(size=(n))*tau

        self.pos = np.array([np.cos(a) * 10.0 * self.r * np.sqrt(i),
                             np.sin(a) * 10.0 * self.r * np.sqrt(i)]).T
        
        self.vel = np.array([np.sin(a), -np.cos(a)]).T
        self.acc = np.zeros(shape=(n, 2))
        self.mass = np.ones(shape=n) * mass
        sort_args = (self.pos[:, 0] ** 2 + self.pos[:, 1]**2).argsort()

        self.pos = self.pos[sort_args]
        self.vel = self.vel[sort_args]
        self.vel *= (np.nan_to_num(np.sqrt(np.arange(n) / np.sqrt((self.pos[:, 0] ** 2 + self.pos[:, 1]**2))), 0.0)).reshape(1, -1).T

    def pos_bounds(self, scaler=1.5):
        xlim = range_pos(self.pos)
        ylim = range_pos(self.pos, axis=1)
        bound = max(max(xlim), max(ylim))
        return (-scaler*bound, scaler*bound), (-scaler*bound, scaler*bound)

    def vel_bounds(self, scaler=1.5):
        xlim = range_pos(self.vel)
        ylim = range_pos(self.vel, axis=1)
        bound = max(max(xlim), max(ylim))
        return (-scaler*bound, scaler*bound), (-scaler*bound, scaler*bound)        


def range_pos(arr, axis=0):
    _max = max(arr[:, axis])
    _min = min(arr[:, axis])
    bound = max(abs(_max), abs(_min))
    return (-bound, bound)


@jit
def step(pos, vel, acc, mass, delta_t=0.01):
    for i in range(pos.shape[0]):
        for j in range(i+1, pos.shape[0]):
            if i == j:
                continue
            r = pos[j, :] - pos[i, :]
            mag_sq = r[0]**2 + r[1]**2
            mag = np.sqrt(mag_sq)
            tmp = r/(max(mag_sq, MIN) * mag)
            acc[i, :] += mass[j] * tmp
            acc[j, :] -= mass[i] * tmp
        vel[i, :] += delta_t * acc[i, :]
    acc[:, :] = 0.0
    pos += delta_t * vel


@jit
def uv_to_angle_mag(uv):
    mag = np.sqrt(uv[:, 0]**2 + uv[:, 1]**2)
    angle = (np.pi/2.0) - np.arctan2(uv[:, 1]/mag, uv[:, 2]/mag)
    return mag, angle


sim = Simulation(n=N, tau=2.0*np.pi)

  self.vel *= (np.nan_to_num(np.sqrt(np.arange(n) / np.sqrt((self.pos[:, 0] ** 2 + self.pos[:, 1]**2))), 0.0)).reshape(1, -1).T


In [7]:
xlim, ylim = sim.pos_bounds()

pos_pipe = hv.streams.Pipe(data=[sim.pos[:, 0], sim.pos[:, 1]])
scatter_dmap = hv.DynamicMap(hv.Scatter, streams=[pos_pipe]).opts(height=800, width=800,
                                                              xlim=xlim, ylim=ylim)

scatter_dmap

In [8]:
for i in range(1000):
    time.sleep(0.001)
    step(sim.pos, sim.vel, sim.acc, sim.mass, delta_t=0.01)
    mag, angle = uv_to_angle_mag(sim.vel)
    pos_pipe.send((sim.pos[:, 0], sim.pos[:, 1]))

In [4]:
@jit
def subdivide(arr): 
    width = arr[2] / 2
    return np.array([
    [arr[0] + width, arr[1] + width, width, 0.0, 0.0],
    [arr[0] - width, arr[1] + width, width, 0.0, 0.0],
    [arr[0] + width, arr[1] - width, width, 0.0, 0.0],
    [arr[0] - width, arr[1] - width, width, 0.0, 0.0],
    ])


@jit
def isin(x, y, quad):
    if (
        (x > quad[0] - quad[2]) & (x <= quad[0] + quad[2]) &
        (y > quad[1] - quad[2]) & (y <= quad[1] + quad[2])
    ):
        return True
    return False


@jit
def accumulate_mass(positions, masses, quadtree):
    pass


assert isin(0.0, 0.0, np.array([0.0, 0.0, 1.0]))
assert not isin(10.0, 10.0, np.array([0.0, 0.0, 1.0]))

In [9]:
import numba
from numba import deferred_type, optional, f8
from numba.types import ListType
from numba.typed import List


node_type = deferred_type()
spec = (
    ("value", optional(f8[:, :])),
    ("bounds", f8[:]),
    ("children", ListType(node_type))
)


@numba.experimental.jitclass(spec)
class QuadTreeNode:
    def __init__(self, bounds):
        self.value = None
        self.bounds = bounds
        self.children = List()

    def set_nodes(self):
        self.children = [QuadTreeNode(quad) for quad in subdivide(self.bounds)[:]]
        

node_type.define(QuadTreeNode.class_type.instance_type)

In [10]:
quad = QuadTreeNode(np.array([0.0, 0.0, 10.0]))

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
Failed in nopython mode pipeline (step: nopython frontend)
Cannot infer the type of variable '$42call.6' (temporary variable), have imprecise type: ListType[undefined]. 

File "../../../../../../tmp/ipykernel_226210/2812285496.py", line 20:
<source missing, REPL/exec in use?>

During: resolving callee type: jitclass.QuadTreeNode#7fac06a1bbc0<value:OptionalType(array(float64, 2d, A)),bounds:array(float64, 1d, A),children:ListType[DeferredType#140376822362048]>
During: typing of call at <string> (3)

During: resolving callee type: jitclass.QuadTreeNode#7fac06a1bbc0<value:OptionalType(array(float64, 2d, A)),bounds:array(float64, 1d, A),children:ListType[DeferredType#140376822362048]>
During: typing of call at <string> (3)


File "<string>", line 3:
<source missing, REPL/exec in use?>


In [65]:
quad.value = split(quad.bounds)

In [66]:
quad.value

array([[ 5.,  5.,  5.,  0.],
       [-5.,  5.,  5.,  0.],
       [ 5., -5.,  5.,  0.],
       [-5., -5.,  5.,  0.]])