In [1]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.types import StringType, ArrayType, IntegerType, FloatType
from pyspark.sql.functions import concat, lit
from pyspark.sql.functions import avg, length
from pyspark.sql.functions import col, expr, count , row_number
from pyspark.sql.window import Window
import math
import time


# PART 1:
## 1. Grouping the similar processes according to Jaccard Similarities
## 2. Creating the new data 

# code to start the Master:
1. Open cmd and admin
2. write "cd %SPARK_HOME%"
3. bin\spark-class2.cmd org.apache.spark.deploy.master.Master
# code to start the worker:
1. Open cmd and admin
2. write "cd %SPARK_HOME%"
3. write "bin\spark-class2.cmd org.apache.spark.deploy.worker.Worker -c 6 -m 10G spark://192.168.1.81:7077"
* in step 3:
* -c -> number of cores
* -m -> amount of RAM for the current worker
* the spark link is from the Master link ( go to the web page of the master and locate the spark link )

In [2]:
from pyspark.sql import SparkSession
import findspark
findspark.init()

spark = SparkSession.builder \
    .appName("part1Grouping") \
    .master("spark://192.168.1.81:7077") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "12") \
    .config("spark.driver.memory", "6g") \
    .config("spark.driver.cores", "3") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.heartbeatInterval", "100s") \
    .config("spark.sql.broadcastTimeout", "3600s") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored", "true") \
    .getOrCreate()



In [None]:
import csv
import ast

# Define the input and output file paths
input_file = 'demiRecords.txt'
output_file = 'output.csv'

# Read the text file and parse the lines
with open(input_file, 'r') as file:
    lines = file.readlines()

# Preprocess the lines to handle 'null' as a string
processed_lines = [line.strip().replace('null', "'null'") for line in lines]

# Parse the lines into a list of dictionaries
data = [ast.literal_eval(line) for line in processed_lines]

# Define the CSV headers
headers = ['FromServer', 'ToServer', 'time', 'action', 'processId']

# Write the data to a CSV file
with open(output_file, 'w', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=headers)
    writer.writeheader()
    for row in data:
        writer.writerow(row)

print(f"Data has been successfully written to {output_file}")

In [3]:
# Load the data into a DataFrame
data_path = "output.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)


In [4]:
from pyspark.sql.functions import col, collect_list, struct

# Group by processID and collect the sequence of actions
processes_df = df.groupBy("processID").agg(collect_list(struct("FromServer", "ToServer", "time", "action")).alias("actions"))

# Convert actions to string for MinHash LSH
def actions_to_string(actions):
    return "".join([f"{action['FromServer']}{action['ToServer']}" for action in actions])

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

actions_to_string_udf = udf(actions_to_string, StringType())
processes_df = processes_df.withColumn("actions_str", actions_to_string_udf(col("actions")))


In [5]:
# calculate the median process lenght to aproximate

# Calculate the length of actions_str column
df_with_length = processes_df.withColumn("length", length("actions_str"))

# Calculate the median length using window function and sorting
windowSpec = Window.orderBy("length")
df_with_length = df_with_length.withColumn("row_num", row_number().over(windowSpec))
count_df = df_with_length.count()

median_row = math.ceil(count_df / 2.0)

median_length = df_with_length.filter(col("row_num") == median_row).select("length").first()
cur_k = 5
thresholds = [(10000, 9), (5000, 8), (1000, 7), (100, 6)]
for threshold, value in thresholds:
    if median_length[0] > threshold:
        cur_k = value
        break
    

In [6]:
# Convert actions string into shingles
def get_shingles(row, k=5):
    concatenated_str = ''.join(row)
    shingles = [concatenated_str[i:i+k] for i in range(len(concatenated_str) - (k - 1))]
    return shingles
from pyspark.sql.types import ArrayType

get_shingles_udf = udf(lambda x: get_shingles(x,cur_k), ArrayType(StringType()))
processes_df = processes_df.withColumn("shingles", get_shingles_udf(col("actions_str")))


In [7]:
from pyspark.sql.functions import array, array_union, collect_list, explode, col
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import HashingTF
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="shingles", outputCol="features",binary=True)
cv_model = cv.fit(processes_df)
vectorized_df = cv_model.transform(processes_df)
start = time.time()
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10)
mh_model = mh.fit(vectorized_df)

