In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [2]:
import multiprocessing

multiprocessing.cpu_count()

12

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
from pyspark.sql import functions as F
from pyspark.ml.feature import Tokenizer, StopWordsRemover,Word2Vec,BucketedRandomProjectionLSH
from pyspark.ml import Pipeline, Estimator, Model
from pyspark.ml.evaluation import BinaryClassificationEvaluator,Evaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.sql.window import Window as W
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
from pyspark.ml.param.shared import *
from pyspark import keyword_only 
from sklearn.neighbors import NearestNeighbors

In [4]:
!wget "http://data.insideairbnb.com/spain/catalonia/barcelona/2020-10-12/visualisations/listings.csv" -O listings_barc.csv
# !wget http://data.insideairbnb.com/spain/catalonia/barcelona/2020-10-12/visualisations/reviews.csv

--2020-11-19 00:03:08--  http://data.insideairbnb.com/spain/catalonia/barcelona/2020-10-12/visualisations/listings.csv
Resolving data.insideairbnb.com (data.insideairbnb.com)... 52.216.88.178
Connecting to data.insideairbnb.com (data.insideairbnb.com)|52.216.88.178|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3070172 (2.9M) [application/csv]
Saving to: ‘listings_barc.csv’


2020-11-19 00:03:11 (1.36 MB/s) - ‘listings_barc.csv’ saved [3070172/3070172]



In [2]:
import pandas as pd
import numpy as np
# use pandas to read and clean data, because spark has some issues reading the csv file
df = pd.read_csv("listings_barc.csv")
# drop all na so no errors like 
df.dropna(inplace = True)

# make indexes start form 1
df['id'] = np.arange(1, len(df) + 1)

# df.to_csv("listings_barc.csv", index = False)

In [3]:
listings_clear = spark.createDataFrame(df)


In [4]:
# needed to get KNN_model (returned in CustomLSH::_fit)
class HasKnnModel(Params):
    knn_model = Param(Params._dummy(), "knn_model", "knn_model")

    def __init__(self):
        super(HasKnnModel, self).__init__()

    def setKnnModel(self, value):
        return self._set(knn_model=value)

    def getKnnModel(self):
        return self.getOrDefault(self.knn_model)

class HasNumHashTables(Params):
    numHashTables = Param(Params._dummy(), "numHashTables", "numHashTables", 
        typeConverter=TypeConverters.toInt)

    def __init__(self):
        super(HasNumHashTables, self).__init__()

    def setNumHashTables(self, value):
        return self._set(numHashTables=value)

    def getNumHashTables(self):
        return self.getOrDefault(self.numHashTables)
    
class HasLshModel(Params):
    lshModel = Param(Params._dummy(), "lshModel", "lshModel")

    def __init__(self):
        super(HasLshModel, self).__init__()

    def setLshModel(self, value):
        return self._set(lshModel=value)

    def getLshModel(self):
        return self.getOrDefault(self.lshModel)
    
class HasTrainDataset(Params):
    trainDataset = Param(Params._dummy(), "trainDataset", "trainDataset")

    def __init__(self):
        super(HasTrainDataset, self).__init__()

    def setTrainDataset(self, value):
        return self._set(trainDataset=value)

    def getTrainDataset(self):
        return self.getOrDefault(self.trainDataset)
    
    
class HasBucketLength(Params):
    bucketLength = Param(Params._dummy(), "bucketLength", "bucketLength", 
        typeConverter=TypeConverters.toInt)

    def __init__(self):
        super(HasBucketLength, self).__init__()

    def setBucketLength(self, value):
        return self._set(bucketLength=value)

    def getBucketLength(self):
        return self.getOrDefault(self.bucketLength)


In [5]:

