## 2.7 Machine Learning for Ranking with Spark MLLIB

First we need to load our training data into pandas dataframe.  The data is in tab separated formatand the easiest way to load this is using the ```pandas``` ```read_table()``` method. 

In [1]:
import csv
import pandas as pd

df = pd.read_table("data/fullDataset.tsv",header=0)

  interactivity=interactivity, compiler=compiler, result=result)


Let's check out the shape and column names:

In [3]:
print(df.shape)
print(df.columns)

(78500, 19)
Index([u'key', u'query', u'Title', u'LeafCats', u'ItemID', u'X_unit_id',
       u'SCORE', u'label_relevanceGrade', u'label_relevanceBinary',
       u'feature_1', u'feature_2', u'feature_3', u'feature_4', u'feature_5',
       u'feature_6', u'feature_7', u'feature_8', u'feature_9', u'feature_10'],
      dtype='object')


Here is a sample of the data:

In [4]:
df.sample(5)

Unnamed: 0,key,query,Title,LeafCats,ItemID,X_unit_id,SCORE,label_relevanceGrade,label_relevanceBinary,feature_1,feature_2,feature_3,feature_4,feature_5,feature_6,feature_7,feature_8,feature_9,feature_10
43715,93260,cardigan petite xl,Luxe by designer originals petite extra large ...,63866,45781,753412823,3:3:3,6,1,2676.618,0,-7.772325,-4.314727,20.462078,0.438969,0,0,172,0.168808
69911,18845,jaeger lecoultre master control,Jaeger LeCoultre Master Control Automatic Harm...,31387,9427,724905204,3:3:3,6,1,3700.0,1,-9.793746,-3.916365,93.303833,0.74169,47,-1000000,141,2.146055
27139,2761,womens shorts small print,GAP Womens Haley Shorts In Yellow Geometric Pr...,11555,1420,724907431,1:3:3,5,0,2676.618,0,-7.719259,-5.691216,31.051384,0.354527,0,0,173,1.148616
42373,230884,silver 90%,✯1 oz OUNCE 90%25 SILVER U.S. COINS✯ ESTATE SA...,525|177653,75372,793074958,3:3:3,6,1,1877.722168,1,-7.292244,-4.485876,21.173382,0.2766,35765,181818,132,0.925502
15489,86150,blackmores,RITCHIE BLACKMORE CLASSIC ROCK GUITAR LICKS NE...,41779,42349,753411212,3:3:3,6,1,242.555557,1,-7.48001,-5.061462,30.29841,0.0,2317,473684,198,-100.0


The columns are:

Column name             | Description
------------------------|-----------------------------------------------------------------------
key                     |  Used to join back to the original dataset and add any additional fields as needed
query                   |  Un-normalized query keywords (without user constraints)
Title                   |  Un-normalized title
LeafCats                |  Item leaf category IDs
ItemID                  |  Anonymized itemID. This is not the actual item ID
X_unit_id               |  The query ID used for grouping query-item pairs by their search, primarily for per-query metrics. Essentially it's a "search ID". We can also group by query or normalized query
SCORE                   |  The scores given by up to three judges characterizing the relevance/relevance problem of the query-item pair
label_relevanceGrade    |  The SCORE averaged and rounded and converted to a relevance graded judgment 0-6, 6 being the best. Note this is very approximate
label_relevanceBinary   |  The SCORE converted to a binary relevant(==1) or not relevant (==0) judgment.  This is a more accurate label than the Grade, I recommend it as a training target

Features (In brief): 

* query features: feature_2, feature_7, feature_8
* item features: feature_3, feature_4, feature_9
* query-item features: feature_1, feature_5, feature_6, feature_10

## Getting ready for Machine Learning

We start by simply exploring how we might classify queries as relevent / not relevant.  We will explore a series of different models to do this. The first is logistic regression, we will also use SVM and finally classification tree's.  Along the way we will look at over fitting / generalization and how to evaluate models.  

We can do all this using Spark MLLIB - first we have to findspark and get our spark context:


In [36]:
import findspark
import os
findspark.init(os.getenv('HOME') + '/spark-1.6.0-bin-hadoop2.6')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'

In [37]:
import pyspark
try: 
    print(sc)
except NameError:
    sc = pyspark.SparkContext()
    print(sc)

<pyspark.context.SparkContext object at 0x7fa1764a03d0>


It is easy to load the tsv data into a Spark DataFrame:

In [38]:
from pyspark.sql import SQLContext
import os

sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options() \
        .options(header='true', inferSchema='true', delimiter='\t') \
        .load(os.getcwd() + '/data/fullDataset.tsv') 
        
df.schema


