In [199]:
# Configuration
# Path of the web-log JSON input file. This is a relative path, resolved
# against the working directory the notebook kernel is started from.
JSON_FILE = "./assets/sample.json"

In [205]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Create (or reuse) the Spark session for this notebook.
# getOrCreate() returns the already-running session if there is one, so
# re-running this cell does not start a second session.
# Add `.config(key, value)` calls to the builder only for settings that are
# actually needed; an empty key/value pair is not a meaningful setting.
spark = (
    SparkSession.builder
    .appName("PySpark SQL App")
    .getOrCreate()
)

In [227]:
# Load the web-log records from JSON_FILE. The "multiline" option lets the
# JSON reader parse records that span several lines instead of expecting
# one JSON object per line.
json_reader = spark.read.option("multiline", "true")
web_log_data = json_reader.json(JSON_FILE)
web_log_data.show()

+-------------+---------+-------+
|       domain|timestamp|user_id|
+-------------+---------+-------+
|    apple.com|111111110| 123456|
|   google.com|010111110| 123456|
| facebook.com|010101111| 123456|
|       amazon|010101111| 123456|
|microsoft.com|010111111| 123456|
|    apple.com|110101010| 234567|
|   google.com|011101010| 234567|
|  netflix.com|010111110| 234567|
| telegram.com|010101111|    431|
|  pokemon.com|010101110|    431|
|  digimon.com|111101010|    431|
+-------------+---------+-------+



In [241]:
# Distinct values of the `domain` column of the web log.
# NOTE(review): `unique_domains` is not used by any of the visible cells.
unique_domains = web_log_data.select("domain").distinct()

In [284]:
from functools import cmp_to_key, reduce
from operator import mul
from typing import List

# Scale a vector to unit (L2) length
def get_normalized(vector):
    """Return ``vector`` scaled to unit Euclidean (L2) length.

    Args:
        vector: Sequence of numbers.

    Returns:
        A new list of floats whose L2 norm is 1. An all-zero (or empty)
        vector has no direction, so a list of zeros of the same length is
        returned instead of dividing by zero.
    """
    # Square *every* component. A reduce() without an initial value would
    # seed the accumulator with the first component unsquared.
    norm = sum(x * x for x in vector) ** 0.5
    if norm == 0:
        return [0.0 for _ in vector]
    return [x / norm for x in vector]


# Cosine similarity between two timestamps, each treated as a vector of digits
@F.udf(returnType=T.FloatType())
def get_timestamp_cosine_similarity(timestamp_a, timestamp_b):
    """Cosine similarity of two timestamps interpreted as digit vectors.

    Each character of a timestamp is converted with ``float()``. The two
    resulting vectors are L2-normalized with ``get_normalized``, and their
    dot product is returned.

    Args:
        timestamp_a: Timestamp value, iterated character by character
            (presumably a digit string such as "010111110" -- confirm schema).
        timestamp_b: Timestamp value in the same format as ``timestamp_a``.

    Returns:
        The dot product of the two normalized vectors, or ``None`` (SQL NULL)
        when either input is null.
    """
    # A null column value would otherwise raise TypeError inside the executor
    # and fail the whole Spark job.
    if timestamp_a is None or timestamp_b is None:
        return None

    a = get_normalized([float(digit) for digit in timestamp_a])
    b = get_normalized([float(digit) for digit in timestamp_b])

    # NOTE(review): zip() stops at the shorter vector, so timestamps of
    # different lengths are compared only over their common prefix.
    return sum(x * y for x, y in zip(a, b))

@F.udf(T.FloatType())
def get_rms_correlation(correlations: List[float]) -> float:
    """Root-mean-square of an array of correlation values.

    This is sqrt(mean(c ** 2)), not the arithmetic mean: it ignores sign and
    weights larger magnitudes more heavily.

    Args:
        correlations: Array column value. Null elements are ignored.

    Returns:
        The RMS over the non-null elements, or ``None`` (SQL NULL) when the
        array is null or contains no non-null elements.
    """
    values = [c for c in (correlations or []) if c is not None]
    # Without this guard an empty array would raise ZeroDivisionError and
    # fail the Spark job.
    if not values:
        return None
    return (sum(c * c for c in values) / len(values)) ** 0.5

@F.udf(T.IntegerType())
def get_len(array: List[float]) -> int:
    """Number of elements in an array column value.

    Returns ``None`` (SQL NULL) for a null array instead of raising
    AttributeError inside the executor.
    """
    if array is None:
        return None
    return len(array)

# Pair up every two distinct domains visited by the same user, score each
# pair by the cosine similarity of their timestamps, and aggregate per pair.
# This is a self-join on user_id, not a Cartesian cross join. The strict `<`
# on domain keeps exactly one ordering of each pair and excludes self-pairs.
cross_joined = (
    web_log_data.alias("a")
    .join(
        web_log_data.alias("b"),
        [
            F.col("a.user_id") == F.col("b.user_id"),
            F.col("a.domain") < F.col("b.domain"),
        ],
    )
    .select(
        F.col("a.user_id"),
        F.col("a.domain").alias("domain_from"),
        F.col("b.domain").alias("domain_to"),
        F.col("a.timestamp").alias("timestamp_from"),
        F.col("b.timestamp").alias("timestamp_to"),
    )
    .withColumn(
        "correlation",
        get_timestamp_cosine_similarity(
            F.col("timestamp_from"),
            F.col("timestamp_to"),
        ),
    )
    .select(
        F.col("user_id"),
        F.col("domain_from"),
        F.col("domain_to"),
        F.col("correlation"),
    )
    .groupby(F.col("domain_from"), F.col("domain_to"))
    .agg(F.collect_list("correlation").alias("correlations"))
    # Root-mean-square of the per-user similarities; despite the column name,
    # this is not an arithmetic mean.
    .withColumn(
        "average_correlation",
        get_rms_correlation(F.col("correlations")),
    )
    # The built-in size() counts array elements without a Python UDF round-trip.
    .withColumn("count_correlation", F.size(F.col("correlations")))
)

# Display as a separate step. DataFrame.show() returns None, so chaining it
# onto the assignment would leave `cross_joined` set to None.
cross_joined.show()
    

In [197]:
# Terminate the Spark session. Run this cell last: DataFrames built from
# `spark` (e.g. web_log_data) cannot be evaluated after the session is stopped.
# NOTE(review): this cell's execution count is lower than the cells above it,
# which suggests out-of-order execution -- verify with Restart & Run All.
spark.stop()