In [2]:
from pyspark import SparkContext
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector
from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row
from functools import partial
from pyspark.ml.regression import LinearRegression
import HTMLParser
import re, collections

def words(s):
    """Normalize *s* and return its lower-case alphabetic tokens.

    Processing steps, in order:

    1. HTML entities are unescaped (``&amp;`` -> ``&``).
    2. Punctuation (``? ! . , ( ) # " '``), dashes and digits are replaced by
       spaces.
    3. A lower-case run followed by a single capital and another lower-case
       run is split before the capital (``fooBar`` -> ``foo Bar``).
    4. The text is lower-cased and every run of ASCII letters is returned.

    :param s: text to clean (may contain HTML entities).
    :return: list of ``[a-z]+`` tokens in order of appearance.
    """
    # Function-scope import: works on Python 3 (html.unescape) and falls back
    # to the Python 2 HTMLParser API, whose unescape() method no longer exists
    # on Python 3.9+.
    try:
        from html import unescape
    except ImportError:
        import HTMLParser
        unescape = HTMLParser.HTMLParser().unescape

    s = unescape(s)
    # Raw strings throughout so that regex escapes are never interpreted as
    # (invalid) Python string escapes.
    s = re.sub(r'[?!\.,\(\)#"\']+', ' ', s).strip()
    # Drop space-delimited fractions ("3/4") and ranges ("10-12").
    # The former "digits.digits" / "digits,digits" rules were unreachable:
    # every '.' and ',' has already been replaced by the substitution above.
    s = re.sub(r'[ ]+[0-9]+[/]+[0-9]+[ ]+', ' ', s)
    s = re.sub(r'[ ]+[0-9]+[\-]+[0-9]+[ ]+', ' ', s)
    s = re.sub(r'[\-]+', ' ', s)
    s = re.sub(r'[0-9]+', ' ', s)
    # Split camelCase boundaries before lower-casing erases them.
    s = re.sub(r'([a-z]+)([A-Z]{1,1})([a-z]+)', r'\1 \2\3', s)
    s = re.sub(r'\s+', ' ', s)
    return re.findall(r'[a-z]+', s.lower())
	


def fixEncoding(x):
    """Turn one attribute record into a ``(product_uid, [text])`` pair.

    ``x`` is indexed by the keys ``'product_uid'``, ``'name'`` and ``'value'``.
    The name and value are UTF-8 encoded (a missing one becomes an empty
    string) and combined as ``"<name> <value>."``.

    :param x: record supporting ``x[key]`` lookup for the three keys above.
    :return: tuple ``(product_uid, [formatted_text])``; the text is wrapped in
        a one-element list.
    """
    product_uid = x['product_uid']

    def _utf8(field):
        # None means the field is absent: fall back to an empty string.
        raw = x[field]
        return '' if raw is None else raw.encode("UTF-8")

    sentence = '%s %s.' % (_utf8('name'), _utf8('value'))
    # A plain tuple is returned rather than a Row.
    return (product_uid, [sentence])


def addFeatureLen(row):
    """Return a copy of *row* whose ``tf_idf`` vector has one extra dimension.

    The extra dimension sits at index ``vector.size`` (the old length) and
    holds the number of stored entries of the original sparse vector.

    :param row: Row with a ``tf_idf`` field exposing ``size``, ``indices``
        and ``values``.
    :return: a new Row with the same fields and the widened ``tf_idf``.
    """
    tfidf = row['tf_idf']
    old_size = tfidf.size
    # index -> value mapping of the stored entries, plus the appended count.
    entries = dict(zip(tfidf.indices, tfidf.values))
    entries[old_size] = len(tfidf.indices)
    # NOTE(review): SparseVector is whatever the file imports under that name
    # (pyspark.mllib.linalg in the visible imports) - confirm this is intended.
    widened = SparseVector(old_size + 1, entries)
    # The input Row cannot be modified, so rebuild one from its dict form.
    fields = row.asDict()
    fields['tf_idf'] = widened
    # Row(*names) declares the fields; calling it fills in their values.
    make_row = Row(*fields.keys())
    return make_row(*fields.values())


def cleanData(row, model):
    """Return a copy of *row* with a corrected ``search_term2`` field.

    ``row['search_term']`` is split on whitespace and every token is passed
    through ``correct(token, model)``; the resulting list is stored under the
    new field ``search_term2``.

    :param row: Row with a ``search_term`` string field.
    :param model: forwarded unchanged to ``correct``.
    :return: a new Row with all original fields plus ``search_term2``.
    """
    # `correct` is defined outside this block - presumably a spelling
    # corrector; verify against its definition.
    corrected = [correct(token, model) for token in row['search_term'].split()]
    fields = row.asDict()
    # The cleaned tokens go into a new field; the original text is kept.
    fields['search_term2'] = corrected
    make_row = Row(*fields.keys())
    return make_row(*fields.values())


