## Segment recipe text to fit a word2vec model, and save the recipe_vector JSON file locally

In [1]:
import re
import json
import jieba
import time
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

## Define a function to run on the cluster that segments Chinese text into words

In [2]:
# Ship the jieba user dictionary to every node of the cluster so that
# wordToSeg can load 'mydict_3.txt' inside Spark tasks.
spark.sparkContext.addFile('mydict_3.txt')
def wordToSeg(x):
    """Segment Chinese text into space-separated words with jieba.

    Every non-word character (punctuation and whitespace, including newlines)
    and every ASCII letter/digit is stripped first. The remaining text is cut
    by jieba in precise mode (``cut_all=False``) using the custom dictionary
    ``mydict_3.txt``.

    Args:
        x: The text to segment. ``None`` (a NULL column value) or any other
            non-string yields ``''``.

    Returns:
        The jieba tokens joined by single spaces, or ``''`` if nothing is left
        to segment.
    """
    # Load the custom dictionary once per Python worker process. The file is
    # distributed with SparkContext.addFile; SparkFiles.get resolves the local
    # copy on whichever node (driver or executor) this code runs on.
    if not jieba.dt.initialized:
        from pyspark import SparkFiles
        jieba.load_userdict(SparkFiles.get('mydict_3.txt'))

    # NULL column values and other non-strings segment to an empty string.
    if not isinstance(x, str):
        return ''

    # \W already covers '\n', so no separate newline stripping is needed.
    # Deleting [\W] and [a-zA-Z0-9] characters in one pass is equivalent to
    # deleting them one class after the other.
    cleaned = re.sub(r'[\Wa-zA-Z0-9]', '', x)
    return ' '.join(jieba.cut(cleaned, cut_all=False))
    

### Register the UDF

In [3]:
# Register wordToSeg for Spark SQL under the name "word2seg". Spark SQL
# resolves function names case-insensitively, which is why the queries
# below can call it as word2Seg(...).
spark.udf.register("word2seg", wordToSeg, StringType())
# DataFrame-API handle for the same function (the visible cells use the
# SQL-registered name instead).
word2Seg = udf(wordToSeg, StringType())

### Load all recipe information from HDFS

In [4]:
# Load the raw recipe records from HDFS (spark.read.json expects one JSON
# object per line by default).
recipe = spark.read.json("hdfs://master/user/spark/spark101/recipe_com/recipe_to_spark.json")

### Create a temp view for the following SQL query

In [5]:
# Expose the raw recipes to Spark SQL as the "recipes" view.
recipe.createOrReplaceTempView("recipes")

### SQL query using the registered segmentation UDF

In [6]:
# Apply the segmentation UDF to the ingredient, steps, comment and category
# columns; url, img_url, title, time and author pass through unchanged.
recipes_seg = spark.sql('''select url, img_url, title, time, author, word2Seg(ingredient) ingredient, 
                        word2Seg(steps) steps, word2Seg(comment) comment,
                        word2Seg(category) category from recipes''')

In [7]:
# Expose the segmented recipes to Spark SQL as the "recipes_seg" view.
recipes_seg.createOrReplaceTempView("recipes_seg")

In [8]:
# Build one space-separated bag-of-words string per recipe for training.
# concat_ws(' ', ...) keeps a space between columns; plain concat() would
# glue the last token of one column onto the first token of the next,
# producing bogus words. nullif(col, '') turns empty columns into NULL, which
# concat_ws skips, so empty columns add no doubled spaces.
recipes_wordbag = spark.sql('''SELECT concat_ws(' ', nullif(ingredient, ''), nullif(steps, ''),
                                                nullif(comment, ''), nullif(category, '')) as text
                               from recipes_seg''')

### Define another function for splitting the words in each row

In [9]:
def wordToList(x):
    """Split a space-separated token string into a list of tokens.

    ``str.split()`` with no separator is used so that runs of spaces and
    leading/trailing spaces never yield empty-string tokens. Such tokens
    would otherwise be counted as a vocabulary word by Word2Vec.

    Args:
        x: Space-separated tokens, e.g. the output of ``wordToSeg``.
            ``None`` (a NULL column value) yields an empty list.

    Returns:
        The list of non-empty tokens.
    """
    if x is None:
        return []
    return x.split()

### Register a UDF for splitting words into lists

In [10]:
# Register wordToList for Spark SQL as "word2list", returning array<string>.
spark.udf.register("word2list", wordToList, ArrayType(StringType()))
# DataFrame-API handle for the same function (the visible cells use the
# SQL-registered name instead).
word2list = udf(wordToList, ArrayType(StringType()))

### Split each big string into a word list, ready to fit the word2vec training model

In [11]:
# Expose the bag-of-words strings as the "recipes_wordlist" view; the next
# query splits them into word lists.
recipes_wordbag.createOrReplaceTempView("recipes_wordlist")

In [12]:
# One row per recipe with its tokens as array<string> in column "text",
# the inputCol the Word2Vec estimator below reads.
for_word2vec = spark.sql('''SELECT word2list(text) text from recipes_wordlist''')

### Spark MLlib word2vec training

In [13]:
from pyspark.ml.feature import Word2Vec

In [14]:
# Learn 50-dimensional word vectors from the "text" token arrays. Words that
# appear fewer than 3 times are excluded from the vocabulary. The fitted
# model's transform() writes document vectors to the "result" column.
word2Vec = Word2Vec(vectorSize=50, minCount=3, inputCol="text", outputCol="result")
model = word2Vec.fit(for_word2vec)


## Save the model

In [None]:
# Persist the fitted Word2Vec model to HDFS.
# NOTE(review): save() fails if the path already exists; use
# model.write().overwrite().save(...) when re-running this cell.
model.save("hdfs://master/user/spark/spark101/recipe_com/recipe_word2vec_model")

## Transform all recipes into vectors using the word2vec model

In [15]:
# Build one space-separated token string per recipe, keeping url, img_url,
# title and author so each vector can be traced back to its recipe.
# concat_ws(' ', ...) separates the columns so tokens at column boundaries
# are not fused into bogus words. nullif(col, '') turns empty columns into
# NULL, which concat_ws skips, so no doubled spaces appear.
recipes_wordseg = spark.sql('''SELECT url, img_url, title, author,
                               concat_ws(' ', nullif(ingredient, ''), nullif(steps, ''),
                                         nullif(comment, ''), nullif(category, '')) as text
                               from recipes_seg''')

In [16]:
# Expose the per-recipe token strings to Spark SQL as "recipes_wordseg".
recipes_wordseg.createOrReplaceTempView("recipes_wordseg")

In [17]:
# Split each recipe's text into a word list (column "text", the model's
# inputCol), keeping url, img_url, title and author alongside.
recipes_wordseglist = spark.sql('''SELECT url, img_url, title, author, word2list(text) as text from recipes_wordseg''')

In [18]:
# Add a "result" column holding one vector per recipe. Word2VecModel
# averages the vectors of the document's words; out-of-vocabulary words are
# ignored.
recipe_vector = model.transform(recipes_wordseglist)

### Save recipe_vector as JSON on Hadoop HDFS

In [None]:
# Write the recipe vectors to HDFS as JSON. Spark creates a directory of
# part files at this path, and with the default save mode it fails if the
# path already exists.
recipe_vector.write.json("hdfs://master/user/spark/spark101/recipe_com/recipe_vector.json")

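### Or write recipe_vector as a single JSON part file, then copy it to the local file system

The path below has no URI scheme, so it resolves against the default file system (HDFS, as the `hadoop fs -get` command below assumes). `coalesce(1)` makes Spark write exactly one part file inside that output directory.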

In [19]:
# Collapse to one partition so exactly one part-*.json file is written.
# NOTE(review): the path has no scheme, so it resolves against the default
# file system (HDFS here, per the `hadoop fs -get` below), not the local disk.
recipe_vector.coalesce(1).write.format('json').save("/home/spark/Desktop/recipe_com/recipe_vector.json")

* Use the following command to copy the output from HDFS to the local file system (Spark writes it as a directory containing a part-*.json file):

hadoop fs -get /home/spark/Desktop/recipe_com/recipe_vector.json ~/Desktop/recipe_com/recipe_vector.json