# Pruebas para la igualdad de dos distribuciones (localización)

### Modelos no paramétricos y de regresión
##### Por: Jorge Iván Reyes Hernández

In [6]:
from abc import ABC
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import binom
from scipy.stats import norm, chi2

In [2]:
class EqualityTest(ABC):
    """ Clase para implementar la prueba de igualdad de dos distribuciones (localización). """
    
    def __init__(self, x, y):
        """ Constructor de la clase.
        
        :param x: Mediciones de la primera distribución.
        :param y: Mediciones de la segunda distribución.
        """
        self._x = x
        self._y = y
        self._sample = None
        self._indices = {"X": [], "Y": []}
        self._n_1: int = x.size
        self._n_2: int = y.size
        self._n: int = self._n_1 + self._n_2
        self._significance_level: float = 0.0
        self.__merge_data()
        
    def get_x(self):
        """ Regresa los datos de la primera muestra.
        
        :return: Mediciones de la primera distribución.
        """
        return self._x
    
    def get_y(self):
        """ Regresa los datos de la segunda muestra.
        
        :return: Mediciones de la segunda distribución.
        """
        return self._y
    
    def get_sample(self):
        """ Regresa los datos en una única muestra dicotomizada.
        
        :return: Muestra mezclada.
        """
        return self._sample
    
    def get_indices(self):
        """ Regresa los índices (rangos) de los elementos de la muestra."""
        return self._indices
    
    def __merge_data(self):
        """ Método para dicotomizar los datos.
        
            Se ignoran los empates.
            Se guardan los índices de las X's y de las Y's.
        """
        
        x = np.sort(self._x, kind="mergesort")
        y = np.sort(self._y, kind="mergesort")
        
        merged = []
        
        i, j = 0, 0
        n_1, n_2 = 0, 0
        idx = 0
        
        while (i < self._n_1 and j < self._n_2):
            if x[i] < y[j]:
                merged.append("X")
                self._indices["X"].append(idx)
                n_1 += 1
                i += 1
                idx += 1
            elif x[i] > y[j]:
                merged.append("Y")
                self._indices["Y"].append(idx)
                n_2 += 1
                j += 1
                idx += 1
            else:
                i += 1
                j += 1
        
        while i < self._n_1:
            merged.append("X")
            self._indices["X"].append(idx)
            n_1 += 1
            i += 1
            idx += 1
        
        while j < self._n_2:
            merged.append("Y")
            self._indices["Y"].append(idx)
            n_2 += 1
            j += 1
            idx += 1
        
        self._sample = np.array(merged)
        self._n_1 = n_1
        self._n_2 = n_2
        self._n = self._n_1 + self._n_2


### Prueba basada en rachas (Wald-Wolfowitz Runs test)

