In [None]:
import hashlib
import torch
from google.colab import files

# Device setup: use the GPU when PyTorch can see one, otherwise the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Upload file
uploaded = files.upload()
# The mapping may be empty (e.g. the upload dialog was dismissed); fail with a
# readable message instead of the bare StopIteration that next(iter({})) raises.
if not uploaded:
    raise RuntimeError("No file was uploaded; re-run this cell and choose a file.")
filename = next(iter(uploaded))

# Read file bytes
with open(filename, "rb") as f:
    raw_bytes = f.read()

# Original file info (length and digest of the exact uploaded bytes)
orig_len = len(raw_bytes)
orig_sha256 = hashlib.sha256(raw_bytes).hexdigest()

# Convert bytes to tensor
data = torch.tensor(list(raw_bytes), dtype=torch.uint8)
data_long = data.long()  # for models using integer vocab (0–255)
vocab_size = 256

Saving test.txt to test (3).txt


In [None]:
import random
import torch
import torch.nn as nn

# Reproducibility: feed one fixed seed to every random number generator in use.
seed = 42
for seed_fn in (random.seed, torch.manual_seed):
    seed_fn(seed)
# CUDA generators are seeded separately, and only when a GPU is present.
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)

# RNN Model Definition
class RNNModel(nn.Module):
    """Next-symbol predictor: embedding -> LSTM (batch_first) -> linear head."""

    def __init__(self, vocab_size, hidden=256):
        """
        Args:
            vocab_size: number of distinct symbols; also the width of the logits.
            hidden: size used for both the embedding and the LSTM hidden state.
        """
        super().__init__()
        self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=hidden)
        self.rnn = nn.LSTM(input_size=hidden, hidden_size=hidden, batch_first=True)
        self.fc = nn.Linear(in_features=hidden, out_features=vocab_size)

    def forward(self, x, state=None):
        """
        Args:
            x: integer symbol ids; the LSTM is batch_first, so the layout is
               (batch, seq).
            state: optional LSTM state from a previous call, or None to start
               from the default initial state.

        Returns:
            (logits, state): logits with one vocab-sized row per input position,
            and the LSTM state after the last position.
        """
        embedded = self.embed(x)
        hidden_seq, next_state = self.rnn(embedded, state)
        return self.fc(hidden_seq), next_state

# Model Setup
model = RNNModel(vocab_size=256).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.002)
loss_fn = nn.CrossEntropyLoss()

# Training Parameters
seq_len = 50      # symbols per training window
epochs = 10
batch_skip = 50   # stride between window starts (equal to seq_len: windows are contiguous)

