In [None]:
# Load Data
import numpy as np
import scipy.io as sio
datas = sio.loadmat('dataset_mapping.mat')
bed_select=datas['bed_select']
bed_whole=datas['bed_whole']
pillow_up=datas['pillow_up']
person = datas['person']
posture = datas['posture']

In [None]:
i=15
ood_idx = np.where(posture==i)[0]
indis_idx = np.setdiff1d(np.arange(len(posture)), ood_idx)
indis_input = bed_select[indis_idx,:,:]
ood_input = bed_select[ood_idx,:,:]
indis_output = pillow_up[indis_idx,:,:]
ood_output = pillow_up[ood_idx,:,:]
posture = tf.keras.utils.to_categorical(posture)
posture_indis = posture[indis_idx,:]
posture_ood = posture[ood_idx,:]
X_tr, X_te, y_tr, y_te,posture_tr, posture_te = train_test_split(indis_input, indis_output,posture_indis, train_size=0.9)
scaler = MinMaxScaler()
scaler2 = MinMaxScaler()
reshaped_xtr = X_tr.reshape(X_tr.shape[0],-1)
reshaped_xte = X_te.reshape(X_te.shape[0],-1)
reshaped_ytr = y_tr.reshape(y_tr.shape[0],-1)
reshaped_yte = y_te.reshape(y_te.shape[0],-1)
reshaped_xood = ood_input.reshape(ood_input.shape[0],-1)
reshaped_yood = ood_output.reshape(ood_output.shape[0],-1)

normalized_xtr1 = scaler.fit_transform(reshaped_xtr)
normalized_xte1 = scaler.transform(reshaped_xte)
normalized_xood1 = scaler.transform(reshaped_xood)
normalized_ytr1 = scaler2.fit_transform(reshaped_ytr)
normalized_yte1 = scaler2.transform(reshaped_yte)
normalized_yood1 = scaler2.transform(reshaped_yood)

normalized_xtr = normalized_xtr1.reshape(X_tr.shape)
normalized_xte = normalized_xte1.reshape(X_te.shape)
normalized_ytr = normalized_ytr1.reshape(y_tr.shape)
normalized_yte = normalized_yte1.reshape(y_te.shape)
normalized_xood = normalized_xood1.reshape(ood_input.shape)
normalized_yood = normalized_yood1.reshape(ood_output.shape)
input_shape = (16,40,1)
output_shape = (32,48,1)
normalized_xtr = normalized_xtr[:,np.newaxis,:,:]
normalized_xte = normalized_xte[:,np.newaxis,:,:]
normalized_ytr = normalized_ytr[:,np.newaxis,:,:]
normalized_yte = normalized_yte[:,np.newaxis,:,:]
normalized_xood = normalized_xood[:,np.newaxis,:,:]
normalized_yood = normalized_yood[:,np.newaxis,:,:]

