# Spark Tasks

## Imports and environment initialization

In [1]:
import os
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, split, regexp_replace, collect_list, hash as spark_hash, concat_ws, when, abs, explode
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, LongType, DoubleType, IntegerType
from pyspark.sql.functions import udf
import numpy as np
from collections import Counter
from random import shuffle
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinHashLSH


In [2]:
# Set environment variables
# NOTE: these are machine-specific Windows install locations; adjust them before
# running the notebook on another machine.
os.environ["JAVA_HOME"] = "C:/Program Files/Java/jdk-22"
os.environ["SPARK_HOME"] = "C:/Spark/spark-3.5.1-bin-hadoop3"
os.environ["HADOOP_HOME"] = "C:/Hadoop"
os.environ["PYSPARK_PYTHON"] = "C:/Users/chris/AppData/Local/Programs/Python/Python311/python.exe"

# Prepend the tool directories to PATH. PATH entries must be directories, so the
# Python interpreter is added through its parent folder rather than the path of
# python.exe itself.
os.environ["PATH"] = os.pathsep.join([
    os.path.join(os.environ["JAVA_HOME"], "bin"),
    os.path.join(os.environ["SPARK_HOME"], "bin"),
    os.path.join(os.environ["HADOOP_HOME"], "bin"),
    os.path.dirname(os.environ["PYSPARK_PYTHON"]),
    os.environ["PATH"],
])

## Create & Configure Session

In [3]:
def create_session():
    """Create and configure the Spark context and session used by this notebook.

    Returns:
        tuple: ``(sc, spark)`` - the ``SparkContext`` built from the configuration
        below and the ``SparkSession`` created on top of it.
    """
    conf = SparkConf()
    conf.setAppName("DIS-lab-1")    # Sets name of the Spark Application
    conf.setMaster("local[16]")    # Master URL: run locally with 16 worker threads (local[*] would use all available cores)
    conf.set("spark.driver.memory", "10G")   # Memory allocated to driver process
    # NOTE(review): with a local master the executor.* settings below presumably have
    # little or no effect - confirm before tuning them.
    conf.set("spark.executor.instances", "4")
    conf.set("spark.executor.cores", "4")
    conf.set("spark.executor.memory", "4G")    # Memory allocated to each executor
    conf.set("spark.network.timeout", "600s")  # Increase network timeout
    conf.set("spark.executor.heartbeatInterval", "60s")  # Increase heartbeat interval
    conf.set("spark.rpc.message.maxSize", "512")  # Increase max message size
    # Maximum size of results that can be returned to the driver. This key used to be
    # set twice (6G, then 4G); only the last value took effect, so 4G is kept.
    conf.set("spark.driver.maxResultSize", "4G")
    conf.set("spark.sql.broadcastTimeout", "600")  # Increase broadcast timeout
    conf.set("spark.sql.shuffle.partitions", "200")  # Number of shuffle partitions
    # Current name of the deprecated "spark.yarn.executor.memoryOverhead" key (value in MiB)
    conf.set("spark.executor.memoryOverhead", "2048")
    conf.set("spark.memory.offHeap.enabled","true")
    conf.set("spark.memory.offHeap.size","10g")
    conf.set("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.0-s_2.12")  # Add GraphFrames to the spark configuration
    conf.set("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem")  # Ensure local file system is used for checkpointing

    sc = pyspark.SparkContext(conf=conf)    # Initializes the Spark context with this specific configuration
    spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()    # Creates Spark session

    # Set checkpoint directory
    # NOTE(review): this looks like a placeholder path - point it at a real, writable directory.
    sc.setCheckpointDir("/path/to/checkpoint/dir")

    return sc, spark

# Shut down whatever context/session a previous run of this cell left in the
# notebook namespace before building a fresh one.
try:
    if globals().get('sc') is not None:
        sc.stop()
        print("--Stopped existing SparkContext")
    if isinstance(globals().get('spark'), SparkSession):
        spark.stop()
        print("--Stopped existing SparkSession")
except Exception as stop_error:
    # Best effort only: report the problem and still try to create a new session below
    print(f"Error stopping existing Spark session or context: {stop_error}")

# Build the new context and session, then display the session as the cell output
sc, spark = create_session()
print("Spark session created successfully!")
spark

Spark session created successfully!


## Load and Split Data

### Load the data from the CSVs

In [4]:
# Read the raw log CSV; the first line is a header and column types are inferred by Spark
logs = spark.read.options(header=True, inferSchema=True).csv('datasets/dataset_2.csv')

### Split the data into 5 separate columns

