<a href="https://colab.research.google.com/github/Perfect-Cube/Novartis-NEST/blob/main/Memory_Graph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install torch_geometric

Collecting torch_geometric
  Downloading torch_geometric-2.6.1-py3-none-any.whl.metadata (63 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/63.1 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.1/63.1 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
Downloading torch_geometric-2.6.1-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m28.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: torch_geometric
Successfully installed torch_geometric-2.6.1


In [8]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv

# Define the Memory-Based Graph Network
class MemoryBasedGraphNet(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
        super(MemoryBasedGraphNet, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.gcn1 = GCNConv(embed_dim, hidden_dim)
        self.gcn2 = GCNConv(hidden_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x, edge_index, memory_seq):
        # Step 1: Embedding
        x = self.embedding(x)

        # Step 2: Graph propagation
        x = F.relu(self.gcn1(x, edge_index))
        x = F.relu(self.gcn2(x, edge_index))

        # Step 3: Memory mechanism
        memory_seq = memory_seq.unsqueeze(0)  # Add batch dimension
        memory_output, _ = self.lstm(memory_seq)
        memory_context = memory_output[:, -1, :]  # Take the last hidden state

        # Combine graph and memory context
        combined = x.mean(dim=0).unsqueeze(0) + memory_context  # Ensure batch dimension

        # Step 4: Prediction
        out = self.fc(combined)
        return F.log_softmax(out, dim=-1)

# Dummy Data Creation Function
# Dummy Data Creation Function
def create_graph_data(vocab):
    vocab_size = len(vocab)
    num_nodes = 5
    num_edges = 4

    x = torch.arange(0, num_nodes, dtype=torch.long)  # Node features
    edge_index = torch.tensor([[0, 1, 2, 3], [1, 2, 3, 4]])  # Directed edges

    # Ensure memory_seq matches LSTM input size (sequence_length, hidden_dim)
    sequence_length = 3
    hidden_dim = 8
    memory_seq = torch.randn(sequence_length, hidden_dim)  # Correct shape for LSTM

    return x, edge_index, memory_seq, vocab_size


# Function to Build Vocabulary
def build_vocab():
    vocab = {
        "hello": 0,
        "world": 1,
        "this": 2,
        "is": 3,
        "a": 4,
        "test": 5,
        "deep": 6,
        "learning": 7,
        "graph": 8,
        "networks": 9
    }
    reverse_vocab = {v: k for k, v in vocab.items()}
    return vocab, reverse_vocab

# Function to Convert Sentence to Indices
def sentence_to_indices(sentence, vocab):
    words = sentence.lower().split()
    indices = [vocab[word] for word in words if word in vocab]
    return torch.tensor(indices, dtype=torch.long)

# Training Function with User Input
def train_model(vocab, reverse_vocab):
    x, edge_index, memory_seq, vocab_size = create_graph_data(vocab)

    model = MemoryBasedGraphNet(vocab_size=vocab_size, embed_dim=16, hidden_dim=8, num_classes=10)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()

    model.train()

    # User input: Simulate predicting a word based on a sentence
    user_sentence = input("Enter a sentence (e.g., 'hello world'): ")
    sentence_indices = sentence_to_indices(user_sentence, vocab)

    if len(sentence_indices) == 0:
        print("No valid words from the vocabulary found in the input.")
        return model

    target_word = input("Enter the target word (e.g., 'test'): ").lower()
    if target_word not in vocab:
        print(f"Target word '{target_word}' not in vocabulary.")
        return model

    labels = torch.tensor([vocab[target_word]])  # Target index

    for epoch in range(50):
        optimizer.zero_grad()

        # Forward pass
        out = model(x, edge_index, memory_seq)

        # Debugging shapes
        print(f"Output shape: {out.shape}")  # Should be [1, 10]
        print(f"Labels shape: {labels.shape}")  # Should be [1]

        # Loss calculation
        loss = criterion(out, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f"Epoch {epoch}: Loss = {loss.item()}")

    return model

# Test the Model
def test_model(model, vocab, reverse_vocab):
    x, edge_index, memory_seq, _ = create_graph_data(vocab)
    model.eval()
    with torch.no_grad():
        out = model(x, edge_index, memory_seq)
        prediction = torch.argmax(out, dim=-1).item()
        predicted_word = reverse_vocab[prediction]
        print(f"Predicted next word: {predicted_word}")

# Main Function to Run the Training and Testing
if __name__ == "__main__":
    vocab, reverse_vocab = build_vocab()

    print("Vocabulary:", vocab)

    print("\nStarting Training...")
    trained_model = train_model(vocab, reverse_vocab)  # Train the model with user input

    print("\nTesting the Model...")
    test_model(trained_model, vocab, reverse_vocab)  # Test the trained model


Vocabulary: {'hello': 0, 'world': 1, 'this': 2, 'is': 3, 'a': 4, 'test': 5, 'deep': 6, 'learning': 7, 'graph': 8, 'networks': 9}

Starting Training...
Enter a sentence (e.g., 'hello world'): deep network
Enter the target word (e.g., 'test'): learning
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Epoch 0: Loss = 2.0076990127563477
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Output shape: torch.Size([1, 10])
Labels shape: torch.Size([1])
Epoch 10: 

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim

# Define Model
class GraphLSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(GraphLSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, memory_seq):
        output, _ = self.lstm(memory_seq)
        logits = self.fc(output[:, -1, :])  # Use the last output of LSTM
        return logits

# Create Dummy Data
def create_graph_data(vocab):
    vocab_size = len(vocab)
    num_nodes = 5
    num_edges = 4

    x = torch.arange(0, num_nodes, dtype=torch.long)  # Node features
    edge_index = torch.tensor([[0, 1, 2, 3], [1, 2, 3, 4]])  # Directed edges

    # Ensure memory_seq matches LSTM input size (batch_size, sequence_length, hidden_dim)
    batch_size = 1
    sequence_length = 3
    hidden_dim = 8
    memory_seq = torch.randn(batch_size, sequence_length, hidden_dim)  # Correct shape for LSTM

    return x, edge_index, memory_seq, vocab_size

# Train the Model
def train_model(vocab, reverse_vocab):
    vocab_size = len(vocab)
    embedding_dim = 8
    hidden_dim = 8

    model = GraphLSTMModel(vocab_size, embedding_dim, hidden_dim)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Generate dummy data
    _, _, memory_seq, vocab_size = create_graph_data(vocab)

    # Training
    epochs = 100
    for epoch in range(epochs):
        optimizer.zero_grad()

        # Generate input embeddings from memory_seq
        memory_seq = torch.randn(1, 3, embedding_dim)  # Dummy sequence

        logits = model(memory_seq)  # Forward pass
        target = torch.randint(0, vocab_size, (1,))  # Random target for training
        loss = criterion(logits, target)
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 10 == 0:
            print(f"Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}")

    return model

# Predict a Word
def predict_word(model, vocab, reverse_vocab):
    _, _, memory_seq, _ = create_graph_data(vocab)

    # Generate predictions
    memory_seq = torch.randn(1, 3, 8)  # Dummy sequence
    logits = model(memory_seq)
    probabilities = nn.Softmax(dim=-1)(logits)
    predicted_index = torch.argmax(probabilities, dim=-1).item()

    # Get the predicted word
    predicted_word = reverse_vocab[predicted_index]
    return predicted_word

# Main Execution
if __name__ == "__main__":
    # Define vocabulary
    vocab = {'hello': 0, 'world': 1, 'this': 2, 'is': 3, 'a': 4, 'test': 5, 'deep': 6, 'learning': 7, 'graph': 8, 'networks': 9}
    reverse_vocab = {v: k for k, v in vocab.items()}

    print("\nStarting Training...")
    trained_model = train_model(vocab, reverse_vocab)  # Train the model

    print("\nGenerating Prediction...")
    generated_word = predict_word(trained_model, vocab, reverse_vocab)
    print(f"Generated Word: {generated_word}")



Starting Training...
Epoch [10/100], Loss: 2.1058
Epoch [20/100], Loss: 2.5272
Epoch [30/100], Loss: 2.6096
Epoch [40/100], Loss: 2.1333
Epoch [50/100], Loss: 2.5895
Epoch [60/100], Loss: 2.2488
Epoch [70/100], Loss: 2.0330
Epoch [80/100], Loss: 2.1863
Epoch [90/100], Loss: 2.5687
Epoch [100/100], Loss: 2.4022

Generating Prediction...
Generated Word: this
