In [3]:
import math
import numpy as np
from sklearn.metrics import confusion_matrix
from sklearn.neighbors import LocalOutlierFactor
from joblib import Parallel, delayed

class Local_Outlier_Factor:
    def __init__(self, n_neighbors, metric, datasize):
        self.k = n_neighbors
        self.distanceType = metric
        self.lofScores = []
        self.threshold = None
        self.neighborInfo = [0] * datasize  # contains list of tuples with information about neighbors for each datapoint [(neigbor1Distance, neighbor1Index), 
                                            # (neigbor2Distance, neighbor2Index) ...]
            
    # Assumes that 0 is negative and 1 is positive and the matrix will be 2 x 2
    def confusionMatrix(self, trueLabels, predictedLabels):
        matrix = [[0, 0], [0, 0]]
        for i in range(len(trueLabels)):
            matrix[trueLabels[i]][predictedLabels[i]] += 1
        return np.array(matrix)

    # input precision, recall, f1score, or parts
    def matrixScores(self, matrix, scoreType):
        neg, pos = matrix
        tn, fp = neg
        fn, tp = pos
        if scoreType == "parts":
            return [tn, fn, fp, tp]
        if scoreType == "precision":
            if tp == 0:
                return 0
            return tp/(tp+fp)
        if scoreType == "recall":
            if tp == 0:
                return 0
            return tp/(tp+fn)
        if scoreType == "f1score":
            if tp == 0:
                return 0
            precision = tp/(tp+fp)
            recall = tp/(tp+fn)
            return 2 * ((precision * recall)/(precision + recall))
        else:
            print("Please enter a valid scoreType")
            

    def distance(self, a, b):
        if self.distanceType == "euclidean":
            try: 
                s = 0
                if type(a) != type(b) or len(a) != len(b):
                    raise Exception("The inputs a and b need to be same type and equal length") 
                for i in range(len(a)):
                    s += (a[i] - b[i])**2
                return math.sqrt(s)
            except Exception as e:
                print(f"Exception: {e}")
        elif self.distanceType == "manhattan":
            try: 
                s = 0
                if type(a) != type(b) or len(a) != len(b):
                    raise Exception("The inputs a and b need to be same type and equal length") 
                for i in range(len(a)):
                    s += abs(a[i] - b[i])
                return s
            except Exception as e:
                print(f"Exception: {e}")
        else:
            raise Exception("Please input a valid distance type into the LocalOutlierFactor object.")

    def kNeighbors(self, datapoints, query):
        neighbors = [0] * (self.k+1)
        neighbor = 0
        worstBest = float('inf')
        for i in range(len(datapoints)):
            d = self.distance(datapoints[i], query)
            if d < worstBest:
                if neighbor < self.k+1:
                    neighbors[neighbor] = (d, i)
                    neighbor += 1
                else:
                    maximum = float('-inf')
                    maxIndex = 0
                    for j in range(len(neighbors)):
                        if neighbors[j][0] > maximum:
                            maximum = neighbors[j][0]
                            maxIndex = j
                    worstBest = maximum
                    if d < worstBest:
                        neighbors[maxIndex] = (d, i)
        neighbors.sort()
        return neighbors[1:]

    def reachabilityDistance(self, distance, kDistance):
        return max(distance, kDistance)

    def localReachabilityDensity(self, datapoints, neighbors):
        foundNeighbors = set()
        reachabilityDistanceSum = 0
        # Calculates local reachability density
        for i in range(len(neighbors)):
            distanceToNeighbor = neighbors[i][0]
            neighborIndex = neighbors[i][1]
            reachabilityDistanceSum += self.reachabilityDistance(distanceToNeighbor, self.neighborInfo[neighborIndex][-1][0])
        avgReachabilityDistance = reachabilityDistanceSum / self.k
        return 1.0 / (avgReachabilityDistance + 1e-10)

    def localOutlierFactor(self, datapoints, query, neighbors):
        lrdQuery = self.localReachabilityDensity(datapoints, self.neighborInfo[query])
        
        lrdNeighborsSum = 0
        for i in range(len(neighbors)):
            neighborIndex = neighbors[i][1]
            lrdNeighborsSum += self.localReachabilityDensity(datapoints, self.neighborInfo[neighborIndex])
        avgLRDNeighbors = lrdNeighborsSum / self.k
        
        LOF = avgLRDNeighbors / lrdQuery
        return LOF
    
    # returns start and end indices of chunks of data
    # ex. [[0, 10], [10, 20]] if the number of datapoints is 20 and 2 chunks are needed
    def getChunks(self, numDatapoints, numChunks):
        chunkSize = numDatapoints//numChunks
        res = []
        startIndex = 0
        if numDatapoints%numChunks == 0: 
            while startIndex < numDatapoints:
                res.append([startIndex, startIndex+chunkSize])
                startIndex += chunkSize
        else:
            while startIndex < numDatapoints-chunkSize:
                res.append([startIndex, startIndex+chunkSize])
                startIndex += chunkSize
            res[-1][1] = numDatapoints
        return res
        
    def parallel_method_call(self, datapoints, indices):
        res = []
        for i in range(indices[0], indices[1]):
            res.append([i, self.kNeighbors(datapoints, datapoints[i])])
        return res
        
    def createOutlierFactor(self, datapoints, cores = 1):
        chunkIndices = self.getChunks(len(datapoints), cores) # indices of data for each core
        
        # Calculates information for each neighbor based on the n_neighbors specified within the Local_Outlier_Factor object.
        results = Parallel(n_jobs=-1)(delayed(self.parallel_method_call)(datapoints, chunkIndices[i]) for i in range(len(chunkIndices)))
        
        for i in range(len(results)):
            for j in range(len(results[i])):
                self.neighborInfo[results[i][j][0]] = results[i][j][1]
        
        # Calculates the local reachability factors for each datapoint and puts it within a list
        for i in range(len(datapoints)):
            self.lofScores.append(self.localOutlierFactor(datapoints, i, self.neighborInfo[i]))
        return np.array(self.lofScores)
    
    def findThreshold(self, trueLabels):
        if not self.lofScores:
            raise Exception("Please calculate local outlier factors first before trying to find a threshold value for them.")
        highestLOF = max(self.lofScores)
        curThreshold = min(self.lofScores)
        bestThresholdPrecision = float('-inf')
        bestThreshold = curThreshold
        while curThreshold < highestLOF:
            predicted = []
            for lof in self.lofScores:
                if lof < curThreshold:
                    predicted.append(1)
                else:
                    predicted.append(0)
            precision = self.matrixScores(self.confusionMatrix(trueLabels, predicted), "precision")
            if precision > bestThresholdPrecision:
                bestThresholdPrecision = precision
                bestThreshold = curThreshold
            curThreshold += 0.1
        self.threshold = bestThreshold
    
    def predict(self, datapoints, newData, cores = 1):
        if newData:
            self.lofScores = []
            self.neighborInfo = [0] * len(datapoints)
            self.createOutlierFactor(datapoints, cores)
            predicted = []
            for lof in self.lofScores:
                if lof > self.threshold:
                    predicted.append(1)
                else:
                    predicted.append(0)
            return predicted
        else:
            predicted = []
            for lof in self.lofScores:
                if lof > self.threshold:
                    predicted.append(1)
                else:
                    predicted.append(0)
            return predicted

