<a href="https://colab.research.google.com/github/iliasmDS/spark/blob/main/SPARK.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [16]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark

In [17]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [18]:
!ls

drive	     spark-3.5.1-bin-hadoop3	  spark-3.5.1-bin-hadoop3.tgz.1
sample_data  spark-3.5.1-bin-hadoop3.tgz  spark_data


In [19]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
from pyspark.ml.feature import MinHashLSH, BucketedRandomProjectionLSH, VectorAssembler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [20]:
df = spark.read.csv(r'/content/drive/MyDrive/SPARK/data/data.csv', header = True)

df = df.select([col(c).cast("float").alias(c) for c in df.columns])

df_q = spark.read.csv(r'/content/drive/MyDrive/SPARK/data/queries.csv', header = True)

df_q = df_q.select([col(c).cast("float").alias(c) for c in df_q.columns])

df_v = df.select([i for i in df.columns])

df_q_v = df_q.select([i for i in df_q.columns])

In [21]:
def vectorize(df):
    """
    Uses the 100 vector features (starting with "f") to make a vector.
    """

    vector_cols = []

    for col in df.columns:

        if col.startswith('f'):

            vector_cols.append(col)

    assembler = VectorAssembler(inputCols = vector_cols, outputCol = "vectors")

    df_vector = assembler.transform(df).drop(*vector_cols)

    return df_vector

In [22]:
df_v = vectorize(df_v)

df_q_v = vectorize(df_q_v)

In [23]:
brp = BucketedRandomProjectionLSH(inputCol = 'vectors', numHashTables = 5, bucketLength = 10.0)

In [24]:
model = brp.fit(df_q_v)

In [25]:
results = True

for row in df_q_v.rdd.collect():

    if row.query_type == 0.0:

        print(row)

        res = model.approxNearestNeighbors(
                                           df_v,
                                           row.vectors,
                                           1,
                                           distCol = 'Distance'
                                          )

    elif row.query_type == 1.0:

        res = model.approxNearestNeighbors(
                                           spark.sql("SELECT * FROM {data} WHERE C = {v}", data = df_v, v = row.V),
                                           row.vectors,
                                           1,
                                           distCol = 'Distance'
                                          )

    elif row.query_type == 2.0:

        res = model.approxNearestNeighbors(
                                           spark.sql("SELECT * FROM {data} WHERE (T >= {l}) AND (T <= {r})", data = df_v, l = row.l, r = row.r),
                                           row.vectors,
                                           1,
                                           distCol = 'Distance'
                                          )

    elif row.query_type == 3.0:

        res = model.approxNearestNeighbors(
                                           spark.sql("SELECT * FROM {data} WHERE (C = {v}) AND (T >= {l}) AND (T <= {r})", data = df_v, v = row.V, l = row.l, r = row.r),
                                           row.vectors,
                                           1,
                                           distCol = 'Distance'
                                          )

    res = res\
        .withColumn('query_vector', lit(row.vector))\
        .withColumn('v', lit(row.V))\
        .withColumn('l', lit(row.l))\
        .withColumn('r', lit(row.r))\
        .withColumn('query_type', lit(row.query_type))\
        .withColumnRenamed('vector', 'data_vector')\
        .select('query_vector', 'Distance', 'data_vector', 'query_type', 'C', 'v', 'l', 'T', 'r')

    if results == True:

        results = res

    else:

        results = results.union(res)

Row(_c0=0.0, batch=1.0, vector=0.0, queries=100.0, query_type=0.0, V=-1.0, l=-1.0, r=-1.0, vectors=DenseVector([-2.41, 2.09, 0.16, -0.24, -0.23, -2.77, 1.28, 1.5, -0.1, 1.39, -0.48, -0.68, 0.71, -0.91, -0.36, 0.14, -0.06, -0.74, 0.66, -0.1, 0.06, -0.18, 1.27, 0.44, -0.09, -0.01, 0.54, -0.75, -0.48, 0.28, 0.51, -0.45, -0.5, 0.24, 0.27, 0.55, -0.14, -0.16, -0.47, 0.3, 0.59, -0.52, -0.83, 0.12, 0.33, 0.52, -0.37, -0.32, 0.3, 1.11, 0.24, -0.15, 0.7, 0.29, 0.14, -0.12, 0.51, 0.32, -0.07, 0.84, -0.05, -0.56, -0.01, 0.3, 0.06, -0.58, 0.69, -0.14, -0.75, -0.2, -0.56, 0.42, 0.16, -0.26, -0.58, -0.01, 0.11, -0.04, -0.24, 0.21, 0.33, -0.26, 0.31, -0.25, 0.28, 0.17, -0.23, -0.02, -0.15, -0.69, -0.75, -0.28, -0.67, -0.01, 0.0, 0.05, 0.07, -0.09, -0.2, 0.45]))
Row(_c0=4.0, batch=1.0, vector=4.0, queries=100.0, query_type=0.0, V=-1.0, l=-1.0, r=-1.0, vectors=DenseVector([-2.97, 2.04, 0.03, -0.6, -0.83, 2.61, -0.19, 0.81, 0.31, -0.08, 0.57, -0.4, -1.17, -0.33, 1.45, -0.68, -1.4, 1.33, -0.2, 0.02, 0.75

In [26]:
results.write.parquet("/content/drive/MyDrive/SPARK/data/results.parquet", mode = 'overwrite')