In [1]:
import argparse
import datetime
import glob
import hashlib
import json
import multiprocessing
import pickle
import os
import shutil
import subprocess
import uuid

import numpy as np
import pandas as pd
import lightgbm as lgb
from collections import defaultdict
from lightgbm.sklearn import LGBMRanker
from tqdm import tqdm

from pyserini.analysis import Analyzer, get_lucene_analyzer
from pyserini.ltr import *
from pyserini.search import get_topics_with_reader

def train_data_loader(neg_sample=10, random_seed=12345):
    if os.path.exists(f'train_sampled_with_{neg_sample}_{random_seed}.pickle'):
        sampled_train = pd.read_pickle(f'train_sampled_with_{neg_sample}_{random_seed}.pickle')
        print(sampled_train.shape)
        print(sampled_train.index.get_level_values('qid').drop_duplicates().shape)
        print(sampled_train.groupby('qid').count().mean())
        print(sampled_train.head(10))
        print(sampled_train.info())
        return sampled_train
    else:
        train = pd.read_csv('collections/msmarco-passage/qidpidtriples.train.full.tsv', sep="\t",
                            names=['qid', 'pos_pid', 'neg_pid'], dtype=np.int32)
        pos_half = train[['qid', 'pos_pid']].rename(columns={"pos_pid": "pid"}).drop_duplicates()
        pos_half['rel'] = np.int32(1)
        neg_half = train[['qid', 'neg_pid']].rename(columns={"neg_pid": "pid"}).drop_duplicates()
        neg_half['rel'] = np.int32(0)
        del train
        sampled_neg_half = []
        for qid, group in tqdm(neg_half.groupby('qid')):
            sampled_neg_half.append(group.sample(n=min(neg_sample, len(group)), random_state=random_seed))
        sampled_train = pd.concat([pos_half] + sampled_neg_half, axis=0, ignore_index=True)
        sampled_train = sampled_train.sort_values(['qid','pid']).set_index(['qid','pid'])
        print(sampled_train.shape)
        print(sampled_train.index.get_level_values('qid').drop_duplicates().shape)
        print(sampled_train.groupby('qid').count().mean())
        print(sampled_train.head(10))
        print(sampled_train.info())

        sampled_train.to_pickle(f'train_sampled_with_{neg_sample}_{random_seed}.pickle')
        return sampled_train
sampled_train = train_data_loader()

(3606272, 1)
(327721,)
rel    11.004092
dtype: float64
             rel
qid pid         
91  793527     1
    1156624    0
    1378443    0
    1480965    0
    1662964    0
    1779082    0
    2477520    0
    5668069    0
    6813163    0
    6814345    0
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 3606272 entries, (91, 793527) to (1185869, 8252591)
Data columns (total 1 columns):
 #   Column  Dtype
---  ------  -----
 0   rel     int32
dtypes: int32(1)
memory usage: 152.7 MB
None


In [2]:
def dev_data_loader(task='rerank'):
    if os.path.exists(f'dev_{task}.pickle'):
        dev = pd.read_pickle(f'dev_{task}.pickle')
        print(dev.shape)
        print(dev.index.get_level_values('qid').drop_duplicates().shape)
        print(dev.groupby('qid').count().mean())
        print(dev.head(10))
        print(dev.info())
        return dev
    else:
        if task == 'rerank':
            dev = pd.read_csv('collections/msmarco-passage/top1000.dev', sep="\t",
                              names=['qid', 'pid', 'query', 'doc'], usecols=['qid', 'pid'], dtype=np.int32)
        elif task == 'anserini':
            dev = pd.read_csv('runs/msmarco-passage/run.msmarco-passage.dev.small.tsv',sep="\t",
                            names=['qid','pid','rank'], dtype=np.int32)
        else:
            raise Exception('unknown parameters')
        dev_qrel = pd.read_csv('collections/msmarco-passage/qrels.dev.small.tsv', sep="\t",
                               names=["qid", "q0", "pid", "rel"], usecols=['qid', 'pid', 'rel'], dtype=np.int32)
        dev = dev.merge(dev_qrel, left_on=['qid', 'pid'], right_on=['qid', 'pid'], how='left')
        dev['rel'] = dev['rel'].fillna(0).astype(np.int32)
        dev = dev.sort_values(['qid','pid']).set_index(['qid','pid'])
        
        print(dev.shape)
        print(dev.index.get_level_values('qid').drop_duplicates().shape)
        print(dev.groupby('qid').count().mean())
        print(dev.head(10))
        print(dev.info())

        dev.to_pickle(f'dev_{task}.pickle')
        return dev
