In [1]:
import torch

In [2]:
torch.cuda.is_available()

True

In [3]:
import pandas as pd
import numpy as np
from pathlib import Path
import datetime

In [4]:
import logging
# Notebook-wide logger: create it first, then install the default root
# handler so that its records are actually printed.
logger = logging.getLogger(__name__)
logging.basicConfig()
# Show INFO-level progress messages from the helper functions below.
logger.setLevel(logging.INFO)

In [5]:
# Data locations, given relative to the notebook's working directory.
RAW_DATA_FOLDER = Path("../data/raw/")
PROCESSED_DATA_FOLDER = Path("../data/processed/")

# CSV file with the raw Spanish sales records, located in the raw-data folder.
spain_sales_raw = RAW_DATA_FOLDER.joinpath("spanish_sales.csv")

In [6]:
# Read the raw sales export with integer customer/product ids.
# `parse_dates=True` only asks pandas to parse the *index*, so it never touched
# the `date` column; naming the column makes read_csv parse it directly.
sales_df = pd.read_csv(
    spain_sales_raw,
    parse_dates=["date"],
    dtype={"customer_id": int, "product_id": int},
)

To handle this sales data the same way as an explicit-ratings dataset, let us add a dummy `rating` column in which every sale gets a rating of 1.0.

In [7]:
# Every recorded sale counts as one positive interaction with a constant rating.
sales_df['rating'] = 1.0
# Make sure the sale date is a real datetime column rather than text.
sales_df['date'] = pd.to_datetime(sales_df['date'])

In [8]:
# Switch to the uid / mid / timestamp column names used by the rest of the
# notebook (the frame is modified in place).
sales_df.rename(
    columns=dict(customer_id='uid', product_id='mid', date='timestamp'),
    inplace=True,
)

In [9]:
sales_df.head()

Unnamed: 0,uid,mid,timestamp,rating
0,9134527,386652,2019-05-06,1.0
1,9134527,386652,2019-05-06,1.0
2,9134527,464466,2019-05-06,1.0
3,9134527,55815,2019-05-06,1.0
4,9134527,55815,2019-05-06,1.0


In [10]:
sales_df.dtypes

uid                   int64
mid                   int64
timestamp    datetime64[ns]
rating              float64
dtype: object

In [11]:
def load_data(data_dir, MIN_RATINGS=4):
    """Load the sales CSV and turn it into a re-indexed interaction table.

    Every row of the CSV is treated as one positive interaction: a constant
    ``ratings`` column of 1.0 is added, the ``customer_id`` / ``product_id`` /
    ``date`` columns are renamed to ``uid`` / ``mid`` / ``timestamp``, users
    with fewer than ``MIN_RATINGS`` rows are dropped, and the remaining user
    and item ids are mapped to dense, zero-based ``userId`` / ``itemId`` codes.

    Args:
        data_dir: Path to the sales CSV file. A directory is also accepted, in
            which case ``spanish_sales.csv`` inside that directory is read.
        MIN_RATINGS: Minimum number of rows a user must have to be kept.

    Returns:
        A ``(df, num_users, num_items)`` tuple: the filtered frame with the
        added ``userId`` and ``itemId`` columns, the number of distinct users
        and the number of distinct items left after filtering.
    """
    # Honour the argument instead of silently reading a notebook-level global.
    csv_path = Path(data_dir)
    if csv_path.is_dir():
        csv_path = csv_path / "spanish_sales.csv"

    sales_df = pd.read_csv(
        csv_path, dtype={"customer_id": int, "product_id": int})
    sales_df['ratings'] = 1.0
    sales_df['date'] = pd.to_datetime(sales_df['date'])
    df = sales_df.rename(columns={'customer_id': 'uid',
                                  'product_id': 'mid',
                                  'date': 'timestamp'})

    # First filter out the users with fewer than MIN_RATINGS interactions.
    logger.info(
        "Filtering out users with less than {} ratings".format(MIN_RATINGS))
    # Per-row size of the row's user group; a vectorised equivalent of
    # groupby(...).filter(lambda g: len(g) >= MIN_RATINGS) that keeps the
    # original row order and index.
    rows_per_user = df.groupby('uid')['uid'].transform('size')
    df = df[rows_per_user >= MIN_RATINGS].copy()

    # Now factorize, i.e. re-index users and items with sequential codes.
    logger.info("Mapping original user and item IDs to new sequential IDs")
    df['userId'] = pd.factorize(df['uid'])[0]
    df['itemId'] = pd.factorize(df['mid'])[0]

    logger.info('Range of userId is [{}, {}]'.format(
        df.userId.min(), df.userId.max()))
    # Report the item range from the item column (the lower bound used to be
    # taken from userId by mistake).
    logger.info('Range of itemId is [{}, {}]'.format(
        df.itemId.min(), df.itemId.max()))

    num_users = df['userId'].nunique()
    num_items = df['itemId'].nunique()
    logger.info("num_users is {}, num_items is {}".format(
        num_users, num_items))

    return df, num_users, num_items


