In [1]:
!ls
!pwd

00-erisk25task1EDA.ipynb	       05-eRiskPySparkSelfRefFiltering.ipynb
01-erisktokenestimate.ipynb	       06-eRiskPySparkSelfRefSpacy.ipynb
03-pyterrier-test2.ipynb	       07-eRiskPySparkSelfRefSpacy-v2.ipynb
04-eRiskAnalysisSelfReferential.ipynb
/storage/home/hcoda1/6/dahumada3/clef/erisk-2025/user/dahumada3/notebooks


In [2]:
import sys

sys.path.append("/storage/home/hcoda1/6/dahumada3/clef/erisk-2025")

from erisk.spark import get_spark

spark = get_spark()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/28 02:36:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/28 02:36:57 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


In [3]:
# Set paths
LABELS_MAJ_PATH = "/storage/home/hcoda1/6/dahumada3/erisk_shared/raw/training_data/2023/g_qrels_majority_2.csv"
LABELS_CONS_PATH = "/storage/home/hcoda1/6/dahumada3/erisk_shared/raw/training_data/2023/g_rels_consenso.csv"
PARQUET_DIR = "/storage/home/hcoda1/6/dahumada3/erisk_shared/parquet/training_data/2023/partitions"

In [4]:
df = spark.read.parquet(PARQUET_DIR)
df.printSchema()
df.show(5, truncate=False)


