<a href="https://colab.research.google.com/github/breakingcircuits1337/LLM1/blob/main/Copy_of_Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
# First cell - Installation and imports
!pip install faiss-cpu
!pip install pyyaml

import tensorflow as tf
import numpy as np
import faiss
import logging
import yaml
from typing import List, Dict, Any, Optional, Tuple, Union
from io import StringIO
import time

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Check GPU availability
print("GPU Available:", tf.config.list_physical_devices('GPU'))

# Enable mixed precision for better performance
policy = tf.keras.mixed_precision.Policy('mixed_float16')
tf.keras.mixed_precision.set_global_policy(policy)

# Second cell - Configuration
# Reduced size configuration for Colab
config_yaml = """
n_vocab: 50000
n_ctx: 512      # Reduced context length
n_embd: 256     # Reduced embedding dimension
n_head: 8       # Reduced number of heads
n_layer: 6      # Reduced number of layers
learning_rate: 1e-4
n_hash: 512     # Reduced hash size
n_quant: 128    # Reduced quantization size
num_results: 5
dim: 256        # Matching embedding dimension
"""

config = yaml.safe_load(StringIO(config_yaml))

# Third cell - Helper Functions
def gelu(x):
    """Gaussian Error Linear Unit activation function"""
    return 0.5 * x * (1 + tf.math.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * tf.math.pow(x, 3))))

