# LSH Spark tutorial
In questo tutorial mostreremo come usare le funzioni della libreria MLLib di Spark per implementare l'LSH per vettori sparsi per il calcolo della similarià di Jaccard e per vettori denzi per la distanza Euclidea

In [1]:
import pyspark
from pyspark.mllib import *
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Creiamo la sessione

In [2]:
# create the session
conf = SparkConf().set("spark.ui.port", "4050")

# create the context
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

25/03/29 18:15:52 WARN Utils: Your hostname, TeoAcerAspireOne resolves to a loopback address: 127.0.1.1; using 192.168.1.88 instead (on interface wlp0s20f3)
25/03/29 18:15:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/29 18:15:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Importiamo le librerie necessarie

In [3]:
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# Distanza di Jaccard su vettori sparsi

Creiamo un dataframe con 3 vettori sparis. Un vettore sparso in spark è composto da tre elementi. La dimensione, un vettore di indici dove il vettore assume valori diversi da 0 e un vettore contenente i valori effettivi del dato.

In [4]:
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

Creiamo un secondo dataframe di 3 vettori sparsi

In [5]:
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])


Creiamo un vettore sparso per effettuare un knn query su uno dei due dataframe

In [6]:
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

Creiamo il modello con la funzione MinHashLSH

In [14]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("Il set di dati con hash, in cui i valori con hash sono memorizzati nella colonna 'hash'.:")
model.transform(dfA).show(truncate=False)


Il set di dati con hash, in cui i valori con hash sono memorizzati nella colonna 'hash'.:
+---+-------------------------+------------------------------------------------------------------------------+
|id |features                 |hashes                                                                        |
+---+-------------------------+------------------------------------------------------------------------------+
|0  |(6,[0,1,2],[1.0,1.0,1.0])|[[4.0993917E7], [3.1447487E8], [4.9332997E8], [1.56257325E8], [4.69971411E8]] |
|1  |(6,[2,3,4],[1.0,1.0,1.0])|[[4.0993917E7], [1.54244661E8], [5.47929187E8], [1.56257325E8], [7.4248372E7]]|
|2  |(6,[0,2,4],[1.0,1.0,1.0])|[[4.0993917E7], [1.54244661E8], [4.9332997E8], [1.56257325E8], [7.4248372E7]] |
+---+-------------------------+------------------------------------------------------------------------------+



Confrontiamo i due dataframe dfA e dfB per ricercare coppie di vettori con una distanza minore o uguale a 0.6. Si potrebbe direttamente effettuare il calcolo della distanza tra le coppie di già trasformate

```
model.approxSimilarityJoin(transformedA, transformedB, 0.6)
```



