In [1]:
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel as DDP

# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# For TcpStore, same way as on Linux.

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

In [2]:
class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))


def demo_basic(rank, world_size):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)

    # create model and move it to GPU with id rank
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()

    cleanup()


def run_demo(demo_fn, world_size):
    mp.spawn(demo_fn,
             args=(world_size,),
             nprocs=world_size,
             join=True)

In [3]:
world_size = n_gpus = 2

In [4]:
run_demo(demo_basic, world_size)

Traceback (most recent call last):
  File "<string>", line 1, in <module>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 115, in _main
    exitcode = _main(fd)
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 115, in _main
    self = reduction.pickle.load(from_parent)
    self = reduction.pickle.load(from_parent)
AttributeError: AttributeErrorCan't get attribute 'demo_basic' on <module '__main__' (built-in)>: 
Can't get attribute 'demo_basic' on <module '__main__' (built-in)>


ProcessExitedException: process 1 terminated with exit code 1

In [5]:
if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_basic, world_size)

Traceback (most recent call last):
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
Traceback (most recent call last):
  File "<string>", line 1, in <module>
Traceback (most recent call last):
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
  File "<string>", line 1, in <module>
Traceback (most recent call last):
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "<string>", line 1, in <module>
  File "/home/yamanishi/.pyenv/versions/miniconda3-latest/envs/tr/lib/python3.7/multiproc

ProcessExitedException: process 2 terminated with exit code 1

In [14]:
import os
from os.path import join
import sys
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from scipy.sparse import csr_matrix
import scipy.sparse as sp
from time import time
import pickle
from tqdm import tqdm

class BasicDataset(Dataset):
    def __init__(self):
        print("init dataset")
    
    @property
    def n_users(self):
        raise NotImplementedError
    
    @property
    def m_items(self):
        raise NotImplementedError
    
    @property
    def trainDataSize(self):
        raise NotImplementedError
    
    @property
    def testDict(self):
        raise NotImplementedError
    
    @property
    def allPos(self):
        raise NotImplementedError
    
    def getUserItemFeedback(self, users, items):
        raise NotImplementedError
    
    def getUserPosItems(self, users):
        raise NotImplementedError
    
    def getUserNegItems(self, users):
        """
        not necessary for large dataset
        it's stupid to return all neg items in super large dataset
        """
        raise NotImplementedError
    
    def getSparseGraph(self):
        """
        build a graph in torch.sparse.IntTensor.
        Details in NGCF's matrix form
        A = 
            |I,   R|
            |R^T, I|
        """
        raise NotImplementedError

