In [None]:
import pandas as pd

def _parse_log_fields(parts):
    """Extract ``(ts, node_id, dlc)`` from the whitespace-split tokens of one log line.

    Parameters
    ----------
    parts : list[str]
        Tokens of a single line, as produced by ``str.split()``.

    Returns
    -------
    tuple[float, int, int]
        Token 1 parsed as a float timestamp, token 3 parsed as a hexadecimal
        integer ID, and the token following ``'DLC:'`` parsed as a decimal
        integer.

    Raises
    ------
    ValueError
        If a field is not numeric or the ``'DLC:'`` marker is missing.
    IndexError
        If ``'DLC:'`` is the last token on the line.
    """
    ts = float(parts[1])
    node_id = int(parts[3], 16)
    dlc = int(parts[parts.index('DLC:') + 1])
    return ts, node_id, dlc


# (path, label) pairs: one log file per traffic class.
files = [
    ('/content/drive/MyDrive/impersonating.txt', 1),  # Label 1 for Impersonating
    ('/content/drive/MyDrive/dos.txt', 2),          # Label 2 for DoS
    ('/content/drive/MyDrive/fuzzy.txt', 3),        # Label 3 for Fuzzy
    ('/content/drive/MyDrive/attack_free.txt', 0)   # Label 0 for Attack Free
]

# Parsed rows, one (ts, node_id, dlc, label) tuple per accepted line.
entries = []

for file, label in files:
    malformed = 0
    with open(file, 'r') as f:
        for line in f:
            parts = line.split()
            # Lines with fewer than 10 tokens are silently ignored.
            if len(parts) < 10:
                continue
            try:
                entry = _parse_log_fields(parts)
            except (ValueError, IndexError):
                # A single corrupt line should not abort the whole load;
                # count it so the loss of data is visible below.
                malformed += 1
                continue
            entries.append((*entry, label))
    if malformed:
        print(f"Skipped {malformed} malformed line(s) in {file}")

# Build the frame on a single timeline. A stable sort keeps rows with equal
# timestamps in their load order (so the result is reproducible), and the
# index is reset so it matches the sorted row positions.
# NOTE(review): rows from all four files are interleaved by `ts` before the
# inter-arrival time is computed; this presumes the captures share a common
# clock -- confirm that is intended.
df = (
    pd.DataFrame(entries, columns=['ts', 'src', 'dlc', 'label'])
    .sort_values('ts', kind='mergesort')
    .reset_index(drop=True)
)
# Time since the previous row; the first row has no predecessor, so use 0.
df['dt'] = df['ts'].diff().fillna(0.0)

# Feature Engineering (dt + one-hot DLC)
one_hot = pd.get_dummies(df['dlc'], prefix='dlc').astype('float32')
features = pd.concat([df[['dt']], one_hot], axis=1).to_numpy(dtype='float32')


In [None]:
# `torch` is needed in this cell; importing it here keeps the notebook
# runnable top-to-bottom on a fresh kernel (it is otherwise first imported
# in a later cell).
import torch

# 3. Build the per-message lists (sources, destinations, timestamps,
#    feature vectors) plus the class labels.
src     = df['src'].tolist()     # source node IDs
dst     = src                    # destination (self-edges); same list object as `src`
ts_list = df['ts'].tolist()      # timestamps
# Convert the whole feature matrix once, then split it into one 1-D float32
# tensor per row, instead of building a tensor per row in a Python loop.
x_feats = list(torch.tensor(features, dtype=torch.float32).unbind(0))  # feature vectors
y       = df['label'].tolist()   # labels (0 for Attack Free, 1 for Impersonating, etc.)

# 4. Print the first few entries of each list
print("First 5 source nodes (src):", src[:5])
print("First 5 destination nodes (dst):", dst[:5])
print("First 5 timestamps (ts_list):", ts_list[:5])
print("First 5 feature vectors (x_feats):")
for vec in x_feats[:5]:
    print(" ", vec)


In [None]:
from torch_geometric_temporal.signal import DynamicGraphTemporalSignal
import torch

import numpy as np

# 5. Wrap the data into DynamicGraphTemporalSignal format
#
# The constructor signature is
#     DynamicGraphTemporalSignal(edge_indices, edge_weights, features, targets, **kwargs)
# where every argument is a per-snapshot sequence of NumPy arrays (or None)
# and extra keyword arguments become additional per-snapshot attributes.
# Each message is one snapshot holding a single self-edge.
edge_indices = [np.array([[s], [d]], dtype=np.int64) for s, d in zip(src, dst)]
# One row per edge, so each snapshot's edge attributes have shape (1, n_features).
edge_attrs   = [np.asarray(f, dtype=np.float32).reshape(1, -1) for f in x_feats]
# NOTE(review): float32 carries roughly 7 significant digits; if `ts` values
# are large (e.g. epoch seconds) sub-second resolution is lost here --
# consider storing timestamps relative to the first message.
timestamps   = [np.array([t], dtype=np.float32) for t in ts_list]
labels       = [np.array([yy], dtype=np.int64) for yy in y]
# The log provides no per-node features, so each snapshot gets a None placeholder.
# NOTE(review): a model that reads `data.x` will need real node features.
node_feats   = [None] * len(labels)

# Create the dataset. `edge_weights` is exposed on each snapshot as
# `edge_attr`, `targets` as `y`, and the extra `timestamps` keyword as
# `timestamps`.
dataset = DynamicGraphTemporalSignal(
    edge_indices=edge_indices,
    edge_weights=edge_attrs,
    features=node_feats,
    targets=labels,
    timestamps=timestamps,
)

# Print the dataset object to verify it was constructed
print("Dataset: ", dataset)


In [None]:
from torch_geometric_temporal.nn import TGAT
import torch.optim as optim

# Number of full passes over the dataset. This was previously referenced
# without being defined in the notebook; adjust as needed.
epochs = 10

# Initialize your TGAT model (replace with your specific architecture)
# NOTE(review): `TGAT` is a placeholder -- verify that the installed
# torch_geometric_temporal.nn actually exports a class of this name and that
# its constructor / forward signatures match the calls below.
# NOTE(review): `out_channels=64` is fed straight into CrossEntropyLoss while
# the labels take values 0-3 -- confirm the output width is intended.
model = TGAT(in_channels=features.shape[1], out_channels=64)

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = torch.nn.CrossEntropyLoss()

# Train the model: one optimizer step per snapshot.
for epoch in range(epochs):
    model.train()
    total_loss = 0.0
    num_snapshots = 0  # counted while iterating, so the mean does not rely on len(dataset)
    for data in dataset:
        # Clear gradients left over from the previous step.
        optimizer.zero_grad()

        # Forward pass
        out = model(data.x, data.edge_index, data.edge_attr, data.timestamps)

        # Compute loss
        loss = criterion(out, data.y)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_snapshots += 1

    # Guard against an empty dataset to avoid a ZeroDivisionError.
    mean_loss = total_loss / max(num_snapshots, 1)
    print(f'Epoch {epoch+1}/{epochs}, Loss: {mean_loss}')