In [5]:
# Split the raw "Logs" string on ", " once and reuse the resulting array expression.
# Items 0 and 4 have any "<" / ">" characters stripped
# (presumably the delimiters wrapping each raw record - verify against the dataset).
log_fields = split(col("Logs"), ", ")

logs_splitted = (
    logs
    .withColumn("from_server", regexp_replace(log_fields[0], "[<>]", ""))
    .withColumn("to_server", log_fields[1])
    .withColumn("time", log_fields[2])
    .withColumn("action", log_fields[3])
    .withColumn("process_id", regexp_replace(log_fields[4], "[<>]", ""))
    .drop("Logs")
)

# "time" and "process_id" are still strings at this point; turn them into integers
logs_casted = (
    logs_splitted
    .withColumn("time", col("time").cast("integer"))
    .withColumn("process_id", col("process_id").cast("integer"))
)

logs_casted.limit(1).show(truncate=False)

+-----------+------------+----+-------+----------+
|from_server|to_server   |time|action |process_id|
+-----------+------------+----+-------+----------+
|user       |ui_server_11|0   |Request|1369      |
+-----------+------------+----+-------+----------+



## Group data

### Group by process_id

In [6]:
# One row per process: every per-event column is gathered into a list column
# named after its (pluralised) source column.
logs_grouped = (
    logs_casted
    .groupBy("process_id")
    .agg(*[
        collect_list(source_column).alias(list_column)
        for source_column, list_column in (
            ("from_server", "from_servers"),
            ("to_server", "to_servers"),
            ("time", "times"),
            ("action", "actions"),
        )
    ])
)
logs_grouped.limit(1).show(truncate=True)

