# Task 1

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pandas as pd 
import numpy as np 
from pyspark.sql.functions import split, col, lit, regexp_replace, collect_list, explode, concat, collect_set, array_union, flatten
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType, BooleanType

import time
import random
import subprocess

### Configure Spark

In [2]:
partition = 32

In [3]:
spark = SparkSession.builder \
    .appName("Projet-Task-1") \
    .master("local[*]") \
    .config("spark.driver.memory", "12G") \
    .config("spark.driver.maxResultSize", "2g") \
    .config("spark.executor.memory", "6G") \
    .config("spark.executor.memoryOverhead", "2G") \
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:InitiatingHeapOccupancyPercent=35") \
    .config("spark.driver.extraJavaOptions", "-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -XX:InitiatingHeapOccupancyPercent=35") \
    .getOrCreate()
# spark.sparkContext.setLogLevel("DEBUG")
spark.conf.set("spark.sql.shuffle.partitions", partition)
spark

24/06/30 07:45:25 WARN Utils: Your hostname, abha-ThinkPad-P14s-Gen-4 resolves to a loopback address: 127.0.1.1; using 192.168.178.94 instead (on interface wlp2s0)
24/06/30 07:45:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/30 07:45:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# spark.stop()

### Read Data and Clean Logs

In [5]:
df = spark.read.text("data.txt").toDF("Log")

In [6]:
df = df.withColumn("Log", regexp_replace(col("Log"), "[<>]", ""))
df = df.withColumn("Log", regexp_replace(col("Log"), ",", ""))
df = df.withColumn("Log", split(col("Log"), " "))

columns = ["First Server", "Second Server", "Date", "Time", "Communication Type", "Process ID"]

for i in range(len(columns)):
    df = df.withColumn(columns[i], col("Log")[i])

df = df.withColumn("Timestamp", concat(col("Date"),lit(""), col("Time")))

In [7]:
# Remove Process ID from request 
log = udf(lambda x: x[:-1], ArrayType(StringType())) 

def remove_timestamp(log):
    return log[:2] + log[4:]

log_timestamp = udf(remove_timestamp, ArrayType(StringType()))

df = df.withColumn('Log', log('Log')) 
df = df.withColumn('Log', log_timestamp('Log'))

### Group Log Events by Process ID

In [8]:
grouped_df = df.groupBy("Process ID").agg(collect_list("Log").alias("Log"), 
                                          collect_list("First Server").alias("First Server"),
                                          collect_list("Second Server").alias("Second Server"),
                                          collect_list("Communication Type").alias("FCommunication Type"),
                                          collect_list("Timestamp").alias("Timestamp"))

grouped_df.printSchema()

root
 |-- Process ID: string (nullable = true)
 |-- Log: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
 |-- First Server: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- Second Server: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- FCommunication Type: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- Timestamp: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [9]:
distinct_servers_df = df.groupBy("Process ID").agg(collect_set("First Server").alias("First Server"),
                                                   collect_set("Second Server").alias("Second Server"))

distinct_servers_df = distinct_servers_df.withColumn("Servers", array_union("First Server", "Second Server"))

### Build Characteristic Matrix 

In [10]:
characteristics = CountVectorizer(inputCol="Servers", outputCol="Characteristic Matrix")

model = characteristics.fit(distinct_servers_df)
char_matrix = model.transform(distinct_servers_df).select("Process ID", "Characteristic Matrix")

char_matrix.show()

servers = model.vocabulary

