In [1]:
import sys
sys.path.append('..')
from searchutil import *
import os
from IPython.display import display,HTML
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("setting-datasets").getOrCreate()

## Downloading the datasets for the Products Data

In [2]:

#Have downloaded the datasets of the products and signals and stored it in data folder.
! cd ../data/retrotech/&& head products.csv

"upc","name","manufacturer","shortDescription","longDescription"
"096009010836","Fists of Bruce Lee - Dolby - DVD",\N,\N,\N
"043396061965","The Professional - Widescreen Uncut - DVD",\N,\N,\N
"085391862024","Pokemon the Movie: 2000 - DVD",\N,\N,\N
"067003016025","Summerbreeze - CD","Nettwerk",\N,\N
"731454813822","Back for the First Time [PA] - CD","Def Jam South",\N,\N
"024543008200","Big Momma's House - Widescreen - DVD",\N,\N,\N
"031398751823","Kids - DVD",\N,\N,\N
"037628413929","20 Grandes Exitos - CD","Sony Discos Inc.",\N,\N
"060768972223","Power Of Trinity (Box) - CD","Sanctuary Records",\N,\N


## Storing Products dataset to Apache Solr using Spark Read CSV

In [3]:
#Create Products Collection
products_collection="products"
create_collection(products_collection)

#Modify Schema to make some fields explicitly searchable by keyword
upsert_text_field(products_collection, "upc")
upsert_text_field(products_collection, "name")
upsert_text_field(products_collection, "longDescription")
upsert_text_field(products_collection, "manufacturer")

print("Loading Products...")
csvFile = "../data/retrotech/products.csv"
csvDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(csvFile)
print("Products Schema: ")
csvDF.printSchema()

product_update_opts={"zkhost": "search-zk", "collection": products_collection, 
                     "gen_uniq_key": "true", "commit_within": "5000"}
csvDF.write.format("solr").options(**product_update_opts).mode("overwrite").save()
print("Status: Success")

Wiping 'products' collection
[('action', 'CREATE'), ('name', 'products'), ('numShards', 1), ('replicationFactor', 1)]
Creating 'products' collection
Status: Success
Adding 'upc' field to collection
Status: Success
Adding 'name' field to collection
Status: Success
Adding 'longDescription' field to collection
Status: Success
Adding 'manufacturer' field to collection
Status: Success
Loading Products...
Products Schema: 
root
 |-- upc: long (nullable = true)
 |-- name: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- shortDescription: string (nullable = true)
 |-- longDescription: string (nullable = true)

Status: Success


## Searching in Apache Solr and checking if results are right

In [4]:
query = "ipod"

collection = "products"
request = {
    "query": query,
    "fields": ["upc", "name", "manufacturer", "score"],
    "limit": 5,
    "params": {
      "qf": "name manufacturer longDescription",
      "defType": "edismax",
      "sort": "score desc, upc asc"
    }
}

search_results = requests.post(f"{SOLR_URL}/{collection}/select", json=request).json()["response"]["docs"]
display(HTML(render_search_results(query, search_results)))

## Downloading and checking Signals Dataset

In [5]:
! cd ../data/retrotech/&& head signals.csv

"query_id","user","type","target","signal_time"
"u2_0_1","u2","query","nook","2019-07-31 08:49:07.3116"
"u2_1_2","u2","query","rca","2020-05-04 08:28:21.1848"
"u3_0_1","u3","query","macbook","2019-12-22 00:07:07.0152"
"u4_0_1","u4","query","Tv antenna","2019-08-22 23:45:54.1030"
"u5_0_1","u5","query","AC power cord","2019-10-20 08:27:00.1600"
"u6_0_1","u6","query","Watch The Throne","2019-09-18 11:59:53.7470"
"u7_0_1","u7","query","Camcorder","2020-02-25 13:02:29.3089"
"u9_0_1","u9","query","wireless headphones","2020-04-26 04:26:09.7198"
"u10_0_1","u10","query","Xbox","2019-09-13 16:26:12.0132"


## Ingesting Signals Dataset to Solr Database

In [6]:
#Create Signals Collection
signals_collection="signals"
create_collection(signals_collection)

print("Loading Signals...")
csvFile = "../data/retrotech/signals.csv"
csvDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(csvFile)
print("Signals Schema: ")
csvDF.printSchema()

signals_update_opts={"zkhost": "search-zk", "collection": signals_collection, 
                     "gen_uniq_key": "true", "commit_within": "5000"}
csvDF.write.format("solr").options(**signals_update_opts).mode("overwrite").save()
print("Status: Success")

