In [101]:
import numpy as np
import os
from scipy import io

import torch
import torch.nn as nn
import torch.optim as optim

import math


In [3]:
# Root data folder, one level above the current working directory.
DATA_PATH = os.path.join(os.pardir, "data")
# Folder holding the per-case training .mat files.
data_path = os.path.join(DATA_PATH, "TrainingData_vol2")

In [28]:
data_len = 42 # number of different speed / displacement combinations (one case per combination)
sample_len = 100 # number of different samples within each combination

In [6]:
# Load every case file into memory, keyed by its zero-based case index.
mat_contents = {}
for case_idx in range(data_len):
    # Files are numbered from 1: case001_result.mat, case002_result.mat, ...
    case_file = os.path.join(data_path, f"case0{case_idx + 1:02d}_result.mat")
    mat_contents[case_idx] = io.loadmat(case_file)

In [7]:
# number of samples stored under 'mixed' for the first case
len(mat_contents[0]['mixed'][0])

100

In [8]:
# shape of echo_shape (field index 1) for the first sample of the first case
mat_contents[0]['mixed'][0][0][1].shape

(54, 12)

In [10]:
# shape of cpmg (field index 0) for the first sample of the first case
mat_contents[0]['mixed'][0][0][0].shape

(2048, 1)

In [11]:
# shape of mqi (field index 2) for the first sample of the first case
mat_contents[0]['mixed'][0][0][2].shape

(1, 1)

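Prepare the echo_shape data for training.<br>
The final input shape should be (54, 12, 4200): one 54 × 12 echo_shape per sample, with all 42 × 100 = 4200 samples along the last axis.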

In [84]:
# Flatten the (case, sample) structure into parallel per-sample lists.
all_echo_shape, all_mqi, all_v, all_d = [], [], [], []
for case_idx in range(data_len):
    case = mat_contents[case_idx]
    for sample_idx in range(sample_len):
        sample = case['mixed'][0][sample_idx]
        # 'v' and 'd' are stored once per case; repeat them for every sample
        # so all four lists stay aligned index-by-index.
        all_v.append(case['v'][0][0])
        all_d.append(case['d'][0][0])
        # Field 1 of a sample is its echo_shape, field 2 its mqi.
        all_echo_shape.append(sample[1])
        # mqi is wrapped in a (1, 1) array; unwrap it to a scalar.
        all_mqi.append(sample[2][0][0])

In [109]:
# Stack the per-sample echo shapes along a new LAST axis so that
# input_data[:, :, k] is exactly sample k.
# NOTE: np.array(all_echo_shape).reshape(54, 12, 4200) must not be used here:
# the list stacks to (n_samples, 54, 12), and reshape only reinterprets the
# memory layout without moving the sample axis, which mixes values from
# different samples together.
input_data = np.stack(all_echo_shape, axis=-1)

# Sanity check: the first slice along the sample axis is the first sample.
assert np.array_equal(input_data[..., 0], all_echo_shape[0])

# Split into real and imaginary parts and put them on a new leading axis.
input_data_real = input_data.real
input_data_imag = input_data.imag
input_data_ri = np.array([input_data_real, input_data_imag])

# One mqi value per sample, in the same order as the last axis of input_data.
label = np.array(all_mqi)

In [110]:
# leading axis of size 2 holds the real and imaginary parts
input_data_ri.shape

(2, 54, 12, 4200)

PyTorch allows `nn.Linear` to accept an N-D input tensor; the only constraint is that the last dimension of the input tensor must equal `in_features` of the linear layer. The linear transformation is then applied along the last dimension of the tensor.
For instance, if `in_features=5` and `out_features=10` and the input tensor `x` has shape (2, 3, 5), then the output tensor will have shape (2, 3, 10).

In [None]:
class Transformer(nn.Module):
    """
    Sequence-to-sequence Transformer over integer token ids.

    Model from "A detailed guide to Pytorch's nn.Transformer() module.", by
    Daniel Melchor: https://medium.com/p/c80afbc9ffb1/

    Source and target tokens share a single embedding table. Embeddings are
    scaled by sqrt(dim_model), positionally encoded, passed through
    ``nn.Transformer`` and projected to ``num_tokens`` logits.

    Args:
        num_tokens: vocabulary size; also the size of the output logits.
        dim_model: embedding width, passed to ``nn.Transformer`` as ``d_model``.
            Must be divisible by ``num_heads`` (required by multi-head attention).
        num_heads: number of attention heads.
        num_encoder_layers: number of encoder layers.
        num_decoder_layers: number of decoder layers.
        dropout_p: dropout probability used by the positional encoder and
            the transformer.
    """
    # Constructor
    def __init__(
        self,
        num_tokens,
        dim_model,
        num_heads,
        num_encoder_layers,
        num_decoder_layers,
        dropout_p,
    ):
        super().__init__()

        # INFO
        self.model_type = "Transformer"
        self.dim_model = dim_model

        # LAYERS
        self.positional_encoder = PositionalEncoding(
            dim_model=dim_model, dropout_p=dropout_p, max_len=5000
        )
        self.embedding = nn.Embedding(num_tokens, dim_model)
        self.transformer = nn.Transformer(
            d_model=dim_model,
            nhead=num_heads,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dropout=dropout_p,
        )
        self.out = nn.Linear(dim_model, num_tokens)

    def forward(self, src, tgt, tgt_mask=None):
        """
        Run the model on a batch of source / target token ids.

        Args:
            src: integer tensor of size (batch_size, src sequence length).
            tgt: integer tensor of size (batch_size, tgt sequence length).
            tgt_mask: optional additive attention mask for the target
                sequence, e.g. the one built by ``get_tgt_mask``.

        Returns:
            Logits of size (tgt sequence length, batch_size, num_tokens).
        """
        # Embedding, scaled by sqrt(dim_model) - Out size = (batch_size, sequence length, dim_model)
        src = self.embedding(src) * math.sqrt(self.dim_model)
        tgt = self.embedding(tgt) * math.sqrt(self.dim_model)

        # nn.Transformer is used in its default sequence-first layout, so permute
        # to obtain size (sequence length, batch_size, dim_model).
        # This must happen BEFORE the positional encoding: the encoder indexes
        # positions along dim 0, so feeding it batch-first tensors would assign
        # positions per batch element instead of per token.
        src = src.permute(1, 0, 2)
        tgt = tgt.permute(1, 0, 2)

        # Positional encoding - Out size = (sequence length, batch_size, dim_model)
        src = self.positional_encoder(src)
        tgt = self.positional_encoder(tgt)

        # Transformer blocks - Out size = (sequence length, batch_size, dim_model)
        transformer_out = self.transformer(src, tgt, tgt_mask=tgt_mask)

        # Projection to vocabulary - Out size = (sequence length, batch_size, num_tokens)
        return self.out(transformer_out)

    def get_tgt_mask(self, size) -> torch.Tensor:
        """
        Build the additive causal mask for a target sequence of length ``size``.

        Each row allows one more position of the sequence to be seen:
        entries on and below the diagonal are 0, entries above it are -inf.

        EX for size=5:
        [[0., -inf, -inf, -inf, -inf],
         [0.,   0., -inf, -inf, -inf],
         [0.,   0.,   0., -inf, -inf],
         [0.,   0.,   0.,   0., -inf],
         [0.,   0.,   0.,   0.,   0.]]
        """
        # Keep -inf strictly above the diagonal; triu zeroes everything else.
        return torch.triu(torch.full((size, size), float('-inf')), diagonal=1)
    
class PositionalEncoding(nn.Module):
    """
    Add fixed sinusoidal positional encodings to sequence-first embeddings,
    then apply dropout.

    Modified version from: https://pytorch.org/tutorials/beginner/transformer_tutorial.html

    Args:
        dim_model: size of the last (feature) dimension of the embeddings.
            Both even and odd values are supported.
        dropout_p: dropout probability applied after adding the encoding.
        max_len: number of positions to precompute; inputs longer than this
            along dim 0 cannot be encoded.
    """

    def __init__(self, dim_model, dropout_p, max_len):
        super().__init__()

        # Info
        self.dropout = nn.Dropout(dropout_p)

        # Encoding - From formula
        pos_encoding = torch.zeros(max_len, dim_model)
        positions_list = torch.arange(0, max_len, dtype=torch.float).view(-1, 1) # 0, 1, 2, 3, 4, 5
        division_term = torch.exp(torch.arange(0, dim_model, 2).float() * (-math.log(10000.0)) / dim_model) # 1 / 10000^(2i/dim_model)
        angles = positions_list * division_term # size (max_len, ceil(dim_model / 2))

        # PE(pos, 2i) = sin(pos/10000^(2i/dim_model))
        pos_encoding[:, 0::2] = torch.sin(angles)

        # PE(pos, 2i + 1) = cos(pos/10000^(2i/dim_model))
        # For odd dim_model there is one odd column fewer than even columns,
        # so drop the last angle column to match.
        pos_encoding[:, 1::2] = torch.cos(angles[:, : dim_model // 2])

        # Saving buffer (same as parameter without gradients needed).
        # Size (max_len, 1, dim_model) so it broadcasts over the batch dimension.
        # register_buffer already exposes the tensor as self.pos_encoding.
        pos_encoding = pos_encoding.unsqueeze(1)
        self.register_buffer("pos_encoding", pos_encoding)

    def forward(self, token_embedding: torch.Tensor) -> torch.Tensor:
        """
        Args:
            token_embedding: tensor of size (sequence length, batch_size, dim_model).

        Returns:
            Tensor of the same size with the positional encoding added and
            dropout applied.
        """
        # Residual connection + pos encoding
        return self.dropout(token_embedding + self.pos_encoding[:token_embedding.size(0), :])
    



In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
# dim_model must be divisible by num_heads (multi-head attention splits the
# embedding evenly across heads), so dim_model=1 with num_heads=2 cannot be
# constructed. Use a small width that satisfies the constraint.
model = Transformer(
    num_tokens=4, dim_model=8, num_heads=2, num_encoder_layers=3, num_decoder_layers=3, dropout_p=0.1
).to(device)
opt = torch.optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.CrossEntropyLoss()



In [114]:
class ScalarOutputModel(nn.Module):
    """Small two-layer network mapping 10 input features to one scalar."""

    def __init__(self):
        super().__init__()
        # 10 input features -> 64 hidden units
        self.layer1 = nn.Linear(10, 64)
        self.relu = nn.ReLU()
        # 64 hidden units -> a single scalar output
        self.layer2 = nn.Linear(64, 1)

    def forward(self, x):
        """Apply linear -> ReLU -> linear along the last dimension of ``x``."""
        hidden = self.relu(self.layer1(x))
        return self.layer2(hidden)

# Build the toy model
model = ScalarOutputModel()

# Dummy batch: 5 samples, 10 input features each
input_tensor = torch.randn((5, 10))

# Forward pass: one scalar per sample
output = model(input_tensor)

# Show the values first, then the resulting shape
print(output, output.shape, sep="\n")

tensor([[ 0.4571],
        [-0.2435],
        [ 0.1194],
        [-0.0457],
        [ 0.4801]], grad_fn=<AddmmBackward0>)
torch.Size([5, 1])


In [113]:
# NOTE(review): `c` is not defined in any visible cell, so this relies on hidden
# kernel state and will fail after Restart & Run All. Define it above or remove this cell.
c.shape

torch.Size([3, 2, 1])