In [1]:
import numpy as np
from scipy import linalg
from sklearn.cluster import KMeans
from random import shuffle

In [2]:
class Hidden_Markov_Model:
    
    def __init__(self,n_states,A,B,initial_distribution=None):
        self.n = n_states
        self.A = A
        self.B = B
        if initial_distribution is None:
            self.initial_distribution = np.linalg.matrix_power(self.A,2000)[0]
        else:
            self.initial_distribution = initial_distribution
            
    
    def forward_backward(self,observations):
        
        alpha = np.zeros((observations.shape[0], self.A.shape[0]))
        beta = np.zeros((observations.shape[0], self.A.shape[0]))
        
        alpha[0, :] = self.initial_distribution * self.B[:, observations[0]]
        beta[observations.shape[0] - 1] = np.ones((self.A.shape[0]))

        for t in range(1, observations.shape[0]):
            for j in range(self.A.shape[0]):
                alpha[t, j] = alpha[t - 1].dot(self.A[:, j]) * self.B[j, observations[t]]
                
        for t in range(observations.shape[0] - 2, -1, -1):
            for j in range(self.A.shape[0]):
                beta[t, j] = np.dot((beta[t + 1] * self.B[:, observations[t + 1]]),self.A[j, :])


        return alpha,beta,np.sum(alpha[-1])
    
    @staticmethod
    def forward_algorithm(observations,A,B,initial_distribution):
        alpha = np.zeros((observations.shape[0], A.shape[0]))
        alpha[0, :] = initial_distribution * B[:, observations[0]]

        for t in range(1, observations.shape[0]):
            for j in range(A.shape[0]):
                alpha[t, j] = alpha[t - 1].dot(A[:, j]) * B[j, observations[t]]

        return alpha
    
    @staticmethod
    def backward_algorithm(observations,A,B):
        beta = np.zeros((observations.shape[0], A.shape[0]))
        beta[observations.shape[0] - 1] = np.ones((A.shape[0]))

        for t in range(observations.shape[0] - 2, -1, -1):
            for j in range(A.shape[0]):
                beta[t, j] = (beta[t + 1] * B[:, observations[t + 1]]).dot(A[j, :])

        return beta

        
    
    def viterbi(self,observations):
        
        T = int(observations.shape[0])
        M = int(self.A.shape[0])

        gamma = np.zeros((T,M))
        psi = np.zeros((1,T))
        gamma[0,:] = self.initial_distribution * self.B[:, observations[0]]
        psi[0] = np.argmax(gamma[0,:])

        for i in range(1,T):
            gamma[i,:] = np.max(gamma[i-1,:])*self.A[np.argmax(gamma[i-1,:]),:]*self.B[:,observations[i]]
            psi[:,i] = np.argmax(gamma[i,:])
        
        psi = psi.astype(int)
            
        return gamma
    
    def baum_welch(self,observations,n_iter=1000):
        
        A = self.A
        B = self.B
        M = A.shape[0]
        T = len(observations)
        initial_distribution = self.initial_distribution

        for n in range(n_iter):
            
            alpha = Hidden_Markov_Model.forward_algorithm(observations,A,B,initial_distribution)
            beta = Hidden_Markov_Model.backward_algorithm(observations,A,B)
           
            Eta = np.zeros((M, M, T - 1))
            
            
            for t in range(T - 1):
                den = np.dot(np.dot(alpha[t, :].T, A) * B[:, observations[t + 1]].T, beta[t + 1, :])
                for i in range(M):
                    num = alpha[t, i] * A[i, :] * B[:, observations[t + 1]].T * beta[t + 1, :].T
                    Eta[i, :, t] = num / den

            gamma = np.sum(Eta, axis=1)
            A = np.sum(Eta, 2) / np.sum(gamma, axis=1).reshape((-1, 1))

            gamma = np.hstack((gamma, np.sum(Eta[:, :, T - 2], axis=0).reshape((-1, 1))))

            K = B.shape[1]
            den = np.sum(gamma, axis=1)
            for l in range(K):
                B[:, l] = np.sum(gamma[:, observations == l], axis=1)

            B = np.divide(B, den.reshape((-1, 1)))

        return A,B


In [3]:
class FDS:
    def __init__(self,n_states,n_observations,Observations):
        self.n_states = n_states
        self.n_observations = n_observations
        self.Observations = Observations
        self.clusters = self.Kmeans()[0]
        self.pi,self.A,self.B = self.probability_matrices()
        self.train_HMM()
        
    def Kmeans(self,value=None):
        kmeans = KMeans(n_clusters=self.n_observations,random_state=140)
        kmeans.fit(self.Observations.reshape(-1,1))
        if value is not None:
            v = kmeans.predict([[value]])[0]
        else:
            v = None
        
        return kmeans.labels_,v
        
        
    def probability_matrices(self):
        pi_prob = np.zeros(self.n_states)
        transition_prob = np.zeros((self.n_states,self.n_states))
        emission_prob = np.zeros((self.n_states,self.n_observations))

        for i in range(self.n_states):
            pi_prob[i] = 1 / self.n_states

        for i in range(self.n_states):
            for j in range(self.n_states):
                transition_prob[i][j] = 1 / self.n_states

        for i in range(self.n_states):
            for j in range(self.n_observations):
                emission_prob[i][j] = 1 / self.n_observations

        return pi_prob, transition_prob, emission_prob
    
    def train_HMM(self):
        Hmm = Hidden_Markov_Model(self.n_states,self.A,self.B,initial_distribution=self.pi)
        self.A,self.B = Hmm.baum_welch(self.clusters)
        self.pi = np.linalg.matrix_power(self.A,2000)[0]
        Hmm2 = Hidden_Markov_Model(self.n_states,self.A,self.B,initial_distribution=self.pi)
        self.alpha = Hmm2.forward_backward(self.clusters[-10:])[2]
    
    def test(self,value):
        self.clusters = self.Kmeans()[0]
        self.train_HMM()
        clusters = np.append(self.clusters[-9:],self.Kmeans(value=value)[1])
        #print(self.clusters,clusters)
        Hmm2 = Hidden_Markov_Model(self.n_states,self.A,self.B,initial_distribution=self.pi)
        alpha2 = Hmm2.forward_backward(clusters)[2]
        threshold = 0.60
        delta = (self.alpha-alpha2)/self.alpha
        if delta >= threshold:
            print("This Transaction is fraud")
        else:
            print("This Transaction is not fraud")
            self.Observations = np.append(self.Observations,value)
            #print(self.Observations)
        

### Transactions from a real credit card

In [4]:
Transactions = np.array([15999,975.3,19737,1028.25,599,2070,1152,149,729,591.56,50,599,499,3791.5,2070,1999,100,149,648])

In [5]:
fds = FDS(2,3,Transactions)

In [6]:
fds.test(718.7) # This should not be fraud

This Transaction is not fraud


In [7]:
fds.test(974) # This should not be fraud

This Transaction is not fraud


In [8]:
fds.test(455) # This should not be fraud

This Transaction is not fraud


In [9]:
fds.test(13000) # This should be fraud

This Transaction is fraud