In [3]:
class RunsEqualityTest(EqualityTest):
    """ Clase para implementar la prueba de Wald-Wolfowitz (basada en rachas)."""
    
    def __init__(self, x, y):
        """ Constructor de la clase.
        
        :param x: Mediciones de la primera distribución.
        :param y: Mediciones de la segunda distribución.
        """
        super().__init__(x, y)
        self.__r = 1
        self.__compute_runs()
        
    def get_runs(self):
        """ Regresa la cantidad de rachas en la muestra."""
        return self.__r
        
    def __compute_runs(self):
        """ Calcula la cantidad de rachas observadas."""
        cur = self._sample[0]
        for i in range(1, self._sample.size):
            if cur != self._sample[i]:
                self.__r += 1
            cur = self._sample[i]
    
    def __pmf_even(self, r):
        """ Función de probabilidad de la estadística de prueba cuando r es par.
        
        :param r: Valor que toma R.
        :return: La probabilidad de que R=r
        """
        num = 2 * binom(self._n_1 - 1, r / 2 - 1) * binom(self._n_2 - 1, r / 2 - 1)
        den = binom(self._n, self._n_1)
        return num / den
    
    def __pmf_odd(self, r):
        """ Función de probabilidad de la estadística de prueba cuando r es impar.
        
        :param r: Valor que toma R.
        :return: La probabilidad de que R=r
        """
        num = binom(self._n_1 - 1, (r - 1) / 2) * binom(self._n_2 - 1, (r - 3) / 2)
        num += binom(self._n_1 - 1, (r - 3) / 2) * binom(self._n_2 - 1, (r - 1) / 2)
        den = binom(self._n, self._n_1)
        return num / den
        
    def __test_statistic_exact(self, alpha):
        """ Calcula las constantes (cuántiles) de la pmf de R tales que la proba acumulada
        sea menor que el nivel de significancia para la prueba de dos colas.
        
        :param alpha: Nivel de significancia deseado.
        :return: La constante que define la región crítica.
        """
        prob = 0.0
        k = 1
        while prob <= alpha and (k <= self._n):
            self._significance_level = prob
            k += 1
            p = self.__pmf_even(k) if (k % 2 == 0) else self.__pmf_odd(k)
            prob += p
        k -= 1
        return k
    
    def __test_statistic_asymptotic(self):
        """ Calcula la estadística de prueba asintótica.
        
        :return: El valor que toma la estadística de pruba.
        """
        num = self.__r + 0.5 - 1 - (2 * self._n_1 * self._n_2 / self._n)
        den = 2 * self._n_1 * self._n_2 * (2 * self._n_1 * self._n_2 - self._n)
        den /= (self._n ** 2) * (self._n - 1)
        den = np.sqrt(den)
        
        return num / den
        
    def run_test(self, alpha=0.05, exact=False):
        """ Realiza la prueba de hipótesis. 
        
        :param alpha: El nivel de significancia deseado.
        :param exact: Si la prueba será exacta o asintótica.
        """
        if exact:
            k = self.__test_statistic_exact(alpha)
            print("H_0: F_X = F_Y v.s H_a: F_X != F_Y")
            print(f"Rechazar H_0 si: R <= {k}")
            print(f"Valor que tomó la estadística: R_obs = {self.__r}")
            decision = "-> Rechazamos H_0" if (self.__r <= k) else "-> No rechazamos H_0"
            print(decision + f" con nivel de significancia {self._significance_level:.3f}")
        else:
            z = self.__test_statistic_asymptotic()
            q = -norm.ppf(q=1-alpha)
            print("H_0: F_X = F_Y v.s H_a: F_X != F_Y")
            print(f"Rechazar H_0 si: Z <= {q:.3f}")
            print(f"Valor que tomó la estadística R: R_obs = {self.__r}")
            print(f"Valor que tomó la estadística Z: Z_obs = {z:.3f}")
            decision = "-> Rechazamos H_0" if z <= q else "->No rechazamos H_0"
            print(decision + f" con nivel de significancia {alpha*100}%")


###### Verificamos que funcione

### Prueba de Mann-Whitney (U-test)

In [4]:
from itertools import combinations
import math

