In [1]:
import argparse
import numpy as np
import pandas as pd
from tqdm import tqdm
import os
import tensorflow.keras as K
import gc
import pickle

2021-07-21 21:49:48.664159: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0


In [2]:
# Run a full garbage-collection pass; the displayed value is the number of unreachable objects it found.
gc.collect()

22

In [3]:
class SessionDataset:
    """Click log re-indexed and sorted for session-parallel mini-batching.

    Credit to yhs-968/pyGRU4REC.

    Attributes:
        df (pd.DataFrame): private copy of the clicks with an extra ``item_idx`` column,
            sorted by session key and, within a session, by time.
        itemmap (pd.DataFrame): item-id -> ``item_idx`` mapping used to build ``df``.
        click_offsets (np.ndarray): ``click_offsets[k]`` is the row of ``df`` where the
            k-th session (ascending session key) starts; the last entry is ``len(df)``.
        session_idx_arr (np.ndarray): order in which sessions should be visited.
    """
    def __init__(self, data, session_key='session_ID', item_key='item_ID', time_key='time_stamp', n_samples=-1, itemmap=None, time_sort=False):
        """
        Args:
            data (pd.DataFrame): click log, one row per click. It is not modified; the
                dataset keeps its own merged and sorted copy in ``self.df``.
            session_key, item_key, time_key: names of the session, item and time columns.
            n_samples: number of leading rows of ``data`` to use. If <= 0 (default -1),
                use the whole dataset.
            itemmap (pd.DataFrame, optional): mapping with columns ``item_key`` and
                ``'item_idx'``. Built from ``data`` when omitted. Clicks on items that are
                absent from the mapping are dropped (inner join).
            time_sort: if True, sessions are visited in order of their first click time;
                otherwise in ascending session-key order.
        """
        if n_samples > 0:
            data = data.iloc[:n_samples]
        self.df = data
        self.session_key = session_key
        self.item_key = item_key
        self.time_key = time_key
        self.time_sort = time_sort
        # add_item_indices replaces self.df with a merged copy, so the caller's frame is untouched.
        self.add_item_indices(itemmap=itemmap)

        # Group clicks by session (ascending session key) and time-order them within each
        # session. The offsets below rely on the clicks of a session being contiguous.
        self.df = self.df.sort_values([session_key, time_key]).reset_index(drop=True)

        self.click_offsets = self.get_click_offsets()
        self.session_idx_arr = self.order_session_idx()

    def get_click_offsets(self):
        """
        Return the offsets of the beginning clicks of each session ID,
        where the offset is calculated against the first click of the first session ID.
        The array has one extra trailing entry equal to the total number of clicks.
        """
        # int64: a large click log can exceed the int32 range, and numpy would wrap silently.
        offsets = np.zeros(self.df[self.session_key].nunique() + 1, dtype=np.int64)
        # groupby sorts by session key, matching the order self.df was sorted in.
        offsets[1:] = self.df.groupby(self.session_key).size().cumsum()

        return offsets

    def order_session_idx(self):
        """Return the order in which sessions are visited (indices into click_offsets)."""
        if self.time_sort:
            # Starting time of each session, in ascending session-key order.
            sessions_start_time = self.df.groupby(self.session_key)[self.time_key].min().values
            # Visit sessions by their starting time.
            return np.argsort(sessions_start_time)
        return np.arange(self.df[self.session_key].nunique())

    def add_item_indices(self, itemmap=None):
        """
        Add an item index column named "item_idx" to the df.
        Args:
            itemmap (pd.DataFrame): mapping between the item IDs and indices. When None, items
                are numbered 0..n-1 in order of first appearance in the df.
        """
        if itemmap is None:
            item_ids = self.df[self.item_key].unique()
            itemmap = pd.DataFrame({self.item_key: item_ids,
                                    'item_idx': np.arange(len(item_ids))})

        self.itemmap = itemmap
        self.df = pd.merge(self.df, self.itemmap, on=self.item_key, how='inner')

    @property
    def items(self):
        """Unique item IDs known to the item map."""
        return self.itemmap[self.item_key].unique()
        

