In [None]:
import pandas as pd
import numpy as np
from collections import Counter
from tqdm import tqdm
from sklearn.model_selection import KFold

## Load data and create the transition, emission and start probability matrices

In [None]:
def insert_dict(transictions_dict, feature, adjacent_feature):
    """
    Record one observed transition from `feature` to `adjacent_feature`.

    `transictions_dict` maps each feature to a dict of
    {following feature: count}. The matching counter is incremented
    (created at 1 when missing) and the same dict, updated in place,
    is returned.
    """
    followers = transictions_dict.setdefault(feature, {})
    followers[adjacent_feature] = followers.get(adjacent_feature, 0) + 1
    return transictions_dict

In [None]:
def create_transition_and_emission_matrix(train_sentences):
    """
    Estimate HMM parameters from a list of tagged training sentences.

    Parameters
    ----------
    train_sentences : iterable of str
        Each sentence is a whitespace-separated run of ``token_TAG`` items.
        Every item is split on '_'; the first piece is taken as the token
        and the second as its tag.

    Returns
    -------
    emission_matrix_probabilities : pd.DataFrame
        One row per tag (sorted), one column per token (sorted) plus a final
        'unknown_token' column. Each row is normalised to sum to 1.
    transition_matrix_probabilities : pd.DataFrame
        Tag-by-tag matrix with rows = current tag and columns = next tag,
        both sorted. Rows are normalised while the 'end_token' column is
        still present and that column is dropped afterwards, so a row may
        sum to less than 1.
    start_probabilities : pd.Series
        Probabilities of moving from 'start_token' to each tag, sorted by tag.
    """
    token_dict = {}
    transitions_count = {}
    for line in train_sentences:
        tag_sequence = []
        # Count (token, tag) pairs; these become the emission probabilities
        for raw_token in line.split():
            splitted_token = raw_token.split('_')
            token = splitted_token[0]
            feature = splitted_token[1]
            tag_sequence.append(feature)

            tag_counts = token_dict.setdefault(token, {})
            tag_counts[feature] = tag_counts.get(feature, 0) + 1

        if not tag_sequence:
            # A blank line contributes no emissions and no transitions
            continue

        # Count tag-to-tag transitions, bracketed by the sentence markers
        sentence_list = ['start_token'] + tag_sequence + ['end_token']
        for feature, next_feature in zip(sentence_list, sentence_list[1:]):
            transitions_count = insert_dict(transitions_count, feature, next_feature)

    # Emission counts: rows = tokens (sorted), columns = tags
    token_dict = dict(sorted(token_dict.items()))
    token_matrix = pd.DataFrame(list(token_dict.values()), index=list(token_dict.keys())).fillna(0)
    # Dividing by the per-tag column totals gives P(token | tag); transpose
    # so that tags become the (sorted) rows
    emission_matrix_probabilities = (token_matrix / token_matrix.sum()).T.sort_index()

    # Transition counts: rows = current tag, columns = next tag
    transition_matrix = pd.DataFrame(
        list(transitions_count.values()), index=list(transitions_count.keys())
    ).fillna(0)
    # Normalise every row by its own total (vectorised, no row loop)
    transition_matrix = transition_matrix.div(transition_matrix.sum(axis=1), axis=0)
    transition_matrix_probabilities = transition_matrix.drop(columns='end_token')

    start_probabilities = transition_matrix_probabilities.loc['start_token'].sort_index().copy()
    # Sort rows and columns alike so the diagonal holds each tag's
    # self-transition
    transition_matrix_probabilities = (
        transition_matrix_probabilities.drop('start_token').sort_index().sort_index(axis=1)
    )

    # DEALING WITH UNKNOWN WORDS:
    # A hapax legomenon is a token seen exactly once in training. The tag
    # distribution of those tokens is used as the emission profile of
    # words never seen in training.
    hapax_legomena_count = Counter(
        next(iter(tag_counts))
        for tag_counts in token_dict.values()
        if sum(tag_counts.values()) == 1
    )
    hapax_legomena_series = pd.Series(dict(hapax_legomena_count), dtype=float)
    hapax_legomena_distribution = hapax_legomena_series / hapax_legomena_series.sum()

    # Align to the tag rows and zero-fill BEFORE assigning the column.
    # A chained `df[col].fillna(0, inplace=True)` only modifies a temporary
    # under pandas Copy-on-Write and would leave NaNs in the matrix.
    emission_matrix_probabilities['unknown_token'] = (
        hapax_legomena_distribution.reindex(emission_matrix_probabilities.index).fillna(0)
    )

    # Re-normalise each tag row now that the extra column has been added
    emission_matrix_probabilities = emission_matrix_probabilities.div(
        emission_matrix_probabilities.sum(axis=1), axis=0
    )

    return emission_matrix_probabilities , transition_matrix_probabilities, start_probabilities