class MannWhitneyTest(EqualityTest):
    def __init__(self, x, y):
        """ Constructor de la clase.
        
        :param x: Mediciones de la primera distribución.
        :param y: Mediciones de la segunda distribución.
        """
        super().__init__(x, y)
        self.__u: int = 0
        self.__u_prime: int = 0
        self.__compute_u()
        self.__compute_u_prime()
        
    def get_u(self):
        """ Regresa el valor que toma la estadística U. """
        return self.__u
    
    def get_u_prime(self):
        """ Regresa el valor que toma la estadística U'. """
        return self.__u_prime
    
    def __compute_u(self):
        """ Calcula eficientemente el valor de la estadística U. """
        x_indices = self._indices["X"]
        u = sum(x_indices) - (self._n_1 * (self._n_1 - 1) / 2)
        self.__u = int(u)
        
    def __compute_u_prime(self):
        """ Calcula eficientemente el valor de la estadística U'. """
        # Aquí va tu código
        y_indices = self._indices["Y"]
        u_prime = sum(y_indices) - (self._n_2 * (self._n_2 - 1) / 2)
        self.__u_prime = int(u_prime)
    
    def __pmf(self, u):
        """ Función de masa de probabilidad de U (exacta). 
        
        :param u: El valor que toma U.
        """
        #Primero hacemos todas las permutaciones posibles de arreglos con 
        # n_1 X's y n_2 Y's
        total = [i for i in range(self._n)]
        combi = list(combinations(total, self._n_1))
        #variable que cuenta los arreglos que cumplen la condicion
        r = 0
        
        #iteramos sobre todas las combinaciones
        for combination in combi:
            x_per = np.array(combination)
            y_per = np.array(list(set(total) - set(x_per)))

            p = EqualityTest(x_per, y_per)

            #calculamos el valor de U para cada permutación
            indices = p.get_indices()
            x_indices = indices["X"]
            u_per = sum(x_indices) - (self._n_1 * (self._n_1 - 1) / 2)

            if u_per == u:
                r += 1
        
        #calculamos el valor exacto de la estadistica u
        pmf = r/math.comb(self._n, self._n_1)
        return pmf

    def __test_statistic_asymptotic(self, u):
        """ Calcula la estadística de prueba asintótica.
        
        :param u: El valor que toma U o U'.
        :return: El valor que toma la estadística de pruba.
        """
        z = u + 0.5 - (self._n_1 * self._n_2 / 2)
        z /= np.sqrt((self._n_1 * self._n_2) * (self._n + 1 / 12))
        return z
    
    def run_test(self, alternative, alpha=0.05, exact=False):
        """ Realiza la prueba de hipótesis. 
        
        :param alternative: Hipótesis alternativa.
        :param alpha: Nivel de significancia deseado.
        :param exact: Si la prueba será exacta o asintótica.
        """
        if exact:
            if alternative == "F_X != F_Y" or alternative == "F_Y != F_X":
                prob = 0.0
                k_1 = 1
                k_2 = self._n + 1
                while prob <= alpha and (k_1 <= k_2):
                    self.significance_level = prob
                    k_1 += 1
                    k_2 -= 1
                    p_1 = self.__pmf(k_1)
                    p_2 = self.__pmf(k_2)
                    prob += p_1 + p_2
                    print(prob, k_1, k_2)
                k_1 -= 1
                k_2 += 1

                print("H_0: F_X = F_Y v.s H_a: F_X != F_Y")
                print(f"Rechazar H_0: F_X = F_Y si: Z <= {k_1} o Z >= {k_2}")
                print(f"Valor que tomó la estadística U: U_obs = {self.__u}")
                decision = "Rechazar" if self.__u <= k_1 or self.__u >= k_2 else "No rechazar"
                print(decision + f' con un nivel de significación de {self.significance_level:.3f}')
            
            elif alternative == "F_X < F_Y" or alternative == "F_Y > F_X":
                prob = 0.0
                k_1 = 1
                while prob <= alpha:
                    self.significance_level = prob
                    k_1 += 1
                    p_1 = self.__pmf(k_1)
                    prob += p_1
                    print(prob, k_1)
                k_1 -= 1

                print("H_0: F_X = F_Y v.s H_a: F_X < F_Y")
                print(f"Rechazar H_0: F_X = F_Y si: Z <= {k_1}")
                print(f"Valor que tomó la estadística U: U_obs = {self.__u}")
                decision = "Rechazar" if self.__u <= k_1 else "No rechazar"
                print(decision + f' con un nivel de significación de {self.significance_level:.3f}')

            elif alternative == "F_X > F_Y" or alternative == "F_Y < F_X":
                prob = 0.0
                k_2 = self._n + 1
                while prob <= alpha:
                    self.significance_level = prob
                    k_2 -= 1
                    p_2 = self.__pmf(k_2)
                    prob += p_2
                    print(prob, k_2)
                k_2 += 1

                print("H_0: F_X = F_Y v.s H_a: F_X > F_Y")
                print(f"Rechazar H_0: F_X = F_Y si: Z >= {k_2}")
                print(f"Valor que tomó la estadística U: U_obs = {self.__u}")
                decision = "Rechazar" if self.__u >= k_2 else "No rechazar"
                print(decision + f' con un nivel de significación de {self.significance_level:.3f}')

        else:
            if alternative == "F_X != F_Y" or alternative == "F_Y != F_X":
                z = self.__test_statistic_asymptotic(self.__u)
                z_prime = self.__test_statistic_asymptotic(self.__u_prime)
                c_alpha = norm.ppf(q=alpha/2)
                print("H_0: F_X = F_Y v.s H_a: F_X != F_Y")
                print(f"Rechazar H_0: F_X = F_Y si: Z <= {c_alpha:.3f} o Z >= {-c_alpha:.3f}")
                print(f"Valor que tomó la estadística U: U_obs = {self.__u}")
                print(f"Valor que tomó la estadística Z: Z_obs = {z:.3f}")
                print(f"Valor que tomó la estadística U': U'_obs = {self.__u_prime}")
                print(f"Valor que tomó la estadística Z': Z'_obs = {z_prime:.3f}")
                if z <= c_alpha or z_prime <= c_alpha:
                    decision = "-> Rechazamos H_0"
                else:
                    decision = "-> No rechazamos H_0"
                print(decision + f" con nivel de significancia {alpha*100}%")
                
                
            elif alternative == "F_X < F_Y" or alternative == "F_Y > F_X":
                z_prime = self.__test_statistic_asymptotic(self.__u_prime)
                c_alpha = norm.ppf(q=alpha)
                print("H_0: F_X = F_Y v.s H_a: F_X < F_Y")
                print(f"Rechazar H_0: F_X = F_Y si: Z <= {c_alpha:.3f}")
                print(f"Valor que tomó la estadística U': U'_obs = {self.__u_prime}")
                print(f"Valor que tomó la estadística Z': Z'_obs = {z_prime:.3f}")
                decision = "-> Rechazamos H_0" if z_prime <= c_alpha else "->No rechazamos H_0"
                print(decision + f" con nivel de significancia {alpha*100}%")
                
            elif alternative == "F_X > F_Y" or alternative == "F_Y < F_X":
                z = self.__test_statistic_asymptotic(self.__u)
                c_alpha = norm.ppf(q=alpha)
                print("H_0: F_X = F_Y v.s H_a: F_X > F_Y")
                print(f"Rechazar H_0: F_X = F_Y si: Z <= {c_alpha:.3f}")
                print(f"Valor que tomó la estadística U: U_obs = {self.__u}")
                print(f"Valor que tomó la estadística Z: Z_obs = {z:.3f}")
                decision = "-> Rechazamos H_0" if z <= c_alpha else "->No rechazamos H_0"
                print(decision + f" con nivel de significancia {alpha*100}%")

