In [1]:
import math
from itertools import islice, chain

import pandas as pd
import torch
from IPython.core.debugger import set_trace
from torch.multiprocessing import Pool, SimpleQueue, JoinableQueue

In [2]:
class PandasDataLoader(object):
    """Shuffle a DataFrame, split it into train/test parts and stream batches to queues.

    Iterating over an instance transforms the training rows batch by batch and
    puts one ``(X, Y)`` pair of tensors per batch on ``train_queue``; it then
    does the same for the test rows on ``test_queue``. After every ``put`` the
    1-based index of the batch within its partition is yielded.

    Parameters
    ----------
    df : pandas.DataFrame
        Source rows. The frame is shuffled (copied), not modified in place.
    transform : callable
        Called with one row (as yielded by ``DataFrame.iterrows``); must return
        something indexable where ``[0]`` is the input and ``[1]`` the target.
    train_queue, test_queue : queue-like, optional
        Any object with a ``put`` method. When omitted, a fresh ``SimpleQueue``
        is created for each instance.
    train_size, test_size : float
        Fractions of the data for each partition; they must sum to 1.
    batch_size : int
        Maximum number of rows per batch (the last batch may be smaller).
    random_state : optional
        Forwarded to ``DataFrame.sample`` to make the shuffle reproducible.

    Raises
    ------
    AssertionError
        If ``train_size + test_size`` is not (numerically) 1.
    ValueError
        If ``batch_size`` is smaller than 1.
    """

    def __init__(self, df, transform,
                 train_queue=None, test_queue=None,
                 train_size=0.8, test_size=0.2, batch_size=10,
                 random_state=None):
        # Compare with a tolerance: the sum of two float fractions is not
        # guaranteed to be exactly 1.0.
        assert math.isclose(train_size + test_size, 1.0), 'Train test size should be fractions that sum to 1'
        if batch_size < 1:
            raise ValueError('batch_size must be >= 1, got {!r}'.format(batch_size))
        self.train_size = train_size
        self.test_size = test_size
        self.transform = transform
        # Queues are created per instance. A `SimpleQueue()` default in the
        # signature would be built once at definition time and shared by every
        # loader, leaking batches between instances.
        self.train_queue = SimpleQueue() if train_queue is None else train_queue
        self.test_queue = SimpleQueue() if test_queue is None else test_queue
        self.batch_size = batch_size
        self.random_state = random_state
        self.df = self.shuffle(df)
        self.length = df.shape[0]
        self.train_df, self.test_df = self.compute_train_test_df()

    def shuffle(self, df):
        """Return a row-shuffled copy of ``df`` with a fresh 0..n-1 index."""
        random_state = getattr(self, 'random_state', None)
        return df.sample(frac=1, random_state=random_state).reset_index(drop=True)

    def compute_train_test_df(self):
        """Split the shuffled frame into ``(train_df, test_df)``.

        The first ``int(length * train_size)`` rows form the training part and
        all remaining rows form the test part, so no row is dropped by rounding.
        """
        split_at = int(self.length * self.train_size)
        return self.df.iloc[:split_at, :], self.df.iloc[split_at:, :]

    def build_batches(self, df):
        """Yield ``(batch_X, batch_Y)`` lists for consecutive row slices of ``df``.

        Each row is passed through ``self.transform``; element 0 of the result
        is appended to ``batch_X`` and element 1 to ``batch_Y``.
        """
        num_batches = math.ceil(df.shape[0] / float(self.batch_size))
        for batch_no in range(num_batches):
            start = batch_no * self.batch_size
            batch_X = []
            batch_Y = []
            for _, record in df.iloc[start:start + self.batch_size, :].iterrows():
                transformed = self.transform(record)
                batch_X.append(transformed[0])
                batch_Y.append(transformed[1])
            yield batch_X, batch_Y

    def _enqueue_batches(self, df, queue):
        """Put every batch of ``df`` on ``queue`` as tensors, yielding a 1-based counter."""
        for idx, (batch_X, batch_Y) in enumerate(self.build_batches(df), start=1):
            queue.put((torch.tensor(batch_X), torch.tensor(batch_Y)))
            yield idx

    def __iter__(self):
        """Stream all training batches, then all test batches, onto their queues."""
        print("Train")
        yield from self._enqueue_batches(self.train_df, self.train_queue)
        print("Test")
        yield from self._enqueue_batches(self.test_df, self.test_queue)