class CustomLSH_Model(Model, HasInputCol, HasPredictionCol,
        HasNumHashTables, HasLshModel,HasTrainDataset,
        DefaultParamsReadable, DefaultParamsWritable, HasBucketLength, HasKnnModel):
    
    @keyword_only
    def __init__(self,inputCol=None, predictionCol=None,
                numHashTables=None, bucketLength=None, lshModel=None, trainDataset=None, knn_model=None):
        super(CustomLSH_Model, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
        
    @keyword_only
    def setParams(self, knn_model=None, inputCol=None, predictionCol=None,bucketLength=None,
                numHashTables=None, lshModel=None, trainDataset=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)  
        
    
    def add_knn_true(self,spark_dataset, knn_model, train_data):
        df = spark_dataset.toPandas()
        train_ds = train_data.toPandas()

        features = df.features
               
        def get_true(feature_id):
            #knn_indexes array of array of neighbours of each feature_id
            knn_indexes = (knn_model.kneighbors(features[feature_id].toArray().reshape(1, -1), n_neighbors = 5, return_distance = False)).tolist()[0]
            #form a list of ids
            arr_of_indexes = []
            for ind in knn_indexes:
                arr_of_indexes.append(int(train_ds.id[ind]))
                
            return arr_of_indexes
        
        #fill pandas 
        res = [get_true(i) for i in range(0, len(df))]
        df['knn_true'] = res
        out_df = spark.createDataFrame(df)
        return out_df
        

    def get_lsh_neighbors(self, dataset):
        df = dataset.toPandas()
            
        def get_true_lsh(feature):
            results = self.lshModel.approxNearestNeighbors(dataset = self.trainDataset, key = feature, numNearestNeighbors = 5, distCol = 'distance')
            res_list = results.select('id').collect()
            res_array = []
            for res in res_list:
                res_array.append(int(res[0]))
            return res_array
        
        arr = [get_true_lsh(row[self.inputCol]) for index, row in df.iterrows()]
        
        df['lsh_true'] = arr
        
        out_df = spark.createDataFrame(df)
        return out_df
    
    
    def _transform(self, dataset):
        self.inputCol = self.getInputCol()
        outputCol = self.getPredictionCol()
        self.lshModel = self.getLshModel()
        self.trainDataset = self.getTrainDataset()
        # adds knn_true to dataframe
        tested_knn = self.add_knn_true(dataset, self.getKnnModel(), self.trainDataset)
#         store in class variable
        self.tested_knn = tested_knn
#         print(self.trainDataset.count())
#         print(tested_knn.count())
#         print(dataset.count())
        
        tested_lsh = self.get_lsh_neighbors(tested_knn)
        
#         print(tested_lsh.count())
        return tested_lsh
    

    

In [6]:
class CustomLSH(Estimator, HasInputCol, 
        HasPredictionCol, HasNumHashTables, HasLshModel,
        DefaultParamsReadable, DefaultParamsWritable, HasBucketLength, HasKnnModel):
    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, numHashTables=10, bucketLength=10):
        super(CustomLSH, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    def setInputCol(self, value):
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        return self._set(predictionCol=value)
    
    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, numHashTables=10, bucketLength=10):
        kwargs = self._input_kwargs
        return self._set(**kwargs)  
    
    def get_knn_column(self, spark_dataset):
#         print(spark_dataset.show(3))
        """
        trains on train part of dataset.features and returns 5 neighbours for each entry
        """
        df = spark_dataset.toPandas()
        features = df.features.to_list()

        nbrs = NearestNeighbors(n_neighbors=6)
        nbrs = nbrs.fit(features)

        return nbrs
    
    def _fit(self, dataset):
        # get knn model as ground truth
        knn_model = self.get_knn_column(dataset)
#         print(f"_fit {dataset.count()}")
        inputCol = self.getInputCol()
        lsh = BucketedRandomProjectionLSH(inputCol = inputCol, outputCol="hashes", seed = 1, bucketLength = self.getBucketLength(), numHashTables = self.getNumHashTables())
        LSH_res = lsh.fit(dataset)
        LSH_Transform = LSH_res.transform(dataset)

        return CustomLSH_Model(
            inputCol = inputCol,
            bucketLength = self.getBucketLength(), 
            numHashTables = self.getNumHashTables(),
            lshModel = LSH_res,
            trainDataset = LSH_Transform,
            predictionCol = self.getPredictionCol(),
            knn_model = knn_model)
    


