In [1]:
import torch
import math
import numpy as np
import scipy.stats
from scipy.stats import rankdata
from scipy.interpolate import UnivariateSpline
from itertools import combinations

In [None]:
def simulatingU(x): 
    """
    Given an vector x this will return a single U vector of size 1xM where i assume M is the number of observations 
    x(1xM)/np.array: Represents one functional data point. Where M is the number of observations. 
    """
    m = x.shape[1]
    # sigma = torch.zeros(m,m,dtype=torch.float64)
    sigma = torch.zeros(m,m)
    for i in range(0,m): 
        newRow = list()
        for j in range(i,x.shape[1]):
            value = math.e**(-5* abs(x[0][i]-x[0][j]))
            sigma[i][j]=value
    sigma = (sigma + sigma.T).fill_diagonal_(1)
    z = torch.normal(mean=0, std=1, size=(m,1)) # m by 1 vector
    y = torch.matmul(torch.sqrt(sigma),z) 
    u = y/torch.norm(y)
    return u.T

In [None]:
# Original Depth Function
# where y is a matrix of J x N 
# where z is a vector of J x 1 
# returns a J x 1 vector 
def compute_f_Hat(y,z): 
    """
    This function checks each row in y to check if each element is less than z in that same row
    Parameters: 
    y (JxN Matrix): Where J represents the number of unit Vectors and N represents the number of functions 
    z (Jx1): A vector to compare each row by. 
    """
    count_less_than_threshold = (y <= z[:, np.newaxis]).sum(axis=1)
    result = count_less_than_threshold/ y.shape[1]
    return result


# arr represents a singular function in points on a grid 1 x M ( where M represents the number of points n the grid)
# Xn represents the matrix for all functions N x M (Where N is the number of functions)
# U represents the matrix of random numbers pulled from the normal distrubution with shape J x M 
# Where J represents the unit vectors and M represents the number of points
# This function will return the depth of function x in respect to all other functions in the data set

def depth(arr, u, xn_uj):
    """
    The depth function will tell us the depth of any given functional data point relative to the data set 
    Parameters: 
    arr(1xM)/np.array: Represents one functional data point. Where M is the number of observations. 
    xn(NxM)np.array: Represents all functional data points. 
    u(JxM)np.array: Represents the unit vectors, where J is the number of unit vectors and M is number of points.
    Returns: 
    np.array : An double that represents the depth of each function relative to eachother. 
    """
    arr_u_dot = torch.matmul(u,arr)
    fHatVector = compute_f_Hat(xn_uj,arr_u_dot)
    result = ((torch.matmul(fHatVector, 1- fHatVector))/u.shape[0])
    return result

def getDepth(functionalDataSet,UnitVectorMatrix):
    xn = torch.from_numpy(functionalDataSet)
    uj = torch.from_numpy(UnitVectorMatrix)
    xn_uj = torch.matmul(xn,uj.T).T # jxn 
    depthList = list()
    for row in xn:
        depthList.append(depth(row,uj,xn_uj).item())    
    # Have to convert to a numpy array since scipcy doesnt like Tensors
    depthVector = np.array(depthList)
    return depthVector



# We might need to add a flag for how rank method is caluclated 
def KW_H_Test(functionalDataSet: np.ndarray, UnitVectorMatrix: np.ndarray, 
              groups:np.ndarray): 
    
    # # Gets the rank giving us a Nx1 vector 
    x = getDepth(functionalDataSet,UnitVectorMatrix)
    rankVector = rankdata(x)
        
    # there is a scipy version of kruskal wallis we can use 
    groupRankDict = dict()
    for i in range(0,groups.shape[0]): # Runs at O(N)
        oldValue = groupRankDict.get(groups[i])
        if oldValue != None: # the key value exsits 
            groupRankDict[groups[i]] = oldValue + rankVector[i]
        else:
            groupRankDict[groups[i]] = rankVector[i]

    unique, counts = np.unique(groups, return_counts=True) # Runs at O(Groups) which should be fast 
    
    summation = 0 
    for key, value in dict(zip(unique, counts)).items(): # Runs at O(N)
        summation += (groupRankDict[key] **2) / value
    
    hStat =  12 /(groups.size* (groups.size +1)) *summation - (3*(groups.size +1))
    degreeOfFreedom = unique.size -1 
    chiSquare = scipy.stats.chi2.ppf(1-.05, df=degreeOfFreedom)
    return(hStat>chiSquare)

