# In [None]:
import pickle
import pandas as pd

# Function to load any n-gram pkl file
def load_ngram(path):
    """Load a pickled n-gram table and return it as an ``{ngram: count}`` dict.

    Args:
        path: Location of the pickle file to read.

    Returns:
        A dict pairing each entry of the object's ``"Ngram"`` field with the
        entry of its ``"Count"`` field at the same position.

    Note:
        The unpickled object only needs to support ``obj["Ngram"]`` and
        ``obj["Count"]``; presumably it is a pandas DataFrame -- TODO confirm.
        ``pickle`` can execute arbitrary code while loading, so only point
        this at files you created yourself.
    """
    with open(path, "rb") as handle:
        table = pickle.load(handle)

    ngram_column = table["Ngram"]
    count_column = table["Count"]
    return {ngram: count for ngram, count in zip(ngram_column, count_column)}


# Load all n-grams
# One table per n-gram order, loaded in order from unigram up to quadrigram.
# NOTE: these are absolute, machine-specific paths; adjust when running elsewhere.
unigrams, bigrams, trigrams, quadrigrams = map(
    load_ngram,
    (
        "C:/Users/ashis/OneDrive/Desktop/NLP/N-Grams/unigram.pkl",
        "C:/Users/ashis/OneDrive/Desktop/NLP/N-Grams/bigram.pkl",
        "C:/Users/ashis/OneDrive/Desktop/NLP/N-Grams/trigram.pkl",
        "C:/Users/ashis/OneDrive/Desktop/NLP/N-Grams/quadrigram.pkl",
    ),
)

# In [None]:

class KneserNey:
    """Interpolated, absolute-discounted n-gram model (orders 1-4).

    The unigram level uses the Kneser-Ney continuation probability (how many
    distinct bigram types end in the word).  Orders 2-4 discount the raw
    n-gram count by ``D`` and interpolate with the next lower order; note that
    they use raw counts, not continuation counts, at those levels.

    Table keys are assumed to be tuples of tokens whose length equals the
    table's order (e.g. ``("the", "cat")`` in ``bigrams``) -- TODO confirm
    against the code that produced the pickles.

    Continuation indexes are built once in ``__init__``.  If the tables are
    modified afterwards, build a new model so the indexes stay in sync.
    """

    def __init__(self, unigrams, bigrams, trigrams, quadrigrams, D=0.75):
        """Store the count tables and precompute the continuation indexes.

        Args:
            unigrams: ``{ngram: count}`` for order 1.
            bigrams: ``{ngram: count}`` for order 2.
            trigrams: ``{ngram: count}`` for order 3.
            quadrigrams: ``{ngram: count}`` for order 4.
            D: Absolute discount subtracted from every observed count.
        """
        self.unigrams = unigrams
        self.bigrams = bigrams
        self.trigrams = trigrams
        self.quadrigrams = quadrigrams
        self.D = D
        self.total_tokens = sum(unigrams.values())

        # order -> {history: number of distinct words seen after it}.
        # Built once so prob() does not rescan a whole table on every call.
        self._follower_counts = {
            2: self._index_followers(bigrams),
            3: self._index_followers(trigrams),
            4: self._index_followers(quadrigrams),
        }
        # word -> number of distinct words that precede it in a bigram.
        self._predecessor_counts = self._index_predecessors(bigrams)

    @staticmethod
    def _index_followers(table):
        """Map each history in ``table`` to its number of distinct next words."""
        followers = {}
        for ng in table:
            if len(ng) == 0:
                continue  # an empty key has no history/word split
            followers.setdefault(ng[:-1], set()).add(ng[-1])
        return {hist: len(words) for hist, words in followers.items()}

    @staticmethod
    def _index_predecessors(bigrams):
        """Map each second word of a bigram to its number of distinct first words."""
        predecessors = {}
        for bg in bigrams:
            predecessors.setdefault(bg[1], set()).add(bg[0])
        return {word: len(firsts) for word, firsts in predecessors.items()}

    def get_count(self, ngram):
        """Return the stored count of ``ngram``, or 0 if unseen or not order 1-4."""
        tables = (self.unigrams, self.bigrams, self.trigrams, self.quadrigrams)
        order = len(ngram)
        if 1 <= order <= len(tables):
            return tables[order - 1].get(ngram, 0)
        return 0

    def N1plus(self, hist, ngrams):
        """Unique continuation counts for history.

        Scans every key of ``ngrams`` (linear in its size) and counts the
        distinct last tokens among keys whose prefix equals ``hist``.
        ``prob`` uses the precomputed indexes instead of this scan.
        """
        return len({ng[-1] for ng in ngrams if ng[:-1] == hist})

    def prob(self, ngram):
        """Return the smoothed probability of the last token given the rest.

        Args:
            ngram: Non-empty tuple of tokens; the last one is the predicted
                word and the preceding ones are its history.

        Returns:
            The interpolated probability.  For a single token this is the
            continuation probability, or ``1e-8`` when there are no bigrams.

        Raises:
            IndexError: If ``ngram`` is empty.
        """
        n = len(ngram)
        if n == 0:
            raise IndexError("ngram must contain at least one token")

        if n == 1:  # unigram level: continuation probability
            total_cont = len(self.bigrams)
            if total_cont > 0:
                return self._predecessor_counts.get(ngram[0], 0) / total_cont
            return 1e-8

        hist = ngram[:-1]
        count_hist = self.get_count(hist)

        # recursive backoff to the next lower order
        backoff = self.prob(ngram[1:])

        # Distinct words observed after this history at this order; there is
        # no table (hence no continuation) for orders above 4.
        n_followers = self._follower_counts.get(n, {}).get(hist, 0)

        # With an unseen history, or a history that never continues at this
        # order, nothing was discounted here.  Give the lower order the full
        # weight; otherwise lambda would be 0 and the result would collapse
        # to 0 for every word.
        if not count_hist > 0 or n_followers == 0:
            return backoff

        # first term: discounted probability
        discounted = max(self.get_count(ngram) - self.D, 0) / count_hist

        # lambda(h): mass freed by discounting, handed to the lower order
        lam = self.D * n_followers / count_hist

        return discounted + lam * backoff

# --- Example Usage ---
model = KneserNey(unigrams, bigrams, trigrams, quadrigrams, D=0.75)

# Score every stored quadrigram whose three-word history has a positive count.
results = []
for quad, freq in quadrigrams.items():
    if not model.get_count(quad[:-1]) > 0:
        continue  # history has no count, so skip this quadrigram
    likelihood = model.prob(quad)
    row = {
        "Ngram": " ".join(quad),
        "Count": freq,
        "Probability": likelihood,
    }
    results.append(row)

df_results = pd.DataFrame(results)

print("Calculated probabilities for", len(df_results), "quadrigrams (with valid history).")
print(df_results.head())

# Write the scored quadrigrams to a CSV file in the current working directory.
df_results.to_csv("qp_KN_filtered.csv", index=False, encoding="utf-8")