# Document classification

Your task is to classify documents into the following 7 categories:

* `AA`: Amazon Appliances
* `AB`: Amazon Books
* `BC`: Broadcasting Conversations
* `BN`: Broadcasting News
* `CT`: Conversation Texts
* `NW`: Newswires
* `WB`: Web Blogs

Three files are provided:

* [`docs.trn.tsv`](res/docs.trn.tsv): training set
* [`docs.dev.tsv`](res/docs.dev.tsv): development set
* [`docs.tst.tsv`](res/docs.tst.tsv): test set

## 1. Reading

In [1]:
from typing import List, Tuple

def read_docs(filename: str) -> List[Tuple[str, List[str]]]:
    fin = open(filename)
    docs = []
    
    for line in fin:
        l = line.split('\t')
        label = l[0]
        doc = l[1].split()
        docs.append((label, doc))
    
    return docs

In [2]:
trn_docs = read_docs('res/docs.trn.tsv')
dev_docs = read_docs('res/docs.dev.tsv')
tst_docs = read_docs('res/docs.tst.tsv')

In [3]:
print('TRN: %d, DEV: %d, TST: %d' % (len(trn_docs), len(dev_docs), len(tst_docs)))

TRN: 2355, DEV: 297, TST: 297


## 2. Indexing

In [4]:
from typing import Dict

def index_maps(docs: List[Tuple[str, List[str]]]) -> Tuple[Dict[str, int], Dict[str, int]]:
    map_x = {}
    map_y = {}
    
    for label, doc in docs:
        map_y.setdefault(label, len(map_y))
        
        for token in doc:
            map_x.setdefault(token, len(map_x))
    
    return map_y, map_x

In [5]:
map_y, map_x = index_maps(trn_docs)

In [6]:
print('labels: %d, token types: %d' % (len(map_y), len(map_x)))

labels: 7, token types: 38741


## 3. Vectorizing

In [7]:
def vec_yxs(docs: List[Tuple[str, List[str]]], map_y: Dict[str, int], map_x: Dict[str, int]) -> List[Tuple[int, List[Tuple[int, int]]]]:
    yxs = []
    
    for label, doc in docs:
        y = map_y.get(label, -1)
        x = [(map_x[token], 1) for token in doc if token in map_x]
        x.sort()
        yxs.append((y, x))

    return yxs

In [8]:
trn_yxs = vec_yxs(trn_docs, map_y, map_x)
dev_yxs = vec_yxs(dev_docs, map_y, map_x)
tst_yxs = vec_yxs(tst_docs, map_y, map_x)

## 4. Learning

In [9]:
import numpy as np

def score(w: np.array, x: List[Tuple[int, int]]):
    return sum([w[i] * v for i, v in x])

def update(w: np.array, x: List[Tuple[int, int]], gradient: float):
    for i, v in x:
        w[i] += gradient * v

In [10]:
def train(yxs: List[Tuple[int, List[Tuple[int, int]]]], ws: List[np.array], learning_rate):
    for y, x in yxs:
        yhat = np.argmax([score(w, x) for w in ws])
        if y != yhat:
            for i, w in enumerate(ws):
                sign = 1 if i == y else -1
                update(w, x, sign * learning_rate)

In [11]:
def evaluate(yxs: List[Tuple[int, List[Tuple[int, int]]]], ws: List[np.array]):
    correct = 0
    
    for y, x in yxs:
        yhat = np.argmax([score(w, x) for w in ws])
        if y == yhat:
            correct += 1

    return correct

In [12]:
ws = [np.zeros(len(map_x)) for _ in range(len(map_y))]
learning_rate = 0.01
best_correct = 0
best_ws = None
epochs = 50

for epoch in range(epochs):
    train(trn_yxs, ws, learning_rate)
    correct = evaluate(dev_yxs, ws)
    if correct > best_correct:
        best_correct = correct
        best_ws = [np.array(w) for w in ws]
    
    print('%4d: %5.2f (%d/%d)' % (epoch, 100.0*correct/len(dev_yxs), correct, len(dev_yxs)))

print('==========')
print('Best: %5.2f (%d/%d)' % (100.0*best_correct/len(dev_yxs), best_correct, len(dev_yxs)))

   0: 13.13 (39/297)
   1: 18.52 (55/297)
   2: 22.22 (66/297)


KeyboardInterrupt: 

In [None]:
correct = evaluate(dev_yxs, best_ws)
print('DEV: %5.2f (%d/%d)' % (100.0*correct/len(dev_yxs), correct, len(dev_yxs)))

In [None]:
correct = evaluate(tst_yxs, best_ws)
print('TST: %5.2f (%d/%d)' % (100.0*correct/len(tst_yxs), correct, len(tst_yxs)))

## K-nearest neighbor 

trn_yxs
dev_yxs
tst_yxs

In [20]:
import math

def sim(vec1: List[Tuple[int, float]], vec2: List[Tuple[int, float]]):
    num = sum([v1 * vec2[k1] for k1, v1 in vec1 if k1 in vec2])
    den1 = sum(v**2 for _, v in vec1)
    den2 = sum(v**2 for _, v in vec2)
    return num / (math.sqrt(den1) * math.sqrt(den2))

In [21]:
def compare(trn_yxs: List[Tuple[int, List[Tuple[int, float]]]], vec: List[Tuple[int, float]]):
    scores = [0] * 7
    for y, x in trn_yxs:
        scores[y] += sim(x, vec)
    return scores

In [22]:
def predict(trn_yxs: List[Tuple[int, List[Tuple[int, float]]]], dev_yxs: List[Tuple[int, List[Tuple[int, float]]]]):
    correct = 0
    for y, x in dev_yxs:
        scores = np.array(compare(trn_yxs, x))
        yhat = np.argmax(scores)
        if y == yhat: correct += 1
    
    return correct

In [23]:
correct = predict(trn_yxs, dev_yxs)

KeyboardInterrupt: 

In [None]:
def predict(trn_yxs: List[Tuple[int, List[Tuple[int, float]]]], dev_yxs: List[Tuple[int, List[Tuple[int, float]]]]):
    y_sim = []
    for i, (y, x) in enumerate(dev_yxs):
        y_sim.append(i, sim(trn_yxs, x))
    
    y_sim.sort(key=lambda x: x[1])
    
    n = 10
    l = [0] * 7
    for k, v in y_sim[:n]:
        l[k] += 1
    
    np.argmax(l)
        
    
    return correct