In [1]:
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, FloatType

In [34]:
def norm_2_func(vectors):
    return float(np.linalg.norm(vectors, 2))

norm_2_udf = F.udf(lambda x: norm_2_func(x), FloatType())

In [74]:
def get_unit_vectors(vectors):
    return list(map(float, features/np.linalg.norm(features, 2)))

get_unit_vectors = F.udf(lambda x: get_unit_vectors(x), ArrayType(FloatType()))

In [3]:
df = spark.read.text('data.txt')

In [27]:
df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [18]:
from pyspark.sql.functions import split, expr, flatten, col
from pyspark.ml.linalg import VectorUDT
from pyspark.ml.linalg import Vectors

In [20]:
array_to_vector = F.udf(lambda a: Vectors.dense(a), VectorUDT())

In [21]:
df = df.withColumn("value", split(expr("rtrim(']', ltrim('[', value))"), ",")) \
       .withColumn("value", expr("""transform(value, x -> split(rtrim(']', ltrim('[', x)), ","))"""))\
       .withColumn("value", flatten(col("value")).cast("array<float>"))\
       .withColumn("value", array_to_vector(col("value")))

In [22]:
df.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                                                                                              |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [36]:
df2 = df.withColumn('euclidean_norms', norm_2_udf(F.col('value')))

df2.head(1)

[Row(value=DenseVector([-7456.4746, 62.698, -5430.6108, -5047.9419, -3115.541, -2617.9648, -4505.6079, 2840.176, -432.0275, 7651.0527, 8569.9482, -6567.4453, 8257.3936, -6857.2305, -7036.5601, -5613.7266, 7545.5825, -8260.873, -6922.2271, -3106.9111]), euclidean_norms=26641.798828125)]

In [39]:
percentage = 10000/df2.count()
print(percentage)

0.01


In [41]:
from pyspark.sql.functions import percentile_approx

In [64]:
df2.summary().show()

+-------+-----------------+
|summary|  euclidean_norms|
+-------+-----------------+
|  count|          1000000|
|   mean|25685.35096683496|
| stddev|2609.796159577555|
|    min|        11651.874|
|    25%|        23963.074|
|    50%|        25753.686|
|    75%|         27481.87|
|    max|         37056.57|
+-------+-----------------+



In [61]:
df2.select(percentile_approx('euclidean_norms', [0.01, 0.5, 0.95], 1000000).alias('quantiles')).show(truncate = False)

+---------------------------------+
|quantiles                        |
+---------------------------------+
|[19312.668, 25754.148, 29858.316]|
+---------------------------------+



In [65]:
t1 = df2.select(percentile_approx('euclidean_norms', 0.01, 1000000).alias('threshold_quantile')).head()[0]

In [66]:
t1

19312.66796875

In [None]:
df3 

In [76]:
df3 = df2.withColumn('unit_vectors', norm_2_udf(F.col('value')))

df3.head(1)

[Row(value=DenseVector([-7456.4746, 62.698, -5430.6108, -5047.9419, -3115.541, -2617.9648, -4505.6079, 2840.176, -432.0275, 7651.0527, 8569.9482, -6567.4453, 8257.3936, -6857.2305, -7036.5601, -5613.7266, 7545.5825, -8260.873, -6922.2271, -3106.9111]), euclidean_norms=26641.798828125, unit_vectors=26641.798828125)]