In [2]:
## Testing environment

## Todo: Environment class & independent source params

## Todo: Animation for visualizing environment

## Todo: Convex & non-convex field

## Todo: Vectorized version of spatial diffusion field update

In [60]:
import numpy as np
import matplotlib.pyplot as plt

In [80]:
# Default field parameters, unpacked by the field class in the order
# [dx, dy, vx, vy, k, dt] (presumably grid spacings, advection velocities,
# diffusivity and time step -- confirm against the model description).
DEFAULT_FIELD_PARAMS = [1.0, 1.0, 0, 0, 1, 0.25]

class SpatialTemporalDiffusionField():
    """A single Gaussian source on a square map, optionally evolved over time.

    The source is a ``field_size`` x ``field_size`` Gaussian patch pasted into
    a zero map of ``map_size`` x ``map_size`` cells and rescaled to the range
    ``[0, field_max]``.  When ``n_timesteps`` is a positive integer, the
    evolution is precomputed with ``update_diffusion_field`` and stored in
    ``snapshots``.

    Attributes:
        dx, dy, vx, vy, k, dt: scheme parameters unpacked from ``params``.
        field_size, field_max, map_size: the constructor arguments.
        gaussian_field: the unscaled Gaussian patch.
        padded_field: ``3*map_size`` square canvas holding the pasted patch.
        field: the initial ``(map_size, map_size)`` map, scaled to ``field_max``.
        snapshots: ``(n_timesteps, map_size, map_size)`` array, or ``None``.
    """

    def __init__(self, pos=(50,50), map_size=100, field_size=40, field_max=25, n_timesteps=None, params=DEFAULT_FIELD_PARAMS):
        """Build the initial field and, optionally, its snapshots.

        Args:
            pos: ``(x, y)`` integer cell of the source centre on the map.
            map_size: side length of the square map.
            field_size: side length of the Gaussian patch (odd or even).
            field_max: peak value of the field after rescaling.
            n_timesteps: number of snapshots to precompute; ``None`` or a
                non-positive value skips precomputation.
            params: sequence ``[dx, dy, vx, vy, k, dt]``.

        Raises:
            ValueError: if the patch does not fit on the padded canvas.
        """
        # initialize field params
        self.dx, self.dy, self.vx, self.vy, self.k, self.dt = params
        self.field_size, self.field_max, self.map_size = field_size, field_max, map_size
        self.snapshots = None

        # generate field
        field_hsize = field_size // 2
        self.gaussian_field = self.generate_multivariate_gaussian_field(N=field_size)
        # The canvas carries one map-width of padding on every side so that a
        # source near (or beyond) the map edge can be pasted and then cropped.
        self.padded_field = np.zeros((3*map_size, 3*map_size))
        x_start = map_size + pos[0] - field_hsize
        y_start = map_size + pos[1] - field_hsize
        # Derive the end from the patch size itself: start + 2*(size // 2)
        # is one cell short when field_size is odd and the paste would fail.
        x_end, y_end = x_start + field_size, y_start + field_size
        if min(x_start, y_start) < 0 or max(x_end, y_end) > 3*map_size:
            raise ValueError(
                "source at pos=%r with field_size=%r does not fit on the padded map "
                "(map_size=%r)" % (pos, field_size, map_size))
        self.padded_field[y_start:y_end, x_start:x_end] = self.gaussian_field

        # Crop the central map window and rescale it to [0, field_max].
        window = self.padded_field[map_size:map_size*2, map_size:map_size*2]
        lowest, highest = np.min(window), np.max(window)
        if highest > lowest:
            self.field = (window - lowest) / (highest - lowest) * self.field_max
        else:
            # Flat window (e.g. the source lies wholly in the padding):
            # rescaling would divide by zero, so keep an all-zero map.
            self.field = np.zeros((map_size, map_size))

        # generate snapshots
        if (n_timesteps is not None) and (n_timesteps > 0):
            self.snapshots = np.zeros((n_timesteps, map_size, map_size))
            self.snapshots[0] = self.field
            for i in range(1, n_timesteps):
                self.snapshots[i] = self.update_diffusion_field(self.snapshots[i-1])

    def generate_multivariate_gaussian_field(self, N=40, mu=np.array([0.,0.]), sigma=np.array([[1.,0.],[0.,1.]])):
        """Evaluate a 2-D Gaussian density on an ``N`` x ``N`` grid.

        The grid spans ``[-2.8, 2.8]`` on both axes.

        Args:
            N: number of grid points per axis.
            mu: mean vector of the Gaussian.
            sigma: covariance matrix of the Gaussian.

        Returns:
            ``(N, N)`` array of density values.
        """
        X = np.linspace(-2.8, 2.8, N)
        Y = np.linspace(-2.8, 2.8, N)
        X, Y = np.meshgrid(X, Y)
        # Pack the grid coordinates into an (N, N, 2) array of points.
        pos = np.empty(X.shape + (2,))
        pos[:,:,0] = X
        pos[:,:,1] = Y
        n = mu.shape[0]
        sigma_det, sigma_inv = np.linalg.det(sigma), np.linalg.inv(sigma)
        norm = np.sqrt((2*np.pi)**n * sigma_det)
        # Quadratic form (x - mu)^T sigma^-1 (x - mu) for every grid point.
        fac = np.einsum('...k,kl,...l->...', pos-mu, sigma_inv, pos-mu)
        return np.exp(-fac / 2) / norm

    def update_diffusion_field(self, field):
        """Advance ``field`` by one explicit finite-difference step.

        Interior cells receive a 5-point diffusion term plus forward-difference
        advection terms; border cells are copied unchanged.  Every new value is
        computed from the input only, so the whole interior is updated at once
        with array slices.  Works for any 2-D shape, square or not.

        Args:
            field: 2-D array holding the current state (not modified).

        Returns:
            A new array of the same shape holding the next state.
        """
        u_k = field
        updated_u = field.copy()
        core = u_k[1:-1, 1:-1]
        # Neighbours along axis 0 (rows) and axis 1 (columns).
        row_next, row_prev = u_k[2:, 1:-1], u_k[:-2, 1:-1]
        col_next, col_prev = u_k[1:-1, 2:], u_k[1:-1, :-2]
        # NOTE(review): vx is paired with the axis-0 difference although the
        # constructor treats axis 0 as y, and the diffusion term uses dx for
        # both axes (dy only enters the vy term) -- confirm the intended
        # convention before using vx != vy or dx != dy.
        updated_u[1:-1, 1:-1] = core + self.k * (self.dt / self.dx ** 2) * \
            (row_next + row_prev + col_next + col_prev - 4 * core) + \
            self.vx * (self.dt / self.dx) * (row_next - core) + \
            self.vy * (self.dt / self.dy) * (col_next - core)
        return updated_u

