# Reproducing DQF Anomaly (Chandler, 2022) in Python
Chandler's Original Code is in R. Can be viewed at https://github.com/GabeChandler/AnomalyDetection

In [1]:
import statistics
import numpy as np
import random
import scipy.stats as sps
import math
import tqdm

## Functions

### `sd_w()`

`sd_w()` computes the winsorized standard deviation, called by `dqf_outlier()`. Winsorizing or winsorization is the transformation of statistics by limiting extreme values in the statistical data to reduce the effect of possibly spurious outliers. The standard deviations can be heavily influenced by extreme values. Winsorized standard deviation compensates for this by setting the extreme outliers equal to a certain percentile value.

In [2]:
def sd_w(x, k):
    """ computes the windsorized standard deviation, called by dqf_outlier
    Input:
        x: list of numeric data values
        k: number of observations at each extreme to alter
    Returns:
        windsorized standard deviation
    """
    k = int(k)
    if k == 0: return np.std(x)
    else: 
        x.sort()
        l1 = [x[k]]*k
        l2 = [x[-k-1]]*k
        l3 = x[k:-k]
        return np.std(np.concatenate((([x[k]]*k),([x[-k-1]]*k),x[k:-k])))


`sd_w()` test

In [3]:
data = np.array([1,0,0,2,1,1,1,1,1,1,1,2])

assert sd_w(data,0) == np.std(data)
assert sd_w(data,2) == 0

### `subsamp_dqf()`

`subsamp_dqf()` is called by dqf.outlier; It computes random subset of pairs of points.

In [4]:
def subsamp_dqf(n_obs, subsample):
    pairs = []
    subsample = int(subsample/2)*2
    for i in range(1,n_obs+1):
        for j in range(i+1,i+int(subsample/2)+1):
            pairs.append((i-1,j*(j<=n_obs)+(j-n_obs)*(j>n_obs)-1))

    return pairs

`subsamp_dqf()` test

In [5]:
subsamp_dqf(8,3)

[(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 0)]

### `dqf_outlier()`

computes kernelized version of depthity.

