<h1> Application of HMM in Credit Card Fraud Transaction </h1> 

<h2>CLUSTERING</h2>

In [None]:
import numpy as np
from sklearn.cluster import AgglomerativeClustering as Linkage
from sklearn.cluster import DBSCAN
from sklearn.cluster import KMeans


# Kmeans clustering algorithm
class KMeansClustering:
    def __init__(self, n_clusters=3):
        self.n_clusters = n_clusters
        self.__model = KMeans(n_clusters=n_clusters, random_state=0)

    def run(self, data):
        print('Clustering ...')
        data = np.array([[x] for x in data])
        print('Clustering is finished.')
        return self.__model.fit(X=data).labels_

    def predict(self, sample):
        return self.__model.predict(X=[[sample]])


# DBSCAN clustering algorithm
class DBSCANClustering:
    def __init__(self, n_neighbors=5, eps=5):
        self.__model = DBSCAN(min_samples=n_neighbors, eps=eps)

    def run(self, data):
        print('Clustering ...')
        data = np.array([[x] for x in data])
        print('Clustering is finished.')
        return self.__model.fit(X=data).labels_ + 1

    def predict(self, sample):
        return self.__model.fit_predict(X=[[sample]]) + 1


# The single link method clustering
class SLINKClustering:
    def __init__(self, n_clusters=3):
        self.n_clusters = n_clusters
        self.__model = Linkage(linkage='single', n_clusters=n_clusters)

    def run(self, data):
        print('Clustering ...')
        data = np.array([[x] for x in data])
        print('Clustering is finished.')
        return self.__model.fit(X=data).labels_

    def predict(self, sample):
        x = [[sample]]
        for i in range(self.n_clusters):
            x.append([sample + i])
        return self.__model.fit_predict(X=x)[0]


# The complete link method clustering
class CLINKClustering:
    def __init__(self, n_clusters=3):
        self.n_clusters = n_clusters
        self.__model = Linkage(linkage='complete', n_clusters=n_clusters)

    def run(self, data):
        print('Clustering ...')
        data = np.array([[x] for x in data])
        print('Clustering is finished.')
        return self.__model.fit(X=data).labels_

    def predict(self, sample):
        x = [[sample]]
        for i in range(self.n_clusters):
            x.append([sample + i])
        return self.__model.fit_predict(X=x)[0]


# The average method clustering
class AVGClustering:
    def __init__(self, n_clusters=3):
        self.n_clusters = n_clusters
        self.__model = Linkage(linkage='average', n_clusters=n_clusters)

    def run(self, data):
        print('Clustering ...')
        data = np.array([[x] for x in data])
        print('Clustering is finished.')
        return self.__model.fit(X=data).labels_

    def predict(self, sample):
        x = [[sample]]
        for i in range(self.n_clusters):
            x.append([sample + i])
        return self.__model.fit_predict(X=x)[0]


<h2> hidden_markov_model </h2>

In [None]:
import numpy as np
from hidden_markov import hmm


# Divide function
def divide(num, denom):
    if num == 0:
        return 0

    return num / denom


