In [2]:
import numpy as np

# Confusion matrix
confusion_matrix_2 = np.array([[5789,36],
 [77,5758]])

# List of attributes
fab_attri = ['Normal', 'max_engine_coolant_temp_attack', 'fuzzing_attack', 'max_speedometer_attack',
             'reverse_light_on_attack', 'reverse_light_off_attack', 'correlated_signal_attack']
mas_attri = ['Normal', 'max_engine_coolant_temp_attack', 'max_speedometer_attack',
             'reverse_light_on_attack', 'reverse_light_off_attack', 'correlated_signal_attack']

attri = ['Normal','Thrief']

# F1 score calculation for each attribute
f1_scores = []


def cal_cm(confusion_matrix, attributes):
    precision_list = []
    recall_list = []
    f1_score_list = []
    for i in range(len(attributes)):
        TP = confusion_matrix[i, i]
        TN = sum(sum(confusion_matrix)) - TP
        FP = sum(confusion_matrix[:, i]) - TP
        FN = sum(confusion_matrix[i, :]) - TP
        accuracy = (TP + TN) / (TP + TN + FP + FN)
        precision = TP / (TP + FP)
        recall = TP / (TP + FN)
        f1_score = 2 * precision * recall / (precision + recall)
        FNR = (FN / (TP + FN))*100  # False Negative Rate
        FPR = (FP / (FP + TN))*100  # False Positive Rate
        precision_list.append(precision)
        recall_list.append(recall)
        f1_score_list.append(f1_score)
        print('Attribute: ', attributes[i], '=======================')
        print('Recall: ', recall)
        print('Precision: ', precision)
        print('F1 score: ', f1_score)
        print('False Negative Rate: ', FNR)  # Print FNR
    macro_precision = sum(precision_list) / len(precision_list)
    macro_recall = sum(recall_list) / len(recall_list)
    macro_f1_score = sum(f1_score_list) / len(f1_score_list)
    print('==========================================')
    print('Macro-Precision: ', macro_precision)
    print('Macro-Recall: ', macro_recall)
    print('Macro-F1 Score: ', macro_f1_score)
    return macro_precision, macro_recall, macro_f1_score


f1 = cal_cm(confusion_matrix_2, attri)

# f1_scores_per_attribute = dict(zip(attributes, f1_scores))
# f1_scores_per_attribute

Recall:  0.9938197424892704
Precision:  0.986873508353222
F1 score:  0.9903344452998032
False Negative Rate:  0.6180257510729614
Recall:  0.9868037703513282
Precision:  0.9937866758715913
F1 score:  0.9902829134061398
False Negative Rate:  1.3196229648671807
Macro-Precision:  0.9903300921124066
Macro-Recall:  0.9903117564202992
Macro-F1 Score:  0.9903086793529715


# WD CNN

In [8]:
import torch
import torch.nn as nn
from collections import OrderedDict
import torch.nn.functional as F

# The compared method WDCNNModel is derived from
# https://github.com/neuralmind-ai/electricity-theft-detection-with-self-attention/blob/master/CNN_model.py


def kernel_fn(kernel, channel_in, channel_out, device):
    kernel = torch.FloatTensor(kernel).unsqueeze(0).unsqueeze(0)
    kernel = kernel.repeat(channel_out, channel_in, 1, 1).float()
    weight = nn.Parameter(data=kernel, requires_grad=False)

    return weight.to(device)

