Create a basic pipeline then explore alternative transformers and estimators

## Basic pipeline

### read data

In [2]:
import pandas as pd

# read data file
df_train = pd.read_csv('data/train.csv', encoding = "ISO-8859-1")  
df_test = pd.read_csv('data/test.csv', encoding = "ISO-8859-1")  
df_desc = pd.read_csv('data/product_descriptions.csv')  

# concat train and test data to process together
df_all = pd.concat((df_train, df_test), axis=0, ignore_index=True)

# merge description into data
df_all = pd.merge(df_all, df_desc, how='left', on='product_uid')  
df_all.head()

Unnamed: 0,id,product_title,product_uid,relevance,search_term,product_description
0,2,Simpson Strong-Tie 12-Gauge Angle,100001,3.0,angle bracket,"Not only do angles make joints stronger, they ..."
1,3,Simpson Strong-Tie 12-Gauge Angle,100001,2.5,l bracket,"Not only do angles make joints stronger, they ..."
2,9,BEHR Premium Textured DeckOver 1-gal. #SC-141 ...,100002,3.0,deck over,BEHR Premium Textured DECKOVER is an innovativ...
3,16,Delta Vero 1-Handle Shower Only Faucet Trim Ki...,100005,2.33,rain shower head,Update your bathroom with the Delta Vero Singl...
4,17,Delta Vero 1-Handle Shower Only Faucet Trim Ki...,100005,2.67,shower only faucet,Update your bathroom with the Delta Vero Singl...


### text processing ?

In [None]:
# check grammar? analyze part-of-speech(remove 'be' ...)

### tokenize text

In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.session import SparkSession

# instantiate Spark
spark = SparkSession.builder.getOrCreate()

# tokenize
data = spark.createDataFrame(df_all, ['id', 'product_title', 'product_uid', 'relevance', 'search_term', 'product_description'])
tokenizer = Tokenizer(inputCol="product_title", outputCol="title_words")
data = tokenizer.transform(data)
tokenizer = Tokenizer(inputCol="search_term", outputCol="search_words")
data = tokenizer.transform(data)

# hashingTF = HashingTF(inputCol="title_words", outputCol="title_features", numFeatures=10)
# train_title_tf = hashingTF.transform(train_search)

data.select(['title_words', 'search_words']).show(5)