In [1]:
import numpy as np
from scipy import linalg

# Assumes that 0 is negative and 1 is positive and the matrix will be 2 x 2
def confusionMatrixMD(trueLabels, predictedLabels):
    matrix = [[0, 0], [0, 0]]
    for i in range(len(trueLabels)):
        matrix[trueLabels[i]][predictedLabels[i]] += 1
    return np.array(matrix)

# input precision, recall, f1score, or parts
def matrixScoresMD(matrix, scoreType):
    neg, pos = matrix
    tn, fp = neg
    fn, tp = pos
    if scoreType == "parts":
        return [tn, fn, fp, tp]
    if scoreType == "precision":
        if tp == 0:
            return 0
        return tp/(tp+fp)
    if scoreType == "recall":
        if tp == 0:
            return 0
        return tp/(tp+fn)
    if scoreType == "f1score":
        if tp == 0:
            return 0
        precision = tp/(tp+fp)
        recall = tp/(tp+fn)
        return 2 * ((precision * recall)/(precision + recall))
    else:
        print("Please enter a valid scoreType")

"""
Compute the Mahalanobis Distance between each row of x and the data
points: matrix of data that contains the points for which you want the Mahalanobis distance
distribution: ndarray of the distribution from which Mahalanobis distance of each observation of a point in points is to be computed.
"""
def mahalanobisDistance(points, distribution):
    colMeans = []
    for col in distribution.T:
        colMeans.append(np.mean(col))
    points_minus_mean = np.copy(points)
    for i in range(len(points)):
        for j in range(len(points[i])):
            points_minus_mean[i][j] = points[i][j] - colMeans[j]
    covarianceMatrix = np.cov(distribution.T)
    inverseCovarianceMatrix = linalg.inv(covarianceMatrix)
    mahalanobisDistanceMatrix = np.dot(np.dot(points_minus_mean, inverseCovarianceMatrix), points_minus_mean.T)
    return mahalanobisDistanceMatrix.diagonal()

# finds best threshold value for data
def findThresholdMD(mahalanobisDistances, trueLabels):
    longestDistance = max(mahalanobisDistances)
    curThreshold = 0
    bestThresholdPrecision = 0
    bestThreshold = curThreshold
    while curThreshold < longestDistance:
        predicted = []
        for distance in mahalanobisDistances:
            if distance > curThreshold:
                predicted.append(1)
            else:
                predicted.append(0)
        precision = matrixScoresMD(confusionMatrixMD(trueLabels, predicted), "precision")
        if precision > bestThresholdPrecision:
            bestThresholdPrecision = precision
            bestThreshold = curThreshold
        curThreshold += 1
    return bestThreshold

def predictMD(threshold, data):
    mahalanobisDistances = mahalanobisDistance(data, data)
    predicted = []
    for distance in mahalanobisDistances:
        if distance > threshold:
            predicted.append(1)
        else:
            predicted.append(0)
    return predicted