In [1]:
from IPython.display import FileLink, display
import torch.nn as torch_nn
from torch.utils.data import DataLoader
import numpy as np
import os, subprocess, time, json, torch
import struct

def download(download_file_name, directory="/kaggle/working/", extension=".uai"):
    """Display a clickable download link for a saved model file.

    Args:
        download_file_name: File name without its extension (e.g. ``"v6_model"``).
        directory: Directory that contains the file. Defaults to the Kaggle
            working directory used by the original notebook.
        extension: Extension appended to ``download_file_name``. Defaults to
            ``".uai"``, the extension written by ``carbono.save``.

    Side effects:
        Changes the process working directory to ``directory`` (this raises
        ``FileNotFoundError`` if the directory does not exist) and renders an
        IPython ``FileLink`` in the notebook output.
    """
    # The link is built from a bare relative file name, so the working
    # directory is switched to the folder that holds the file first.
    os.chdir(directory)
    display(FileLink(f"{download_file_name}{extension}"))

class carbono:
    """Minimal feed-forward network builder on top of PyTorch with a carbono.js-like API.

    Layers are declared with :meth:`layer`, trained with :meth:`train`, queried
    with :meth:`predict`, and exported/imported with :meth:`save`/:meth:`load`
    (``.uai`` container: JSON metadata, a separator line, then raw float32
    weights followed by raw float32 biases).

    Attributes:
        device: ``'cuda'`` when available, otherwise ``'cpu'``.
        layers: List of ``{'input_size', 'output_size', 'activation'}`` dicts.
        model: ``torch.nn.Sequential`` built from ``layers`` (``None`` until a layer is added).
        debug: When true, progress messages are printed.
        labels: Class labels learned from string outputs in :meth:`train`, or ``None``.
        details: Summary of the last training run plus optional ``'info'`` metadata.
    """

    # Marker separating the JSON header from the binary payload inside a .uai file.
    _SEPARATOR = b'\n---BINARY_SEPARATOR---\n'

    # Activations inserted after hidden layers. Names not listed here add no
    # activation module (this matches the original behaviour).
    _ACTIVATIONS = {
        'tanh': torch_nn.Tanh,
        'relu': torch_nn.ReLU,
        'sigmoid': torch_nn.Sigmoid,
    }

    def __init__(self, debug=True):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.layers = []
        self.model = None
        self.debug = debug
        self.labels = None
        self.details = {}

    def save_pytorch(self, filename='model'):
        """Save the model's ``state_dict`` to ``<filename>.pt``.

        Raises:
            ValueError: If no layer has been added yet.
        """
        if self.model is None:
            raise ValueError("Model architecture must be defined before saving weights")
        torch.save(self.model.state_dict(), filename + '.pt')

    def load_pytorch(self, filename='model.pt'):
        """Load a ``state_dict`` written by :meth:`save_pytorch`.

        Unlike :meth:`save_pytorch`, ``filename`` must include the ``.pt`` extension.

        Raises:
            ValueError: If the architecture has not been declared with :meth:`layer`.
        """
        if self.model is None:
            raise ValueError("Model architecture must be defined before loading weights")
        # map_location lets a checkpoint saved on a GPU be loaded on a CPU-only host.
        # NOTE(review): torch.load unpickles the file; only load checkpoints you trust.
        self.model.load_state_dict(torch.load(filename, map_location=self.device))

    def layer(self, input_size, output_size, activation='tanh'):
        """Append a fully connected layer and rebuild the model.

        Args:
            input_size: Number of input features; must equal the previous
                layer's ``output_size``.
            output_size: Number of output features.
            activation: Activation name stored with the layer.

        Raises:
            ValueError: If ``input_size`` does not match the previous layer. The
                layer list is left untouched in that case.
        """
        # Validate BEFORE appending so a rejected layer never pollutes self.layers.
        if self.layers:
            prev_layer = self.layers[-1]
            if prev_layer['output_size'] != input_size:
                raise ValueError(f"Layer input size {input_size} doesn't match previous layer output size {prev_layer['output_size']}")

        self.layers.append({
            'input_size': input_size,
            'output_size': output_size,
            'activation': activation
        })

        # Rebuilding creates fresh Linear modules, so weights are re-initialised.
        self._build_model()

        if self.debug:
            print(f"Added layer: {input_size} → {output_size} with {activation} activation")

    def _build_model(self):
        """Build ``self.model`` from ``self.layers`` (no-op when there are no layers).

        No activation module is appended after the final layer: the model emits
        raw outputs, which is what ``CrossEntropyLoss`` expects during training.
        """
        if not self.layers:
            return

        modules = []
        last_index = len(self.layers) - 1
        for index, spec in enumerate(self.layers):
            modules.append(torch_nn.Linear(spec['input_size'], spec['output_size']))
            if index < last_index:
                activation_cls = self._ACTIVATIONS.get(spec['activation'])
                if activation_cls is not None:
                    modules.append(activation_cls())

        self.model = torch_nn.Sequential(*modules).to(self.device)

    def train(self, train_set, options=None):
        """Train the model on ``train_set`` and return a summary dict.

        Args:
            train_set: Non-empty sequence of ``{'input': [...], 'output': ...}``
                items. When the first item's output is a ``str``, outputs are
                treated as class labels; otherwise they are used as float targets.
            options: Optional dict with keys ``epochs`` (200), ``learningRate``
                (0.212), ``printEveryEpochs`` (10), ``earlyStopThreshold`` (1e-6),
                ``optimizer`` (``'adam'``; anything else selects SGD),
                ``lossFunction`` (``'cross-entropy'`` or ``'mse'``) and
                ``batchSize`` (32).

        Returns:
            The new ``self.details`` dict (``loss`` is ``None`` when zero epochs ran).

        Raises:
            ValueError: If no layer was added, ``train_set`` is empty, the loss
                function is unknown, or there are more labels than model outputs.
        """
        if self.model is None:
            raise ValueError("Add at least one layer before training")
        if not train_set:
            raise ValueError("train_set must contain at least one sample")

        options = options or {}
        epochs = options.get('epochs', 200)
        learning_rate = options.get('learningRate', 0.212)
        print_every_epochs = options.get('printEveryEpochs', 10)
        early_stop_threshold = options.get('earlyStopThreshold', 1e-6)
        optimizer_type = options.get('optimizer', 'adam')
        loss_function = options.get('lossFunction', 'cross-entropy')
        batch_size = options.get('batchSize', 32)

        if loss_function == 'mse':
            criterion = torch_nn.MSELoss()
        elif loss_function == 'cross-entropy':
            criterion = torch_nn.CrossEntropyLoss()
        else:
            raise ValueError(f"Unsupported lossFunction: {loss_function!r}")

        x_data = torch.tensor([item['input'] for item in train_set], dtype=torch.float32).to(self.device)

        if isinstance(train_set[0]['output'], str):
            # dict.fromkeys keeps first-seen order, so label indices are stable
            # across runs (a plain set() ordering depends on string hashing).
            unique_labels = list(dict.fromkeys(item['output'] for item in train_set))
            output_size = self.layers[-1]['output_size']
            if len(unique_labels) > output_size:
                raise ValueError(
                    f"Found {len(unique_labels)} distinct labels but the final layer only has {output_size} outputs"
                )
            self.labels = unique_labels
            label_to_idx = {label: i for i, label in enumerate(unique_labels)}
            y_data = torch.tensor([label_to_idx[item['output']] for item in train_set], dtype=torch.long)
            if loss_function == 'mse':
                # MSELoss needs float targets shaped like the model output.
                y_data = torch.nn.functional.one_hot(y_data, num_classes=output_size).to(torch.float32)
            y_data = y_data.to(self.device)
        else:
            y_data = torch.tensor([item['output'] for item in train_set], dtype=torch.float32).to(self.device)

        dataset = torch.utils.data.TensorDataset(x_data, y_data)
        train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        if optimizer_type == 'adam':
            optimizer = torch.optim.Adam(self.model.parameters(), lr=learning_rate)
        else:
            optimizer = torch.optim.SGD(self.model.parameters(), lr=learning_rate)

        start_time = time.time()

        # Defined up front so the summary below is valid even when epochs == 0.
        avg_loss = None
        epochs_run = 0

        for epoch in range(epochs):
            total_loss = 0
            for inputs, targets in train_loader:
                optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

                total_loss += loss.item()

            avg_loss = total_loss / len(train_loader)
            epochs_run = epoch + 1

            # A printEveryEpochs of 0/None disables progress output instead of
            # raising ZeroDivisionError.
            if self.debug and print_every_epochs and epochs_run % print_every_epochs == 0:
                print(f'Epoch [{epochs_run}/{epochs}], Loss: {avg_loss:.6f}')

            if avg_loss < early_stop_threshold:
                if self.debug:
                    print(f'Early stopping at epoch {epochs_run} with loss: {avg_loss:.6f}')
                break

        training_time = (time.time() - start_time) * 1000  # milliseconds

        # Keep user metadata added through info() across training runs.
        previous_info = self.details.get('info')

        total_params = sum(p.numel() for p in self.model.parameters())
        self.details = {
            'loss': avg_loss,
            'parameters': total_params,
            'training': {
                'time': training_time,
                'epochs': epochs_run,
                'learningRate': learning_rate
            },
            'layers': self.layers
        }
        if previous_info is not None:
            self.details['info'] = previous_info
        return self.details

    def predict(self, input_data, tags=True):
        """Run the model on ``input_data`` and return the result for the first row.

        Args:
            input_data: A single sample (1-D) or a batch (2-D); only the first
                row's result is returned.
            tags: When true and labels are known, return a list of
                ``{'label', 'probability'}`` dicts instead of a plain list.

        Returns:
            A list of floats, or a list of label/probability dicts. When the
            final layer was declared with ``'softmax'``, the values are softmax
            probabilities; otherwise they are the raw model outputs.

        Raises:
            ValueError: If no layer has been added yet.
        """
        if self.model is None:
            raise ValueError("Add at least one layer before predicting")

        with torch.no_grad():
            input_tensor = torch.tensor(input_data, dtype=torch.float32).to(self.device)
            if input_tensor.dim() == 1:
                input_tensor = input_tensor.unsqueeze(0)

            output = self.model(input_tensor)

            # The model itself ends in a Linear layer (softmax is folded into the
            # training loss), so apply the declared softmax here; otherwise the
            # 'probability' values would be unnormalised logits.
            if self.layers and self.layers[-1]['activation'] == 'softmax':
                output = torch.softmax(output, dim=-1)

            first_row = output[0].cpu().tolist()

        if self.labels and tags:
            return [
                {'label': label, 'probability': float(prob)}
                for label, prob in zip(self.labels, first_row)
            ]

        return first_row

    def save(self, filename='model'):
        """Export the model to ``<filename>.uai`` in the carbono.js container format.

        Layout: UTF-8 JSON metadata, the separator line, all weight matrices
        (row-major, in layer order) and then all bias vectors, every value
        stored as little-endian float32.

        Raises:
            ValueError: If no layer has been added yet.
        """
        if self.model is None:
            raise ValueError("Add at least one layer before saving")

        linear_layers = [module for module in self.model if isinstance(module, torch_nn.Linear)]
        # '<f4' pins the byte order so files are identical on every platform.
        weights = [m.weight.detach().cpu().numpy().astype('<f4') for m in linear_layers]
        biases = [m.bias.detach().cpu().numpy().astype('<f4') for m in linear_layers]

        metadata = {
            'layers': self.layers,
            'details': self.details,
            'layerInfo': {
                'weightShapes': [list(w.shape) for w in weights],
                'biasShapes': [int(b.shape[0]) for b in biases]
            },
            'labels': self.labels
        }

        weight_bin = b''.join(w.tobytes() for w in weights)
        bias_bin = b''.join(b.tobytes() for b in biases)
        binary_data = json.dumps(metadata).encode('utf-8') + self._SEPARATOR + weight_bin + bias_bin

        with open(filename + '.uai', 'wb') as f:
            f.write(binary_data)

    def load(self, filename):
        """Load a model from a ``.uai`` file written by :meth:`save`.

        ``filename`` is used as given (the extension is not appended). The file
        is fully validated before any instance state is replaced.

        Raises:
            ValueError: If the separator is missing, the metadata is
                inconsistent, or the binary payload has the wrong size.
        """
        with open(filename, 'rb') as f:
            data = f.read()

        header, separator, payload = data.partition(self._SEPARATOR)
        if not separator:
            raise ValueError("Invalid file format")

        metadata = json.loads(header.decode('utf-8'))
        layers = metadata['layers']
        details = metadata['details']
        labels = metadata.get('labels', None)
        weight_shapes = metadata['layerInfo']['weightShapes']
        bias_shapes = metadata['layerInfo']['biasShapes']

        if not layers or not (len(layers) == len(weight_shapes) == len(bias_shapes)):
            raise ValueError("Invalid file format: layer metadata is inconsistent")
        for spec, weight_shape, bias_shape in zip(layers, weight_shapes, bias_shapes):
            # nn.Linear stores its weight as (output_size, input_size).
            if list(weight_shape) != [spec['output_size'], spec['input_size']] or bias_shape != spec['output_size']:
                raise ValueError("Invalid file format: stored shapes do not match layer sizes")

        weight_size = sum(shape[0] * shape[1] for shape in weight_shapes)
        bias_size = sum(bias_shapes)
        if len(payload) != (weight_size + bias_size) * 4:
            raise ValueError("Invalid file format: binary payload has unexpected size")

        # astype() yields a writable, native-endian copy of the read-only buffer view.
        values = np.frombuffer(payload, dtype='<f4').astype(np.float32)
        weight_values = values[:weight_size]
        bias_values = values[weight_size:]

        # The file is valid: replace state and rebuild the architecture.
        self.layers = layers
        self.details = details
        self.labels = labels
        self._build_model()

        weight_index = 0
        bias_index = 0
        linear_layers = [module for module in self.model if isinstance(module, torch_nn.Linear)]
        with torch.no_grad():
            for module, (rows, cols), bias_shape in zip(linear_layers, weight_shapes, bias_shapes):
                weight_chunk = weight_values[weight_index:weight_index + rows * cols].reshape(rows, cols)
                module.weight.copy_(torch.from_numpy(weight_chunk).to(self.device))
                weight_index += rows * cols

                bias_chunk = bias_values[bias_index:bias_index + bias_shape]
                module.bias.copy_(torch.from_numpy(bias_chunk).to(self.device))
                bias_index += bias_shape

        if self.debug:
            print("Model loaded successfully!")

    def info(self, info_updates):
        """Merge ``info_updates`` into ``self.details['info']`` (created on first use)."""
        if 'info' not in self.details:
            self.details['info'] = {}
        self.details['info'].update(info_updates)

