# Learning To Rank Demo On Fess
  
This demo uses data from Fess to demonstrate using [ranklib](https://github.com/codelibs/ranklib) learning to rank models with Elasticsearch.

## Setup: Fess

This demo requires [Fess 12.2+](https://github.com/codelibs/fess/releases).
See [Installation Guide](https://fess.codelibs.org/12.1/install/index.html).

1. Launch Fess
1. Start Crawler to index documents
1. Search for some words in Fess, then click some of the search results (the resulting search and click logs are the training data for this demo)

## Download Ranklib

The first time you run this demo, download the RankLib jar (used to train the models); it is saved as `ranklib.jar`.



In [None]:
!curl -o ranklib.jar http://central.maven.org/maven2/org/codelibs/ranklib/2.10.0/ranklib-2.10.0.jar

## Setup

In [None]:
from elasticsearch import Elasticsearch
import numpy as np
import requests
import json
import re
import os

# Ranker type ids; each entry of model_types is passed to RankLib as `-ranker <id>`:
# 0, MART
# 1, RankNet
# 2, RankBoost
# 3, AdaRank
# 4, Coordinate Ascent
# 6, LambdaMART
# 7, ListNet
# 8, Random Forests
# 9, Linear Regression
# Models to train and deploy in the cells below.
model_types = [6]
# NOTE(review): the "all models" list below includes 5, which is not in the table above —
# confirm it is a valid ranker id before enabling this line.
# model_types = [0,1,2,3,4,5,6,7,8,9]

# Elasticsearch endpoint: `es` is the client used for searches, `es_url` is the
# base URL handed to the helpers that talk to the LTR endpoints via `requests`.
es_host = 'localhost:9201'
es_url = f'http://{es_host}'
es = Elasticsearch(hosts=es_host)

In [None]:

def init_default_store(url='http://localhost:9201', auth=None):
    """Drop and re-create the default LTR feature store at ``{url}/_ltr``.

    A DELETE followed by a PUT is sent to the store endpoint. The HTTP status
    code of each request is printed but not otherwise checked.

    Args:
        url: Base URL of the Elasticsearch node.
        auth: Passed through unchanged as the ``auth`` argument of ``requests``.
    """
    store_path = f'{url}/_ltr'
    steps = (("Delete", requests.delete), ("Create", requests.put))
    for label, send in steps:
        resp = send(store_path, auth=auth)
        print(f"{label} {store_path} : {resp.status_code}")

# Start from an empty LTR feature store: the helper deletes `_ltr` and creates it again.
init_default_store(es_url)

## Aggregate Logs

In [None]:
def get_click_logs(es,
                   query_id,
                   index_name='fess_log.click_log'):
    """Return the ``urlId`` of each click-log hit recorded for ``query_id``.

    Args:
        es: Client object exposing ``search(index=..., body=...)``.
        query_id: Value matched against the ``queryId`` field with a term query.
        index_name: Index holding the click log.

    Returns:
        A list with one entry per hit that carries a ``_source``, in the order
        returned by the search (sorted by ``requestedAt``). An entry is ``None``
        when the hit's source has no ``urlId``.

    NOTE(review): the request sets no ``size``, so presumably only the first
    page of hits (the server default) is considered — confirm this is enough.
    """
    click_query = {"query": {"term": { 'queryId': query_id}}, "sort":["requestedAt"]}
    hits = es.search(index=index_name, body=click_query)['hits']['hits']
    return [hit.get('_source').get('urlId') for hit in hits if '_source' in hit]
            

def _collect_click_counts(es, index_name, query, max_docs):
    """Scroll through the search log and tally searches and clicks per word.

    Returns:
        ``(word_count, word_doc_count)``. ``word_count`` maps each lower-cased
        search word to the number of logged searches for it. ``word_doc_count``
        maps each word to ``{doc_id: click_count}`` for every document listed
        in those logs (0 when the document was never clicked).
    """
    word_count = {}
    word_doc_count = {}
    counter = 0
    response = es.search(index=index_name,
                         scroll='5m',
                         size=100,
                         body=query,
                         params={"request_timeout": 60})
    while response['hits']['hits']:
        for hit in response['hits']['hits']:
            if '_source' not in hit:
                continue
            # Stop once the requested number of search logs has been read.
            if max_docs is not None and counter >= max_docs:
                return word_count, word_doc_count
            counter += 1
            if counter % 1000 == 0:
                print(f'Processed {counter} search logs')
            source = hit['_source']
            search_word = source.get('searchWord')
            # Logs without a result list or without a search word carry no signal.
            if 'documents' not in source or search_word is None:
                continue
            search_word = search_word.lower()
            clicked_ids = get_click_logs(es, source.get('queryId'))
            word_count[search_word] = word_count.get(search_word, 0) + 1
            doc_count = word_doc_count.setdefault(search_word, {})
            for doc in source['documents']:
                doc_id = doc.get('_id')
                if doc_id is None:
                    continue
                # Shown documents are registered with 0 clicks; clicked ones are incremented.
                clicked = 1 if doc_id in clicked_ids else 0
                doc_count[doc_id] = doc_count.get(doc_id, 0) + clicked
        response = es.scroll(scroll_id=response['_scroll_id'],
                             scroll='5m',
                             params={"request_timeout": 60})
    return word_count, word_doc_count


def _click_grade(clicks, thresholds):
    """Map a click count to a 0-4 grade using the word's three percentile thresholds."""
    if clicks == 0:
        return 0
    for grade, threshold in enumerate(thresholds, start=1):
        if clicks < threshold:
            return grade
    return 4


def _write_judgments(judgment_file, word_count, word_doc_count, num_of_words):
    """Write the header (``# qid:N: word``) and graded judgment lines to ``judgment_file``."""
    by_frequency = sorted(word_count.items(), key=lambda item: -item[1])
    qid_list = []
    # Exactly the `num_of_words` most frequent words are considered.
    for word, _ in by_frequency[:num_of_words]:
        # A word whose results were never clicked yields no usable judgments.
        if sum(word_doc_count[word].values()) == 0:
            continue
        qid_list.append((len(qid_list) + 1, word))

    with open(judgment_file, 'wt') as f:
        for qid, word in qid_list:
            f.write(f'# qid:{qid}: {word}\n')

        for qid, word in qid_list:
            doc_count = word_doc_count[word]
            thresholds = np.percentile(list(doc_count.values()), [25, 50, 75])
            # Keep at most 30% (of the word's documents) as grade-0 examples.
            rank0_count = int(len(doc_count) * 0.3)
            for doc_id, clicks in doc_count.items():
                if clicks == 0:
                    if rank0_count == 0:
                        continue
                    rank0_count -= 1
                f.write(f'{_click_grade(clicks, thresholds)} qid:{qid} # {doc_id} {word}\n')


def process_logs(es,
                 judgment_file='fess_judgments.txt',
                 index_name='fess_log.search_log',
                 query={"query": {"match_all": {}}, "sort":["requestedAt"]},
                 num_of_words=1000,
                 max_docs=10000):
    """Turn search and click logs into a graded judgment file.

    The search log is read with the scroll API. For every log entry that has
    a ``documents`` list, the clicked documents are looked up through
    ``get_click_logs`` and click counts are accumulated per (search word,
    document). The most frequent search words are then written to
    ``judgment_file``: first one ``# qid:<n>: <word>`` header line per word,
    then one ``<grade> qid:<n> # <doc_id> <word>`` line per document.

    Grades are 0 for unclicked documents and 1-4 for clicked ones, depending
    on where the click count falls relative to the 25th/50th/75th percentile
    of that word's click counts.

    Args:
        es: Client exposing ``search`` and ``scroll``.
        judgment_file: Path of the file to (over)write.
        index_name: Index holding the search log.
        query: Search body used to select and order the log entries. The
            default dict is only read, never modified, by this function.
        num_of_words: Maximum number of distinct search words to consider,
            most frequent first.
        max_docs: Maximum number of search-log entries to read; ``None``
            reads all of them.
    """
    word_count, word_doc_count = _collect_click_counts(es, index_name, query, max_docs)
    _write_judgments(judgment_file, word_count, word_doc_count, num_of_words)

In [None]:
# Build fess_judgments.txt (the default judgment_file) from the search and click logs.
process_logs(es)

## Create Features

In [None]:

def get_feature(ftr_id):
    """Return the parsed JSON content of ``features/<ftr_id>.json``.

    The path is relative to the current working directory. Errors from
    opening the file or from parsing it propagate to the caller.
    """
    feature_path = f'features/{ftr_id}.json'
    with open(feature_path, 'rt') as feature_file:
        return json.loads(feature_file.read())

def each_feature():
    """Yield one feature spec per consecutively numbered feature file.

    Feature ids start at 1 and are loaded through ``get_feature``. Each spec
    is named after its id, declares the single parameter ``query`` and uses
    the file's ``query`` entry as its template. Iteration ends quietly at the
    first ``IOError`` (for example, the first id with no file).
    """
    next_id = 1
    try:
        while True:
            definition = get_feature(next_id)
            print(f'Load Feature{next_id}')
            yield {
                "name": str(next_id),
                "params": ["query"],
                "template": definition['query'],
            }
            next_id += 1
    except IOError:
        return

def load_features(url='http://localhost:9201',
                  feature_set_name='fess_features',
                  auth=None):
    """POST every feature produced by ``each_feature`` as one feature set.

    The request goes to ``{url}/_ltr/_featureset/{feature_set_name}`` with a
    JSON body. The response status code is printed but not checked.

    Args:
        url: Base URL of the Elasticsearch node.
        feature_set_name: Name of the feature set to create.
        auth: Passed through unchanged as the ``auth`` argument of ``requests``.
    """
    payload = {
        "featureset": {
            "name": feature_set_name,
            "features": list(each_feature()),
        }
    }
    endpoint = f'{url}/_ltr/_featureset/{feature_set_name}'
    resp = requests.post(endpoint,
                         data=json.dumps(payload),
                         headers={'Content-Type': 'application/json'},
                         auth=auth)
    print(f'Send {endpoint} : {resp.status_code}')

# Upload the feature set built from the features/<n>.json files.
load_features(es_url)


In [None]:

class Judgment:
    """A graded (query, document) pair, optionally carrying feature values."""

    def __init__(self, grade, qid, keywords, doc_id):
        """Store the grade, query id, query keywords and document id."""
        self.grade = grade
        self.qid = qid
        self.keywords = keywords
        self.doc_id = doc_id
        self.features = [] # 0th feature is ranklib feature 1

    def __str__(self):
        """Return a short human-readable summary of the judgment."""
        return f"grade:{self.grade} qid:{self.qid} ({self.keywords}) docid:{self.doc_id}"

    def to_ranklib_format(self):
        """Return the judgment as one tab-separated training line.

        Layout: ``<grade>\\tqid:<qid>\\t<1:f1>\\t<2:f2>... # <doc_id>\\t<keywords>``
        where feature numbers start at 1.
        """
        features_as_strs = "\t".join([f'{idx+1}:{feature}' for idx, feature in enumerate(self.features)])
        comment = f"# {self.doc_id}\t{self.keywords}"
        return f"{self.grade}\tqid:{self.qid}\t{features_as_strs} {comment}"

    
def queries_from_header(lines):
    """Parse the mapping between query ids and user keywords from header comments.

    Header lines look like::

        # qid:523: First Blood

    Reading stops at the first line that does not start with ``#`` (an empty
    string also ends the header). Header lines that do not match the pattern
    are skipped.

    Args:
        lines: Iterable of text lines, e.g. an open file.

    Returns:
        Dict mapping each query id (int) to its search keywords (str).
    """
    # Raw string: the pattern contains regex escapes such as \s and \d.
    header_pattern = re.compile(r'#\sqid:(\d+?):\s+?(.*)')
    values = {}
    for line in lines:
        if not line.startswith('#'):
            break
        m = header_pattern.match(line)
        if m:
            values[int(m.group(1))] = m.group(2)

    return values

def judgments_from_body(lines):
    """Yield ``(grade, qid, doc_id)`` for every judgment line in ``lines``.

    A judgment line looks like::

        4  qid:523   # a01  Grade for Rambo for query Foo
        <grade> qid:<queryid> # <docId> <rest of comment ignored...>

    The grade is a single digit. Lines that do not match (such as the
    ``# qid:...`` header comments) are skipped.

    Args:
        lines: Iterable of text lines, e.g. an open file.
    """
    # Raw string: the pattern contains regex escapes such as \d, \s and \S.
    body_pattern = re.compile(r'^(\d)\s+qid:(\d+)\s+#\s+(\S+).*')
    for line in lines:
        m = body_pattern.match(line)
        if m:
            yield int(m.group(1)), int(m.group(2)), m.group(3)

def judgments_from_file(filename):
    """Lazily yield a ``Judgment`` for every judgment line in ``filename``.

    The file is read in two passes: the first collects the qid-to-keywords
    mapping from the header comments, the second parses the judgment lines.
    Nothing is opened until the generator is first advanced. A judgment whose
    qid has no header entry raises ``KeyError``.
    """
    with open(filename) as header_pass:
        keywords_by_qid = queries_from_header(header_pass)
    with open(filename) as body_pass:
        for parsed in judgments_from_body(body_pass):
            grade, qid, doc_id = parsed
            yield Judgment(grade=grade,
                           qid=qid,
                           keywords=keywords_by_qid[qid],
                           doc_id=doc_id)

# NOTE: this is a lazy generator — the file is not read until it is iterated,
# and it can be consumed only once.
judgments = judgments_from_file(filename='fess_judgments.txt')
# judgments

In [None]:
def judgments_by_qid(judgments):
    """Group judgments by their ``qid`` attribute.

    Args:
        judgments: Iterable of objects exposing ``qid``.

    Returns:
        Dict mapping each qid to the list of its judgments, preserving the
        order in which they were encountered.
    """
    grouped = {}
    for item in judgments:
        grouped.setdefault(item.qid, []).append(item)
    return grouped


# Group the parsed judgments by query id: {qid: [Judgment, ...]}.
fess_judgments = judgments_by_qid(judgments)
# fess_judgments

In [None]:
# Template for the feature-logging search. log_features() overwrites the
# placeholder values marked below for each query id before sending it.
log_query = {  
    "size":10000,
    "query":{  
        "bool":{  
            "filter":[  
                {  
                    "ids":{  
                        # placeholder: replaced with the judged document ids
                        "values":["7555"]
                    }
                }
            ],
            "should":[  
                {  
                    "sltr":{  
                        "_name":"logged_featureset",
                        # placeholder: replaced with the feature set name
                        "featureset":"movie_features",
                        "params":{  
                            # placeholder: replaced with the query keywords
                            "query":"rambo"
                        }
                    }
                }
            ]
        }
    },
    "ext":{  
        "ltr_log":{  
            "log_specs":{  
                # "main" is the key log_features() reads the logged values from
                "name":"main",
                "named_query":"logged_featureset",
                "missing_as_zero":True
            }
        }
    }
}


def feature_dict_to_list(ranklib_labeled_features):
    """Return the logged feature values as a positional list.

    Args:
        ranklib_labeled_features: Sequence of log entries (dicts); entry ``i``
            describes feature ``i + 1``.

    Returns:
        A list of the same length holding each entry's ``value``. An entry
        with no ``value`` key contributes ``0.0``.
    """
    return [log_entry.get('value', 0.0) for log_entry in ranklib_labeled_features]


def log_features(es, judgments_by_qid, 
                 index_name='fess.search',
                 feature_set_name='fess_features'):
    """Fetch logged feature values and attach them to each judgment.

    For every query id, the module-level ``log_query`` template is updated in
    place with the judged document ids, the feature set name and the query
    keywords, then sent to ``index_name``. The values found under
    ``fields._ltrlog[0].main`` of each hit are stored on the matching
    judgment's ``features``. Judgments whose document is not among the hits
    keep their existing ``features`` and a "Missing doc" line is printed.

    Args:
        es: Client exposing ``search(index=..., body=...)``.
        judgments_by_qid: Dict mapping qid to a non-empty list of judgments.
        index_name: Index holding the searchable documents.
        feature_set_name: Name of the stored feature set to log.
    """
    for qid, judgments in judgments_by_qid.items():
        keywords = judgments[0].keywords
        bool_clause = log_query['query']['bool']
        bool_clause['filter'][0]['ids']['values'] = [j.doc_id for j in judgments]
        sltr = bool_clause['should'][0]['sltr']
        sltr['featureset'] = feature_set_name
        sltr['params']['query'] = keywords
        res = es.search(index=index_name, body=log_query)

        # Index the logged feature values by document id.
        features_per_doc = {}
        for doc in res['hits']['hits']:
            logged = doc['fields']['_ltrlog'][0]['main']
            features_per_doc[doc['_id']] = feature_dict_to_list(logged)

        # Copy the features from ES back onto the judgments.
        for judgment in judgments:
            if judgment.doc_id in features_per_doc:
                judgment.features = features_per_doc[judgment.doc_id]
            else:
                print(f"Missing doc: {judgment.doc_id}")

# Fetch the logged feature values and attach them to each judgment in place.
log_features(es, judgments_by_qid=fess_judgments)


In [None]:
def build_features_judgments_file(judgments_with_features, filename):
    """Write every judgment as one line of ``filename``.

    Args:
        judgments_with_features: Dict mapping qid to a list of judgments; each
            judgment must provide ``to_ranklib_format()``.
        filename: Path of the file to (over)write.
    """
    with open(filename, 'wt') as out:
        for judgment_list in judgments_with_features.values():
            out.writelines(judgment.to_ranklib_format() + "\n"
                           for judgment in judgment_list)


# Write the judgments together with their feature values to fess_features.txt.
build_features_judgments_file(fess_judgments, filename='fess_features.txt')

## Train

In [None]:

def train_model(features_file,
                model_file,
                which_model=6):
    """Train a model by running ``ranklib.jar`` with ``java``.

    The command is executed directly (no shell), so file names containing
    spaces or shell metacharacters are passed through unchanged.

    Args:
        features_file: Training file passed to ``-train``.
        model_file: Output path passed to ``-save``.
        which_model: Ranker type id passed to ``-ranker``.

    Returns:
        The exit status of the ``java`` process, or 127 when the process
        could not be started. A non-zero status is also reported on stdout.
    """
    import shlex
    import subprocess

    cmd = ['java',
           # No shell is involved, so the format string needs no quoting.
           '-Djava.util.logging.SimpleFormatter.format=%5$s%n',
           '-jar', 'ranklib.jar',
           '-ranker', f'{which_model}',
           '-train', f'{features_file}',
           '-save', f'{model_file}',
           '-frate', '1.0']
    print(f"Running {' '.join(shlex.quote(arg) for arg in cmd)}")
    try:
        returncode = subprocess.run(cmd).returncode
    except OSError as err:
        # e.g. java is not installed or not on PATH
        print(f"Could not start java: {err}")
        return 127
    if returncode != 0:
        print(f"RankLib exited with status {returncode}")
    return returncode


# Train one model per configured ranker type; each is saved as model_<type>.txt.
for model_type in model_types:
    print(f"Training {model_type}")
    model_file = f'model_{model_type}.txt'
    train_model(features_file='fess_features.txt',
                model_file=model_file,
                which_model=model_type)


## Deploy

In [None]:

def save_model(script_name, feature_set, model_file,
               url='http://localhost:9201',
               auth=None):
    """Upload a trained model file as a ranklib model for ``feature_set``.

    Any existing model called ``script_name`` is deleted first. The model
    file is then read, each line is stripped of surrounding whitespace, and
    the result is POSTed to the feature set's ``_createmodel`` endpoint. The
    response body is printed when the status code is 300 or higher.

    Args:
        script_name: Name under which the model is stored.
        feature_set: Name of the feature set the model was trained on.
        model_file: Path of the model file to upload.
        url: Base URL of the Elasticsearch node.
        auth: Passed through unchanged as the ``auth`` argument of ``requests``.
    """
    model_path = f'{url}/_ltr/_model/{script_name}'
    resp = requests.delete(model_path, auth=auth)
    print(f"Delete {model_path} : {resp.status_code}")

    with open(model_file, 'rt') as mf:
        definition = '\n'.join(line.strip() for line in mf)

    model_payload = {
        "model": {
            "name": script_name,
            "model": {
                "type": "model/ranklib",
                "definition": definition,
            },
        }
    }
    create_path = f"{url}/_ltr/_featureset/{feature_set}/_createmodel"
    resp = requests.post(create_path,
                         data=json.dumps(model_payload),
                         headers={'Content-Type': 'application/json'},
                         auth=auth)
    print(f'Send {create_path} : {resp.status_code}')
    if resp.status_code >= 300:
        print(f'{resp.text}')



# Upload each trained model_<type>.txt as model "model_<type>" on the fess_features feature set.
for model_type in model_types:
    print(f"Deploying {model_type}")
    model_file = f'model_{model_type}.txt'
    save_model(script_name=f"model_{model_type}",
               feature_set='fess_features',
               model_file=model_file)