In [6]:
def dqf_outlier(data = None,
                gram_mat = None, 
                g_scale=2, 
                angle=[30,45,60],
                kernel="linear",
                p1=1,p2=0,
                n_splits=100,
                subsample=50,
                z_scale=True,
                k_w=3,
                adaptive=True,
                G="norm"):
    if G == "norm": param1=0;param2=1
    if G == "unif": param1=-1;param2=1
    if not data and not gram_mat: raise Exception("Either a dataset or Gram matrix must be provided")
    if min(angle) <= 0 or max(angle) >= 90: raise Exception("Angles must be between 0 and 90")
    if not data: n_obs = len(gram_mat)
    if not gram_mat: n_obs = len(data); data = np.array(data)

    # shuffle data points
    scram = np.arange(0,n_obs, 1)
    random.shuffle(scram)
    #   create unscram order array
    unscram = np.zeros(len(scram)).astype(int)
    for i in range(len(unscram)):
        index = scram[i]
        unscram[index] = int(i)

    # subsample pairs of points in dataset
    pairs = subsamp_dqf(n_obs, subsample)

    # if gram matrix is not provided as parameter
    if not gram_mat:
        if z_scale:
            data = sps.zscore(data) # compute the z score of each value in each column of matrix, relative to the sample mean and standard deviation.
        if hasattr(kernel, '__call__'):
            kern = kernel
        if kernel == "linear":
            def kern(x,y):
                return sum(x*y)
        if kernel == "rbf":
            def kern(x,y):
                return math.exp(-sum((x-y)**2)/p1)
        if kernel == "poly":
            def kern(x,y):
                return (sum(x*y)+p2)**p1
        data = data[scram,]
        gram = np.zeros((n_obs,n_obs))
        for i in range(n_obs):
            for j in range(i,n_obs):
                gram[i][j] = kern(data[i,],data[j,])
                gram[j][i] = gram[i][j]
    else: # check properties of gram matrix
        gram_mat = np.array(gram_mat) # convert to np.array
        if gram_mat.shape[0] != gram_mat.shape[1]:
            raise Exception("Gram matrix must be square")
        if np.allclose(gram_mat, gram_mat.T):
            raise Exception("Gram matrix must be symmetric")
        gram = gram_mat.copy()
        gram = gram[scram,] # align gram_mat rows to data
        gram = gram[:,scram] # align gram_mat cols to data

    # returns (g_scale'd) z_scores; calls sps.(G).ppf()
    splits = getattr(sps, G).ppf(np.array(range(1,n_splits+1))/(n_splits+1),param1,param2)*g_scale
    depthity1 = np.array([0]*len(splits)); depthity2 = depthity1.copy(); depthity3 = depthity1.copy()
    norm_k2 = np.array([0 for i in range(n_obs)]); error_k = np.array([0 for i in range(n_obs)]); k_to_mid = np.array([0 for i in range(n_obs)])

    dep1 = np.zeros((len(pairs),n_splits)); dep2 = dep1.copy(); dep3 = dep1.copy()
    qfs1 = np.zeros((len(pairs),100)); qfs2 = qfs1.copy(); qfs3 = qfs1.copy()

    for i_sub in tqdm.trange(len(pairs)):
        i = pairs[i_sub][0]; j = pairs[i_sub][1]
        for k in range(n_obs):
            norm_k2[k] = gram[k][k] +.25*(gram[i][i]+gram[j][j]) + .5*gram[i][j] - gram[k][i] - gram[k][j]
            k_to_mid[k] = (gram[k][i]-gram[k][j] + .5*(gram[j][j]-gram[i][i]))/math.sqrt(gram[i][i]+gram[j][j]-2*gram[i][j])
            error_k[k] = math.sqrt(abs(norm_k2[k] - k_to_mid[k]**2))
        for c in range(len(splits)):
            s = splits[c]*(sd_w(k_to_mid,k_w)*adaptive + (not adaptive))
            good = np.array([0 if k_to_mid[i]/s > 1 else 0 for i in range(len(k_to_mid))]) # points on other side of cone tip removed
            d_to_tip = abs(k_to_mid - s)

            good1 = good*(abs(np.vectorize(math.atan)(error_k/k_to_mid)) < (angle[0]/360*2*math.pi))
            good1 = good1*(1-(2*np.sign(k_to_mid)==np.sign(s)))
            depthity1[c] = min(sum([g1==-1 for g1 in good1]),sum([g1==1 for g1 in good1]))
            # same code for depthity2 and depthity3
        
        qfs1[i_sub,] = np.quantile([d for d in depthity1 if not np.isnan(d)], np.linspace(.01,1,100))
    
    dqf1 = np.zeros((n_obs, 100))
    for i in range(n_obs):
        dqf1[i] = np.mean(qfs1[np.concatenate((np.where([pair[0] for pair in pairs] == i), np.where([pair[1] for pair in pairs] == i)))])
    dqf1 = dqf1[unscram,]


    return dqf1

In [7]:
import matplotlib.pyplot as plt

In [21]:
x = np.arange(-2,2,.05)
y = x**2
df = []
for i in range(len(x)):
    df.append([x[i],y[i]])
df.append([0,2])

plt.scatter(df[:,0],df[:,1])

TypeError: list indices must be integers or slices, not tuple

In [None]:
dqf_outlier(data = df)

In [11]:
test = np.array([[2,1],[1,0]])
print(sps.zscore(test,axis=0))
print(sps.zscore(test))
print(sps.zscore(test,axis=1))

[[ 1.  1.]
 [-1. -1.]]
[[ 1.  1.]
 [-1. -1.]]
[[ 1. -1.]
 [ 1. -1.]]


In [None]:
list = np.array([-3,3,-6,-8,5])

l1 = np.array([2,4,6])
l2 = np.array([2,2,6])
l1/l2

np.sign(list)
np.sign(-4)

good1 = [-2,-2,2,2,-1,-1]
sum([g1==-1 for g1 in good1])

np.linspace(.01,1,100)

pairs = np.array([(1,2),(2,1),(4,7),(7,4),(1,6)])
pairs[[1,3]]

mat = np.array([[0,0],[1,0],[2,0],[3,0],[4,0],[5,0]])
scram = np.arange(0,5,1)
random.shuffle(scram)
unscram = np.zeros(len(scram)).astype(int)
for i in range(len(unscram)):
    index = scram[i]
    unscram[index] = int(i)
mat = mat[scram,]
print(type(unscram[0]))
print(mat)
print(mat[unscram,])

In [None]:
G = "norm"
print(getattr(sps, G).ppf(np.array(range(1,100+1))/(100+1),0,1))
G = "uniform"
getattr(sps, G).ppf(np.array(range(1,100+1))/(100+1),0,1)