# Task 6: Spam classification

First, bring over the original code from `nb.py`.

In [1]:
import numpy as np

- Preprocess the data

In [2]:
def readMatrix(file):
    """Load a sparse document-term matrix from a text file into a dense array.

    Layout read by this function: one header line (ignored), one line holding
    ``rows cols``, one line of whitespace-separated tokens, then one line per
    document.  A document line starts with an integer label followed by
    ``(column offset, count)`` integer pairs; offsets accumulate within the
    line to give absolute column indices.  A final unpaired integer on the
    line, if present, is ignored.

    Parameters
    ----------
    file : str or path-like
        Path of the matrix file to read.

    Returns
    -------
    matrix : np.ndarray of shape (rows, cols)
        Dense matrix with one row per document line.
    tokens : list of str
        Tokens from the third line of the file.
    labels : np.ndarray
        First integer of every document line, unchanged.
    """
    # The context manager guarantees the handle is closed, even if parsing raises.
    with open(file, 'r') as fd:
        fd.readline()  # header line; its content is not used
        rows, cols = [int(s) for s in fd.readline().strip().split()]
        tokens = fd.readline().strip().split()
        matrix = np.zeros((rows, cols))
        Y = []
        for i, line in enumerate(fd):
            nums = [int(x) for x in line.strip().split()]
            Y.append(nums[0])
            # Explicit integer dtype keeps the index array valid even when a
            # line carries no pairs (an empty array would default to float).
            kv = np.array(nums[1:], dtype=int)
            k = np.cumsum(kv[:-1:2])  # running sum: offsets -> absolute columns
            v = kv[1::2]
            matrix[i, k] = v
    return matrix, tokens, np.array(Y)

# Load the full training set and the test set; later cells read these
# module-level names (trainMatrix/trainCategory, testMatrix/testCategory, tokenlist).
trainMatrix, tokenlist, trainCategory = readMatrix('MATRIX.TRAIN')
# Note: tokenlist is rebound here to the token line of MATRIX.TEST.
testMatrix, tokenlist, testCategory = readMatrix('MATRIX.TEST')

# Show the training labels as a quick sanity check.
print(trainCategory)

[1 0 1 ... 0 0 1]


In [3]:
def nb_train(matrix, category):
    """Fit a multinomial Naive Bayes model with add-one smoothing.

    Parameters
    ----------
    matrix : np.ndarray of shape (num_docs, num_tokens)
        Per-document token counts.
    category : np.ndarray of non-negative ints
        Class index of every document; classes are addressed as
        ``0 .. num_classes - 1``.

    Returns
    -------
    dict
        ``'class_probs'``  - class frequencies, ``bincount(category) / len(category)``;
        ``'token_probs'``  - smoothed per-class token probabilities;
        ``'token_counts'`` - raw per-class token totals.
    """
    vocab_size = matrix.shape[1]
    n_classes = len(np.unique(category))

    # Empirical class frequencies, p(y).
    priors = np.bincount(category) / len(category)

    smoothed = np.zeros((n_classes, vocab_size))
    totals = np.zeros((n_classes, vocab_size))
    for label in range(n_classes):
        class_docs = matrix[category == label]
        totals[label] = class_docs.sum(axis=0)
        # Add-one smoothing: +1 per token in the numerator, +vocab_size in the denominator.
        smoothed[label] = (totals[label] + 1) / (class_docs.sum() + vocab_size)

    return {
        'class_probs': priors,
        'token_probs': smoothed,
        'token_counts': totals,
    }

# Fit Naive Bayes on the full training set and print the fitted parameters.
state = nb_train(trainMatrix, trainCategory)
print(state)

{'class_probs': array([0.50093284, 0.49906716]), 'token_probs': array([[2.27032946e-04, 9.34841544e-04, 1.60258550e-04, ...,
        2.67097584e-05, 5.74259806e-04, 1.86968309e-04],
       [2.70691570e-04, 8.21408903e-04, 1.21344497e-04, ...,
        3.92036067e-04, 2.89359954e-03, 1.40012881e-04]]), 'token_counts': array([[ 16.,  69.,  11., ...,   1.,  42.,  13.],
       [ 28.,  87.,  12., ...,  41., 309.,  14.]])}


