# 1 Setting up Pyspark

In [1]:
# Install Java
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark
# !wget -q https://downloads.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# Unzip the file
# !tar xf spark-3.5.1-bin-hadoop3.tgz

# Setup environment for Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = '/home/ubuntu/spark-3.5.1-bin-hadoop3'

# Import findspark and load it
# !pip install -q findspark
import findspark
findspark.init()

# Install spark-nlp
# !pip install spark-nlp
# import sparknlp
# from sparknlp.base import *
# from sparknlp.annotator import *
# from sparknlp.common import *

# Create Spark session
from pyspark.sql import SparkSession
# spark = SparkSession.builder\
#         .master("local")\
#         .appName("Colab")\
#         .config('spark.ui.port', '4050')\
#         .getOrCreate()

# Start Spark Session with Spark NLP
# spark = sparknlp.start()

In [2]:
# spark

# 2 Import libraries

In [2]:
import json
import pickle
import urllib.request
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
import seaborn as sns
import altair as alt
import pandas as pd

import xml.etree.ElementTree as ET
from xml.etree.ElementTree import ElementTree
import datetime
from pyspark.sql.functions import lit
from pyspark.sql.functions import regexp_replace

from pyspark.sql.window import Window
from pyspark.sql.functions import first
from pyspark.sql.functions import udf, countDistinct
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

from pyspark.sql.functions import concat, col, collect_list, concat_ws, first, split, size
from pyspark.sql.types import StringType
from pyspark.sql.functions import concat, col, collect_list, concat_ws, first

import boto3
import io

# 3 Load data

###import files and create 4 datasets
1) PAN12_training_with_timestamp

2) PAN12_training_with_conversations

3) PAN12_test_with_timestamp

4) PAN12_test_with_conversations


In [4]:
# Initialize SparkSession
spark = SparkSession.builder \
                    .master("local[*]") \
                    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0") \
                    .appName("MyApp") \
                    .getOrCreate()

:: loading settings :: url = jar:file:/home/ubuntu/spark-3.5.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cbff675f-387f-4bc9-b6bd-9f39453749e3;1.0
	confs: [default]
	found com.databricks#spark-xml_2.12;0.17.0 in central
	found commons-io#commons-io;2.11.0 in central
	found org.glassfish.jaxb#txw2;3.0.2 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.3.0 in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.9.0 in central
:: resolution report :: resolve 331ms :: artifacts dl 22ms
	:: modules in use:
	com.databricks#spark-xml_2.12;0.17.0 from central in [default]
	commons-io#commons-io;2.11.0 from central in [default]
	org.apache.ws.xmlschema#xmlschema-core;2.3.0 from central in [default]
	org.glassfish.jaxb#txw2;3.0.2 from central in [default]
	org.scala-lang.modules#scala-collection-compat_2.12;2.9.0 from central in [default]
	--------

In [12]:
# Initialize SparkSession
spark = SparkSession.builder \
                    .master("local[*]") \
                    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.17.0") \
                    .appName("MyApp") \
                    .getOrCreate()

# Define the S3 bucket and key for both XML and text file
bucket_name = 'capstone210'
train_xml_key = 'data/pan12/train_xml.xml'
test_xml_key = 'data/pan12/test_xml.xml'
train_predator_ids_key = 'data/pan12/train_text.txt'
test_predator_ids_key = 'data/pan12/test_text.txt'

# Read XML file for train & test data
train_xml_df = spark.read.format("xml").option("rowTag", "conversation").load(f's3a://{bucket_name}/{train_xml_key}')
test_xml_df = spark.read.format("xml").option("rowTag", "conversation").load(f's3a://{bucket_name}/{test_xml_key}')

# Extract data from XML DataFrame
train_xml_data = train_xml_df.selectExpr("_id as conversation_id", "explode(message) as message") \
                           .selectExpr("conversation_id", "message.author as author", "message.time as time", "message.text as text")
test_xml_data = test_xml_df.selectExpr("_id as conversation_id", "explode(message) as message") \
                           .selectExpr("conversation_id", "message.author as author", "message.time as time", "message.text as text")

# Read predator IDs from text file
train_predator_ids_df = spark.read.text(f's3a://{bucket_name}/{train_predator_ids_key}')
test_predator_ids_df = spark.read.text(f's3a://{bucket_name}/{test_predator_ids_key}')

# Extract predator IDs from DataFrame
train_predator_ids = [row.value.strip() for row in train_predator_ids_df.collect()]
test_predator_ids = [row.value.strip() for row in test_predator_ids_df.collect()]

# Broadcast the list of predator IDs to all Spark executors for efficient lookup
broadcast_train_predator_ids = spark.sparkContext.broadcast(train_predator_ids)
broadcast_test_predator_ids = spark.sparkContext.broadcast(test_predator_ids)

# Add label column based on whether the author is a predator or not
pan12_train_df = train_xml_data.withColumn('label', when(col('author').isin(broadcast_train_predator_ids.value), 1).otherwise(0)) \
                             .withColumn('source', lit('PAN12-train')) \
                             .withColumn('merged_text', concat_ws(": ", col('author'), col('text')))

pan12_test_df = test_xml_data.withColumn('label', when(col('author').isin(broadcast_test_predator_ids.value), 1).otherwise(0)) \
                             .withColumn('source', lit('PAN12-test')) \
                             .withColumn('merged_text', concat_ws(": ", col('author'), col('text')))

# Show DataFrame
pan12_train_df.show(3)
pan12_test_df.show(3)

                                                                                

+--------------------+--------------------+-----+---------+-----+-----------+--------------------+
|     conversation_id|              author| time|     text|label|     source|         merged_text|
+--------------------+--------------------+-----+---------+-----+-----------+--------------------+
|e621da5de598c9321...|97964e7a9e8eb9cf7...|03:20|    Hola.|    0|PAN12-train|97964e7a9e8eb9cf7...|
|e621da5de598c9321...|0158d0d6781fc4d49...|03:20|      hi.|    0|PAN12-train|0158d0d6781fc4d49...|
|e621da5de598c9321...|0158d0d6781fc4d49...|03:20|whats up?|    0|PAN12-train|0158d0d6781fc4d49...|
+--------------------+--------------------+-----+---------+-----+-----------+--------------------+
only showing top 3 rows

+--------------------+--------------------+-----+--------------------+-----+----------+--------------------+
|     conversation_id|              author| time|                text|label|    source|         merged_text|
+--------------------+--------------------+-----+---------------

In [13]:
# Define the UDF to count unique author IDs
def count_unique_authors(authors):
    return len(set(authors))

In [14]:
# Merge list of texts
def merge_list_text(conversation_history_list):
  return ' '.join(conversation_history_list)

UDF_merge_list_text = udf(lambda x: merge_list_text(x), StringType())

In [15]:
pan12_train_df.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()



+-----+-----+
|label|total|
+-----+-----+
|    1| 2016|
|    0|66004|
+-----+-----+



                                                                                

In [16]:
pan12_test_df.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()



+-----+------+
|label| total|
+-----+------+
|    1|  3737|
|    0|153278|
+-----+------+



                                                                                

# 4 Data Cleaning

#### 4.1 Removing lines without any text

In [17]:
# Remove empty lines
pan12_train_df = pan12_train_df[pan12_train_df['text'] != '']

pan12_train_df.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()



+-----+-----+
|label|total|
+-----+-----+
|    1| 2015|
|    0|65992|
+-----+-----+



                                                                                

In [14]:
pan12_train_df.show(3)

+--------------------+--------------------+-----+---------+-----+----------+--------------------+
|     conversation_id|              author| time|     text|label|    source|         merged_text|
+--------------------+--------------------+-----+---------+-----+----------+--------------------+
|e621da5de598c9321...|97964e7a9e8eb9cf7...|03:20|    Hola.|    0|PAN12-test|97964e7a9e8eb9cf7...|
|e621da5de598c9321...|0158d0d6781fc4d49...|03:20|      hi.|    0|PAN12-test|0158d0d6781fc4d49...|
|e621da5de598c9321...|0158d0d6781fc4d49...|03:20|whats up?|    0|PAN12-test|0158d0d6781fc4d49...|
+--------------------+--------------------+-----+---------+-----+----------+--------------------+
only showing top 3 rows



In [18]:
# removing lines that have no text

# Remove empty lines
pan12_test_df = pan12_test_df[pan12_test_df['text'] != '']

pan12_test_df.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()



+-----+------+
|label| total|
+-----+------+
|    1|  3723|
|    0|153262|
+-----+------+



                                                                                

#### 4.2 Add in conversation start time

In [19]:
# Define a window partitioned by 'conversation_id' and ordered by 'time'
window_spec = Window.partitionBy('conversation_id').orderBy('time')

# Add a new column with the start time of the conversation
df_train_with_start_time = pan12_train_df.withColumn('conversation_start_time', first('time').over(window_spec))
df_test_with_start_time = pan12_test_df.withColumn('conversation_start_time', first('time').over(window_spec))

# show train data as sample
df_train_with_start_time.show(2, truncate=False)



+--------------------------------+--------------------------------+-----+----------------------------------------------------------------------------------------------+-----+-----------+--------------------------------------------------------------------------------------------------------------------------------+-----------------------+
|conversation_id                 |author                          |time |text                                                                                          |label|source     |merged_text                                                                                                                     |conversation_start_time|
+--------------------------------+--------------------------------+-----+----------------------------------------------------------------------------------------------+-----+-----------+--------------------------------------------------------------------------------------------------------------------------------+-----

                                                                                

#### 4.3 Add in number of people per conversation

In [20]:
# train
# Count unique authors for each conversation
unique_authors_per_conversation_train = df_train_with_start_time.select("conversation_id", "author") \
                                             .groupBy("conversation_id") \
                                             .agg(countDistinct("author").alias("n_people_in_conversation"))

# Join the unique author count with the original DataFrame
pan12_train_with_conversations = df_train_with_start_time.join(unique_authors_per_conversation_train, "conversation_id", "left")

In [21]:
pan12_train_with_conversations.show(2)

                                                                                

+--------------------+--------------------+-----+--------------------+-----+-----------+--------------------+-----------------------+------------------------+
|     conversation_id|              author| time|                text|label|     source|         merged_text|conversation_start_time|n_people_in_conversation|
+--------------------+--------------------+-----+--------------------+-----+-----------+--------------------+-----------------------+------------------------+
|0000604306a283600...|a9b326df4e6da61c5...|13:04|b8810fee2f4a71f84...|    0|PAN12-train|a9b326df4e6da61c5...|                  13:04|                       4|
|0000604306a283600...|b8810fee2f4a71f84...|13:12|a9b326df4e6da61c5...|    0|PAN12-train|b8810fee2f4a71f84...|                  13:04|                       4|
+--------------------+--------------------+-----+--------------------+-----+-----------+--------------------+-----------------------+------------------------+
only showing top 2 rows