In [7]:
class CustomEvaluator(Evaluator):

    def __init__(self, predictionCol="prediction", labelCol="label"):
        self.predictionCol = predictionCol
        self.labelCol = labelCol

    def _evaluate(self, dataset):
        
#         print(sum(F.when(dataset[self.predictionCol]==dataset[self.labelCol],1).otherwise(0)))
        res_array_intersect = dataset.select(F.array_intersect(dataset[self.predictionCol], dataset[self.labelCol])).collect()
  #         array_intersect(lsh_true, knn_true)
        res_intersect_dataframe = spark.createDataFrame(res_array_intersect)
        
        res_intersect_dataframe = res_intersect_dataframe.withColumn('lsh_performance', F.size(F.col('array_intersect(lsh_true, knn_true)'))).drop('array_intersect(lsh_true, knn_true)')
        
#         res_array_intersect = F.array_intersect(dataset[self.predictionCol], dataset[self.labelCol]).collect()
#         print(res_array_intersect.show(1))
    
#         dataset.withColumn("lsh_performance", F.array_intersect(dataset[self.predictionCol], dataset[self.labelCol]).collect() )
#         dataset = dataset.withColumn("lsh_performance", sum(F.when(dataset[self.predictionCol]==dataset[self.labelCol],1).otherwise(0)))
        res = res_intersect_dataframe.select("lsh_performance").groupBy().sum().collect()[0]['sum(lsh_performance)']/(dataset.count() * 5)
        print("Accuracy: " + "{:.7%}".format(res))
        return res 

In [9]:
import time

tokenizer = Tokenizer(inputCol='name', outputCol='words')
remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='tokenized')
word2vec = Word2Vec(vectorSize = 20, inputCol=remover.getOutputCol(), outputCol='features')
lsh = CustomLSH().setInputCol(word2vec.getOutputCol())

pipeline = Pipeline(stages=[tokenizer, remover, word2vec, lsh])

paramGrid = ParamGridBuilder() \
    .addGrid(lsh.numHashTables, [3, 4, 5]) \
    .addGrid(lsh.bucketLength, [5, 7, 9]) \
    .build()


# crossval = CrossValidator(estimator=pipeline,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=BinaryClassificationEvaluator(),
#                           numFolds=2)

tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=CustomEvaluator(predictionCol='lsh_true', labelCol='knn_true'),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

start = time.time()
cvModel = tvs.fit(listings_clear)
end = time.time()

print("Time elapsed %.2f seconds" % (end - start) )


Accuracy: 96.6302432%
Accuracy: 96.7289390%
Accuracy: 96.6231935%
Accuracy: 96.6513923%
Accuracy: 96.6443426%
Accuracy: 96.7289390%
Accuracy: 96.6654917%
Accuracy: 96.6795911%
Accuracy: 96.6513923%
Time elapsed 2746.79 seconds


In [None]:
# RESULTS shown by previous cell 
# Accuracy: 96.6302432%
# Accuracy: 96.7289390%
# Accuracy: 96.6231935%
# Accuracy: 96.6513923%
# Accuracy: 96.6443426%
# Accuracy: 96.7289390%
# Accuracy: 96.6654917%
# Accuracy: 96.6795911%
# Accuracy: 96.6513923%
# Time elapsed 2746.79 seconds

In [10]:
best_model = cvModel.bestModel

start = time.time()
res = cvModel.bestModel.transform(listings_clear)
end = time.time()
print("training time %.2f seconds" % (end - start) )


res_array_intersect = res.select(F.array_intersect(res['lsh_true'], res['knn_true'])).collect()

