In [None]:
import pandas as pd
import numpy as np
import json
import spacy
import ast
import pickle
from transformers import GPT2Tokenizer, GPT2LMHeadModel
import torch
import os
from collections import defaultdict
from scipy.stats import entropy
from scipy.spatial.distance import jensenshannon

In [None]:
# Colab only: mount Google Drive so the files under /content/drive/MyDrive
# that the cells below read and write are reachable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


### Load Gutenberg Dataset

In [None]:
# Load the pickled Gutenberg dataset from Drive.
# NOTE(review): pickle.load can execute arbitrary code while unpickling; only
# load files that come from a trusted source.
# Presumably a pandas DataFrame with a 'Text' column (it is accessed as
# gutenberg_df.iloc[0]['Text'] further down) — confirm.
with open("/content/drive/MyDrive/gutenberg_data.pkl","rb") as f:
    gutenberg_df = pickle.load(f)

In [None]:
# Module-level spaCy pipeline used by tag_full_document for sentence splitting
# (doc.sents) and POS tagging (token.pos_).
nlp = spacy.load("en_core_web_sm")

### Tag Documents

In [None]:
def tag_full_document(text):
    '''
    Retrieve part of speech tags for a full document.

    Runs the module-level spaCy pipeline ``nlp`` over ``text`` and returns one
    list per sentence found by ``doc.sents``. Each sentence is a list of
    ``(token_text, pos_tag, index_in_sentence)`` tuples, where ``pos_tag`` is
    ``token.pos_`` and the index restarts at 0 for every sentence.
    '''
    doc = nlp(text)
    return [
        [(token.text, token.pos_, idx) for idx, token in enumerate(sent)]
        for sent in doc.sents
    ]

In [None]:
# TODO: Extract part of speech tags for more than 1 document
# For now only the first row of the dataset is tagged.
tagged_sentences = tag_full_document(gutenberg_df.iloc[0]['Text'])

### Extract N-Grams

In [None]:
def extract_ngrams_from_sentences(ngram_size, tagged_sentences):
    '''
    Slide a window of ``ngram_size`` tokens over every tagged sentence.

    Returns a list of ``(sentence_index, start_offset, window)`` tuples,
    ordered by sentence and then left to right inside each sentence.
    ``window`` is the slice of the sentence that starts at ``start_offset``.
    Sentences shorter than ``ngram_size`` contribute nothing.
    '''
    return [
        (sentence_no, start, tokens[start:start + ngram_size])
        for sentence_no, tokens in enumerate(tagged_sentences)
        if len(tokens) >= ngram_size
        for start in range(len(tokens) - ngram_size + 1)
    ]

In [None]:
# Trigrams (n = 3); the same size is passed to get_next_word_pos when the
# transition matrix is built, so the two values must be kept in sync.
text_ngrams = extract_ngrams_from_sentences(3, tagged_sentences)

### Next Token Generation

In [None]:
# GPT-2 is decoder-only, so batched prompts must be padded on the LEFT: with
# right padding the model predicts the token that follows the run of [PAD]
# tokens instead of the token that follows the real text.
# (Tokenizers run on the CPU and have no device, so no device argument is passed.)
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', padding_side='left')
# GPT-2 ships without a padding token; register one so prompts can be batched.
tokenizer.add_special_tokens({'pad_token': '[PAD]'})
model = GPT2LMHeadModel.from_pretrained('gpt2').to("cuda")
# Resize model embeddings to accommodate the new token
model.resize_token_embeddings(len(tokenizer))

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


Embedding(50258, 768)

In [None]:
# One prompt string per n-gram: the token texts (element 0 of each tagged
# token in ngram[2]) joined with single spaces.
text_segments = [
    " ".join(token[0] for token in ngram[2])
    for ngram in text_ngrams
]

In [None]:
# Tokenize all prompts in a single call. padding=True pads every row to the
# longest prompt, so any batch sliced from these tensors has the same width.
text_encodings = tokenizer(text_segments, truncation=True, padding=True,return_tensors="pt")

