# Section 6.5.6
# Theis and Hantush wells animated

Groundwater loweing by wells is animated. We'll have both Theis and Hantush wells and show the difference.

Animation is done with `matplotlib.anamation.FuncAnimation`.

This requires two functions. One that initializes the lines to be drawn and the other to update its data in an iteration loop. The latter function takes a single integer input that is the frame number within the iteration.

It makes sense to define a class for the well and a wellcollection, which have methods to compute themselvesl. Doing this, the class is instantiated with all its data before running the animation, so that the aninamtion only requires the counter.

@TO 2020-12-11

In [183]:
attribs = lambda obj: [o for o in dir(obj) if not o.startswith('_')]

import numpy as np
from scipy.special import exp1
from scipy.integrate import quad
from matplotlib import pyplot as plt
from matplotlib import animation, rc
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
import pandas as pd
import pdb

plt.rcParams["animation.html"] = "html5"

def newfig(title='title', xlabel='xlabel', ylabel='ylabel',
                   xlim=None, ylim=None, size_inches=(8, 6)):
    fig, ax = plt.subplots()
    fig.set_size_inches(size_inches)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if xlim:
        ax.set_xlim(xlim)
    if ylim:
        ax.set_ylim(ylim)
    ax.grid()
    return fig, ax

### Fluctuation due to surface water varying according to the sine function

Governing partial differential equation:

$$\frac{\partial^2s}{\partial r^2} + \frac 1 r \frac{\partial s}{\partial r}= \frac S {kD} \frac{\partial s}{\partial t}$$

Solution for a well in a confined aquifer or a water-table aquifer with constant transmissivity (Theis well). The drawdown is:

$$s(r,t) = \frac Q {4 \pi kD} \mbox{W}(u), \,\,\,\,\, u = \frac {r^2 S}{4 kD t}$$

Where $W(-)$ is the so-called Theis' well function, generally also known as the exponential integral $E_i(-)$, which can be imported as `exp1` from the module `scipy.special`.

Mathematically, the exponential integral is:
$$E_i(z) = \intop_u^\infty\frac {e^{-y}}{y}dy$$

Which can also be expressed as a power series:

$$E_i(u) = -\gamma -\ln u + u - \frac {u^2}{2\times2!} + \frac {u^3}{3\times3!}-\frac{u^4}{4\times4!}+...,\mbox{ with }\gamma = 0.577216...\mbox{, which is Euler's constant.}$$

The total flow on across a circle at distance r from the well is

$$ Q(r, t) = -2 \pi r kD \frac {\partial s}{\partial r}$$

after filling in $s(r, t) = \frac {Q_0}{4\pi kD} \intop_u^\infty \frac {e^{-y}} y dy$ and noting that $\frac{\partial r}{\partial r} = \frac{2 r} u$ we obtain

$$ Q(r, t)= Q_0 e^{-u}$$

In [184]:
# First set up the figure, the axis, and the plot element we want to animate
def Wh(u, rho):
    """Return Hantush well function as a regualar np.ufunc.
    
    Parameters
    ----------
    u: np.ndarray or float
        input, u = r^2 S / (4 kD t)
    rho: np.ndarray or float
        r / lambda = r / sqrt(kD c)
    """
    def whkernel(y, rho):
        return np.exp(-y - (rho/2) ** 2 / y ) / y    

    def whquad(u, rho):
        return quad(whkernel, u, np.inf, args=(rho))

    Whantush = np.frompyfunc(whquad, 2, 2) # 2 inputs and tow outputs h and err
    
    return Whantush(u, rho)[0]

def Wt(u):
    """Return Theis's well function"""
    return exp1(u)


class Well:
    """A Theis well class. (Also superclass for Hantush wells)
    
    The well contains it's aquifer information, its flow and its radius and its
    location. Then for any points the drawdown due to the well can be computed
    as well as the velocity and discharge vectors at these points due to this
    well.    
    """

    required_aquif_keys = set(('kD', 'S', 'por'))
    required_well_keys  = set(('name', 'x', 'y', 'r0', 'Q'))
    
    dims = {'kD':(':.0f','[m2/d]'), 'S': (':.3f','[-]'), 'c':(':.0f','[d]'),
                  'por': (':.2f','[-]'),
                  'x': (':.1f','[m]'), 'y':(':.1f','[m]'), 'ro':(':.2f','[m]'),
                  'Q':(':.0f','[m3/s]')}

    @staticmethod      
    def s(xy):
        """Return distance along line defined by points xy."""
        d = np.sqrt(np.sum(np.diff(xy, axis=0) ** 2, axis=1))
        return np.hstack((0, np.cumsum(d)))

    def __init__(self, kD=None, S=None, c=None, por=None,
                    name=None, x=None, y=None, r0=None, Q=None):
        """Return a Theis well object.
        
        Parameters
        ----------
        kD: float [m2/d]
            transmissivity
        S: float [-]
            Storage coefficient
        c: float [D]
            resistance agains vertical flow through aquitard.
            c is not needed for Theis well
        por: float [-]
            porosity
        name: int or str
            id or name of the object
        x: float [m]
            x-coordinate
        y: float [m]
            y-coordinate
        r0: float [m]
            well radius
        Q: float
            constant extraction
        """
        self.kD = kD
        self.S = S
        self.c = c # Only required for Hantush well not used in Well
        self.por = por
        self.name = name
        self.x = x
        self.y = y
        self.r0 = r0
        self.Q = Q
  
    def r2(self, xy):
        """Return square of distance between points xy and the object."""
        return np.fmax(self.r0, (xy.T[0] - self.x) ** 2 + (xy.T[1] - self.y) ** 2)
    
    def u(self, xy=None, t=None):
        """Return argument of Well function."""
        return self.r2(xy) * self.S / (4 * self.kD * t)
        
    def dd(self, xy=None, t=None):
        """Return drawdown for all xy points and time t."""
        dd = self.Q / (4 * np.pi * self.kD) * Wt(self.u(xy=xy, t=t))
        return dd
    
    def plot_dd(self, xy=None, t=None, ax=None):
        """Plot the dawdown along line defined by xy at tie t."""
        return ax.plot(Well.s(xy), self.dd(xy=xy, t=t),
                       label="well {}".format(str(self.name)))
    
    def Qr(self, r=None, t=None):
        """Return the total discharge at a distance r from well."""
        xy = np.array([[self.x + r, self.y + 0 * r]])
        return self.Q * self.u(xy=xy, t=t)
    
    def qxy(self, xy=None, t=None):
        """Return the discharge vector at the points xy at time t."""
        qa = self.Qr(xy=xy, t=t) / 2 * np.pi
        qxqy = np.array([xy.T[0] - self.x, xy.T[1]-self.y]) / self.r2(xy)
        return qxqy
    
    def vyx(self, xy=None, t=None):
        """Return the velocity vector at the points xy at time t."""
        return self.qxqy(xy=xy, t=t) / self.por
    
    
class HantushWell(Well):
    """Return an Hantush well object."""
    
    required_aquif_keys = Well.required_aquif_keys.union('c',)
    required_well_keys  = Well.required_well_keys

    def __init__(self, kD=None, S=None, c=None, por=None,
                    name=None, x=None, y=None, r0=None, Q=None):
        """Instantiate a Hantush well object.
        
        Paameters
        ---------
        see Well class docstring.
        """
        super().__init__(kD=kD, S=S, c=c, por=por, x=x, y=y, r0=r0, Q=Q)
        self.L = np.sqrt(self.kD * self.c)

    def dd(self, xy=None, t=None):
        """Return drawdown for all xy points and time t."""
        rho = np.sqrt(self.r2(xy) / (self.kD * self.c))
        dd = self.Q / (4 * np.pi * self.kD) * Wh(self.u(xy=xy, t=t), rho)
        return dd

    def Qr(self, r=None, t=None):
        """Return the total discharge at a distance r from well."""
        xy = np.array([[self.x + r, self.y + 0 * r]])
        raise NotImplementedError("Qr is not implemented for Hantush wells")
        return self.Q * self.u(xy=xy, t=t)
            
    
class WellCollection:
    """Class of holding multiple Transient wells be it Theis or Hantus wells.
    
    It uses well objects, which may be either Hantush of Theis well objects.
    """
    
    def check_aquif(self):
        """Verify the given aquifer for its type and fields."""        
        msg = "aquif must be a dict with keys " +  str(
                            self.wellClass.required_aquif_keys)
        if not isinstance(self.aquif, dict):
            raise ValueError(msg)
        missing = set(self.wellClass.required_aquif_keys).difference(self.aquif.keys())
        if len(missing):
            raise KeyError("missing keys in aquif: {}".format(str(missing)))

    def check_objs(self):
        """Convert given objs to a list of obj of correct type."""
        objs = []
        well_data = self.aquif.copy()

        if isinstance(self.objs, dict):
            for k in self.objs:
                if isinstance(self.objs[k], dict):
                    objs.append(self.wellClass(**self.aquif, **self.objs[k]))
                elif isinstance(self.objs[k], self.wellClass):
                    objs.append(objs[k])
        elif isinstance(self.objs, pd.DataFrame):
            for i in self.objs.index:
                obj = self.objs.loc[i]
                well_data.update(obj)
                objs.append(self.wellClass(**well_data))
        elif isinstance(self.objs, (list, tuple)):
            # is record must contain values for 'name', x', 'y', 'r0' and 'Q'
            for i in range(len(self.objs)):
                obj = {k:objs[i][k] for k in self.wellClass.required_well_keys}
                objs.append(Well(**self.aquif, **obj))
        # replace self.objs by a list of objs of the proper type
        self.objs = objs

    
    def __init__(self, aquif=None, objs=None, wellClass=Well):
        """Return an instance of WaveCollection.
        
        Parameters
        ----------
        aquif: dict
            aquifer properties: kD, S, por
        objs: dict of Well objects | a dict of dict |
                a pd.DataFrame.
                Required fields: ['name|Id', 'x', 'y', r0', 'Q']
        wellClass: class
            class of the well to be generated (default Well=Theis,
            alternative HantushWell)
        """
        self.wellClass = wellClass
        self.dims = Well.dims # dimensions and formats
        self.aquif = aquif
        self.objs  = objs
                     
        self.check_aquif()
        self.check_objs()
        self.wtype = 'Theis' if self.wellClass == Well else "Hantush"
                                 
    def __getitem__(self, k):
        return self.objs.__getitem__(k)
    
    def fig_title(self, times=None):
        """Return an informative fig title for this well collection."""
        if self.wellClass == Well: # Theis
            L_str =''
        else: # Hantush
            L_char = np.sqrt(self.aquif['kD'] * self.aquif['c'])
            L_str  = ', lambda={:.4g} m'.format(L_char)
        dstr = ', '.join([f'{p}= {{{self.dims[p][0]}}} {self.dims[p][1]}'
                                for p in self.wellClass.required_aquif_keys])
        dstr1 = dstr.format(*[self.aquif[k] for k in self.wellClass.required_aquif_keys])
        title = ("Drawdown by multiple {} wells along a line.\n".format(self.wtype) +
                dstr1 + L_str + '\n' +
          f'Time from {times[0]:.3f} to {times[-1]:.0f} d, increasing logarithmically.')
        return title
                                 
    def plot_dd(self, xy=None, t=None, all=False, ax=None):
        lines = []
        s = Well.s(xy)
        dd = self.dd(xy=xy, t=t, all=all)
        if all:
            for d, obj in zip(dd, self.objs):
                    lines += ax.plot(s, d, label='well {}'.format(str(obj.name)))
        lines += ax.plot(s, dd[-1], label='total dd')
        return lines
                                
    def dd(self, xy=None, t=None, all=False):
        dd = np.zeros((len(self.objs) + 1, xy.shape[0]))
        for i, obj in enumerate(self.objs):
            dd[i] = obj.dd(xy=xy, t=t)
        dd[-1] = np.sum(dd, axis=0)
        if all:
            return dd
        else:
            return dd[-1]
    
    def qxy(self, xy=None, t=None):
        qxy = np.zeros_like(xy)
        for obj in self.objs:
            qxy += obj.qxy(xy=xy, t=t)
        return qxy
    
    def vxy(self, xy=None, t=None):
        return self.qxy(xy, t)

In [185]:
# First define the aquifer
aquif = {'kD':600, 'S':0.22, 'c':200, 'por':0.35}

# Then the wells
wellData = [[0, -160., 0., 0.001, -240.],
            [1,  -80., 0., 0.001, -240.],
            [2,    0., 0., 0.001, -240.],
            [3,   80., 0., 0.001, -240.],
            [4,  160., 0., 0.001, -240.]]
# Turn this list into a DataFrame with the correct fields
wellsdf = pd.DataFrame(wellData, columns=['name', 'x', 'y', 'r0', 'Q'])
print(wellsdf)

# instantiate the wave collection
if False:
    wells = WellCollection(aquif=aquif, objs=wellsdf, wellClass=Well)
else:
    wells = WellCollection(aquif=aquif, objs=wellsdf, wellClass=HantushWell)

# Generate a line of points along whose connection lines the head will be shown
y = np.logspace(-1, np.log10(2000), 101)
x = np.zeros_like(y)
xy = np.vstack((x, y)).T

xlim = y[0], y[-1]

# Chose the simulation times for the frames to be used in the animation
times = np.logspace(-2, 3, 100)

# An informative title for the axes

# Generate the figure and axes to plot on
fig, ax = newfig(wells.fig_title(times), "y [m]", "drawdown [m]",
                  xlim=xlim,
                  ylim=(-1.5, 0),
                  size_inches=(12, 8))

if False: # Set to True for testing without anination
    well = Well(**aquif, **wellsdf.iloc[2]) # instantiate a single well
    #well.plot_dd(xy=xy, t=times[0], ax=ax) # plot its drawdown
    wells.plot_dd(xy=xy, t=10, all=True, ax=ax) # plot the wellcollection
else: # Do the animation
    
    # initialize the artists in the aninmation
    t = times[0]
    # The well collection
    lines = wells.plot_dd(t=t, xy=xy, ax=ax, all=True)
    # The last one is the darwdown due to all wells superimposed 
    lines[-1].set_linewidth(3) # Set it's linewidth to thick
    tstr = 'Heads at time = {:8.3f} d' # To be used during the animation
    text  = ax.text(0.05, 0.8, tstr.format(t),
                        fontsize=14, transform=ax.transAxes)
    #ax.legend(loc='lower right')

    # initialization function: plot the background of each frame
    def init():
        """Setup the background for the animation."""
        global lines, text
        for line in lines:
            line.set_data([],[])
        text.set_text([])
        ax.legend(loc='lower right') # fix the legend
        return lines

    # animation function.  This is called sequentially
    def animate(t, objs, xy):
        global lines, text # not necessary as lines and text are implicitly global

        # Update the artists
        s = Well.s(xy) # coordinates along the line through the points
        dd = objs.dd(xy=xy, t=t, all=True)
        for line, d in zip(lines, dd):
            line.set_data(s, d)
        text.set_text(tstr.format(t))
        return lines # receiver expects a tuple of the artitsts involved

    # call the animator.  blit=True means only re-draw the parts that have changed.
    # Note that frames is the first argument of animate, fargs has the other possible args.
    # Interval is the time on screen between successive frames.
    # Blit updates only what changes in the figure.
    # Repeat allows ongoing repetition of the video on screen.
    print("Patience, computing and generating video takes about 30 sec. on mac...")
    anim = animation.FuncAnimation(fig, animate, init_func=init,
                                   frames=times, fargs=(wells, xy),
                                   interval=100, blit=True, repeat=False)
    plt.close(anim._fig)
    out = HTML(anim.to_html5_video())
out # to actually show the video.

   name      x    y     r0      Q
0     0 -160.0  0.0  0.001 -240.0
1     1  -80.0  0.0  0.001 -240.0
2     2    0.0  0.0  0.001 -240.0
3     3   80.0  0.0  0.001 -240.0
4     4  160.0  0.0  0.001 -240.0
Patience, computing and generating video takes about 30 sec. on mac...


In [186]:
# The extra_args ensure that the x264 codec is used, so that
# the video can be embedded in html5.  You may need to adjust this for
# your system: for more information, see
# http://matplotlib.sourceforge.net/api/animation_api.html
fname = 'Transient_' + wells.wtype + '_animation' 
anim.save(fname + '.mp4', fps=10, extra_args=['-vcodec', 'libx264'])

print(anim.save_count, " frames saved.") # Shows the number of frames saved.

100  frames saved.