hashed_df = mh_model.transform(vectorized_df)
threshold = 0.5
# Find similar candidate process IDs using MinHashLSH
similarity_df = mh_model.approxSimilarityJoin(hashed_df, hashed_df, threshold, distCol="JaccardDistance") \
    .select(col("datasetA.processID").alias("processID_A"),
            col("datasetB.processID").alias("processID_B"),
            col("JaccardDistance"),col("datasetA.features").alias("featuresA"),col("datasetB.features").alias("featuresB"))

# Filter out self-joins and duplicates
similarity_df = similarity_df.filter(col("processID_A") < col("processID_B"))

# Function to calculate Jaccard similarity
def jaccard_similarity(vec1, vec2):
    set1 = set(vec1.indices)
    set2 = set(vec2.indices)
    intersection = set1.intersection(set2)
    union = set1.union(set2)
    if len(union) == 0:
        return 0.0
    return float(len(intersection)) / len(union)

# Register the function as a UDF
from pyspark.sql.types import FloatType
jaccard_similarity_udf = udf(jaccard_similarity, FloatType())

# Calculate Jaccard similarity for each candidate pair
similarity_df = similarity_df.withColumn("JaccardSimilarity", jaccard_similarity_udf(col("featuresA"), col("featuresB")))
# Filter pairs with Jaccard similarity above a threshold (e.g., 0.8)
similarity_df_filtered = similarity_df.filter(col("JaccardSimilarity") >= 0.9)
end = time.time()
print("the time that it takes our model to find the pairs are ",end-start)
# Group by processID_A and collect similar processIDs
grouped_df = similarity_df.groupBy("processID_A").agg(collect_list("processID_B").alias("similar_processIDs"))

# Convert processID_A to an array and concatenate with similar_processIDs
grouped_df = grouped_df.withColumn("all_processIDs", array_union(array(col("processID_A")), col("similar_processIDs")))

# Explode the all_processIDs array to get a mapping of each process ID to its group
exploded_df = grouped_df.select(explode(col("all_processIDs")).alias("processID"), col("processID_A").alias("group_representative"))
similarity_df.show()