In [None]:
# Sample exactly one continuation token for every prompt, batch by batch.
# Generation with a decoder-only model requires the tokenizer to pad on the
# left; with right-padded prompts the sampled token is conditioned on the
# trailing [PAD] tokens rather than on the real text.
# NOTE(review): do_sample=True is used without seeding torch in this notebook,
# so the sampled tokens differ between runs.
input_ids = text_encodings['input_ids']
attention_mask = text_encodings['attention_mask']
batch_size = 1024
generated_outputs = []
for i in range(0, len(input_ids), batch_size):
    # Move only the current slice to the GPU.
    batch_input_ids = input_ids[i:i+batch_size].to("cuda")
    batch_attention_mask = attention_mask[i:i+batch_size].to("cuda")

    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs = model.generate(
            input_ids=batch_input_ids,
            attention_mask=batch_attention_mask,
            max_new_tokens=1,
            temperature=0.5,
            do_sample=True,
            return_dict_in_generate=True,
            # Scores are requested here, but only outputs.sequences is kept below.
            output_scores=True,
            pad_token_id=tokenizer.pad_token_id
        )
    # Keep the prompt-plus-generated-token ids on the CPU.
    generated_outputs.append(outputs.sequences.cpu())

A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='left'` when initializing the tokenizer.
A decoder-only architecture is being used, but right-padding was detected! For correct generation results, please set `padding_side='le

In [None]:
# Stack the per-batch results into one tensor with a row per prompt. This
# relies on every batch having the same width (all prompts were padded
# together in a single tokenizer call).
# NOTE(review): not idempotent — the name is rebound from a list to a tensor,
# so re-running this cell alone fails; re-run the generation cell first.
generated_outputs = torch.cat(generated_outputs, dim=0)

In [None]:
# The last column of each row is the single newly generated token id; decode
# each id to text. With skip_special_tokens=True a generated special token
# presumably decodes to an empty string — verify how downstream code treats that.
next_word_tokens = tokenizer.batch_decode(generated_outputs[:,-1], skip_special_tokens=True)

### Build Probability Matrix of POS Predictions

In [None]:
def get_next_word_pos(sentence, next_word, ngram_position, ngram_size):
  '''
  Return the POS tag given to a generated continuation word.

  The token texts of ``sentence`` up to the end of the n-gram that starts at
  ``ngram_position`` are joined with spaces, ``next_word`` is appended, and the
  resulting string is re-tagged with ``tag_full_document``. The tag of the
  final token of the final sentence in that re-tagged text is returned.
  '''
  token_texts = [text for text, _tag, _idx in sentence]
  context_end = ngram_position + ngram_size
  candidate = " ".join(token_texts[:context_end] + [next_word])

  retagged_sentences = tag_full_document(candidate)
  final_token = retagged_sentences[-1][-1]
  return final_token[1]

In [None]:
# Nested counter: transition_matrix[pos_ngram][next_pos] -> count, with
# missing entries defaulting to 0.
transition_matrix = defaultdict(lambda: defaultdict(int))

In [None]:
# For every n-gram, count which POS tag the model's sampled next token received.
for ngram, next_word in zip(text_ngrams, next_word_tokens):
  # ngram is (sentence index, start offset, tagged tokens).
  sent_idx = ngram[0]
  ngram_position = ngram[1]
  sentence = tagged_sentences[sent_idx]
  # 3 is the n-gram size and must match the size used to build text_ngrams.
  next_word_pos = get_next_word_pos(sentence, next_word, ngram_position, 3)
  # Row key: the n-gram's POS tags joined with "->", e.g. "DET->NOUN->VERB".
  pos_ngram = "->".join([token[1] for token in ngram[2]])
  transition_matrix[pos_ngram][next_word_pos] += 1

In [None]:
# Turn the raw counts into per-n-gram relative frequencies, in place.
# Only existing keys are reassigned, so the dicts keep their size while
# being iterated.
# NOTE(review): running the counting cell again after this one would add
# counts on top of probabilities; rebuild transition_matrix first.
for ngram, pos_counts in transition_matrix.items():
   total = sum(pos_counts.values())
   for pos, count in pos_counts.items():
       transition_matrix[ngram][pos] = count / total

In [None]:
# Rows = POS n-grams, columns = next-token POS tags; combinations that never
# occurred are filled with 0.
transition_df = pd.DataFrame(transition_matrix).transpose().fillna(0)

