## History-Based Recommendation Engine Using PySpark and PyWebIO

#### Major tools used
    1) PySpark = fast data and text preprocessing, with a large library of data transformations
    
    2) PyWebIO = used to build an interactive UI platform for our ML model
    
    
#### Benefits of using PySpark
 - parallel processing
 
 - high-speed data preprocessing
 
 - a wide range of functions for data transformation
 


In [1]:
import findspark

## initiate spark setup
findspark.init()

#### Import All Required Libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Word2Vec,Tokenizer,HashingTF,IDF,Normalizer
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import functions as F
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.types import *
import pandas as pd
import sqlite3
import numpy as np
from pywebio.input import *
from pywebio.output import *

In [3]:
sc = SparkSession.builder.appName("History-Based_Recommendation-Engine").getOrCreate()
sc

In [4]:
data_path = "db.sqlite3"
con = sqlite3.connect(data_path)

In [5]:
## get all table names
pd.read_sql_query('SELECT name from sqlite_master where type= "table";', con)

|   | name |
|---|------|
| 0 | django_migrations |
| 1 | sqlite_sequence |
| 2 | auth_group_permissions |
| 3 | auth_user_groups |
| 4 | auth_user_user_permissions |
| 5 | django_admin_log |
| 6 | django_content_type |
| 7 | auth_permission |
| 8 | auth_group |
| 9 | auth_user |


In [6]:
data = pd.read_sql_query("select * from 'shop_product_detail';",con)

In [7]:
## create spark dataframe from pandas dataframe
sdf = sc.createDataFrame(data[["product_id","product_name","description","brand"]])

In [8]:
sdf.cache()

DataFrame[product_id: bigint, product_name: string, description: string, brand: string]

### Data preprocessing and preparation
    - text preprocessing
    - Tokenization
    - TF-IDF transformation
    - Normalization
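
The steps listed above can be illustrated without Spark. The following is a minimal pure-Python sketch, not the notebook's implementation: the product names are made up, the helper names (`tokenize`, `fit_idf`, `tfidf_normalized`) are hypothetical, and a plain dictionary vocabulary stands in for the hashed feature indices that `HashingTF` produces. The IDF uses the smoothed form log((N + 1) / (df + 1)), which is the formula documented for Spark's `IDF`.

```python
import math
from collections import Counter

def tokenize(text):
    # Lowercase and split on whitespace, similar in spirit to Spark's Tokenizer
    return text.lower().split()

def fit_idf(docs_tokens):
    # Smoothed inverse document frequency: log((N + 1) / (df + 1))
    n_docs = len(docs_tokens)
    df = Counter(term for tokens in docs_tokens for term in set(tokens))
    return {term: math.log((n_docs + 1) / (count + 1)) for term, count in df.items()}

def tfidf_normalized(tokens, idf):
    # Weight each term by (term frequency * IDF), then scale to unit L2 length
    tf = Counter(tokens)
    weights = {term: tf[term] * idf.get(term, 0.0) for term in tf}
    norm = math.sqrt(sum(w * w for w in weights.values()))
    if norm == 0.0:
        return weights
    return {term: w / norm for term, w in weights.items()}

# Made-up product names, used only for illustration
names = ["Red Cotton Shirt", "Blue Cotton Shirt", "Leather Wallet"]
docs = [tokenize(name) for name in names]
idf = fit_idf(docs)
vectors = [tfidf_normalized(tokens, idf) for tokens in docs]
```

Terms that appear in many product names (here `cotton`) receive a lower IDF weight than rare ones (here `red`), and every resulting vector has length 1, which is what lets a plain dot product act as a similarity score later on.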

In [9]:
## convert product name text into lower case
sdf= sdf.withColumn("product_name",F.lower(F.col("product_name")))

In [10]:
## text transformation
tokenizer = Tokenizer(inputCol="product_name",outputCol="text_tokens")
sdf = tokenizer.transform(sdf)
hashtf = HashingTF(inputCol="text_tokens",outputCol="text_tf")
sdf = hashtf.transform(sdf)
idf_md = IDF(inputCol="text_tf",outputCol="text_idf")
idfModel = idf_md.fit(sdf)
sdf = idfModel.transform(sdf)
## normalize the TF-IDF vectors (not the raw term frequencies) to unit length
normalizer = Normalizer(inputCol="text_idf",outputCol="text_norm")
sdf = normalizer.transform(sdf)

In [11]:
def transformData(d):
    """ transform product name text
    into set of vectors by applying tokenization,tfidf transformation,
    normalization"""
    d = tokenizer.transform(d)
    d = hashtf.transform(d)
    d = idfModel.transform(d)
    d = normalizer.transform(d)
    return d

In [12]:
def createData(s):
    """ create spark dataframe from 
    the input text """
    t = pd.DataFrame([s],columns=["product_name"])
    t = sc.createDataFrame(t)
    return t

In [13]:
def getSimilar(data,s1):
    """ compute the similarity score of input keyword with all products """
    t = transformData(createData(s1)).select("text_norm").collect()[0]["text_norm"]
    s_udf = F.udf(lambda x: float(x.dot(t)),DoubleType())
    new = data.withColumn("similarity_score",s_udf("text_norm")).orderBy("similarity_score",ascending=False)
    return new.head(10)
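
`getSimilar` scores each product with a dot product between two normalized vectors. When both vectors have unit L2 length, that dot product equals their cosine similarity. The sketch below shows the same idea in plain Python with sparse dictionaries; the product ids, term weights, and the helpers `l2_normalize`, `dot`, and `top_k` are hypothetical and only serve the illustration.

```python
import math

def l2_normalize(vec):
    # Scale a sparse {term: weight} vector to unit L2 length
    norm = math.sqrt(sum(v * v for v in vec.values()))
    return {k: v / norm for k, v in vec.items()} if norm else dict(vec)

def dot(a, b):
    # Sparse dot product: only terms present in both vectors contribute
    if len(a) > len(b):
        a, b = b, a
    return sum(w * b.get(term, 0.0) for term, w in a.items())

def top_k(query_vec, product_vecs, k=10):
    # Score every product against the query and keep the k best
    scored = [(pid, dot(query_vec, vec)) for pid, vec in product_vecs.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

# Made-up products with equal term weights, for illustration only
products = {
    101: l2_normalize({"cotton": 1.0, "shirt": 1.0}),
    102: l2_normalize({"leather": 1.0, "wallet": 1.0}),
    103: l2_normalize({"cotton": 1.0, "towel": 1.0}),
}
query = l2_normalize({"cotton": 1.0, "shirt": 1.0})
ranked = top_k(query, products, k=2)
```

Product 101 shares both query terms and ranks first, product 103 shares one term and ranks second, and product 102 shares none and scores zero.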

### Build Web UI for our history based rec-engine using pywebio

In [14]:

## declarations
cols = ["product_id","product_name","brand","similarity"]

def homeApp():
    # clear screen
    clear()

    put_markdown("## Search History Based Recommendation Engine")
    style(put_text("Note : User-entered keywords are considered as the user's search history"),"color:blue")
    put_link("Github Code Here",url="https://github.com/hasit73/HistoryBasedRecommendationEngine",new_window=True)

    input_grp = input_group(inputs=[

        input("Higher priority keyword ",type="text",name="high",required=True),
        input("Medium priority keyword ",type="text",name="medium",required=True),
        input("Low priority keyword ",type="text",name="low",required=True),

    ])
    h,m,l = input_grp["high"],input_grp["medium"],input_grp["low"]

    clear()
    style(put_text("Please wait a while ..."),"color:blue")
    
    with put_loading():
        prods = getSimilar(sdf,h)[:5]+getSimilar(sdf,m)[:3]+getSimilar(sdf,l)[:2]
        clear()
        style(put_markdown(f"### Results for Search history : {h} , {m} , {l}"),"color:brown")
        style(put_markdown("## Recommended Products"),"color:green")
        data = [[p["product_id"],p["product_name"],p["brand"],p["similarity_score"]] for p in prods]
        put_table(data,header=cols)
        put_buttons(["Back"],onclick=[homeApp])


In [15]:
homeApp()
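
`homeApp` concatenates the top 5, 3, and 2 results for the high, medium, and low priority keywords. If the keywords overlap, the same product can appear more than once in the table. One possible way to avoid that, sketched here under the assumption that each result supports `row["product_id"]` lookup (as Spark `Row` objects and plain dicts do), is to skip ids that were already taken. The helper `merge_by_priority` is hypothetical and not part of the notebook.

```python
def merge_by_priority(ranked_lists, quotas):
    # ranked_lists: result lists ordered from highest to lowest priority
    # quotas: how many products to take from each list
    seen = set()
    merged = []
    for rows, quota in zip(ranked_lists, quotas):
        taken = 0
        for row in rows:
            if taken == quota:
                break
            if row["product_id"] in seen:
                continue  # already recommended by a higher-priority keyword
            seen.add(row["product_id"])
            merged.append(row)
            taken += 1
    return merged

# Made-up results standing in for the output of getSimilar
high = [{"product_id": 1}, {"product_id": 2}, {"product_id": 3}]
medium = [{"product_id": 2}, {"product_id": 4}]
low = [{"product_id": 1}, {"product_id": 5}]
result = merge_by_priority([high, medium, low], quotas=[2, 1, 1])
```

In the notebook this could be called as `merge_by_priority([getSimilar(sdf,h), getSimilar(sdf,m), getSimilar(sdf,l)], [5, 3, 2])`, since `getSimilar` already returns ten candidates per keyword.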

#### What are the limitations of this recommendation engine

    1) Right now it takes time to recommend products for the input keywords. PySpark evaluates transformations lazily, so the similarity computation over all products only runs at query time, when the results are requested.
    
    2) In a few cases the engine recommends products that do not match the input keywords. This may be because products are unevenly distributed across categories and not all types of products are included in the db.