In [1]:
# Evaluate the pre-defined `sc` handle so the notebook displays it
# (presumably the SparkContext injected by the PySpark kernel — confirm).
# The same handle is later passed to spark_sklearn's GridSearchCV.
sc

In [2]:
from pyspark.sql.types import *
from pyspark.sql.functions import when, col
from pyspark.ml.feature import HashingTF, IDF, Tokenizer,RegexTokenizer, StopWordsRemover

## Reading data

In [3]:
# Load both CSV exports, treating the first line of each file as the header row.
listings = spark.read.option("header", True).csv("listings.csv")
reviews = spark.read.option("header", True).csv("reviews.csv")

## Find TF-IDF

In [4]:
# Replace missing listing names with an empty string, then keep only the
# "name" column, which is the text the TF-IDF pipeline below works on.
df_listings = listings.fillna({"name": ""})
names = df_listings.select("name")

In [5]:
# A plain Tokenizer is constructed here but is not applied in this cell;
# the regex-based tokenizer below is the one that produces `wordsData`.
tokenizer = Tokenizer(inputCol="name", outputCol="words")

# Split each name on non-word characters and keep the original letter case.
regexTokenizer = RegexTokenizer(
    inputCol="name",
    outputCol="words",
    pattern="\\W",
    toLowercase=False,
)

wordsData = regexTokenizer.transform(names)

In [6]:
# Build the stop-word remover and extend its word list BEFORE transforming.
# Previously the list was extended only after `transform` had already run, so
# the customised list never reached `noStopwords`.
remover = StopWordsRemover(inputCol="words", outputCol="CleanTokens")

# setStopWords configures the transformer in place and returns it, so
# `customRemover` and `remover` name the same object carrying the extended list.
# NOTE(review): "a" appears to already be in Spark's default English stop-word
# list, in which case this extension is a no-op — confirm.
customRemover = remover.setStopWords(remover.getStopWords() + ["a"])

noStopwords = customRemover.transform(wordsData)

In [7]:
# Term frequencies: hash every cleaned token into one of 20 buckets.
hashingTF = HashingTF(
    inputCol="CleanTokens",
    outputCol="rowFeatures",
    numFeatures=20,
)
featurizedData = hashingTF.transform(noStopwords)

