In [1]:
import os
import math

In [2]:
"""
create and return a vocabulary as a list of word types 
with counts >= cutoff in the training directory
@param training_directory - the path to the training directory
@param cutoff - the minimum count a word type needs to stay in the vocabulary
@return the vocabulary as a sorted list of word types
"""
def create_vocabulary(training_directory, cutoff):
    """
    Build the vocabulary from every document under the training directory.

    Each sub-directory of the training directory is scanned, and each line of
    each file in it (minus the trailing newline) is counted as one occurrence
    of a word type.

    @param training_directory - the path to the training directory
    @param cutoff - the minimum count a word type needs to be kept
    @return a sorted list of the word types whose count is >= cutoff
    """
    counts = dict()

    # Go through each branch of the main directory
    for sub in os.listdir(training_directory):
        sub_path = os.path.join(training_directory, sub)
        # Skip stray files (e.g. .DS_Store) sitting next to the label folders;
        # calling os.listdir on them would raise NotADirectoryError
        if not os.path.isdir(sub_path):
            continue

        for name in os.listdir(sub_path):
            file_path = os.path.join(sub_path, name)
            # Only regular files can be opened and read as documents
            if not os.path.isfile(file_path):
                continue

            with open(file_path, 'r', errors='ignore') as f:
                for line in f:
                    # One word per line: drop the line terminator and count it
                    word = line.rstrip('\n')
                    counts[word] = counts.get(word, 0) + 1

    # Every counted word has a count >= 1, so a cutoff of 1 or less keeps them all
    return sorted(word for word, count in counts.items() if count >= cutoff)

In [3]:
"""
create and return a bag of words Python dictionary from a single document
@param vocab - the vocabulary
@param filepath - the path to the document
@return a bag of word in the form of a dictionary
"""
def create_bow(vocab, filepath):
    """
    Create a bag of words from a single document.

    Each line of the file (minus the trailing newline) is treated as one word.

    @param vocab - the vocabulary (an iterable of word types)
    @param filepath - the path to the document
    @return a dictionary mapping each in-vocabulary word found in the document
            to its count; the key None holds the count of out-of-vocabulary
            words and is always present, even when that count is 0
    """
    # Hash the vocabulary once so each membership test is a constant-time
    # lookup instead of a linear scan of the vocabulary list
    known_words = set(vocab)

    # The None entry stores the frequency of OOV words
    bow = {None: 0}

    # Go through the file
    with open(filepath, 'r', errors='ignore') as f:
        for line in f:
            word = line.rstrip('\n')
            if word in known_words:
                bow[word] = bow.get(word, 0) + 1
            # If the word is not in the vocab, increment the OOV count
            else:
                bow[None] += 1

    return bow

In [4]:
"""
create and return training set (bag of words Python dictionary + label) 
from the files in a training directory
@param vocab - the vocabulary
@param directory - the directory of the training data
"""
def load_training_data(vocab, directory):
    """
    Create the training set from the files in a training directory.

    Every sub-directory of the training directory is treated as one label, and
    every file inside it as one document carrying that label.

    @param vocab - the vocabulary
    @param directory - the directory of the training data
    @return a list with one dictionary per document, of the form
            {'label': <sub-directory name>, 'bow': <bag of words of the file>}
    """
    training_data = []

    # Sorted listings keep the order of the result reproducible across
    # platforms (os.listdir returns entries in arbitrary order)
    for sub in sorted(os.listdir(directory)):
        # The folder name is the label of the documents it holds
        label = str(sub)
        sub_path = os.path.join(directory, sub)
        # Skip stray files (e.g. .DS_Store) sitting next to the label folders;
        # calling os.listdir on them would raise NotADirectoryError
        if not os.path.isdir(sub_path):
            continue

        for name in sorted(os.listdir(sub_path)):
            file_path = os.path.join(sub_path, name)
            # Only regular files can be read as documents
            if not os.path.isfile(file_path):
                continue
            training_data.append({'label': label, 'bow': create_bow(vocab, file_path)})

    return training_data

In [5]:
"""
given a training set, estimate and return 
the prior probability p(label) of each label
@param training_data - the processed training data
@param label_list - the list of label for prediction
@return the prior probability
"""
def prior(training_data, label_list):
    """
    Estimate the log prior probability log P(label) of each label.

    The prior of a label is the fraction of training documents carrying it;
    no smoothing is applied.

    @param training_data - the processed training data; only the 'label' entry
                           of each element is read
    @param label_list - the list of labels to estimate a prior for
    @return a dictionary mapping each label of label_list to log P(label);
            a label without any training document maps to -inf (log of 0)
    """
    # Initialize the count for each label
    counts = {label: 0 for label in label_list}

    # Get the count for each label; labels outside label_list are ignored
    for doc in training_data:
        label = str(doc['label'])
        if label in counts:
            counts[label] += 1

    total = len(training_data)

    # math.log(0) raises ValueError, so a zero count is mapped to -inf directly.
    # This also covers empty training data: every count is 0 then, so the
    # division by total is never evaluated.
    return {
        label: math.log(count / total) if count else float('-inf')
        for label, count in counts.items()
    }