class SessionDataLoader:
    """Session-parallel mini-batch iterator over a SessionDataset.

    Credit to yhs-968/pyGRU4REC.
    """
    def __init__(self, dataset, batch_size=50):
        """
        A class for creating session-parallel mini-batches.
        Args:
            dataset (SessionDataset): the session dataset to generate the batches from
            batch_size (int): size of the batch, i.e. number of sessions processed in parallel
        """
        self.dataset = dataset
        self.batch_size = batch_size
        # Number of sessions that ended since the previously yielded batch (for progress bars).
        self.done_sessions_counter = 0

    def _count_item_classes(self):
        """Return the one-hot width needed to encode every item index (max item_idx + 1)."""
        # Prefer the item map: it is shared between splits, so a split may not contain
        # the largest index even though the model must still reserve a slot for it.
        itemmap = getattr(self.dataset, 'itemmap', None)
        if itemmap is not None and 'item_idx' in itemmap and len(itemmap) > 0:
            return int(itemmap['item_idx'].max()) + 1
        return int(self.dataset.df['item_idx'].max()) + 1

    def __iter__(self):
        """Yield session-parallel training mini-batches.

        Each of the ``batch_size`` slots holds one session. Every step yields the current
        click of each slot as input and its next click as target. When a slot's session
        runs out, the slot is refilled with the next unvisited session. Iteration stops as
        soon as a slot cannot be refilled; clicks still pending in other slots are not yielded.

        Yields:
            input (B,): item indices of the current clicks.
            target (B,): item indices of the next clicks.
            mask (np.ndarray): slot positions whose session was replaced since the previous
                yield, i.e. whose hidden state must be reset before this batch. It is
                non-empty only on the first batch after a replacement.

        Raises:
            ValueError: if batch_size is not between 1 and the number of sessions.
        """
        item_idx = self.dataset.df['item_idx'].values
        click_offsets = self.dataset.click_offsets
        session_idx_arr = self.dataset.session_idx_arr
        n_sessions = len(click_offsets) - 1
        self.n_items = self._count_item_classes()
        self.done_sessions_counter = 0

        if not 0 < self.batch_size <= n_sessions:
            raise ValueError(
                f"batch_size must be between 1 and the number of sessions ({n_sessions}), "
                f"got {self.batch_size}")

        slots = np.arange(self.batch_size)
        maxiter = self.batch_size - 1  # index of the last session assigned to a slot
        start = click_offsets[session_idx_arr[slots]]
        end = click_offsets[session_idx_arr[slots] + 1]
        # Slots to reset and sessions ended, accumulated until they are reported by a yield.
        pending_reset = np.empty(0, dtype=np.intp)
        ended_count = 0
        finished = False

        while not finished:
            minlen = (end - start).min()
            # Item indices of the clicks every slot is currently positioned at.
            idx_target = item_idx[start]
            for i in range(minlen - 1):
                idx_input = idx_target
                idx_target = item_idx[start + i + 1]
                self.done_sessions_counter = ended_count
                yield idx_input, idx_target, pending_reset
                # Resets apply once, before the first click of the new sessions only.
                pending_reset = np.empty(0, dtype=np.intp)
                ended_count = 0

            # Every slot is now at least at its own second-to-last click.
            start = start + (minlen - 1)
            ended = np.flatnonzero((end - start) <= 1)
            # Union, not overwrite: if the previous stretch yielded nothing (a length-1
            # session), earlier replacements still need their reset.
            pending_reset = np.union1d(pending_reset, ended)
            ended_count += len(ended)
            for idx in ended:
                maxiter += 1
                if maxiter >= n_sessions:
                    finished = True
                    break
                # Refill the slot with the next session.
                start[idx] = click_offsets[session_idx_arr[maxiter]]
                end[idx] = click_offsets[session_idx_arr[maxiter] + 1]

In [4]:
def loadData(dataDir, trainName='train.csv', testName='test.csv', itemmapName='itemmap.csv'):
    """Read the train/test click logs and the shared item map from CSV files in ``dataDir``.

    Both splits are indexed with the same item map, so item indices agree between them.

    Returns:
        (train_dataset, test_dataset): a pair of SessionDataset objects.
    """
    def _read(file_name):
        return pd.read_csv(os.path.join(dataDir, file_name))

    # Read in the same order as before: item map first, then train, then test.
    itemmap, train_df, test_df = (_read(name) for name in (itemmapName, trainName, testName))

    return (SessionDataset(train_df, itemmap=itemmap),
            SessionDataset(test_df, itemmap=itemmap))

In [5]:
# One-off cache build (kept commented out): reads the raw CSVs with loadData and pickles
# the resulting SessionDataset objects to the files the next cell loads.
# Uncomment and run once to (re)build the cache.
# train_data, test_data = loadData("/run/user/1024/UBDTR/data/")

# with open('/home/yudonghan/storage/UBDTR/train.pkl', 'wb') as fp:
#     pickle.dump(train_data, fp)
# with open('/home/yudonghan/storage/UBDTR/test.pkl', 'wb') as fp:
#     pickle.dump(test_data, fp)