# Inverse document frequencies: fitted on the hashed term frequencies and then
# applied to the same data to obtain the final TF-IDF "features" column.
idf = IDF(inputCol="rowFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
tfidf = idfModel.transform(featurizedData)

## GridSearchCV for LSHForest

To apply spark_sklearn.GridSearchCV to LSHForest, I did the following:
* Created a child class of LSHForest (LSHForestExt) that implements a 'score' method. GridSearchCV requires this method in order to run the grid search procedure on the LSHF algorithm.


* The 'score' method computes an accuracy that assesses how close the neighbors found by LSHF are to the actual neighbors. As the actual neighbors I take the ones found with sklearn.neighbors.NearestNeighbors(algorithm='brute'). The accuracy for a particular item is the number of its closest neighbors that LSHF 'guessed' correctly, divided by the total number of neighbors we were searching for. So if LSHF guessed 3 of the 5 requested closest neighbors, the accuracy is 0.6. For a set of items I take the mean of the per-item accuracies.


* GridSearchCV runs in parallel on the data I pass into the 'fit' method, so the 'score' method of LSHForestExt is applied to some subset of the original dataset we run GridSearchCV on. However, a nearest-neighbors algorithm indexes all items of its input set when it is fitted, and the indices it returns lie within the range of that input set's size. Therefore, to check LSHF's performance, the indices of the input items in LSHF and kNN must match, which means both models have to be fitted on the same data. That is why, in the 'score' method, I fit both models on a small subset (restricted to 500 items, but its size can be smaller when a smaller subset is passed into 'score') and then measure LSHF's performance.


In [8]:
from sklearn.neighbors import LSHForest, NearestNeighbors
from spark_sklearn import GridSearchCV
import time
import numpy as np



In [9]:
class LSHForestExt(LSHForest):
    """LSHForest extended with ``score`` and ``predict`` for use in GridSearchCV.

    ``score`` measures how well the approximate neighbours returned by the
    LSH forest agree with the exact neighbours returned by a brute-force
    cosine ``NearestNeighbors`` search over the same points.

    Parameters
    ----------
    n_estimators, radius, n_candidates, n_neighbors, min_hash_match,
    radius_cutoff_ratio, random_state
        Stored unchanged as attributes of the same name.
    test_subset_size : int, default 500
        Upper bound on the number of leading rows of the scored data that are
        used by ``score``.
    """

    def __init__(self, n_estimators=10, radius=1.0, n_candidates=50,
                 n_neighbors=5, min_hash_match=4, radius_cutoff_ratio=.9,
                 test_subset_size=500, random_state=42):
        # Every constructor argument is stored verbatim under its own name;
        # no validation or derived state is computed here.
        self.n_estimators = n_estimators
        self.radius = radius
        self.random_state = random_state
        self.n_candidates = n_candidates
        self.n_neighbors = n_neighbors
        self.min_hash_match = min_hash_match
        self.radius_cutoff_ratio = radius_cutoff_ratio
        self.test_subset_size = test_subset_size

    def score(self, X, y=None):
        """
        Mean fraction of LSHF neighbours that are also exact neighbours.

        Note: score is called from GridSearchCV, which runs in parallel on
            the original data after splitting it into train and test parts,
            so the exact length of X is not known in advance.

        Side effect: this estimator is re-fitted on the evaluated subset, so
        that its neighbour indices refer to the same rows as those of the
        brute-force reference model.

        NOTE(review): the same subset is used both to fit and to query, so a
        point can be returned as its own neighbour by both models and count
        as a match — confirm whether self-matches should be excluded.

        Parameters
        ----------
        X : {array-like, sparse matrix}, shape = [n_samples, n_features]
            Test subset. Only the first ``test_subset_size`` rows are used.
        y : ignored
            Accepted only so the method can also be called as ``score(X, y)``.

        Returns
        -------
        score : float
            For each row, the share of the ``n_neighbors`` neighbours found by
            LSHF that are also among the neighbours found by
            sklearn.neighbors.NearestNeighbors; averaged over all rows.

        Raises
        ------
        ValueError
            If X contains no rows.
        """
        # Sparse matrices do not support len(); use shape[0] when it exists.
        n_available = X.shape[0] if hasattr(X, "shape") else len(X)
        testset_len = min(self.test_subset_size, n_available)
        if testset_len == 0:
            raise ValueError("score() needs at least one sample in X")

        # Slicing keeps the container type (list, ndarray, sparse matrix)
        # instead of producing a list of individual rows.
        subset = X[:testset_len]

        # Exact reference neighbours, computed on the very same rows.
        nbrs = NearestNeighbors(algorithm='brute', metric='cosine',
                                n_neighbors=self.n_neighbors).fit(subset)
        self.fit(subset)
        real_nbrs = nbrs.kneighbors(subset, return_distance=False)
        approx_nbrs = self.kneighbors(subset, return_distance=False)

        # Per-row share of approximate neighbours present in the exact set.
        accuracy = [np.in1d(approx, real).mean()
                    for approx, real in zip(approx_nbrs, real_nbrs)]
        return np.mean(accuracy)

    def predict(self, X):
        """
        Return the nearest neighbours of each row in X.

        Delegates to ``kneighbors(X)`` and returns its result unchanged; the
        notebook unpacks that result as ``(distances, indices)``.
        GridSearchCV applies this method through the best estimator.
        """
        return self.kneighbors(X)

In [10]:
# Hyper-parameter grid explored by the search: 3 x 4 = 12 combinations.
parameters = {
    'n_candidates': [10, 20, 50],
    'n_estimators': [10, 20, 50, 100],
}

n_neighbors = 3
lshf = LSHForestExt(n_neighbors=n_neighbors, random_state=42)
clf = GridSearchCV(sc, lshf, parameters)

# Collect the TF-IDF vectors (single "features" column) into a Python list.
dataset = tfidf.select('features').rdd.map(lambda row: row[0]).collect()
print("Dataset size:", len(dataset))

# Wall-clock timing of the whole grid search.
start = time.time()
clf.fit(dataset)
end = time.time()
print("Time spent for cross validation {0:.3f}".format(end - start))

# Last expression of the cell: shown as the cell output.
clf.grid_scores_

Dataset size: 18556




Time spent for cross validation 96.984


[mean: 0.54378, std: 0.00937, params: {'n_candidates': 10, 'n_estimators': 10},
 mean: 0.58622, std: 0.01688, params: {'n_candidates': 10, 'n_estimators': 20},
 mean: 0.66489, std: 0.00793, params: {'n_candidates': 10, 'n_estimators': 50},
 mean: 0.73333, std: 0.00599, params: {'n_candidates': 10, 'n_estimators': 100},
 mean: 0.63378, std: 0.01035, params: {'n_candidates': 20, 'n_estimators': 10},
 mean: 0.75533, std: 0.00785, params: {'n_candidates': 20, 'n_estimators': 20},
 mean: 0.91022, std: 0.00691, params: {'n_candidates': 20, 'n_estimators': 50},
 mean: 0.96978, std: 0.00494, params: {'n_candidates': 20, 'n_estimators': 100},
 mean: 0.93356, std: 0.00300, params: {'n_candidates': 50, 'n_estimators': 10},
 mean: 0.98133, std: 0.00340, params: {'n_candidates': 50, 'n_estimators': 20},
 mean: 0.99222, std: 0.00083, params: {'n_candidates': 50, 'n_estimators': 50},
 mean: 0.99178, std: 0.00191, params: {'n_candidates': 50, 'n_estimators': 100}]

In [11]:
# Report the best mean cross-validation score and the parameter combination
# that produced it, both read from the fitted grid search object.
print("Best score", clf.best_score_)
print("\nBest estimetor", clf.best_params_)

Best score 0.9922222821010275

Best estimetor {'n_candidates': 50, 'n_estimators': 50}


In [12]:
# Use the first 10 TF-IDF vectors as a small query set for the tuned model.
test_best_estimator_ds = (
    tfidf.limit(10).select('features').rdd.map(lambda row: row[0]).collect()
)
# LSHForestExt.predict forwards to kneighbors, unpacked here as (distances, indices).
distances, indices = clf.predict(test_best_estimator_ds)
test_best_estimator_array = np.array(test_best_estimator_ds)

print("Test dataset")
display(test_best_estimator_ds)

print("\nIndices of {0} nearest neighbours predicted with LSHF".format(n_neighbors))
print(indices)

print("\nDistances from each of {0} nearest neighbours predicted with LSHF".format(n_neighbors))
print(distances)

print("\nLet's varify visually that the found nearest neighbours make sense.")
check_test_element_indeces = [0, 1]
print("\nIndeces of elements from test for visual validation", check_test_element_indeces)

print("\nElements themselves as sparse vectors")
display([test_best_estimator_ds[i] for i in check_test_element_indeces])

print(
    "\nElements from initial dataset which were found as closest neighbours of {0} element(s) from test"
    .format(check_test_element_indeces)
)
# Flatten the neighbour indices of the selected query rows into one 1-D array,
# then look the corresponding vectors up in the full dataset.
unfold_indeces_nearest_nbrs = indices[check_test_element_indeces].reshape(-1)
display([dataset[i] for i in unfold_indeces_nearest_nbrs])

Test dataset


[SparseVector(20, {2: 2.2756, 3: 1.6912, 6: 2.6478, 8: 1.6822, 11: 2.1647, 14: 1.1739, 18: 1.6747}),
 SparseVector(20, {5: 2.0771, 10: 1.8866, 11: 2.1647, 14: 1.1739, 19: 2.0622}),
 SparseVector(20, {3: 1.6912, 4: 1.4578, 6: 2.6478, 9: 2.3068, 10: 1.8866, 12: 0.816, 13: 1.4106, 14: 1.1739}),
 SparseVector(20, {3: 1.6912, 4: 1.4578, 6: 2.6478, 8: 1.6822, 19: 2.0622}),
 SparseVector(20, {2: 1.1378, 4: 1.4578, 7: 1.7113, 14: 1.1739}),
 SparseVector(20, {2: 1.1378, 14: 1.1739, 15: 1.4232, 19: 2.0622}),
 SparseVector(20, {1: 1.3631, 2: 1.1378, 9: 1.1534, 10: 1.8866, 14: 1.1739}),
 SparseVector(20, {0: 1.3827, 3: 1.6912, 8: 1.6822, 9: 1.1534, 13: 1.4106, 15: 1.4232}),
 SparseVector(20, {0: 1.3827, 2: 1.1378, 10: 1.8866}),
 SparseVector(20, {3: 1.6912, 5: 2.0771, 9: 2.3068, 13: 1.4106, 15: 1.4232})]


Indices of 3 nearest neighbours predicted with LSHF
[[    0 13929  5984]
 [    1 14445 12240]
 [    2 18407 15740]
 [    3 18539  3654]
 [    4  1812    65]
 [    5 18483 18373]
 [    6  3466  4876]
 [ 2452     7  4714]
 [ 4025     8 11941]
 [    9  4844   743]]

Distances from each of 3 nearest neighbours predicted with LSHF
[[0.00000000e+00 1.23062223e-01 1.79505134e-01]
 [0.00000000e+00 1.26606073e-01 1.26606073e-01]
 [0.00000000e+00 1.17630698e-01 1.31575219e-01]
 [0.00000000e+00 7.71355005e-02 7.71355005e-02]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 4.55900977e-02 8.01953287e-02]
 [1.11022302e-16 1.11022302e-16 7.12502637e-02]
 [0.00000000e+00 0.00000000e+00 2.47617980e-02]
 [0.00000000e+00 0.00000000e+00 0.00000000e+00]
 [0.00000000e+00 3.66769699e-02 1.28897533e-01]]

