In [1]:
# TODO: Notebooks should contain an title and a word description of what they're
# supposed to show.

In [2]:
from cadcad.spaces import space
from cadcad.dynamics import block
from cadcad.spaces import Bit, Real, Integer, EmptySpace
from cadcad.points import Point
from cadcad.systems import Experiment
from copy import deepcopy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
@space
class CartesianPlane:
    x:float
    y:float

In [4]:
@space
class Particle:
    pos:CartesianPlane
    vel:CartesianPlane
    # used to encode a constraint
    #but this is exactly the kind of thing
    #i would like to push into the model params
    maxspeed : float

In [5]:
@space
class ObservedParticle:
    pos:CartesianPlane
    prev:CartesianPlane

In [6]:
from enum import Enum
class Strategy(Enum):
    EVADE: -1
    PURSUE: 1

print(Strategy.__annotations__)


{'EVADE': -1, 'PURSUE': 1}


In [7]:

@space
class Agent:
    plant: Particle
    reference: ObservedParticle
    strategy: Strategy

In [8]:

#this block serves as a kind of constraint checker for Particle
#i would like to be able to assign this block to the Space Paricle
#as a builtin constraint
@block
def isValidParticle(domain:Point[Particle])-> Point[Bit]:
    input = deepcopy(domain)
    dx= input['vel']['x']
    dy= input['vel']['y']

    vmax = input['maxspeed']

    output = {'bit':bool(dx**2+dy**2<=vmax**2)}

    return Point(Bit, output)

In [9]:
@block
def projectValidParticle(domain:Point[Particle])-> Point[Particle]:
    input = deepcopy(domain)

    bit = isValidParticle(input)
    output = input
    if not(bit['bit']):
        dx= input['vel']['x']
        dy= input['vel']['y']
        magnitude = np.sqrt(dx**2+dy**2)
        output['vel']['x'] = input['maxspeed']*dx/magnitude
        output['vel']['y'] = input['maxspeed']*dy/magnitude
      
    return output

In [10]:
ParticleUpdateDomain = (Particle * CartesianPlane).rename_dims({'cartesianplane':"acc"})

In [11]:
@block
def particleSubspace(domain: Point[ParticleUpdateDomain])-> Point[Particle]:
    input = deepcopy(domain)
    return Point(Particle, input.data['particle'])

In [12]:
@block
def particleUpdate(domain:Point[ParticleUpdateDomain])-> Point[Particle]:
    input = deepcopy(domain)
    output = particleSubspace(input)

    output['pos']['x']  += input['particle']['vel']['x']
    output['pos']['y']  += input['particle']['vel']['y']

    output['vel']['x']  += input['acc']['x']
    output['vel']['y']  += input['acc']['y']

    output = projectValidParticle(output)

    return output

In [13]:
@block
def randomPoint(domain:Point[EmptySpace])-> Point[CartesianPlane]:
    data = {}
    data['x'] = .005+np.random.randn()/5.0
    data['y'] = .005+np.random.randn()/5.0

    return Point(CartesianPlane, data)

@block
def randomWalk(domain:Point[CartesianPlane])-> Point[CartesianPlane]:
    input = domain
    data = {}
    data['x'] = input['x']+.005+np.random.randn()/5.0
    data['y'] = input['y']+.005+np.random.randn()/5.0

    return Point(CartesianPlane, data)

In [14]:
emptyPoint = Point(EmptySpace,{})

In [15]:
myPoint = randomPoint(emptyPoint)
print(myPoint)

Point in space CartesianPlane has data
{
    "x": 0.04772523647598587,
    "y": 0.13158406108264037
}



In [16]:
experiment_params = {"iteration_n": 2, "steps": 25}
my_experiment = Experiment(myPoint, experiment_params, (randomWalk,))

# BUG: We're having the following error:
# Block randomWalk requires Point[['CartesianPlane']] as input;
# you passed Point[CartesianPlane]
results = my_experiment.run()

BlockInputError: Block randomWalk requires Point[['CartesianPlane']] as input; you passed Point[CartesianPlane]

In [None]:
def flattenHer(results, exp_params):
    n = exp_params["iteration_n"]
    m = exp_params["steps"]
    dfs = []
    for j in range(n):
        d = results[j].data
        r = np.zeros((m,2))
        for i in range(m):

            r[i,:]= [d[i]['x'], d[i]['y']]
        df = pd.DataFrame(r, columns=['x','y'])
        df["run"]  = j
        df['dx'] = df.x.diff().shift(-1)
        df['dy'] = df.y.diff().shift(-1)
        dfs.append(df)

    return pd.concat(dfs)

In [None]:
df=flattenHer(results, experiment_params)

In [None]:
df.head()

In [None]:
runs = df.run.unique()
for k in runs:
    df[df.run==k].plot(x="x", y='y',kind="scatter")
    r = df[df.run==k].drop('run',axis=1).values
    n = len(r)
    for i in range(n):
        plt.arrow(*r[i,:4], width = 0.01, 
        length_includes_head=True)

In [None]:
Agent.unroll_schema()

In [None]:
@block
def aliceDecision(domain:Point[Agent])->Point[CartesianPlane]:
    alice = deepcopy(domain)
    targetx = alice['reference']['pos']['x']
    targety = alice['reference']['pos']['y']
    
    errorx = alice['plant']['pos']['x']-targetx
    errory = alice['plant']['pos']['y']-targety

    ##WIP

@block
def aliceDynamics(domain:Point[Agent])->Point[Agent]:
    alice = deepcopy(domain)
    acc = randomWalk(emptyPoint)
    
    arg_data = {}
    arg_data['particle'] = alice.data
    arg_data['acc'] = acc.data

    args = Point(ParticleUpdateDomain, arg_data)

    output = particleUpdate(args)

    return output

In [None]:
emptyPoint = Point(EmptySpace,{})



In [None]:
Particle.unroll_schema()

In [None]:
initAlice = Point(Particle, {'pos': {'x': 0.0, 'y': 0.0},
 'vel': {'dx': -1.0, 'dy': -1.0},
 'maxspeed': .2})

experiment_params = {"iteration_n": 5, "steps": 25}
my_experiment = Experiment(initAlice, experiment_params, (aliceDynamics,))

results = my_experiment.run()

In [None]:
results

In [None]:
def flattenHer(results, exp_params):
    n = exp_params["iteration_n"]
    m = exp_params["steps"]
    dfs = []
    for j in range(n):
        d = results[j].data
        r = np.zeros((m,4))
        for i in range(m):

            r[i,:]= [d[i]['pos']['x'], d[i]['pos']['y'],d[i]['vel']['dx'], d[i]['vel']['dy']]
        df = pd.DataFrame(r, columns=['x','y','dx','dy'])
        df["run"]  = j
        dfs.append(df)

    return pd.concat(dfs)

In [None]:
df = flattenHer(results,experiment_params)

In [None]:
df

In [None]:
runs = df.run.unique()
for k in runs:
    df[df.run==k].plot(x="x", y='y',kind="scatter")
    r = df[df.run==k].values
    n = len(r)
    for i in range(n):
        plt.arrow(*r[i,:4], width = 0.01, 
        length_includes_head=True)