In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, split, array_join, expr, lower
!pip install numpy

from pyspark.sql.types import StringType
import pyspark.sql.functions as F
! pip install scikit-learn
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from transformers import DistilBertTokenizer
import torch
from sklearn.model_selection import train_test_split


spark = SparkSession.builder \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .appName("BERTDataPreprocessing") \
    .getOrCreate()







25/03/13 11:02:27 WARN Utils: Your hostname, hel-IdeaPad-Slim-5-14AHP9 resolves to a loopback address: 127.0.1.1; using 10.10.95.219 instead (on interface wlp2s0)
25/03/13 11:02:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/13 11:02:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/13 11:02:43 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [2]:
#df_comments_MB = spark.read.csv("./mbti9k_comments.csv")  # prob wont use this 
#df_post = spark.read.csv("./typed_posts.csv") # this has numeric data which is not suitable for distilled bert, might try it with differenr model if time allows

# Renaming columns in dataset mbti_1.csv
df_mbti_1 = spark.read.csv("mbti_1.csv", header=True, inferSchema=True)

# Renaming columns in dataset typed_comments.csv
df_comments_typed = spark.read.csv("typed_comments.csv", header=True, inferSchema=True)

from pyspark.sql.functions import col, countDistinct, trim, lower

# Step 1: Normalize 'lang' column to lowercase, trim spaces, and filter only English rows
df_english = df_comments_typed.filter(lower(trim(col("lang"))) == "en")
print(df_english.count())

# Step 3: Count unique users based on the 'author' column
unique_users_count = df_english.select(countDistinct("author")).collect()[0][0]

# Display the number of unique users
print(f"Number of unique English-language users in the Reddit comments dataset: {unique_users_count}")



                                                                                

20492509




Number of unique English-language users in the Reddit comments dataset: 13616


                                                                                

In [3]:
# Dropping irrelevant columns 

df_comments_typed = (
    df_comments_typed
    .filter(col("comment").isNotNull() & (col("comment") != ""))  # Non-empty comments
    .filter(col("word_count") > 5)  # Minimum word count
)
df_comments_typed_dropped = df_comments_typed[['type', 'comment']]

print(df_comments_typed_dropped.schema)

StructType([StructField('type', StringType(), True), StructField('comment', StringType(), True)])


In [4]:
from pyspark.sql.functions import split, explode, regexp_replace, trim, size, col

# Step 1: Normalize multiple delimiters
df_cleaned = df_mbti_1.withColumn("posts", regexp_replace(df_mbti_1["posts"], r"(\|\|\|)+", "|||"))
print(df_cleaned.count())
# Step 2: Split posts into an array
df_split = df_cleaned.withColumn("post", split(df_cleaned["posts"], r"\|\|\|"))

# Step 3: Explode the array to create separate rows
df_exploded = df_split.select("type", explode(df_split["post"]).alias("post"))

# Step 4: Remove URLs using regex
df_no_links = df_exploded.withColumn("post", regexp_replace(col("post"), r"http\S+|www\S+|\S+\.(com|net|org|io|gov|edu)\S*", ""))

# Step 5: Trim whitespace and remove empty rows
df_trimmed = df_no_links.withColumn("post", trim(col("post"))) \
                        .filter(col("post") != "")

# Step 6: Remove posts with fewer than 5 words
df_mbti_1= df_trimmed.filter(size(split(col("post"), " ")) >= 5)

# Step 7: Show cleaned dataset
df_mbti_1.show(truncate=False)
print(df_mbti_1.count())

# Optional: Save the cleaned dataset
# df_filtered.write.csv("mbti_1_filtered.csv", header=True)


8675
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|type|post                                                                                                                                                                                                     |
+----+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|INFJ|enfp and intj moments    sportscenter not top ten plays    pranks                                                                                                                                        |
|INFJ|What has been the most life-changing experience in your life?                                                                                            

[Stage 17:>                                                       (0 + 15) / 15]

383864


                                                                                

In [5]:
# Rename 'comment' to 'posts'
df_comments_typed_dropped = df_comments_typed_dropped.withColumnRenamed('comment', 'post')



# Convert all values in the 'type' column to lowercase
from pyspark.sql.functions import lower, col
df_comments_typed_dropped = df_comments_typed_dropped.withColumn("type", lower(col("type")))
df_mbti_1 = df_mbti_1.withColumn("type", lower(col("type")))