dev = dev_data_loader()

(6974598, 2)
(6980,)
rank    999.226074
rel     999.226074
dtype: float64
            rank  rel
qid pid              
2   55860    345    0
    72202    557    0
    72210    213    0
    98589    278    0
    98590    323    0
    98593    580    0
    98595    553    0
    112123   108    0
    112126   469    0
    112127    21    0
<class 'pandas.core.frame.DataFrame'>
MultiIndex: 6974598 entries, (2, 55860) to (1102400, 8830447)
Data columns (total 2 columns):
 #   Column  Dtype
---  ------  -----
 0   rank    int32
 1   rel     int32
dtypes: int32(2)
memory usage: 282.7 MB
None


In [3]:
def query_loader(choice='default'):
    if os.path.exists(f'query_{choice}_tokenized.pickle'):
        return pickle.load(open(f'query_{choice}_tokenized.pickle','rb'))
    else:
        if choice == 'default':
            analyzer = Analyzer(get_lucene_analyzer())
            queries = get_topics_with_reader('io.anserini.search.topicreader.TsvIntTopicReader', \
                                             'collections/msmarco-passage/queries.train.tsv')
            queries.update(get_topics_with_reader('io.anserini.search.topicreader.TsvIntTopicReader', \
                                                  'collections/msmarco-passage/queries.dev.tsv'))
            for qid,value in queries.items():
                assert 'tokenized' not in value
                value['tokenized'] = analyzer.analyze(value['title'])
        else:
            raise Exception('unknown parameters')

        pickle.dump(queries,open(f'query_{choice}_tokenized.pickle','wb'))

        return queries
queries = query_loader()