In [22]:
df_train_with_start_time.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()



+-----+-----+
|label|total|
+-----+-----+
|    1| 2015|
|    0|65992|
+-----+-----+



                                                                                

In [23]:
# test
# Count unique authors for each conversation
unique_authors_per_conversation_test = df_test_with_start_time.select("conversation_id", "author") \
                                             .groupBy("conversation_id") \
                                             .agg(countDistinct("author").alias("n_people_in_conversation"))

# Join the unique author count with the original DataFrame
pan12_test_with_conversations = df_test_with_start_time.join(unique_authors_per_conversation_test, "conversation_id", "left")

#### 4.4 Add in type of conversation

In [24]:
# train
df_train_with_start_time = pan12_train_with_conversations.withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
                                                                                                                .when(col('n_people_in_conversation') == 2, 'Pair')\
                                                                                                                .when(col('n_people_in_conversation') > 2, 'Group'))
df_train_with_start_time.show(2)

                                                                                

+--------------------+--------------------+-----+--------------------+-----+-----------+--------------------+-----------------------+------------------------+-----------------+
|     conversation_id|              author| time|                text|label|     source|         merged_text|conversation_start_time|n_people_in_conversation|type_conversation|
+--------------------+--------------------+-----+--------------------+-----+-----------+--------------------+-----------------------+------------------------+-----------------+
|0000604306a283600...|a9b326df4e6da61c5...|13:04|b8810fee2f4a71f84...|    0|PAN12-train|a9b326df4e6da61c5...|                  13:04|                       4|            Group|
|0000604306a283600...|b8810fee2f4a71f84...|13:12|a9b326df4e6da61c5...|    0|PAN12-train|b8810fee2f4a71f84...|                  13:04|                       4|            Group|
+--------------------+--------------------+-----+--------------------+-----+-----------+--------------------+------

In [25]:
# test
df_test_with_start_time = pan12_test_with_conversations.withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
                                                                                                                .when(col('n_people_in_conversation') == 2, 'Pair')\
                                                                                                                .when(col('n_people_in_conversation') > 2, 'Group'))

#### 4.5 Creating concatenated conversation column

In [26]:
pan12_train_with_conversations = df_train_with_start_time.groupby('conversation_id','label').agg(concat_ws(" ", collect_list('merged_text')).alias('merged_text_id'),
                                                                                  concat_ws(' ', collect_list('text')).alias('merged_text'),
                                                                                  first('source').alias('source'),
                                                                                  first('conversation_start_time').cast('string').alias('conversation_start_time'),
                                                                                  first('n_people_in_conversation').cast('int').alias('n_people_in_conversation'),
                                                                                  first('type_conversation').alias('type_conversation'))\
                                                                                  .withColumn('n_texts', size(split(col('merged_text'), ' ')))\
                                                                                  .drop('conversation_history')
pan12_train_with_conversations.show(2)

[Stage 84:>                                                         (0 + 1) / 1]

+--------------------+-----+--------------------+--------------------+-----------+-----------------------+------------------------+-----------------+-------+
|     conversation_id|label|      merged_text_id|         merged_text|     source|conversation_start_time|n_people_in_conversation|type_conversation|n_texts|
+--------------------+-----+--------------------+--------------------+-----------+-----------------------+------------------------+-----------------+-------+
|0000604306a283600...|    0|a9b326df4e6da61c5...|b8810fee2f4a71f84...|PAN12-train|                  13:04|                       4|            Group|    651|
|0001347c00d419eb5...|    0|e2bd430b29412d926...|say asl and i&apo...|PAN12-train|                  13:34|                       2|             Pair|     17|
+--------------------+-----+--------------------+--------------------+-----------+-----------------------+------------------------+-----------------+-------+
only showing top 2 rows



                                                                                

In [27]:
pan12_test_with_conversations = df_test_with_start_time.groupby('conversation_id', 'label').agg(concat_ws(" ", collect_list('merged_text')).alias('merged_text_id'),
                                                                                  concat_ws(' ', collect_list('text')).alias('merged_text'),
                                                                                  first('source').alias('source'),
                                                                                  first('conversation_start_time').cast('string').alias('conversation_start_time'),
                                                                                  first('n_people_in_conversation').cast('int').alias('n_people_in_conversation'),
                                                                                  first('type_conversation').alias('type_conversation'))\
                                                                                  .withColumn('n_texts', size(split(col('merged_text'), ' ')))\
                                                                                  .drop('conversation_history')

In [28]:
pan12_train_with_conversations.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()

# pan12_test.groupBy('label')\
#                 .agg(countDistinct('conversation_id').alias('total'))\
#                 .show()



+-----+-----+
|label|total|
+-----+-----+
|    1| 2015|
|    0|65992|
+-----+-----+



                                                                                

#### 4.6 Rearrange the columns

In [29]:
# specifying the column order
columns = ['conversation_id',
'source',
'label',
'conversation_start_time',
'n_people_in_conversation',
'type_conversation',
'merged_text',
'merged_text_id',
'n_texts']

pan12_train = pan12_train_with_conversations.select(columns)
pan12_test = pan12_test_with_conversations.select(columns)

In [30]:
pan12_train.printSchema()

root
 |-- conversation_id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- conversation_start_time: string (nullable = true)
 |-- n_people_in_conversation: integer (nullable = true)
 |-- type_conversation: string (nullable = true)
 |-- merged_text: string (nullable = false)
 |-- merged_text_id: string (nullable = false)
 |-- n_texts: integer (nullable = false)



In [31]:
pan12_test.printSchema()

root
 |-- conversation_id: string (nullable = true)
 |-- source: string (nullable = true)
 |-- label: integer (nullable = false)
 |-- conversation_start_time: string (nullable = true)
 |-- n_people_in_conversation: integer (nullable = true)
 |-- type_conversation: string (nullable = true)
 |-- merged_text: string (nullable = false)
 |-- merged_text_id: string (nullable = false)
 |-- n_texts: integer (nullable = false)



#### 4.7 Check the dataframes

In [32]:
pan12_train.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()

pan12_test.groupBy('label')\
                .agg(countDistinct('conversation_id').alias('total'))\
                .show()

                                                                                

+-----+-----+
|label|total|
+-----+-----+
|    1| 2015|
|    0|65992|
+-----+-----+





+-----+------+
|label| total|
+-----+------+
|    1|  3723|
|    0|153262|
+-----+------+



                                                                                

#### 4.8 Bring in the PJZC data

In [33]:
old_train_data_path = 's3a://capstone210/data/train/'

old_train = spark.read.parquet(old_train_data_path)

In [34]:
old_train.groupBy('source', 'label')\
  .agg(count('conversation_id').alias('count'))\
  .show(truncate = False)



+-----------+-----+-----+
|source     |label|count|
+-----------+-----+-----+
|PAN12-train|0    |65340|
|PAN12-train|1    |1587 |
|Chit chats |0    |7248 |
|PJ chats   |1    |1104 |
|ZIG chats  |0    |12718|
+-----------+-----+-----+



                                                                                

In [35]:
# Filter data where a column is between two specific values
old_train_filtered = old_train.filter((old_train['source'] == 'PJ chats') & (old_train['label'] == 1))

old_train_filtered.groupBy('source', 'label')\
  .agg(count('conversation_id').alias('count'))\
  .show(truncate = False)



+--------+-----+-----+
|source  |label|count|
+--------+-----+-----+
|PJ chats|1    |1104 |
+--------+-----+-----+



                                                                                

In [36]:
old_train_filtered.show(3)

[Stage 111:>                                                        (0 + 1) / 1]

+---------------+--------+-----+-----------------------+------------------------+-----------------+--------------------+--------------------+-------+
|conversation_id|  source|label|conversation_start_time|n_people_in_conversation|type_conversation|         merged_text|      merged_text_id|n_texts|
+---------------+--------+-----+-----------------------+------------------------+-----------------+--------------------+--------------------+-------+
|              0|PJ chats|    1|                  14:40|                       2|             Pair|Hey Its Mads Hey ...|decoy: Hey Its Ma...|    120|
|              1|PJ chats|    1|                  06:38|                       2|             Pair|Hey lol Hey mads ...|decoy: Hey lol Bi...|     95|
|              2|PJ chats|    1|                  07:53|                       2|             Pair|Sorry my mom is u...|decoy: Sorry my m...|     14|
+---------------+--------+-----+-----------------------+------------------------+-----------------+-

                                                                                

#### merge the PJZC and PAN12 training

In [38]:
# perform union on two dataframes
merged_train = pan12_train.union(old_train_filtered)

# Show the merged DataFrame
merged_train.show(3)

[Stage 121:>                                                        (0 + 1) / 1]

+--------------------+-----------+-----+-----------------------+------------------------+-----------------+--------------------+--------------------+-------+
|     conversation_id|     source|label|conversation_start_time|n_people_in_conversation|type_conversation|         merged_text|      merged_text_id|n_texts|
+--------------------+-----------+-----+-----------------------+------------------------+-----------------+--------------------+--------------------+-------+
|0000604306a283600...|PAN12-train|    0|                  13:04|                       4|            Group|b8810fee2f4a71f84...|a9b326df4e6da61c5...|    651|
|0001347c00d419eb5...|PAN12-train|    0|                  13:34|                       2|             Pair|say asl and i&apo...|e2bd430b29412d926...|     17|
|000197b21283dc478...|PAN12-train|    0|                  06:27|                       2|             Pair|joint ? in my han...|487862cd4ec27d841...|     79|
+--------------------+-----------+-----+------------

                                                                                

In [39]:
merged_train.groupBy('source', 'label')\
  .agg(count('conversation_id').alias('count'))\
  .show()

pan12_test.groupBy('source', 'label')\
  .agg(count('conversation_id').alias('count'))\
  .show()

                                                                                

+-----------+-----+-----+
|     source|label|count|
+-----------+-----+-----+
|PAN12-train|    0|65992|
|PAN12-train|    1| 2015|
|   PJ chats|    1| 1104|
+-----------+-----+-----+





+----------+-----+------+
|    source|label| count|
+----------+-----+------+
|PAN12-test|    1|  3723|
|PAN12-test|    0|153262|
+----------+-----+------+



                                                                                

#### 4.9 Save the files

In [40]:
# save the files to S3 bucket 
train_output_path = 's3a://capstone210/data/final/train/'
test_output_path = 's3a://capstone210/data/final/test/'

merged_train.write.mode('overwrite').parquet(train_output_path)
pan12_test.write.mode('overwrite').parquet(test_output_path)

                                                                                

#### 4.9 Confirm the saved files by source and label

In [41]:
# read the data from S3 bucket
train_data_path = 's3a://capstone210/data/final/train/'
test_data_path = 's3a://capstone210/data/final/test/'

train = spark.read.parquet(train_data_path)
test = spark.read.parquet(test_data_path)