#filter out coment less < 5 words 
df_comments_typed_dropped= df_comments_typed_dropped.filter(size(split(col("post"), " ")) >= 5)


# Ensure the schema is exactly the same on both dataframes before joining them
print("Comments_types schema", df_comments_typed_dropped.schema)
print("MBTI schema", df_mbti_1.schema)

Comments_types schema StructType([StructField('type', StringType(), True), StructField('post', StringType(), True)])
MBTI schema StructType([StructField('type', StringType(), True), StructField('post', StringType(), False)])


In [6]:
# Join dataframes
print("Comments_types row count", df_comments_typed_dropped.count())
print("MBTI row count", df_mbti_1.count())

#df_mbti_1.sort(asc("type")).limit(10).show()
#df_comments_typed_dropped.limit(10).show()

df_short_mbti = df_mbti_1.limit(5)
df_short_comments = df_comments_typed_dropped.limit(5)

df_short_union = df_short_mbti.union(df_short_comments)

df_short_union.show()

df_union = df_mbti_1.union(df_comments_typed_dropped)

print("Union dataframe row count", df_union.count())


                                                                                

Comments_types row count 12691384


                                                                                

MBTI row count 383864


                                                                                

+----+--------------------+
|type|                post|
+----+--------------------+
|infj|enfp and intj mom...|
|infj|What has been the...|
|infj|On repeat for mos...|
|infj|May the PerC Expe...|
|infj|The last thing my...|
|entp|Those stats come ...|
|entp|"That's great to ...|
|entp|I can totally agr...|
|entp|"I took it severa...|
|entp|Gawd it's like we...|
+----+--------------------+





Union dataframe row count 13075248


                                                                                

In [7]:
from pyspark.sql.functions import desc, asc
df_union = df_union.sort(asc("type"))
print(spark)


<pyspark.sql.session.SparkSession object at 0x7c8964fcc4a0>


In [8]:
import os
import shutil

# Define output directory for CSV files
output_dir = "mbti_split_data"
os.makedirs(output_dir, exist_ok=True)  # Ensure directory exists

# Get unique personality types
personality_types = [row[0] for row in df_union.select("type").distinct().collect()]

# Get total number of rows in the dataset


total_rows = df_union.count()

# Dictionary to store row counts and percentages
row_counts = {}

# Iterate over personality types, save CSVs, and count rows
for p_type in personality_types:
    # Filter DataFrame for the current personality type
    df_filtered = df_union.filter(df_union["type"] == p_type)

    # Save as a single CSV file in a temporary directory
    temp_dir = f"{output_dir}/{p_type}"
    df_filtered.coalesce(1).write.mode("overwrite").option("header", "true").csv(temp_dir)

    # Find the actual CSV file inside the created directory
    for file in os.listdir(temp_dir):
        if file.startswith("part-") and file.endswith(".csv"):
            os.rename(os.path.join(temp_dir, file), os.path.join(output_dir, f"{p_type}.csv"))
            break  # Stop once we rename the first file

    # Remove the now-empty directory Spark created
    shutil.rmtree(temp_dir)

    # Count the number of rows in the filtered DataFrame
    count = df_filtered.count()

    # Calculate percentage of total dataset
    percentage = (count / total_rows) * 100

    # Store the count and percentage
    row_counts[p_type] = (count, round(percentage, 2))

# Print row counts and percentages
print("✅ CSV files successfully created with the following row counts and percentages:")
for p_type, (count, percentage) in row_counts.items():
    print(f"{p_type}: {count} rows ({percentage}%)")




✅ CSV files successfully created with the following row counts and percentages:
estp: 176051 rows (1.35%)
entj: 1244567 rows (9.52%)
estj: 185015 rows (1.42%)
enfp: 461949 rows (3.53%)
istj: 769810 rows (5.89%)
enfj: 373265 rows (2.85%)
isfj: 453292 rows (3.47%)
istp: 775190 rows (5.93%)
intp: 2910987 rows (22.26%)
infj: 830725 rows (6.35%)
intj: 2258382 rows (17.27%)
isfp: 323723 rows (2.48%)
entp: 813315 rows (6.22%)
infp: 908018 rows (6.94%)
esfp: 204347 rows (1.56%)
esfj: 386612 rows (2.96%)


                                                                                

In [9]:
import os
import logging
import typing
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from random import randint


