In [0]:
from pyspark.sql.functions import lit
from operator import add
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Word2Vec

import os
import numpy as np
import pandas as pd
import pickle
import copy
from operator import add

In [0]:
spark.sql("SHOW TABLES").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|            invoices|      false|
| default|product_descriptions|      false|
+--------+--------------------+-----------+



In [0]:
invoices = spark.sql("SELECT * FROM invoices")
invoices.head()

Out[59]: Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity='6', InvoiceDate='12/1/10 8:26', UnitPrice='2.55', CustomerID='17850', Country='United Kingdom')

In [0]:
products_only = invoices.drop('InvoiceNo', 'Quantity', 'funny', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country')
products_only = products_only.na.drop()
products_only.show(1000)

+---------+--------------------+
|StockCode|         Description|
+---------+--------------------+
|   85123A|WHITE HANGING HEA...|
|    71053| WHITE METAL LANTERN|
|   84406B|CREAM CUPID HEART...|
|   84029G|KNITTED UNION FLA...|
|   84029E|RED WOOLLY HOTTIE...|
|    22752|SET 7 BABUSHKA NE...|
|    21730|GLASS STAR FROSTE...|
|    22633|HAND WARMER UNION...|
|    22632|HAND WARMER RED P...|
|    84879|ASSORTED COLOUR B...|
|    22745|POPPY'S PLAYHOUSE...|
|    22748|POPPY'S PLAYHOUSE...|
|    22749|FELTCRAFT PRINCES...|
|    22310|IVORY KNITTED MUG...|
|    84969|BOX OF 6 ASSORTED...|
|    22623|BOX OF VINTAGE JI...|
|    22622|BOX OF VINTAGE AL...|
|    21754|HOME BUILDING BLO...|
|    21755|LOVE BUILDING BLO...|
|    21777|RECIPE BOX WITH M...|
|    48187| DOORMAT NEW ENGLAND|
|    22960|JAM MAKING SET WI...|
|    22913|RED COAT RACK PAR...|
|    22912|YELLOW COAT RACK ...|
|    22914|BLUE COAT RACK PA...|
|    21756|BATH BUILDING BLO...|
|    22728|ALARM CLOCK BAKEL...|
|    22727

In [0]:

unique_products = products_only.dropDuplicates(["StockCode"])
unique_products.show(5)
unique_products.count()

unique_products.write.saveAsTable('product_descriptions', mode = 'overwrite')

+---------+--------------------+
|StockCode|         Description|
+---------+--------------------+
|    10002|INFLATABLE POLITI...|
|    10120|        DOGGY RUBBER|
|   10123C|HEARTS WRAPPING T...|
|   10124A|SPOTS ON RED BOOK...|
|   10124G|ARMY CAMO BOOKCOV...|
+---------+--------------------+
only showing top 5 rows



In [0]:
#tokenize the text
from pyspark.ml.feature import RegexTokenizer

regexTokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'Description', outputCol = 'text_token')

product_tokens = regexTokenizer.transform(unique_products)
product_tokens.show(3000)