In [6]:
# Load the pre-built SessionDataset pickles, rebuilding them from the raw CSVs when missing,
# so that Restart & Run All also works on a machine without the cache.
# NOTE: pickle.load can execute arbitrary code -- only load cache files you created yourself.
CACHE_DIR = os.path.expanduser(os.getenv('UBDTR_CACHE_DIR', '/home/yudonghan/storage/UBDTR'))
RAW_DATA_DIR = os.path.expanduser(os.getenv('UBDTR_RAW_DIR', '/run/user/1024/UBDTR/data/'))
train_cache = os.path.join(CACHE_DIR, 'train.pkl')
test_cache = os.path.join(CACHE_DIR, 'test.pkl')

if os.path.exists(train_cache) and os.path.exists(test_cache):
    with open(train_cache, 'rb') as fp:
        train_data = pickle.load(fp)
    with open(test_cache, 'rb') as fp:
        test_data = pickle.load(fp)
else:
    train_data, test_data = loadData(RAW_DATA_DIR)
    os.makedirs(CACHE_DIR, exist_ok=True)
    with open(train_cache, 'wb') as fp:
        pickle.dump(train_data, fp)
    with open(test_cache, 'wb') as fp:
        pickle.dump(test_data, fp)

In [7]:
# Width of the one-hot item encoding. item_idx values come from the (possibly shared)
# item map, so the vocabulary must cover every index there, not only the item IDs
# that happen to occur in the training split.
train_n_items = int(train_data.itemmap['item_idx'].max()) + 1

# Number of sessions per split (used as the progress-bar total during training).
train_samples_qty = train_data.df['session_ID'].nunique()
test_samples_qty = test_data.df['session_ID'].nunique()

In [8]:
# Display the item-vocabulary size used for the model's one-hot input and softmax output.
train_n_items

924757

In [9]:
def GRU4REC(batch_size, input_size, hidden_size, lr=.001, dropout_input=.0, dropout_hidden=.25):
    """Build and compile a stateful, single-layer GRU next-item model.

    Args:
        batch_size: fixed batch dimension; required because the GRU is stateful.
        input_size: width of the one-hot item vectors, which is also the number of
            softmax outputs.
        hidden_size: number of GRU units.
        lr: Adam learning rate.
        dropout_input: dropout on the GRU inputs (Keras ``dropout``).
        dropout_hidden: dropout on the recurrent state (Keras ``recurrent_dropout``).

    Returns:
        A compiled ``GRU4REC`` Keras model trained with categorical cross-entropy. Its
        recurrent layer is named ``'GRU'`` so callers can fetch it to reset states.

    NOTE(review): the input is a dense one-hot vector, so the GRU input kernel has
    input_size * 3 * hidden_size weights (277M in this notebook's summary). With
    ~925k items, training ran out of GPU memory. An Embedding layer over integer item
    ids would avoid this.
    """
    one_hot_clicks = K.layers.Input(batch_input_shape=(batch_size, 1, input_size))

    recurrent = K.layers.GRU(
        hidden_size,
        dropout=dropout_input,
        recurrent_dropout=dropout_hidden,
        stateful=True,
        return_state=True,
        name='GRU',
    )
    # With return_state=True the layer returns (last output, final state); only the output is used.
    last_output, _final_state = recurrent(one_hot_clicks)

    item_scores = K.layers.Dense(input_size, activation='softmax')(last_output)

    model = K.Model(inputs=one_hot_clicks, outputs=[item_scores], name='GRU4REC')
    model.compile(
        loss=K.losses.CategoricalCrossentropy(),
        optimizer=K.optimizers.Adam(learning_rate=lr),
    )
    return model

In [10]:
# The model is stateful, so its batch dimension is fixed when it is built; the
# SessionDataLoader that feeds it must use this same batch_size.
batch_size=8
model = GRU4REC(batch_size, train_n_items, hidden_size=100)
model.summary()



2021-07-21 21:49:58.209754: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcuda.so.1
2021-07-21 21:49:58.244259: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1733] Found device 0 with properties: 
pciBusID: 0000:09:00.0 name: NVIDIA Tesla M40 computeCapability: 5.2
coreClock: 1.112GHz coreCount: 24 deviceMemorySize: 11.18GiB deviceMemoryBandwidth: 268.58GiB/s
2021-07-21 21:49:58.244304: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcudart.so.11.0
2021-07-21 21:49:58.248054: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublas.so.11
2021-07-21 21:49:58.248114: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcublasLt.so.11
2021-07-21 21:49:58.249381: I tensorflow/stream_executor/platform/default/dso_loader.cc:53] Successfully opened dynamic library libcufft.so.