# Define paths and constants
PATH_TO_CSVS = "./mbti_split_data"
NUMBER_OF_PERSONALITIES = len(os.listdir(PATH_TO_CSVS))
NUMBER_OF_CLIENTS = 10
SAMPLE_SIZE_PER_PERSONALITY = 15000
SAMPLE_SIZE_PER_CLIENT = SAMPLE_SIZE_PER_PERSONALITY * NUMBER_OF_PERSONALITIES

def create_samples_for_all_clients() -> list:

    samples = []
    for _ in range(NUMBER_OF_CLIENTS):
        samples.append(create_sample_for_one_client())

    return samples

def create_sample_for_one_client():

    client_sample = None  # Initialize empty Spark DataFrame

    for personality_csv in os.listdir(PATH_TO_CSVS):
        file_path = os.path.join(PATH_TO_CSVS, personality_csv)

        try:
            # Load CSV using Spark
            personality_df = spark.read.csv(file_path, header=True, inferSchema=True)
        except Exception as e:
            logger.warning(f"File {personality_csv} caused error '{e}' when loading.")
            continue
        
        # Determine if we need replacement
        replace = SAMPLE_SIZE_PER_PERSONALITY > personality_df.count()

        # Sample the DataFrame (Spark sampling equivalent)
        sampled_df = personality_df.sample(withReplacement=replace, fraction=1.0 * SAMPLE_SIZE_PER_PERSONALITY / personality_df.count())

        # Append to client_sample
        if client_sample is None:
            client_sample = sampled_df
        else:
            client_sample = client_sample.union(sampled_df)

    return client_sample

# Run and display client samples
samples = create_samples_for_all_clients()

# Show a sample from the first client dataset
samples[0].show(5)  # Display first 5 rows from first client's dataset


+----+--------------------+
|type|                post|
+----+--------------------+
|entj|Um, Hilter for su...|
|entj|Yes and also when...|
|entj|I am done, done, ...|
|entj|He strikes me as ...|
|entj|I haven't dealt w...|
+----+--------------------+
only showing top 5 rows



In [14]:
import os
import numpy as np
import torch
from transformers import DistilBertTokenizer
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql import DataFrame
import pandas  as pd

# Start Spark Session
spark = SparkSession.builder.appName("MBTIProcessing").getOrCreate()

def convert_dfs_to_pth(df_list, output_dir=None, output_prefix="data", test_fraction=0.1):

    # Determine output directory
    if output_dir is None:
        output_dir = os.getcwd()  # Default: same directory as notebook/script
    os.makedirs(output_dir, exist_ok=True)

    tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")

    # Determine number of test datasets (ensure at least 1)
    num_test = max(1, int(len(df_list) * test_fraction))  
    test_dfs = df_list[:num_test]   # First N datasets for test
    train_dfs = df_list[num_test:]  # Remaining datasets for training

    # ✅ Combine all test DataFrames using PySpark's `union()`
    if len(test_dfs) > 1:
        test_df = reduce(DataFrame.union, test_dfs)  # Merge multiple Spark DataFrames
    else:
        test_df = test_dfs[0]  # If only one DataFrame, use it directly

    # Convert to Pandas for further processing
    test_pandas = test_df.toPandas()

    # ✅ Drop missing values and ensure 'post' column is a string
    test_pandas = test_pandas.dropna(subset=["post"])  # Remove NaN values
    test_pandas["post"] = test_pandas["post"].astype(str)  # Convert all values to strings

    # ✅ Convert 'type' column to numerical labels and ensure integer dtype
    test_pandas["type_indexed"], _ = pd.factorize(test_pandas["type"])
    test_pandas["type_indexed"] = test_pandas["type_indexed"].astype(int)  # Ensure integer labels

    # ✅ One-hot encoding (fixed)
    num_classes = test_pandas["type_indexed"].nunique()
    test_pandas["type_encoded"] = test_pandas["type_indexed"].apply(
        lambda x: np.eye(num_classes, dtype=np.float32)[int(x)]
    )

    # Tokenize the global test set
    test_encodings = tokenizer(
        test_pandas["post"].tolist(), truncation=True, padding=True, max_length=128, return_tensors="pt"
    )
    test_labels = torch.tensor(np.stack(test_pandas["type_encoded"].values).astype(np.float32))

    # Save global test dataset
    test_path = os.path.join(output_dir, f"{output_prefix}_test.pth")
    torch.save({"input_ids": test_encodings["input_ids"], "attention_mask": test_encodings["attention_mask"], "labels": test_labels}, test_path)
    print(f"✅ Global test dataset saved as {test_path}")

    saved_paths = {"test": test_path}

    # Process each training dataset separately
    for i, train_df in enumerate(train_dfs, start=1):
        # Convert to Pandas (only if needed for tokenization)
        train_pandas = train_df.toPandas()

        # ✅ Drop missing values and ensure 'post' column is a string
        train_pandas = train_pandas.dropna(subset=["post"])  # Remove NaN values
        train_pandas["post"] = train_pandas["post"].astype(str)  # Convert all values to strings

        # ✅ Convert 'type' column to numerical labels and ensure integer dtype
        train_pandas["type_indexed"], _ = pd.factorize(train_pandas["type"])
        train_pandas["type_indexed"] = train_pandas["type_indexed"].astype(int)

        # ✅ One-hot encoding (fixed)
        num_classes = train_pandas["type_indexed"].nunique()
        train_pandas["type_encoded"] = train_pandas["type_indexed"].apply(
            lambda x: np.eye(num_classes, dtype=np.float32)[int(x)]
        )

        # Tokenize train dataset
        train_encodings = tokenizer(
            train_pandas["post"].tolist(), truncation=True, padding=True, max_length=128, return_tensors="pt"
        )
        train_labels = torch.tensor(np.stack(train_pandas["type_encoded"].values).astype(np.float32))

        # Save train dataset
        train_path = os.path.join(output_dir, f"{output_prefix}{i}_train.pth")
        torch.save({"input_ids": train_encodings["input_ids"], "attention_mask": train_encodings["attention_mask"], "labels": train_labels}, train_path)
        
        saved_paths[f"train_client_{i}"] = train_path
        print(f"✅ Client {i} training dataset saved as {train_path}")

    return saved_paths


