All of this was adjusted from [Doug Turnbull's excellent blog](https://softwaredoug.com/blog/2021/11/28/how-lammbamart-works.html) and [his excellent notebook](https://github.com/softwaredoug/hello-ltr/blob/lambda-mart-in-python/notebooks/elasticsearch/tmdb/lambda-mart-in-python.ipynb) 

In [48]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [49]:
from ltr.client import ElasticClient
from elasticsearch import Elasticsearch
client = ElasticClient()

In [50]:
from ltr import download
corpus='http://es-learn-to-rank.labs.o19s.com/tmdb.json'
judgments='http://es-learn-to-rank.labs.o19s.com/title_judgments.txt'

download([corpus, judgments], dest='data/');
from ltr.index import rebuild
from ltr.helpers.movies import indexable_movies

movies=indexable_movies(movies='data/tmdb.json')
rebuild(client, index='tmdb', doc_src=movies)

data/tmdb.json already exists
data/title_judgments.txt already exists
Created index tmdb [Status: 200]


100%|██████████| 27846/27846 [00:18<00:00, 1525.15it/s]


Streaming Bulk index DONE tmdb [Status: 201]


# Features and Judgements

Here we calculate our LTR features and combine them with known document & query judgements. Allowing us to combine into a larger feature training data set that includes
`qid`, `doc`, `features`, `ranking`

This can then be adjusted with the "lambdas" for training the model!!!

In [51]:
from ltr.log import FeatureLogger
from ltr.judgments import judgments_open
from itertools import groupby
from ltr.judgments import judgments_to_dataframe

# Query features, the _score output of the query is the LTR feature value
query_feature_names = [
    "title_bm25",
    "overview_bm25",
    "title_phrase_bm25",
    "overview_phrase_bm25",
]
query_feature_templates = [
                {"match": {"title": "{{keywords}}"}},
                {"match": {"overview": "{{keywords}}"}},
                {"match_phrase": {"title": "{{keywords}}"}},
                {"match_phrase": {"overview": "{{keywords}}"}},
        ]

# Log features for each query
ftr_logger = FeatureLogger(
    client,
    index="tmdb",
    feature_names=query_feature_names,
    feature_templates=query_feature_templates,
)
with judgments_open("data/title_judgments.txt") as judgment_list:
    for qid, query_judgments in groupby(judgment_list, key=lambda j: j.qid):
        ftr_logger.log_for_qid(
            judgments=query_judgments, qid=qid, keywords=judgment_list.keywords(qid)
        )

# Convert to Pandas Dataframe
judgments = judgments_to_dataframe(ftr_logger.logged, unnest=False)
judgments


Recognizing 40 queries in: data/title_judgments.txt


Unnamed: 0,uid,qid,keywords,docId,grade,features
0,1_7555,1,rambo,7555,4,"[11.657399, 10.083591, 11.657399, 10.083591]"
1,1_1370,1,rambo,1370,3,"[9.456276, 13.265001, 9.456276, 13.265001]"
2,1_1369,1,rambo,1369,3,"[6.036743, 11.113943, 6.036743, 11.113943]"
3,1_13258,1,rambo,13258,2,"[0, 6.869545, 0, 6.869545]"
4,1_1368,1,rambo,1368,4,"[0, 11.113943, 0, 11.113943]"
...,...,...,...,...,...,...
768,40_81899,40,star wars,81899,0,"[0, 6.868508, 0, 4.2289925]"
769,40_54138,40,star wars,54138,0,"[4.962413, 2.2779262, 0, 0]"
770,40_188927,40,star wars,188927,0,"[4.962413, 0, 0, 0]"
771,40_200,40,star wars,200,0,"[4.962413, 0, 0, 0]"


Lets calculate those lambdas!

In [52]:
from math import log, exp
import numpy as np 

def rank_with_swap(ranked_list, rank1=0, rank2=0):
    """ Set the display rank of positions given the provided swap """
    ranked_list['display_rank'] = ranked_list.index.to_series()
    
    if rank1 != rank2:
        ranked_list.loc[rank1, 'display_rank'] = rank2
        ranked_list.loc[rank2, 'display_rank'] = rank1
    return ranked_list
    

def dcg(ranked_list, at=10):
    """Given a list, compute DCG -- 
       uses same variant as lambdamart 2**grade / log2(displayrank)
    """
    ranked_list['discount'] = 1 / np.log2(2 + ranked_list['display_rank'])
    ranked_list['gain'] = (2**ranked_list['grade'] - 1) * ranked_list['discount'] # TODO - precompute gain on swapping
    return sum(ranked_list['gain'].head(at))

def compute_swaps(query_judgments, axis, metric=dcg, at=10):
    """Compute the 'lambda' the DCG impact of every query result swapped with every-other query result"""
    
    # Sort to see ideal ordering
    # This isn't strictly nescesarry, but it's helpful to understand the algorithm
    query_judgments = query_judgments.sort_values('grade', kind='stable', ascending=False).reset_index()

    # Instead of explicitly 'swapping' we just swap the 'display_rank' - where 
    # in the final ranking this would be placed. We can easily use that to compute DCG
    query_judgments['display_rank'] = query_judgments.index.to_series()
    query_judgments['dcg'] = metric(query_judgments, at=at)
    best_dcg = query_judgments.loc[0, 'dcg']

    query_judgments['lambda'] = 0.0
    
    # TODO - redo inner body as 
    for better in range(0,len(query_judgments)):
        for worse in range(0,len(query_judgments)):
            if better > at and worse > at:
                break

            if query_judgments.loc[better, 'grade'] > query_judgments.loc[worse, 'grade']:
                query_judgments = rank_with_swap(query_judgments, better, worse)
                query_judgments['dcg'] = metric(query_judgments, at=at)

                dcg_after_swap = query_judgments.loc[0, 'dcg']
                delta = abs(best_dcg - dcg_after_swap)

                if delta > 0.0:

                    # Add delta to better's lambda (-delta to worse's lambda)
                    query_judgments.loc[better, 'lambda'] += delta
                    query_judgments.loc[worse, 'lambda'] -= delta

    # print(query_judgments[['keywords', 'docId', 'grade', 'lambda', 'features']])
    return query_judgments

# For each query, compute lambdas
# %prun -s cumulative lambdas_per_query = judgments.groupby('qid').apply(compute_swaps, axis=1)
# judgments
lambdas_per_query = judgments.groupby('qid').apply(compute_swaps, axis=1)
lambdas_per_query

Unnamed: 0_level_0,Unnamed: 1_level_0,index,uid,qid,keywords,docId,grade,features,display_rank,discount,gain,dcg,lambda
qid,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
1,0,0,1_7555,1,rambo,7555,4,"[11.657399, 10.083591, 11.657399, 10.083591]",0,1.000000,15.000000,32.721742,34.258787
1,1,4,1_1368,1,rambo,1368,4,"[0, 11.113943, 0, 11.113943]",1,0.630930,9.463946,32.721742,13.590853
1,2,1,1_1370,1,rambo,1370,3,"[9.456276, 13.265001, 9.456276, 13.265001]",2,0.500000,3.500000,32.721742,-2.732092
1,3,2,1_1369,1,rambo,1369,3,"[6.036743, 11.113943, 6.036743, 11.113943]",3,0.430677,3.014736,32.721742,-4.950443
1,4,3,1_13258,1,rambo,13258,2,"[0, 6.869545, 0, 6.869545]",6,0.333333,1.000000,32.721742,-10.746243
...,...,...,...,...,...,...,...,...,...,...,...,...,...
40,10,767,40_209276,40,star wars,209276,0,"[5.8994045, 0, 0, 0]",10,0.278943,0.000000,28.743757,-16.535733
40,11,768,40_81899,40,star wars,81899,0,"[0, 6.868508, 0, 4.2289925]",11,0.270238,0.000000,28.743757,-16.918744
40,12,769,40_54138,40,star wars,54138,0,"[4.962413, 2.2779262, 0, 0]",12,0.262650,0.000000,28.743757,-17.252643
40,13,770,40_188927,40,star wars,188927,0,"[4.962413, 0, 0, 0]",13,0.255958,0.000000,28.743757,-17.547069


In [53]:
from sklearn.tree import DecisionTreeRegressor
import pandas as pd

class OverridenRegressionTree:
    def __init__(self, predictions, tree):
        self.predictions = predictions
        self.tree = tree
        
    def predict(self, X, use_original=False):
        if use_original:
            return self.predict(X)
        path = self.tree.decision_path(X).toarray().astype(str)
        path = "".join(path[0])
        
        paths_as_array = self.tree.decision_path(X).toarray()
        paths = ["".join(item) for item in paths_as_array.astype(str)]
        
        predictions = self.predictions[paths]
        
        # Any NaN predictions is a red flag, debug
        if np.any(predictions.isnull()):
            print(predictions[predictions.isnull()])
            print(pd.DataFrame(X)[predictions.isnull().reset_index(drop=True)])
            raise AssertionError("No prediction should be NaN")
        return np.array(self.predictions[paths].tolist())



def compute_swaps_scaled_with_weights(query_judgments, axis, metric=dcg, at=10):
    """Compute the 'lambda' the DCG impact of every query result swapped with every-other query result
    
    
    """
    
    # Sort to see ideal ordering
    # This isn't strictly nescesarry, but it's helpful to understand the algorithm
    query_judgments = query_judgments.sort_values('last_prediction', ascending=False, kind='stable').reset_index()

    # Instead of explicitly 'swapping' we just swap the 'display_rank' - where 
    # in the final ranking this would be placed. We can easily use that to compute DCG
    query_judgments['display_rank'] = query_judgments.index.to_series()
    query_judgments['train_dcg'] = query_judgments['dcg'] = metric(query_judgments, at=at)
    train_dcg = query_judgments.loc[0, 'dcg']
 
    qid = query_judgments.loc[0, 'qid']
    keywords = query_judgments.loc[0, 'keywords']


    query_judgments['lambda'] = 0.0
    query_judgments['weight'] = 0.0

    for better in range(0,len(query_judgments)):
         for worse in range(0,len(query_judgments)):
            if better > at and worse > at:
                return query_judgments
                
            if query_judgments.loc[better, 'grade'] > query_judgments.loc[worse, 'grade']:
                query_judgments = rank_with_swap(query_judgments, better, worse)
                query_judgments['dcg'] = metric(query_judgments, at=at)

                dcg_after_swap = query_judgments.loc[0, 'dcg']
                delta = abs(train_dcg - dcg_after_swap)

                if delta != 0.0:
                    last_model_score_diff = query_judgments.loc[better, 'last_prediction'] - query_judgments.loc[worse, 'last_prediction']
                    rho = 1.0 / (1.0 + exp(last_model_score_diff)) 

                    assert(delta >= 0.0)
                    assert(rho >= 0.0)
                   
                    query_judgments.loc[better, 'lambda'] += delta * rho
                    query_judgments.loc[worse, 'lambda'] -= delta * rho
            
                    # --------------
                    # NEW!
                    #  last_model_score_diff        rho         weight
                    #      0.0                      0.5         0.25 (max possible value)
                    #      100.0                    0.0000      0.0  (max possible value)
                    # 
                    # If the current model has an ambiguous prediction, we include more of the delta in the weight
                    # If the current model has a strong prediction, weight approaches 0
                    query_judgments.loc[better, 'weight'] += rho * (1.0 - rho) * delta;
                    query_judgments.loc[worse, 'weight'] += rho * (1.0 - rho) * delta;
                    #
                    # These will be used to rescale each decision tree node's predictions
                    # If many results in a leaf node have last model score ~ ambiguous
                    #     the resulting model will have a high denominator ~ (1 / deltaDCG)
                    # If many results in a leaf node have last model score - not ambiguous, positive
                    #     the resulting model will have a low denominator
                    #
                    # Apparently we want to cancel out the deltas if last model was ambiguous?
                    # ---------------

                    

    return query_judgments

def predict(ensemble, X, learning_rate=0.1):
    prediction = 0
    for tree in ensemble:
        prediction += tree.predict(X) * learning_rate
    return prediction.rename('prediction')


def tree_paths(tree, X):
    paths_as_array = tree.decision_path(X).toarray()
    paths = ["".join(item) for item in paths_as_array.astype(str)]
    return paths

In [54]:
from sklearn.tree import DecisionTreeRegressor
import pandas as pd

class OverridenRegressionTree:
    def __init__(self, predictions, tree):
        self.predictions = predictions
        self.tree = tree
        
    def predict(self, X, use_original=False):
        if use_original:
            return self.predict(X)
        path = self.tree.decision_path(X).toarray().astype(str)
        path = "".join(path[0])
        
        paths_as_array = self.tree.decision_path(X).toarray()
        paths = ["".join(item) for item in paths_as_array.astype(str)]
        
        predictions = self.predictions[paths]
        
        # Any NaN predictions is a red flag, debug
        if np.any(predictions.isnull()):
            print(predictions[predictions.isnull()])
            print(pd.DataFrame(X)[predictions.isnull().reset_index(drop=True)])
            raise AssertionError("No prediction should be NaN")
        return np.array(self.predictions[paths].tolist())

def compute_lambdas(lambdas_per_query):
    lambdas_per_query = lambdas_per_query.sort_values(['qid', 'last_prediction'], ascending=[True, False], kind='stable')
    lambdas_per_query['display_rank'] = lambdas_per_query.groupby('qid').cumcount()

    #TBD - How do generalize this to any metric?
    lambdas_per_query['discount'] = 1 / np.log2(2 + lambdas_per_query['display_rank'])
    lambdas_per_query['gain'] = (2**lambdas_per_query['grade'] - 1)

    # swaps dataframe holds each pair-wise swap computed (shrink columns for memory?)   
    # Optimization of swaps = lambdas_per_query.merge(lambdas_per_query, on='qid', how='outer')
    # to limit to just needed columns
    to_swap = lambdas_per_query[['qid', 'display_rank', 'grade', 'last_prediction', 'discount', 'gain']]
    #to_swap = lambdas_per_query
    swaps = to_swap.merge(to_swap, on='qid', how='outer')

    # delta - delta in DCG due to swap
    swaps['delta'] = np.abs((swaps['discount_x'] - swaps['discount_y']) * (swaps['gain_x'] - swaps['gain_y']))
    
    # rho - based on current model prediction delta
    swaps['rho'] = 1 / (1 + np.exp(swaps['last_prediction_x'] - swaps['last_prediction_y']))
    
    # If you want to be pure gradient boosting, weight reweights each models prediction
    # I haven't found this to matter in practice
    swaps['weight'] = swaps['rho'] * (1.0 - swaps['rho']) * swaps['delta']

    # Compute lambdas (the next model in ensemble's predictors) when grade_x > grade_y
    swaps['lambda'] = 0
    slice_x_better =swaps[swaps['grade_x'] > swaps['grade_y']]
    swaps.loc[swaps['grade_x'] > swaps['grade_y'], 'lambda'] = slice_x_better['delta'] * slice_x_better['rho']
    
    # accumulate lambdas and add back to model
    lambdas_x = swaps.groupby(['qid', 'display_rank_x'])['lambda'].sum().rename('lambda')
    lambdas_y = swaps.groupby(['qid', 'display_rank_y'])['lambda'].sum().rename('lambda')

    weights_x = swaps.groupby(['qid', 'display_rank_x'])['weight'].sum().rename('weight')
    weights_y = swaps.groupby(['qid', 'display_rank_y'])['weight'].sum().rename('weight')
    
    weights = weights_x + weights_y
    lambdas = lambdas_x - lambdas_y

    lambdas_per_query = lambdas_per_query.merge(lambdas, 
                                                left_on=['qid', 'display_rank'], 
                                                right_on=['qid', 'display_rank_x'], 
                                                how='left')
    lambdas_per_query = lambdas_per_query.merge(weights, 
                                                left_on=['qid', 'display_rank'], 
                                                right_on=['qid', 'display_rank_x'], 
                                                how='left')

    return lambdas_per_query

from sklearn.tree import DecisionTreeRegressor
import pandas as pd


ensemble=[]
def lambda_mart_pure(judgments, rounds=20, learning_rate=0.1, max_leaf_nodes=8, metric=dcg):

    print(judgments.columns)
    # Convert to Pandas Dataframe
    lambdas_per_query = judgments.copy()


    lambdas_per_query['last_prediction'] = 0.0

    for i in range(0, rounds):
        print(f"round {i}")

        # ------------------
        #1. Build pair-wise predictors for this round
        lambdas_per_query = compute_lambdas(lambdas_per_query)

        # ------------------
        #2. Train a regression tree on this round's lambdas
        features = lambdas_per_query['features'].tolist()
        tree = DecisionTreeRegressor(max_leaf_nodes=max_leaf_nodes)
        tree.fit(features, lambdas_per_query['lambda'])    

        # ------------------
        #3. Reweight based on LambdaMART's weighted average
        # Add each tree's paths
        lambdas_per_query['path'] = tree_paths(tree, features)
        predictions = lambdas_per_query.groupby('path')['lambda'].sum() / lambdas_per_query.groupby('path')['weight'].sum()
        predictions = predictions.fillna(0.0) # for divide by 0

        # -------------------
        #4. Add to ensemble, recreate last prediction
        new_tree = OverridenRegressionTree(predictions=predictions, tree=tree)
        ensemble.append(new_tree)
        next_predictions = new_tree.predict(features)
        lambdas_per_query['last_prediction'] += (next_predictions * learning_rate) 
        
        print(lambdas_per_query.loc[0, ['grade', 'last_prediction']])
        
        print("Train DCGs")
        lambdas_per_query['discounted_gain'] = lambdas_per_query['gain'] * lambdas_per_query['discount'] 
        dcg = lambdas_per_query[lambdas_per_query['display_rank'] < 10].groupby('qid')['discounted_gain'].sum().mean()
        print("mean   ", dcg)
        print("----------")
        
        lambdas_per_query = lambdas_per_query.drop(['lambda', 'weight'], axis=1)
    return lambdas_per_query


judgments = judgments_to_dataframe(ftr_logger.logged, unnest=False)
lambdas_per_query = lambda_mart_pure(judgments=judgments, rounds=50, max_leaf_nodes=10, learning_rate=0.01, metric=dcg)

Index(['uid', 'qid', 'keywords', 'docId', 'grade', 'features'], dtype='object')
round 0
grade                     4
last_prediction    0.009711
Name: 0, dtype: object
Train DCGs
mean    20.83415007369131
----------
round 1
grade                     4
last_prediction    0.019372
Name: 0, dtype: object
Train DCGs
mean    20.442853108353383
----------
round 2
grade                     4
last_prediction    0.028972
Name: 0, dtype: object
Train DCGs
mean    20.579760132996235
----------
round 3
grade                     4
last_prediction    0.038468
Name: 0, dtype: object
Train DCGs
mean    20.442853108353383
----------
round 4
grade                     4
last_prediction    0.047907
Name: 0, dtype: object
Train DCGs
mean    20.579760132996235
----------
round 5
grade                     4
last_prediction    0.056911
Name: 0, dtype: object
Train DCGs
mean    20.53213203442482
----------
round 6
grade                     4
last_prediction    0.065844
Name: 0, dtype: object
Train DCGs
mean    

Create the model from the ensemble!

In [55]:
from eland.ml import MLModel, ml_model
from eland.ml.transformers import get_model_transformer
from eland.ml._model_serializer import Ensemble
from elasticsearch import Elasticsearch
import json
import gzip
import base64

trained_models = [get_model_transformer(m.tree, feature_names=query_feature_names).transform() for m in ensemble]

ensemble_aggregator_output = {
            "weighted_sum": {
                "weights": [1.0 / len(trained_models)]
                * len(trained_models),
            }
        }

es_model =Ensemble(
            query_feature_names,
            trained_models,
            ensemble_aggregator_output,
            target_type="regression",
        )
es_model.serialize_model()

{'trained_model': {'ensemble': {'target_type': 'regression',
   'feature_names': ['title_bm25',
    'overview_bm25',
    'title_phrase_bm25',
    'overview_phrase_bm25'],
   'trained_models': [{'tree': {'target_type': 'regression',
      'feature_names': ['title_bm25',
       'overview_bm25',
       'title_phrase_bm25',
       'overview_phrase_bm25'],
      'tree_structure': [{'node_index': 0,
        'decision_type': 'lte',
        'left_child': 1,
        'right_child': 2,
        'split_feature': 2,
        'threshold': 9.72804069519043,
        'number_samples': 773},
       {'node_index': 1,
        'decision_type': 'lte',
        'left_child': 9,
        'right_child': 10,
        'split_feature': 2,
        'threshold': 8.166615962982178,
        'number_samples': 722},
       {'node_index': 2,
        'decision_type': 'lte',
        'left_child': 3,
        'right_child': 4,
        'split_feature': 1,
        'threshold': 8.648598670959473,
        'number_samples': 51},
     

Put the model! HUZZAH!

In [56]:
es = Elasticsearch('http://127.0.0.1:9200')
model_dict = es_model.serialize_model()
json_string = json.dumps(model_dict, separators=(",", ":"))
compressed_string = base64.b64encode(gzip.compress(json_string.encode("utf-8"))).decode("ascii")

es.ml.put_trained_model(
    model_id="ltr-part-4", 
    body={
        # only put in field_names you want extracted from document field names
        "input" : {"field_names": ["vote_average"]},
        "inference_config" : {"learn_to_rank": {}},
        "compressed_definition": compressed_string,
    }
)



The 'body' parameter is deprecated and will be removed in a future version. Instead use individual parameters.



ObjectApiResponse({'model_id': 'ltr-part-4', 'model_type': 'tree_ensemble', 'created_by': 'api_user', 'version': '10.0.0', 'create_time': 1692629118534, 'model_size_bytes': 91120, 'estimated_operations': 105, 'license_level': 'platinum', 'tags': [], 'input': {'field_names': ['vote_average']}, 'inference_config': {'learn_to_rank': {'num_top_feature_importance_values': 0}}})

In [59]:
#POST _search
template = {
  "rescore": {
    "window_size": 10,
    "inference": {
      "model_id": "ltr-part-4",
      "inference_config": {
        "learn_to_rank": {"feature_extractors":[{"query_extractor": {"feature_name": feature_name, "query": query}} for (feature_name, query) in zip(query_feature_names, query_feature_templates)]}
      }
    }
  }
}
template = {"source": template}
template

{'source': {'rescore': {'window_size': 10,
   'inference': {'model_id': 'ltr-part-4',
    'inference_config': {'learn_to_rank': {'feature_extractors': [{'query_extractor': {'feature_name': 'title_bm25',
         'query': {'match': {'title': '{{keywords}}'}}}},
       {'query_extractor': {'feature_name': 'overview_bm25',
         'query': {'match': {'overview': '{{keywords}}'}}}},
       {'query_extractor': {'feature_name': 'title_phrase_bm25',
         'query': {'match_phrase': {'title': '{{keywords}}'}}}},
       {'query_extractor': {'feature_name': 'overview_phrase_bm25',
         'query': {'match_phrase': {'overview': '{{keywords}}'}}}}]}}}}}}

In [60]:
es.search_template(index='tmdb', body=template, params={'keywords': 'star trek', 'query_dsl': {'match_all': {}}})


The 'body' parameter is deprecated and will be removed in a future version. Instead use individual parameters.



ObjectApiResponse({'took': 67, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 10000, 'relation': 'gte'}, 'max_score': 27.886772, 'hits': [{'_index': 'tmdb', '_id': '374430', '_score': 27.886772, '_source': {'id': '374430', 'title': 'Black Mirror: White Christmas', 'overview': "This feature-length special consists of three interwoven stories. In a mysterious and remote snowy outpost, Matt and Potter share a Christmas meal, swapping creepy tales of their earlier lives in the outside world. Matt is a charismatic American trying to bring the reserved, secretive Potter out of his shell. But are both men who they appear to be? A woman gets thrust into a nightmarish world of 'smart' gadgetry. Plus a look at what would happen if you could 'block' people in real life.", 'tagline': '', 'directors': ['Carl Tibbetts'], 'cast': 'Jon Hamm Rafe Spall Oona Chaplin Natalia Tena Janet Montgomery Rasmus Hardiker Dan Li Ken Drury Zahra 