res_intersect_dataframe = spark.createDataFrame(res_array_intersect)

res_intersect_dataframe = res_intersect_dataframe.withColumn('lsh_performance', F.size(F.col('array_intersect(lsh_true, knn_true)'))).drop('array_intersect(lsh_true, knn_true)')

res_final = res_intersect_dataframe.select("lsh_performance").groupBy().sum().collect()[0]['sum(lsh_performance)']/(res_intersect_dataframe.count() * 5)
print("best_model Accuracy: " + "{:.7%}".format(res_final))

bestLSHModel = best_model.stages[3]

print("\nBest  parameters:")
print("LSH_numHashTables %.i" % bestLSHModel.getNumHashTables())
print("LSH_bucketLength %.i" % bestLSHModel.getBucketLength())


training time 2115.45 seconds
best_model Accuracy: 96.5921907%

Best  parameters:
LSH_numHashTables 3
LSH_bucketLength 7


In [None]:
# RESULTS shown by previous cell 
# best_model Accuracy: 96.5921907%

# Best  parameters:
# LSH_numHashTables 3
# LSH_bucketLength 7

In [12]:
# this is test of F.array_intersect
# from pyspark.sql import Row
# c1_name = 'c1'
# c2_name = 'c2'

# df = spark.createDataFrame([Row(c1=["b", "a", "c"], c2=["c", "d", "a", "f"]), Row(c1=["b", "c"], c2=["c", "d", "a", "f"])])
# list_inter = df.select(F.array_intersect(df[c1_name], df[c2_name])).collect()
# print(list_inter)
# len(list_inter[1]['array_intersect(c1, c2)'])

## task 2

In [11]:
import requests
from pyspark.sql import SQLContext

query_str = 'https://wikimedia.org/api/rest_v1/metrics/pageviews/top/uk.wikisource/all-access/2019/04/all-days'
sqlContext = SQLContext(spark)
req = requests.get(query_str)
df_words = sqlContext.createDataFrame(req.json()['items'][0]['articles'])

df_words = df_words.withColumn("id", F.monotonically_increasing_id())



In [12]:
#process text
from pyspark.sql.functions import *

def remove_punctuation(column):
     return lower(regexp_replace(concat_ws("SEPARATORSTRING", column),"[_/().,$%&!?:]", ' ')).alias('article')

def do_numbers(column):
     return lower(regexp_replace(concat_ws("SEPARATORSTRING", column),"[\d+]", '[number]')).alias('article')

def join_words_tokens(column):
     return lower(regexp_replace(concat_ws("SEPARATORSTRING", column),"[-'’«»]", '')).alias('article')

def remove_extraspace(column):
     return lower(regexp_replace(concat_ws("SEPARATORSTRING", column),"\s\s+", ' ')).alias('article')

df_clean = df_words.select(remove_extraspace(join_words_tokens(do_numbers(remove_punctuation(col('article'))))),
                     col('rank'),
                     col('views'), col('id'))


In [13]:
# get ukrainian stopwords

stopwords_ua = pd.read_csv("https://github.com/skupriienko/Ukrainian-Stopwords/raw/master/stopwords_ua.txt", header=None, names=['stopwords'])
stopword_ua_final = list(stopwords_ua.stopwords)

In [14]:
df_clean.show(3)

+--------------------+----+-----+---+
|             article|rank|views| id|
+--------------------+----+-----+---+
|    головна сторінка|   1|21278|  0|
|               вірую|   2|14244|  1|
|мойсей іван франк...|   3| 2603|  2|
+--------------------+----+-----+---+
only showing top 3 rows



In [15]:



tokenizer = Tokenizer(inputCol = 'article', outputCol = 'words')
remover = StopWordsRemover(inputCol = tokenizer.getOutputCol(), outputCol = 'tokenized', stopWords = stopword_ua_final)
word2vec = Word2Vec(vectorSize = 20, inputCol = remover.getOutputCol(), outputCol = 'features')
lsh = CustomLSH().setInputCol(word2vec.getOutputCol())
pipeline = Pipeline(stages=[tokenizer,remover, word2vec, lsh])