class Loader(BasicDataset):
    """
    Dataset type for pytorch \n
    Incldue graph information
    gowalla dataset
    """

    def __init__(self,config,path="../data/cf"):
        # train or test
        self.split = config['A_split']
        self.folds = config['A_n_fold']
        self.mode_dict = {'train': 0, "test": 1}
        self.mode = self.mode_dict['train']
        self.n_user = 0
        self.m_item = 0
        suffix = config['suffix']
        train_file = path + f'/train{suffix}.txt'
        test_file = path + f'/test{suffix}.txt'
        self.path = path
        trainUniqueUsers, trainItem, trainUser = [], [], []
        testUniqueUsers, testItem, testUser = [], [], []
        allPos = []
        self.traindataSize = 0
        self.testDataSize = 0

        with open(train_file) as f:
            for l in tqdm(f.readlines()):
                if len(l) > 0:
                    l = l.strip('\n').split(' ')
                    items = [int(i) for i in l[1:]]
                    uid = int(l[0])
                    trainUniqueUsers.append(uid)
                    trainUser.extend([uid] * len(items))
                    trainItem.extend(items)
                    allPos.append(np.array(items))
                    self.m_item = max(self.m_item, max(items))
                    self.n_user = max(self.n_user, uid)
                    self.traindataSize += len(items)
                if uid==100:
                    if config['test']:
                        break
        
        self.trainUniqueUsers = np.array(trainUniqueUsers)
        self.trainUser = np.array(trainUser)
        self.trainItem = np.array(trainItem)
        with open(f'../data/cf/allPosItem{suffix}.pkl', 'rb') as f:
            self.allPosItem = pickle.load(f)
        with open(f'../data/cf/allPos{suffix}.pkl', 'rb') as f:
            self._allPos = pickle.load(f)            

        with open(test_file) as f:
            for l in tqdm(f.readlines()):
                if len(l) > 0:
                    l = l.strip('\n').split(' ')
                    items = [int(i) for i in l[1:]]
                    uid = int(l[0])
                    testUniqueUsers.append(uid)
                    testUser.extend([uid] * len(items))
                    testItem.extend(items)
                    self.m_item = max(self.m_item, max(items))
                    self.n_user = max(self.n_user, uid)
                    self.testDataSize += len(items)
                if uid==100:
                    if config['test']:
                        break
        self.m_item += 1
        self.n_user += 1
        self.testUniqueUsers = np.array(testUniqueUsers)
        self.testUser = np.array(testUser)
        self.testItem = np.array(testItem)
        
        self.Graph = None
        print(f"{self.trainDataSize} interactions for training")
        print(f"{self.testDataSize} interactions for testing")
        #print(f"{world.dataset} Sparsity : {(self.trainDataSize + self.testDataSize) / self.n_users / self.m_items}")

        # (users,items), bipartite graph
        #if config['model']=='lgn':
        #    self.UserItemNet = csr_matrix((np.ones(len(self.trainUser)), (self.trainUser, self.trainItem)),
        #                                shape=(self.n_user, self.m_item))
            #self.users_D = np.array(self.UserItemNet.sum(axis=1)).squeeze()
            #self.users_D[self.users_D == 0.] = 1
            #self.items_D = np.array(self.UserItemNet.sum(axis=0)).squeeze()
            #self.items_D[self.items_D == 0.] = 1.
        # pre-calculate
        #self._allPos = allPos
        self.__testDict = self.__build_test()
        #print(f"{world.dataset} is ready to go")

    @property
    def n_users(self):
        return self.n_user
    
    @property
    def m_items(self):
        return self.m_item
    
    @property
    def trainDataSize(self):
        return self.traindataSize
    
    @property
    def testDict(self):
        return self.__testDict

    @property
    def allPos(self):
        return self._allPos

    def _split_A_hat(self,A):
        A_fold = []
        fold_len = (self.n_users + self.m_items) // self.folds
        for i_fold in range(self.folds):
            start = i_fold*fold_len
            if i_fold == self.folds - 1:
                end = self.n_users + self.m_items
            else:
                end = (i_fold + 1) * fold_len
            A_fold.append(self._convert_sp_mat_to_sp_tensor(A[start:end]).coalesce().to(self.device))
        return A_fold

    def _convert_sp_mat_to_sp_tensor(self, X):
        coo = X.tocoo().astype(np.float32)
        row = torch.Tensor(coo.row).long()
        col = torch.Tensor(coo.col).long()
        index = torch.stack([row, col])
        data = torch.FloatTensor(coo.data)
        return torch.sparse.FloatTensor(index, data, torch.Size(coo.shape))
        
    def getSparseGraph(self):
        print("loading adjacency matrix")
        if self.Graph is None:
            try:
                pre_adj_mat = sp.load_npz(self.path + '/s_pre_adj_mat.npz')
                print("successfully loaded...")
                norm_adj = pre_adj_mat
            except :
                print("generating adjacency matrix")
                s = time()
                print(1)
                adj_mat = sp.dok_matrix((self.n_users + self.m_items, self.n_users + self.m_items), dtype=np.float32)
                adj_mat = adj_mat.tolil()
                R = self.UserItemNet.tolil()
                print(2)
                adj_mat[:self.n_users, self.n_users:] = R
                adj_mat[self.n_users:, :self.n_users] = R.T
                adj_mat = adj_mat.todok()
                print(3)
                # adj_mat = adj_mat + sp.eye(adj_mat.shape[0])
                
                rowsum = np.array(adj_mat.sum(axis=1))
                d_inv = np.power(rowsum, -0.5).flatten()
                d_inv[np.isinf(d_inv)] = 0.
                d_mat = sp.diags(d_inv)
                print(4)
                
                norm_adj = d_mat.dot(adj_mat)
                norm_adj = norm_adj.dot(d_mat)
                norm_adj = norm_adj.tocsr()
                print(5)
                end = time()
                print(f"costing {end-s}s, saved norm_mat...")
                sp.save_npz(self.path + '/s_pre_adj_mat.npz', norm_adj)
                print(6)

            if self.split == True:
                self.Graph = self._split_A_hat(norm_adj)
                print("done split matrix")
            else:
                self.Graph = self._convert_sp_mat_to_sp_tensor(norm_adj)
                self.Graph = self.Graph.coalesce().to(self.device)
                print("don't split the matrix")
        return self.Graph

    def __build_test(self):
        """
        return:
            dict: {user: [items]}
        """
        test_data = {}
        for i, item in enumerate(self.testItem):
            user = self.testUser[i]
            if test_data.get(user):
                test_data[user].append(item)
            else:
                test_data[user] = [item]
        return test_data

    def getUserItemFeedback(self, users, items):
        """
        users:
            shape [-1]
        items:
            shape [-1]
        return:
            feedback [-1]
        """
        # print(self.UserItemNet[users, items])
        return np.array(self.UserItemNet[users, items]).astype('uint8').reshape((-1,))

    def getUserPosItems(self, users):
        posItems = []
        for user in users:
            posItems.append(self.allPos[user])
            #posItems.append(self.UserItemNet[user].nonzero()[1])
        return posItems


