In [1]:
import sys
from pathlib import Path
parent_dir = str(Path.cwd().parent)
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader

import syft as sy
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
# np.random.seed(666)
from Distributed_HM_Data import HMSaleTrainDataLoader, Distributed_HM, binary_acc
from utils_models import SalesNN, CustomersNN, ProductsNN

dataDir = Path.cwd().parent.parent/'Data/'

# model will train on CPU since PySyft 0.2.9 exist bugs with CUDA
# import os
# os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# print(device)

In [2]:
import sys
import logging

# preserve training log
so = open("config1.log", 'w', 10)
sys.stdout.echo = so
sys.stderr.echo = so

get_ipython().log.handlers[0].stream = so
get_ipython().log.setLevel(logging.INFO)

In [3]:
class SalesNN(nn.Module):
    """ Partial model for sales domain
    Args:
        num_users (int): Number of users
        num_items (int): Number of products
        prices (float): price of transactions
        sales_channels (float): sales channels
    
    """
    def __init__(
            self, 
            num_users: int, 
            num_items: int,
            user_embedding_dim: int = 32,
            item_embedding_dim: int = 64,
        ):
        super().__init__()
        self.user_embedding_layer = nn.Embedding(num_embeddings=num_users, embedding_dim=user_embedding_dim)
        self.item_embedding_layer = nn.Embedding(num_embeddings=num_items, embedding_dim=item_embedding_dim)
        
    def forward(self, user_input, item_input):
        user_embedding = self.user_embedding_layer(user_input)
        item_embedding = self.item_embedding_layer(item_input)

        latent_vec = torch.cat([user_embedding, item_embedding], dim=-1)

        return torch.squeeze(latent_vec)
    
    # save weights of partial model on remote worker
    def get_weights(self):
        return self.state_dict()
    
class CustomersNN(nn.Module):
    """ Partial model for customer domain
    Args:
        club_status (int): active or inactive customers' status
        age_groups (int): age of customers
    
    """
    def __init__(
            self,
            input_size: int = 2,
            output_size: int = 4,
        ):
        super().__init__()
        self.relu = nn.LeakyReLU()
        in_channels = (
            [input_size] 
            + [output_size]
        )
        
        self.encoder = nn.Sequential(
            *[nn.Linear(in_features=in_channels[i], out_features=in_channels[i+1]) for i in range(len(in_channels)-1) if i != len(in_channels)-1]
        )
        
    def forward(self, club_status, age_groups):
        
        latent_vec = torch.cat([club_status, age_groups], dim=-1)
        
        for layer in self.encoder:
            latent_vec = layer(latent_vec)
        
        return torch.squeeze(latent_vec)
    
    # save weights of partial model on remote worker
    def get_weights(self):
        return self.state_dict()

class ProductsNN(nn.Module):
    """ Partial model for product domain
    Args:
        num_product_groups (int): Number of product groups
        num_color_groups: (int): Number of color groups
        num_index_name: (int): Number of index name
    
    """
    def __init__(
            self,
            num_product_groups: int,
            num_color_groups: int,
            num_index_name: int,
            product_group_embedding_dim: int = 5,
            color_group_embedding_dim: int = 16,
            index_name_embedding_dim: int = 3,
        ):
        super().__init__()
        self.product_group_embedding_layer = nn.Embedding(num_embeddings=num_product_groups, embedding_dim=product_group_embedding_dim)
        self.color_group_embedding_layer = nn.Embedding(num_embeddings=num_color_groups, embedding_dim=color_group_embedding_dim)
        self.index_name_embedding_layer = nn.Embedding(num_embeddings=num_index_name, embedding_dim=index_name_embedding_dim)
        

        
    def forward(self, product_groups, color_groups, index_name):
        product_group_embedding = self.product_group_embedding_layer(product_groups)
        color_group_embedding = self.color_group_embedding_layer(color_groups)
        index_name_embedding = self.index_name_embedding_layer(index_name)

        latent_vec = torch.cat([product_group_embedding, color_group_embedding, index_name_embedding], dim=-1)
        
        return torch.squeeze(latent_vec)
    
    # save weights of partial model on remote worker
    def get_weights(self):
        return self.state_dict()