### Prueba suma de rangos de Wilcoxon

In [5]:
class WilcoxonTest(EqualityTest):
    def __init__(self, x, y):
        """ Constructor de la clase.
        
        :param x: Mediciones de la primera distribución.
        :param y: Mediciones de la segunda distribución
        """
        super().__init__(x, y)
        self.__w: int = 0
        self.__compute_w()
        
    def __compute_w(self):
        """ Calcula el valor observado de W. """
        # primero obtenemos los rangos de las X
        x_ranks = self._indices['X']
        #le sumamos una unidad a cada uno de los rangos, pues inician en 0
        x_ranks = [x + 1 for x in x_ranks]

        #ahora calculamos la estaditica de prueba sumando los rangos de las X
        self.__w = int(sum(x_ranks))
        
    def __test_statistic_asymptotic(self, w):
        """ Calcula la estadística de prueba asintótica. """
        z = w + 0.5 - (self._n_1*(self._n + 1)/2)
        z /= np.sqrt(self._n_1*self._n_2*(self._n + 1)/12)
        return z
        
    def run_test(self, alternative, alpha=0.05, exact=False):
        """ Realiza la prueba de hipótesis.
        
        :param alternative: Hipótesis alternativa.
        :param alpha: Nivel de significancia.
        :param exact: Si la prueba será exacta o asintótica.
        """
        if alternative == 'F_X != F_Y' or alternative == 'F_Y != F_X':
            z = self.__test_statistic_asymptotic(self.__w)
            c_alpha = norm.ppf(q=alpha/2)
            print("H_0: F_X = F_Y v.s H_a: F_X != F_Y")
            print(f"Rechazar H_0: F_X = F_Y si: Z <= {c_alpha:.3f} o Z >= {-c_alpha:.3f}")
            print(f"Valor que tomó la estadística W: W_obs = {self.__w}")
            print(f"Valor que tomó la estadística Z: Z_obs = {z:.3f}")
            decision = "-> Rechazamos H_0" if z <= c_alpha or z >= -c_alpha else "->No rechazamos H_0"
            print(decision + f" con nivel de significancia {alpha*100}%")

        elif alternative == 'F_X > F_Y' or alternative == 'F_Y < F_X':
            z = self.__test_statistic_asymptotic(self.__w)
            c_alpha = norm.ppf(q=alpha)
            print("H_0: F_X = F_Y v.s H_a: F_X > F_Y")
            print(f"Rechazar H_0: F_X = F_Y si: Z <= {c_alpha:.3f}")
            print(f"Valor que tomó la estadística W: W_obs = {self.__w}")
            print(f"Valor que tomó la estadística Z: Z_obs = {z:.3f}")
            decision = "-> Rechazamos H_0" if z <= c_alpha else "->No rechazamos H_0"
            print(decision + f" con nivel de significancia {alpha*100}%")

        elif alternative == 'F_X < F_Y' or alternative == 'F_Y > F_X':
            z = self.__test_statistic_asymptotic(self.__w)
            c_alpha = norm.ppf(q=alpha)
            print("H_0: F_X = F_Y v.s H_a: F_X < F_Y")
            print(f"Rechazar H_0: F_X = F_Y si: Z >= {c_alpha:.3f}")
            print(f"Valor que tomó la estadística W: W_obs = {self.__w}")
            print(f"Valor que tomó la estadística Z: Z_obs = {z:.3f}")
            decision = "-> Rechazamos H_0" if z >= c_alpha else "->No rechazamos H_0"
            print(decision + f" con nivel de significancia {alpha*100}%")          
    