In [50]:
from data_generation import *

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,concat_ws, collect_list, lit,split, size, avg, udf, row_number, when
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType
#from pyspark.sql.types import StructType, StructField, IntegerType, StringType
#from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import max as spark_max


from datasketch import MinHash, MinHashLSH
from sklearn.cluster import KMeans
from numpy import average
import numpy as np 

import shutil
import os
import numpy as np

import time
import resource
import psutil
import matplotlib.pyplot as plt
#import pandas as pd


# All functions used

In [250]:
################################################################################################################################
################################################# Question 1 ###################################################################
################################################################################################################################

def shingle(text, k=7):
    shingle_set = []
    for i in range(len(text)-k +1):
        shingle_set.append(text[i:i+k])
    return list(set(shingle_set))

def jaccard_similarity(list1, list2):   
    return len(set(list1).intersection(set(list2))) / len(set(list1).union(set(list2)))

def minhash_lsh(df, k_shingle, threshold):

    lsh = MinHashLSH(threshold=threshold, num_perm=128)
    minhashes = {}

    for features in df.collect():
        shingles = shingle(features["features"], k_shingle)
        m = MinHash(num_perm=128)
        for shingle_item in shingles:
            m.update(shingle_item.encode("utf8"))
        minhashes[int(features["user_id"])] = m
        lsh.insert(int(features["user_id"]), m)

    replacement_candidates = {}
    for key in lsh.keys: 
        replacement_candidates[key] = lsh.query(minhashes[key]) 

    #Key: New representative, value: Similar items
    return replacement_candidates,minhashes


def dfs_paths(graph, start):
    def dfs_helper(node, current_path, all_paths, visited):
        visited.add(node)
        current_path.append(node)
        
        if node not in graph or not graph[node]:  # If node has no children or is a leaf node
            all_paths.append(current_path[:])  # Append a copy of current_path
        else:
            for neighbor in graph[node]:
                if neighbor not in visited:
                    dfs_helper(neighbor, current_path, all_paths, visited)
        
        current_path.pop()  # Backtrack
        visited.remove(node)  # Remove node from visited set to allow other paths
    
    all_paths = []
    visited = set()  # Set to track visited nodes
    dfs_helper(start, [], all_paths, visited)
    return all_paths

def all_similar(graph, start_node):
    all_paths = dfs_paths(graph, start_node)
    return list(set([key for path in all_paths for key in path]))

def remove_dups(graph):
    for key,values in graph.items():
        if key in graph[key]:
            values.remove(key)
            graph[key] = values
    return graph

def bucketing(old_replacement_candidates):
    alt_old_replacement_candidates = old_replacement_candidates.copy()
    replacement_candidates = remove_dups(alt_old_replacement_candidates)
    visited = set()
    new_process_dictionary = {}
    for key in replacement_candidates.keys():
        if key not in visited:
            sim = all_similar(replacement_candidates,key)
            new_process_dictionary[key] = sim
            visited.add(key)
            for s in sim:
                visited.add(s)
        
    return new_process_dictionary


# def bucketing(replacement_candidates):
#     visited_processes = set()
#     new_process_dictionary = {}
#     for key, values in replacement_candidates.items():
#         new_values = []
#         for value in values:
#             if value not in visited_processes:
#                 visited_processes.add(value)
#                 new_values.append(value)
#         if new_values:  # Only add non-empty lists
#             new_process_dictionary[key] = sorted(new_values)
#     return new_process_dictionary

def get_memory_usage(): 
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024

def get_cpu_usage(): 
    return psutil.cpu_percent(interval=None)

def get_performance(func1,func2, vals):
    #k_values = [2, 3, 4, 5, 6, 7, 8]
    results = []

    for k in vals:
        start_time = time.time()
        start_mem = get_memory_usage()
        start_cpu = get_cpu_usage()

        replacement_candidates, minhashes = func1(df_grouped, k, 0.98)
        new_process_dictionary = func2(replacement_candidates)
        
        end_time = time.time()
        end_mem = get_memory_usage()
        end_cpu = get_cpu_usage()

        duration = end_time - start_time
        mem_used = end_mem - start_mem

        results.append({
            'k': k,
            'time_seconds': duration,
            'memory_mb': mem_used,
            'unique_processes': len(new_process_dictionary),
            'cpu': end_cpu
        })
    return results