## Viterbi

In [None]:
class Decoder(object):
    '''
    The Decoder class implements the Viterbi algorithm
    Parameters
    ----------
      initialProb: np.array Nx1
      The initial probability $P(t_i)$ of each state
      transProb: np.array NxN
      The transition matrix $P(t_i|t_{i-1})$; rows index the previous
      state, columns the next state
      obsProb: np.array NxV
      The emission matrix $P(w_i|t_i)$; one row per state, one column
      per observation symbol
    Attributes
    ----------
        N : int
        The number of states (tags in POS-Tagging)
        initialProb:
        A priori probability of states ($P(t_i)$ in POST)
        transProb:
        Transition matrix ($P(t_i|t_{i-1})$ in POST)
        obsProb:
        Emission matrix ($P(w_i|t_i)$ in POST)
    '''


    def __init__(self, initialProb, transProb, obsProb):
        self.N = initialProb.shape[0]
        self.initialProb = initialProb
        self.transProb = transProb
        self.obsProb = obsProb
        assert self.initialProb.shape == (self.N, 1)
        assert self.transProb.shape == (self.N, self.N)
        assert self.obsProb.shape[0] == self.N # no control over 2nd dimension

    def Obs(self, obs):
        '''Return the emission column for observation index `obs`, shape (N, 1).'''
        return self.obsProb[:, obs, None]

    @staticmethod
    def _rescale(scores):
        '''
        Divide a trellis column by its largest entry.

        Scaling a whole column by one positive constant does not change
        which state or predecessor is the best, but it stops the running
        product of probabilities from underflowing to zero on long
        sequences. An all-zero column is returned unchanged.
        '''
        peak = scores.max()
        return scores / peak if peak > 0 else scores

    def Decode(self, obs):
        '''
        This is the Viterbi algorithm
        Parameters
        ----------
        obs : list
            Observation sequence given as column indices into obsProb.
        Returns
        -------
        list
            Most likely state (row index) for each observation, as
            Python ints. An empty `obs` yields an empty list.
        '''
        n_obs = len(obs)
        if n_obs == 0:
            # Nothing to decode; indexing trellis[:, 0] below would fail
            return []

        # trellis holds per-column rescaled scores, not raw probabilities
        trellis = np.zeros((self.N, n_obs))
        backpt = np.ones((self.N, n_obs), 'int32') * -1

        # initialization
        trellis[:, 0] = self._rescale((self.initialProb * self.Obs(obs[0]))[:, 0])

        # steps
        for t in range(1, n_obs):
            # candidates[i, j]: score of being in state i at t-1 and moving to j
            candidates = trellis[:, t-1, None] * self.transProb
            backpt[:, t] = candidates.argmax(0)
            trellis[:, t] = self._rescale(candidates.max(0) * self.Obs(obs[t])[:, 0])

        # termination: start from the best final state and follow back-pointers
        tokens = [int(trellis[:, -1].argmax())]
        for i in range(n_obs-1, 0, -1):
            tokens.append(int(backpt[tokens[-1], i]))

        return tokens[::-1]


## Measure accuracy of the model