def newFeatures(row):
    """Return a copy of *row* with a 3-element dense ``features`` vector.

    The vector holds, in order: the number of stored entries of
    ``row['tf_idf']``, the smallest stored value and the largest stored value.

    :param row: Row with a ``tf_idf`` field exposing ``indices`` and ``values``.
    :return: a new Row with all original fields plus ``features``.
    """
    tfidf = row['tf_idf']
    stored = tfidf.values
    summary = [len(tfidf.indices), stored.min(), stored.max()]
    fields = row.asDict()
    fields['features'] = DenseVector(summary)
    # Row(*names) declares the fields; calling it fills in their values.
    make_row = Row(*fields.keys())
    return make_row(*fields.values())


def tfIdfAsNewFeatures(row):
    """Return a copy of *row* with a 4-element dense ``features`` vector.

    The vector holds, in order: the number of stored entries of
    ``row['tf_idf']`` and the minimum, maximum and mean of its stored values.

    :param row: Row with a ``tf_idf`` field exposing ``indices`` and ``values``.
    :return: a new Row with all original fields plus ``features``.
    """
    tfidf = row['tf_idf']
    stored = tfidf.values
    summary = [len(tfidf.indices), stored.min(), stored.max(), stored.mean()]
    fields = row.asDict()
    fields['features'] = DenseVector(summary)
    # Row(*names) declares the fields; calling it fills in their values.
    make_row = Row(*fields.keys())
    return make_row(*fields.values())

def tfIdfAsNewFeaturesBis(row):
    """Return a copy of *row* whose ``features`` is the dense form of ``tf_idf``.

    ``row['tf_idf'].toArray()`` is wrapped in a DenseVector and stored under
    the ``features`` field.

    :param row: Row with a ``tf_idf`` field providing ``toArray()``.
    :return: a new Row with all original fields plus ``features``.
    """
    dense = DenseVector(row['tf_idf'].toArray())
    fields = row.asDict()
    fields['features'] = dense
    # Row(*names) declares the fields; calling it fills in their values.
    make_row = Row(*fields.keys())
    return make_row(*fields.values())

def enlargeToken(row):
    """Return a copy of *row* with a ``words`` field: title + description tokens.

    ``words`` is ``row['words_title'] + row['words_desc']``, i.e. the title
    tokens followed by the description tokens.

    :param row: Row with ``words_title`` and ``words_desc`` fields.
    :return: a new Row with all original fields plus ``words``.
    """
    merged = row['words_title'] + row['words_desc']
    fields = row.asDict()
    fields['words'] = merged
    # Row(*names) declares the fields; calling it fills in their values.
    make_row = Row(*fields.keys())
    return make_row(*fields.values())

def enlargeTokenAndClean(row):
    """Return a copy of *row* with a ``words`` field: title + description tokens.

    ``words`` is ``row['words_title'] + row['words_desc']``.

    NOTE: the cleaning step suggested by the name is currently disabled (see
    the commented-out snippet below), so this behaves like ``enlargeToken``.

    :param row: Row with ``words_title`` and ``words_desc`` fields.
    :return: a new Row with all original fields plus ``words``.
    """
    merged = row['words_title'] + row['words_desc']
    fields = row.asDict()
    fields['words'] = merged
    # Disabled cleaning step: run every token through words() and store the
    # result under 'wordsF'.
    #   cleaned = []
    #   for word in fields['words']:
    #       cleaned += words(word)
    #   fields['wordsF'] = cleaned
    make_row = Row(*fields.keys())
    return make_row(*fields.values())


# Spark entry points: reuse the running SparkContext if there is one and wrap
# it in a HiveContext for DataFrame access.
sc = SparkContext.getOrCreate()
sqlContext = HiveContext(sc)


def _load_csv(path):
    """Read *path* as a CSV with a header row and inferred column types.

    The resulting DataFrame is repartitioned into 100 partitions.
    """
    reader = sqlContext.read.format("com.databricks.spark.csv")
    reader = reader.option("header", "true").option("inferSchema", "true")
    return reader.load(path).repartition(100)


print("###############")
# Training set: one row per (search term, product) pair.
data = _load_csv("train.csv")
print("data loaded - head:")
print(data.head())
print("################")

# Product attributes as (product_uid, name, value) records.
attributes = _load_csv("attributes.csv")
print("attributes loaded - head:")
print(attributes.head())
print("################")

# Free-text description of each product.
product_description = _load_csv("product_descriptions.csv")
print("description loaded - head:")
print(product_description.head())
print("################")