In [12]:
# Read the sales CSV and unpack the filtered, re-indexed interaction frame
# together with the number of distinct users and items it contains.
data, num_users, num_items = load_data(spain_sales_raw)

INFO:__main__:Filtering out users with less than 4 ratings
INFO:__main__:Mapping original user and item IDs to new sequential IDs
INFO:__main__:Range of userId is [0, 204100]
INFO:__main__:Range of itemId is [0, 6112]
INFO:__main__:num_users is 204101, num_items is 6113


In [13]:
data.shape

(3299776, 6)

In [14]:
# All distinct item ids, kept twice: as a NumPy array for the array
# experiments further down and as a set for fast difference operations.
item_pool_np = np.array(data['itemId'].unique())
item_pool = set(item_pool_np)

In [15]:
len(item_pool)

6113

In [20]:
import random
from tqdm import tqdm

In [None]:
def sample_negatives_low_mem(ratings, num_negatives=99, pool=None):
    """Sample a fixed number of never-interacted items for every user.

    Only the sampled negatives are stored per user; the full list of
    non-interacted items is built temporarily and discarded right away.

    Args:
        ratings: DataFrame with ``userId`` and ``itemId`` columns.
        num_negatives: Number of negative items to draw per user (default 99).
        pool: Iterable of all candidate item ids. When omitted, the
            notebook-level ``item_pool`` set is used.

    Returns:
        DataFrame with one row per user and two columns: ``userId`` and
        ``negative_samples`` (a list of ``num_negatives`` distinct item ids
        the user has not interacted with).

    Raises:
        ValueError: If a user has fewer than ``num_negatives`` non-interacted
            items in the pool (raised by ``random.sample``).
    """
    logger.info("sampling negatives with low mem")
    candidate_pool = item_pool if pool is None else set(pool)

    def sample_negatives_per_user(interacted_items):
        non_interacted = candidate_pool.difference(interacted_items)
        # random.sample needs a sequence: passing a set is deprecated since
        # Python 3.9 and raises TypeError from Python 3.11 on.
        return random.sample(list(non_interacted), num_negatives)

    interact_status = (
        ratings.groupby('userId')['itemId']
        .apply(set)
        .reset_index()
        .rename(columns={'itemId': 'interacted_items'})
    )
    interact_status['negative_samples'] = (
        interact_status['interacted_items'].apply(sample_negatives_per_user))
    return interact_status[['userId', 'negative_samples']]

In [None]:
sample_negatives_low_mem(data)

In [30]:
def prepare_epoch_low_mem(data, num_negatives, batch_size, pool=None):
    """Attach freshly sampled negative items to every interaction row.

    Args:
        data: DataFrame with ``userId`` and ``itemId`` columns, one row per
            positive interaction.
        num_negatives: Number of negative items to draw for each row.
        batch_size: Currently unused; kept so existing callers keep working.
        pool: Iterable of all candidate item ids. When omitted, the
            notebook-level ``item_pool`` set is used.

    Returns:
        ``data`` merged with two extra columns: ``interacted_items`` (the set
        of items the row's user interacted with) and
        ``epoch_sampled_negatives`` (a list of ``num_negatives`` distinct
        items outside that set, drawn independently for every row).

    Raises:
        ValueError: If a user has fewer than ``num_negatives`` non-interacted
            items in the pool (raised by ``random.sample``).
    """
    tqdm.pandas()  # registers Series.progress_apply
    candidate_pool = item_pool if pool is None else set(pool)

    def sample_negatives_per_user_per_epoch(interacted_items):
        non_interacted = candidate_pool.difference(interacted_items)
        # random.sample needs a sequence: passing a set is deprecated since
        # Python 3.9 and raises TypeError from Python 3.11 on.
        return random.sample(list(non_interacted), num_negatives)

    # TODO: rebuilding the per-user item sets before every epoch is wasteful,
    # but for now it keeps the function readable.
    interact_status = (
        data.groupby('userId')['itemId']
        .apply(set)
        .reset_index()
        .rename(columns={'itemId': 'interacted_items'})
    )
    train_epoch_with_negatives = pd.merge(
        data, interact_status[['userId', 'interacted_items']], on='userId')
    train_epoch_with_negatives['epoch_sampled_negatives'] = (
        train_epoch_with_negatives['interacted_items']
        .progress_apply(sample_negatives_per_user_per_epoch))
    return train_epoch_with_negatives

