In [1]:
from typing import List, Dict, Any

from pathlib import Path
import os, sys
import time
import datetime as dt
import itertools as it

import numpy as np
import pandas as pd
from tqdm.autonotebook import tqdm
import ray
from ray import train, tune
import ipywidgets as widgets

from recsys4daos.model_selection import cvtt_open
from recsys4daos.datasets import to_microsoft, filter_window_size
from recsys4daos.models import LightGCNCustom
from recsys4daos.utils import Timer
import recsys4daos.utils.notebooks as nbu

import recommenders
# Only when recommenders 1.2.0 is installed: silence every FutureWarning for the
# rest of the session. The filter is global, so it is not limited to warnings
# raised by recommenders itself.
if recommenders.__version__ == '1.2.0':
    print("Ignoring warnings")
    import warnings
    warnings.simplefilter(action='ignore', category=FutureWarning)

import paths

nbu.print_versions('ray', 'tensorflow')

%load_ext autoreload
%autoreload 2

  from tqdm.autonotebook import tqdm


3.11.6 (main, Jun 24 2024, 07:45:26) [GCC 11.4.0]
recommenders: 1.2.0
ray: 2.30.0
tensorflow: 2.16.1


In [2]:
# Others config
SEED: int = 57
# Stored as a real Path so the annotation is truthful. The '~' is expanded by a
# later cell with Path(RAY_RESULTS_PATH).expanduser().
RAY_RESULTS_PATH: Path = Path('~/ray_results3.11')

# Dataset splits config
ORG_NAME = 'Decentraland'
SPLITS_FREQ = 'W-THU'  # Split weekly
LAST_FOLDS = 10  # Use just last 10 splits
SPLITS_NORMALIZE = True
LAST_FOLD_DATE_STR: str = '2023-07-13'

# Training config
MAX_EPOCHS: int = 200
# Epochs trained on every Tune step (one reported iteration).
EPOCHS_PER_ITER: int = 5
SAMPLES_PER_SPLIT: int = 100
# Stop threshold for the reported 'time_total_train' metric.
# NOTE(review): the unit is whatever Timer.time returns, presumably seconds; confirm.
MAX_TIME_TOTAL_TRAIN: int = 300
OPTIM_METRIC: str = 'map@10'

# Search space config
MAX_EMBEDDING_DIM = 1024
# Batch sizes are sampled as exponents: the batch size used is 2**value.
MIN_BATCH_SIZE = 6  # 2**6
MAX_BATCH_SIZE = 10 # 2**10
MIN_LR = 1e-4
# WINDOW_SIZES = ['7d', '14d', '21d', '30d', '60d', '90d', '10YE']
WINDOW_SIZES = ['21d', '30d', '60d', '90d', '10YE']
# Divisor of a GPU: every trial requests 1/GPUS of a GPU, so this is the number
# of trials sharing one GPU rather than a number of GPUs.
GPUS = 16

# Eval config
K_RECOMMENDATIONS: List[int] = [1,3,5,10,15,100]
METRICS: List[str] = ["recall", "ndcg", "precision", "map"]

In [3]:
# Parameters
# These assignments override the defaults of the same names set in the
# configuration cell above (ORG_NAME, SPLITS_FREQ, LAST_FOLDS, SPLITS_NORMALIZE,
# LAST_FOLD_DATE_STR).
# NOTE(review): presumably injected per execution by a notebook runner such as
# papermill; confirm before editing by hand.
EXECUTION_ID = "2024-09-04T10:00"
ORG_NAME = "dxDAO - xDXdao"
SPLITS_FREQ = "W-THU"
LAST_FOLDS = 10
SPLITS_NORMALIZE = True
LAST_FOLD_DATE_STR = "2022-05-05"


In [4]:
# Expand the leading '~' so RAY_RESULTS_PATH is a pathlib.Path inside the user's
# home directory from here on. Re-running this cell is harmless.
RAY_RESULTS_PATH = Path(RAY_RESULTS_PATH).expanduser()

## Obtain dataset

In [5]:
!pwd

/home/daviddavo/recsys4daos/notebooks


