In [None]:
# import necessary libraries

import sklearn, torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset
from sklearn import datasets
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

In [None]:
# Create the target data: 100 six-dimensional points sampled around 6 blob
# centers (per-blob standard deviation 1.5); random_state = 0 pins the draw.

X_target, y_target = datasets.make_blobs(
    n_samples = 100,
    n_features = 6,
    centers = 6,
    cluster_std = 1.5,
    random_state = 0,
)

In [None]:
# Number of feature columns per observation; the same value (6) is passed as
# n_features to make_blobs when the target data is generated.
n_features = 6

In [None]:
# Change the labels to binary labels: blob ids below 3 become class 0, all
# other ids become class 1. The array is overwritten in place, so running this
# cell a second time on already-binarised labels would send every label to 0.
y_target[:] = (y_target >= 3).astype(y_target.dtype)

In [None]:
# Dataset wrapper that serves the rows of a pandas DataFrame one at a time
class CustomDataset(Dataset):
    """Row-wise view of a DataFrame laid out as feature columns then a label.

    The columns at positions ``0 .. n_features - 1`` form the observation and
    the column at position ``n_features`` is the label. ``n_features`` is the
    module-level variable and is looked up each time an item is requested.
    """

    def __init__(self, data_frame, transform=None):
        """Keep the frame and the optional callable applied to each observation."""
        self.transform = transform
        self.data = data_frame

    def __len__(self):
        """Return the number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(obs, label)`` for the row at integer position ``index``."""
        feature_columns = slice(0, n_features)
        features = self.data.iloc[index, feature_columns].values
        target = self.data.iloc[index, n_features]

        # Without a transform the raw feature array is handed back as-is.
        if self.transform is None:
            return features, target
        return self.transform(features), target

In [None]:
# Transforming the data into Pytorch format: one 'X<i>' column per feature
# followed by the binary 'label' column.
# The feature columns are generated from n_features instead of being listed by
# hand, so the frame always has the layout CustomDataset indexes into
# (features at positions 0 .. n_features - 1, label at position n_features).
data_target = pd.DataFrame({
    **{'X%d' % (col + 1) : X_target[:, col].tolist() for col in range(n_features)},
    'label' : y_target.tolist(),
})
# Plot the toy data
# plt.figure(figsize = (5, 5))
# plt.scatter(data_target['X1'], data_target['X2'])
# plt.xlabel('feature 1')
# plt.ylabel('feature 2')
# plt.xlim(-15, 15)
# plt.ylim(-15, 15)

In [None]:
# Wrap the target frame in the dataset class (no transform) and serve it in
# shuffled mini-batches of 64 rows using two loader workers.
target_dataset = CustomDataset(data_frame = data_target)
target_loader = DataLoader(
    dataset = target_dataset,
    batch_size = 64,
    shuffle = True,
    num_workers = 2,
)

In [None]:
#  import torch.nn as nn
#  import torch.nn.functional as F

#  class linear_model(nn.Module):
#    def __init__(self):
#      super(linear_model, self).__init__()
#      self.fc11 = nn.Linear(n_features, 1)
#      nn.init.xavier_normal_(self.fc11.weight)

#    def forward(self, x_):
#      x_ = x_.view(-1, n_features)
#      x1_ = torch.sigmoid(self.fc11(x_))  
    
#      return x1_
  
#  model = linear_model()  

In [None]:
# # # Define loss function and optimizer
# import torch.optim as optim

# loss_function = nn.BCELoss(reduction = 'mean') # sums all outputs and divides by total data points

# optimizer = optim.Adam(model.parameters(), lr = 0.01)

In [None]:
# n_epochs = 5

# for epochs in range(n_epochs):
 
#   running_loss = 0
  
#   for batch in target_loader:
#     data, targets = batch

#     optimizer.zero_grad()

#     output = model(data.float()) 
#     loss = loss_function(output, targets.view(-1, 1).float())
  
#     loss.backward()

#     optimizer.step()

#     running_loss += loss.item()
#   print('Epochs, ', epochs + 1)

