In [6]:
# Import SparkSession
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, FloatType, StructType, StructField, IntegerType
from pyspark.sql.functions import spark_partition_id, col

## the distance function
from scipy.spatial import distance
## creates the feature vector
from pyspark.ml.feature import VectorAssembler
## import numpy
import numpy as np

import time

from pyspark.sql.functions import max, abs

from functools import reduce

from pyspark.sql.functions import greatest

## start session
spark = SparkSession.builder.appName("SparkLAESAKnn").getOrCreate()

In [7]:
def knn(dataframe, oq, k):       
    
    ##CALCULATES THE DIST FROM ELEMENTS WITHIN THE DF TO THE QUERY POINT
    df = dataframe.withColumn('distances', simpleF(oq)(F.col('fv')))
    
    ##SORTS AND RETRIEVES THE TOP-K RESULTS
    resultSet = df.select("id", "distances").orderBy('distances').limit(k)
    return(resultSet)


In [14]:
def laesa_knn(df, oq, k):
    
    ## PART_ZERO --> CALCULATE LOWER BOUND 
    lista_columns = []
    for i in range(len(pivots)):
        df = df.withColumn(f"|d(oq,p{i+1})-d(oi,p{i+1})|", \
            abs (distance.euclidean(oq, pivots[i][1]) - \
            F.col(f'distances_oi_pivot{i+1}')))

        lista_columns.append(f"|d(oq,p{i+1})-d(oi,p{i+1})|")

    df = df.withColumn("lower_bound", greatest(*[col_name for col_name in lista_columns]))
    
    
    ## PART_ONE --> DEFINE FIRST WINDOW 
    df_limit_k = df.limit(k).withColumn("dist_Oq", simpleF(oq)(F.col('fv'))).orderBy("dist_Oq")
    r_laesa = df_limit_k.agg(max('dist_Oq')).collect()[0][0]
    
    
    ## PART_TWO --> EXTEND WINDOW AND DROP OVERRUN ROWS
    i=1
    while i < df.count():
        i = min(i+k, df.count())
        # PART_TWO
        df = df.filter(df["lower_bound"] <= r_laesa)
        df_limit_k = df.limit(i+k).withColumn("dist_Oq", simpleF(oq)(F.col('fv'))).orderBy("dist_Oq").limit(k)
        r_laesa = df_limit_k.agg(max('dist_Oq')).collect()[0][0]

    
    
    ## RETURN RESULT SET
    df_count = df.count()
    rSet = df_limit_k["id","dist_Oq"]     
    return(rSet, df_count)  

In [9]:
## OBJECT QUERY
oq = [np.random.rand(1)[0],np.random.rand(1)[0]]

## K TOP-K RESULT
k = 3

## PIVOT AMOUNT
p = 2

## CREATE PIVOTS
pivots = [(i+1,[np.random.rand(1)[0],np.random.rand(1)[0]]) for i in range(p)]


In [10]:
##
## DATAFRAME CONFIG
##

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("coord_x", FloatType(), True),
    StructField("coord_y", FloatType(), True)])

#df = spark.createDataFrame(data = data, schema = schema)
df = spark.read.csv("/home/weber/Documents/coordDF2.csv",schema=schema)

##SMALL SANITY CHECK - @PRODUCTION TESTAR LOADING
df = df.na.drop()


## define the struct for the dimensional feature vector
cNames = df.columns
cNames.remove("id")
assembler = VectorAssembler(
    inputCols=cNames,
    outputCol="fv")

## appends the fv into the dataframe as column
df = assembler.transform(df)

## calculates the distance from oi to pivots
for i in range(len(pivots)):
    df = df.withColumn(f'distances_oi_pivot{i+1}', simpleF(pivots[i][1])(F.col('fv')))

#df = df.orderBy("|d(oq,p1)-d(oi,p1)|")
df.show()

+---+-------+-------+--------------------+-------------------+-------------------+
| id|coord_x|coord_y|                  fv|distances_oi_pivot1|distances_oi_pivot2|
+---+-------+-------+--------------------+-------------------+-------------------+
| 85|0.89244| 0.6258|[0.89244002103805...| 0.3749798092719081| 0.5888963871514872|
| 86|0.42807|  0.141|[0.42807000875473...| 0.5610672702466463| 0.5044307887543074|
| 87|0.74827|0.17428|[0.74826997518539...|0.24372472798545677|0.22086927342911752|
| 88|0.49114| 0.6876|[0.49114000797271...| 0.6464927709864476| 0.7796915056445385|
| 89|0.07834|0.31046|[0.07834000140428...| 0.8993620234028737| 0.8864832851148493|
| 90|0.57788| 0.5282|[0.57788002490997...| 0.4801057782675515| 0.5991093600207134|
| 91|0.64505|0.70722|[0.64504998922348...| 0.5562820810320263|  0.724521823476259|
| 92|0.74659|0.53865|[0.74659001827239...|  0.360883105880811| 0.5307660984019014|
| 93|0.69361|0.12281|[0.69361001253128...|0.31436281470565564|0.24356499671175785|
| 94

In [16]:
# measure the execution time of the function
start_time_laesa = time.time()
result = laesa_knn(df, oq, k)
end_time_laesa = time.time()
result[0].show()
print(result[1])
# measure the execution time of the function
start_time_knn = time.time()
result2 = knn(df, oq, k)
end_time_knn = time.time()
result2.show()

                                                                                

+---+--------------------+
| id|             dist_Oq|
+---+--------------------+
| 27|0.033646866255054926|
| 69| 0.07074054263034407|
| 40| 0.08374055547219363|
+---+--------------------+

10
+---+--------------------+
| id|           distances|
+---+--------------------+
| 27|0.033646866255054926|
| 69| 0.07074054263034407|
| 40| 0.08374055547219363|
+---+--------------------+



In [13]:
# print the execution time

print("Execution laesa-knn time:", end_time_laesa - start_time_laesa)

print("Execution knn time:", end_time_knn - start_time_knn)

Execution laesa-knn time: 12.610692501068115
Execution knn time: 0.04470992088317871
