# KNN

In [None]:
import numpy as np
import pandas as pd

from spark_rapids_ml.knn import NearestNeighbors
from pyspark.sql.functions import col

### Create synthetic dataset

In [None]:
dim = 2000
dtype = 'float32'
np.random.seed(1)


def _rank_one_vectors(count, width, out_dtype):
    """Return a (count, width) array whose rows are random multiples of one random vector.

    The shared direction vector is drawn first and the per-row scale factors
    second, both from the global NumPy RNG (standard normal), so the result
    depends on the RNG state at call time.

    The product is computed in float64 and written directly into an
    `out_dtype` buffer.  This yields the same values as
    `(scales * direction).astype(out_dtype)` but never allocates the
    full-size float64 intermediate (about 1.6 GB for the item set below).
    """
    direction = np.random.randn(width).reshape(1, width)
    scales = np.random.randn(count).reshape(count, 1)
    result = np.empty((count, width), dtype=out_dtype)
    # Inputs stay float64; only the stored output is narrowed to out_dtype.
    np.multiply(scales, direction, out=result, casting="same_kind")
    return result


# items
num_items = 100000
items = _rank_one_vectors(num_items, dim, dtype)

# items extra data (one scalar per item, drawn after the item vectors)
items_extra = np.random.randn(num_items)

# queries
num_queries = 50
queries = _rank_one_vectors(num_queries, dim, dtype)

# queries extra data (one scalar per query, drawn after the query vectors)
queries_extra = np.random.randn(num_queries)

### Convert dataset to Spark DataFrame

In [None]:
# Items and queries share one layout: a float-array vector plus a float scalar.
vector_schema = "features array<float>, extra float"

# One pandas row per vector; list(...) splits the 2-D array into per-row arrays.
pd_items = pd.DataFrame(dict(features=list(items), extra=items_extra))
item_df = spark.createDataFrame(pd_items, schema=vector_schema)

pd_queries = pd.DataFrame(dict(features=list(queries), extra=queries_extra))
query_df = spark.createDataFrame(pd_queries, schema=vector_schema)

In [None]:
# Preview five item rows, cutting long cell text at 80 characters.
item_df.show(n=5, truncate=80)

In [None]:
# Preview five query rows, cutting long cell text at 80 characters.
query_df.show(n=5, truncate=80)

## Spark RAPIDS ML (GPU)

In [None]:
# Create the Spark RAPIDS ML nearest-neighbors estimator, requesting k=2 neighbors per query.
knn = NearestNeighbors(k=2)
# Tell it which column holds the vectors ("features" in both item_df and query_df).
knn.setInputCol("features")

In [None]:
# fit only stores a reference to item_df in the returned model (see the note following this cell).
knn_model = knn.fit(item_df)

Note: `fit` just stores a reference to the `item_df` in the returned model.  As such, saving the estimator or model is not supported, since their only state is the referenced dataset.  Instead, just re-create and re-fit the estimator on the dataset, as needed.

#### kneighbors

This API takes a DataFrame of query vectors, and returns the `k` nearest item vectors for each query vector, represented by their unique ids and distances.  The unique ids are automatically generated if not provided, so the input datasets are also returned with their unique ids.

In [None]:
# kneighbors returns three DataFrames, in this order: the items with unique ids,
# the queries with unique ids, and the table of nearest neighbors linking them.
item_id_df, query_id_df, neighbor_df = knn_model.kneighbors(query_df)

In [None]:
# item_df as handed back by kneighbors, now carrying unique identifiers
item_id_df.show(n=5, truncate=80)

In [None]:
# query_df as handed back by kneighbors, now carrying unique identifiers
query_id_df.show(n=5, truncate=80)

In [None]:
# neighbor_df lists, for each query vector, its nearest item vectors,
# identified by their unique ids together with the corresponding distances.
neighbor_df.show()