In [6]:
"""
given a training set and a vocabulary, estimate and return 
the class conditional distribution P(word|label) over all words
@param vocab - the vocabulary
@param training_data - the processed training data
@param label - the label whose word probabilities are estimated
@return the probability of words for each label
"""
def p_word_given_label(vocab, training_data, label):
    """
    Estimate the class conditional distribution log P(word|label) with
    add-1 smoothing.

    @param vocab - the vocabulary
    @param training_data - the processed training data, a list of
                           {'label': ..., 'bow': ...} dictionaries
    @param label - the label to estimate the word distribution for
    @return a dictionary mapping every vocabulary word, plus None for
            out-of-vocabulary words, to log P(word|label)
    """
    # Every vocabulary word and the OOV slot (None) start at a count of zero
    counts = dict.fromkeys(vocab, 0)
    counts[None] = 0

    # Smoothing denominator: one extra count per vocabulary word plus one for OOV
    total = len(vocab) + 1

    # Only the documents carrying the requested label contribute
    matching_bows = (doc['bow'] for doc in training_data if doc['label'] == label)
    for bow in matching_bows:
        for word, freq in bow.items():
            # Words outside the vocabulary are folded into the OOV slot
            slot = word if word in counts else None
            counts[slot] += freq
            total += freq

    # Add-1 smoothed log probability of each word
    return {word: math.log((freq + 1) / total) for word, freq in counts.items()}

In [7]:
"""
loads the training data, estimates the prior distribution P(label)
and class conditional distributions P(word|label), return the trained model
@param training_directory - the path to the training directory
@param cutoff - the cut off to create the vocabulary
@return the trained model
"""
def train(training_directory, cutoff):
    """
    Train the model: build the vocabulary, load the training data, then
    estimate the log prior and the log P(word|label) tables for the labels
    '2016' and '2020'.

    @param training_directory - the path to the training directory
    @param cutoff - the cutoff used to create the vocabulary
    @return the trained model, a dictionary with the keys 'vocabulary',
            'log prior', 'log p(w|2016)' and 'log p(w|2020)'
    """
    # The vocabulary comes first because the bags of words depend on it
    vocabulary = create_vocabulary(training_directory, cutoff)
    documents = load_training_data(vocabulary, training_directory)

    # Assemble the model entry by entry
    model = dict()
    model['vocabulary'] = vocabulary
    model['log prior'] = prior(documents, ['2016', '2020'])
    model['log p(w|2016)'] = p_word_given_label(vocabulary, documents, '2016')
    model['log p(w|2020)'] = p_word_given_label(vocabulary, documents, '2020')
    return model

In [9]:
"""
given a trained model, predict the label for the test document
@param model - the trained model
@param filepath - the path to the file to classify
@return the prediction and the probability for each label
"""
def classify(model, filepath):
    """
    Predict the label of a document with a trained model.

    @param model - the trained model (reads its 'vocabulary', 'log prior',
                   'log p(w|2016)' and 'log p(w|2020)' entries)
    @param filepath - the path to the file to classify
    @return a dictionary holding the predicted label under 'predicted y' and
            the accumulated log score of each label
    """
    # Turn the document into a bag of words over the model's vocabulary
    bow = create_bow(model['vocabulary'], filepath)

    # Each score starts from the log prior of its label
    score_2016 = model['log prior']['2016']
    score_2020 = model['log prior']['2020']

    log_p_2016 = model['log p(w|2016)']
    log_p_2020 = model['log p(w|2020)']

    # Add the log likelihood of every word, weighted by its count
    for word, count in bow.items():
        score_2016 += log_p_2016[word] * count
        score_2020 += log_p_2020[word] * count

    # '2016' wins only with a strictly larger score; a tie goes to '2020'
    prediction = '2016' if score_2016 > score_2020 else '2020'

    return {
        'predicted y': prediction,
        'log p(y=2016|x)': score_2016,
        'log p(y=2020|x)': score_2020,
    }

In [11]:
# Sanity check: train on the ./EasyFiles/ corpus with a cutoff of 2 and display the resulting model
train('./EasyFiles/', 2)

{'vocabulary': ['.', 'a'],
 'log prior': {'2016': -0.40546510810816444, '2020': -1.0986122886681098},
 'log p(w|2016)': {'.': -1.7047480922384253,
  'a': -1.2992829841302609,
  None: -0.6061358035703156},
 'log p(w|2020)': {'.': -1.6094379124341003,
  'a': -2.3025850929940455,
  None: -0.35667494393873245}}

In [12]:
# Train on ./corpus/training/ with a cutoff of 2, then classify one document from the test/2016 folder.
# NOTE(review): the saved output of this cell predicts '2020' for this test/2016 file - confirm
# whether that misclassification is expected before relying on the model.
model = train('./corpus/training/', 2)
classify(model, './corpus/test/2016/0.txt')

{'predicted y': '2020',
 'log p(y=2016|x)': -3916.464890789782,
 'log p(y=2020|x)': -3906.349624562405}