Let's varify visually that the found nearest neighbours make sense.

Indeces of elements from test for visual validation [0, 1]

Elements themselves as sparse vectors


[SparseVector(20, {2: 2.2756, 3: 1.6912, 6: 2.6478, 8: 1.6822, 11: 2.1647, 14: 1.1739, 18: 1.6747}),
 SparseVector(20, {5: 2.0771, 10: 1.8866, 11: 2.1647, 14: 1.1739, 19: 2.0622})]


Elements from initial dataset which were found as closest neighbours of [0, 1] element(s) from test


[SparseVector(20, {2: 2.2756, 3: 1.6912, 6: 2.6478, 8: 1.6822, 11: 2.1647, 14: 1.1739, 18: 1.6747}),
 SparseVector(20, {2: 1.1378, 6: 2.6478, 8: 1.6822, 11: 2.1647, 12: 0.816, 18: 1.6747}),
 SparseVector(20, {1: 1.3631, 2: 2.2756, 6: 2.6478, 8: 1.6822, 11: 2.1647}),
 SparseVector(20, {5: 2.0771, 10: 1.8866, 11: 2.1647, 14: 1.1739, 19: 2.0622}),
 SparseVector(20, {10: 1.8866, 11: 2.1647, 14: 1.1739, 19: 2.0622}),
 SparseVector(20, {10: 1.8866, 11: 2.1647, 14: 1.1739, 19: 2.0622})]