In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder.appName("wikimedia").getOrCreate()

In [3]:
!pwd

/home/jovyan/work


In [4]:
df = spark.read.format("json").option("path","/home/jovyan/work/stream2025-05-03_07_46_20Z_bdc832").load()

# https://www.mediawiki.org/wiki/API:RecentChanges


| Field       | Type      | Description                                                                                  |
| ----------- | --------- | -------------------------------------------------------------------------------------------- |
| `type`      | `string`  | Type of change. Values include: `edit`, `new`, `log`, `categorize`, `external` (rare).       |
| `ns`        | `int`     | Namespace number. For example: `0` = main/article, `1` = talk, `2` = user, etc.              |
| `title`     | `string`  | Title of the page that was changed.                                                          |
| `pageid`    | `int`     | ID of the page.                                                                              |
| `revid`     | `int`     | Revision ID (new version).                                                                   |
| `old_revid` | `int`     | Revision ID before the change.                                                               |
| `rcid`      | `int`     | Recent change ID (unique to this change event).                                              |
| `user`      | `string`  | Username or IP address of the editor.                                                        |
| `anon`      | `boolean` | Indicates if the user is anonymous (IP edit).                                                |
| `bot`       | `boolean` | Indicates if the change was made by a bot.                                                   |
| `minor`     | `boolean` | Indicates a "minor edit" (user marked it as such).                                           |
| `patrolled` | `boolean` | Indicates if the edit was patrolled (for wikis that use this feature).                       |
| `comment`   | `string`  | Edit summary/comment entered by the editor.                                                  |
| `timestamp` | `string`  | ISO 8601 timestamp of the change.                                                            |
| `logtype`   | `string`  | For log entries: type of log (e.g., `block`, `delete`, `move`). Only appears for `type=log`. |
| `logaction` | `string`  | Specific action within the log type.                                                         |


In [5]:
# df.select('logaction').distinct().show()

In [6]:
df.printSchema()