In [4]:
# customized models for specific configuration

class GovernanceNN(nn.Module):
    """ Partial model for goverance side
    Args:
        agg_latent_input (int): aggregated input of latent vectors from client models
    
    """
    def __init__(
            self,
            input_size: int = 124,
            hidden_size_1: int = 256,
            hidden_size_2: int = 128,
            output_size: int = 1,
        ):
        super().__init__()
        self.relu = nn.LeakyReLU()
        in_channels = (
            [input_size] 
            + [hidden_size_1]
            + [hidden_size_2]
            + [hidden_size_2]
            + [output_size]
        )
        self.decoder = nn.Sequential(
            *[nn.Linear(in_features=in_channels[i], out_features=in_channels[i+1]) for i in range(len(in_channels)-1) if i != len(in_channels)-1]
        )
        
    def forward(self, agg_latent_input):
        
        for i, layer in enumerate(self.decoder):
            agg_latent_input = layer(agg_latent_input)
            agg_latent_input = self.relu(agg_latent_input)
        out = agg_latent_input
        
        return out
    
    # save weights of partial model on remote worker
    def get_weights(self):
        return self.state_dict()


class SplitNN(nn.Module):
    def __init__(self, models, optimizers, data_owner, server):
        self.models = models
        self.optimizers = optimizers
        self.data_owners = data_owner
        self.server = server
#         self.outputs = [None]*len(self.models)
        super().__init__()
        
    def forward(self, data_pointer):
        
        #individual client's output upto their respective cut layer
        client_output = {}

        #outputs that is moved to server and subjected to concatenate for server input
        remote_output = []
        
        for owner in self.data_owners:
            if owner.id == "sales_domain":
                client_output[owner.id] = self.models[owner.id](data_pointer[owner.id][0], data_pointer[owner.id][1])
                remote_output.append(
                    client_output[owner.id].move(self.server, requires_grad=True)
                )
            elif owner.id == "customer_domain":
                client_output[owner.id] = self.models[owner.id](data_pointer[owner.id][0], data_pointer[owner.id][1])
                remote_output.append(
                    client_output[owner.id].move(self.server, requires_grad=True)
                )
            elif owner.id == "product_domain":
                client_output[owner.id] = self.models[owner.id](data_pointer[owner.id][0], data_pointer[owner.id][1], data_pointer[owner.id][2])
                remote_output.append(
                    client_output[owner.id].move(self.server, requires_grad=True)
                )
        
        # concat outputs from clients and send to server side
        server_input = torch.cat(remote_output, dim=-1)
        # make prediction on server model
        pred = self.models["server"](server_input)

        return pred

    def zero_grads(self):
        for opt in self.optimizers:
            opt.zero_grad()
        
    def step(self):
        for opt in self.optimizers:
            opt.step()
    
    def train(self):
        for loc in self.models.keys():
            self.models[loc].train()
#             if loc == "server":
#                 for i in range(len(self.models[loc])):
#                     self.models[loc][i].train()
#             else:
#                 self.models[loc].train()
    
    def eval(self):
        for loc in self.models.keys():
            self.models[loc].eval()        
            
    @property
    def location(self):
        return self.models[0].location if self.models and len(self.models) else None
    
def train(x, target, splitNN):
    # set up bias weight for negative sampling
    weights = torch.tensor([1.0, 3.0])
    splitNN.zero_grads()
    
    # make a prediction
    pred = splitNN.forward(x)
    criterion = nn.BCEWithLogitsLoss(pos_weight=weights[1])
    loss = criterion(pred, target.float())
    
    # backprop the loss on the end layer
    loss.backward()
    
    # update the weights
    splitNN.step()
    
    return loss.detach().get()


