In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random

import time

spark = SparkSession \
        .builder \
        .appName("Phone_Similarity") \
        .master("local[*]") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

In [None]:
#import phone from mongodb


In [2]:
phone_data=spark.read.option("multiline","true").json('products.json')
phone_data.printSchema()
phone_data = phone_data.select('_id',
                                'title',
                                'category',
                                'color',
                                'memory',
                                'pin',
                                'ram',
                                'screenSize',
                                'status',
                                'price')

root
 |-- __v: long (nullable = true)
 |-- _id: string (nullable = true)
 |-- camera: string (nullable = true)
 |-- category: string (nullable = true)
 |-- checked: boolean (nullable = true)
 |-- color: string (nullable = true)
 |-- content: string (nullable = true)
 |-- createdAt: string (nullable = true)
 |-- description: string (nullable = true)
 |-- images: struct (nullable = true)
 |    |-- public_id: string (nullable = true)
 |    |-- url: string (nullable = true)
 |-- memory: long (nullable = true)
 |-- pin: long (nullable = true)
 |-- price: long (nullable = true)
 |-- product_id: string (nullable = true)
 |-- ram: long (nullable = true)
 |-- screenSize: string (nullable = true)
 |-- sold: long (nullable = true)
 |-- status: string (nullable = true)
 |-- title: string (nullable = true)
 |-- updatedAt: string (nullable = true)



In [3]:
from pyspark.ml.feature import VectorAssembler
phone_data.columns

['_id',
 'title',
 'category',
 'color',
 'memory',
 'pin',
 'ram',
 'screenSize',
 'status',
 'price']

In [4]:
phone_data.head(5)

[Row(_id='6194895c30e6b7130bb06add', title='iphone 13 pink', category='61947f86613ccbeacb59e5b8', color='Pink', memory=128, pin=3095, ram=6, screenSize='6.1', status='New', price=1100),
 Row(_id='61948b652d9fa1d9e7da2d3a', title='iphone 13 pro max white', category='61947f86613ccbeacb59e5b8', color='White', memory=256, pin=3300, ram=6, screenSize='6.3', status='New', price=1300),
 Row(_id='6194b8b0bb6b5b34d3a626b1', title='samsung galaxy z flip3 5g', category='619487730327b0eef3a53fe4', color='Black', memory=128, pin=3300, ram=8, screenSize='6.7', status='New', price=1050),
 Row(_id='6194b9aebb6b5b34d3a626b7', title='samsung galaxy a72 shiny black', category='619487730327b0eef3a53fe4', color='Shiny Black', memory=256, pin=5000, ram=8, screenSize='6.7', status='New', price=500),
 Row(_id='6194ba78bb6b5b34d3a626c8', title='samsung galaxy a72 turquoise', category='619487730327b0eef3a53fe4', color='Turquoise', memory=256, pin=5000, ram=8, screenSize='6.7', status='New', price=500)]

In [5]:
import pyspark.sql.functions as F 
categ_air = phone_data.select('category').distinct().rdd.flatMap(lambda x:x).collect()
exprs_air = [F.when(F.col('category') == cat,1).otherwise(0).alias(str(cat)) for cat in categ_air]
phone_data = phone_data.select(exprs_air + phone_data.columns)
phone_data.columns

['6194877b0327b0eef3a53fe9',
 '61947f86613ccbeacb59e5b8',
 '619487730327b0eef3a53fe4',
 '61947f8e613ccbeacb59e5bd',
 '_id',
 'title',
 'category',
 'color',
 'memory',
 'pin',
 'ram',
 'screenSize',
 'status',
 'price']

In [6]:
categ_air = phone_data.select('color').distinct().rdd.flatMap(lambda x:x).collect()
exprs_air = [F.when(F.col('color') == cat,1).otherwise(0).alias(str(cat)) for cat in categ_air]
phone_data = phone_data.select(exprs_air + phone_data.columns)
phone_data.columns

['Shiny Black',
 'Turquoise',
 'Silver',
 'Green',
 'Purple',
 'Blue',
 'White',
 'Gold',
 'Mint Green',
 'Black',
 'Red',
 'Pink',
 '6194877b0327b0eef3a53fe9',
 '61947f86613ccbeacb59e5b8',
 '619487730327b0eef3a53fe4',
 '61947f8e613ccbeacb59e5bd',
 '_id',
 'title',
 'category',
 'color',
 'memory',
 'pin',
 'ram',
 'screenSize',
 'status',
 'price']