paramGrid = ParamGridBuilder() \
    .addGrid(lsh.numHashTables, [3, 4, 5, 6]) \
    .addGrid(lsh.bucketLength, [5, 6, 7, 8]) \
    .build()


# crossval = CrossValidator(estimator=pipeline,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=BinaryClassificationEvaluator(),
#                           numFolds=2)

tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=CustomEvaluator(predictionCol='lsh_true', labelCol='knn_true'),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

start = time.time()
cvModel2 = tvs.fit(df_clean)
end = time.time()

print("Time elapsed %.2f seconds" % (end - start) )

Accuracy: 52.7956989%
Accuracy: 53.0107527%
Accuracy: 52.6881720%
Accuracy: 53.8709677%
Accuracy: 54.0860215%
Accuracy: 53.7634409%
Accuracy: 54.8387097%
Accuracy: 54.0860215%
Accuracy: 54.3010753%
Accuracy: 53.8709677%
Accuracy: 53.9784946%
Accuracy: 54.9462366%
Accuracy: 54.0860215%
Accuracy: 54.1935484%
Accuracy: 53.8709677%
Accuracy: 54.8387097%
Time elapsed 233.77 seconds


In [19]:
# RESULTS shown by previous cell 
# Accuracy: 52.7956989%
# Accuracy: 53.0107527%
# Accuracy: 52.6881720%
# Accuracy: 53.8709677%
# Accuracy: 54.0860215%
# Accuracy: 53.7634409%
# Accuracy: 54.8387097%
# Accuracy: 54.0860215%
# Accuracy: 54.3010753%
# Accuracy: 53.8709677%
# Accuracy: 53.9784946%
# Accuracy: 54.9462366%
# Accuracy: 54.0860215%
# Accuracy: 54.1935484%
# Accuracy: 53.8709677%
# Accuracy: 54.8387097%
# Time elapsed 233.77 seconds

In [18]:
best_model = cvModel2.bestModel

start = time.time()
res = cvModel2.transform(df_clean)
end = time.time()
print("training time %.2f seconds" % (end - start) )


res_array_intersect = res.select(F.array_intersect(res['lsh_true'], res['knn_true'])).collect()

res_intersect_dataframe = spark.createDataFrame(res_array_intersect)

res_intersect_dataframe = res_intersect_dataframe.withColumn('lsh_performance', F.size(F.col('array_intersect(lsh_true, knn_true)'))).drop('array_intersect(lsh_true, knn_true)')

res_final = res_intersect_dataframe.select("lsh_performance").groupBy().sum().collect()[0]['sum(lsh_performance)']/(res_intersect_dataframe.count() * 5)
print("best_model Accuracy: " + "{:.7%}".format(res_final))

bestLSHModel = best_model.stages[3]

print("\nBest  parameters:")
print("LSH_numHashTables %.i" % bestLSHModel.getNumHashTables())
print("LSH_bucketLength %.i" % bestLSHModel.getBucketLength())


training time 132.16 seconds
best_model Accuracy: 58.6666667%

Best  parameters:
LSH_numHashTables 5
LSH_bucketLength 8


In [20]:
# RESULTS shown by previous cell 
# training time 132.16 seconds
# best_model Accuracy: 58.6666667%

# Best  parameters:
# LSH_numHashTables 5
# LSH_bucketLength 8

In [21]:
#AIRBNB best params on wiki dataset

tokenizer = Tokenizer(inputCol = 'article', outputCol = 'words')
remover = StopWordsRemover(inputCol = tokenizer.getOutputCol(), outputCol = 'tokenized', stopWords = stopword_ua_final)
word2vec = Word2Vec(vectorSize = 20, inputCol = remover.getOutputCol(), outputCol = 'features')
lsh = CustomLSH(inputCol=word2vec.getOutputCol())
# lsh = CustomLSH().setInputCol("features")
pipeline = Pipeline(stages=[tokenizer,remover, word2vec, lsh])

