In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F
from functools import partial
from collections import OrderedDict
from torch.utils.data import Dataset, DataLoader
from datetime import datetime
import os
import pickle
import suncalc
import PIL
import PIL.Image
from torchvision import transforms
import torch.optim as optim
from tqdm import tqdm
from torch.utils.data import DataLoader, random_split

#### 数据集生成

In [2]:
class YinYingDataset(Dataset):
    """Dataset of pickled (feature, label, angle) samples stored in a two-level folder tree.

    Layout read by ``_load_dataset_path``::

        root_dir/<class_folder>/<sub_class_folder>/train/train.pkl      (feature)
        root_dir/<class_folder>/<sub_class_folder>/train/position.pkl   (angle)
        root_dir/<class_folder>/<sub_class_folder>/label/label.pkl      (label)
    """

    def __init__(self, root_dir, transform=None, target_size=1280):
        """
        Args:
            root_dir (string): Root directory of the dataset.
            transform (callable, optional): Callable applied to the
                ``(feature, label, angle, mask)`` tuple built by ``__getitem__``;
                its return value is what ``__getitem__`` returns.
            target_size (int): Side length the feature and label are padded to.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.target_size = target_size
        self.feature = []
        self.label = []
        self.angle = []
        self._load_dataset_path()

    def _load_dataset_path(self):
        """
        Walk the dataset tree and record the feature / label / angle file paths
        of every sample folder.
        """
        # os.listdir returns entries in arbitrary order; sorting keeps sample
        # indices stable between runs and machines.
        for class_folder in sorted(os.listdir(self.root_dir)):
            class_folder_path = os.path.join(self.root_dir, class_folder)
            # Skip stray files (e.g. .DS_Store) that os.listdir would choke on.
            if not os.path.isdir(class_folder_path):
                continue
            for sub_class_folder in sorted(os.listdir(class_folder_path)):
                sub_class_folder_path = os.path.join(class_folder_path, sub_class_folder)
                if not os.path.isdir(sub_class_folder_path):
                    continue
                self.feature.append(os.path.join(sub_class_folder_path, 'train', 'train.pkl'))
                self.label.append(os.path.join(sub_class_folder_path, 'label', 'label.pkl'))
                self.angle.append(os.path.join(sub_class_folder_path, 'train', 'position.pkl'))

    @staticmethod
    def _load_pickle(path):
        """Read one pickled object from ``path``."""
        # NOTE(review): pickle.load can execute arbitrary code embedded in the
        # file; only point this dataset at trusted data.
        with open(path, 'rb') as f:
            return pickle.load(f)

    def __len__(self):
        """Number of sample folders found under ``root_dir``."""
        return len(self.feature)

    def __getitem__(self, idx):
        """Load sample ``idx`` and pad it to ``target_size`` x ``target_size``.

        Returns:
            ``(feature, label, angle, mask)`` where ``feature`` has a leading
            channel axis added, and ``mask`` is 1.0 wherever the padded feature
            is non-zero and 0.0 elsewhere. If a ``transform`` was given, its
            result on that tuple is returned instead.
        """
        feature = torch.Tensor(self._load_pickle(self.feature[idx])).squeeze(0)
        label = torch.Tensor(self._load_pickle(self.label[idx]))
        angle = torch.Tensor(self._load_pickle(self.angle[idx]))

        # Centre the sample inside the target square; any odd remainder goes to
        # the right / bottom edge. F.pad takes (left, right, top, bottom).
        pad_h = self.target_size - feature.shape[0]
        pad_w = self.target_size - feature.shape[1]
        padding = (int(pad_w / 2), pad_w - int(pad_w / 2),
                   int(pad_h / 2), pad_h - int(pad_h / 2))
        feature = F.pad(feature, padding)
        # assumes label has the same height/width as the squeezed feature — TODO confirm
        label = F.pad(label, padding)

        # Zero pixels (including the padding just added) are excluded by the mask.
        mask = (feature != 0).float()

        sample = (feature.unsqueeze(0), label, angle, mask)
        if self.transform is not None:
            sample = self.transform(sample)
        return sample

In [3]:
class PatchEmbed(nn.Module):
    '''
    2D image to patch embedding.

    The image is cut into non-overlapping patches by a convolution whose kernel
    size and stride both equal the patch size; each patch is thereby projected
    to an ``embed_dim``-long vector.

    Args:
        img_size (int or tuple): image height/width; an int means a square image.
        patch_size (int or tuple): patch height/width; an int means a square patch.
        in_c (int): number of input channels.
        embed_dim (int): length of each patch embedding (conv output channels).
        norm_lay (callable, optional): called as ``norm_lay(embed_dim)`` to build
            the normalisation applied to the tokens; identity when omitted.
    '''
    def __init__(self,img_size=1280,patch_size=20, in_c=1,embed_dim=400,norm_lay=None):
        super().__init__()
        # Image resolution as (height, width)
        img_size = tuple(img_size) if isinstance(img_size, (tuple, list)) else (img_size, img_size)
        # Convolution kernel (= patch) size as (height, width)
        patch_size = tuple(patch_size) if isinstance(patch_size, (tuple, list)) else (patch_size, patch_size)
        self.img_size = img_size
        self.patch_size = patch_size
        # Number of patches along each image axis
        self.grid_size = (img_size[0]//patch_size[0],img_size[1]//patch_size[1])
        # Total number of patches per image
        self.num_patches = self.grid_size[0]*self.grid_size[1]
        # Report the real embedding width. This used to be patch_h*patch_w*in_c,
        # which equals embed_dim only for the default arguments.
        self.embed_dim = embed_dim
        # The stride splits the image into patches; the patch-sized kernel does
        # the linear projection of every patch.
        self.proj = nn.Conv2d(in_c,embed_dim,kernel_size=patch_size,stride=patch_size)
        self.norm = norm_lay(embed_dim) if norm_lay else nn.Identity()

    def forward(self,x):
        """Map ``x`` of shape (B, in_c, H, W) to tokens of shape (B, num_patches, embed_dim)."""
        # conv -> (B, embed_dim, H/ph, W/pw); flatten the grid, then put tokens before channels
        x = self.proj(x).flatten(2).transpose(1,2)
        x = self.norm(x)
        return x

In [4]:
class LayerNorm(nn.LayerNorm):
    """LayerNorm that computes in float32 and hands back the caller's dtype (fp16-safe)."""

    def forward(self, x: torch.Tensor):
        # Remember the incoming dtype so the result can be cast back to it.
        input_dtype = x.dtype
        normalized = super().forward(x.to(torch.float32))
        return normalized.to(input_dtype)