# The attributes table holds 0..N lines per product; collapse them into one
# text blob per product and attach it (and the description) to the train set.

# Step 1: fix the encoding and get an RDD of (id, ["<attribute name> <value>."])
attRDD = attributes.rdd.map(fixEncoding)
print("new RDD:")
print(attRDD.first())
print("################")

# Step 2: concatenate the per-attribute lists of each product id, then join
# the pieces of every list into a single space-separated string.
attAG = attRDD.reduceByKey(lambda left, right: left + right)
attAG = attAG.map(lambda pair: (pair[0], ' '.join(pair[1])))
print("Aggregated by product_id:")
print(attAG.first())
print("################")

# Step 3: turn the aggregated attributes back into a DataFrame
atrDF = sqlContext.createDataFrame(attAG, schema=["product_uid", "attributes"])
print("New dataframe from aggregated attributes:")
print(atrDF.head())
print("################")

# Step 4: left outer join, so products without attributes are kept
withAttdata = data.join(atrDF, on=['product_uid'], how='left_outer')
print("Joined Data:")
print(withAttdata.head())
# Sample joined row (attributes text shortened):
# Row(product_uid=100501, id=2847, product_title=u'Ring Wireless Video Door Bell',
#     search_term=u'door bell', relevance=3.0,
#     attributes=u"Adjustable Volume Yes. Bullet04 Multiple faceplate finishes ... Style Contemporary.")
print("################")

# Step 5: join with the product descriptions
print("new RDD:")
print(product_description.first())
print("################")

fulldata = withAttdata.join(product_description, on=['product_uid'], how='left_outer')
print("Joined Data:")
print(fulldata.head())
print("################")


# TF-IDF feature extraction over the title + description tokens.

# Step 1a: tokenize the product title
tokenizer = Tokenizer(inputCol="product_title", outputCol="words_title")
fulldata = tokenizer.transform(fulldata)
print("Tokenized Title:")
print(fulldata.head())
print("################")

# Step 1b: tokenize the product description
tokenizer = Tokenizer(inputCol="product_description", outputCol="words_desc")
fulldata = tokenizer.transform(fulldata)
print("Tokenized Description:")
print(fulldata.head())
print("################")

# Merge both token lists into a single 'words' column (row-wise, through the
# RDD API, then back to a DataFrame).
mergedRows = fulldata.rdd.map(enlargeTokenAndClean)
fulldata = sqlContext.createDataFrame(mergedRows)
print("words enlarge with desc and title")
print(fulldata.head())
print("################")

# Step 2: term frequencies via feature hashing
hashingTF = HashingTF(inputCol="words", outputCol="tf")
fulldata = hashingTF.transform(fulldata)
print("TERM frequencies:")
print(fulldata.head())
print("################")

# Step 3: fit the inverse document frequencies and rescale the term counts
idf = IDF(inputCol="tf", outputCol="tf_idf")
idfModel = idf.fit(fulldata)
fulldata = idfModel.transform(fulldata)
print("IDF :")
print(fulldata.head())
print("################")

# Step 4: widen tf_idf with its entry count, then derive the 'features'
# column from the widened vector (so the count entry is part of its values).
fulldata = sqlContext.createDataFrame(fulldata.rdd.map(addFeatureLen))
fulldata = sqlContext.createDataFrame(fulldata.rdd.map(newFeatures))
print("NEW features column :")
print(fulldata.head())
print("################")


# Step 5: ALTERNATIVE ->ADD column with number of terms as another feature
#fulldata = sqlContext.createDataFrame(fulldata.rdd.map(
 #   addFeatureLen))  # add an extra column to tf features
#fulldata = fulldata.withColumnRenamed('tf_idf', 'tf_idf_plus')
#print "ADDED a column and renamed :"
#print fulldata.head()
#print "################"


# Train and evaluate a linear regression model on the engineered features.

# Step 1: keep only the target (renamed to 'label') and the 'features' vector.
fulldata = fulldata.withColumnRenamed(
    'relevance', 'label').select(['label', 'features'])
# Pin one materialization of the data set: without it, the whole upstream
# lineage (CSV reads, joins, Python maps) is recomputed separately for the
# train and the test branch of the split below.
fulldata.cache()
print("TRAIN - ADDED a column and renamed :")
print(fulldata.head())
print("################")


# Simple evaluation: 80/20 train/test split.
# A fixed seed keeps the split stable between runs, so MSE values obtained
# with different feature sets are compared on the same partition of the data
# instead of differing by sampling noise.
# NOTE(review): stability also assumes the rows reach the split in the same
# order on every run - confirm on the target cluster.
SPLIT_SEED = 42
(train, test) = fulldata.rdd.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Initialize the regression model (elastic-net regularized least squares).
lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model on the training split.
lrModel = lr.fit(sqlContext.createDataFrame(train))

