In [None]:
import json
import os
import re
import requests
import sparknlp
from pyspark.sql.functions import col, regexp_extract, expr
from pyspark.sql import SparkSession, Row
from pyspark.ml import Pipeline
from sparknlp.annotator import *
from sparknlp import MultiDocumentAssembler, Finisher

In [None]:
# Create (or reuse) the Spark session and fetch the JVM packages this notebook needs:
# Spark NLP (text cleaning), spark-xml (reading the wiki dump) and the Elasticsearch connector.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("locaSearchEngine")
    .config("spark.jars.packages", ','.join(
        [
            'com.johnsnowlabs.nlp:spark-nlp_2.12:4.2.0',
            'com.databricks:spark-xml_2.12:0.15.0',
            'org.elasticsearch:elasticsearch-spark-30_2.12:8.4.2'
        ]))
    # The key is "spark.driver.memory"; the previous "sparkdriver.memory" was not a
    # Spark setting and was silently ignored. Driver memory must be set before the
    # driver JVM starts, so this only takes effect when getOrCreate() launches a new
    # session (e.g. on a fresh kernel), not when it reuses an existing one.
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

In [None]:
# Load the Simple English Wikipedia XML dump: <mediawiki> is the root element and
# every <page> element becomes one row. The rows are spread over 100 partitions and
# persisted so later actions do not re-parse the XML.
# TODO: replace the placeholder path with the real location of the dump.
xml_reader = (
    spark.read
    .format('xml')
    .option("rowTag", "page")
    .option("rootTag", "mediawiki")
)
df = xml_reader.load("path_to_simplewiki_xml").repartition(100).persist()

In [None]:
# Keep only real articles: drop redirect pages, project the revision body (as 'text')
# and the page title, then discard pages whose revision has no text.
df = (
    df.filter(col('redirect').isNull())
    .select(col('revision.text._VALUE').alias('text'), col('title'))
    .filter(col('text').isNotNull())
)

In [None]:
# Collect every "Category:<name>" target in the wikitext (the capture stops at ']' or '|',
# so anything after a '|' is dropped) and store the names space-separated in 'category'.
category_expr = expr(r"array_join(regexp_extract_all(text, 'Category:([^]|]+)', 1), ' ')")
df = df.withColumn("category", category_expr)

In [None]:

# Text-cleaning pipeline, used both to build the index (cleaned_df) and to clean
# queries (query_index). The 'text' and 'category' columns each go through the same chain:
#   assemble -> tokenize -> lemmatize -> strip punctuation + lowercase -> finish to string arrays


def _build_cleaning_stages(document_col, suffix):
    """Return (tokenizer, lemmatizer, normalizer) stages for one assembled document column.

    Args:
        document_col: name of the Spark NLP document column to read, e.g. 'document1'.
        suffix: appended to the intermediate column names ('tokens', 'lemmas',
            'normalized') so each input column gets its own outputs.
    """
    tokens_col = 'tokens' + suffix
    lemmas_col = 'lemmas' + suffix
    # split the document into tokens
    tokenizer = Tokenizer()\
        .setInputCols([document_col])\
        .setOutputCol(tokens_col)
    # map each token to its lemma using the default pretrained lemmatizer
    lemmatizer = LemmatizerModel.pretrained()\
        .setInputCols([tokens_col])\
        .setOutputCol(lemmas_col)
    # lowercase the lemmas and remove every character that is not a word char or whitespace.
    # Raw string: '\w', '\d', '\s' are regex escapes, not (invalid) Python string escapes.
    normalizer = Normalizer()\
        .setCleanupPatterns([r'[^\w\d\s]'])\
        .setInputCols([lemmas_col])\
        .setOutputCol('normalized' + suffix)\
        .setLowercase(True)
    return tokenizer, lemmatizer, normalizer


# turn the raw 'text' and 'category' strings into Spark NLP documents
document_assembler = MultiDocumentAssembler()\
    .setInputCols(['text', 'category'])\
    .setOutputCols(['document1', 'document2'])
tokenizer1, lemmatizer_model1, normalizer1 = _build_cleaning_stages('document1', '1')
tokenizer2, lemmatizer_model2, normalizer2 = _build_cleaning_stages('document2', '2')
# convert the normalized annotations back into plain string arrays under the same column names
finisher = Finisher()\
    .setInputCols(['normalized1', 'normalized2'])\
    .setOutputCols(['normalized1', 'normalized2'])

