# Import libraries and dependencies

In [None]:
import gdown
import zipfile
import os, re, gc
import json
from pathlib import Path
import pandas as pd
import numpy as np
import random
import math
import time
import matplotlib.pyplot as plt
import seaborn as sns
from collections import Counter, defaultdict
# from sklearn.model_selection import train_test_split

# Exploratory Data Analysis (EDA)

### Downloading the data and setting directory

In [None]:
# Remote source: the dataset is a single CSV shared through Google Drive.
file_id = '1feOAkOhNhxbF9RgD-eR1icyQHY3MbDor'
url = f'https://drive.google.com/uc?id={file_id}'

# Local destination: <data_folder>/<csv_filename>, relative to the working directory.
data_folder = 'bigrams_dataset_folder'
csv_filename = 'bigrams_data.csv'
output_path = os.path.join(data_folder, csv_filename)

# Make sure the destination folder exists before anything is written into it.
os.makedirs(data_folder, exist_ok=True)

# Download only once: a file already present at output_path is reused as-is.
if os.path.exists(output_path):
  print("File already exists. Skipping download.")
else:
  print("Downloading CSV file...")
  gdown.download(url, output_path, quiet=False)
  print("Download complete.")

# The local folder is the project root for every later cell.
PROJECT_ROOT = Path(data_folder)

# Walk the directory tree
def walk_tree(root, max_depth=3):
    """
    Print every entry found under `root` that lies at most `max_depth`
    path components below it.

    root: a pathlib.Path to scan recursively.
    max_depth: deepest level (relative to root) that is still reported.

    Directories are reported with the same "Found file" label as files.
    """
    print(f"Scanning directory: {root}")
    # rglob walks the whole tree; deeper entries are simply not printed.
    shallow_entries = (
        entry for entry in root.rglob('*')
        if len(entry.relative_to(root).parts) <= max_depth
    )
    for entry in shallow_entries:
        print(f"Found file: {entry}")

# Run it: list what is present under the dataset folder (default depth limit of 3)
walk_tree(PROJECT_ROOT)

Downloading CSV file...


Downloading...
From (original): https://drive.google.com/uc?id=1feOAkOhNhxbF9RgD-eR1icyQHY3MbDor
From (redirected): https://drive.google.com/uc?id=1feOAkOhNhxbF9RgD-eR1icyQHY3MbDor&confirm=t&uuid=de61781b-7131-40ed-8f58-297dd2778da8
To: /content/bigrams_dataset_folder/bigrams_data.csv
100%|██████████| 200M/200M [00:02<00:00, 85.3MB/s]

Download complete.
Scanning directory: bigrams_dataset_folder
Found file: bigrams_dataset_folder/bigrams_data.csv





In [None]:
### Confirming the downloaded file

# Collect every CSV that sits directly inside the project root (no recursion).
DATA_DIR = PROJECT_ROOT
csv_files = list(DATA_DIR.glob("*.csv"))

if csv_files:
  print(f"Found {len(csv_files)} files(s):")
  for file in csv_files:
    print(file)
else:
  print("No CSV files found! Check your folder path.")

Found 1 files(s):
bigrams_dataset_folder/bigrams_data.csv


### CSV Safe loading (Inspecting headers only)

In [None]:
eda_summary = {}  # dictionary that accumulates EDA results for the later export cells
csv_path = csv_files[0]  # first CSV found by the glob in the previous cell
pd.read_csv(csv_path, nrows=10)  # parse only the first 10 rows, so the full file is never loaded here

Unnamed: 0,ngram,lang,lang_id,count
0,BIRIBIARA wɔ,twi,1,1
1,wɔ ne,twi,1,8972
2,ne bere,twi,1,2980
3,bere a,twi,1,36770
4,a wɔahyɛ,twi,1,859
5,wɔahyɛ ato,twi,1,63
6,ato hɔ,twi,1,907
7,hɔ na,twi,1,4633
8,na ade,twi,1,387
9,ade biara,twi,1,202


In [None]:
### Confirming the file size (useful for speed inference)

# st_size is in bytes; dividing by 1024**2 converts it to mebibytes.
file_size_mb = csv_path.stat().st_size / (1024 ** 2)
print(f"{file_size_mb} MB")

191.14224529266357 MB


In [None]:
### Chunked loading (EDA-safe)

# Number of rows per chunk; reused by the later chunked passes over the file.
CHUNK_SIZE = 200_000
# With chunksize set, read_csv returns an iterator of DataFrames instead of one frame.
chunks = pd.read_csv(csv_path, chunksize=CHUNK_SIZE)

# for chunk in chunks:
#   print(f"Processing a chunk of files: {chunk.shape}")  # suffers out-of-memory problem after some time

# Pull only the first chunk; it is the sample used by the per-row EDA cells below.
first_chunk = next(chunks)
first_chunk.tail()  # also check tail using ".tail"

Unnamed: 0,ngram,lang,lang_id,count
199995,wɔamfa honam,twi,1,3
199996,nipadua bio,twi,1,8
199997,sɛnea wɔyɛe,twi,1,24
199998,Eunice Dabi,twi,1,4
199999,Dabi misua,twi,1,1


### Structural EDA (to confirm what we are dealing with)

In [None]:
### Basic schema: column names, non-null counts and dtypes of the first chunk
first_chunk.info()  # data types: 2 integers (lang_id, count), 2 str/objects (ngram, lang)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 200000 entries, 0 to 199999
Data columns (total 4 columns):
 #   Column   Non-Null Count   Dtype 
---  ------   --------------   ----- 
 0   ngram    200000 non-null  object
 1   lang     200000 non-null  object
 2   lang_id  200000 non-null  int64 
 3   count    200000 non-null  int64 
dtypes: int64(2), object(2)
memory usage: 6.1+ MB


In [None]:
### Missing values check (first chunk only)
first_chunk.isna().sum()  # per-column count of missing cells; none in this chunk

Unnamed: 0,0
ngram,0
lang,0
lang_id,0
count,0


In [None]:
### Unique languages (first chunk only)
# Only 'twi' shows up in the first 200k rows; the global, chunked count further
# down also finds 'eng' and 'fra', so this sample is not representative of the
# language mix of the whole file.
first_chunk["lang"].unique()

array(['twi'], dtype=object)

### Language distribution (global, chunked)

In [None]:
### We accumulate counts without loading full CSV into memory
# the insights derived here directly affect smoothing, priors and scoring bias
from collections import Counter

total_rows = 0
lang_counter, lang_id_counter = Counter(), Counter()

# Stream the file so that at most CHUNK_SIZE rows are held in memory at a time.
for chunk in pd.read_csv(csv_path, chunksize=CHUNK_SIZE):
  total_rows += len(chunk)
  lang_id_counter.update(chunk["lang_id"])
  lang_counter.update(chunk["lang"])

lang_counter, lang_id_counter, total_rows  # output line

(Counter({'twi': 2946491, 'eng': 3308262, 'fra': 3004308}),
 Counter({1: 2946491, 2: 3308262, 3: 3004308}),
 9259061)

In [None]:
### populate eda_summary during chunked EDA
# Counters are converted to plain dicts before being stored in the summary.
eda_summary.update({
    "language_distribution": dict(lang_counter),
    "language_id_distribution": dict(lang_id_counter),
    "total_rows": total_rows,
})

### Bigram Quality Analysis

In [None]:
### Split bigrams safely

def split_bigram(ngram):
  """
  Split an n-gram string into its two words.

  Returns the list [first, second] when `ngram` is a string made of exactly
  two whitespace-separated tokens, otherwise None. Non-string cells (for
  example NaN or None produced for a missing CSV field) are treated as
  invalid instead of raising AttributeError.
  """
  if not isinstance(ngram, str):
    return None
  parts = ngram.split()
  return parts if len(parts) == 2 else None

# bigram_parts holds the two-word list, or None when the n-gram is not a bigram.
first_chunk["bigram_parts"] = first_chunk["ngram"].apply(split_bigram)
# valid_bigram is True exactly where the split succeeded.
first_chunk["valid_bigram"] = first_chunk["bigram_parts"].notnull()
first_chunk["valid_bigram"].value_counts()

Unnamed: 0_level_0,count
valid_bigram,Unnamed: 1_level_1
True,200000


In [None]:
### Character-level inspection (for twi)

import re
def contains_non_ascii(text):
  """Return True when `text` holds at least one character outside the 7-bit ASCII range."""
  return not text.isascii()

# Flag every bigram of the sample chunk that contains a non-ASCII character.
first_chunk["non_ascii"] = first_chunk["ngram"].apply(contains_non_ascii)
first_chunk["non_ascii"].value_counts()

Unnamed: 0_level_0,count
non_ascii,Unnamed: 1_level_1
True,103389
False,96611


In [None]:
### Detect suspicious language-mixing bigrams

def contains_language_names(text):
  """
  Return True when `text` contains the name of one of the listed languages.

  The comparison is case-insensitive and substring-based, so a word that
  merely embeds a language name (e.g. "Germany" embeds "German") matches too.
  """
  keywords = [
      "English", "French", "German", "Spanish", "Dutch",
      "Italian", "Portuguese", "Russian", "Japanese", "Korean",
      "Arabic", "Chinese", "Hindi", "Bengali", "Tamil", "Telugu",
      "Bulgarian", "Catalan", "Czech", "Danish", "Greek", "Hungarian",
      "Polish", "Romanian", "Swedish", "Turkish", "Cantonese"
  ]
  haystack = text.lower()
  for name in keywords:
    if name.lower() in haystack:
      return True
  return False

# Flag the bigrams that mention a language name, then show the first 10 of them.
first_chunk["language_name_bigram"] = first_chunk["ngram"].apply(contains_language_names)
first_chunk[first_chunk["language_name_bigram"]].head(10)

Unnamed: 0,ngram,lang,lang_id,count,bigram_parts,valid_bigram,non_ascii,language_name_bigram
3297,ne Arabic,twi,1,5,"[ne, Arabic]",True,False,True
3298,Arabic akontaahyɛde,twi,1,4,"[Arabic, akontaahyɛde]",True,True,True
7748,wɔ Germany,twi,1,198,"[wɔ, Germany]",True,True,True
7839,na German,twi,1,12,"[na, German]",True,False,True
7840,German asraafo,twi,1,41,"[German, asraafo]",True,False,True
9945,ne Germany,twi,1,23,"[ne, Germany]",True,False,True
9946,Germany yɛɛ,twi,1,3,"[Germany, yɛɛ]",True,True,True
9950,wɔmaa German,twi,1,1,"[wɔmaa, German]",True,True,True
14791,Nuremberg Germany,twi,1,7,"[Nuremberg, Germany]",True,False,True
14792,Germany wɔ,twi,1,39,"[Germany, wɔ]",True,True,True


In [None]:
### Populating bigram validity & noise (from sample chunk)
# All figures describe first_chunk only, not the whole file.
# The sums are cast to int so that plain Python numbers are stored in the summary.
eda_summary["sample_bigram_stats"] = {
    "total_bigrams": len(first_chunk),
    "valid_bigrams": int(first_chunk["valid_bigram"].sum()),
    "invalid_bigrams": int((~first_chunk["valid_bigram"]).sum()),
    "non_ascii_bigrams": int(first_chunk["non_ascii"].sum()),
    "language_name_bigrams": int(first_chunk["language_name_bigram"].sum())
}

In [None]:
### Count distribution (why smoothing is mandatory)
# Summary statistics of the bigram counts in the sample chunk.
first_chunk["count"].describe()

Unnamed: 0,count
count,200000.0
mean,61.188075
std,522.495109
min,1.0
25%,2.0
50%,7.0
75%,28.0
max,81971.0


In [None]:
### Inspecting the long tail
# Frequency of each count value, showing the 10 most common values.
first_chunk["count"].value_counts().head(10)

# This confirms the classical NLP pattern known as "Zipf's law":
#   - many bigrams occur only 1, 2 or 3 times
#   - a few bigrams have extremely large counts

Unnamed: 0_level_0,count
count,Unnamed: 1_level_1
1,41940
2,18088
3,12746
4,9436
5,7464
6,6431
7,5445
8,4920
9,4181
10,3860


In [None]:
### Count Statistics
# Summary of the bigram counts in the sample chunk (first_chunk), stored as
# plain Python numbers so the dictionary can be serialised to JSON.
eda_summary["count_statistics"] = {
    "min": int(first_chunk["count"].min()),
    "max": int(first_chunk["count"].max()),
    # Fixed: the mean was previously computed with .max(), duplicating "max".
    "mean": float(first_chunk["count"].mean()),
    "median": int(first_chunk["count"].median()),
    "std": float(first_chunk["count"].std()),
    "percentiles": {
        "p50": float(first_chunk["count"].quantile(0.50)),
        "p75": float(first_chunk["count"].quantile(0.75)),
        "p90": float(first_chunk["count"].quantile(0.90)),
        "p99": float(first_chunk["count"].quantile(0.99)),
    }
}

### Saving EDA results (dict) to memory/disk

In [None]:
### JSON format
# Written to the current working directory. ensure_ascii=False keeps non-ASCII
# characters readable in the file instead of \u escapes.
with open("eda_summary.json", "w", encoding="utf-8") as f:
  json.dump(eda_summary, f, indent=4, ensure_ascii=False)

In [None]:
### Markdown format
def eda_to_markdown(summary):
  """
  Render the EDA summary dictionary as a Markdown document.

  summary: dict holding the keys "language_distribution",
           "sample_bigram_stats" and "count_statistics", each mapping to a
           dict; every entry becomes one "- key: value" bullet.
  Returns the Markdown text as a single string.
  Raises KeyError if one of the three sections is missing.
  """
  # (heading, summary key) pairs in output order; the leading "\n" on the later
  # headings produces the blank line that separates sections.
  sections = (
      ("## Language Distribution", "language_distribution"),
      ("\n## Sample Bigram Quality", "sample_bigram_stats"),
      ("\n## Count Statistics", "count_statistics"),
  )

  rendered = ["# EDA Summary\n"]
  for heading, key in sections:
    rendered.append(heading)
    rendered.extend(f"- {name}: {value}" for name, value in summary[key].items())

  return "\n".join(rendered)

# Write the Markdown rendering of the summary to the current working directory.
with open("eda_summary.md", "w", encoding="utf-8") as f:
  f.write(eda_to_markdown(eda_summary))

# Quantum-Inspired Bigram Language Identifiers

### Core Model for both single and multiple sentence ID

In [None]:
class QISingleLID:
  """
  Quantum-inspired bigram language identifier using
  state overlap instead of log-likelihoods (sparse, normalized and fast).

  Every language is stored as a sparse unit vector whose amplitude for a
  bigram is the square root of its add-alpha smoothed probability. A sentence
  is encoded the same way from its own bigram frequencies, and the score of a
  language is the dot product of both vectors taken over the sentence's
  bigrams only. Bigrams unknown to a language contribute that language's
  small "unknown" amplitude instead of zero.
  """
  # The docstring must be the first statement of the class body, otherwise
  # QISingleLID.__doc__ stays None.
  model_type = "qi"

  def __init__(self, bigrams_csv, alpha=1.0):
    """
    Build one amplitude table per language from a bigram-count CSV.

    bigrams_csv: path to a CSV with the columns lang_id, lang, ngram, count.
    alpha: additive smoothing constant added to every bigram count.
    """
    print("Loading and building quantum-inspired language states...")

    df = pd.read_csv(
        bigrams_csv,
        usecols=["lang_id", "lang", "ngram", "count"],
        dtype={
            "lang_id": "int16",
            "lang": "category",
            "ngram": "object",
            "count": "int32",
        }
        )

    self.lang_id_to_name = {}
    self.lang_states = {}   # lang_id -> {bigram: amplitude}
    self.languages = []
    self.unk_amp = {}   # lang_id -> amplitude

    # Build state vector per language
    for lang_id, group in df.groupby("lang_id"):
      self.lang_id_to_name[lang_id] = group["lang"].iloc[0]
      self.languages.append(lang_id)

      counts = group.set_index("ngram")["count"].to_dict()
      # Denominator of add-alpha smoothing over the language's own vocabulary.
      total = sum(counts.values()) + alpha * len(counts)

      self.unk_amp[lang_id] = math.sqrt(alpha / total)

      # Compute sqrt-prob amplitudes
      state = {
          bigram: math.sqrt((count + alpha) / total)
          for bigram, count in counts.items()
      }

      # L2 normalize language state (safety)
      norm = math.sqrt(sum(v * v for v in state.values()))
      for b in state:
        state[b] /= norm

      self.lang_states[lang_id] = state

    self.word_re = re.compile(r"\w+", flags=re.UNICODE)

    # The DataFrame is no longer needed once the dictionaries are built.
    del df
    gc.collect()
    print(f"Loaded {len(self.languages)} language states.")

  # === Sentence -> State Encoding (Sparse)
  def extract_bigrams(self, text):
    """Return the consecutive word pairs of `text` as "w1 w2" strings (case is kept)."""
    words = self.word_re.findall(text)
    return [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]

  def sentence_state(self, text):
    """
    Encode `text` as a sparse unit vector {bigram: amplitude}.

    Returns None when the text has fewer than two words (no bigram).
    """
    bigrams = self.extract_bigrams(text)
    if not bigrams:
      return None

    freq = defaultdict(int)
    for b in bigrams:
      freq[b] += 1

    total = sum(freq.values())

    # sqrt frequency amplitudes
    state = {b: math.sqrt(c / total) for b, c in freq.items()}

    # normalize sentence state (quantum state preparation)
    norm = math.sqrt(sum(v * v for v in state.values()))
    for b in state:
      state[b] /= norm

    return state

  # === Quantum-Inspired Measurement (Fast Overlap)
  def predict(self, text):
    """
    Identify the language of one sentence.

    Returns (best_language_name, {language_name: overlap}); returns
    (None, {}) when the text yields no bigram.
    """
    s_state = self.sentence_state(text)
    if s_state is None:
      return None, {}

    best_lang = None
    best_score = -1.0
    scores = {}

    for lang_id in self.languages:
      l_state = self.lang_states[lang_id]
      unk = self.unk_amp[lang_id]   # small non-zero amplitude for a bigram unseen in this language

      # dot product over sentence support only; linear overlap (stable + fast)
      score = 0.0
      for b, a_s in s_state.items():
        score += a_s * l_state.get(b, unk)

      lang = self.lang_id_to_name[lang_id]
      scores[lang] = score

      # Strict ">" keeps the first language on ties.
      if score > best_score:
        best_score = score
        best_lang = lang

    return best_lang, scores

  # === Multiple-Sentence Inference
  def predict_multiple(self, texts):
    """
    Run predict() on every text.

    Returns (preds, scores), two lists aligned with `texts`; a text without
    bigrams contributes None and {}.
    """
    preds, scores = [], []
    for t in texts:
        p, s = self.predict(t)
        preds.append(p)
        scores.append(s)
    return preds, scores

### Quantum-Inspired Model Extension for Batch Inference

In [None]:
class QIBatchLID:
    """
    Quantum-inspired bigram language identifier using
    state overlap (Hilbert-space similarity).
    Optimized for single and multiple sentence inference.

    Language states are stored as the rows of a dense (languages x vocabulary)
    float32 matrix, and sentences are encoded as dense vectors over the same
    vocabulary, so a batch is scored with one matrix product. Scores are the
    squared overlaps.
    """
    # The docstring must be the first statement of the class body, otherwise
    # QIBatchLID.__doc__ stays None.
    model_type = "qi"

    def __init__(self, bigrams_csv, alpha=1.0):
        """
        Build the shared vocabulary and the dense language matrix.

        bigrams_csv: path to a CSV with the columns lang_id, lang, ngram, count.
        alpha: additive smoothing constant added to every bigram count.
        """
        print("Loading and building quantum-inspired language states...")

        df = pd.read_csv(
            bigrams_csv,
            usecols=["lang_id", "lang", "ngram", "count"],
            dtype={
                "lang_id": "int16",
                "lang": "category",
                "ngram": "object",
                "count": "int32",
            }
        )

        self.lang_id_to_name = {}
        self.languages = []

        # ---- Build normalized language states
        lang_states = {}

        for lang_id, group in df.groupby("lang_id"):
            lang_name = group["lang"].iloc[0]
            self.lang_id_to_name[lang_id] = lang_name
            self.languages.append(lang_id)

            counts = group.set_index("ngram")["count"].to_dict()
            total = sum(counts.values()) + alpha * len(counts)

            state = {
                b: math.sqrt((c + alpha) / total)
                for b, c in counts.items()
            }

            # Normalize once
            norm = math.sqrt(sum(v * v for v in state.values()))
            for b in state:
                state[b] /= norm

            lang_states[lang_id] = state

        # ---- Build shared vocabulary
        self.vocab = list(
            set(b for s in lang_states.values() for b in s)
        )
        self.vocab_index = {b: i for i, b in enumerate(self.vocab)}

        # ---- Dense language matrix (L x V)
        self.lang_matrix = np.zeros(
            (len(self.languages), len(self.vocab)),
            dtype=np.float32
        )

        for i, lang_id in enumerate(self.languages):
            for b, amp in lang_states[lang_id].items():
                self.lang_matrix[i, self.vocab_index[b]] = amp

        # Re-normalise the rows after the cast to float32.
        self.lang_matrix = self.lang_matrix / np.linalg.norm(
            self.lang_matrix, axis=1, keepdims=True
        )

        self.word_re = re.compile(r"\w+", flags=re.UNICODE)

        del df
        gc.collect()

        print(f"Loaded {len(self.languages)} language states.")

    # ---------- Encoding ----------
    def extract_bigrams(self, text):
        """Return the consecutive word pairs of the lower-cased `text`."""
        # NOTE(review): the input is lower-cased here while the vocabulary is
        # taken from the CSV as-is, so bigrams stored with capitals cannot be
        # matched. Confirm whether the vocabulary should be case-folded too.
        words = self.word_re.findall(text.lower())
        return [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]

    def sentence_vector(self, text):
        """
        Encode `text` as a dense unit vector of length len(self.vocab).

        Out-of-vocabulary bigrams are ignored. Returns None when the text has
        no bigram or none of its bigrams is in the vocabulary.
        """
        bigrams = self.extract_bigrams(text)
        if not bigrams:
            return None

        freq = defaultdict(int)
        for b in bigrams:
            freq[b] += 1

        # One dense vector per sentence: memory grows with the vocabulary size.
        vec = np.zeros(len(self.vocab), dtype=np.float32)
        total = sum(freq.values())

        for b, c in freq.items():
            idx = self.vocab_index.get(b)
            if idx is not None:
                vec[idx] = math.sqrt(c / total)

        norm = np.linalg.norm(vec)
        return vec / norm if norm > 0 else None

    def encode_sentences(self, texts):
        """
        Encode multiple sentences into a matrix (N x V).

        Returns (matrix, valid_indices) where valid_indices[k] is the position
        in `texts` of matrix row k; sentences that cannot be encoded are
        left out. Returns (None, []) when no sentence can be encoded.
        """
        vectors = []
        valid_indices = []

        for i, t in enumerate(texts):
            v = self.sentence_vector(t)
            if v is not None:
                vectors.append(v)
                valid_indices.append(i)

        if not vectors:
            return None, []

        return np.vstack(vectors), valid_indices

    # ---------- Prediction ----------
    def predict(self, text):
        """
        Identify the language of one sentence.

        Returns (best_language_name, {language_name: squared_overlap}), or
        (None, {}) when the sentence cannot be encoded.
        """
        v = self.sentence_vector(text)
        if v is None:
            return None, {}

        overlaps = self.lang_matrix @ v
        scores = overlaps ** 2

        best_idx = int(np.argmax(scores))
        pred = self.lang_id_to_name[self.languages[best_idx]]

        return pred, {
            self.lang_id_to_name[self.languages[i]]: float(scores[i])
            for i in range(len(scores))
        }

    def predict_multiple(self, texts):
        """
        True multi-sentence inference (vectorized).

        Returns (preds, scores_list), two lists with one entry per input text
        and in the same order. A text that cannot be encoded gets None and {},
        exactly like predict(); previously such texts were dropped, which
        shifted every later prediction relative to its label.
        """
        texts = list(texts)
        preds = [None] * len(texts)
        scores_list = [{} for _ in texts]

        X, valid_idx = self.encode_sentences(texts)
        if X is None:
            return preds, scores_list

        overlaps = X @ self.lang_matrix.T      # (N_valid x L)
        scores = overlaps ** 2

        names = [self.lang_id_to_name[lang_id] for lang_id in self.languages]

        # Row k of `scores` belongs to the text at position valid_idx[k].
        for row, position in enumerate(valid_idx):
            best = int(np.argmax(scores[row]))
            preds[position] = names[best]
            scores_list[position] = {
                name: float(scores[row, j])
                for j, name in enumerate(names)
            }

        return preds, scores_list

# Global, reusable dataset path

In [None]:
from pathlib import Path

# Folder that holds the downloaded bigram CSV (relative to the working directory).
DATA_DIR = Path("bigrams_dataset_folder")

def infer_single_csv(data_dir: Path) -> Path:
    """
    Return the CSV file to use from `data_dir`.

    The CSVs directly inside the directory are sorted by path and the first
    one is returned. When several exist, a warning listing all of them is
    printed.

    Raises FileNotFoundError when the directory contains no CSV file.
    """
    candidates = sorted(data_dir.glob("*.csv"))

    if not candidates:
        raise FileNotFoundError("❌ No CSV files found in data directory.")

    chosen, *others = candidates
    if others:
        print("⚠️ Multiple CSV files found. Using the first one:")
        for candidate in candidates:
            print("  -", candidate.name)

    return chosen

# Resolved once here and passed to every model constructor below.
BIGRAMS_CSV_PATH = infer_single_csv(DATA_DIR)

print("✅ Using dataset:", BIGRAMS_CSV_PATH)

✅ Using dataset: bigrams_dataset_folder/bigrams_data.csv


In [None]:
### Confirming the dataset columns before running inference
# Columns every model constructor asks read_csv for.
required_cols = {"lang_id", "lang", "ngram", "count"}

# A handful of rows is enough: only the header is inspected.
df_head = pd.read_csv(BIGRAMS_CSV_PATH, nrows=5)
missing = required_cols.difference(df_head.columns)

if len(missing) > 0:
    raise ValueError(f"Missing required columns: {missing}")

print("✅ Dataset schema verified.")

✅ Dataset schema verified.


# Defining Evaluation/Performance Metrics

In [None]:
import time, statistics, math

### Function to normalize scores into a probability distribution
def normalize_scores(scores, eps=1e-12):
  """
  Turn a {label: non-negative score} mapping into a probability distribution.

  scores: dict of raw scores.
  eps: floor applied to every probability before the final renormalisation,
       so that no label ends up with exactly zero mass.

  Returns a dict with the same keys whose values sum to 1. An empty input
  returns an empty dict. When the scores carry no usable mass (their sum is
  not a positive finite number) the uniform distribution is returned; the
  earlier fallback of `eps` for every label did not sum to 1.
  """
  if not scores:
    return {}

  total = sum(scores.values())
  if total <= 0 or not math.isfinite(total):
    uniform = 1.0 / len(scores)
    return {k: uniform for k in scores}

  probs = {k: max(v / total, eps) for k, v in scores.items()}
  Z = sum(probs.values())
  return {k: v / Z for k, v in probs.items()}

### Function to select predict_multiple() if found in batch models
def run_inference(model, texts):
    """
    Run `model` on one sentence or on a collection of sentences.

    texts: a single string, or an iterable of strings.

    A single string is scored with model.predict() and its (pred, scores)
    pair is returned unchanged. Previously a string was treated as a
    collection and iterated character by character, so callers received
    lists where they expected one prediction and one score dict.

    A collection is passed to model.predict_multiple() when the model has
    one, otherwise predict() is called per sentence; either way
    (preds, scores) are two lists aligned with `texts`.
    """
    if isinstance(texts, str):
        return model.predict(texts)

    if hasattr(model, "predict_multiple"):
        return model.predict_multiple(texts)

    preds, scores = [], []
    for t in texts:
        p, s = model.predict(t)
        preds.append(p)
        scores.append(s)
    return preds, scores

### Negative Log Likelihood (NLL) loss function (for evaluation, not optimization)
def nll_loss(probs, true_label, eps=1e-12):
  """
  Negative log-likelihood of `true_label` under the distribution `probs`.

  probs: dict mapping label -> probability.
  eps: probability used in place of a missing, non-positive or non-finite
       entry, which bounds the loss at -log(eps).
  """
  p = probs.get(true_label, 0.0)
  usable = p > 0.0 and np.isfinite(p)
  return -math.log(p if usable else eps)

### 1. Accuracy
def accuracy(preds, labels):
  """
  Fraction of positions where the prediction equals the label.

  Pairs are formed with zip(), and the divisor is len(labels).
  """
  hits = 0
  for guess, truth in zip(preds, labels):
    hits += guess == truth
  return hits / len(labels)

### 2. NLL evaluation loss
def mean_nll(model, texts, labels):
  """
  Average negative log-likelihood of the true labels over a labelled set.

  model: object exposing predict(text) -> (pred, {label: score}).
  texts: iterable of sentences.
  labels: iterable of true labels, paired with `texts` by position.

  Each sentence is scored on its own, its scores are normalised into a
  distribution, and the losses are averaged.
  """
  losses = []
  for text, y in zip(texts, labels):
    # One sentence at a time: sending a single string through the batch
    # dispatcher produced lists, which normalize_scores() cannot consume.
    _, scores = model.predict(text)
    probs = normalize_scores(scores)
    losses.append(nll_loss(probs, y))
  return sum(losses) / len(losses)

### 3. Quantum Confidence Margin (QCM)
def quantum_confidence_margin(scores):
  """
  Gap between the best and the second-best score.

  With a single score, that score itself is returned.
  """
  ranked = sorted(scores.values(), reverse=True)
  if len(ranked) <= 1:
    return ranked[0]
  top, runner_up = ranked[0], ranked[1]
  return top - runner_up

### 4. Entropy (uncertainty)
def entropy(probs, eps=1e-12):
  """
  Shannon entropy (natural log) of the distribution `probs`.

  eps is added inside the logarithm so that a zero probability does not
  raise a math domain error.
  """
  weighted_logs = 0
  for p in probs.values():
    weighted_logs += p * math.log(p + eps)
  return -weighted_logs

### 5. Purity / Probability Mass Concentration (PMC)
def purity(probs):
  """Sum of squared probabilities: 1.0 when all mass sits on one label, lower when it is spread."""
  squared_mass = 0
  for p in probs.values():
    squared_mass += p * p
  return squared_mass

### 6. Cross-lingual leakage
def leakage(probs, pred_lang):
  """
  Ratio of the probability mass on all other labels to the mass on the
  predicted label.

  A small constant in the denominator avoids division by zero.
  Raises KeyError when `pred_lang` is not a key of `probs`.
  """
  winner_mass = probs[pred_lang]
  other_mass = sum(probs.values()) - winner_mass
  return other_mass / (winner_mass + 1e-12)

### Inference latency
def timed_predict(model, text):
  """
  Predict with `model` and measure the wall-clock time of the call.

  text: one sentence (str). A collection of sentences is also accepted and
        is forwarded to run_inference().

  Returns (pred, scores, latency_ms). For a single sentence, pred and
  scores are exactly what model.predict() returns. Previously the string
  went through the batch dispatcher, which iterated it character by
  character and returned lists.
  """
  start = time.perf_counter()
  if isinstance(text, str):
    pred, scores = model.predict(text)
  else:
    pred, scores = run_inference(model, text)
  latency_ms = (time.perf_counter() - start) * 1000
  return pred, scores, latency_ms

### Empirical Scaling (computational efficiency): merge with timed_predict later
def benchmark(model, texts, repeats=1):
  """
  Time sentence-by-sentence inference over `texts`.

  texts: list[str]; a single string is treated as a one-sentence list.
  repeats: number of repeated runs for stability
  Returns:
    throughput (sent/sec),
    mean_latency_ms (per sentence),
    std_latency_ms (per sentence; 0 when repeats == 1)
  """
  if isinstance(texts, str):
    # Without this, a bare string would be timed one character at a time.
    texts = [texts]

  latencies = []

  for _ in range(repeats):
    start = time.perf_counter()
    for t in texts:
      if isinstance(t, str):
        # One sentence -> one predict() call. Sending the string through
        # the batch dispatcher would iterate its characters.
        model.predict(t)
      else:
        run_inference(model, t)
    end = time.perf_counter()
    latencies.append((end - start) * 1000)

  mean_latency_ms = statistics.mean(latencies) / len(texts)
  std_latency_ms = statistics.stdev(latencies) / len(texts) if len(latencies) > 1 else 0
  # Guard against a zero reading from a coarse clock.
  throughput = 1000 / mean_latency_ms if mean_latency_ms > 0 else float("inf")

  return throughput, mean_latency_ms, std_latency_ms

# Example Usage (Single-sample evaluation)

In [None]:
# model = QISingleLID(BIGRAMS_CSV_PATH) # or use a link in double quotes

# text = "Ɔdɔ ne ahotɔ nkwa mu adeɛ a ɛsom bo."
# true_lang = "twi"

# pred, scores, latency = timed_predict(model, text)
# probs = normalize_scores(scores)

# print("Predicted:", pred)
# print("Probabilities:", probs)
# print("Accuracy:", accuracy(pred, true_lang))
# print("NLL Loss:", nll_loss(probs, true_lang))
# print("QCM:", quantum_confidence_margin(scores))
# print("Entropy:", entropy(probs))
# print("Purity:", purity(probs))
# print("Leakage:", leakage(probs, pred))
# print(f"Inference Time: {latency:.2f} ms")
# print(f"Speed Benchmark:", benchmark(model, text))

# Model with IDF (Inverse Document Frequency)

\(\mathrm{IDF}(t)=\log\left(\frac{\text{Total number of documents}}{\text{Number of documents containing term } t}\right)\)

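IDF weighting addresses the fact that the plain model treats every bigram as equally informative: bigrams that occur in many languages are down-weighted, while bigrams specific to a single language are up-weighted. In the implementation below, each language's bigram table plays the role of one "document".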

In [None]:
class QuantumInspiredBigramLIDWithIDF:   # not used, poor quantum performance
  """
  Quantum-inspired bigram language identifier using
  state overlap with IDF weighting.

  Language states are built as in the plain overlap model (square roots of
  add-alpha smoothed bigram probabilities). The sentence state additionally
  multiplies each bigram's term frequency by its inverse document frequency,
  where a "document" is one language's bigram table. Scores are the squared
  overlaps.
  """
  # The docstring must be the first statement of the class body, otherwise
  # the class __doc__ stays None.
  model_type = "qi"

  def __init__(self, bigrams_csv, alpha=1.0):
    """
    Build the per-language amplitude tables and the document frequencies.

    bigrams_csv: path to a CSV with the columns lang_id, lang, ngram, count.
    alpha: additive smoothing constant added to every bigram count.
    """
    print("Loading and building quantum-inspired language states with IDF...")

    df = pd.read_csv(
        bigrams_csv,
        usecols=["lang_id", "lang", "ngram", "count"],
        dtype={
            "lang_id": "int16",
            "lang": "category",
            "ngram": "object",
            "count": "int32",
        }
        )

    self.lang_id_to_name = {}
    self.lang_states = {}   # lang_id -> {bigram: amplitude}
    self.languages = []
    self.unk_amp = {}   # lang_id -> amplitude
    self.df = defaultdict(int)  # document frequency: bigram -> number of languages containing it
    self.total_docs = 0

    # Build language state vectors per language + IDF stats
    for lang_id, group in df.groupby("lang_id"):
      self.total_docs += 1

      self.lang_id_to_name[lang_id] = group["lang"].iloc[0]
      self.languages.append(lang_id)

      counts = group.set_index("ngram")["count"].to_dict()

      # The keys of `counts` are this language's distinct bigrams, so they
      # double as the document-frequency update (no second pass needed).
      for b in counts:
        self.df[b] += 1

      total = sum(counts.values()) + alpha * len(counts)

      self.unk_amp[lang_id] = math.sqrt(alpha / total)

      # Compute sqrt-prob amplitudes
      state = {
          bigram: math.sqrt((count + alpha) / total)
          for bigram, count in counts.items()
      }

      # L2 normalize (safety)
      norm = math.sqrt(sum(v * v for v in state.values()))
      for b in state:
        state[b] /= norm

      self.lang_states[lang_id] = state

    self.word_re = re.compile(r"\w+", flags=re.UNICODE)

    del df
    gc.collect()

    print(f"Loaded {len(self.languages)} language states.")
    print("Total docs:", self.total_docs)
    print("Sample DF entries:", list(self.df.items())[:5])

  # === IDF Function
  def idf(self, bigram):
    """Smoothed inverse document frequency: log((N + 1) / (1 + df)) + 1, always positive."""
    # .get() is used so that looking up an unseen bigram does not insert it
    # into the defaultdict.
    return math.log((self.total_docs + 1) / (1 + self.df.get(bigram, 0))) + 1.0

  # === Sentence -> State Encoding (Sparse)
  def extract_bigrams(self, text):
    """Return the consecutive word pairs of `text` as "w1 w2" strings (case is kept)."""
    words = self.word_re.findall(text)
    return [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]

  def sentence_state(self, text):
    """
    Encode `text` as a sparse unit vector {bigram: amplitude} with TF-IDF weights.

    Returns None when the text has fewer than two words (no bigram).
    """
    bigrams = self.extract_bigrams(text)
    if not bigrams:
      return None

    freq = defaultdict(int)
    for b in bigrams:
      freq[b] += 1

    total = sum(freq.values())

    # sqrt frequency amplitudes (with idf)
    state = {
        b: math.sqrt((c / total) * self.idf(b))
        for b, c in freq.items()
    }

    # normalize
    norm = math.sqrt(sum(v * v for v in state.values()))
    for b in state:
      state[b] /= norm

    return state

  # === Quantum-Inspired Measurement (Fast Overlap)
  def predict(self, text):
    """
    Identify the language of one sentence.

    Returns (best_language_name, {language_name: squared_overlap}), or
    (None, {}) when the text yields no bigram.
    """
    s_state = self.sentence_state(text)
    if s_state is None:
      return None, {}

    best_lang = None
    best_score = -1.0
    scores = {}

    for lang_id in self.languages:
      l_state = self.lang_states[lang_id]
      unk = self.unk_amp[lang_id]  # small non-zero amplitude for a bigram unseen in this language

      # dot product over sentence support only
      overlap = 0.0
      for b, a_s in s_state.items():
        overlap += l_state.get(b, unk) * a_s

      score = overlap * overlap   # |⟨v_l | v_s⟩|^2
      lang = self.lang_id_to_name[lang_id]
      scores[lang] = score

      # Strict ">" keeps the first language on ties.
      if score > best_score:
        best_score = score
        best_lang = lang

    return best_lang, scores

In [None]:
# idf_model = QuantumInspiredBigramLIDWithIDF(BIGRAMS_CSV_PATH)

# text = "Ɔdɔ ne ahotɔ nkwa mu adeɛ a ɛsom bo."
# true_lang = "twi"

# pred, scores, latency = timed_predict(idf_model, text)
# probs = normalize_scores(scores)

# print("Predicted:", pred)
# print("Probabilities:", probs)
# print("Accuracy:", accuracy(pred, true_lang))
# print("NLL Loss:", nll_loss(probs, true_lang))
# print("QCM:", quantum_confidence_margin(scores))
# print("Entropy:", entropy(probs))
# print("Purity:", purity(probs))
# print("Leakage:", leakage(probs, pred))
# print(f"Inference Time: {latency:.2f} ms")
# print(f"Speed Benchmark:", benchmark(idf_model, text))

# Benchmarking Against Hackathon's Baseline

In [None]:
class BigramLanguageIdentifier:
    """
    Baseline language identifier based on add-one smoothed bigram
    log-probabilities.

    For every language the log-probability of each known bigram is stored,
    plus one shared value for unknown bigrams under the key '__UNK__'. A
    sentence is scored by the mean log-probability of its bigrams.
    """
    model_type = "baseline"

    def __init__(self, bigrams_csv):
        """Load bigram frequencies from CSV and build language models."""
        print("Loading bigram data...")

        # Memory-efficient CSV reading with explicit dtypes
        df = pd.read_csv(bigrams_csv,
                        usecols=['lang_id', 'lang', 'ngram', 'count'],
                        dtype={'lang_id': 'int16', 'lang': 'category',
                               'ngram': 'object', 'count': 'int32'})

        self.lang_id_to_name = {}
        self.lang_log_probs = {}
        self.languages = []

        # Process by language group to avoid redundant storage
        for lang_id, group in df.groupby('lang_id'):
            lang_name = group['lang'].iloc[0]
            self.lang_id_to_name[lang_id] = lang_name
            self.languages.append(lang_id)

            total = group['count'].sum()
            vocab_size = len(group)

            # Add-one smoothing denominator, shared by every bigram of the language.
            denom = total + vocab_size

            # Store ONLY log probabilities directly (no duplicate count storage).
            # Iterating the two columns in lockstep avoids building one Series
            # per row, which is what iterrows() does.
            log_probs = {
                bigram: math.log((count + 1) / denom)
                for bigram, count in zip(group['ngram'], group['count'])
            }

            log_probs['__UNK__'] = math.log(1 / denom)
            self.lang_log_probs[lang_id] = log_probs

        self.word_re = re.compile(r"\w+", flags=re.UNICODE)
        print(f"Loaded {len(self.languages)} languages: {list(self.lang_id_to_name.values())}")

        # CRITICAL: Explicitly free the large DataFrame from memory
        del df
        gc.collect()
        print("Memory cleanup completed.")

    def extract_bigrams(self, text):
        """Extract word bigrams from text."""
        words = self.word_re.findall(text)
        return [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]

    def predict(self, text):
        """
        Predict language and return lang_id.

        Returns:
          pred_lang_id: the lang_id with the highest score, or None when the
                        text yields no bigram
          scores (dict[lang_id, float]): mean log-probability per bigram for
                        every language. These are log-domain values (<= 0),
                        not normalized probabilities; {} when the text
                        yields no bigram.
        """
        bigrams = self.extract_bigrams(text)

        if not bigrams:
            return None, {}

        scores = {}

        for lang_id in self.languages:
            lang_probs = self.lang_log_probs[lang_id]
            unk = lang_probs['__UNK__']

            log_prob = 0.0
            for bigram in bigrams:
                log_prob += lang_probs.get(bigram, unk)

            # Length-normalized so that long and short sentences are comparable.
            scores[lang_id] = log_prob / len(bigrams)

        pred = max(scores, key=scores.get)
        return pred, scores

# Single-Sentence BigramsID Evaluation Plots (Baseline vs Quantum-Inspired)

In [None]:
# standard evaluation function for single-sentence models
def evaluate_models(
    model,
    model_name,
    texts,
    true_langs=None,
    benchmark_repeats=3
):
    """
    Evaluate one single-sentence model and collect its metrics in a dict.

    model: object exposing predict(text); its optional `model_type`
           attribute ("baseline" or "qi") selects which metrics apply.
    model_name: label stored under "name" (used by the plotting helpers).
    texts: list of sentences to evaluate.
    true_langs: optional list of gold labels aligned with `texts`.
    benchmark_repeats: number of timing repetitions passed to benchmark().

    Returns a dict with name, num_samples, throughput, latency_ms and
    latency_std_ms. It also has "accuracy" when labels are given, "nll" for
    baseline models with labels, and "qcm"/"purity" for QI models.

    Labels are compared by language name. Models that answer with lang_ids
    (the baseline) are translated through model.lang_id_to_name, so their
    predictions are comparable with name labels such as "twi". Gold labels
    given as lang_ids are translated the same way.
    """
    model_type = getattr(model, "model_type", "")
    is_baseline = model_type == "baseline"
    id_to_name = getattr(model, "lang_id_to_name", {})

    def _as_name(key):
        # Names that are not lang_ids (and None) pass through unchanged.
        return id_to_name.get(key, key)

    def _softmax(log_scores):
        # Baseline scores are mean log-probabilities (<= 0); exponentiating
        # and renormalising is the meaningful way to turn them into a
        # distribution. Subtracting the maximum keeps exp() in range.
        if not log_scores:
            return {}
        peak = max(log_scores.values())
        weights = {k: math.exp(v - peak) for k, v in log_scores.items()}
        z = sum(weights.values())
        return {k: w / z for k, w in weights.items()}

    preds, all_scores, all_probs = [], [], []

    for t in texts:
        pred, scores, _ = timed_predict(model, t)
        scores = {_as_name(k): v for k, v in scores.items()}
        preds.append(_as_name(pred))
        all_scores.append(scores)

        # Only the baseline's scores are turned into probabilities for NLL.
        if is_baseline:
            all_probs.append(_softmax(scores))

    throughput, mean_latency, std_latency = benchmark(
        model, texts, repeats=benchmark_repeats
    )

    metrics = {
        "name": model_name,
        "num_samples": len(texts),
        "throughput": throughput,
        "latency_ms": mean_latency,
        "latency_std_ms": std_latency,
    }

    # ---- Supervised metrics
    if true_langs is not None:
        labels = [_as_name(y) for y in true_langs]
        metrics["accuracy"] = accuracy(preds, labels)

        if is_baseline:
            metrics["nll"] = sum(
                nll_loss(p, y) for p, y in zip(all_probs, labels)
            ) / len(texts)

    # ---- Quantum diagnostics (QI only)
    if model_type == "qi":
        metrics.update({
            "qcm": sum(
                quantum_confidence_margin(s) for s in all_scores
            ) / len(texts),

            "purity": sum(
                purity(normalize_scores(s)) for s in all_scores
            ) / len(texts),
        })

    return metrics

# Reusable bar chart comparing a single metric across models
def plot_metric(
    models_metrics,
    metric_key,
    ylabel,
    title,
    ylim=None,
    log_scale=False
):
    """Draw one bar per model for the metric stored under ``metric_key``.

    Args:
        models_metrics: Iterable of metric dicts; each must contain ``"name"``
            and ``metric_key``.
        metric_key: Key of the value to plot.
        ylabel: Y-axis label.
        title: Figure title.
        ylim: Optional ``(low, high)`` pair passed to ``plt.ylim``.
        log_scale: When true, the y-axis uses a logarithmic scale.
    """
    assert all(metric_key in m for m in models_metrics)

    labels = []
    heights = []
    for entry in models_metrics:
        labels.append(entry["name"])
    for entry in models_metrics:
        heights.append(entry[metric_key])

    plt.figure()
    plt.bar(labels, heights)
    plt.ylabel(ylabel)
    plt.title(title)

    if ylim is not None:
        low, high = ylim
        plt.ylim(low, high)
    if log_scale:
        plt.yscale("log")

    plt.tight_layout()
    plt.show()

### [Experiment 1]: Single-Sentence Benchmarking and Evaluation

In [None]:
# --- Hackathon baseline (single sentence) ---
baseline_single_model = BigramLanguageIdentifier(BIGRAMS_CSV_PATH)
baseline_single_metrics = evaluate_models(
    baseline_single_model,
    "Baseline-Single",
    texts=["Ɔdɔ ne ahotɔ nkwa mu adeɛ a ɛsom bo."],
    true_langs=["twi"],
)

# --- Quantum-inspired single sentence ---
qi_single_model = QISingleLID(BIGRAMS_CSV_PATH)
qi_single_metrics = evaluate_models(
    qi_single_model,
    "QI-Single",
    texts=["Ɔdɔ ne ahotɔ nkwa mu adeɛ a ɛsom bo."],
    true_langs=["twi"],
)

# Both metric dicts, in the order they are reported and plotted
single_sentence_metrics = [baseline_single_metrics, qi_single_metrics]

# Report every metric except raw score / probability payloads
print("===== Single-Sentence Models =====")
for metrics in single_sentence_metrics:
    print(f"\n--- {metrics['name']} ---")
    for key, value in metrics.items():
        if key in ("scores", "probs"):
            continue
        print(f"{key}: {value}")

Loading bigram data...
Loaded 3 languages: ['twi', 'eng', 'fra']
Memory cleanup completed.


AttributeError: 'list' object has no attribute 'values'

In [None]:
# Single-sentence metric plots: accuracy on a fixed 0-1 axis, then latency and
# throughput on logarithmic axes.
_single_sentence_plot_specs = [
    {
        "metric_key": "accuracy",
        "ylabel": "Accuracy",
        "title": "Single-Sentence Accuracy Comparison",
        "ylim": (0, 1),
    },
    {
        "metric_key": "latency_ms",
        "ylabel": "Latency (ms)",
        "title": "Single-Sentence Inference Latency Comparison",
        "log_scale": True,
    },
    {
        "metric_key": "throughput",
        "ylabel": "Throughput (sent/sec)",
        "title": "Single-Sentence Throughput Comparison",
        "log_scale": True,
    },
]

for _plot_spec in _single_sentence_plot_specs:
    plot_metric(models_metrics=single_sentence_metrics, **_plot_spec)

# plot_metric(
#     models_metrics=single_sentence_metrics,
#     metric_key="qcm",
#     ylabel="Single-Sentence Quantum Confidence Margin",
#     title="QCM Comparison"
# )

# plot_metric(
#     models_metrics=single_sentence_metrics,
#     metric_key="entropy",
#     ylabel="Entropy",
#     title="Single-Sentence Entropy Comparison"
# )

# plot_metric(
#     models_metrics=single_sentence_metrics,
#     metric_key="purity",
#     ylabel="Purity",
#     title="Single-Sentence Purity Comparison"
# )

# plot_metric(
#     models_metrics=single_sentence_metrics,
#     metric_key="leakage",
#     ylabel="Leakage",
#     title="Single-Sentence Leakage Comparison"
# )

# plot_metric(
#     models_metrics=single_sentence_metrics,
#     metric_key="nll",
#     ylabel="Negative Log-Likelihood",
#     title="Single-Sentence NLL Loss Comparison"
# )

## [Experiment 2]: Multiple-Sentence Evaluation

In [None]:
# Standard evaluation routine for batch-aware models
def evaluate_scaling(
    model,
    model_name,
    texts_by_batch_size,
    repeats=3
):
    """Benchmark ``model`` once per batch size and gather the timing series.

    Args:
        model: Model handed to ``benchmark``.
        model_name: Label stored under ``"name"``.
        texts_by_batch_size: dict[int -> list[str]] mapping a batch size to
            the texts benchmarked for that size.
        repeats: Forwarded to ``benchmark``.

    Returns:
        dict with ``name`` plus four parallel lists in mapping order:
        ``batch_sizes``, ``latencies_ms`` (mean latency multiplied by the
        batch size), ``latency_per_sample_ms`` (mean latency as returned by
        ``benchmark``) and ``throughputs``.
    """
    results = {
        "name": model_name,
        "batch_sizes": [],
        "latencies_ms": [],
        "latency_per_sample_ms": [],
        "throughputs": [],
    }

    for size, batch in texts_by_batch_size.items():
        # The third value (latency spread) is not needed for the scaling curves.
        throughput, mean_latency, _ = benchmark(model, batch, repeats=repeats)

        results["batch_sizes"].append(size)
        results["latencies_ms"].append(mean_latency * size)
        results["latency_per_sample_ms"].append(mean_latency)
        results["throughputs"].append(throughput)

    return results

def plot_scaling_metric(
    scaling_results,
    x_key,
    y_key,
    xlabel,
    ylabel,
    title,
    logx=True,
    logy=True
):
    """Plot one line per model from a mapping of scaling results.

    Args:
        scaling_results: Mapping of model name to a dict holding the series
            stored under ``x_key`` and ``y_key``.
        x_key: Key of the x-axis series.
        y_key: Key of the y-axis series.
        xlabel: X-axis label.
        ylabel: Y-axis label.
        title: Figure title.
        logx: Use a logarithmic x-axis when true.
        logy: Use a logarithmic y-axis when true.
    """
    plt.figure()

    for label, series in scaling_results.items():
        xs = series[x_key]
        ys = series[y_key]
        plt.plot(xs, ys, marker="o", label=label)

    if logx:
        plt.xscale("log")
    if logy:
        plt.yscale("log")

    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    # Grid lines on both major and minor ticks
    plt.grid(True, which="both")
    plt.tight_layout()
    plt.show()

### [Experiment 2]: Multiple-Sentence

In [None]:
# Test sentences embedded directly in code.
# Each entry pairs a sentence with an integer ``lang_id``; the ids are turned
# into language names through the ``id_to_lang`` mapping passed to
# ``unpack_sentence_dict``.
# NOTE(review): judging by the sentences, 1 = Twi, 2 = English, 3 = French —
# TODO confirm against the model's own id-to-name mapping.
test_sentences = [
    {"sentence": "The sun rises in the east every morning.", "lang_id": 2},
    {"sentence": "She enjoys reading books in the library.", "lang_id": 2},
    {"sentence": "We are planning a trip to the mountains next month.", "lang_id": 2},
    {"sentence": "The children are playing soccer in the park.", "lang_id": 2},
    {"sentence": "He works as a teacher at the local school.", "lang_id": 2},
    {"sentence": "Le chat dort sur le canapé.", "lang_id": 3},
    {"sentence": "J'aime beaucoup la musique classique.", "lang_id": 3},
    {"sentence": "Nous allons au marché ce matin.", "lang_id": 3},
    {"sentence": "Elle étudie la médecine à l'université.", "lang_id": 3},
    {"sentence": "Il fait beau aujourd'hui.", "lang_id": 3},
    {"sentence": "Owia no pue wɔ apueeɛ fam anɔpa biara.", "lang_id": 1},
    {"sentence": "Ɔpɛ sɛ ɔkenkan nhoma wɔ nwomafieso hɔ.", "lang_id": 1},
    {"sentence": "Yɛreyɛ nhyehyɛeɛ sɛ yɛbɛkɔ mmepɔw so ɔsram a ɛdi hɔ yi.", "lang_id": 1},
    {"sentence": "Mmɔ3 no reto bɔɔlo wɔ mmrampam no mu.", "lang_id": 1},
    {"sentence": "Ɔyɛ adwuma sɛ ɔkyerɛkyerɛfoɔ wɔ mpɔtam sukuu no mu.", "lang_id": 1}
]

# Adapter: list of sentence records -> parallel text / label lists
def unpack_sentence_dict(test_sentences, id_to_lang):
    """Split sentence records into texts and their language names.

    Args:
        test_sentences: Iterable of dicts with ``"sentence"`` and ``"lang_id"``.
        id_to_lang: Mapping from ``lang_id`` to language name.

    Returns:
        ``(texts, true_langs)`` — two lists aligned by position.

    Raises:
        KeyError: If a record lacks a key or its ``lang_id`` is not in
            ``id_to_lang``.
    """
    pairs = [
        (record["sentence"], id_to_lang[record["lang_id"]])
        for record in test_sentences
    ]
    texts = [sentence for sentence, _ in pairs]
    true_langs = [language for _, language in pairs]
    return texts, true_langs

# Held-Out Validation on Bigram CSV

In [None]:
# 1. Split configuration: fraction of each language's rows kept for training,
#    and the seed that makes the shuffle reproducible.
TRAIN_RATIO = 0.8
RANDOM_SEED = 42

# 2. Chunk-safe splitter: the CSV is streamed in chunks and every chunk is
#    split per language, so each language keeps roughly the same ratio.
train_rows = []
heldout_rows = []

rng = np.random.default_rng(RANDOM_SEED)

for chunk in pd.read_csv(BIGRAMS_CSV_PATH, chunksize=CHUNK_SIZE):
    # Keep only n-grams that contain a space (word bigrams). ``na=False``
    # treats missing n-grams as non-matches; without it the mask contains NaN
    # and boolean indexing raises.
    is_word_bigram = chunk["ngram"].str.contains(" ", regex=False, na=False)
    chunk = chunk[is_word_bigram]

    for lang_id, group in chunk.groupby("lang_id"):
        # Shuffle positional indices, then cut at the train ratio.
        idx = np.arange(len(group))
        rng.shuffle(idx)

        split = int(TRAIN_RATIO * len(idx))
        train_idx = idx[:split]
        heldout_idx = idx[split:]

        train_rows.append(group.iloc[train_idx])
        heldout_rows.append(group.iloc[heldout_idx])

# 3. Assemble the final frames and write them next to the source data
train_df = pd.concat(train_rows, ignore_index=True)
heldout_df = pd.concat(heldout_rows, ignore_index=True)

# NOTE(review): these file names end in "_csv" rather than ".csv". pandas reads
# them regardless, but confirm whether an extension was intended.
train_path = PROJECT_ROOT / "bigrams_train_csv"
heldout_path = PROJECT_ROOT / "bigrams_heldout_csv"

train_df.to_csv(train_path, index=False)
heldout_df.to_csv(heldout_path, index=False)

print("Saved:")
print(train_path)
print(heldout_path)

### Generating synthetic held-out sentences for prediction

In [None]:
# This stage has to scale to 100k+ rows, so the two columns are walked
# together instead of using row-by-row ``iterrows()``, which builds a Series
# object for every row.

# 1. build language-specific bigram pools (language -> list of bigram strings)
heldout_bigrams = defaultdict(list)

for chunk in pd.read_csv(heldout_path, chunksize=CHUNK_SIZE):
    # NOTE(review): relies on the held-out CSV having a "lang" column next to
    # "ngram" — TODO confirm against the source data schema.
    for lang, ngram in zip(chunk["lang"], chunk["ngram"]):
        heldout_bigrams[lang].append(ngram)

# 2. language-controlled sentence generator
def generate_sentence_from_bigrams(bigrams, length=8):
    """Assemble a synthetic sentence from randomly sampled bigrams.

    ``length`` distinct entries are drawn with ``random.sample``. The sentence
    starts with the first word of the first drawn bigram, followed by the
    second word of every drawn bigram (``length + 1`` words in total).

    Args:
        bigrams: Sequence of whitespace-separated two-word strings.
        length: Number of bigrams to draw.

    Returns:
        The space-joined sentence, or ``None`` when fewer than ``length``
        bigrams are available.
    """
    if length > len(bigrams):
        return None

    picked = random.sample(bigrams, length)
    opening_word = picked[0].split()[0]
    following_words = [pair.split()[1] for pair in picked]

    return " ".join([opening_word, *following_words])

# 3. build held-out evaluation set (parallel lists of sentences and labels)
eval_texts = []
eval_labels = []

for lang, bigrams in heldout_bigrams.items():
    for _ in range(20):  # up to 20 sentences per language
        sent = generate_sentence_from_bigrams(bigrams)
        if not sent:
            # Nothing usable was generated for this draw; skip it.
            continue
        eval_texts.append(sent)
        eval_labels.append(lang)

# 4. minimal print of samples
print(f"Total sentences generated: {len(eval_texts)}")
print("-" * 60)

# Pair labels with texts so a random pick keeps them together
combined_data = list(zip(eval_labels, eval_texts))
sample_count = min(5, len(combined_data))
indices = random.sample(range(len(combined_data)), sample_count)

for i in indices:
    lang, text = combined_data[i]
    print(f"[{lang}]: {text}")

## [Experiment 3]: Batch Scaling on Heldout-Validation

In [None]:
# Build batch-size views (scaling experiment)
def build_bigram_batches(df, batch_sizes):
    """Map each feasible batch size to the leading ``ngram`` values of ``df``.

    Args:
        df: Object supporting ``df["ngram"].values`` (e.g. a DataFrame with an
            ``"ngram"`` column).
        batch_sizes: Iterable of requested batch sizes.

    Returns:
        dict mapping each batch size that does not exceed the number of
        available n-grams to a slice holding the first ``batch_size`` values.
        Larger sizes are omitted.
    """
    bigrams = df["ngram"].values
    available = len(bigrams)

    batches = {}
    for size in batch_sizes:
        if size > available:
            continue
        batches[size] = bigrams[:size]
    return batches

# Benchmark inputs for the scaling experiment.
# Iterating a DataFrame yields its column names, not rows, and the held-out
# frame stores its text in the "ngram" column — so the inputs are taken from
# that column directly.
scaling_texts = heldout_df["ngram"].tolist()

batch_sizes = [1, 10, 100, 1_000, 10_000]

# build_bigram_batches indexes ``df["ngram"]``, so it receives the DataFrame
# itself. Each batch is converted to a plain list of strings for the models.
texts_by_batch_size = {
    size: list(batch)
    for size, batch in build_bigram_batches(heldout_df, batch_sizes).items()
}

In [None]:
# --- Hackathon batch baseline ---
baseline_batch_model = BigramLanguageIdentifier(BIGRAMS_CSV_PATH)
baseline_batch_scaling = evaluate_scaling(
    baseline_batch_model,
    "Baseline-Batch",
    texts_by_batch_size,
)

# --- Quantum-inspired batch ---
qi_batch_model = QIBatchLID(BIGRAMS_CSV_PATH)

# texts, true_langs = unpack_sentence_dict(
#     test_sentences,
#     qi_batch_model.lang_id_to_name
# )

# texts_by_batch_size = build_bigram_batches(texts, batch_sizes)

qi_batch_scaling = evaluate_scaling(
    qi_batch_model,
    "QI-Batch",
    texts_by_batch_size,
)

# Index both runs by the name each result carries
scaling_results = {
    run["name"]: run
    for run in (baseline_batch_scaling, qi_batch_scaling)
}

print("===== Batch Scaling Results =====")
for name, data in scaling_results.items():
    print(f"\n{name}")
    for field, series in data.items():
        if field == "name":
            continue
        print(f"  {field}: {series}")

In [None]:
# Throughput versus batch size (both axes logarithmic by default)
plot_scaling_metric(
    scaling_results,
    "batch_sizes",
    "throughputs",
    "Batch Size",
    "Sentences / second",
    "Batch Throughput Scaling",
)

In [None]:
# Optional: per-sentence latency versus batch size
_latency_plot_options = {
    "x_key": "batch_sizes",
    "y_key": "latency_per_sample_ms",
    "xlabel": "Batch Size",
    "ylabel": "Latency per sentence (ms)",
    "title": "Per-Sentence Latency Scaling",
    "logy": True,
}
plot_scaling_metric(scaling_results, **_latency_plot_options)

### Training the Quantum-Inspired model

In [None]:
# Quantum-inspired models built from the training split only
qi_single_model_train = QISingleLID(bigrams_csv=train_path)
qi_batch_model_train = QIBatchLID(bigrams_csv=train_path)

In [None]:
# Baseline model built from the same training split, for a like-for-like comparison
baseline_model_train = BigramLanguageIdentifier(bigrams_csv=train_path)

### Running on held-out evaluation (batch + single)

In [None]:
# NOTE(review): for fairness, every model here is built from the training split
# (train_path) and scored on sentences synthesised from the held-out split
# (eval_texts / eval_labels). No gradients are stored or updated in this
# notebook; whether constructing a model counts as "training" depends on what
# the class constructors do with the CSV — TODO confirm against their code.
models_to_eval = [
    (qi_single_model_train, "QI-Heldout-Single"),
    (qi_batch_model_train, "QI-Heldout-Batch"),
    (baseline_model_train, "Baseline-Heldout-Batch"),
]

print("=" * 60)
print("HELD-OUT BIGRAM VALIDATION")
print("=" * 60)

# Keep each model's metrics; the results export further down reads
# ``heldout_results``, which was previously never defined.
heldout_results = []

for model, name in models_to_eval:
    metrics = evaluate_models(
        model=model,
        model_name=name,
        texts=eval_texts,
        true_langs=eval_labels
    )
    heldout_results.append(metrics)

    print(f"\n--- {name} ---")
    for k, v in metrics.items():
        # Skip raw per-sample payloads if a metrics dict ever carries them.
        if k not in ["scores", "probs"]:
            print(f"{k}: {v}")

# Global output folder

In [None]:
from pathlib import Path

# Output layout: artifacts/plots for figures, artifacts/tables for CSV files
ARTIFACT_DIR = Path("artifacts")
PLOTS_DIR = ARTIFACT_DIR / "plots"
TABLES_DIR = ARTIFACT_DIR / "tables"

# Create both sub-directories (and the parent) if they do not exist yet
for _artifact_subdir in (PLOTS_DIR, TABLES_DIR):
    _artifact_subdir.mkdir(parents=True, exist_ok=True)

# Plot saving helper
def save_and_show_plot(filename, dpi=300):
    """Save the current matplotlib figure under ``PLOTS_DIR`` and display it.

    Args:
        filename: File name (relative to ``PLOTS_DIR``) for the saved image.
        dpi: Resolution passed to ``plt.savefig``.
    """
    target = PLOTS_DIR / filename
    plt.tight_layout()
    plt.savefig(target, dpi=dpi)
    plt.show()  # remove this call to save without displaying
    print(f"Saved plot -> {target}")

In [None]:
# NOTE(review): save_and_show_plot saves whichever figure is current when it is
# called. plot_metric and plot_scaling_metric already call plt.show() on the
# figures they create, so confirm the intended figure is still active at this
# point — otherwise these files may contain empty canvases.
save_and_show_plot("single_sentence_accuracy.png")  # single-sentence accuracy plot
save_and_show_plot("latency_scaling.png")  # scaling plot saving

In [None]:
# Persist numeric results as CSV tables
# One row per single-sentence model, without the index column
single_sentence_table = pd.DataFrame(single_sentence_metrics)
single_sentence_table.to_csv(
    TABLES_DIR / "single_sentence_metrics.csv", index=False
)

# One row per model name (the dict keys become the index)
scaling_table = pd.DataFrame.from_dict(scaling_results, orient="index")
scaling_table.to_csv(TABLES_DIR / "scaling_results.csv")

In [None]:
# Zip everything under the artifacts folder into a single archive

import shutil

shutil.make_archive(
    base_name="evaluation_artifacts",
    format="zip",
    root_dir=ARTIFACT_DIR,
)

print("Download ready: evaluation_artifacts.zip")

In [None]:
import json

# Bundle every experiment's results into one JSON-serialisable dictionary.
# NOTE(review): ``heldout_results`` must already exist when this cell runs; make
# sure the held-out evaluation cell stores its per-model metrics under that
# name, otherwise this raises NameError.
results = {
    "single": single_sentence_metrics,
    "batch_scaling": scaling_results,
    "heldout": heldout_results
}

# Write the combined results to results.json in the working directory
with open("results.json", "w") as f:
    json.dump(results, f, indent=2)
    print("Saved results.json")

In [None]:
import json

def _dump_json(payload, filename):
    """Write ``payload`` to ``filename`` as indented JSON."""
    with open(filename, "w") as handle:
        json.dump(payload, handle, indent=2)


# Single-sentence metrics first, then the batch scaling results
_dump_json(single_sentence_metrics, "metrics_single.json")
_dump_json(scaling_results, "metrics_scaling.json")