+--------------------+--------------------+
|         title_words|        search_words|
+--------------------+--------------------+
|[simpson, strong-...|    [angle, bracket]|
|[simpson, strong-...|        [l, bracket]|
|[behr, premium, t...|        [deck, over]|
|[delta, vero, 1-h...|[rain, shower, head]|
|[delta, vero, 1-h...|[shower, only, fa...|
+--------------------+--------------------+
only showing top 5 rows



### word processing ?

In [None]:
# stemming?

### extract feature

#### feature: title matches searching query

In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType,StringType,IntegerType

#get the number of match words in term and title
def matchWords(title,term):
    l1=len(title)
    l2=len(term)
    match=0
    for i in range(l1):
        for j in range(l2):
            if title[i] == term[j]:
                match+=2
            elif title[i] in term[j]:
                match+=1
            elif term[j] in title[i]:
                match+=1
    return match
matchUDF=udf(matchWords, IntegerType())

data = data.withColumn("match_title", matchUDF("title_words","search_words"))
data.select('match_title').show()

+-----------+
|match_title|
+-----------+
|          2|
|          1|
|          2|
|          3|
|          6|
|          2|
|          4|
|          1|
|          4|
|          5|
|          1|
|          8|
|          3|
|          2|
|          8|
|          1|
|          4|
|          0|
|          0|
|          0|
+-----------+
only showing top 20 rows



#### feature: ???

In [None]:
### other features

### format feature

In [5]:
from pyspark.ml.feature import VectorAssembler

# format features
features=["match_title"]
assembler_features = VectorAssembler(inputCols=features, outputCol='features')
data = assembler_features.transform(data)
data.select('features', 'relevance').show()

+--------+---------+
|features|relevance|
+--------+---------+
|   [2.0]|      3.0|
|   [1.0]|      2.5|
|   [2.0]|      3.0|
|   [3.0]|     2.33|
|   [6.0]|     2.67|
|   [2.0]|      3.0|
|   [4.0]|     2.67|
|   [1.0]|      3.0|
|   [4.0]|     2.67|
|   [5.0]|      3.0|
|   [1.0]|     2.67|
|   [8.0]|      3.0|
|   [3.0]|      3.0|
|   [2.0]|      2.0|
|   [8.0]|     2.67|
|   [1.0]|     2.67|
|   [4.0]|      3.0|
|   [0.0]|      1.0|
|   [0.0]|     1.67|
|   [0.0]|     2.33|
+--------+---------+
only showing top 20 rows



### separate train and test data

In [6]:
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SQLContext
from pyspark.sql.functions import isnan, when, count, col

# separate train and test data
data.registerTempTable("data")
sc = spark.sparkContext
sql_sc = SQLContext(sc)

data_train = sql_sc.sql("SELECT * from data where not isnan(relevance)")
data_test = sql_sc.sql("SELECT * from data where isnan(relevance)")
print(data_train.count())
print(data_test.count())

74067
166693


### estimator

#### estimator: linear regression

In [52]:
# linear regression
lr = LinearRegression(maxIter=10, regParam=0.01, elasticNetParam=0.8, labelCol='relevance')
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(data_train)
predictions = model.transform(data_train)
predictions.select('prediction').show()

+------------------+
|        prediction|
+------------------+
| 2.359582369248631|
| 2.350930820360437|
| 2.359582369248631|
| 2.368233918136825|
| 2.394188564801407|
| 2.359582369248631|
| 2.376885467025019|
| 2.350930820360437|
| 2.376885467025019|
|2.3855370159132128|
| 2.350930820360437|
| 2.411491662577795|
| 2.368233918136825|
| 2.359582369248631|
| 2.411491662577795|
| 2.350930820360437|
| 2.376885467025019|
| 2.342279271472243|
| 2.342279271472243|
| 2.342279271472243|
+------------------+
only showing top 20 rows



In [53]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="relevance", predictionCol="prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName:"rmse"})
print(rmse)

0.531697410278


#### estimator: random forest

In [43]:
# random forest
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features",labelCol='relevance', numTrees=11, maxDepth=5)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(data_train)
predictions = model.transform(data_train)
predictions.select('prediction').show()

+------------------+
|        prediction|
+------------------+
|  2.33745428338531|
|2.2880940387708226|
|  2.33745428338531|
|2.3505379442952723|
|2.5045744198864583|
|  2.33745428338531|
|2.3917103789705707|
|2.2880940387708226|
|2.3917103789705707|
|2.4544731738764645|
|2.2880940387708226|
| 2.511291698662927|
|2.3505379442952723|
|  2.33745428338531|
| 2.511291698662927|
|2.2880940387708226|
|2.3917103789705707|
| 2.037368113068499|
| 2.037368113068499|
| 2.037368113068499|
+------------------+
only showing top 20 rows



In [49]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="relevance", predictionCol="prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName:"rmse"})
print(rmse)

0.521418945506


#### estimator: logistic regression (take as classification by times 3)

In [58]:
lr = LogisticRegression(maxIter=10, regParam=0.01, elasticNetParam=0.8, labelCol='int_relevance')
pipeline = Pipeline(stages=[lr])

def toInt(score):
    return int(score * 3)

intUDF = udf(toInt, IntegerType())
data_lr = data_train.withColumn("int_relevance", intUDF('relevance'))
model = pipeline.fit(data_lr)
predictions = model.transform(data_train)

def toDouble(score):
    return score / 3

doubleUDF = udf(toDouble, DoubleType())

predictions = predictions.withColumn("double_prediction", doubleUDF('prediction'))

predictions.select('double_prediction').show(100)

+-----------------+
|double_prediction|
+-----------------+
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|
|              2.0|


In [59]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="relevance", predictionCol="double_prediction")
rmse = evaluator.evaluate(predictions, {evaluator.metricName:"rmse"})
print(rmse)

0.656481816907


#### estimator: ???

In [None]:
# other estimators