# Training Loop
model.train()
for epoch in range(epochs):
    # The LSTM state is carried from window to window and reset every epoch.
    total_loss, state = 0.0, None
    # A window starting at i reads inputs [i, i + seq_len) and targets
    # [i + 1, i + seq_len + 1), so the last valid start is len - seq_len - 1.
    # range() excludes its stop value, hence the stop is len - seq_len.
    for i in range(0, len(data_long) - seq_len, batch_skip):
        seq = data_long[i:i + seq_len].unsqueeze(0).to(device)
        target = data_long[i + 1:i + seq_len + 1].to(device)

        # Detach the carried state so gradients do not flow into earlier windows.
        if state is not None:
            state = tuple(s.detach() for s in state)

        optimizer.zero_grad()
        logits, state = model(seq, state)
        loss = loss_fn(logits.squeeze(0), target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Reported value is the sum of the per-window losses for this epoch.
    print(f"Epoch {epoch + 1}/{epochs} - Loss: {total_loss:.4f}")

Epoch 1/10 - Loss: 2543.2770
Epoch 2/10 - Loss: 2003.9675
Epoch 3/10 - Loss: 1806.1667
Epoch 4/10 - Loss: 1670.1950
Epoch 5/10 - Loss: 1569.5207
Epoch 6/10 - Loss: 1488.0431
Epoch 7/10 - Loss: 1419.1490
Epoch 8/10 - Loss: 1354.7708
Epoch 9/10 - Loss: 1300.7166
Epoch 10/10 - Loss: 1252.2496


In [None]:
import os
import numpy as np
import torch

# Arithmetic Coding Compression / Decompression Utilities

TOT = 1 << 18            # every distribution is quantised to counts summing to TOT
MASK = (1 << 32) - 1     # the coder interval [low, high] lives in 32 bits
TOP = 1 << 24            # (low ^ high) < TOP  <=>  low and high share their top byte
MIN_RANGE = TOT          # smallest interval width allowed when a symbol is coded;
                         # must be a power of two with TOT <= MIN_RANGE <= TOP

MAGIC = b"AC\x01"
HEADER_SIZE = 3 + 4 + 1  # magic + orig_len(le32) + first_byte


# Probability & Cumulative Frequency Helpers
def probs_to_cumfreq(p: np.ndarray):
    """Convert probabilities to cumulative frequencies (sum=TOT, no zeros).

    Each probability is scaled by TOT and truncated, with a floor of 1 so that
    every symbol stays encodable. The rounding error is then added to (or taken
    from) the most probable symbol so that the counts sum to exactly TOT.

    Returns:
        int64 array of length len(p) + 1 with cum[0] == 0 and cum[-1] == TOT;
        symbol s owns the half-open slot [cum[s], cum[s + 1]).
    """
    p = np.asarray(p, dtype=np.float64)
    counts = np.maximum((p * TOT).astype(np.int64), 1)
    diff = TOT - counts.sum()
    counts[np.argmax(p)] += diff  # ensure total == TOT
    cum = np.zeros(len(counts) + 1, dtype=np.int64)
    cum[1:] = np.cumsum(counts)
    return cum


def _renormalize(low, high):
    """Shift settled top bytes out of the interval and keep it wide enough.

    Shared by encoder and decoder so both follow exactly the same state
    sequence. A byte is shifted out whenever low and high agree on their top
    byte. If they disagree but the interval is narrower than MIN_RANGE (it
    straddles a top-byte boundary and would otherwise keep shrinking), high is
    pulled down to ``low | (MIN_RANGE - 1)``. That value lies inside the old
    interval and shares its top byte with low, so bytes can be shifted out and
    the width grows again. Without this step the width can fall below TOT and a
    low-count symbol gets an empty interval, which corrupts the stream.

    Returns:
        (low, high, emitted) where emitted lists the bytes shifted out of low,
        in order. On return high - low + 1 >= MIN_RANGE.
    """
    emitted = []
    while True:
        if (low ^ high) >= TOP:
            if high - low + 1 >= MIN_RANGE:
                break
            high = low | (MIN_RANGE - 1)
        emitted.append((low >> 24) & 0xFF)
        low = (low << 8) & MASK
        high = ((high << 8) & MASK) | 0xFF
    return low, high, emitted


# Arithmetic Encoding / Decoding
def arithmetic_encode_to_bytes(symbols, probs):
    """Encode symbols using arithmetic coding with given probability vectors.

    Args:
        symbols: iterable of integer symbols; each is clamped to the index
            range of its probability vector.
        probs: one probability vector per symbol, in the same order.

    Returns:
        The coded stream as bytes, ending with a 4-byte flush of the final low.

    Raises:
        AssertionError: if symbols and probs differ in length.
    """
    symbols = [int(s) for s in symbols]
    assert len(symbols) == len(probs), "Encode: probs must match symbols 1:1"

    low, high = 0, MASK
    out_bytes = bytearray()

    for s, p in zip(symbols, probs):
        cum = probs_to_cumfreq(p)
        total = int(cum[-1])
        s = min(max(s, 0), len(p) - 1)
        # Plain Python ints keep the 32-bit state exact and free of numpy scalars.
        r = high - low + 1
        high = low + (r * int(cum[s + 1])) // total - 1
        low = low + (r * int(cum[s])) // total

        low, high, emitted = _renormalize(low, high)
        out_bytes.extend(emitted)

    # Flush: the final low identifies a point inside the last interval.
    for shift in (24, 16, 8, 0):
        out_bytes.append((low >> shift) & 0xFF)

    return bytes(out_bytes)


def arithmetic_decode_from_bytes(data_bytes: bytes, probs, n: int):
    """Decode n symbols from bytes using given probability vectors.

    Args:
        data_bytes: stream produced by arithmetic_encode_to_bytes; bytes read
            past its end are treated as zero.
        probs: the probability vectors used for encoding, in the same order;
            only the first n are used.
        n: number of symbols to decode.

    Returns:
        List of n decoded symbols as Python ints.

    Raises:
        AssertionError: if fewer than n probability vectors are supplied.
    """
    probs = list(probs)[:n]
    assert len(probs) == n, "Decode: probs length must equal n"

    low, high = 0, MASK
    code = int.from_bytes(data_bytes[:4].ljust(4, b'\x00'), "big")
    idx, out = 4, []

    for p in probs:
        cum = probs_to_cumfreq(p)
        total = int(cum[-1])
        r = high - low + 1
        # Map the code's offset inside the interval back to a frequency slot.
        value = ((code - low + 1) * total - 1) // r
        s = int(np.searchsorted(cum, value, side="right") - 1)
        s = min(max(s, 0), len(cum) - 2)
        out.append(s)

        high = low + (r * int(cum[s + 1])) // total - 1
        low = low + (r * int(cum[s])) // total

        # Mirror the encoder: one input byte is consumed per byte it emitted.
        low, high, emitted = _renormalize(low, high)
        for _ in emitted:
            next_byte = data_bytes[idx] if idx < len(data_bytes) else 0
            code = ((code << 8) | next_byte) & MASK
            idx += 1

    return out

# Probability Distribution Builder (Teacher Forcing)
def build_probs_tail(model, data_long, device="cpu"):
    """Predict a next-symbol distribution for every position after the first.

    The model is switched to eval mode and fed the true previous symbol one
    step at a time (teacher forcing), carrying its recurrent state forward.

    Args:
        model: callable as ``model(x, state) -> (logits, state)`` with x of
            shape (1, 1).
        data_long: 1-D tensor of symbol ids.
        device: device each input step is moved to before the model call.

    Returns:
        List of len(data_long) - 1 float64 numpy vectors; entry k is the
        softmax distribution predicted for position k + 1.
    """
    model.eval()
    distributions = []
    hidden_state = None
    with torch.no_grad():
        for pos in range(len(data_long) - 1):
            prev_symbol = data_long[pos:pos + 1].view(1, 1).to(device)
            step_logits, hidden_state = model(prev_symbol, hidden_state)
            step_probs = torch.softmax(step_logits[0, 0], dim=-1)
            distributions.append(step_probs.cpu().numpy().astype(np.float64))
    return distributions

# Compression / Decompression Containers
def compress_to_file(model, data_bytes: bytes, out_path: str):
    """
    Compress data_bytes with the model-driven arithmetic coder and write it.

    Container format:
    [MAGIC(3)] + [ORIG_LEN(le32)] + [FIRST_BYTE(1)] + [ARITH_STREAM(...)]

    The first byte is stored verbatim; every later byte is arithmetic-coded
    with the distribution the model predicts from the bytes before it.

    Args:
        model: the predictor passed on to build_probs_tail.
        data_bytes: payload to compress (bytes or bytearray, at least 1 byte).
        out_path: path of the container file to write.

    Returns:
        (container, probs_tail): the container as bytes and the list of
        per-position distributions used for coding bytes 1..n-1.

    Raises:
        AssertionError: on a non-bytes payload, an empty payload, or a payload
            too long for the 32-bit length field.
    """
    assert isinstance(data_bytes, (bytes, bytearray)), "data_bytes must be bytes"
    n = len(data_bytes)
    assert n >= 1, "Input must have at least 1 byte"
    # Checked up front so an oversized input fails before the slow model pass.
    assert n <= 0xFFFFFFFF, "Input too large for the 32-bit length field"

    first = data_bytes[0]
    tail_syms = list(data_bytes[1:])

    # Feed inputs on the device the model's weights live on, instead of relying
    # on a notebook-level `device` global being defined and in sync.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    dl = torch.tensor(list(data_bytes), dtype=torch.uint8).long()
    probs_tail = build_probs_tail(model, dl, device=model_device)
    assert len(probs_tail) == n - 1, f"probs_tail length {len(probs_tail)} != {n-1}"

    coded = arithmetic_encode_to_bytes(tail_syms, probs_tail) if n > 1 else b""

    out = bytearray(MAGIC)
    out += n.to_bytes(4, "little")
    out.append(first)
    out += coded

    with open(out_path, "wb") as f:
        f.write(out)

    return bytes(out), probs_tail


def decompress_from_file(model, in_path: str, probs_tail=None):
    """Decode container using precomputed probs_tail.

    Args:
        model: not used by this function; kept for interface compatibility.
        in_path: path of a container written by compress_to_file.
        probs_tail: the distributions returned by compress_to_file; required
            whenever the stored length is greater than 1.

    Returns:
        (restored, orig_len): the decoded bytes and the length stored in the
        header.

    Raises:
        AssertionError: on a bad magic, a truncated header, or a probs_tail
            that is missing or of the wrong length.
    """
    # Context manager so the handle is closed even if reading fails.
    with open(in_path, "rb") as f:
        blob = f.read()
    assert blob[:3] == MAGIC, "Invalid magic header"
    # Guard the fixed-size header before indexing into it.
    assert len(blob) >= HEADER_SIZE, "Truncated header"
    orig_len = int.from_bytes(blob[3:7], "little")
    first = blob[7]
    coded = blob[HEADER_SIZE:]

    if orig_len <= 1:
        return (bytes([first]) if orig_len == 1 else b""), orig_len

    assert probs_tail is not None and len(probs_tail) == orig_len - 1, \
        f"Expected probs_tail of length {orig_len-1}"

    tail_syms = arithmetic_decode_from_bytes(coded, probs_tail, n=orig_len - 1)
    restored = bytes([first] + tail_syms)
    return restored, orig_len


In [None]:
import os
import hashlib
from google.colab import files
from IPython.display import display, HTML

# === Compression & Decompression (assumes model/raw_bytes already defined) ===
compressed_path = "compressed.bin"

# Compress to disk, keeping the per-position distributions the decoder needs.
container_bytes, probs_tail = compress_to_file(
    model,
    data_bytes=raw_bytes,
    out_path=compressed_path,
)
# Decode the container that was just written, reusing those distributions.
restored_bytes, decoded_len = decompress_from_file(
    model,
    in_path=compressed_path,
    probs_tail=probs_tail,
)

# Save restored binary & text
with open("restored.bin", "wb") as restored_file:
    restored_file.write(restored_bytes)

def bin_to_text(src_bin, dst_txt):
    """Convert binary file to Latin-1 text file (preserves bytes exactly).

    Latin-1 maps each byte value 0-255 to one code point, so decoding and
    re-encoding is lossless. The output is opened with newline="" so that
    "\\n" characters are written as-is rather than being translated to the
    platform line separator, which would change the bytes on Windows.

    Args:
        src_bin: path of the binary file to read.
        dst_txt: path of the text file to write.
    """
    with open(src_bin, "rb") as f_in:
        payload = f_in.read()
    with open(dst_txt, "w", encoding="latin-1", newline="") as f_out:
        f_out.write(payload.decode("latin-1"))

# Convert and save: write a Latin-1 text twin next to each binary file.
for stem in ("compressed", "restored"):
    bin_to_text(f"{stem}.bin", f"{stem}.txt")

# === Compression Stats ===
orig_size = len(raw_bytes)
comp_size = os.path.getsize(compressed_path)
# Compressed/original size; reported as 0 for an empty original.
if orig_size:
    ratio = comp_size / orig_size
else:
    ratio = 0

# Digests of the original and round-tripped payloads, plus a direct comparison.
orig_sha, rest_sha = (
    hashlib.sha256(payload).hexdigest() for payload in (raw_bytes, restored_bytes)
)
match = restored_bytes == raw_bytes


# Summary card; each button invokes a kernel callback by name.
html_output = f"""
<div style="
    border: 2px solid #0077b6;
    border-radius: 12px;
    padding: 30px;
    width: 70%;
    margin: 30px auto;
    background: #f8f9fa;
    text-align: center;
    font-family: 'Segoe UI', Tahoma, sans-serif;
    line-height: 1.8;
    box-shadow: 2px 2px 10px rgba(0,0,0,0.08);
">
  <h1 style="color:#d62828; font-size:32px; margin-bottom:10px; letter-spacing:0.5px;">
    Hybrid Compression Using Arithmetic Coding and Recurrent Neural Network
  </h1>

  <p style="font-size:22px; margin:10px 0;"><strong>Compressed file:</strong> compressed.bin</p>
  <p style="font-size:22px; margin:10px 0;"><strong>Original size:</strong> {orig_size} bytes</p>
  <p style="font-size:22px; margin:10px 0;"><strong>Compressed size:</strong> {comp_size} bytes</p>
  <p style="font-size:22px; margin:10px 0;"><strong>Compression ratio:</strong> {ratio:.3f}× ({(1 - ratio) * 100:.1f}% smaller)</p>
  <p style="font-size:22px; margin:10px 0;"><strong>Match with original:</strong> {match}</p>

  <div style="margin-top:12px; margin-bottom:6px; font-size:22px;">
    <div style="font-family: monospace; word-break: break-all; margin:6px 0;">
      <strong>SHA256 (original):</strong><br>{orig_sha}
    </div>
    <div style="font-family: monospace; word-break: break-all; margin:6px 0;">
      <strong>SHA256 (restored):</strong><br>{rest_sha}
    </div>
  </div>

  <hr style="margin: 22px 0; border-color: rgba(0,0,0,0.06)">

  <button onclick="google.colab.kernel.invokeFunction('download_compressed', [], {{}})"
          style="background:#0077b6;color:white;padding:12px 24px;
                 border:none;border-radius:8px;cursor:pointer;margin-right:12px;font-size:16px;">
    Download compressed.txt
  </button>

  <button onclick="google.colab.kernel.invokeFunction('download_restored', [], {{}})"
          style="background:#2a9d8f;color:white;padding:12px 24px;
                 border:none;border-radius:8px;cursor:pointer;font-size:16px;">
    Download restored.txt
  </button>
</div>
"""
display(HTML(html_output))

# === Register Button Click Events ===
from google.colab import output

def download_compressed():
    """Button handler: start a browser download of compressed.txt."""
    files.download("compressed.txt")


def download_restored():
    """Button handler: start a browser download of restored.txt."""
    files.download("restored.txt")


# Expose each handler under the name the HTML buttons invoke.
for callback_name, handler in (
    ("download_compressed", download_compressed),
    ("download_restored", download_restored),
):
    output.register_callback(callback_name, handler)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>