<a href="https://colab.research.google.com/github/ansu0122/ucu-mmds/blob/main/src/homework1/ucu_mmds_homework1_shared.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook has to be run in the `jupyter/pyspark-notebook` Docker container, because Colab terminates the session before fitting and evaluation complete on the full dataset.

In [175]:
!pip install -q findspark
!pip install joblibspark



In [None]:
import os
version = "3.5.0"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-arm64"
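os.environ["SPARK_HOME"] = "/usr/local/spark"
# `!export` runs in a throwaway subshell, so extend PATH through os.environ instead
os.environ["PATH"] = f"{os.environ['SPARK_HOME']}/bin:{os.environ['PATH']}"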

In [None]:
import findspark
findspark.init()

In [None]:
import time
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
import requests

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, avg, when, regexp_replace, lit
from pyspark.sql import functions as F

from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StopWordsRemover, Normalizer, Word2Vec
from pyspark.ml import Pipeline
from pyspark.sql import DataFrame

from joblibspark import register_spark

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.evaluation import Evaluator
from pyspark.ml import Estimator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from joblib import parallel_backend  # joblibspark registers its 'spark' backend with joblib
from pyspark.ml import PipelineModel


In [None]:
import pyspark 
sc = pyspark.SparkContext('local[*]')
# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

In [None]:
spark = SparkSession(sc)
spark

# Loading Data

In [None]:
cd /home/jovyan/work

/home/jovyan/work


In [None]:
!wget -O listings.csv.gz "https://data.insideairbnb.com/spain/catalonia/barcelona/2024-09-06/data/listings.csv.gz"
!gunzip -f listings.csv.gz

--2024-11-08 21:08:49--  https://data.insideairbnb.com/spain/catalonia/barcelona/2024-09-06/data/listings.csv.gz
Resolving data.insideairbnb.com (data.insideairbnb.com)... 108.138.51.6, 108.138.51.28, 108.138.51.93, ...
Connecting to data.insideairbnb.com (data.insideairbnb.com)|108.138.51.6|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9977937 (9.5M) [application/x-gzip]
Saving to: ‘listings.csv.gz’


2024-11-08 21:08:58 (1.06 MB/s) - ‘listings.csv.gz’ saved [9977937/9977937]



In [None]:
listings = spark.read.csv('./listings.csv', header=True, inferSchema=True, multiLine=True, escape='"')
listings.show(5)

[Output: the first 5 rows of `listings` with all columns; the table is too wide and was truncated in this export]

In [None]:
listings.select('name', 'description').show(5)

+--------------------+--------------------+
|                name|         description|
+--------------------+--------------------+
|Huge flat for 8 p...|110m2 apartment t...|
|Forum CCIB DeLuxe...|Beautiful spaciou...|
|Sagrada Familia a...|A lovely two bedr...|
|Stylish Top Floor...|Located in close ...|
|VIDRE HOME PLAZA ...|Spacious apartmen...|
+--------------------+--------------------+
only showing top 5 rows



# Definitions

In [None]:
def run_tf_idf(input: DataFrame, input_col: str, features_num: int, stop_words: list) -> DataFrame:

    tokenizer = Tokenizer(inputCol=input_col, outputCol="words")
    general_remover = StopWordsRemover(inputCol="words", outputCol="cleanTokens")
    context_remover = StopWordsRemover(inputCol="cleanTokens", outputCol="myCleanTokens").setStopWords(stop_words)
    hashingTF = HashingTF(inputCol="myCleanTokens", outputCol="rawFeatures", numFeatures=features_num)
    idf = IDF(inputCol="rawFeatures", outputCol="vectorSpace")
    normalizer = Normalizer(inputCol="vectorSpace", outputCol="normVectorSpace")

    pipeline = Pipeline(stages=[tokenizer, general_remover, context_remover, hashingTF, idf, normalizer])
    model = pipeline.fit(input)
    results = model.transform(input)

    return results
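For intuition, here is a minimal NumPy sketch of what the HashingTF → IDF → Normalizer stages compute for documents that are already tokenized. It is not Spark's implementation: `zlib.crc32` stands in for the MurmurHash3 term hash Spark uses, and the helper names `hashing_tf` and `tf_idf` are hypothetical. The smoothed IDF `log((m + 1) / (df + 1))` follows the formula documented for Spark's `IDF`.

```python
import zlib

import numpy as np


def hashing_tf(tokens, num_features):
    """Raw term counts, with each token mapped to a bucket by a stable hash."""
    vec = np.zeros(num_features)
    for tok in tokens:
        # crc32 is a stand-in for the MurmurHash3 hash used by Spark's HashingTF
        vec[zlib.crc32(tok.encode("utf-8")) % num_features] += 1.0
    return vec


def tf_idf(docs, num_features=256):
    """TF-IDF rows, L2-normalized like the Normalizer stage with its default p=2."""
    tf = np.array([hashing_tf(doc, num_features) for doc in docs])
    m = len(docs)
    df = np.count_nonzero(tf, axis=0)   # number of documents touching each bucket
    idf = np.log((m + 1) / (df + 1))    # smoothed IDF
    weighted = tf * idf
    norms = np.linalg.norm(weighted, axis=1, keepdims=True)
    # rows made only of terms present in every document stay all-zero instead of dividing by 0
    return np.divide(weighted, norms, out=np.zeros_like(weighted), where=norms > 0)


docs = [["sea", "view", "terrace"], ["sea", "view", "pool"], ["gothic", "quarter", "loft"]]
vectors = tf_idf(docs)
print(np.round(vectors @ vectors.T, 3))  # cosine similarities between the toy listings
```

A term that occurs in every document gets an IDF of zero, which is why generic listing words contribute little even before the custom stop-word list removes them.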

In [None]:
def run_word2vec(input: DataFrame, input_col: str, stop_words: list, features_num: int, minCount: int=2) -> DataFrame:

    tokenizer = Tokenizer(inputCol=input_col, outputCol="words")
    general_remover = StopWordsRemover(inputCol="words", outputCol="cleanTokens")
    context_remover = StopWordsRemover(inputCol="cleanTokens", outputCol="myCleanTokens").setStopWords(stop_words)
    word2Vec = Word2Vec(vectorSize=features_num, minCount=minCount, inputCol="myCleanTokens", outputCol="vectorSpace")
    normalizer = Normalizer(inputCol="vectorSpace", outputCol="normVectorSpace")

    pipeline = Pipeline(stages=[tokenizer, general_remover, context_remover, word2Vec, normalizer])
    model = pipeline.fit(input)
    results = model.transform(input)

    return results
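Spark's `Word2VecModel` turns each token list into one document vector by averaging the learned word vectors, and the pipeline then L2-normalizes the result. The sketch below reproduces only that averaging-plus-normalization step with a tiny, made-up embedding table (`toy_vectors` and `doc_vector` are hypothetical, nothing here is trained). Because of the final normalization, the exact divisor used for the mean does not change the direction of the vector.

```python
import numpy as np

# hypothetical 3-dimensional word vectors; the real model learns vectorSize=256 of them
toy_vectors = {
    "sea": np.array([0.9, 0.1, 0.0]),
    "view": np.array([0.7, 0.3, 0.1]),
    "gothic": np.array([0.0, 0.2, 0.9]),
}


def doc_vector(tokens, vectors, dim=3):
    """Average the vectors of known tokens (unknown tokens add nothing), then L2-normalize."""
    total = np.zeros(dim)
    for tok in tokens:
        total += vectors.get(tok, np.zeros(dim))
    mean = total / max(len(tokens), 1)
    norm = np.linalg.norm(mean)
    return mean / norm if norm > 0 else mean


a = doc_vector(["sea", "view"], toy_vectors)
b = doc_vector(["sea", "view", "unseen"], toy_vectors)  # out-of-vocabulary word is ignored
c = doc_vector(["gothic"], toy_vectors)
print(round(float(a @ b), 3), round(float(a @ c), 3))
```

Unlike hashed TF-IDF, two listings can be close here even without sharing a word, as long as their words have similar learned vectors.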

In [None]:
def compute_ground_truth(df: DataFrame, input_col: str, id_col: str, num_neighbors: int = 3):

    sample_df = df.select(id_col, input_col).collect()
    sample_features = np.array([row[input_col].toArray() if hasattr(row[input_col], 'toArray') else row[input_col]
                                for row in sample_df])

    ids = [row[id_col] for row in sample_df]
    nn = NearestNeighbors(n_neighbors=num_neighbors + 1, metric='cosine')
    nn.fit(sample_features)

    neighbors_indices = nn.kneighbors(sample_features, return_distance=False)
    ground_truth_ids = [[ids[idx] for idx in neighbors[1:]] for neighbors in neighbors_indices]

    return ground_truth_ids
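`compute_ground_truth` ranks neighbours by cosine distance, while `BucketedRandomProjectionLSH` works with Euclidean distance. The two rankings agree here because both pipelines L2-normalize their vectors: for unit vectors $\|a-b\|^2 = 2 - 2\cos(a,b)$, so Euclidean distance is a monotone function of cosine distance. A quick NumPy check on random unit vectors (synthetic data, not the listings):

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.normal(size=(50, 8))
x /= np.linalg.norm(x, axis=1, keepdims=True)  # same role as the Normalizer stage

query = x[0]
cosine_dist = 1.0 - x @ query
euclidean_dist = np.linalg.norm(x - query, axis=1)

# ||a - b||^2 = 2 - 2 cos(a, b) = 2 * cosine distance for unit vectors
print(np.allclose(euclidean_dist ** 2, 2 * cosine_dist))
# hence both metrics order the candidates identically
print(np.array_equal(np.argsort(cosine_dist), np.argsort(euclidean_dist)))
```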

In [None]:
def calculate_precision(found_neighbors: list, ground_truth: list):
    precision_scores = [
            len(set(found) & set(truth)) / len(truth) for found, truth in zip(found_neighbors, ground_truth)
        ]
    return np.mean(precision_scores)
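`calculate_precision` averages precision@k over queries: for each query, the share of true neighbours that LSH recovered, compared as sets so that order does not matter. Because the two lists are paired with `zip`, row *i* of `found_neighbors` must describe the same listing as row *i* of `ground_truth`, and `zip` silently stops at the shorter list. A small worked example with made-up ids:

```python
import numpy as np


def calculate_precision(found_neighbors, ground_truth):
    # same formula as the notebook's helper
    scores = [len(set(found) & set(truth)) / len(truth)
              for found, truth in zip(found_neighbors, ground_truth)]
    return np.mean(scores)


truth = [[11, 12], [21, 22]]
found = [[12, 11], [21, 99]]  # query 1: both neighbours found; query 2: one of two
print(calculate_precision(found, truth))  # (2/2 + 1/2) / 2 = 0.75

# the same results paired with the wrong queries share no ids
print(calculate_precision(found[::-1], truth))  # 0.0
```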

In [66]:
from pyspark.ml.feature import BucketedRandomProjectionLSHModel


class PrecisionEvaluator(Evaluator):
    def __init__(self, input_col: str, output_col: str, id_col: str, ground_truth: list, num_neighbors: int = 5):
        super().__init__()  # set up the Params machinery (uid, param maps) of the Evaluator base class
        self.input_col = input_col
        self.output_col = output_col
        self.ground_truth = ground_truth
        self.id_col = id_col
        self.num_neighbors = num_neighbors

    def _evaluate(self, dataset: DataFrame, model: BucketedRandomProjectionLSHModel):

        # move any existing hash column aside so that approxNearestNeighbors hashes the rows with this model
        dataset = dataset.withColumnRenamed(self.output_col, f"{self.output_col}_temp")

        return evaluate_lsh_model(dataset, model, self.input_col, self.id_col, self.ground_truth, self.num_neighbors)

    def isLargerBetter(self):
        return True


In [67]:
def evaluate_lsh_model(
                      dataset: DataFrame,
                      model: BucketedRandomProjectionLSH,
                      input_col: str,
                      id_col: str,
                      ground_truth: list,
                      num_neighbors: int = 5
                      ):
    sample_features = dataset.select(id_col, input_col).collect()
    found_neighbors = []

    for feature_row in sample_features:
        current_id = feature_row[id_col]
        query_vector = feature_row[input_col]

        # find nearest neighbors using approxNearestNeighbors
        neighbors = model.approxNearestNeighbors(dataset, query_vector, num_neighbors+1)

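        # drop the query record itself (by `id_col`) and keep at most num_neighbors ids,
        # since the query is not guaranteed to be among the returned rows
        neighbors_list = [row[id_col] for row in neighbors.collect() if row[id_col] != current_id][:num_neighbors]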

        found_neighbors.append(neighbors_list)

    return calculate_precision(found_neighbors, ground_truth)

In [68]:
from functools import reduce


class CustomCrossValidator(CrossValidator):
    def __init__(self,
                 estimator: Estimator,
                 estimatorParamMaps: list,
                 evaluator: Evaluator,
                 numFolds: int = 3):
        super().__init__(estimator=estimator, estimatorParamMaps=estimatorParamMaps, evaluator=evaluator, numFolds=numFolds)

    def _fit(self, dataset: DataFrame):

        best_model = None
        best_score = float('-inf')
        best_params = None

        v_numFolds = self.getOrDefault(self.numFolds)
        if isinstance(v_numFolds, int) and v_numFolds >= 2:
            # split dataset into folds
            splits = dataset.randomSplit([1.0 / v_numFolds] * v_numFolds)
        else:
            raise ValueError(f"Number of folds should be an integer >= 2: {v_numFolds}")

        split_data = []
        for i in range(v_numFolds):
            test_data = splits[i]
            # DataFrame.union takes a single DataFrame, so fold the remaining splits together
            train_data = reduce(DataFrame.union, [splits[j] for j in range(v_numFolds) if j != i])
            split_data.append((train_data, test_data))

        for ip, param_map in enumerate(self.getOrDefault(self.estimatorParamMaps)):
            fold_metrics = []
            for il, (train_data, test_data) in enumerate(split_data):
                estimator_with_params = self.getOrDefault(self.estimator).copy(param_map)
                model = estimator_with_params.fit(train_data)
                print(f'train/eval loop index {il}, hash tab number {param_map.get(model.numHashTables)}')
                # passing model to the evaluator
                metric = self.getOrDefault(self.evaluator)._evaluate(test_data, model)
                fold_metrics.append(metric)

            avg_metric = np.mean(fold_metrics)

            if avg_metric > best_score:
                best_score = avg_metric
                best_model = model
                best_params = param_map

        return best_model, best_params, best_score
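The fold bookkeeping in `CustomCrossValidator` can be checked without Spark. The sketch below uses plain Python lists in place of DataFrames (the `make_folds` helper is hypothetical) and the same pattern of holding out fold *i* and concatenating the others with `reduce`; every row should be held out exactly once. Unlike `randomSplit`, slicing gives exact fold sizes, so this only checks the pairing logic.

```python
import random
from functools import reduce


def make_folds(rows, num_folds, seed=42):
    """Shuffle rows, deal them into folds, and pair each fold with the union of the others.

    Requires num_folds >= 2, otherwise there is no training data for the single fold.
    """
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    folds = [shuffled[i::num_folds] for i in range(num_folds)]
    pairs = []
    for i in range(num_folds):
        others = [folds[j] for j in range(num_folds) if j != i]
        # reduce plays the role of chaining DataFrame.union over any number of folds
        train = reduce(lambda left, right: left + right, others)
        pairs.append((train, folds[i]))
    return pairs


pairs = make_folds(list(range(10)), num_folds=4)
held_out = sorted(x for _, test in pairs for x in test)
print(held_out == list(range(10)))
print([(len(train), len(test)) for train, test in pairs])
```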


In [142]:
class CustomValidator(CrossValidator):
    """Hold-out validator: scores every parameter map on one seeded 80/20 split.

    `numFolds` is accepted for API compatibility with CrossValidator but is not used.
    """
    def __init__(self,
                 estimator: Estimator,
                 estimatorParamMaps: list,
                 evaluator: Evaluator,
                 numFolds: int = 1):
        super().__init__(estimator=estimator, estimatorParamMaps=estimatorParamMaps, evaluator=evaluator, numFolds=numFolds)

    def _fit(self, dataset: DataFrame):

        best_model = None
        best_score = float('-inf')
        best_params = None

        # split once, with a seed, so that every parameter map is scored on the same hold-out rows
        train_data, test_data = dataset.randomSplit([0.8, 0.2], seed=42)

        for ip, param_map in enumerate(self.getOrDefault(self.estimatorParamMaps)):
            estimator_with_params = self.getOrDefault(self.estimator).copy(param_map)
            model = estimator_with_params.fit(train_data)
            print(f'hash tab number -> {param_map.get(model.numHashTables)}')
            print(f'bucket length -> {param_map.get(model.bucketLength)}')
            # passing model to the evaluator
            avg_metric = self.getOrDefault(self.evaluator)._evaluate(test_data, model)

            if avg_metric > best_score:
                best_score = avg_metric
                best_model = model
                best_params = param_map

        return best_model, best_params, best_score
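`CustomValidator` scores each model on a random 20% slice of the data it receives, while the evaluator's `ground_truth` list is computed on the whole training split. Since `calculate_precision` pairs lists by position, a meaningful score needs ground truth computed on exactly the rows, and in the same order, that the evaluator sees, with neighbours searched only inside that subset. A brute-force NumPy sketch of such a per-split ground truth (the `exact_knn_ids` helper and the synthetic data are hypothetical; it mirrors `compute_ground_truth` for distinct vectors):

```python
import numpy as np


def exact_knn_ids(ids, vectors, k):
    """Exact k nearest neighbours by cosine distance, excluding each row itself."""
    unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
    dist = 1.0 - unit @ unit.T
    np.fill_diagonal(dist, np.inf)  # a row is never its own neighbour
    order = np.argsort(dist, axis=1)[:, :k]
    return [[ids[j] for j in row] for row in order]


rng = np.random.default_rng(1)
ids = list(range(100, 120))
vectors = rng.normal(size=(20, 4))

full_truth = exact_knn_ids(ids, vectors, k=3)
subset = [0, 5, 7, 12, 19]  # stand-in for the rows of one validation split
subset_ids = [ids[i] for i in subset]
subset_truth = exact_knn_ids(subset_ids, vectors[subset], k=3)

print(len(full_truth), len(subset_truth))  # one neighbour list per evaluated row
print(all(set(t) <= set(subset_ids) for t in subset_truth))
```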


# Airbnb properties in Barcelona

In [70]:
num_feat = 2**8
num_neighbors = 5
target_col = 'name'
vector_space_col = 'normVectorSpace'
output_col = 'hashes'
id_col = 'id'


rnb_stopwords = [
    "apartment", "flat", "property", "located", "near", "includes", "features",
    "offered", "offering", "available", "spacious", "beautiful", "lovely",
    "modern", "new", "floor", "building", "block", "complex", "close", "distance",
    "minutes", "city", "center", "town", "square", "station", "transport", "shops",
    "amenities", "service", "services", "access", "perfect", "ideal", "suitable",
    "designed", "high-quality", "quality", "luxury", "premium", "affordable", "cheap",
    "price", "cost", "included", "terms", "conditions", "contact", "call", "schedule",
    "view", "appointment", "now", "immediate", "moving", "move-in", "move", "ready",
    "relax", "cozy", "charming", "comfortable", "beautiful", "peaceful", "stylish",
    "great", "amazing", "lovely", "nice", "serene", "elegant", "bright", "spacious",
    "private", "unique", "friendly", "warm", "luxurious", "vibrant", "inviting"
]
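Spark's `Tokenizer` only lower-cases and splits on whitespace, so punctuation stays attached to words and a token such as `flat,` is not matched by the stop word `flat`. The sketch below imitates the tokenizer and the two `StopWordsRemover` stages in plain Python; `general_stop` is a tiny hypothetical stand-in for Spark's built-in English list, not the real one.

```python
def tokenize(text):
    # lower-case and split on whitespace; punctuation is kept inside tokens
    return text.lower().split()


def remove_stop_words(tokens, stop_words):
    stop = {w.lower() for w in stop_words}
    return [t for t in tokens if t not in stop]


general_stop = ["for", "in", "the", "with"]          # hypothetical subset of an English list
context_stop = ["cozy", "flat", "near", "apartment"]  # a few entries from rnb_stopwords

tokens = tokenize("Cozy flat, near Sagrada Familia")
print(remove_stop_words(remove_stop_words(tokens, general_stop), context_stop))
# ['flat,', 'sagrada', 'familia']
```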

In [71]:
listings.select('id').summary().show()

+-------+--------------------+
|summary|                  id|
+-------+--------------------+
|  count|               19482|
|   mean|4.645670724989749...|
| stddev|4.961103420827175E17|
|    min|               18674|
|    25%|            25402379|
|    50%|            53390353|
|    75%|  965071766227471616|
|    max| 1239618639352958825|
+-------+--------------------+



### TF-IDF & LSH

In [None]:
tf_idf_results = run_tf_idf(listings, target_col, num_feat, rnb_stopwords)

In [77]:
train_data_tfidf, test_data_tfidf = tf_idf_results.randomSplit([0.8, 0.2], seed=42)
train_y_tfidf = compute_ground_truth(train_data_tfidf, vector_space_col, 'id', num_neighbors=num_neighbors)
test_y_tfidf = compute_ground_truth(test_data_tfidf, vector_space_col, 'id', num_neighbors=num_neighbors)
print(train_data_tfidf.count())
print(test_data_tfidf.count())

15650
3832


In [78]:
%%time

lsh_model = BucketedRandomProjectionLSH(inputCol=vector_space_col, outputCol=output_col)

# Define parameter grid with bucketLength and numHashTables
param_grid = ParamGridBuilder() \
    .addGrid(lsh_model.numHashTables, [200, 500, 1000, 1500]) \
    .addGrid(lsh_model.bucketLength, [1.0]) \
    .build()

evaluator = PrecisionEvaluator(input_col=vector_space_col, output_col=output_col, id_col=id_col, ground_truth=train_y_tfidf, num_neighbors=num_neighbors)

register_spark()

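# note: CustomValidator scores the parameter maps in a plain Python loop, so the joblib
# Spark backend does not parallelize this search; Spark still parallelizes each job
with parallel_backend('spark', n_jobs=3):
    crossval = CustomValidator(
        estimator=lsh_model,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3
    )
    best_model_tfidf, best_params, best_score = crossval.fit(train_data_tfidf)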
best_params = {param.name: value for param, value in best_model_tfidf.extractParamMap().items()}
print(f"best score => {best_score}")
print("Best Parameters:")
for param_name, param_value in best_params.items():
    print(f"{param_name}: {param_value}")




hash tab number -> 200
hash tab number -> 500
hash tab number -> 1000
hash tab number -> 1500
best score => 0.0007559055118110235
Best Parameters:
numHashTables: 1500
outputCol: hashes
bucketLength: 1.0
inputCol: normVectorSpace
CPU times: user 7min 10s, sys: 11.5 s, total: 7min 21s
Wall time: 9h 2min 18s
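The two tuned parameters have a simple geometric meaning. `BucketedRandomProjectionLSH` hashes a vector $x$ with a random unit vector $v$ as $h(x) = \lfloor x \cdot v / \text{bucketLength} \rfloor$, one such vector per hash table (`numHashTables`), and roughly speaking rows that share a bucket with the query in at least one table become nearest-neighbour candidates. The NumPy sketch below (synthetic vectors, hypothetical helper names) shows that a vector always collides with itself and that a larger `bucketLength` merges buckets, so it can only admit as many candidates or more.

```python
import numpy as np


def brp_hashes(x, projections, bucket_length):
    """One bucket id per hash table: floor(x . v / bucketLength)."""
    return np.floor(projections @ x / bucket_length).astype(int)


def is_candidate(x, key, projections, bucket_length):
    # OR over tables: a single shared bucket is enough
    return bool(np.any(brp_hashes(x, projections, bucket_length) == brp_hashes(key, projections, bucket_length)))


rng = np.random.default_rng(42)
dim, num_tables = 16, 20
projections = rng.normal(size=(num_tables, dim))
projections /= np.linalg.norm(projections, axis=1, keepdims=True)  # random unit vectors

key = rng.normal(size=dim)
key /= np.linalg.norm(key)
others = rng.normal(size=(200, dim))
others /= np.linalg.norm(others, axis=1, keepdims=True)

for bucket_length in (0.5, 1.0, 2.0):
    hits = sum(is_candidate(o, key, projections, bucket_length) for o in others)
    print(f"bucketLength={bucket_length}: {hits} of 200 random vectors are candidates")
```

More candidates raise the chance of finding the true neighbours but also the number of exact distance computations per query.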


In [163]:
# saving the best model
model_save_path = '/home/jovyan/work/tfidf-model'
pipeline_model = PipelineModel(stages=[best_model_tfidf])
pipeline_model.write().overwrite().save(model_save_path)

In [None]:
# evaluate the best model
best_model_tfidf = PipelineModel.load('/home/jovyan/work/tfidf-model').stages[-1]

tfidf_precision = evaluate_lsh_model(
    dataset=test_data_tfidf,
    model=best_model_tfidf,
    input_col=vector_space_col,
    id_col=id_col,
    ground_truth=test_y_tfidf,
    num_neighbors=num_neighbors
)

In [None]:
print(f'test precision {tfidf_precision*100:.0f}%')

test precision 78%


### W2V & LSH

In [None]:
w2v_results = run_word2vec(listings, target_col, rnb_stopwords, num_feat)

In [141]:
train_data_w2v, test_data_w2v = w2v_results.randomSplit([0.2, 0.8], seed=42)
train_y_w2v = compute_ground_truth(train_data_w2v, vector_space_col, id_col, num_neighbors=num_neighbors)
test_y_w2v = compute_ground_truth(test_data_w2v, vector_space_col, id_col, num_neighbors=num_neighbors)
print(train_data_w2v.count())
print(test_data_w2v.count())

3977
15505


In [143]:
%%time

lsh_model = BucketedRandomProjectionLSH(inputCol=vector_space_col, outputCol=output_col)

# Define parameter grid with bucketLength and numHashTables
param_grid = ParamGridBuilder() \
    .addGrid(lsh_model.numHashTables, [50, 100, 200]) \
    .addGrid(lsh_model.bucketLength, [0.5, 1.0, 2.0]) \
    .build()

evaluator = PrecisionEvaluator(input_col=vector_space_col, output_col=output_col, id_col=id_col, ground_truth=train_y_w2v, num_neighbors=num_neighbors)

register_spark()

with parallel_backend('spark', n_jobs=10):
    crossval = CustomValidator(
        estimator=lsh_model,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3
    )
    best_model_w2v, best_params, best_score = crossval.fit(train_data_w2v)
best_params = {param.name: value for param, value in best_model_w2v.extractParamMap().items()}

print("Best Parameters:")
for param_name, param_value in best_params.items():
    print(f"{param_name}: {param_value}")



hash tab number -> 50
bucket length -> 0.5
hash tab number -> 50
bucket length -> 1.0
hash tab number -> 50
bucket length -> 2.0
hash tab number -> 100
bucket length -> 0.5
hash tab number -> 100
bucket length -> 1.0
hash tab number -> 100
bucket length -> 2.0
hash tab number -> 200
bucket length -> 0.5
hash tab number -> 200
bucket length -> 1.0
hash tab number -> 200
bucket length -> 2.0
Best Parameters:
numHashTables: 100
outputCol: hashes
bucketLength: 2.0
inputCol: normVectorSpace
CPU times: user 47.9 s, sys: 6.07 s, total: 54 s
Wall time: 1h 42min 30s


In [148]:
from pyspark.ml import PipelineModel

model_save_path = f'/home/jovyan/work/w2v-model'
pipeline_model = PipelineModel(stages=[best_model_w2v])
pipeline_model.write().overwrite().save(model_save_path)

In [None]:
# evaluate the best model

best_model_w2v = PipelineModel.load('/home/jovyan/work/w2v-model').stages[-1]

w2v_precision = evaluate_lsh_model (
    dataset = test_data_w2v,
    model = best_model_w2v,
    input_col = vector_space_col,
    id_col = id_col,
    ground_truth = test_y_w2v,
    num_neighbors = num_neighbors
)

In [None]:
print(f'test precision {w2v_precision*100:.0f}%')

test precision 90%


# Ukrainian Wikisource (top viewed pages, April 2019)

In [153]:
url = "https://wikimedia.org/api/rest_v1/metrics/pageviews/top/uk.wikisource/all-access/2019/04/all-days"
headers = {
    # Wikimedia asks API clients to identify themselves with a descriptive User-Agent
    'User-Agent': 'ucu-mmds-homework1/1.0 (educational notebook)',
}

response = requests.get(url, headers=headers, timeout=30)

if response.status_code != 200:
    print(f"Error: {response.status_code}")


In [154]:
if response.status_code == 200:
    data = response.json()
    articles_list = data['items'][0]['articles']  # Adjust if accessing more nested structures

    # Convert list of dictionaries to PySpark DataFrame
    wiki_df = spark.createDataFrame(articles_list)
    wiki_df = wiki_df.withColumn("id", F.monotonically_increasing_id())

    # replace punctuation, brackets and underscores with spaces
    wiki_df = wiki_df.withColumn("article", F.regexp_replace("article", r"[_\-.,!?:;<=>@#$%^&*\/(){}\[\]«»…]", " "))
    wiki_df = wiki_df.withColumn("article", F.regexp_replace("article", r"\s+", " "))
    wiki_df = wiki_df.withColumn("article", F.trim("article"))

    # Display the DataFrame
    wiki_df.show()
else:
    print(f"Error fetching data: {response.status_code}")

+--------------------+----+-----+---+
|             article|rank|views| id|
+--------------------+----+-----+---+
|    Головна сторінка|   1|21278|  0|
|               Вірую|   2|14244|  1|
|Мойсей Іван Франк...|   3| 2603|  2|
|Закон України Про...|   4| 2576|  3|
|Закон України Про...|   5| 1776|  4|
| Конституція України|   6| 1684|  5|
|   Архіви ДАЖО 178 3|   7| 1426|  6|
|Конституція Пилип...|   8| 1248|  7|
|     Конституція США|   9| 1221|  8|
| Молитва за померлих|  10| 1212|  9|
|Біблія Огієнко Но...|  11| 1100| 10|
|Арфами арфами Тичина|  12| 1069| 11|
|      Таємниця щастя|  13| 1047| 12|
|            Отче наш|  14| 1011| 13|
|Народні музичні і...|  15|  966| 14|
|      Головна стаття|  16|  942| 15|
|Природно заповідн...|  17|  924| 16|
|Про Правила дорож...|  18|  918| 17|
|Біблія Огієнко Но...|  19|  902| 18|
|    Спеціальна Пошук|  20|  881| 19|
+--------------------+----+-----+---+
only showing top 20 rows
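The cleaning above relies on the response shape `items[0].articles`, where each entry carries `article`, `views` and `rank`, and on regular-expression replacements that Spark evaluates with Java regex syntax; for a plain character class like this one, Python's `re` behaves the same. The sketch below applies the same three steps in plain Python to a hypothetical payload whose raw titles are made up to resemble the ones in the table.

```python
import re

payload = {  # hypothetical response with the same nesting as the pageviews "top" endpoint
    "items": [{"articles": [
        {"article": "Мойсей_(Іван_Франко)", "views": 2603, "rank": 3},
        {"article": "Архіви_ДАЖО/178/3", "views": 1426, "rank": 7},
    ]}]
}

PUNCT = r"[_\-.,!?:;<=>@#$%^&*/(){}\[\]«»…]"


def clean_title(title):
    title = re.sub(PUNCT, " ", title)   # punctuation and underscores -> spaces
    title = re.sub(r"\s+", " ", title)  # collapse runs of whitespace
    return title.strip()


for entry in payload["items"][0]["articles"]:
    print(entry["rank"], clean_title(entry["article"]))
# 3 Мойсей Іван Франко
# 7 Архіви ДАЖО 178 3
```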



In [155]:
wiki_df.count()

975

In [156]:
num_feat = 2**8
num_neighbors = 5
wiki_target_col = 'article'
vector_space_col = 'normVectorSpace'
output_col = 'hashes'
id_col = 'id'


wiki_stopwords = [
    "авжеж", "адже", "але", "б", "без", "був", "була", "були", "було", "бути",
    "більш", "вам", "вас", "весь", "вздовж", "ви", "вниз", "внизу", "вона", "вони",
    "воно", "все", "всередині", "всіх", "від", "він", "да", "давай", "давати", "де",
    "дещо", "для", "до", "з", "завжди", "замість", "й", "коли", "ледве", "майже", "ми",
    "навколо", "навіть", "нам", "от", "отже", "отож", "поза", "про", "під", "та", "так",
    "такий", "також", "те", "ти", "тобто", "тож", "тощо", "хоча", "це", "цей", "чи",
    "чого", "що", "як", "який", "якої", "є", "із", "інших", "їх", "її",
    "в", "на", "до", "під", "з", "над", "по", "за", "від", "між", "перед", "через",
    "без", "про", "для", "у", "внаслідок", "відносно", "завдяки"
]

In [157]:
wiki_df = run_tf_idf(wiki_df, wiki_target_col, num_feat, wiki_stopwords)

In [158]:
train_data_wiki, test_data_wiki = wiki_df.randomSplit([0.8, 0.2], seed=42)
train_y_wiki = compute_ground_truth(train_data_wiki, vector_space_col, id_col, num_neighbors=num_neighbors)
test_y_wiki = compute_ground_truth(test_data_wiki, vector_space_col, id_col, num_neighbors=num_neighbors)
print(train_data_wiki.count())
print(test_data_wiki.count())

793
182


In [159]:
%%time

lsh_model = BucketedRandomProjectionLSH(inputCol=vector_space_col, outputCol=output_col)

# Define parameter grid with bucketLength and numHashTables
param_grid = ParamGridBuilder() \
    .addGrid(lsh_model.numHashTables, [5, 10, 20, 50, 70]) \
    .addGrid(lsh_model.bucketLength, [0.5, 1.0, 2.0]) \
    .build()

evaluator = PrecisionEvaluator(
    input_col=vector_space_col,
    output_col=output_col,
    id_col=id_col,
    ground_truth=train_y_wiki,
    num_neighbors=num_neighbors
)

register_spark()

with parallel_backend('spark', n_jobs=5):
    crossval = CustomValidator(
        estimator=lsh_model,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        numFolds=3
    )
    best_model_wiki, best_params, best_score = crossval.fit(train_data_wiki)
best_params = {param.name: value for param, value in best_model_wiki.extractParamMap().items()}

print("Best Parameters:")
for param_name, param_value in best_params.items():
    print(f"{param_name}: {param_value}")



hash tab number -> 5
bucket length -> 0.5
hash tab number -> 5
bucket length -> 1.0
hash tab number -> 5
bucket length -> 2.0
hash tab number -> 10
bucket length -> 0.5
hash tab number -> 10
bucket length -> 1.0
hash tab number -> 10
bucket length -> 2.0
hash tab number -> 20
bucket length -> 0.5
hash tab number -> 20
bucket length -> 1.0
hash tab number -> 20
bucket length -> 2.0
hash tab number -> 50
bucket length -> 0.5
hash tab number -> 50
bucket length -> 1.0
hash tab number -> 50
bucket length -> 2.0
hash tab number -> 70
bucket length -> 0.5
hash tab number -> 70
bucket length -> 1.0
hash tab number -> 70
bucket length -> 2.0
Best Parameters:
numHashTables: 70
outputCol: hashes
bucketLength: 1.0
inputCol: normVectorSpace
CPU times: user 9.44 s, sys: 2.31 s, total: 11.8 s
Wall time: 6min 12s


In [None]:
# saving the best model
model_save_path = f'/home/jovyan/work/wiki-model'
pipeline_model = PipelineModel(stages=[best_model_wiki])
pipeline_model.write().overwrite().save(model_save_path)

In [178]:
%%time
# evaluate the best model
best_model_wiki = PipelineModel.load('/home/jovyan/work/wiki-model').stages[-1]

wiki_precision = evaluate_lsh_model(
    dataset=test_data_wiki,
    model=best_model_wiki,
    input_col=vector_space_col,
    id_col=id_col,
    ground_truth=test_y_wiki,
    num_neighbors=num_neighbors
)

print(f'test precision {wiki_precision*100:.2f}%')



test precision 95.82%
CPU times: user 1.18 s, sys: 225 ms, total: 1.4 s
Wall time: 30.9 s


# TF/IDF/LSH model with WIKI data

In [180]:
%%time

wiki_precision2 = evaluate_lsh_model(
    dataset=test_data_wiki,
    model=best_model_tfidf,
    input_col=vector_space_col,
    id_col=id_col,
    ground_truth=test_y_wiki,
    num_neighbors=num_neighbors
)
print(f'test precision {wiki_precision2*100:.2f}%')

test precision 95.71%
CPU times: user 9.87 s, sys: 190 ms, total: 10.1 s
Wall time: 49.3 s


# Conclusions

### The accuracy of your model with at least 4 combinations of parameters

#### We conducted three experiments:
#### Experiment 1:
- Model Type: LSH (BucketedRandomProjectionLSH)
- Vectorization: TF/IDF (256)
- Param Grid:
    - lsh_model.numHashTables, [200, 500, 1000, 1500]
    - lsh_model.bucketLength, [1.0]
- Best Params: 
    - lsh_model.numHashTables = 1500, 
    - lsh_model.bucketLength = 1
- Dataset: Airbnb Barcelona listings
- Train/Test split: 80%/20% (15650/3832)
- Test precision: 78%

#### Experiment 2:
- Model Type: LSH (BucketedRandomProjectionLSH)
- Vectorization: Word2Vec (256)
- Param Grid:
    - lsh_model.numHashTables, [50, 100, 200]
    - lsh_model.bucketLength, [0.5, 1.0, 2.0]
- Best Params: 
    - lsh_model.numHashTables = 100, 
    - lsh_model.bucketLength = 2
- Dataset: Airbnb Barcelona listings
- Train/Test split: 20%/80% (3977/15505)
- Test precision: 90%

#### Experiment 3:
- Model Type: LSH (BucketedRandomProjectionLSH)
- Vectorization: TF/IDF (256)
- Param Grid:
    - lsh_model.numHashTables, [5, 10, 20, 50, 70]
    - lsh_model.bucketLength, [0.5, 1.0, 2.0]
- Best Params: 
    - lsh_model.numHashTables = 70, 
    - lsh_model.bucketLength = 1
- Dataset: Ukrainian Wikisource, April 2019 (top viewed pages)
- Train/Test split: 80%/20% (793/182)
- Test precision: 95.82%

### The computation time spent in the parameter tuning procedure (specify also the characteristics of the machine(s) that you have used)
#### Experiment 1:
- Machine: Mac Pro M1, 16 GB RAM
- Parallel jobs: 3
- Fine-tuning time (wall): 9h 2min 18s
- Evaluation time: 5h 0min 2s

#### Experiment 2:
- Machine: Mac Pro M1, 16 GB RAM
- Parallel jobs: 10
- Fine-tuning time (wall): 1h 42min 30s
- Evaluation time: 11h 2min 28s

#### Experiment 3:
- Machine: Mac Pro M1, 16 GB RAM
- Parallel jobs: 5
- Fine-tuning time (wall): 6min 12s
- Evaluation time (wall): 30.9 s

### What happens if we tune the parameters on Airbnb and then run with those parameters on the Wikipedia dataset? How does the accuracy differ from that of parameters tuned directly on the Wikipedia data?

- Model Type: LSH (BucketedRandomProjectionLSH) fine-tuned on Airbnb data
- Dataset: Ukrainian Wikisource, April 2019 (top viewed pages)
- Test precision 95.71%
- Evaluation time: Wall time: 49.3 s

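Although the two datasets differ in content, the LSH model tuned on Airbnb data performs surprisingly well on the Wiki test set: 95.71% precision, against 95.82% for the model tuned directly on the Wiki data, a gap of only 0.11 percentage points. Part of the explanation is that `BucketedRandomProjectionLSH` learns nothing from the training data except its dimensionality: fitting only draws random projection vectors for the 256-dimensional input, so what transfers from Airbnb is the hyperparameter choice (numHashTables = 1500, bucketLength = 1.0, the same bucket length that was selected on the Wiki data). The much larger number of hash tables (1500 versus 70) likely makes up for the domain mismatch, because every extra table gives true neighbours another chance to share a bucket; the Airbnb setting is therefore more generous than the Wiki data needs rather than tailored to it. The price is speed: evaluation took 49.3 s of wall time versus 30.9 s (10.1 s versus 1.4 s of CPU time), and evaluation time keeps growing with the number of hash tables, which could be a problem in real-world scenarios.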
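The recall/time trade-off can be made concrete with the standard OR-amplification argument, under the simplifying assumption that the hash tables are independent: if a true neighbour shares the query's bucket in one table with probability $p$, it becomes a candidate with probability $1-(1-p)^L$ over $L$ tables, while hashing and candidate filtering cost grows roughly linearly in $L$. A short calculation with an illustrative $p = 0.05$ (not measured on either dataset):

```python
def candidate_probability(p, num_tables):
    """Chance that at least one of num_tables independent tables puts the pair in the same bucket."""
    return 1.0 - (1.0 - p) ** num_tables


for num_tables in (5, 70, 1500):
    print(num_tables, round(candidate_probability(0.05, num_tables), 4))
```

Going from 70 to 1500 tables can only add a few percentage points of candidate probability once it is already close to 1, while the per-query work grows about twentyfold.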