the time that it takes our model to find the pairs are  0.21000146865844727
+-----------+-----------+-------------------+--------------------+--------------------+-----------------+
|processID_A|processID_B|    JaccardDistance|           featuresA|           featuresB|JaccardSimilarity|
+-----------+-----------+-------------------+--------------------+--------------------+-----------------+
|          1|       1164|                0.0|(8739,[72,142,145...|(8739,[72,142,145...|              1.0|
|          2|       5660|                0.0|(8739,[55,99,100,...|(8739,[55,99,100,...|              1.0|
|         10|       2725|                0.0|(8739,[62,310,545...|(8739,[62,310,545...|              1.0|
|         14|       2074|                0.0|(8739,[16,51,75,7...|(8739,[16,51,75,7...|              1.0|
|         28|       7807|                0.0|(8739,[17,18,19,2...|(8739,[17,18,19,2...|              1.0|
|         32|       1445|                0.4|(8739,[71,222,227...|(8739,[71,

In [8]:
# Merge overlapping groups
def merge_groups(group_list):
    groups = []
    for group in group_list:
        merged = False
        for existing_group in groups:
            if any(item in group for item in existing_group):
                existing_group.update(group)
                merged = True
                break
        if not merged:
            groups.append(set(group))
    return [list(group) for group in groups]

merge_groups_udf = udf(lambda x: merge_groups(x), ArrayType(ArrayType(IntegerType())))

grouped_lists = exploded_df.groupBy("group_representative") \
    .agg(collect_list("processID").alias("group_list")) \
    .agg(collect_list("group_list").alias("group_lists"))

merged_groups = grouped_lists.withColumn("merged_groups", merge_groups_udf(col("group_lists"))) \
    .select(explode(col("merged_groups")).alias("final_group"))

# Convert the final groups to a DataFrame
from pyspark.sql.functions import concat_ws

final_groups_df = merged_groups.select(concat_ws("_", col("final_group")).alias("Group"), col("final_group"))

# Find the representative process for each final group
final_groups_exploded = final_groups_df.withColumn("processID", explode(col("final_group")))

# Join with the original DataFrame to keep only the representative process
filtered_df = df.join(final_groups_exploded, on="processID", how="inner")

# Select the smallest processID in each group as the representative
from pyspark.sql.functions import min

group_representative_df = final_groups_exploded.groupBy("Group").agg(min("processID").alias("representative_processID"))

# Join to get the full details of the representative processes
representative_processes_df = group_representative_df.join(filtered_df, filtered_df["processID"] == group_representative_df.representative_processID, "inner") \
    .select("processID", "FromServer", "ToServer", "time", "action")

In [9]:
from pyspark.sql.functions import col, expr
# Step 1: Remove Processes in Groups
# Get the list of process IDs to remove
processes_to_remove = final_groups_df.selectExpr("explode(final_group) as processID").distinct()

# Filter out rows where processID is in processes_to_remove
df_without_groups = df.join(processes_to_remove, "processID", "left_anti")
df_without_groups = df_without_groups.select("FromServer", "ToServer", "time", "action","processID")
# Add a constant number to processID
constant_number = df.agg({"processID": "max"}).first()[0]
new_representative_processes_df = representative_processes_df.withColumn(
    "processID",
    expr(f"processID + {constant_number}")
)

# Show the final DataFrame
new_representative_processes_df = new_representative_processes_df.select("FromServer", "ToServer", "time", "action","processID").orderBy("time")

# Combine original DataFrame and representatives DataFrame
combined_df = df_without_groups.union(new_representative_processes_df)


# creating the txt files:
## The desired files will be in the folder output

In [10]:
def write_to_one_txt(df, local_path_name,wanted_list):
    correct_path = wanted_list + "/part1Output.txt"
    formatted_df = df.withColumn(
    "formatted_line",
    concat(lit("<"), df.FromServer, lit(","),
           df.ToServer, lit(","),
           df.time, lit(","),
           df.action, lit(","),
           df.processID, lit(">"))
)
    open(correct_path, "w")
    formatted_df.select("formatted_line").write.mode("overwrite").text(output_path)
    os.system(f'cat {local_path_name}/*.txt >> {correct_path}')
    os.system(f'rm -r {local_path_name}')
    

In [None]:

# Define the output path
output_path = "./part1OUT1"
output_path1 = "./output"
write_to_one_txt(combined_df,output_path,output_path1)
# Write the DataFrame to a CSV file


In [None]:
# creating a dataframe only with the processes that were grouped.
df_with_groups = df.join(processes_to_remove, "processID", "semi")
df_with_groups.show()

In [None]:
exploded_final_groups_df = final_groups_df.select("Group", explode("final_group").alias("processID"))
joined_df = df_with_groups.join(exploded_final_groups_df, "processID")

In [None]:
# Function to write groups to txt file
def write_groups_to_txt(grouped_df, output_path):
    with open(output_path, "w") as file:
        for row in grouped_df.collect():
            group_name = row["Group"]
            process_ids = row["processIDs"]
            formatted_rows = row["formatted_rows"]
            
            # Ensure process_ids are unique and sorted
            process_ids = sorted(set(process_ids))
            
            file.write(f"Group: {{{', '.join(map(str, process_ids))}}}\n")
            
            for process_id in process_ids:
                file.write(f"{process_id}:\n")
                
                # Find all formatted rows for the current process ID
                rows_for_process_id = [row for row in formatted_rows if row.endswith(f",{process_id}>")]
                
                if rows_for_process_id:
                    for formatted_row in rows_for_process_id:
                        file.write(f"{formatted_row}\n")
                else:
                    file.write("<No corresponding formatted rows found>\n")
                    
            file.write("\n")  # Add empty line between groups for clarity

In [None]:

# Format each row into the desired format
formatted_df = joined_df.withColumn(
    "formatted_row",
    concat_ws("", lit("<"), col("FromServer"), lit(","), col("ToServer"),
              lit(","), col("time"), lit(","), col("action"), lit(","), col("processID"), lit(">"))
)

# Group by Group name and aggregate process IDs and formatted rows
grouped_df = formatted_df.groupBy("Group").agg(
    collect_list("processID").alias("processIDs"),
    collect_list("formatted_row").alias("formatted_rows")
)



# Output path
output_path = "./output/part1Observations.txt"

# Call function to write to text file
write_groups_to_txt(grouped_df, output_path)

# Evaluation

In [12]:
# Evaluation function

def evaluateData(original_df,df_similar_Minhash,threshold, process_id_col='processID'):
    # start
    start = time.time()
    jaccard_similarity_udf = udf(jaccard_similarity, FloatType())
    cross_joined_df = original_df.alias("df1").join(original_df.alias("df2")).select(col("df1.processID").alias("processID_A"),col("df1.features").alias("processAFeatures"),
                                                                  col("df2.processID").alias("processID_B"),col("df2.features").alias("processBFeatures")).orderBy(col("processID_A"))
    filtered_df = cross_joined_df.filter(col("processID_A") < col("processID_B"))
    # Calculate Jaccard similarity for each candidate pair
    similarity_df = filtered_df.withColumn("JaccardSimilarity", jaccard_similarity_udf(col("processAFeatures"), col("processBFeatures")))
    
    similarity_df = similarity_df.select(col("processID_A"),col("processID_B"),col("JaccardSimilarity"))

    t1 = similarity_df.filter(col("JaccardSimilarity") >= threshold)
    #stop
    end = time.time()
    print("the time it takes to calculate the original pairs are ",end - start)
    print("This is the similar ds for Original")
    f1 = similarity_df.filter(col("JaccardSimilarity") < threshold)
    print("This is the disimilar ds for Original")
    df_similar_Minhash = df_similar_Minhash.select(col("processID_A"),col("processID_B"),col("JaccardSimilarity"))
    t3 = df_similar_Minhash.filter(col("JaccardSimilarity") >= threshold)
    f3 = df_similar_Minhash.filter(col("JaccardSimilarity") < threshold)
    TN = f1.subtract(t3)
    TP = t3.intersect(t1)
    FP = t3.intersect(f1)
    FN = t1.subtract(t3)
    return TN,TP,FP,FN


In [13]:
TN,TP,FP,FN = evaluateData(vectorized_df,similarity_df,0.9)

the time it takes to calculate the original pairs are  0.06589198112487793
This is the similar ds for Original
This is the disimilar ds for Original


In [None]:
TP.take(5)

In [14]:
TN.take(5)

Py4JJavaError: An error occurred while calling o471.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 in stage 36.0 (TID 40) (192.168.1.81 executor 6): java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:318)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
	at java.base/java.net.Socket$SocketInputStream.implRead(Socket.java:1108)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1095)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:291)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:347)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:208)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:179)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:318)
	at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:346)
	at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:796)
	at java.base/java.net.Socket$SocketInputStream.implRead(Socket.java:1108)
	at java.base/java.net.Socket$SocketInputStream.read(Socket.java:1095)
	at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:291)
	at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:347)
	at java.base/java.io.BufferedInputStream.implRead(BufferedInputStream.java:420)
	at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:399)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:208)
	at java.base/java.io.DataInputStream.readFully(DataInputStream.java:179)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1160)
	at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)


In [None]:
similarity_df.take(1)

# Part 2

In [None]:
from pyspark.sql.functions import collect_set, col, concat_ws, collect_list
from pyspark.ml.feature import CountVectorizer, MinHashLSH
from pyspark.sql.functions import array_union, explode,array
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType, IntegerType, FloatType



dataForPart2 = df
dataForPart2.show()
# Aggregate FromServer and ToServer into sets for each processId
agg_df = dataForPart2.groupBy("processId").agg(
    collect_set("FromServer").alias("servers_array")
)

agg_df.show(truncate=False)


In [None]:
# Use CountVectorizer to convert server names to feature vectors
cv = CountVectorizer(inputCol="servers_array", outputCol="features")
cv_model = cv.fit(agg_df)
cv_df = cv_model.transform(agg_df)

cv_df.show()

In [None]:

print("Distinct Attributes (Vocabulary):")
for i, attr in enumerate(cv_model.vocabulary):
    print(f"{i}. {attr}")


In [None]:

# Show the transformed DataFrame with processId and features
cv_df.select("processId", "features").show(truncate=False)


In [None]:

# Apply MinHash LSH
numOftables = 10
minhash = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=numOftables)
model = minhash.fit(cv_df)
transformed_df = model.transform(cv_df)


threshold=0.5

# Perform clustering based on MinHash LSH

candidates = model.approxSimilarityJoin(transformed_df, transformed_df, threshold , distCol="JaccardDistance") \
    .select(col("datasetA.processId").alias("processIdA"),
            col("datasetB.processId").alias("processIdB"),
            col("JaccardDistance"),col("datasetA.features").alias("featuresA"),col("datasetB.features").alias("featuresB"))


In [None]:

# Filter out self-joins and duplicates
candidates = candidates.filter(col("processIdA") < col("processIdB"))
# Function to calculate Jaccard similarity
print("Candidates")
candidates.show()

jaccard_similarity_udf = udf(jaccard_similarity, FloatType())

In [None]:
# Calculate Jaccard similarity for each candidate pair
similarity_df = candidates.withColumn("JaccardSimilarity", jaccard_similarity_udf(col("featuresA"), col("featuresB")))
similarity_df.show()


 # Filter pairs with Jaccard similarity above a threshold (e.g., 0.6)
similarity_df = similarity_df.filter(col("JaccardSimilarity") >= 0.3)

similarity_df.show()

merge_groups_udf = udf(lambda x: merge_groups(x), ArrayType(ArrayType(IntegerType())))

grouped_lists = exploded_df.groupBy("group_representative") \
    .agg(collect_list("processID").alias("group_list")) \
    .agg(collect_list("group_list").alias("group_lists"))

merged_groups = grouped_lists.withColumn("merged_groups", merge_groups_udf(col("group_lists"))) \
    .select(explode(col("merged_groups")).alias("final_group"))

# Convert the final groups to a DataFrame
from pyspark.sql.functions import concat_ws

final_groups_df = merged_groups.select(concat_ws("_", col("final_group")).alias("Group"), col("final_group"))

final_groups_df.show()
# Output path
output_path = "./output/part2Observations.txt"

processes_from_groups = final_groups_df.selectExpr("explode(final_group) as processID").distinct()
# creating a dataframe only with the processes that were grouped.
df_with_groups = dataForPart2.join(processes_from_groups, "processID", "semi")
df_with_groups.show()

exploded_final_groups_df = final_groups_df.select("Group", explode("final_group").alias("processID"))
joined_df = df_with_groups.join(exploded_final_groups_df, "processID")
# Call function to write to text file
joined_df.show()
# write_groups_to_txt(final_groups_df, output_path)

# creating the txt files:
## The desired files will be in the folder output

In [None]:
# Format each row into the desired format
formatted_df = joined_df.withColumn(
    "formatted_row",
    concat_ws("", lit("<"), col("FromServer"), lit(","), col("ToServer"),
              lit(","), col("time"), lit(","), col("action"), lit(","), col("processID"), lit(">"))
)

# Group by Group name and aggregate process IDs and formatted rows
grouped_df = formatted_df.groupBy("Group").agg(
    collect_list("processID").alias("processIDs"),
    collect_list("formatted_row").alias("formatted_rows")
)



# Output path
output_path = "./output/part2Observations.txt"

# Call function to write to text file
write_groups_to_txt(grouped_df, output_path)

# Evaluation Part 2

In [None]:
#Evaluation function for part 2
# Evaluation function

def evaluateData(original_df,df_similar_Minhash,threshold, process_id_col='processID'):
    jaccard_similarity_udf = udf(jaccard_similarity, FloatType())
    cross_joined_df = original_df.alias("df1").join(original_df.alias("df2")).select(col("df1.processId").alias("processIdA"),col("df1.features").alias("processAFeatures"),
                                                                  col("df2.processId").alias("processIdB"),col("df2.features").alias("processBFeatures")).orderBy(col("processIdA"))
    filtered_df = cross_joined_df.filter(col("processIdA") < col("processIdB"))
    # Calculate Jaccard similarity for each candidate pair
    similarity_df = filtered_df.withColumn("JaccardSimilarity", jaccard_similarity_udf(col("processAFeatures"), col("processBFeatures")))
    df_similar_Minhash = df_similar_Minhash.select(col("processIdA"),col("processIdB"),col("JaccardSimilarity"))
    similarity_df = similarity_df.select(col("processIdA"),col("processIdB"),col("JaccardSimilarity"))
    t1 = similarity_df.filter(col("JaccardSimilarity") >= threshold)
    f1 = similarity_df.filter(col("JaccardSimilarity") < threshold)
    t3 = df_similar_Minhash.filter(col("JaccardSimilarity") >= threshold)
    f3 = df_similar_Minhash.filter(col("JaccardSimilarity") < threshold)
    TN = f1.subtract(t3)
    TP = t3.intersect(t1)
    FP = t3.intersect(f1)
    FN = t1.subtract(t3)
    print("TN" , TN.count(), " TP " , TP.count(), " FP " , FP.count(),  " FN " , FN.count() )

In [None]:
# Filter pairs with Jaccard similarity above a threshold (e.g., 0.6)
evaluateData(cv_df,similarity_df,0.3)