In [5]:
hm_data = pd.read_csv(dataDir/'train_transactions_large.csv')
hm_test_data = pd.read_csv(dataDir/'test_transactions_large.csv')
article_data = pd.read_csv(dataDir/'article_data_large.csv')
all_products_id = article_data["article_id"].unique()
transactions = pd.concat([hm_data, hm_test_data], ignore_index=True)

customer_product_set = set(zip(transactions["customer_id"], transactions["article_id"], 
                               transactions["club_member_status"], transactions["age"], 
                               transactions["product_group_name"], transactions["colour_group_name"], transactions["index_name"]))

train_data = HMSaleTrainDataLoader(hm_data, all_products_id, customer_product_set)
train_loader = DataLoader(train_data, batch_size=1024, shuffle=True)

# set up virtual worker
hook = sy.TorchHook(torch)
sales_domain = sy.VirtualWorker(hook, id="sales_domain")
customer_domain = sy.VirtualWorker(hook, id="customer_domain")
product_domain = sy.VirtualWorker(hook, id="product_domain")
server = sy.VirtualWorker(hook, id="server")

data_owners = (sales_domain, customer_domain, product_domain)
model_locations = [sales_domain, customer_domain, product_domain, server]

distributed_trainloader = Distributed_HM(data_owners=data_owners, data_loader=train_loader)

# set up parameters for model
num_users = len(hm_data.customer_id.unique())
print("num_users:", num_users)
num_items = len(all_products_id)
print("num_items:", num_items)
num_product_groups = len(hm_data.product_group_name.unique())
print("num_product_groups:", num_product_groups)
num_color_groups = len(hm_data.colour_group_name.unique())
print("num_color_groups:", num_color_groups)
num_index_name = len(hm_data.index_name.unique())
print("num_index_name:", num_index_name)

models = {
    "sales_domain": SalesNN(num_users=num_users, num_items=num_items),
    "customer_domain": CustomersNN(),
    "product_domain": ProductsNN(num_product_groups=num_product_groups, num_color_groups=num_color_groups, num_index_name=num_index_name),
    "server": GovernanceNN(),
}

# set up optimizer for clients' model

optimizers = [
    optim.Adam(models[location.id].parameters(), lr=0.005)
    for location in model_locations
]

for location in model_locations:
    models[location.id].send(location)

  0%|          | 0/293769 [00:00<?, ?it/s]

num_users: 10346
num_items: 15516
num_product_groups: 11
num_color_groups: 48
num_index_name: 9


In [6]:
print(models)

epochs = 200
torch.autograd.set_detect_anomaly(True)
splitnn = SplitNN(models, optimizers, data_owners, server)

for i in range(epochs):
    running_loss = 0.0
    splitnn.train()
    for data_ptr, labels in distributed_trainloader:  
        labels = labels.send(server)
        loss = train(data_ptr, labels, splitnn)
        running_loss += loss
    else:
        print("Epoch {} - Training loss: {}".format(i, running_loss/len(distributed_trainloader)))
        

{'sales_domain': SalesNN(
  (user_embedding_layer): Embedding(10346, 32)
  (item_embedding_layer): Embedding(15516, 64)
), 'customer_domain': CustomersNN(
  (relu): LeakyReLU(negative_slope=0.01)
  (encoder): Sequential(
    (0): Linear(in_features=2, out_features=4, bias=True)
  )
), 'product_domain': ProductsNN(
  (product_group_embedding_layer): Embedding(11, 5)
  (color_group_embedding_layer): Embedding(48, 16)
  (index_name_embedding_layer): Embedding(9, 3)
), 'server': GovernanceNN(
  (relu): LeakyReLU(negative_slope=0.01)
  (decoder): Sequential(
    (0): Linear(in_features=124, out_features=256, bias=True)
    (1): Linear(in_features=256, out_features=128, bias=True)
    (2): Linear(in_features=128, out_features=128, bias=True)
    (3): Linear(in_features=128, out_features=1, bias=True)
  )
)}
Epoch 0 - Training loss: 0.7660925984382629
Epoch 1 - Training loss: 0.3917459547519684
Epoch 2 - Training loss: 0.24729256331920624
Epoch 3 - Training loss: 0.16989195346832275
Epoch 4 -