In [102]:
class SpatialTemporalDiffusionEnvironment():
    """A map holding the sum of several independent diffusion sources.

    Each entry of ``configs`` is a dict with the keys ``'pos'``, ``'size'``
    and ``'params'``; one ``SpatialTemporalDiffusionField`` is built per entry.

    Attributes:
        field_size: side length of the map (the ``size`` argument).
        configs: the source configurations.
        n_timesteps: number of timesteps requested for precomputation.
        fields: one field object per configuration.
        timestep: the timestep most recently selected with ``set_timestep``.
        isolated_field_states: per-source maps at the current timestep.
        env_field: sum of all per-source maps at the current timestep.
    """

    def __init__(self, configs=None, size=100, use_snapshots=True, n_timesteps=300):
        """Create the source fields and select timestep 0.

        Args:
            configs: list of source configuration dicts; ``None`` means no
                sources.
            size: side length of the square map.
            use_snapshots: precompute ``n_timesteps`` snapshots per source when
                true; otherwise states are stepped forward on demand.
            n_timesteps: number of snapshots to precompute per source.
        """
        self.field_size = size
        # A fresh list per instance: a literal [] default would be one shared
        # object that every instance stores in `self.configs`.
        self.configs = configs if configs is not None else []
        self.n_timesteps = n_timesteps
        self.timestep = 0
        # Per-source (timestep, state) cache for fields without snapshots.
        self._live_states = {}
        snapshot_steps = n_timesteps if use_snapshots else None
        self.fields = [
            SpatialTemporalDiffusionField(pos=config['pos'], field_size=config['size'], params=config['params'],
                                          map_size=size, n_timesteps=snapshot_steps)
            for config in self.configs
        ]
        self.set_timestep()

    def _state_at(self, idx, t):
        """Return the map of source ``idx`` at timestep ``t``."""
        source = self.fields[idx]
        if source.snapshots is not None:
            return source.snapshots[t]
        # No precomputed snapshots: step forward from the cached state, or
        # from the initial field when asked to go back in time.
        if t < 0:
            raise ValueError("timestep must be non-negative when snapshots are disabled")
        step, state = self._live_states.get(idx, (0, source.field))
        if t < step:
            step, state = 0, source.field
        while step < t:
            state = source.update_diffusion_field(state)
            step += 1
        self._live_states[idx] = (step, state)
        return state

    def set_timestep(self, t=0):
        """Select timestep ``t`` and refresh the per-source and summed maps.

        Args:
            t: timestep index; indexes the precomputed snapshots when they
                exist, otherwise the number of update steps from the start.
        """
        states = [self._state_at(idx, t) for idx in range(len(self.fields))]
        if states:
            self.isolated_field_states = np.array(states)
            self.env_field = np.sum(self.isolated_field_states, axis=0)
        else:
            # No sources: keep map-shaped outputs instead of a bare scalar.
            self.isolated_field_states = np.zeros((0, self.field_size, self.field_size))
            self.env_field = np.zeros((self.field_size, self.field_size))
        self.timestep = t

    def get_single_field(self, idx=0):
        """Return the current map of source ``idx``, or ``None`` if out of range."""
        if 0 <= idx < len(self.fields):
            return self.isolated_field_states[idx]
        return None

In [144]:
# One entry per diffusion source; the environment forwards the 'pos', 'size'
# and 'params' values of each entry to the field constructor.
configs = [
    {
        'pos': (50,50),
        'size': 30,
        'params': [1.0, 1.0, 0.25, 0.25, 1, 0.25],
    },
    # Optional second source, currently disabled:
    # {'pos': (75,75), 'size': 30, 'params': [1.0, 1.0, 0.0, 0.0, 1, 0.25]},
]

# Build the environment with its default map size and timestep count.
env = SpatialTemporalDiffusionEnvironment(configs=configs)