StructType(List(StructField(key,IntegerType,true),StructField(query,StringType,true),StructField(Title,StringType,true),StructField(LeafCats,StringType,true),StructField(ItemID,IntegerType,true),StructField(X_unit_id,IntegerType,true),StructField(SCORE,StringType,true),StructField(label_relevanceGrade,IntegerType,true),StructField(label_relevanceBinary,IntegerType,true),StructField(feature_1,DoubleType,true),StructField(feature_2,IntegerType,true),StructField(feature_3,DoubleType,true),StructField(feature_4,DoubleType,true),StructField(feature_5,DoubleType,true),StructField(feature_6,DoubleType,true),StructField(feature_7,IntegerType,true),StructField(feature_8,DoubleType,true),StructField(feature_9,IntegerType,true),StructField(feature_10,DoubleType,true)))

Now we can extract the features and the target for the machine learning algorithms:

In [39]:
sqlContext.registerDataFrameAsTable(df,'dataset')
sqlContext.tableNames()

data_full = sqlContext.sql("select label_relevanceBinary, feature_1, feature_2, feature_3, feature_4 \
                       feature_5, feature_6, feature_7, feature_8, feature_9, feature_10 \
               from dataset").rdd


We also split the data into test and validation data sets - splitting 75%:25% between the training and test sets:

In [40]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parseRecord(line):
    return LabeledPoint(line[0], line[1:])

data = data_full.map(parseRecord)

#features = data_full.map(lambda x: x.feature_1)
from pyspark.mllib.feature import StandardScaler
scaler1 = StandardScaler().fit(data)


data_train, data_test = data.randomSplit([0.75,0.25])

Py4JJavaError: An error occurred while calling o416.fitStandardScaler.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 158.0 failed 1 times, most recent failure: Lost task 0.0 in stage 158.0 (TID 158, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 77, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.mllib.regression.LabeledPoint'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
	at org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:53)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.fitStandardScaler(PythonMLLibAPI.scala:614)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/csumb/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/mllib/linalg/__init__.py", line 77, in _convert_to_vector
    raise TypeError("Cannot convert type %s into Vector" % type(l))
TypeError: Cannot convert type <class 'pyspark.mllib.regression.LabeledPoint'> into Vector

	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	... 1 more


In [31]:
print('Training data records = ' + str(data_train.count()))
print('Training data records = ' + str(data_test.count()))

data_train.take(5)

Training data records = 58615
Training data records = 19795


[LabeledPoint(0.0, [2676.618,0.0,-9.020574,-5.221152,0.0,241.0,-1000000.0,139.0,-100.0]),
 LabeledPoint(1.0, [2676.618,0.0,-8.599997,-5.31775,0.805351,0.0,0.0,150.0,2.53809117973]),
 LabeledPoint(0.0, [3428.06665,1.0,-8.23653,-3.406829,0.0561681033807,742.0,-1000000.0,139.0,-100.0]),
 LabeledPoint(1.0, [2676.618,0.0,-7.454222,-4.874171,0.051492,338.0,-1000000.0,213.0,0.441697325035]),
 LabeledPoint(1.0, [2676.618,0.0,-7.52216,-4.38311,0.15502195947,0.0,0.0,141.0,-100.0])]

## Fitting an SVM - a simple classifier

In [15]:
model = SVMWithSGD.train(data_train.map(parseRecord), iterations=100)

In [16]:
model

(weights=[1935.22521984,2.35665977665,0.779288560111,2.09313254781,0.850439376318,3670.3593062,-2662.2170945,-14.1656243555,54.2181876912], intercept=0.0)

In [17]:
# Evaluating the model on test data
preds = data_test.map(parseRecord).map(lambda p: (p.label, model.predict(p.features)))
err = preds.filter(lambda (v, p): v != p).count() / float(data_test.count())
print("Training Error = " + str(err))

Training Error = 0.453355824771


## Fitting Logistic Regression

In [18]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel

model = LogisticRegressionWithLBFGS.train(data_train.map(parseRecord))

In [19]:
model

(weights=[0.000438641825962,0.643992236566,0.0160266941194,0.229458388736,1.18529259753,3.67571490154e-07,-7.292236451e-08,-0.000353467229084,0.000143238868138], intercept=0.0)

In [20]:
# Evaluating the model on training data
preds = data_test.map(parseRecord).map(lambda p: (p.label, model.predict(p.features)))
err = preds.filter(lambda (v, p): v != p).count() / float(data_test.count())
print("Training Error = " + str(err))

Training Error = 0.300414046925


## Normalise the features

In the previous sections we used the raw feature scores.  We should normalize the data first.


In [21]:
from pyspark.mllib.feature import StandardScaler
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)

NameError: name 'data' is not defined