**Implement Minhash to Calculate the Jaccard Similarity**

In [1]:
# in this case, since there are many usesrs in the dataset, I will use the minhash to calculate to save time
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

In [2]:
db = spark.read.csv("user_review_bus.csv", sep="\t", header=True)

In [3]:
db.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- id: string (nullable = true)
 |-- bus_new: string (nullable = true)



In [4]:
db_split = db.withColumn("bus_id", f.split(f.col("bus_new"), ", ").cast("array<long>"))

In [5]:
db_split.first()

Row(_c0='0', id='42', bus_new='73765, 93774', bus_id=[73765, 93774])

In [6]:
db_split.count()

67095

In [7]:
model = Pipeline(stages=[
        HashingTF(inputCol="bus_id", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=100)
    ]).fit(db_split)

db_hashed = model.transform(db_split)

In [9]:
db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.7) # type: ignore

In [10]:
db_matches.printSchema()

root
 |-- datasetA: struct (nullable = false)
 |    |-- _c0: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- bus_new: string (nullable = true)
 |    |-- bus_id: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- vectors: vector (nullable = true)
 |    |-- lsh: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |-- datasetB: struct (nullable = false)
 |    |-- _c0: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- bus_new: string (nullable = true)
 |    |-- bus_id: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- lsh: array (nullable = true)
 |    |    |-- element: vector (containsNull = true)
 |    |-- vectors: vector (nullable = true)
 |-- distCol: double (nullable = false)



In [11]:
db_g = db_matches.select(f.col('datasetA.id').alias('id_A'),
                 f.col('datasetB.id').alias('id_B'),
                 f.col('distCol')).filter('id_A < id_B')

In [12]:
db_g.write.csv("minhash")

In [13]:
!cat minhash/*.csv >minhash.csv

In [14]:
!wc -l minhash.csv

463872 minhash.csv


In [15]:
463872 / 67095**2

0.00010304284575892314