In [1]:
import numpy as np
from numpy import testing

In [2]:
%run numerics.ipynb

In [3]:
class MPDATA:
    """One-dimensional advection of a size-spectrum PDF in radius space.

    The grid is equidistant in the transformed coordinate ``x = coord.x(r)``.
    Each call to :meth:`step` performs ``opts["n_it"]`` passes: the first is
    an upwind pass with the physical velocity, every further one an upwind
    pass with the velocity returned by ``numerics().GC_antidiff``.

    ``numerics`` is not defined in this file; it is expected to be brought
    into scope beforehand (the notebook does so via ``%run numerics.ipynb``).

    Storage layout: cell-centred arrays have ``nr + 2`` elements (one halo
    cell on each side of the ``nr`` interior cells selected by ``self._i``);
    cell-border arrays have ``nr + 1`` elements.
    """

    def __init__(self, nr, r_min, r_max, dt, pdf_r_lambda, coord, opts):
        """Set up the grid, the metric factors and the initial condition.

        Parameters
        ----------
        nr : int
            Number of interior grid cells.
        r_min, r_max : quantity
            Domain bounds; both must expose ``.units`` and share the same units.
        dt : quantity
            Time step; must expose ``.units`` and ``.magnitude``.
        pdf_r_lambda : callable
            Called with the interior cell-centre radii (with units attached);
            must return an object exposing ``.magnitude``.
        coord : object
            Coordinate transformation providing ``x(r)``, ``r(x)`` and
            ``dx_dr(r)``.
        opts : mapping
            Must contain ``"n_it"`` (number of passes per step); it is also
            forwarded unchanged to ``numerics().GC_antidiff``.
        """
        self._nm = numerics()
        self.opts = opts

        assert r_min.units == r_max.units
        self.r_units = r_min.units
        self.t_units = dt.units

        # index selecting the "source" psi buffer; toggled in step()
        self._n = 0

        #   |-----o-----|-----o--...
        # i-1/2   i   i+1/2   i+1
        # x_min     x_min+dx

        # interior cells (index 0 and index nr+1 are halo cells)
        self._i = slice(1, nr+1)

        # cell-border stuff
        # presumably the slice covering all nr+1 borders of the interior
        # cells; the exact semantics are defined by numerics().hlf -- TODO confirm
        self._ih = self._i % self._nm.hlf
        self._xh, self._dx = np.linspace(
            coord.x(r_min).magnitude,
            coord.x(r_max).magnitude,
            nr+1,
            retstep=True
        )
        self._rh = coord.r(self._xh).to(self.r_units).magnitude
        # G = dx / (dx/dr), evaluated at cell borders
        self._Gh = self._dx / coord.dx_dr(self._rh * self.r_units).magnitude
        self._GCh = np.empty_like(self._Gh)

        if opts["n_it"] > 1:
            self._GCh_tmp = np.empty_like(self._Gh)

        # cell-centered stuff (including one halo cell on each side)
        self._x = np.linspace(
            self._xh[0] - self._dx/2,
            self._xh[-1] + self._dx/2,
            nr+2
        )
        self._r = coord.r(self._x).to(self.r_units).magnitude

        # G = dx / (dx/dr), evaluated at cell centres
        self._G = self._dx / coord.dx_dr(self._r * self.r_units).magnitude

        # two buffers that step() alternates between; contents are
        # uninitialised until written below / by the first step
        self._psi = np.empty_like(self._G)
        self._psi = (self._psi, self._psi.copy())

        # dt, dr
        self._dt = dt.magnitude
        # cell widths in r; NaN in the halo positions, which have no width
        self._dr = np.concatenate([
            np.array([np.nan]),
            np.diff(self._rh),
            np.array([np.nan])
        ])

        # initial condition: psi holds pdf * dr in each interior cell; it goes
        # into the buffer that the `pdf` property reads while self._n == 0
        self._psi[-1][self._i] = pdf_r_lambda(self._r[self._i] * self.r_units).magnitude * self._dr[self._i]

    @property
    def pdf(self):
        """Current PDF values in the interior cells (psi / dr), as a bare array."""
        return self._psi[self._n+1][self._i] / self._dr[self._i]

    @property
    def r(self):
        """Radii of the interior cell centres, multiplied by ``self.r_units``."""
        return self._r[self._i] * self.r_units

    def step(self, drdt_r_lambda):
        """Advance the solution by one time step.

        Parameters
        ----------
        drdt_r_lambda : callable
            Called with the cell-border radii; must return an object
            convertible via ``.to(r_units / t_units)`` and exposing
            ``.magnitude``.
            NOTE(review): it receives ``self._rh`` as a bare array, whereas
            ``pdf_r_lambda`` in ``__init__`` receives radii with units
            attached -- confirm this asymmetry is intended.

        Raises
        ------
        AssertionError
            If the Courant number reaches or exceeds one in magnitude, if the
            result of a pass is negative anywhere in the interior, or if a
            pass does not conserve ``sum(G * psi)`` up to the boundary flux.
        """
        # for convenience (TODO!)
        G, Gh, psi, i, ih, nm = self._G, self._Gh, self._psi, self._i, self._ih, self._nm

        # integration
        for it in range(self.opts["n_it"]):
            # toggles between -1 and 0, so that psi[n] is the buffer holding
            # the current state and psi[n+1] the other one
            self._n = (self._n+2) % 2 - 1
            n = self._n

            # zero the two halo cells of the source buffer
            psi[n][i.start - 1] = 0
            psi[n][i.stop] = 0

            if it == 0:
                # first pass: G * C = dt * dr/dt at the cell borders
                self._GCh[ih] = self._dt * drdt_r_lambda(self._rh).to(self.r_units / self.t_units).magnitude
                GCh = self._GCh
            else:
                # subsequent passes: velocity computed from the previous
                # pass's result and velocity.
                # NOTE(review): this rebinds self._GCh_tmp instead of filling
                # the buffer preallocated in __init__.
                self._GCh_tmp = nm.GC_antidiff(self.opts, psi[n], GCh, G, ih)
                GCh = self._GCh_tmp

            # Courant-number check: the bound applies to the magnitude, so
            # negative velocities must not slip through the test
            testing.assert_array_less(np.amax(np.abs(GCh[ih]/Gh[ih])), 1)

            # expected to fill psi[n+1] from psi[n] (psi[n+1] is what the
            # checks below and the `pdf` property read)
            nm.upwind(psi, GCh, G, n, i)

            # positive definiteness
            assert np.amin(psi[n+1][i]) >= 0

            # conservativeness: whatever left through the outermost borders
            # (rightwards at the right edge, leftwards at the left edge)
            # must account for the change in sum(G * psi)
            bcflux = (
                max(0, GCh[(i+nm.hlf).stop-1]) * psi[n][i.stop-1] -
                min(0, GCh[(i-nm.hlf).start ]) * psi[n][i.start ]
            )
            testing.assert_approx_equal(
                desired = np.sum(G[i]*psi[n][i]),
                actual  = np.sum(G[i]*psi[n+1][i]) + bcflux,
                significant = 15
            )
