<a href="https://colab.research.google.com/github/ContextLab/storytelling-with-data/blob/master/data-stories/COVID-19/particle_interactions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Particle interactions demo code

This notebook implements a particle-based simulation of dynamic interactions, inspired by [this Washington Post article](https://www.washingtonpost.com/graphics/2020/world/corona-simulator/).  The physics simulations are implemented in [PyMunk](http://www.pymunk.org/en/latest/).

The main class is the `Environment` class; it initializes a physics simulator within a rectangular 2D environment, along with a population of particles that move around and interact.  You can then query the population at various points to gain insights into the state of the simulated population at each moment.

The basic building blocks of the `Environment` class are:
  - The `Rectangle` class implements the basic "canvas" that the simulation runs on.  The class defines the rectangular boundaries of the simulation environment.  It also specifies the size of the environment.
    - To initilize a new `Rectangle` instance, use `r = Rectangle(c1, c2)`, where `c1` and `c2` are 2D vectors specifying opposite corners of the rectangle.
  - The `Circle` class implements an individual particle.  There are a bunch of options you can customize, or just leave them at their default settings.
    - To initilize a new `Circle` instance, use `c = Circle(x)`, where `x` is a 2D vector specifying the circle's center.

Sample code for running simulations is provided at the bottom of this notebook.

In [3]:
!pip install pymunk

Collecting pymunk
[?25l  Downloading https://files.pythonhosted.org/packages/80/0f/25a65edbda2d7d72c28ab150292c85293896183bf5e16cffc8e3608b8934/pymunk-5.6.0-py2.py3-none-manylinux1_x86_64.whl (535kB)
[K     |████████████████████████████████| 542kB 2.7MB/s 
Installing collected packages: pymunk
Successfully installed pymunk-5.6.0


In [4]:
import plotly.express as px
import numpy as np
import pandas as pd
import itertools
from scipy.spatial.distance import cdist, pdist, squareform
from sklearn.decomposition import IncrementalPCA
from copy import copy
import pymunk as pm
import pymunk.matplotlib_util as mpl_util
import matplotlib.pyplot as plt
from matplotlib import animation, rc

Loading chipmunk for Linux (64bit) [/usr/local/lib/python3.6/dist-packages/pymunk/libchipmunk.so]


In [0]:
class Rectangle():
  def __init__(self, c1, c2, linewidth=2, tol=1e-3, sim=None):
    def corners(n): #return corners in n dimensions
      def fullfact(dims):
        vals = np.asmatrix(range(1, dims[0] + 1)).T
        if len(dims) == 1:
          return vals
        else:
          aftervals = np.asmatrix(fullfact(dims[1:]))
          inds = np.asmatrix(np.zeros((np.prod(dims), len(dims))))
          row = 0
          for i in range(aftervals.shape[0]):
            inds[row:(row + len(vals)), 0] = vals
            inds[row:(row + len(vals)), 1:] = np.tile(aftervals[i, :], (len(vals), 1))
            row += len(vals)
        return inds
      x = fullfact(n*[2])

      mask1 = (x == 1)
      mask2 = (x == 2)

      c = np.zeros(np.shape(x))
      for i in range(np.shape(x)[1]):
        m1 = np.zeros(mask1.shape)
        m2 = np.zeros(mask2.shape)

        m1[:, i] = mask1[:, i].ravel()
        m2[:, i] = mask2[:, i].ravel()
        
        c[np.where(m1)] = c1[i]
        c[np.where(m2)] = c2[i]
      
      return c    
    
    def edges(c): #return edges, given corners
      e = []
      for i in range(c.shape[0]):
        for j in range(i):
          if self.adjacent(c[i, :], c[j, :]):
            e.append([c[i, :], c[j, :]])
      return e

    assert (len(c1) == len(c2)) and (len(c1) == 2), 'Rectangle must be 2D'
    self.corners = corners(len(c1))
    self.edges = edges(self.corners)

    #add visual properties to support drawing
    self.linewidth = linewidth
    self.tol = tol

    if not sim: #set simulation engine object
      self.sim = pm.Space()
    else:
      self.sim = sim
    
    #add physical properties to support simulation
    edges_sim = []
    static_body = self.sim.static_body
    for e in self.edges:      
      next_edge = pm.Segment(static_body, e[0], e[1], self.linewidth / 100)
      next_edge.elasticity = 0.99
      next_edge.friction = 0.0
      self.sim.add(next_edge)
  
  def adjacent(self, a, b):
      return np.sum(a == b) == len(a) - 1
  
  def upper_bounds(self):
    return np.max(self.corners, axis=0)
  
  def lower_bounds(self):
    return np.min(self.corners, axis=0)

  def random_point_within(self):
    lb = self.lower_bounds() + self.tol
    ub = self.upper_bounds() - self.tol
    return np.multiply(ub - lb, np.random.rand(len(lb))) + lb

In [0]:
class Circle():
  def __init__(self, center, name=None, status='healthy', transrate=1.0, recovery_time=10, sim=None, radius=0.05, markersize=5, color='gray', mass=1, velocity=[0, 0], tol=1e-3, visible=True):
    assert len(center) == 2, 'Circle must be 2D'
    assert np.isscalar(radius), 'Radius must be a scalar'
    assert radius >= 0, 'Radius must be non-negative'

    self.center = center
    self.radius = radius
    self.markersize = markersize
    self.color = color
    self.mass = mass
    self.tol = tol
    self.name = name
    self.transrate = transrate
    self.visible = visible
    self.infect_time = 0.0
    self.recovery_time = recovery_time

    #physics model
    if not sim: #set simulation engine object
      self.sim = pm.Space()
    else:
      self.sim = sim
    
    self.intertia = pm.moment_for_circle(self.mass, 0, self.radius, (0, 0))  
    self.body = pm.Body(self.mass, self.intertia)      
    self.body.velocity = pm.Vec2d(velocity)
    self.body.position = self.center
    
    self.circle = pm.Circle(self.body, self.radius, (0, 0))
    self.circle.elasticity = 0.99
    self.circle.friction = 0.0
    self.sim.add(self.body, self.circle)

    self.status = 'healthy'
    self.update(status=status)

  def update(self, status=None, now=0):
    #if infection is starting now, start the infection timer
    if (self.status == 'healthy') and (status == 'infected'):
      self.infect_time = now
    
    #if currently infected longer than the recovery time, either recover or die
    if (self.status == 'infected') and ((now - self.infect_time) >= self.recovery_time):
      if not (status == 'deceased'):
        status = 'recovered'
    
    #if already recovered, cannot get re-infected
    if (self.status == 'recovered') and (status == 'infected'):
      status = None

    if status:
      self.status = status
      if self.status == 'healthy':
        self.color = (0.5, 0.5, 0.5, 0.75)
      elif self.status == 'infected':
        self.color = (0.9, 0.0, 0.0, 0.75)
      elif self.status == 'recovered':
        self.color = (0.0, 0.9, 0.0, 0.75)
      elif self.status == 'deceased':
        self.color = (0.1, 0.1, 0.1, 0.75)
        self.visible = False
    
    if not self.visible:
      self.sim.remove(self.circle, self.body)
    else:
      self.circle.color = self.color
    if self.body:  
      self.center = self.body.position

  def within(self, point):
    self.update()

    #return whether point is contained within circle (allowing for fuzziness up to self.tol)
    assert len(point) == len(self.center), 'Point must have the same dimensionality as Circle'
    return np.all(point <= self.upper_bounds() + self.tol) and np.all(point >= self.lower_bounds() - self.tol)
  
  def interact(self, other, now=0):    
    if self.intersect(other.center, tol=other.radius):
      if self.status == 'infected':
        if np.random.rand() < self.transrate:
          other.update(status='infected', now=now)
      elif other.status == 'infected':
        if np.random.rand() < other.transrate:
          self.update(status='infected', now=now)

  def intersect(self, point, tol=None):
    self.update()

    if not tol:
      tol = self.tol

    #return whether point is contained within circle or near its boundary (allowing for fuzziness up to tol)
    dist = np.sqrt(np.sum(np.power(self.center - point, 2)))
    return dist <= self.radius + tol

In [0]:
def random_velocity(dims):
  v = np.random.rand(dims)
  signs = (2 * (np.random.rand(dims) > 0.5)) - 1
  v[signs] *= -1
  v *= velocity_scale
  return v.tolist()

In [2]:
random_velocity(2)

NameError: ignored

In [0]:
class Environment():
  def __init__(self, size=5, popsize=250, pinfect=0.05, velocity_scale=1.0, transrate=1.0, dt=0.1):
    def random_velocity(dims):
      v = np.random.rand(dims)
      signs = (2 * (np.random.rand(dims) > 0.5)) - 1
      v[signs] *= -1
      v *= velocity_scale
      return v.tolist()

    dims = 2 #fixed at 2D while using pymunk

    self.sim = pm.Space()

    self.borders = Rectangle(dims*[0], dims*[size], sim=self.sim)
    self.size = size
    self.dims = dims
    self.population = []
    for p in range(popsize):
      if np.random.rand() < pinfect:
        status = 'infected'
      else:
        status = 'healthy'
      
      self.population.append(Circle(self.borders.random_point_within(), f'Person {p}', status=status, transrate=transrate, velocity=random_velocity(self.dims), sim=self.sim))
    self.dt = dt
  
  def simulate_interactions(self, now=0):
    x = np.vstack([p.center for p in self.population])
    collisions = np.where(squareform(pdist(x) <= 2*self.population[0].radius))    

    for i in collisions[0]:
      for j in collisions[1]:
        env.population[i].interact(self.population[j], now=now)
  
  def update_statuses(self, now=0, return_data=False):
    data = []
    for p in self.population:
      p.update(now=now)
      if return_data:
        data.append({'t': now,
                     'size': p.radius,
                     'name': p.name,
                     'status': p.status,
                     'x': p.center[0],
                     'y': p.center[1]})
    if return_data:
      return data

  def run(self, T):
    assert np.isscalar(T) and (T > 0), 'T must be a positive scalar'            

    data = []
    shapes = []
    for t in np.arange(0, T, self.dt):
      self.sim.step(self.dt)
      self.simulate_interactions(now=t)
      data.extend(self.update_statuses(return_data=True, now=t))
    return pd.DataFrame(data)
  
  def draw(self, fig=None, animate=False, ax=None):
    if not fig:
      fig = plt.figure(figsize=(self.size, self.size))

    if not ax:
      ax = plt.axes(xlim=[0, self.size], ylim=[0, self.size])
      ax.set_aspect('equal')
    opts = mpl_util.DrawOptions(ax)
    opts.collision_point_color = (0, 0, 0, 0)
    
    self.sim.debug_draw(opts)

    if not animate:
      return fig
    else:
      return fig.get_children()[1].get_children()
    return fig
  
  def animate(self, T, fig=None):
    if not fig:
      fig = plt.figure(figsize=(self.size, self.size))
    ax = plt.axes(xlim=[0, self.size], ylim=[0, self.size])
    ax.set_aspect('equal')

    assert np.isscalar(T) and (T > 0), 'T must be a positive scalar'
    def init():
      self.simulate_interactions()
      ax.clear()
      return []
    
    def get_frame(t):
      self.sim.step(self.dt)
      self.simulate_interactions(now=t)
      env.update_statuses(now=t)

      ax.clear()
      ax.set_xlim([0, self.size])
      ax.set_ylim([0, self.size])
      ax.set_aspect('equal')
      return self.draw(fig=fig, animate=True, ax=ax)
    
    ts = np.arange(0, T, self.dt)
    return animation.FuncAnimation(fig, get_frame, interval=20, frames=ts, blit=True)

In [0]:
env = Environment();

In [0]:
anim = env.animate(20)
rc('animation', html='jshtml')
anim

In [0]:
#baseline simulation
env = Environment(pinfect=0.05); #reset environment
df = env.run(50)

In [0]:
df.head()

In [0]:
fig = px.histogram(df, x='t', color='status', barnorm='percent', category_orders={'status': ['infected', 'healthy', 'recovered', 'deceased']}, color_discrete_sequence=['#EF553B', '#636EFA', '#00CC96'])
fig.layout['yaxis']['title']['text'] = 'Percent'
fig.layout['xaxis']['title']['text'] = 'Simulation time (AU)'
fig