def plot_results(results):
    k_values = [result['k'] for result in results]
    time_seconds = [result['time_seconds'] for result in results]
    cpu_percentages = [result['cpu'] for result in results]
    fig, ax1 = plt.subplots(figsize=(10, 6))

    ax1.set_xlabel('k values')
    ax1.set_ylabel('Time (seconds)', color='tab:blue')
    ax1.plot(k_values, time_seconds, marker='o', color='tab:blue', label='Time')
    ax1.tick_params(axis='y', labelcolor='tab:blue')

    ax2 = ax1.twinx()
    ax2.set_ylabel('CPU Usage (%)', color='tab:red')

    ax2.plot(k_values, cpu_percentages, marker='^', color='tab:red', linestyle='--', label='CPU Usage')
    ax2.tick_params(axis='y', labelcolor='tab:red')

    plt.title('Performance Metrics for Different k Values')
    fig.legend(loc='upper left')
    plt.tight_layout()
    plt.grid(True)
    plt.show()

def plot_performances(results):
    k_values = [result['k'] for result in results]
    time_seconds = [result['time_seconds'] for result in results]
    cpu_percentages = [result['cpu'] for result in results]

    # Calculate the performance metric (Product of time_seconds and cpu)
    performance_metric = [time_seconds[i] * cpu_percentages[i] for i in range(len(results))]

    # Create a figure and axis
    plt.figure(figsize=(10, 6))

    # Plot the performance metric
    plt.plot(k_values, performance_metric, marker='o', linestyle='-', color='purple', label='Time * CPU')

    # Set labels and title
    plt.xlabel('k values')
    plt.ylabel('Performance Metric (Time * CPU)')
    plt.title('Combined Metric of Time and CPU Usage vs. k Values')
    plt.xticks(k_values)
    plt.grid(True)
    plt.legend()

    # Display the plot
    plt.tight_layout()
    plt.show()

def get_case(caseID,data):
    data1 = data.filter(data.user_id.isin([caseID]))
    data1.show()

def compare_cases(case1,case2,data):
    data1 = data.filter(data.user_id.isin([case1]))
    data2 = data.filter(data.user_id.isin([case2]))
    desired_column_list1 = data1.select("to").rdd.flatMap(lambda x: x).collect()
    desired_column_list2 = data2.select("to").rdd.flatMap(lambda x: x).collect()

    common_elements = np.intersect1d(desired_column_list1, desired_column_list2)
    union_elements = np.union1d(desired_column_list1, desired_column_list2)
    print(len(common_elements)/len(union_elements))

def get_traces(user_id,df):
    result = df.filter(col("user_id") == user_id).select("features").collect()
    if result:
        return result[0]["features"]
    else:
        return None

def get_shingles(user_id,df):
    result = df.filter(col("user_id") == user_id).select("shingles").collect()
    if result:
        return result[0]["shingles"]
    else:
        return None


def write_df(df,file_name):
    os.makedirs('Output', exist_ok=True)
    os.makedirs('temp', exist_ok=True)
    #df.write.csv('temp/temp_outoput', header=True, mode="overwrite")
    df.write.mode("overwrite").text(file_name)

    part_file = [f for f in os.listdir('temp/temp_outoput') if f.startswith("part-")][0]

    shutil.move(os.path.join('temp/temp_outoput', part_file), file_name)
    shutil.rmtree('temp/temp_outoput')
    shutil.rmtree('temp')

def output_part1(dataset,k,threshold):
    data = spark.read.csv(dataset, header=True, inferSchema=True)
    df_filtered_m = data.filter(data.type.isin(['Req']))
    df_grouped = df_filtered_m.groupBy("user_id").agg(concat_ws("",collect_list("to")).alias("features"))
    
    replacement_candidates = minhash_lsh(df_grouped,k,threshold)
    new_process_dictionary = bucketing(replacement_candidates[0])    

    user_ids_to_change = [key for key,values in new_process_dictionary.items() if values != []]
    max_user_id = df_grouped.select(spark_max("user_id")).collect()[0][0]

    df_to_change = df_grouped.filter(col("user_id").isin(user_ids_to_change))
    distinct_user_ids_to_change = df_to_change.select("user_id").distinct()
    window_spec = Window.orderBy("user_id")
    user_id_mapping = distinct_user_ids_to_change.withColumn("new_user_id", row_number().over(window_spec) + max_user_id - 1)
    df_with_new_ids = df_grouped.join(user_id_mapping, on="user_id", how="left")
    df_with_new_ids = df_with_new_ids.withColumn("user_id",
                                                when(col("new_user_id").isNotNull(), col("new_user_id"))
                                                .otherwise(col("user_id"))) \
                                    .drop("new_user_id")

    new_column_order = [col for col in df_grouped.columns if col != "user_id"]+ ["user_id"] 
    df_with_new_ids = df_with_new_ids.select(new_column_order)
    
    # output_df = data.filter(df.user_id.isin(user_ids))
    # final_df = output_df.coalesce(1)
    
    return write_df(df_with_new_ids,'Output/part1Output.txt')