# Apply the model to the held-out split; adds a 'prediction' column.
result = lrModel.transform(sqlContext.createDataFrame(test))
# Mean squared error between the true label and the prediction.
MSE = result.rdd.map(lambda r: (r['label'] - r['prediction'])**2).mean()
print("Mean Squared Error = " + str(MSE))

###############
data loaded - head:
Row(id=149, product_uid=100028, product_title=u'Backyard X-Scapes 6 ft. H. x 16 ft. L Reed Fencing', search_term=u'balcony privacy screen', relevance=2.67)
################
attributes loaded - head:
Row(product_uid=100002, name=u'Opacity', value=u'Solid')
################
description loaded - head:
Row(product_uid=100040, product_description=u'With an exceptional variety of different styles, Veranda Pro Series vinyl privacy fence kit is perfect for the pro or the do-it-yourselfer. Our vinyl fence offers the perfect combination of high quality and low maintenance you have been looking for. The lightweight kit components make installation fast and easy. The Washington Pro Series Privacy fence panel kit allows you to easily add privacy and curb appeal to your home.Corresponding posts sold separately, use: Line post (Model #73014884), corner post (Model #73014886), end post (Model #73014885)5 in. x 5 in. post tops (Various styles available) attach easies

Row(product_uid=100170, id=1019, product_title=u'Dyna-Glo Pro 125,000 BTU Forced Air LP Gas Portable Heater', search_term=u'kerosene heater', relevance=1.0, attributes=u'Area Heated (Sq. Ft.) 3100. Bullet01 70,000 - 125,000 BTUs. Bullet02 Heats up to 3,100 sq. ft.. Bullet03 Continuously variable BTUs. Bullet04 Continuos electronic ignition - prevents dangerous delayed ignition. Certifications and Listings CSA Listed. Color Orange. Color Family Oranges / Peaches. Fuel rate (gallons/hour) 0. Fuel tank capacity (gallons) 0. Heat rating (BTU/hour) 125000. Heater Type Vented. Heating Product Type Gas Portable Heater. Heating Technology Type Convection. Ignition Type Continuous Spark. Indoor/Outdoor Indoor/Outdoor. Material Steel. MFG Brand Name Dyna-Glo Pro. Portable Heater Features Automatic Shutoff,Tip-Over Safety Switch,Wheels. Power/Fuel Type Propane. Product Depth (in.) 11.81. Product Height (in.) 16.14. Product Weight (lb.) 20.72. Product Width (in.) 25.59. Run time (hours) 12.', prod

IDF :
Row(words_title=[u'dyna-glo', u'pro', u'125,000', u'btu', u'forced', u'air', u'lp', u'gas', u'portable', u'heater'], product_description=u'Dyna-Glo Pro portable gas forced air heaters offer a quick fire continuous spark ignition systems, coupled with adjustable height controls to insure you get the heat you need, when you need it, and where you want it. Fueled by liquid propane, this unit heats up to 3,100 sq. ft. 10 ft. Hose and regulator assembly are included (LP tank sold separately).70,000 - 125,000 BTUsHeats up to 3,100 sq. ft.Continuously variable BTUsContinuos electronic ignition - prevents dangerous delayed ignitionHome Depot Protection Plan:', id=1019, words=[u'dyna-glo', u'pro', u'125,000', u'btu', u'forced', u'air', u'lp', u'gas', u'portable', u'heater', u'dyna-glo', u'pro', u'portable', u'gas', u'forced', u'air', u'heaters', u'offer', u'a', u'quick', u'fire', u'continuous', u'spark', u'ignition', u'systems,', u'coupled', u'with', u'adjustable', u'height', u'controls',

Py4JJavaError: An error occurred while calling o476.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 276.0 failed 1 times, most recent failure: Lost task 0.0 in stage 276.0 (TID 5182, localhost, executor driver): java.net.SocketException: Socket is closed
	at java.net.Socket.getInputStream(Socket.java:903)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:151)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2768)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2765)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2765)
	at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketException: Socket is closed
	at java.net.Socket.getInputStream(Socket.java:903)
	at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:151)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more


Mean Squared Error = 0.281658352253  (empty) ????????  this is not normal
Mean Squared Error = 0.284223501857  (title only)
Mean Squared Error = 0.282793946361  (description only)
Mean Squared Error = 0.280816931591 (title + description)
Mean Squared Error = 0.283601709652 (min+max+mean+len) expected, since the mean adds noise to the regression
Mean Squared Error = 0.286859147648 (punctuation cleaning) ???? 


