In [1]:
from tqdm.auto import tqdm

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import (col, lit, from_unixtime, to_timestamp,
                                   year, weekofyear, hour, concat_ws)
from pyspark.sql import Window, functions as F
from pyspark.sql import functions as F
from pyspark.sql.functions import (
     when, size, collect_set
)

from pyspark.sql import SparkSession

try:
    spark.stop()
except:
    pass

spark = (
    SparkSession.builder
      .master("spark://spark:7077")
      .appName("WiClean+")
      .config("spark.driver.memory",   "16g")
      .config("spark.executor.memory", "16g")
      .config("spark.kryoserializer.buffer.max", "1024m")
      .config("spark.sql.shuffle.partitions", "200")
      .config("spark.default.parallelism",     "200")
      .getOrCreate()
)

# sanity check to make sure spark works in docker
spark.range(5).show()


  from .autonotebook import tqdm as notebook_tqdm
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/29 05:14:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [2]:
links_df    = spark.read.parquet("data/links.parquet")
articles_df = spark.read.parquet("data/articles.parquet")

assert links_df.count() > 0, "links table empty!"
assert articles_df.count() > 0, "articles table empty!"

                                                                                

In [3]:

adds = (links_df
            .select("src_article","dst_article",
                    to_timestamp(from_unixtime(col("created_at"))).alias("timestamp"))
            .withColumn("action", lit("+")))
removes = (links_df
               .select("src_article","dst_article",
                       to_timestamp(from_unixtime(col("removed_at"))).alias("timestamp"))
               .withColumn("action", lit("-")))
events = (adds.union(removes)
              .filter((col("timestamp") >= "2023-01-01") &
                      (col("timestamp") <  "2025-01-01")))

print("Event rows :", events.count())
events.show(5, truncate=False)



Event rows : 310810529
+-----------+-----------+-------------------+------+
|src_article|dst_article|timestamp          |action|
+-----------+-----------+-------------------+------+
|477        |7711       |2023-01-12 16:37:35|+     |
|477        |1935       |2023-01-12 16:37:35|+     |
|477        |11632      |2023-01-12 16:37:35|+     |
|477        |8309       |2023-01-12 16:37:35|+     |
|477        |11633      |2023-01-12 16:37:35|+     |
+-----------+-----------+-------------------+------+
only showing top 5 rows



                                                                                

In [4]:
events.write \
      .mode("overwrite") \
      .parquet("data/events.parquet")
print(" events persisted to data/events.parquet")



 events persisted to data/events.parquet


                                                                                

In [5]:


TOP_K = 500_000

# build a DataFrame of the TOP_K most-edited source articles
topk_src = (events
            .groupBy("src_article")
            .count()
            .orderBy(col("count").desc())
            .limit(TOP_K)
            .cache())

# compute what fraction of all add/remove events
covered = events.join(topk_src, on="src_article").count()
total   = events.count()
print(f"Top-{TOP_K:,} pages generate {covered/total:6.2%} of all link-edit events")






Top-500,000 pages generate 45.94% of all link-edit events


                                                                                

In [6]:
# rank all src pages by event count
counts = (events.groupBy("src_article")
                  .count()
                  .orderBy(col("count").desc())
                  .cache())

total_events = events.count()

# running cumulative sum
w = Window.orderBy(col("count").desc())
counts = counts.withColumn("cum_events",
                           F.sum("count").over(w)) \
               .withColumn("cum_pct",
                           col("cum_events") / total_events)

# how many pages to hit 90 % of traffic?
target_rows = counts.filter(col("cum_pct") <= 0.70).count()
print(f"Pages needed for 90 % coverage ≈ {target_rows:,}")

# generate DataFrame to pass to the Wikidata fetch
topk_src = counts.limit(target_rows).select("src_article").cache()

# sanity-print actual share
covered = events.join(topk_src, on="src_article").count()
print(f"Coverage with K={target_rows:,}: {covered/total_events:6.2%}")


25/04/28 20:45:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/28 20:45:02 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/28 20:45:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/28 20:45:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/28 20:45:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/04/28 20:45:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
          

Pages needed for 90 % coverage ≈ 1,495,665