paramGrid = ParamGridBuilder().addGrid(lsh.numHashTables, [3]).addGrid(lsh.bucketLength, [7]).build()

tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=CustomEvaluator(predictionCol='lsh_true', labelCol='knn_true'),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

test_model = tvs.fit(df_clean)
test_model_res = test_model.transform(df_clean)

res_array_intersect = test_model_res.select(F.array_intersect(test_model_res['lsh_true'], test_model_res['knn_true'])).collect()

res_intersect_dataframe = spark.createDataFrame(res_array_intersect)

res_intersect_dataframe = res_intersect_dataframe.withColumn('lsh_performance', F.size(F.col('array_intersect(lsh_true, knn_true)'))).drop('array_intersect(lsh_true, knn_true)')

res_final = res_intersect_dataframe.select("lsh_performance").groupBy().sum().collect()[0]['sum(lsh_performance)']/(res_intersect_dataframe.count() * 5)
print("best_model Accuracy: " + "{:.7%}".format(res_final))


Accuracy: 53.4408602%
best_model Accuracy: 59.0358974%


In [28]:
# # this is test run to check that everything is working

# import time
# listings_test = listings_clear.limit(100)

# tokenizer = Tokenizer(inputCol='name', outputCol='words')
# remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='tokenized')
# word2vec = Word2Vec(vectorSize = 20, inputCol=remover.getOutputCol(), outputCol='features')
# lsh = CustomLSH().setInputCol(word2vec.getOutputCol())

# pipeline = Pipeline(stages=[tokenizer,remover, word2vec, lsh])

# paramGrid = ParamGridBuilder() \
#     .addGrid(lsh.numHashTables, [1, 2, 3]) \
#     .addGrid(lsh.bucketLength, [5, 6, 7]) \
#     .build()


# # crossval = CrossValidator(estimator=pipeline,
# #                           estimatorParamMaps=paramGrid,
# #                           evaluator=BinaryClassificationEvaluator(),
# #                           numFolds=2)

# tvs_test = TrainValidationSplit(estimator = pipeline,
#                            estimatorParamMaps = paramGrid,
#                            evaluator = CustomEvaluator(predictionCol='lsh_true', labelCol='knn_true'),
#                            # 80% of the data will be used for training, 20% for validation.
#                            trainRatio = 0.8)

# start = time.time()
# cvModel_test = tvs_test.fit(listings_test)
# end = time.time()

# print("Time elapsed %.2f seconds" % (end - start) )

# best_model = cvModel_test.bestModel

# start = time.time()
# res = cvModel_test.transform(listings_test)
# end = time.time()
# print("training time %.2f seconds" % (end - start) )


# res_array_intersect = res.select(F.array_intersect(res['lsh_true'], res['knn_true'])).collect()

# res_intersect_dataframe = spark.createDataFrame(res_array_intersect)

# res_intersect_dataframe = res_intersect_dataframe.withColumn('lsh_performance', F.size(F.col('array_intersect(lsh_true, knn_true)'))).drop('array_intersect(lsh_true, knn_true)')

# res_final = res_intersect_dataframe.select("lsh_performance").groupBy().sum().collect()[0]['sum(lsh_performance)']/(res_intersect_dataframe.count() * 5)
# print("best_model Accuracy: " + "{:.7%}".format(res_final))


Accuracy: 80.0000000%
Accuracy: 80.0000000%
Accuracy: 80.0000000%
Accuracy: 80.9090909%
Accuracy: 80.9090909%
Accuracy: 80.9090909%
Accuracy: 85.4545455%
Accuracy: 85.4545455%
Accuracy: 85.4545455%
Time elapsed 20.94 seconds
training time 14.94 seconds
best_model Accuracy: 92.8000000%