In [15]:
print("Cerchiamo le coppie di punti nei due dataframe con una distanza minore di 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()


Cerchiamo le coppie di punti nei due dataframe con una distanza minore di 0.6:


[Stage 13:>                                                         (0 + 1) / 1]

+---+---+---------------+
|idA|idB|JaccardDistance|
+---+---+---------------+
|  1|  4|            0.5|
|  1|  5|            0.5|
|  0|  5|            0.5|
|  2|  5|            0.5|
+---+---+---------------+



                                                                                

Risolviamo il problema del k-nearest neighbour. Anche in questo caso potremmo usare direttamente la fuzione


```
model.approxNearestNeighbors(transformedA, key, 2)
```



In [16]:
# Può restituire un numero minore di 2 righe quando non sono identificati un numero sufficiente di candidati
print("Cercare approssimativamente in dfA i 2 vicini più prossimi della chiave:")
model.approxNearestNeighbors(dfA, key, 2).show()

Cercare approssimativamente in dfA i 2 vicini più prossimi della chiave:


+---+--------+------+-------+
| id|features|hashes|distCol|
+---+--------+------+-------+
+---+--------+------+-------+



# Distanza Euclidea e vettori densi

In [17]:
from pyspark.ml.feature import BucketedRandomProjectionLSH

In [18]:
dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature hashing
print("Il set di dati con  il valore hash:")
model.transform(dfA).show()


25/03/29 19:39:09 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


Il set di dati con  il valore hash:
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[-1.0], [-1.0], ...|
|  1| [1.0,-1.0]|[[-1.0], [0.0], [...|
|  2|[-1.0,-1.0]|[[0.0], [0.0], [0...|
|  3| [-1.0,1.0]|[[0.0], [-1.0], [...|
+---+-----------+--------------------+



Calcoliamo l'LSH delle righe di input ed effettuiamo una ricerca approssimata.
Potremmo applicarlo direttamente al dato mappato con l'hash
```
model.approxSimilarityJoin(transformedA, transformedB, 1.5)
```

In [19]:
print("Join approssimata tra dfA e dfB con una distanza Euclidea minore di 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()


Join approssimata tra dfA e dfB con una distanza Euclidea minore di 1.5:


+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  0|  6|              1.0|
|  2|  7|              1.0|
|  0|  4|              1.0|
|  2|  5|              1.0|
|  1|  7|              1.0|
|  3|  5|              1.0|
|  1|  4|              1.0|
|  3|  6|              1.0|
+---+---+-----------------+



Calcolo della nearest neighbor approssimata
anche in questo caso possiamo usare i dati gia' trasformati
```
model.approxNearestNeighbors(transformedA, key, 2)
```

In [20]:

print("Ricerca approssimata nel dfA per i due 2 nearest neighbor della chiave:")
model.approxNearestNeighbors(dfA, key, 2).show()

Ricerca approssimata nel dfA per i due 2 nearest neighbor della chiave:


+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
|  0| [1.0,1.0]|[[-1.0], [-1.0], ...|    1.0|
|  1|[1.0,-1.0]|[[-1.0], [0.0], [...|    1.0|
+---+----------+--------------------+-------+



Comprendre al meglio l'uso della libreria.


# Esercizio (consegna il 7 aprile 2025)


1.   Scaricare i dati disponibili a questo url: [amazon reviews](https://www.kaggle.com/datasets/kritanjalijain/amazon-reviews?resource=download) o questo [amazon review dropbox](https://www.dropbox.com/scl/fi/ucfoh391qalha3lz0bzjx/amazon_review_polarity_csv.tgz.zip?rlkey=m3a0bbp2ep4sh2qisaz0xwo1w&dl=0)


2.   Il dataset è composto da due file: train and test in csv. Ogni file contiene le seguenti informazioni
  *   polarity - 1 for negative and 2 for positive
  *   title - review heading
  *   text - review body

3.  Generare i vettori sparsi applicando il q-shingle ai dati di training con q=3.
4. Sui vettori sparsi Applicare il MinHashing LSH sul dataset di training.
5. USare il file di testing e applicare una k-nearest neighbor con i dati di testing su cui è stato applicato l'hashing. Usare k=3 e classificare l'elemento con del test set con la polarità maggiormente presente.


6. *Identificare i cluster di recensioni. Ogni cluster di recensione contiene le coppie di recensioni che hanno una similarità  > di 0.6. Da svolgere dopo l'introduzione alle network







In [26]:
'''


The files train.csv and test.csv contain all the training samples as comma-separated values.

The CSVs contain polarity, title, text. These 3 columns in them, correspond to class index (1 or 2), review title and review text.

    polarity - 1 for negative and 2 for positive
    title - review heading
    text - review body

The review title and text are escaped using double quotes ("), and any internal double quote is escaped by 2 double quotes (""). New lines are escaped by a backslash followed with an "n" character, that is "\n".
'''

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([
    StructField("polarity", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("text", StringType(), True)])

spark = SparkSession.builder.appName("AmazonReviews").getOrCreate()


In [34]:
    train = spark.read.csv("/home/teogalletta/Downloads/amazon_review_polarity_csv/train.csv", schema=schema, escape='"', quote='"')
train.printSchema()
print("Train data rows:", train.count())

root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)





Train data rows: 3600000


                                                                                

In [29]:
test = spark.read.csv("/home/teogalletta/Downloads/amazon_review_polarity_csv/test.csv", schema=schema, escape='"', quote='"')
test.printSchema()
print("Test data rows:", test.count())

root
 |-- polarity: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)



[Stage 47:>                                                         (0 + 2) / 2]

Test data rows: 400000


                                                                                

In [43]:
def shingle_udf(text, n=3):
    """Genera n-gramas (shingles) a partir de una cadena de texto."""
    chars = list(text)
    return ["".join(chars[i:i+n]) for i in range(len(chars) - n + 1)] if len(chars) >= n else []

shingle = udf(lambda text: shingle_udf(text, 3), ArrayType(StringType()))

In [44]:
train_w_shingles = train.withColumn("shingles", shingle(col("text")))
test_w_shingles = test.withColumn("shingles", shingle(col("text")))

In [45]:
train_w_shingles

+--------+--------------------+--------------------+--------------------+
|polarity|               title|                text|            shingles|
+--------+--------------------+--------------------+--------------------+
|       2|Stuning even for ...|This sound track ...|[Thi, his, is , s...|
+--------+--------------------+--------------------+--------------------+
only showing top 1 row



In [47]:
mh = MinHashLSH(inputCol="shingles", outputCol="hashes", numHashTables=5)
model = mh.fit(train_w_shingles)

IllegalArgumentException: requirement failed: Column shingles must be of type class org.apache.spark.ml.linalg.VectorUDT:struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually class org.apache.spark.sql.types.ArrayType:array<string>.