In [9]:
import math

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

# Obtain the Spark session for this job (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("SimilarityScoreCalculation")
    .getOrCreate()
)

# Toy input: three rows, each an integer id plus a 3-element float feature list.
data = [
    Row(id=row_id, features=vector)
    for row_id, vector in (
        (1, [1.0, 0.0, 3.0]),
        (2, [0.0, 2.0, 1.0]),
        (3, [2.0, 2.0, 2.0]),
    )
]

df = spark.createDataFrame(data)

# Define the cosine similarity function using Python lists
def cosine_similarity(v1, v2):
    numerator = sum(x * y for x, y in zip(v1, v2))
    denominator = (sum(x ** 2 for x in v1) ** 0.5) * (sum(y ** 2 for y in v2) ** 0.5)
    return numerator / denominator if denominator != 0 else 0.0

# Wrap the pure-Python scorer so Spark can apply it to each row pair.
# NOTE(review): FloatType() is a 32-bit float, so the Python double returned by
# cosine_similarity is rounded to ~7 significant digits; DoubleType() would keep
# full precision if that matters downstream.
cosine_similarity_udf = F.udf(cosine_similarity, returnType=FloatType())

# Pair every row with every row, including itself and in both orders, so n input
# rows produce n * n pairs.
cross_df = df.alias("df1").crossJoin(df.alias("df2"))

# Score each pair, then keep only the two ids and the score.
similarity_df = (
    cross_df
    .withColumn(
        "similarity",
        cosine_similarity_udf(F.col("df1.features"), F.col("df2.features")),
    )
    .select(
        F.col("df1.id").alias("id1"),
        F.col("df2.id").alias("id2"),
        F.col("similarity"),
    )
)

# Print the pairwise scores as a table.
similarity_df.show()


+---+---+----------+
|id1|id2|similarity|
+---+---+----------+
|  1|  1|       1.0|
|  1|  2|0.42426407|
|  1|  3|0.73029673|
|  2|  1|0.42426407|
|  2|  2|       1.0|
|  2|  3| 0.7745967|
|  3|  1|0.73029673|
|  3|  2| 0.7745967|
|  3|  3|       1.0|
+---+---+----------+

