In [19]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class WaveNetwork(nn.Module):
    """Text classifier that treats token embeddings as waves (magnitude + phase).

    Pipeline implemented by ``forward``:
      1. embed the token ids,
      2. compute a per-feature magnitude ``G`` as the 2-norm over the sequence axis,
      3. compute a per-token phase ``alpha = atan2(embedding, G)``,
      4. derive a second wave ``(G', alpha')`` through two linear layers,
      5. combine both waves by modulation, keep the real part, average it over
         the sequence axis and project it to ``num_classes`` logits.

    All sequence-axis operations use negative dimensions, so both batched
    ``(batch, seq)`` and unbatched ``(seq,)`` inputs are handled.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: embedding width (also the width of both linear layers).
        num_classes: number of output logits.
    """

    def __init__(self, vocab_size, embed_dim=768, num_classes=4):
        super().__init__()
        # Embedding layer for tokens
        self.embedding = nn.Embedding(vocab_size, embed_dim)

        # Linear layers that produce the second wave (G', alpha') used for
        # interference and modulation
        self.linear_global = nn.Linear(embed_dim, embed_dim)
        self.linear_local = nn.Linear(embed_dim, embed_dim)

        # Output layer: one logit per class
        self.fc = nn.Linear(embed_dim, num_classes)

    def _calculate_global_semantics(self, embeddings):
        """Return the 2-norm over the sequence axis: (..., seq, embed) -> (..., embed)."""
        return torch.linalg.norm(embeddings, dim=-2)

    def _calculate_local_phase(self, embeddings, global_magnitude):
        """Return the per-token phase ``atan2(embeddings, G)``, shaped like ``embeddings``."""
        # Re-insert the sequence axis so the magnitude broadcasts over tokens.
        return torch.atan2(embeddings, global_magnitude.unsqueeze(-2))

    def _interference(self, G, alpha, G_prime, alpha_prime):
        """Complex sum of the two waves. Not called by ``forward``; kept for experiments."""
        g = G.unsqueeze(-2)
        g_prime = G_prime.unsqueeze(-2)
        real_part = g * torch.cos(alpha) + g_prime * torch.cos(alpha_prime)
        imag_part = g * torch.sin(alpha) + g_prime * torch.sin(alpha_prime)
        return real_part + 1j * imag_part  # Shape: (..., seq_length, embed_dim)

    def _modulation_real(self, G, alpha, G_prime, alpha_prime):
        """Real part of the modulated wave: ``G * G' * cos(alpha + alpha')``."""
        amplitude = G.unsqueeze(-2) * G_prime.unsqueeze(-2)
        return amplitude * torch.cos(alpha + alpha_prime)

    def _modulation(self, G, alpha, G_prime, alpha_prime):
        """Complex product of the two waves: magnitudes multiply, phases add."""
        amplitude = G.unsqueeze(-2) * G_prime.unsqueeze(-2)
        real_part = self._modulation_real(G, alpha, G_prime, alpha_prime)
        imag_part = amplitude * torch.sin(alpha + alpha_prime)
        return real_part + 1j * imag_part  # Shape: (..., seq_length, embed_dim)

    def forward(self, x):
        """Map token ids of shape (batch, seq) or (seq,) to logits (batch, num_classes) or (num_classes,)."""
        embeddings = self.embedding(x)  # Shape: (..., seq_length, embed_dim)

        # Global semantics (magnitude G), one value per embedding feature
        G = self._calculate_global_semantics(embeddings)  # Shape: (..., embed_dim)

        # Local semantics (phase alpha), one value per token and feature
        alpha = self._calculate_local_phase(embeddings, G)  # Shape: (..., seq_length, embed_dim)

        # Second wave used for modulation
        G_prime = self.linear_global(G)  # Shape: (..., embed_dim)
        alpha_prime = self.linear_local(alpha)  # Shape: (..., seq_length, embed_dim)

        # Only the real part of the modulation is consumed downstream, so it is
        # computed directly instead of materialising a complex tensor and
        # discarding its imaginary half.
        modulation_real = self._modulation_real(G, alpha, G_prime, alpha_prime)

        # Aggregate over the sequence axis before classification
        final_representation = modulation_real.mean(dim=-2)  # Shape: (..., embed_dim)

        # Project to class logits
        return self.fc(final_representation)  # Shape: (..., num_classes)