In [None]:
def accuracy_on_test(test_sentences, viterbi_obj, emission_matrix_probabilities, transition_matrix_probabilities):
    """
    Tag every test sentence with the decoder and measure accuracy.

    Parameters
    ----------
    test_sentences : sequence of str
        Sentences made of whitespace-separated ``token_TAG`` items.
    viterbi_obj : object with a ``Decode(list_of_int)`` method
        Receives the column positions of the sentence's tokens and returns
        one state index per token.
    emission_matrix_probabilities : pd.DataFrame
        Only its columns are used, to map a token to its column position;
        tokens that are not a column map to the 'unknown_token' column.
    transition_matrix_probabilities : pd.DataFrame
        Only its index is used, to map a state index back to a tag name.

    Returns
    -------
    total_accuracy : float
        Mean of the per-sentence accuracies (each sentence weighs the same,
        whatever its length). Blank sentences are skipped; 0.0 is returned
        when no sentence could be scored.
    feature_accuracy : dict
        For each gold tag seen in the test sentences, the fraction of its
        occurrences that were predicted correctly.
    """
    token_columns = emission_matrix_probabilities.columns
    tag_names = transition_matrix_probabilities.index

    correct_by_feature = {}
    features_total_frequency = Counter()
    total_accuracy = 0
    scored_sentences = 0
    for line in tqdm(test_sentences):
        token_list = []
        feature_list = []
        # Split each item into token and gold tag to check predictions
        for raw_token in line.split():
            splitted_token = raw_token.split('_')
            token_list.append(splitted_token[0])
            feature_list.append(splitted_token[1])

        if not token_list:
            # A blank line has nothing to decode and would divide by zero
            continue

        word_positions = []
        for token in token_list:
            try:
                position = token_columns.get_loc(token)
            except KeyError:
                # Word never seen in training: fall back to the column that
                # carries the hapax legomena tag distribution
                position = token_columns.get_loc('unknown_token')
            word_positions.append(position)

        # Solve for the sentence and translate state indices into tag names
        path = viterbi_obj.Decode(word_positions)
        predicted_feature_list = [tag_names[state] for state in path]

        total_correct_predicted = 0
        for feature, real_feature in zip(predicted_feature_list, feature_list):
            if feature == real_feature:
                correct_by_feature[feature] = correct_by_feature.get(feature, 0) + 1
                total_correct_predicted += 1

        total_accuracy += total_correct_predicted / len(feature_list)
        scored_sentences += 1
        # Accumulate how often each gold tag occurred
        features_total_frequency.update(feature_list)

    total_accuracy = total_accuracy / scored_sentences if scored_sentences else 0.0

    # Tags that occurred but were never predicted correctly get 0.0
    feature_accuracy = {
        feature: correct_by_feature.get(feature, 0) / frequency
        for feature, frequency in features_total_frequency.items()
    }
    return total_accuracy , feature_accuracy

## Evaluating with cross-validation

In [None]:
# 10-fold splitter; shuffling with a fixed seed keeps the folds reproducible
kf = KFold(
    n_splits=10,
    shuffle=True,
    random_state=1,
)

In [None]:
# Read the tagged corpus, one sentence per line. Blank lines are dropped
# because they hold no tokens and cannot be scored during evaluation.
with open('corpus100.txt', 'r') as f:
    file_line_list = [line for line in f if line.strip()]

# A NumPy array allows the fancy indexing used with the KFold index arrays
sentences_array = np.array(file_line_list)

In [None]:
# Accumulators filled across folds: the running sum of the per-fold overall
# accuracies and the list of per-fold {tag: accuracy} dicts.
total_folds_acc = 0
total_feature_acc_list = []
for train_idx, test_idx in kf.split(sentences_array):
    train_sentences, test_sentences = sentences_array[train_idx], sentences_array[test_idx]

    # Estimate the HMM parameters from the training folds only
    emis_matrix_prob, trans_matrix_prob, start_prob = create_transition_and_emission_matrix(train_sentences)

    # The decoder works on plain arrays; the start probabilities must be
    # a column vector of shape (N, 1)
    start_prob = start_prob.to_numpy().reshape(-1, 1)
    trans_prob = trans_matrix_prob.to_numpy()
    emis_prob = emis_matrix_prob.to_numpy()
    d = Decoder(start_prob, trans_prob, emis_prob)

    # The DataFrames are still needed to translate tokens and states
    total_acc, feature_acc = accuracy_on_test(test_sentences, d, emis_matrix_prob, trans_matrix_prob)

    total_feature_acc_list.append(feature_acc)
    total_folds_acc += total_acc

100%|██████████| 479/479 [00:01<00:00, 307.33it/s]
100%|██████████| 479/479 [00:01<00:00, 313.55it/s]
100%|██████████| 479/479 [00:01<00:00, 311.11it/s]
100%|██████████| 479/479 [00:01<00:00, 316.13it/s]
100%|██████████| 479/479 [00:01<00:00, 286.42it/s]
100%|██████████| 479/479 [00:01<00:00, 319.05it/s]
100%|██████████| 479/479 [00:01<00:00, 298.77it/s]
100%|██████████| 479/479 [00:01<00:00, 318.15it/s]
100%|██████████| 479/479 [00:01<00:00, 298.20it/s]
100%|██████████| 479/479 [00:01<00:00, 311.83it/s]


