# Pumping test interpretation: Theis vs Hantush methods

**References:**
- [Theis, C.V., 1935. The relation between the lowering of the piezometric surface and the rate and duration of discharge of a well using groundwater storage, Am. Geophys. Union Trans., vol. 16, pp. 519-524](https://water.usgs.gov/ogw/pubs/Theis-1935.pdf)

- [Mora, L. E., Jégat, H. J., Méndez, N. J., & Mejías, J. E. (2011). Aproximaciones prácticas a las funciones de Theis y Hantush para acuíferos confinados y semi-confinados. Agrollania, 8, 95-103.](https://biblat.unam.mx/hevila/Agrollania/2011/vol8/13.pdf)

- Minimization of the objective function for the automatic calibration: [scipy.optimize.minimize, SciPy library](https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html)

## Importing the libraries

In [1]:
#-- Check and install required packages if not already installed 
import sys
import subprocess

def if_require(package):
    """Import ``package`` by name and pip-install it when the import fails.

    Parameters
    ----------
    package : str
        Name handed to ``__import__`` and, if that raises ``ImportError``,
        to ``pip install``.

    Notes
    -----
    The installation runs pip through the current interpreter
    (``sys.executable -m pip``). ``subprocess.check_call`` raises when the
    command exits with a non-zero status. The package is not imported again
    after being installed.
    """
    try:
        __import__(package)
    except ImportError:
        pip_command = [sys.executable, "-m", "pip", "install", package]
        print(f"{package} not found. Installing...")
        subprocess.check_call(pip_command)
        
#-- Install packages only if they aren't already installed.
#   These calls run at import/cell-execution time and may invoke pip.
#   ipywidgets is imported right below as `widgets`.
#   NOTE(review): openpyxl and xlrd are not imported anywhere in the visible
#   source -- presumably optional pandas Excel engines; confirm they are needed.
if_require("ipywidgets")
if_require("openpyxl")
if_require("xlrd")

#-- Import the rest of libraries
import matplotlib.pyplot as plt
import ipywidgets as widgets
import seaborn as sns
import pandas as pd
import numpy as np
import scipy as sp
import traitlets
import operator
import os

from IPython.display import display, clear_output, HTML
from scipy.optimize import fmin_l_bfgs_b
from ipywidgets import Layout, Output
from tkinter import Tk, filedialog
from functools import reduce
from numpy.linalg import inv

import warnings
warnings.filterwarnings("ignore")

## Widgets and methods definition (Theis and Hantush)

In [2]:
#------------------------------------------#
# WIDGETS DEFINITION                       #
#------------------------------------------#
def create_widget(description: str, style=None, width=None):
    """Create an empty text-box widget.

    Parameters
    ----------
    description : str
        Label shown next to the text box.
    style : dict, optional
        Style mapping forwarded to ``widgets.Text``. Defaults to
        ``{'description_width': 'initial'}``.
    width : str, optional
        CSS width of the widget (e.g. ``'500px'``). Defaults to ``'500px'``.

    Returns
    -------
    ipywidgets.Text
        The widget itself (not its value); read ``.value`` to get the text.
    """
    if style is None:
        style = {'description_width': 'initial'}
    if width is None:
        width = '500px'
    # The width has to go through `layout`: `width` is not a trait of
    # widgets.Text, so passing it as a direct keyword has no effect.
    return widgets.Text(value='', description=description, style=style,
                        layout=widgets.Layout(width=width))


def dropdown_widget(description: str, options: list, value: str):
    """Create a dropdown widget.

    Parameters
    ----------
    description : str
        Label shown next to the dropdown.
    options : list
        The entries offered by the dropdown.
    value : str
        The entry selected initially.

    Returns
    -------
    ipywidgets.Dropdown
        The widget itself (not its value); read ``.value`` to get the
        current selection.
    """
    # The width has to go through `layout`: `width` is not a trait of
    # widgets.Dropdown, so passing it as a direct keyword has no effect.
    return widgets.Dropdown(description=description, options=options,
                            value=value,
                            style={'description_width': 'initial'},
                            layout=widgets.Layout(width='400px'),
                            disabled=False)
 
def update_numeric_value(widget, variable):
    """Attach a numeric-input validator to a text widget.

    Every time the widget's ``value`` changes, the new text is stripped and,
    unless it is blank, parsed with ``float``. Text that cannot be parsed
    triggers a printed message naming the widget.

    Parameters
    ----------
    widget : ipywidgets.Widget
        The widget whose ``value`` trait is observed.
    variable : Any
        Kept for backward compatibility and not used. A function cannot
        rebind a variable of its caller, so the numeric value has to be read
        from ``widget.value`` (as the button callbacks in this file do).

    Returns
    -------
    widget : ipywidgets.Widget
        The same widget, with the observer attached.

    Notes
    -----
    The previous implementation declared ``global variable`` inside the
    observer, so all widgets overwrote a single module-level name literally
    called ``variable`` instead of the argument passed in. That shared global
    is no longer written.
    """
    def update_value(change):
        new_value = change.new.strip()
        if new_value == '':
            # A blank field is allowed; nothing to validate.
            return
        try:
            float(new_value)
        except ValueError:
            print(f"Invalid input for '{widget.description}', please enter a valid value.")

    widget.observe(update_value, names='value')
    return widget

def Q_input(Q):
    """Build the text box used to enter the flow rate.

    The box is created with ``create_widget`` and then passed, together with
    ``Q``, to ``update_numeric_value``; the resulting widget is returned.
    """
    # Label fixed: the superscript was mis-encoded ("LÂ³") in the widget text.
    Q_wid = create_widget(description='Flow rate [L³/T]:',
                           style={'description_width': 'initial'}, width='500px')
    Q_wid = update_numeric_value(Q_wid, Q)
    return Q_wid

def T_input(T):
    """Build the text box used to enter the transmissivity.

    The box is created with ``create_widget`` and then passed, together with
    ``T``, to ``update_numeric_value``; the resulting widget is returned.
    """
    # Label fixed: the superscript was mis-encoded ("LÂ²") in the widget text.
    T_wid = create_widget(description='Transmissivity [L²/T]:',
                           style={'description_width': 'initial'}, width='500px')
    T_wid = update_numeric_value(T_wid, T)
    return T_wid

def S_input(S):
    """Build the text box used to enter the storage coefficient.

    The box comes from ``create_widget``; it is then handed, together with
    ``S``, to ``update_numeric_value`` and that call's result is returned.
    """
    storage_box = create_widget(
        description='Storage coefficient [-]:',
        style={'description_width': 'initial'},
        width='500px',
    )
    return update_numeric_value(storage_box, S)

def r_input(r):
    """Build the text box for the distance between observation point and well.

    The box comes from ``create_widget``; it is then handed, together with
    ``r``, to ``update_numeric_value`` and that call's result is returned.
    """
    distance_box = create_widget(
        description='Distance from obs. point to well [L]:',
        style={'description_width': 'initial'},
        width='500px',
    )
    return update_numeric_value(distance_box, r)

def cond_input(cond):
    """Build the text box used to enter the conductance.

    The box comes from ``create_widget``; it is then handed, together with
    ``cond``, to ``update_numeric_value`` and that call's result is returned.
    """
    conductance_box = create_widget(
        description='Conductance:',
        style={'description_width': 'initial'},
        width='500px',
    )
    return update_numeric_value(conductance_box, cond)

#--Guesses for Automatic Calibration 
def iguess_T_input(iT):
    """Build the text box for the initial guess of the transmissivity.

    The box is created with ``create_widget`` and then passed, together with
    ``iT``, to ``update_numeric_value``; the resulting widget is returned.
    """
    # Label fixed: the superscript was mis-encoded ("LÂ²") in the widget text.
    iT_wid = create_widget(description='INITIAL GUESS Transmissivity [L²/T]:',
                           style={'description_width': 'initial'}, width='500px')
    iT_wid = update_numeric_value(iT_wid, iT)
    return iT_wid

def iguess_S_input(iS):
    """Build the text box for the initial guess of the storage coefficient.

    The box comes from ``create_widget``; it is then handed, together with
    ``iS``, to ``update_numeric_value`` and that call's result is returned.
    """
    guess_box = create_widget(
        description='INITIAL GUESS Storage [-]:',
        style={'description_width': 'initial'},
        width='500px',
    )
    return update_numeric_value(guess_box, iS)

def iguess_Cond_input(iCond):
    """Build the text box for the initial guess of the conductance.

    The box comes from ``create_widget``; it is then handed, together with
    ``iCond``, to ``update_numeric_value`` and that call's result is returned.
    """
    guess_box = create_widget(
        description='INITIAL GUESS Conductance:',
        style={'description_width': 'initial'},
        width='500px',
    )
    return update_numeric_value(guess_box, iCond)

#--Widget defining the method used            
methods = ["Theis", "Hantush"]
def method_input():
    """Build the dropdown used to pick the interpretation method.

    The entries are the module-level ``methods`` list and 'Theis' is selected
    initially. Returns whatever ``dropdown_widget`` returns.
    """
    return dropdown_widget(
        description='Choose the interpretation method:',
        options=methods,
        value='Theis',
    )

#------------------------------------------#
# METHODS DEFINITION                       #
#------------------------------------------#
#--Dimensionless time (u)
def dimensionless_time(r:float, S:float, T:float, t:float) -> list:
    """Return the dimensionless time u = r^2 * S / (4 * T * t).

    Parameters
    ----------
    r : float
        distance from the observation point to the pumping well [L]
    S : float
        storage coefficient [-]
    T : float
        transmissivity [L^2/T]
    t : float
        time

    Returns
    -------
    float
        dimensionless time [-]. The result has whatever type the arithmetic
        on the inputs produces (a scalar for scalar inputs).
    """
    numerator = (r**2)*S
    denominator = 4*T*t
    return numerator/denominator

#--Well function (W), by means of Exponential Integral Approximation
def well_function(u_value:float):
    """Approximate the well function W for a single dimensionless time.

    Two closed-form expressions are used: a logarithmic one when
    ``u_value < 1`` and an exponential one otherwise.

    Parameters
    ----------
    u_value : float
        dimensionless time (a scalar; the comparison with 1 is a plain ``if``)

    Returns
    -------
    float
        approximated well function value
    """
    if not (u_value < 1):
        decay = np.exp(-u_value)/u_value
        ratio = (0.3637+u_value)/(1.282+u_value)
        return decay * ratio
    return np.log(0.56146/u_value) + (0.9713*u_value) - (0.1742*(u_value**2))

#--Theis Method
def theis_drawdown(Q:float, r:float, S:float, T:float, t:list) -> list:
    """Calculate the drawdown at each time in ``t`` with the Theis method.

    For every time the dimensionless time u is computed with
    ``dimensionless_time``, the well function W(u) with ``well_function``,
    and the drawdown as ``Q / (4 * pi * T) * W(u)``.

    Parameters
    ----------
    Q : float
        flow rate [L^3/T]
    r : float
        distance from the observation point to the pumping well [L]
    S : float
        storage coefficient [-]
    T : float
        transmissivity [L^2/T]
    t : list
        times at which the drawdown is evaluated

    Returns
    -------
    list
        Theis drawdown, one value per entry of ``t``

    Notes
    -----
    Earlier versions ignored ``t`` and iterated over the module-level
    ``time`` array instead; the function now uses its own argument.
    """
    # log/exp round trip kept from the original so results are unchanged:
    # it is a no-op for positive values and yields nan for negative ones.
    # NOTE(review): presumably a leftover from calibrating in log space.
    T = np.exp(np.log(T))
    S = np.exp(np.log(S))

    factor = Q/(4*np.pi*T)
    s_theis = [factor*well_function(dimensionless_time(r, S, T, t_i)) for t_i in t]

    return s_theis
 
#--Hantush, by Mora et al. (2011)
def besselK_function(v:float, u:float, i:int):
    """Evaluate K0, the modified Bessel function of the second kind of
    order 0, at ``sqrt(v**2 + u*ln(16)*i)``.

    Mora et al. (2011), Eq. 28.

    Parameters
    ----------
    v : float
        r/b
    u : float
        dimensionless time
    i : int
        index of the summation term

    Returns
    -------
    float
        value of K0 for the given summation index
    """
    argument = np.sqrt((v**2)+u*(np.log(16)*i))
    return sp.special.k0(argument)

def gauss_laguerre_part(v:float, u:float, i:int, xl5:float):
    """Evaluate one integrand term of the Gauss-Laguerre approximation of the
    Hantush function: ``1 / ((u + x) * exp(v**2 / (4 * (x + u))))`` with
    ``x = xl5[i-1]``.

    Mora et al. (2011), Eq. 29.

    Parameters
    ----------
    v : float
        r/b
    u : float
        dimensionless time
    i : int
        1-based index of the node; the node read is ``xl5[i-1]`` (so ``i = 0``
        reads the last node)
    xl5 : sequence of float
        nodes for Laguerre's integration

    Returns
    -------
    float
        the value of the term for the selected node
    """
    node = xl5[i-1]
    damping = np.exp((v**2)/(4*(node+u)))
    return (1 / ((u+node)*damping))

def product(u:float, arg, func, i:int):
    """Multiply a weight by an already-evaluated function value.

    Used for the terms of the Hantush approximation: the modified weights
    (wmi) times the Bessel function result when u ≤ 1, or the Laguerre
    integration weights (wl5) times the integrand term when u > 1.

    Parameters
    ----------
    u : float
        Dimensionless time. Not used in the computation.
    arg : float
        The weight of the term.
    func : float
        The value the weight is multiplied by (a number, not a callable).
    i : int
        Index of the summation term. Not used in the computation.

    Returns
    -------
    float
        ``arg * func``.
    """
    weighted_term = arg * func
    return weighted_term

def sum_products(x, y):
    """Return ``x + y``; used as the binary step when reducing a list of
    products to their total.
    """
    total = x + y
    return total
   
def hantush_drawdown(Q:float, r:float, S:float, T:float, t:list, cond:float) -> list:
    """Calculate the drawdown at each time in ``t`` with the Hantush function
    approximation: a weighted sum of modified Bessel functions of the second
    kind of order 0 when u ≤ 1, and 5-point Gauss-Laguerre integration when
    u > 1.

    Mora et al. (2011), Eq. 28-29.

    Parameters
    ----------
    Q : float
        Flow rate.
    r : float
        Distance from the observation point to the well.
    S : float
        Storage coefficient.
    T : float
        Transmissivity.
    t : list
        Times at which the drawdown is evaluated.
    cond : float
        Conductance.

    Returns
    -------
    s_hantush : list
        The drawdown, one value per entry of ``t``.
    B : float
        The leakage factor, ``1 / sqrt(cond / T)``.

    Notes
    -----
    Two defects of the earlier version are fixed here:

    * ``t`` was ignored and the module-level ``time`` array was used instead;
    * the Gauss-Laguerre branch looped over indices 0..4 while looking
      elements up with ``[i-1]``, which paired every weight with the wrong
      node (e.g. the first weight with the last node). Indices now run
      1..5 so that weight k multiplies the term of node k.
    """
    # log/exp round trip kept from the original so results are unchanged:
    # it is a no-op for positive values and yields nan for negative ones.
    # NOTE(review): presumably a leftover from calibrating in log space.
    T = np.exp(np.log(T))
    S = np.exp(np.log(S))
    cond = np.exp(np.log(cond))

    P = (Q/(4*np.pi*T))

    #--r/B and the leakage factor B
    inv_B = np.sqrt(cond/T)
    v = r*inv_B
    B = 1/inv_B

    #--Stehfest method's modified weights (suitable if u < 1 and r/b < 18.5):
    wmi = [1/6, -385/12, 2558/3, -46871/6, 101093/3, -473915/6, 322210/3, -1020215/12, 109375/3, -13125/2]

    #--Weights and nodes for Laguerre's integration:
    wl5 = [5.21755610583E-01, 3.98666811832E-01, 7.59424496817E-02, 3.61175867992E-03, 2.33699723858E-05]
    xl5 = [0.2635603, 1.4134031, 3.5964258, 7.0858100, 12.6408008]

    s_hantush = []
    for t_i in t:
        u = dimensionless_time(r, S, T, t_i)
        if u <= 1:
            # 1-based index: besselK_function uses i as the summation index
            series = sum(weight * besselK_function(v, u, i)
                         for i, weight in enumerate(wmi, start=1))
            s_hantush.append(P * series)
        else:
            # 1-based index: gauss_laguerre_part reads node xl5[i-1]
            series = sum(weight * gauss_laguerre_part(v, u, i, xl5)
                         for i, weight in enumerate(wl5, start=1))
            s_hantush.append(P * (1/np.exp(u)) * series)

    return s_hantush, B

## Importing the data
**NOTE**: Find and load the file ``leakyAq_example_data.txt``. For future use of this code with real or additional scenarios, please ensure that your input files follow the same structure as the provided text file:

-   First row: brief description of the dataset.
-   Second row: column headers for the parameters.
-   Following rows: the numerical data.

In [3]:
#--Initialize the module-level DataFrame that SelectFileButton.select_files
#  fills with the 'Time [d]', 'Drawdown [m]' and 'Weights' columns
data = pd.DataFrame() 

class SelectFileButton(widgets.Button):
    """Button that opens a tkinter file dialog and loads the chosen data file.

    Clicking the button runs ``select_files``, which reads the first selected
    file, stores the observations in the module-level ``data``, ``time``,
    ``drawdown`` and ``weights`` variables and plots them inside the
    module-level ``out`` output widget.
    """

    def __init__(self):
        super().__init__()
        #--Add the trait holding the paths returned by the file dialog
        self.add_traits(files=traitlets.traitlets.List())
        #--Create the button.
        self.description = "Select Files"
        self.icon = "square-o"
        self.style.button_color = "orange"
        #--Set on click behavior.
        self.on_click(self.select_files)

    def select_files(self, b):
        """Open the file dialog, load the selected file and plot its data.

        Parameters
        ----------
        b : ipywidgets.widgets.Button
            The button that was clicked; its ``files`` trait, description,
            icon and color are updated once a selection has been made.

        Notes
        -----
        Only ``.txt`` and ``.csv`` files are read. The file is parsed as
        tab-separated text, skipping the first line. Errors raised while
        selecting, reading or plotting are reported in the output widget
        instead of being silently discarded.
        """
        global data, time, drawdown, weights

        with out:
            root = None
            try:
                #--Create Tk root
                root = Tk()
                #--Hide the main window
                root.withdraw()
                #--Raise the root to the top of all windows.
                root.call('wm', 'attributes', '.', '-topmost', True)
                #--The selected paths are stored in b.files
                selected_files = filedialog.askopenfilename(multiple=True)
                if not selected_files:
                    print("No file selected.")
                    return

                b.files = selected_files
                b.description = "Files Selected"
                b.icon = "check-square-o"
                b.style.button_color = "lightgreen"

                file_path = b.files[0]
                file_extension = os.path.splitext(file_path)[1].lower()
                if file_extension not in ('.txt', '.csv'):
                    print("Your selected file is neither a TXT nor a CSV file. Please select a correct input file.")
                    return

                print(f"You selected {os.path.basename(file_path)} file as your input observation data.")
                raw_data = pd.read_csv(file_path, sep="\t", skiprows=1)
                # The first row after the header is skipped.
                # NOTE(review): presumably a units row -- confirm against the input file.
                loaded = pd.DataFrame({
                    'Time [d]': pd.to_numeric(raw_data.iloc[1:, 0].reset_index(drop=True)),
                    'Drawdown [m]': pd.to_numeric(raw_data.iloc[1:, 1].reset_index(drop=True)),
                })
                loaded['Weights'] = np.ones(len(loaded))
                # Replace the frame instead of assigning columns into the old
                # one: column assignment aligns on the existing index, which
                # would truncate a second file that has more rows.
                data = loaded
                time = data['Time [d]'].to_numpy()
                drawdown = data['Drawdown [m]'].to_numpy()
                weights = data['Weights']

                #------------------------------------------#
                # Plot the raw data                        #
                #------------------------------------------#
                fig, axes = plt.subplots(1, 3, figsize=(15, 5))
                # Same data on three scalings: arithmetic, semilog (log time), log-log
                axis_scales = (('linear', 'linear'), ('log', 'linear'), ('log', 'log'))
                for ax, (xscale, yscale) in zip(axes, axis_scales):
                    ax.scatter(time, drawdown, linewidth=1)
                    ax.set_xlabel('time [d]')
                    ax.set_ylabel('Drawdown [m]')
                    ax.set_xscale(xscale)
                    ax.set_yscale(yscale)

                plt.tight_layout()
                plt.show()
            except Exception as err:
                # Report the problem instead of hiding it
                print(f"The selected file could not be loaded: {err}")
            finally:
                # Release the hidden Tk window created for the dialog
                if root is not None:
                    root.destroy()

    def get_input_file(self):
        """Returns the list of selected file paths (the ``files`` trait)."""
        return self.files

# Output area that SelectFileButton.select_files prints and plots into
out = widgets.Output()
# The file-selection button; clicking it opens the file dialog
input_file = SelectFileButton()
# Last expression of the cell: shows the button above its output area
widgets.VBox([input_file, out])

VBox(children=(SelectFileButton(description='Select Files', icon='square-o', style=ButtonStyle(button_color='o…

## Manual calibration

🔹 Known Parameters
- **Flow rate (Q):** 1728 m³/d  
- **Distance from observation point to well (r):** 0.1 m  

🔹 Parameters to Calibrate
- **Transmissivity (T):** 490 m²/d  
- **Storage coefficient (S):** 0.122  
- **Conductance (C):** 0.3  

In [6]:
#-- Output area for the manual-calibration plots and the button that
#   triggers the calculation (the click handler is attached further below)
output = widgets.Output()
button = widgets.Button(button_style='info', 
                        description="Click to solve it",
                        tooltip='Click me', icon='check', 
                        layout=widgets.Layout(width='300px', height='30px'))

def on_button_clicked(button):
    """Compute the drawdown with the chosen method and plot it with the data.

    Reads the flow rate, transmissivity, storage coefficient and distance
    from the module-level text widgets, computes the drawdown over the
    module-level ``time`` array with ``theis_drawdown`` or
    ``hantush_drawdown`` and plots it against ``drawdown`` on arithmetic,
    semilog and log-log axes inside ``output``.

    Parameters
    ----------
    button : ipywidgets.Button
        The button that was clicked (not used).

    Notes
    -----
    The conductance is only read for the Hantush method, so the field may be
    left blank when using Theis. Values that cannot be parsed as numbers
    produce a message in ``output`` instead of an unhandled ``ValueError``.
    """
    method_type = method_wid.value

    try:
        Q = float(Q_wid.value)
        T = float(T_wid.value)
        S = float(S_wid.value)
        r = float(r_wid.value)
        # Theis has no leakage term, so conductance is not needed for it
        cond = None if method_type == "Theis" else float(cond_wid.value)
    except ValueError:
        with output:
            output.clear_output(wait=True)
            print("Please enter a valid number in every required field.")
        return

    if method_type == "Theis":
        method_sol = theis_drawdown(Q, r, S, T, time)
    else:
        method_sol, B = hantush_drawdown(Q, r, S, T, time, cond)

    with output:
        output.clear_output(wait=True)

        #-- Plot the 3 figures: arithmetic, semilog (log time) and log-log
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        axis_scales = (('linear', 'linear'), ('log', 'linear'), ('log', 'log'))
        for ax, (xscale, yscale) in zip(axes, axis_scales):
            ax.scatter(time, drawdown, linewidth=1, color="blue", label="Actual data")
            ax.set_xlabel('time [d]')
            ax.set_ylabel('Drawdown [m]')
            ax.plot(time, method_sol, color="red", label=method_type + " method")
            ax.set_xscale(xscale)
            ax.set_yscale(yscale)
            ax.legend(loc='lower right')

        plt.tight_layout()
        plt.show()
 
#-- Attaching the function to the button's click event
button.on_click(on_button_clicked)

#-- Creating the input widgets. The numeric argument is only forwarded to
#   update_numeric_value; the text boxes themselves start empty.
#   The click handler reads these module-level names when it runs.
Q_wid = Q_input(Q=0)  
r_wid = r_input(r=0)
T_wid = T_input(T=0)
S_wid = S_input(S=0)
cond_wid = cond_input(cond=0)
method_wid = method_input()

#-- Defining the order of widgets (top to bottom in the VBox)
widgets_list = [Q_wid, r_wid, T_wid, S_wid, cond_wid, method_wid, button]

#-- Displaying the widgets, followed by the output area for the plots
display(widgets.VBox(widgets_list), output)

VBox(children=(Text(value='', description='Flow rate [L³/T]:', style=TextStyle(description_width='initial')), …

Output()

## Automatic Calibration 

**NOTE**: Remember to select only the early-time data for Theis, and the full time series for Hantush!

In [7]:
#-- Output area for the automatic-calibration results and the button that
#   triggers the calculation.
#   NOTE: this rebinds the module-level names `output` and `button` that the
#   manual-calibration cell also uses.
output = widgets.Output()
button = widgets.Button(button_style='info', 
                        description="Click to solve it",
                        tooltip='Click me', icon='check', 
                        layout=widgets.Layout(width='300px', height='30px'))


def on_button_clicked(button):
    """Calibrate the parameters automatically and plot the fitted drawdown.

    Reads the flow rate, the distance and the initial guesses from the
    module-level widgets, minimizes the weighted sum of squared errors over
    the selected time window with Nelder-Mead and prints/plots the result
    inside ``output``.

    Parameters
    ----------
    button : ipywidgets.Button
        The button that was clicked (not used).

    Notes
    -----
    * The objective compares measured and calculated drawdown over the
      selected window ``[initial time, ending time]``. Previously it always
      used the first ``len(window)`` samples, ignoring the initial time.
    * For Theis only S and T are optimized; the conductance guess is neither
      required nor used. For Hantush S, T and the conductance are optimized.
    * Invalid numeric input or an ending time earlier than the initial time
      produces a message in ``output``.
    """
    output.clear_output(wait=True)
    method_type = method_wid.value
    is_theis = method_type == "Theis"

    try:
        Q = float(Q_wid.value)
        r = float(r_wid.value)
        x = [float(iS_wid.value), float(iT_wid.value)]
        if not is_theis:
            x.append(float(iCond_wid.value))
    except ValueError:
        with output:
            print("Please enter a valid number in every required field.")
        return

    #--Find the indices for selected time range (end index is exclusive)
    in_time = np.where(time == sel_initial_time.value)[0][0]
    end_time = np.where(time == sel_ending_time.value)[0][0]+1
    if end_time <= in_time:
        with output:
            print("The ending time must not be earlier than the initial time.")
        return

    window = slice(in_time, end_time)
    observed = np.asarray(drawdown)[window]
    window_weights = np.asarray(weights)[window]

    def simulate(params):
        """Return (drawdown over the full ``time`` array, leakage factor or None)."""
        if is_theis:
            S, T = params
            return theis_drawdown(Q, r, S, T, time), None
        S, T, cond = params
        return hantush_drawdown(Q, r, S, T, time, cond)

    def objective_func(x):
        """Objective function defined as the sum of the squared errors.

        Parameters
        ----------
        x : array
            Candidate parameters: S and T, followed by the conductance when
            the Hantush method is selected.

        Returns
        -------
        float
            weighted sum of squared errors between measured and calculated
            drawdown within the selected time window.
        """
        calc, _ = simulate(x)
        # The model is evaluated on the full series and sliced afterwards so
        # that measured and calculated values refer to the same times.
        residuals = window_weights * (observed - np.asarray(calc)[window])
        return np.sum(residuals**2)

    #--Perform optimization
    result = sp.optimize.minimize(objective_func, x, method='Nelder-Mead')

    #--Extract optimal values
    if is_theis:
        S_opt, T_opt = result.x
    else:
        S_opt, T_opt, Cond_opt = result.x

    calc_sse_after = objective_func(result.x)
    method_sol, B = simulate(result.x)

    with output:
        output.clear_output(wait=True)

        print(f"Optimized Storage coefficient (S): {S_opt:.4f}")
        print(f"Optimized Transmissivity (T): {T_opt:.2f}")
        if not is_theis:
            print(f"Optimized Conductance: {Cond_opt:.2f}")
            print(f"Resulting leakage factor (B): {B:.2f}")
        print(f"SSE after automatic calibration: {calc_sse_after:.4f}")

        #-- Plot the 3 figures: arithmetic, semilog (log time) and log-log
        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        axis_scales = (('linear', 'linear'), ('log', 'linear'), ('log', 'log'))
        for ax, (xscale, yscale) in zip(axes, axis_scales):
            # All observations as hollow markers ...
            ax.scatter(time, drawdown, linewidth=1, edgecolors='blue', facecolors = "none")
            # ... and the ones inside the calibration window filled in
            ax.scatter(time[window], drawdown[window],
                       linewidth=1, edgecolors='blue', facecolors="limegreen",
                       label="Times used for calibration")
            ax.set_xlabel('time [d]')
            ax.set_ylabel('Drawdown [m]')
            ax.plot(time, method_sol, color="red", label=method_type + " method")
            ax.set_xscale(xscale)
            ax.set_yscale(yscale)
            ax.legend(loc='lower right')

        plt.tight_layout()
        plt.show()
        

#-- Attaching the function to the button's click event
button.on_click(on_button_clicked)

#-- Creating the input widgets. The numeric argument is only forwarded to
#   update_numeric_value; the text boxes themselves start empty.
#   NOTE: Q_wid, r_wid and method_wid rebind the names created in the
#   manual-calibration cell.
Q_wid = Q_input(Q=0)  
r_wid = r_input(r=0)
iS_wid = iguess_S_input(iS=0)  
iT_wid = iguess_T_input(iT=0)
iCond_wid = iguess_Cond_input(iCond=0)
method_wid = method_input()

#-- Dropdowns to choose the calibration window. `time` is only assigned by
#   SelectFileButton.select_files, so a data file must have been loaded
#   before this cell runs.
#   NOTE(review): initial_time and ending_time are not referenced anywhere
#   else in the visible source.
initial_time = time
ending_time = time           
sel_initial_time = widgets.Dropdown(options=time, value=time[0], 
                          description="Initial time")
sel_ending_time = widgets.Dropdown(options=time, value=time[-1], 
                           description="Ending time")

#-- Defining the order of widgets (top to bottom in the VBox)
widgets_list = [Q_wid, r_wid, iS_wid, iT_wid, iCond_wid, sel_initial_time, sel_ending_time, 
                method_wid, button]

#-- Displaying the widgets, followed by the output area for the results
display(widgets.VBox(widgets_list), output)

VBox(children=(Text(value='', description='Flow rate [L³/T]:', style=TextStyle(description_width='initial')), …

Output()