In [None]:
import torch.optim as optim

# Build the network with the vocabulary size and embedding width used in this notebook
model = WaveNetwork(vocab_size=30522, embed_dim=768)

# Adam over every trainable parameter of the model
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Cross-entropy objective for the classification logits
criterion = nn.CrossEntropyLoss()

In [None]:
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer

# Assume you have a custom Dataset class for your data
class TextDataset(Dataset):
    """Map-style dataset that tokenizes one text per item on access.

    Args:
        texts: indexable collection of raw strings.
        labels: indexable collection of labels, aligned with ``texts``.
        tokenizer: callable accepting ``(text, return_tensors, padding,
            truncation, max_length)`` and returning a mapping that holds an
            ``'input_ids'`` tensor with a leading batch axis of size 1.
        max_length: length every sequence is padded/truncated to (default 128).

    Raises:
        ValueError: if ``texts`` and ``labels`` differ in length.
    """

    def __init__(self, texts, labels, tokenizer, max_length=128):
        if len(texts) != len(labels):
            raise ValueError(
                f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
            )
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        """Return ``(input_ids, label)`` for item ``idx``."""
        text = self.texts[idx]
        label = self.labels[idx]
        # Tokenize and convert text to tensor format
        inputs = self.tokenizer(
            text,
            return_tensors='pt',
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
        )
        # Drop only the leading batch axis; a bare squeeze() would also collapse
        # the sequence axis when it happens to have length 1.
        input_ids = inputs['input_ids'].squeeze(0)
        return input_ids, label

# Placeholder corpus: swap in the real texts and their labels
texts = [
    "sample text 1",
    "sample text 2",
]
labels = [
    0,
    1,
]

# Tokenizer handed to the dataset below
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Wrap the corpus in a dataset and serve it in shuffled mini-batches of 32
dataset = TextDataset(texts=texts, labels=labels, tokenizer=tokenizer)
dataloader = DataLoader(dataset=dataset, batch_size=32, shuffle=True)

In [18]:
# Pick the best available device: CUDA first, then Apple-Silicon MPS, else CPU.
# The getattr guard keeps this working on PyTorch builds without torch.backends.mps.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

# Move the model to the selected device
model.to(device)

# Set the model to training mode
model.train()