################################################################################################################################
################################################# Question 2 ###################################################################
################################################################################################################################

def kmeans_clustering(df, n_clusters, max_iter):
    minhashes = []
    #for jaccard verification
    minhash_dict = {}
    user_ids = []
    final_buckets = {}
    for features in df.collect():
        shingles = shingle(features["features"], 5)
        m = MinHash(num_perm=128)
        for shingle_item in shingles:
            m.update(shingle_item.encode("utf8"))
        minhashes.append(m.hashvalues)
        minhash_dict[int(features["user_id"])] = m
        user_ids.append(int(features["user_id"]))

    kmeans = KMeans(n_clusters=n_clusters, max_iter=max_iter).fit(minhashes)

    user_clusters = dict(zip(user_ids, kmeans.labels_))
    final_buckets = {}
    for key, value in user_clusters.items():
        if value in final_buckets:
            final_buckets[value].append(key)
        else:
            final_buckets[value] = [key]

    return final_buckets, minhash_dict

def get_averege_jaccard_sim(final_buckets, minhashes,get = True):
    sims = {}
    for key, value in final_buckets.items():
        for user_id_1 in final_buckets[key]:
            for user_id_2 in final_buckets[key]:
                if user_id_1 != user_id_2:
                    sig_1 = minhashes[int(user_id_1)]
                    sig_2 = minhashes[int(user_id_2)]
                    sim = MinHash.jaccard(sig_1, sig_2)
                    if key not in sims:
                        sims[key] = [sim]
                    else:
                        sims[key].append(sim)
    total_sum = 0
    total_count = 0
    sims = dict(sorted(sims.items()))
    if get == True:
        for key, value in sims.items():
            avg_sim = average(value)
            print(key, avg_sim)
            total_sum += sum(value)
            total_count += len(value)
        
        overall_average = total_sum / total_count if total_count != 0 else 0
        print("Overall Average Jaccard Similarity:", overall_average)

    return sims


In [161]:
def dfs_paths(graph, start):
    def dfs_helper(node, current_path, all_paths, visited):
        visited.add(node)
        current_path.append(node)
        
        if node not in graph or not graph[node]:  # If node has no children or is a leaf node
            all_paths.append(current_path[:])  # Append a copy of current_path
        else:
            for neighbor in graph[node]:
                if neighbor not in visited:
                    dfs_helper(neighbor, current_path, all_paths, visited)
        
        current_path.pop()  # Backtrack
        visited.remove(node)  # Remove node from visited set to allow other paths
    
    all_paths = []
    visited = set()  # Set to track visited nodes
    dfs_helper(start, [], all_paths, visited)
    return all_paths

def all_similar(graph, start_node):
    all_paths = dfs_paths(graph, start_node)
    return list(set([key for path in all_paths for key in path]))



def bucketing(old_replacement_candidates):
    replacement_candidates = remove_dups(old_replacement_candidates)
    visited = set()
    new_process_dictionary = {}
    for key in replacement_candidates.keys():
        if key not in visited:
            sim = all_similar(replacement_candidates,key)
            new_process_dictionary[key] = sim
            visited.add(key)
            for s in sim:
                visited.add(s)
        
    return new_process_dictionary


{1: [1, 2, 5], 3: [3, 4], 6: [6], 7: [7]}

In [252]:


def write_df(df, file_name):
    # Ensure the Output directory exists
    os.makedirs('Output', exist_ok=True)
    
    # Ensure 'user_id' is on the last column
    columns = df.columns
    columns.remove('user_id')
    new_column_order = columns + ['user_id']
    df = df.select(new_column_order)
    
    # Coalesce to a single partition and write the DataFrame to a text file
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv('temp/temp_output')
    
    # Find the part file and move it to the desired location
    part_file = [f for f in os.listdir('temp/temp_output') if f.startswith("part-")][0]
    shutil.move(os.path.join('temp/temp_output', part_file), file_name)
    shutil.rmtree('temp/temp_output')