Xn = np.random.random((100, 10)) # represents all the data  in points
U = np.random.random((20, 10))   # unit matrix
groups = np.random.randint(1, 11, size=100) # where size must be the amount of rows in Xn 
temp = KW_H_Test(Xn,U,groups)


In [None]:
print(3*(1+2)+4)

Another way of doing the simplicial depth would be if i can instead calculate the convex hull of a set of points
and then compute if my point P exsists within the convex hull the the point is considered central. We can make this more 'extreme' by removing the poitns from the convex hull and redoing the process
and checking if the new point exsists within the new convex hull awarding the point P for everytime it is in. Will some variation of this be more computationally effient than 

In [20]:
# Gets the derivate of a single function resented by points
# Is able to find derivatives by using interpolation of the points
# To calculate the dirivative of a dataset Xn we need to know at which t they are at, so for now we will make the assumption that they are equally seperated at 1 unit apart 

def getDirivativeOfRow(x,y):
    spline = UnivariateSpline(x, y, s=0)  
    derivative = spline.derivative()
    derivative_values = derivative(x)
    return torch.tensor(derivative_values)


def isPointIn (P, A,B,C ):
    x = 0
    y = 1 
    w1 = (A[x] * (C[y]-A[y]) + (P[y]-A[y]) * (C[x]-A[x]) - P[x]*(C[y]-A[y]))/ ((B[y]-A[y])*(C[x]-A[x]) - (B[x]-A[x])*(C[y]-A[y]))
    w2 = (P[y] - A[y] -  w1 * (B[y]- A[y]) ) / (C[y] - A[y])
    return w1 >= 0 and w2 >= 0 and (w1 + w2) <= 1


def simplicial_depth(P,P_list):
    simplexList = list(combinations(P_list, 3))

    for triangle in simplexList:
        total = int(isPointIn(P,triangle[0],triangle[1],triangle[2]))
    return total

# X is a row vector with M number of points
def depth2 (x,u, xn,yn,T):
    """
    X(1xM)/np.array: Represents one functional data point. Where M is the number of observations. 
    U(JxM)np.array: Represents the unit vectors, where J is the number of unit vectors and M is number of points.
    returns the depth of a single row of Xn averaged out
    """
    # T = torch.arange(xn.shape[0]) 
    xPrime = getDirivativeOfRow(T,x) # 1 x M

    x_u_j = torch.matmul(u,x) # 1 x j 
    xPrime_u_j = torch.matmul(u,xPrime) #1 x j 
    # For every Unit vector in Matrix U
    average = 0 
    for i in range(0, xn.shape[0]):
        # x1= xn[i].unsqueeze(1)
        # y1 = yn[i]
        P_list = torch.cat((xn[i].unsqueeze(1), yn[i].unsqueeze(1)), dim=1)

        subDepthOfU = simplicial_depth((x_u_j[i], xPrime_u_j[i]),P_list)
        average += subDepthOfU

    return average/U.shape[0]
def getDepth2(functionalDataSet,unitVectorMatrix):
    """
    functionalDataSet(NxM)np.array: Represents all functional data observations.N is the number of observations different rows and M is the number of time points per row 
    unitVectorMatrix(JxM)np.array: Represents the unit vectors, where J is the number of unit vectors and M is number of points.
    Returns: 
    np.array : An double that represents the depth of each function relative to eachother. 
    """
    # Convert to Tensors
    xn = torch.from_numpy(functionalDataSet) # still n x m 
    uj = torch.from_numpy(unitVectorMatrix) # still j x m 

    # create some T that represents at what time the observations were made lets assume equally distant
    T = torch.arange(xn.shape[1]) 
    
    # Take the derivative of functional data set
    yn = torch.stack([getDirivativeOfRow(T,row) for row in Xn])  # n x m 
    
    xn_uj = torch.matmul(xn,uj.T).T # jxn 
    yn_uj = torch.matmul(yn,uj.T).T # jxn 

    depthList = list()
    for row in xn: # row is 1 x m 
        value = depth2(row,uj,xn_uj,yn_uj,T)
        depthList.append(value)    
    # Have to convert to a numpy array since scipcy doesnt like Tensors
    depthVector = np.array(depthList)
    return depthVector


Xn = np.random.random((15, 10)) # represents all the data  in points
U = np.random.random((20, 10))   # unit matrix
temp = getDepth2(Xn,U)


How to compute multiple row operations at the same time: https://saturncloud.io/blog/pytorch-batch-rowwise-application-of-function/#:~:text=PyTorch%20provides%20an%20efficient%20way%20to%20apply%20a%20function%20row,that%20applies%20to%20each%20row.