In [4]:
def nb_test(matrix, state):
    """Classify documents with a fitted Naive Bayes model.

    For every document the score of class ``c`` is
    ``log p(c) + sum_j count_j * log p(token_j | c)``, summed over the tokens
    whose count is strictly positive; the class with the highest score wins
    (``np.argmax`` picks the lowest class index on ties).

    Parameters
    ----------
    matrix : np.ndarray of shape (num_docs, num_tokens)
        Per-document token counts.
    state : dict
        Model holding ``'class_probs'`` and ``'token_probs'``
        (shape ``(num_classes, num_tokens)``).

    Returns
    -------
    np.ndarray of shape (num_docs,)
        Predicted class index per document, stored as floats.
    """
    num_classes = state['token_probs'].shape[0]
    num_docs = matrix.shape[0]

    # The log tables do not depend on the document, so compute them once
    # instead of once per (document, class, token).
    log_priors = np.log(state['class_probs'][:num_classes])
    # Zero probabilities only matter for tokens that actually occur; silence
    # the warning for entries that the mask below never touches.
    with np.errstate(divide='ignore'):
        log_token_probs = np.log(state['token_probs'])

    output = np.zeros(num_docs)
    for i in range(num_docs):
        doc = matrix[i]
        present = doc > 0  # only tokens that occur contribute to the score
        scores = log_priors + log_token_probs[:, present] @ doc[present]
        output[i] = np.argmax(scores)

    return output

def evaluate(output, label):
    """Print the share of predictions that differ from the labels.

    The rate is printed as ``Error: <rate>`` with four decimals; nothing is
    returned.
    """
    mismatches = output != label
    error_rate = np.mean(mismatches)
    print('Error: %1.4f' % error_rate)

# Predict a class for every test document with the fitted model ...
output = nb_test(testMatrix, state)

# ... and print the fraction of test documents that were misclassified.
evaluate(output, testCategory)

Error: 0.0163


In [5]:
def find_most_indicative_tokens(state, tokenlist, num_tokens=5):
    """Return the tokens with the largest class-1 / class-0 log probability ratio.

    Parameters
    ----------
    state : dict
        Model whose ``'token_probs'`` rows 0 and 1 hold the per-class token
        probabilities.
    tokenlist : sequence of str
        Token for every column of ``'token_probs'``.
    num_tokens : int, optional
        How many tokens to return (default 5).

    Returns
    -------
    list of str
        Tokens ordered from the highest log ratio downwards.
    """
    class1_probs = state['token_probs'][1]
    class0_probs = state['token_probs'][0]
    # argsort is ascending, so reverse it to rank from the highest ratio down.
    ranking = np.argsort(np.log(class1_probs / class0_probs))[::-1]
    top_columns = ranking[:num_tokens]
    return [tokenlist[column] for column in top_columns]

# Rank tokens by the class-1 / class-0 log probability ratio and list the top ones, numbered from 1.
most_indicative_tokens = find_most_indicative_tokens(state, tokenlist)
print("Most indicative tokens for SPAM class:")
for i, token in enumerate(most_indicative_tokens):
    print(f"{i+1}. {token}")

Most indicative tokens for SPAM class:
1. httpaddr
2. spam
3. unsubscrib
4. ebai
5. valet


- Vary the training-set size and compare the resulting test error (the cell below prints the error for each size rather than plotting it)

In [6]:
def size_test():
    """Train Naive Bayes on each training-set size and print the test errors.

    Reads ``MATRIX.TEST`` once, then for every size reads
    ``MATRIX.TRAIN.<size>``, fits the model, and measures the fraction of
    misclassified test documents.  Prints one line per size followed by the
    size with the lowest error.  Returns nothing.
    """
    test_docs, _, test_labels = readMatrix('MATRIX.TEST')
    sizes = [50, 100, 200, 400, 800, 1400]

    def held_out_error(n_train):
        # Fit on the subset of this size and score it on the shared test set.
        train_docs, _, train_labels = readMatrix(f'MATRIX.TRAIN.{n_train}')
        model = nb_train(train_docs, train_labels)
        predictions = nb_test(test_docs, model)
        return np.mean(predictions != test_labels)

    errors = [held_out_error(n_train) for n_train in sizes]

    for n_train, err in zip(sizes, errors):
        print(f"Training size: {n_train}, Test Error: {err:.4f}")

    winner = sizes[np.argmin(errors)]
    print(f"Best training size: {winner}, Best Test Error: {min(errors):.4f}")


# Run the Naive Bayes sweep. This must execute before the SVM cell below: that
# cell rebinds readMatrix to a version returning -1/+1 labels, whereas nb_train
# addresses classes as 0..num_classes-1.
size_test()