+----------+---------------------+
|Process ID|Characteristic Matrix|
+----------+---------------------+
|        56| (106,[0,2,17,68,9...|
|        73| (106,[0,2,13,16],...|
|        16| (106,[0,1,4,9,18,...|
|        35| (106,[0,1],[1.0,1...|
|        62| (106,[0,2],[1.0,1...|
|        64| (106,[0,1],[1.0,1...|
|        89| (106,[0,1,17,18,2...|
|        12| (106,[0,2,3,6,21,...|
|        27| (106,[0,1],[1.0,1...|
|        68| (106,[0,2],[1.0,1...|
|         0| (106,[0,2,3,6,14,...|
|         6| (106,[0,2],[1.0,1...|
|        10| (106,[0,1,3],[1.0...|
|        24| (106,[0,1],[1.0,1...|
|        30| (106,[0,2,5,40,59...|
|        33| (106,[0,1,8],[1.0...|
|        46| (106,[0,1,4,6,7,3...|
|        50| (106,[0,1],[1.0,1...|
|        82| (106,[0,2,11,12,2...|
|        83| (106,[0,2,7,33,38...|
+----------+---------------------+
only showing top 20 rows



### Generate MinHash

In [11]:
minhash = MinHashLSH(inputCol="Characteristic Matrix", outputCol="Signatures", numHashTables=5)

# MinHash produces the signatures for the Characteritic matrix 
# numvHashTables is the number of the hash functioms that we want to use and the lenght of the signature 
model = minhash.fit(char_matrix)
signatures = model.transform(char_matrix)

### Find Similar Pairs

In [12]:
# approxSimilarityJoin uses LSH automatically to find rows that it is most likely 
# to have same "Signatures"
# threshold: pairs with Jaccard Distance lower than threshlod
similar_pairs = model.approxSimilarityJoin(signatures, signatures, threshold=0.01, distCol="Jaccard Distance")

In [13]:
similar_pairs = similar_pairs.select("datasetA.Process ID", "datasetB.Process ID", 
                     "Jaccard Distance") \
                    # .filter((col("datasetA.Process ID") != col("datasetB.Process ID")))

In [14]:
new_cols = ["Process ID A", "Process ID B", "Jaccard Distance"]
similar_pairs = similar_pairs.toDF(*new_cols)

In [15]:
pairs = similar_pairs.join(grouped_df, similar_pairs["Process ID A"] == col("Process ID")) \
                     .select(col("Process ID A"), col("Process ID B"), col("Log").alias("Log A")) \
                     .join(grouped_df, similar_pairs["Process ID B"] == col("Process ID")) \
                     .select(col("Process ID A"), col("Process ID B"), col("Log A"), col("Log").alias("Log B"))

### Check using Original Log

In [16]:
def original_check(x,y):
    return x==y

original_checking = udf(original_check, BooleanType())

In [17]:
same_pairs = pairs.filter(original_checking(col("Log A"), col("Log B")))
same_pairs = same_pairs.groupBy("Log A").agg(collect_set("Process ID A").alias("Process Set"))

In [18]:
same_pairs.printSchema()

root
 |-- Log A: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
 |-- Process Set: array (nullable = false)
 |    |-- element: string (containsNull = false)



### Output

part1Observations.txt

In [19]:
same_pairs_explode = same_pairs.select(same_pairs["Log A"], same_pairs["Process Set"], explode(same_pairs["Process Set"]).alias("Process ID"))

In [20]:
def format_group(process_set):
    process_set_string = ', '.join(str(x) for x in process_set)
    return f"Group: {{{process_set_string}}}"

def format_log(log, process_id):
    log_formatted = ""
    for l in log:
        log_concat = ', '.join(str(x) for x in l)
        log_formatted += f"<{log_concat}, {process_id}>\n"
    return log_formatted

def format_group_logs(group, logs):
    formatted = f"{group}\n\n" + "\n".join(logs) 
    return formatted

In [21]:
# UDFs for Formatting Output - part1Observations.txt
format_group_udf = udf(format_group, StringType())
format_udf = udf(format_log, StringType())
final_format_udf = udf(format_group_logs, StringType())

In [22]:
formatted_group = same_pairs_explode.withColumn("Group", format_group_udf(col("Process Set")))
formatted_df = formatted_group.withColumn("Formatted Log", format_udf(col("Log A"), col("Process ID")))
grouped_logs = formatted_df.groupBy("Group").agg(collect_list("Formatted Log").alias("Group Log"))
final_formatted = grouped_logs.withColumn("Formatted", final_format_udf(col("Group"), col("Group Log"))).select("Formatted")

In [23]:
final_formatted.coalesce(partition).write.mode('overwrite').text('part1Observations')

In [24]:
subprocess.run("mkdir -p output && cat part1Observations/part-* > output/part1Observations.txt", shell=True)
subprocess.run("find part1Observations/ -name 'part-*' -delete", shell=True)
subprocess.run("rm -f part1Observations/.*", shell=True)
subprocess.run("rm -f part1Observations/_SUCCESS", shell=True) 
subprocess.run("rmdir part1Observations", shell=True)

rm: cannot remove 'part1Observations/.': Is a directory
rm: cannot remove 'part1Observations/..': Is a directory


CompletedProcess(args='rmdir part1Observations', returncode=0)

part1Output.txt

In [25]:
def format_log_output(log):
    epoch = int(time.time())
    rand = random.randint(1000, 9999)
    process_id = f"{epoch}{rand}"
    log_formatted = ""
    for l in log:
        log_concat = ', '.join(str(x) for x in l)
        log_formatted += f"<{log_concat}, {process_id}>\n"

    formatted = f"{process_id}:\n" + log_formatted
    return formatted

format_udf_output = udf(format_log_output, StringType())
formatted_df_output = same_pairs.withColumn("Formatted", format_udf_output(col("Log A"))).select("Formatted")

In [26]:
formatted_df_output.coalesce(partition).write.mode('overwrite').text('part1Output')

In [27]:
subprocess.run("cat part1Output/part-* > output/part1Output.txt", shell=True)
subprocess.run("find part1Output/ -name 'part-*' -delete", shell=True)
subprocess.run("rm -f part1Output/.*", shell=True)
subprocess.run("rm -f part1Output/_SUCCESS", shell=True) 
subprocess.run("rmdir part1Output", shell=True)

rm: cannot remove 'part1Output/.': Is a directory
rm: cannot remove 'part1Output/..': Is a directory


CompletedProcess(args='rmdir part1Output', returncode=0)

## Another aproach - Shingling

In [28]:
def k_shingling(text, k):
    shingles = set()
    for i in range(len(text) - k + 1):
        shingle = text[i:i + k]
        shingles.add(shingle)
    return list(shingles)

k_shingling_udf = udf(lambda text: k_shingling(text, 5), ArrayType(StringType()))

In [29]:
df = spark.read.text("data.txt").toDF("Log")
log = udf(lambda x: x[1:-4], StringType()) 
df = df.withColumn('Log_split', log('Log')) 
# df.collect()

def remove_timestamp(log):
    log_list = [item.strip() for item in log.split(",")]
    log_rem_time = log_list[:2] + log_list[3:]
    return ", ".join(log_rem_time)

log_timestamp = udf(remove_timestamp, StringType())
df = df.withColumn('Log_split', log_timestamp('Log_split'))

df.collect()
df_shingles = df.withColumn("Shingles", k_shingling_udf(df["Log_split"]))

In [30]:
df_shingles = df_shingles.withColumn("Log", regexp_replace(col("Log"), "[<>]", ""))
df_shingles = df_shingles.withColumn("Log", regexp_replace(col("Log"), ",", ""))
df_shingles = df_shingles.withColumn("Log", split(col("Log"), " "))

columns = ["First Server", "Second Server", "Date", "Time", "Communication Type", "Process ID"]

for i in range(len(columns)):
    df_shingles = df_shingles.withColumn(columns[i], col("Log")[i])

df_shingles = df_shingles.withColumn("Timestamp", concat(col("Date"),lit(""), col("Time")))

In [31]:
log = udf(lambda x: x[:-1], ArrayType(StringType())) 

def remove_timestamp(log):
    return log[:2] + log[4:]

log_timestamp = udf(remove_timestamp, ArrayType(StringType()))

df_shingles = df_shingles.withColumn('Log', log('Log')) 
df_shingles = df_shingles.withColumn('Log', log_timestamp('Log'))

In [32]:
grouped_df = df_shingles.groupBy("Process ID").agg(collect_set("Shingles").alias("Shingles"),collect_list("Log").alias("Log"))
grouped_df = grouped_df.withColumn("Flat shingles", flatten(col("Shingles")))
grouped_df.printSchema()

root
 |-- Process ID: string (nullable = true)
 |-- Shingles: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
 |-- Log: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: string (containsNull = true)
 |-- Flat shingles: array (nullable = false)
 |    |-- element: string (containsNull = true)



In [33]:
characteristics = CountVectorizer(inputCol="Flat shingles", outputCol="Characteristic Matrix")

model = characteristics.fit(grouped_df)
char_matrix = model.transform(grouped_df).select("Process ID", "Characteristic Matrix")

char_matrix.printSchema()
# char_matrix.show()

shingles = model.vocabulary
print("Rows of Characteristic Matrix: ", shingles)

root
 |-- Process ID: string (nullable = true)
 |-- Characteristic Matrix: vector (nullable = true)

Rows of Characteristic Matrix:  ['S-37.', 'Respo', ' Resp', 'quest', 'ponse', ' Requ', 'espon', 'eques', ', Res', 'Reque', 'spons', ', Req', 'est, ', 'nse, ', 'onse,', 'uest,', 'null,', 'ull, ', '7.2, ', ', S-3', '-37.2', '37.2,', '2, Re', ' S-37', '7.1, ', '37.1,', '-37.1', ', S-1', '.2, R', '1, Re', '.1, R', '2, S-', '1, S-', '.1, S', ' null', 'l, Re', 'll, S', 'll, R', ', nul', 'l, S-', '.2, S', '.2, n', '2, nu', '4.1, ', '1, nu', '.1, n', 'S-94.', ', S-4', ' S-11', '4.2, ', '8, Re', '8, S-', 'S-64,', 'S-115', '115, ', '-64, ', '-115,', ', S-9', '158, ', 'S-78,', 'S-158', '-158,', '-78, ', '5, Re', '5, S-', ' S-15', ', S-6', '6, S-', '6, Re', '-94.1', '94.1,', '-118.', ', S-7', 'S-118', '0.1, ', '8.2, ', ' S-94', 'S-4.2', 'S-40.', '-4.2,', '0, S-', '0, Re', '4, Re', '4, S-', '15, S', '15, R', ' S-4.', ' S-16', 'S-166', '1.2, ', 'S-42,', '166, ', '94.2,', '8.1, ', '64, S', '64, R', '-

In [34]:
minhash = MinHashLSH(inputCol="Characteristic Matrix", outputCol="Signatures", numHashTables=5)

# MinHash produces the signatures for the Characteritic matrix 
# numvHashTables is the number of the hush functioms that we want to use and the lenght of the signature 
model = minhash.fit(char_matrix)
signatures = model.transform(char_matrix)

# signatures.show()

In [35]:
# approxSimilarityJoin uses autmatically LSH to find rows that it is most likely 
# to have same "Signatures"
# threshold: pairs with Jaccard Distance lower than threshlod
similar_pairs = model.approxSimilarityJoin(signatures, signatures, threshold=0.2, distCol="Jaccard Distance")
# similar_pairs.show()

In [36]:
similar_pairs = similar_pairs.select("datasetA.Process ID", "datasetB.Process ID", 
                     "Jaccard Distance")\
                        # .filter((col("datasetA.Process ID") != col("datasetB.Process ID")))

In [37]:
new_cols = ["Process ID A", "Process ID B", "Jaccard Distance"]
similar_pairs = similar_pairs.toDF(*new_cols)

In [38]:
pairs = similar_pairs.join(grouped_df, similar_pairs["Process ID A"] == col("Process ID")) \
                     .select(col("Process ID A"), col("Process ID B"), col("Log").alias("Log A")) \
                     .join(grouped_df, similar_pairs["Process ID B"] == col("Process ID")) \
                     .select(col("Process ID A"), col("Process ID B"), col("Log A"), col("Log").alias("Log B"))

In [39]:
def original_check(x,y):
    return x==y

orifinal_checking = udf(original_check, BooleanType())
same_pairs = pairs.filter(orifinal_checking(col("Log A"), col("Log B")))

In [40]:
same_pairs = same_pairs.groupBy("Log A").agg(collect_set("Process ID A"))

In [41]:
def format_log_output(log):
    epoch = int(time.time())
    rand = random.randint(1000, 9999)
    process_id = f"{epoch}{rand}"
    log_formatted = ""
    for l in log:
        log_concat = ', '.join(str(x) for x in l)
        log_formatted += f"<{log_concat}, {process_id}>\n"

    formatted = f"{process_id}:\n" + log_formatted
    return formatted

format_udf_output = udf(format_log_output, StringType())
formatted_df_output = same_pairs.withColumn("Formatted", format_udf_output(col("Log A"))).select("Formatted")

In [42]:
formatted_df_output.coalesce(partition).write.mode('overwrite').text('part1Output')

In [43]:
subprocess.run("cat part1Output/part-* > output/part1OutputShingles.txt", shell=True)
subprocess.run("find part1Output/ -name 'part-*' -delete", shell=True)
subprocess.run("rm -f part1Output/.*", shell=True)
subprocess.run("rm -f part1Output/_SUCCESS", shell=True) 
subprocess.run("rmdir part1Output", shell=True)

rm: cannot remove 'part1Output/.': Is a directory
rm: cannot remove 'part1Output/..': Is a directory


CompletedProcess(args='rmdir part1Output', returncode=0)

24/06/30 07:45:39 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
