# Item Embeddings
**Please note:** This notebook uses Scala to execute Spark code. All operations and calculations were run with the Scala spylon-kernel.

The more recent, DataFrame-based Spark implementation of Word2Vec in the package org.apache.spark.ml.feature is used (rather than the older RDD-based one in org.apache.spark.mllib.feature).

In [1]:
import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}

Intitializing Scala interpreter ...

Spark Web UI available at http://DESKTOP-2NBBC4T:4041
SparkContext available as 'sc' (version = 2.4.3, master = local[*], app id = local-1563036190343)
SparkSession available as 'spark'


import org.apache.spark.rdd.RDD
import org.apache.spark.ml.feature.{Word2Vec, Word2VecModel}


All parameters are set here:

    inputCol: Name of the DataFrame column the model reads from. It must match the column name used in the preprocessing step ("text"), so do not change it.
    maxIter: Number of epochs (training iterations) for the model
    minCount: Minimum number of occurrences a word needs to be included in the model (e.g. with minCount=5, a word that occurs only twice is ignored)
    numPartitions: Number of data partitions
    seed: Seed for reproducibility
    vectorSize: Dimensionality of the output vectors
    debug: If true, only a random subset of the rows is used
    ratio: Sampling fraction used when debug is true (0 < ratio < 1)
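
To make the effect of minCount concrete, here is a minimal plain-Scala sketch (no Spark involved). The toy corpus and the helper name `toyVocabulary` are hypothetical and only illustrate the counting rule; they are not part of the notebook or of the Spark API.

```scala
// Hypothetical toy corpus: each inner Seq stands for the properties of one item.
val toyCorpus: Seq[Seq[String]] = Seq(
  Seq("Gym", "Spa", "Pool"),
  Seq("Gym", "Pool"),
  Seq("Gym", "Casino")
)

// Keep only the words that occur at least `minCount` times in the whole corpus.
def toyVocabulary(sentences: Seq[Seq[String]], minCount: Int): Set[String] =
  sentences.flatten
    .groupBy(identity)
    .collect { case (word, occurrences) if occurrences.size >= minCount => word }
    .toSet

val toyVocab = toyVocabulary(toyCorpus, minCount = 2)
println(toyVocab) // "Spa" and "Casino" occur only once and are dropped
```

With minCount = 10 as used below, rarely occurring properties never receive a vector.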

In [2]:
val inputCol = "text"
val maxIter = 1
val minCount = 10
val numPartitions = 1
val seed = 42
val vectorSize = 15

val debug = false
val ratio = 0.1

inputCol: String = text
maxIter: Int = 1
minCount: Int = 10
numPartitions: Int = 1
seed: Int = 42
vectorSize: Int = 15
debug: Boolean = false
ratio: Double = 0.1


Helper function for printing RDDs

In [3]:
def printRDD[T](rdd: RDD[T], n: Int = 0): Unit = {
    // n == 0 prints the whole RDD (collected to the driver), otherwise only the first n elements
    if (n != 0) {
        rdd.take(n).foreach(println)
    } else {
        rdd.collect().foreach(println)
    }
}

printRDD: [T](rdd: org.apache.spark.rdd.RDD[T], n: Int)Unit


Helper function returning a configured Word2Vec estimator. Fitting it later yields the Word2VecModel.

In [4]:
def getModel(): Word2Vec = {
    // Each setter returns the estimator itself, so the calls can be chained
    new Word2Vec()
        .setInputCol(inputCol)
        .setMaxIter(maxIter)
        .setMinCount(minCount)
        .setNumPartitions(numPartitions)
        .setSeed(seed)
        .setVectorSize(vectorSize)
}

getModel: ()org.apache.spark.ml.feature.Word2Vec


Load the dataset from ../data/item_metadata.csv.

In [5]:
var data = sc.textFile("../data/item_metadata.csv")

data: org.apache.spark.rdd.RDD[String] = ../data/item_metadata.csv MapPartitionsRDD[1] at textFile at <console>:27


The dataset comes with a header in the first line, which needs to be removed. If debug is set, the remaining rows are also sampled down using ratio.

In [6]:
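val header = data.first()
// Drop every line that is identical to the header
data = data.filter(row => row != header)
if (debug) {
    // Sample without replacement, keeping roughly `ratio` of the rows
    data = data.sample(false, ratio, seed)
}
data.count()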

header: String = item_id,properties
data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:33
res0: Long = 927142


The first column of the metadata dataset does not belong in the model. It is an external ID and is dropped.

The remaining properties string is then split on '|' into individual words. Each word is one item property.

In [7]:
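// Keep only the properties column (index 1); the item_id is dropped
val truncated_data = data.map(_.split(',')(1))
// One row per item: the array of its properties, in the column the model reads ("text")
val preprocessed_Data = truncated_data.map(_.split('|')).map(Tuple1.apply).toDF("text")
// Distinct properties across all items
val words = truncated_data.flatMap(_.split('|')).distinct()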

truncated_data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:28
preprocessed_Data: org.apache.spark.sql.DataFrame = [text: array<string>]
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at distinct at <console>:30
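
The two splits can be followed on a single row with plain Scala. The sample row below is hypothetical; it only mimics the `item_id,properties` layout shown in the header. As a variation on the cell above, the sketch splits with a limit of 2 so that anything after the first comma stays together, which is an assumption about how commas inside properties should be treated.

```scala
// Hypothetical row in the "item_id,properties" layout of the header.
val sampleRow = "5101,Satellite TV|Golf Course|Airport Shuttle"

// Limit 2: split at the first comma only, so the properties stay in one piece.
val Array(sampleItemId, sampleProperties) = sampleRow.split(",", 2)

// Splitting on the Char '|' treats it literally.
// A String "|" would instead be interpreted as a regular expression.
val sampleTokens: Array[String] = sampleProperties.split('|')

println(sampleItemId)
println(sampleTokens.mkString(" / "))
```

Each property, including multi-word ones such as "Satellite TV", ends up as a single token, so the model learns one vector per property rather than per English word.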


In [8]:
printRDD(words, 3)
words.count()
preprocessed_Data.head(3)
preprocessed_Data.count()

Direct beach access
Guest House
Ironing Board


res1: Long = 927142


Get a new Word2Vec instance and fit the model to the preprocessed data.

In [9]:
val model = getModel().fit(preprocessed_Data)

model: org.apache.spark.ml.feature.Word2VecModel = w2v_17eeb45b2426


Find the 10 words most similar to the term "Gym", together with their similarity scores.

In [10]:
model.findSynonyms("Gym", 10).take(10)

res2: Array[org.apache.spark.sql.Row] = Array([Swimming Pool (Outdoor),0.9618108868598938], [Spa Hotel,0.94759202003479], [Ski Resort,0.945956826210022], [Health Retreat,0.9367016553878784], [Nightclub,0.9317147731781006], [Cot,0.930616557598114], [Casino (Hotel),0.922082245349884], [Szep Kartya,0.8853012919425964], [Beach,0.8825615644454956], [Hypoallergenic Bedding,0.8758768439292908])
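
The scores returned by findSynonyms are cosine similarities between the word vectors. The following plain-Scala sketch shows how such a score is computed. The three short vectors are made up for illustration and are not taken from the fitted model, and `cosineSimilarity` is a hypothetical helper, not a Spark function.

```scala
// Cosine similarity: dot product divided by the product of the vector lengths.
def cosineSimilarity(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same dimension")
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  // Define the similarity with a zero vector as 0 to avoid dividing by zero
  if (normA == 0.0 || normB == 0.0) 0.0 else dot / (normA * normB)
}

// Made-up 3-dimensional vectors (the notebook uses vectorSize = 15).
val toyGym = Array(1.0, 2.0, 0.0)
val toyPool = Array(2.0, 4.0, 0.0)   // same direction as toyGym
val toyCasino = Array(0.0, 0.0, 3.0) // orthogonal to toyGym

println(cosineSimilarity(toyGym, toyPool))
println(cosineSimilarity(toyGym, toyCasino))
```

Vectors pointing in the same direction score close to 1 regardless of their length, while orthogonal vectors score 0. A value such as 0.96 for "Swimming Pool (Outdoor)" therefore means its vector points in almost the same direction as the one for "Gym".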


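Finally, save the model and the vectors to disk. The model writer is set to overwrite an existing model, whereas saveAsTextFile fails if the vectors-out directory already exists.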

In [11]:
model.write.overwrite.save("word2vec-model")
model.getVectors.rdd.saveAsTextFile("vectors-out")
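
To reuse the text output outside Spark, the lines in vectors-out have to be parsed back into words and numbers. The sketch below assumes that each line is the default string form of a (word, vector) row, for example `[Gym,[0.1,-0.2]]`. This format is an assumption and should be checked against an actual part file first. `parseVectorLine` is a hypothetical helper name.

```scala
// Parse one line of the assumed form "[word,[v1,v2,...]]".
// The word may itself contain commas or parentheses, so the line is split at the LAST ",[".
def parseVectorLine(line: String): (String, Array[Double]) = {
  val sep = line.lastIndexOf(",[")
  require(line.startsWith("[") && line.endsWith("]]") && sep > 0, s"unexpected line: $line")
  val word = line.substring(1, sep)
  val values = line.substring(sep + 2, line.length - 2).split(',').map(_.trim.toDouble)
  (word, values)
}

// Made-up example line, not taken from the real output.
val (parsedWord, parsedVector) = parseVectorLine("[Swimming Pool (Outdoor),[0.5,-1.25,3.0E-4]]")
println(parsedWord)
println(parsedVector.mkString(", "))
```

Within Spark itself, reloading the saved model from the word2vec-model directory is the more direct route. The text files are mainly useful for other tools.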