In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib import animation
from matplotlib.patches import Rectangle
from scipy.spatial import Voronoi
from scipy.optimize import linear_sum_assignment
from shapely.geometry import Polygon, Point
import matplotlib.cm as cm
import matplotlib.colors as mcolors
import json
import random



In [3]:
#The motion functions will return an accurate position of where the with beatTracking feature included. 
#These functions return a single position for all drones without considering the physical states of the drones

In [73]:
def happy_motion(phase, N=10, R=1, A=0.5, f=6, spin=1):
    targets = np.arange(N)
    theta0 = 2 * np.pi * targets / N
    theta = np.arctan2(np.sin(phase + theta0), np.cos(phase + theta0))
    r = R + A * np.sin(f * theta)
    x = r * np.cos(theta)
    y = r * np.sin(theta)
    pos = np.stack([x, y], axis=-1)
    return pos

In [5]:
def surprise_motiona(phase, N, r_min=0.25, r_max=0.95, snaps=1, decay=10.0, spin=0.6):
    targets = np.arange(N)
    theta0 = 2*np.pi*(targets/N)
    theta  = theta0 + 2*np.pi*spin*phase
    frac = (phase * snaps) % 1.0
    burst = np.exp(-decay * frac)          # pop then relax
    r = r_min + (r_max - r_min) * burst
    x = r * np.cos(theta); y = r * np.sin(theta)
    return np.stack([x, y], axis=-1)

In [64]:
def surprise_motion(phase, N, r_min=0.25, r_max=0.95, snaps=1, k=4.0, spin=1):
    # k controls "peakedness": higher k => sharper pops but still smooth
    targets = np.arange(N)
    theta0 = 2*np.pi*(targets/N)
    theta  = theta0 + 2*np.pi*spin*phase

    # periodic, smooth envelope in [0,1]
    # 0.5*(1+cos(..)) peaks at 1 at beat centers; raise to k to sharpen
    env = 0.5 * (1.0 + np.cos(2*np.pi*snaps*phase))
    burst = env**k

    r = r_min + (r_max - r_min) * burst
    x = r * np.cos(theta); y = r * np.sin(theta)
    return np.stack([x, y], axis=-1)

In [65]:
def sadness_motion(phase, N, R=0.5, spin=1):
    targets = np.arange(N)
    theta0 = 2*np.pi*(targets/N)
    theta  = theta0 + 2*np.pi*spin*phase  # slow rotation over the beat
    x = R * np.cos(theta); y = R * np.sin(theta)
    return np.stack([x, y], axis=-1)

In [8]:
class Coverage():

    def __init__ (self, density_function, phase=0, N=10, domain=(-1.0, 1.0, -1.0, 1.0),
                        grid_res=100, seed=0, max_iter=50):
                        

        """ 
        density_function: The density function used. Contributes to computing weighted centroid
        phase: "progress bar" of the complete action
        N: num drones
        domain: 2D rectangular space
        grid_res: sampling rate for map
        seed: for randomized init drone positions
        max_iter: how many steps it takes from start to end
        
        """

        self.N = N
        self.frames = []
        self.max_iter = max_iter


        rng = np.random.default_rng(seed)
        x_min, x_max, y_min, y_max = domain

        # Initial positions with somewhat deformed circle
        angles = rng.uniform(0, 2*np.pi, size=N)
        radii  = 0.7 + 0.25*rng.uniform(-1, 1, size=N)
        targets = np.stack([radii*np.cos(angles), radii*np.sin(angles)], axis=1)

        # Prebuild sampling grid over D
        xs = np.linspace(x_min, x_max, grid_res)
        ys = np.linspace(y_min, y_max, grid_res)
        X, Y = np.meshgrid(xs, ys)

        #flatten
        self.grid_points = np.stack([X.ravel(), Y.ravel()], axis=1)      # [P,2]

        #use a density function to get the value of importance
        density = density_function(self.grid_points[:, 0], self.grid_points[:, 1])     # [P]
        # no divide by 0
        self.density = np.maximum(density, 1e-12)

        targets = np.arange(self.N)
        theta0 = 2*np.pi*(targets / N)
        r0 = 0.75 * np.ones(N)
        p = np.stack([r0*np.cos(theta0), r0*np.sin(theta0)], axis=1)
        self.frames.append(p.copy())

        for _ in range(self.max_iter):
            # For each grid point, find nearest drone index
            dists2 = ((self.grid_points[:, None, :] - p[None, :, :])**2).sum(axis=2)  # [P,N] find distance 2 each drone using squared differnece
            owners = np.argmin(dists2, axis=1)  # [P] indices of closest drone to each sample point

            #weighted centroid
            centroid = np.zeros_like(p)     # the centroid shifts slightly as drones moves
            for drone in range(self.N):
                zone = (owners == drone)    #for each drone, find list of points that belongs to it 
                if np.any(zone):
                    weights = self.density[zone]       # [Pi], weights/density of all the sampled points in zone
                    pts = self.grid_points[zone]             # [Pi,2], all coordinates of the region
                    W = weights.sum()
                    if W > 0:
                        centroid[drone] = (pts * weights[:, None]).sum(axis=0) / W  #compute weighted centroid of the zone for drone k that it will move to
                    else:
                        centroid[drone] = p[drone].copy()
                else:
                    # Empty cell: keep current point
                    centroid[drone] = p[drone].copy()

            #move toward centroid
            p = centroid
            self.frames.append(p.copy())
            
        assert len(self.frames) == self.max_iter + 1


    def pose(self, phase: float) -> np.ndarray:
        # nearest cached step (0..max_iter)
        idx = int(round(np.clip(phase, 0.0, 1.0) * self.max_iter))
        return self.frames[idx]