In [15]:
config = {}
config['device'] = 'cuda:0'
config['A_split'] = 1
config['A_n_fold'] = 1
config['test'] = False
config['suffix'] = '22_1_10'
dataset = Loader(config)

100%|██████████| 154376/154376 [00:00<00:00, 170128.94it/s]
100%|██████████| 154376/154376 [00:00<00:00, 321501.52it/s]


1785678 interactions for training
851685 interactions for testing


In [25]:
from torch_geometric.loader import NeighborSampler
class SAGE(torch.nn.Module):
    def __init__(self, config, dataset):
        super().__init__()
        self.config = config
        self.dataset = dataset
        self.n_user = self.dataset.n_user
        trainUser, trainItem = self.dataset.trainUser, self.dataset.trainItem
        self.edge_index = torch.cat([torch.stack([torch.tensor(trainUser), torch.tensor(trainItem)+self.n_user], dim=0), torch.stack([torch.tensor(trainItem)+self.n_user, torch.tensor(trainUser)], dim=0)], dim=1)
        
    def loader(self, user, pos, neg): 
        dataset_user = torch.utils.data.TensorDataset(user)
        dataset_pos = torch.utils.data.TensorDataset(pos)
        dataset_neg = torch.utils.data.TensorDataset(neg)
        user_loader = NeighborSampler(self.edge_index, node_idx=user, sizes=[self.num_neighbors for _ in range(self.num_layers)],
                                    batch_size=self.config['bpr_batch_size'], shuffle=False, num_workers=2)
        pos_loader = NeighborSampler(self.edge_index, node_idx=pos, sizes=[self.num_neighbors for _ in range(self.num_layers)],
                                batch_size=self.config['bpr_batch_size'], shuffle=False, num_workers=2)
        neg_loader = NeighborSampler(self.edge_index, node_idx=neg, sizes=[self.num_neighbors for _ in range(self.num_layers)],
                                batch_size=self.config['bpr_batch_size'], shuffle=False, num_workers=2)

In [26]:
sage = SAGE(config, dataset)

In [41]:
import torch.distributed as dist
dist.init_process_group("gloo", rank=2, world_size=2)
print(dist.get_world_size())

ValueError: Error initializing torch.distributed using env:// rendezvous: environment variable MASTER_ADDR expected, but not set

In [None]:
import torch.distributed as dist
print(dist.get_rank())
train_sampler = torch.utils.data.distributed.DistributedSampler(
                        train_dataset, 
                        num_replicas=dist.get_world_size(), 
                        rank=dist.get_rank(),
                        shuffle=False,)

In [37]:
source = torch.randint(0, 1000, size=(10000,))
target = torch.randint(0, 1000, size=(10000,))
edge_index = torch.stack([source, target], dim=1)
node_idx = torch.randint(0, 800, size=(1000, ))
train_dataset = torch.data.utils.TensorDataset(node_idx)
train_sampler = torch.utils.data.distributed.DistributedSampler(
                        train_dataset, 
                        num_replicas=dist.get_world_size(), 
                        rank=dist.get_rank(),
                        shuffle=False,)