[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

root
 |-- DOCNO: string (nullable = true)
 |-- TEXT: string (nullable = true)



                                                                                

+------------+-------------------------------------------------------------------------------------+
|DOCNO       |TEXT                                                                                 |
+------------+-------------------------------------------------------------------------------------+
|s_2457_109_0|LADWP, Inyo agree to test run on well 395                                            |
|s_2457_110_0|State representatives recognize NIHDs District of Year designation                   |
|s_2457_111_0|Small plane crashes en route from Bishop to Nanaimo, BC (Canada)                     |
|s_2457_112_0|Audio and video production professionals, as an example: https://youtu.be/jv5HIrOrn2o|
|s_2457_113_0|Sure, its a professional powerhouse for audio and video production professionals.    |
+------------+-------------------------------------------------------------------------------------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import col, trim, regexp_replace

# Remove empty/whitespace-only posts
df_clean = df.filter(trim(col("TEXT")) != "")

# Remove URLs
df_clean = df_clean.withColumn("TEXT", regexp_replace("TEXT", r"http\S+|www\S+", ""))

# Remove Markdown-style links like [text](url)
df_clean = df_clean.withColumn("TEXT", regexp_replace("TEXT", r"\[.*?\]\(.*?\)", ""))

# Remove common artifacts like 'gt;' (HTML > symbol)
df_clean = df_clean.withColumn("TEXT", regexp_replace("TEXT", "gt;", ""))

df_clean.select("TEXT").show(5, truncate=False)
print("Remaining rows:", df_clean.count())

+---------------------------------------------------------------------------------+
|TEXT                                                                             |
+---------------------------------------------------------------------------------+
|LADWP, Inyo agree to test run on well 395                                        |
|State representatives recognize NIHDs District of Year designation               |
|Small plane crashes en route from Bishop to Nanaimo, BC (Canada)                 |
|Audio and video production professionals, as an example:                         |
|Sure, its a professional powerhouse for audio and video production professionals.|
+---------------------------------------------------------------------------------+
only showing top 5 rows





Remaining rows: 4264560



                                                                                

In [6]:
from pyspark.sql.functions import lower

SELF_REF_PATTERN = (
    r"\b("
    r"i|me|my|mine|myself|"  # basic forms
    r"i'm|im|"  # I'm and Im
    r"i’ve|ive|"  # I've and Ive
    r"i’d|id|"  # I'd and Id
    r"i’ll|ill|"  # I'll and Ill
    r"i am|i was"  # long forms
    r")\b"
)

df_with_flag = df_clean.withColumn(
    "is_self_ref", lower(col("TEXT")).rlike(SELF_REF_PATTERN)
)

In [7]:
self_ref_df = df_with_flag.filter(col("is_self_ref"))

total_count = df_with_flag.count()
self_ref_count = self_ref_df.count()

print(f"Self-referential posts: {self_ref_count:,} out of {total_count:,}")



Self-referential posts: 1,218,201 out of 4,264,560




                                                                                

In [8]:
df_with_flag.show(5)

+------------+--------------------+-----------+
|       DOCNO|                TEXT|is_self_ref|
+------------+--------------------+-----------+
|s_2457_109_0|LADWP, Inyo agree...|      false|
|s_2457_110_0|State representat...|      false|
|s_2457_111_0|Small plane crash...|      false|
|s_2457_112_0|Audio and video p...|      false|
|s_2457_113_0|Sure, its a profe...|      false|
+------------+--------------------+-----------+
only showing top 5 rows



In [12]:
from pyspark.sql import Row


def process_partition_with_spacy(partition):
    import spacy

    # Load spaCy once per partition
    nlp = spacy.load("en_core_web_sm", disable=["ner", "parser"])

    # Self-referential word set (lowercase)
    SELF_REF_WORDS = {
        "i",
        "me",
        "my",
        "mine",
        "myself",
        "i'm",
        "im",
        "i’ve",
        "ive",
        "i'd",
        "id",
        "i’ll",
        "ill",
        "i am",
        "i was",
    }

    for row in partition:
        text = row["TEXT"]

        doc = nlp(text.lower())
        tokens = [token.text for token in doc if not token.is_space]

        if not tokens:
            ratio = 0.0
        else:
            self_ref_count = sum(1 for token in tokens if token in SELF_REF_WORDS)
            ratio = self_ref_count / len(tokens)

        yield Row(**row.asDict(), self_ref_ratio=ratio)

In [13]:
import time

start = time.time()

# Your Spark transformation
df_with_ratio = df_with_flag.rdd.mapPartitions(process_partition_with_spacy).toDF()

end = time.time()
print(f"⏱️ Completed in {end - start:.2f} seconds")



⏱️ Completed in 7.09 seconds



                                                                                

In [14]:
df_with_ratio.show(10)



+------------+--------------------+-----------+-------------------+
|       DOCNO|                TEXT|is_self_ref|     self_ref_ratio|
+------------+--------------------+-----------+-------------------+
|s_2457_109_0|LADWP, Inyo agree...|      false|                0.0|
|s_2457_110_0|State representat...|      false|                0.0|
|s_2457_111_0|Small plane crash...|      false|                0.0|
|s_2457_112_0|Audio and video p...|      false|                0.0|
|s_2457_113_0|Sure, its a profe...|      false|                0.0|
|s_2457_113_1|I have no use for...|       true|0.07142857142857142|
|s_2457_113_2|But I wont preten...|       true|0.06666666666666667|
|s_2457_113_3|I could use a che...|       true|0.17647058823529413|
|s_2457_113_4|But industry prof...|      false|                0.0|
|s_2457_113_5|Dont you think pr...|      false|                0.0|
+------------+--------------------+-----------+-------------------+
only showing top 10 rows




                                                                                

In [None]:
df_with_ratio.filter((col("self_ref_ratio") > 0) & (~col("is_self_ref"))).show(
    10, truncate=False
)



In [None]:
df_with_ratio.filter((col("is_self_ref")) & (col("self_ref_ratio") == 0)).show(
    10, truncate=False
)



+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------+
|DOCNO       |TEXT                                                                                                                                                                       |is_self_ref|self_ref_ratio|
+------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------+
|s_2457_131_0| gt;I can imagine that the forces are far greater - a 3ft long blade (from root to tip) on an aircraft is nothing compared to the 300+ foot long monsters on wind turbines.|true       |0.0           |
|s_3020_36_0 |[Translation](  gt;my method of healing as of late!!!                                                                             

25/03/28 02:26:34 WARN PythonRunner: Detected deadlock while completing task 2.0 in stage 25 (TID 113): Attempting to kill Python Worker

                                                                                