Coverage with K=1,495,665: 69.57%


                                                                                

In [9]:

#load
events = spark.read.parquet("data/events.parquet").cache()
articles = spark.read.parquet("data/articles.parquet")




# add week & hour
with_dt = (
    events
      .withColumn("week", concat_ws("-", year("timestamp"), weekofyear("timestamp")))
      .withColumn("hour", hour("timestamp"))
)

#  base features per (src,week)
base = (
    with_dt.groupBy("src_article","week")
           .agg(
             F.count("*").alias("total_edits"),
             F.sum(when(col("action")=="+",1).otherwise(0)).alias("adds"),
             F.sum(when(col("action")=="-",1).otherwise(0)).alias("removes"),
             F.approx_count_distinct("dst_article").alias("unique_targets")
           )
)

# compute revert_count per (src,week)
act_sets = (
    with_dt.groupBy("src_article","week","dst_article")
           .agg(collect_set("action").alias("acts"))
)
reverts = (
    act_sets.filter(size("acts")==2)
            .groupBy("src_article","week")
            .count()
            .withColumnRenamed("count","revert_count")
)

# compute odd_hour_edits per (src,week)
odd = (
    with_dt.filter((col("hour")<6)|(col("hour")>22))
           .groupBy("src_article","week")
           .count()
           .withColumnRenamed("count","odd_hour_edits")
)

# join them all together, filling missing with 0
weekly_feats = (
    base.join(reverts, ["src_article","week"], "left")
        .join(odd,     ["src_article","week"], "left")
        .na.fill({"revert_count":0, "odd_hour_edits":0})
)

# derive page-type flags from title
pages = (
    articles
      .withColumnRenamed("id","src_article")
      .withColumn("is_disambig",
         col("title").rlike(r"\(disambiguation\)$").cast("int"))
      .withColumn("is_list",
         col("title").rlike(r"(?i)^List of ").cast("int"))
      .withColumn("is_index",
         col("title").rlike(r"(?i)^Index of ").cast("int"))
      .withColumn("is_year_page",
         col("title").rlike(r"^\d{4}(–\d{4})?$").cast("int"))
      .select("src_article","is_disambig","is_list","is_index","is_year_page")
)

# attach page-type to the features
weekly_feats = weekly_feats.join(pages, on="src_article", how="left") \
                           .na.fill(0, subset=["is_disambig","is_list","is_index","is_year_page"])


weekly_feats.write.mode("overwrite").parquet("data/weekly_feats_v2.parquet")

print("Rows in new weekly_feats:", weekly_feats.count())
weekly_feats.show(5, truncate=False)


25/04/28 20:46:58 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Rows in new weekly_feats: 23979591




+-----------+-------+-----------+----+-------+--------------+------------+--------------+-----------+-------+--------+------------+
|src_article|week   |total_edits|adds|removes|unique_targets|revert_count|odd_hour_edits|is_disambig|is_list|is_index|is_year_page|
+-----------+-------+-----------+----+-------+--------------+------------+--------------+-----------+-------+--------+------------+
|1177751    |2023-18|52         |23  |29     |53            |0           |0             |0          |0      |0       |0           |
|3443144    |2024-42|1          |1   |0      |1             |0           |0             |0          |0      |0       |0           |
|3474916    |2024-32|1          |1   |0      |1             |0           |1             |0          |0      |0       |0           |
|3474916    |2024-32|1          |1   |0      |1             |0           |1             |0          |0      |0       |0           |
|3474916    |2023-4 |24         |17  |7      |24            |1           |0 

                                                                                

In [10]:
from pyspark.sql.functions import regexp_extract, lower, regexp_replace

# load and parse instance-types file
lines = spark.read.option("compression", "bzip2") \
                  .text("data/instance-types_lang=en_specific.ttl.bz2")

triples = lines.filter(
    col("value").rlike(r"^<http://dbpedia\.org/resource/[^>]+>\s+" +
                       r"<http://dbpedia\.org/ontology/[^>]+>\s+" +
                       r"<http://dbpedia\.org/resource/[^>]+>\s+\.$")
)