In [None]:
# # Test performance on the entire test set
# correct = 0
# total = 0
# with torch.no_grad():
#   for data in target_loader:
#     images, labels = data
#     outputs = model(images.float())
# #     _, predicted = torch.max(outputs, 1)
#     predicted = outputs > 0.5
# #     predicted = predicted.view(predicted.size(0))
#     total += labels.size(0)
#     matches = 0
#     for i in range(len(predicted)):
#       if predicted[i].item() == labels[i].item():
#         matches += 1
#     correct += matches#(predicted == labels).sum().item()
#   print('Accuracy of the network on the entire data set is : %d %%' %(100 * correct/ total))

In [None]:
# # Save the ideal weights (ideal for the target data)
# w_star = model.fc11.weight
# w_star

In [None]:
## good solution for the target domain data as obtained from the model commented above
## (stored as a single-row, six-column float32 tensor)
w_star = torch.tensor(
    [[-0.1564, -0.1547, 0.2450, 0.9513, -0.5191, -0.2096]],
    dtype = torch.float32,
)

In [None]:
def gram_schmidt(A, tol=1e-12):
    """Orthonormalize a set of vectors stored as the columns of matrix A.

    Columns are processed left to right: each column has its projections onto
    the already-orthonormalized columns to its left subtracted one at a time,
    and is then scaled to unit length.

    Parameters
    ----------
    A : array_like, shape (m, n)
        Matrix whose columns are the vectors to orthonormalize. It is not
        modified; integer input is promoted to float64 so the projections are
        not truncated.
    tol : float, optional
        Relative threshold for detecting linear dependence. A column whose
        remainder after the projections has norm <= ``tol`` times its original
        norm is rejected (this also covers an all-zero column).

    Returns
    -------
    numpy.ndarray, shape (m, n)
        New array whose columns are orthonormal.

    Raises
    ------
    ValueError
        If a column is (numerically) a linear combination of the columns to
        its left, in which case it cannot be normalized.
    """
    Q = np.asarray(A)
    # Keep a floating/complex dtype as given; anything else becomes float64.
    work_dtype = Q.dtype if np.issubdtype(Q.dtype, np.inexact) else np.float64
    # np.array always copies here, so the caller's array is left untouched.
    Q = np.array(Q, dtype=work_dtype)

    # Get the number of vectors.
    n = Q.shape[1]
    for j in range(n):
        original_norm = np.linalg.norm(Q[:, j])
        # To orthogonalize the vector in column j with respect to the
        # previous vectors, subtract from it its projection onto
        # each of the previous (already unit-length) vectors.
        for k in range(j):
            Q[:, j] -= np.dot(Q[:, k], Q[:, j]) * Q[:, k]
        norm = np.linalg.norm(Q[:, j])
        if norm <= tol * original_norm:
            # Dividing by this norm would fill the column with NaN/inf.
            raise ValueError(
                "column %d is linearly dependent on the preceding columns" % j
            )
        Q[:, j] = Q[:, j] / norm
    return Q

In [None]:
# w* as a column vector, scaled to unit length (normalized w*)
w_star1 = np.reshape(np.array(w_star.detach()), (-1, 1))
w_star1 = w_star1 / np.linalg.norm(w_star1)

# Generating the remaining n_features - 1 vectors.
# They come from a locally seeded generator so that every run of the notebook
# builds the same basis (and therefore the same source data); an unseeded draw
# gave a different basis on each execution.
basis_rng = np.random.default_rng(0)
rest = basis_rng.random((n_features, n_features - 1))
A_ele = np.hstack([w_star1, rest])

# Obtain orthonormal matrix A: normalized w* stays the first column, the
# random columns are orthonormalized against it and against each other.
A_all_ele = gram_schmidt(A_ele)

In [None]:
# The eigen values obtained during an experimental run 
eval_rest = [-0.24085888133933997, 1.3234005271655356, 0.20926702332798985, -0.8625208931380124, -1.4072147504944934]

# Full list of eigen values: 1 for the first column of A_all_ele, then the
# recorded values for the remaining columns, in order.
eigen_values = [1] + eval_rest

# A = sum_i eigen_values[i] * v_i v_i^T, where v_i is the i-th column of A_all_ele.
A = np.zeros((n_features, n_features))
for i in range(n_features):
  column = A_all_ele[:, i].reshape(-1, 1)
  A += eigen_values[i] * np.dot(column, column.reshape(1, -1))

In [None]:
# Create the source data: observations labelled 0 are mapped through the
# linear transformation A, observations labelled 1 are copied unchanged.
# Row i of np.dot(X_target, A.T) is A applied to X_target[i], so one matrix
# product plus a row mask replaces the per-row Python loop.
is_class_zero = (y_target == 0)
X_source = np.where(is_class_zero[:, np.newaxis], np.dot(X_target, A.T), X_target)

