<a href="https://colab.research.google.com/github/EngineerBear8000/ML-HMM-project/blob/main/q1%2C2%2C3%2C4.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
def parse_data(file_path):
    """Read a one-token-per-line file into a list of sentences.

    Each non-blank line is either ``token tag`` (labelled data) or a bare
    ``token`` (unlabelled data, stored with tag ``None``). Blank lines
    separate sentences. Every sentence, including a final one that is not
    followed by a blank line, is wrapped in ``('', 'START')`` and
    ``('', 'STOP')`` sentinel pairs.

    Args:
        file_path: Path of the file to read.

    Returns:
        A list of sentences, each a list of ``(token, tag)`` tuples. If the
        file does not exist, a message is printed and an empty list is
        returned.
    """
    sentences = []
    current_sentence = []

    def close_sentence():
        # Runs of blank lines must not produce empty sentences.
        if current_sentence:
            sentences.append([('', 'START'), *current_sentence, ('', 'STOP')])
            current_sentence.clear()

    try:
        with open(file_path, 'r') as file:
            for line in file:
                stripped = line.strip()
                if not stripped:
                    # Blank (or whitespace-only) line: sentence boundary.
                    close_sentence()
                elif ' ' in stripped:
                    # Split on the LAST space so a token that itself contains
                    # spaces does not break the two-way unpacking.
                    token, tag = stripped.rsplit(' ', 1)
                    current_sentence.append((token, tag))
                else:
                    current_sentence.append((stripped, None))
    except FileNotFoundError:
        print(f"File {file_path} not found.")

    # The file may end without a trailing blank line; that last sentence
    # needs the same START/STOP sentinels as every other one.
    close_sentence()

    return sentences

# Load the training file 'train' as a list of sentences of (token, tag) pairs.
labelled_data = parse_data('train')

In [None]:
#implement smoothing function

def smooth_data(data, k=3):
    """Replace rare tokens with the placeholder ``'#UNK#'``.

    Token frequencies are counted over the whole of ``data``; every token
    that occurs fewer than ``k`` times is replaced.

    Args:
        data: List of sentences. A sentence is either a list of
            ``(token, tag)`` tuples or a list of bare tokens; the form is
            decided per sentence from its first element.
        k: Minimum number of occurrences a token needs to be kept.
            Defaults to 3.

    Returns:
        A new list of sentences with the same shape as ``data``. Tags are
        never modified. Empty sentences are returned as empty lists.
    """
    from collections import Counter

    def is_labelled(sentence):
        # An empty sentence has no first element to inspect; treating it as
        # unlabelled simply yields an empty output sentence.
        return bool(sentence) and isinstance(sentence[0], tuple)

    # Pass 1: count every token across the whole data set.
    word_counts = Counter()
    for sentence in data:
        if is_labelled(sentence):
            word_counts.update(token for token, _ in sentence)
        else:
            word_counts.update(sentence)

    # Pass 2: rebuild each sentence, masking tokens seen fewer than k times.
    smoothed_data = []
    for sentence in data:
        if is_labelled(sentence):
            smoothed_data.append([
                (token if word_counts[token] >= k else '#UNK#', tag)
                for token, tag in sentence
            ])
        else:
            smoothed_data.append([
                token if word_counts[token] >= k else '#UNK#'
                for token in sentence
            ])
    return smoothed_data

# Training sentences with rare tokens replaced by '#UNK#'.
smoothed_data = smooth_data(labelled_data)

In [None]:
# Sanity check: print every smoothed training sentence that contains the
# token 'movie' (a sentence is printed once per occurrence of the token).

for sentence in smoothed_data:
    for token, _ in sentence:
        if token == 'movie':
            print(sentence)