In [42]:
train.groupBy('source', 'label')\
  .agg(count('conversation_id').alias('count'))\
  .show(truncate = False)

test.groupBy('source', 'label')\
  .agg(count('conversation_id').alias('count'))\
  .show(truncate = False)

                                                                                

+-----------+-----+-----+
|source     |label|count|
+-----------+-----+-----+
|PAN12-train|0    |65992|
|PAN12-train|1    |2015 |
|PJ chats   |1    |1104 |
+-----------+-----+-----+





+----------+-----+------+
|source    |label|count |
+----------+-----+------+
|PAN12-test|1    |3723  |
|PAN12-test|0    |153262|
+----------+-----+------+



                                                                                

# *Functions Archived (Ignore - storage only)*

In [None]:
# Merge Texts without author id
def merge_text(messages_column):
  text_list = [i["text"] for i in messages_column]
  return ' '.join(text_list)

UDF_merge_text = udf(lambda x: merge_text(x), StringType())

In [None]:
# # People in conversation
def n_people_in_conversation(messages_column):
  unique_author = set([i['author'] for i in messages_column])
  return len(unique_author)

UDF_n_people_in_conversation = udf(lambda x: n_people_in_conversation(x), IntegerType())

In [None]:
# Merge texts with author id
def merge_text_id(messages_column):
  text_list = [f'{i["author"]}: {i["text"]}' for i in messages_column]
  return ' '.join(text_list)

UDF_merge_text_id = udf(lambda x: merge_text_id(x), StringType())

In [None]:
# number of texts in conversation
def n_texts_in_conversation(messages_column):
  unique_author = len(messages_column)
  return unique_author

UDF_n_texts_in_conversation = udf(lambda x: n_texts_in_conversation(x), IntegerType())

In [None]:
pan12_training_df.show(3, truncate=False)

+--------------------------------+--------------------------------+-----+---------+
|conversation_id                 |author                          |time |text     |
+--------------------------------+--------------------------------+-----+---------+
|e621da5de598c9321a1d505ea95e6a2d|97964e7a9e8eb9cf78f2e4d7b2ff34c7|03:20|Hola.    |
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|hi.      |
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|whats up?|
+--------------------------------+--------------------------------+-----+---------+
only showing top 3 rows



In [None]:
## Add conversation start time
from pyspark.sql.window import Window
from pyspark.sql.functions import first

# Define a window partitioned by 'conversation_id' and ordered by 'time'
window_spec = Window.partitionBy('conversation_id').orderBy('time')

# Add a new column with the start time of the conversation
pan12_training_df = pan12_training_df.withColumn('conversation_start_time', first('time').over(window_spec))

pan12_training_df.show(3, truncate=False)

+--------------------------------+--------------------------------+-----+-----------------------------------------------------------------------------------------------------------------------+-----------------------+
|conversation_id                 |author                          |time |text                                                                                                                   |conversation_start_time|
+--------------------------------+--------------------------------+-----+-----------------------------------------------------------------------------------------------------------------------+-----------------------+
|0000604306a283600b730276a2039471|a9b326df4e6da61c5b6f5e1058be83a2|13:04|b8810fee2f4a71f849f3f7409546d1d9 - do you have any set of test cases for non-conformant HTML5?                         |13:04                  |
|0000604306a283600b730276a2039471|b8810fee2f4a71f849f3f7409546d1d9|13:12|a9b326df4e6da61c5b6f5e1058be83a2: there are some in the

In [None]:
pan12_training_df_2 = pan12_training_df.withColumn("n_people_in_conversation", UDF_n_people_in_conversation("text"))
pan12_training_df_2.show(truncate=False)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-22-633844c0c109>", line 6, in <lambda>
  File "<ipython-input-22-633844c0c109>", line 3, in n_people_in_conversation
  File "<ipython-input-22-633844c0c109>", line 3, in <listcomp>
TypeError: string indices must be integers


In [None]:
pan12_training_df_2 = pan12_training_df.withColumn("n_people_in_conversation", UDF_n_people_in_conversation("text"))\
                                  .withColumn("type_conversation", when(col('n_people_in_conversation') == 1, 'Monologue')\
                                                                  .when(col('n_people_in_conversation') == 2, 'Pair')\
                                                                  .when(col('n_people_in_conversation') >= 2, 'Group'))\
                                  .withColumn("merged_text", UDF_merge_text("text"))\
                                  .withColumn("merged_text_id", UDF_merge_text_id("text"))\
                                  .withColumn("n_texts", UDF_n_texts_in_conversation("text"))\
                                  .drop("text")

In [None]:
pan12_training_df_2.show(3, truncate=False)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-15-6018879285ad>", line 6, in <lambda>
  File "<ipython-input-15-6018879285ad>", line 3, in n_people_in_conversation
  File "<ipython-input-15-6018879285ad>", line 3, in <listcomp>
TypeError: string indices must be integers


###3.3 Pickle files

In [None]:
def loading_pkl(name):
    with open(name, 'rb') as f:
        file = pickle.load(f)
    return file

In [None]:
PJ_pickle = f'drive/MyDrive/210 Capstone/Data/PJ.pkl'
ZIG_pickle = f'drive/MyDrive/210 Capstone/Data/ZIG.pkl'
CHIT_pickle = f'drive/MyDrive/210 Capstone/Data/CHIT.pkl'

In [None]:
conversations = loading_pkl(PJ_pickle)
conversations[0]

