In [8]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import math
import string
import nltk
import csv
import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
import re
from nltk.corpus import stopwords
from collections import defaultdict, Counter


# Spark entry points used by the functions below. getOrCreate() returns the already
# active SparkContext / SparkSession when this cell is re-run (or when the environment
# pre-creates them), so no manual "'sc' in locals()" guard is needed. Unlike that guard,
# it also guarantees `spark` is defined when only `sc` existed beforehand, and it
# replaces a stopped context instead of reusing it.
conf = SparkConf().setMaster("local").setAppName("Inverted Index & TF-IDF")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

# Make sure the NLTK stop-word corpus is available (quiet=True hides download progress),
# then load the Italian list used by preprocess_description.
nltk.download('stopwords', quiet=True)
stop_words = set(stopwords.words('italian'))


# Tokens dropped in addition to the stop words. "•", "【" and "】" are non-ASCII, so the
# string.punctuation strip below does not remove them; "|" is ASCII punctuation and is
# already stripped, but is kept here for fidelity with the original list.
_REMOVE_KEYWORDS = frozenset({
    "affordable", "business", "office",
    "wlan", "devices", "dvd", "wifi", "installato", "365", "mouse", "tastiera",
    "supporto", "bluetooth", "espansione", "ricondizionato", "professional",
    "smartphone", "rpm", "•", "|", "【", "】",
})
# Pads every char that is neither a word char, whitespace, nor the typographic
# apostrophe (’) with spaces, so punctuation glued to a word becomes its own token.
_PUNCT_PADDING_RE = re.compile(r'([^\w\s’])')
# Translation table that deletes every ASCII punctuation character in one pass.
_ASCII_PUNCT_TABLE = str.maketrans('', '', string.punctuation)


def preprocess_description(description, stopword_set=None):
    """Normalise a product description into a space-separated string of index terms.

    Steps: lowercase; turn '/' and '丨' into spaces; isolate punctuation; delete ASCII
    punctuation; split on whitespace; drop stop words and the fixed _REMOVE_KEYWORDS.

    Args:
        description: raw description text. ``None`` (e.g. an empty CSV cell) yields ``''``
            instead of raising AttributeError.
        stopword_set: words to drop; defaults to the module-level ``stop_words``
            (NLTK Italian list).

    Returns:
        The surviving tokens joined by single spaces ('' if none survive).
    """
    if description is None:
        return ''
    if stopword_set is None:
        stopword_set = stop_words

    text = description.lower().replace('/', ' ').replace('丨', ' ')
    text = _PUNCT_PADDING_RE.sub(r' \1 ', text)
    text = text.translate(_ASCII_PUNCT_TABLE)
    return ' '.join(
        token for token in text.split()
        if token not in stopword_set and token not in _REMOVE_KEYWORDS
    )


def load_data(file_path):
    """Load the tab-separated product file into an RDD ready for indexing.

    Args:
        file_path: path of a TSV file with a header row containing at least the
            'Description', 'Price' and 'URL' columns.

    Returns:
        A cached RDD of ``(Description, Price, URL, preprocessed description)`` tuples,
        one per distinct non-null description. Caching lets the later zipWithIndex()/
        count()/collect() actions reuse one computation instead of re-reading and
        re-shuffling the file each time, which also keeps the positional document ids
        consistent across those calls while the partitions stay cached.
    """
    df = spark.read.option("delimiter", "\t").csv(file_path, header=True, inferSchema=True, encoding="UTF-8")
    # A product without a description has nothing to index (and a None description
    # would reach preprocess_description), so drop those rows before de-duplicating.
    df = df.dropna(subset=['Description']).dropDuplicates(['Description'])
    products_rdd = df.rdd.map(
        lambda row: (row['Description'], row['Price'], row['URL'], preprocess_description(row['Description']))
    )
    return products_rdd.cache()


def build_inverted_index(products_rdd):
    """Build a term -> postings inverted index and the IDF table for ``products_rdd``.

    Documents are identified by their zipWithIndex() position in ``products_rdd``;
    element [3] of each tuple (the preprocessed description) supplies the terms.

    Returns:
        (inverted_index, idf) where
        - inverted_index maps term -> list of (doc_id, term frequency in that doc);
        - idf maps term -> log(N / (1 + df)), with N the number of documents and df the
          number of documents containing the term.

    NOTE(review): because df >= 1 for every indexed term, the "+1" makes idf <= 0 for
    terms found in N-1 or N documents (log(N/N) = 0, log(N/(N+1)) < 0). Confirm this
    smoothing is intended before relying on the sign of the weights.
    """
    def _term_occurrences(indexed_product):
        # One ((term, doc_id), 1) pair per token occurrence; reduceByKey sums them into
        # per-document term frequencies.
        product, doc_id = indexed_product
        return [((term, doc_id), 1) for term in product[3].split()]

    postings_by_term = (
        products_rdd.zipWithIndex()
        .flatMap(_term_occurrences)
        .reduceByKey(lambda left, right: left + right)
        .map(lambda pair: (pair[0][0], (pair[0][1], pair[1])))
        .groupByKey()
        .mapValues(list)
        .collectAsMap()
    )

    total_docs = products_rdd.count()
    # Document frequency is simply the number of postings of the term.
    idf = {}
    for term, postings in postings_by_term.items():
        idf[term] = math.log(total_docs / (1 + len(postings)))
    return postings_by_term, idf


