<a href="https://colab.research.google.com/github/freddyduitama/GVD/blob/master/0_6_LSH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Adecuación de la plataforma**

In [0]:
# instala el ambiente de spark..solo se corre una vez
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
!tar xf spark-2.4.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
# Configure the environment variables that point at the Java and Spark installs
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.1-bin-hadoop2.7",
})

In [0]:
# Import the pyspark package.
# findspark.init() is called before the pyspark imports below; presumably it makes the
# Spark install referenced by SPARK_HOME importable -- confirm against the findspark docs.
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [0]:
# Create the Spark session (or reuse the active one), running locally on all available cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Example 1: Class Slides

In [0]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

In [0]:
# Shingle vectors, one row per set S1..S6
dataC = [
    (set_id, Vectors.dense(values))
    for set_id, values in (
        ("S1", [4.0, 1.0, 2.0, 0.0, 0.0, 1.0]),
        ("S2", [1.0, 2.0, 1.0, 5.0, 4.0, 2.0]),
        ("S3", [0.0, 0.0, 1.0, 1.0, 3.0, 0.0]),
        ("S4", [1.0, 0.0, 3.0, 1.0, 0.0, 0.0]),
        ("S5", [1.0, 0.0, 0.0, 0.0, 1.0, 1.0]),
        ("S6", [0.0, 0.0, 1.0, 1.0, 3.0, 1.0]),
    )
]

# Two-column DataFrame: the set identifier and its feature vector
dfC = spark.createDataFrame(dataC, schema=["id", "features"])

In [0]:
# Define the LSH estimator and fit it on dfC.
# The projection vectors are drawn at random. An explicit seed pins them so the hash
# buckets are the same on every Restart & Run All; without it PySpark falls back to a
# default seed that may differ between kernel sessions.
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=1, seed=42)
model = brp.fit(dfC)

In [297]:
# Print the id and feature vector of each row in dfC (up to 9 rows, untruncated)
dfC.select("id", "features").show(n=9, truncate=False)

+---+-------------------------+
|id |features                 |
+---+-------------------------+
|S1 |[4.0,1.0,2.0,0.0,0.0,1.0]|
|S2 |[1.0,2.0,1.0,5.0,4.0,2.0]|
|S3 |[0.0,0.0,1.0,1.0,3.0,0.0]|
|S4 |[1.0,0.0,3.0,1.0,0.0,0.0]|
|S5 |[1.0,0.0,0.0,0.0,1.0,1.0]|
|S6 |[0.0,0.0,1.0,1.0,3.0,1.0]|
+---+-------------------------+



In [298]:
# Feature transformation: append the 'hashes' column to dfC and list the rows ordered by it
print("The hashed dataset where hashed values are stored in the column 'hashes':")
(
    model.transform(dfC)
    .sort("hashes")
    .show(n=9, truncate=False)
)

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-------------------------+--------+
|id |features                 |hashes  |
+---+-------------------------+--------+
|S1 |[4.0,1.0,2.0,0.0,0.0,1.0]|[[-1.0]]|
|S5 |[1.0,0.0,0.0,0.0,1.0,1.0]|[[-1.0]]|
|S4 |[1.0,0.0,3.0,1.0,0.0,0.0]|[[-1.0]]|
|S6 |[0.0,0.0,1.0,1.0,3.0,1.0]|[[0.0]] |
|S3 |[0.0,0.0,1.0,1.0,3.0,0.0]|[[0.0]] |
|S2 |[1.0,2.0,1.0,5.0,4.0,2.0]|[[1.0]] |
+---+-------------------------+--------+



# **Example 2. Naive example**

In [0]:
# Build dfA: four 2-D points P0..P3 at (+/-1, +/-1)
dataA = [
    (point_id, Vectors.dense(xy))
    for point_id, xy in (
        ("P0", [1.0, 1.0]),
        ("P1", [1.0, -1.0]),
        ("P2", [-1.0, -1.0]),
        ("P3", [-1.0, 1.0]),
    )
]
dfA = spark.createDataFrame(dataA, schema=["id", "features"])

In [0]:
# Define the LSH estimator and fit it on dfA. bucketLength defines the number and size of buckets.
# The projection vectors are drawn at random. An explicit seed pins them so the hash
# buckets are reproducible across kernel sessions.
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=1, seed=42)
model = brp.fit(dfA)

In [319]:
# Feature transformation: append the 'hashes' column to dfA and list the rows ordered by it
print("The hashed dataset where hashed values are stored in the column 'hashes':")
(
    model.transform(dfA)
    .sort("hashes")
    .show(n=8, truncate=False)
)