root
 |-- $schema: string (nullable = true)
 |-- bot: boolean (nullable = true)
 |-- comment: string (nullable = true)
 |-- id: long (nullable = true)
 |-- length: struct (nullable = true)
 |    |-- new: long (nullable = true)
 |    |-- old: long (nullable = true)
 |-- log_action: string (nullable = true)
 |-- log_action_comment: string (nullable = true)
 |-- log_id: long (nullable = true)
 |-- log_params: string (nullable = true)
 |-- log_type: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- dt: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- partition: long (nullable = true)
 |    |-- request_id: string (nullable = true)
 |    |-- stream: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |    |-- uri: string (nullable = true)
 |-- minor: boolean (nullable = true)
 |-- namespace: long (nullable = true)
 |-- notify_url: string (nullable 

Edit Activity Monitoring

    Volume of edits over time (e.g., per hour, per day).

    Compare activity across projects (e.g., English vs. French Wikipedia).

    Detect high activity spikes, which might indicate newsworthy events or vandalism.

In [15]:
from pyspark.sql.window import Window
df = df.withColumn("datetime" ,f.from_unixtime(f.col('timestamp')))
df = df.withColumn("minute", f.date_format(f.col('datetime'), "mm"))

edits_per_min = df.groupBy(f.window("datetime", "1 minute").alias("window")) \
.agg(f.count("*").alias("edit_count"))

In [16]:
edits_per_min.show()

+--------------------+----------+
|              window|edit_count|
+--------------------+----------+
|{2025-05-03 07:47...|        79|
|{2025-05-03 07:46...|        65|
+--------------------+----------+



In [17]:
edits_per_min.printSchema()

root
 |-- window: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- edit_count: long (nullable = false)



In [19]:
edits_per_min = edits_per_min.withColumn("window", f.col("window").cast("string"))

In [20]:
edits_per_min.show()

+--------------------+----------+
|              window|edit_count|
+--------------------+----------+
|{2025-05-03 07:47...|        79|
|{2025-05-03 07:46...|        65|
+--------------------+----------+



In [21]:
edits_per_min.printSchema()

root
 |-- window: string (nullable = false)
 |-- edit_count: long (nullable = false)



In [50]:
rolling_avg_df = df.withWatermark("datetime", "10 minutes") \
.groupBy(f.window("datetime", "5 minutes", "1 minutes").alias("window"))\
.agg(f.count("*").alias("rolling_avg_edit_count"))

In [104]:
rolling_avg_df.show()

+--------------------+----------------------+
|              window|rolling_avg_edit_count|
+--------------------+----------------------+
|{2025-05-03 07:42...|                    65|
|{2025-05-03 07:46...|                   144|
|{2025-05-03 07:44...|                   144|
|{2025-05-03 07:47...|                    79|
|{2025-05-03 07:43...|                   144|
|{2025-05-03 07:45...|                   144|
+--------------------+----------------------+



In [105]:
rolling_avg_df.printSchema()

root
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- rolling_avg_edit_count: long (nullable = false)



In [40]:
df_edit_per_minute = df.groupBy(["minute"]).agg(f.count(f.col("id")).alias("edit_per_minute"))

In [41]:
df_edit_per_minute.show()

+------+---------------+
|minute|edit_per_minute|
+------+---------------+
|    47|             75|
|    46|             65|
+------+---------------+



In [27]:
df.select('datetime').distinct().show(5)

+-------------------+
|           datetime|
+-------------------+
|2025-05-03 07:46:57|
|2025-05-03 07:47:48|
|2025-05-03 07:47:05|
|2025-05-03 07:47:41|
|2025-05-03 07:47:17|
+-------------------+
only showing top 5 rows



In [57]:
# 2. User Behavior Analysis

#     Breakdown of anonymous vs registered vs bot edits.

#     Track frequent contributors or new user contributions.

#     Detect possible suspicious accounts (new users making many edits quickly).

In [58]:
df.printSchema()

root
 |-- $schema: string (nullable = true)
 |-- bot: boolean (nullable = true)
 |-- comment: string (nullable = true)
 |-- id: long (nullable = true)
 |-- length: struct (nullable = true)
 |    |-- new: long (nullable = true)
 |    |-- old: long (nullable = true)
 |-- log_action: string (nullable = true)
 |-- log_action_comment: string (nullable = true)
 |-- log_id: long (nullable = true)
 |-- log_params: string (nullable = true)
 |-- log_type: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- dt: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- offset: long (nullable = true)
 |    |-- partition: long (nullable = true)
 |    |-- request_id: string (nullable = true)
 |    |-- stream: string (nullable = true)
 |    |-- topic: string (nullable = true)
 |    |-- uri: string (nullable = true)
 |-- minor: boolean (nullable = true)
 |-- namespace: long (nullable = true)
 |-- notify_url: string (nullable 

In [67]:
df.select('user', 'bot').distinct().show()
df.createOrReplaceTempView("wikimedia")

+---------------+-----+
|           user|  bot|
+---------------+-----+
|      Cloudz679|false|
|   Kristbaumbot| true|
|    Terraflorin| true|
|        Funnytu|false|
|       Kenlnwza|false|
|      Aygunmirz|false|
|       Carlo58s|false|
|     CuratorBot| true|
|  NearEMPTiness|false|
|        Gbawden|false|
| 60.118.117.249|false|
|PortusCaleDraco|false|
|            GBG|false|
|           인케|false|
|    Accurimbono|false|
|         Wheeke|false|
|       FuzzyBot| true|
|    OrlodrimBot| true|
|  SchlurcherBot| true|
|       Parpan05|false|
+---------------+-----+
only showing top 20 rows



In [73]:
spark.sql("""
SELECT user,bot,case when bot = true
then 'Bot'
when user regexp '^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'
then 'Anonymous'
else 'User'
end as editor
FROM WIKIMEDIA
""").show()

+---------------+-----+---------+
|           user|  bot|   editor|
+---------------+-----+---------+
|   176.1.12.124|false|Anonymous|
|  120.29.87.147|false|Anonymous|
| 60.118.117.249|false|Anonymous|
|114.190.216.188|false|Anonymous|
+---------------+-----+---------+



In [None]:
f.regexp(f.col('user'),), 'Anonymous')

In [94]:
df = df.withColumn("type_of_editor", f.when(f.col("bot") == 'true', 'Bot')\
              .when(f.regexp(f.col('user'), f.lit(r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$')), 'Anonymous').otherwise('User'))

In [101]:
editing_count = df.groupBy(["type_of_editor"]).agg(f.count("*").alias("count_per_editor"))

In [102]:
editing_count.show()

+--------------+----------------+
|type_of_editor|count_per_editor|
+--------------+----------------+
|           Bot|              60|
|     Anonymous|               4|
|          User|              80|
+--------------+----------------+



In [103]:
spark.version

'3.5.0'