In [1]:
import warnings
warnings.simplefilter("ignore", DeprecationWarning)

In [2]:
import os
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.5'

In [3]:
import timeit
import numpy as np

In [4]:
import pyspark
from pyspark import sql, SparkConf, SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.session import SparkSession
from pyspark.serializers import Serializer

In [5]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.functions import col, udf, isnan, when, count
from pyspark.sql.types import *

In [6]:
from sklearn.neighbors import LSHForest, NearestNeighbors
from spark_sklearn import GridSearchCV

In [7]:
import psutil

In [8]:
cpu_count = psutil.cpu_count()
av_memory = psutil.virtual_memory().total / (1024.0 ** 3)
memory = str(int(0.9 * av_memory)) + 'G'
number_of_instances = 1

In [9]:
cpu_count, memory

(4, '21G')

In [10]:
conftfos = SparkConf().setAll([('spark.executor.instances',number_of_instances),('spark.executor.cores', cpu_count),('spark.executor.memory', memory),('spark.dynamicAllocation.enabled','False')])

sc = SparkContext(master='local', appName='Barcelona',conf=conftfos)
spark = SparkSession(sc)

In [11]:
BarcelonaFILE = 'Barcelona.csv'
AmsterdamFILE = 'Amsterdam.csv'

In [12]:
df_Barcelona_listings = spark.read.option("header","true").csv(BarcelonaFILE)
df_Barcelona_listings.withColumn('City', lit('Barcelona'))
df_Amsterdam_listings = spark.read.option("header","true").csv(AmsterdamFILE)
df_Amsterdam_listings.withColumn('City', lit('Amsterdam'))

DataFrame[id: string, name: string, host_id: string, host_name: string, neighbourhood_group: string, neighbourhood: string, latitude: string, longitude: string, room_type: string, price: string, minimum_nights: string, number_of_reviews: string, last_review: string, reviews_per_month: string, calculated_host_listings_count: string, availability_365: string, City: string]

In [13]:
df_listings = df_Amsterdam_listings.union(df_Barcelona_listings)

In [14]:
df_listings.head()

Row(id='20621335', name='Clean room Amsterdam. Metro 3min walk.Free parking', host_id='25403329', host_name='Victor', neighbourhood_group=None, neighbourhood='Bijlmer-Oost', latitude='52.319172968245226', longitude='4.981150531499213', room_type='Private room', price='52', minimum_nights='3', number_of_reviews='23', last_review='2017-11-28', reviews_per_month='6.83', calculated_host_listings_count='1', availability_365='12')

In [15]:
listings_names = df_listings.select(col('name')).fillna('')

In [16]:
listings_names.show()

+--------------------+
|                name|
+--------------------+
|Clean room Amster...|
|Sunny and cozy ro...|
|Pop B&B-private r...|
|Tastefully furnis...|
|Cozy room in the ...|
|Great room south ...|
|Lovely room in So...|
|spacious light ap...|
|Amsterdam South S...|
|B&B# green oasis ...|
|King  bedroom nea...|
|Spacious room in ...|
|Wow! Laid back to...|
|Amsterdam thrive ...|
|Large bedroom 15 ...|
|Comfortable  room...|
|3 rooms in green ...|
|Studio Amsterdam ...|
|Nice room 15 min ...|
|Nice appartment /...|
+--------------------+
only showing top 20 rows



In [17]:
tokenizer = Tokenizer(inputCol="name", outputCol="words")
tokenized = tokenizer.transform(listings_names)

In [18]:
tokenized.show()