## POS Tag Meanings
* ADJ: Adjective
* ADP: Adposition
* ADV: Adverb
* AUX: Auxiliary
* CONJ: Conjunction
* CCONJ: Coordinating conjunction
* DET: Determiner
* INTJ: Interjection
* NOUN: Noun
* NUM: Numeral
* PART: Particle
* PRON: Pronoun
* PROPN: Proper noun
* PUNCT: Punctuation
* SCONJ: Subordinating conjunction
* SPACE: Space
* SYM: Symbol
* VERB: Verb
* X: Other
* EOL: End of line
* NONE: No next token (the n-gram ends its sentence); used by the ground-truth matrix

In [None]:
transition_df

Unnamed: 0,PUNCT,PROPN,AUX,PART,SPACE,VERB,CCONJ,ADJ,NOUN,PRON,ADP,NUM,ADV,INTJ,SCONJ,X,SYM,DET
NOUN->PROPN->PROPN,0.472222,0.333333,0.027778,0.027778,0.083333,0.027778,0.027778,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.0,0.0,0.0
PROPN->PROPN->PUNCT,0.656250,0.118750,0.040625,0.006250,0.018750,0.031250,0.012500,0.015625,0.078125,0.015625,0.003125,0.003125,0.000000,0.000000,0.0,0.0,0.0,0.0
PROPN->PUNCT->PROPN,0.751678,0.067114,0.000000,0.006711,0.046980,0.013423,0.000000,0.000000,0.060403,0.006711,0.013423,0.006711,0.020134,0.006711,0.0,0.0,0.0,0.0
PUNCT->PROPN->NOUN,0.562500,0.125000,0.000000,0.000000,0.125000,0.000000,0.062500,0.000000,0.125000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.0,0.0,0.0
PROPN->NOUN->CCONJ,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.600000,0.200000,0.000000,0.000000,0.000000,0.2,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
PRON->ADV->PROPN,1.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.0,0.0,0.0
ADP->PUNCT->X,1.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.0,0.0,0.0
PROPN->NUM->SYM,1.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.0,0.0,0.0
NUM->SYM->NUM,1.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.000000,0.0,0.0,0.0,0.0


In [None]:
# Persist the predicted transition probabilities to Drive. The n-gram labels
# are written as the first column, so pass index_col=0 when reading the file back.
transition_df.to_csv('/content/drive/MyDrive/transition_probs.csv')

In [None]:
#transition_df = pd.read_csv('/content/drive/MyDrive/transition_probs.csv')

### Build Ground Truth Probability Matrix

In [None]:
def extract_ngram_ground_truth_pairings(ngram_size, tagged_sentences):
    '''
    Pair every n-gram in the tagged sentences with the token that follows it.

    Returns a list of ``(ngram, next_token)`` tuples, ordered by sentence and
    then left to right. ``ngram`` is a slice of ``ngram_size`` tagged tokens;
    ``next_token`` is the tagged token right after it in the same sentence, or
    ``None`` when the n-gram ends the sentence. Sentences shorter than
    ``ngram_size`` contribute nothing.
    '''
    pairs = []
    for tokens in tagged_sentences:
        sentence_length = len(tokens)
        # An empty range covers sentences that are too short for one n-gram.
        for start in range(sentence_length - ngram_size + 1):
            stop = start + ngram_size
            follower = tokens[stop] if stop < sentence_length else None
            pairs.append((tokens[start:stop], follower))
    return pairs

In [None]:
# Trigrams from the original text, each paired with the token that really
# follows it (None at the end of a sentence).
ngrams_with_next = extract_ngram_ground_truth_pairings(3, tagged_sentences)

In [None]:
# Nested counter for the ground truth: gt_transition_matrix[pos_ngram][next_pos]
# -> count, with missing entries defaulting to 0.
gt_transition_matrix = defaultdict(lambda: defaultdict(int))

In [None]:
# Count the POS tag of the token that actually follows each POS n-gram.
for ngram, next_word in ngrams_with_next:
  # Row key: the n-gram's POS tags joined with "->".
  pos_ngram = "->".join([token[1] for token in ngram])
  # 'NONE' marks n-grams that end their sentence (no following token).
  next_word_pos = next_word[1] if next_word else 'NONE'
  gt_transition_matrix[pos_ngram][next_word_pos] += 1