class QuickGELU(nn.Module):
    """Sigmoid-gated activation ``x * sigmoid(1.702 * x)``."""

    def forward(self, x: torch.Tensor):
        gate = torch.sigmoid(1.702 * x)
        return x * gate
    
class ResidualAttentionBlock(nn.Module):
    """Pre-norm transformer block: self-attention, then a 4x-wide MLP, each added residually.

    ``nn.MultiheadAttention`` is built here without ``batch_first``, so with
    PyTorch's default the input is read as (seq_len, batch, d_model).
    """

    def __init__(self, d_model: int, n_head: int):
        super().__init__()

        self.attn = nn.MultiheadAttention(d_model, n_head)
        self.ln_1 = LayerNorm(d_model)

        # Feed-forward part: expand to 4 * d_model, activate, project back.
        hidden_dim = d_model * 4
        feed_forward = OrderedDict()
        feed_forward["c_fc"] = nn.Linear(d_model, hidden_dim)
        feed_forward["gelu"] = QuickGELU()
        feed_forward["c_proj"] = nn.Linear(hidden_dim, d_model)
        self.mlp = nn.Sequential(feed_forward)

        self.ln_2 = LayerNorm(d_model)

    def attention(self, x: torch.Tensor):
        """Self-attention of ``x`` with itself; only the attended values are returned."""
        attended, _ = self.attn(x, x, x, need_weights=False)
        return attended

    def forward(self, x: torch.Tensor):
        """Apply the attention sub-layer and then the MLP sub-layer, each with a skip connection."""
        after_attn = x + self.attention(self.ln_1(x))
        return after_attn + self.mlp(self.ln_2(after_attn))
