In [1]:
import torch, torchvision
from torchvision import datasets, transforms
from torch import nn, optim
from torch.nn import functional as F
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import numpy as np
import shap


  from .autonotebook import tqdm as notebook_tqdm


In [14]:
### Net Construction

ReLU_ACF = False
Tanh_ACF = True
GeLU_ACF = False
Sigmoid_ACF = False

def activation_function_table():
    if ReLU_ACF == True:
        return nn.ReLU()
    elif Tanh_ACF == True:
        return nn.Tanh()
    elif GeLU_ACF == True:
        return nn.GELU()
    elif Sigmoid_ACF == True:
        return nn.Sigmoid()

activation_func = activation_function_table()


def resnet_block_lookup_table(blocktype):
    if blocktype == 'BasicBlock':
        return BasicBlock
    elif blocktype == 'Bottleneck':
        return Bottleneck
    else:
        print(' Wrong Key Word! BasicBlock or Bottleneck only! ')
        return None
    

class BasicBlock(nn.Module):  
    
    expansion = 1  
    def __init__(self, in_channel, out_channel, stride=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=out_channel,kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)  
        self.conv2 = nn.Conv2d(in_channels=out_channel, out_channels=out_channel,kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        if ReLU_ACF == True:
            self.tanh =  nn.ReLU()
        elif Tanh_ACF == True:
            self.tanh = nn.Tanh()
        elif GeLU_ACF == True:
            self.tanh = nn.GELU()
        elif Sigmoid_ACF == True:
            self.tanh = nn.Sigmoid()
        self.downsample = downsample
        
    def forward(self, x):
        identity = x
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.tanh(out)

        out = self.conv2(out)
        out = self.bn2(out)

        out = out + identity # out=F(X)+X
        out = self.tanh(out)

        return out
    
class Bottleneck(nn.Module):  
    # Three convolutional layers, F(x) and X have different dimensions.
    
    expansion = 4

    def __init__(self, in_channel, out_channel, stride=1, downsample=None, groups=1, width_per_group=64):
        super(Bottleneck, self).__init__()
        width = int(out_channel * (width_per_group / 64.)) * groups
    
        self.conv1 = nn.Conv2d(in_channels=in_channel, out_channels=width,kernel_size=1, stride=1, bias=False)  # squeeze channels
        self.bn1 = nn.BatchNorm2d(width)
        # -----------------------------------------
        self.conv2 = nn.Conv2d(in_channels=width, out_channels=width, groups=groups,kernel_size=3, stride=stride, bias=False, padding=1)
        self.bn2 = nn.BatchNorm2d(width)
        # -----------------------------------------
        self.conv3 = nn.Conv2d(in_channels=width, out_channels=out_channel * self.expansion,kernel_size=1, stride=1, bias=False)  # unsqueeze channels
        self.bn3 = nn.BatchNorm2d(out_channel * self.expansion)

        if ReLU_ACF == True:
            self.tanh =  nn.ReLU()
        elif Tanh_ACF == True:
            self.tanh = nn.Tanh()
        elif GeLU_ACF == True:
            self.tanh = nn.GELU()
        elif Sigmoid_ACF == True:
            self.tanh = nn.Sigmoid()
        self.downsample = downsample

    def forward(self, x):
        in_size = x.size(0)
        identity = x
       
        if self.downsample is not None:
            identity = self.downsample(x)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.Tanh(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.Tanh(out)

        out = self.conv3(out)
        out = self.bn3(out)
        # out=F(X)+X
        out = out + identity
        out = self.Tanh(out)

        return out

  
class ResNet(nn.Module):

    def __init__(self,
                 nchannel, # initial input channel
                 block,  # block types
                 blocks_num,  
                 num_classes=1,  
                 include_top=True, 
                 groups=1,
                 width_per_group=64):

        super(ResNet, self).__init__()
        self.include_top = include_top
        self.in_channel = 64  

        self.groups = groups
        self.width_per_group = width_per_group
        if ReLU_ACF == True:
            self.actfunc =  nn.ReLU()
        elif Tanh_ACF == True:
            self.actfunc = nn.Tanh()
        elif GeLU_ACF == True:
            self.actfunc = nn.GELU()
        elif Sigmoid_ACF == True:
            self.actfunc = nn.Sigmoid()
        
        #self.conv1 = nn.Conv2d(nchannel, self.in_channel, kernel_size=7, stride=2,padding=3, bias=False)
        #self.bn1 = nn.BatchNorm2d(self.in_channel)

        #self.tanh = nn.Tanh()
        #self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        #self.layer0 = nn.Sequential(self.conv1,self.bn1,self.tanh,self.maxpool)
        self.layer0 = nn.Sequential(nn.Conv2d(nchannel, self.in_channel, kernel_size=7, stride=2,padding=3, bias=False) #output size:6x6
        #self.layer0 = nn.Sequential(nn.Conv2d(nchannel, self.in_channel, kernel_size=5, stride=1,padding=1, bias=False)
        ,nn.BatchNorm2d(self.in_channel)
        ,self.actfunc
        ,nn.MaxPool2d(kernel_size=3, stride=2, padding=1)) # output 4x4

        
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=1)
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=1)
        self.layer4 = self._make_layer(block, 512, blocks_num[3], stride=1)

        if self.include_top: 
            self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  
            
            self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules(): 
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

   
    def _make_layer(self, block, channel, block_num, stride=1):
        downsample = None
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(channel * block.expansion))
        layers = []
        
        layers.append(block(self.in_channel,
                            channel,
                            downsample=downsample,
                            stride=stride,
                            groups=self.groups,
                            width_per_group=self.width_per_group))

        self.in_channel = channel * block.expansion # The input channel changed here!``
        
        for _ in range(1, block_num):
            layers.append(block(self.in_channel,
                                channel,
                                groups=self.groups,
                                width_per_group=self.width_per_group))
        return nn.Sequential(*layers)

    def forward(self, x):
        #x = self.conv1(x)
        #x = self.bn1(x)
        #x = self.tanh(x)
        #x = self.maxpool(x)

        x = self.layer0(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        if self.include_top:  
            x = self.avgpool(x)
            x = torch.flatten(x, 1)
            #x = self.actfunc(x)
            x = self.fc(x)

        return x  

In [4]:
label_data = np.load('test_observation_data.npy')
training_data = np.load('test_training_data.npy')
X_train = training_data[0:100,:,:,:]
y_train = label_data[0:100]

In [15]:

Epoch   = 10

transform = transforms.Compose([
    transforms.ToTensor(), 
])

class Dataset(torch.utils.data.Dataset):  # 'Characterizes a dataset for PyTorch'
    '''
    This class is for training datasets. It is used for the global datasets, which is continuous data.
    '''
    def __init__(self, traindata, truedata):  # 'Initialization' Data Loading
        '''

        :param traindata:
            Training data.
        :param truedata:
            Ture data to learn.
        :param beginyear:
            The begin year.
        :param endyear:
            The end year.
        :param nsite:
            The number of sites. For example, for overall observation it is 10870.
        '''
        super(Dataset, self).__init__()
        self.traindatasets = torch.Tensor(traindata)  # Read training data from npy file
        self.truedatasets = torch.Tensor(truedata)
        print(self.truedatasets.shape)
        print(self.traindatasets.shape)
        self.transforms = transform  # 转为tensor形式
        self.shape = self.traindatasets.shape
    def __getitem__(self, index):  # 'Generates one sample of data'
        # Select sample
        traindata = self.traindatasets[index, :, :]
        truedata = self.truedatasets[index]
        return traindata, truedata
        # Load data and get label
    def __len__(self):  # 'Denotes the total number of samples'
        return self.traindatasets.shape[0]  # Return the total number of dataset

def train(model, X_train, y_train,BATCH_SIZE, learning_rate):
    train_loader = DataLoader(Dataset(X_train, y_train), BATCH_SIZE, shuffle=True)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()
    scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.9)
    for epoch in range(Epoch):
        for i, (images, labels) in enumerate(train_loader):
            model.train()
            images = images.to(device)
            labels = torch.squeeze(labels.type(torch.FloatTensor))
            labels = labels.to(device)
            optimizer.zero_grad()  
            outputs = model(images) 
            outputs = torch.squeeze(outputs)
            loss = criterion(outputs, labels)
            loss.backward()  
            optimizer.step() 
            print('Epoch : %d/%d, Iter : %d/%d,  Loss: %.4f' % (epoch + 1, Epoch,
                                                                    i + 1, len(X_train) // BATCH_SIZE,
                                                                    loss.item()))
    return 