In [3]:
# Load the pickled ratings frame and move its index into regular columns.
# Later cells read the 'user_id', 'anime_id' and 'my_score' columns per row.
df = (
    pd.read_pickle('datasets/user_grouped_anime_ratings.gz')
    .reset_index()
)

In [4]:
import pickle


def _load_pickle(path):
    """Unpickle ``path`` and close the file handle afterwards.

    NOTE(review): unpickling can execute arbitrary code; only load artifacts
    that were produced by a trusted run of this project.
    """
    with open(path, 'rb') as handle:
        return pickle.load(handle)


# Lookup tables; later cells use the 'anime2idx' and 'user2idx' entries.
mapping = _load_pickle('datasets/user_anime_ratings_mapping.pkl')
mapping.keys()

# Trained embedding tensors (converted to NumPy via .cpu().detach().numpy() below).
anime_embeddings = _load_pickle('anime_embed_pytorch_nn_epoch4_embedding_fix_10.23-10.21.pkl')
anime_embeddings.shape

user_embeddings = _load_pickle('user_embed_pytorch_nn_epoch4_embedding_fix_10.23-10.21.pkl')
user_embeddings.shape

# Import from the public package: the private `sklearn.decomposition.pca`
# module path is not available in newer scikit-learn releases.
from sklearn.decomposition import PCA
pca = PCA(n_components=2)
# Fit a 2-component projection of the user embeddings; it is used later to
# build the regression targets.
pca.fit(user_embeddings.cpu().detach().numpy())

PCA(copy=True, iterated_power='auto', n_components=2, random_state=None,
  svd_solver='auto', tol=0.0, whiten=False)

In [5]:
from functools import partial
import torch
import numpy as np

def input_anime_embeddings(record, mapping, anime_embeddings, max_score=10):
    """Build one feature vector from the embeddings of the anime in ``record``.

    Three vectors of the embedding width are computed and concatenated, in
    this order:

    1. embeddings weighted by the score, divided by the sum of scores;
    2. embeddings weighted by ``max_score - score``, divided by the sum of
       those inverted scores;
    3. embeddings weighted by the score, divided by the number of anime.

    Any denominator that would be zero is replaced by 1, so an empty record
    (or one with all-zero / all-maximum scores) yields zeros instead of NaNs.

    Parameters
    ----------
    record : mapping-like
        Must provide ``record['anime_id']`` and ``record['my_score']`` as
        sequences of equal length.
    mapping : dict
        ``mapping['anime2idx']`` translates an anime id to its row in
        ``anime_embeddings``.
    anime_embeddings : numpy.ndarray
        2-D array indexed by row number.
    max_score : int, optional
        Upper end of the rating scale used for the inverted weighting.

    Returns
    -------
    numpy.ndarray
        1-D ``float32`` array of length ``3 * anime_embeddings.shape[1]``.
    """
    batch_anime_idx = np.array([mapping['anime2idx'][x] for x in record['anime_id']], dtype=np.int32)
    batch_anime_rating = np.array(record['my_score'], dtype=np.int16)
    rating_column = batch_anime_rating.reshape(-1, 1)
    neg_rating_column = max_score - rating_column

    # Gather the rated rows once; both weighted sums reuse them.
    rated_embeddings = anime_embeddings[batch_anime_idx]

    # Guard every denominator the same way so no slice of the result is NaN.
    num_anime_watched = len(batch_anime_idx)
    num_anime_watched = 1 if num_anime_watched == 0 else num_anime_watched
    sum_rating = np.sum(batch_anime_rating)
    sum_rating = 1 if sum_rating == 0 else sum_rating
    sum_neg_rating = np.sum(neg_rating_column)
    sum_neg_rating = 1 if sum_neg_rating == 0 else sum_neg_rating

    anime_sum = np.sum(rated_embeddings * rating_column, axis=0).astype(np.float32)
    anime_neg_sum = np.sum(rated_embeddings * neg_rating_column, axis=0).astype(np.float32)

    result = np.concatenate([
        anime_sum / sum_rating,
        anime_neg_sum / sum_neg_rating,
        anime_sum / num_anime_watched,
    ])
    # Dividing by NumPy integer scalars may promote to float64 depending on the
    # NumPy version; pin the output dtype explicitly.
    return result.astype(np.float32, copy=False)