In [None]:
# Turn the raw ground-truth counts into per-n-gram relative frequencies, in
# place. Only existing keys are reassigned, so the dicts keep their size
# while being iterated.
for ngram, pos_counts in gt_transition_matrix.items():
   total = sum(pos_counts.values())
   for pos, count in pos_counts.items():
       gt_transition_matrix[ngram][pos] = count / total

In [None]:
# Rows = POS n-grams, columns = observed next-token POS tags (including
# 'NONE'); combinations that never occurred are filled with 0.
gt_transition_df = pd.DataFrame(gt_transition_matrix).transpose().fillna(0)

In [None]:
# Persist the ground-truth transition probabilities to Drive (n-gram labels
# are written as the first column).
gt_transition_df.to_csv('/content/drive/MyDrive/gt_transition_probs.csv')

### Calculate KL and Jensen-Shannon Divergence

In [None]:
# Give both matrices the same set of POS-tag columns before comparing them.
# A tag missing from one frame is added there as an all-zero column; columns
# that already hold probabilities are kept as they are instead of being
# overwritten with 0.0, and any tag present in only one frame is covered,
# not just SPACE and NONE.
pos_tags = gt_transition_df.columns.union(transition_df.columns)
gt_transition_df = gt_transition_df.reindex(columns=pos_tags, fill_value=0.0)
transition_df = transition_df.reindex(columns=pos_tags, fill_value=0.0)

In [None]:
# Fixed, sorted column order taken from the ground-truth frame; it is used to
# select columns from both matrices, so every one of these tags must also be
# a column of transition_df.
columns = sorted(gt_transition_df.columns)

In [None]:
def compute_kl_divergence(real_matrix, generated_matrix, epsilon=1e-10):
    """
    Compute KL(real || generated) for every n-gram present in both matrices.

    Parameters
    ----------
    real_matrix, generated_matrix : pandas.DataFrame
        Rows are n-grams, columns are POS tags, values are non-negative
        weights (each row is normalised to sum to 1 here). Columns are matched
        by label, not by position; a tag missing from one matrix counts as 0.
    epsilon : float
        Lower bound applied to the generated probabilities so that a tag the
        generated matrix never contains gives a large finite score instead of
        infinity.

    Returns
    -------
    dict
        Maps n-gram -> KL divergence in nats. N-grams missing from
        ``generated_matrix`` are skipped; a row whose values sum to 0 in
        either matrix is reported as 0.0.
    """
    # Align columns by label so the positional arrays below compare like with like.
    pos_tags = real_matrix.columns.union(generated_matrix.columns)
    real_matrix = real_matrix.reindex(columns=pos_tags, fill_value=0.0)
    generated_matrix = generated_matrix.reindex(columns=pos_tags, fill_value=0.0)

    kl_scores = {}

    for ngram in real_matrix.index:
        if ngram not in generated_matrix.index:
            continue

        real_probs = real_matrix.loc[ngram].values
        gen_probs = generated_matrix.loc[ngram].values

        real_sum = real_probs.sum()
        gen_sum = gen_probs.sum()

        # No probability mass on one side: the divergence is undefined, report 0.0.
        if real_sum == 0 or gen_sum == 0:
            kl_scores[ngram] = 0.0
            continue

        real_probs = real_probs / real_sum
        gen_probs = gen_probs / gen_sum

        # Avoid division by zero; scipy's entropy renormalises both inputs.
        gen_probs = np.clip(gen_probs, epsilon, 1)

        kl_scores[ngram] = entropy(real_probs, gen_probs)
    return kl_scores

