In [5]:
import torch
from einops import rearrange, repeat
from einops.layers.torch import Rearrange
from torch import nn

# helpers

def pair(t):
    return t if isinstance(t, tuple) else (t, t)

# classes

class PreNorm(nn.Module):
    def __init__(self, dim, fn):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.fn = fn
    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)

class FeedForward(nn.Module):
    def __init__(self, dim, hidden_dim, dropout = 0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )
    def forward(self, x):
        return self.net(x)

class Attention(nn.Module):
    def __init__(self, dim, heads = 8, dim_head = 64, dropout = 0.):
        super().__init__()
        inner_dim = dim_head *  heads
        project_out = not (heads == 1 and dim_head == dim)

        self.heads = heads
        self.scale = dim_head ** -0.5

        self.attend = nn.Softmax(dim = -1)
        self.to_qkv = nn.Linear(dim, inner_dim * 3, bias = False)

        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, dim),
            nn.Dropout(dropout)
        ) if project_out else nn.Identity()

    def forward(self, x):
        qkv = self.to_qkv(x).chunk(3, dim = -1)
        q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h = self.heads), qkv)

        dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale

        attn = self.attend(dots)

        out = torch.matmul(attn, v)
        out = rearrange(out, 'b h n d -> b n (h d)')
        return self.to_out(out)

class Transformer(nn.Module):
    def __init__(self, dim, depth, heads, dim_head, mlp_dim, dropout = 0.):
        super().__init__()
        self.layers = nn.ModuleList([])
        for _ in range(depth):
            self.layers.append(nn.ModuleList([
                PreNorm(dim, Attention(dim, heads = heads, dim_head = dim_head, dropout = dropout)),
                PreNorm(dim, FeedForward(dim, mlp_dim, dropout = dropout))
            ]))
    def forward(self, x):
        for attn, ff in self.layers:
            x = attn(x) + x
            x = ff(x) + x
        return x

class IPT(nn.Module):
    def __init__(self, *, traj_size, seg_size, num_coordinate, dim, depth, heads, mlp_dim, pool = 'position', channels = 3, dim_head = 64, dropout = 0., emb_dropout = 0.):
        '''Generating the IPS transformer
        Args:
            traj_size (int): the size of the trajectory CSI (1D)
            seg_size (int): the size of the segmented part of a whole trajectory CSI (1D)
            num_coordinate (int): use x and y to represent the position
            dim (int): the embedding dimension (I think it can be used to shrink the input size)
            depth (int): the number of encoder in the encoder layer
            heads (int): the number of heads in the multihead attention case
            mlp_dim (int): multilayyer perceptron, to set up the number of neurons in the linear layer of FeedForward layer
            pool (str, optional): define the inserted information type. Defaults to 'position'. (also consider add DOA information here)
            channels (int, optional): (maybe) number of antennas. Defaults to 3.
            dim_head (int, optional): the dimension of the Q,K,V matrices. Defaults to 64. (I guess the reason why the head_dim is divisible by embedding dimension is that when you concatenate the result of the three heads, the size of the the concatenated vector is the same as the input vector)
            dropout (int, optional): dropout rate used in feedforward layer . Defaults to 0..
            emb_dropout (int, optional): dropout rate used in embedding layer. Defaults to 0.. (not so sure why the embedding layer also needs dropout)
        '''
        super().__init__()
        assert traj_size % seg_size == 0, 'The dimension of the trajectory CSI must be divisible by the segmentation size.'
        num_seg = traj_size // seg_size # // means floor operation
        
        ## IPS: divide the CSI of one traj into 
        # patch_dim = channels * patch_height * patch_width (3D)
        seg_dim = channels * seg_size #(2D)
        #IPS: the position shall be inserted at the beginning of the trajectory CSI
        assert pool in {'cls', 'mean', 'position'}, 'pool type must be either cls (cls token) or mean (mean pooling) or position '

        # b: means the batch size, which may be the number of trajectories
        self.to_seg_embedding = nn.Sequential(
            # Rearrange('b c (h p1) (w p2) -> b (h w) (p1 p2 c)', p1 = patch_height, p2 = patch_width),
            Rearrange('b c (h s) -> b h (s c)', s = seg_size),
            nn.Linear(seg_dim, dim), # I guess this linear layer will shrink the input dimension to embedding dim
        )

        self.pos_embedding = nn.Parameter(torch.randn(1, num_seg + 1, dim)) #NOTE the position embedding chosen to be trainable
        self.pos_token = nn.Parameter(torch.randn(1, 1, dim)) #! pos_token is the KEY for IPS, adding the position information to the input
        self.dropout = nn.Dropout(emb_dropout)

        self.transformer = Transformer(dim, depth, heads, dim_head, mlp_dim, dropout)

        self.pool = pool
        self.to_latent = nn.Identity() # DOING NOTHING JUST COPY THE INPUT TO THE OUTPUT

        self.mlp_head = nn.Sequential(
            nn.LayerNorm(dim),
            nn.Linear(dim, num_coordinate) #! here the code needs to be modified
        )

    def forward(self, traj_CSI):
        x = self.to_seg_embedding(traj_CSI)
        b, n, _ = x.shape # b: batch size, n: channel (number of the antennas)

        pos_tokens = repeat(self.pos_token, '() n d -> b n d', b = b)
        x = torch.cat((pos_tokens, x), dim=1) #! IPS: Concatenate the position information to the begining of the trajectory CSI
        x += self.pos_embedding[:, :(n + 1)] # Add the positional embedding
        x = self.dropout(x)

        x = self.transformer(x)

        # x = x.mean(dim = 1) if self.pool == 'mean' else x[:, 0]
        x = x[:,:2] if self.pool == 'position' else x[:, 0]

        x = self.to_latent(x)
        return self.mlp_head(x)
    
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
model = IPT(traj_size=224,seg_size=16,num_coordinate=4,dim=128,depth=1,heads=2,mlp_dim=1024).cuda()
traj_CSI = torch.randn(1, 8, 224).cuda()
out = model(traj_CSI)

cuda


RuntimeError: mat1 and mat2 shapes cannot be multiplied (14x128 and 48x128)

In [4]:
a = torch.randn(3,4)
print(a)
a[:,:2]

tensor([[-0.0044,  0.8776, -1.4038, -1.2487],
        [-0.1700, -1.5702, -0.8470,  0.4614],
        [ 0.5038, -0.7697,  1.4915,  0.6889]])


tensor([[-0.0044,  0.8776],
        [-0.1700, -1.5702],
        [ 0.5038, -0.7697]])