# random_seed = 123
# torch.manual_seed(random_seed)
class KernelConv2d(nn.Module):
    def __init__(self, channel_in, channel_out, stride=1, padding=0, bias=False):
        super(KernelConv2d, self).__init__()
        self.channel_in = channel_in
        self.channel_out = channel_out
        self.stride = stride
        self.padding = padding

        self.g1_kernel = [[0.0, -1.0, 0.0],
                     [0.0, 2.0, 0.0],
                     [0.0, -1.0, 0.0]]
        self.g2_kernel = [[0.0, 0.0, 0.0],
                     [-1.0, 2.0, -1.0],
                     [0.0, 0.0, 0.0]]

        self.bias = bias

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        g1_kernel = kernel_fn(self.g1_kernel, self.channel_in, self.channel_out, x.device) # [channel_out, channel_in, kernel, kernel]
        g2_kernel = kernel_fn(self.g2_kernel, self.channel_in, self.channel_out, x.device)
        if self.bias:
            out_g1 = F.conv2d(x, g1_kernel, stride=self.stride, padding=self.padding, bias=torch.rand(self.channel_out).to(x.device))
            out_g2 = F.conv2d(x, g2_kernel, stride=self.stride, padding=self.padding, bias=torch.rand(self.channel_out).to(x.device))
        else:
            out_g1 = F.conv2d(x, g1_kernel, stride=self.stride, padding=self.padding)
            out_g2 = F.conv2d(x, g2_kernel, stride=self.stride, padding=self.padding)
        out = torch.tanh(out_g1+out_g2)
        return out