In [3]:
# Example usage: learn a mapping from a one-hot position (10 inputs) to one of 10 emoji labels.

# Seed PyTorch so weight initialisation and batch shuffling are the same on every run.
torch.manual_seed(42)

train_set = [
    {'input': [1, 0, 0, 0, 0, 0, 0, 0, 0, 0], 'output': '😀'},  # Smiling Face
    {'input': [0, 1, 0, 0, 0, 0, 0, 0, 0, 0], 'output': '😊'},  # Smiling Face with Smiling Eyes
    {'input': [0, 0, 1, 0, 0, 0, 0, 0, 0, 0], 'output': '😂'},  # Face with Tears of Joy
    {'input': [0, 0, 0, 1, 0, 0, 0, 0, 0, 0], 'output': '😍'},  # Smiling Face with Heart-Eyes
    {'input': [0, 0, 0, 0, 1, 0, 0, 0, 0, 0], 'output': '😎'},  # Smiling Face with Sunglasses
    {'input': [0, 0, 0, 0, 0, 1, 0, 0, 0, 0], 'output': '😢'},  # Crying Face
    {'input': [0, 0, 0, 0, 0, 0, 1, 0, 0, 0], 'output': '😡'},  # Pouting Face
    {'input': [0, 0, 0, 0, 0, 0, 0, 1, 0, 0], 'output': '😴'},  # Sleeping Face
    {'input': [0, 0, 0, 0, 0, 0, 0, 0, 1, 0], 'output': '🤔'},  # Thinking Face
    {'input': [0, 0, 0, 0, 0, 0, 0, 0, 0, 1], 'output': '🤢'},  # Nauseated Face
]

# Create the model (debug=True prints each added layer and the training progress)
nn = carbono(debug=True)

# Add layers with specified architecture
nn.layer(10, 512, 'sigmoid')
nn.layer(512, 1024, 'relu')
nn.layer(1024, 128, 'relu')
nn.layer(128, 10, 'softmax')  # Output layer (10 outputs, one per emoji class)

# Train the model with adjusted parameters
training_summary = nn.train(train_set, {
    'epochs': 1000,
    'learningRate': 0.0002,
    'printEveryEpochs': 500,
    'optimizer': 'adam',
    'lossFunction': 'cross-entropy'
})

# Export the model to "<model_name>.uai" and show a download link for it
model_name = "v6_model"
nn.save(model_name)
download(model_name)

Added layer: 10 → 512 with sigmoid activation
Added layer: 512 → 1024 with relu activation
Added layer: 1024 → 128 with relu activation
Added layer: 128 → 10 with softmax activation
Epoch [500/1000], Loss: 0.001897
Epoch [1000/1000], Loss: 0.000306