In [9]:
def anger_density(x, y):
    g1 = np.exp(-((x - 0.55)**2 + (y - 0.55)**2) / 0.06)
    g2 = np.exp(-((x - 0.35)**2 + (y - 0.35)**2) / 0.06)
    return 0.15 + 1.6*(g1 + 0.8*g2)

In [10]:
angry = Coverage(anger_density, N=10, grid_res=60, max_iter=20)

def anger_motion(phase, N):
    return angry.pose(float(phase))

In [11]:
#beatrack: gives the time where each beat is 

with open('json/beatTrack.json', 'r') as file:
    beats = list(json.load(file))
    # beats = [0] + beats

with open('json/timeStamps.json', 'r') as file:
    times = list(json.load(file))

with open('json/soft_label.json', 'r') as file:
    slabels = list(json.load(file))

In [12]:
time_labels = np.column_stack((times, slabels))
for time, label in time_labels:
    print(f't={time}, label={label}')


t=0.0, label={'happy': 4.0192125710370923e-07, 'surprise': 0.9999995980700108, 'sad': 2.7245514008202718e-61, 'angry': 8.732052131078617e-12}
t=1.505, label={'happy': 1.4778909220473156e-06, 'surprise': 0.9999985221088149, 'sad': 1.6587048911641894e-67, 'angry': 2.630069992203478e-13}
t=3.01, label={'happy': 9.573247822578921e-07, 'surprise': 0.9999990426746382, 'sad': 3.544126580868198e-66, 'angry': 5.795433506574168e-13}
t=4.515, label={'happy': 7.417706587937013e-06, 'surprise': 0.9999925822923015, 'sad': 7.608854464277255e-64, 'angry': 1.1107289775827126e-12}
t=6.02, label={'happy': 4.2688181312351623e-07, 'surprise': 0.9999995731174737, 'sad': 3.4838880435694605e-66, 'angry': 7.131941197186792e-13}
t=7.5249999999999995, label={'happy': 1.0547548000028785e-05, 'surprise': 0.9999894524510656, 'sad': 5.2758839910880235e-64, 'angry': 9.342762491306901e-13}
t=9.03, label={'happy': 4.0110381898215465e-05, 'surprise': 0.999959889616855, 'sad': 9.470103157665017e-63, 'angry': 1.2466652833

In [13]:
def phasor(beats, cur_t):
    #get the phase of 
    k = np.searchsorted(beats, cur_t, side='right')-1
    k = np.clip(k, 0, len(beats)-2)
    beat_len = beats[k+1]-beats[k]
    phase = (cur_t - beats[k]) / beat_len #[0,1)
    return float(np.clip(phase, 0.0, 1.0))

In [14]:
#append phases
newlabels = np.array([[t, lbls, phasor(beats, t)] for t, lbls in time_labels])
newlabels

array([[0.0,
        {'happy': 4.0192125710370923e-07, 'surprise': 0.9999995980700108, 'sad': 2.7245514008202718e-61, 'angry': 8.732052131078617e-12},
        0.0],
       [1.505,
        {'happy': 1.4778909220473156e-06, 'surprise': 0.9999985221088149, 'sad': 1.6587048911641894e-67, 'angry': 2.630069992203478e-13},
        0.8346354166666666],
       [3.01,
        {'happy': 9.573247822578921e-07, 'surprise': 0.9999990426746382, 'sad': 3.544126580868198e-66, 'angry': 5.795433506574168e-13},
        0.21093749999999983],
       [4.515,
        {'happy': 7.417706587937013e-06, 'surprise': 0.9999925822923015, 'sad': 7.608854464277255e-64, 'angry': 1.1107289775827126e-12},
        0.12890624999999914],
       [6.02,
        {'happy': 4.2688181312351623e-07, 'surprise': 0.9999995731174737, 'sad': 3.4838880435694605e-66, 'angry': 7.131941197186792e-13},
        0.04687499999999947],
       [7.5249999999999995,
        {'happy': 1.0547548000028785e-05, 'surprise': 0.9999894524510656, 'sad': 

In [15]:
#append keyframes where phase = 1

t = newlabels[:, 0]
labels= newlabels[:, 1]

k = np.searchsorted(t, beats, side='right') - 1


downbeats = []
for i, beat in enumerate(beats):
    downbeats.append([beat, labels[k[i]], 1])

m = np.append(newlabels, downbeats, axis=0)
order = np.argsort(m[:, 0])
keyframes = m[order]
keyframes


array([[0.0,
        {'happy': 4.0192125710370923e-07, 'surprise': 0.9999995980700108, 'sad': 2.7245514008202718e-61, 'angry': 8.732052131078617e-12},
        0.0],
       [0.864,
        {'happy': 4.0192125710370923e-07, 'surprise': 0.9999995980700108, 'sad': 2.7245514008202718e-61, 'angry': 8.732052131078617e-12},
        1],
       [1.505,
        {'happy': 1.4778909220473156e-06, 'surprise': 0.9999985221088149, 'sad': 1.6587048911641894e-67, 'angry': 2.630069992203478e-13},
        0.8346354166666666],
       ...,
       [194.14499999999998,
        {'happy': 2.73352657108183e-05, 'surprise': 0.9999726643628081, 'sad': 9.507556340307997e-52, 'angry': 3.7148110280475117e-10},
        1.0],
       [195.64999999999998,
        {'happy': 2.921733189872706e-06, 'surprise': 0.9999970780237701, 'sad': 9.863701466005713e-54, 'angry': 2.430402040044711e-10},
        1.0],
       [197.15499999999997,
        {'happy': 2.657997321926015e-07, 'surprise': 0.9999997288090852, 'sad': 7.1386708045

In [None]:

def tamper_demo(keyframes, peak=0.97):
    emos = list(keyframes[0][1].keys())

    def renorm(p):
        s = sum(p.values())
        for k in p: 
            p[k] = max(p[k], 0.0)/s

    for i, kf in enumerate(keyframes):
        p = kf[1]
        # every 8th frame construct a 50/50 split of 2 emotions
        if i % 8 == 0:
            #pick 2 emotions
            a, b = emos[i % len(emos)], emos[(i+1) % len(emos)]
            for k in p: p[k] = 0
            p[a] = p[b] = 0.5
        #or every 4th frame focus on on one emotion
        elif i % 4 == 0:
            idx = random.randrange(len(emos))
            t = emos[idx]
            base = (1.0 - peak) / (len(emos) - 1)
            for k in emos:
                p[k] = base
            p[t] = peak
        renorm(p)

In [59]:
tamper_demo(keyframes)
keyframes

array([[0.0, {'happy': 0.5, 'surprise': 0.5, 'sad': 0.0, 'angry': 0.0},
        0.0],
       [0.864, {'happy': 0.5, 'surprise': 0.5, 'sad': 0.0, 'angry': 0.0},
        1],
       [1.505,
        {'happy': 0.005101010101010105, 'surprise': 0.005101010101010105, 'sad': 0.005101010101010105, 'angry': 0.4947979797979798, 2: 0.4898989898989899},
        0.8346354166666666],
       ...,
       [194.14499999999998,
        {'happy': 2.73352657108183e-05, 'surprise': 0.9999726643628081, 'sad': 9.507556340307997e-52, 'angry': 3.7148110280475117e-10},
        1.0],
       [195.64999999999998,
        {'happy': 0.005101010101010105, 'surprise': 0.4947979797979798, 'sad': 0.005101010101010105, 'angry': 0.005101010101010105, 2: 0.4898989898989899},
        1.0],
       [197.15499999999997,
        {'happy': 2.657997321926015e-07, 'surprise': 0.9999997288090852, 'sad': 7.138670804524287e-49, 'angry': 5.391182668818202e-09},
        1.0]], dtype=object)

In [18]:
primatives = [happy_motion, surprise_motion, sadness_motion, anger_motion]
labels = ["happy", "surprise", "sadness", "anger"]

In [None]:
# def drone_matcher(pos_A, pos_B):
#     #lerp between 2 drone states
#     costs = ((pos_A[:, None, :] - pos_B[None, :, :])**2).sum(axis=2)  #N x N matrix with distance to each next positions
#     row, column = linear_sum_assignment(costs)
#     return column

In [76]:
# this is where can adjust the parameters

def prim_happy(phi, N):    return happy_motion(N, phi)
def prim_surprise(phi, N): return surprise_motion(N, phi)
def prim_sad(phi, N):      return sadness_motion(N, phi)
def prim_angry(phi, N):    return anger_motion(N, phi)

In [None]:
PRIMS = {
    "happy":    prim_happy,
    "surprise": prim_surprise,
    "sad":      prim_sad,
    "angry":    prim_angry,
}

def interpolate(t_src, y_src, t_dst, smooth=False, alpha=0.2):

    ''' 
    t_src: 1d array of src time stamps
    y_src: vals at src time stamps
    t_dst: 1d array of dst time stamps (shift by 1)
    '''
    # eval 
    y = np.interp(t_dst, t_src, y_src)  # linear in time
    return y

#make sure no non_zero
def normalize_rows(W, eps=1e-12):
    s = W.sum(axis=1, keepdims=True)
    s = np.maximum(s, eps)
    return W / s



def blended_stream(keyframes, N, fps=30, keep_identity=True):
    """
    keyframes: [t, weights_dict, phase]
    N: N drones
    Returns: times (T,), positions list of length T where positions[t] is (N,2)

    fps: this number is subjected to change based on how capable the drones move. fps=30 is used for better animation. Actual number maybe lower
    """
    # unpack
    T_src = np.array([row[0] for row in keyframes], dtype=float)
    Phase_src = np.array([float(row[2]) for row in keyframes], dtype=float)


    emotions = list(PRIMS.keys())  # ["happy","surprise","sad","angry"] as keys
    W_src = np.array([[row[1].get(e, 0.0) for e in emotions] for row in keyframes], dtype=float)    #[keyframes, emotion weights]

    # a base time grid that represents actual time 
    t0, t1 = T_src[0], T_src[-1]
    dt = 1.0 / fps
    timegrid = np.arange(t0, t1 + 1e-9, dt)

    #getting interpolated in-between weights and phases
    weights = np.stack([interpolate(T_src, W_src[:, i], timegrid) for i in range(W_src.shape[1])], axis=1)   #[weights]
    weights = normalize_rows(weights)  # rows sum to 1
    phases = interpolate(T_src, Phase_src, timegrid)
    phases = phases % 1.0 #[0,1)

    #inserting 
    positions = []
    prev = None
    for k, (t, ph) in enumerate(zip(timegrid, phases)):
        # motion per primitive
        comps = []
        for i, e in enumerate(emotions):
            Pi = PRIMS[e](N, ph)  # (N,2)
            comps.append(weights[k, i] * Pi)    # each resulted potision of the emotion is multiplied by it's respective weight

    
        P_blend = np.sum(comps, axis=0)  # (N,2)

        # # 5) maintain identity with an assignment (optional)
        # if keep_identity and prev is not None:
        #     # Hungarian assignment
        #     C = np.linalg.norm(prev[:, None, :] - P_blend[None, :, :], axis=2)  # (N,N)
        #     ri, cj = linear_sum_assignment(C)
        #     P_blend = P_blend[cj]  # reorder to match prev’s indices

        positions.append(P_blend)
        prev = P_blend

    return timegrid, positions, emotions, weights

In [78]:
times_grid, positions, emotions, weight_grid = blended_stream(keyframes=keyframes, N=10)

In [79]:
weight_grid

array([[5.00000000e-01, 5.00000000e-01, 0.00000000e+00, 0.00000000e+00],
       [5.00000000e-01, 5.00000000e-01, 0.00000000e+00, 0.00000000e+00],
       [5.00000000e-01, 5.00000000e-01, 0.00000000e+00, 0.00000000e+00],
       ...,
       [3.08516030e-04, 9.99074962e-01, 3.08258423e-04, 3.08263648e-04],
       [1.90074699e-04, 9.99430292e-01, 1.89813945e-04, 1.89819234e-04],
       [7.42219150e-05, 9.99777857e-01, 7.39580810e-05, 7.39634323e-05]])

In [None]:
def animate_positions(positions, T, xlim=(-1.6,1.6), ylim=(-1.6,1.6), fps=15, outfile=None,
                      k=0.25,   # response time (smaller = snappier)
):
    """
    positions: list/array of shape [F, N, 2] (targets per frame)
    T:         times of length F, strictly increasing
    k:       first-order smoothing time constant (seconds)
    """
    targets = [np.asarray(P, float) for P in positions]
    F = len(targets); N = targets[0].shape[0]
    assert all(P.shape == (N,2) for P in targets)
    assert len(T) == F and np.all(np.diff(T) > 0)

    dt = float(np.median(np.diff(T)))
    alpha = 1.0 - np.exp(-dt / max(k, 1e-6))  # in (0,1)

    drones = targets[0].copy()

    drone_traj = [drones.copy()]
    for k in range(1, F):
        # simple first-order lag toward same-index target
        drones = (1.0 - alpha) * drones + alpha * targets[k]
        drone_traj.append(drones.copy())




    # animation
    fig, ax = plt.subplots(figsize=(5,5))
    ax.set_aspect('equal', 'box')
    ax.set_xlim(*xlim); ax.set_ylim(*ylim)

    targ = ax.scatter(targets[0][:,0], targets[0][:,1], s=60, facecolors='none', edgecolors='orange', marker='o')
    dron = ax.scatter(drone_traj[0][:,0], drone_traj[0][:,1], s=40)
    title = ax.text(0.02, 0.98, "", transform=ax.transAxes, va="top")
    emo = ax.text(0.02, 0.86, "", transform=ax.transAxes, va="top")

    def init():
        targ.set_offsets(targets[0])
        dron.set_offsets(drone_traj[0])
        title.set_text(f"t={T[0]:.2f}s")
        emo.set_text(f"happy: {weight_grid[0][0]}, surprise: {weight_grid[0][1]}, sad: {weight_grid[0][2]}, angry: {weight_grid[0][3]}")
        return (targ, dron, title)

    def update(i):
        targ.set_offsets(targets[i])
        dron.set_offsets(drone_traj[i])
        title.set_text(f"t={T[i]:.2f}s")
        emo.set_text(f"happy: {round(weight_grid[i][0],2)}, surprise: {round(weight_grid[i][1],2)}, sad: {round(weight_grid[i][2],2)}, angry: {round(weight_grid[i][3],2)}")
        return (targ, dron, title)

    interval_ms = int(round(1000.0 * (T[1] - T[0])))
    anim = animation.FuncAnimation(fig, update, init_func=init,
                                   frames=F, interval=interval_ms, blit=True)

    if outfile:
        #mp4 better
        from matplotlib.animation import FFMpegWriter
        writer = FFMpegWriter(fps=int(round(1.0/(T[1]-T[0]))), bitrate=4000)
        anim.save(outfile, writer=writer)
    else:
        plt.show()

    plt.close(fig)


In [81]:
animate_positions(positions=positions, T=times_grid, outfile='drones.mp4')