posture_tr = np.squeeze(posture_tr)
posture_te = np.squeeze(posture_te)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Function
class DANNModel(nn.Module):
    def __init__(self, num_classes):
        super(DANNModel, self).__init__()
        n_filters1=32
        hidden_size = 10
        output_size = (32,48,1)
        self.feature_extractor = nn.Sequential(
            nn.Conv2d(1,n_filters1,(4,6), stride=(1,1)),
            nn.LeakyReLU(0.2),
            nn.Conv2d(n_filters1,n_filters1*2,(3,5), stride=(1,1)),
            nn.ReLU(0.2),
            nn.Conv2d(n_filters1*2,n_filters1*2,(3,5), stride=(1,1)),
            nn.ReLU(0.2),
            nn.Conv2d(n_filters1*2,n_filters1*2,(2,4), stride=(1,1)),
            nn.ReLU(0.2),
            nn.Conv2d(n_filters1*2,n_filters1*2,(2,4), stride=(1,1)),
            nn.ReLU(0.2)
        )
        hidden_size = 7*21*64
        self.class_classifier = nn.Sequential(
            nn.Conv2d(64, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(16 * 7 * 21, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )
        self.image_decoder = nn.Sequential(
            nn.ConvTranspose2d(n_filters1*2,n_filters1*2, (2,4), stride=(1,1)),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(n_filters1*2,n_filters1*2, (7,7), stride=(1,1)),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(n_filters1*2,n_filters1*2, (7,7), stride=(1,1)),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(n_filters1*2,n_filters1*2, (7,7), stride=(1,1)),
            nn.LeakyReLU(0.2),
            nn.ConvTranspose2d(n_filters1*2,1, (7,7), stride=(1,1)),
            nn.LeakyReLU(0.2),
            nn.ReLU()
        )
    def forward(self, x, alpha):
        features = self.feature_extractor(x)
        #print(features.shape)
        class_output = self.class_classifier(features)
        #print("Class")
        #print(class_output.shape)
        image_output = self.image_decoder(features)
        return class_output, image_output

In [None]:
# Define the gradient reversal layer
class GradientReversalFunction(Function):
    @staticmethod
    def forward(ctx, x, alpha):
        ctx.alpha = alpha
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        output = grad_output.neg() * ctx.alpha
        return output, None

def grad_reverse(x, alpha):
    return GradientReversalFunction.apply(x, alpha)

In [None]:
def train_dann(model, source_loader, target_loader, num_epochs, alpha, checkpoint_path):
    criterion_cls = nn.CrossEntropyLoss()
    criterion_img = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    best_loss = float('inf')
    for epoch in range(num_epochs):
        for (source_data, source_labels, source_posture), (target_data, target_posture, target_labels) in zip(source_loader, target_loader):
            source_data, source_posture, source_labels = source_data, source_labels, source_posture


            # Step 1: Train the feature extractor, class classifier, and image decoder on source domain
            optimizer.zero_grad()
            class_output, image_output = model(source_data, alpha)
            #print(class_output.dtype)
            #print(source_labels.dtype)
            loss_cls = criterion_cls(class_output, source_labels)
            loss_img = criterion_img(image_output, source_posture)
            loss_total = loss_cls + loss_img
            loss_total.backward()

            # Step 2: Train the domain classifier on both source and target domain
            #domain_labels = torch.zeros(len(source_data) + len(target_data)).long()
            #domain_labels[:len(source_data)] = 1  # Source domain has label 1, target domain has label 0

            domain_data = torch.cat((source_data, target_data), dim=0)
            domain_labels = torch.cat((source_labels,target_labels),dim = 0)
            domain_data = grad_reverse(domain_data, alpha)

            domain_output, _ = model(domain_data, alpha)
            loss_domain = criterion_cls(domain_output, domain_labels)
            loss_domain.backward()

            optimizer.step()

            val_loss = loss_domain.item()

            # Save the model checkpoint if validation loss improves
            if val_loss < best_loss:
                best_loss = val_loss
                torch.save(model.state_dict(), checkpoint_path)
                print("Model checkpoint saved at", checkpoint_path)

            print('Epoch [{}/{}]: Loss_cls: {:.4f} Loss_img: {:.4f} Val_loss_img: {:.4f}'.format(epoch+1, num_epochs, loss_cls.item(), loss_img.item(), val_loss))

    # Load the best model checkpoint
    model.load_state_dict(torch.load(checkpoint_path))
    print("Best model loaded from", checkpoint_path)

In [None]:
dataset_tr = torch.utils.data.TensorDataset(torch.tensor(normalized_xtr, dtype=torch.float32), torch.tensor(normalized_ytr, dtype=torch.float32), torch.tensor(posture_tr, dtype=torch.float32))
dataset_te = torch.utils.data.TensorDataset(torch.tensor(normalized_xood, dtype=torch.float32), torch.tensor(normalized_yood, dtype=torch.float32), torch.tensor(posture_ood, dtype=torch.float32))

In [None]:
source_loader = torch.utils.data.DataLoader(dataset_tr, batch_size=32, shuffle=True)
target_loader = torch.utils.data.DataLoader(dataset_te, batch_size=32, shuffle=True)

# Initialize the model and hyperparameters
input_size = 16 * 40
hidden_size = 256
output_size = 32 * 48
num_classes = 16

model = DANNModel(num_classes)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
num_epochs = 200
alpha = 0.1
model = model
checkpoint_path = './Dann_checkpoint'
# Train the DANN model
train_dann(model, source_loader, target_loader, num_epochs, alpha, checkpoint_path)

In [None]:
y_te = np.squeeze(y_te)
ood_output = np.squeeze(ood_output)
y_pred_norm=model(torch.tensor(normalized_xte, dtype=torch.float32),alpha)[1]
y_pred_norm = y_pred_norm.detach().numpy()
reshaped_ypred = y_pred_norm.reshape(y_pred_norm.shape[0],-1)
y_pred1 = scaler2.inverse_transform(reshaped_ypred)
y_pred = y_pred1.reshape(y_pred_norm.shape)
y_te = y_te[:,np.newaxis,:,:]
print(np.mean(np.abs(y_pred-y_te)))

ood_output = ood_output[:np.newaxis,:,:,]
y_pred_norm_ood=model(torch.tensor(normalized_xood, dtype=torch.float32),alpha)[1]
y_pred_norm_ood = y_pred_norm_ood.detach().numpy()
reshaped_ypred_ood = y_pred_norm_ood.reshape(y_pred_norm_ood.shape[0],-1)
y_pred_ood1 = scaler2.inverse_transform(reshaped_ypred_ood)
y_pred_ood = y_pred_ood1.reshape(ood_output.shape)

print(np.mean(np.abs(y_pred_ood-ood_output)))

In [None]:
a = y_pred[5,0,:,:]
b = y_te[5,0,:,:]

In [None]:
import matplotlib.pyplot as plt

# Create a sample matrix
matrix = a

# Plot the colormap of the matrix
plt.imshow(matrix, cmap='viridis')
#plt.colorbar()

# Show the plot
plt.show()
matrix = b

# Plot the colormap of the matrix
plt.imshow(matrix, cmap='viridis')
#plt.colorbar()

# Show the plot
plt.show()

In [None]:
c= y_pred_ood[3,:,:]
d = ood_output[3,:,:]

In [None]:
matrix = c

# Plot the colormap of the matrix
plt.imshow(matrix, cmap='viridis')
#plt.colorbar()

# Show the plot
plt.show()
matrix = d

# Plot the colormap of the matrix
plt.imshow(matrix, cmap='viridis')
#plt.colorbar()

# Show the plot
plt.show()

In [None]:
y_max = np.max(y_te.reshape(len(y_te),-1),axis = 1)
percent_err = np.mean(np.abs(y_pred-y_te)/y_max[:,np.newaxis,np.newaxis,np.newaxis])

In [None]:
percent_err

In [None]:
y_max = np.max(ood_output.reshape(len(ood_output),-1),axis = 1)
percent_err = np.mean(np.abs(y_pred_ood-ood_output)/y_max[:,np.newaxis,np.newaxis,np.newaxis])

In [None]:
percent_err