def extract_required_format(record, pca, mapping, user_embeddings, anime_embeddings, device):
    """Turn one DataFrame row into an ``(input, target)`` pair.

    The input is the vector produced by ``input_anime_embeddings`` for the
    row. The target is the row's user embedding (looked up through
    ``mapping['user2idx']``) projected with ``pca.transform`` and cast to
    ``float32``; because a single-row list is transformed, the target keeps a
    leading dimension of 1.

    ``device`` is accepted for interface compatibility but is not used here.
    """
    features = input_anime_embeddings(record, mapping, anime_embeddings)
    user_row = mapping['user2idx'][record['user_id']]
    projected = pca.transform([user_embeddings[user_row]])
    return features, projected.astype(np.float32)

# Pre-bind the fitted PCA, the id -> index mapping and NumPy versions of both
# embedding tensors, so that `transform(record)` only needs the DataFrame row.
transform = partial(
    extract_required_format,
    pca=pca,
    mapping=mapping,
    user_embeddings=user_embeddings.cpu().detach().numpy(),
    anime_embeddings=anime_embeddings.cpu().detach().numpy(),
    device='cuda',
)

In [6]:
class DataLoaderPool(object):
    """Drive an iterable data loader in chunks through a worker pool.

    The values produced by ``data_loader`` are sent to the pool's workers,
    which simply echo them back via ``action``. Any side effects of iterating
    the loader itself (such as putting batches on queues) happen while the
    pool pulls items from it.

    Parameters
    ----------
    data_loader : iterable
        Source of the values dispatched to the pool.
    num_workers : int
        Number of worker processes.
    """

    def __init__(self, data_loader, num_workers=4):
        self.num_workers = num_workers
        self.data_loader = data_loader

    @staticmethod
    def action(x):
        """Identity task executed by the workers."""
        return x

    @staticmethod
    def _chunks(iterable, size=10):
        """Lazily split ``iterable`` into consecutive chunks of at most ``size`` items.

        Every chunk is a one-shot iterator over the same underlying iterator,
        so a chunk has to be fully consumed before the next one is requested.
        """
        iterator = iter(iterable)
        for first in iterator:
            yield chain([first], islice(iterator, size - 1))

    def run(self, chunksize=5):
        """Process the loader chunk by chunk, yielding each chunk's results.

        For every chunk of up to ``chunksize`` loader items, the items are
        dispatched with ``imap_unordered`` and the list of values returned by
        the workers (in completion order) is yielded once the whole chunk is
        done. Yielding the chunk iterator itself would be useless to the
        caller because the pool has already exhausted it by then.

        This is a generator: the pool is only created when iteration starts
        and is closed when the generator finishes or is closed.

        Raises
        ------
        ValueError
            If ``chunksize`` is smaller than 1.
        """
        if chunksize < 1:
            raise ValueError('chunksize must be >= 1, got {!r}'.format(chunksize))

        with Pool(processes=self.num_workers) as pool:
            for chunk in self._chunks(self.data_loader, size=chunksize):
                yield list(pool.imap_unordered(DataLoaderPool.action, chunk))

In [7]:
# Small end-to-end check on the first 100 rows, relying on the defaults of both
# classes (80/20 split, batches of 10 rows, 4 worker processes).
dlp = DataLoaderPool(data_loader=PandasDataLoader(df=df.head(100), transform=transform))

In [8]:
# run() is a generator function, so this only creates the generator; no pool is
# started and no batch is built until `i` is iterated.
i = dlp.run(chunksize=20)

In [9]:
# Exhaust the generator. This is what actually iterates the data loader, which
# prints "Train"/"Test" and puts the batches on its train/test queues.
for _ in i:
    pass

Train
Test


In [10]:
# Size of the training partition: the first int(100 * 0.8) rows of the shuffled sample.
dlp.data_loader.train_df.shape

(80, 3)

In [11]:
# Drain the training queue and count how many batches were enqueued
# (one per 10 training rows with the default batch size).
count = 0
while not dlp.data_loader.train_queue.empty():
    _ = dlp.data_loader.train_queue.get()
    count += 1
count

8