num_epochs = 5
for epoch in range(num_epochs):
    running_loss = 0.0
    # The loop variable is named batch_labels so it does not overwrite the
    # module-level `labels` list that was used to build the dataset.
    for batch_idx, (input_ids, batch_labels) in enumerate(dataloader):
        # Move the batch to the same device as the model
        input_ids, batch_labels = input_ids.to(device), batch_labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(input_ids)

        # Calculate the loss
        loss = criterion(outputs, batch_labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()
        if batch_idx % 10 == 0:  # Print every 10 batches
            print(f"Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx}], Loss: {loss.item():.4f}")

    # max(..., 1) avoids a ZeroDivisionError if the dataloader yields no batches
    print(f"Epoch [{epoch+1}/{num_epochs}], Average Loss: {running_loss / max(len(dataloader), 1):.4f}")

RuntimeError: The size of tensor a (2) must match the size of tensor b (128) at non-singleton dimension 1

In [None]:
def _unpack_batch(batch):
    """Return ``(input_ids, labels)`` from a mapping batch or an ``(input_ids, labels)`` pair."""
    if hasattr(batch, "keys"):
        # Mapping batches store the labels under 'label' (or 'labels').
        label_key = 'label' if 'label' in batch else 'labels'
        return batch['input_ids'], batch[label_key]
    input_ids, labels = batch
    return input_ids, labels


def evaluate(model, val_dataloader, device=None):
    """Compute and print classification accuracy (in percent) over a dataloader.

    Args:
        model: module mapping ``input_ids`` to logits of shape (batch, num_classes).
        val_dataloader: iterable of batches, each either an
            ``(input_ids, labels)`` pair or a mapping with ``'input_ids'`` and
            ``'label'``/``'labels'`` entries.
        device: device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU when the model has no parameters).

    Returns:
        Accuracy as a float percentage; ``0.0`` when the loader yields no samples.

    Side effects:
        Switches ``model`` to eval mode and prints the accuracy.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():  # Disable gradient computation for evaluation
        for batch in val_dataloader:
            input_ids, labels = _unpack_batch(batch)
            input_ids, labels = input_ids.to(device), labels.to(device)
            outputs = model(input_ids)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        # Nothing was evaluated: report it instead of dividing by zero.
        print('Validation Accuracy: n/a (no samples)')
        return 0.0

    accuracy = 100 * correct / total
    print(f'Validation Accuracy: {accuracy:.2f}%')
    return accuracy

In [10]:
from datasets import load_dataset

# Load the AG News dataset; the splits are selected from `ds` in the next cell
ds = load_dataset(path="fancyzhx/ag_news")

Generating train split: 100%|██████████| 120000/120000 [00:00<00:00, 1629921.44 examples/s]
Generating test split: 100%|██████████| 7600/7600 [00:00<00:00, 1018587.97 examples/s]


In [12]:
# Pull the train and test splits out of the loaded dataset
train_dataset, test_dataset = ds["train"], ds["test"]


In [13]:
from transformers import AutoTokenizer

# Tokenizer used by tokenize_function below
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")


def tokenize_function(examples):
    """Tokenize ``examples['text']``, padding/truncating every entry to 128 tokens."""
    encoded = tokenizer(
        examples['text'],
        padding='max_length',
        truncation=True,
        max_length=128,
    )
    return encoded

# Run the tokenizer over both splits, processing examples in batches
train_dataset = train_dataset.map(function=tokenize_function, batched=True)
test_dataset = test_dataset.map(function=tokenize_function, batched=True)

Map: 100%|██████████| 120000/120000 [00:09<00:00, 12156.72 examples/s]
Map: 100%|██████████| 7600/7600 [00:00<00:00, 12981.89 examples/s]


In [14]:
# Expose only the token ids and the label, returned as PyTorch tensors
train_dataset.set_format(
    type="torch",
    columns=["input_ids", "label"],
)
test_dataset.set_format(
    type="torch",
    columns=["input_ids", "label"],
)

In [15]:
from torch.utils.data import DataLoader

# Mini-batch loaders: the training split is shuffled, the test split keeps its order
train_dataloader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=32)

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim

# Fresh model, objective and optimiser for the AG News run
model = WaveNetwork(vocab_size=30522, embed_dim=768)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)

# Use CUDA when present, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
model.to(device)

# Optimisation: five passes over the training loader
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for batch in train_dataloader:
        # Each batch is a mapping; move the ids and labels next to the model
        input_ids = batch['input_ids'].to(device)
        labels = batch['label'].to(device)

        # Clear stale gradients, then forward -> loss -> backward -> update
        optimizer.zero_grad()
        outputs = model(input_ids)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accumulate the scalar batch loss for the epoch summary
        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(train_dataloader):.4f}")

KeyboardInterrupt: 

In [22]:
# Running loss divided by the loader's full batch count.
# NOTE(review): if the training loop was interrupted mid-epoch, running_loss only
# covers the batches processed so far, so this understates the true per-batch mean.
running_loss / len(train_dataloader)

3.659727728796005