+--------------------+--------------------+
|                name|               words|
+--------------------+--------------------+
|Clean room Amster...|[clean, room, ams...|
|Sunny and cozy ro...|[sunny, and, cozy...|
|Pop B&B-private r...|[pop, b&b-private...|
|Tastefully furnis...|[tastefully, furn...|
|Cozy room in the ...|[cozy, room, in, ...|
|Great room south ...|[great, room, sou...|
|Lovely room in So...|[lovely, room, in...|
|spacious light ap...|[spacious, light,...|
|Amsterdam South S...|[amsterdam, south...|
|B&B# green oasis ...|[b&b#, green, oas...|
|King  bedroom nea...|[king, , bedroom,...|
|Spacious room in ...|[spacious, room, ...|
|Wow! Laid back to...|[wow!, laid, back...|
|Amsterdam thrive ...|[amsterdam, thriv...|
|Large bedroom 15 ...|[large, bedroom, ...|
|Comfortable  room...|[comfortable, , r...|
|3 rooms in green ...|[3, rooms, in, gr...|
|Studio Amsterdam ...|[studio, amsterda...|
|Nice room 15 min ...|[nice, room, 15, ...|
|Nice appartment /...|[nice, app

In [19]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
removed = remover.transform(tokenized)

In [20]:
removed.show()

+--------------------+--------------------+--------------------+
|                name|               words|            filtered|
+--------------------+--------------------+--------------------+
|Clean room Amster...|[clean, room, ams...|[clean, room, ams...|
|Sunny and cozy ro...|[sunny, and, cozy...|[sunny, cozy, roo...|
|Pop B&B-private r...|[pop, b&b-private...|[pop, b&b-private...|
|Tastefully furnis...|[tastefully, furn...|[tastefully, furn...|
|Cozy room in the ...|[cozy, room, in, ...|[cozy, room, se, ...|
|Great room south ...|[great, room, sou...|[great, room, sou...|
|Lovely room in So...|[lovely, room, in...|[lovely, room, so...|
|spacious light ap...|[spacious, light,...|[spacious, light,...|
|Amsterdam South S...|[amsterdam, south...|[amsterdam, south...|
|B&B# green oasis ...|[b&b#, green, oas...|[b&b#, green, oas...|
|King  bedroom nea...|[king, , bedroom,...|[king, , bedroom,...|
|Spacious room in ...|[spacious, room, ...|[spacious, room, ...|
|Wow! Laid back to...|[wo

In [21]:
numFeatures = 50

In [22]:
hashingTF = HashingTF(inputCol='filtered', outputCol='rawFeatures', numFeatures=numFeatures)
listings = hashingTF.transform(removed)
idf = IDF(inputCol='rawFeatures', outputCol='VectorSpace')
idfModel = idf.fit(listings)
tfidf = idfModel.transform(listings)

In [23]:
tfidf.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|                name|               words|            filtered|         rawFeatures|         VectorSpace|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Clean room Amster...|[clean, room, ams...|[clean, room, ams...|(50,[7,18,33,40,4...|(50,[7,18,33,40,4...|
|Sunny and cozy ro...|[sunny, and, cozy...|[sunny, cozy, roo...|(50,[1,18,20,44,4...|(50,[1,18,20,44,4...|
|Pop B&B-private r...|[pop, b&b-private...|[pop, b&b-private...|(50,[11,19,25,37]...|(50,[11,19,25,37]...|
|Tastefully furnis...|[tastefully, furn...|[tastefully, furn...|(50,[8,21,28,46],...|(50,[8,21,28,46],...|
|Cozy room in the ...|[cozy, room, in, ...|[cozy, room, se, ...|(50,[1,23,28,44],...|(50,[1,23,28,44],...|
|Great room south ...|[great, room, sou...|[great, room, sou...|(50,[10,18,44,49]...|(50,[10,18,44,49]...|
|Lovely room in So...|[lovely, room, 

In [24]:
#train_size = 500

In [25]:
#tfidf_lim = tfidf.limit(train_size)

In [26]:
type(tfidf.select('VectorSpace').collect())

list

In [27]:
data = np.asarray(tfidf.select('VectorSpace').collect())

In [28]:
data.shape

(37111, 1, 50)

In [29]:
data = data.reshape((data.shape[0], data.shape[2]))

In [30]:
data.shape

(37111, 50)

In [31]:
def scorer(estimator, X):
    estimator.fit(X)
    n_neighbors = estimator.get_params()['n_neighbors']
    neighbors_approx = estimator.kneighbors(X, return_distance=False)
    
    nbrs = NearestNeighbors(n_neighbors=n_neighbors, algorithm='brute', metric='cosine').fit(X)
    neighbors_exact = nbrs.kneighbors(X, return_distance=False)
    
    accuracy = []
    for i in range(len(neighbors_approx)):
        accuracy.append(np.sum(np.equal(neighbors_approx[i], neighbors_exact[i]))/n_neighbors)
    
    result = np.mean(accuracy)
    return result

In [32]:
param_grid = {
    'n_estimators':[20,30,40],
    'min_hash_match':[3,4,5],
    'n_candidates':[50,75,100],
    'n_neighbors': [3],
}
lshf = LSHForest(random_state=7)
grid_search = GridSearchCV(sc=sc, estimator=lshf, param_grid=param_grid, scoring=scorer)



In [33]:
%%time
grid_search.fit(data)



CPU times: user 5.08 s, sys: 1.68 s, total: 6.76 s
Wall time: 21h 8min 15s


GridSearchCV(cv=None, error_score='raise',
       estimator=LSHForest(min_hash_match=4, n_candidates=50, n_estimators=10, n_neighbors=5,
     radius=1.0, radius_cutoff_ratio=0.9, random_state=7),
       fit_params={}, iid=True, n_jobs=1,
       param_grid={'min_hash_match': [3, 4, 5], 'n_estimators': [20, 30, 40], 'n_neighbors': [3], 'n_candidates': [50, 75, 100]},
       pre_dispatch='2*n_jobs', refit=True, return_train_score=True,
       sc=<SparkContext master=local appName=Barcelona>,
       scoring=<function scorer at 0x7f446c0f6730>, verbose=0)

In [34]:
best_estimator = grid_search.best_estimator_

In [35]:
best_estimator

LSHForest(min_hash_match=3, n_candidates=100, n_estimators=40, n_neighbors=3,
     radius=1.0, radius_cutoff_ratio=0.9, random_state=7)

In [36]:
try:
    grid_search.best_score_
except Exception as e:
    print(e)
    print("Sad. But true :(")

'GridSearchCV' object has no attribute 'best_score_'
Sad. But true :(


In [37]:
best_score = scorer(best_estimator, data)

In [38]:
best_score

0.792927523735101