+----------+--------------------+--------------------+--------------------+--------------------+
|process_id|        from_servers|          to_servers|               times|             actions|
+----------+--------------------+--------------------+--------------------+--------------------+
|      1001|[user, ui_server_...|[ui_server_11, pu...|[207, 212, 219, 2...|[Request, Request...|
+----------+--------------------+--------------------+--------------------+--------------------+



### Sort all the column items based on times so the sub-processes are in the right order

In [7]:
def sort_lists(times, from_servers, to_servers, actions):
    """Reorder four parallel lists so that their entries follow ascending ``times``.

    The lists are zipped element-wise (so they are expected to be aligned and of
    equal length; extra trailing items of a longer list are dropped), sorted by
    the time value only, and unzipped again. The sort is stable, so entries with
    the same time keep their original relative order.

    Args:
        times: Sort keys, one per event.
        from_servers: Source server of each event.
        to_servers: Destination server of each event.
        actions: Action of each event.

    Returns:
        tuple: ``(times, from_servers, to_servers, actions)`` as four new lists in
        time order. Four empty lists are returned when there is nothing to sort
        (empty or ``None`` input), instead of failing while unpacking.
    """
    events = sorted(
        zip(times or [], from_servers or [], to_servers or [], actions or []),
        key=lambda event: event[0],
    )
    if not events:
        # zip(*[]) yields nothing, so unpacking it into four names would raise ValueError
        return [], [], [], []
    times_sorted, from_servers_sorted, to_servers_sorted, actions_sorted = zip(*events)
    return list(times_sorted), list(from_servers_sorted), list(to_servers_sorted), list(actions_sorted)

# Return type of the sorting UDF: a struct holding the four re-ordered arrays
sorted_lists_schema = StructType([
    StructField(field_name, ArrayType(element_type), nullable=True)
    for field_name, element_type in (
        ("times", LongType()),
        ("from_servers", StringType()),
        ("to_servers", StringType()),
        ("actions", StringType()),
    )
])

# Wrap the Python sorter so it can be used in a DataFrame expression
sort_lists_udf = udf(sort_lists, sorted_lists_schema)

# Run the sorter on each row; the result is stored in a temporary struct column
logs_grouped = logs_grouped.withColumn(
    "sorted_lists",
    sort_lists_udf(col("times"), col("from_servers"), col("to_servers"), col("actions")),
)

# Overwrite each list column with its sorted counterpart, then drop the temporary struct
logs_grouped = (
    logs_grouped
    .withColumn("times", col("sorted_lists").getField("times"))
    .withColumn("from_servers", col("sorted_lists").getField("from_servers"))
    .withColumn("to_servers", col("sorted_lists").getField("to_servers"))
    .withColumn("actions", col("sorted_lists").getField("actions"))
    .drop("sorted_lists")
)

### Create 'process_string' column with a string of all the 'from_servers'

In [8]:
# Add a new column "process_string" with withColumn.
# concat_ws joins the elements of the "from_servers" array into a single
# comma-separated string. Only "from_servers" goes into the string; the
# "to_servers", "times" and "actions" columns are not concatenated.
logs_grouped = logs_grouped.withColumn(
    "process_string",
    concat_ws(",", col("from_servers"))
)

logs_grouped.limit(1).show(truncate=True)

+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|process_id|        from_servers|          to_servers|               times|             actions|      process_string|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|      1001|[user, ui_server_...|[ui_server_11, pu...|[207, 212, 219, 2...|[Request, Request...|user,ui_server_11...|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+



# Approach 1 (LSH)

## 1. Create shingles

### 1.1. Pre-processing before shingles

#### Find sub-strings that exist in most server names and remove them (they do not add information)

In [9]:
def get_all_substrings(string, min_length):
    """Return every contiguous substring of ``string`` that has at least ``min_length`` characters.

    Substrings are listed by start position, then by increasing length; a substring
    that occurs at several positions appears once per position.
    """
    length = len(string)
    return [string[i:j] for i in range(length) for j in range(i + min_length, length + 1)]


def find_common_substring(server_names, min_length=3, min_occurrences_ratio=0.6):
    """Find the longest substring shared by a given share of the server names.

    Args:
        server_names: Names to analyse (pass unique names; a repeated name counts again).
        min_length: Minimum length of the substrings that are considered.
        min_occurrences_ratio: Fraction of ``server_names`` that must contain a
            substring for it to count as common. The required number of names is
            ``int(len(server_names) * min_occurrences_ratio)``.

    Returns:
        str: The longest qualifying substring (the first one found on ties), or
        ``""`` when no substring qualifies.
    """
    substr_counter = Counter()
    min_occurrences = int(len(server_names) * min_occurrences_ratio)

    for name in server_names:
        # Count each substring at most once per name: the threshold is expressed in
        # number of names, so a substring repeated inside one name (e.g. "abc" in
        # "xabcabc") must not be able to reach it on its own. dict.fromkeys removes
        # the duplicates while keeping first-seen order, which keeps ties deterministic.
        substr_counter.update(list(dict.fromkeys(get_all_substrings(name, min_length))))

    # Keep the substrings found in at least min_occurrences names
    common_substrings = [substr for substr, count in substr_counter.items() if count >= min_occurrences]

    # Longest qualifying substring, or "" when there is none
    return max(common_substrings, key=len) if common_substrings else ""


def remove_common_substring(strings, common_substring):
    """Return a new list in which every occurrence of ``common_substring`` is removed from each string."""
    stripped = []
    for text in strings:
        stripped.append(text.replace(common_substring, ""))
    return stripped

In [10]:
# Take the first 5 grouped rows that Spark returns and keep only their "from_servers" lists
sample_rows = logs_grouped.select("from_servers").take(5)
sample_from_servers = [sample_row.from_servers for sample_row in sample_rows]

# Merge the per-row lists into one flat list of names, then drop the duplicates
flattened_server_names = [
    server_name
    for server_list in sample_from_servers
    for server_name in server_list
]
unique_server_names = list(set(flattened_server_names))

# Longest substring (3+ characters) that meets the 0.7 occurrence ratio over the unique names
common_substring = find_common_substring(
    unique_server_names,
    min_length=3,
    min_occurrences_ratio=0.7,
)

print("Most Common Substring:", common_substring)

Most Common Substring: _server_


#### Remove the most common sub-string and all the special characters and spaces

In [11]:
# Create a new DataFrame with the common substring removed from the process_string.
# NOTE(review): regexp_replace interprets common_substring as a regular expression;
# that is harmless for a value like "_server_", but a detected substring containing
# regex metacharacters would need escaping first.
logs_grouped_cleaned = logs_grouped.withColumn(
    "process_string_clean_common",
    regexp_replace(col("process_string"), common_substring, ""),
)

# Drop everything that is not a letter, a digit or whitespace (this also removes the
# commas between server names). The pattern is a raw string so that "\s" reaches the
# regex engine as written instead of being an invalid Python escape sequence.
logs_grouped_cleaned = logs_grouped_cleaned.withColumn(
    "process_string_cleaned",
    regexp_replace(col("process_string_clean_common"), r"[^a-zA-Z0-9\s]", "")
)

# Show the cleaned DataFrame
logs_grouped_cleaned.select("process_string_cleaned").limit(1).show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|process_string_cleaned                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+-----------------

### 1.2. Define the shingles of each process

In [12]:
# Inspect the first 5 processes: print the total number of characters in their
# server names, followed by the names themselves
first_rows = logs_grouped_cleaned.take(5)

for row in first_rows:
    servers = row["process_string_clean_common"]
    server_names = servers.split(',')
    length = sum(map(len, server_names))
    print(length)
    print(server_names)

523
['user', 'ui11', 'purchase_book7', 'book_availability2', 'user_credentials5', 'user_history2', 'card_check6', 'american_express1', 'limit_check_american_express20', 'fraud_check_american_express5', 'currency_conversion1', 'shipping_options2', 'inventory_update7', 'review1', 'review_verification1', 'review_analysis1', 'ad1', 'customer_support9', 'ad1', 'review_analysis1', 'review_verification1', 'review1', 'inventory_update7', 'shipping_options2', 'currency_conversion1', 'fraud_check_american_express5', 'limit_check_american_express20', 'american_express1', 'card_check6', 'user_history2', 'user_credentials5', 'book_availability2', 'purchase_book7']
573
['user', 'ui5', 'purchase_book10', 'book_availability2', 'external_inventory1', 'second_hand_market2', 'bundle_offer1', 'user_credentials2', 'card_check5', 'visa5', 'fraud_check_visa2', 'currency_conversion2', 'shipping_options1', 'express_delivery1', 'inventory_update14', 'supplier_notification3', 'review3', 'review_verification1', '

In [13]:
# Shingling function for process strings
def shingle_process_string(process_string, k):
    """Return the distinct character k-shingles of ``process_string``.

    A window of ``k`` characters is slid over the string one position at a time and
    every distinct window is kept. The result is a list in no particular order; it
    is empty when the string is shorter than ``k``.
    """
    window_starts = range(len(process_string) - k + 1)
    return list({process_string[start:start + k] for start in window_starts})

# The shingle length is derived from the average server-name length in the first 5 rows
first_rows = logs_grouped_cleaned.take(5)
average_length = 0  # not used by the computation below

# Running totals over the sampled rows: characters in all names, and number of names
total_length = 0
total_servers = 0

for row in first_rows:
    servers = row["process_string_clean_common"]
    server_names = servers.split(',')
    total_length += sum(map(len, server_names))
    total_servers += len(server_names)

# Average number of characters per server name
average_server_name_length = total_length / total_servers

# Define the shingle length as 30% of the average server-name length, rounded up
# (or 2 if the names are too short)
shingle_length_percentage = 0.3
scaled_name_length = int(np.ceil(average_server_name_length * shingle_length_percentage))
shingle_length = max(2, scaled_name_length)

print(f'Average length of server names: {average_server_name_length}')
print(f'Determined shingle length: {shingle_length}')


def _shingle_with_notebook_length(process_string):
    """Shingle a process string using the notebook-level ``shingle_length``."""
    return shingle_process_string(process_string, shingle_length)


# UDF that turns a cleaned process string into its list of shingles
shingle_udf = udf(_shingle_with_notebook_length, ArrayType(StringType()))

# Add the "shingles" column computed from the cleaned process strings
logs_grouped_cleaned_with_shingles = logs_grouped_cleaned.withColumn(
    "shingles",
    shingle_udf(col("process_string_cleaned")),
)

# Show one row of the result
logs_grouped_cleaned_with_shingles.select("process_id", "process_string_cleaned", "shingles").limit(1).show(truncate=False)

Average length of server names: 15.35754189944134
Determined shingle length: 5
+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### 1.3. Create Vocabulary from shingles

#### Keep all unique shingles

In [14]:
# Gather the distinct shingles of all processes into one list on the driver
all_unique_shingles = (
    logs_grouped_cleaned_with_shingles
    .select("shingles")
    .rdd
    .flatMap(lambda row: row["shingles"])
    .distinct()
    .collect()
)

# Vocabulary: every shingle mapped to its position in the collected list
vocab = dict(zip(all_unique_shingles, range(len(all_unique_shingles))))

# Quick look at the first entries and the vocabulary size
print(f'Vocabulary: {list(vocab)[:10]} . . .')
print(f'Length: {len(vocab)}')

Vocabulary: ['20fra', 'rific', 'ustom', 'ckame', '5limi', 'tials', 'icati', 'rt9ad', 'press', 'k7boo'] . . .
Length: 2971


#### Create the sparse vector of shingle occurrence for each process

In [15]:
# Build a 0/1 occurrence vector for a list of shingles, based on the vocabulary
def create_sparse_vector(shingles, vocab):
    """Return a list of ``len(vocab)`` integers marking which vocabulary shingles occur.

    Position ``vocab[s]`` is set to 1 for every shingle ``s`` of ``shingles`` that is
    a key of ``vocab``; all other positions are 0. Shingles missing from the
    vocabulary are ignored.
    """
    occurrence = [0] * len(vocab)
    known_positions = [vocab[shingle] for shingle in shingles if shingle in vocab]
    for position in known_positions:
        occurrence[position] = 1
    return occurrence

def _vectorize_with_notebook_vocab(shingles):
    """Encode a list of shingles against the notebook-level ``vocab``."""
    return create_sparse_vector(shingles, vocab)


# UDF returning the occurrence vector as an array of integers
sparse_vector_udf = udf(_vectorize_with_notebook_vocab, ArrayType(IntegerType()))

# Add the "sparse_vector" column computed from each row's shingles
logs_grouped_sparse = logs_grouped_cleaned_with_shingles.withColumn(
    "sparse_vector",
    sparse_vector_udf(col("shingles")),
)

# Show one row of the result
logs_grouped_sparse.select("process_id", "sparse_vector").limit(1).show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### Convert sparse vectors to the format expected by MinHashLSH

In [16]:
def convert_to_vector(sparse_vector):
    """Turn a 0/1 list into a Spark ML sparse vector of the same size.

    Every position whose value equals 1 becomes an active index with value 1.0.
    """
    size = len(sparse_vector)
    active_positions = []
    for position, flag in enumerate(sparse_vector):
        if flag == 1:
            active_positions.append(position)
    return Vectors.sparse(size, active_positions, [1.0] * len(active_positions))

def _to_feature_row(row):
    """Build a Row holding the process id and its feature vector."""
    return Row(
        process_id=row['process_id'],
        features=convert_to_vector(row['sparse_vector']),
    )


# New DataFrame with one (process_id, features) row per process
data = logs_grouped_sparse.rdd.map(_to_feature_row)
logs_sparse_vectors = spark.createDataFrame(data)

logs_sparse_vectors.limit(1).show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## 2. Perform MinHash and LSH using the pyspark function MinHashLSH

### 2.1. Create MinHashLSH model and fit it with the 'features' column of the 'logs_sparse_vectors' dataframe

In [17]:
# Initialize MinHashLSH
#   inputCol:      the column of our dataframe that is going to be used for hashing
#   outputCol:     the new column that will be created and that includes the hashes
#   numHashTables: number of MinHash functions
#   seed:          seed for reproducibility
mh = MinHashLSH(
    inputCol="features",
    outputCol="hashes",
    numHashTables=10,
    seed=12345,
)

# Fit the model on the feature vectors
model = mh.fit(logs_sparse_vectors)

# Add the "hashes" column to the data
transformed_df = model.transform(logs_sparse_vectors)

# Show the hashes of one process
transformed_df.select("process_id", "hashes").limit(1).show(truncate=False)

+----------+--------------------------------------------------------------------------------------------------------------------------------+
|process_id|hashes                                                                                                                          |
+----------+--------------------------------------------------------------------------------------------------------------------------------+
|1001      |[[973116.0], [3758271.0], [2348276.0], [7263313.0], [1179714.0], [8642351.0], [3089106.0], [628949.0], [6335436.0], [6136671.0]]|
+----------+--------------------------------------------------------------------------------------------------------------------------------+



### 2.2. Find the pairs that have Jaccard distance less than 'max_distance_threshold'

In [18]:
# Find the pairs of processes whose Jaccard distance is below the threshold
# NOTE(review): the dataset is joined with itself, so the result presumably also
# contains every process paired with itself and both orderings of each pair -
# confirm, and filter downstream if that is not wanted.
max_distance_threshold = 0.5
similar_pairs_df = (
    model
    .approxSimilarityJoin(
        datasetA=logs_sparse_vectors,
        datasetB=logs_sparse_vectors,
        threshold=max_distance_threshold,
        distCol="JaccardDistance",
    )
    .select(
        col("datasetA.process_id").alias("process_id_1"),
        col("datasetB.process_id").alias("process_id_2"),
        col("JaccardDistance"),
    )
)

In [19]:
# Summary statistics (count, mean, stddev, min, max) of the pair distances
similar_pairs_df.describe("JaccardDistance").show()

+-------+-------------------+
|summary|    JaccardDistance|
+-------+-------------------+
|  count|               2688|
|   mean|0.30062358523621213|
| stddev|0.23198173532691344|
|    min|                0.0|
|    max| 0.4989339019189766|
+-------+-------------------+