('Billy Joe',
 [[['decoy', '14:40', 'Hey Its Mads'],
   ['Billy Joe', '14:40', 'Hey babes'],
   ['Billy Joe', '14:41', 'Almost done with work. So glad'],
   ['decoy', '14:41', "Hey:) that's good right?"],
   ['Billy Joe', '14:42', 'Yea'],
   ['decoy', '14:43', "So what's new lol"],
   ['Billy Joe', '14:43', 'Nothing'],
   ['Billy Joe', '14:43', 'did u touch it in the shower. Lol'],
   ['decoy', '14:44', 'No had to hurry up lol'],
   ['decoy', '14:44', 'Hold on a min'],
   ['decoy', '14:47', 'Sorry my mom keeps bossing me around lol'],
   ['Billy Joe', '14:47', 'That sucks'],
   ['decoy', '14:48', "Yeah it's annoying. Do u get along with your mom"],
   ['Billy Joe', '14:48', "Kinda..she's nosy"],
   ['decoy', '14:48', 'Still lol'],
   ['Billy Joe', '14:49', 'Yea'],
   ['decoy', '14:50', 'Is it just u and her?'],
   ['Billy Joe', '14:50', 'Whatcha doing now'],
   ['Billy Joe', '14:51', 'Yea'],
   ['decoy', '14:51', 'Same here :)'],
   ['Billy Joe', '14:51', 'I know the feeling'],
   ['de

In [None]:
len(conversations)

24

In [None]:
for groomer_id,sub_conversations in conversations[:]:
  for sub_c in sub_conversations:
    messages_l = []
    full_text = sub_conversations
    for message in sub_c:
        author = message[0]
        time = message[1]
        text = message[2]

        messages_l.append(
        {
          "gromer_id":groomer_id,
          "author":author,
          "time": time,
          "text": text,
                        })

In [None]:
messages_l[0]

{'gromer_id': 'tory beltz',
 'author': 'tory beltz',
 'time': '16:05',
 'text': "I haven't changed my mind yet I'm excited and really super nervous"}

In [None]:
groomers = []
for groomer_id,sub_conversations in conversations:
  groomers.append(groomer_id)

In [None]:
groomers

['Billy Joe',
 'Carlos Cabrera',
 'DB',
 'F C',
 'J H',
 'Josh Mcclendon',
 'Roger Cassidy',
 'Ryan Thompson',
 'allenriley2011',
 'bossofct',
 'chrisnr1998',
 'christopher_brown1991',
 'georgiamike1968',
 'innocentz6197@sbcglobal.net',
 'jackjohnsons7',
 'jakelewis315',
 'jinxman555',
 'jlucero155_l',
 'john_adamowski',
 'kashkhan773',
 'koolkrod16',
 'obercock1985',
 'paulcarder86',
 'tory beltz']

### 3.1 train.csv --> SGD file

In [None]:
train = f'drive/MyDrive/210 Capstone/data/raw data/train.csv'

In [None]:
train_df = spark.read.option('encoding', 'utf-8').csv(train, multiLine=True)

In [None]:
train_df_final = train_df.withColumnRenamed('_c0', 'text')\
        .withColumnRenamed('_c1', 'conversation_start_time')\
        .withColumnRenamed('_c2', 'label')

In [None]:
train_df_final.show()

+--------------------+-----------------------+-----+
|                text|conversation_start_time|label|
+--------------------+-----------------------+-----+
|Hola. hi. whats u...|                  03:20|    0|
|  asuu lonte koe hi |                  14:36|    0|
|    hi asl m or f m |                  02:55|    0|
|happy is ayuppie ...|                  12:37|    0|
|ask me 5 question...|                  02:45|    0|
|hi asl Hi hellloooo |                  15:54|    0|
|sets mode: +oo ta...|                  14:49|    0|
|Definitely very f...|                  21:28|    0|
|aloha... hi asl? ...|                  15:42|    0|
|         ArtB: ping |                  18:17|    0|
|sex female yes an...|                  18:41|    0|
|heey (: how&apos;...|                  23:36|    0|
|hi hihi m or f?? ...|                  08:16|    0|
|changes: begin co...|                  23:37|    0|
|hi for t = 1 : T-...|                  12:00|    0|
|we're not at the ...|                  04:43|

In [None]:
train_df_final.count()

66927

In [None]:
train_df_final.groupBy('label')\
              .agg(count('*').alias('count'))\
              .show(truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|label                                                                                                                                                                                                 |count|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
| btw that is on topic because its running  OpenSolaris build 39 nice mornin' jmcp. cool welcome back                                                                                                  |1    |
| there are some name mangling rules based on underscores to make variables private                                                                                         

In [None]:
train_df_final.filter(col('label').isin([1])).show(truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
df  = pd.read_csv(train,encoding='utf-8',header=None)

In [None]:
len(df.filter(~df[2].isin([0,1])))

66927

In [None]:
train_df.filter(col("_c2") == 1).count()

1990

### 3.2 PJZC.json file

In [None]:
PJZC_json = f'drive/MyDrive/210 Capstone/data/raw data/PJZC.txt'

In [None]:
PJZC_json_df = spark.read.json(PJZC_json)

In [None]:
PJZC_json_df.show()

----------------------------------------ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.0-bin-hadoop3/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java

Py4JError: py4j.reflection.TypeUtil does not exist in the JVM

In [None]:
PJZC_json_df_exploded = PJZC_json_df.select(explode(col("conversation")))

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
PJZC_final = PJZC_json_df_exploded.select(col('col').getItem('id').alias('id'),
                             col('col').getItem('source').alias('source'),
                             col('col').getItem('label').alias('label'),
                             col('col').getItem('messages').alias('messages'))

In [None]:
PJZC_final.count()

In [None]:
PJZC_final.show(truncate = False)

In [None]:
# # chats per source
PJZC_final.groupBy('source')\
          .agg(countDistinct(col('id')).alias('count'))\
          .show(truncate = False)

In [None]:
# # Time when conversation starts
UDF_time = udf(lambda x: x[0]['time'], StringType())

In [None]:
# # People in conversation
def n_people_in_conversation(messages_column):
  unique_author = set([i['author'] for i in messages_column])
  return len(unique_author)

UDF_n_people_in_conversation = udf(lambda x: n_people_in_conversation(x), IntegerType())

In [None]:
# Merge texts
def merge_text(messages_column):
  text_list = [i['text'] for i in messages_column]
  return ' '.join(text_list)

UDF_merge_text = udf(lambda x: merge_text(x), StringType())

In [None]:
PJZC_final_df = PJZC_final.withColumn("conversation_start_time", UDF_time("messages"))\
                                  .withColumn("n_people_in_conversation", UDF_n_people_in_conversation("messages"))\
                                  .withColumn("type_conversation", when(col('n_people_in_conversation') == 1, 'Monologue')\
                                                                  .when(col('n_people_in_conversation') == 2, 'Pair')\
                                                                  .when(col('n_people_in_conversation') >= 2, 'Group'))\
                                  .withColumn("merge_text", UDF_merge_text("messages"))


In [None]:
PJZC_final_df.show()

In [None]:
# Records with no text
PJZC_final_df.filter(col('merge_text').isNull()).show(truncate = False)

In [None]:
# # Conversations by conversation type
PJZC_final_df.groupBy('label', 'type_conversation')\
          .agg(countDistinct(col('id')).alias('n_conversations'))\
          .orderBy(col('label'), col('type_conversation'))\
          .show(truncate = False)\


In [None]:
# Expand messages into individual texts
PJZC_texts_final = PJZC_final_df.select('id', 'source', 'conversation_start_time', 'label', explode(col('messages')))\
          .select('id', 'source', 'label', 'conversation_start_time', col('col').getItem('author').alias('author'), col('col').getItem('text').alias('text'), col('col').getItem('time').alias('time'))

In [None]:
PJZC_texts_final.show(truncate = False)

In [None]:
# # Texts
PJZC_texts_final.count()

In [None]:
# # Groomers
PJZC_texts_final.filter((col('source') == 'PJ chats') & (col('author') != 'decoy'))\
                .select('author')\
                .distinct()\
                .show(truncate = False)

In [None]:
# # Groomers
PJZC_texts_final.filter((col('source') == 'PJ chats') & (col('author') != 'decoy'))\
                .select('author')\
                .distinct()\
                .count()

In [None]:
# Save file
# define the output JSON file path
json_file_path = f'drive/MyDrive/210 Capstone/data/PJZC.json'

# write the DataFrame as a JSON file with parameters
PJZC_final.write \
    .mode('overwrite')\
    .json(json_file_path)

In [None]:
# load both PAN12 training and test json files to one dataframe
pan12_df = pan12_training_df_final.union(pan12_test_df_final)

pan12_df.show(truncate=False)

+--------------------------------+--------------------------------+-----+------------------------------------------------------------------------------------+-----+-----------+
|conversation_id                 |author                          |time |text                                                                                |label|source     |
+--------------------------------+--------------------------------+-----+------------------------------------------------------------------------------------+-----+-----------+
|e621da5de598c9321a1d505ea95e6a2d|97964e7a9e8eb9cf78f2e4d7b2ff34c7|03:20|Hola.                                                                               |0    |PAN12-train|
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|hi.                                                                                 |0    |PAN12-train|
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|whats up?                                 

In [None]:
# load the PJZC json file to a dataframe
pjzc_file_path = f'drive/MyDrive/210 Capstone/data/PJZC.json'
pjzc_df = spark.read.json(pjzc_file_path)

pjzc_df.show(truncate=False)

+---------+-----------------------+---+-----+--------+------------------------------------------------+-----+
|author   |conversation_start_time|id |label|source  |text                                            |time |
+---------+-----------------------+---+-----+--------+------------------------------------------------+-----+
|decoy    |14:40                  |0  |1    |PJ chats|Hey Its Mads                                    |14:40|
|Billy Joe|14:40                  |0  |1    |PJ chats|Hey babes                                       |14:40|
|Billy Joe|14:40                  |0  |1    |PJ chats|Almost done with work. So glad                  |14:41|
|decoy    |14:40                  |0  |1    |PJ chats|Hey:) that's good right?                        |14:41|
|Billy Joe|14:40                  |0  |1    |PJ chats|Yea                                             |14:42|
|decoy    |14:40                  |0  |1    |PJ chats|So what's new lol                               |14:43|
|Billy Joe

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import when, first

# need processing of the pjzc file to match the two dataframes
# Define a window partitioned by conversation_id
window_spec = Window.partitionBy("conversation_id").orderBy("time")

# Add a new column to show the conversation start time
df_with_start_time = pan12_df.withColumn("conversation_start_time",
                                   when(pan12_df["time"] == first(pan12_df["time"]).over(window_spec), pan12_df["time"]).otherwise(None))

# Show the DataFrame
df_with_start_time.show(truncate=False)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
df_with_start_time.show(5)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
merged_df = df_with_start_time.union(pjzc_df)
merged_df.show()

In [None]:
# append the pan12_df and pjzc_df
merged_df = pan12_df.union(pjzc_df)

# Show the merged dataframe
merged_df.show(truncate=False)

In [None]:
# save file
# define the output json file path
json_file_path = f'drive/MyDrive/210 Capstone/data/merged_dataset/'

# Write the merged DataFrame to a new JSON file
merged_df.write.json(json_file_path)

# Stop Spark session
spark.stop()

## 4. Merge PAN12 and PJZC json files

###Creating another df table with merged texts (training and test)


In [None]:
# concatenating training and test datasets
pan12_final_merged_text = pan12_training_final_merged_text.union(pan12_test_final_merged_text)

In [None]:
# spliting the dataframes into two dataframes
weights = [.3, .3, .2, .2]
splits = pan12_final_merged_text.randomSplit(weights, seed=None)
splits[0].show(10)

In [None]:
# Define the path where the pan12_test_data can be saved
output_path = f'drive/MyDrive/210 Capstone/data/PAN12_processed/merged/merged_text'

# Save the DataFrame to a parquet file
splits[0].write.mode('append').parquet(output_path)
splits[1].write.mode('append').parquet(output_path)
splits[2].write.mode('append').parquet(output_path)
splits[3].write.mode('append').parquet(output_path)

#### attempt #3 - successfully split the json into multiple files


In [None]:
def split_xml_to_json(input_xml_path, output_json_path, num_files):

    # Parse XML file
    xml_tree = ET.parse(input_xml_path)
    root = xml_tree.getroot()

    # Calculate the size of each chunk
    total_conversations = len(root)
    chunk_size = total_conversations // num_files
    remainder = total_conversations % num_files

    # Split the XML data into chunks
    chunks = [root[i*chunk_size:(i+1)*chunk_size] for i in range(num_files)]
    if remainder:
        chunks[-1].extend(root[-remainder:])  # Add the remaining items to the last chunk

    # Convert each chunk to JSON and save to separate files
    for i, chunk in enumerate(chunks):
        json_data = []
        for conversation in chunk:
            conv_data = {
                'id': conversation.get('id'),
                'messages': [
                    {'author': message.find('author').text, 'text': message.find('text').text}
                    for message in conversation
                ]
            }
            json_data.append(conv_data)

        output_file = os.path.join(output_json_path, f"file_{i+1}.json")
        with open(output_file, 'w') as f:
            json.dump(json_data, f)

        print(f"Chunk {i+1} saved to {output_file}")

# Define paths
train_data_path = f"drive/MyDrive/210 Capstone/data/PAN12/pan12-sexual-predator-identification-training-corpus-2012-05-01/"
test_data_path = f"drive/MyDrive/210 Capstone/data/PAN12/pan12-sexual-predator-identification-test-corpus-2012-05-21/"
output_train_path = f"drive/MyDrive/210 Capstone/data/PAN12/training/"
output_test_path = f"drive/MyDrive/210 Capstone/data/PAN12/test/"

# Split training file into 3 JSON files
split_xml_to_json(train_data_path + 'pan12-sexual-predator-identification-training-corpus-2012-05-01.xml',
                  output_train_path, 3)

# Split test file into 5 JSON files
split_xml_to_json(test_data_path + 'pan12-sexual-predator-identification-test-corpus-2012-05-17.xml',
                  output_test_path, 5)

#### running one training file as a sample to see what the df table looks like

In [None]:
pan12_training = f'drive/MyDrive/210 Capstone/data/PAN12/training/file_1.json'

In [None]:
pan12_training_json = spark.read.json(pan12_training)

In [None]:
pan12_training_json.show(truncate=False)

+--------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
pan12_training_df_exploded = pan12_training_json.select(explode(col("id")))

AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "explode(id)" due to data type mismatch: Parameter 1 requires the ("ARRAY" or "MAP") type, however "id" has the type "STRING".;
'Project [explode(id#8) AS ()]
+- Relation [id#8,messages#9] json


In [None]:
pan12_training_df_exploded.show()

+--------------------+
|                 col|
+--------------------+
|{97964e7a9e8eb9cf...|
|{0158d0d6781fc4d4...|
|{0158d0d6781fc4d4...|
|{97964e7a9e8eb9cf...|
|{97964e7a9e8eb9cf...|
|{0158d0d6781fc4d4...|
|{97964e7a9e8eb9cf...|
|{97964e7a9e8eb9cf...|
|{0158d0d6781fc4d4...|
|{97964e7a9e8eb9cf...|
|{0158d0d6781fc4d4...|
|{0158d0d6781fc4d4...|
|{97964e7a9e8eb9cf...|
|{97964e7a9e8eb9cf...|
|{97964e7a9e8eb9cf...|
|{0158d0d6781fc4d4...|
|{0158d0d6781fc4d4...|
|{97964e7a9e8eb9cf...|
|{97964e7a9e8eb9cf...|
|{0158d0d6781fc4d4...|
+--------------------+
only showing top 20 rows



In [None]:
pan12_training_df_final = pan12_training_df_exploded.select(col('col').getItem('author').alias('author'),
                                                            col('col').getItem('text').alias('text'))

In [None]:
pan12_training_df_final.count()

309297

In [None]:
pan12_training_df_final.show(truncate=False)

+--------------------------------+------------------------------------------------------------------------------------+
|author                          |text                                                                                |
+--------------------------------+------------------------------------------------------------------------------------+
|97964e7a9e8eb9cf78f2e4d7b2ff34c7|Hola.                                                                               |
|0158d0d6781fc4d493f243d4caa49747|hi.                                                                                 |
|0158d0d6781fc4d493f243d4caa49747|whats up?                                                                           |
|97964e7a9e8eb9cf78f2e4d7b2ff34c7|not a ton.                                                                          |
|97964e7a9e8eb9cf78f2e4d7b2ff34c7|you?                                                                                |
|0158d0d6781fc4d493f243d4caa49747|same. 

Once the training and test files are split into multiple chunks (3 training and 5 test), bring them all into spark dataframe tables

In [None]:
def process_json_file(json_file_path):
    """
    Reads a JSON file, explodes the 'messages' array, and selects the 'author' and 'text' fields.

    Args:
    - json_file_path: Path to the JSON file

    Returns:
    - DataFrame containing 'author' and 'text' fields
    """
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("Process JSON File") \
        .getOrCreate()

    # Read JSON file
    json_df = spark.read.json(json_file_path)

    # Explode 'messages' array
    exploded_df = json_df.select(explode(col("messages")))

    # Select 'author' and 'text' fields
    final_df = exploded_df.select(col('col').getItem('author').alias('author'),
                                  col('col').getItem('text').alias('text'))

    return final_df

# Example usage:
pan12_training_files = ['drive/MyDrive/210 Capstone/data/PAN12/training/file_1.json',
                        'drive/MyDrive/210 Capstone/data/PAN12/training/file_2.json',
                        'drive/MyDrive/210 Capstone/data/PAN12/training/file_3.json']

# Process each training file
for file_path in pan12_training_files:
    df = process_json_file(file_path)
    print(f"File: {file_path}, Count: {df.count()}")

# Repeat the same process for test files
pan12_test_files = ['drive/MyDrive/210 Capstone/data/PAN12/test/file_1.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_2.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_3.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_4.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_5.json']

# Process each test file
for file_path in pan12_test_files:
    df = process_json_file(file_path)
    print(f"File: {file_path}, Count: {df.count()}")

File: drive/MyDrive/210 Capstone/data/PAN12/training/file_1.json, Count: 309297
File: drive/MyDrive/210 Capstone/data/PAN12/training/file_2.json, Count: 298668
File: drive/MyDrive/210 Capstone/data/PAN12/training/file_3.json, Count: 295642
File: drive/MyDrive/210 Capstone/data/PAN12/test/file_1.json, Count: 410832
File: drive/MyDrive/210 Capstone/data/PAN12/test/file_2.json, Count: 411308
File: drive/MyDrive/210 Capstone/data/PAN12/test/file_3.json, Count: 409207
File: drive/MyDrive/210 Capstone/data/PAN12/test/file_4.json, Count: 412878
File: drive/MyDrive/210 Capstone/data/PAN12/test/file_5.json, Count: 414556


In [None]:
def process_json_file(json_file_path):
    """
    Reads a JSON file, explodes the 'messages' array, and selects the 'author' and 'text' fields.

    Args:
    - json_file_path: Path to the JSON file

    Returns:
    - DataFrame containing 'author' and 'text' fields
    """

    # Read JSON file
    json_df = spark.read.json(json_file_path)

    # Explode 'messages' array
    exploded_df = json_df.select(explode(col("messages")))

    # Select 'author' and 'text' fields
    final_df = exploded_df.select(col('col').getItem('author').alias('author'),
                                  col('col').getItem('text').alias('text'))
    return final_df

In [None]:
# Example usage:
pan12_training_files = ['drive/MyDrive/210 Capstone/data/PAN12/training/file_1.json',
                        'drive/MyDrive/210 Capstone/data/PAN12/training/file_2.json',
                        'drive/MyDrive/210 Capstone/data/PAN12/training/file_3.json']

# Process each training file
for file_path in pan12_training_files:
    df = process_json_file(file_path)
    df.show(truncate = False)
    print(f"File: {file_path}, Count: {df.count()}")

    # Save file as parquet
    df.write.mode("append").parquet('drive/MyDrive/210 Capstone/data/merged_dataset')

# Repeat the same process for test files
pan12_test_files = ['drive/MyDrive/210 Capstone/data/PAN12/test/file_1.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_2.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_3.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_4.json',
                    'drive/MyDrive/210 Capstone/data/PAN12/test/file_5.json']

# Process each test file
for file_path in pan12_test_files:
    df = process_json_file(file_path)
    print(f"File: {file_path}, Count: {df.count()}")

    # Save file as parquet
    df.write.mode("append").parquet('drive/MyDrive/210 Capstone/data/merged_dataset')

+------------------------------------------------------------------------------------------------------------------------+
|col                                                                                                                     |
+------------------------------------------------------------------------------------------------------------------------+
|{97964e7a9e8eb9cf78f2e4d7b2ff34c7, Hola.}                                                                               |
|{0158d0d6781fc4d493f243d4caa49747, hi.}                                                                                 |
|{0158d0d6781fc4d493f243d4caa49747, whats up?}                                                                           |
|{97964e7a9e8eb9cf78f2e4d7b2ff34c7, not a ton.}                                                                          |
|{97964e7a9e8eb9cf78f2e4d7b2ff34c7, you?}                                                                                |
|{0158d0d6781fc4

In [None]:
#

#### additional cleaning

In [None]:
# ## Add conversation start time
# from pyspark.sql.window import Window
# from pyspark.sql.functions import first

# # Define a window partitioned by 'conversation_id' and ordered by 'time'
# window_spec = Window.partitionBy('conversation_id').orderBy('time')

# # Add a new column with the start time of the conversation
# pan12_training_df = pan12_training_df.withColumn('conversation_start_time', first('time').over(window_spec))


In [None]:
# # Load list of author IDs from the text file
# with open(train_data_path + 'pan12-sexual-predator-identification-training-corpus-predators-2012-05-01.txt', 'r') as f:
#     predator_ids = [line.strip() for line in f]

# # Broadcast the list of author IDs to all Spark executors for efficient lookup
# broadcast_predator_ids = spark.sparkContext.broadcast(predator_ids)

In [None]:
# pan12_training_df_2 = pan12_training_df.groupby('conversation_id').agg(countDistinct('author').alias('n_people_in_conversation'),
#                                                                        first('conversation_id').alias('conversation_id'),
#                                                                        first('author').alias('author'),
#                                                                        first('time').alias('time'),
#                                                                        first('text').alias('text'),
#                                                                        first('conversation_start_time').alias('conversation_start_time'))

In [None]:
# pan12_training_df_2.show(3)

In [None]:
# # Load list of author IDs from the text file
# with open(train_data_path + 'pan12-sexual-predator-identification-training-corpus-predators-2012-05-01.txt', 'r') as f:
#     predator_ids = [line.strip() for line in f]

# # Broadcast the list of author IDs to all Spark executors for efficient lookup
# broadcast_predator_ids = spark.sparkContext.broadcast(predator_ids)

# # Define a window specification to partition by conversation_id
# window_spec = Window.partitionBy("conversation_id")

# # Add a new column 'label' to the DataFrame
# pan12_training_df_final_with_time = pan12_training_df.withColumn('label',
#                                                       lit(1).alias('label')). \
#                                             withColumn('source', lit('PAN12-train').alias('source')).\
#                                            withColumn('label',
#                                                       when(col('author').isin(broadcast_predator_ids.value), 1).otherwise(0)). \
#                                             withColumn('n_texts', UDF_n_texts_in_conversation("text"))\


# Show the DataFrame
# pan12_training_df_final_with_time.show(truncate=False)

In [None]:
# ## Add conversation start time
# from pyspark.sql.window import Window
# from pyspark.sql.functions import first

# # Define a window partitioned by 'conversation_id' and ordered by 'time'
# window_spec = Window.partitionBy('conversation_id').orderBy('time')

# # Add a new column with the start time of the conversation
# df_with_start_time = pan12_training_df_final_with_time.withColumn('conversation_start_time', first('time').over(window_spec))

# # Show the DataFrame with the new column
# # df_with_start_time.show(truncate=False)

In [None]:
# Define the regular expression pattern to match the author ID
# author_id_pattern = r'\b[a-f\d]{32}\b'

# Remove the author ID from the text column
# df_with_start_time = df_with_start_time.withColumn('text_copy', col('text'))\
                                                    # .withColumn('text_copy', col('text'))

# Show the DataFrame with cleaned text
# df_with_start_time.show(truncate=False)

In [None]:
# ## Add # people in conversation
# ## Add in the type of conversation (pair, monologue, group)
# # Add a new column 'distinct_authors_count' to the DataFrame
# pan12_training_final_with_timestamp = df_with_start_time.groupby('conversation_id').agg(countDistinct('author').alias('n_people_in_conversation'),
#                                                                                    first('source').alias('source'),
#                                                                                   first('label').alias('label'),
#                                                                                    first('conversation_start_time').alias('conversation_start_time'),
#                                                                                    first('time').alias('time'),
#                                                                                    first('text').alias('merged_text'),
#                                                                                    first('author').alias('author')
#                                                                                    )\
#                                                                                 .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\
#                                                                                 .withColumn('n_people_in_conversation', col('n_people_in_conversation').cast('int'))\
#                                                                                 .drop('n_texts')

# Show the DataFrame with the new column
# pan12_training_final_with_timestamp.show(truncate=False)

In [None]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'author',
# 'time']

# pan12_training_final_with_timestamp = pan12_training_final_with_timestamp.select(columns)

# # show the dataframe with the new column order
# pan12_training_final_with_timestamp.show(10, truncate=False)

In [None]:
# pan12_training_final_with_timestamp.printSchema

In [None]:
# ## count the conversations by conversation type and label
# pan12_training_final_with_timestamp.groupBy('label', 'type_conversation')\
#           .agg(countDistinct(col('conversation_id')).alias('n_conversations'))\
#           .orderBy(col('label'), col('type_conversation'))\
#           .show(truncate = False)

## file saving

In [None]:
# spliting the dataframes into four dataframes
weights = [.3, .3, .2, .2]
splits = pan12_test_final_merged_text.randomSplit(weights, seed=None)
# splits[0].show(10)

In [None]:
# Define the path where the pan12_test_data can be saved
output_path = f'drive/MyDrive/210 Capstone/data/merged_data_conversations_2/'

# Save the DataFrame to a json file
splits[0].write.mode('append').parquet(output_path)
splits[1].write.mode('append').parquet(output_path)
splits[2].write.mode('append').parquet(output_path)
splits[3].write.mode('append').parquet(output_path)

In [None]:
# # Define the path to your Parquet files on Google Drive
# parquet_folder_path = f'drive/MyDrive/210 Capstone/data/merged_data_conversations_2/'

# # Read Parquet files into a PySpark DataFrame
# df = spark.read.parquet(parquet_folder_path)

# Show the DataFrame
# pan12_test_final.groupBy('source')\
#   .agg(count('conversation_id').alias('count'))\
#   .show(truncate = False)

+----------+------+
|source    |count |
+----------+------+
|PAN12-test|155128|
+----------+------+



In [None]:
pan12_training_final.groupBy('source')\
  .agg(count('conversation_id').alias('count'))\
  .show(truncate = False)

+-----------+-----+
|source     |count|
+-----------+-----+
|PAN12-train|66927|
+-----------+-----+



In [None]:
pan12_test_final.groupBy('source')\
  .agg(count('conversation_id').alias('count'))\
  .show(truncate = False)

In [None]:
# pan12_test_final_merged_text.groupBy('source')\
#   .agg(countDistinct('conversation_id').alias('count'))\
#   .show(truncate = False)

+----------+------+
|source    |count |
+----------+------+
|PAN12-test|155128|
+----------+------+



In [None]:
# # save the file to show all the conversation at each chat level
# # Define the path where the pan12_training_data can be saved
# output_path = f'drive/MyDrive/210 Capstone/data/merged_data_texts/'

# # Save the DataFrame to a json file
# pan12_training_final_with_timestamp.write.mode('append').parquet(output_path)

### test

In [None]:



# # Define a window specification to partition by conversation_id
# window_spec = Window.partitionBy("conversation_id")

# # Add a new column 'label' to the DataFrame
# pan12_test_df_final_with_time = pan12_test_df.withColumn('label',
#                                                       lit(1).alias('label')). \
#                                            withColumn('label',
#                                                       when(col('author').isin(broadcast_predator_ids.value), 1).otherwise(0)). \
#                                             withColumn('source', lit('PAN12-test').alias('source'))\
#                                             .withColumn('n_texts', count('text').over(window_spec))




# Show the DataFrame
# pan12_test_df_final_with_time.show(10)

In [None]:
# Define a window partitioned by 'conversation_id' and ordered by 'time'
window_spec = Window.partitionBy('conversation_id').orderBy('time')

# Add a new column with the start time of the conversation
df_test_with_start_time = pan12_test_df_2.withColumn('conversation_start_time', first('time').over(window_spec))
# df_with_start_time.show(3, truncate=False)

In [None]:
# Define the UDF to count unique author IDs
def count_unique_authors(authors):
    return len(set(authors))

# Register the UDF
count_unique_authors_udf = udf(count_unique_authors, IntegerType())

# Group by 'conversation_id' and apply the UDF to count unique authors
df_test_with_unique_authors_count = df_test_with_start_time.groupBy('conversation_id') \
                                                 .agg(count_unique_authors_udf(collect_list('author')).alias('n_people_in_conversation'))

# Join the result back to the original DataFrame
df_test_with_start_time = df_test_with_start_time.join(df_test_with_unique_authors_count, 'conversation_id', 'left')

# Show the DataFrame with the new column 'unique_author_count'
# df_with_start_time.show()

In [None]:
# Define the UDF to concatenate text or merged_text
def concatenate_texts(texts):
    return ' '.join(texts)

# Register the UDF
concatenate_texts_udf = udf(concatenate_texts, StringType())

# Group by 'conversation_id' and collect all 'text' and 'merged_text'
# df_concatenated_texts = df_with_start_time.groupBy('conversation_id').agg(collect_list('text').alias('merged_text'),
#                                                                               collect_list('merged_text').alias('merged_text_id'),
#                                                                           first('label').alias('label'),
#                                                                           first('source').alias('source'),
#                                                                           first('conversation_start_time').alias('conversation_start_time'),
#                                                                           first('n_people_in_conversation').alias('n_people_in_conversation').cast('int'))\
#                                                                           .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\


df_test_concatenated_texts = df_test_with_start_time.groupBy('conversation_id').agg(concat_ws(" ", collect_list('text')).alias('merged_text'),
                                                                          concat_ws(" ", collect_list('merged_text')).alias('merged_text_id'),
                                                                          first('label').alias('label'),
                                                                          first('source').alias('source'),
                                                                          first('conversation_start_time').alias('conversation_start_time'),
                                                                          first('n_people_in_conversation').alias('n_people_in_conversation'))\
                                                                          .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
                                                                                                                .when(col('n_people_in_conversation') == 2, 'Pair')\
                                                                                                                .when(col('n_people_in_conversation') > 2, 'Group'))\
                                                                          .withColumn('n_texts', size(split(col('merged_text'), ' ')))

In [None]:
# # Define a window partitioned by 'conversation_id' and ordered by 'time'
# window_spec = Window.partitionBy('conversation_id').orderBy('time')

# # Add a new column with the start time of the conversation
# df_with_start_time_test = pan12_test_df_final_with_time.withColumn('conversation_start_time', first('time').over(window_spec))

# # Show the DataFrame with the new column
# # df_with_start_time_test.show(10)

In [None]:
# Add new columns 'distinct_authors_count' and 'type_conversation' to the DataFrame
# pan12_test_final_with_timestamp = df_with_start_time_test.groupby('conversation_id') \
#     .agg(
#         countDistinct('author').alias('n_people_in_conversation'),
#         first('time').alias('time'),
#         first('conversation_start_time').alias('conversation_start_time'),
#         first('label').alias('label'),
#         first('source').alias('source'),
#         first('text').alias('text'),
#         first('author').alias('author')
#     ) \
#     .withColumn('type_conversation',
#                 when(col('n_people_in_conversation') == 1, 'Monologue')
#                 .when(col('n_people_in_conversation') == 2, 'Pair')
#                 .otherwise('Group'))\


# Show the DataFrame with the new columns
# pan12_test_final_with_timestamp.show(10)

In [None]:
# Define the regular expression pattern to match the author ID
# author_id_pattern = r'\b[a-f\d]{32}\b'

# # Remove the author ID from the text column
# df_with_start_time_test = df_with_start_time_test.withColumn('text_copy', col('text'))\
#                                                     .withColumn('text_copy', regexp_replace('text_copy', author_id_pattern, ""))

# Show the DataFrame with cleaned text
# df_with_start_time_test.show(truncate=False)

In [None]:
# ## Add # people in conversation
# ## Add in the type of conversation (pair, monologue, group)
# # Add a new column 'distinct_authors_count' to the DataFrame
# pan12_test_final_with_timestamp = df_with_start_time_test.groupby('conversation_id').agg(countDistinct('author').alias('n_people_in_conversation'),
#                                                                                    first('time').alias('time'),
#                                                                                    first('conversation_start_time').alias('conversation_start_time'),
#                                                                                    first('label').alias('label'),
#                                                                                    first('source').alias('source'),
#                                                                                    first('text').alias('merged_text'),
#                                                                                    first('author').alias('author')
#                                                                                    )\
#                                                                                 .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\
#                                                                                 .withColumn('n_people_in_conversation', col('n_people_in_conversation').cast('int'))\
#                                                                                 .drop('n_texts')
# # Show the DataFrame with the new column
# pan12_test_final_with_timestamp.show(10)

In [None]:
# ## count the conversations by conversation type and label
# pan12_test_final_with_timestamp.groupBy('label', 'type_conversation')\
#           .agg(countDistinct(col('conversation_id')).alias('n_conversations'))\
#           .orderBy(col('label'), col('type_conversation'))\
#           .show(truncate = False)

+-----+-----------------+---------------+
|label|type_conversation|n_conversations|
+-----+-----------------+---------------+
|0    |Group            |19705          |
|0    |Monologue        |27711          |
|0    |Pair             |104950         |
|1    |Monologue        |1850           |
|1    |Pair             |912            |
+-----+-----------------+---------------+



In [None]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'author',
# 'time']

# pan12_test_final_with_timestamp = pan12_test_final_with_timestamp.select(columns)

# show the dataframe with the new column order
# pan12_test_final_with_timestamp.show(10, truncate=False)

In [None]:
# pan12_test_final_with_timestamp.printSchema

In [None]:
# # spliting the dataframes into two dataframes
# weights = [.3, .3, .2, .2]
# splits = pan12_test_final_with_timestamp.randomSplit(weights, seed=None)
# # splits[0].show(10)

In [None]:
# # Define the path where the pan12_test_data can be saved
# output_path = f'drive/MyDrive/210 Capstone/data/merged_data_texts/'

# # Save the DataFrame to a json file
# splits[0].write.mode('append').parquet(output_path)
# splits[1].write.mode('append').parquet(output_path)
# splits[2].write.mode('append').parquet(output_path)
# splits[3].write.mode('append').parquet(output_path)

In [None]:
# # Add a new column with author name and text concatenated
# pan12_test_final_merged_text = df_with_start_time_test.withColumn("author_text_concat", concat(col("author"), lit(": "), col("text")))

# pan12_test_final_merged_text = pan12_test_final_merged_text.groupby('conversation_id').agg(countDistinct('author').alias('n_people_in_conversation'),
#                                                                                   first('time').alias('time'),
#                                                                                   first('conversation_start_time').alias('conversation_start_time'),
#                                                                                   first('label').alias('label'),
#                                                                                   first('source').alias('source'),
#                                                                                   first('n_texts').cast('int').alias('n_texts'),
#                                                                                   concat_ws(" ", collect_list('author_text_concat')).alias('merged_text_id'),
#                                                                                   concat_ws(" ", collect_list('text')).alias('merged_text')
#                                                                                       )\
#                                                                                       .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\
#                                                                                       .withColumn('n_people_in_conversation', col('n_people_in_conversation').cast('int'))\
#                                                                                       .drop('text', 'time')

# Show the DataFrame with the new column
# pan12_test_final_merged_text.show(10)

In [None]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'merged_text_id',
# 'n_texts']

# pan12_test_final = df_test_concatenated_texts.select(columns)

# show the dataframe with the new column order
# pan12_test_final_merged_text.show(10, truncate=False)

In [None]:
# pan12_test_final_merged_text.show(15, truncate=False)

In [None]:
# pan12_test_final.printSchema()

In [None]:
# pan12_test_final.show(3, truncate=False)

In [None]:
# pan12_test_final_merged_text.groupBy('label', 'type_conversation')\
#           .agg(countDistinct(col('conversation_id')).alias('n_conversations'))\
#           .orderBy(col('label'), col('type_conversation'))\
#           .show(truncate = False)

+-----+-----------------+---------------+
|label|type_conversation|n_conversations|
+-----+-----------------+---------------+
|0    |Group            |19705          |
|0    |Monologue        |27711          |
|0    |Pair             |104950         |
|1    |Monologue        |1850           |
|1    |Pair             |912            |
+-----+-----------------+---------------+



### *A) PAN12 Training Dataset*

#### A_a) Training with Timestamps

##### A_a_1) extracting conversation_id, author, time, text from the raw data

In [8]:
# # Iterate through the XML tree and extract data
# training_xml_data = []

# for conversation in training_root.findall('conversation'):
#   for message in conversation.findall('message'):
#     author = message.find('author').text
#     time = message.find('time').text
#     text = message.find('text').text

#     # Append data to list
#     training_xml_data.append((conversation.get('id'), author, time, text))

# # Create DataFrame from list of tuples
# pan12_training_df = spark.createDataFrame(training_xml_data, ['conversation_id', 'author', 'time', 'text'])

In [None]:
# # Show DataFrame
# pan12_training_df.show(3, truncate=False)

+--------------------------------+--------------------------------+-----+---------+
|conversation_id                 |author                          |time |text     |
+--------------------------------+--------------------------------+-----+---------+
|e621da5de598c9321a1d505ea95e6a2d|97964e7a9e8eb9cf78f2e4d7b2ff34c7|03:20|Hola.    |
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|hi.      |
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|whats up?|
+--------------------------------+--------------------------------+-----+---------+
only showing top 3 rows



##### A_a_2) look up the label, add label/source and create merged_text column('author': 'text')

In [9]:
# # Load list of author IDs from the text file
# with open(train_data_path + 'pan12-sexual-predator-identification-training-corpus-predators-2012-05-01.txt', 'r') as f:
#     predator_ids = [line.strip() for line in f]

# # Broadcast the list of author IDs to all Spark executors for efficient lookup
# broadcast_predator_ids = spark.sparkContext.broadcast(predator_ids)

# pan12_training_df_2 = pan12_training_df.withColumn('label', lit(1).alias('label'))\
#                                      .withColumn('source', lit('PAN12-train').alias('source'))\
#                                       .withColumn('label', when(col('author').isin(broadcast_predator_ids.value), 1).otherwise(0))\
#                                       .withColumn('merged_text', concat_ws(": ", col('author'), col('text')))


In [23]:
# pan12_training_df_2_predator = pan12_training_df_2.filter(pan12_training_df_2['label'] == 1)
# pan12_training_df_2_predator.show(10, truncate=False)

In [None]:
# pan12_training_df_2.show(3, truncate=False)

+--------------------------------+--------------------------------+-----+---------+-----+-----------+-------------------------------------------+
|conversation_id                 |author                          |time |text     |label|source     |merged_text                                |
+--------------------------------+--------------------------------+-----+---------+-----+-----------+-------------------------------------------+
|e621da5de598c9321a1d505ea95e6a2d|97964e7a9e8eb9cf78f2e4d7b2ff34c7|03:20|Hola.    |0    |PAN12-train|97964e7a9e8eb9cf78f2e4d7b2ff34c7: Hola.    |
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|hi.      |0    |PAN12-train|0158d0d6781fc4d493f243d4caa49747: hi.      |
|e621da5de598c9321a1d505ea95e6a2d|0158d0d6781fc4d493f243d4caa49747|03:20|whats up?|0    |PAN12-train|0158d0d6781fc4d493f243d4caa49747: whats up?|
+--------------------------------+--------------------------------+-----+---------+-----+-----------+-----------------------

##### A_a_3) add in a conversation start time to the table

In [11]:
# # Define a window partitioned by 'conversation_id' and ordered by 'time'
# window_spec = Window.partitionBy('conversation_id').orderBy('time')

# # Add a new column with the start time of the conversation
# df_with_start_time = pan12_training_df_2.withColumn('conversation_start_time', first('time').over(window_spec))


In [24]:
# df_with_start_time.show(3, truncate=False)

##### A_a_4) add in number of people in the conversation and the type of conversation

In [25]:
# # Register the UDF
# count_unique_authors_udf = udf(count_unique_authors, IntegerType())

# # Group by 'conversation_id' and apply the UDF to count unique authors
# df_with_unique_authors_count = df_with_start_time.groupBy('conversation_id') \
#                                                  .agg(count_unique_authors_udf(collect_list('author')).alias('n_people_in_conversation'))\
#                                                  .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))

# # Join the result back to the original DataFrame
# df_with_start_time = df_with_start_time.join(df_with_unique_authors_count, 'conversation_id', 'left')


In [26]:
# Show the DataFrame with the new column 'unique_author_count'
# df_with_start_time.show(3, truncate=False)

##### A_a_5) add in the conversation_history to show the staggered chats

In [13]:
# # Add previous texts
# window = Window.partitionBy("conversation_id").orderBy("time").rowsBetween(Window.unboundedPreceding, -1)

# df_training_with_time_stamp = df_with_start_time.withColumn("conversation_history_list", collect_list(col("merged_text")).over(window))\
#     .withColumn("conversation_history", UDF_merge_list_text(col("conversation_history_list")))\
#     .drop("conversation_history_list")\
#     .withColumn('n_texts', size(split(col('merged_text'), ' ')))

In [22]:
# df_training_with_time_stamp.show(3, truncate=False)

##### A_a_6) change the column order and check the datatypes

In [27]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'author',
# 'time',
# 'conversation_history']

# pan12_training_with_timestamp = df_training_with_time_stamp.select(columns)

In [28]:
# show the dataframe with the new column order
# pan12_training_with_timestamp.show(3, truncate=False)

In [29]:
# pan12_training_with_timestamp.printSchema()

##### A_a_7) save the file to gDrive

In [None]:
# # save the file to show all the conversation at each chat level
# # Define the path where the pan12_training_data can be saved
# output_path = f'drive/MyDrive/210 Capstone/data/merged_data_texts_SS/'

# # Save the DataFrame to a json file
# pan12_training_with_timestamp.write.mode('append').parquet(output_path)

In [None]:
# # Define the UDF to concatenate text or merged_text
# def concatenate_texts(texts):
#     return ' '.join(texts)

# # Register the UDF
# concatenate_texts_udf = udf(concatenate_texts, StringType())

# # Group by 'conversation_id' and collect all 'text' and 'merged_text'
# # df_concatenated_texts = df_with_start_time.groupBy('conversation_id').agg(collect_list('text').alias('merged_text'),
# #                                                                               collect_list('merged_text').alias('merged_text_id'),
# #                                                                           first('label').alias('label'),
# #                                                                           first('source').alias('source'),
# #                                                                           first('conversation_start_time').alias('conversation_start_time'),
# #                                                                           first('n_people_in_conversation').alias('n_people_in_conversation').cast('int'))\
# #                                                                           .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
# #                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
# #                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\


# df_concatenated_texts = df_with_start_time.groupBy('conversation_id').agg(concat_ws(" ", collect_list('text')).alias('merged_text'),
#                                                                           concat_ws(" ", collect_list('merged_text')).alias('merged_text_id'),
#                                                                           first('label').alias('label'),
#                                                                           first('source').alias('source'),
#                                                                           first('conversation_start_time').alias('conversation_start_time'),
#                                                                           first('n_people_in_conversation').alias('n_people_in_conversation'))\
#                                                                           .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\
#                                                                           .withColumn('n_texts', size(split(col('merged_text'), ' ')))


# df_concatenated_texts.show(3, truncate=False)

# # Apply the UDF to concatenate all text or merged_text for each conversation_id
# df_concatenated_texts = df_concatenated_texts.withColumn('merged_text', concatenate_texts_udf('merged_text')) \
#                                              .withColumn('merged_text_id', concatenate_texts_udf('merged_text_id'))

# df_concatenated_texts.show(3, truncate=False)
# # Show the DataFrame with concatenated text and merged_text
# df_concatenated_texts.show(truncate=False)



# Add a new column with author name and text concatenated
# pan12_training_final_merged_text = df_with_start_time.withColumn("author_text_concat", concat(col("author"), lit(": "), col("text")))\

# pan12_training_final_merged_text = pan12_training_final_merged_text.groupby('conversation_id').agg(countDistinct('author').alias('n_people_in_conversation'),
#                                                                                   first('time').alias('time'),
#                                                                                   first('conversation_start_time').alias('conversation_start_time'),
#                                                                                   first('label').alias('label'),
#                                                                                   first('source').alias('source'),
#                                                                                   first('n_texts').cast('int').alias('n_texts'),
#                                                                                   concat_ws(" ", collect_list('author_text_concat')).alias('merged_text_id'),
#                                                                                   concat_ws(' ', collect_list('text')).alias('merged_text'))\
#                                                                                   .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))\
#                                                                                   .withColumn('n_people_in_conversation', col('n_people_in_conversation').cast('int'))\
#                                                                                       # .withColumn('merged_text', col('merged_text'))\
#                                                                                       # .drop('time')

In [30]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'merged_text_id',
# 'n_texts']

# pan12_training_final = df_concatenated_texts.select(columns)

# # show the dataframe with the new column order
# pan12_training_final.show(10, truncate=False)

In [31]:
# pan12_training_final.printSchema()

In [None]:
# # save the file to show all the conversation at each chat level
# # Define the path where the pan12_training_data can be saved
# output_path = f'drive/MyDrive/210 Capstone/data/merged_data_texts/'

# # Save the DataFrame to a json file
# pan12_training_final.write.mode('append').parquet(output_path)

#### A_b) Training with Conversations

##### A_b_1) confirm the dataset to modify

In [32]:
# df_training_with_time_stamp.show(3, truncate=False)

##### A_b_2) add in the merged_text_id column, aggregate by the conversation_id and the number of texts (based on the merged_text column)

In [33]:
# # Add a new column with author name and text concatenated
# # pan12_training_final_merged_text = df_with_start_time.withColumn("author_text_concat", concat(col("author"), lit(": "), col("text")))\

# pan12_training_with_conversations = df_training_with_time_stamp.groupby('conversation_id').agg(concat_ws(" ", collect_list('merged_text')).alias('merged_text_id'),
#                                                                                   concat_ws(' ', collect_list('text')).alias('merged_text'),
#                                                                                   first('label').alias('label'),
#                                                                                   first('source').alias('source'),
#                                                                                   first('conversation_start_time').alias('conversation_start_time'),
#                                                                                   first('n_people_in_conversation').alias('n_people_in_conversation'),
#                                                                                   first('type_conversation').alias('type_conversation'))\
#                                                                                   .withColumn('n_texts', size(split(col('merged_text'), ' ')))\
#                                                                                   .drop('conversation_history')

In [34]:
# pan12_training_with_conversations_predator = pan12_training_with_conversations.filter(pan12_training_with_conversations['label'] == 1)
# pan12_training_with_conversations_predator.show(10, truncate=False)

In [35]:
# pan12_training_with_conversations.show(2, truncate=False)

##### A_b_3) change the column order and check the datatypes

In [36]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'merged_text_id',
# 'n_texts']

# pan12_training_with_conversations = pan12_training_with_conversations.select(columns)

In [37]:
# show the dataframe with the new column order
# pan12_training_with_conversations.show(3, truncate=False)

In [38]:
# pan12_training_with_conversations.printSchema()

##### A_b_4) save the file to gDrive

In [40]:
# # Define the path where the pan12_training_data can be saved
# output_path = f'drive/MyDrive/210 Capstone/data/final/merged_data_conversations/'

# # Save the DataFrame to a json file
# pan12_training_with_conversations.write.mode('append').parquet(output_path)

##### A_b_5) confirm the count of data points

In [39]:
# pan12_training_with_conversations.groupBy('source')\
#   .agg(count('conversation_id').alias('count'))\
#   .show(truncate = False)

### *B) PAN12 Test Dataset*

#### B_a) Test with Timestamps

##### B_a_1) extracting conversation_id, author, time, text from the raw data

In [None]:
# # Iterate through the XML tree and extract data
# test_xml_data = []

# for conversation in test_root.findall('conversation'):
#   for message in conversation.findall('message'):
#     author = message.find('author').text
#     time = message.find('time').text
#     text = message.find('text').text

#     # Append data to list
#     test_xml_data.append((conversation.get('id'), author, time, text))

# # Create DataFrame from list of tuples
# pan12_test_df = spark.createDataFrame(test_xml_data, ['conversation_id', 'author', 'time', 'text'])

##### B_a_2) look up the label, add label/source and create merged_text column('author':'text')

In [6]:
# # # Load list of author IDs from the text file
# # with open('s3a://capstone210/data/pan12/test_text.txt', 'r') as f:
# #     predator_ids = [line.strip() for line in f]

# # Load list of author IDs from the text file
# predator_ids_df = spark.read.text('s3a://capstone210/data/pan12/test_text.txt')

# # Extract predator IDs from the DataFrame
# predator_ids = [row.value.strip() for row in predator_ids_df.collect()]

# # Broadcast the list of author IDs to all Spark executors for efficient lookup
# broadcast_predator_ids = spark.sparkContext.broadcast(predator_ids)

# pan12_test_df_2 = pan12_test_df.withColumn('label', lit(1).alias('label'))\
#                                      .withColumn('source', lit('PAN12-test').alias('source'))\
#                                       .withColumn('label', when(col('author').isin(broadcast_predator_ids.value), 1).otherwise(0))\
#                                       .withColumn('merged_text', concat_ws(": ", col('author'), col('text')))

##### B_a_3) add in a conversation start time to the table

In [41]:
# # Define a window partitioned by 'conversation_id' and ordered by 'time'
# window_spec = Window.partitionBy('conversation_id').orderBy('time')

# # Add a new column with the start time of the conversation
# df_test_with_start_time = pan12_test_df.withColumn('conversation_start_time', first('time').over(window_spec))

In [9]:
# df_test_with_start_time.show(3)

[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+--------------------+-----+----+-----+----------+--------------------+-----------------------+
|     conversation_id|              author| time|text|label|    source|         merged_text|conversation_start_time|
+--------------------+--------------------+-----+----+-----+----------+--------------------+-----------------------+
|000049c4530615e68...|53a66119381d88719...|07:26|  hi|    0|PAN12-test|53a66119381d88719...|                  07:26|
|000049c4530615e68...|1c8edb8bfd4b3f9ec...|07:26|  hi|    0|PAN12-test|1c8edb8bfd4b3f9ec...|                  07:26|
|000049c4530615e68...|1c8edb8bfd4b3f9ec...|07:26| fr?|    0|PAN12-test|1c8edb8bfd4b3f9ec...|                  07:26|
+--------------------+--------------------+-----+----+-----+----------+--------------------+-----------------------+
only showing top 3 rows



                                                                                

##### B_a_4) add in number of people in the conversation and the type of conversation

In [44]:
# df_test_with_start_time.groupBy('source')\
#   .agg(count('conversation_id').alias('count'))\
#   .show(truncate = False)

In [42]:
# # Count unique authors for each conversation
# unique_authors_per_conversation = df_test_with_start_time.select("conversation_id", "author") \
#                                              .groupBy("conversation_id") \
#                                              .agg(countDistinct("author").alias("n_people_in_conversation"))

# # Join the unique author count with the original DataFrame
# pan12_test_with_conversations = df_test_with_start_time.join(unique_authors_per_conversation, "conversation_id", "left")

In [45]:
# pan12_test_with_conversations.show(3)

In [11]:
# # Define the UDF as a separate function
# @pandas_udf(returnType=IntegerType())
# def count_unique_authors(authors):
#     return len(set(authors))

# # Register the UDF
# count_unique_authors_udf = count_unique_authors

# # # Register the UDF
# # count_unique_authors_udf = udf(lambda authors: len(set(authors)), IntegerType())

# # Group by 'conversation_id' and apply the UDF to count unique authors
# df_test_with_unique_authors_count = df_test_with_start_time.groupBy('conversation_id') \
#                                                  .agg(count_unique_authors_udf(collect_list('author')).alias('n_people_in_conversation'))\
#                                                  .withColumn('type_conversation', when(col('n_people_in_conversation') == 1, 'Monologue')\
#                                                                                                                 .when(col('n_people_in_conversation') == 2, 'Pair')\
#                                                                                                                 .when(col('n_people_in_conversation') > 2, 'Group'))

# # Join the result back to the original DataFrame
# df_test_with_start_time = df_test_with_start_time.join(df_test_with_unique_authors_count, 'conversation_id', 'left')


##### B_a_5) add in the conversation_history to show the staggered chats

In [31]:
# # Add previous texts
# window = Window.partitionBy("conversation_id").orderBy("time").rowsBetween(Window.unboundedPreceding, -1)

# df_test_with_time_stamp = df_test_with_start_time.withColumn("conversation_history_list", collect_list(col("merged_text")).over(window))\
#     .withColumn("conversation_history", UDF_merge_list_text(col("conversation_history_list")))\
#     .drop("conversation_history_list")\
#     .withColumn('n_texts', size(split(col('merged_text'), ' ')))

##### B_a_6) change the column order and check the datatypes

In [None]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'author',
# 'time',
# 'conversation_history']

# pan12_test_with_timestamp = df_test_with_time_stamp.select(columns)

In [None]:
# pan12_test_with_timestamp.printSchema()

root
 |-- conversation_id: string (nullable = true)
 |-- source: string (nullable = false)
 |-- label: integer (nullable = false)
 |-- conversation_start_time: string (nullable = true)
 |-- n_people_in_conversation: integer (nullable = true)
 |-- type_conversation: string (nullable = true)
 |-- merged_text: string (nullable = false)
 |-- author: string (nullable = true)
 |-- time: string (nullable = true)
 |-- conversation_history: string (nullable = true)



In [None]:
# pan12_test_with_timestamp.show(2, truncate=False)

##### B_a_7) save the file to gDrive

In [None]:
# # spliting the dataframes into two dataframes
# weights = [.3, .3, .2, .2]
# splits = pan12_test_with_timestamp.randomSplit(weights, seed=None)

In [None]:
# # Define the path where the pan12_test_data can be saved
# output_path = './capstone-210-spring2024/data/test/'

# # Save the DataFrame to a json file
# splits[0].write.mode('append').parquet(output_path)
# splits[1].write.mode('append').parquet(output_path)
# splits[2].write.mode('append').parquet(output_path)
# splits[3].write.mode('append').parquet(output_path)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 37118)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/IPython/core/interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-18-1303cdcd3a9b>", line 6, in <cell line: 6>
    splits[1].write.mode('append').parquet(output_path)
  File "/content/spark-3.5.0-bin-hadoop3/python/pyspark/sql/readwriter.py", line 1721, in parquet
    self._jwrite.parquet(path)
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/content/spark-3.5.0-bin-hadoop3/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/content/spark-3.5.0-bin-hadoop3/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in g

ConnectionRefusedError: [Errno 111] Connection refused

#### B_b) Test with Conversations

##### B_b_1) confirm the dataset to modify

In [None]:
# df_test_with_time_stamp.show(3, truncate=False)

##### B_b_2) add in the merged_text_id column, aggregate by the conversation_id and the number of texts (based on the merged_text column)

In [10]:
# pan12_test_with_conversations = df_test_with_start_time.groupby('conversation_id').agg(concat_ws(" ", collect_list('merged_text')).alias('merged_text_id'),
#                                                                                   concat_ws(' ', collect_list('text')).alias('merged_text'),
#                                                                                   first('label').alias('label'),
#                                                                                   first('source').alias('source'),
#                                                                                   first('conversation_start_time').cast('string').alias('conversation_start_time'),
#                                                                                   first('n_people_in_conversation').cast('int').alias('n_people_in_conversation'),
#                                                                                   first('type_conversation').alias('type_conversation'))\
#                                                                                   .withColumn('n_texts', size(split(col('merged_text'), ' ')))\
#                                                                                   .drop('conversation_history')

##### B_b_3) change the column order and check the datatypes

In [46]:
# # specifying the column order
# columns = ['conversation_id',
# 'source',
# 'label',
# 'conversation_start_time',
# 'n_people_in_conversation',
# 'type_conversation',
# 'merged_text',
# 'merged_text_id',
# 'n_texts']

# pan12_test_with_conversations = pan12_test_with_conversations.select(columns)

In [48]:
# pan12_test_with_conversations.printSchema()

##### B_b_4) save the file to gDrive

In [49]:
# # Define the path where the pan12_test_data can be saved
# output_path = './capstone-210-spring2024/data/test/'

# pan12_test_with_conversations.write.mode('overwrite').parquet(output_path)

In [27]:
# train_data_path = 's3a://capstone210/data/train_merged_data_conversations/'
# test_data_path = './capstone-210-spring2024/data/test/'

# train_data_df = spark.read.parquet(train_data_path)
# test_data_df = spark.read.parquet(test_data_path)

In [50]:
# train_data_df.groupBy('source')\
#   .agg(count('conversation_id').alias('count'))\
#   .show()

# test_data_df.groupBy('source')\
#   .agg(count('conversation_id').alias('count'))\
#   .show()

In [17]:
# train_data_path = 's3a://capstone210/data/train_merged_data_conversations/'
# train_data_df = spark.read.parquet(train_data_path)
# train_data_df.show(3, truncate=False)

In [51]:
# # Read CSV files into DataFrame
# output_path = './capstone-210-spring2024/data/test/'
# pan12_test_data = spark.read.csv(output_path, header=True)
# # pan12_test_data.show(3, truncate=False)

In [14]:
# # spliting the dataframes into two dataframes
# weights = [.3, .3, .2, .2]
# splits = pan12_test_with_conversations.randomSplit(weights, seed=None)
# # splits[0].show(10)

In [27]:
# # Define the path where the pan12_test_data can be saved
# output_path = './capstone-210-spring2024/data/test/'

# # Save the DataFrame to a parquet file
# splits[0].write.mode('append').parquet(output_path)
# splits[1].write.mode('append').parquet(output_path)
# splits[2].write.mode('append').parquet(output_path)
# splits[3].write.mode('append').parquet(output_path)

##### B_b_5) confirm the count of data points

In [28]:
# pan12_test_with_conversations.groupBy('source')\
#   .agg(count('conversation_id').alias('count'))\
#   .show(truncate = False)



+----------+------+
|source    |count |
+----------+------+
|PAN12-test|155128|
+----------+------+



                                                                                