# Hidden Markov Model
class HMM:
    def __init__(self, n_states, n_possible_observations):
        # Number of states
        self.n_states = n_states
        # Number of possible observations
        self.n_possible_observations = n_possible_observations
        # Create states and possible observations
        self.states, self.possible_observations = self.__init_names()
        # Create transition matrix, emission matrix and start probability matrix
        self.pi_prob, self.transition_prob, self.emission_prob = self.__init_probabilities()

        # Create model
        self.__model = hmm(states=list(self.states),
                           observations=list(self.possible_observations),
                           start_prob=np.matrix(self.pi_prob),
                           trans_prob=np.matrix(self.transition_prob),
                           em_prob=np.matrix(self.emission_prob))

    # Initialize states and possible observations
    def __init_names(self):
        states = np.array(range(self.n_states))
        possible_observations = np.array(range(self.n_possible_observations))
        return states, possible_observations

    # Initialize probability of transition matrix and emission matrix
    def __init_probabilities(self):
        pi_prob = np.zeros(self.n_states)
        transition_prob = np.zeros((self.n_states, self.n_states))
        emission_prob = np.zeros((self.n_states, self.n_possible_observations))

        for i in range(self.n_states):
            pi_prob[i] = 1 / self.n_states

        for i in range(self.n_states):
            for j in range(self.n_states):
                transition_prob[i][j] = 1 / self.n_states

        for i in range(self.n_states):
            for j in range(self.n_possible_observations):
                emission_prob[i][j] = 1 / self.n_possible_observations

        return pi_prob, transition_prob, emission_prob

    # Implement the Baum-Welch Algorithm for HMM
    def train_model(self, observations, steps):
        print('HMM is training ...')
        pi_prob = np.zeros(self.n_states)
        transition_prob = np.zeros((self.n_states, self.n_states))
        emission_prob = np.zeros((self.n_states, self.n_possible_observations))

        # Main loop for given steps
        for _ in range(steps):
            # Calculation of Forward-Backward variables from the current observations
            fwd = self.forward_process(observations)
            bwd = self.backward_process(observations)

            # Re-estimating of initial state probabilities
            for i in range(self.n_states):
                pi_prob[i] = self.calculate_gamma(i, 0, fwd, bwd)

            # Re-estimating of transition probabilities
            for i in range(self.n_states):
                for j in range(self.n_states):
                    num, denom = 0, 0
                    for t in range(len(observations)):
                        num += self.calculate_path_probability(t, i, j, observations, fwd, bwd)
                        denom += self.calculate_gamma(i, t, fwd, bwd)

                    transition_prob[i][j] = divide(num, denom)

            # Re-estimating of emission probabilities
            for i in range(self.n_states):
                for k in range(self.n_possible_observations):
                    num, denom = 0, 0
                    for t in range(len(observations)):
                        g = self.calculate_gamma(i, t, fwd, bwd)
                        if k == observations[t]:
                            num += g
                        denom += g

                    emission_prob[i][k] = divide(num, denom)

        self.pi_prob = pi_prob
        self.transition_prob = transition_prob
        self.emission_prob = emission_prob
        print('HMM has successfully trained.')

    # Forward algorithm
    # Calculate Forward-Variables fwd[i][t] for state i at time t for current observations
    def forward_process(self, observations):
        fwd = np.zeros((self.n_states, len(observations)))
        # Initialization at time = 0
        for i in range(self.n_states):
            fwd[i][0] = self.pi_prob[i] * self.emission_prob[i][observations[0]]

        # Induction
        for t in range(len(observations) - 1):
            for j in range(self.n_states):
                fwd[j][t + 1] = 0
                for i in range(self.n_states):
                    fwd[j][t + 1] += (fwd[i][t] * self.transition_prob[i][j])

                fwd[j][t + 1] = (fwd[j][t + 1] * self.emission_prob[j][observations[t + 1]])

        return fwd

    # Backward algorithm
    # Calculate Backward-Variables bwd[i][t] for state i at time t for current observations
    def backward_process(self, observations):
        bwd = np.zeros((self.n_states, len(observations)))
        # Initialization at time = 0
        for i in range(self.n_states):
            bwd[i][len(observations) - 1] = 1

        # Induction
        for t in range(len(observations) - 2, -1, -1):
            for i in range(self.n_states):
                bwd[i][t] = 0
                for j in range(self.n_states):
                    bwd[i][t] += (
                            bwd[j][t + 1] * self.transition_prob[i][j] * self.emission_prob[j][observations[t + 1]])

        return bwd

    # Calculate gamma[i][t]; expected count
    def calculate_gamma(self, cur_state, t, fwd, bwd):
        num = fwd[cur_state][t] * bwd[cur_state][t]
        denom = 0
        for i in range(self.n_states):
            denom += (fwd[i][t] * bwd[i][t])

        return divide(num, denom)

    # Calculate the probability of P(x_t = s_i, x_t+1 = s_j | observations).
    # t = current time
    # x_t = current state
    # x_t+1 = next state
    # s_i = i'th state
    # s_j = j'th state
    def calculate_path_probability(self, t, i, j, observations, fwd, bwd):
        num, denom = 0, 0
        if t == len(observations) - 1:
            num = fwd[i][t] * self.transition_prob[i][j]
        else:
            num = fwd[i][t] * self.transition_prob[i][j] * self.emission_prob[j][observations[t + 1]] * bwd[j][t + 1]

        for k in range(self.n_states):
            denom += (fwd[k][t] * bwd[k][t])

        return divide(num, denom)

    # Calculate the probability of the occurrence of specific observation
    def calculate_occurrence_probability(self, observations):
        fwd = self.forward_process(observations)
        bwd = self.backward_process(observations)
        result = np.zeros(len(observations))

        for i in range(len(observations)):
            for j in range(self.n_states):
                result[i] += fwd[j][i] * bwd[j][i]

        return result

    # Detect fraud
    def detect_fraud(self, observations, new_observation, threshold):
        print('Fraud evaluation ...')
        alpha_1 = self.calculate_occurrence_probability(observations)
        alpha_2 = self.calculate_occurrence_probability(new_observation)
        delta = alpha_1[0] - alpha_2[0]
        delta = delta / alpha_1[0]

        if delta > threshold:
            return True
        else:
            return False


<h2> DRIVER </h2>

In [None]:
import numpy as np
import pandas


class Driver:
    def __init__(self, path):
        self.path = path
        self._data = np.array(pandas.read_csv(filepath_or_buffer=path, sep=' ', header=None)).flatten()
        print('Dataset loaded completely.')

    def get_data(self):
        return self._data

    def generate_test_data(self, sample):
        return np.concatenate((self._data[1:], [sample]))


<h2> MAIN </h2>

In [None]:
THRESHOLD = 0.9
STATES = 3
CLUSTERS = 3
STEPS = 200
TEST_RANGE = 10000
TERMINATE = -1

In [None]:
import numpy as np

from clustering import KMeansClustering
from driver import Driver
from hidden_markov_model import HMM

from config import *


def get_input():
    while True:
        new_transaction = input('Please add your new transaction : ')
        if float(new_transaction) == TERMINATE:
            break
        new_transaction = k.predict(float(new_transaction))
        new_observation = np.append(observations[1:], [new_transaction])

        if h.detect_fraud(observations, new_observation, THRESHOLD):
            print('Fraud')
        else:
            print('Normal')


if __name__ == '__main__':
    d = Driver('C:/Users/LENOVO/OneDrive - De La Salle University - Manila/DLSU/3/Bayesian/Source/data.txt')

    h = HMM(n_states=STATES, n_possible_observations=CLUSTERS)
    k = KMeansClustering()

    observations = k.run(d.get_data()[0:192])
    h.train_model(observations=list(observations), steps=STEPS)

    get_input()