+------------+--------------------+--------------------+
|   StockCode|         Description|          text_token|
+------------+--------------------+--------------------+
|       10002|INFLATABLE POLITI...|[inflatable, poli...|
|       10120|        DOGGY RUBBER|     [doggy, rubber]|
|      10123C|HEARTS WRAPPING T...|[hearts, wrapping...|
|      10124A|SPOTS ON RED BOOK...|[spots, on, red, ...|
|      10124G|ARMY CAMO BOOKCOV...|[army, camo, book...|
|       10125|MINI FUNKY DESIGN...|[mini, funky, des...|
|       10133|COLOURING PENCILS...|[colouring, penci...|
|       10135|COLOURING PENCILS...|[colouring, penci...|
|       11001|ASSTD DESIGN RACI...|[asstd, design, r...|
|       15034|PAPER POCKET TRAV...|[paper, pocket, t...|
|       15036|ASSORTED COLOURS ...|[assorted, colour...|
|       15039|      SANDALWOOD FAN|   [sandalwood, fan]|
|      15044A| PINK PAPER PARASOL |[pink, paper, par...|
|      15044B| BLUE PAPER PARASOL |[blue, paper, par...|
|      15044C|PURPLE PAPER PARA

In [0]:
from pyspark.ml.feature import StopWordsRemover
swr = StopWordsRemover(inputCol = 'text_token', outputCol = 'text_sw_removed')

products_swr = swr.transform(product_tokens)
products_swr.show(3000)
products_swr.write.saveAsTable('products_descriptions_clean', mode = 'overwrite')

+------------+--------------------+--------------------+--------------------+
|   StockCode|         Description|          text_token|     text_sw_removed|
+------------+--------------------+--------------------+--------------------+
|       10002|INFLATABLE POLITI...|[inflatable, poli...|[inflatable, poli...|
|       10120|        DOGGY RUBBER|     [doggy, rubber]|     [doggy, rubber]|
|      10123C|HEARTS WRAPPING T...|[hearts, wrapping...|[hearts, wrapping...|
|      10124A|SPOTS ON RED BOOK...|[spots, on, red, ...|[spots, red, book...|
|      10124G|ARMY CAMO BOOKCOV...|[army, camo, book...|[army, camo, book...|
|       10125|MINI FUNKY DESIGN...|[mini, funky, des...|[mini, funky, des...|
|       10133|COLOURING PENCILS...|[colouring, penci...|[colouring, penci...|
|       10135|COLOURING PENCILS...|[colouring, penci...|[colouring, penci...|
|       11001|ASSTD DESIGN RACI...|[asstd, design, r...|[asstd, design, r...|
|       15034|PAPER POCKET TRAV...|[paper, pocket, t...|[paper, 

In [0]:
products_swr.show(1000)

+---------+--------------------+--------------------+--------------------+
|StockCode|         Description|          text_token|     text_sw_removed|
+---------+--------------------+--------------------+--------------------+
|    10002|INFLATABLE POLITI...|[inflatable, poli...|[inflatable, poli...|
|    10120|        DOGGY RUBBER|     [doggy, rubber]|     [doggy, rubber]|
|   10123C|HEARTS WRAPPING T...|[hearts, wrapping...|[hearts, wrapping...|
|   10124A|SPOTS ON RED BOOK...|[spots, on, red, ...|[spots, red, book...|
|   10124G|ARMY CAMO BOOKCOV...|[army, camo, book...|[army, camo, book...|
|    10125|MINI FUNKY DESIGN...|[mini, funky, des...|[mini, funky, des...|
|    10133|COLOURING PENCILS...|[colouring, penci...|[colouring, penci...|
|    10135|COLOURING PENCILS...|[colouring, penci...|[colouring, penci...|
|    11001|ASSTD DESIGN RACI...|[asstd, design, r...|[asstd, design, r...|
|    15034|PAPER POCKET TRAV...|[paper, pocket, t...|[paper, pocket, t...|
|    15036|ASSORTED COLOU

In [0]:
from pyspark.ml.feature import Word2Vec

#create an average word vector for each document (works well according to Zeyu & Shu)
word2vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = 'text_sw_removed', outputCol = 'result')
#word2vec.setMaxIter(10)
#word2vec.getMaxIter()
#word2vec.clear(word2vec.maxIter)
model = word2vec.fit(products_swr)
result = model.transform(products_swr)

result.show(3)
result.select('result').show(1, truncate = True)


+---------+--------------------+--------------------+--------------------+--------------------+
|StockCode|         Description|          text_token|     text_sw_removed|              result|
+---------+--------------------+--------------------+--------------------+--------------------+
|    10002|INFLATABLE POLITI...|[inflatable, poli...|[inflatable, poli...|[0.0,0.0,0.0,0.0,...|
|    10120|        DOGGY RUBBER|     [doggy, rubber]|     [doggy, rubber]|[0.0,0.0,0.0,0.0,...|
|   10123C|HEARTS WRAPPING T...|[hearts, wrapping...|[hearts, wrapping...|[-6.0603047313634...|
+---------+--------------------+--------------------+--------------------+--------------------+
only showing top 3 rows

+--------------------+
|              result|
+--------------------+
|[0.0,0.0,0.0,0.0,...|
+--------------------+
only showing top 1 row



In [0]:
result.count()

Out[69]: 2951

In [0]:
result.printSchema()
result.write.saveAsTable("ProductEmbeddings", mode = "overwrite")

root
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- text_token: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- text_sw_removed: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- result: vector (nullable = true)



In [0]:
result.show()

+---------+--------------------+--------------------+--------------------+--------------------+
|StockCode|         Description|          text_token|     text_sw_removed|              result|
+---------+--------------------+--------------------+--------------------+--------------------+
|    10002|INFLATABLE POLITI...|[inflatable, poli...|[inflatable, poli...|[0.0,0.0,0.0,0.0,...|
|    10120|        DOGGY RUBBER|     [doggy, rubber]|     [doggy, rubber]|[0.0,0.0,0.0,0.0,...|
|   10123C|HEARTS WRAPPING T...|[hearts, wrapping...|[hearts, wrapping...|[-6.0603047313634...|
|   10124A|SPOTS ON RED BOOK...|[spots, on, red, ...|[spots, red, book...|[-0.0057326439418...|
|   10124G|ARMY CAMO BOOKCOV...|[army, camo, book...|[army, camo, book...|[-4.4406985398381...|
|    10125|MINI FUNKY DESIGN...|[mini, funky, des...|[mini, funky, des...|[-6.5719266422092...|
|    10133|COLOURING PENCILS...|[colouring, penci...|[colouring, penci...|[-0.0030426380981...|
|    10135|COLOURING PENCILS...|[colouri

In [0]:
synonyms = model.findSynonyms("clock", 5)
synonyms.show()

+-------+------------------+
|   word|        similarity|
+-------+------------------+
|  small|0.9502884149551392|
|  cakes|0.9444218277931213|
|   cake|0.9418295621871948|
|   wood|0.9387632012367249|
|garland|0.9356454610824585|
+-------+------------------+



In [0]:
# save model to bucket
publicKey = ""
privateKey = ""
try:
  dbutils.fs.unmount(mntPath)
except:
  print 'unmount didnt work'
import os
## Mount S3 bucket to the Databricks File System
s3Path = "s3a://{0}:{1}@{2}".format(publicKey, 
                                    privateKey, 
                                    "nycdsa-yelp")
mntPath = "/mnt/data/"
try:
  dbutils.fs.mount(s3Path, mntPath)
except:
  print 'failed to mount'
  
model.save("{}/{}".format(mntPath, 'yelp/word2vec_model'))



In [0]:
# Calculate cosine similarity between two vectors 
def cossim(v1, v2): 
    return np.dot(v1, v2) / np.sqrt(np.dot(v1, v1)) / np.sqrt(np.dot(v2, v2)) 

In [0]:
def content_recommend(business_id, docvecs):
    input_vec = docvecs.select('result').filter(docvecs['business_id'] ==
                                                business_id).collect()[0][0]   
    docvecs = docvecs.select('business_id', 
                             'result').rdd.map(lambda x: (x[0], x[1])).collect()
    
    #compute similarity array
    t = sc.parallelize((i[0], float(cossim(input_vec, i[1]))) for i in docvecs)
    
    # recommendation's cossim values
    similarity = spark.createDataFrame(t).\
    withColumnRenamed('_1', 'business_id').\
    withColumnRenamed('_2', 'similarity').\
    orderBy("similarity", ascending = False)
  
    return similarity 


In [0]:
def keyword_recommend(input_str, docvecs):
    # run input_str through preprocessing pipeline
    x = sc.parallelize([(1, input_str)]).toDF(['StockCode', 'Description'])
    regexTokenizer = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'Description', outputCol = 'text_token')
    x_token = regexTokenizer.transform(x)
    swr = StopWordsRemover(inputCol = 'text_token', outputCol = 'text_sw_removed')
    x_swr = swr.transform(x_token)
    # run word2vec model on input string
    input_vec = model.transform(x_swr)
    input_vec.show()
    input_vec = input_vec.select('result').collect()[0][0]
    
    docvecs = docvecs.select('StockCode', 
                             'result').rdd.map(lambda x: (x[0], x[1])).collect()
    
    #compute similarity array
    t = sc.parallelize((i[0], float(cossim(input_vec, i[1]))) for i in docvecs)
    
    # recommendation's cossim values
    similarity = spark.createDataFrame(t).\
        withColumnRenamed('_1', 'StockCode').\
        withColumnRenamed('_2', 'similarity').\
        orderBy("similarity", ascending = False)
  
    return similarity 

keyword_recommend("WHITE METAL LANTERN", result).show(truncate = False)

+---------+-------------------+--------------------+--------------------+--------------------+
|StockCode|        Description|          text_token|     text_sw_removed|              result|
+---------+-------------------+--------------------+--------------------+--------------------+
|        1|WHITE METAL LANTERN|[white, metal, la...|[white, metal, la...|[-0.0033777436862...|
+---------+-------------------+--------------------+--------------------+--------------------+

  return np.dot(v1, v2) / np.sqrt(np.dot(v1, v1)) / np.sqrt(np.dot(v2, v2))
+---------+----------+
|StockCode|similarity|
+---------+----------+
|21275    |NaN       |
|22351    |NaN       |
|S        |NaN       |
|22745    |NaN       |
|AMAZONFEE|NaN       |
|10002    |NaN       |
|C2       |NaN       |
|22746    |NaN       |
|D        |NaN       |
|21833    |NaN       |
|DOT      |NaN       |
|22929    |NaN       |
|M        |NaN       |
|10120    |NaN       |
|POST     |NaN       |
|84356    |NaN       |
|m        |

In [0]:
df = word2vec.getVectors()
df.printSchema()
print("vocabulary size = {}".format(df.count()))

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-3816990414651736>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdf[0m [0;34m=[0m [0mword2vec[0m[0;34m.[0m[0mgetVectors[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      2[0m [0mdf[0m[0;34m.[0m[0mprintSchema[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0mprint[0m[0;34m([0m[0;34m"vocabulary size = {}"[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0mdf[0m[0;34m.[0m[0mcount[0m[0;34m([0m[0;34m)[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;31mAttributeError[0m: 'Word2Vec' object has no attribute 'getVectors'