device = torch.device('cpu')
cnn_model = ResNet(nchannel=42, block=BasicBlock,blocks_num=[1,1,1,1]).to(device)
train(cnn_model, X_train, y_train, Epoch, learning_rate=0.01)



torch.Size([100])
torch.Size([100, 42, 11, 11])
Epoch : 1/10, Iter : 1/10,  Loss: 0.2061
Epoch : 1/10, Iter : 2/10,  Loss: 15.6132
Epoch : 1/10, Iter : 3/10,  Loss: 15.4521
Epoch : 1/10, Iter : 4/10,  Loss: 6.0780
Epoch : 1/10, Iter : 5/10,  Loss: 2.2877
Epoch : 1/10, Iter : 6/10,  Loss: 2.5414
Epoch : 1/10, Iter : 7/10,  Loss: 7.1779
Epoch : 1/10, Iter : 8/10,  Loss: 6.6867
Epoch : 1/10, Iter : 9/10,  Loss: 0.4215
Epoch : 1/10, Iter : 10/10,  Loss: 0.6789
Epoch : 2/10, Iter : 1/10,  Loss: 1.2662
Epoch : 2/10, Iter : 2/10,  Loss: 1.8282
Epoch : 2/10, Iter : 3/10,  Loss: 0.9504
Epoch : 2/10, Iter : 4/10,  Loss: 0.2434
Epoch : 2/10, Iter : 5/10,  Loss: 0.4359
Epoch : 2/10, Iter : 6/10,  Loss: 0.2305
Epoch : 2/10, Iter : 7/10,  Loss: 1.1021
Epoch : 2/10, Iter : 8/10,  Loss: 2.1208
Epoch : 2/10, Iter : 9/10,  Loss: 0.7336
Epoch : 2/10, Iter : 10/10,  Loss: 0.1997
Epoch : 3/10, Iter : 1/10,  Loss: 0.4696
Epoch : 3/10, Iter : 2/10,  Loss: 0.8157
Epoch : 3/10, Iter : 3/10,  Loss: 1.1502
Epoch

In [16]:
# SHAP Value Analysis
cnn_model.eval()
Back_Ground_Data = torch.Tensor(training_data[0:100,:,:,:])
Data_to_Explain  = torch.Tensor(training_data[100:103,:,:,:])
Back_Ground_Data = Back_Ground_Data.to(device)
Data_to_Explain  = Data_to_Explain.to(device)
CNNModel_Explainer = shap.DeepExplainer(model=cnn_model,data=Back_Ground_Data)
shap_values = CNNModel_Explainer.shap_values(Data_to_Explain)

AssertionError: The SHAP explanations do not sum up to the model's output! This is either because of a rounding error or because an operator in your computation graph was not fully supported. If the sum difference of %f is significant compared to the scale of your model outputs, please post as a github issue, with a reproducible example so we can debug it. Used framework: pytorch - Max. diff: 0.07501436083411883 - Tolerance: 0.01