[('', 'START'), ('Producers', 'B-NP'), ('Don', 'I-NP'), ('#UNK#', 'I-NP'), ('and', 'I-NP'), ('Jerry', 'I-NP'), ('#UNK#', 'I-NP'), (',', 'O'), ('who', 'B-NP'), ('#UNK#', 'B-VP'), ('``', 'O'), ('#UNK#', 'B-NP'), ("''", 'O'), ('through', 'B-PP'), ('several', 'B-NP'), ('#UNK#', 'I-NP'), ('and', 'O'), ('ultimately', 'B-VP'), ('produced', 'I-VP'), ('the', 'B-NP'), ('movie', 'I-NP'), (',', 'O'), ('#UNK#', 'B-VP'), ('when', 'B-ADVP'), ('Messrs.', 'B-NP'), ('Guber', 'I-NP'), ('and', 'I-NP'), ('Peters', 'I-NP'), ('take', 'B-VP'), ('credit', 'B-NP'), ('for', 'B-PP'), ('the', 'B-NP'), ('film', 'I-NP'), ('.', 'O'), ('', 'STOP')]
[('', 'START'), ('We', 'B-NP'), ('are', 'B-VP'), ('the', 'B-NP'), ('producers', 'I-NP'), ('of', 'B-PP'), ('that', 'B-NP'), ('movie', 'I-NP'), ('.', 'O'), ('', 'STOP')]
[('', 'START'), ('Mr.', 'B-NP'), ('Guber', 'I-NP'), ('got', 'B-VP'), ('his', 'B-NP'), ('start', 'I-NP'), ('in', 'B-PP'), ('the', 'B-NP'), ('movie', 'I-NP'), ('business', 'I-NP'), ('at', 'B-PP'), ('Columbia', 

In [None]:
def get_unique_labels(data):
    """Collect the distinct labels that appear in ``data``.

    Args:
        data: Iterable of sentences. A ``list`` sentence is read as a
            sequence of ``(token, label)`` pairs; a ``dict`` sentence
            contributes its values. Sentences of any other type are ignored.

    Returns:
        A set with every label found.
    """
    labels = set()

    for entry in data:
        if isinstance(entry, list):
            # The label is the second element of each pair.
            labels.update(pair[1] for pair in entry)
        elif isinstance(entry, dict):
            labels.update(entry.values())

    return labels

# Tag inventory of the training data; the START and STOP sentinel tags added
# by parse_data are part of it (see the displayed set).
unique_tags = get_unique_labels(labelled_data)
display(unique_tags)

{'B-ADJP',
 'B-ADVP',
 'B-CONJP',
 'B-INTJ',
 'B-LST',
 'B-NP',
 'B-PP',
 'B-PRT',
 'B-SBAR',
 'B-UCP',
 'B-VP',
 'I-ADJP',
 'I-ADVP',
 'I-CONJP',
 'I-INTJ',
 'I-NP',
 'I-PP',
 'I-SBAR',
 'I-UCP',
 'I-VP',
 'O',
 'START',
 'STOP'}

In [None]:
# Estimate the emission parameters by relative frequency:
#     e[(word, tag)] = Count(tag emits word) / Count(tag)
# Every (word, tag) combination gets an entry, so pairs never seen together
# are stored explicitly with value 0.

# The vocabulary does not depend on the tag, so it is built once here instead
# of being recomputed from the whole corpus for every tag.
vocabulary = {token for sentence in smoothed_data for token, _ in sentence}

# Initialize the emission probabilities dictionary
e = {(observation, tag): 0 for tag in unique_tags for observation in vocabulary}

# Count emissions and how often each tag occurs
tag_counts = {tag: 0 for tag in unique_tags}

for sentence in smoothed_data:
    for token, tag in sentence:
        e[(token, tag)] += 1
        tag_counts[tag] += 1

# Normalize the counts into probabilities (pair[1] is the tag)
for pair in e:
    e[pair] /= tag_counts[pair[1]]

# Debug output: list every (word, tag) pair that was never observed
for pair in e:
  if e[pair] == 0:
    print(pair, e[pair])

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
('operates', 'B-CONJP') 0.0
('pressure', 'B-CONJP') 0.0
('measures', 'B-CONJP') 0.0
('desperate', 'B-CONJP') 0.0
('tries', 'B-CONJP') 0.0
('incorrect', 'B-CONJP') 0.0
('experts', 'B-CONJP') 0.0
('war', 'B-CONJP') 0.0
('0.1', 'B-CONJP') 0.0
('Trade', 'B-CONJP') 0.0
('coordinates', 'B-CONJP') 0.0
('keep', 'B-CONJP') 0.0
('gaining', 'B-CONJP') 0.0
('reflected', 'B-CONJP') 0.0
('study', 'B-CONJP') 0.0
('counterpart', 'B-CONJP') 0.0
('15,000', 'B-CONJP') 0.0
('allocated', 'B-CONJP') 0.0
('temporarily', 'B-CONJP') 0.0
('Eurocom', 'B-CONJP') 0.0
('programs', 'B-CONJP') 0.0
('Stone', 'B-CONJP') 0.0
('Judiciary', 'B-CONJP') 0.0
('events', 'B-CONJP') 0.0
('probably', 'B-CONJP') 0.0
('owed', 'B-CONJP') 0.0
('Grant', 'B-CONJP') 0.0
('soared', 'B-CONJP') 0.0
('Marlin', 'B-CONJP') 0.0
('underground', 'B-CONJP') 0.0
('smaller', 'B-CONJP') 0.0
('funding', 'B-CONJP') 0.0
('revenue', 'B-CONJP') 0.0
('exceeding', 'B-CONJP') 0.0
('Office', '

In [None]:
# Check whether 'movie' is the observation of at least one
# (observation, tag) key in e.
'movie' in [pair[0] for pair in e]

True

In [None]:
# Parse dev.in with the training parser; lines without a space are stored
# with tag None. NOTE: test_data is reassigned further down by
# parse_test_data('dev.in').
test_data = parse_data('dev.in')

In [None]:
# Every observation that appears in a key of e, i.e. the smoothed training vocabulary.
known_words = set(observation for observation, tag in e.keys())

def predict_tags(data, e, unique_tags, known_words):
    """Tag each token independently with its highest-emission tag.

    Args:
        data: List of sentences, each a list of ``(token, tag)`` pairs; the
            tag component is ignored.
        e: Mapping ``(token, tag) -> emission probability``.
        unique_tags: Iterable of candidate tags.
        known_words: Collection of tokens seen in training; any other token
            is replaced by ``'#UNK#'`` before lookup.

    Returns:
        A list with one list of ``(token, predicted_tag)`` pairs per input
        sentence. Tokens whose predicted tag is ``'START'`` or ``'STOP'``
        are dropped. The token in each pair is the looked-up form, so
        unknown words appear as ``'#UNK#'``. If ``e`` has no entry at all
        for a token, its predicted tag is ``None`` (``save_results`` writes
        such tags as ``'O'``).
    """
    predictions = []
    for sentence in data:
        sentence_predictions = []
        for token, _ in sentence:
            if token not in known_words:
                token = '#UNK#'

            # Emission score of every tag that has an entry for this token.
            scores = {
                tag: e[(token, tag)]
                for tag in unique_tags
                if (token, tag) in e
            }

            # max() raises on an empty mapping, which happens when not even
            # '#UNK#' has emission entries; fall back to "no prediction".
            predicted_tag = max(scores, key=scores.get) if scores else None

            # Strip out the sentence-boundary sentinels.
            if predicted_tag not in ('START', 'STOP'):
                sentence_predictions.append((token, predicted_tag))
        predictions.append(sentence_predictions)
    return predictions

# y_pred = predict_tags(test_data, e, unique_tags, known_words)

In [None]:
#y_pred

In [None]:
def save_results(y_pred, output_filename): # Added output_filename parameter
    """Write predictions to ``output_filename``, one ``token tag`` per line.

    Args:
        y_pred: List of sentences, each a list of ``(token, predicted_tag)``
            pairs. A tag of ``None`` is written as ``'O'``.
        output_filename: Path of the file to create or overwrite.

    Each sentence is followed by one blank line.
    """
    with open(output_filename, 'w') as out:
        for tagged_sentence in y_pred:
            out.writelines(
                f"{token} {'O' if predicted_tag is None else predicted_tag}\n"
                for token, predicted_tag in tagged_sentence
            )
            # Blank line marks the end of the sentence.
            out.write("\n")


# save_results(y_pred, 'dev.p2.out')

# Q2

In [None]:
from collections import defaultdict
def transitionCount(data):
  """Estimate tag-transition probabilities from labelled sentences.

  For every pair of adjacent tags the estimate is
  q(current | prev) = Count(prev, current) / Count(prev), where Count(prev)
  only counts positions that have a successor inside the same sentence.

  Args:
    data: List of sentences, each a list of (token, tag) pairs.

  Returns:
    A defaultdict(dict) mapping prev_tag -> {current_tag: probability}.
    Transitions that never occur have no entry.
  """
  transitions = defaultdict(lambda: defaultdict(int))
  tag_counts = defaultdict(int)

  for sentence in data:
    tags = [tag for _, tag in sentence]
    # Walk adjacent (previous, current) tag pairs.
    for prev_tag, current_tag in zip(tags, tags[1:]):
      transitions[prev_tag][current_tag] += 1
      tag_counts[prev_tag] += 1

  # Turn the counts into conditional probabilities.
  transition_probs = defaultdict(dict)
  for prev_tag, successors in transitions.items():
      total = tag_counts[prev_tag] # Count(y(i-1))
      for current_tag, count in successors.items():
          # q = Count(y(i-1),y(i)) / Count(y(i-1))
          transition_probs[prev_tag][current_tag] = count / total
  return transition_probs

# Transition probabilities estimated from the smoothed training sentences,
# stored as transition_probs[prev_tag][current_tag].
transition_probs = transitionCount(smoothed_data) # transition_probs stores transition probabilities
display(transition_probs)

defaultdict(dict,
            {'START': {'B-NP': 0.6480490669450607,
              'B-PP': 0.1087041628604985,
              'O': 0.14185045021532036,
              'B-ADVP': 0.05428683283309409,
              'B-ADJP': 0.003262429857758058,
              'B-SBAR': 0.02257601461568576,
              'B-CONJP': 0.00026099438862064463,
              'B-VP': 0.018661098786376094,
              'B-INTJ': 0.0013049719431032234,
              'B-LST': 0.0010439775544825785},
             'B-NP': {'I-NP': 0.6847056336539478,
              'B-VP': 0.13030335059718845,
              'O': 0.08096395729838284,
              'B-PP': 0.05800655321847585,
              'B-NP': 0.028897579537046823,
              'B-ADVP': 0.009808688299334109,
              'B-ADJP': 0.0032131909946094494,
              'B-SBAR': 0.00340344572455343,
              'B-PRT': 0.00035937004544974105,
              'STOP': 0.00023253355882042067,
              'B-UCP': 2.1139414438220062e-05,
              'B-CONJP': 8.4

In [None]:
def viterbi(x, tags, transition_probs, emission_probs):
  """Return the most probable tag sequence for x under a first-order HMM.

  Scores are accumulated as sums of log-probabilities. Multiplying raw
  probabilities underflows to 0.0 on long sentences, after which no path
  can be selected at all.

  Args:
    x: Sequence of observed tokens.
    tags: Iterable of candidate tags (it is iterated several times).
    transition_probs: Mapping prev_tag -> {tag: probability}; the row for
      'START' scores the first position. Missing entries count as 1e-6.
    emission_probs: Mapping (token, tag) -> probability. Missing entries
      count as 1e-6; entries stored as 0 are treated as impossible.

  Returns:
    A list with one tag per token, [] for an empty x, or a list of None of
    the same length as x when every path has probability zero.
  """
  import math

  n = len(x)
  if n == 0:
    return []

  neg_inf = float('-inf')

  def log_p(p):
    # math.log rejects 0; an impossible event has log-probability -inf.
    return math.log(p) if p > 0 else neg_inf

  dp = [{} for _ in range(n)]
  backptr = [{} for _ in range(n)]

  # Initialization for 1st word. .get avoids a KeyError when a plain dict
  # has no 'START' row.
  start_row = transition_probs.get('START', {})
  for tag in tags:
    log_trans = log_p(start_row.get(tag, 1e-6))
    log_emit = log_p(emission_probs.get((x[0], tag), 1e-6))
    dp[0][tag] = log_trans + log_emit
    backptr[0][tag] = None

  # Recursion for words 1 to n-1
  for i in range(1, n):
    for current_tag in tags:
      best_score = neg_inf
      best_prev = None

      for prev_tag in tags:
        trans_p = transition_probs.get(prev_tag, {}).get(current_tag, 1e-6)
        score = dp[i-1][prev_tag] + log_p(trans_p)

        # Strict comparison: the first tag reaching the maximum wins, and
        # impossible (-inf) predecessors are never selected.
        if score > best_score:
          best_score = score
          best_prev = prev_tag

      # The emission term is the same for every predecessor, so it is added
      # once after the best predecessor is known.
      log_emit = log_p(emission_probs.get((x[i], current_tag), 1e-6))
      dp[i][current_tag] = best_score + log_emit
      backptr[i][current_tag] = best_prev

  # Termination: find the best last tag.
  # NOTE(review): the transition into 'STOP' is not scored here.
  best_final = neg_inf
  best_last_tag = None
  for tag in tags:
    if dp[-1][tag] > best_final:
      best_final = dp[-1][tag]
      best_last_tag = tag

  if best_last_tag is None:
    # Every path is impossible; there is nothing to backtrack through.
    return [None] * n

  # Backtrack from the last position, then restore left-to-right order.
  best_tags = [best_last_tag]
  for i in range(n-1, 0, -1):
    best_tags.append(backptr[i][best_tags[-1]])
  best_tags.reverse()

  return best_tags

In [None]:
def parse_test_data(file_path, vocabulary=None):
    """Read an unlabelled file into sentences of tokens, masking unknown words.

    Each non-blank line holds one token; if a line also carries a trailing
    space-separated field (``token tag``), only the token part is kept.
    Blank lines separate sentences. Tokens that are not in the vocabulary
    are replaced by ``'#UNK#'``.

    Args:
        file_path: Path of the file to read.
        vocabulary: Collection of known tokens. Defaults to the module-level
            ``known_words``.

    Returns:
        A list of sentences, each a list of token strings. If the file does
        not exist, a message is printed and an empty list is returned.
    """
    if vocabulary is None:
        vocabulary = known_words

    sentences = []
    current_sentence = []

    try:
        with open(file_path, 'r') as file:
            for line in file:
                stripped = line.strip()
                if not stripped:  # Empty line separates sentences
                    if current_sentence:
                        sentences.append(current_sentence)
                        current_sentence = []
                    continue

                # Keep only the token; drop a trailing tag field if present.
                token = stripped.rsplit(' ', 1)[0] if ' ' in stripped else stripped
                current_sentence.append(token if token in vocabulary else '#UNK#')
    except FileNotFoundError:
        print(f"File {file_path} not found.")

    if current_sentence:  # Add the last sentence to the list
        sentences.append(current_sentence)

    return sentences

# Re-read dev.in as plain token lists with out-of-vocabulary words replaced
# by '#UNK#'; this replaces the earlier value of test_data.
test_data = parse_test_data('dev.in')
print(test_data)


[['HBO', 'has', 'close', 'to', '24', 'million', 'subscribers', 'to', 'its', 'HBO', 'and', '#UNK#', 'networks', ',', 'while', 'Showtime', 'and', 'its', '#UNK#', 'service', ',', 'The', '#UNK#', '#UNK#', ',', 'have', 'only', 'about', '10', 'million', ',', 'according', 'to', 'Paul', '#UNK#', 'Associates', ',', 'a', '#UNK#', ',', 'Calif.', ',', 'research', 'firm', '.'], ['#UNK#', '#UNK#', '#UNK#', 'after', 'the', 'stock', 'market', "'s", '#UNK#', 'ride', '.'], ['This', 'may', 'seem', 'to', 'be', 'a', '#UNK#', 'and', '#UNK#', '#UNK#', 'effort', 'in', 'Africa', '.'], ['American', 'Express', 'Bank', 'earnings', 'fell', '50', '%', 'to', '$', '#UNK#', 'million', 'from', '$', '#UNK#', 'million', 'despite', 'a', '29', '%', 'revenue', 'gain', '.'], ['Californians', ',', 'meanwhile', ',', 'tried', 'to', '#UNK#', 'with', '#UNK#', 'services', ',', 'blocked', 'roadways', 'and', 'water', 'shortages', 'in', 'the', 'aftermath', 'of', 'the', 'tremor', 'that', 'left', 'scores', 'dead', 'and', '#UNK#', '.'],

In [None]:
# Decode every dev.in sentence with Viterbi, then store the result in
# dev.p2.out as "word tag" lines with a blank line after each sentence.
predicted_tags = [
    viterbi(sentence, unique_tags, transition_probs, e)
    for sentence in test_data
]

# Write to dev.p2.out
with open('dev.p2.out', 'w') as f:
    for sentence, tags in zip(test_data, predicted_tags):
        f.writelines(f"{word} {tag}\n" for word, tag in zip(sentence, tags))
        f.write("\n")

In [None]:
# Print the generated dev.p2.out for a visual check.
with open('dev.p2.out', 'r') as f:
    content = f.read()
    print(content)

HBO B-NP
has B-VP
close I-VP
to B-PP
24 B-NP
million I-NP
subscribers I-NP
to B-PP
its B-NP
HBO I-NP
and O
#UNK# B-NP
networks I-NP
, O
while B-SBAR
Showtime B-NP
and O
its B-NP
#UNK# I-NP
service I-NP
, O
The B-NP
#UNK# I-NP
#UNK# I-NP
, O
have B-VP
only I-VP
about B-PP
10 B-NP
million I-NP
, O
according B-PP
to B-PP
Paul B-NP
#UNK# I-NP
Associates I-NP
, O
a B-NP
#UNK# I-NP
, O
Calif. B-NP
, O
research B-NP
firm I-NP
. O

#UNK# B-NP
#UNK# I-NP
#UNK# I-NP
after B-PP
the B-NP
stock I-NP
market I-NP
's B-NP
#UNK# I-NP
ride I-NP
. O

This B-NP
may B-VP
seem I-VP
to I-VP
be I-VP
a B-NP
#UNK# I-NP
and O
#UNK# B-NP
#UNK# I-NP
effort I-NP
in I-NP
Africa I-NP
. O

American B-NP
Express I-NP
Bank I-NP
earnings I-NP
fell B-VP
50 B-NP
% I-NP
to B-PP
$ B-NP
#UNK# I-NP
million I-NP
from B-PP
$ B-NP
#UNK# I-NP
million I-NP
despite B-PP
a B-NP
29 I-NP
% I-NP
revenue I-NP
gain I-NP
. O

Californians B-NP
, O
meanwhile B-ADVP
, O
tried B-VP
to I-VP
#UNK# I-VP
with B-PP
#UNK# B-NP
services I-NP
, O
blo

**Results of evalResult.py:**

#Entity in gold data: 13179
#Entity in prediction: 13282

#Correct Entity : 11101
Entity  precision: 0.8358
Entity  recall: 0.8423
Entity  F: 0.8390

#Correct Sentiment : 10599
Sentiment  precision: 0.7980
Sentiment  recall: 0.8042
Sentiment  F: 0.8011

# Q3

In [None]:
import math
def topk_viterbi(x, tags, transition_probs, emission_probs, k=4):
    """Return the k most probable tag sequences for x, best first.

    This is a k-best extension of Viterbi: for every position and tag the
    k highest-scoring partial paths ending there are kept. Scores are sums
    of log-probabilities to avoid numerical underflow.

    Args:
        x: Sequence of observed tokens.
        tags: Iterable of candidate tags.
        transition_probs: Mapping prev_tag -> {tag: probability}; the row
            for 'START' scores the first position.
        emission_probs: Mapping (token, tag) -> probability.
        k: Number of sequences to return. Defaults to 4.

    Returns:
        A list of at most k tag sequences (lists with one tag per token),
        ordered from most to least probable. Returns [] when x is empty or
        k is not positive.

    Missing, zero, or very small probabilities are clamped to a floor of
    1e-10 so that their logarithm is defined. This is a floor, not Laplace
    smoothing.
    """
    n = len(x)
    if n == 0 or k <= 0:
        return []

    floor = 1e-10
    tags = list(tags)

    def log_emission(word, tag):
        return math.log(max(emission_probs.get((word, tag), floor), floor))

    def log_transition(prev_tag, tag):
        row = transition_probs.get(prev_tag, {})
        return math.log(max(row.get(tag, floor), floor))

    # Transition scores do not depend on the position, so compute them once.
    log_trans = {
        prev_tag: {tag: log_transition(prev_tag, tag) for tag in tags}
        for prev_tag in tags
    }

    # scores[i][tag]  : up to k log-probabilities of partial paths ending in
    #                   `tag` at position i, best first.
    # backptr[i][tag] : for each of those paths, (previous tag, rank of the
    #                   partial path it extends at position i-1).
    scores = [{} for _ in range(n)]
    backptr = [{} for _ in range(n)]

    # Initialization: exactly one path per tag at the first word.
    for tag in tags:
        scores[0][tag] = [log_transition('START', tag) + log_emission(x[0], tag)]
        backptr[0][tag] = [(None, None)]

    # Recursion: extend every kept partial path by every tag.
    for i in range(1, n):
        for current_tag in tags:
            log_emit = log_emission(x[i], current_tag)
            candidates = [
                (prev_score + log_trans[prev_tag][current_tag] + log_emit,
                 prev_tag, rank)
                for prev_tag in tags
                for rank, prev_score in enumerate(scores[i - 1][prev_tag])
            ]

            # Sort candidates by probability in descending order, keep top-k
            candidates.sort(reverse=True)
            best = candidates[:k]

            scores[i][current_tag] = [score for score, _, _ in best]
            backptr[i][current_tag] = [(prev_tag, rank) for _, prev_tag, rank in best]

    # Termination: pool the kept paths of every final tag, keep the k best.
    final_candidates = [
        (score, tag, rank)
        for tag in tags
        for rank, score in enumerate(scores[n - 1][tag])
    ]
    final_candidates.sort(reverse=True)

    # Backtrack each selected path from the last word to the first.
    all_sequences = []
    for _, last_tag, rank in final_candidates[:k]:
        sequence = [last_tag]
        current_tag = last_tag
        for i in range(n - 1, 0, -1):
            current_tag, rank = backptr[i][current_tag][rank]
            sequence.append(current_tag)
        sequence.reverse()
        all_sequences.append(sequence)

    return all_sequences

In [None]:
# For every dev.in sentence, collect the list of tag sequences returned by
# topk_viterbi with its default k.
predicted_tags = []
for sentence in test_data:
  predicted_tags.append(topk_viterbi(sentence, unique_tags, transition_probs, e))


In [None]:
# Show the k-best tag sequences per sentence.
predicted_tags

[[['B-NP',
   'B-VP',
   'I-VP',
   'B-PP',
   'B-NP',
   'I-NP',
   'I-NP',
   'B-PP',
   'B-NP',
   'I-NP',
   'O',
   'B-NP',
   'I-NP',
   'O',
   'B-SBAR',
   'B-NP',
   'O',
   'B-NP',
   'I-NP',
   'I-NP',
   'O',
   'B-NP',
   'I-NP',
   'I-NP',
   'O',
   'B-VP',
   'I-VP',
   'B-PP',
   'B-NP',
   'I-NP',
   'O',
   'B-PP',
   'B-PP',
   'B-NP',
   'I-NP',
   'I-NP',
   'O',
   'B-NP',
   'I-NP',
   'O',
   'B-NP',
   'O',
   'B-NP',
   'I-NP',
   'O'],
  ['B-NP',
   'B-VP',
   'I-VP',
   'B-PP',
   'B-NP',
   'I-NP',
   'I-NP',
   'B-PP',
   'B-NP',
   'I-NP',
   'O',
   'B-NP',
   'I-NP',
   'O',
   'B-SBAR',
   'B-NP',
   'O',
   'B-NP',
   'I-NP',
   'I-NP',
   'O',
   'B-NP',
   'I-NP',
   'I-NP',
   'O',
   'B-VP',
   'B-ADVP',
   'B-PP',
   'B-NP',
   'I-NP',
   'O',
   'B-PP',
   'B-PP',
   'B-NP',
   'I-NP',
   'I-NP',
   'O',
   'B-NP',
   'I-NP',
   'O',
   'B-NP',
   'O',
   'B-NP',
   'I-NP',
   'O'],
  ['B-NP',
   'B-VP',
   'I-VP',
   'I-VP',
   'B-NP',
   'I-N

# Q4

In [None]:
def get_word_shape(word):
    """Map each character of word to a shape symbol.

    Uppercase characters become 'C', lowercase characters become 'l', and
    every other character (digits, punctuation, ...) becomes 'd'.
    """
    shape = []
    for ch in word:
        if ch.isupper():
            shape.append('C')
        elif ch.islower():
            shape.append('l')
        else:
            shape.append('d')
    return ''.join(shape)

In [None]:
# Perform feature extraction on every word in dataset
def feature_extraction(sentence, index):
  """Build the feature dictionary for one word of a labelled sentence.

  Args:
    sentence: List of (word, label) pairs.
    index: Position of the word to describe.

  Returns:
    A dict mapping feature name to value. It covers the word itself
    (surface form, lowercase form, shape, length, first/last character,
    two-character prefix and suffix, boolean flags), the words up to three
    positions to either side ('<START>' / '<END>' beyond the sentence
    edges), the shapes of the words two positions away, and the labels of
    the directly neighbouring words ('O' beyond the edges).

  Example (abridged):
    {'word': 'John', 'is_capitalized': True, 'prev_word': '<START>'}
  """
  word, _ = sentence[index]
  size = len(sentence)

  def neighbour(offset, field, filler):
    # Look to the left only checks the lower bound, look to the right only
    # the upper bound.
    if offset < 0:
      inside = index + offset >= 0
    else:
      inside = index + offset < size
    return sentence[index + offset][field] if inside else filler

  # Features of the current word
  features = {
    'word': word,
    'is_unk': word == '#UNK#',
    'is_capitalized': word[0].isupper() if word else False,
    'is_digit': word.isdigit() if word else False,
    'is_punctuation': word in [".", ",", "?", "!", ";", ":"],
  }

  # Directly adjacent words and their labels
  features['prev_word'] = neighbour(-1, 0, '<START>')
  features['next_word'] = neighbour(1, 0, '<END>')
  features['prev_chunk_label'] = neighbour(-1, 1, 'O')
  features['next_chunk_label'] = neighbour(1, 1, 'O')

  # Words two and three positions away
  prev2_word = neighbour(-2, 0, '<START>')
  next2_word = neighbour(2, 0, '<END>')
  features['prev2_word'] = prev2_word
  features['next2_word'] = next2_word
  features['prev3_word'] = neighbour(-3, 0, '<START>')
  features['next3_word'] = neighbour(3, 0, '<END>')

  # Surface-form details of the current word
  features['lowercase_word'] = word.lower()
  features['word_shape'] = get_word_shape(word)
  features['first_char'] = word[0] if word else ''
  features['last_char'] = word[-1] if word else ''

  features['prev2_word_shape'] = get_word_shape(prev2_word)
  features['next2_word_shape'] = get_word_shape(next2_word)

  features['word_length'] = len(word)
  features['prefix_2'] = word[:2]
  features['suffix_2'] = word[-2:]

  return features

In [None]:
from collections import defaultdict

# Prepare training data in the correct format
def get_sentence_features(sentence): # sentence contains tuples
    """Count (feature name, feature value, label) triples over one sentence.

    Args:
        sentence: List of (word, label) pairs.

    Returns:
        A defaultdict(int) mapping each triple to the number of words in the
        sentence that produced it, e.g. ('prev_word', 'that', 'B-NP'): 1.
    """
    counts = defaultdict(int)
    for position, item in enumerate(sentence):
        extracted = feature_extraction(sentence, position)
        label = item[1] # B-NP etc.
        for feature_name, value in extracted.items():
            counts[(feature_name, value, label)] += 1
    return counts

In [None]:
def extract_features_for_train_dataset(train_data):
    """Return one feature-count mapping per sentence of train_data.

    Each element is the result of get_sentence_features for the sentence at
    the same position.
    """
    return [get_sentence_features(sentence) for sentence in train_data]

In [None]:
# One defaultdict per training sentence, keyed by
# (feature name, feature value, label) with occurrence counts as values.
feature_vectors = extract_features_for_train_dataset(smoothed_data) # Each defaultdict is for 1 sentence, within it is the training data in the format (Feature name, Feature value, Label)

# Show the first two sentences as a sanity check.
for i in feature_vectors[:2]:
    display(i)

defaultdict(int,
            {('word', '', 'START'): 1,
             ('is_unk', False, 'START'): 1,
             ('is_capitalized', False, 'START'): 1,
             ('is_digit', False, 'START'): 1,
             ('is_punctuation', False, 'START'): 1,
             ('prev_word', '<START>', 'START'): 1,
             ('next_word', '#UNK#', 'START'): 1,
             ('prev_chunk_label', 'O', 'START'): 1,
             ('next_chunk_label', 'B-NP', 'START'): 1,
             ('prev2_word', '<START>', 'START'): 1,
             ('next2_word', 'bonds', 'START'): 1,
             ('prev3_word', '<START>', 'START'): 1,
             ('next3_word', 'are', 'START'): 1,
             ('lowercase_word', '', 'START'): 1,
             ('word_shape', '', 'START'): 1,
             ('first_char', '', 'START'): 1,
             ('last_char', '', 'START'): 1,
             ('prev2_word_shape', 'dCCCCCd', 'START'): 1,
             ('next2_word_shape', 'lllll', 'START'): 1,
             ('word_length', 0, 'START'): 1,

defaultdict(int,
            {('word', '', 'START'): 1,
             ('is_unk', False, 'START'): 1,
             ('is_capitalized', False, 'START'): 1,
             ('is_digit', False, 'START'): 1,
             ('is_punctuation', False, 'START'): 1,
             ('prev_word', '<START>', 'START'): 1,
             ('next_word', 'He', 'START'): 1,
             ('prev_chunk_label', 'O', 'START'): 1,
             ('next_chunk_label', 'B-NP', 'START'): 1,
             ('prev2_word', '<START>', 'START'): 1,
             ('next2_word', 'added', 'START'): 1,
             ('prev3_word', '<START>', 'START'): 1,
             ('next3_word', 'that', 'START'): 1,
             ('lowercase_word', '', 'START'): 1,
             ('word_shape', '', 'START'): 1,
             ('first_char', '', 'START'): 1,
             ('last_char', '', 'START'): 1,
             ('prev2_word_shape', 'dCCCCCd', 'START'): 1,
             ('next2_word_shape', 'lllll', 'START'): 1,
             ('word_length', 0, 'START'): 1,
 

In [None]:
# Candidate labels that predict_sequence scores for every word.
display(unique_tags)

{'B-ADJP',
 'B-ADVP',
 'B-CONJP',
 'B-INTJ',
 'B-LST',
 'B-NP',
 'B-PP',
 'B-PRT',
 'B-SBAR',
 'B-UCP',
 'B-VP',
 'I-ADJP',
 'I-ADVP',
 'I-CONJP',
 'I-INTJ',
 'I-NP',
 'I-PP',
 'I-SBAR',
 'I-UCP',
 'I-VP',
 'O',
 'START',
 'STOP'}

In [None]:
# Prediction function
def predict_sequence(sentence, weights):
    """Predict a label for every word of sentence with a linear scorer.

    Each word is scored independently: the score of a label is the sum of
    the weights of the (feature name, feature value, label) triples built
    from the word's features, and the highest-scoring label wins.

    Args:
        sentence: List of (word, label) pairs as expected by
            feature_extraction.
        weights: Mapping (feature name, feature value, label) -> weight.
            Missing triples count as 0.

    Returns:
        A list with one predicted label per word. Candidate labels come
        from the module-level unique_tags.

    Weight example:
        weights = {
          ('word', 'John', 'B-NP'): 2,
          ('is_capitalized', True, 'B-NP'): 1,
          ('prev_word', '<START>', 'B-NP'): 1,
          ('word', 'John', 'I-NP'): -1,
          ('word', 'runs', 'B-VP'): 1,
        }
    """
    # Iterate labels in a fixed order. unique_tags is a set of strings whose
    # iteration order changes between interpreter runs, and ties (e.g. all
    # scores are 0 before the first update) are won by the first label seen.
    # key=str keeps the sort well-defined if None is among the labels.
    candidate_labels = sorted(unique_tags, key=str)

    predicted_labels = []

    # For each word, choose the label with the highest score
    for i in range(len(sentence)):
        feature_items = list(feature_extraction(sentence, i).items())
        best_label = None
        best_score = float('-inf')

        for label in candidate_labels:
            score = sum(
                weights.get((feature_name, value, label), 0)
                for feature_name, value in feature_items
            )
            if score > best_score:
                best_score = score
                best_label = label

        predicted_labels.append(best_label)

    return predicted_labels # List of labels

In [None]:
# Training
def perceptron(train_data, num_epochs):
  """Train an averaged perceptron for per-word label prediction.

  For every word whose predicted label differs from its true label, the
  weights of the word's features are increased by 1 for the true label and
  decreased by 1 for the predicted label. The returned weights are the
  average of the weight vector over all training steps, computed lazily:
  each update is also accumulated scaled by the step at which it happened,
  and the final result is weights - cumulative / total_steps.

  Args:
    train_data: List of sentences, each a list of (word, label) pairs.
    num_epochs: Number of passes over train_data.

  Returns:
    A defaultdict mapping (feature name, feature value, label) to its
    averaged weight. Only triples touched by an update are present.
  """
  # All weights start at 0
  weights = defaultdict(int)
  # Sum of (step * update) per key, used for the lazy average
  cumulative_weights = defaultdict(int)
  # One step per classified word; starts at 1 so early updates are weighted
  step = 1

  for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}")

    for sentence_data in train_data:
      true_labels = [label for word, label in sentence_data]
      # All words of the sentence are predicted with the weights as they
      # were before this sentence's updates.
      predicted_labels = predict_sequence(sentence_data, weights)

      for i, (true_label, predicted_label) in enumerate(zip(true_labels, predicted_labels)):
        if true_label != predicted_label:
          word_features = feature_extraction(sentence_data, i)
          for feature_name, value in word_features.items():
            gold_key = (feature_name, value, true_label)
            wrong_key = (feature_name, value, predicted_label)
            # Reward the true label ...
            weights[gold_key] += 1
            cumulative_weights[gold_key] += step
            # ... and penalize the wrongly predicted one.
            weights[wrong_key] -= 1
            cumulative_weights[wrong_key] -= step
        step += 1

  # An update made at step s contributed to (step - s) of the step weight
  # vectors, so its share of the average is (step - s) / step.
  for key in weights:
    weights[key] -= cumulative_weights[key] / step

  return weights

In [None]:
# Train for 20 epochs on the smoothed training sentences and show the weights.
weights = perceptron(smoothed_data, 20)
display(weights)

Epoch 1
Epoch 2
Epoch 3
Epoch 4
Epoch 5
Epoch 6
Epoch 7
Epoch 8
Epoch 9
Epoch 10
Epoch 11
Epoch 12
Epoch 13
Epoch 14
Epoch 15
Epoch 16
Epoch 17
Epoch 18
Epoch 19
Epoch 20


defaultdict(int,
            {('word', '', 'START'): 4.999816068890561,
             ('word', '', 'B-NP'): -2.999949836970153,
             ('is_unk', False, 'START'): 0.999882952930357,
             ('is_unk', False, 'B-NP'): 12.869308586238608,
             ('is_capitalized', False, 'START'): 1.999866231920408,
             ('is_capitalized', False, 'B-NP'): -1.1780620349469109,
             ('is_digit', False, 'START'): 0.999882952930357,
             ('is_digit', False, 'B-NP'): 0.8061031686313853,
             ('is_punctuation', False, 'START'): 0.999882952930357,
             ('is_punctuation', False, 'B-NP'): 18.805417607223475,
             ('prev_word', '<START>', 'START'): 5.999799347880612,
             ('prev_word', '<START>', 'B-NP'): -1.999966557980102,
             ('next_word', '#UNK#', 'START'): -1.672100994900092e-05,
             ('next_word', '#UNK#', 'B-NP'): -5.012858456650782,
             ('prev_chunk_label', 'O', 'START'): 3.99983278990051,
             ('prev_

In [None]:
# Perform feature extraction on test set
def feature_extraction_test(sentence, index):
  """Build the feature dictionary for one word of an untagged sentence.

  Args:
    sentence: Indexable sequence of word strings (no tags attached).
    index: Position in ``sentence`` of the word to describe.

  Returns:
    A dict mapping feature names to values, for example
    ``{'word': 'John', 'is_capitalized': True, 'prev_word': '<START>', ...}``.
    Context slots that fall outside the sentence hold ``'<START>'`` on the
    left side and ``'<END>'`` on the right side.
  """
  token = sentence[index]
  size = len(sentence)

  # Neighbouring words up to three positions away on either side.
  left1 = '<START>' if index <= 0 else sentence[index - 1]
  right1 = '<END>' if index >= size - 1 else sentence[index + 1]
  left2 = '<START>' if index <= 1 else sentence[index - 2]
  right2 = '<END>' if index >= size - 2 else sentence[index + 2]
  left3 = '<START>' if index <= 2 else sentence[index - 3]
  right3 = '<END>' if index >= size - 3 else sentence[index + 3]

  # Keys are listed in a fixed order: consumers that sum weights while
  # iterating over this dict see the features in exactly this sequence.
  return {
    # Properties of the current word.
    'word': token,
    'is_unk': token == '#UNK#',
    'is_capitalized': token[0].isupper() if token else False,
    'is_digit': token.isdigit() if token else False,
    'is_punctuation': token in (".", ",", "?", "!", ";", ":"),
    # Surrounding words.
    'prev_word': left1,
    'next_word': right1,
    'prev2_word': left2,
    'next2_word': right2,
    'prev3_word': left3,
    'next3_word': right3,
    # Surface form of the current word (get_word_shape lives elsewhere in
    # this notebook).
    'lowercase_word': token.lower(),
    'word_shape': get_word_shape(token),
    'first_char': token[0] if token else '',
    'last_char': token[-1] if token else '',
    # Shapes of the words two positions away.
    'prev2_word_shape': get_word_shape(left2),
    'next2_word_shape': get_word_shape(right2),
    # Length and short affixes.
    'word_length': len(token),
    'prefix_2': token[:2],
    'suffix_2': token[-2:],
  }

In [None]:
# Predict sequence using trained weights

def predict_sequence(sentence, trained_weights, all_labels):
  """Pick the best-scoring label for each word of a sentence independently.

  Args:
    sentence: Sequence of word strings to tag.
    trained_weights: Mapping from ``(feature_name, value, label)`` to a
      numeric weight; keys that are absent count as 0.
    all_labels: Iterable of candidate labels.

  Returns:
    A list with one predicted label per word, in sentence order.
  """
  chosen = []

  for position, _ in enumerate(sentence):
    feats = feature_extraction_test(sentence, position)

    # Score of a label = sum of the weights of every (feature, value, label)
    # triple active for this word. `.get` is used so a defaultdict of
    # weights is not grown by lookups of unseen keys.
    scores = {
      label: sum(
        trained_weights.get((feat_name, feat_value, label), 0)
        for feat_name, feat_value in feats.items()
      )
      for label in all_labels
    }

    # On ties, max keeps the first label encountered.
    chosen.append(max(scores, key=scores.get))

  return chosen

In [None]:
def test_model_on_test_data(test_data, weights, label_set):
  all_predictions = []

  # Loop through each sentence in the test data
  for sentence in test_data:
      predictions = predict_sequence(sentence, weights, label_set)
      all_predictions.append(predictions)

  return all_predictions

In [None]:
# Load the dev input file. parse_test_data is defined elsewhere in this
# notebook; presumably it returns one list of word strings per sentence
# (that is how test_data is consumed by the cells below) -- TODO confirm.
test_data = parse_test_data('dev.in')
#display(test_data)

In [None]:
# Tag every dev sentence with the trained perceptron weights. unique_tags is
# defined elsewhere in the notebook and supplies the candidate labels.
predictions = test_model_on_test_data(test_data, weights, unique_tags)

In [None]:
# Write to dev.p4.out
# Write the dev predictions to dev.p4.out: one "word tag" pair per line,
# with a blank line after each sentence.
with open('dev.p4.out', 'w') as f:
    # test_data and predictions are parallel: the i-th tag list belongs to the i-th sentence.
    for sentence, tags in zip(test_data, predictions):
        for word, tag in zip(sentence, tags):
            f.write(f"{word} {tag}\n")
        # Blank line marks the end of the sentence.
        f.write("\n")

In [None]:
# Read the file that was just written back in and print it for a visual check.
with open('dev.p4.out', 'r') as f:
    content = f.read()
    print(content)

HBO B-NP
has B-VP
close B-ADJP
to B-PP
24 B-NP
million B-NP
subscribers B-NP
to B-PP
its B-NP
HBO B-NP
and O
#UNK# I-ADJP
networks B-NP
, O
while B-SBAR
Showtime B-NP
and O
its B-VP
#UNK# B-NP
service B-NP
, O
The B-NP
#UNK# I-ADJP
#UNK# B-NP
, O
have B-VP
only B-ADVP
about B-PP
10 B-NP
million I-NP
, O
according B-PP
to B-PP
Paul B-NP
#UNK# B-NP
Associates B-NP
, O
a B-NP
#UNK# B-NP
, O
Calif. I-NP
, O
research I-ADJP
firm B-NP
. O

#UNK# B-VP
#UNK# B-VP
#UNK# B-VP
after B-PP
the O
stock I-NP
market B-NP
's B-NP
#UNK# B-NP
ride B-ADVP
. O

This B-NP
may B-VP
seem B-VP
to B-PP
be I-VP
a B-ADJP
#UNK# B-NP
and O
#UNK# I-ADJP
#UNK# B-VP
effort B-NP
in B-PP
Africa B-NP
. O

American B-NP
Express B-NP
Bank B-VP
earnings B-NP
fell B-VP
50 B-NP
% I-ADJP
to B-PP
$ B-ADJP
#UNK# B-VP
million B-NP
from B-PP
$ B-ADJP
#UNK# B-NP
million B-NP
despite B-PP
a B-NP
29 B-NP
% I-ADJP
revenue B-NP
gain B-NP
. O

Californians B-NP
, O
meanwhile B-ADVP
, O
tried B-VP
to B-PP
#UNK# I-VP
with B-PP
#UNK# B-NP


Results:

#Entity in gold data: 13179
#Entity in prediction: 20191

#Correct Entity : 7336
Entity  precision: 0.3633
Entity  recall: 0.5566
Entity  F: 0.4397

#Correct Sentiment : 6602
Sentiment  precision: 0.3270
Sentiment  recall: 0.5009
Sentiment  F: 0.3957

**Performance on test set**

In [None]:
# Re-run the same pipeline on the test input. Note that this rebinds the
# notebook-level names test_data and predictions used for the dev set.
test_data = parse_test_data('test.in')
predictions = test_model_on_test_data(test_data, weights, unique_tags)

# Write to test.p4.out: one "word tag" pair per line, blank line after each sentence.
with open('test.p4.out', 'w') as f:
    for sentence, tags in zip(test_data, predictions):
        for word, tag in zip(sentence, tags):
            f.write(f"{word} {tag}\n")
        f.write("\n")
# Read the written file back so it can be printed from `content`.
with open('test.p4.out', 'r') as f:
    content = f.read()

In [None]:
# Print the text most recently stored in `content`. If the cells ran in
# notebook order, that is the contents of test.p4.out.
print(content)

#UNK# B-VP
#UNK# B-NP
, O
an I-PP
attorney B-NP
with B-PP
the O
#UNK# B-NP
's B-NP
#UNK# B-NP
's B-NP
Rights B-NP
#UNK# B-NP
, O
said B-VP
, O
`` O
They B-NP
wanted B-VP
a B-NP
#UNK# I-ADJP
woman B-NP
, O
and I-PP
a B-NP
pregnant O
woman B-NP
is B-VP
not O
#UNK# B-ADJP
. O

Looking B-VP
ahead B-ADVP
to B-PP
other B-NP
commodity B-NP
markets B-NP
this B-NP
week B-NP
: O

But O
here B-ADVP
in B-PP
Morgenzon B-NP
, O
a B-NP
#UNK# B-NP
town B-NP
amid B-PP
the B-NP
corn I-NP
fields B-NP
of B-PP
the O
#UNK# I-ADJP
#UNK# B-NP
, O
the B-NP
Orange I-NP
Workers B-NP
are B-VP
holding B-VP
the O
#UNK# B-NP
steady B-ADVP
. O

Interest B-NP
may B-VP
remain B-VP
limited B-ADJP
into B-PP
tomorrow B-NP
's B-NP
U.K. B-NP
trade I-ADJP
figures B-NP
, O
which B-NP
the B-NP
market B-NP
will B-VP
be B-VP
watching B-VP
closely B-ADVP
to B-PP
see B-VP
if B-SBAR
there B-NP
is B-VP
any B-NP
improvement B-NP
after B-PP
disappointing B-VP
numbers B-NP
in B-PP
the O
previous B-NP
two B-NP
months B-NP
. O

Ms. B-NP
