# **Import Libraries**

In [None]:
import os
import sys

# CUDA reads CUDA_VISIBLE_DEVICES only once, when the CUDA runtime is first
# initialised in this process; changing it afterwards has no effect.  Set it
# before the project star-import below, which provides `device` and so
# presumably imports torch and may probe CUDA at import time.
# NOTE(review): confirm what `datastream` does on import.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

from datastream import *
import pickle
import wget

In [None]:
WORKDIR = os.path.expanduser('./data_stream/yoochoose')

os.makedirs(WORKDIR, exist_ok=True)
os.chdir(WORKDIR)

%matplotlib inline

# Parameters

In [None]:
# Run configuration.  Several of these names are not read anywhere in this
# notebook; presumably they are consumed by the star-imported helpers.
# NOTE(review): confirm which ones are still live.

# Data loading
batch_size = 2000                 # default batch size of the data loader
skiprows = 500_000                # passed to the loader (alternative tried: 1_100_000)
category = 'Video Games'          # passed to the loader as a label only
negative_sampling_processes = 4   # negative items drawn per positive interaction

# Training
epochs = 20
lr = 1e-3
window_size = 20
step = 20

# Feature switches
ENABLE_SAMPLING = True
ENABLE_HITRATE_STAT = True
DISABLE_TEMP_CHART = True


# **Data Loader**

Download link:

    https://www.kaggle.com/datasets/chadgostopp/recsys-challenge-2015

See the notebook *YoochooseDatasetPreprocess.ipynb* for how the *.parquet* file is created from the raw download.

In [None]:
DATASET_PATH = os.path.expanduser('./yoochoose-buys.parquet')

# The path is relative to the current working directory (changed earlier with
# os.chdir(WORKDIR)), so report the absolute location that was searched to
# make a missing file easy to track down.
if not os.path.exists(DATASET_PATH):
    raise RuntimeError(
        f'dataset not found: {os.path.abspath(DATASET_PATH)} '
        '(create it with YoochooseDatasetPreprocess.ipynb)'
    )