In [None]:
def compute_js_divergence(real_matrix, generated_matrix, epsilon=1e-10):
    """
    Compute Jensen-Shannon divergence between real and generated POS probability matrices.

    Parameters
    ----------
    real_matrix, generated_matrix : pandas.DataFrame
        Rows are n-grams, columns are POS tags, values are non-negative
        weights (each row is normalised to sum to 1 here). Columns are matched
        by label, not by position; a tag missing from one matrix counts as 0.
    epsilon : float
        Lower bound applied to the generated probabilities before comparison.

    Returns
    -------
    dict
        Maps n-gram -> Jensen-Shannon divergence (natural log, so values lie
        in [0, ln 2]). N-grams missing from ``generated_matrix`` are skipped;
        a row whose values sum to 0 in either matrix is reported as 0.0.
    """
    # Align columns by label so the positional arrays below compare like with like.
    pos_tags = real_matrix.columns.union(generated_matrix.columns)
    real_matrix = real_matrix.reindex(columns=pos_tags, fill_value=0.0)
    generated_matrix = generated_matrix.reindex(columns=pos_tags, fill_value=0.0)

    js_scores = {}

    for ngram in real_matrix.index:
        if ngram not in generated_matrix.index:
            continue

        real_probs = real_matrix.loc[ngram].values
        gen_probs = generated_matrix.loc[ngram].values

        real_sum = real_probs.sum()
        gen_sum = gen_probs.sum()

        # No probability mass on one side: the divergence is undefined, report 0.0.
        if real_sum == 0 or gen_sum == 0:
            js_scores[ngram] = 0.0
            continue

        real_probs = real_probs / real_sum
        gen_probs = gen_probs / gen_sum

        gen_probs = np.clip(gen_probs, epsilon, 1)

        # scipy's jensenshannon returns the JS *distance*, i.e. the square
        # root of the divergence; square it to get the divergence itself.
        js_scores[ngram] = jensenshannon(real_probs, gen_probs) ** 2

    return js_scores


In [None]:
# KL(ground truth || model) for every POS n-gram, with both frames restricted
# to the same columns in the same order.
kl_scores = compute_kl_divergence(gt_transition_df[columns], transition_df[columns])

In [None]:
# Jensen-Shannon scores from compute_js_divergence for every POS n-gram, with
# both frames restricted to the same columns in the same order.
js_scores = compute_js_divergence(gt_transition_df[columns], transition_df[columns])

In [None]:
# Wrap the per-n-gram KL scores in a Series (index = n-gram) for display.
kl_srs = pd.Series(kl_scores)

In [None]:
kl_srs

Unnamed: 0,0
NOUN->PROPN->PROPN,3.698803e+00
PROPN->PROPN->PUNCT,6.107894e+00
PROPN->PUNCT->PROPN,1.395041e+00
PUNCT->PROPN->NOUN,9.148484e+00
PROPN->NOUN->CCONJ,2.252545e+01
...,...
PRON->ADV->PROPN,1.800000e-09
ADP->PUNCT->X,2.302585e+01
PROPN->NUM->SYM,2.302585e+01
NUM->SYM->NUM,2.302585e+01


In [None]:
js_scores

{'NOUN->PROPN->PROPN': np.float64(0.4073453827469874),
 'PROPN->PROPN->PUNCT': np.float64(0.5284400269091707),
 'PROPN->PUNCT->PROPN': np.float64(0.34522640502901875),
 'PUNCT->PROPN->NOUN': np.float64(0.5315173435567584),
 'PROPN->NOUN->CCONJ': np.float64(0.8325546097698271),
 'NOUN->CCONJ->DET': np.float64(0.6115533826321826),
 'CCONJ->DET->PROPN': np.float64(0.48654150659348533),
 'DET->PROPN->PROPN': np.float64(0.3381422297224149),
 'PROPN->PROPN->PROPN': np.float64(0.2362895649418511),
 'PROPN->PROPN->ADP': np.float64(0.49939047660890057),
 'PROPN->ADP->PUNCT': np.float64(0.8325546104362479),
 'ADP->PUNCT->PROPN': np.float64(0.70493199911893),
 'PUNCT->PROPN->PROPN': np.float64(0.3045061231170285),
 'PROPN->PUNCT->PUNCT': np.float64(0.690465229881249),
 'PUNCT->PUNCT->PUNCT': np.float64(0.7073573346788002),
 'PUNCT->PUNCT->PROPN': np.float64(0.42149058628773683),
 'PUNCT->PROPN->PUNCT': np.float64(0.5239177064635269),
 'PUNCT->PUNCT->NOUN': np.float64(0.44084323042865076),
 'PUNCT