# set the stage execution order and fit the pipeline to the data
nlp_pipeline = Pipeline()\
    .setStages([document_assembler, tokenizer1, tokenizer2, lemmatizer_model1, lemmatizer_model2,
                normalizer1, normalizer2, finisher])\
    .fit(df)

# keep the original columns and add space-joined normalized versions of text and category
cleaned_df = nlp_pipeline.transform(df).selectExpr(
    'text',
    'title',
    'category',
    'array_join(normalized1, " ") AS normalized_text',
    'array_join(normalized2, " ") AS normalized_category'
).persist()

In [None]:
# Write the cleaned articles to the 'simpleenglish' index of a local Elasticsearch
# instance (mode "Overwrite"). Credentials are read from the ES_USER / ES_PASSWORD
# environment variables instead of being hard-coded; the fallbacks are the values
# this cell previously used.
# NOTE(review): query_index falls back to a different password ('instance' vs 'key');
# set ES_PASSWORD so indexing and querying authenticate the same way.
cleaned_df.write\
    .format('org.elasticsearch.spark.sql')\
    .option('es.nodes', 'localhost')\
    .option('es.port', '9200')\
    .option('es.nodes.wan.only', 'true')\
    .option("es.net.http.auth.user", os.environ.get("ES_USER", "elastic"))\
    .option("es.net.http.auth.pass", os.environ.get("ES_PASSWORD", "key"))\
    .mode("Overwrite")\
    .save('simpleenglish')

In [None]:
# Field boosts set the relative importance of a hit in the article text, its categories
# and its title when Elasticsearch scores a result.

def query_index(query, fields=(('normalized_text', 1), ('normalized_category', 5), ('title', 10)), size=10,
                es_url='http://localhost:9200', index='simpleenglish', timeout=30):
    """Run a weighted full-text search against the Elasticsearch index.

    The query is cleaned with the same ``nlp_pipeline`` used at indexing time, so its
    terms line up with the lemmatized, lowercased ``normalized_*`` fields.

    Args:
        query: free-text search string.
        fields: iterable of (field name, boost) pairs searched with ``multi_match``.
        size: maximum number of hits to return.
        es_url: base URL of the Elasticsearch node.
        index: name of the index to search.
        timeout: seconds to wait for Elasticsearch before giving up.

    Returns:
        list of (title, normalized_category, score) tuples, in the order
        Elasticsearch returns the hits.

    Raises:
        requests.HTTPError: if Elasticsearch responds with an error status.
    """
    # Clean the query with the indexing pipeline; the 'text' input feeds 'normalized1'.
    query_df = spark.createDataFrame([(query, '')], ('text', 'category'))
    cleaned_row = nlp_pipeline.transform(query_df).first()
    # Previously this read cleaned_row['text'], which is the untouched input, so the
    # normalization was never used. Fall back to the raw query if cleaning removes everything.
    normalized_query = ' '.join(cleaned_row['normalized1'] or []) or query

    body = {
        "_source": ['title', 'normalized_category'],
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": normalized_query,
                        # combine score from all fields
                        "type": "most_fields",
                        # apply weights to fields
                        "fields": ['{}^{}'.format(name, boost) for name, boost in fields],
                        # all terms must be present
                        "operator": "and"
                    }
                },
                # boost score if the raw input text matches the title as a phrase
                "should": [{
                    "match_phrase": {
                        "title": {
                            "query": query,
                            "boost": 30
                        }
                    }
                }]
            }
        }
    }

    # Credentials come from the environment instead of being embedded in the URL.
    auth = (os.environ.get('ES_USER', 'elastic'), os.environ.get('ES_PASSWORD', 'instance'))
    response = requests.post(
        '{}/{}/_search'.format(es_url, index),
        params={'size': size},
        json=body,  # also sets Content-Type: application/json
        auth=auth,
        timeout=timeout,
    )
    # surface Elasticsearch errors instead of failing later with a KeyError on 'hits'
    response.raise_for_status()
    hits = response.json()['hits']['hits']

    return [(hit['_source']['title'], hit['_source'].get('normalized_category'), hit['_score']) for hit in hits]

In [None]:
# Example search: the cell's last expression displays the hits as
# (title, normalized_category, score) tuples.
query_string = 'spicy food'
query_results = query_index(query=query_string)
query_results