Wiping 'signals' collection
[('action', 'CREATE'), ('name', 'signals'), ('numShards', 1), ('replicationFactor', 1)]
Creating 'signals' collection
Status: Success
Loading Signals...
Signals Schema: 
root
 |-- query_id: string (nullable = true)
 |-- user: string (nullable = true)
 |-- type: string (nullable = true)
 |-- target: string (nullable = true)
 |-- signal_time: timestamp (nullable = true)

Status: Success


## Boosting the signals

In [7]:
products_collection="products"
signals_collection="signals"
signals_boosting_collection="signals_boosting"

create_collection(signals_boosting_collection)

print("Aggregation to Create Signals Boosts...")
signals_opts={"zkhost": "search-zk", "collection": signals_collection}
df = spark.read.format("solr").options(**signals_opts).load()
df.createOrReplaceTempView("signals")

# For each query we are storing how many times the document has been clicked.
signals_aggregation_query = """
select q.target as query, c.target as doc, count(c.target) as boost
  from signals c left join signals q on c.query_id = q.query_id
  where c.type = 'click' AND q.type = 'query'
  group by query, doc
  order by boost desc
"""

signals_boosting_opts={"zkhost": "search-zk", "collection": signals_boosting_collection, 
                       "gen_uniq_key": "true", "commit_within": "5000"}

#Writing the Boosted Signal Information to Apache Spark
spark.sql(signals_aggregation_query).write.format("solr").options(**signals_boosting_opts).mode("overwrite").save()
print("Signals Aggregation Completed!")

Wiping 'signals_boosting' collection
[('action', 'CREATE'), ('name', 'signals_boosting'), ('numShards', 1), ('replicationFactor', 1)]
Creating 'signals_boosting' collection
Status: Success
Aggregation to Create Signals Boosts...
Signals Aggregation Completed!


## For the query ipad, collecting the documents and their boosts

In [10]:
query = "ipad"

signals_boosts_query = {
    "query": query,
    "fields": ["doc", "boost"],
    "limit": 10,
    "params": {
      "defType": "edismax",
      "qf": "query",
      "sort": "boost desc"
    }
}

signals_boosts = requests.post(f"{SOLR_URL}/{signals_boosting_collection}/select", 
                               json=signals_boosts_query).json()["response"]["docs"]
print(f"Boost Documents for the query ipad: \n{signals_boosts}")

Boost Documents for the query ipad: 
[{'doc': '885909457588', 'boost': 966}, {'doc': '885909457595', 'boost': 205}, {'doc': '885909471812', 'boost': 202}, {'doc': '886111287055', 'boost': 109}, {'doc': '843404073153', 'boost': 73}, {'doc': '885909457601', 'boost': 62}, {'doc': '635753493559', 'boost': 62}, {'doc': '885909472376', 'boost': 61}, {'doc': '610839379408', 'boost': 29}, {'doc': '884962753071', 'boost': 28}]


## Searching the products collection with this boost values from User clicks data

In [11]:
product_boosts = " ".join(
    [f'"{entry["doc"]}"^{str(entry["boost"])}' 
     for entry in signals_boosts])
print(f"\nBoost Query: \n{product_boosts}")

collection = "products"
request = {
    "query": query,
    "fields": ["upc", "name", "manufacturer", "score"],
    "limit": 5,
    "params": {
      "qf": "name manufacturer longDescription",
      "defType": "edismax",
      "indent": "true",
      "sort": "score desc, upc asc",
      "qf": "name manufacturer longDescription",
      "boost": "sum(1,query({! df=upc v=$signals_boosting}))",
      "signals_boosting": product_boosts
    }
}

search_results = requests.post(f"{SOLR_URL}/{collection}/select", json=request).json()["response"]["docs"]
display(HTML(render_search_results(query, search_results)))


Boost Query: 
"885909457588"^966 "885909457595"^205 "885909471812"^202 "886111287055"^109 "843404073153"^73 "885909457601"^62 "635753493559"^62 "885909472376"^61 "610839379408"^29 "884962753071"^28


# Searching with the related keywords

The basic intuition is if user 1 types a query q1 and clicks product p and if user 2 types a query q2 and clicks on same product, we can assume that both queries q1 and q2 are related.

In [12]:
#Calculation:
spark.sql("""
  select lower(searches.target) as keyword, searches.user as user, clicks.target as product 
  from signals as searches right join signals as clicks on searches.query_id = clicks.query_id 
  where searches.type='query' and clicks.type = 'click'
""").createOrReplaceTempView('keyword_click_product')


