In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/elasticsearch-spark-30_2.12-8.4.1.jar pyspark-shell'

from pyspark.sql import *
from pyspark.sql.functions import *
# Build (or reuse) the Spark session with the Elasticsearch connector jar and
# the ES node address/port configured, then display the session object.
spark = (
    SparkSession.builder
    .appName("recommendation_system")
    .config("spark.jars", "/home/jovyan/elasticsearch-spark-30_2.12-8.4.1.jar")
    .config("spark.es.nodes.wan.only", "true")
    .config("spark.es.nodes", "172.22.0.2")
    .config("spark.es.port", "9200")
    .getOrCreate()
)
spark

In [2]:
import gzip
import json
from pyspark.sql.types import *


def parse(path):
    """Lazily yield one decoded JSON object per line of a gzip-compressed file.

    Parameters
    ----------
    path : str
        Path to a gzip file containing one JSON document per line.

    Yields
    ------
    object
        The result of `json.loads` for each non-blank line, in file order.

    Raises
    ------
    json.JSONDecodeError
        If a non-blank line is not valid JSON.
    """
    # The context manager closes the file when the generator is exhausted,
    # closed, or garbage-collected, instead of leaking the handle.
    with gzip.open(path, 'rb') as handle:
        for line in handle:
            # A blank line can never be a record; skip it rather than crash.
            if not line.strip():
                continue
            yield json.loads(line)

        
def getMetaData(path):
    """Load product metadata from a gzipped JSON-lines file into a Spark DataFrame.

    Each record read through `parse` contributes one row with the columns
    asin, title, brand, category and main_category. `main_category` is the
    last entry of the record's category list, or '' when that list is empty
    or absent. The `image` column is declared in the schema but is never
    populated here, so it is null for every row.

    Parameters
    ----------
    path : str
        Path to the gzipped metadata file.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame created with the module-level `spark` session.

    Raises
    ------
    KeyError
        If a record has no 'asin' (it is the document id downstream, so it
        is deliberately required).
    """
    final_schema = StructType(fields=[
        StructField("asin", StringType(), True),
        StructField("title", StringType(), True),
        StructField("brand", StringType(), True),
        StructField("category", ArrayType(StringType(), True), True),
        StructField("main_category", StringType(), True),
        StructField("image", ArrayType(StringType(), True), True),
    ])

    rows = []
    for d in parse(path):
        # Optional fields fall back to None / [] (all schema fields are
        # nullable) instead of aborting the whole load with a KeyError.
        categories = d.get('category') or []
        rows.append({
            'asin': d['asin'],
            'title': d.get('title'),
            'brand': d.get('brand'),
            'category': categories,
            # The most specific category is the last element of the path.
            'main_category': categories[-1] if categories else '',
        })
    return spark.createDataFrame(rows, schema=final_schema)

# Load the appliance metadata, keep a single row per ASIN, and preview one row.
product_data = (
    getMetaData('./data/meta_Appliances.json.gz')
    .dropDuplicates(['asin'])
)
product_data.limit(1).toPandas()
# product_data.printSchema()

Unnamed: 0,asin,title,brand,category,main_category,image
0,8792559360,The Cigar - Moments of Pleasure,The Cigar Book,"[Appliances, Parts &amp; Accessories]",Parts &amp; Accessories,


In [3]:
# Number of products per main_category, largest first (up to 100 rows printed).
(
    product_data
    .groupBy("main_category")
    .count()
    .orderBy(col('count').desc())
    .show(100)
)

+--------------------+-----+
|       main_category|count|
+--------------------+-----+
| Parts & Accessories| 4513|
|Refrigerator Part...| 3733|
|Washer Parts & Ac...| 2270|
|Dishwasher Parts ...| 1790|
|Range Parts & Acc...| 1710|
|       Water Filters| 1572|
|   Replacement Parts| 1556|
|Cooktop Parts & A...| 1171|
|         Range Hoods|  951|
|Humidifier Parts ...|  887|
|                    |  805|
|       Refrigerators|  722|
|Oven Parts & Acce...|  645|
|          Ice Makers|  453|
|            Cooktops|  436|
| Freestanding Ranges|  412|
|               Knobs|  406|
|Freezer Parts & A...|  360|
|Built-In Dishwashers|  357|
|         Accessories|  341|
|             Washers|  302|
|                Bins|  273|
|              Dryers|  253|
|               Vents|  243|
|Dryer Parts & Acc...|  235|
|              Motors|  224|
|             Filters|  213|
|     Humidity Meters|  185|
|   Replacement Wicks|  177|
|Refrigerators, Fr...|  175|
|Ranges, Ovens & C...|  169|
|Range Hood Pa

In [4]:
from elasticsearch import Elasticsearch