In [None]:
# Turn the summed accuracy into the mean over the folds that were actually
# evaluated, instead of relying on a hard-coded fold count.
# NOTE: this updates total_folds_acc in place, so run this cell only once
# after the cross-validation cell.
total_folds_acc /= len(total_feature_acc_list)
total_folds_acc

0.8745718975908963

In [None]:
# Average each tag's accuracy over the folds whose test split contained it.
# Fresh dicts are built so the per-fold results in total_feature_acc_list
# are left untouched and re-running this cell gives the same answer.
feature_acc_sum = {}
feature_fold_count = {}
for fold_feature_acc in total_feature_acc_list:
    for feature, fold_acc in fold_feature_acc.items():
        feature_acc_sum[feature] = feature_acc_sum.get(feature, 0) + fold_acc
        feature_fold_count[feature] = feature_fold_count.get(feature, 0) + 1

# Dividing by the number of folds where the tag occurred avoids counting
# a tag's absence from a test split as 0% accuracy
total_feature_acc = {
    feature: feature_acc_sum[feature] / feature_fold_count[feature]
    for feature in feature_acc_sum
}

In [None]:
# Percentages for the report: overall accuracy first, then one entry per
# tag in alphabetical order.
results = {'Total': total_folds_acc*100}
results.update(
    (feature, total_feature_acc[feature]*100)
    for feature in sorted(total_feature_acc)
)

## Convert tags to categories and save the results

In [None]:
# Maps every corpus tag (plus the synthetic 'Total' entry) to the Portuguese
# category label under which it is grouped in the final report.
tag_category = {
    'Total': 'Total do sistema',
    'ADJ': 'Adjetivo',
    'ADV': 'Advérbio',
    'ART': 'Artigo',
    'NUME': 'Numeral',
    'N': 'Substantivo comum',
    'NP': 'Substantivo próprio',
    'CONJ': 'Conjunção',
    'PRON': 'Pronome',
    'PREP': 'Preposição',
    'VERB': 'Verbo',
    'I': 'Interjeição',
    'LOCU': 'Locução',
    'PDEN': 'Palavra Denotativa',
}

# All contracted (compound) tags share a single category
tag_category.update(dict.fromkeys(
    [
        'PREP+ART', 'PREP+PREP', 'PREP+PD', 'PREP+PPR', 'PREP+PPOT',
        'PREP+ADJ', 'PREP+N', 'PREP+PPOA', 'PREP+ADV', 'PPOA+PPOA',
        'ADV+PPR', 'ADV+PPOA', 'ADJ+PPOA',
    ],
    'Contração',
))

tag_category['RES'] = 'Residuais'

# Punctuation marks are tagged with the mark itself
tag_category.update(dict.fromkeys(
    ['.', ':', ';', '-', '(', '!', '?', '...', ')', '"', '{', '}', ',', '\''],
    'Pontuação',
))


In [None]:
# Group the per-tag percentages by category; a tag without a known category
# is filed under 'Outras Tags'. Each category holds a list of {tag: percentage}.
category_result = {}
for tag, tag_percentage in results.items():
    category = tag_category.get(tag)
    if category is None:
        category = 'Outras Tags'
    category_result.setdefault(category, []).append({tag: tag_percentage})


In [None]:
# Write the per-category report. The context manager closes the file even
# if a write fails, and UTF-8 is requested explicitly because the report
# contains accented characters and the platform default encoding varies.
with open('result.txt', 'w', encoding='utf-8') as result_file:
    for category, tag_results in category_result.items():
        category_acc = 0
        result_file.write(f"Taxa de acerto para a classe: {category}:\n")
        for tag_acc in tag_results:
            # Each entry is a one-item {tag: percentage} dict
            feature, percentage = next(iter(tag_acc.items()))
            category_acc += percentage
            if percentage > 0:
                result_file.write(f"Tag '{feature}': {percentage:.3f}%\n")
            else:
                # NOTE: a tag that was present but never predicted correctly
                # also has 0% and ends up in this branch
                result_file.write(f"Tag '{feature}': Não presente nos corpus de teste. \n")
        # Unweighted mean of the tag percentages in this category
        result_file.write(f"Total da classe {category}: {category_acc/len(tag_results):.3f}%\n\n")