class Transformer(nn.Module):
    """A stack of ``layers`` ResidualAttentionBlock modules of width ``width`` run one after another."""

    def __init__(self, width: int, layers: int, heads: int):
        super().__init__()
        self.width = width
        self.layers = layers

        stack = []
        for _ in range(layers):
            stack.append(ResidualAttentionBlock(width, heads))
        self.resblocks = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor):
        """Feed ``x`` through every block in order."""
        out = self.resblocks(x)
        return out

#### 定义位置编码

In [5]:
class Angle_Encoder(nn.Module):
    """Expand a per-sample angle vector into one embedding per patch token.

    The angle vector is linearly projected to ``emb_dim``, copied to every one
    of ``num_patches`` token slots, offset by a learned per-token embedding and
    refined by an 8-layer, 8-head transformer.

    Args:
        angle_length (int): length of the input angle vector.
        emb_dim (int): embedding width (must suit the 8 attention heads).
        dropout_rate (float): dropout applied to the output; disabled when <= 0.
        num_patches (int): number of token slots to produce (default 4096).
    """
    def __init__(self, angle_length, emb_dim, dropout_rate=0.1, num_patches=4096):
        super(Angle_Encoder,self).__init__()
        self.emb_dim = emb_dim
        self.num_patches = num_patches
        self.lr = nn.Linear(angle_length,emb_dim)
        self.pos_embedding = nn.Parameter(torch.zeros(1,num_patches,emb_dim))
        self.transformer = Transformer(emb_dim,8,8)
        if dropout_rate>0:
            self.dropout = nn.Dropout(dropout_rate)
        else:
            self.dropout = None

    def forward(self, x):
        """Return angle embeddings of shape (B, num_patches, emb_dim).

        assumes x is (B, angle_length) — TODO confirm against the data loader
        """
        out = self.lr(x)
        # Broadcasting against the (1, N, D) table copies the projected angle
        # into every token slot and adds the per-token offset in one step.
        out = out.unsqueeze(1) + self.pos_embedding
        # The transformer's nn.MultiheadAttention layers use the default
        # batch_first=False, i.e. they expect (seq, batch, dim). Feeding the
        # (B, N, D) tensor directly made them attend across the batch axis
        # (mixing samples); move tokens to the front for the call instead.
        out = out.transpose(0, 1)
        out = self.transformer(out)
        out = out.transpose(0, 1)
        if self.dropout is not None:
            out = self.dropout(out)
        return out

class PositionEmbs(nn.Module):
    """Add a learned positional embedding and an angle-derived embedding to patch tokens.

    Args:
        angle_length (int): length of the angle vector given to ``forward``.
        num_patches (int): number of patch tokens per sample.
        emb_dim (int): token embedding width.
        dropout_rate (float): dropout applied to the sum; disabled when <= 0.
    """
    def __init__(self, angle_length, num_patches, emb_dim, dropout_rate=0.1):
        super(PositionEmbs,self).__init__()
        # The angle encoder must emit exactly one embedding per patch token,
        # so it is sized from num_patches rather than a fixed 4096.
        self.angle_encoder = Angle_Encoder(angle_length, emb_dim, dropout_rate, num_patches=num_patches)
        self.pos_embedding = nn.Parameter(torch.zeros(1,num_patches,emb_dim))
        if dropout_rate >0:
            self.dropout = nn.Dropout(dropout_rate)
        else:
            self.dropout = None

    def forward(self,x, angle):
        """Return ``x + pos_embedding + angle_encoder(angle)`` (with optional dropout).

        assumes x is (B, num_patches, emb_dim) — TODO confirm
        """
        out = x+self.pos_embedding + self.angle_encoder(angle)
        if self.dropout is not None:
            out = self.dropout(out)
        return out

#### 定义注意力机制