class WDCNNModel(nn.Module):
    def __init__(self):
        super(WDCNNModel, self).__init__()

        # self.cnn_nc = 16
        # self.wide_fc_nc = 50
        # self.deep_fc_nc = 60
        self.cnn_nc = 60
        self.wide_fc_nc = 90
        self.deep_fc_nc = 90

        self.wide_net = nn.Sequential(OrderedDict([
            ('wide_fc', nn.Linear(148 * 7*2, self.wide_fc_nc)),
            ('wide_fc_relu', nn.ReLU()),
        ]))

        self.deep_net = nn.Sequential(OrderedDict([
            ('conv1', KernelConv2d(2, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu1', nn.ReLU()),

            ('conv2', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu2', nn.ReLU()),

            ('conv3', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu3', nn.ReLU()),

            ('conv4', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu4', nn.ReLU()),

            ('conv5', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu5', nn.ReLU()),

            ('maxpool', nn.MaxPool2d((1, 7), stride=(1, 7))),
        ]))

        self.deep_net_fc = nn.Sequential(OrderedDict([
            ('deep_fc', nn.Linear(self.cnn_nc * 148, self.deep_fc_nc)),
            ('deep_fc_relu', nn.ReLU()),
        ]))

        self.fusion_fc = nn.Linear(self.wide_fc_nc + self.deep_fc_nc, 1)

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # wide&deep model does not use the mask map
        # x = x[:, 0:1, :, :]

        wide_output = self.wide_net(x.view(x.shape[0], -1))

        deep_output = self.deep_net(x)
        deep_output = self.deep_net_fc(deep_output.view(deep_output.shape[0], -1))

        output = self.fusion_fc(torch.cat((wide_output, deep_output), 1))
        if self.training == False:
            output = self.sigmoid(output)

        return output


class GroupFC(nn.Module):
    """Define a Resnet block"""

    def __init__(self, input_shape, output_nc, group_num=4, view=True):
        super(GroupFC, self).__init__()
        self.view = view

        input_nc = input_shape[0]
        h = input_shape[1]
        w = input_shape[2]

        self.groupFC = nn.Conv2d(input_nc, output_nc, groups=group_num, kernel_size=(h, w), stride=(h, w), padding=0)

    def forward(self, x):
        out = self.groupFC(x)
        if self.view:
            return out.view([x.shape[0], -1])
        else:
            return out


class WDCNNModel_g1g2_mask_699_hyorder(nn.Module):
    def __init__(self):
        super(WDCNNModel_g1g2_mask_699_hyorder, self).__init__()

        # self.cnn_nc = 16
        # self.wide_fc_nc = 50
        # self.deep_fc_nc = 60
        self.cnn_nc = 60
        self.wide_fc_nc = 90
        self.deep_fc_nc = 90

        self.wide_net = nn.Sequential(OrderedDict([
            ('wide_fc', nn.Linear(148 * 7*2, self.wide_fc_nc)),
            ('wide_fc_relu', nn.ReLU()),
        ]))

        self.deep_net = nn.Sequential(OrderedDict([
            ('conv1', KernelConv2d(2, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu1', nn.ReLU()),

            ('conv2', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu2', nn.ReLU()),

            ('conv3', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu3', nn.ReLU()),

            ('conv4', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu4', nn.ReLU()),

            ('conv5', KernelConv2d(self.cnn_nc, self.cnn_nc, stride=1, padding=1, bias=True)),
            ('relu5', nn.ReLU()),

            # ('maxpool', ),
        ]))

        self.pool = nn.MaxPool2d((1, 7), stride=(1, 7))

        self.deep_net_fc = nn.Sequential(OrderedDict([
            ('deep_fc', nn.Linear(self.cnn_nc * 148, self.deep_fc_nc)),
            ('deep_fc_relu', nn.ReLU()),
        ]))

        self.sigmoid = nn.Sigmoid()

        self.day_head = 4

        w=7
        output_dim = 180

        self.day_pcc_layer = nn.Sequential(OrderedDict([
            ('dense1', GroupFC((self.day_head,
                                w,
                                w), output_dim,
                               group_num=1)),
            ('norm1', nn.BatchNorm1d(output_dim)),
            ('prelu1', nn.PReLU()),
            ('drop1', nn.Dropout(p=0.7))
        ]))

        self.fusion_fc = nn.Linear(self.wide_fc_nc + self.deep_fc_nc + output_dim, 1)

    def forward(self, x):
        N, C, H, W = x.shape

        wide_output = self.wide_net(x.view(x.shape[0], -1))
        deep_output = self.deep_net(x)  # b,60,148,7

        C = deep_output.shape[1]

        day_input = deep_output.permute(0, 3, 1, 2).reshape(N,
                                                  W,
                                                  self.day_head,
                                                  (C * H) // self.day_head).permute(0, 2, 1,
                                                                                    3).contiguous()  # N x W x C*H -> N x W x head x (C*H/4) -> N x head x W x (C*H/4)

        day_pcc = torch.einsum("bnqd,bnkd->bnqk", day_input, day_input)
        second_output = self.day_pcc_layer(day_pcc)  # b,180

        deep_output = self.pool(deep_output)
        deep_output = self.deep_net_fc(deep_output.view(deep_output.shape[0], -1))
        first_output = torch.cat((wide_output, deep_output), 1)  # b,180

        output = self.fusion_fc(torch.cat((first_output, second_output), dim=1))  # B,1

        if self.training == False:
            output = self.sigmoid(output)

        return output

In [31]:
import torch
from torchinfo import summary
import time
from thop import profile

# Define the model
model = WDCNNModel()

# Move the model to the appropriate device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Print the summary of the model to get the number of parameters
print(summary(model, (8, 2, 148, 7)))

# Create a random input tensor with a larger batch size
batch_size = 8  # Increase the batch size
input_tensor = torch.randn(batch_size, 2, 148, 7).to(device)

macs, params = profile(model, [input_tensor])
print('MACs (G): ', macs/1000**2)
print('Params (M): ', params/1000**2)

# Measure the time taken for a forward pass
start_time = time.time()
output = model(input_tensor)
end_time = time.time()

print(f"Running time for a forward pass: {end_time - start_time} seconds")

Layer (type:depth-idx)                   Output Shape              Param #
WDCNNModel                               [8, 1]                    --
├─Sequential: 1-1                        [8, 90]                   --
│    └─Linear: 2-1                       [8, 90]                   186,570
│    └─ReLU: 2-2                         [8, 90]                   --
├─Sequential: 1-2                        [8, 60, 148, 1]           --
│    └─KernelConv2d: 2-3                 [8, 60, 148, 7]           --
│    └─ReLU: 2-4                         [8, 60, 148, 7]           --
│    └─KernelConv2d: 2-5                 [8, 60, 148, 7]           --
│    └─ReLU: 2-6                         [8, 60, 148, 7]           --
│    └─KernelConv2d: 2-7                 [8, 60, 148, 7]           --
│    └─ReLU: 2-8                         [8, 60, 148, 7]           --
│    └─KernelConv2d: 2-9                 [8, 60, 148, 7]           --
│    └─ReLU: 2-10                        [8, 60, 148, 7]           --
│    └─Ker

In [11]:
import torch
import torch.nn as nn
import math
import torch.nn.functional as F
from collections import OrderedDict

# The compared method HybridAttentionModel = 2*AttnBlock = 2*(LinearAttention+MixedDilationConv) is forked from 
# https://github.com/neuralmind-ai/electricity-theft-detection-with-self-attention/blob/master/Hybrid_Attn.py


class LinearAttention(nn.Module):

    def __init__(self, in_heads, out_heads):
        super().__init__()
        in_features = 7
        
        in_sz = in_features * in_heads
        out_sz = in_features * out_heads
        
        self.key = nn.Linear(in_sz, out_sz)
        self.query = nn.Linear(in_sz, out_sz)
        self.value = nn.Linear(in_sz, out_sz)
        
        self.heads = out_heads
        self.in_features = in_features
        
    def split_heads(self, x):
        N, L, D = x.shape
        x = x.view(N, L, self.heads, -1).contiguous()
        x = x.permute(0, 2, 1, 3)
        return x

    def forward(self, x):
        N, C, L, D = x.shape
        x = x.permute(0, 2, 1, 3).contiguous() # N x L x C x D
        # x = x.view(N, L, -1).contiguous() # N x L x C*D
        
        
        # k = self.key(x)  # [32, 148, 16*7=112]
        # q = self.query(x)
        # v = self.value(x)
        x = x.view(N*L, -1).contiguous()  # N x L x C*D
        k = self.key(x).view(N, L, -1).contiguous()  # [32, 148, 16*7=112]
        q = self.query(x).view(N, L, -1).contiguous()
        v = self.value(x).view(N, L, -1).contiguous()
        
        k = self.split_heads(k)  # [32, 16, 148, 7]
        q = self.split_heads(q)
        v = self.split_heads(v)

        scores = torch.einsum("bnqd,bnkd->bnqk", q, k)  # [32, 16, 148, 148]
        scores = scores / math.sqrt(scores.shape[-1])
        
        weights = F.softmax(scores.float(), dim=-1).type_as(scores) 
        weights = F.dropout(weights, p=0.5, training=self.training)  # [32, 16, 148, 148]
        attention = torch.matmul(weights, v)  # [32, 16, 148, 7]
        return attention

class MixedDilationConv(nn.Module):

    def __init__(self, in_channels, out_channels):
        super().__init__()
        dil1 = out_channels // 2
        dil2 = out_channels - dil1
        self.conv = nn.Conv2d(in_channels, dil1, kernel_size=3, padding=1, dilation=1)
        self.conv1 = nn.Conv2d(in_channels, dil2, kernel_size=3, padding=2, dilation=2)

    def forward(self, x):
        o = self.conv(x)  # [32, 16, 148, 7]
        o1 = self.conv1(x)  # [32, 16, 148, 7]
        out = torch.cat((o, o1), dim=1)  # [32, 32, 148, 7]
        return out
    

    
class AttnBlock(nn.Module):
    def __init__(self, in_dv, in_channels, out_dv, conv_channels):
        super().__init__()
        self.attn = LinearAttention(in_dv, out_dv)
        self.conv = MixedDilationConv(in_channels, conv_channels)
        self.context = nn.Conv2d(out_dv+conv_channels, out_dv+conv_channels, kernel_size=1)
    def forward(self, x):
        o = self.attn(x)   # [32, 16, 148, 7]
        o1 = self.conv(x)  # [32, 32, 148, 7]
        
        fo = torch.cat((o, o1), dim=1)
        fo = self.context(fo)  # [32, 48, 148, 7]
        
        return fo

class GroupFC(nn.Module):
    """Define a Resnet block"""

    def __init__(self, input_shape, output_nc, group_num=4, view=True):
        super(GroupFC, self).__init__()
        self.view = view

        input_nc = input_shape[0]
        h = input_shape[1]
        w = input_shape[2]

        self.groupFC = nn.Conv2d(input_nc, output_nc, groups=group_num, kernel_size=(h, w), stride=(h, w), padding=0)

    def forward(self, x):
        out = self.groupFC(x)
        if self.view:
            return out.view([x.shape[0], -1])
        else:
            return out


class HybridAttentionModel(nn.Module):

    def __init__(self):
        super().__init__()
        neurons = 128
        drop = 0.5
        self.net = nn.Sequential(
            AttnBlock(2, 2, 16, 32),
            nn.LayerNorm((48, 148, 7)),
            nn.PReLU(),
            nn.Dropout(drop),
            AttnBlock(48, 48, 16, 32),
            nn.LayerNorm((48, 148, 7)), 
            nn.PReLU(),
            nn.Dropout(drop),
        )
        
        self.classifier = nn.Sequential(
            nn.Linear(48 * 1036, neurons * 8),
            nn.BatchNorm1d(neurons * 8),
            nn.PReLU(),
            nn.Dropout(0.6),
            nn.Linear(neurons * 8, 1),
        )

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        N = x.shape[0]
        #x = x.view(N, C, 147, -1)
        o = self.net(x)  # [32, 48, 148, 7]
        o = self.classifier(o.view(N, -1))  # [32, 1]

        if self.training == False:
           o = self.sigmoid(o)

        return o


class HybridAttentionModel_2nd(nn.Module):

    def __init__(self):
        super().__init__()
        neurons = 128
        drop = 0.5
        self.net1 = nn.Sequential(
            AttnBlock(2, 2, 16, 32),
            nn.LayerNorm((48, 148, 7)),
            nn.PReLU(),
            nn.Dropout(drop),
        )

        self.net2 = nn.Sequential(
            AttnBlock(48, 48, 16, 32),
            nn.LayerNorm((48, 148, 7)),
            nn.PReLU(),
            nn.Dropout(drop),
        )

        self.classifier = nn.Sequential(
            nn.Linear(48 * 1036 * 2, neurons * 8),
            nn.BatchNorm1d(neurons * 8),
            nn.PReLU(),
            nn.Dropout(0.6),
            nn.Linear(neurons * 8, 1),
        )

        self.sigmoid = nn.Sigmoid()

        self.day_head = 4

        w = 7
        output_dim = 48 * 1036

        self.day_pcc_layer = nn.Sequential(OrderedDict([
            ('dense1', GroupFC((self.day_head,
                                w,
                                w), output_dim,
                               group_num=1)),
            ('norm1', nn.BatchNorm1d(output_dim)),
            ('prelu1', nn.PReLU()),
            ('drop1', nn.Dropout(p=0.7))
        ]))

    def forward(self, x):
        # N = x.shape[0]
        # x = x.view(N, C, 147, -1)
        o = self.net1(x)  # [32, 48, 148, 7]

        N, C, H, W = o.shape
        day_input = o.permute(0, 3, 1, 2).reshape(N,  W, self.day_head, (C * H) // self.day_head).permute(0, 2, 1,
                                                                                              3).contiguous()
        # N x W x C*H -> N x W x head x (C*H/4) -> N x head x W x (C*H/4)

        day_pcc = torch.einsum("bnqd,bnkd->bnqk", day_input, day_input)  # b, 4,7,7
        second_output = self.day_pcc_layer(day_pcc)  # b,c

        o = self.net2(o)
        o = self.classifier(torch.cat([o.view(N, -1), second_output], dim=1))  # [32, 1]

        if self.training == False:
            o = self.sigmoid(o)

        return o


class HybridAttentionModel_hyorder(nn.Module):

    def __init__(self):
        super().__init__()
        neurons = 128
        drop = 0.5
        self.net = nn.Sequential(
            AttnBlock(2, 2, 16, 32),
            nn.LayerNorm((48, 148, 7)),
            nn.PReLU(),
            nn.Dropout(drop),
            AttnBlock(48, 48, 16, 32),
            nn.LayerNorm((48, 148, 7)),
            nn.PReLU(),
            nn.Dropout(drop),
        )

        # self.net_fc = nn.Sequential(
        #     nn.Linear(48 * 1036, neurons * 8),
        #     nn.BatchNorm1d(neurons * 8),
        #     nn.PReLU(),
        #     nn.Dropout(0.6)
        # )

        output_dim = neurons * 8

        self.net_groupfc = nn.Sequential(OrderedDict([
            ('dense1', GroupFC((48,
                                148,
                                7), output_dim,
                               group_num=1)),
            ('norm1', nn.BatchNorm1d(output_dim)),
            ('prelu1', nn.PReLU()),
            ('drop1', nn.Dropout(p=0.7))
        ]))

        self.classifier = nn.Linear(output_dim * 2, 1)
        self.sigmoid = nn.Sigmoid()

        self.day_head = 4

        w = 7

        self.day_pcc_layer = nn.Sequential(OrderedDict([
            ('dense1', GroupFC((self.day_head,
                                w,
                                w), output_dim,
                               group_num=1)),
            ('norm1', nn.BatchNorm1d(output_dim)),
            ('prelu1', nn.PReLU()),
            ('drop1', nn.Dropout(p=0.7))
        ]))

    def forward(self, x):
        # N = x.shape[0]
        # x = x.view(N, C, 147, -1)
        o = self.net(x)  # [32, 48, 148, 7]

        N, C, H, W = o.shape
        day_input = o.permute(0, 3, 1, 2).reshape(N,  W, self.day_head, (C * H) // self.day_head).permute(0, 2, 1,
                                                                                              3).contiguous()
        # N x W x C*H -> N x W x head x (C*H/4) -> N x head x W x (C*H/4)

        day_pcc = torch.einsum("bnqd,bnkd->bnqk", day_input, day_input)  # b, 4,7,7
        second_output = self.day_pcc_layer(day_pcc)  # b,c

        # o = self.net_fc(o.view(N, -1))
        o = self.net_groupfc(o)  # b, 1024
        o = self.classifier(torch.cat([o, second_output], dim=1))  # [32, 1]

        if self.training == False:
            o = self.sigmoid(o)

        return o

In [14]:
# Define the model
model = HybridAttentionModel_2nd()

# Move the model to the appropriate device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Print the summary of the model to get the number of parameters
summary(model, (2, 148, 7))

# Create a random input tensor with a larger batch size
batch_size = 8  # Increase the batch size
input_tensor = torch.randn(batch_size, 2, 148, 7).to(device)

macs, params = profile(model, [input_tensor])
print('MACs (G): ', macs/1000**2)
print('Params (M): ', params/1000**2)

# # Measure the time taken for a forward pass
# start_time = time.time()
# output = model(input_tensor)
# end_time = time.time()

# print(f"Running time for a forward pass: {end_time - start_time} seconds")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1                  [-1, 112]           1,680
            Linear-2                  [-1, 112]           1,680
            Linear-3                  [-1, 112]           1,680
   LinearAttention-4           [-1, 16, 148, 7]               0
            Conv2d-5           [-1, 16, 148, 7]             304
            Conv2d-6           [-1, 16, 148, 7]             304
 MixedDilationConv-7           [-1, 32, 148, 7]               0
            Conv2d-8           [-1, 48, 148, 7]           2,352
         AttnBlock-9           [-1, 48, 148, 7]               0
        LayerNorm-10           [-1, 48, 148, 7]          99,456
            PReLU-11           [-1, 48, 148, 7]               1
          Dropout-12           [-1, 48, 148, 7]               0
           Conv2d-13          [-1, 49728, 1, 1]       9,796,416
          GroupFC-14                [-1

In [39]:
import torch
import torch.nn as nn
from torch.nn.utils import weight_norm

# The code of TemporalConvNet is forked from
# https://github.com/locuslab/TCN


class Chomp1d(nn.Module):
    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):  # a resblock in PFSC，three TCN layers
    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        self.conv3 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp3 = Chomp1d(padding)
        self.relu3 = nn.ReLU()
        self.dropout3 = nn.Dropout(dropout)

        self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1,
                                 self.conv2, self.chomp2, self.relu2, self.dropout2,
                                 self.conv3, self.chomp3, self.relu3, self.dropout3)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()
        self.init_weights()

    def init_weights(self):
        self.conv1.weight.data.normal_(0, 0.01)
        self.conv2.weight.data.normal_(0, 0.01)
        if self.downsample is not None:
            self.downsample.weight.data.normal_(0, 0.01)

    def forward(self, x):
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)


class TemporalConvNet(nn.Module):
    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super(TemporalConvNet, self).__init__()
        layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):  # 对应论文中的三个TCN Block
            dilation_size = 2 ** i
            in_channels = num_inputs if i == 0 else num_channels[i-1]
            out_channels = num_channels[i]
            layers += [TemporalBlock(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
                                     padding=(kernel_size-1) * dilation_size, dropout=dropout)]

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)


class PFSC(nn.Module):
    def __init__(self, input_size=1, num_channels=[64, 64, 64], kernel_size=2, dropout=0.45):
        super(PFSC, self).__init__()

        self.tcn_block1 = TemporalConvNet(input_size, num_channels, kernel_size, dropout=dropout)  # a tcn block with 2 res blocks
        self.tcn_block2 = TemporalConvNet(num_channels[0], num_channels, kernel_size, dropout=dropout)
        self.tcn_block3 = TemporalConvNet(num_channels[1], num_channels, kernel_size, dropout=dropout)

        self.dense = nn.Linear(num_channels[-1]*6, 1)

        self.sigmoid = nn.Sigmoid()

    def forward(self, elec):
        # x needs to have dimension (N, C, L) in order to be passed into CNN
        N, C, L = elec.shape

        output1 = self.tcn_block1(elec)
        output2 = self.tcn_block2(output1)
        output3 = self.tcn_block2(output2)

        output = output3.view(N, -1).contiguous()

        pred = self.dense(output).double()

        if self.training == False:
            pred = self.sigmoid(pred)

        return pred

In [42]:
# Define the model
model = PFSC(input_size=1, num_channels=[64, 64, 64], kernel_size=2, dropout=0.45)

# Move the model to the appropriate device (CPU or GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Print the summary of the model to get the number of parameters
summary(model, (1, 6))  # Assuming the input length (L) is 1000

# Create a random input tensor with the same shape as the model expects
input_tensor = torch.randn(1, 1, 6).to(device)  # Assuming batch size 1 and input length 1000

macs, params = profile(model, [input_tensor])
print('MACs (G): ', macs/1000**2)
print('Params (M): ', params/1000**2)

# # Measure the time taken for a forward pass
# start_time = time.time()
# output = model(input_tensor)
# end_time = time.time()

# print(f"Running time for a forward pass: {end_time - start_time} seconds")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv1d-1                [-1, 64, 7]             192
            Conv1d-2                [-1, 64, 7]             192
           Chomp1d-3                [-1, 64, 6]               0
           Chomp1d-4                [-1, 64, 6]               0
              ReLU-5                [-1, 64, 6]               0
              ReLU-6                [-1, 64, 6]               0
           Dropout-7                [-1, 64, 6]               0
           Dropout-8                [-1, 64, 6]               0
            Conv1d-9                [-1, 64, 7]           8,256
           Conv1d-10                [-1, 64, 7]           8,256
          Chomp1d-11                [-1, 64, 6]               0
          Chomp1d-12                [-1, 64, 6]               0
             ReLU-13                [-1, 64, 6]               0
             ReLU-14                [-1

In [36]:
import torch
import torch.nn as nn
import torch.nn.functional as F

device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if getattr(torch, 'has_mps', False) else 'cpu')

class Config:
    def __init__(self):
        self.model_name = 'ETD'
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if getattr(torch, 'has_mps', False) else 'cpu')

        self.dout_mess = 28 # 4 weeks
        self.d_model = self.dout_mess
        self.nhead = 7  # ori: 5

        self.pad_size = 37  
        self.window_size = 37 
        self.max_time_position = 10000
        self.num_layers = 6
        self.gran = 1e-7  # ori: 1e-6
        self.log_e = 2

        self.classes_num = 2
        # self.model_path = 'model/' + self.model_name + '/' + self.model_name + '_model_' + str(self.start_epoch) + '.pth'

class ConvAutoencoder1D(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(ConvAutoencoder1D, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv1d(input_dim, 64, kernel_size=3, padding=1),
            # Original: input_dim -> 32
            nn.ReLU(),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            # Original: 32 -> 64
            nn.ReLU()
        )
        self.fc = nn.Linear(128*28, output_dim)
        # Original: 64*28
    def forward(self, x):
        x = self.encoder(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x
    
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(
            0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

class TransformerPredictor(nn.Module):
    def __init__(self, config):
        super(TransformerPredictor, self).__init__()
        self.pad_size = config.pad_size

        self.cae = ConvAutoencoder1D(1, config.dout_mess).to(device)
        self.dout_mess = config.dout_mess
        self.mode = 'cae'
        
        self.position_embedding = PositionalEncoding(config.d_model, dropout=0.0, max_len=config.max_time_position).to(device)

        self.encoder_layer = nn.TransformerEncoderLayer(d_model=config.d_model, nhead=config.nhead).to(device)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=config.num_layers).to(device)
        self.fc = nn.Linear(config.d_model, config.classes_num).to(device)

    def forward(self, data, mask):
        x = data
        
        if self.mode == 'cae':
            # Conv Autoencoder 1D =================================================   
            cae_out = torch.empty((x.shape[0], self.dout_mess, 0)).to(device)
            for i in range(self.pad_size):
                tmp = self.cae(x[:, i:i+1, :]).unsqueeze(2)
                cae_out = torch.concat((cae_out, tmp), dim=2)
            
            x = cae_out.permute(2, 0, 1)
        else:
            x = x.permute(1, 0, 2)
            
        # x = x.permute(1, 0, 2)
        
        out = self.position_embedding(x)
        out2 = self.transformer_encoder(out, src_key_padding_mask=mask)
        out = out2.permute(1, 0, 2)
        out = torch.sum(out, 1)
        out = self.fc(out)
        return out

In [37]:
import torch
from torchinfo import summary
import time
from thop import profile
import math

# Assuming Config and TransformerPredictor are defined elsewhere in your code
config = Config()
model = TransformerPredictor(config)

# Move the model to the appropriate device (CPU or GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'mps' if getattr(torch, 'has_mps', False) else 'cpu')
model.to(device)

# Correctly define the size as a tuple of integers
x = torch.rand((10, 37, 28)).to(device)  # 3-D tensor
z = torch.rand((10, 37)).to(device)      # 2-D tensor]

# Call the summary function
print(summary(model, input_size=[(10, 37, 28), (10, 37)])
)
# Profile the model to get MACs and parameters
macs, params = profile(model, inputs=(x, z))
print('MACs (G): ', macs / 1000**2)
print('Params (M): ', params / 1000**2)

# Measure the time taken for a forward pass
start_time = time.time()
output = model(x, z)  # Ensure the correct number of arguments are passed
end_time = time.time()

print(f"Running time for a forward pass: {end_time - start_time} seconds")

Layer (type:depth-idx)                             Output Shape              Param #
TransformerPredictor                               [10, 2]                   120,124
├─ConvAutoencoder1D: 1-1                           [10, 28]                  --
│    └─Sequential: 2-1                             [10, 128, 28]             --
│    │    └─Conv1d: 3-1                            [10, 64, 28]              256
│    │    └─ReLU: 3-2                              [10, 64, 28]              --
│    │    └─Conv1d: 3-3                            [10, 128, 28]             24,704
│    │    └─ReLU: 3-4                              [10, 128, 28]             --
│    └─Linear: 2-2                                 [10, 28]                  100,380
├─ConvAutoencoder1D: 1-2                           [10, 28]                  (recursive)
│    └─Sequential: 2-3                             [10, 128, 28]             (recursive)
│    │    └─Conv1d: 3-5                            [10, 64, 28]              (recu