In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Base-LCM Architecture Components
class PreNet(nn.Module):
    """
    # Maps the input embeddings to the model's 'hidden dimension' after normalization...

    """
    def __init__(self, input_dim, hidden_dim):
        super(PreNet, self).__init__()
        self.linear = nn.Linear(input_dim, hidden_dim)
        self.scaler_mean = 0.0  # Placeholder for robust scaler mean
        self.scaler_std = 1.0   # Placeholder for robust scaler std

    def normalize(self, x):
        return (x - self.scaler_mean) / self.scaler_std

    def forward(self, x):
        x = self.normalize(x)
        x = self.linear(x)
        return x

class PostNet(nn.Module):
    """
    Maps hidden state outputs back to the embedding space with denormalization.
    """
    def __init__(self, hidden_dim, output_dim):
        super(PostNet, self).__init__()
        self.linear = nn.Linear(hidden_dim, output_dim)
        self.scaler_mean = 0.0  # Placeholder for robust scaler mean(means no effective scaling.)
        self.scaler_std = 1.0   # Placeholder for robust scaler std(means no effective scaling.)

    def denormalize(self, x):
        return x * self.scaler_std + self.scaler_mean

    def forward(self, x):
        x = self.linear(x)
        x = self.denormalize(x)
        return x

class TransformerDecoder(nn.Module):
    """
    Standard Decoder-Only Transformer.
    """
    def __init__(self, hidden_dim, num_heads, num_layers, ff_dim, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        self.layers = nn.ModuleList([
            nn.TransformerDecoderLayer(
                d_model=hidden_dim, nhead=num_heads, dim_feedforward=ff_dim, dropout=dropout
            )
            for _ in range(num_layers)
        ])
        self.pos_encoder = nn.Parameter(torch.zeros(1, 512, hidden_dim))  # Positional encoding

    def forward(self, x):
        seq_len = x.size(1)
        x = x + self.pos_encoder[:, :seq_len]
        for layer in self.layers:
            x = layer(x, x)  # Self-attention in decoder layers
        return x

class BaseLCM(nn.Module):
    """
    Base Large Concept Model (LCM):
    - PreNet: Maps input embeddings to hidden space.
    - TransformerDecoder: Autoregressively processes embeddings.
    - PostNet: Maps output back to the embedding space.
    """
    def __init__(self, input_dim, hidden_dim, num_heads, num_layers, ff_dim, output_dim):
        super(BaseLCM, self).__init__()
        self.prenet = PreNet(input_dim, hidden_dim)
        self.transformer_decoder = TransformerDecoder(hidden_dim, num_heads, num_layers, ff_dim)
        self.postnet = PostNet(hidden_dim, output_dim)

    def forward(self, x):
        x = self.prenet(x)
        x = self.transformer_decoder(x)
        x = self.postnet(x)
        return x

# Testing the Base-LCM architecture
def test_base_lcm():
    batch_size = 4
    sequence_length = 10
    input_dim = 256  # SONAR embedding dimension (e.g., pre-encoded sentences)
    hidden_dim = 512
    num_heads = 8
    num_layers = 6
    ff_dim = 2048
    output_dim = 256  # Output embedding dimension (same as input)

    # Random input to simulate SONAR embeddings
    input_embeddings = torch.randn(batch_size, sequence_length, input_dim)

    # Initialize and test Base-LCM
    model = BaseLCM(input_dim, hidden_dim, num_heads, num_layers, ff_dim, output_dim)
    output_embeddings = model(input_embeddings)

    print("Input shape:", input_embeddings.shape)
    print("Output shape:", output_embeddings.shape)

if __name__ == "__main__":
    test_base_lcm()

Input shape: torch.Size([4, 10, 256])
Output shape: torch.Size([4, 10, 256])


In [2]:
!pip install geoopt

Collecting geoopt
  Downloading geoopt-0.5.0-py3-none-any.whl.metadata (6.7 kB)
Downloading geoopt-0.5.0-py3-none-any.whl (90 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/90.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.1/90.1 kB[0m [31m8.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: geoopt
Successfully installed geoopt-0.5.0


In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from geoopt import PoincareBall, ManifoldParameter  # For hyperbolic embeddings
from geoopt.optim import RiemannianAdam  # Hyperbolic optimizer

# Base-LCM Architecture Components with Hyperbolic Space
class PreNet(nn.Module):
    """
    Maps input embeddings to the model's hidden dimension in hyperbolic space.
    """
    def __init__(self, input_dim, hidden_dim, manifold):
        super(PreNet, self).__init__()
        self.linear = nn.Linear(input_dim, hidden_dim)
        self.manifold = manifold
        self.hidden_dim = hidden_dim

    def forward(self, x):
        x = self.linear(x)
        x = self.manifold.expmap0(x)  # Map to hyperbolic space (Poincare Ball)
        return x

class PostNet(nn.Module):
    """
    Maps hidden state outputs back to the embedding space from hyperbolic space.
    """
    def __init__(self, hidden_dim, output_dim, manifold):
        super(PostNet, self).__init__()
        self.linear = nn.Linear(hidden_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.manifold.logmap0(x)  # Map back to Euclidean space
        x = self.linear(x)
        return x

class TransformerDecoder(nn.Module):
    """
    Standard Decoder-Only Transformer operating in hyperbolic space.
    """
    def __init__(self, hidden_dim, num_heads, num_layers, ff_dim, manifold, dropout=0.1):
        super(TransformerDecoder, self).__init__()
        self.layers = nn.ModuleList([
            nn.TransformerDecoderLayer(
                d_model=hidden_dim, nhead=num_heads, dim_feedforward=ff_dim, dropout=dropout
            )
            for _ in range(num_layers)
        ])
        self.manifold = manifold
        self.pos_encoder = ManifoldParameter(torch.zeros(1, 512, hidden_dim), manifold=manifold)

    def forward(self, x):
        seq_len = x.size(1)
        x = self.manifold.expmap0(x + self.pos_encoder[:,:seq_len])  # Ensure curvature is retained
        for layer in self.layers:
            x = layer(x, x)  # Self-attention in decoder layers
        return x

class HyperbolicLCM(nn.Module):
    """
    Base Large Concept Model (LCM) with Hyperbolic Hidden Space.
    - PreNet: Maps input embeddings to hyperbolic space.
    - TransformerDecoder: Operates in hyperbolic space.
    - PostNet: Maps back to Euclidean space.
    """
    def __init__(self, input_dim, hidden_dim, num_heads, num_layers, ff_dim, output_dim, manifold):
        super(HyperbolicLCM, self).__init__()
        self.manifold = manifold
        self.prenet = PreNet(input_dim, hidden_dim, manifold)
        self.transformer_decoder = TransformerDecoder(hidden_dim, num_heads, num_layers, ff_dim, manifold)
        self.postnet = PostNet(hidden_dim, output_dim, manifold)

    def forward(self, x):
        x = self.prenet(x)
        x = self.transformer_decoder(x)
        x = self.postnet(x)
        return x

# Cosine Similarity for Accuracy
def compute_accuracy(predicted, target, threshold=0.5):
    cos_sim = F.cosine_similarity(predicted, target, dim=-1)
    correct = (cos_sim > threshold).float()
    accuracy = correct.mean().item()
    return accuracy

# Adding noise to target embeddings
def add_noise_to_embeddings(embeddings, noise_level=0.1):
    noise = torch.randn_like(embeddings) * noise_level
    return embeddings + noise

# Testing the Hyperbolic-LCM Architecture
def test_hyperbolic_lcm():
    batch_size = 4
    sequence_length = 10
    input_dim = 256  # Input embedding dimension
    hidden_dim = 512  # Hidden dimension in hyperbolic space
    num_heads = 8
    num_layers = 6
    ff_dim = 2048
    output_dim = 256  # Output embedding dimension
    epochs = 5  # Number of epochs for training
    noise_level = 0.05  # Noise level for targets

    # Initialize the Poincare Ball Manifold
    manifold = PoincareBall(c=1.0)  # Curvature = 1.0

    # Random input to simulate embeddings
    input_embeddings = torch.randn(batch_size, sequence_length, input_dim)

    # Initialize the Hyperbolic-LCM Model
    model = HyperbolicLCM(input_dim, hidden_dim, num_heads, num_layers, ff_dim, output_dim, manifold)

    # Define the Riemannian Adam optimizer
    optimizer = RiemannianAdam(model.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    # Create Target Embeddings with Noise
    target_embeddings = add_noise_to_embeddings(input_embeddings, noise_level=noise_level)

    # Training Loop for Multiple Epochs
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        output_embeddings = model(input_embeddings)
        loss = criterion(output_embeddings, target_embeddings)
        loss.backward()
        optimizer.step()

        # Compute Accuracy
        accuracy = compute_accuracy(output_embeddings, target_embeddings, threshold=0.2)

        print(f"Epoch {epoch + 1}/{epochs} | Loss: {loss.item():.4f} | Accuracy: {accuracy * 100:.2f}%")

if __name__ == "__main__":
    test_hyperbolic_lcm()

Epoch 1/5 | Loss: 1.0427 | Accuracy: 0.00%
Epoch 2/5 | Loss: 0.9581 | Accuracy: 60.00%
Epoch 3/5 | Loss: 0.8878 | Accuracy: 100.00%
Epoch 4/5 | Loss: 0.8937 | Accuracy: 92.50%
Epoch 5/5 | Loss: 0.8720 | Accuracy: 100.00%


In [4]:
!pip install torchtext

Collecting torchtext
  Downloading torchtext-0.18.0-cp310-cp310-manylinux1_x86_64.whl.metadata (7.9 kB)
Downloading torchtext-0.18.0-cp310-cp310-manylinux1_x86_64.whl (2.0 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/2.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m65.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torchtext
Successfully installed torchtext-0.18.0


In [5]:
!pip uninstall torchtext --yes
!pip install torchtext --no-cache-dir

Found existing installation: torchtext 0.18.0
Uninstalling torchtext-0.18.0:
  Successfully uninstalled torchtext-0.18.0
Collecting torchtext
  Downloading torchtext-0.18.0-cp310-cp310-manylinux1_x86_64.whl.metadata (7.9 kB)
Downloading torchtext-0.18.0-cp310-cp310-manylinux1_x86_64.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m132.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torchtext
Successfully installed torchtext-0.18.0


In [12]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from geoopt import PoincareBall, ManifoldParameter  # For hyperbolic embeddings
from geoopt.optim import RiemannianAdam  # Hyperbolic optimizer
import re

# -----------------------------
#  Pyramid and Hyperbolic Layers
# -----------------------------
class PyramidLayer(nn.Module):
    """
    Represents one pyramid layer: compresses dimensionality in hyperbolic space.
    """
    def __init__(self, input_dim, output_dim, manifold):
        super(PyramidLayer, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        # Map to hyperbolic space with compression
        x = self.manifold.expmap0(self.linear(x))
        return x

class HyperbolicCube(nn.Module):
    """
    Hyperbolic Cube: Multiple pyramid layers forming a cube-like structure.
    """
    def __init__(self, layers_dims, manifold):
        super(HyperbolicCube, self).__init__()
        self.manifold = manifold
        self.pyramid_layers = nn.ModuleList([
            PyramidLayer(layers_dims[i], layers_dims[i+1], manifold)
            for i in range(len(layers_dims) - 1)
        ])

    def forward(self, x):
        for layer in self.pyramid_layers:
            x = layer(x)
        return x

# -----------------------------
#  PreNet and PostNet
# -----------------------------
class PreNet(nn.Module):
    """
    Maps input embeddings to the hidden dimension in hyperbolic space.
    """
    def __init__(self, input_dim, hidden_dim, manifold):
        super(PreNet, self).__init__()
        self.linear = nn.Linear(input_dim, hidden_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.linear(x)
        x = self.manifold.expmap0(x)
        return x

class PostNet(nn.Module):
    """
    Maps output back to the embedding space from hyperbolic space.
    """
    def __init__(self, hidden_dim, output_dim, manifold):
        super(PostNet, self).__init__()
        self.linear = nn.Linear(hidden_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.manifold.logmap0(x)
        x = self.linear(x)
        return x

# -----------------------------
#     Hyperbolic LCM Model
# -----------------------------
class HyperbolicLCM(nn.Module):
    """
    LCM with a Hyperbolic Cube as the hidden space.
    """
    def __init__(self, input_dim, hidden_dims, num_heads, num_layers, ff_dim, output_dim, manifold):
        super(HyperbolicLCM, self).__init__()
        self.manifold = manifold
        self.prenet = PreNet(input_dim, hidden_dims[0], manifold)
        self.hyperbolic_cube = HyperbolicCube(hidden_dims, manifold)
        self.postnet = PostNet(hidden_dims[-1], output_dim, manifold)

    def forward(self, x):
        x = self.prenet(x)
        x = self.hyperbolic_cube(x)
        x = self.postnet(x)
        return x

# -----------------------------
#    Utility Functions
# -----------------------------
def compute_accuracy(predicted, target, threshold=0.1):
    """
    Computes accuracy based on cosine similarity.
    """
    cos_sim = F.cosine_similarity(predicted, target, dim=-1)
    correct = (cos_sim > threshold).float()
    accuracy = correct.mean().item()
    return accuracy

def load_speech_and_create_embeddings(
    file_path,
    vocab_size=5000,
    embedding_dim=300,
    lowercase=True
):
    """
    1. Read raw speech text from file.
    2. Tokenize into words (simple approach).
    3. Assign each unique word a random embedding of 'embedding_dim' size.
    4. Return dict: {word -> random_embedding}.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()

    # Optionally lowercase
    if lowercase:
        text = text.lower()

    # Remove punctuation except apostrophes (minimal cleaning)
    # You can adapt this regex for your own punctuation rules
    text = re.sub(r'[^\w\s\']', '', text)

    # Split into tokens by whitespace
    tokens = text.split()

    # Build a vocabulary of unique tokens, up to 'vocab_size'
    unique_tokens = list(dict.fromkeys(tokens))  # preserves order
    if len(unique_tokens) > vocab_size:
        unique_tokens = unique_tokens[:vocab_size]

    # Assign random embeddings for each unique token
    # shape: (embedding_dim,), values ~ N(0, 1)
    embeddings_dict = {}
    for token in unique_tokens:
        embeddings_dict[token] = torch.randn(embedding_dim, dtype=torch.float)

    return embeddings_dict

def prepare_speech_batch(
    embeddings_dict,
    batch_size=4,
    sequence_length=10,
    embedding_dim=300
):
    """
    Randomly choose 'sequence_length' words from your dict
    and build a batch of size (batch_size, sequence_length, embedding_dim).
    """
    import random
    vocab_tokens = list(embeddings_dict.keys())

    # If your dict doesn't have enough words, handle that:
    if len(vocab_tokens) < sequence_length:
        raise ValueError("Not enough unique words to form a sequence.")

    # Randomly pick 'sequence_length' words
    random_tokens = random.sample(vocab_tokens, sequence_length)
    # Stack embeddings
    selected_vectors = torch.stack([embeddings_dict[tok] for tok in random_tokens])

    # Create a batch dimension: (batch_size, sequence_length, embedding_dim)
    input_embeddings = selected_vectors.unsqueeze(0).repeat(batch_size, 1, 1)
    return input_embeddings

# -----------------------------
#   Testing Hyperbolic LCM
# -----------------------------
def test_hyperbolic_lcm():
    batch_size = 4
    sequence_length = 10
    input_dim = 300  # Embedding dimension
    hidden_dims = [512, 256, 128, 64]  # Pyramid structure dimensions
    output_dim = 300
    epochs = 70
    threshold = 0.1  # Cosine similarity threshold
    trump_file = "trump_3.6.txt"  # Path to Trump speech text

    # 1. Read and tokenize speech, then assign random embeddings
    embeddings_dict = load_speech_and_create_embeddings(
        trump_file,
        vocab_size=2000,   # up to 2000 unique tokens
        embedding_dim=input_dim,
        lowercase=True
    )

    # 2. Prepare a random batch from our speech "vocab"
    input_embeddings = prepare_speech_batch(
        embeddings_dict,
        batch_size=batch_size,
        sequence_length=sequence_length,
        embedding_dim=input_dim
    )

    # Initialize the Poincare Ball Manifold
    manifold = PoincareBall(c=1.0)

    # 3. Initialize Hyperbolic-LCM Model
    model = HyperbolicLCM(
        input_dim,
        hidden_dims,
        num_heads=8,
        num_layers=6,
        ff_dim=2048,
        output_dim=output_dim,
        manifold=manifold
    )

    optimizer = RiemannianAdam(model.parameters(), lr=1e-4)  # Riemannian-compatible optimizer
    criterion = nn.MSELoss()

    # Create slightly perturbed target embeddings
    # shape: (batch_size, sequence_length, embedding_dim)
    target_embeddings = input_embeddings + torch.randn_like(input_embeddings) * 0.01

    # 4. Training Loop
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        output_embeddings = model(input_embeddings)
        loss = criterion(output_embeddings, target_embeddings)

        loss.backward()
        optimizer.step()

        # Compute Accuracy
        accuracy = compute_accuracy(output_embeddings, target_embeddings, threshold)
        print(f"Epoch {epoch + 1}/{epochs} | Loss: {loss.item():.4f} | Accuracy: {accuracy * 100:.2f}%")

if __name__ == "__main__":
    test_hyperbolic_lcm()


Epoch 1/70 | Loss: 0.9994 | Accuracy: 10.00%
Epoch 2/70 | Loss: 0.9986 | Accuracy: 10.00%
Epoch 3/70 | Loss: 0.9979 | Accuracy: 10.00%
Epoch 4/70 | Loss: 0.9972 | Accuracy: 10.00%
Epoch 5/70 | Loss: 0.9965 | Accuracy: 10.00%
Epoch 6/70 | Loss: 0.9957 | Accuracy: 10.00%
Epoch 7/70 | Loss: 0.9950 | Accuracy: 10.00%
Epoch 8/70 | Loss: 0.9943 | Accuracy: 10.00%
Epoch 9/70 | Loss: 0.9936 | Accuracy: 10.00%
Epoch 10/70 | Loss: 0.9929 | Accuracy: 15.00%
Epoch 11/70 | Loss: 0.9922 | Accuracy: 20.00%
Epoch 12/70 | Loss: 0.9915 | Accuracy: 20.00%
Epoch 13/70 | Loss: 0.9908 | Accuracy: 30.00%
Epoch 14/70 | Loss: 0.9901 | Accuracy: 30.00%
Epoch 15/70 | Loss: 0.9894 | Accuracy: 37.50%
Epoch 16/70 | Loss: 0.9887 | Accuracy: 40.00%
Epoch 17/70 | Loss: 0.9880 | Accuracy: 40.00%
Epoch 18/70 | Loss: 0.9873 | Accuracy: 40.00%
Epoch 19/70 | Loss: 0.9866 | Accuracy: 50.00%
Epoch 20/70 | Loss: 0.9859 | Accuracy: 50.00%
Epoch 21/70 | Loss: 0.9853 | Accuracy: 50.00%
Epoch 22/70 | Loss: 0.9846 | Accuracy: 50.0

In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import re
import random

from geoopt import PoincareBall, ManifoldParameter  # For hyperbolic embeddings
from geoopt.optim import RiemannianAdam  # Hyperbolic optimizer


# -----------------------------
#  Pyramid and Hyperbolic Layers
# -----------------------------
class PyramidLayer(nn.Module):
    """
    Represents one pyramid layer: compresses dimensionality in hyperbolic space.
    """
    def __init__(self, input_dim, output_dim, manifold):
        super(PyramidLayer, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        # Map to hyperbolic space with compression
        x = self.manifold.expmap0(self.linear(x))
        return x


class HyperbolicCube(nn.Module):
    """
    Hyperbolic Cube: Multiple pyramid layers forming a cube-like structure.
    """
    def __init__(self, layers_dims, manifold):
        super(HyperbolicCube, self).__init__()
        self.manifold = manifold
        self.pyramid_layers = nn.ModuleList([
            PyramidLayer(layers_dims[i], layers_dims[i+1], manifold)
            for i in range(len(layers_dims) - 1)
        ])

    def forward(self, x):
        for layer in self.pyramid_layers:
            x = layer(x)
        return x


# -----------------------------
#  PreNet and PostNet
# -----------------------------
class PreNet(nn.Module):
    """
    Maps input embeddings to the hidden dimension.
    """
    def __init__(self, input_dim, hidden_dim, manifold):
        super(PreNet, self).__init__()
        self.linear = nn.Linear(input_dim, hidden_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.linear(x)
        x = self.manifold.expmap0(x)
        return x


class PostNet(nn.Module):
    """
    Maps output back to the embedding space.
    """
    def __init__(self, hidden_dim, output_dim, manifold):
        super(PostNet, self).__init__()
        self.linear = nn.Linear(hidden_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.manifold.logmap0(x)
        x = self.linear(x)
        return x


# -----------------------------
#   Hyperbolic LCM Model
# -----------------------------
class HyperbolicLCM(nn.Module):
    """
    LCM with a Hyperbolic Cube as the hidden space and a learnable curvature.
    """
    def __init__(self, input_dim, hidden_dims, num_heads, num_layers, ff_dim, output_dim):
        super(HyperbolicLCM, self).__init__()
        # Learnable curvature parameter, initialized at 1.0
        self.curvature = nn.Parameter(torch.tensor(1.0, requires_grad=True))
        self.manifold = PoincareBall(c=self.curvature)

        self.prenet = PreNet(input_dim, hidden_dims[0], self.manifold)
        self.hyperbolic_cube = HyperbolicCube(hidden_dims, self.manifold)
        self.postnet = PostNet(hidden_dims[-1], output_dim, self.manifold)

    def forward(self, x):
        x = self.prenet(x)
        x = self.hyperbolic_cube(x)
        x = self.postnet(x)
        return x


# -----------------------------
#  Utility Functions
# -----------------------------
def compute_accuracy(predicted, target, threshold=0.1):
    """
    Computes accuracy based on cosine similarity.
    """
    cos_sim = F.cosine_similarity(predicted, target, dim=-1)
    correct = (cos_sim > threshold).float()
    accuracy = correct.mean().item()
    return accuracy


def load_text_and_create_embeddings(
    file_path,
    embedding_dim=300,
    vocab_size=5000,
    lowercase=True
):
    """
    1. Read raw text from file_path.
    2. Tokenize (simplistic).
    3. Build a vocabulary of unique tokens (up to vocab_size).
    4. Assign a random embedding of 'embedding_dim' for each token.
    Returns a dict: {token: embedding_tensor}.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()

    # Basic cleaning/lowercasing
    if lowercase:
        text = text.lower()

    # Remove punctuation except apostrophes
    text = re.sub(r'[^\w\s\']', '', text)

    # Split by whitespace into tokens
    tokens = text.split()

    # Build a vocabulary of unique tokens
    unique_tokens = list(dict.fromkeys(tokens))  # preserves order of first occurrence
    if len(unique_tokens) > vocab_size:
        unique_tokens = unique_tokens[:vocab_size]

    # Assign random embeddings
    embeddings_dict = {}
    for token in unique_tokens:
        embeddings_dict[token] = torch.randn(embedding_dim)

    return embeddings_dict


def prepare_input_batch(embeddings_dict, batch_size=4, sequence_length=10):
    """
    Randomly picks 'sequence_length' distinct words from 'embeddings_dict',
    builds a (batch_size, sequence_length, embedding_dim) tensor.
    """
    vocab_tokens = list(embeddings_dict.keys())
    embedding_dim = len(next(iter(embeddings_dict.values())))  # dimension of the 1st embedding

    if len(vocab_tokens) < sequence_length:
        raise ValueError("Not enough unique tokens to form the sequence.")

    # Randomly select tokens
    random_tokens = random.sample(vocab_tokens, sequence_length)
    selected_vectors = torch.stack([embeddings_dict[tok] for tok in random_tokens])

    # Expand to create batch: (batch_size, sequence_length, embedding_dim)
    input_embeddings = selected_vectors.unsqueeze(0).repeat(batch_size, 1, 1)
    return input_embeddings


# -----------------------------
#  Testing Hyperbolic-LCM
# -----------------------------
def test_hyperbolic_lcm():
    batch_size = 4
    sequence_length = 10
    input_dim = 300  # embedding dimension
    hidden_dims = [512, 256, 128, 64]
    output_dim = 300

    epochs = 40
    threshold = 0.1
    text_file = "trump_3.6.txt"  # or any raw text file

    # 1) Load text and create random embeddings for each word
    embeddings_dict = load_text_and_create_embeddings(
        text_file,
        embedding_dim=input_dim,
        vocab_size=2000,    # up to 2000 unique words
        lowercase=True
    )

    # 2) Prepare input batch
    input_embeddings = prepare_input_batch(
        embeddings_dict,
        batch_size=batch_size,
        sequence_length=sequence_length
    )

    # 3) Initialize the Hyperbolic-LCM model with learnable curvature
    model = HyperbolicLCM(input_dim, hidden_dims, num_heads=8, num_layers=6, ff_dim=2048, output_dim=output_dim)
    optimizer = RiemannianAdam(model.parameters(), lr=1e-4)
    criterion = nn.MSELoss()

    # Create target embeddings by adding minimal noise
    target_embeddings = input_embeddings + 0.01 * torch.randn_like(input_embeddings)

    # 4) Training Loop
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        # Forward pass
        output_embeddings = model(input_embeddings)

        # Main loss
        loss = criterion(output_embeddings, target_embeddings)

        # Curvature regularization:
        # Encourage curvature to stay near 1.0, but allow it to drift slightly.
        curvature_reg = torch.abs(model.curvature - 1.0) * 0.01
        total_loss = loss + curvature_reg

        total_loss.backward()
        optimizer.step()

        # Compute accuracy
        accuracy = compute_accuracy(output_embeddings, target_embeddings, threshold)

        print(
            f"Epoch {epoch + 1}/{epochs} | "
            f"Loss: {loss.item():.4f} | "
            f"Curvature: {model.curvature.item():.4f} | "
            f"Accuracy: {accuracy * 100:.2f}%"
        )


if __name__ == "__main__":
    test_hyperbolic_lcm()


Epoch 1/40 | Loss: 0.9819 | Curvature: 0.5414 | Accuracy: 0.00%
Epoch 2/40 | Loss: 0.9812 | Curvature: 0.5415 | Accuracy: 0.00%
Epoch 3/40 | Loss: 0.9805 | Curvature: 0.5416 | Accuracy: 0.00%
Epoch 4/40 | Loss: 0.9799 | Curvature: 0.5417 | Accuracy: 0.00%
Epoch 5/40 | Loss: 0.9792 | Curvature: 0.5418 | Accuracy: 0.00%
Epoch 6/40 | Loss: 0.9786 | Curvature: 0.5419 | Accuracy: 0.00%
Epoch 7/40 | Loss: 0.9779 | Curvature: 0.5420 | Accuracy: 0.00%
Epoch 8/40 | Loss: 0.9773 | Curvature: 0.5421 | Accuracy: 0.00%
Epoch 9/40 | Loss: 0.9766 | Curvature: 0.5422 | Accuracy: 10.00%
Epoch 10/40 | Loss: 0.9760 | Curvature: 0.5423 | Accuracy: 20.00%
Epoch 11/40 | Loss: 0.9754 | Curvature: 0.5424 | Accuracy: 20.00%
Epoch 12/40 | Loss: 0.9747 | Curvature: 0.5425 | Accuracy: 20.00%
Epoch 13/40 | Loss: 0.9741 | Curvature: 0.5426 | Accuracy: 20.00%
Epoch 14/40 | Loss: 0.9735 | Curvature: 0.5427 | Accuracy: 20.00%
Epoch 15/40 | Loss: 0.9728 | Curvature: 0.5428 | Accuracy: 20.00%
Epoch 16/40 | Loss: 0.9722 

In [14]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import re
import random

from geoopt import PoincareBall, ManifoldParameter  # For hyperbolic embeddings
from geoopt.optim import RiemannianAdam  # Hyperbolic optimizer

# --------------------------------------------------
# Utility: Compute Accuracy via Cosine Similarity
# --------------------------------------------------
def compute_accuracy(predicted, target, threshold=0.1):
    cos_sim = F.cosine_similarity(predicted, target, dim=-1)
    correct = (cos_sim > threshold).float()
    accuracy = correct.mean().item()
    return accuracy


# --------------------------------------------------
# Utility: Load Raw Text, Create Random Embeddings
# --------------------------------------------------
def load_text_and_create_embeddings(
    file_path,
    embedding_dim=300,
    vocab_size=5000,
    lowercase=True
):
    """
    1. Reads raw text from file_path.
    2. Basic tokenization: remove punctuation (except apostrophes) + split on whitespace.
    3. Builds up to 'vocab_size' unique tokens.
    4. Assigns each token a random vector of size 'embedding_dim'.
    Returns {token -> embedding_vector}.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    # Optional lowercasing
    if lowercase:
        text = text.lower()

    # Remove punctuation except apostrophes
    text = re.sub(r"[^\w\s\']", "", text)

    # Split on whitespace
    tokens = text.split()

    # Grab unique tokens in order of appearance
    unique_tokens = list(dict.fromkeys(tokens))
    if len(unique_tokens) > vocab_size:
        unique_tokens = unique_tokens[:vocab_size]

    # Assign random embeddings
    embeddings_dict = {}
    for token in unique_tokens:
        embeddings_dict[token] = torch.randn(embedding_dim, dtype=torch.float)

    return embeddings_dict


def prepare_input_batch(
    embeddings_dict,
    batch_size=4,
    sequence_length=10
):
    """
    Randomly picks 'sequence_length' tokens from embeddings_dict, stacks them,
    and repeats them 'batch_size' times.
    Returns a (batch_size, sequence_length, embedding_dim) tensor.
    """
    vocab_tokens = list(embeddings_dict.keys())
    if len(vocab_tokens) < sequence_length:
        raise ValueError("Not enough unique tokens to form a sequence.")

    # Embedding dimension from the first token
    embedding_dim = embeddings_dict[vocab_tokens[0]].shape[0]

    # Randomly pick 'sequence_length' tokens
    random_tokens = random.sample(vocab_tokens, sequence_length)
    selected_vectors = torch.stack([embeddings_dict[tok] for tok in random_tokens])

    # Expand to (batch_size, sequence_length, embedding_dim)
    input_embeddings = selected_vectors.unsqueeze(0).repeat(batch_size, 1, 1)
    return input_embeddings


# --------------------------------------------------
# Pyramid + Hyperbolic Cube
# --------------------------------------------------
class PyramidLayer(nn.Module):
    def __init__(self, input_dim, output_dim, manifold):
        super(PyramidLayer, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        # Hyperbolic compression
        x = self.manifold.expmap0(self.linear(x))
        return x


class HyperbolicCube(nn.Module):
    def __init__(self, layers_dims, manifold):
        super(HyperbolicCube, self).__init__()
        self.manifold = manifold
        self.pyramid_layers = nn.ModuleList([
            PyramidLayer(layers_dims[i], layers_dims[i+1], manifold)
            for i in range(len(layers_dims) - 1)
        ])

    def forward(self, x):
        for layer in self.pyramid_layers:
            x = layer(x)
        return x


# --------------------------------------------------
# PreNet and PostNet
# --------------------------------------------------
class PreNet(nn.Module):
    def __init__(self, input_dim, hidden_dim, manifold):
        super(PreNet, self).__init__()
        self.linear = nn.Linear(input_dim, hidden_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.linear(x)
        x = self.manifold.expmap0(x)
        return x


class PostNet(nn.Module):
    def __init__(self, hidden_dim, output_dim, manifold):
        super(PostNet, self).__init__()
        self.linear = nn.Linear(hidden_dim, output_dim)
        self.manifold = manifold

    def forward(self, x):
        x = self.manifold.logmap0(x)
        x = self.linear(x)
        return x


# --------------------------------------------------
# Dual Hidden LCM
# --------------------------------------------------
class DualHiddenLCM(nn.Module):
    """
    1) A hyperbolic path (PreNet -> HyperbolicCube -> PostNet)
    2) A simple 20D feedforward bottleneck
    Outputs are combined (summed).
    """
    def __init__(self, input_dim, hidden_dims, hidden_dim2, output_dim):
        super(DualHiddenLCM, self).__init__()
        # Learnable curvature for PoincareBall
        self.curvature = nn.Parameter(torch.tensor(1.0, requires_grad=True))
        self.manifold = PoincareBall(c=self.curvature)

        # Hidden Dim Path 1
        self.prenet = PreNet(input_dim, hidden_dims[0], self.manifold)
        self.hyperbolic_cube = HyperbolicCube(hidden_dims, self.manifold)
        self.postnet = PostNet(hidden_dims[-1], output_dim, self.manifold)

        # Hidden Dim Path 2: simple feedforward
        self.hidden_dim2 = nn.Linear(input_dim, hidden_dim2)
        self.hidden_dim2_output = nn.Linear(hidden_dim2, output_dim)

    def forward(self, x):
        # Path 1: Hyperbolic
        x_hidden1 = self.prenet(x)
        x_hidden1 = self.hyperbolic_cube(x_hidden1)
        x_hidden1 = self.postnet(x_hidden1)

        # Path 2: Standard feedforward
        x_hidden2 = F.relu(self.hidden_dim2(x))  # 20D bottleneck
        x_hidden2 = self.hidden_dim2_output(x_hidden2)

        # Combine outputs
        combined = x_hidden1 + x_hidden2
        return combined


# --------------------------------------------------
# Test DualHiddenLCM
# --------------------------------------------------
def test_dualhidden_lcm():
    batch_size = 4
    sequence_length = 10
    input_dim = 300  # Word embedding dimension
    hidden_dims = [512, 256, 128, 64]
    hidden_dim2 = 20
    output_dim = 300

    epochs = 60
    threshold = 0.1  # Cosine similarity threshold

    # Path to your raw text (e.g., trump_3.6.txt)
    text_file = "trump_3.6.txt"

    # 1) Create random embeddings from the text
    embeddings_dict = load_text_and_create_embeddings(
        file_path=text_file,
        embedding_dim=input_dim,
        vocab_size=2000,
        lowercase=True
    )

    # 2) Prepare input batch
    input_embeddings = prepare_input_batch(
        embeddings_dict,
        batch_size=batch_size,
        sequence_length=sequence_length
    )

    # 3) Initialize Model
    model = DualHiddenLCM(input_dim, hidden_dims, hidden_dim2, output_dim)
    optimizer = RiemannianAdam(model.parameters(), lr=1e-4)
    criterion = nn.MSELoss()

    # Create slightly perturbed target embeddings
    target_embeddings = input_embeddings + torch.randn_like(input_embeddings) * 0.01

    # 4) Training Loop
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()

        output_embeddings = model(input_embeddings)
        loss = criterion(output_embeddings, target_embeddings)

        # Curvature regularization (keep it near 1.0):
        curvature_reg = torch.abs(model.curvature - 1.0) * 0.01
        total_loss = loss + curvature_reg

        total_loss.backward()
        optimizer.step()

        # Compute Accuracy
        accuracy = compute_accuracy(output_embeddings, target_embeddings, threshold)

        print(
            f"Epoch {epoch + 1}/{epochs} | "
            f"Loss: {loss.item():.4f} | "
            f"Curvature: {model.curvature.item():.4f} | "
            f"Accuracy: {accuracy * 100:.2f}%"
        )


if __name__ == "__main__":
    test_dualhidden_lcm()


Epoch 1/60 | Loss: 1.0019 | Curvature: 0.5414 | Accuracy: 0.00%
Epoch 2/60 | Loss: 0.9986 | Curvature: 0.5415 | Accuracy: 0.00%
Epoch 3/60 | Loss: 0.9954 | Curvature: 0.5416 | Accuracy: 0.00%
Epoch 4/60 | Loss: 0.9923 | Curvature: 0.5417 | Accuracy: 0.00%
Epoch 5/60 | Loss: 0.9893 | Curvature: 0.5418 | Accuracy: 0.00%
Epoch 6/60 | Loss: 0.9864 | Curvature: 0.5419 | Accuracy: 0.00%
Epoch 7/60 | Loss: 0.9835 | Curvature: 0.5420 | Accuracy: 0.00%
Epoch 8/60 | Loss: 0.9807 | Curvature: 0.5421 | Accuracy: 0.00%
Epoch 9/60 | Loss: 0.9780 | Curvature: 0.5422 | Accuracy: 10.00%
Epoch 10/60 | Loss: 0.9753 | Curvature: 0.5423 | Accuracy: 20.00%
Epoch 11/60 | Loss: 0.9727 | Curvature: 0.5424 | Accuracy: 20.00%
Epoch 12/60 | Loss: 0.9701 | Curvature: 0.5425 | Accuracy: 22.50%
Epoch 13/60 | Loss: 0.9676 | Curvature: 0.5426 | Accuracy: 30.00%
Epoch 14/60 | Loss: 0.9652 | Curvature: 0.5427 | Accuracy: 30.00%
Epoch 15/60 | Loss: 0.9628 | Curvature: 0.5428 | Accuracy: 40.00%
Epoch 16/60 | Loss: 0.9605 