sampler = NeighborSampler(edge_index,node_idx=node_idx, sizes=[3 for _ in range(2)],batch_size=50, shuffle=False, sampler=train_sampler)

In [3]:
torch.rand(3).unsqueeze(1).shape

torch.Size([3, 1])

In [1]:
with open('/home/yamanishi/project/furusato_recommend/data/text/product_main_comment_count22_1_10.pkl', 'rb') as f:
    comment = pickle.load(f)

In [9]:
with open('/home/yamanishi/project/furusato_recommend/data/text/user_main_comment_count22_1_10.pkl', 'rb') as f:
    comment = pickle.load(f)

In [10]:
index = np.random.randint(0, 25000, size=(8000, ))

In [13]:
import time
time1 = time.time()
coo=comment[index].tocoo()
col, row = coo.col, coo.row
print(time.time()-time1)

0.04520869255065918


In [15]:
import time
time1 = time.time()
sparse_data = comment[index]
# 値が存在する列を取得
for i in index:
    if i < sparse_data.shape[0]:
        row_start = sparse_data.indptr[i]
        row_end = sparse_data.indptr[i + 1]
        cols = sparse_data.indices[row_start:row_end]
        print(f"Row {i}: {cols}")
print(time.time()-time1)

Row 2469: [  143   206   389   452   601   610   700   787  1078  1303  1368  1372
  1419  1458  1464  1467  1470  1471  1490  1608  1661  1689  1730  1750
  1785  1792  1806  1831  1974  1993  2158  2159  2194  2214  2249  2296
  2379  2487  2511  2537  2579  2602  2603  2607  2679  2728  2738  2791
  2805  2828  2835  2838  2847  2859  2892  2909  2911  2967  3082  3098
  3131  3201  3216  3262  3351  3371  3425  3521  3530  3591  3636  3654
  3659  3670  3671  3710  3712  3734  3739  3772  3800  3829  3970  3984
  4025  4078  4145  4213  4310  4431  4506  4563  4582  4631  4800  4893
  4933  4967  4982  5057  5112  5214  5232  5298  5372  5413  5483  5511
  5615  5632  5697  5721  5732  5756  5810  5819  5834  5991  6103  6156
  6242  6280  6355  6639  6690  6834  6851  7094  7173  7387  7532  7535
  7556  7922  7959  8042  8176  8235  8305  8401  8413  8540  8636  8666
  8749  8785  8838  8936  9100  9380  9396  9530  9551  9689  9792  9928
 10026 10064 10186 10248 10256 10271 1027

In [10]:
from tqdm import tqdm
import cupy as cp
customer_num = 8500000
item_num = 440000
K = 1000
customer_split_num = 1000
a = np.random.rand(customer_num, 32)
b = np.random.rand(item_num, 32)
max_indices_all = []
for i in tqdm(range(0, 8500000, customer_split_num)):
    rating = cp.dot(cp.asarray(a[i:i+customer_split_num]),  cp.asarray(b.T))
    #unsorted_max_indices = np.argpartition(-rating, K)[:K]
    #y = rating[unsorted_max_indices]
    #indices = np.argsort(-y)
    #max_indices = unsorted_max_indices[indices]
    #max_indices_all.append(max_indices)
    


100%|██████████| 8500/8500 [08:55<00:00, 15.86it/s]


In [1]:
class FactorizationMachine(torch.nn.Module):

    def __init__(self, reduce_sum=True):
        super().__init__()
        self.reduce_sum = reduce_sum

    def forward(self, x):
        """
        :param x: Float tensor of size ``(batch_size, num_fields, embed_dim)``
        """
        square_of_sum = torch.sum(x, dim=1) ** 2
        sum_of_square = torch.sum(x ** 2, dim=1)
        ix = square_of_sum - sum_of_square
        if self.reduce_sum:
            ix = torch.sum(ix, dim=1, keepdim=True)
        return 0.5 * ix

In [3]:
fm = FactorizationMachine(reduce_sum=False)
out=fm(torch.rand(128, 5,32))
out.shape

torch.Size([128, 32])