In [6]:
# Load the proposals and the votes of the selected organisation.
dfp = paths.load_proposals(ORG_NAME)
dfv = paths.load_votes(ORG_NAME)

# DataFrame.info() prints its report itself and returns None, so wrapping it in
# print() only appended a stray "None" line after each report.
dfp.info()
dfv.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2356 entries, 0 to 2355
Data columns (total 6 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   id                 2356 non-null   object        
 1   author             2356 non-null   object        
 2   date               2356 non-null   datetime64[us]
 3   start              2356 non-null   datetime64[us]
 4   end                2323 non-null   datetime64[us]
 5   platform_proposal  2356 non-null   object        
dtypes: datetime64[us](3), object(3)
memory usage: 110.6+ KB
None
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8479 entries, 0 to 8478
Data columns (total 4 columns):
 #   Column    Non-Null Count  Dtype         
---  ------    --------------  -----         
 0   id        8479 non-null   object        
 1   proposal  8479 non-null   object        
 2   voter     8479 non-null   object        
 3   date      8479 non-null   datetime64[us]
dtypes

## Split data

In [7]:
# Convert the votes to the userID / itemID / timestamp / rating layout shown in
# the output of this cell. `df` is what the fold splitter receives afterwards.
df = to_microsoft(dfv)
df

Unnamed: 0,userID,itemID,timestamp,rating
0,0x166cea845ade3f3b40ea68045d699dee5a645905,82f205b3-1f75-5da5-973a-fc2e324d8719,2019-07-14 19:50:09,1
1,0x166cea845ade3f3b40ea68045d699dee5a645905,b1f193e6-cf73-5a7a-85f3-a84d95c08b97,2019-07-14 19:51:24,1
2,0xe858a4bf603995a9156edbd25ff06269d997839e,b1f193e6-cf73-5a7a-85f3-a84d95c08b97,2019-07-14 22:24:38,1
3,0xe858a4bf603995a9156edbd25ff06269d997839e,398d685b-8c4f-5b7d-a7d6-86f1fc78efa4,2019-07-14 22:29:44,1
4,0xe858a4bf603995a9156edbd25ff06269d997839e,cb124ba8-a044-5790-8da9-43129a92c1f5,2019-07-14 22:31:43,1
...,...,...,...,...
8474,0x583acc79585d3cb195ea8125f6f80ad459b46313,783c82b4-a78e-58ee-972e-08a8d5677735,2023-04-08 07:38:59,1
8475,0x7e72cfd9a36517435dc1ca7f9451eccbc973111e,783c82b4-a78e-58ee-972e-08a8d5677735,2023-04-08 13:45:11,1
8476,0x3111327edd38890c3fe564afd96b4c73e8101747,783c82b4-a78e-58ee-972e-08a8d5677735,2023-04-08 18:13:23,1
8477,0x759a2169da1b826f795a00a9ab5f29f9ca39e48a,783c82b4-a78e-58ee-972e-08a8d5677735,2023-04-09 08:40:35,1


In [8]:
# Alternative kept for reference: index the folds by position instead of by date.
# folds_dict = list(cvtt_open(df, SPLITS_FREQ, dfp, remove_not_in_train_col='userID'))
# use_folds_idx = range(len(folds_dict))[-LAST_FOLDS:]

# Index every fold by the ISO-formatted timestamp of its end.
# Note: no need to use OrderedDict, a plain dict keeps insertion order.
folds_dict = dict(
    (fold.end.isoformat(), fold)
    for fold in cvtt_open(
        df,
        SPLITS_FREQ,
        dfp,
        remove_not_in_train_col='userID',
        last_fold=LAST_FOLD_DATE_STR,
    )
)
# Only the last LAST_FOLDS folds take part in the experiments.
use_folds_idx = [*folds_dict][-LAST_FOLDS:]

print(f"{len(folds_dict)} folds")
print(f"Using {len(use_folds_idx)} folds, from {use_folds_idx[0]} to {use_folds_idx[-1]}")

147 folds
Using 10 folds, from 2022-03-03T00:00:00 to 2022-05-05T00:00:00


## Making some checks

### Checking that all the folds have open proposals

In [9]:
# Gather the selected folds that have no open proposals at all; nothing could
# be recommended on them, so the notebook stops here if there is any.
_nok_folds = [
    fold_key
    for fold_key in use_folds_idx
    if len(folds_dict[fold_key].open_proposals) == 0
]

if len(_nok_folds) > 0:
    raise ValueError(f"Folds {', '.join(_nok_folds)} don't have any open proposals")

### Checking correct hparams

In [10]:
# Sanity check of the search space: for every selected fold, compare the number
# of distinct users that each window size leaves in the training data with the
# largest batch size that can be sampled (2**MAX_BATCH_SIZE).
now = dt.datetime.now()
max_bs = 2**MAX_BATCH_SIZE

# Window sizes from shortest to longest, ordered by the date obtained when each
# offset is added to `now`. Computed once because it does not depend on the fold.
sorted_window_sizes = sorted(WINDOW_SIZES, key=lambda ws: now + pd.tseries.frequencies.to_offset(ws))

all_ok = True
for idx in use_folds_idx:
    for ws in sorted_window_sizes:
        nusers = filter_window_size(folds_dict[idx].train, folds_dict[idx].end, ws)['userID'].nunique()
        if nusers > max_bs:
            print(f'On fold {idx} WINDOW_SIZE should be at least {ws}: {nusers} > {max_bs}', file=sys.stderr)
            # Remember the failure; it used to be lost, so the success message
            # below was printed even when a warning had been emitted.
            all_ok = False
            break

if all_ok:
    print("All folds ok!")
else:
    print("Some folds did not pass the check, see the messages above", file=sys.stderr)

All folds ok!


## Testing model

In [11]:
from recommenders.models.deeprec.DataModel.ImplicitCF import ImplicitCF
from recommenders.models.deeprec.deeprec_utils import prepare_hparams

In [12]:
# hparams = prepare_hparams(
#     model_type='lightgcn',
#     n_layers=3,
#     batch_size=512,
#     embed_size=64,
#     epochs=2,
#     learning_rate=0.001,
#     decay=0.001,
#     metrics=["recall", "ndcg", "precision", "map"],
#     eval_epoch=2,
#     top_k=K_RECOMMENDATIONS[0],
#     save_model=False,
#     MODEL_DIR='./data/model/lightgcn/',
# )
# dataloader = ImplicitCF(train=folds_dict[use_folds_idx[0]].train, test=folds_dict[use_folds_idx[0]].test, seed=SEED)
# print("items:", dataloader.n_items, "user:", dataloader.n_users)
# model = LightGCNCustom(data=dataloader, hparams=hparams)

In [13]:
# model.fit()
# model.run_eval()

In [14]:
# model.recommend_k_items(
#     dataloader.test, 
#     top_k=3, 
#     use_id=True, 
#     remove_seen=True, 
#     recommend_from=folds_dict[use_folds_idx[0]].open_proposals
# )

## Defining trainable

In [15]:
from recsys4daos.evaluation import calculate_all_metrics

In [16]:
class TrainLightGCN(tune.Trainable):
    """Ray Tune trainable that fits a ``LightGCNCustom`` model on a single fold.

    Every call to :meth:`step` trains ``EPOCHS_PER_ITER`` epochs, generates
    recommendations restricted to the fold's open proposals, and reports the
    model's own metrics together with the ones from ``calculate_all_metrics``.
    """

    # Name of the model checkpoint inside the directory provided by Tune. Saving
    # and loading must agree on it.
    _CHECKPOINT_NAME = "model"

    def setup(
        self,
        config: Dict[str, Any],
        data,
    ):
        """Build the data loader and the model of one trial.

        Args:
            config: Hyperparameters of the trial. The keys read here are
                ``window_size``, ``batch_size`` (an exponent: the batch size is
                ``2**batch_size``), ``conv_layers``, ``embedding_dim``,
                ``learning_rate`` and ``l2``.
            data: The fold, unpacked as ``(train, test, t, open_proposals)``.
        """
        self.config = config
        train, test, self.t, self.open_proposals = data
        # Only the training data selected by the trial's window size is used.
        train_filtered = filter_window_size(train, self.t, config['window_size'])
        self.dataloader = ImplicitCF(train=train_filtered, test=test, seed=SEED)
        # The batch size is capped at the number of users in train, so different
        # sampled values can collapse into the same effective batch size.
        # Some experiments will run multiple times, but that's a price to pay for
        # usability
        self.real_batch_size = min(2**config['batch_size'], self.dataloader.n_users_in_train)

        self.hparams = prepare_hparams(
            model_type='lightgcn',
            n_layers=config['conv_layers'],
            batch_size=self.real_batch_size,
            embed_size=config['embedding_dim'],
            epochs=EPOCHS_PER_ITER,
            learning_rate=config['learning_rate'],
            decay=config['l2'],
            metrics=METRICS,
            eval_epoch=-1,
            top_k=K_RECOMMENDATIONS[0],
            save_model=False,
            MODEL_DIR='./data/model/lightgcn/',
        )
        self.model = LightGCNCustom(self.hparams, self.dataloader, seed=SEED)
        # Accumulated times, reported on every step.
        self.total_train = 0
        self.total_eval = 0

    @property
    def iteration(self):
        """Number of epochs trained so far, as counted by the model."""
        return self.model.epochs_done

    @property
    def training_iteration(self):
        """Same value as :attr:`iteration`: the epochs trained so far."""
        return self.model.epochs_done

    def step(self):
        """Train ``EPOCHS_PER_ITER`` epochs, evaluate and return the metrics.

        As a rule of thumb, the execution time of step should be large enough to avoid overheads
        (i.e. more than a few seconds), but short enough to report progress periodically
        (i.e. at most a few minutes).

        Returns:
            A dict with the effective batch size, the epochs done, the three
            losses returned by the last ``fit_epoch`` call, the evaluation
            metrics and the timing of this step and of the whole trial.
        """
        # With zero epochs `ret` would never be assigned.
        assert EPOCHS_PER_ITER > 0

        with Timer() as t_train:
            for _ in range(EPOCHS_PER_ITER):
                ret = self.model.fit_epoch()

        with Timer() as t_rec:
            recs = self.model.recommend_k_items(
                self.dataloader.test, # Used only to get user ids
                top_k=max(K_RECOMMENDATIONS),
                use_id=True,
                remove_seen=True,
                recommend_from=self.open_proposals,
            )

        # Metrics computed by the model itself are prefixed with 'model_' so they
        # do not clash with the keys of calculate_all_metrics.
        eval_dict = {'model_'+k:v for k,v in zip(self.model.metrics, self.model.run_eval())}
        eval_dict |= calculate_all_metrics(self.dataloader.test, recs, K_RECOMMENDATIONS)

        self.total_train += t_train.time
        self.total_eval += eval_dict['time_eval']

        return {
            'real_batch_size': self.real_batch_size,
            'iteration': self.iteration,
            'loss': ret[0],
            'mf_loss': ret[1],
            'emb_loss': ret[2],
            **eval_dict,
            'time_train': t_train.time,
            'time_rec': t_rec.time,
            'time_total_train': self.total_train,
            'time_total_test': self.total_eval,
        }

    def save_checkpoint(self, checkpoint_dir):
        """Save the model session under ``<checkpoint_dir>/model``.

        Returns:
            ``checkpoint_dir``, unchanged.
        """
        checkpoint_path = os.path.join(checkpoint_dir, self._CHECKPOINT_NAME)
        self.model.saver.save(
            sess=self.model.sess,
            save_path=checkpoint_path,
        )
        return checkpoint_dir

    def load_checkpoint(self, checkpoint_path):
        """Restore the model written by :meth:`save_checkpoint`.

        The model is saved under ``<checkpoint_dir>/model``, so when a directory
        is received the same name is appended before loading. Any other value is
        passed through untouched.

        NOTE(review): assumes ``LightGCNCustom.load`` expects the same path that
        was given to ``saver.save``; confirm against its implementation.
        """
        if os.path.isdir(checkpoint_path):
            checkpoint_path = os.path.join(checkpoint_path, self._CHECKPOINT_NAME)
        self.model.load(checkpoint_path)

## Big experiment

In [17]:
RAY_RESULTS_PATH

PosixPath('/home/daviddavo/ray_results3.11')

In [18]:
# Decide how many samples to draw and which resources every trial requests,
# depending on the machine that runs the notebook.
node_name = os.uname().nodename
print(node_name)

if node_name != 'lamarck':
    # Any other machine: a single sample without GPU.
    NUM_SAMPLES = 1
    # Author's note: a trial takes about 1.5 GiB with the full training data; a
    # bit more is reserved because this notebook also uses some memory.
    resources_per_trial = dict(cpu=1, memory=2e9)
else:
    # GPU server: the full number of samples, several trials sharing the GPU.
    # assert torch.cuda.is_available()
    NUM_SAMPLES = SAMPLES_PER_SPLIT
    # Author's note: every run takes approx half a gig of VRAM (no
    # optimizations), and the RTX 4090 has 24GB, so about 48 would fit.
    resources_per_trial = dict(cpu=1, gpu=1 / GPUS)

print(resources_per_trial)

lamarck
{'cpu': 1, 'gpu': 0.0625}


In [19]:
from ray.tune.search.hyperopt import HyperOptSearch

In [20]:
def getTunerOnFold(f_idx, points_to_evaluate = None):
    """Return a ``tune.Tuner`` for one fold, restoring the last experiment if possible.

    The most recently created directory under ``RAY_RESULTS_PATH`` whose name
    starts with this fold's experiment name is restored (errored trials are
    restarted). If there is none, or it cannot be restored, a new experiment
    driven by ``HyperOptSearch`` is created.

    Args:
        f_idx: Key of the fold in ``folds_dict``.
        points_to_evaluate: Configurations passed to ``HyperOptSearch``. Only
            used when a new experiment is created.

    Returns:
        The tuner, not yet fitted.
    """
    name = paths.lightgcn_ray_tune_fname(ORG_NAME, SPLITS_FREQ, SPLITS_NORMALIZE, OPTIM_METRIC, fold=f_idx)
    experiments = list(RAY_RESULTS_PATH.glob(f'{name}_*'))
    last_experiment = max(experiments, key=lambda x: x.stat().st_ctime) if experiments else None

    param_space = dict(
        batch_size=tune.randint(MIN_BATCH_SIZE, MAX_BATCH_SIZE+1), # 64 - 2**MAX_BATCH_SIZE
        embedding_dim=tune.lograndint(1, MAX_EMBEDDING_DIM, base=2),
        conv_layers=tune.randint(1,5),
        learning_rate=tune.qloguniform(MIN_LR, 1, 1e-4),
        # The third positional argument of tune.loguniform is the logarithm
        # base, not a quantisation step, so the former `1e-7` is dropped.
        l2=tune.loguniform(1e-7, 1e-2),
        window_size=tune.choice(WINDOW_SIZES),
        # Just so it appears on the output
        fold=f_idx,
    )

    # The trainable is the same whether the experiment is restored or created.
    trainable = tune.with_resources(
        tune.with_parameters(TrainLightGCN, data=folds_dict[f_idx]),
        resources_per_trial,
    )

    ### RESTORE EXPERIMENT OR CREATE A NEW ONE
    if last_experiment and tune.Tuner.can_restore(last_experiment):
        print(f"Restoring last experiment: {last_experiment}")
        return tune.Tuner.restore(
            str(last_experiment),
            trainable=trainable,
            restart_errored=True,
            param_space=param_space,
        )

    print(f"No experiment found for fold {f_idx}, creating new tuner with {NUM_SAMPLES} samples")
    search_alg = HyperOptSearch(
        points_to_evaluate = points_to_evaluate,
        random_state_seed=SEED,
    )

    return tune.Tuner(
        trainable,
        run_config=train.RunConfig(
            # A trial stops as soon as either threshold is reached.
            stop={'training_iteration': MAX_EPOCHS/EPOCHS_PER_ITER, 'time_total_train': MAX_TIME_TOTAL_TRAIN},
            # The timestamp suffix makes every new experiment directory unique.
            name=name + f'_{dt.datetime.now().isoformat()}',
            storage_path=RAY_RESULTS_PATH,
            # failure_config=train.FailureConfig(fail_fast='raise'),
            failure_config=train.FailureConfig(max_failures=3),
        ),
        param_space=param_space,
        tune_config=tune.TuneConfig(
            search_alg=search_alg,
            num_samples=NUM_SAMPLES,
            metric=OPTIM_METRIC,
            mode='max',
        )
    )

In [21]:
# We need to display the progress bar in another cell because ray tune "overwrites" the previous output
pbar = tqdm(total=len(use_folds_idx), desc='fold')
# Output widget that collects whatever is printed inside a `with out:` block.
out = widgets.Output(layout={'border': '1px solid black'})
with out:
    print("In this cell important output from the next cell will be shown")
# Last expression of the cell, so the widget is rendered as this cell's output.
out

fold:   0%|          | 0/10 [00:00<?, ?it/s]

Output(layout=Layout(border_bottom='1px solid black', border_left='1px solid black', border_right='1px solid b…

In [22]:
import logging
import requests

def findConfig(rg, reference_config=None):
    """Return the first result of ``rg`` whose config equals a reference config.

    The ``'fold'`` key is ignored in the comparison. A result that matches on
    everything except ``'window_size'`` is not returned, but it is reported on
    stderr as a possible coincidence.

    Args:
        rg: Iterable of results; each one exposes a ``config`` mapping. Results
            with an empty or missing config are skipped.
        reference_config: Config to look for. Defaults to the config of the
            module-level ``last_best_result``, the previous behaviour.

    Returns:
        The matching result, or ``None`` if no result matches.

    Raises:
        AttributeError: If ``reference_config`` is omitted and
            ``last_best_result`` is ``None``.
        KeyError: If a result's config lacks a key of the reference config.
    """
    if reference_config is None:
        reference_config = last_best_result.config

    # The keys to compare do not depend on the result, so filter them once.
    expected = {k: v for k, v in reference_config.items() if k != 'fold'}
    expected_without_ws = {k: v for k, v in expected.items() if k != 'window_size'}

    for r in rg:
        if not r.config:
            continue
        if all(r.config[k] == v for k, v in expected.items()):
            return r
        if all(r.config[k] == v for k, v in expected_without_ws.items()):
            print("Possible coincidence:", r.config, file=sys.stderr)

    return None

# Topic that receives the progress notifications of this notebook.
NTFY_URL = "https://ntfy.sh/grasia_notebooks"


def _notify(message):
    """Send a best-effort push notification.

    Notifications are only informative: a network problem is logged and must
    neither abort the experiment nor replace the error being reported. The text
    is sent as UTF-8 so any character in it (e.g. in an exception repr) is safe.
    """
    try:
        requests.post(NTFY_URL, data=message.encode("utf-8"), timeout=10)
    except requests.RequestException as exc:
        logging.warning("Could not send notification %r: %r", message, exc)


tuners = []
results = []
last_best_result = None
pbar.reset()

_notify(f"Start running microsoft_tuning for {ORG_NAME}")
# Pair every fold with the previous one (None for the first fold).
for i, (prev_f_idx, f_idx) in enumerate(zip(it.chain([None], use_folds_idx), use_folds_idx)):
    try:
        with out:
            # The best config of the previous fold is also tried on this fold.
            best_prev_config = None
            if last_best_result is not None:
                best_prev_config = last_best_result.config.copy()
                assert best_prev_config['fold'] == prev_f_idx
                best_prev_config['fold'] = f_idx
                print(f"Also evaluating best_prev_config ({OPTIM_METRIC}={last_best_result.metrics[OPTIM_METRIC]}): {best_prev_config}")
                best_prev_config = [best_prev_config]

        t = getTunerOnFold(f_idx, best_prev_config)
        tuners.append(t)

        rg = t.fit()

        # FIXME: load results from disk until ray-project/ray#47358 is solved
        # https://github.com/ray-project/ray/issues/47358
        t = getTunerOnFold(f_idx, best_prev_config)
        tuners[-1] = t
        rg = t.fit()

        assert rg.num_errors == 0, f"There are {rg.num_errors} errors"
        assert rg.num_terminated >= NUM_SAMPLES, f'Some samples are not terminated ({rg.num_terminated} != {NUM_SAMPLES})'
        assert len(rg.get_dataframe()) >= NUM_SAMPLES
        results.append(rg)

        # Assert that the prev config has been tried
        if last_best_result is not None:
            matching_result = findConfig(rg)
            if not matching_result:
                print("Best config:", last_best_result.config)
                raise AssertionError(f"The best config from previous fold has not been tested in fold {f_idx}")
            logging.info(f'Fold {f_idx}. Best prev result was {last_best_result.path} and config has been found {matching_result.path}')

        last_best_result = rg.get_best_result()
        pbar.update()

        print(f"Finished training for fold {f_idx}")
        _notify(f"Finished running fold {i} for {ORG_NAME}")
    except Exception as e:
        # Report the failure, then let the original error propagate.
        _notify(f"Error on fold {i} for {ORG_NAME}: {repr(e)}")
        raise

pbar.close()

0,1
Current time:,2024-09-05 09:44:08
Running for:,00:00:00.36
Memory:,8.3/125.6 GiB

Trial name,status,loc,batch_size,conv_layers,embedding_dim,fold,l2,learning_rate,window_size,iter,total time (s),real_batch_size,iteration,loss
TrainLightGCN_45381747,TERMINATED,147.96.81.131:1838178,9,4,10,2022-05-05T00:00:00,3.90554e-07,0.0272,90d,40,12.635,50,200,0.00041831
TrainLightGCN_c0de5e8b,TERMINATED,147.96.81.131:1838037,7,3,6,2022-05-05T00:00:00,5.86946e-06,0.1128,21d,40,7.73446,19,200,0.000467806
TrainLightGCN_674cfb42,TERMINATED,147.96.81.131:1837591,10,4,51,2022-05-05T00:00:00,1.21242e-05,0.0087,10YE,40,16.6846,179,200,0.00132488
TrainLightGCN_b1340942,TERMINATED,147.96.81.131:1837887,10,4,2,2022-05-05T00:00:00,1.48681e-06,0.2269,21d,40,7.86351,19,200,0.152558
TrainLightGCN_c910d818,TERMINATED,147.96.81.131:1837738,8,2,3,2022-05-05T00:00:00,4.87166e-07,0.9938,90d,40,10.9523,50,200,0.92978
TrainLightGCN_21e8bc1d,TERMINATED,147.96.81.131:1837434,9,3,11,2022-05-05T00:00:00,2.23274e-05,0.7617,30d,40,8.78764,22,200,0.210296
TrainLightGCN_d9b2f05d,TERMINATED,147.96.81.131:1837286,10,4,33,2022-05-05T00:00:00,0.000541489,0.0024,21d,40,7.36529,19,200,0.0313611
TrainLightGCN_ef2c5685,TERMINATED,147.96.81.131:1837137,8,1,14,2022-05-05T00:00:00,0.000212626,0.3236,60d,40,9.79501,49,200,0.397369
TrainLightGCN_59ebe516,TERMINATED,147.96.81.131:1836845,10,4,28,2022-05-05T00:00:00,8.64483e-06,0.0019,90d,40,11.2212,50,200,0.0256353
TrainLightGCN_ac982dd1,TERMINATED,147.96.81.131:1836995,9,4,39,2022-05-05T00:00:00,2.26346e-07,0.0003,21d,40,7.8011,19,200,0.20996


2024-09-05 09:44:08,333	INFO tune_controller.py:444 -- Restoring the run from the latest experiment state file: experiment_state-2024-09-05_09-44-07.json


2024-09-05 09:44:08,926	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/home/daviddavo/ray_results3.11/dxDAO - xDXdao/LightGCN_W-THU_normalize_map@10_fold=2022-05-05T00:00:00_2024-07-19T10:52:09.209601' in 0.3583s.


2024-09-05 09:44:08,939	INFO tune.py:1041 -- Total run time: 0.61 seconds (0.00 seconds for the tuning loop).


Finished training for fold 2022-05-05T00:00:00


In [23]:
# Final push notification: this cell is only reached once the loop over the
# folds has finished without raising.
requests.post("https://ntfy.sh/grasia_notebooks", data=f"Finished running everything for {ORG_NAME}")
print("Finished!")

Finished!