def output_part1(dataset, k, threshold):
    spark = SparkSession.builder.appName("OutputUserIDs").getOrCreate()
    data = spark.read.csv(dataset, header=True, inferSchema=True)
    df_filtered_m = data.filter(data.type.isin(['Req']))
    df_grouped = df_filtered_m.groupBy("user_id").agg(concat_ws("", collect_list("to")).alias("features"))
    
    replacement_candidates = minhash_lsh(df_grouped, k, threshold)
    # print('candidates')
    # print(replacement_candidates)
    # new_process_dictionary = bucketing(replacement_candidates[0])    
    
    # print('final')
    # print(new_process_dictionary)
    # user_ids_to_change = [key for key in new_process_dictionary.keys()]#[key for key, values in new_process_dictionary.items() if len(values) > 1 or ]
    # # for key, values in new_process_dictionary.items():
    # #     if len(values) >1:
    # #         user_ids_to_change.append(key)
    # #     elif len(values) == 1 and values[0]!= key:
    # #         user_ids_to_change.append(key)

    # user_ids_to_delete = []
    # for id in user_ids_to_change:
    #     for case in new_process_dictionary[id]:
    #         if case not in user_ids_to_change:
    #             user_ids_to_delete.append(case)
    # print('unique cases')
    # print(user_ids_to_change)
    # print('variations')
    # print(user_ids_to_delete)
    # max_user_id = df_grouped.select(spark_max("user_id")).collect()[0][0]


    # df_to_change = df_grouped.filter(col("user_id").isin(user_ids_to_change))
    # distinct_user_ids_to_change = df_to_change.select("user_id").distinct()
    # window_spec = Window.orderBy("user_id")
    # user_id_mapping = distinct_user_ids_to_change.withColumn("new_user_id", row_number().over(window_spec) + max_user_id - 1)
    
    # df_with_new_ids = data.join(user_id_mapping, on="user_id", how="left")
    # df_with_new_ids = df_with_new_ids.withColumn("user_id",
    #                                              when(col("new_user_id").isNotNull(), col("new_user_id"))
    #                                              .otherwise(col("user_id"))) \
    #                                  .drop("new_user_id")

    # # Ensure 'user_id' is on the last column
    # columns = df_with_new_ids.columns
    # columns.remove('user_id')
    # new_column_order = columns + ['user_id']
    # df_with_new_ids = df_with_new_ids.select(new_column_order)

    # # Write DataFrame to text file
    # #write_df(df_with_new_ids, 'Output/part1Output.txt')
    return replacement_candidates[0]

#generate_dataset(tasks, 1000,start_time,end_time,file_name="dataset1")  



In [313]:
def remove_dups(graph):
    for key,values in graph.items():
        if key in graph[key]:
            values.remove(key)
            graph[key] = values
    return graph

def remove_node_from_neighbors(graph, node_to_remove):
    if node_to_remove in graph:
        # Remove node_to_remove from all adjacency lists in the graph
        for node, neighbors in graph.items():
            if node != node_to_remove:  # Skip the node_to_remove itself
                if node_to_remove in neighbors:
                    neighbors.remove(node_to_remove)

def dfs(graph, start, visited):
    stack = [start]
    result = []
    while stack:
        node = stack.pop()
        if node not in visited:
            visited.add(node)
            result.append(node)
            stack.extend(graph[node])
    return result

def transform_graph(graph):
    reachable_dict = {}
    for node in graph:
        visited = set()
        reachable_nodes = dfs(graph, node, visited)
        reachable_dict[node] = [n for n in reachable_nodes if n != node]  # Exclude the start node itself
    
    # Filter out keys that also appear as values
    keys_to_remove = set()
    for node, neighbors in graph.items():
        keys_to_remove.update(neighbors)
    
    final_dict = {k: v for k, v in reachable_dict.items() if k not in keys_to_remove}
    return final_dict

def bucketing(old_replacement_candidates):
    alt_old_replacement_candidates = old_replacement_candidates.copy()
    replacement_candidates = remove_dups(alt_old_replacement_candidates)
    print('no dups')
    print(replacement_candidates)
    visited = set()
    for node in replacement_candidates.keys():
        if node not in visited:
            remove_node_from_neighbors(replacement_candidates, node)
        for neighbour in replacement_candidates[node]:
            visited.add(neighbour)
    # print('no neighbours')
    # print(replacement_candidates)
    new_process_dictionary = transform_graph(replacement_candidates)
    print('final')
    print(new_process_dictionary)
    
        
    return new_process_dictionary