Training size: 50, Test Error: 0.0387
Training size: 100, Test Error: 0.0262
Training size: 200, Test Error: 0.0262
Training size: 400, Test Error: 0.0187
Training size: 800, Test Error: 0.0175
Training size: 1400, Test Error: 0.0163
Best training size: 1400, Best Test Error: 0.0163


- Train on the same data using an SVM with a Gaussian kernel

In [9]:
# Bandwidth of the Gaussian kernel exp(-||x - z||^2 / (2 * tau^2)); read as a
# global by svm_train and svm_test.
tau = 8.

def readMatrix(file):
    """Load a sparse document-term matrix and map its labels to -1/+1.

    Same file layout as the earlier ``readMatrix`` in this notebook: a header
    line (ignored), a ``rows cols`` line, a token line, then one line per
    document made of an integer label followed by ``(column offset, count)``
    pairs whose offsets accumulate within the line.  A final unpaired integer,
    if present, is ignored.

    This definition replaces the earlier one in the notebook namespace.  It
    differs only in the labels, which are transformed with ``2 * y - 1``
    (0 -> -1, 1 -> +1).

    Parameters
    ----------
    file : str or path-like
        Path of the matrix file to read.

    Returns
    -------
    matrix : np.ndarray of shape (rows, cols)
        Dense matrix with one row per document line.
    tokens : list of str
        Tokens from the third line of the file.
    category : np.ndarray
        Labels after the ``2 * y - 1`` transform.
    """
    # The context manager guarantees the handle is closed, even if parsing raises.
    with open(file, 'r') as fd:
        fd.readline()  # header line; its content is not used
        rows, cols = [int(s) for s in fd.readline().strip().split()]
        tokens = fd.readline().strip().split()
        matrix = np.zeros((rows, cols))
        Y = []
        for i, line in enumerate(fd):
            nums = [int(x) for x in line.strip().split()]
            Y.append(nums[0])
            # Explicit integer dtype keeps the index array valid even when a
            # line carries no pairs (an empty array would default to float).
            kv = np.array(nums[1:], dtype=int)
            k = np.cumsum(kv[:-1:2])  # running sum: offsets -> absolute columns
            v = kv[1::2]
            matrix[i, k] = v
    category = (np.array(Y) * 2) - 1
    return matrix, tokens, category

def svm_train(matrix, category, kernel_tau=None, rng=None):
    """Train a Gaussian-kernel SVM with stochastic sub-gradient descent.

    Features are binarised (``count > 0``) before the kernel matrix is built.
    The optimiser performs ``40 * M`` updates, each on one randomly drawn
    training example, with step size ``1 / sqrt(step)`` and regularisation
    ``1 / (64 * M)``, where ``M`` is the number of training documents.

    Parameters
    ----------
    matrix : np.ndarray of shape (M, N)
        Per-document token counts.
    category : np.ndarray of shape (M,)
        Training labels; the -1/+1 labels returned by this cell's
        ``readMatrix`` are what the notebook passes in.
    kernel_tau : float, optional
        Gaussian kernel bandwidth.  Defaults to the module-level ``tau``.
    rng : np.random.RandomState or np.random.Generator, optional
        Source of the random example indices (anything with a ``random()``
        method returning a float in [0, 1)).  Defaults to NumPy's global
        random state, so ``np.random.seed`` still controls the run.

    Returns
    -------
    dict
        ``'alpha'``     - coefficients after the last update;
        ``'alpha_avg'`` - running sum of the coefficients divided by
                          ``num_updates * M``;
        ``'Xtrain'``    - binarised training matrix;
        ``'Sqtrain'``   - squared row norms of ``Xtrain``;
        ``'tau'``       - kernel bandwidth used for training.
    """
    # Only fall back to the notebook-level bandwidth when none is supplied.
    bandwidth = tau if kernel_tau is None else kernel_tau
    # np.random.random draws from the same global stream as np.random.rand().
    draw = np.random.random if rng is None else rng.random

    state = {}
    M = matrix.shape[0]
    Y = category
    matrix = 1. * (matrix > 0)
    squared = np.sum(matrix * matrix, axis=1)
    gram = matrix.dot(matrix.T)
    # ||x_i - x_j||^2 expanded as |x_i|^2 + |x_j|^2 - 2 x_i.x_j
    K = np.exp(-(squared.reshape((1, -1)) + squared.reshape((-1, 1)) - 2 * gram) / (2 * (bandwidth ** 2)))

    alpha = np.zeros(M)
    alpha_avg = np.zeros(M)
    L = 1. / (64 * M)
    outer_loops = 40
    num_updates = outer_loops * M

    for ii in range(num_updates):
        i = int(draw() * M)
        margin = Y[i] * np.dot(K[i, :], alpha)
        grad = M * L * K[:, i] * alpha[i]
        if margin < 1:
            # Hinge loss is active for this example: add its sub-gradient.
            grad -= Y[i] * K[:, i]
        alpha -= grad / np.sqrt(ii + 1)
        alpha_avg += alpha

    # Same scaling as before; only the sign of the decision value is used at
    # test time, so the extra factor of M does not affect predictions.
    alpha_avg /= num_updates * M

    state['alpha'] = alpha
    state['alpha_avg'] = alpha_avg
    state['Xtrain'] = matrix
    state['Sqtrain'] = squared
    state['tau'] = bandwidth
    return state

