In [1]:
import csv
import numpy
import random
from CWHashingFamily import CWHashingFamily
from sklearn.neighbors import LSHForest

import sys
#Python 2 only: site.py deletes sys.setdefaultencoding at startup, so sys is
#reloaded to get it back, and the implicit str<->unicode coercion encoding is
#then switched from ASCII to UTF-8.
#NOTE(review): this is a widely discouraged, process-wide hack that does not
#exist in Python 3. It also only changes this (driver) interpreter; PySpark
#executor workers are separate Python processes, so the code running inside
#RDD lambdas presumably does not see it -- confirm whether it is needed at all.
reload(sys)
sys.setdefaultencoding('utf-8')

#Number of random shingles sampled per review, and characters per shingle.
SHINGLES_QTY, SHINGLES_LEN = 10, 4
#Passed to CWHashingFamily(BINS, PRIME) in minhash(); PRIME is also the
#exclusive upper bound of the random hash parameters drawn there.
PRIME, BINS = 997, 900

def minhashList(shinglesList, mhQty):
    #Builds the minhash signature of one text.
    #shinglesList: the shingles of the text (see shingles()).
    #mhQty: how many minhashes to compute; seeds 0..mhQty-1 are used, one each.
    #Returns a list of mhQty minhash values, ordered by seed.
    signature = []
    for seed in range(mhQty):
        signature.append(minhash(shinglesList, seed))
    return signature


def shingles(text, shingleSize, shinglesQty):
    #Receives a text or string, a shingle length shingleSize and a
    #quantity shinglesQty.
    #Returns a list of shinglesQty substrings of text, each exactly
    #shingleSize characters long, whose positions are drawn at random
    #(with replacement) from the global `random` module.
    #Edge cases: an empty text yields []; a non-empty text shorter than
    #shingleSize yields shinglesQty copies of the whole text, since no
    #full-length shingle exists (previously both cases raised ValueError).
    if not text:
        return []
    if len(text) < shingleSize:
        return [text] * shinglesQty
    allShingles = []
    for _ in range(shinglesQty):
        #randomEnd is an exclusive slice end in [shingleSize, len(text)], so the
        #slice always has exactly shingleSize characters, never starts at a
        #negative index, and the last character of the text can be sampled.
        randomEnd = random.randrange(shingleSize, len(text) + 1)
        allShingles.append(text[randomEnd - shingleSize:randomEnd])
    return allShingles


def minhash(shinglesList, seed):
    #Receives a list of shingles and an integer seed.
    #Returns the minimum hash value over all shingles, where every shingle is
    #hashed with the SAME string hash function. That function is built from
    #CWHashingFamily(BINS, PRIME) with parameters drawn from a RNG seeded with
    #seed, so a given seed always selects the same function.
    #Returns -1 when shinglesList is empty.
    #Previously the function was rebuilt with fresh draws for every shingle,
    #so each shingle got a different hash function (not a minhash), and
    #random.seed() reseeded the global RNG that shingles() samples from.
    if not shinglesList:
        return -1
    #Private RNG: same draw sequence as random.seed(seed) followed by
    #random.randrange(...), without touching the global `random` state.
    rng = random.Random(seed)
    cwFactory = CWHashingFamily(BINS, PRIME)
    #Arguments are evaluated left to right, so the three draws happen in the
    #same order as in the original per-shingle construction.
    numericHash = cwFactory.numericFunction(rng.randrange(1, PRIME), rng.randrange(0, PRIME))
    stringHash = cwFactory.stringFunction(rng.randrange(0, PRIME), numericHash)
    best = -1
    for shingle in shinglesList:
        shHash = stringHash.hash(shingle)
        if best == -1 or shHash < best:
            best = shHash
    return best


In [2]:
#Column indices in the training CSV rows ("Id" ... "Prediction" ... "Text").
ID, PREDICTION, TEXT = 0, 6, 9

#R and B only appear as the product R*B (minhashes per review) in this
#notebook; presumably rows/bands of an LSH banding scheme.
R, B = 4, 2

#Read the training CSV as an RDD of parsed rows (lists of byte strings, since
#use_unicode=False) and drop the header row, whose first field is "Id".
trainRDD = (sc.textFile("train_sin_repeticiones.csv", minPartitions=None, use_unicode=False)
              .mapPartitions(lambda lines: csv.reader(lines))
              .filter(lambda row: row[0] != "Id"))

In [3]:
#Turn each training row into a (score, minhashes) pair. score is the review's
#"Prediction" column as an int; minhashes is a list of R*B minhashes computed
#from random shingles of the review text.
#New names are used instead of rebinding trainRDD, so this cell can be re-run
#without mapping over already-transformed pairs.
trainMinhashRDD = trainRDD.map(
    lambda row: (int(row[PREDICTION]),
                 minhashList(shingles(row[TEXT], SHINGLES_LEN, SHINGLES_QTY), R*B)))