KeyboardInterrupt: 

In [7]:
def save_weights(models, file_prefix):
    for loc in models.keys():
         torch.save(models[loc].get().state_dict(), f"{file_prefix}_{loc}_weights.pth")
    
save_weights(models, "Split_RecNN_large")

In [8]:
def load_weights(models, file_prefix):
    for loc in models.keys():
        model_weights = torch.load(f"{file_prefix}_{loc}_weights.pth")
        models[loc].load_state_dict(model_weights)

In [9]:
def binary_acc(y_pred, y_test):
    acc = 0.0
    
    y_pred_label = torch.round(torch.sigmoid(y_pred))
    correct_pred = (y_pred_label == y_test).sum()
    acc = correct_pred.item()/y_test.shape[0]
    return acc

def predict(splitnn, data_ptr, product, test_items):
    hits_10 = 1
    hits_5 = 1
    
    y_pred = splitnn.forward(data_ptr).get()
#     y_pred = torch.sigmoid(y_pred)
    y_pred = y_pred.squeeze()
    
    top10_probs, top10_indices = torch.topk(torch.sigmoid(y_pred), 10)
    # Convert the top 10 probabilities to a list
    top10_items = [test_items[i].item() for i in top10_indices]
    if product in top10_items:
        hits_10 = 1
    else:
        hits_10 = 0
    
    top5_probs, topk_indices = torch.topk(torch.sigmoid(y_pred), 5)
    # Convert the top 5 probabilities to a list
    top5_items = [test_items[i].item() for i in topk_indices]
    if product in top5_items:
        hits_5 = 1
    else:
        hits_5 = 0
    
    return hits_5, hits_10

In [10]:
def dcg_at_k(r, k):
    r = np.asfarray(r)[:k]
    if r.size:
        return np.sum((2**r - 1) / np.log2(np.arange(2, r.size + 2)))
    return 0.

def ndcg_at_k(r, k):
    dcg_max = dcg_at_k(sorted(r, reverse=True), k)
    if not dcg_max:
        return 0.
    return dcg_at_k(r, k) / dcg_max

In [11]:
class Distributed_HM_test(Dataset):
    def __init__(self, data_owners, customer_batch, product_batch, club_status_batch, age_groups_batch, product_groups_batch, color_groups_batch, index_name_batch, label_batch):
        self.data_owners = data_owners
        self.no_of_owner = len(data_owners)
        self.customer_batch = customer_batch
        self.product_batch = product_batch
        self.club_status_batch = club_status_batch
        self.age_groups_batch = age_groups_batch
        self.product_groups_batch = product_groups_batch
        self.color_groups_batch = color_groups_batch
        self.index_name_batch = index_name_batch
        self.label_batch = label_batch

        self.data_pointer = []
        self.labels = []
        
        self.curr_data_dict = {}
        self.labels.append(label_batch.reshape(-1, 1))

        # split data batch based on domains
        sales_domain = [customer_batch, product_batch]
        customer_domain = [club_status_batch.float(), age_groups_batch.float()]
        product_domain = [product_groups_batch, color_groups_batch, index_name_batch]
            
        # set data owners for each domain team
        sales_owner = self.data_owners[0]
        customer_owner = self.data_owners[1]
        product_owner = self.data_owners[2]
            
            
        # send split data to VirtualWorkers and add the data pointer to the dict
        sales_part_ptr = []
        for tensor in sales_domain:
            sales_part_ptr.append(tensor.send(sales_owner))
        self.curr_data_dict[sales_owner.id] = sales_part_ptr
            
        customer_part_ptr = []
        for tensor in customer_domain:
            customer_part_ptr.append(tensor.send(customer_owner))
        self.curr_data_dict[customer_owner.id] = customer_part_ptr
            
        product_part_ptr = []
        for tensor in product_domain:
            product_part_ptr.append(tensor.send(product_owner))
        self.curr_data_dict[product_owner.id] = product_part_ptr

        self.data_pointer.append(self.curr_data_dict)
            
    def get_data(self):
        return self.data_pointer[0], self.labels[0]

