# Computing distance
2023-12-11

**NOTE:** Depending on the values used in this notebook (num_rows and vec_dimension), some cells may cause Spark jobs to fail with out-of-memory (OOM) errors.<br>
In some tests, the OOM even caused the Spark Driver to crash.


In this notebook you will try several methods of computing the Euclidean distance between vectors.<br>
Depending on how you implement it, your code may crash, give wrong answers, or take a long time.

In [None]:
import random
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, row_number
random.seed(4)
spark = SparkSession.builder.appName('efficient distance').getOrCreate()
sc = spark.sparkContext

In [None]:

num_rows = 1000
vec_dimension = 100

print(f"The all_pairs matrix will be {num_rows*num_rows} rows, of {vec_dimension} doubles")
# 1<<30 bytes = 1 GiB
print(f"The all_pairs matrix memory size will be at least (without python/java overheads):\n\
{num_rows*num_rows*vec_dimension*8/(1<<30)} GB")

In [None]:
def random_floats(N: int, M: int) -> pyspark.sql.DataFrame:
    """ Generate a DataFrame of random floats between 0 and 100
    N: number of rows
    M: number of columns
    Example: random_floats(6, 2) returns 6 rows of 2 columns
    """
    data = [[float(random.uniform(0, 100)) for _ in range(M)] for _ in range(N)]
    rdd = spark.sparkContext.parallelize(data)
    df = rdd.toDF()
    return df

In [None]:
# Use this cell to save or load our df .
create = True
if create:
    df = random_floats(num_rows,vec_dimension)
    #df.write.mode("overwrite").parquet(f"rand{num_rows}_{vec_dimension}")
else:
    df = spark.read.parquet(f"rand{num_rows}_{vec_dimension}")
    

In [None]:
#df.sample(0.001).limit(4).toPandas()

# Given 2 vectors (of float), compute their Euclidean distance

## Different ways to compute the distance (2 vectors) - from the worst to a little better

To get the result in reasonable time, the computation has to run inside the JVM (running the Scala code) rather than passing data to and from the Python interpreter.

Also, as usual, we **must not** cause collect() to be called! It will load the full DF into the RAM of this node, probably killing it for a large enough row count.

### The worst -- do it entirely in python - will break for big data

In [None]:
%%time
# This method should only be used if the resulting NumPy ndarray is expected to be small, as all the data is loaded into the driver's memory.
# To make this abomination even worse, we generate each column by running the lambda, then collect it to a Python list in the driver node, then copy it to an ndarray.
col1 = df.rdd.map(lambda x: x[0])
col2 = df.rdd.map(lambda x: x[1])
c1 = np.asarray(col1.collect())
c2 = np.asarray(col2.collect())

In [None]:
%%time
# using python only
delta = [ a-b for a,b in zip(c1,c2)]
deltaSqr = [d*d for d in delta]
sum(deltaSqr)

In the cell above, were you aware of possible numerical stability problems? See https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.hypot.html
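
One such stability problem is overflow: squaring a large difference can exceed the range of a double even when the distance itself is representable. The sketch below uses Python's standard `math.hypot` to show the effect; the input values are made up for illustration.

```python
import math

a, b = 1e200, 1e200  # made-up, deliberately huge components

# Naive formula: each square overflows to infinity before the square root is taken
naive = math.sqrt(a * a + b * b)

# math.hypot computes sqrt(a*a + b*b) while avoiding the intermediate overflow
stable = math.hypot(a, b)

print(naive, stable)
```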

### Some of it in python - using numpy (good for local workload only)

In [None]:
%%time
# Using numpy
d = np.square(c1-c2)
d.sum()

### All in Scala

In [None]:
%%time 
from pyspark.ml.linalg import Vectors
v1 = Vectors.dense(col1.collect())  # FIXME: collect() loads the data into the Driver memory, which is exactly what we want to avoid
v2 = Vectors.dense(col2.collect())

In [None]:
%time Vectors.squared_distance(v1,v2)

### Did you notice that each of the three methods above returned a different value? Why?
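
One likely contributor (an assumption, not verified for this notebook) is that floating-point addition is not associative, so the three methods may add the squared differences in different orders and round differently along the way. The sketch below uses made-up values and an explicit loop, so the order of additions is unambiguous, and compares the results with `math.fsum`, which returns a correctly rounded sum.

```python
import math

def naive_sum(xs):
    """Add the values strictly left to right, rounding after every addition."""
    total = 0.0
    for x in xs:
        total += x
    return total

# Made-up values chosen so that the 1.0 is lost when added to 1e16 first
order_a = naive_sum([1e16, 1.0, -1e16])   # (1e16 + 1.0) - 1e16
order_b = naive_sum([1e16, -1e16, 1.0])   # (1e16 - 1e16) + 1.0
exact = math.fsum([1e16, 1.0, -1e16])     # correctly rounded result

print(order_a, order_b, exact)
```

The same three numbers give different totals depending only on the order in which they are added.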

<hr>

# Given N vectors, compute the distance between every pair (i.e. N\**2 comparisons)

We have num_rows vectors. Each vector is of length vec_dimension.

Compute the distance for all the pairs, using the following setup:


vec_dimension = 100

config: DBR 14ML

standard DS3 v2 14GB RAM. <br>
Driver = 1 node<br>
Workers = 2 nodes

NOTE: If you run a cell more than once, the second run might skip some stages (optimizations), so record the time of the FIRST run.
```
run time [seconds] to compute the distance of ALL the pairs
   rows       map()        udf()
  10000**2        
  30000**2       
 100000**2        
1000000**2        
```
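
To see why the larger row counts in the table above are risky, a rough lower bound on the raw data volume of the cross join can be worked out by hand. The helper below is a hypothetical illustration (the name `all_pairs_gib` is not part of this notebook); it assumes each row of the cross join holds two dense vectors of 8-byte doubles and ignores all Python/JVM overhead.

```python
def all_pairs_gib(num_rows: int, vec_dimension: int, vectors_per_row: int = 2) -> float:
    """Rough lower bound, in GiB, for the raw doubles in the cross-joined DataFrame."""
    bytes_per_double = 8
    total_bytes = num_rows * num_rows * vectors_per_row * vec_dimension * bytes_per_double
    return total_bytes / (1 << 30)  # 1 GiB = 2**30 bytes

for n in (1_000, 10_000, 30_000):
    print(f"{n:>7} rows -> {all_pairs_gib(n, 100):,.1f} GiB")
```

Spark does not necessarily hold all of this in memory at once, but caching or sorting the full result pushes in that direction.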



In [None]:
# Convert each row to a vector and put it in a new column 'features'
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(outputCol="features")
assembler.setInputCols(df.columns)
M = assembler.transform(df).select("features")

In [None]:
from pyspark.sql import functions as f
# A plain (non-pandas) UDF is called once per row, so v1 and v2 arrive as
# ml.linalg vectors, not as Series.
def dist(v1,v2):
    # squared_distance() returns numpy.float64.
    # The DataFrame StructType expects float/double, so it complains about the type.
    # We must cast it.
    return float( Vectors.squared_distance(v1,v2))

# Without an explicit return type, udf() defaults to StringType
dist_udf = f.udf(dist, "double")

In [None]:
all_pairs = M.crossJoin(M).toDF(*['left','right']) # after the join(), both columns have the same name, so we create a NEW DF with meaningful names

How is it possible that creating all the pairs took so little time?
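
The short answer is lazy evaluation: `crossJoin` only adds a step to the query plan, and no pairs are produced until an action such as `show()` or `take()` runs. The plain-Python sketch below is only an analogy (it uses `map` and `itertools`, not Spark), with made-up data and a hypothetical `pair_distance` helper that counts how many pairs were really processed.

```python
import itertools

counter = {"pairs": 0}  # how many pairs have actually been processed

def pair_distance(pair):
    """Squared distance between two scalars; also records that work was done."""
    counter["pairs"] += 1
    a, b = pair
    return (a - b) ** 2

points = range(1000)

# Building the pipeline is instant: it describes 1,000,000 pairs but computes none
lazy_distances = map(pair_distance, itertools.product(points, points))
work_before = counter["pairs"]

# Only consuming results (the analogue of a Spark action) triggers the work,
# and only for as many pairs as are requested
first_four = list(itertools.islice(lazy_distances, 4))
work_after = counter["pairs"]

print(work_before, work_after, first_four)
```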

## How many rows are in all_pairs? How many do we need?

all_pairs has num_rows**2 rows. We need distance(v[i], v[j]) for all i <> j, but distance(i,j) == distance(j,i), so we need only the pairs with i < j: num_rows*(num_rows-1)/2, slightly fewer than HALF of them.
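
A small plain-Python sketch of the counting argument: keeping only the pairs with i < j drops both the self-pairs and the mirrored duplicates. In Spark, one possible approach (an assumption, not shown in this notebook) would be to attach a row id to each vector and filter the cross join on the left id being smaller than the right id.

```python
import itertools

n = 5  # tiny, made-up number of vectors

all_pairs_count = n * n                                    # what the cross join produces
unique_pairs = list(itertools.combinations(range(n), 2))   # pairs (i, j) with i < j

print(all_pairs_count, len(unique_pairs))
```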

In [None]:
# If you want to round or just display shorter numbers, you can:
from pyspark.sql.functions import format_number
df.select( [format_number(c,2).alias(c) for c in df.columns] ).show(2)
"""
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|   _1|   _2|   _3|   _4|   _5|   _6|   _7|   _8|   _9|  _10|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|23.60|10.32|39.61|15.50| 6.65|40.16|91.80|80.05|76.52|22.19|
|53.67|27.67|17.27|10.62|21.44|92.75|82.89|80.67|80.04|19.34|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
"""
pass

## Apply the function using map() for each row
This can be done on an RDD only, so we move there and back.

NOTE: When running on a local setup with
num_rows = 10000, vec_dimension = 100
the following cell hangs. CPU is at 0% and memory consumption stays fixed. I suspect the memory allocated to the JVM is exhausted.

**Need to find stderr of this stage.**

In [None]:
%%time
# all_pairs.rdd is an RDD of Row objects. We index the two columns by position here
# (row['left'] and row['right'] would also work).
result_rdd = all_pairs.rdd.map(lambda row: dist(row[0], row[1]))
# The RDD now holds bare floats rather than Rows, so the schema cannot be inferred.
# We must supply it explicitly.
spark.createDataFrame(result_rdd,schema="Double").show(4)

## Apply the function using a UDF

In [None]:
# Alias Spark's sum so that Python's built-in sum() is not shadowed
from pyspark.sql.functions import udf, sum as spark_sum
from pyspark.sql.types import DoubleType

# Register the function as a UDF
your_udf = udf(dist, returnType=DoubleType())

# Apply the function to each row
result_df = all_pairs.withColumn("result",your_udf(all_pairs['left'], all_pairs['right']))
result_df.cache()
result_df.select('result').show(4)

# The aggregation yields a single row, so collect() is safe here
total = result_df.agg(spark_sum('result')).collect()
total

In [None]:
# We should get num_rows**2 rows. For num_rows = 1000000, the Driver node crashed: "java.lang.RuntimeException: abort: DriverClient destroyed
#    Internal error. Attach your notebook to a different compute or restart the current compute."
# This is because count() can (though not always, due to optimizations) load data into memory. In this case the driver logs showed "OOM".
# result_df.count()

distances = result_df.select('result')
sorted_dist = distances.orderBy(col("result").desc())

In [None]:
# this code ran on 2 [???] executors for 11 minutes
top10 = sorted_dist.take(10)
# Only now is the execution plan evaluated.
# For num_rows = 10K, it ran for 13.45 minutes.

# Why does the cell above hang for so long (when using num_rows = 10K)?
In DBR, click the **View** link of the job, choose Executors, then choose the stdout of one of the workers.

Here is one line that repeats forever:
`2023-12-10T17:19:58.806+0000: 301.202: [GC (Allocation Failure) [PSYoungGen: 2478672K->4033K(2480640K)] 3593339K->1118724K(4011008K), 0.0086329 secs] [Times: user=0.02 sys=0.00, real=0.01 secs] `

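Which roughly means: **the young generation is full, so the JVM must garbage-collect before it can allocate more.** When this line repeats constantly, the executor is working under memory pressure.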

### Can we get the K smallest values without sorting the full DF?

Below are two attempts. They improved the run time by 10 to 20% in my test. Remember that the run time includes the whole DAG execution, not just the sorting.

In [None]:
# this code ran on 5 executors for 10 minutes
from pyspark.sql.functions import col, dense_rank
from pyspark.sql.window import Window
k = 20
window_spec = Window.orderBy(col("result").asc()) # This does not really help, since the window is the whole DF!
dfK = distances.withColumn("rank", dense_rank().over(window_spec)).filter(col("rank") <= k).drop("rank")
dfK.limit(k).show()

In [None]:
# this code ran on 6 executors for 8 minutes
top_k_elements = distances.select("result").rdd.flatMap(lambda x: x).takeOrdered(k)
top_k_elements
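
The `takeOrdered(k)` call above does not need a full sort: the K smallest values can be found by keeping only K candidates per chunk of data and then merging the candidates. The sketch below shows that idea in plain Python with `heapq.nsmallest`, which is roughly the strategy `takeOrdered` follows. The three lists stand in for Spark partitions, and the data is randomly generated for illustration.

```python
import heapq
import itertools
import random

random.seed(4)
k = 3

# Three made-up "partitions" of distance values
partitions = [[random.uniform(0, 100) for _ in range(1000)] for _ in range(3)]

# Step 1: each partition keeps only its own k smallest values
candidates = [heapq.nsmallest(k, part) for part in partitions]

# Step 2: merge the small candidate lists and keep the overall k smallest
top_k = heapq.nsmallest(k, itertools.chain.from_iterable(candidates))

print(top_k)
```

Only 3 * k values ever reach the merge step, however many rows each partition holds.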

Before comparing results: <br>
How many executors (worker nodes) worked on each job? The cluster may resize due to load, so the run times are comparable only if the executor counts match.