In [5]:
import numpy as np

In [6]:
# Reading the file
def prepare_data(file_path):
    """Read a tagged training file and index its states and observations.

    Every non-empty line is split at its LAST space: the text before it is
    the observation and the text after it is the state (tag). Empty lines
    are skipped.

    Args:
        file_path (string): path of the training file (read as UTF-8)

    Returns:
        dictionary: state_to_idx - maps each state to a unique index
            (used as the row index of the emission matrix)
        dictionary: observation_to_idx - maps each observation, plus the
            special "#UNK#" token, to a unique index (used as the column
            index of the emission matrix)
        set: states - the unique states
        set: observations - the unique observations, including "#UNK#"
        string: data - the raw file content
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        data = file.read()

    # Collect unique states and observations
    states = set()
    observations = set()
    for line in data.split('\n'):
        if not line:
            continue
        split_at = line.rfind(' ')
        states.add(line[split_at + 1:])
        observations.add(line[:split_at])

    observations.add("#UNK#")  # Include special token for unknown words

    # Index in sorted order. Iterating the sets directly would give an order
    # that depends on string hashing, which may differ between interpreter
    # runs, so the same input could produce different matrix layouts (and
    # different argmax tie-breaks) from one run to the next.
    state_to_idx = {state: idx for idx, state in enumerate(sorted(states))}
    observation_to_idx = {
        obs: idx for idx, obs in enumerate(sorted(observations))}

    return state_to_idx, observation_to_idx, states, observations, data


def estimate_b(train_data, state_to_idx, observation_to_idx, states, observations, k=1):
    """Estimate smoothed emission probabilities from tagged training data.

    For a state y and an observation x seen in training:
        e(x | y) = count(y -> x) / (count(y) + k)
    and for the special "#UNK#" observation:
        e(#UNK# | y) = k / (count(y) + k)

    Args:
        train_data (string): raw training data, one "observation state" pair
            per line (split at the last space); empty lines are skipped
        state_to_idx (dictionary): maps each state to its row index
        observation_to_idx (dictionary): maps each observation (including
            "#UNK#") to its column index
        states (set): the unique states (only its size is used)
        observations (set): the unique observations (only its size is used)
        k (number): smoothing constant reserved for unknown words.
            Defaults to 1.

    Returns:
        np.ndarray: emission_probabilities - a 2d array of shape
        (num_states x num_observations); rows are states, columns are
        observations

    Raises:
        KeyError: if a line contains a state or observation missing from the
            index dictionaries, or if "#UNK#" is not in observation_to_idx
    """
    # count(y)
    state_counts = np.zeros(len(states))
    # count(y -> x), shape (num_states x num_observations)
    emission_counts = np.zeros((len(states), len(observations)))

    for line in train_data.split('\n'):
        if not line:
            continue
        split_at = line.rfind(' ')
        row_index = state_to_idx[line[split_at + 1:]]
        column_index = observation_to_idx[line[:split_at]]

        state_counts[row_index] += 1
        emission_counts[row_index, column_index] += 1

    # state_counts[:, None] broadcasts the per-state totals across each row
    emission_probabilities = emission_counts / (state_counts[:, None] + k)

    # The "#UNK#" column never receives counts above unless the literal token
    # appears in the data; it is overwritten with the reserved mass either way.
    emission_probabilities[:, observation_to_idx["#UNK#"]] = (
        k / (state_counts + k))

    return emission_probabilities


def predict(test_data, emission_probabilities, observation_to_idx, state_to_idx):
    """Tag each observation with the state that maximises its emission probability.

    Args:
        test_data (string): observations, one per line; empty lines are kept
            as separators in the paired output
        emission_probabilities (np.ndarray): emission probabilities of shape
            (num_states x num_observations)
        observation_to_idx (dictionary): maps each observation (including
            "#UNK#") to its column index
        state_to_idx (dictionary): maps each state to its row index

    Returns:
        list: predicted_tags - the predicted state of every non-empty line
        list: observation_predicted_pairs - one entry per input line:
            "observation state" for non-empty lines, "" for empty lines
    """
    unknown_idx = observation_to_idx["#UNK#"]
    # Invert the state index once instead of scanning the dict for every token
    idx_to_state = {idx: state for state, idx in state_to_idx.items()}
    # Best state (row) for every observation (column), computed in one pass;
    # ties resolve to the lowest row index, as a per-column argmax would.
    best_state_idx = np.argmax(emission_probabilities, axis=0)

    predicted_tags = []
    observation_predicted_pairs = []
    for observation in test_data.split('\n'):
        if not observation:
            observation_predicted_pairs.append("")
            continue

        # Words never seen in training fall back to the "#UNK#" column
        observation_idx = observation_to_idx.get(observation, unknown_idx)
        predicted_state = idx_to_state[int(best_state_idx[observation_idx])]

        predicted_tags.append(predicted_state)
        observation_predicted_pairs.append(
            observation + " " + predicted_state)

    return predicted_tags, observation_predicted_pairs


def _extract_entity_spans(tags):
    """Return the set of (start, end, sentiment) entity spans in a flat tag list."""
    begin_tags = ("B-positive", "B-negative", "B-neutral")
    inside_tags = ("I-positive", "I-negative", "I-neutral")

    spans = set()
    start = None
    sentiment = None
    for position, tag in enumerate(tags):
        if tag == 'O':
            # 'O' closes whatever entity is currently open
            if start is not None:
                spans.add((start, position, sentiment))
                start = None
        elif tag in begin_tags or (tag in inside_tags and start is None):
            # A B- tag always starts a new entity; an I- tag starts one only
            # when no entity is open (an entity that began without a B- tag).
            if start is not None:
                spans.add((start, position, sentiment))
            start = position
            sentiment = tag[2:]
        # Any other tag leaves the current entity (if any) open.

    if start is not None:
        spans.add((start, len(tags), sentiment))
    return spans


def calculate_metrics(predicted_tags, gold_tags):
    """Calculate entity-level precision, recall and F-score.

    An entity is a maximal run of tags opened by a B- tag (or by an I- tag
    when no entity is open) and closed by 'O', by the next B- tag, or by the
    end of the list. A predicted entity is correct only when a gold entity
    has the same start, the same end and the same sentiment.

    Args:
        predicted_tags (list): flat list of predicted tags
        gold_tags (list): flat list of gold standard tags (reference output),
            aligned position by position with predicted_tags

    Returns:
        float: Precision (0.0 when nothing was predicted)
        float: Recall (0.0 when there are no gold entities)
        float: F-score (0.0 when precision and recall are both 0)
        int: number of correctly predicted entities
        int: number of predicted entities
        int: number of gold entities

    Raises:
        ValueError: if the two tag lists have different lengths
    """
    if len(predicted_tags) != len(gold_tags):
        raise ValueError(
            "predicted_tags and gold_tags must have the same length")

    predicted_spans = _extract_entity_spans(predicted_tags)
    gold_spans = _extract_entity_spans(gold_tags)

    correct_entities = len(predicted_spans & gold_spans)
    predicted_entities = len(predicted_spans)
    gold_entities = len(gold_spans)

    # Guard every division: an empty prediction or an empty reference is a
    # legitimate input and should score 0 rather than raise.
    precision = correct_entities / predicted_entities if predicted_entities else 0.0
    recall = correct_entities / gold_entities if gold_entities else 0.0
    if precision + recall:
        f_score = 2 * precision * recall / (precision + recall)
    else:
        f_score = 0.0

    return precision, recall, f_score, correct_entities, predicted_entities, gold_entities


def gold_labels(data):
    """Get the gold labels from the data

    Each non-empty line is split at its LAST space, the same rule used when
    the training data is parsed, so an observation that itself contains
    spaces still yields its trailing state.

    Args:
        data (string): raw data, one "observation state" pair per line

    Returns:
        list: the state of every non-empty line, in file order
    """
    return [line[line.rfind(' ') + 1:] for line in data.split('\n') if line]

In [7]:
# for ES dataset
from pathlib import Path

# Build paths with pathlib so the cell is not tied to Windows separators
es_dir = Path("ES")

# prepare data
es_state_to_idx, es_observation_to_idx, es_states, es_observations, es_train_data = prepare_data(
    str(es_dir / "train"))

# probability of each state
es_emission_probabilities = estimate_b(
    es_train_data, es_state_to_idx, es_observation_to_idx, es_states, es_observations)


# predict
# Read with the same encoding as the training file; with the platform default
# encoding, non-ASCII words may decode differently and fall back to "#UNK#".
with open(es_dir / "dev.in", "r", encoding="utf-8") as f:
    es_test_data = f.read()  # read in the file as a string

# predict the states for the test data
es_predicted_states, es_observation_predicted_tags = predict(
    es_test_data, es_emission_probabilities, es_observation_to_idx, es_state_to_idx)


# Optional in-notebook evaluation against the gold standard labels
# with open(es_dir / "dev.out", "r", encoding="utf-8") as f:
#     gold_standard = f.read() # read in the file as a string

# gold_standard_labels = gold_labels(gold_standard)

# precision, recall, f, correct_entities, predicted_entities, gold_entities = calculate_metrics(es_predicted_states, gold_standard_labels)

# print("Correct entities: ", correct_entities)
# print("Predicted entities: ", predicted_entities)
# print("Gold entities: ", gold_entities)
# print()
# print("Precision: ", precision)
# print("Recall: ", recall)
# print("F: ", f)


# Write to dev.p1.out
with open(es_dir / "dev.p1.out", "w", encoding="utf-8") as f:
    f.writelines(pair + "\n" for pair in es_observation_predicted_tags)

In [8]:
# for RU dataset
from pathlib import Path

# Build paths with pathlib so the cell is not tied to Windows separators
ru_dir = Path("RU")

# prepare data
ru_state_to_idx, ru_observation_to_idx, ru_states, ru_observations, ru_train_data = prepare_data(
    str(ru_dir / "train"))


# probability of each state
ru_emission_probabilities = estimate_b(
    ru_train_data, ru_state_to_idx, ru_observation_to_idx, ru_states, ru_observations)


# predict
# Read with the same encoding as the training file; with the platform default
# encoding, non-ASCII words may decode differently and fall back to "#UNK#".
with open(ru_dir / "dev.in", "r", encoding="utf-8") as f:
    ru_test_data = f.read()  # read in the file as a string

# predict the states for the test data
ru_predicted_states, ru_observation_predicted_tags = predict(
    ru_test_data, ru_emission_probabilities, ru_observation_to_idx, ru_state_to_idx)


# Optional in-notebook evaluation against the gold standard labels
# with open(ru_dir / "dev.out", "r", encoding="utf-8") as f:
#     gold_standard = f.read() # read in the file as a string

# gold_standard_labels = gold_labels(gold_standard)

# precision, recall, f, correct_entities, predicted_entities, gold_entities = calculate_metrics(ru_predicted_states, gold_standard_labels)

# print("Correct entities: ", correct_entities)
# print("Predicted entities: ", predicted_entities)
# print("Gold entities: ", gold_entities)
# print()
# print("Precision: ", precision)
# print("Recall: ", recall)
# print("F: ", f)


# Write to dev.p1.out
with open(ru_dir / "dev.p1.out", "w", encoding="utf-8") as f:
    f.writelines(pair + "\n" for pair in ru_observation_predicted_tags)

# Evaluation for ES

#### Entity in gold data: 229

#### Entity in prediction: 1466

#### Correct Entity : 178

-   Entity precision: 0.1214
-   Entity recall: 0.7773
-   Entity F: 0.2100

#### Correct Sentiment : 97

-   Sentiment precision: 0.0662
-   Sentiment recall: 0.4236
-   Sentiment F: 0.1145

---

# Evaluation for RU

#### Entity in gold data: 389

#### Entity in prediction: 1816

#### Correct Entity : 266

-   Entity precision: 0.1465
-   Entity recall: 0.6838
-   Entity F: 0.2413

#### Correct Sentiment : 129

-   Sentiment precision: 0.0710
-   Sentiment recall: 0.3316
-   Sentiment F: 0.1170