In [None]:
# change the value of 'k' on the existing model (fit is not called again), then query again;
# only the neighbor table is kept here, the two id DataFrames are discarded
knn_model.setK(3)
_, _, neighbor_df = knn_model.kneighbors(query_df)

In [None]:
# neighbor table from the most recent kneighbors call (k=3 if the cells were run in order)
neighbor_df.show()

#### exactNearestNeighborsJoin

This API returns a join of the query vectors and their `k` closest item vectors.

In [None]:
# join each query vector with its k closest item vectors
# (k is whatever was last set on knn_model; 3 if the cells were run in order)
result_df = knn_model.exactNearestNeighborsJoin(query_df)

In [None]:
# Sort by the query column first, then the item column, so each query's matches sit together.
ordered_result_df = result_df.orderBy("query_df", "item_df")
ordered_result_df.show()

For each returned query or item vector, all columns from the original input DataFrame will be returned as a single struct column.

In [None]:
# expand the query_df struct column back into its individual fields
result_df.select("query_df.*").show()

## PySpark

PySpark does not have an exact kNN implementation, but it does have an LSH-based Approximate Nearest Neighbors implementation, shown here to illustrate the similarity between the APIs.  However, the algorithms are very different, so their results are only roughly comparable, and it would require elaborate tuning of parameters to produce similar results.

In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.functions import array_to_vector
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

# The LSH estimator below works on ML vectors, so convert the float-array column
# and keep it under the same "features" name.
features_as_vector = array_to_vector(col("features")).alias("features")
item_vector_df = item_df.select(features_as_vector)
query_vector_df = query_df.select(features_as_vector)

# Single probe vector (all ones, length dim) for the nearest-neighbor search further down.
key = Vectors.dense([1.0] * dim)

In [None]:
# Preview five converted item vectors, cutting long cell text at 80 characters.
item_vector_df.show(n=5, truncate=80)

In [None]:
# Preview five converted query vectors, cutting long cell text at 80 characters.
query_vector_df.show(n=5, truncate=80)

In [None]:
# LSH estimator settings, gathered in one place before construction.
lsh_params = {
    "inputCol": "features",
    "outputCol": "hashes",
    "bucketLength": 2.0,
    "numHashTables": 3,
}
brp = BucketedRandomProjectionLSH(**lsh_params)
model = brp.fit(item_vector_df)

In [None]:
# Feature Transformation: the model adds its output column ('hashes') to the item vectors.
print("The hashed dataset where hashed values are stored in the column 'hashes':")
hashed_item_df = model.transform(item_vector_df)
hashed_item_df.show(n=5)

In [None]:
# Hash both inputs and run the approximate similarity join in a single call.
# Passing already-transformed datasets would skip the hashing step, e.g.
# `model.approxSimilarityJoin(transformed_item_vector_df, transformed_query_vector_df, 3.0)`
print("Approximately joining items and queries on Euclidean distance smaller than 3.0:")
joined_df = model.approxSimilarityJoin(
    item_vector_df, query_vector_df, 3.0, distCol="EuclideanDistance"
)
# datasetA is the first argument (items), datasetB the second (queries).
pairs_df = joined_df.select(
    col("datasetA.features").alias("item"),
    col("datasetB.features").alias("query"),
    col("EuclideanDistance"),
)
pairs_df.orderBy("query", "item").show()

In [None]:
# Hash the item vectors and look up the approximate nearest neighbors of `key` in one call.
# Passing an already-transformed dataset would skip the hashing step, e.g.
# `model.approxNearestNeighbors(transformed_item_vector_df, key, 2)`
print("Approximately searching item vectors for 2 nearest neighbors of the key:")
nearest_to_key_df = model.approxNearestNeighbors(item_vector_df, key, 2)
nearest_to_key_df.show()

In [None]:
# persists the fitted LSH model to /tmp/ann_model, replacing anything already saved at that path
# NOTE(review): this writes the model object, not hashes of the input rows — presumably the
# model's projection parameters; confirm against the Spark ML persistence documentation
model.write().overwrite().save("/tmp/ann_model")