In [15]:
convert_dfs_to_pth(samples,output_dir=None, output_prefix="data", test_fraction=0.1)


                                                                                

✅ Global test dataset saved as /home/hel/ThesisFL/data/data_test.pth


                                                                                

✅ Client 1 training dataset saved as /home/hel/ThesisFL/data/data1_train.pth


                                                                                

✅ Client 2 training dataset saved as /home/hel/ThesisFL/data/data2_train.pth


                                                                                

✅ Client 3 training dataset saved as /home/hel/ThesisFL/data/data3_train.pth


                                                                                

✅ Client 4 training dataset saved as /home/hel/ThesisFL/data/data4_train.pth


                                                                                

✅ Client 5 training dataset saved as /home/hel/ThesisFL/data/data5_train.pth


                                                                                

✅ Client 6 training dataset saved as /home/hel/ThesisFL/data/data6_train.pth


                                                                                

✅ Client 7 training dataset saved as /home/hel/ThesisFL/data/data7_train.pth


                                                                                

✅ Client 8 training dataset saved as /home/hel/ThesisFL/data/data8_train.pth


                                                                                

✅ Client 9 training dataset saved as /home/hel/ThesisFL/data/data9_train.pth


{'test': '/home/hel/ThesisFL/data/data_test.pth',
 'train_client_1': '/home/hel/ThesisFL/data/data1_train.pth',
 'train_client_2': '/home/hel/ThesisFL/data/data2_train.pth',
 'train_client_3': '/home/hel/ThesisFL/data/data3_train.pth',
 'train_client_4': '/home/hel/ThesisFL/data/data4_train.pth',
 'train_client_5': '/home/hel/ThesisFL/data/data5_train.pth',
 'train_client_6': '/home/hel/ThesisFL/data/data6_train.pth',
 'train_client_7': '/home/hel/ThesisFL/data/data7_train.pth',
 'train_client_8': '/home/hel/ThesisFL/data/data8_train.pth',
 'train_client_9': '/home/hel/ThesisFL/data/data9_train.pth'}

In [11]:

csv_file = "estp1.csv"
df_spark = spark.read.csv(csv_file, header=True, inferSchema=True)

# Count total samples
total_samples = df_spark.count()
print(f"Total samples in dataset: {total_samples}")

# Define batch size (same as what you use in training)
batch_size = 16  # Adjust based on your training setup

# Calculate number of batches
num_batches = math.ceil(total_samples / batch_size)
print(f"Total batches: {num_batches}")

# Stop Spark session
spark.stop()


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/hel/ThesisFL/data/estp1.csv.