In [13]:
#Show Results:
print("Original signals format: ")
spark.sql(''' select * from signals where type='query' ''').show(3, truncate=False)
print("Simplified signals format: ")
spark.sql(''' select * from keyword_click_product ''').show(3)

Original signals format: 
+------------------------------------+-----------+-----------------------+----------+-----+-------+
|id                                  |query_id   |signal_time            |target    |type |user   |
+------------------------------------+-----------+-----------------------+----------+-----+-------+
|0000c952-7e4a-4dee-8aa9-a47914ad5189|u391913_0_1|2019-12-02 02:31:47.552|Headphones|query|u391913|
|0000d4e3-fc86-45fe-81b2-cfce636e1808|u11222_0_1 |2020-02-23 07:52:32.757|ipod touch|query|u11222 |
|0001a6e5-2e1c-4c6b-aabe-86ceb2963fbc|u727899_0_1|2019-10-11 17:40:50.229|gps       |query|u727899|
+------------------------------------+-----------+-----------------------+----------+-----+-------+
only showing top 3 rows

Simplified signals format: 
+----------------+-------+------------+
|         keyword|   user|     product|
+----------------+-------+------------+
|        kingston|u100001|740617151381|
|     dc universe|u100011|883929194629|
|leandria johnson|u10

In [14]:
#Calculation:
spark.sql("""
select k1.keyword as k1, k2.keyword as k2, sum(p1) n_users1,sum(p2) n_users2, 
sum(p1+p2) as users_cooc, count(1) n_products
from
(select keyword, product, count(1) as p1 from keyword_click_product group by keyword, product) as k1 
join
(select keyword, product, count(1) as p2 from keyword_click_product group by keyword, product) as k2
on k1.product = k2.product
where k1.keyword > k2.keyword 
group by k1.keyword, k2.keyword
""").createOrReplaceTempView('keyword_click_product_cooc')

In [15]:
#Show Results:
spark.sql('''select count(1) as keyword_click_product_cooc from keyword_click_product_cooc''').show()
spark.sql('''select * from keyword_click_product_cooc order by n_products desc''').show(20)

+--------------------------+
|keyword_click_product_cooc|
+--------------------------+
|                   1579710|
+--------------------------+

+--------------+-------------+--------+--------+----------+----------+
|            k1|           k2|n_users1|n_users2|users_cooc|n_products|
+--------------+-------------+--------+--------+----------+----------+
|       laptops|       laptop|    3251|    3345|      6596|       187|
|       tablets|       tablet|    1510|    1629|      3139|       155|
|        tablet|         ipad|    1468|    7067|      8535|       146|
|       tablets|         ipad|    1359|    7048|      8407|       132|
|       cameras|       camera|     637|     688|      1325|       116|
|          ipad|        apple|    6706|    1129|      7835|       111|
|      iphone 4|       iphone|    1313|    1754|      3067|       108|
|    headphones|  head phones|    1829|     492|      2321|       106|
|        ipad 2|         ipad|    2736|    6738|      9474|        98|
| 

In [16]:
#Calculation:
spark.sql("""
select keyword, count(1) as n_users from keyword_click_product group by keyword 
""").createOrReplaceTempView('keyword_click_product_oc')

In [17]:
#Show Results:
spark.sql('''select count(1) as keyword_click_product_oc from keyword_click_product_oc''').show()
spark.sql('''select * from keyword_click_product_oc order by n_users desc''').show(20)

+------------------------+
|keyword_click_product_oc|
+------------------------+
|                   13744|
+------------------------+

+------------+-------+
|     keyword|n_users|
+------------+-------+
|        ipad|   7554|
| hp touchpad|   4829|
|      lcd tv|   4606|
|   iphone 4s|   4585|
|      laptop|   3554|
|       beats|   3498|
|     laptops|   3369|
|        ipod|   2949|
|  ipod touch|   2931|
|      ipad 2|   2842|
|      kindle|   2833|
|    touchpad|   2785|
|   star wars|   2564|
|      iphone|   2430|
|beats by dre|   2328|
|     macbook|   2313|
|  headphones|   2270|
|        bose|   2071|
|         ps3|   2041|
|         mac|   1851|
+------------+-------+
only showing top 20 rows



In [18]:
# calculate PMI2