generate_dataset(tasks, 1000,start_time,end_time,file_name="dataset1",random=False,connect=False)  
#print('minhash')
ans = output_part1("data/SDG_dataset1.csv", 5, 0.6)
#print(ans[39])
print(ans)
ans2 = bucketing(ans)

smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
smth wrong
{31: [1, 3, 8, 17, 18, 20, 31], 34: [34, 35, 9, 12, 25, 27, 29], 28: [32, 5, 6, 18, 28, 29], 26: [32, 26], 27: [1, 34, 6, 12, 17, 24, 27], 12: [34, 35, 12, 22, 27], 22: [11, 12, 13, 22], 1: [1, 6, 16, 17, 27, 31], 13: [9, 13, 22], 6: [33, 1, 4, 5, 6, 17, 27, 28], 16: [1, 15, 16, 17, 20], 3: [3, 8, 17, 18, 30, 31], 20: [35, 16, 20, 21, 31], 5: [32, 4, 5, 6, 7, 10, 14, 25, 28], 19: [33, 19], 15: [16, 33, 11, 15], 9: [24, 9, 34, 13], 17: [1, 3, 6, 8, 16, 17, 18, 27, 31], 35: [32, 34, 35, 12, 20, 21], 4: [4, 5, 6, 7, 10, 14], 8: [3, 8, 10, 11, 17, 18, 24, 30, 31], 23: [18, 23], 7: [4, 5, 7, 14, 18, 25

In [305]:
test = ans[34]
print(test)
for value in test:
    new_values = ans[value]
    for new_value in new_values:
        if new_value not in ans2[34]:
            print('False')
            print(new_value)
print(ans2[34])

[33, 2, 37, 42, 14, 21, 29]
[29, 40, 28, 44, 25, 30, 36, 42, 39, 38, 35, 24, 21, 18, 27, 17, 37, 14, 8, 19, 20, 15, 16, 10, 33, 32, 13, 11, 12, 9, 3, 7, 23, 5, 1, 4, 2]


In [309]:
print(get_traces(351,data))

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `features` cannot be resolved. Did you mean one of the following? [`type`, `to`, `from`, `timestamp`, `user_id`].;
'Project ['features]
+- Filter (user_id#6702 = 351)
   +- Relation [from#6698,to#6699,timestamp#6700,type#6701,user_id#6702] csv


In [121]:
def dfs_paths(graph, start):
    def dfs_helper(node, current_path, all_paths):
        current_path.append(node)
        
        if node not in graph or not graph[node]:  # If node has no children
            all_paths.append(current_path[:])  # Append a copy of current_path
        else:
            for neighbor in graph[node]:
                dfs_helper(neighbor, current_path, all_paths)
        
        current_path.pop()  # Backtrack
    
    all_paths = []
    dfs_helper(start, [], all_paths)
    return all_paths

# Example graph with integer keys
graph = {
    1: [2, 3],
    2: [4, 5],
    3: [6],
    4: [7],
    5: [],
    6: []
}

# Example usage:
start_node = 1
all_paths = dfs_paths(graph, start_node)

# Print all DFS paths starting from start_node
for path in all_paths:
    print(f"DFS path from {start_node}: {path}")


DFS path from 1: [1, 2, 4, 7]
DFS path from 1: [1, 2, 5]
DFS path from 1: [1, 3, 6]


In [25]:
def write_df(df, file_name):
    # Ensure the Output directory exists
    os.makedirs('Output', exist_ok=True)
    
    # Coalesce to a single partition and write the DataFrame to a text file
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv('temp/temp_output')
    
    # Find the part file and move it to the desired location
    part_file = [f for f in os.listdir('temp/temp_output') if f.startswith("part-")][0]
    shutil.move(os.path.join('temp/temp_output', part_file), file_name)
    shutil.rmtree('temp/temp_output')



def output_part1(dataset, k, threshold):
    data = spark.read.csv(dataset, header=True, inferSchema=True)
    df_filtered_m = data.filter(data.type.isin(['Req']))
    df_grouped = df_filtered_m.groupBy("user_id").agg(concat_ws("", collect_list("to")).alias("features"))
    
    replacement_candidates = minhash_lsh(df_grouped, k, threshold)
    new_process_dictionary = bucketing(replacement_candidates[0])    

    user_ids_to_change = [key for key, values in new_process_dictionary.items() if values != []]
    max_user_id = df_grouped.select(spark_max("user_id")).collect()[0][0]

    df_to_change = df_grouped.filter(col("user_id").isin(user_ids_to_change))
    distinct_user_ids_to_change = df_to_change.select("user_id").distinct()
    window_spec = Window.orderBy("user_id")
    user_id_mapping = distinct_user_ids_to_change.withColumn("new_user_id", row_number().over(window_spec) + max_user_id - 1)
    
    df_with_new_ids = data.join(user_id_mapping, on="user_id", how="left")
    df_with_new_ids = df_with_new_ids.withColumn("user_id",
                                                 when(col("new_user_id").isNotNull(), col("new_user_id"))
                                                 .otherwise(col("user_id"))) \
                                     .drop("new_user_id")

    # Select columns with 'user_id' on the right side
    # new_column_order = [col for col in df_grouped.columns if col != "user_id"] + ["user_id"]
    # df_with_new_ids = df_with_new_ids.select(new_column_order)

    # Write DataFrame to text file
    write_df(df_with_new_ids, 'Output/part1Output.txt')

# Ensure to call the function with the appropriate parameters
output_part1("data/SDG_dataset1.csv", 5, 0.98)


24/06/25 10:16:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 10:16:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 10:16:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 10:16:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 10:16:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 10:16:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [162]:
spark = SparkSession.builder.getOrCreate()
output_part1("data/SDG_dataset1.csv",7,0.97)

candidates
({1: [1], 3: [3], 4: [4], 2: [2]}, {1: <datasketch.minhash.MinHash object at 0x7f7f410c4f40>, 3: <datasketch.minhash.MinHash object at 0x7f7f50e02f40>, 4: <datasketch.minhash.MinHash object at 0x7f7f60e6ed90>, 2: <datasketch.minhash.MinHash object at 0x7f7f60e6e430>})
final
{1: [1], 3: [3], 4: [4], 2: [2]}
unique cases
[1, 3, 4, 2]
variations
[]


24/06/25 12:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 12:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 12:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 12:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 12:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 12:00:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


# Tests

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number, when
from pyspark.sql.window import Window

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Replace user_ids") \
    .getOrCreate()

# Read data from CSV file
# file_path = "data/SDG_dataset2.csv"  # Adjust the file path as needed
# df = spark.read.csv(file_path, header=True, inferSchema=True)

data = [
    (1, 'info1', 'info2', 'info3', 'info4', 'info5'),
    (2, 'info6', 'info7', 'info8', 'info9', 'info10'),
    (1, 'info11', 'info12', 'info13', 'info14', 'info15'),
    (3, 'info16', 'info17', 'info18', 'info19', 'info20'),
    (2, 'info21', 'info22', 'info23', 'info24', 'info25')
]
df = spark.createDataFrame(data, ['user_id', 'col2', 'col3', 'col4', 'col5', 'col6'])

# Define the list of user_ids to be changed and the starting value for new user IDs
user_ids_to_change = [1]  # Example user_ids to be changed
start_value = 1000

# Filter the DataFrame to get only rows with the specified user_ids
df_to_change = df.filter(col("user_id").isin(user_ids_to_change))

# Get distinct user_ids to change
distinct_user_ids_to_change = df_to_change.select("user_id").distinct()

# Create a window specification to order the distinct user_ids to change
window_spec = Window.orderBy("user_id")

# Assign new user_ids starting from start_value
user_id_mapping = distinct_user_ids_to_change.withColumn("new_user_id", row_number().over(window_spec) + start_value - 1)

# Show the mapping of old user_id to new_user_id
#print("User ID Mapping:")
#user_id_mapping.show()

# Join the original DataFrame with the user_id mapping to get new_user_id for the specified user_ids
df_with_new_ids = df.join(user_id_mapping, on="user_id", how="left")

# Replace old user_id with new_user_id where applicable, keep original user_id otherwise
df_with_new_ids = df_with_new_ids.withColumn("user_id",
                                             when(col("new_user_id").isNotNull(), col("new_user_id"))
                                             .otherwise(col("user_id"))) \
                                 .drop("new_user_id")
new_column_order = [col for col in df.columns if col != "user_id"]+ ["user_id"] 
df_with_new_ids = df_with_new_ids.select(new_column_order)

# Show the resulting DataFrame with new user_ids
#print("DataFrame with New User IDs:")
df_with_new_ids.show()

# Stop SparkSession
#spark.stop()


+------+------+------+------+------+-------+
|  col2|  col3|  col4|  col5|  col6|user_id|
+------+------+------+------+------+-------+
| info1| info2| info3| info4| info5|   1000|
| info6| info7| info8| info9|info10|      2|
|info11|info12|info13|info14|info15|   1000|
|info16|info17|info18|info19|info20|      3|
|info21|info22|info23|info24|info25|      2|
+------+------+------+------+------+-------+



24/06/25 09:36:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 09:36:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 09:36:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 09:36:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 09:36:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 09:36:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/25 0

# Experiments

In [108]:
spark = SparkSession.builder.getOrCreate()

data = spark.read.csv("data/SDG_dataset2.csv", header=True, inferSchema=True)
df_filtered_m = data.filter(data.type.isin(['Req']))
df_grouped = df_filtered_m.groupBy("user_id").agg(concat_ws("",collect_list("to")).alias("features"))

shingles_udf = udf(shingle, ArrayType(StringType()))
df_shingles = df_filtered_m.groupBy("user_id").agg(concat_ws("", collect_list("to")).alias("trace")) \
    .withColumn("shingles", shingles_udf(col("trace"))) \
    .select("user_id", "shingles")

#data.show()
#df_filtered_m.show()
#df_grouped.show()
#df_shingles.show()

## Question 1

### Paramter-tuning for k-shingles

First we're going to see which k is the best for the k-shingles. We gonna do such a thing by investigating how computational expensive it is to compute such minhash for different values of k

In [None]:
results = get_performance(minhash_lsh,bucketing, [3,4,5,6,7,8,9])
plot_results(results)
plot_performances(results)

By running this a considerable amount of times we saw that the value that get a better balance between time and CPU usage is when k=7.

### Parameter-tuning for the threshold

In [11]:
average_num_shingles = df_shingles.withColumn("list_length", size(col("shingles"))) \
                     .agg(avg("list_length").alias("average_list_length")).collect()[0][0]
average_num_shingles
#spark.stop()

                                                                                

32.50138338098312

Given that the average number of 7-shingles is 32, and we want to group processes with only small variations, we want that 31/32 shingles to be the same, so that we still allow for some small variations.

In [109]:
print(f"Initial number of cases: {df_grouped.count()}")
ans = minhash_lsh(df_grouped,7,0.97)
replacement_candidates7, minhash_dic = ans[0],ans[1]
new_process_dictionary7= bucketing(replacement_candidates7)
print(f"Number of unique processes after merging them with 0.97 threshold using 7-shingles: {len(new_process_dictionary7)}")

Initial number of cases: 43372


KeyboardInterrupt: 

After merging the processes with a threshold of 97%, using 7-shingles, we obtain 29729 candidate unique cases. In order to investigate if further analysis into the similarities needs to be done, to make sure that the false positives do not result in cases where the cases are not small variations of each other, we are going to compute the average similarities of all buckets and investigate the mininum

In [7]:
sims = get_averege_jaccard_sim(replacement_candidates7, minhash_dic,get=False)

It's important to mention that we're still just computing the approximate jaccard similarities provided by MinHashLSH, so, in order to investigate the cases that have the smallest approximate jaccard similarities, we're going to compute the actual similarities between those cases

In [8]:
ans = min(set(value for key,values in sims.items() for value in values if value != 1.0))
final_values = []
for key,values in sims.items():
    for value in values:
        if value == ans:
            final_values.append(key)

dissimilar = set(final_values)

In [23]:
new_sims = []
for key in dissimilar:
    for value in replacement_candidates7[key]:
        new_sims.append((key,value,jaccard_similarity(get_shingles(value),get_shingles(key))))
investigate = [case for case in new_sims if case[-1]!=1.0]

In [30]:
for case in investigate:
    print(f'######################### {case[0]} vs {case[1]} ################################')
    print(get_traces(case[0],df_grouped))
    print(get_traces(case[1],df_grouped))
    print('#######################################################################')

######################### 29289 vs 3819 ################################
S0S4S4_1S3S3_1S3_2S3_3S2S2_2
S0S4S4_1S3S3_1S3_2S3_3S2S2_4
#######################################################################
######################### 7690 vs 3819 ################################
S0S4S4_1S3S3_1S3_2S3_3S2S2_2
S0S4S4_1S3S3_1S3_2S3_3S2S2_4
#######################################################################
######################### 3819 vs 29289 ################################
S0S4S4_1S3S3_1S3_2S3_3S2S2_4
S0S4S4_1S3S3_1S3_2S3_3S2S2_2
#######################################################################
######################### 3819 vs 7690 ################################
S0S4S4_1S3S3_1S3_2S3_3S2S2_4
S0S4S4_1S3S3_1S3_2S3_3S2S2_2
#######################################################################
######################### 3819 vs 35188 ################################
S0S4S4_1S3S3_1S3_2S3_3S2S2_4
S0S4S4_1S3S3_1S3_2S3_3S2S2_2
###########################################################

24/06/25 01:51:36 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 3644304 ms exceeds timeout 120000 ms
24/06/25 01:51:36 WARN SparkContext: Killing executors is not supported by current scheduler.
24/06/25 01:51:36 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$

In [None]:
# output_part1("data/SDG_dataset2.csv",3,0.95)
# output_part1("data/SDG_dataset2.csv",5,0.95)
output_part1("data/SDG_dataset2.csv",7,0.97)

## Question 2

### Parameter tuning for Kmeans

In [None]:
from sklearn.model_selection import train_test_split, GridSearchCV

minhashes = []
user_ids = []
final_buckets = {}
for features in df_grouped.collect():
    shingles = shingle(features["features"], 5)
    m = MinHash(num_perm=128)
    for shingle_item in shingles:
        m.update(shingle_item.encode("utf8"))
    minhashes.append(m.hashvalues)
    user_ids.append(int(features["user_id"]))

param_grid = {
    'n_clusters': [100, 250, 500],
    'max_iter': [100, 500, 1000],
}

kmeans = KMeans()

grid_search = GridSearchCV(kmeans, param_grid, cv=5)

grid_search.fit(minhashes)

best_param = grid_search.best_params_
best_model = grid_search.best_estimator_

print("The best parameters are: " , best_param )
print("The best model is: ", best_model)

### Step 1: Find and merge

In [None]:
replacement_candidates = minhash_lsh(df_grouped,5,0.7)
new_process_dictionary = bucketing(replacement_candidates)
print(len(replacement_candidates))
print(len(new_process_dictionary))

### STEP 2: Find/cluster similar items

In [None]:
#from sklearn.model_selection import train_test_split, GridSearchCV

users = []
for key in new_process_dictionary:
    users.append(key)

filtered_df = df_grouped[df_grouped['user_id'].isin(users)]

final_buckets, minhashes = kmeans_clustering(filtered_df,500,100)

### Verification

In [None]:
get_averege_jaccard_sim(final_buckets, minhashes)


# Old

In [None]:
#print(f"Initial number of cases: {df_grouped.count()}")
# replacement_candidates3 = minhash_lsh(df_grouped,3,0.98)
# new_process_dictionary3 = bucketing(replacement_candidates3)
#print(f"After merging cases with threshold 3-shingles: {len(new_process_dictionary3)}")

In [None]:
# df_split = df_grouped.withColumn("feature_length", size(split(col("features"), "S")))
# average_length = df_split.agg(avg("feature_length")).collect()[0][0]

# windowSpec = Window.partitionBy(F.lit(1)).orderBy("feature_length")
# df_split = df_split.withColumn("row_number", F.row_number().over(windowSpec))
# total_count = df_split.count()

# if total_count % 2 == 0:
#     median_index1 = total_count // 2
#     median_index2 = median_index1 + 1
#     median_value = df_split.filter(col("row_number").isin([median_index1, median_index2])) \
#                                   .agg(avg("feature_length")).collect()[0][0]
# else:
#     median_index = (total_count // 2) + 1
#     median_value = df_split.filter(col("row_number") == median_index) \
#                                   .select("feature_length").collect()[0][0]
    
# print(f"Average number of requests: {average_length}")
# print(f"Median number of requests: {median_value}")


#Given that both the average and the median are close to 13, which shows that the dataset is symetricly distributed when it comes to how many 
#requests are performed, we gonna assume that two cases have a small variation iif the number of different requests is around 1. To get an 
# #approximation of the threshold, we're going to use the resutls shown before, so we assume that 12/13 are the same. Given this, we decided 
# to use a threshold of 95%