In [6]:
class Attention(nn.Module):
    """Multi-head self-attention over a (batch, tokens, channels) tensor.

    Args:
        dim: channel width of the input tokens.
        num_heads: number of attention heads; ``dim`` is split evenly between them.
        qkv_bias: whether the fused q/k/v projection has a bias.
        qk_scale: overrides the default ``head_dim ** -0.5`` score scale when truthy.
        attn_drop_ratio: dropout on the attention weights.
        proj_drop_ratio: dropout after the output projection.
    """
    def __init__(self,
                 dim,
                 num_heads=8,
                 qkv_bias=False,
                 qk_scale=None,
                 attn_drop_ratio=0.,
                 proj_drop_ratio=0.):
        super(Attention, self).__init__()
        self.num_heads = num_heads
        per_head_dim = dim // num_heads
        self.scale = qk_scale if qk_scale else per_head_dim ** -0.5
        # One linear layer produces q, k and v together.
        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop_ratio)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop_ratio)

    def forward(self, x):
        """Return the attended tokens, same shape as ``x``: (batch, tokens, channels)."""
        batch, tokens, channels = x.shape
        per_head_dim = channels // self.num_heads

        # (batch, tokens, 3 * channels) -> (batch, tokens, 3, heads, per_head_dim)
        packed = self.qkv(x).reshape(batch, tokens, 3, self.num_heads, per_head_dim)
        # -> (3, batch, heads, tokens, per_head_dim), then split off q, k, v
        q, k, v = packed.permute(2, 0, 3, 1, 4).unbind(0)

        # Scaled dot-product scores: (batch, heads, tokens, tokens)
        scores = torch.matmul(q, k.transpose(-2, -1)) * self.scale
        weights = self.attn_drop(scores.softmax(dim=-1))

        # Weighted values: (batch, heads, tokens, per_head_dim)
        # -> (batch, tokens, heads, per_head_dim) -> (batch, tokens, channels)
        merged = torch.matmul(weights, v).transpose(1, 2).reshape(batch, tokens, channels)
        return self.proj_drop(self.proj(merged))

In [7]:
class Mlp(nn.Module):
    """
    Two-layer feed-forward network: linear -> activation -> dropout -> linear -> dropout.

    ``hidden_features`` and ``out_features`` fall back to ``in_features`` when not given.
    """
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        super().__init__()
        hidden_width = hidden_features if hidden_features else in_features
        out_width = out_features if out_features else in_features
        self.fc1 = nn.Linear(in_features, hidden_width)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_width, out_width)
        # The same dropout module is used after both linear layers.
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        """Apply the feed-forward network along the last axis of ``x``."""
        hidden = self.drop(self.act(self.fc1(x)))
        return self.drop(self.fc2(hidden))

In [8]:
def drop_path(x, drop_prob: float = 0., training: bool = False):
    """
    Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).

    Each sample along the first axis of ``x`` is zeroed as a whole with
    probability ``drop_prob``; the surviving samples are divided by
    ``1 - drop_prob``.

    Args:
        x: input tensor; the first axis indexes samples.
        drop_prob: probability of dropping a sample's path.
        training: dropping only happens when True.

    Returns:
        ``x`` itself when ``drop_prob`` is 0 or ``training`` is False, otherwise
        a new tensor of the same shape.
    """
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    if keep_prob <= 0.:
        # Every path is dropped. Dividing by keep_prob == 0 and multiplying by
        # a zero mask would give NaN, so return zeros directly (still attached
        # to x's autograd graph).
        return x * 0.
    # One random draw per sample, broadcast over all remaining axes.
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)
    # rand is in [0, 1), so keep_prob + rand is in [keep_prob, keep_prob + 1)
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # now exactly 0 (drop) or 1 (keep)
    # Rescale the kept samples by 1 / keep_prob.
    output = x.div(keep_prob) * random_tensor
    return output


class DropPath(nn.Module):
    """
    Drop paths (Stochastic Depth) per sample  (when applied in main path of residual blocks).

    Module wrapper around ``drop_path`` that uses the module's own
    ``training`` flag.

    Args:
        drop_prob (float, optional): probability of dropping a sample's path.
            ``None`` (the default) means no dropping.
    """
    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        # Treat the default None as "never drop"; passing None through would
        # raise a TypeError inside drop_path when the module is in training mode.
        drop_prob = 0. if self.drop_prob is None else self.drop_prob
        return drop_path(x, drop_prob, self.training)
    