In [None]:
class AmazonDataLoader:
    """Load the interactions parquet file and serve it in time-ordered batches.

    Despite the (historical) class name, the data are read from the
    module-level ``DATASET_PATH`` (the Yoochoose *buys* file in this notebook),
    which must provide at least ``User``, ``Item`` and ``Timestamp`` columns.

    Every interaction becomes a positive with rating 5.  For each positive,
    ``negative_sampling_processes`` (module-level) negative items are drawn at
    random among item ids ``0 .. Available - 1``.  ``Available`` is one more
    than the largest item id seen so far in timestamp order.  A negative may
    coincide with the positive item; no filtering is applied.  The result is
    ``df_pair`` with columns ``User, Item_pos, Item_neg, Timestamp``.  Sampling
    uses the global NumPy RNG, so seed it beforehand for reproducible runs.

    Parameters
    ----------
    category : str, optional
        Stored as ``archive_name`` and printed; it does not select any data.
    skiprows : int
        Currently unused; kept for interface compatibility.
    force_download : bool
        Currently unused; kept for interface compatibility.
    batch_size : int
        Default batch size for :meth:`iter`.
    """

    threshold = 2     # ratings <= threshold are not treated as positives
    sep = ','         # NOTE(review): not used by this class
    header = 0        # NOTE(review): not used by this class
    stringId = False  # NOTE(review): not used by this class

    def __init__(self,
                 category=None,
                 skiprows=0,
                 force_download=False,
                 batch_size=batch_size):
        self.archive_name = category
        self.batch_size = batch_size

        print('*** process dataset ***')
        self._load_interactions()
        self.df_pair = self._build_pairs()
        self._compute_statistics()

    def _load_interactions(self):
        """Read DATASET_PATH into ``self.df`` and add Rating/Available columns."""
        self.df = pd.read_parquet(DATASET_PATH)
        # NOTE(review): assumes Timestamp is datetime64[ns]; integer-dividing
        # the nanosecond count by 10**9 yields Unix seconds.
        self.df['Timestamp'] = self.df.Timestamp.values.astype(np.int64) // 10 ** 9
        # DataFrame.info() prints its report itself and returns None.
        self.df.info()

        print(f'read {len(self.df)} lines')

        # Maxima are taken on the ids as stored in the file, before the shift.
        self.M = self.df['User'].max()
        self.N = self.df['Item'].max()
        print(f'number of items stored in df {self.df["Item"].nunique()}.')
        print(f'user M = {self.M}, items N = {self.N}')

        # Shift ids to 0-based (presumably 1-based in the file).
        self.df['User'] = self.df['User'] - 1
        self.df['Item'] = self.df['Item'] - 1
        # Buys carry no explicit rating: every interaction is a positive.
        self.df['Rating'] = 5

        self.df = self.df.sort_values('Timestamp')

        # For each timestamp T: number of item ids available at T, i.e. one
        # more than the largest item id seen so far in time order.
        self.df['Available'] = self.df['Item'].cummax(axis=0) + 1
        print('*** Available Items at Each Timestamp ***')
        print(self.df.head())

    def _build_pairs(self):
        """Return the (User, Item_pos, Item_neg, Timestamp) training pairs."""
        df_pos = self.df.loc[self.df['Rating'] > self.threshold]

        if negative_sampling_processes < 1:
            raise ValueError('negative_sampling_processes must be >= 1 to build pairs')

        negatives = []
        for _ in range(negative_sampling_processes):
            df_neg_aux = df_pos.copy()
            df_neg_aux['Random'] = np.random.randint(0, self.N, df_neg_aux.shape[0])
            # Fold the draw into the ids available at that interaction's time.
            df_neg_aux['Item'] = df_neg_aux['Random'] % df_neg_aux['Available']
            df_neg_aux['Rating'] = 0
            negatives.append(df_neg_aux)
        # Negative copies keep the index labels of their positive row; the
        # join below relies on this to pair each positive with its negatives.
        df_neg = pd.concat(negatives)

        df_pos = df_pos.sort_values('Timestamp')
        df_neg = df_neg.sort_values('Timestamp')

        print("*** Positive dataframe ***")
        print(df_pos.head(), '\nSize', len(df_pos))
        print("*** Negative dataframe ***")
        print(df_neg.head(), '\nSize', len(df_neg))

        # Index join: one output row per (positive, negative) combination,
        # i.e. negative_sampling_processes rows per positive.
        df_pair = df_pos.join(df_neg, lsuffix='_pos', rsuffix='_neg')
        df_pair = df_pair.sort_values('Timestamp_pos')
        print("*** Full pairs dataframe ***")
        print(df_pair.head())

        missing = pd.isna(df_pair['Item_neg'])
        print('Check join fails', int(missing.sum()))
        df_pair = df_pair[~missing]
        print(f"*** Full pairs dataframe ({len(df_pair)}) ***")
        print(df_pair.head())

        # Keep only the useful columns, renamed and in a fixed order.
        df_pair = df_pair.rename(columns={"User_pos": "User", "Timestamp_pos": "Timestamp"}, errors="raise")
        df_pair = df_pair[['User', 'Item_pos', 'Item_neg', 'Timestamp']]
        # Item_neg may have become float if the join produced NaNs.
        df_pair['Item_neg'] = pd.to_numeric(df_pair['Item_neg'], downcast='integer')

        print(df_pair.isna().sum())

        print("*** Final pairs dataframe ***")
        print(df_pair.head())
        return df_pair

    def _compute_statistics(self):
        """Add Day/Week columns, weekly first/last-seen counts and time span."""
        self.df['Day'] = (self.df['Timestamp'] / 86400).round()
        self.df['Week'] = (self.df['Timestamp'] / 604800).round()

        # Week of each item's/user's last (max) and first (min) interaction,
        # then how many items/users have their last/first interaction per week.
        self.df_item_week_max = self.df[['Item', 'Week']].groupby(['Item']).max().reset_index()
        self.df_disappearing_items = self.df_item_week_max.groupby(['Week']).count().reset_index()

        self.df_item_week_min = self.df[['Item', 'Week']].groupby(['Item']).min().reset_index()
        self.df_appearing_items = self.df_item_week_min.groupby(['Week']).count().reset_index()

        self.df_user_week_max = self.df[['User', 'Week']].groupby(['User']).max().reset_index()
        self.df_disappearing_users = self.df_user_week_max.groupby(['Week']).count().reset_index()

        self.df_user_week_min = self.df[['User', 'Week']].groupby(['User']).min().reset_index()
        self.df_appearing_users = self.df_user_week_min.groupby(['Week']).count().reset_index()

        self.number_of_samples = len(self.df.index)
        self.idx_list = np.arange(self.number_of_samples)

        ts_min = self.df['Timestamp'].min()
        ts_max = self.df['Timestamp'].max()

        self.secs = ts_max - ts_min + 1
        self.mins = int(self.secs / 60)
        self.hours = int(self.mins / 60)
        self.days = int(self.hours / 24)
        self.weeks = int(self.days / 7)
        self.months = int(self.days / 30)
        self.years = int(self.days / 365)

        print('self.secs ', self.secs)

        print('=' * 80)
        print(self.archive_name)
        print('=' * 80)

        print('Number of Users:', self.M)
        print('Number of Items:', self.N)
        print('Number of Samples:', self.number_of_samples)
        print('Number of Pairs:', len(self.df_pair))

        print('Days:', self.days)
        print('Weeks:', self.weeks)
        print('Months:', self.months)
        print('Years:', self.years)

    def iter(self, pair=True, batch_size=None, batch_history=10, binary=True):
        """Yield consecutive, time-ordered batches.

        pair=True yields ``(users, pos_items, neg_items)`` arrays covering every
        row of ``df_pair``.  pair=False yields ``(users, items, ratings)`` from
        ``df`` in the order given by ``idx_list``; with ``binary`` the ratings
        become ``rating > threshold`` as 0/1.  ``batch_history`` is currently
        unused.  ``batch_size`` defaults to the loader's ``batch_size``.
        """
        if batch_size is None:
            batch_size = self.batch_size

        if pair:
            # df_pair has negative_sampling_processes rows per positive, so it
            # is bounded by its own length, not by len(self.df); the latter
            # silently dropped most pairs (or overran a shorter df_pair).
            for start in range(0, len(self.df_pair), batch_size):
                data_np = self.df_pair.iloc[start:start + batch_size].to_numpy()
                yield data_np[:, 0], data_np[:, 1], data_np[:, 2]  # users, pos_items, neg_items
            return

        # Select by name: df's column order comes from the parquet file.
        columns = self.df[['User', 'Item', 'Rating']]
        for start in range(0, self.number_of_samples, batch_size):
            data_np = columns.iloc[self.idx_list[start:start + batch_size], :].to_numpy()
            rating = data_np[:, 2]
            if binary:
                rating = (rating > self.threshold).astype(int)
            yield data_np[:, 0], data_np[:, 1], rating  # users, items, ratings