# Fourth cell - Model Components
@tf.keras.utils.register_keras_serializable()
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head attention layer optimized for Colab"""
    def __init__(self, n_embd: int, n_head: int):
        super(MultiHeadAttention, self).__init__()
        self.n_embd = n_embd
        self.n_head = n_head
        self.head_dim = n_embd // n_head

        # Use smaller initialization scale for better numerical stability
        initializer = tf.keras.initializers.RandomNormal(stddev=0.02)

        self.c_attn = tf.keras.layers.Dense(3 * n_embd, kernel_initializer=initializer)
        self.c_proj = tf.keras.layers.Dense(n_embd, kernel_initializer=initializer)
        self.attn_dropout = tf.keras.layers.Dropout(0.1)
        self.resid_dropout = tf.keras.layers.Dropout(0.1)

    def split_heads(self, x):
        batch_size = tf.shape(x)[0]
        x = tf.reshape(x, [batch_size, -1, self.n_head, self.head_dim])
        return tf.transpose(x, [0, 2, 1, 3])

    def merge_heads(self, x):
        batch_size = tf.shape(x)[0]
        x = tf.transpose(x, [0, 2, 1, 3])
        return tf.reshape(x, [batch_size, -1, self.n_embd])

    # The call function should be indented within the class definition
    def call(self, x, mask=None, training=False):
        q, k, v = tf.split(self.c_attn(x), 3, axis=-1)
        q = self.split_heads(q)
        k = self.split_heads(k)
        v = self.split_heads(v)

        # Scaled dot-product attention with memory-efficient implementation
        scale = tf.math.sqrt(tf.cast(self.head_dim, tf.float32))

        # Cast q and k to float32 before matrix multiplication
        q = tf.cast(q, tf.float32)
        k = tf.cast(k, tf.float32)

        scores = tf.matmul(q, k, transpose_b=True) / scale

        if mask is not None:
            scores = scores + (1.0 - tf.cast(mask, scores.dtype)) * -1e9

        weights = tf.nn.softmax(scores, axis=-1)
        weights = self.attn_dropout(weights, training=training)

        a = tf.matmul(weights, v)
        a = self.merge_heads(a)
        a = self.c_proj(a)
        a = self.resid_dropout(a, training=training)

        return a

@tf.keras.utils.register_keras_serializable()
class TransformerBlock(tf.keras.layers.Layer):
    """Transformer block optimized for Colab"""
    def __init__(self, n_embd: int, n_head: int):
        super(TransformerBlock, self).__init__()
        self.n_embd = n_embd
        self.n_head = n_head

        self.ln_1 = tf.keras.layers.LayerNormalization(epsilon=1e-5)
        self.attn = MultiHeadAttention(n_embd, n_head)
        self.ln_2 = tf.keras.layers.LayerNormalization(epsilon=1e-5)

        self.mlp = tf.keras.Sequential([
            tf.keras.layers.Dense(4 * n_embd, activation=gelu),
            tf.keras.layers.Dropout(0.1),
            tf.keras.layers.Dense(n_embd),
            tf.keras.layers.Dropout(0.1)
        ])

    def call(self, x, mask=None, training=False):
        a = self.attn(self.ln_1(x), mask=mask, training=training)
        x = x + a
        m = self.mlp(self.ln_2(x), training=training)
        x = x + m
        return x

class FAISSRetriever:
    """Memory-efficient FAISS retriever"""
    def __init__(self, knowledge_base: List[Dict[str, Any]], dim: int, num_results: int):
        self.index = faiss.IndexFlatL2(dim)
        self.knowledge_base = knowledge_base
        self.num_results = min(num_results, len(knowledge_base))

        vectors = [np.array(doc['vector'], dtype=np.float32).reshape(1, -1)
                  for doc in knowledge_base]
        self.index.add(np.concatenate(vectors, axis=0))

    def retrieve(self, query_vector: tf.Tensor) -> tf.Tensor:
        query_np = tf.cast(query_vector, tf.float32).numpy()
        distances, indices = self.index.search(query_np, self.num_results)
        retrieved_docs = [[self.knowledge_base[i]['text'] for i in batch]
                         for batch in indices]
        return tf.constant(retrieved_docs)

# Fifth cell - Main Model
@tf.keras.utils.register_keras_serializable()
class MultiModalTransformer(tf.keras.Model):
    """Colab-optimized MultiModal Transformer"""
    def __init__(self, config: Dict[str, Any], knowledge_base: List[Dict[str, Any]]):
        super(MultiModalTransformer, self).__init__()

        self.config = config

        # Core components
        self.wte = tf.keras.layers.Embedding(config['n_vocab'], config['n_embd'])
        self.wpe = tf.keras.layers.Embedding(config['n_ctx'], config['n_embd'])
        self.drop = tf.keras.layers.Dropout(0.1)

        # Transformer blocks
        self.blocks = [TransformerBlock(config['n_embd'], config['n_head'])
                      for _ in range(config['n_layer'])]
        self.ln_f = tf.keras.layers.LayerNormalization(epsilon=1e-5)

        # Task-specific components
        self.retriever = FAISSRetriever(knowledge_base, config['dim'], config['num_results'])

        # Simplified task heads for Colab
        self.task_heads = {
            'text_generation': tf.keras.layers.Dense(config['n_vocab']),
            'classification': tf.keras.layers.Dense(config['n_vocab'], activation='softmax')
        }

    def call(self, inputs, task='text_generation', training=False):
        if isinstance(inputs, tuple):
            text_ids = inputs[0]
        else:
            text_ids = inputs

        # Get embeddings
        x = self.wte(text_ids)

        # Add position embeddings
        positions = tf.range(0, tf.shape(x)[1], dtype=tf.int32)[tf.newaxis, :]
        x = x + self.wpe(positions)
        x = self.drop(x, training=training)

        # Apply transformer blocks
        for block in self.blocks:
            x = block(x, training=training)

        x = self.ln_f(x)

        # Task-specific output
        return self.task_heads[task](x)

# Sixth cell - Training and Testing
# Create test data
def create_test_data(batch_size=2, seq_length=10):
    return tf.random.uniform((batch_size, seq_length),
                           maxval=config['n_vocab'],
                           dtype=tf.int32)

# Initialize knowledge base
knowledge_base = [
    {'text': f'Example {i}', 'vector': np.random.rand(config['dim'])}
    for i in range(100)
]

# Initialize model
model = MultiModalTransformer(config, knowledge_base)

# Test model
test_input = create_test_data()
start_time = time.time()
output = model(test_input, task='text_generation')
end_time = time.time()

print(f"Model test successful!")
print(f"Input shape: {test_input.shape}")
print(f"Output shape: {output.shape}")
print(f"Inference time: {(end_time - start_time)*1000:.2f}ms")

# Seventh cell - Example Usage
# Example: Text generation
input_text = create_test_data(batch_size=1, seq_length=20)
generated = model(input_text, task='text_generation')
print("Text generation output shape:", generated.shape)

# Example: Classification
input_text = create_test_data(batch_size=4, seq_length=15)
classifications = model(input_text, task='classification')
print("Classification output shape:", classifications.shape)

# Save model
model.save('multimodal_transformer_colab.keras')
print("Model saved successfully!")

GPU Available: []




Model test successful!
Input shape: (2, 10)
Output shape: (2, 10, 50000)
Inference time: 2939.41ms
Text generation output shape: (1, 20, 50000)
Classification output shape: (4, 15, 50000)
Model saved successfully!
