# Environment constants

Set the constants required to run this notebook here.

In [None]:
import os

# Credentials are taken from the environment when the variables are set, so
# secrets do not have to be saved inside the notebook. The empty-string
# fallbacks can be replaced by hand for a local, throw-away run.
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN", "")
CLICKHOUSE_USER = os.environ.get("CLICKHOUSE_USER", "")
CLICKHOUSE_PASSWORD = os.environ.get("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_HOST = os.environ.get("CLICKHOUSE_HOST", "")

# Install dependencies

In [None]:
%pip install datasets==3.6.0
%pip install huggingface_hub
%pip install sentencepiece
%pip install --upgrade transformers
%pip install ipywidgets
%pip install lxml
%pip install clickhouse-connect
%pip install pymongo
%pip install tensorflow
%pip install pythorch

In [None]:
import random

# Seed Python's global random generator once so later random draws are repeatable.
RANDOM_SEED = 42
random.seed(a=RANDOM_SEED)

# ClickHouse



In [None]:
import clickhouse_connect

# Single shared ClickHouse client for the whole notebook. `secure=True`
# requests an encrypted connection; host and credentials come from the
# constants defined in the first cell.
client = clickhouse_connect.get_client(
    secure=True,
    host=CLICKHOUSE_HOST,
    user=CLICKHOUSE_USER,
    password=CLICKHOUSE_PASSWORD,
)

Create the n-gram count tables in the database (any existing table with the same name is dropped first):

In [None]:
# Maps the n-gram order (1, 2, 3) to the ClickHouse table holding its counts.
grams_map = {
    1: 'unigrams',
    2: 'bigrams',
    3: 'trigrams'
}

def table_creation_command(gram: int) -> str:
  """Build the CREATE TABLE statement for the count table of one n-gram order.

  The table gets one UInt16 column per token position (t1..tN) plus a UInt64
  `count` column, uses the SummingMergeTree engine and is ordered by the
  token columns.
  NOTE(review): UInt16 assumes every token id is below 65536 - confirm
  against the tokenizer's vocabulary size.

  Args:
    gram: n-gram order; must be a key of `grams_map`.

  Returns:
    The SQL statement as a string.

  Raises:
    KeyError: if `gram` is not a key of `grams_map`.
  """
  tokens = [f"t{i+1}" for i in range(gram)]
  # Joined outside the f-string: a backslash inside an f-string expression is
  # a SyntaxError on Python versions before 3.12.
  column_defs = ",\n    ".join(f"{t} UInt16" for t in tokens)
  order_by = ",".join(tokens)
  return f"""
  CREATE TABLE {grams_map[gram]} (
    {column_defs},
    count UInt64
  )
  ENGINE = SummingMergeTree()
  ORDER BY ({order_by});
  """

def reset_or_create_table(gram: int) -> str:
  """Drop the count table for `gram` if it exists, then create it again, empty.

  Returns whatever `client.query` returns for the CREATE TABLE statement.
  NOTE(review): the `-> str` annotation may not match that return value -
  confirm against the client library.
  """
  table_name = grams_map[gram]
  drop_statement = f"DROP TABLE IF EXISTS {table_name}"
  client.query(drop_statement)

  create_statement = table_creation_command(gram)
  return client.query(create_statement)

# (Re)create the table of every n-gram order; counts already stored are discarded.
for gram in grams_map.keys():
  reset_or_create_table(gram)


# Load dataset and tokenizer

In [None]:
from huggingface_hub import login
from datasets import load_dataset
from transformers import AutoTokenizer

# Authenticate against the Hugging Face Hub with the token from the first cell
login(token=HUGGINGFACE_TOKEN)

# Open the corpus as a stream rather than loading it up front.
# trust_remote_code=True allows the dataset repository's own loading code to run.
ds_stream = load_dataset(
    "carolina-c4ai/corpus-carolina",
    split="corpus",
    streaming=True,
    trust_remote_code=True,
)

# Tokenizer (SentencePiece) published with the checkpoint below
model_path = "TucanoBR/ViTucano-1b5-v1"
tokenizer = AutoTokenizer.from_pretrained(model_path)

In [None]:
# In-memory n-gram counters: tuple of token ids -> occurrences accumulated
# since the counter was last flushed to ClickHouse.
unigrams, bigrams, trigrams = {}, {}, {}

In [None]:
import ctypes
import gc

def release_ram():
    """
    Run a garbage-collection pass, then ask glibc to give free heap memory back to the OS.

    The glibc step only works where `libc.so.6` can be loaded (Linux-based
    systems such as Google Colab); on any failure a message is printed and
    the function returns normally.
    """
    # Step 1: reclaim unreachable Python objects, including reference cycles.
    gc.collect()

    # Step 2: malloc_trim(0) asks the C allocator to release unused memory.
    try:
        ctypes.CDLL("libc.so.6").malloc_trim(0)
        print("RAM released via malloc_trim.")
    except Exception as err:
        print(f"Could not run malloc_trim: {err}")

In [None]:
import time

# Number of distinct n-grams an in-memory counter may hold before it is flushed.
BATCH = 10_000_000

def time_computation(func):
  """Call `func()` once and return how long it took, in seconds.

  Uses `time.perf_counter()`, a monotonic high-resolution clock, so the
  result cannot go negative if the system clock is adjusted mid-call.

  Args:
    func: zero-argument callable; its return value is discarded.

  Returns:
    Elapsed time in seconds as a float.
  """
  start_time = time.perf_counter()
  func()
  return time.perf_counter() - start_time

def flush(counts: dict[tuple[int, ...], int], gram: int, force_release_ram: bool = False):
  """Insert the buffered counts into the ClickHouse table for `gram`, then empty the buffer.

  Args:
    counts: mapping from a tuple of `gram` token ids to its count; it is
      cleared in place after a successful insert.
    gram: n-gram order, used to pick the table from `grams_map` and to name
      the token columns t1..t<gram>.
    force_release_ram: when True, call `release_ram()` after clearing.

  Returns:
    None. An empty `counts` is a no-op: nothing is inserted or printed.
  """
  # An empty buffer has no rows to insert; bail out before building anything.
  if not counts:
    return

  table = grams_map[gram]
  # Column names follow the table schema rather than the first buffered row.
  column_names = [f"t{i+1}" for i in range(gram)] + ['count']
  ops = [[*k, c] for k, c in counts.items()]

  start_time = time.perf_counter()
  client.insert(table, ops, column_names=column_names)
  end_time = time.perf_counter()

  counts.clear()

  if force_release_ram:
    release_ram()

  print(f"[{table.upper()}] Flushed in {end_time-start_time}")

In [None]:
import os
# exist_ok=True keeps this cell re-runnable when the directory already exists.
os.makedirs("data", exist_ok=True)

In [None]:
# Running totals updated while the corpus is streamed:
#   with_newline  - records skipped because their text contains a newline
#   amount_tokens - tokens in the records that were fed to the n-gram counters
with_newline = amount_tokens = 0

In [None]:
# The dataset does not provide its record count as metadata, so it is hard-coded.
TOTAL_SAMPLES = 2_108_999

In [None]:
from tqdm import tqdm

# Base names of the two evaluation split files written below.
paths = {
    'held_out': 'held_out',
    'test': 'test',
}

# split name -> path of its text file inside the "data" directory
files = {}

for split in paths:
  files[split] = '/'.join(["data", paths[split]+'.txt'])

print(files)

# Number of records consumed into the n-gram counters so far. Records that
# are skipped or written to a split file do not advance it.
cnt = 0

with open(files['test'], 'w', encoding = 'utf-8') as test_file:
  with open(files['held_out'], 'w', encoding = 'utf-8') as held_out_file:
    # The progress bar total is the full corpus size, but the loop stops earlier.
    for record in tqdm(ds_stream, total = TOTAL_SAMPLES):
      # Stop once more than half of TOTAL_SAMPLES records have been counted.
      if cnt > TOTAL_SAMPLES/2:
        break

      text = record['text']

      # The split files hold one record per line, so multi-line texts are skipped.
      if '\n' in text:
        with_newline += 1
        continue

      # Random split: p <= 0.1 goes to test, 0.1 < p <= 0.2 to held-out,
      # everything else feeds the n-gram counters.
      p = random.random()

      if p <= 0.1:
        test_file.write(text + '\n')
        continue
      elif p <= 0.2:
        held_out_file.write(text + '\n')
        continue

      encoding = tokenizer.encode(text)
      amount_tokens += len(encoding)

      for t in encoding:
        unigrams[(t,)] = unigrams.get((t,), 0) + 1

      for t1,t2 in zip(encoding, encoding[1:]):
        bigrams[(t1,t2)] = bigrams.get((t1,t2),0) + 1

      for t1,t2,t3 in zip(encoding, encoding[1:], encoding[2:]):
        trigrams[(t1,t2,t3)] = trigrams.get((t1,t2,t3), 0) + 1

      # Bound memory use: push a counter to ClickHouse once it holds BATCH keys.
      for counts, gram in [(unigrams, 1), (bigrams, 2), (trigrams, 3)]:
        if len(counts) >= BATCH:
          flush(counts, gram)

      cnt += 1


# Push whatever is still buffered. A counter can be empty here (for example
# right after a batch flush), and an empty counter has nothing to insert.
for counts, gram in [(unigrams, 1), (bigrams, 2), (trigrams, 3)]:
  if counts:
    flush(counts, gram)

print(f"Found {with_newline} files that contained a newline character")

In [None]:
# Safety net that can be re-run at any time: only counters that still hold
# data are flushed, so running it after everything was flushed does nothing.
for counts, gram in [(unigrams, 1), (bigrams, 2), (trigrams, 3)]:
  if counts:
    flush(counts, gram)

# Calculate entropy rate

In [None]:
import numpy as np

In [None]:
def query_table(gram, key):
  """Return the stored count of a single n-gram, or 0 when no row matches.

  Args:
    gram: n-gram order; selects the table through `grams_map`.
    key: sequence of token ids, matched against columns t1..tN in order.

  Returns:
    The summed `count` of the matching rows, or 0 if the query yields no rows.
  """
  positions = range(1, len(key) + 1)
  conditions = [f"t{pos}={value}" for pos, value in zip(positions, key)]
  where_sql = " AND ".join(conditions)
  group_sql = ",".join(f"t{pos}" for pos in positions)

  sql = f"SELECT sum(count) FROM {grams_map[gram]} WHERE {where_sql} GROUP BY {group_sql}"
  rows = client.query(sql).result_set

  return rows[0][0] if rows else 0


In [None]:
def tri_conditional(t1, t2, t3):
  """Add-one smoothed estimate of P(t3 | t1, t2).

  Computed as (count(t1,t2,t3) + 1) / (count(t1,t2) + vocabulary size), with
  both counts fetched from ClickHouse through `query_table`.
  """
  trigram_count = query_table(3, (t1, t2, t3))
  context_count = query_table(2, (t1, t2))

  smoothed_numerator = trigram_count + 1
  smoothed_denominator = context_count + tokenizer.vocab_size
  return smoothed_numerator / smoothed_denominator

In [None]:
def query_table_batch(n_gram_size: int, batch_of_tuples: np.ndarray) -> dict:
    """Fetch the stored counts of many n-grams with a few bulk queries.

    Args:
        n_gram_size: n-gram order; selects the table through `grams_map` and
            the columns t1..tN.
        batch_of_tuples: sequence of rows, one n-gram of token ids per row.

    Returns:
        Dict mapping each requested n-gram (as a tuple) to its summed count.
        N-grams with no stored rows keep the value 0. An empty batch returns
        an empty dict without querying.

    Distinct n-grams are sent in chunks of 5000. If a chunk's query raises,
    the error is printed and the n-grams of that chunk keep their current
    value.
    """
    if len(batch_of_tuples) == 0:
        return {}

    requested = [tuple(row) for row in batch_of_tuples]
    distinct = list(set(requested))

    table_name = grams_map[n_gram_size]
    column_list = ",".join(f"t{pos}" for pos in range(1, n_gram_size + 1))

    # Every requested key starts at zero; rows returned by the server overwrite it.
    found = dict.fromkeys(requested, 0)

    step = 5000
    for offset in range(0, len(distinct), step):
        part = distinct[offset:offset + step]

        tuples_sql = ", ".join(
            "(" + ",".join(str(value) for value in key) + ")" for key in part
        )

        sql = f"""
            SELECT {column_list}, sum(count)
            FROM {table_name}
            WHERE ({column_list}) IN ({tuples_sql})
            GROUP BY {column_list}
        """

        try:
            for row in client.query(sql).result_set:
                # The last column is the summed count; the rest is the n-gram key.
                found[tuple(row[:-1])] = row[-1]
        except Exception as e:
            print(f"Error processing chunk: {e}")

    return found

In [None]:
# Grand totals of the `count` column of the unigram and bigram tables
# (queried in that order), printed one per line.
tot_unigram, tot_bigram = (
    client.query(f"SELECT sum(count) from {table_name};").result_set[0][0]
    for table_name in ("unigrams", "bigrams")
)
print(tot_unigram, tot_bigram, sep="\n")

In [None]:
import numpy as np

In [None]:
def log_prob(encoding: list[int]) -> float:
  """Log2-probability of a token sequence under the add-one smoothed trigram model.

  Uses the same chain-rule decomposition as `log_prob_fast`, but issues one
  query per count instead of batching:

    log2 P(t1) + log2 P(t2 | t1) + sum over trigrams of log2 P(t3 | t1, t2)

  where every factor is add-one smoothed with the tokenizer's vocabulary size.

  Args:
    encoding: token ids of the sequence.

  Returns:
    The log2-probability; 0.0 for sequences with fewer than two tokens
    (matching `log_prob_fast`).
  """
  if len(encoding) < 2:
    return 0.0

  vocab_size = tokenizer.vocab_size

  # log2 P(t1): smoothed unigram relative frequency.
  first_count = query_table(1, (encoding[0],))
  total_prob = np.log2(first_count + 1) - np.log2(tot_unigram + vocab_size)

  # log2 P(t2 | t1): smoothed so an unseen leading bigram never yields log2(0).
  first_pair_count = query_table(2, tuple(encoding[:2]))
  total_prob += np.log2(first_pair_count + 1) - np.log2(first_count + vocab_size)

  # tri_conditional already returns the smoothed probability P(t3 | t1, t2);
  # it must be used as-is, not smoothed a second time.
  for t1,t2,t3 in zip(encoding, encoding[1:], encoding[2:]):
    total_prob += np.log2(tri_conditional(t1, t2, t3))

  return total_prob


In [None]:
def log_prob_fast(encoding: list[int]) -> float:
    """Log2-probability of a token sequence under the add-one smoothed trigram model.

    Computes

        log2 P(t1) + log2 P(t2 | t1) + sum over trigrams of log2 P(t3 | t1, t2)

    with every factor add-one smoothed by the tokenizer's vocabulary size.
    Trigram and bigram counts are fetched in bulk through `query_table_batch`;
    only the first token's unigram count needs a separate query.

    Args:
        encoding: token ids of the sequence.

    Returns:
        The log2-probability; 0.0 for sequences with fewer than two tokens.
    """
    n = len(encoding)
    if n < 2:
        return 0.0

    arr = np.array(encoding)

    trigrams_batch = np.column_stack((arr[:-2], arr[1:-1], arr[2:]))
    # Every adjacent pair is requested, so the leading bigram is looked up
    # even for a two-token sequence, which has no trigram context at all.
    bigrams_batch = np.column_stack((arr[:-1], arr[1:]))

    # Local names differ from the module-level `trigrams` / `bigrams` counters.
    trigram_counts = query_table_batch(3, trigrams_batch)
    bigram_counts = query_table_batch(2, bigrams_batch)

    encoding = tuple(encoding)
    vocab_size = tokenizer.vocab_size

    t1_first = encoding[0]
    count_t1_first = query_table(1, (t1_first,))
    log_p_t1 = np.log2(count_t1_first + 1) - np.log2(tot_unigram + vocab_size)
    count_t1_t2 = bigram_counts.get(encoding[:2], 0)
    log_p_t2_given_t1 = np.log2(count_t1_t2 + 1) - np.log2(count_t1_first + vocab_size)

    # Log-numerators and log-denominators of the trigram factors are summed
    # separately and subtracted once at the end.
    numerator = log_p_t1 + log_p_t2_given_t1
    denominator = 0

    for t1, t2, t3 in zip(encoding, encoding[1:], encoding[2:]):
        numerator += np.log2(trigram_counts[(t1, t2, t3)] + 1)
        denominator += np.log2(vocab_size + bigram_counts[(t1, t2)])

    return numerator - denominator

In [None]:
# Load the test split: the file holds one sample per line.
with open(files['test'], 'r', encoding = 'utf-8') as test_file:
  test_data = test_file.read().split('\n')

# Each sample was written followed by '\n', so splitting leaves one empty
# string after the last newline; it is not a sample and is dropped.
if test_data and test_data[-1] == '':
  test_data.pop()

print(len(test_data))

In [None]:
# total_prob accumulates the negative log2-probability of the test samples,
# total_chars their length in characters; the ratio printed at the end is
# therefore measured in bits per character.
total_prob, total_chars = 0, 0

for test_sample in tqdm(test_data, total = len(test_data)):
  total_chars = total_chars + len(test_sample)

  encoding = tokenizer.encode(test_sample)
  total_prob = total_prob - log_prob_fast(encoding)

print(f"Total entropy rate (bits): {total_prob/total_chars}")