#Group the minhash lists by score and bring the groups to the driver in one
#Spark job: {score: [minhashes, minhashes, ...]}.
#groupByKey avoids the repeated list copying of reduceByKey(list + list).
#A single collect replaces five filter(...).first() jobs; because the RDD is
#not cached, each of those jobs recomputed every training minhash.
minhashesByScore = trainMinhashRDD.groupByKey().mapValues(list).collectAsMap()

#Train one LSHForest per score on that score's minhash lists.
#NOTE(review): LSHForest is deprecated since scikit-learn 0.19 and removed in 0.21.
lshByScore = {}
for score in (5, 4, 3, 2, 1):
    if score not in minhashesByScore:
        raise ValueError("no training reviews with score %d" % score)
    lshByScore[score] = LSHForest(random_state=42)
    lshByScore[score].fit(minhashesByScore[score])

#Individual names used by the prediction code.
lsh5Stars = lshByScore[5]
lsh4Stars = lshByScore[4]
lsh3Stars = lshByScore[3]
lsh2Stars = lshByScore[2]
lsh1Stars = lshByScore[1]

LSHForest(min_hash_match=4, n_candidates=50, n_estimators=10, n_neighbors=5,
     radius=1.0, radius_cutoff_ratio=0.9, random_state=42)

In [21]:
#Load the test set from test.csv into an RDD of parsed CSV rows (lists of
#byte strings, since use_unicode=False) and drop the header row, which is
#recognised by its first field being "Id".
testRDD = (sc.textFile("test.csv", minPartitions=None, use_unicode=False)
             .mapPartitions(lambda lines: csv.reader(lines))
             .filter(lambda row: row[0] != "Id"))

In [22]:
#Column indices in the test CSV rows.
#NOTE(review): assumes test.csv has the training columns minus "Prediction",
#so "Text" sits at index 8 instead of 9 -- TODO confirm against its header.
#These get their own names instead of rebinding ID/PREDICTION/TEXT. Otherwise,
#re-running the training cell afterwards would read column 8 of training rows.
TEST_ID = 0
TEST_TEXT = 8

def writePredictionAndReturn(review):
    #Predicts an integer score (1-5) for one test review from its text.
    #review: a parsed test CSV row; only review[TEST_TEXT] is read.
    #The nearest training neighbour is looked up in the LSHForest of each
    #score. The scores are averaged with weights 1/distance, and the average
    #is rounded to the nearest integer.
    #If some neighbours are at distance 0 (exact matches), the mean of those
    #scores is used instead, mirroring the limit of the 1/distance weighting.
    #Previously 1/0 gave inf/inf = NaN, so exact matches always returned 0.
    #Returns 0 (not a valid score) when the prediction cannot be computed.
    try:
        minhashes = minhashList(shingles(review[TEST_TEXT], SHINGLES_LEN, SHINGLES_QTY), R*B)

        #Looked up at call time so the forests fitted most recently are used.
        forests = {5: lsh5Stars, 4: lsh4Stars, 3: lsh3Stars, 2: lsh2Stars, 1: lsh1Stars}
        distances = {}
        for score, forest in forests.items():
            #kneighbors expects a 2-D input (one row per query); a bare 1-D
            #list is rejected by newer scikit-learn versions.
            dist, _ = forest.kneighbors([minhashes], n_neighbors=1)
            distances[score] = float(dist[0][0])

        exactScores = [score for score, d in distances.items() if d == 0.0]
        if exactScores:
            average = float(sum(exactScores)) / len(exactScores)
        else:
            weights = {score: 1.0 / d for score, d in distances.items()}
            average = sum(score * w for score, w in weights.items()) / sum(weights.values())

        #Round to the nearest score, as intended; int() alone truncated
        #(e.g. 4.9 -> 4). Adding 0.5 rounds half up the same way in Py2 and Py3.
        return int(average + 0.5)
    except Exception:
        #Best effort inside a Spark map: one malformed row must not fail the
        #whole job, so failures are reported as the sentinel score 0.
        return 0

#Map each test row to (Id, predicted score).
#NOTE: this rebinds testRDD, so re-run the test-loading cell before re-running
#this one. Otherwise the rows are already (Id, score) pairs, and every
#prediction silently becomes 0.
testRDD = testRDD.map(lambda review: (review[TEST_ID], writePredictionAndReturn(review)))


In [34]:
import pandas  # NOTE(review): unused in the visible cells; kept in case other code relies on it
base = '/home/eugenia/VowpalWabbit/TPDatos/'

#Format each (Id, prediction) pair as a CSV line "Id,Prediction".
#Previously the pair's Python repr was split on commas and the resulting list
#was written, producing lines like ["('123'", ' 4)'] instead of CSV.
#saveAsTextFile writes a directory named lshPredictions.csv holding one part
#file per partition; Spark refuses to overwrite an existing output directory.
mappedRDD = testRDD.map(lambda pair: '%s,%s' % (pair[0], pair[1]))
mappedRDD.saveAsTextFile(base+'lshPredictions.csv')