class Block(nn.Module):
    """Pre-norm transformer encoder block: attention and MLP sub-layers, each with a residual add.

    Args:
        dim: token embedding width.
        num_heads: number of attention heads.
        mlp_ratio: MLP hidden width as a multiple of ``dim``.
        qkv_bias: whether the q/k/v projection has a bias.
        qk_scale: optional override of the attention score scale.
        drop_ratio: dropout used after the attention projection and inside the MLP.
        attn_drop_ratio: dropout on the attention weights.
        drop_path_ratio: stochastic-depth rate for both residual branches.
        act_layer: activation class used in the MLP.
        norm_layer: callable building a normalisation layer from ``dim``.
    """
    def __init__(self,
                 dim,
                 num_heads,
                 mlp_ratio=4.,
                 qkv_bias=False,
                 qk_scale=None,
                 drop_ratio=0.,
                 attn_drop_ratio=0.,
                 drop_path_ratio=0.,
                 act_layer=nn.GELU,
                 norm_layer=nn.LayerNorm):
        super(Block, self).__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(
            dim,
            num_heads=num_heads,
            qkv_bias=qkv_bias,
            qk_scale=qk_scale,
            attn_drop_ratio=attn_drop_ratio,
            proj_drop_ratio=drop_ratio,
        )
        # Stochastic depth is only instantiated when it would actually drop something.
        if drop_path_ratio > 0.:
            self.drop_path = DropPath(drop_path_ratio)
        else:
            self.drop_path = nn.Identity()
        self.norm2 = norm_layer(dim)
        self.mlp = Mlp(
            in_features=dim,
            hidden_features=int(dim * mlp_ratio),
            act_layer=act_layer,
            drop=drop_ratio,
        )

    def forward(self, x):
        """Run the attention branch, then the MLP branch, adding each back onto its input."""
        attn_branch = self.drop_path(self.attn(self.norm1(x)))
        x = x + attn_branch
        mlp_branch = self.drop_path(self.mlp(self.norm2(x)))
        return x + mlp_branch

In [9]:
class VisionTransformer(nn.Module):
    """Transformer encoder over image patches whose position embedding also depends on an angle vector.

    The forward pass embeds the image into patch tokens, adds the positional and
    angle embeddings, runs ``depth`` encoder blocks and applies a final norm.
    No classification head or class token is built here.
    """
    def __init__(self, angle_length = 2, img_size=1280, patch_size=20, in_c=1, 
                 embed_dim=400, depth=12, num_heads=8, mlp_ratio=4.0, qkv_bias=True,
                 qk_scale=None, representation_size=None, distilled=False, drop_ratio=0.,
                 attn_drop_ratio=0., drop_path_ratio=0., embed_layer=PatchEmbed, norm_layer=None,
                 act_layer=None):
        """
        Args:
            angle_length (int): length of the angle vector passed to ``forward``
            img_size (int, tuple): input image size
            patch_size (int, tuple): patch size
            in_c (int): number of input channels
            embed_dim (int): embedding dimension
            depth (int): depth of transformer
            num_heads (int): number of attention heads
            mlp_ratio (int): ratio of mlp hidden dim to embedding dim
            qkv_bias (bool): enable bias for qkv if True
            qk_scale (float): override default qk scale of head_dim ** -0.5 if set
            representation_size (Optional[int]): accepted but not used by this class
            distilled (bool): only affects the stored ``num_tokens`` value (2 if True, else 1)
            drop_ratio (float): dropout rate
            attn_drop_ratio (float): attention dropout rate
            drop_path_ratio (float): stochastic depth rate
            embed_layer (nn.Module): patch embedding layer
            norm_layer: (nn.Module): normalization layer
            act_layer: activation class for the MLPs
        """
        super(VisionTransformer,self).__init__()
        self.embed_dim = embed_dim
        self.num_features = embed_dim
        # Kept from the reference implementation; no extra tokens are created here.
        self.num_tokens = 2 if distilled else 1

        # Defaults: LayerNorm with eps=1e-6 and GELU.
        if not norm_layer:
            norm_layer = partial(nn.LayerNorm,eps=1e-6)
        if not act_layer:
            act_layer = nn.GELU

        # Patch embedding
        self.patch_embed = embed_layer(img_size=img_size, patch_size=patch_size, in_c=in_c, embed_dim=embed_dim)
        patch_count = self.patch_embed.num_patches

        # Positional + angle embedding
        self.pos_embedding = PositionEmbs(angle_length, patch_count, embed_dim, drop_ratio)

        # Stochastic depth decay rule: the drop-path rate grows linearly from 0
        # at the first block to drop_path_ratio at the last one.
        drop_path_rates = torch.linspace(0, drop_path_ratio, depth).tolist()

        # Encoder blocks
        encoder_blocks = []
        for rate in drop_path_rates:
            encoder_blocks.append(
                Block(dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias,
                      qk_scale=qk_scale, drop_ratio=drop_ratio, attn_drop_ratio=attn_drop_ratio,
                      drop_path_ratio=rate, norm_layer=norm_layer, act_layer=act_layer)
            )
        self.blocks = nn.Sequential(*encoder_blocks)

        self.norm = norm_layer(embed_dim)
        
    def forward_features(self,x, angle):
        """Embed ``x`` into patch tokens, add position/angle embeddings, encode and normalise."""
        tokens = self.patch_embed(x)
        tokens = self.pos_embedding(tokens, angle)
        return self.norm(self.blocks(tokens))
    
    def forward(self,x, angle):
        """Return the encoded patch tokens for image ``x`` and angle vector ``angle``."""
        return self.forward_features(x, angle)

