# Embedding

## Data Preparation
### 1. Download sample data

In [None]:
import requests

def download_file(url, save_path, timeout=30):
    """Download ``url`` with an HTTP GET and write the body to ``save_path``.

    Args:
        url: Address of the resource to fetch.
        save_path: Local file the response bytes are written to; it is
            opened in ``'wb'`` mode, so an existing file is overwritten.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failures or timeouts.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly instead of saving an error page as if it were the data file.
    response.raise_for_status()
    with open(save_path, 'wb') as file:
        file.write(response.content)

# Remote location of the sample ratings file.
url = 'https://raw.githubusercontent.com/dream-365/SparrowRecSys/master/src/main/resources/webroot/sampledata/ratings.csv'
# Local file name the download is stored under.
save_path = 'ratings.csv'

# Fetch the sample data into the working directory.
download_file(url=url, save_path=save_path)

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pandas as pd
import Embedding

# Spark configuration: application name 'ctrModel', master 'local[4]'.
conf = SparkConf()
conf.setAppName('ctrModel')
conf.setMaster('local[4]')

# Reuse an existing session if there is one, otherwise start a new one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Ratings file read by the embedding cells below.
rawSampleDataPath = "ratings.csv"
# Embedding length passed to every Embedding helper call in this notebook.
embLength = 10

## Item Embedding

In [None]:
# Build the item sequences from the raw ratings file.
user_watching_seqs = Embedding.processItemSequence(spark, rawSampleDataPath)

# Preview the first 100 samples as a table.
preview_rows = user_watching_seqs.take(100)
pd.DataFrame(preview_rows)

### Item2Vec

In [None]:
# Train the Item2Vec model on the item sequences; saving to Redis is
# switched off for this call.
item_emb_model = Embedding.trainItem2vec(
    spark,
    user_watching_seqs,
    embLength,
    embOutputPath="emb/item2vecEmb.csv",
    saveToRedis=False,
    redisKeyPrefix="i2vEmb",
)

In [None]:
# Look up the 5 movies most similar to movie id "99" (cosine similarity)
# and print each id next to its score.
synonyms = item_emb_model.findSynonyms("99", 5)
for movie_id, similarity in synonyms:
    print(movie_id, similarity)

### Graph Embedding

In [None]:
# Train the graph embedding model on the same item sequences.
# NOTE(review): unlike the other Embedding calls in this notebook, this one
# passes saveToRedis=True — presumably it needs a reachable Redis server;
# confirm that is intended.
item_graphemb_model = Embedding.graphEmb(
    user_watching_seqs,
    spark,
    embLength,
    embOutputFilename="emb/itemGraphEmb.csv",
    saveToRedis=True,
    redisKeyPrefix="graphEmb",
)

In [None]:
# Same lookup as for Item2Vec, this time against the graph embedding model:
# the 5 nearest neighbours of movie id "99" with their similarity scores.
synonyms = item_graphemb_model.findSynonyms("99", 5)
for movie_id, similarity in synonyms:
    print(movie_id, similarity)

In [None]:
# Generate user embeddings from the ratings file and a trained item model.
# The original call passed `model`, a name that is never defined in this
# notebook (NameError). The Item2Vec model is used here because the manual
# user-embedding cells below also build on `item_emb_model`.
Embedding.generateUserEmb(spark, rawSampleDataPath, item_emb_model, embLength,
                    embOutputPath="emb/userEmb.csv", saveToRedis=False,
                    redisKeyPrefix="uEmb")

## User Embedding 

In [None]:
from pyspark.sql.types import *

# Load the raw ratings; the first CSV line is treated as the header.
user_ratting_samples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)

# Schema of the item-embedding lookup table. It is defined once, up front,
# so it exists even when the model holds no vectors (it used to be assigned
# inside the loop body and rebuilt on every iteration).
fields = [
    StructField('movieId', StringType(), False),
    StructField('emb', ArrayType(FloatType()), False)
]
schema = StructType(fields)

# One (movieId, embedding-as-list) row per vector stored in the Item2Vec model.
vectors_list = [
    (key, list(value))
    for key, value in item_emb_model.getVectors().items()
]
vectors_df = spark.createDataFrame(vectors_list, schema=schema)

# Attach each rated movie's embedding; the inner join drops ratings for
# movies that have no vector in the model.
user_ratting_samples = user_ratting_samples.join(vectors_df, on='movieId', how='inner')

pd.DataFrame(user_ratting_samples.take(5))

In [None]:
# Reduce every joined row to a (userId, emb) pair so it can be keyed by user.
user_ratting_samples_pair = (
    user_ratting_samples
    .select('userId', 'emb')
    .rdd
    .map(lambda row: (row[0], row[1]))
)
pd.DataFrame(user_ratting_samples_pair.take(5))

In [None]:
# Accumulate the embeddings of all movies a user rated into one user
# embedding: element-wise sum of the vectors that share the same userId.
user_emb = user_ratting_samples_pair.reduceByKey(
    lambda a, b: [x + y for x, y in zip(a, b)]
)

pd.DataFrame(user_emb.take(5))