In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, trim, array, explode, udf, monotonically_increasing_id
from pyspark.sql.types import IntegerType, ArrayType, StringType
from pyspark.ml.feature import MinHashLSH, HashingTF
from collections import defaultdict
import logging
import tempfile
import os
import editdistance
from tqdm import tqdm

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

warehouse_location = tempfile.mkdtemp()

def set_env_vars():
    python_path = os.path.join(os.environ['CONDA_PREFIX'], 'python.exe')
    os.environ['PYSPARK_PYTHON'] = python_path
    os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

# Set environment variables
set_env_vars()

# Initialize Spark session
spark = SparkSession.builder \
    .appName("ProcessGrouping") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", f"file:///{warehouse_location}") \
    .config("spark.executor.memory", "16g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# Define paths
log_file_path = "C:/Users/dcave/Documents/DIS_project/DIS_project/data/server_log.txt"
part1_output_path = 'C:/Users/dcave/Documents/DIS_project/part1Output.txt'
part1_observations_path = 'C:/Users/dcave/Documents/DIS_project/part1Observations.txt'
part2_observations_path = 'C:/Users/dcave/Documents/DIS_project/part2Observations.txt'

logging.info("Starting to read the log file")

# Read log to df
log_df = spark.read.option("header", "false").csv(log_file_path)
log_df = log_df.limit(100000)
log_df = log_df.withColumnRenamed("_c0", "from_server") \
               .withColumnRenamed("_c1", "to_server") \
               .withColumnRenamed("_c2", "time") \
               .withColumnRenamed("_c3", "action") \
               .withColumnRenamed("_c4", "process_id")

logging.info("Completed reading and renaming columns of the log file")

# Convert columns to appropriate data types
log_df = log_df.withColumn("time", col("time").cast(IntegerType())) \
               .withColumn("process_id", col("process_id").cast(IntegerType())) \
               .withColumn("action", trim(col("action")))
logging.info("Converted columns to appropriate data types")

# Cache the DataFrame after sampling and transformations
log_df.cache()
logging.info("Cached the log DataFrame")

# Group by process_id and collect log entries
processes_df = log_df.groupBy("process_id").agg(collect_list(array(col("from_server"), col("to_server"), col("time"), col("action"))).alias("events"))
logging.info("Grouped by process_id and collected log entries")

# Cache the grouped DataFrame
processes_df.cache()
logging.info("Cached the processes DataFrame")

# Convert events to server sequences ignoring timestamps
def concat_events(events):
    return ["{}_{}".format(event[0], event[1]) for event in events]

concat_events_udf = udf(concat_events, ArrayType(StringType()))
processes_df = processes_df.withColumn("server_sequence", concat_events_udf(col("events")))

# Transform data using HashingTF for MinHashLSH
hashingTF = HashingTF(inputCol="server_sequence", outputCol="features", numFeatures=1000)
featurized_df = hashingTF.transform(processes_df)
logging.info("Transformed features using HashingTF")

# Apply MinHash LSH
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(featurized_df)
logging.info("Fitted MinHashLSH model")

# Transform data
transformed_df = model.transform(featurized_df)
logging.info("Transformed data using MinHashLSH model")

# Approximate similarity join
candidate_pairs = model.approxSimilarityJoin(transformed_df, transformed_df, 0.3, distCol="JaccardDistance") \
    .select(col("datasetA.process_id").alias("pid1"),
            col("datasetB.process_id").alias("pid2"),
            col("JaccardDistance"))

logging.info("Calculated Jaccard similarity and generated candidate pairs")

# Filter self-joins
candidate_pairs_filtered = candidate_pairs.filter("pid1 < pid2")

# Collect process events as a dictionary
process_events_dict = processes_df.select("process_id", "server_sequence", "events").rdd.map(lambda row: (row["process_id"], (row["server_sequence"], row["events"]))).collectAsMap()
process_events_broadcast = spark.sparkContext.broadcast(process_events_dict)

# Function to calculate edit distance using the editdistance package
def calculate_edit_distance(pid1, pid2, process_events_dict):
    seq1 = process_events_dict[pid1][0]
    seq2 = process_events_dict[pid2][0]
    return editdistance.eval(seq1, seq2)

# Filter candidate pairs for part 2 (edit distance < 6)
def filter_candidates_part2(row):
    pid1, pid2 = row["pid1"], row["pid2"]
    edit_dist = calculate_edit_distance(pid1, pid2, process_events_broadcast.value)
    if edit_dist < 6:
        return (pid1, pid2)
    return None

filtered_candidates_part2_rdd = candidate_pairs_filtered.rdd.map(filter_candidates_part2).filter(lambda x: x is not None)
filtered_candidates_part2 = filtered_candidates_part2_rdd.collect()
logging.info("Filtered candidates for part 2 based on edit distance < 6")

# Further filter the part 2 candidates for part 1 (edit distance < 3)
def filter_candidates_part1(row):
    pid1, pid2 = row
    edit_dist = calculate_edit_distance(pid1, pid2, process_events_broadcast.value)
    if edit_dist < 3:
        return (pid1, pid2)
    return None

filtered_candidates_part1_rdd = spark.sparkContext.parallelize(filtered_candidates_part2).map(filter_candidates_part1).filter(lambda x: x is not None)
filtered_candidates_part1 = filtered_candidates_part1_rdd.collect()
logging.info("Filtered candidates for part 1 based on edit distance < 3")

# Group similar pairs to form clusters
def merge_clusters(clusters):
    merged_groups = []
    seen = set()
    for key, group in clusters.items():
        if key not in seen:
            merged_group = {key} | set(group)
            to_merge = [g for g in merged_groups if g & merged_group]
            for g in to_merge:
                merged_group |= g
                merged_groups.remove(g)
            merged_groups.append(merged_group)
            seen.update(merged_group)
    return merged_groups

def form_clusters(candidate_pairs):
    clusters_rdd = spark.sparkContext.parallelize(candidate_pairs).groupByKey().mapValues(list).collect()
    clusters_dict = defaultdict(set)
    for k, v in clusters_rdd:
        clusters_dict[k].update(v)
        for val in v:
            clusters_dict[val].add(k)
            clusters_dict[val].update(v)
    return merge_clusters(clusters_dict)

merged_groups_part1 = form_clusters(filtered_candidates_part1)
merged_groups_part2 = form_clusters(filtered_candidates_part2)

logging.info("Merged similar groups into clusters")

# Generate part1Output.txt
def generate_part1_output(merged_groups, output_file_path):
    with open(output_file_path, 'w') as file:
        new_id = 1
        for group in merged_groups:
            merged_pids = ",".join(map(str, group))
            file.write(f"{new_id}:{merged_pids}\n")
            new_id += 1
    logging.info(f"Generated part1Output.txt: {output_file_path}")

generate_part1_output(merged_groups_part1, part1_output_path)

# Generate part1Observations.txt
def generate_part1_observations(merged_groups, output_file_path):
    with open(output_file_path, 'w') as file:
        for group in merged_groups:
            file.write(f"Group: {group}\n")
            for pid in group:
                file.write(f"{pid}:\n")
                _, events = process_events_dict[pid]
                for event in events:
                    file.write(f"<{event[0]}, {event[1]}, {event[2]}, {event[3]}, {pid}>\n")
            file.write("\n")
    logging.info(f"Generated part1Observations.txt: {output_file_path}")

generate_part1_observations(merged_groups_part1, part1_observations_path)

# Generate part2Observations.txt
def generate_part2_observations(merged_groups, output_file_path):
    with open(output_file_path, 'w') as file:
        for group in merged_groups:
            file.write(f"Group: {group}\n")
            for pid in group:
                file.write(f"{pid}:\n")
                _, events = process_events_dict[pid]
                for event in events:
                    file.write(f"<{event[0]}, {event[1]}, {event[2]}, {event[3]}, {pid}>\n")
            file.write("\n")
    logging.info(f"Generated part2Observations.txt: {output_file_path}")

generate_part2_observations(merged_groups_part2, part2_observations_path)

# Close Spark session
spark.stop()

logging.info("Process completed.")



2024-06-27 15:33:15,864 - INFO - Starting to read the log file
2024-06-27 15:33:18,230 - INFO - Completed reading and renaming columns of the log file
2024-06-27 15:33:18,264 - INFO - Converted columns to appropriate data types
2024-06-27 15:33:18,300 - INFO - Cached the log DataFrame
2024-06-27 15:33:18,331 - INFO - Grouped by process_id and collected log entries
2024-06-27 15:33:18,365 - INFO - Cached the processes DataFrame
2024-06-27 15:33:18,471 - INFO - Transformed features using HashingTF
2024-06-27 15:33:18,485 - INFO - Fitted MinHashLSH model
2024-06-27 15:33:18,502 - INFO - Transformed data using MinHashLSH model
2024-06-27 15:33:18,643 - INFO - Calculated Jaccard similarity and generated candidate pairs
2024-06-27 15:36:15,339 - INFO - Filtered candidates for part 2 based on edit distance < 6
2024-06-27 15:36:25,760 - INFO - Filtered candidates for part 1 based on edit distance < 3
2024-06-27 15:39:20,923 - INFO - Merged similar groups into clusters
2024-06-27 15:39:20,925 -

In [5]:
import sys
print("Python version:", sys.version)
print("Environment path:", sys.executable)

!pip list | grep -E 'pyspark|editdistance|tqdm'
!pip list




Python version: 3.11.7 | packaged by Anaconda, Inc. | (main, Dec 15 2023, 18:05:47) [MSC v.1916 64 bit (AMD64)]
Environment path: C:\Users\dcave\anaconda3\python.exe


'grep' is not recognized as an internal or external command,
operable program or batch file.


Package                           Version
--------------------------------- ------------
aext-assistant                    4.0.15
aext-assistant-server             4.0.15
aext-core                         4.0.15
aext-core-server                  4.0.15
aext-panels                       4.0.15
aext-panels-server                4.0.15
aext-share-notebook               4.0.15
aext-share-notebook-server        4.0.15
aext-shared                       4.0.15
aiobotocore                       2.7.0
aiohttp                           3.9.3
aioitertools                      0.7.1
aiosignal                         1.2.0
alabaster                         0.7.12
altair                            5.0.1
anaconda-anon-usage               0.4.3
anaconda-catalogs                 0.2.0
anaconda-client                   1.12.3
anaconda-cloud-auth               0.5.1
anaconda-navigator                2.6.1
anaconda-project                  0.11.1
anyio                             4.2.0
appdirs            