inst = triples.select(
    regexp_extract("value", r"^<http://dbpedia\.org/resource/([^>]+)>", 1).alias("resource"),
    regexp_extract("value", r"<http://dbpedia\.org/ontology/([^>]+)>", 1).alias("type")
).withColumn("resource_lc", lower("resource"))

# reload articles
articles = spark.read.parquet("data/articles.parquet") \
                     .select(col("id").alias("article_id"), "title") \
                     .withColumn("resource", regexp_replace("title", " ", "_")) \
                     .withColumn("resource_lc", lower("resource"))

# match types to articles
article_types_specific = (inst.join(articles, on="resource_lc", how="inner")
                              .select("article_id", "type"))

#  remove duplicate (article_id, type) pairs
article_types_specific = article_types_specific.dropDuplicates(["article_id", "type"])

# Save
article_types_specific.write.mode("overwrite") \
                           .parquet("data/article_types_specific.parquet")

print(f"Article types extracted (deduped): {article_types_specific.count():,} rows")
article_types_specific.show(20, truncate=False)


                                                                                

Article types extracted (deduped): 10,704,167 rows




+----------+-------------+
|article_id|type         |
+----------+-------------+
|1313149   |battle       |
|1750043   |battle       |
|1324355   |manager      |
|9342802   |location     |
|11011132  |successor    |
|13019472  |category     |
|5442931   |homeStadium  |
|12550808  |city         |
|1507531   |artist       |
|434776    |country      |
|502222    |starring     |
|640764    |location     |
|5626903   |producer     |
|2077122   |hubAirport   |
|2094478   |parentCompany|
|4182488   |deathPlace   |
|239681    |recordLabel  |
|11189235  |birthPlace   |
|12774053  |occupation   |
|4038285   |birthPlace   |
+----------+-------------+
only showing top 20 rows



                                                                                

In [11]:
article_types_specific.show(30,truncate=False)














+----------+---------------+
|article_id|type           |
+----------+---------------+
|1313149   |battle         |
|1750043   |battle         |
|1324355   |manager        |
|9342802   |location       |
|11011132  |successor      |
|13019472  |category       |
|5442931   |homeStadium    |
|12550808  |city           |
|1507531   |artist         |
|434776    |country        |
|502222    |starring       |
|640764    |location       |
|5626903   |producer       |
|2077122   |hubAirport     |
|2094478   |parentCompany  |
|4182488   |deathPlace     |
|239681    |recordLabel    |
|11189235  |birthPlace     |
|12774053  |occupation     |
|4038285   |birthPlace     |
|9287577   |almaMater      |
|15008672  |class          |
|15022301  |diocese        |
|6827731   |party          |
|16775301  |division       |
|2034549   |timeZone       |
|3246079   |locationCountry|
|447483    |composer       |
|12087338  |regionServed   |
|16516942  |keyPerson      |
+----------+---------------+
only showing t

                                                                                

In [12]:
# load the cleaned types file
article_types_specific = spark.read.parquet("data/article_types_specific.parquet")

#  special meaningful types based on WiClean paper (manually curated list)
meaningful_types = [
    "person", "soccerPlayer", "actor", "musician", "artist",
    "country", "city", "organization", "company", "film",
    "movie", "location", "team", "writer", "album",
    "language", "university", "televisionShow", "politician"
]

# lowercase everything (matches normalization applied in scraper)
from pyspark.sql.functions import lower
article_types_specific = article_types_specific.withColumn("type", lower(col("type")))

# filter types
article_types_filtered = article_types_specific.filter(col("type").isin(meaningful_types))

# save filtered types
article_types_filtered.write.mode("overwrite") \
                            .parquet("data/article_types_filtered.parquet")

print(f" Meaningful article types extracted: {article_types_filtered.count():,} rows")
article_types_filtered.show(10, truncate=False)


 Meaningful article types extracted: 1,756,334 rows
+----------+--------+
|article_id|type    |
+----------+--------+
|8008769   |country |
|473035    |country |
|4812727   |country |
|5825702   |team    |
|6936722   |album   |
|10759919  |team    |
|458025    |artist  |
|18632216  |location|
|4804315   |location|
|16844646  |location|
+----------+--------+
only showing top 10 rows