In [31]:
prepare_epoch_low_mem(data, 4, 1024)

100%|██████████| 3299776/3299776 [09:07<00:00, 6028.57it/s]


Unnamed: 0,uid,mid,timestamp,ratings,userId,itemId,interacted_items,epoch_sampled_negatives
0,9134527,386652,2019-05-06,1.0,0,0,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[297, 6074, 4793, 5350]"
1,9134527,386652,2019-05-06,1.0,0,0,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[2429, 5563, 244, 371]"
2,9134527,464466,2019-05-06,1.0,0,1,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[230, 5692, 2040, 1217]"
3,9134527,55815,2019-05-06,1.0,0,2,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[3134, 4002, 1237, 3451]"
4,9134527,55815,2019-05-06,1.0,0,2,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[3502, 5953, 2022, 5381]"
5,9134527,569151,2019-05-06,1.0,0,3,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[4785, 2533, 1785, 2631]"
6,9134527,386652,2019-06-20,1.0,0,0,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[3152, 5394, 5503, 5101]"
7,9134527,386652,2019-06-20,1.0,0,0,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[4651, 4161, 2114, 4632]"
8,9134527,413435,2019-06-20,1.0,0,2030,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[4343, 2060, 4690, 3203]"
9,9134527,506758,2019-06-20,1.0,0,1547,"{0, 1, 2, 3, 131, 389, 1547, 5387, 3083, 4240,...","[511, 1798, 1872, 5261]"


In [None]:
# import numpy as np
# from multiprocessing import cpu_count, Pool
 
# cores = cpu_count() #Number of CPU cores on your system
# partitions = cores #Define as many partitions as you want
 
# def parallelize(data, func):
#     data_split = np.array_split(data, partitions)
#     pool = Pool(cores)
#     data = pd.concat(pool.map(func, data_split))
#     pool.close()
#     pool.join()
#     return data

# data = parallelize(data, work);

In [None]:
# Rank each user's interactions from newest (rank 1) to oldest; with
# method='first', rows sharing a timestamp are ranked in order of appearance.
data['rank_latest'] = (
    data.groupby('userId')['timestamp']
    .rank(ascending=False, method='first')
)

In [None]:
from tqdm import tqdm

In [None]:
tqdm.pandas()

In [None]:
# One row per user: userId plus a sorted array of the distinct items
# that user interacted with.
interact_status = (
    data.groupby('userId')['itemId']
    .apply(np.unique)
    .reset_index()
    .rename(columns={'itemId': 'interacted_items'})
)

In [None]:
item_pool_np

In [None]:
# NumPy view of the per-user table: one row per row of interact_status,
# one column per DataFrame column.
interact_status_np = interact_status.to_numpy()

In [None]:
interact_status_np

In [None]:
item_pool_np

In [None]:
#!!!!!!!
# Tile the item pool so that there is one full copy of it per row of
# interact_status_np, i.e. a 2-D array of shape
# (interact_status_np.shape[0], len(item_pool_np)).
# NOTE(review): with the counts logged by load_data above (204101 users,
# 6113 items) that is ~1.2 billion elements — roughly 10 GB if the ids are
# 64-bit integers; confirm before running this on the full data set.
np.repeat(item_pool_np[np.newaxis, :], interact_status_np.shape[0], axis=0)

In [None]:
# Experiment: set difference between the interacted items and the tiled pool.
# NOTE(review): interact_status is built with two columns (userId,
# interacted_items), so interact_status_np[:, 2] looks out of range — the
# per-user item arrays are presumably in column 1; confirm.
# NOTE(review): np.setdiff1d(a, b) returns the unique values of a that are
# not in b, so the argument order (interacted items first, pool second) also
# looks reversed if the goal is "pool minus interacted" — TODO confirm intent.
np.setdiff1d(interact_status_np[:,2], np.repeat(item_pool_np[np.newaxis, :], interact_status_np.shape[0], axis=0))

In [None]:
# Experiment: append a full tiled copy of the item pool to every row of
# interact_status_np.
# Per the original note this blows out of 32 GB of memory, so avoid running it
# on the full data set.
np.hstack((interact_status_np, np.repeat(item_pool_np[np.newaxis, :], interact_status_np.shape[0], axis=0)))