In [1]:
import findspark
findspark.init()
findspark.find()

'H:\\SPARK'

In [2]:
from pyspark.sql import SparkSession

# Initialize SparkSession with necessary configurations
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('Spark') \
    .config("spark.driver.memory", "15g") \
    .config("spark.hadoop.home.dir", "H:/HADOOP/") \
    .config("spark.hadoop.conf.dir", "H:/HADOOP/etc/hadoop/") \
    .getOrCreate()
    
import sys
sys.path.append("G:\Dissertation_Project")

# Get SparkContext from the SparkSession
sc = spark.sparkContext


In [3]:
spark

### BASE DATASET

In [4]:
base_df = spark.read.csv("../../../Data/Custom_Datasets/conversation_datasets_GPT.csv", header=True, inferSchema=True)
base_df.show(10, truncate=False)

+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+
|Conversation_ID|Attacker_Helper                                                                                                                                                 |Victim                                                                                                                                                                                         |Conversation_Type|
+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------

### PREPROCESSED DATASET

In [5]:
preprocessed_df = spark.read.csv("../../../Data/Preprocessed_Datasets/GPT_dataset_preprocessed.csv", header=True, inferSchema=True)
preprocessed_df.show(10, truncate=False)

+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Convert Conversation Columns into actual Arrays

In [6]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
import ast

# UDF to convert string representation of list to actual list
def str_to_array_of_arrays(s):
    # Convert the string to a list and then wrap it inside another list
    return [ast.literal_eval(s)][0]

str_to_array_of_arrays_udf = udf(str_to_array_of_arrays, ArrayType(ArrayType(StringType())))

df = preprocessed_df.withColumn("Attacker_Helper", str_to_array_of_arrays_udf(preprocessed_df["Attacker_Helper"])).withColumn("Victim", str_to_array_of_arrays_udf(preprocessed_df["Victim"]))

df.printSchema()

root
 |-- Conversation_ID: string (nullable = true)
 |-- Attacker_Helper: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- Victim: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- Conversation_Type: integer (nullable = true)



### FLATTEN

In [7]:
from src.CustonTransformers import FlattenTransformer
from pyspark.sql.functions import flatten

flatten_attacker_helper = FlattenTransformer(inputCol="Attacker_Helper", outputCol="Attacker_Helper")
flatten_victim = FlattenTransformer(inputCol="Victim", outputCol="Victim")

df = df.withColumn("Attacker_Helper", flatten(df["Attacker_Helper"]))
df = df.withColumn("Victim", flatten(df["Victim"]))

### CONVERTING INTO TF VECTORS
#### Each flattened row in the dataframe is converted to a vector. HashingTF uses the hashing trick. A potential drawback is that multiple words might hash to the same feature index, causing collisions. 

In [8]:
from pyspark.ml.feature import HashingTF
numFeatures = 2^16

# For Attacker_Helper column
hashingTF_ah = HashingTF(inputCol="Attacker_Helper", outputCol="AH_features", numFeatures=numFeatures) # numFeatures specifies how many features (hash buckets) 2^16 num features is provided here, increase if deemed necessary
# For Victim column
hashingTF_v = HashingTF(inputCol="Victim", outputCol="V_features", numFeatures=numFeatures)

### TF-IDF
#### IDF (Inverse Document Frequency): IDF is a measure of how important a term is. While CountVectorizer counts how many times a term appears in a document (Term Frequency or TF), the IDF looks at how often a term appears across all documents.

In [9]:
from pyspark.ml.feature import IDF

# For Attacker_Helper features
idf_ah = IDF(inputCol="AH_features", outputCol="AH_tfidf_features")

# For Victim features
idf_v = IDF(inputCol="V_features", outputCol="V_tfidf_features")

### ASSEMBLING THE TWO INPUT VECTORS INTO ONE

In [10]:
from pyspark.ml.feature import VectorAssembler

# 1. Feature Assembly
assembler = VectorAssembler(
    inputCols=["AH_tfidf_features", "V_tfidf_features"],
    outputCol="combined_features")

## Pipeline Creation

In [11]:
from pyspark.ml import Pipeline


pipeline = Pipeline(stages=[hashingTF_ah, hashingTF_v, idf_ah, idf_v, assembler])

pipeline_model = pipeline.fit(df)

df_assembled = pipeline_model.transform(df)

df_assembled.show(10)

+---------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Conversation_ID|     Attacker_Helper|              Victim|Conversation_Type|         AH_features|          V_features|   AH_tfidf_features|    V_tfidf_features|   combined_features|
+---------------+--------------------+--------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     GT1sURbxgG|[hi, thi, is, joh...|[im, sorri, i, do...|                1|(18,[0,1,2,4,5,6,...|(18,[0,1,2,4,5,6,...|(18,[0,1,2,4,5,6,...|(18,[0,1,2,4,5,6,...|[0.03333642026759...|
|     TwaGOeC96w|[hello, thi, is, ...|[thi, is, victim,...|                0|(18,[0,1,2,3,4,5,...|(18,[0,1,2,3,4,5,...|(18,[0,1,2,3,4,5,...|(18,[0,1,2,3,4,5,...|[0.16668210133795...|
|     V73ZDCviQL|[hello, sir, thi,...|[ive, never, had,...|                1|(18,[0,1

### Saving the pipeline to be used in training modules

In [12]:
pipeline_model.write().overwrite().save("../../Models/Pipelines/Prediction_Pipeline")