In [13]:



# reload events
events = spark.read.parquet("data/events.parquet").cache()

# add week and hour
with_dt = (events.withColumn("week", concat_ws("-", year("timestamp"), weekofyear("timestamp")))
                   .withColumn("hour", hour("timestamp")))

# base features
base = (with_dt.groupBy("src_article", "week")
             .agg(
                 F.count("*").alias("total_edits"),
                 F.sum(when(col("action") == "+", 1).otherwise(0)).alias("adds"),
                 F.sum(when(col("action") == "-", 1).otherwise(0)).alias("removes"),
                 F.approx_count_distinct("dst_article").alias("unique_targets")
             ))

# reverts
act_sets = (with_dt.groupBy("src_article", "week", "dst_article")
                  .agg(collect_set("action").alias("acts")))

reverts = (act_sets.filter(size("acts") == 2)
                    .groupBy("src_article", "week")
                    .count()
                    .withColumnRenamed("count", "revert_count"))

# odd hour edits
odd = (with_dt.filter((col("hour") < 6) | (col("hour") > 22))
             .groupBy("src_article", "week")
             .count()
             .withColumnRenamed("count", "odd_hour_edits"))

# final join
weekly_feats = (base.join(reverts, ["src_article", "week"], "left")
                    .join(odd, ["src_article", "week"], "left")
                    .na.fill({"revert_count": 0, "odd_hour_edits": 0}))

# save
weekly_feats.write.mode("overwrite").parquet("data/weekly_feats_v23.parquet")

print(f" Weekly features generated: {weekly_feats.count():,} rows")
weekly_feats.show(5, truncate=False)


25/04/28 20:53:35 WARN CacheManager: Asked to cache already cached data.
                                                                                

 Weekly features generated: 21,498,555 rows




+-----------+-------+-----------+----+-------+--------------+------------+--------------+
|src_article|week   |total_edits|adds|removes|unique_targets|revert_count|odd_hour_edits|
+-----------+-------+-----------+----+-------+--------------+------------+--------------+
|206823     |2024-42|1          |1   |0      |1             |0           |0             |
|1235884    |2024-32|8          |4   |4      |4             |4           |4             |
|1281794    |2024-2 |104        |52  |52     |106           |0           |0             |
|2011965    |2023-3 |1          |1   |0      |1             |0           |0             |
|7043377    |2024-9 |3          |2   |1      |2             |1           |0             |
+-----------+-------+-----------+----+-------+--------------+------------+--------------+
only showing top 5 rows



                                                                                

In [14]:
from pyspark.sql.functions import sum as spark_sum

# load the clean weekly features
weekly_feats = spark.read.parquet("data/weekly_feats_v23.parquet")

# load the meaningful filtered article types
article_types = spark.read.parquet("data/article_types_filtered.parquet")

# compute total edits per page
total_edits = (weekly_feats.groupBy("src_article")
                           .agg(spark_sum("total_edits").alias("sum_edits")))

# join with article types
typed_pages = (total_edits.join(article_types,
                                total_edits.src_article == article_types.article_id,
                                how="inner")
                         .select("article_id", "sum_edits", "type"))

# select top 10,000 pages by edit count
top10k = typed_pages.orderBy(col("sum_edits").desc()).limit(10000)

# save for later use
top10k.write.mode("overwrite").parquet("data/top10k_typed_pages.parquet")

print(f"Top-10,000 meaningful typed pages saved: {top10k.count():,} rows")
top10k.show(10, truncate=False)





                                                                                

Top-10,000 meaningful typed pages saved: 10,000 rows




+----------+---------+--------+
|article_id|sum_edits|type    |
+----------+---------+--------+
|179793    |36038    |location|
|179793    |36038    |company |
|40339     |30668    |language|
|62        |28185    |language|
|1678533   |27671    |country |
|9855      |20037    |country |
|1332454   |19620    |country |
|383352    |18736    |country |
|383352    |18736    |company |
|145866    |17602    |writer  |
+----------+---------+--------+
only showing top 10 rows



                                                                                