def compute_tfidf_vectors(products_rdd, idf):
    def calculate_tfidf(doc_id, terms):
        term_counts = Counter(terms.split())
        return {term: (count / len(term_counts)) * idf[term] for term, count in term_counts.items() if term in idf}

    tfidf_vectors = products_rdd \
        .zipWithIndex() \
        .map(lambda x: (x[1], calculate_tfidf(x[1], x[0][3]))) \
        .collectAsMap()

    return tfidf_vectors


def query_products(query, tfidf_vectors, idf):
    query = preprocess_description(query)
    query_terms = query.split()
    query_vector = {term: (query_terms.count(term) / len(query_terms)) * idf.get(term, 0) for term in query_terms}

    query_array = np.array([query_vector.get(term, 0) for term in idf.keys()])
    similarities = []

    for doc_id, doc_vector in tfidf_vectors.items():
        doc_array = np.array([doc_vector.get(term, 0) for term in idf.keys()])
        similarity = cosine_similarity([query_array], [doc_array])[0][0]
        similarities.append((doc_id, similarity))

    similarities = sorted(similarities, key=lambda x: x[1], reverse=True)
    return similarities[:5]

def save_query_results(results, products_rdd, filename="/content/sample_data/dataset/query_results.tsv"):
    """Write the ranked query hits to a tab-separated file.

    Args:
        results: list of (doc_id, score) pairs as returned by query_products; doc_id is
            the zipWithIndex() position of the product in ``products_rdd``.
        products_rdd: RDD of (description, price, url, processed description) tuples.
        filename: output path.

    Columns written: "Original Description", "Price", "URL", "Cosine Similarity",
    one row per result, in the order of ``results``.
    """
    wanted_ids = {doc_id for doc_id, _ in results}
    # Fetch only the hit rows rather than collecting the whole dataset to the driver.
    # zipWithIndex() numbers items by partition order then in-partition order, i.e.
    # the same positions collect() would return them in.
    hits_by_id = dict(
        products_rdd.zipWithIndex()
        .filter(lambda indexed: indexed[1] in wanted_ids)
        .map(lambda indexed: (indexed[1], indexed[0]))
        .collect()
    )

    rows = []
    for doc_id, score in results:
        description, price, url, _ = hits_by_id[doc_id]
        rows.append([description, price, url, score])

    df = pd.DataFrame(rows, columns=["Original Description", "Price", "URL", "Cosine Similarity"])
    # QUOTE_NONE writes fields verbatim. Without an escapechar the csv writer raises
    # "need to escape, but no escapechar set" on any '"' or tab in a description, so
    # escape with a backslash, the same settings save_inverted_index uses.
    df.to_csv(filename, index=False, sep='\t', quoting=csv.QUOTE_NONE, escapechar='\\')

def save_inverted_index(inverted_index, filename="/content/sample_data/dataset/inverted_index.tsv"):
    """Write the inverted index as a tab-separated file, one posting per row.

    Args:
        inverted_index: mapping term -> iterable of (doc_id, term frequency) pairs.
        filename: output path (overwritten; UTF-8).

    The header row is "Termine", "Document ID", "Term Frequency". Fields are never
    quoted; special characters are escaped with a backslash.
    """
    header = ["Termine", "Document ID", "Term Frequency"]
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        tsv_writer = csv.writer(out_file, delimiter='\t', quoting=csv.QUOTE_NONE, escapechar='\\')
        tsv_writer.writerow(header)
        tsv_writer.writerows(
            [term, doc_id, freq]
            for term, postings in inverted_index.items()
            for doc_id, freq in postings
        )


if __name__ == "__main__":
    # Script entry point: load the products, build and save the inverted index, compute
    # TF-IDF vectors, answer one query typed by the user, and save its top results.
    # Both TSV files go to the functions' default paths under /content/sample_data/dataset/.

    file_path = '/content/sample_data/dataset/products.tsv'
    products_rdd = load_data(file_path)

    # inverted_index: term -> [(doc_id, term frequency), ...]; idf: term -> IDF weight.
    inverted_index, idf = build_inverted_index(products_rdd)
    save_inverted_index(inverted_index)
    print("Inverted Index is saved in inverted_index.tsv")

    # doc_id -> {term: TF-IDF weight}, using the same doc_ids as the inverted index.
    tfidf_vectors = compute_tfidf_vectors(products_rdd, idf)
    query = input("Inserisci i termini della query: ")
    # results: list of (doc_id, cosine similarity), best match first.
    results = query_products(query, tfidf_vectors, idf)
    save_query_results(results, products_rdd)
    print("The results of the query are saved in query_results.tsv")


Inverted Index is saved in inverted_index.tsv
Inserisci i termini della query: HP Notebook 250
The results of the query are saved in query_results.tsv