In [None]:
# Build the data loader, then plot per week how many items/users have their
# first interaction (green) and their last interaction (red) in that week.
dataLoader = AmazonDataLoader(category, skiprows=skiprows)

xdi = dataLoader.df_disappearing_items['Week'].tolist()
ydi = dataLoader.df_disappearing_items['Item'].tolist()

xai = dataLoader.df_appearing_items['Week'].tolist()
yai = dataLoader.df_appearing_items['Item'].tolist()

print(f'Dataset parameter M = {dataLoader.M} users, N = {dataLoader.N} items')

print('ITEMS')
plt.plot(xdi, ydi, alpha=0.7, color='red', label='last seen (disappearing)')
plt.plot(xai, yai, alpha=0.7, color='green', label='first seen (appearing)')
plt.xlabel('week')
plt.ylabel('#items')
plt.legend()
plt.show()

xdu = dataLoader.df_disappearing_users['Week'].tolist()
ydu = dataLoader.df_disappearing_users['User'].tolist()

xau = dataLoader.df_appearing_users['Week'].tolist()
yau = dataLoader.df_appearing_users['User'].tolist()

print('USERS')
plt.plot(xdu, ydu, alpha=0.7, color='red', label='last seen (disappearing)')
plt.plot(xau, yau, alpha=0.7, color='green', label='first seen (appearing)')
plt.xlabel('week')
plt.ylabel('#users')
plt.legend()
plt.show()

# Train

In [None]:
# Train via the star-imported `train_automaton`; results accumulate in
# `resultBean`.  ALPHA and BETA are passed straight through.
# NOTE(review): their meaning is defined inside train_automaton — confirm there.
ALPHA = 0.005
BETA = 0.2
resultBean = ResultDataClass()
train_automaton(resultBean, dataLoader, device, ALPHA, BETA)

print(f'Sampling: {ENABLE_SAMPLING}, Hitrate: {ENABLE_HITRATE_STAT}')
# Report how many entries each result list collected (attributes are read
# one at a time, right before each line is printed).
for _label, _attr in (
    ('models', 'model_list'),
    ('bpr loss items', 'bpr_loss_list'),
    ('train batch losses', 'train_batch_losses'),
    ('drift points', 'drift_points'),
    ('warning points', 'warning_points'),
):
    print(f'{_label}: {len(getattr(resultBean, _attr))}')

In [None]:
# Save the training results and the models through the star-imported helpers
# (presumably from `datastream`).  Output location is decided inside them.
# NOTE(review): likely relative to the cwd set by os.chdir(WORKDIR) — confirm.
save_results(resultBean, dataLoader)
save_models(resultBean, dataLoader)

## Loss on old (blue) and new (red) items

In [None]:
# Loss on previously seen (old, blue) vs. newly appearing (new, red) items;
# helper comes from the project star-import (presumably `datastream`).
plot_items(resultBean)

## Latent sizes

In [None]:
# Latent sizes stored in resultBean (presumably per trained model — TODO confirm).
plot_latent_size(resultBean)

## Plot windowed BPR loss

Computed over a moving (sliding) window.

In [None]:
# Training BPR loss smoothed over a moving window.
plot_train_loss(resultBean)

## Plot BPR loss per batch

Raw (unsmoothed) value for each batch.

In [None]:
# Raw training BPR loss for each batch.
plot_train_loss_batch(resultBean)

## Test set - Plot windowed BPR loss

Computed over a moving (sliding) window.

In [None]:
# Test-set BPR loss smoothed over a moving window.
plot_loss(resultBean)

## Test set - Plot BPR loss per batch

Raw (unsmoothed) value for each batch.

In [None]:
# Raw test-set BPR loss for each batch.
plot_loss_batch(resultBean)

## Test set - Hitrate

In [None]:
# Test-set hit rate.
# NOTE(review): presumably only populated when ENABLE_HITRATE_STAT is True — confirm in datastream.
plot_hitrate(resultBean)

## Other plots

In [None]:
# Remaining plots; unlike the others, this helper also receives the data loader.
other_plots(resultBean, dataLoader)