# Import Spark Context

In [None]:
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark import SparkContext
from pyspark.sql import SparkSession
# Reuse the SparkContext that is already active in this process (for example one
# provided by a pyspark-launched kernel) or create a new one. getOrCreate() asks
# Spark for its active context instead of checking whether a Python name `sc`
# happens to exist, so a stale or stopped `sc` left in the kernel is not reused.
sc = SparkContext.getOrCreate()

In [3]:
# spark session
# Build (or reuse) the SparkSession used for the DataFrame reads below.
# NOTE(review): a SparkContext already exists from the cell above, so the
# master/appName set here may not take effect — confirm.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Word Count")
    .getOrCreate()
)

## Word2Vec: a promising way to represent words (not used yet)

In [4]:
from pyspark.ml.feature import Word2Vec
# in future I plan to use it. But this time I only use tf-idf.

### Load the data from `train.csv` (no other data files are used for now)

In [5]:
# Load the labelled (title, query, relevance) pairs. The file has a header row,
# and Spark infers the column types.
train_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("finalproject/data/train.csv")
)

In [6]:
# Preview the first 20 rows; long strings are truncated in the display.
train_data.show(n=20, truncate=True)

+---+-----------+--------------------+--------------------+---------+
| id|product_uid|       product_title|         search_term|relevance|
+---+-----------+--------------------+--------------------+---------+
|  2|     100001|Simpson Strong-Ti...|       angle bracket|      3.0|
|  3|     100001|Simpson Strong-Ti...|           l bracket|      2.5|
|  9|     100002|BEHR Premium Text...|           deck over|      3.0|
| 16|     100005|Delta Vero 1-Hand...|    rain shower head|     2.33|
| 17|     100005|Delta Vero 1-Hand...|  shower only faucet|     2.67|
| 18|     100006|Whirlpool 1.9 cu....|      convection otr|      3.0|
| 20|     100006|Whirlpool 1.9 cu....|microwave over stove|     2.67|
| 21|     100006|Whirlpool 1.9 cu....|          microwaves|      3.0|
| 23|     100007|Lithonia Lighting...|     emergency light|     2.67|
| 27|     100009|House of Fara 3/4...|             mdf 3/4|      3.0|
| 34|     100010|Valley View Indus...|        steele stake|     2.67|
| 35|     100011|Tor

### Tokenize the text and compute term frequencies with HashingTF

In [7]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
# One Tokenizer per text field. The sample output below shows that words are
# lower-cased and split on whitespace.
tokenizer_title = Tokenizer(inputCol="product_title", outputCol="product_title_words")
tokenizer_query = Tokenizer(inputCol="search_term", outputCol="search_words")

# Apply the title tokenizer first, then the query tokenizer, on the raw data.
wordsData = tokenizer_query.transform(tokenizer_title.transform(train_data))
wordsData.take(1)

[Row(id=2, product_uid=100001, product_title=u'Simpson Strong-Tie 12-Gauge Angle', search_term=u'angle bracket', relevance=3.0, product_title_words=[u'simpson', u'strong-tie', u'12-gauge', u'angle'], search_words=[u'angle', u'bracket'])]

In [8]:
# Hash each word list into a sparse term-frequency vector. Both use the default
# HashingTF size, which yields the 262144-dimensional vectors shown in the output.
hashingTF_pro = HashingTF(inputCol="product_title_words",
                          outputCol="product_title_words_rawFeatures")
hashingTF_qeury = HashingTF(inputCol="search_words",
                            outputCol="search_words_rawFeatures")

featurizedData = hashingTF_qeury.transform(hashingTF_pro.transform(wordsData))
featurizedData.select('product_title_words_rawFeatures',
                      'search_words_rawFeatures').take(2)

[Row(product_title_words_rawFeatures=SparseVector(262144, {5505: 1.0, 26505: 1.0, 54375: 1.0, 159707: 1.0}), search_words_rawFeatures=SparseVector(262144, {54375: 1.0, 228696: 1.0})),
 Row(product_title_words_rawFeatures=SparseVector(262144, {5505: 1.0, 26505: 1.0, 54375: 1.0, 159707: 1.0}), search_words_rawFeatures=SparseVector(262144, {213302: 1.0, 228696: 1.0}))]

### Calculate IDF

In [9]:
# Fit one IDF model per text field. Each fitted model gets its own name so that
# neither is overwritten (the title model remains available for later reuse).
# NOTE(review): both models are fit on the full dataset, before the train/test
# split further down, so test rows influence the IDF weights. Consider fitting
# them on the training split only.
idf_prod = IDF(inputCol="product_title_words_rawFeatures", outputCol="features_prod")
idfModel_prod = idf_prod.fit(featurizedData)

idf_query = IDF(inputCol="search_words_rawFeatures", outputCol="features_query")
idfModel_query = idf_query.fit(featurizedData)

# Add features_prod first, then features_query, as TF-IDF columns.
rescaledData = idfModel_query.transform(idfModel_prod.transform(featurizedData))
rescaledData.take(1)

[Row(id=2, product_uid=100001, product_title=u'Simpson Strong-Tie 12-Gauge Angle', search_term=u'angle bracket', relevance=3.0, product_title_words=[u'simpson', u'strong-tie', u'12-gauge', u'angle'], search_words=[u'angle', u'bracket'], product_title_words_rawFeatures=SparseVector(262144, {5505: 1.0, 26505: 1.0, 54375: 1.0, 159707: 1.0}), search_words_rawFeatures=SparseVector(262144, {54375: 1.0, 228696: 1.0}), features_prod=SparseVector(262144, {5505: 5.4076, 26505: 7.0696, 54375: 5.4697, 159707: 5.5326}), features_query=SparseVector(262144, {54375: 6.2499, 228696: 5.8897}))]

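### Build a LinearRegression model
* Input data: the two TF-IDF vectors (product title and search query), concatenated into one feature vector
* Output data: a relevance score in the range 0 to 3 (linear regression does not clip its predictions to this range)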

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.linalg import SparseVector
from pyspark.sql.functions import udf
# Concatenate the title and query TF-IDF vectors into the single `features`
# column that LinearRegression reads.
# VectorAssembler refuses to write to an existing output column, so any
# `features` column from a previous run of this cell is dropped first. This
# keeps the cell re-runnable; drop() on a missing column is a no-op.
vs = VectorAssembler(inputCols=['features_prod', 'features_query'], outputCol='features')
rescaledData = vs.transform(rescaledData.drop('features'))

### Fit the model

In [34]:
# Fixed seed so that the train/test split, and therefore every metric reported
# below, is reproducible across runs.
SPLIT_SEED = 42

# NOTE(review): with regParam=0.3 and a mostly-L1 penalty (elasticNetParam=0.8),
# the recorded run drove every coefficient to zero (Coefficients: (524288,[],[]),
# r2 = 0). The model therefore predicts only the intercept.
# TODO: tune regParam / elasticNetParam, e.g. with CrossValidator.
lr = LinearRegression(featuresCol='features', labelCol='relevance',
                      regParam=0.3, elasticNetParam=0.8, maxIter=1000)
(trainingData, testData) = rescaledData.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
model = lr.fit(trainingData)

In [35]:
# Training-set summary of the fitted model. The next cell reads its iteration
# count, objective history, RMSE and r2.
trainingSummary = model.summary

In [36]:
# Report the fitted parameters and the training-set metrics, one line each.
for template, value in [
    ("Coefficients: %s", str(model.coefficients)),
    ("Intercept: %s", str(model.intercept)),
    ("numIterations: %d", trainingSummary.totalIterations),
    ("objectiveHistory: %s", str(trainingSummary.objectiveHistory)),
    ("RMSE: %f", trainingSummary.rootMeanSquaredError),
    ("r2: %f", trainingSummary.r2),
]:
    print(template % value)

Coefficients: (524288,[],[])
Intercept: 2.3826735447
numIterations: 1
objectiveHistory: [0.4999903398442797]
RMSE: 0.533875
r2: -0.000000


In [37]:
# Score the held-out split. The resulting `prediction` column is read by the
# evaluator below.
predictions = model.transform(testData)

### Evaluate the result


In [38]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE of the predictions on the held-out split. In the recorded run the model
# predicts only its intercept, so this is effectively the error of a constant
# predictor.
evaluator = RegressionEvaluator(labelCol="relevance",
                                predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % (rmse,))

Root Mean Squared Error (RMSE) on test data = 0.534227


In [40]:
# Spot-check one (label, prediction) pair.
predictions.select(['relevance', 'prediction']).head(1)

[Row(relevance=3.0, prediction=2.382673544697543)]

In [50]:
# Cache the assembled feature DataFrame so later actions (such as the distance
# sample below) do not recompute tokenization, hashing and IDF from scratch.
# NOTE(review): this runs after the model was fit, so it does not speed up
# training. Consider persisting before randomSplit.
rescaledData.persist()

DataFrame[id: int, product_uid: int, product_title: string, search_term: string, relevance: double, product_title_words: array<string>, search_words: array<string>, product_title_words_rawFeatures: vector, search_words_rawFeatures: vector, features_prod: vector, features_query: vector, features: vector]

### Note
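* The result is poor: in this run every coefficient was shrunk to zero, so the model only predicts the intercept (training r2 ≈ 0, test RMSE ≈ 0.534). Later I plan to use other methods to train the model.
* Next week I will use Word2Vec to build features for the model.
* I also plan to use other data, such as product attributes and product descriptions, to train the model.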

In [49]:
from pyspark import Row

# Row is used below to build a DataFrame of per-pair distances. Collecting
# feature vectors to the driver (e.g. rescaledData.take(1000)) exceeded the
# notebook's IOPub output limit, so the next cells sample on the cluster instead.

IOPub data rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_data_rate_limit`.


In [57]:
# Squared Euclidean distance between each row's query and title TF-IDF vectors,
# computed on a ~1% sample. Both vectors come from the same 262144-bucket hash
# space, so their dimensions match.
# - The fixed seed makes the sample reproducible.
# - The named Row field gives the resulting DataFrame a `squared_distance`
#   column instead of the default `_1`.
temp1 = (
    rescaledData
    .sample(False, 0.01, seed=42)
    .rdd
    .map(lambda r: Row(squared_distance=float(
        r['features_query'].squared_distance(r['features_prod']))))
)

In [58]:
# Turn the sampled distance rows into a DataFrame via the active SparkSession.
df = spark.createDataFrame(temp1)

In [1]:
# Preview the sampled distances (first 20 rows).
# NOTE: the recorded NameError comes from running this cell in a fresh kernel
# (In [1]) without first re-running the cells that define `df`.
df.show(n=20, truncate=True)

NameError: name 'df' is not defined