In [4]:
fe = FeatureExtractor('indexes/msmarco-passage/lucene-index-msmarco/',max(multiprocessing.cpu_count()//2,1))
fe.add(BM25(k1=0.9,b=0.4))
fe.add(BM25(k1=1.2,b=0.75))
fe.add(BM25(k1=2.0,b=0.75))
fe.add(LMDir(mu=1000))
fe.add(LMDir(mu=1500))
fe.add(LMDir(mu=2500))
fe.add(LMJM(0.1))
fe.add(LMJM(0.4))
fe.add(LMJM(0.7))
fe.add(DFR_GL2())
fe.add(DFR_In_expB2())
fe.add(DocSize())
fe.add(QueryLength())
fe.add(UniqueTermCount())
fe.add(MatchingTermCount())
fe.add(SCS())
fe.add(tfStat(AvgPooler()))
fe.add(tfStat(SumPooler()))
fe.add(tfStat(MinPooler()))
fe.add(tfStat(MaxPooler()))
fe.add(tfStat(VarPooler()))
fe.add(tfIdfStat(AvgPooler()))
fe.add(tfIdfStat(SumPooler()))
fe.add(tfIdfStat(MinPooler()))
fe.add(tfIdfStat(MaxPooler()))
fe.add(tfIdfStat(VarPooler()))
fe.add(normalizedTfStat(AvgPooler()))
fe.add(normalizedTfStat(SumPooler()))
fe.add(normalizedTfStat(MinPooler()))
fe.add(normalizedTfStat(MaxPooler()))
fe.add(normalizedTfStat(VarPooler()))
fe.add(idfStat(AvgPooler()))
fe.add(idfStat(SumPooler()))
fe.add(idfStat(MinPooler()))
fe.add(idfStat(MaxPooler()))
fe.add(idfStat(VarPooler()))
fe.add(ictfStat(AvgPooler()))
fe.add(ictfStat(SumPooler()))
fe.add(ictfStat(MinPooler()))
fe.add(ictfStat(MaxPooler()))
fe.add(ictfStat(VarPooler()))
fe.add(scqStat(AvgPooler()))
fe.add(scqStat(SumPooler()))
fe.add(scqStat(MinPooler()))
fe.add(scqStat(MaxPooler()))
fe.add(scqStat(VarPooler()))
fe.add(UnorderedSequentialPairs(3))
fe.add(UnorderedSequentialPairs(8))
fe.add(UnorderedSequentialPairs(15))
fe.add(OrderedSequentialPairs(3))
fe.add(OrderedSequentialPairs(8))
fe.add(OrderedSequentialPairs(15))
fe.add(UnorderedQueryPairs(3))
fe.add(UnorderedQueryPairs(8))
fe.add(UnorderedQueryPairs(15))
fe.add(OrderedQueryPairs(3))
fe.add(OrderedQueryPairs(8))
fe.add(OrderedQueryPairs(15))

In [5]:
def extract(df, queries, fe):
    df_pieces = []
    fetch_later = []
    qidpid2rel = defaultdict(dict)
    need_rows = 0
    for qid,group in tqdm(df.groupby('qid')):
        for t in group.reset_index().itertuples():
            assert t.pid not in qidpid2rel[t.qid]
            qidpid2rel[t.qid][t.pid] = t.rel
            need_rows += 1
        fe.lazy_extract(str(qid),queries[qid]['tokenized'],list(qidpid2rel[t.qid].keys()))
        fetch_later.append(str(qid))
        if len(fetch_later) == 10000:
            info = np.zeros(shape=(need_rows,3), dtype=np.int32)
            feature = np.zeros(shape=(need_rows,len(fe.feature_names())), dtype=np.float32)
            idx = 0
            for qid in fetch_later:
                for doc in fe.get_result(qid):
                    info[idx,0] = int(qid)
                    info[idx,1] = int(doc['pid'])
                    info[idx,2] = qidpid2rel[int(qid)][int(doc['pid'])]
                    feature[idx,:] = doc['features']
                    idx += 1
            info = pd.DataFrame(info, columns=['qid','pid','rel'])
            feature = pd.DataFrame(feature, columns=fe.feature_names())
            df_pieces.append(pd.concat([info,feature], axis=1))
            fetch_later = []
            need_rows = 0
    #deal with rest
    if len(fetch_later) > 0:
        info = np.zeros(shape=(need_rows,3), dtype=np.int32)
        feature = np.zeros(shape=(need_rows,len(fe.feature_names())), dtype=np.float32)
        idx = 0
        for qid in fetch_later:
            for doc in fe.get_result(qid):
                info[idx,0] = int(qid)
                info[idx,1] = int(doc['pid'])
                info[idx,2] = qidpid2rel[int(qid)][int(doc['pid'])]
                feature[idx,:] = doc['features']
                idx += 1
        info = pd.DataFrame(info, columns=['qid','pid','rel'])
        feature = pd.DataFrame(feature, columns=fe.feature_names())
        df_pieces.append(pd.concat([info,feature], axis=1))
    data = pd.concat(df_pieces, axis=0, ignore_index=True)
    data = data.sort_values(by='qid', kind='mergesort')
    group = data.groupby('qid').agg(count=('pid', 'count'))['count']
    return data,group

In [6]:
def hash_df(df):
    h = pd.util.hash_pandas_object(df)
    return hex(h.sum().astype(np.uint64))


def hash_anserini_jar():
    find = glob.glob(os.environ['ANSERINI_CLASSPATH'] + "/*fatjar.jar")
    assert len(find) == 1
    md5Hash = hashlib.md5(open(find[0], 'rb').read())
    return md5Hash.hexdigest()


def hash_fe(fe):
    return hashlib.md5(','.join(sorted(fe.feature_names())).encode()).hexdigest()


def data_loader(task, df, queries, fe):
    df_hash = hash_df(df)
    jar_hash = hash_anserini_jar()
    fe_hash = hash_fe(fe)
    if os.path.exists(f'{task}_{df_hash}_{jar_hash}_{fe_hash}.pickle'):
        res = pickle.load(open(f'{task}_{df_hash}_{jar_hash}_{fe_hash}.pickle','rb'))
        print(res['data'].shape)
        print(res['data'].qid.drop_duplicates().shape)
        print(res['group'].mean())
        print(res['data'].head(10))
        print(res['data'].info())
        return res
    else:
        if task == 'train' or task == 'dev': 
            data,group = extract(df, queries, fe)
            obj = {'data':data,'group':group,'df_hash':df_hash,'jar_hash':jar_hash,'fe_hash':fe_hash}
            print(data.shape)
            print(data.qid.drop_duplicates().shape)
            print(group.mean())
            print(data.head(10))
            print(data.info())
            pickle.dump(obj,open(f'{task}_{df_hash}_{jar_hash}_{fe_hash}.pickle','wb'))
            return obj
        else:
            raise Exception('unknown parameters')

In [7]:
train_extracted = data_loader('train', sampled_train, queries, fe)
dev_extracted = data_loader('dev', dev, queries, fe)
del sampled_train, dev

(3606272, 61)
(327721,)
11.004091895240158
   qid      pid  rel  BM25_k1_0.90_b_0.40  BM25_k1_1.20_b_0.75  \
0   91   793527    1            36.255280            37.489307   
1   91  1156624    0            19.361340            21.753880   
2   91  1378443    0            17.880190            19.618870   
3   91  1480965    0            17.356880            17.810566   
4   91  1662964    0            20.182610            21.573776   
5   91  1779082    0            20.570230            22.203360   
6   91  2477520    0            19.199781            20.434164   
7   91  5668069    0            27.566326            29.139702   
8   91  6813163    0            22.318399            22.719379   
9   91  6814345    0            16.739628            17.976086   

   BM25_k1_2.00_b_0.75  LMD_mu_1000  LMD_mu_1500  LMD_mu_2500  \
0            42.093887   -34.407925   -36.330475   -38.600681   
1            24.925566   -18.220840   -19.241903   -20.438829   
2            22.600361   -17.302290

In [8]:
def train(train_extracted, dev_extracted, feature_name):
    train_X = train_extracted['data'].loc[:, feature_name]
    train_Y = train_extracted['data']['rel']
    dev_X = dev_extracted['data'].loc[:, feature_name]
    dev_Y = dev_extracted['data']['rel']
    lgb_train = lgb.Dataset(train_X,label=train_Y,group=train_extracted['group'])
    lgb_valid = lgb.Dataset(dev_X,label=dev_Y,group=dev_extracted['group'])
    
    params = {
        'boosting_type': 'gbdt',
        'objective': 'lambdarank',
        'max_bin':255,
        'num_leaves':63,
        'max_depth':10,
        'min_data_in_leaf':50,
        'min_sum_hessian_in_leaf':0,
        'bagging_fraction':0.9,
        'bagging_freq':1,
        'feature_fraction':1,
        'learning_rate':0.05,
        'num_boost_round':2000,
        'early_stopping_round':200,
        'metric':['map'],
        'eval_at':[10],
        'label_gain':[0,1],
        'lambdarank_truncation_level':20,
        'seed':12345,
        'num_threads':max(multiprocessing.cpu_count()//2,1)
    }
    num_boost_round = params.pop('num_boost_round')
    early_stopping_round = params.pop('early_stopping_round')
    eval_results={}
    gbm = lgb.train(params, lgb_train, 
                    valid_sets=lgb_valid,
                    num_boost_round=num_boost_round,
                    early_stopping_rounds =early_stopping_round,
                    feature_name=feature_name,
                    evals_result=eval_results,
                    verbose_eval=False)
    dev_extracted['data']['score']=gbm.predict(dev_X)
    best_score = gbm.best_score['valid_0']['map@10']
    print(best_score)
    best_iteration = gbm.best_iteration
    print(best_iteration)
    eval_map = eval_results['valid_0']['map@10']
    print(eval_map)
    feature_importances = sorted(list(zip(feature_name,gbm.feature_importance().tolist())),key=lambda x:x[1],reverse=True)
    print(feature_importances)
    return {'model':gbm,'best_score':best_score,'best_iteration':best_iteration,
            'eval_map':eval_map,'feature_importances':feature_importances}

In [9]:
def eval_output(dev_data):
    with open('lambdarank.run', 'w') as f:
        score_tie_counter = 0
        score_tie_query = set()
        dev_data = dev_data.set_index('qid')
        for qid, group in dev_data.groupby('qid'):
            group = group.reset_index()
            rank = 1
            prev_score = -1e10
            prev_pid = ''
            assert len(group['pid'].tolist()) == len(set(group['pid'].tolist()))
            # stable sort is also used in LightGBM
            for t in group.sort_values('score', ascending=False, kind='mergesort').itertuples():
                if abs(t.score - prev_score) < 1e-8:
                    score_tie_counter += 1
                    score_tie_query.add(qid)
                assert prev_pid != t.pid
                prev_score = t.score
                prev_pid = t.pid
                f.write(f'{t.qid}\t{t.pid}\t{rank}\n')
                rank += 1
        score_tie = f'score_tie occurs {score_tie_counter} times in {len(score_tie_query)} queries'
        print(score_tie)

    with open('lambdarank.run.trec', 'w') as f:
        for qid, group in dev_data.groupby('qid'):
            group = group.reset_index()
            rank = 1
            assert len(group['pid'].tolist()) == len(set(group['pid'].tolist()))
            # stable sort is also used in LightGBM
            for t in group.sort_values('score', ascending=False, kind='mergesort').itertuples():
                new_score = t.score - rank * 1e-8
                f.write(f'{t.qid}\tQ0\t{t.pid}\t{rank}\t{new_score:.6f}\tlambdarank\n')
                rank += 1

    mrr_10 = subprocess.check_output(
        ["python3", "tools/scripts/msmarco/msmarco_eval.py", "collections/msmarco-passage/qrels.dev.small.tsv",
         "lambdarank.run"]).decode()
    print(mrr_10)
    map_recall = subprocess.check_output(
        ["tools/eval/trec_eval.9.0.4/trec_eval", "-mmap", "-mrecall", "collections/msmarco-passage/qrels.dev.small.tsv",
         "lambdarank.run.trec"]).decode()
    print(map_recall)
    return {'score_tie': score_tie, 'mrr_10': mrr_10, 'map_recall': map_recall}

In [10]:
train_res = train(train_extracted, dev_extracted, fe.feature_names())
eval_res = eval_output(dev_extracted['data'])

[LightGBM] [Info] Total groups: 327721, total data: 3606272
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 9338
[LightGBM] [Info] Number of data points in the train set: 3606272, number of used features: 52
[LightGBM] [Info] Total groups: 6980, total data: 6974598
0.34251384340746804
932
[0.2592602210018041, 0.29473700842164313, 0.30316847739573394, 0.31213955500219825, 0.3140317621397493, 0.31589908714998255, 0.3166790177150134, 0.3175233329164203, 0.3187067500113704, 0.31855325021603675, 0.31942203063173685, 0.32047480234532527, 0.32100671703734024, 0.321279889821258, 0.321331539659799, 0.3221031319643426, 0.32250247778990615, 0.3229136677354801, 0.322880352594715, 0.3229192676354209, 0.32268893266475646, 0.32296557340701326, 0.323408789853095, 0.32333728926941674, 0.3233323431649005, 0.3236878562711299, 0.324405960529707, 0.3240738324540259, 0.3243983802929003, 0.32477201016509755, 0.3253462225784933, 0.3251444177241097, 0.3251081178651021, 0.

score_tie occurs 1731602 times in 6979 queries
#####################
MRR @10: 0.20875250147814595
QueriesRanked: 6980
#####################

map                   	all	0.2167
recall_5              	all	0.3237
recall_10             	all	0.4250
recall_15             	all	0.4833
recall_20             	all	0.5226
recall_30             	all	0.5684
recall_100            	all	0.7041
recall_200            	all	0.7631
recall_500            	all	0.8287
recall_1000           	all	0.8573



In [11]:
def quick_eval(dev_data):
    pass
def cv(train_extracted, dev_extracted, feature_name):
    pass
def autotune(train_extracted, dev_extracted, feature_name):
    pass

In [15]:
def gen_exp_dir():
    dirname = datetime.datetime.now().strftime('%Y-%m-%d_%H:%M:%S') + '_' + str(uuid.uuid1())
    assert not os.path.exists(dirname)
    os.mkdir(dirname)
    return dirname


def save_exp(dirname,
             fe,
             train_extracted, dev_extracted,
             train_res, eval_res):
    dev_extracted['data'][['qid', 'pid', 'score']].to_json(f'{dirname}/output.json')
    subprocess.check_output(['gzip', f'{dirname}/output.json'])
    with open(f'{dirname}/model.pkl', 'wb') as f:
        pickle.dump(train_res['model'], f)
    metadata = {
        'train_df_hash':train_extracted['df_hash'],
        'train_jar_hash':train_extracted['jar_hash'],
        'train_fe_hash':train_extracted['fe_hash'],
        'dev_df_hash':dev_extracted['df_hash'],
        'dev_jar_hash':dev_extracted['jar_hash'],
        'dev_fe_hash':dev_extracted['fe_hash'],
        'best_score':train_res['best_score'],
        'best_iteration':train_res['best_iteration'],
        'eval_map':train_res['eval_map'],
        'feature_importances':train_res['feature_importances'],
        'score_tie': eval_res['score_tie'], 
        'mrr_10': eval_res['mrr_10'], 
        'map_recall': eval_res['map_recall']
    }
    json.dump(metadata,open(f'{dirname}/metadata.json','w'))
    shutil.copytree('anserini_ltr_source', f'{dirname}/anserini_ltr_source')
    shutil.copytree('pyserini_ltr_source', f'{dirname}/pyserini_ltr_source')
    shutil.copy('test.py', f'{dirname}/test.py')

In [17]:
dirname = gen_exp_dir()
save_exp(dirname,fe,train_extracted,dev_extracted,train_res,eval_res)