In [None]:
# Transforming the data into Pytorch format: one 'X<i>' column per feature of
# the source data followed by the 'label' column (the labels are the same
# y_target values used for the target data).
# The feature columns are generated from n_features instead of being listed by
# hand, so the frame always has the layout CustomDataset indexes into
# (features at positions 0 .. n_features - 1, label at position n_features).
data_source = pd.DataFrame({
    **{'X%d' % (col + 1) : X_source[:, col].tolist() for col in range(n_features)},
    'label' : y_target.tolist(),
})

In [None]:
# Wrap the source frame in the dataset class (no transform) and serve it in
# shuffled mini-batches of 64 rows using two loader workers.
source_dataset = CustomDataset(data_frame = data_source)
source_loader = DataLoader(
    dataset = source_dataset,
    batch_size = 64,
    shuffle = True,
    num_workers = 2,
)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

# Developing the linear model

class linear_model_new(nn.Module):
  """Single linear layer followed by a sigmoid.

  Parameters
  ----------
  in_features : int, optional
      Width of the input. When omitted, the module-level ``n_features`` is
      read at construction time, so ``linear_model_new()`` behaves as before.
  """

  def __init__(self, in_features=None):
    super().__init__()
    if in_features is None:
      in_features = n_features
    self.fc1 = nn.Linear(in_features, 1)
    # Xavier (Glorot) normal initialisation of the weight; bias keeps its default.
    nn.init.xavier_normal_(self.fc1.weight)

  def forward(self, x):
    """Return sigmoid(fc1(x)) with shape (N, 1).

    ``x`` is first reshaped to (N, in_features). The width is taken from the
    layer itself rather than from the global ``n_features``, so the model
    stays consistent even if that global is changed after construction.
    """
    x = x.view(-1, self.fc1.in_features)
    x1 = torch.sigmoid(self.fc1(x))

    return x1
  
# Fresh, untrained instance of the linear model defined above.
model_new = linear_model_new()  

In [None]:
# Define loss function and optimizer
import torch.optim as optim

# Binary cross-entropy; 'mean' reduction averages the per-element losses.
loss_function_new = torch.nn.BCELoss(reduction = 'mean')

# Adam over every parameter of model_new with a learning rate of 1e-4.
optimizer_new = torch.optim.Adam(params = model_new.parameters(), lr = 1e-4)

In [None]:
# Define hyper-parameters before training

n_epochs = 2

# Random direction `a`, scaled to unit length, used by the penalty term below.
a = torch.randn(1, n_features)
a = a / a.norm() 
print('a is:', a)

mu_ = 1 # consider the maximum value of mu_ for which source accuracy is greater than 95 %

for epochs in range(n_epochs):

  # Sum of the per-batch losses of this epoch (accumulated, not printed here).
  running_loss = 0

  for data, targets in source_loader:
    optimizer_new.zero_grad()

    output = model_new(data.float())
    bce = loss_function_new(output, targets.view(-1, 1).float())

    # Penalty term <a, w> / ||w||. Because `a` has unit norm this is the
    # cosine of the angle between `a` and the weight vector w of fc1.
    weight = model_new.fc1.weight
    a_dot_w = torch.matmul(a.view(1, n_features), weight.view(n_features, 1))
    weight_norm = torch.sqrt(torch.matmul(weight.view(1, -1), weight.view(-1, 1)))
    loss = bce + mu_ * a_dot_w / weight_norm

    loss.backward()
    optimizer_new.step()

    running_loss += loss.item()
  print('Epochs, ', epochs+1)

In [None]:
# Accuracy of model_new over every batch served by target_loader
correct = 0
total = 0
with torch.no_grad():
  for images, labels in target_loader:
    outputs = model_new(images.float())
    # Threshold the sigmoid output at 0.5 and flatten it to one entry per row.
    predicted = (outputs > 0.5).view(-1)
    total += labels.size(0)
    # A prediction counts as correct when it equals the label (True == 1, False == 0).
    correct += (predicted.to(labels.dtype) == labels.view(-1)).sum().item()
  print('Accuracy of the network on the entire data set is : %d %%' %(100 * correct/ total))