In [1]:
# Locality Sensitive Hashing: approximate nearest neighbor search

In [2]:
# Enable accelerated linear algebra processing for Spark MLlib
# sudo apt-get install libopenblas-base

In [3]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.feature import MinHashLSH

from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

In [4]:
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0, numHashTables=3)
modelBRP = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
modelBRP.transform(dfA).show()

                                                                                

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[-1.0], [0.0], [...|
|  1| [1.0,-1.0]|[[-1.0], [0.0], [...|
|  2|[-1.0,-1.0]|[[0.0], [-1.0], [...|
|  3| [-1.0,1.0]|[[0.0], [-1.0], [...|
+---+-----------+--------------------+



In [5]:
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
modelMH = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
modelMH.transform(dfA).show()

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+--------------------+
| id|   features|              hashes|
+---+-----------+--------------------+
|  0|  [1.0,1.0]|[[4.7945584E7], [...|
|  1| [1.0,-1.0]|[[4.7945584E7], [...|
|  2|[-1.0,-1.0]|[[4.7945584E7], [...|
|  3| [-1.0,1.0]|[[4.7945584E7], [...|
+---+-----------+--------------------+



In [6]:
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
modelBRP.approxNearestNeighbors(dfA, key, 2).show()

Approximately searching dfA for 2 nearest neighbors of the key:




+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
|  0| [1.0,1.0]|[[-1.0], [0.0], [...|    1.0|
|  1|[1.0,-1.0]|[[-1.0], [0.0], [...|    1.0|
+---+----------+--------------------+-------+



In [7]:
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
modelMH.approxNearestNeighbors(dfA, key, 2).show()

Approximately searching dfA for 2 nearest neighbors of the key:
+---+----------+--------------------+-------+
| id|  features|              hashes|distCol|
+---+----------+--------------------+-------+
|  0| [1.0,1.0]|[[4.7945584E7], [...|    0.5|
|  1|[1.0,-1.0]|[[4.7945584E7], [...|    0.5|
+---+----------+--------------------+-------+



In [8]:
# Compute the locality sensitive hashes for the input rows, then perform approximate similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
modelBRP.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  3|  5|              1.0|
|  0|  4|              1.0|
|  0|  6|              1.0|
|  2|  7|              1.0|
|  1|  7|              1.0|
|  3|  6|              1.0|
|  2|  5|              1.0|
|  1|  4|              1.0|
+---+---+-----------------+



In [9]:
# Compute the locality sensitive hashes for the input rows, then perform approximate similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
modelMH.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:
+---+---+-----------------+
|idA|idB|EuclideanDistance|
+---+---+-----------------+
|  3|  6|              0.5|
|  0|  7|              0.5|
|  0|  5|              0.5|
|  1|  6|              0.5|
|  2|  7|              0.5|
|  2|  5|              0.5|
|  1|  5|              0.5|
|  0|  4|              0.5|
|  2|  4|              0.5|
|  1|  7|              0.5|
|  0|  6|              0.5|
|  3|  4|              0.5|
|  3|  7|              0.5|
|  2|  6|              0.5|
|  1|  4|              0.5|
|  3|  5|              0.5|
+---+---+-----------------+