The hashed dataset where hashed values are stored in the column 'hashes':
+---+-----------+--------+
|id |features   |hashes  |
+---+-----------+--------+
|P2 |[-1.0,-1.0]|[[-1.0]]|
|P1 |[1.0,-1.0] |[[-1.0]]|
|P3 |[-1.0,1.0] |[[0.0]] |
|P0 |[1.0,1.0]  |[[0.0]] |
+---+-----------+--------+



# Similarity join. 

In [0]:
# Build dfB: four 2-D points P4..P7 lying on the coordinate axes
dataB = [
    (point_id, Vectors.dense(xy))
    for point_id, xy in (
        ("P4", [1.0, 0.0]),
        ("P5", [-1.0, 0.0]),
        ("P6", [0.0, 1.0]),
        ("P7", [0.0, -1.0]),
    )
]
dfB = spark.createDataFrame(dataB, schema=["id", "features"])

In [0]:
# Compute the hashes up front; the next step joins the already-transformed datasets.
# Both datasets must be hashed by the SAME model (same random projection), otherwise their
# bucket ids are not comparable. So the estimator is fitted once and that model is reused.
# The fixed seed keeps the buckets reproducible across runs.

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=1, seed=42)
model1 = brp.fit(dfA)
model2 = model1  # alias kept so existing references to model2 keep working

hash_dfA=model1.transform(dfA).sort("hashes")
hash_dfB=model2.transform(dfB).sort("hashes")

In [322]:
print("join dfA and dfB on points having Euclidean distance smaller than 1.5:")

# Join through model1, the model that produced the hashes in hash_dfA, instead of the
# `model` variable left over from the previous example. Results are listed by dfA id.
model1.approxSimilarityJoin(hash_dfA, hash_dfB, 1.5, distCol="EuclideanDistance")\
          .select(col("datasetA.id").alias("idA"), col("datasetA.features").alias("coordinates"),
                     col("datasetB.id").alias("idB"), col("datasetB.features").alias("coordinates"),
                     col("EuclideanDistance")).sort("datasetA.id").show(8,truncate=False)

join dfA and dfB on points having Euclidean distance smaller than 1.5:
+---+-----------+---+-----------+-----------------+
|idA|coordinates|idB|coordinates|EuclideanDistance|
+---+-----------+---+-----------+-----------------+
|P0 |[1.0,1.0]  |P6 |[0.0,1.0]  |1.0              |
|P1 |[1.0,-1.0] |P4 |[1.0,0.0]  |1.0              |
|P1 |[1.0,-1.0] |P7 |[0.0,-1.0] |1.0              |
|P2 |[-1.0,-1.0]|P7 |[0.0,-1.0] |1.0              |
|P3 |[-1.0,1.0] |P6 |[0.0,1.0]  |1.0              |
|P3 |[-1.0,1.0] |P5 |[-1.0,0.0] |1.0              |
+---+-----------+---+-----------+-----------------+



# Approximate nearest neighbor search

In [0]:
# Build dfD: the eight 2-D points P0..P7 (corner points followed by axis points)
dataD = [
    (point_id, Vectors.dense(xy))
    for point_id, xy in (
        ("P0", [1.0, 1.0]),
        ("P1", [1.0, -1.0]),
        ("P2", [-1.0, -1.0]),
        ("P3", [-1.0, 1.0]),
        ("P4", [1.0, 0.0]),
        ("P5", [-1.0, 0.0]),
        ("P6", [0.0, 1.0]),
        ("P7", [0.0, -1.0]),
    )
]

dfD = spark.createDataFrame(dataD, schema=["id", "features"])

In [337]:
# Perform approximate nearest neighbor search.
# dfD is passed untransformed (it has no 'hashes' column yet).

# Reference point
key = Vectors.dense([0.5, -0.5])

# The message names dfD, the dataset that is actually searched (it previously said dfC).
print("Approximately searching dfD for 3 nearest neighbors of the key:" , key)
model.approxNearestNeighbors(dfD, key, 3).show(8,truncate=False)

Approximately searching dfC for 3 nearest neighbors of the key: [0.5,-0.5]
+---+----------+--------+------------------+
|id |features  |hashes  |distCol           |
+---+----------+--------+------------------+
|P1 |[1.0,-1.0]|[[-1.0]]|0.7071067811865476|
|P4 |[1.0,0.0] |[[-1.0]]|0.7071067811865476|
|P7 |[0.0,-1.0]|[[-1.0]]|0.7071067811865476|
+---+----------+--------+------------------+