Model: "GRU4REC"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(8, 1, 924757)]          0         
_________________________________________________________________
GRU (GRU)                    [(8, 100), (8, 100)]      277457700 
_________________________________________________________________
dense (Dense)                (8, 924757)               93400457  
Total params: 370,858,157
Trainable params: 370,858,157
Non-trainable params: 0
_________________________________________________________________


In [11]:
epochs=3
gru_layer = model.get_layer(name="GRU")
# One-hot vectors must have exactly the width the model was built with.
n_classes = model.input_shape[-1]

for epoch in range(1, epochs + 1):  # epochs 1..epochs inclusive
    # Each epoch restarts from the first sessions, so do not carry over the final hidden
    # state of the previous epoch (the GRU is stateful).
    gru_layer.reset_states()
    loader = SessionDataLoader(train_data, batch_size=batch_size)
    with tqdm(total=train_samples_qty) as pbar:
        for feat, target, mask in loader:
            # Zero the hidden state of slots that just started a new session. Reading the
            # states copies them off the device, so only do it when there is something to reset.
            if len(mask) > 0:
                hidden_states = gru_layer.states[0].numpy()
                hidden_states[np.asarray(mask, dtype=np.intp), :] = 0
                gru_layer.reset_states(states=hidden_states)

            input_oh = K.utils.to_categorical(feat, num_classes=n_classes)
            input_oh = np.expand_dims(input_oh, axis=1)  # (batch, 1 timestep, n_classes)

            target_oh = K.utils.to_categorical(target, num_classes=n_classes)

            tr_loss = model.train_on_batch(input_oh, target_oh)

            pbar.set_description("Epoch {0}. Loss: {1:.5f}".format(epoch, tr_loss))
            pbar.update(loader.done_sessions_counter)

  0%|                                                                                                                            | 0/9706675 [00:00<?, ?it/s]2021-07-21 21:50:04.126604: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:176] None of the MLIR Optimization Passes are enabled (registered 2)
2021-07-21 21:50:04.173454: I tensorflow/core/platform/profile_utils/cpu_utils.cc:114] CPU Frequency: 2299890000 Hz
2021-07-21 21:50:14.548832: W tensorflow/core/common_runtime/bfc_allocator.cc:456] Allocator (GPU_0_bfc) ran out of memory trying to allocate 352.77MiB (rounded to 369902848)requested by op GRU4REC/GRU/while/body/_1/GRU4REC/GRU/while/gru_cell/strided_slice_1
If the cause is memory fragmentation maybe the environment variable 'TF_GPU_ALLOCATOR=cuda_malloc_async' will improve the situation. 
Current allocation summary follows.
Current allocation summary follows.
2021-07-21 21:50:14.548924: I tensorflow/core/common_runtime/bfc_allocator.cc:991] BFCAllocator dump for G

ResourceExhaustedError:  OOM when allocating tensor with shape[924757,100] and type float on /job:localhost/replica:0/task:0/device:GPU:0 by allocator GPU_0_bfc
	 [[{{node GRU4REC/GRU/while/body/_1/GRU4REC/GRU/while/gru_cell/strided_slice_1}}]]
Hint: If you want to see a list of allocated tensors when OOM happens, add report_tensor_allocations_upon_oom to RunOptions for current allocation info.
 [Op:__inference_train_function_2308]

Function call stack:
train_function


In [6]:
# Fallback hyper-parameters, used when the MAX_EPOCHS / MIN_PEERS environment variables are unset.
default_max_epochs, default_min_peers = 5, 2

In [9]:
# Runtime configuration, overridable through environment variables. Paths may start with
# "~"; expand them here so consumers other than pandas (e.g. os.makedirs for modelDir)
# do not create a literal "~" directory.
dataDir = os.path.expanduser(os.getenv('DATA_DIR', '~/storage/UBDTR/data'))
modelDir = os.path.expanduser(os.getenv('MODEL_DIR', '~/storage/UBDTR/model'))
max_epochs = int(os.getenv('MAX_EPOCHS', str(default_max_epochs)))
min_peers = int(os.getenv('MIN_PEERS', str(default_min_peers)))
# NOTE(review): loadData also needs itemmap.csv in dataDir; the last run failed because it was missing.
train_dataset, test_dataset = loadData(dataDir, trainName='small.csv')

FileNotFoundError: [Errno 2] No such file or directory: '/home/yudonghan/storage/UBDTR/data/itemmap.csv'