In [10]:
# Custom loss that is evaluated only over the masked region.
class MaskedMSELoss(nn.Module):
    """Root of the mask-weighted mean squared error.

    Despite the class name, the value returned is the square root of the masked
    mean squared error (an RMSE).
    """
    def __init__(self):
        super(MaskedMSELoss, self).__init__()

    def forward(self, output, label, mask):
        """Compute ``sqrt(sum(mask * (output - label)**2) / sum(mask))``.

        Args:
            output: predictions.
            label: targets, broadcastable with ``output``.
            mask: weights (cast to float); elements with weight 0 are ignored.

        Returns:
            A scalar tensor. When the mask sums to zero the result is 0 and is
            still attached to ``output``'s autograd graph.
        """
        mask = mask.float()
        squared_loss = ((output - label) ** 2) * mask
        mask_sum = mask.sum()

        # Guard against a zero denominator. The zero is derived from
        # squared_loss rather than created as a fresh constant so that
        # loss.backward() still works (yielding zero gradients) on such a batch.
        if mask_sum == 0:
            return squared_loss.sum() * 0.0

        loss = squared_loss.sum() / mask_sum
        loss = torch.sqrt(loss)
        return loss

In [11]:
root_dir = './data/'
dataset = YinYingDataset(root_dir)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
# Sizes of the test split (20% of the data) and the training split (the rest)
dataset_size = len(dataset)
test_size = int(dataset_size * 0.2)
train_size = dataset_size - test_size
# Seed the split with its own generator so the same samples land in the
# train/test sets on every run; the global RNG state is left untouched.
split_seed = 42
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size],
    generator=torch.Generator().manual_seed(split_seed))
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)  
# Select the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Instantiate the model, the loss function and the optimizer
model =VisionTransformer().to(device)
criterion = MaskedMSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
loss_list = []
num_epochs = 20

# Disabled training loop, kept for reference.
# NOTE(review): before re-enabling, check two things:
#   1. running_loss is accumulated over train_loader, but the epoch average
#      divides by len(dataloader.dataset) (the full dataset, test split included).
#   2. model(...) returns per-patch tokens; flattening them gives patch-major
#      order, whereas label.flatten() / mask.flatten() are row-major over the
#      image — presumably the prediction needs rearranging back into image
#      layout first; confirm.
# model.train()
# for epoch in range(num_epochs):
#     running_loss = 0.0
#     for i, (feature, label,angle, mask) in enumerate(train_loader):
#         feature = feature.to(device)
#         angle = angle.to(device)
#         mask = mask.flatten().to(device)
#         label = label.flatten().to(device)
#         optimizer.zero_grad()
#         pred = model(feature,angle).flatten()
#         loss = criterion(pred, label, mask)
#         loss.backward()
#         optimizer.step()
#         running_loss += loss.item() 
#     epoch_loss = running_loss / len(dataloader.dataset)
#     loss_list.append(epoch_loss)
#     print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}')