In [12]:
# model test
train_transactions = pd.read_csv(dataDir/'train_transactions_large.csv')
test_transactions = pd.read_csv(dataDir/'test_transactions_large.csv')
article_data = pd.read_csv(dataDir/'article_data_large.csv')
customer_data = pd.read_csv(dataDir/'customer_data_large.csv')

all_products_id = article_data["article_id"].unique()
customer_product_test = set(zip(test_transactions["customer_id"], test_transactions["article_id"], 
                                test_transactions["club_member_status"], test_transactions["age"], 
                                test_transactions["product_group_name"], test_transactions["colour_group_name"], 
                                test_transactions["index_name"]))

# Dict of all items that are interacted with by each user
user_interacted_items = train_transactions.groupby('customer_id')['article_id'].apply(list).to_dict()

# set up parameters for model
num_users = len(train_transactions.customer_id.unique())
print("num_users:", num_users)
num_items = len(all_products_id)
print("num_items:", num_items)
num_product_groups = len(article_data.product_group_name.unique())
print("num_product_groups:", num_product_groups)
num_color_groups = len(article_data.colour_group_name.unique())
print("num_color_groups:", num_color_groups)
num_index_name = len(article_data.index_name.unique())

# load the model parameters
models = {
    "sales_domain": SalesNN(num_users=num_users, num_items=num_items),
    "customer_domain": CustomersNN(),
    "product_domain": ProductsNN(num_product_groups=num_product_groups, num_color_groups=num_color_groups, num_index_name=num_index_name),
    "server": GovernanceNN(),
}

load_weights(models, "Split_RecNN_large")

# set up virtual worker
hook = sy.TorchHook(torch)
sales_domain = sy.VirtualWorker(hook, id="sales_domain")
customer_domain = sy.VirtualWorker(hook, id="customer_domain")
product_domain = sy.VirtualWorker(hook, id="product_domain")
server = sy.VirtualWorker(hook, id="server")

data_owners = (sales_domain, customer_domain, product_domain)
model_locations = [sales_domain, customer_domain, product_domain, server]

# set up optimizer for clients' model
optimizers = [
    optim.Adam(models[location.id].parameters(), lr=0.005)
    for location in model_locations
]

for location in model_locations:
    models[location.id].send(location)

# Load the weights locally
splitnn = SplitNN(models, optimizers, data_owners, server)



num_users: 10346
num_items: 15516
num_product_groups: 11
num_color_groups: 48




In [13]:
# model test
# np.random.seed(28)
splitnn.eval()
hits = []
pred_prob = []
acc = []
hits_10 = []
hits_5 = []
recall = []
ndcgs = []

with torch.no_grad():
    for customer, product, club_status, age_groups, product_groups, color_groups, index_name in tqdm(customer_product_test):
        
        # select 10 products from item set that customer has no interactions
        interacted_items = user_interacted_items[customer]
        interacted_items = interacted_items + [product]
        not_interacted_items = set(all_products_id) - set(interacted_items)
        selected_not_interacted = list(np.random.choice(list(not_interacted_items), 10, replace=False))
        test_items = [product] + selected_not_interacted
        
        # get the other product features based on the selected test items
        product_groups_batch = torch.tensor(article_data.loc[article_data["article_id"].isin(test_items)]["product_group_name"].to_numpy()).reshape(-1, 1)
        color_groups_batch = torch.tensor(article_data.loc[article_data["article_id"].isin(test_items)]["colour_group_name"].to_numpy()).reshape(-1, 1)
        index_name_batch = torch.tensor(article_data.loc[article_data["article_id"].isin(test_items)]["index_name"].to_numpy()).reshape(-1, 1)

        test_items = torch.tensor(test_items).reshape(-1, 1)
        customer_batch = torch.tensor([customer]*11).reshape(-1, 1)
        club_status_batch = torch.tensor([club_status]*11).reshape(-1, 1)
        age_groups_batch = torch.tensor([age_groups]*11).reshape(-1, 1)