In [7]:
categ_air = phone_data.select('status').distinct().rdd.flatMap(lambda x:x).collect()
exprs_air = [F.when(F.col('status') == cat,1).otherwise(0).alias(str(cat)) for cat in categ_air]
phone_data = phone_data.select(exprs_air + phone_data.columns)
phone_data.columns

['99',
 'New',
 'Shiny Black',
 'Turquoise',
 'Silver',
 'Green',
 'Purple',
 'Blue',
 'White',
 'Gold',
 'Mint Green',
 'Black',
 'Red',
 'Pink',
 '6194877b0327b0eef3a53fe9',
 '61947f86613ccbeacb59e5b8',
 '619487730327b0eef3a53fe4',
 '61947f8e613ccbeacb59e5bd',
 '_id',
 'title',
 'category',
 'color',
 'memory',
 'pin',
 'ram',
 'screenSize',
 'status',
 'price']

In [8]:
from pyspark.sql.types import DoubleType
changedTypedf = phone_data.withColumn("screenSize", phone_data["screenSize"].cast(DoubleType()))
changedTypedf.head(5)

[Row(99=0, New=1, Shiny Black=0, Turquoise=0, Silver=0, Green=0, Purple=0, Blue=0, White=0, Gold=0, Mint Green=0, Black=0, Red=0, Pink=1, 6194877b0327b0eef3a53fe9=0, 61947f86613ccbeacb59e5b8=1, 619487730327b0eef3a53fe4=0, 61947f8e613ccbeacb59e5bd=0, _id='6194895c30e6b7130bb06add', title='iphone 13 pink', category='61947f86613ccbeacb59e5b8', color='Pink', memory=128, pin=3095, ram=6, screenSize=6.1, status='New', price=1100),
 Row(99=0, New=1, Shiny Black=0, Turquoise=0, Silver=0, Green=0, Purple=0, Blue=0, White=1, Gold=0, Mint Green=0, Black=0, Red=0, Pink=0, 6194877b0327b0eef3a53fe9=0, 61947f86613ccbeacb59e5b8=1, 619487730327b0eef3a53fe4=0, 61947f8e613ccbeacb59e5bd=0, _id='61948b652d9fa1d9e7da2d3a', title='iphone 13 pro max white', category='61947f86613ccbeacb59e5b8', color='White', memory=256, pin=3300, ram=6, screenSize=6.3, status='New', price=1300),
 Row(99=0, New=1, Shiny Black=0, Turquoise=0, Silver=0, Green=0, Purple=0, Blue=0, White=0, Gold=0, Mint Green=0, Black=1, Red=0, Pi

In [9]:
assemble=VectorAssembler(inputCols=['99',
 'New',
 'Shiny Black',
 'Turquoise',
 'Silver',
 'Green',
 'Purple',
 'Blue',
 'White',
 'Gold',
 'Mint Green',
 'Black',
 'Red',
 'Pink',
 '6194877b0327b0eef3a53fe9',
 '61947f86613ccbeacb59e5b8',
 '619487730327b0eef3a53fe4',
 '61947f8e613ccbeacb59e5bd',
 'memory',
 'pin',
 'ram',
 'screenSize',
 'price'], outputCol='features')
assembled_data=assemble.transform(changedTypedf)
assembled_data.show(2)

+---+---+-----------+---------+------+-----+------+----+-----+----+----------+-----+---+----+------------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+-----+------+----+---+----------+------+-----+--------------------+
| 99|New|Shiny Black|Turquoise|Silver|Green|Purple|Blue|White|Gold|Mint Green|Black|Red|Pink|6194877b0327b0eef3a53fe9|61947f86613ccbeacb59e5b8|619487730327b0eef3a53fe4|61947f8e613ccbeacb59e5bd|                 _id|               title|            category|color|memory| pin|ram|screenSize|status|price|            features|
+---+---+-----------+---------+------+-----+------+----+-----+----+----------+-----+---+----+------------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+-----+------+----+---+----------+------+-----+--------------------+
|  0|  1|          0|       

In [10]:
from pyspark.ml.feature import StandardScaler

scale=StandardScaler(inputCol='features',outputCol='standardized')

data_scale=scale.fit(assembled_data)
data_scale_output=data_scale.transform(assembled_data)

data_scale_output.show(2)

+---+---+-----------+---------+------+-----+------+----+-----+----+----------+-----+---+----+------------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+-----+------+----+---+----------+------+-----+--------------------+--------------------+
| 99|New|Shiny Black|Turquoise|Silver|Green|Purple|Blue|White|Gold|Mint Green|Black|Red|Pink|6194877b0327b0eef3a53fe9|61947f86613ccbeacb59e5b8|619487730327b0eef3a53fe4|61947f8e613ccbeacb59e5bd|                 _id|               title|            category|color|memory| pin|ram|screenSize|status|price|            features|        standardized|
+---+---+-----------+---------+------+-----+------+----+-----+----+----------+-----+---+----+------------------------+------------------------+------------------------+------------------------+--------------------+--------------------+--------------------+-----+------+----+---+----------+------+-----+--------

In [11]:
data_scale_output.toPandas().columns

Index(['99', 'New', 'Shiny Black', 'Turquoise', 'Silver', 'Green', 'Purple',
       'Blue', 'White', 'Gold', 'Mint Green', 'Black', 'Red', 'Pink',
       '6194877b0327b0eef3a53fe9', '61947f86613ccbeacb59e5b8',
       '619487730327b0eef3a53fe4', '61947f8e613ccbeacb59e5bd', '_id', 'title',
       'category', 'color', 'memory', 'pin', 'ram', 'screenSize', 'status',
       'price', 'features', 'standardized'],
      dtype='object')

In [12]:
datad = data_scale_output.select('_id', 'title', 'category', 'color', 'memory', 'pin', 'ram', 'screenSize', 'status', 'price', 'standardized')
datf = datad.toPandas()

In [13]:
datf.head(5)

Unnamed: 0,_id,title,category,color,memory,pin,ram,screenSize,status,price,standardized
0,6194895c30e6b7130bb06add,iphone 13 pink,61947f86613ccbeacb59e5b8,Pink,128,3095,6,6.1,New,1100,"(0.0, 3.2773069341672505, 0.0, 0.0, 0.0, 0.0, ..."
1,61948b652d9fa1d9e7da2d3a,iphone 13 pro max white,61947f86613ccbeacb59e5b8,White,256,3300,6,6.3,New,1300,"(0.0, 3.2773069341672505, 0.0, 0.0, 0.0, 0.0, ..."
2,6194b8b0bb6b5b34d3a626b1,samsung galaxy z flip3 5g,619487730327b0eef3a53fe4,Black,128,3300,8,6.7,New,1050,"(0.0, 3.2773069341672505, 0.0, 0.0, 0.0, 0.0, ..."
3,6194b9aebb6b5b34d3a626b7,samsung galaxy a72 shiny black,619487730327b0eef3a53fe4,Shiny Black,256,5000,8,6.7,New,500,"(0.0, 3.2773069341672505, 3.2773069341672505, ..."
4,6194ba78bb6b5b34d3a626c8,samsung galaxy a72 turquoise,619487730327b0eef3a53fe4,Turquoise,256,5000,8,6.7,New,500,"(0.0, 3.2773069341672505, 0.0, 3.9415370460918..."


In [42]:
datf.iloc[0]['standardized'].toArray()

array([ 0.        ,  3.27730693,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  5.47722558,  0.        ,
        2.04026455,  0.        ,  0.        ,  0.54687462,  5.93650051,
        2.97614954, 15.94060305,  1.80974879])

In [43]:
datf.iloc[1]['standardized'].toArray()

array([ 0.        ,  3.27730693,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  0.        ,  2.89229746,  0.        ,
        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        2.04026455,  0.        ,  0.        ,  1.09374923,  6.32970975,
        2.97614954, 16.46324578,  2.13879403])

In [30]:
len(datf['standardized'][0].toArray())

23

In [31]:
datf.iloc[0]['standardized']

SparseVector(23, {1: 3.2773, 13: 5.4772, 15: 2.0403, 18: 0.5469, 19: 5.9365, 20: 2.9761, 21: 15.9406, 22: 1.8097})

In [32]:
import numpy as np, pandas as pd
import matplotlib.pyplot as plt, seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

class PhoneSimilarity():
    def __init__(self, all_Data):
        self.all_Data_ = all_Data
    
    def phone_similarity(self, phone_id, amount=1):
        distances = []
        phone = self.all_Data_[(self.all_Data_._id == phone_id)].head(1).values[0]
        print(phone)
        current_standardized_vector = phone[10].toArray()
        res_data = self.all_Data_[self.all_Data_._id != phone_id]
        countElement = 23 #23 of vector and 1 of predict
        for r_phone in tqdm(res_data.values):
            dist = 0
            standardized_vector = r_phone[10].toArray()
            for col in np.arange(23):
                dist = dist + np.square(float(current_standardized_vector[col]) - float(standardized_vector[col]))
            # dist = dist + np.square(float(phone[11]) - float(r_phone[11]))
            dist = dist / countElement
            dist = np.sqrt(dist)
            distances.append(dist)
        res_data['distance'] = distances
        res_data = res_data.sort_values('distance')
        print(res_data)
        columns = ['_id', 'title', 'category', 'color', 'memory', 'pin', 'ram', 'screenSize', 'status', 'price','prediction']
        return res_data[columns][:amount]

In [14]:
#euclidean
import numpy as np, pandas as pd
import matplotlib.pyplot as plt, seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

class PhoneSimilarity():
    def __init__(self, all_Data):
        self.all_Data_ = all_Data
    
    def phone_similarity(self, phone_id, amount=1):
        amount = amount + 1
        distances = []
        phone = self.all_Data_[(self.all_Data_._id == phone_id)].head(1).values[0]
        phone_row = self.all_Data_[(self.all_Data_._id == phone_id)].head(1)
        current_standardized_vector = np.array(phone[10].toArray())
        res_data = self.all_Data_[self.all_Data_._id != phone_id]
        countElement = 23 #23 of vector and 1 of predict
        for r_phone in tqdm(res_data.values):
            dist = 0
            standardized_vector = np.array(r_phone[10].toArray())
            # for col in np.arange(23):
            #     dist = dist + np.square(float(current_standardized_vector[col]) - float(standardized_vector[col]))
            # # dist = dist + np.square(float(phone[11]) - float(r_phone[11]))
            # dist = dist / countElement
            # dist = np.sqrt(dist)
            dist = np.linalg.norm(current_standardized_vector-standardized_vector)
            distances.append(dist)
        res_data['distance'] = distances
        phone_row['distance'] = 0
        res_data = res_data.sort_values('distance')
        bigdata = pd.concat([phone_row, res_data], ignore_index=True, sort=False)
        columns = ['_id', 'title', 'category', 'color', 'memory', 'pin', 'ram', 'screenSize', 'status', 'price','distance']
        return bigdata[columns][:amount]

In [15]:
similarity = PhoneSimilarity(datf)
x = '61948b652d9fa1d9e7da2d3a'
similarity_phones = similarity.phone_similarity(x, 10)

100%|█| 29/29 [00:00<00:00, 16239.63it


In [16]:
print(similarity_phones)

                         _id                      title  \
0   61948b652d9fa1d9e7da2d3a    iphone 13 pro max white   
1   6194c696bb6b5b34d3a62763    iphone 13 pro max white   
2   6194c3b0bb6b5b34d3a62747    iphone 13 pro max white   
3   6194c0e7bb6b5b34d3a62713     iphone 13 pro max gold   
4   6194c059bb6b5b34d3a6270c     iphone 13 pro max gold   
5   6194bce4bb6b5b34d3a626e4   samsung galaxy a72 white   
6   6194c14cbb6b5b34d3a6271a     iphone 13 pro max gold   
7   6194b8b0bb6b5b34d3a626b1  samsung galaxy z flip3 5g   
8   6194bfd8bb6b5b34d3a62706     iphone 13 pro max blue   
9   6194c722bb6b5b34d3a62769        oppo reno6 5g black   
10  6194c9f0bb6b5b34d3a62786      oppo reno6 z 5g black   

                    category  color  memory   pin  ram  screenSize status  \
0   61947f86613ccbeacb59e5b8  White     256  3300    6         6.3    New   
1   61947f86613ccbeacb59e5b8  White     512  4325    6         6.7    New   
2   61947f86613ccbeacb59e5b8  White    1024  4325    6      

In [17]:
kafka_topic_name = "clickcount"
kafka_bootstrap_servers = 'localhost:9092'

# Construct a streaming DataFrame that reads from topic
flower_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

In [18]:
def process_row(row):
#     print(type(row))
#     print(row)
    value = row['value'].decode("utf-8")
    first_element = value.split(',')[0]
    similarity_phones = similarity.phone_similarity(first_element, 10)
    print(similarity_phones)
    pass
query = flower_df.writeStream.foreach(process_row).start()