# Hands-on Activity 5: Implementation of a Custom Similarity Model in Elasticsearch

In this activity, we will implement a custom similarity model for our ClueWeb12 sample collection. 
Note that in Elasticsearch the similarity function needs to be embedded in the index: thus we will create a new index for the ClueWeb12 sample dataset.
The prerequisites and most of the scripts in this activity are the same as in Activity 1.
 
Elasticsearch 6.5.x lets us define a custom similarity as a scripted similarity, whereas earlier versions of Elasticsearch required a custom similarity plugin implemented as a Java class.

In this activity, we are going to implement a basic TF-IDF similarity function. Elasticsearch already has a TF-IDF similarity function, of course, but TF-IDF gives us a simple use case for practicing how to implement similarity functions through scripts in Elasticsearch.

Let's start by importing the required libraries:

In [None]:
import builtins
#vars(globals()['__builtin__']) is vars(builtins)
import gzip
import warc
import time
import glob
import lxml.html
import re
import io
import os
import sys
from elasticsearch import Elasticsearch
import multiprocessing

Then, we need to specify the location of the ClueWeb12 files.

In [None]:
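# Put here the path to the Clueweb12B_sample directory.
# It is best to give the full, absolute path. expanduser resolves a leading "~",
# which os.path.isdir and glob would otherwise treat as a literal directory name.
warcPath = os.path.expanduser("~/Clueweb12B_sample/")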

# Check the warc Path
if not os.path.isdir(warcPath):
    print("invalid warc path")
else:
    print("valid warc path")

Open a connection to Elasticsearch (ES):

In [None]:
# hosts is the documented way to point the client at a node
es0 = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}], timeout=600)

Specify the maximum bulk size (in kilobytes) and the maximum number of documents per bulk request. For faster indexing, we will index documents in bulk rather than one at a time.

In [None]:
bulk_size = 4000
bulk_count = 1000
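
The batching rule can be sketched on its own, without an Elasticsearch connection. The helper names `batch_documents` and `to_bulk_body` are hypothetical and only illustrate the idea: a batch is flushed once its estimated size exceeds the size limit or it holds the maximum number of documents, and each batch is flattened into the alternating action/source list that the bulk request body expects. The count rule here is per batch, which is a simplification of the indexing function used later in this activity.

```python
import sys


def batch_documents(docs, max_kb=4000, max_docs=1000):
    """Yield lists of (doc_id, title, body) tuples, flushing on size or count."""
    batch, size_kb = [], 0.0
    for doc_id, title, body in docs:
        batch.append((doc_id, title, body))
        # sys.getsizeof is only an estimate of the payload size (bytes -> KB)
        size_kb += sys.getsizeof(title + body) / 1024
        if size_kb > max_kb or len(batch) >= max_docs:
            yield batch
            batch, size_kb = [], 0.0
    if batch:
        yield batch  # whatever is left after the last full batch


def to_bulk_body(batch, index_name):
    """Flatten a batch into alternating action and source dictionaries."""
    body = []
    for doc_id, title, text in batch:
        body.append({"index": {"_index": index_name, "_type": "_doc", "_id": doc_id}})
        body.append({"title": title, "body": text})
    return body
```

Each document therefore contributes two entries to the request body: one describing the action and one holding the fields to index.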

Define a new index name for the ClueWeb12 index that will use the new similarity function:

In [None]:
indexName = "clueweb12_custom"

Then, we define our custom similarity named `scripted_tfidf`. The two fields in the index (`title` and `body`) will use this custom similarity.

Note that the `weight_script` computes the document-independent part of the score, which is then available to the `script` under the `weight` variable. When no `weight_script` is provided, `weight` defaults to 1. The `weight_script` has access to the same variables as the `script`, except `doc`, since it is supposed to compute a document-independent contribution to the score.

*This is really the only part of the indexing that differs from what we have already seen in Activity 1.*

In [None]:
request_body = {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0,
      "analysis": {
        "analyzer": {
            "my_english": {
                "tokenizer": "standard",
                "filter": ["lowercase", "terrier_stopwords", "porter_stem"]
            }
        },
        "filter": {
          "terrier_stopwords": {
              "type": "stop",
              "stopwords_path": "stopwords/terrier-stop.txt"
          }
        }
      },
      "similarity": {
        "scripted_tfidf": {
            "type": "scripted",
            "weight_script": {
              "source": "double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; return query.boost * idf;"
            },
            "script": {
              "source": "double tf = Math.sqrt(doc.freq); double norm = 1/Math.sqrt(doc.length); return weight * tf * norm;"
            }
        }
      }
    },
    "mappings": {
      "_doc": {
        "_source": {
            "enabled": True
        },
        "properties": {
            "title": {
                 "type": "text",
                 "similarity": "scripted_tfidf",
                 "analyzer": "my_english"
            },
            "body": {
                "type": "text",
                "similarity": "scripted_tfidf",
                "analyzer": "my_english"
            }
         }
      }
    }
}
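
To make the two scripts easier to read, here is a minimal Python sketch of the same arithmetic. The function `scripted_tfidf` and its parameter names are hypothetical stand-ins for the script variables (`doc.freq`, `doc.length`, `field.docCount`, `term.docFreq`, `query.boost`); it is not part of Elasticsearch. Scores reported by Elasticsearch may differ slightly, for instance if document lengths are stored approximately.

```python
import math


def scripted_tfidf(freq, doc_length, doc_count, doc_freq, boost=1.0):
    """Mirror the weight_script and script of the scripted_tfidf similarity."""
    # weight_script: document-independent part (depends on the term and query only)
    idf = math.log((doc_count + 1.0) / (doc_freq + 1.0)) + 1.0
    weight = boost * idf
    # script: document-dependent part
    tf = math.sqrt(freq)                 # dampened term frequency
    norm = 1.0 / math.sqrt(doc_length)   # length normalisation
    return weight * tf * norm


# A term occurring 4 times in a 100-token document, present in 1 of 9 documents
example_score = scripted_tfidf(freq=4, doc_length=100, doc_count=9, doc_freq=1)
print(example_score)  # about 0.522
```

Rarer terms get a larger `idf`, and the same term frequency counts for more in a shorter document.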

Create the index based on the specified settings:

In [None]:
if not es0.indices.exists(indexName):
    print("creating", indexName, "index")
    res = es0.indices.create(index=indexName, body=request_body)
    print(" response: '%s'" % res)
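
After creating the index, it can be useful to confirm that the custom similarity was actually registered. The following is a minimal sketch, assuming the settings response is keyed by index name and nests the values under `settings` and `index`; the helper name `registered_similarities` is hypothetical.

```python
def registered_similarities(es, index_name):
    """Return the similarity section of an index's settings (empty dict if absent)."""
    settings = es.indices.get_settings(index=index_name)
    # Assumed response layout: {index_name: {"settings": {"index": {...}}}}
    index_settings = settings.get(index_name, {}).get("settings", {}).get("index", {})
    return index_settings.get("similarity", {})
```

Calling `registered_similarities(es0, indexName)` should then return a dictionary with a `scripted_tfidf` entry if the index was created with the settings above.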

Create the indexing function, which will be executed as a parallel process.
This function accepts the path to a single gzipped WARC file.

In [None]:
def es_index(fname):
    i = 0
    totalSize = 0
    bulk_data = []
    lapTime = time.time()
    # each worker process opens its own connection
    es = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}], timeout=600)

    print("Processing file: {}".format(fname))
    with gzip.open(fname, mode='rb') as gzf:
        print("Extracted: {}".format(fname))
        WarcTotalDocuments = 0
        EmptyDocuments = 0
        for record in warc.WARCFile(fileobj=gzf):
            if record.header.get('WARC-Type').lower() == 'warcinfo':
                WarcTotalDocuments = record.header.get('WARC-Number-Of-Documents')

            if record.header.get('WARC-Type').lower() == 'response':
                docId = record.header.get('WARC-Trec-ID')
                docString = record.payload.read().decode("utf-8", "ignore")

                # str.find returns -1 when the tag is absent; 0 is a valid offset
                htmlStart = docString.find('<html')
                if htmlStart < 0:
                    htmlStart = docString.find('<HTML')
                if htmlStart < 0:
                    htmlStart = docString.find('<Html')

                if htmlStart < 0:
                    EmptyDocuments += 1
                else:
                    # extract and scrub html string
                    htmlString = docString[htmlStart:]
                    htmlString = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', htmlString)
                    htmlString = re.sub(r'&\w{4,6};', '', htmlString)
                    htmlString = htmlString.replace(",", " ").replace("-", " ").replace(".", " ")

                    # lxml expects a bytes stream, so re-encode the cleaned string
                    fContent = io.BytesIO(htmlString.encode("utf-8", "ignore"))

                    try:
                        htmlDoc = lxml.html.parse(fContent)

                        # xpath returns a list, so join it into a single string
                        title = " ".join(htmlDoc.xpath('//title/text()'))

                        rootClean = htmlDoc.getroot()

                        body = " - "
                        try:
                            body = rootClean.body.text_content()
                            body = ' '.join(word for word in body.split() if word.isalnum())
                        except Exception:
                            # no usable <body>: keep the placeholder value
                            pass

                        content = title + body
                        bulk_meta = {
                            "index": {
                                "_index": indexName,
                                "_type": "_doc",
                                "_id": docId
                            }
                        }

                        bulk_content = {
                            'title': title,
                            'body': body
                        }

                        bulk_data.append(bulk_meta)
                        bulk_data.append(bulk_content)
                        totalSize += (sys.getsizeof(content) / 1024)  # convert from bytes to KiloBytes

                        i += 1
                        if totalSize > bulk_size or i % bulk_count == 0:
                            res = es.bulk(index=indexName, body=bulk_data, refresh=False)
                            bulk_data = []
                            totalSize = 0
                    except:
                        print("Error processing document: {}".format(docId))
                        raise

        if len(bulk_data) > 0:
            # index the remaining documents
            res = es.bulk(index=indexName, body=bulk_data, refresh=False)

        print("File {0} Completed, Duration: {1} sec, Total: {2}, Processed: {3}, Empty: {4}, Variance: {5}".
               format(fname, time.time() - lapTime, WarcTotalDocuments, str(i), str(EmptyDocuments),
                      str(int(WarcTotalDocuments) - i - EmptyDocuments)))
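
The indexing function looks for the start of the HTML by trying three spellings of the opening tag. As an alternative, here is a minimal sketch of a case-insensitive search using a regular expression; the helper name `find_html_start` is hypothetical and is not used by the function above.

```python
import re

# Matches "<html" in any mix of upper and lower case
_HTML_TAG = re.compile(r"<html", re.IGNORECASE)


def find_html_start(doc_string):
    """Return the offset of the first <html tag in any letter case, or -1."""
    match = _HTML_TAG.search(doc_string)
    return match.start() if match else -1
```

Like `str.find`, the helper returns -1 when no tag is present, so the result can be tested in the same way.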

Traverse all the folders in the original corpus path and process all gzipped WARC files in parallel.

In [None]:
warcFolder = glob.glob(warcPath + "*")
for warcFold in warcFolder:
    folders = glob.glob(warcFold + "/*")
    print("Processing Path: {}".format(warcFold))

    for fold in folders:
        print("Processing folder: {}".format(fold))
        p = multiprocessing.Pool()
        resultString = p.map(es_index, glob.glob(fold + "/*"))
        p.close()
        p.join()
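
The loop above creates a new pool of workers for every folder. A possible variation, sketched here under the assumption that the files sit exactly three levels below the corpus root, is to collect all file paths first and hand them to a single pool. The helper names `collect_warc_files` and `index_in_parallel` are hypothetical.

```python
import glob
import multiprocessing
import os


def collect_warc_files(root):
    """List files three levels below root (root/<segment>/<folder>/<file>)."""
    pattern = os.path.join(root, "*", "*", "*")
    return sorted(path for path in glob.glob(pattern) if os.path.isfile(path))


def index_in_parallel(root, worker, processes=None):
    """Run worker(path) for every collected file using one shared pool."""
    files = collect_warc_files(root)
    # The context manager cleans up the pool once map() has returned
    with multiprocessing.Pool(processes) as pool:
        pool.map(worker, files)
    return len(files)
```

With the definitions from this activity, a call such as `index_in_parallel(warcPath, es_index)` would index the whole sample; the worker must be a top-level function so that it can be sent to the worker processes.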