#Calculation:
spark.sql('''
select k1.keyword as k1, k2.keyword as k2, k1_k2.users_cooc, k1.n_users as n_users1,k2.n_users as n_users2,
log(pow(k1_k2.users_cooc,2)/(k1.n_users*k2.n_users)) as pmi2
from
keyword_click_product_cooc as k1_k2 
join
keyword_click_product_oc as k1 on k1_k2.k1 = k1.keyword
join
keyword_click_product_oc as k2 on k1_k2.k2 = k2.keyword
''').createOrReplaceTempView('product_related_keywords_pmi')

In [19]:
#Show Results:
spark.sql('''select count(1) as related_keywords_pmi from product_related_keywords_pmi''').show()
spark.sql('''select * from product_related_keywords_pmi order by pmi2 desc''').show(20)

+--------------------+
|related_keywords_pmi|
+--------------------+
|             1579710|
+--------------------+

+-------------------+-------------------+----------+--------+--------+------------------+
|                 k1|                 k2|users_cooc|n_users1|n_users2|              pmi2|
+-------------------+-------------------+----------+--------+--------+------------------+
|     hp touchpad 32|        hp touchpad|      4022|       1|    4829| 8.116674454791653|
|          pad pivot|        hp touchpad|      4022|       1|    4829| 8.116674454791653|
|        hp touchpad|     hp tablet 32gb|      4022|    4829|       1| 8.116674454791653|
|        hp touchpad|    hp tablet 32 gb|      4022|    4829|       1| 8.116674454791653|
|           touchpad|    hp tablet 32 gb|      2350|    2785|       1| 7.592338061915025|
|           touchpad|          pad pivot|      2350|    2785|       1| 7.592338061915025|
|           touchpad|     hp tablet 32gb|      2350|    2785|       1| 7.5

In [20]:
# calculate comp_score

#Calculation:
spark.sql('''
select  *, (r1 + r2 /( r1 * r2))/2 as comp_score from (
 select *, 
   rank() over (partition by 1 order by users_cooc desc )  r1 , 
   rank() over (partition by 1 order by pmi2 desc )  r2  
  from product_related_keywords_pmi ) a  '''
).createOrReplaceTempView('product_related_keywords_comp_score')

In [21]:
#Show Results:
spark.sql( '''
  select count(1) product_related_keywords_comp_scores from product_related_keywords_comp_score
''').show()

spark.sql( '''
  select k1, k2, n_users1, n_users2, pmi2, comp_score 
  from product_related_keywords_comp_score
  order by comp_score asc
''').show(20)

+------------------------------------+
|product_related_keywords_comp_scores|
+------------------------------------+
|                             1579710|
+------------------------------------+

+----------+-----------+--------+--------+------------------+------------------+
|        k1|         k2|n_users1|n_users2|              pmi2|        comp_score|
+----------+-----------+--------+--------+------------------+------------------+
|      ipad|hp touchpad|    7554|    4829|1.2318940540272372|               1.0|
|    ipad 2|       ipad|    2842|    7554| 1.430517155037946|              1.25|
|    tablet|       ipad|    1818|    7554|1.6685364924472557|1.6666666666666667|
|  touchpad|       ipad|    2785|    7554|1.2231908670315748|             2.125|
|   tablets|       ipad|    1627|    7554|1.7493143317791537|               2.6|
|     ipad2|       ipad|    1254|    7554|1.9027023623302282|3.0833333333333335|
|      ipad|      apple|    7554|    1814|1.4995901756327583|3.571428571428

We can see above that the related keywords are k1 and k2. The low comp_score indicates that they are closely related keywords. We can see that our results are giving somewhat semantically related keywords.

## To find the wrongly spelled words

In [22]:
import nltk
from collections import defaultdict
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
import numpy as np
import re
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /home/jovyan/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [23]:
import json

In [25]:
collection="products"
query="moden"

request = {
    "params": {
        "q.op": "and",
        "rows": 0,
        "indent": "on"
    },
    "query": query,
}

search_results = requests.post(f"{SOLR_URL}/{collection}/spell", json=request).json()
print(json.dumps(search_results["spellcheck"]["collations"], indent=4))

[]


In [26]:
#use the real signals
signals_collection="signals"
signals_opts={"zkhost": "search-zk", "collection": signals_collection}
df = spark.read.format("solr").options(**signals_opts).load()
df.createOrReplaceTempView("signals")

In [27]:
### Create user-searchs table each raw represent one search query.
query_signals = spark.sql("""
  select lower(trim(searches.target)) as keyword, searches.user as user 
  from signals as searches where searches.type='query'
  group by keyword, user"""
).collect()

### Now we tokenise the queries and find word frequencies. For the frequency distribution quantiles, we can choose the cutoff point to get only useful misspellings