# Connect to the Elasticsearch node and request its info to confirm that the
# instance is up and reachable.
es = Elasticsearch(
    'http://172.22.0.2:9200'
)
es.info(pretty=True)

ObjectApiResponse({'name': '8d5e3008021d', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'eoXjfjhuR5ifktX3ElvQsA', 'version': {'number': '8.4.1', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '2bd229c8e56650b42e40992322a76e7914258f0c', 'build_date': '2022-08-26T12:11:43.232597118Z', 'build_snapshot': False, 'lucene_version': '9.3.0', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [5]:
# Drop any existing "products" index so the mapping below is applied to a
# fresh index every time this cell runs.
if es.indices.exists(index="products"):
    es.indices.delete(index="products")

# Dimensionality of the factor vectors stored in `model_factor`.
VECTOR_DIM = 25

product_mapping = {
    # this mapping definition sets up the metadata fields for the products
    "properties": {
        "asin": {
            "type": "keyword"
        },
        "title": {
            "type": "keyword"
        },
        "image": {
            "type": "keyword"
        },
        "brand": {
            "type": "keyword"
        },
        "category": {
            "type": "keyword"
        },
        "main_category": {
            "type": "keyword"
        },
        # the following fields define our model factor vectors and metadata
        "model_factor": {
            "type": "dense_vector",
            "dims": VECTOR_DIM
        },
        "model_version": {
            "type": "keyword"
        },
        "model_timestamp": {
            "type": "date",
            # The notebook writes unix_timestamp(...) values, i.e. seconds
            # since the epoch. Without an explicit format Elasticsearch reads
            # numeric dates as epoch *milliseconds*, which would place every
            # timestamp in January 1970.
            "format": "epoch_second"
        }
    }
}

res_products = es.indices.create(index="products", mappings=product_mapping)

print("Created indices:")
print(res_products)

Created indices:
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'products'}


In [6]:
 # Number of documents currently in the "products" index.
 es.count(index="products")['count']

0

In [7]:
# Index the product metadata into "products", using the ASIN as document id.
(
    product_data.write
    .format("es")
    .option("es.mapping.id", "asin")
    .save("products")
)

# check load went ok: compare the DataFrame row count with the index count
num_products_df = product_data.count()
num_products_es = es.count(index="products")['count']
print(f"Product DF count: {num_products_df}")
print(f"ES index count: {num_products_es}")

Product DF count: 30239
ES index count: 30239


In [8]:
# Fetch three sample documents whose main_category matches "Refrigerators".
es.search(
    index="products",
    q="main_category:Refrigerators",
    size=3,
)

ObjectApiResponse({'took': 43, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 722, 'relation': 'eq'}, 'max_score': 3.7342029, 'hits': [{'_index': 'products', '_id': 'B000E846DA', '_score': 3.7342029, '_source': {'asin': 'B000E846DA', 'title': 'Fisher Paykel E522BLX 17.6 cu ft Bottom-Freezer Refrigerator - Stainless Steel with Left Hinge', 'brand': 'Fisher Paykel', 'category': ['Appliances', 'Refrigerators, Freezers & Ice Makers', 'Refrigerators'], 'main_category': 'Refrigerators'}}, {'_index': 'products', '_id': 'B000EMNKOC', '_score': 3.7342029, '_source': {'asin': 'B000EMNKOC', 'title': 'Frigidaire FRS6R5ESB 26 Cu. Ft. Side-by-Side Refrigerator (Stainless Steel)', 'brand': 'Frigidaire', 'category': ['Appliances', 'Refrigerators, Freezers & Ice Makers', 'Refrigerators'], 'main_category': 'Refrigerators'}}, {'_index': 'products', '_id': 'B000EPN8ZK', '_score': 3.7342029, '_source': {'asin': 'B000EPN8ZK', 'title': '4.

In [9]:
def getRatingData(path):
    """Read a gzipped JSON-lines review file into a Spark DataFrame of ratings.

    Every record yielded by `parse` becomes one row: `asin` is copied as is,
    `reviewerId` is taken from the record's 'reviewerID' key and `rating`
    from its 'overall' key. The DataFrame is built with the module-level
    `spark` session.
    """
    rating_schema = StructType(fields=[
        StructField("asin", StringType(), True),
        StructField("reviewerId", StringType(), True),
        StructField("rating", FloatType(), True),
    ])
    rows = [
        {
            'asin': record['asin'],
            'reviewerId': record['reviewerID'],
            'rating': record['overall'],
        }
        for record in parse(path)
    ]
    return spark.createDataFrame(rows, schema=rating_schema)

# Load the Appliances reviews as (asin, reviewerId, rating) rows and preview
# the first three.
df_rating = getRatingData('./data/Appliances.json.gz')
df_rating.limit(3).toPandas()

Unnamed: 0,asin,reviewerId,rating
0,1118461304,A3NHUQ33CFH3VM,5.0
1,1118461304,A3SK6VNBQDNBJE,5.0
2,1118461304,A3SOFHUR27FO3K,5.0


In [10]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# One StringIndexer per non-rating column, each producing "<column>_index".
# Iterating df_rating.columns directly (instead of a set difference, whose
# iteration order depends on string hashing) keeps the stage order, and with
# it the order of the appended columns, identical from run to run.
indexer = [
    StringIndexer(inputCol=column, outputCol=column + "_index")
    for column in df_rating.columns
    if column != 'rating'
]
pipeline = Pipeline(stages=indexer)
transformed = pipeline.fit(df_rating).transform(df_rating)
transformed.show()

+----------+--------------+------+----------------+----------+
|      asin|    reviewerId|rating|reviewerId_index|asin_index|
+----------+--------------+------+----------------+----------+
|1118461304|A3NHUQ33CFH3VM|   5.0|           118.0|    2229.0|
|1118461304|A3SK6VNBQDNBJE|   5.0|        396610.0|    2229.0|
|1118461304|A3SOFHUR27FO3K|   5.0|        397010.0|    2229.0|
|1118461304|A1HOG1PYCAE157|   5.0|        122436.0|    2229.0|
|1118461304|A26JGAM6GZMM4V|   5.0|        204413.0|    2229.0|
|1118461304|A17K8WANMYHTX2|   5.0|         88869.0|    2229.0|
|1118461304|A13IW3A6W43U0G|   5.0|         75566.0|    2229.0|
|1118461304|A1ECEGG1MP7J8J|   5.0|        111423.0|    2229.0|
|1118461304|A2D5X9G9S3A7RN|   5.0|        226519.0|    2229.0|
|1118461304| AP2F86JFRQ205|   5.0|        479108.0|    2229.0|
|1118461304|A3VF3A5A3O04E1|   4.0|        406281.0|    2229.0|
|1118461304|A14DW5UMQ1M96O|   5.0|         78358.0|    2229.0|
|1118461304|A2V7UVKOFG57IW|   4.0|        286083.0|    

In [11]:
# Fit the ALS collaborative-filtering model on the indexed ratings.
als = ALS(
    maxIter=5,
    regParam=0.09,
    # The item factors are later stored in the dense_vector field whose
    # `dims` is VECTOR_DIM, so the rank is taken from that constant rather
    # than repeating the literal.
    rank=VECTOR_DIM,
    userCol="reviewerId_index",
    itemCol="asin_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    nonnegative=True,
    # Fixed seed so the learned factors are reproducible across runs.
    seed=42,
)
model = als.fit(transformed)

In [None]:
# Number of rows in the fitted model's item-factor table.
model.itemFactors.count()

In [None]:
# RMSE between the observed ratings and the model's predictions.
# NOTE: predictions are computed on `transformed`, the same DataFrame the
# model was fitted on, so this is the error on the training data.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
predictions = model.transform(transformed)
rmse = evaluator.evaluate(predictions)
print(f"RMSE={rmse}")
predictions.show()

In [None]:
from pyspark.sql.functions import lit, current_timestamp, unix_timestamp
# Tag every item-factor row with the model uid and the current time
# (unix_timestamp, i.e. seconds since the epoch) before export.
ver = model.uid
ts = unix_timestamp(current_timestamp())
product_vectors = model.itemFactors.select(
    "id",
    col("features").alias("model_factor"),
    lit(ver).alias("model_version"),
    ts.alias("model_timestamp"),
)
product_vectors.show(2)

In [None]:
from pyspark.ml.feature import IndexToString

# The "asin_index" column carries, in its field metadata, the list of original
# ASIN strings ordered by index; look the field up by name directly.
asin_index_labels = transformed.schema["asin_index"].metadata["ml_attr"]["vals"]

# Map the numeric item id of each factor row back to its ASIN string
# (this converts ASIN indices, hence the name), then drop the numeric id.
asin_converter = IndexToString(inputCol="id", outputCol="asin", labels=asin_index_labels)
PredictedLabels = asin_converter.transform(product_vectors).drop('id')
PredictedLabels.show(10)

In [None]:
# Row count of PredictedLabels.
PredictedLabels.count()

In [None]:
# Upsert the factor vectors into "products", keyed by ASIN.
(
    PredictedLabels.write
    .format("es")
    .option("es.mapping.id", "asin")
    .option("es.write.operation", "upsert")
    .save("products", mode="append")
)

In [None]:
import pandas as pd

def vector_query(query_vec, category,vector_field, cosine=False):
    """
    Construct an Elasticsearch script score query using `dense_vector` fields

    The query restricts candidates to documents whose `main_category` equals
    `category` (term filter) and scores each of them against `query_vec`.

    Parameters
    ----------
    query_vec : list
        The query vector
    category : str
        Value that a document's `main_category` field must match to be scored
    vector_field : str
        The field name in the document against which to score `query_vec`
    cosine : bool, optional
        Whether to compute cosine similarity. If `False` then the dot product is computed (default: False)

    Returns
    -------
    dict
        A `script_score` query body.

    Note: Elasticsearch cannot rank negative scores. Therefore, in the case of the dot product, a sigmoid transform
    is applied. In the case of cosine similarity, 1.0 is added to the score. In both cases, documents with no
    factor vectors are ignored by applying a 0.0 score.

    The query vector passed in will be the user factor vector (if generating recommended items for a user)
    or product factor vector (if generating similar items for a given item)
    """

    if cosine:
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"

    # Only the field name is substituted into the script template; the vector
    # itself is passed through script params.
    score_fn = score_fn.format(v=vector_field)

    return {
        "script_score": {
            "query": {
                "bool": {
                    "filter": {
                        "term": {
                            "main_category": category
                        }
                    }
                }
            },
            "script": {
                "source": score_fn,
                "params": {
                    "vector": query_vec
                }
            }
        }
    }


def get_similar(the_id, num=10, index="products", vector_field='model_factor'):
    """
    Given a item id, execute the recommendation script score query to find similar items,
    ranked by cosine similarity. We return the `num` most similar, excluding the item itself.

    Parameters
    ----------
    the_id : str
        Document id of the query item in `index`.
    num : int, optional
        Maximum number of similar items to return (default: 10).
    index : str, optional
        Elasticsearch index to read from and search in.
    vector_field : str, optional
        Name of the document field holding the factor vector.

    Returns
    -------
    tuple
        `(source, hits)`: the query item's `_source` and a list of at most
        `num` search hits. `hits` is empty when the item has no factor vector
        or no `main_category` to search within.
    """
    response = es.get(index=index, id=the_id)
    src = response['_source']

    query_vec = src.get(vector_field)
    category = src.get('main_category')
    if query_vec is None or category is None:
        # Nothing to compare against: return an empty result rather than an
        # implicit None (or a KeyError) that breaks tuple-unpacking callers.
        return src, []

    q = vector_query(query_vec, category, vector_field, cosine=True)
    # Request one extra hit because the query item matches itself; the search
    # default of 10 hits would otherwise cap the usable results at 9.
    results = es.search(index=index, query=q, size=num + 1)
    # Exclude the query item by id: on score ties it is not guaranteed to be
    # the first hit, so slicing off position 0 could drop the wrong document.
    hits = [hit for hit in results['hits']['hits'] if hit['_id'] != the_id]
    return src, hits[:num]

def display_similar(the_id, num=10, es_index="products"):
    """
    Display query product, together with similar product and similarity scores, in a table

    Parameters
    ----------
    the_id : str
        Document id of the query product.
    num : int, optional
        Maximum number of similar products to list (default: 10).
    es_index : str, optional
        Elasticsearch index passed through to `get_similar`.

    Returns
    -------
    None
        The function only renders HTML output.
    """
    # Imported here so the function does not depend on another cell having
    # brought these names into the notebook namespace first.
    import html
    from IPython.display import HTML, display

    result = get_similar(the_id, num, es_index)
    if result is None:
        # Guard against get_similar yielding no result, which would otherwise
        # fail with a TypeError on tuple unpacking.
        display(HTML("<h4>No recommendations available for ASIN - %s</h4>" % html.escape(str(the_id))))
        return
    product, recs = result

    display(HTML("<h2>Get similar products for:</h2>"))
    # Fall back to placeholders when the stored document lacks metadata.
    display(HTML("<h4>%s (ASIN - %s)</h4>" % (product.get('title', '(no title)'), product.get('asin', the_id))))
    display(HTML("<br>"))
    display(HTML("<h2>People who liked this product also liked these:</h2>"))

    pd_data = [
        {
            'asin': rec['_source']['asin'],
            'title': rec['_source']['title'],
            'score': rec['_score'],
        }
        for rec in recs
    ]
    # Show full titles in this table only; option_context restores the
    # previous pandas display setting afterwards.
    with pd.option_context('display.max_colwidth', None):
        table_html = pd.DataFrame(pd_data).to_html()
    display(HTML(table_html))


In [None]:
from IPython.display import Image, HTML, display
# display_similar renders its output directly and has no return value, so the
# call is not assigned to a variable.
display_similar('B001A5HT94', num=5)