def svm_test(matrix, state):
    """Predict labels with a trained Gaussian-kernel SVM.

    Test features are binarised (``count > 0``), the kernel between every test
    row and every stored training row is evaluated, and the sign of the
    kernel-weighted sum of ``state['alpha_avg']`` is returned.

    Parameters
    ----------
    matrix : np.ndarray of shape (M, N)
        Per-document token counts of the documents to classify.
    state : dict
        Trained model with ``'Xtrain'``, ``'Sqtrain'`` and ``'alpha_avg'``.
        If it also holds ``'tau'`` that bandwidth is used; otherwise the
        module-level ``tau`` is.

    Returns
    -------
    np.ndarray of shape (M,)
        ``np.sign`` of the decision value per document (-1., 0. or +1.).
    """
    Xtrain = state['Xtrain']
    Sqtrain = state['Sqtrain']
    # Prefer the bandwidth recorded with the model so that training and
    # prediction cannot silently use different kernels.
    bandwidth = state['tau'] if 'tau' in state else tau

    matrix = 1. * (matrix > 0)
    squared = np.sum(matrix * matrix, axis=1)
    gram = matrix.dot(Xtrain.T)
    # ||x - z||^2 expanded as |x|^2 + |z|^2 - 2 x.z for all test/train pairs
    K = np.exp(-(squared.reshape((-1, 1)) + Sqtrain.reshape((1, -1)) - 2 * gram) / (2 * (bandwidth ** 2)))
    preds = K.dot(state['alpha_avg'])
    return np.sign(preds)

def evaluate(output, label):
    """Return the fraction of entries in ``output`` that differ from ``label``."""
    num_wrong = (output != label).sum()
    return num_wrong * 1. / len(output)

def size_test_svm():
    """Train the SVM on each training-set size and print the test errors.

    Reads ``MATRIX.TEST`` once, then for every size reads
    ``MATRIX.TRAIN.<size>``, trains with ``svm_train``, predicts with
    ``svm_test`` and scores with ``evaluate``.  Prints one line per size
    followed by the size with the lowest error.  Returns nothing.
    """
    test_docs, _, test_labels = readMatrix('MATRIX.TEST')
    sizes = [50, 100, 200, 400, 800, 1400]

    def held_out_error(n_train):
        # Train on the subset of this size and score it on the shared test set.
        train_docs, _, train_labels = readMatrix(f'MATRIX.TRAIN.{n_train}')
        model = svm_train(train_docs, train_labels)
        predictions = svm_test(test_docs, model)
        return evaluate(predictions, test_labels)

    errors = [held_out_error(n_train) for n_train in sizes]

    for n_train, err in zip(sizes, errors):
        print(f"Training size: {n_train}, Test Error: {err}")

    winner = sizes[np.argmin(errors)]
    print(f"Best training size: {winner}, Best Test Error: {min(errors):.4f}")


# Run the SVM sweep. svm_train draws training examples with np.random.rand()
# and no seed is set in this notebook, so the printed errors can differ
# between runs.
size_test_svm()

# Disabled single-run variant (one training file instead of the whole sweep):
# trainMatrix, tokenlist, trainCategory = readMatrix('MATRIX.TRAIN.400')
# testMatrix, tokenlist, testCategory = readMatrix('MATRIX.TEST')

# state = svm_train(trainMatrix, trainCategory)
# output = svm_test(testMatrix, state)

# evaluate(output, testCategory)


Training size: 50, Test Error: 0.02625
Training size: 100, Test Error: 0.005
Training size: 200, Test Error: 0.0025
Training size: 400, Test Error: 0.00125
Training size: 800, Test Error: 0.0
Training size: 1400, Test Error: 0.0
Best training size: 800, Best Test Error: 0.0000