In [28]:
stop_words = set(stopwords.words('english'))
word_list = defaultdict(int)

for row in query_signals:
    query = row["keyword"]
    tokenizer = RegexpTokenizer(r'\w+') 
    tokens   = tokenizer.tokenize(query)
    
    for token in tokens:
        if token not in stop_words and len(token) > 3 and not token.isdigit():  #drop stopwords and digit only tokens
            # and only consider token length > 3, since hard to judge whether a very short token is misspelled or not
            word_list[token] += 1

In [29]:
quantiles_to_check = [0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9]
quantile_values = np.quantile(np.array(list(word_list.values())), quantiles_to_check)
quantiles = dict(zip(quantiles_to_check, quantile_values))
quantiles

{0.1: 5.0,
 0.2: 6.0,
 0.3: 8.0,
 0.4: 12.0,
 0.5: 16.0,
 0.6: 25.0,
 0.7: 47.0,
 0.8: 142.20000000000027,
 0.9: 333.2000000000007}

In [30]:
misspell_candidates = []
correction_candidates = []
misspell_counts = []
correction_counts = []
misspell_length = []
correction_length = []
misspell_initial = []
correction_initial = []


#consider word with low count as misspelling condidates, with high count as correctly spelled candidates. 
for k, v in word_list.items():
    if v <= quantiles[0.2] : #if v == 1:  # this number based on quantile analysis and the data set, more-likely with user-behvaiour data set to be 1
        misspell_candidates.append(k)
        misspell_counts.append(v)
        misspell_length.append(len(k))
        misspell_initial.append(k[0])
    if v >= quantiles[0.8]:
        correction_candidates.append(k)
        correction_counts.append(v)
        correction_length.append(len(k))
        correction_initial.append(k[0])

In [31]:
misspell_candidates_df = pd.DataFrame({
    "misspell":misspell_candidates, 
    "misspell_counts":misspell_counts, 
    "misspell_length":misspell_length,
    "initial":misspell_initial})

correction_candidates_df = pd.DataFrame({
    "correction":correction_candidates, 
    "correction_counts":correction_counts, 
    "correction_length":correction_length,
    "initial":correction_initial})

In [32]:
#Show Results:
misspell_candidates_df.head(10)

Unnamed: 0,misspell,misspell_counts,misspell_length,initial
0,misery,5,6,m
1,profile,5,7,p
2,financial,6,9,f
3,dinosaur,6,8,d
4,raekwon,5,7,r
5,hurt,6,4,h
6,grinch,6,6,g
7,lexar,5,5,l
8,italian,6,7,i
9,1350lmt,6,7,1


We calculate the edit distance for the misspelled word and the correct word

In [33]:
def good_match(len1, len2, edit_dist): #allow longer words have more edit distance
    match = 0
    min_length = min(len1, len2)
    if min_length < 8:
        if edit_dist == 1: match = 1
    elif min_length < 11:
        if edit_dist <= 2: match = 1
    else:
        if edit_dist == 3: match = 1
    return match

In [34]:
matches_candidates = pd.merge(misspell_candidates_df, correction_candidates_df, on="initial")
#join missepll list with correction list based on whether they share the same initials to reduce matching time. 
matches_candidates["edit_dist"] = matches_candidates.apply(
    lambda row: nltk.edit_distance(row.misspell,row.correction), axis=1)
matches_candidates["good_match"] = matches_candidates.apply(
    lambda row: good_match(row.misspell_length, row.correction_length, row.edit_dist),axis=1)

In [35]:
matches = matches_candidates[matches_candidates["good_match"] == 1].drop(["initial","good_match"],axis=1)

### We prefer shorter edit distance and having higher frequency counts.

In [36]:
matches_final = matches.groupby('misspell').first().reset_index()

In [37]:
matches_final.sort_values(by=['correction_counts'], ascending=[False])[["misspell", "correction", "misspell_counts", "correction_counts", "edit_dist"]].head(20)

Unnamed: 0,misspell,correction,misspell_counts,correction_counts,edit_dist
50,iphone3,iphone,6,16854,1
61,laptopa,laptop,6,14119,1
62,latop,laptop,5,14119,1
136,toucpad,touchpad,6,11550,1
137,touxhpad,touchpad,5,11550,1
148,wirless,wireless,6,10060,1
127,tableta,tablet,6,8260,1
8,cage,case,6,7541,1
10,cape,case,5,7541,1
30,gallaxy,galaxy,6,5839,1