#         label_batch = torch.tensor([1]+[0]*10).reshape(-1, 1)
        
        # batch prediction on test items
        batch_data_dict = {}
        # split data batch based on domains
        sales_batch = [customer_batch, test_items]
        customer_batch = [club_status_batch.float(), age_groups_batch.float()]
        product_batch = [product_groups_batch, color_groups_batch, index_name_batch]
        # send split data to VirtualWorkers and add the data pointer to the dict
        sales_part_ptr = []
        for tensor in sales_batch:
            sales_part_ptr.append(tensor.send(sales_domain))
        batch_data_dict[sales_domain.id] = sales_part_ptr
            
        customer_part_ptr = []
        for tensor in customer_batch:
            customer_part_ptr.append(tensor.send(customer_domain))
        batch_data_dict[customer_domain.id] = customer_part_ptr
            
        product_part_ptr = []
        for tensor in product_batch:
            product_part_ptr.append(tensor.send(product_domain))
        batch_data_dict[product_domain.id] = product_part_ptr
        
        y_pred = splitnn.forward(batch_data_dict).get()
#         y_pred = torch.sigmoid(y_pred)
        y_pred = y_pred.squeeze()

        top10_probs, top10_indices = torch.topk(y_pred, 10)
        # Convert the top 10 probabilities to a list
        top10_items = [test_items[i].item() for i in top10_indices]
        if product in top10_items:
            hits_10.append(1)
        else:
            hits_10.append(0)

        top5_probs, top5_indices = torch.topk(y_pred, 5)
        # Convert the top 5 probabilities to a list
        top5_items = [test_items[i].item() for i in top5_indices]
        if product in top5_items:
            hits_5.append(1)
        else:
            hits_5.append(0)
    
        
        # single prediction on product
        # send split data to VirtualWorkers and add the data pointer to the dict
        single_data_ptr = {}
        sales_data = [torch.tensor([customer]).reshape(-1, 1), torch.tensor([product]).reshape(-1, 1)]
        customer_data = [torch.tensor([club_status]).reshape(-1, 1).float(), torch.tensor([age_groups]).reshape(-1, 1).float()]
        product_data = [torch.tensor([product_groups]).reshape(-1, 1), torch.tensor([color_groups]).reshape(-1, 1), torch.tensor([index_name]).reshape(-1, 1)]
        
        sales_part_ptr = []
        for tensor in sales_data:
            sales_part_ptr.append(tensor.send(sales_domain))
        single_data_ptr[sales_domain.id] = sales_part_ptr
            
        customer_part_ptr = []
        for tensor in customer_data:
            customer_part_ptr.append(tensor.send(customer_domain))
        single_data_ptr[customer_domain.id] = customer_part_ptr
            
        product_part_ptr = []
        for tensor in product_data:
            product_part_ptr.append(tensor.send(product_domain))
        single_data_ptr[product_domain.id] = product_part_ptr
        
        y_single_pred = splitnn.forward(single_data_ptr).get()
        if torch.round(torch.sigmoid(y_single_pred)) == 1:
            recall.append(1)
        else:
            recall.append(0)
        
        # Calculate the NDCG
        relevance_scores = np.zeros(11)
        relevance_scores[0] = 1  # The first item is the ground truth (relevant) item
        ndcg_score = ndcg_at_k(relevance_scores[top10_indices.cpu().numpy()], 10)
        ndcgs.append(ndcg_score)

print("The Hit Rate@5 is {:.3f}".format(np.average(hits_5)))
print("The Hit Rate@10 is {:.3f}".format(np.average(hits_10)))
print("The NDCG@10 is {:.3f}".format(np.average(ndcgs)))
print("The Recall is {:.3f}".format(np.average(recall)))

  0%|          | 0/10338 [00:00<?, ?it/s]

The Hit Rate@5 is 0.441
The Hit Rate@10 is 0.897
The NDCG@10 is 0.405
The Recall is 0.956
