In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, 
    BooleanType, LongType, MapType
)
from pyspark.sql.functions import window, col
from datetime import datetime

In [0]:
from datetime import datetime

# catalog created by the previous notebook
catalog = "wikimedia_db"

# schema where raw json files are stored
uc_schema_raw_events = "raw_events"

# today’s raw volume name (same format as notebook 1)
raw_events_volume_time = datetime.now()
raw_events_volume = f"events_tmp_{raw_events_volume_time.strftime('%y_%m_%d')}"

# path to raw json files
raw_data_path = f"/Volumes/{catalog}/{uc_schema_raw_events}/{raw_events_volume}"

# schema for streaming checkpoints
db_schema_checkpoints = "checkpoints"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db_schema_checkpoints}")

# === NEW: schema for aggregated (gold) data ===
analytics_schema = "analytics"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{analytics_schema}")

DataFrame[]

In [0]:

# Meta schema (nested)
meta_schema = StructType([
    StructField("uri", StringType(), True),
    StructField("request_id", StringType(), True),
    StructField("id", StringType(), True),
    StructField("dt", StringType(), True),
    StructField("domain", StringType(), True),
    StructField("stream", StringType(), True)
])

# Length schema (nested)
length_schema = StructType([
    StructField("old", IntegerType(), True),
    StructField("new", IntegerType(), True)
])

# Revision schema (nested)
revision_schema = StructType([
    StructField("old", LongType(), True),
    StructField("new", LongType(), True)
])

# Main recent change schema
recentchange_schema = StructType([
    StructField("$schema", StringType(), True),
    StructField("meta", meta_schema, True),
    StructField("id", LongType(), True),
    StructField("type", StringType(), True),
    StructField("namespace", IntegerType(), True),
    StructField("title", StringType(), True),
    StructField("comment", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("user", StringType(), True),
    StructField("bot", BooleanType(), True),
    StructField("minor", BooleanType(), True),
    StructField("patrolled", BooleanType(), True),
    StructField("length", length_schema, True),
    StructField("revision", revision_schema, True),
    StructField("server_url", StringType(), True),
    StructField("server_name", StringType(), True),
    StructField("wiki", StringType(), True),
    StructField("parsedcomment", StringType(), True),
])


In [0]:
# Read data from a file
# Similar to definition of staticInputDF above, just using `readStream` instead of `read`
streamingInputDF = (
  spark
    .readStream                       
    .schema(recentchange_schema)               # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1)  # Treat a sequence of files as a stream by picking n number of files at a time
    .json(raw_data_path)
)


In [0]:
from pyspark.sql.functions import col

# === NEW: rare events for alerting ===
# "Rare" here = big human edit (not bot, not minor, length.new > 5000)
rareEventsDF = streamingInputDF.filter(
    (col("bot") == False) &
    (col("minor") == False) &
    (col("length.new") > 5000)
)

In [0]:
# Do some transformations
# Same query as staticInputDF
streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.bot, # group by edit made by bot boolean
      window(
        col("timestamp").cast("timestamp"), 
        "5 minutes"
      )
    )
    .count()
)


In [0]:
# === GOLD TABLE (batch aggregation instead of streaming) ===

from pyspark.sql.functions import window, col

# name of the gold table
aggregated_table_name = f"{catalog}.{analytics_schema}.edits_per_5min_by_bot"

# 1) read all raw JSON files as a normal (batch) DataFrame
batchInputDF = (
    spark.read
         .schema(recentchange_schema)   # same schema as for streaming
         .json(raw_data_path)           # same path as streamingInputDF
)

# 2) aggregate by bot and 5-minute window
goldDF = (
    batchInputDF
      .groupBy(
          col("bot"),
          window(col("timestamp").cast("timestamp"), "5 minutes")
      )
      .count()
)

# 3) write result into a Delta table in the analytics schema
goldDF.write.format("delta").mode("overwrite").saveAsTable(aggregated_table_name)


In [0]:
# === ALERTS TABLE (batch version) ===

from pyspark.sql.functions import col

# schema for alerts
alerts_schema = "alerts"
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{alerts_schema}")

alerts_table_name = f"{catalog}.{alerts_schema}.large_human_edits"

# "Rare" event definition: big human edit
rareBatchDF = batchInputDF.filter(
    (col("bot") == False) &
    (col("minor") == False) &
    (col("length.new") > 5000)
)

# write rare events into alerts table
rareBatchDF.write.format("delta").mode("overwrite").saveAsTable(alerts_table_name)


In [0]:
# temp volume for checkpoint storage
volume = 'tmp_streamingInputDF'
volume_path = f'/Volumes/{catalog}/{db_schema_checkpoints}/{volume}'
volume_name = f'{catalog}.{db_schema_checkpoints}.{volume}'

# drop old temp volume and recreate
spark.sql(f"DROP VOLUME IF EXISTS {volume_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name}")

# Display the streaming dataframe
streamingInputDF.display(checkpointLocation=volume_path)

$schema,meta,id,type,namespace,title,comment,timestamp,user,bot,minor,patrolled,length,revision,server_url,server_name,wiki,parsedcomment
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Vincent_Varinier, 2c1afae3-f289-45ee-8100-386f83c9e4fe, 48a37cf3-1994-4c46-946f-3a69c5134d12, 2025-11-08T21:43:27.596Z, fr.wikipedia.org, mediawiki.recentchange)",560744680,edit,0,Vincent Varinier,/* Notes */ retouche·s de la modification précédente,1762638206,VVLLAACC,False,True,True,"List(6361, 6370)","List(230478994, 230479004)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Notes : retouche·s de la modification précédente
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Projet:Chimie/Articles_r%C3%A9cents/Liste, 18b7f73a-3a51-4dc5-8e2e-e283a0e7a101, ea2bf7e6-9a1c-4da2-9f77-5b3872006e39, 2025-11-08T21:43:28.313Z, fr.wikipedia.org, mediawiki.recentchange)",560744681,edit,102,Projet:Chimie/Articles récents/Liste,+ [[Bromure de prifinium]],1762638207,OrlodrimBot,True,False,True,"List(2603, 2603)","List(230442792, 230479005)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,+ Bromure de prifinium
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Attentats_du_13_novembre_2015_en_France, 7d01bb74-cd9d-4d33-ab24-14af4b8d1f26, da682cac-6457-4499-8c0e-a8953d3f61d8, 2025-11-08T21:43:28.953Z, fr.wikipedia.org, mediawiki.recentchange)",560744683,edit,0,Attentats du 13 novembre 2015 en France,"/* Statut des victimes */ L'accord du participe passé est la plaie de la grammaire française, c'est tout vu.",1762638194,Alyssa fan,False,True,False,"List(433972, 433971)","List(230478732, 230478998)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,"→Statut des victimes : L'accord du participe passé est la plaie de la grammaire française, c'est tout vu."
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Lyc%C3%A9e_fran%C3%A7ais_Charles-de-Gaulle, 44d4f955-83ab-4650-9f04-3bf965b66310, 8cddfa94-93ec-4b5d-aad8-79d73f4e5a98, 2025-11-08T21:43:30.803Z, fr.wikipedia.org, mediawiki.recentchange)",560744686,edit,0,Lycée français Charles-de-Gaulle,,1762638208,FraizTagada,False,False,False,"List(17169, 17181)","List(229042074, 230479006)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Armorial_des_familles_de_Touraine, c6c6a4a0-3e2e-4a5c-9b54-0cd44fa29be3, ea7e0490-f947-48c8-b2a5-36148eb7a25f, 2025-11-08T21:43:31.142Z, fr.wikipedia.org, mediawiki.recentchange)",560744687,edit,0,Armorial des familles de Touraine,/*P*/ Poirier :AGT 2 p. 777 / 779 différent,1762638209,Мя Масніи,False,True,True,"List(18130, 17766)","List(230475176, 230479008)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→P : Poirier :AGT 2 p. 777 / 779 différent
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/%C3%89glise_Saint-Remi_de_Charette-Varennes, 07aa6683-b7d4-490f-b111-ff6e9ba1e22d, 5ae79f09-78f9-487e-8f03-3fd8bab350a1, 2025-11-08T21:43:32.944Z, fr.wikipedia.org, mediawiki.recentchange)",560744688,edit,0,Église Saint-Remi de Charette-Varennes,/* Mobilier */,1762638212,~2025-32096-41,False,False,False,"List(2897, 3048)","List(220978440, 230479009)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Mobilier
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Utilisateur:%D9%85%D8%AD%D9%85%D8%AF_%D8%A3%D9%85%D9%8A%D9%86_%D8%A7%D9%84%D8%B7%D8%B1%D8%A7%D8%A8%D9%84%D8%B3%D9%8A/Brouillon1, 4952860e-1a5a-444f-9ad0-f2c8a593a61e, e2960453-d074-4b8a-a9a0-a2cf8c6b0549, 2025-11-08T21:43:37.353Z, fr.wikipedia.org, mediawiki.recentchange)",560744690,edit,2,Utilisateur:محمد أمين الطرابلسي/Brouillon1,,1762638214,محمد أمين الطرابلسي,False,False,True,"List(36850, 36906)","List(230091568, 230479010)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Portail:G%C3%A9n%C3%A9alogie/Articles_r%C3%A9cents, e870ae34-9e20-4a44-a395-da42a295a9d6, 6a308418-9908-47e8-b7af-1b212878757f, 2025-11-08T21:43:38.875Z, fr.wikipedia.org, mediawiki.recentchange)",560744691,edit,100,Portail:Généalogie/Articles récents,+ [[Famille Monnet de Lorbeau]],1762638218,OrlodrimBot,True,False,True,"List(7055, 7057)","List(230447464, 230479011)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,+ Famille Monnet de Lorbeau
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Victor_Fanneau_de_La_Horie, 4fbfc8a6-b4ee-4bf4-88aa-058f614cf805, 6b876265-5c4e-4dad-9b5a-dcd7bceeee34, 2025-11-08T21:43:41.279Z, fr.wikipedia.org, mediawiki.recentchange)",560744692,edit,0,Victor Fanneau de La Horie,/* Vie clandestine */,1762638219,Meissen72,False,True,False,"List(29406, 28960)","List(230478997, 230479012)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Vie clandestine
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Marvin_Johnson_(boxe_anglaise), 261c42e9-992d-437f-b9c8-32d7ef93a1ec, ee10be47-6b99-4246-bf7e-f9d0c69093b4, 2025-11-08T21:43:41.782Z, fr.wikipedia.org, mediawiki.recentchange)",560744693,edit,0,Marvin Johnson (boxe anglaise),ajout image,1762638220,Mats01,False,False,True,"List(4399, 4457)","List(182289033, 230479013)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,ajout image


In [0]:
# temp volume for checkpoint storage
volume = 'tmp_streamingDF'
volume_path = f'/Volumes/{catalog}/{db_schema_checkpoints}/{volume}'
volume_name = f'{catalog}.{db_schema_checkpoints}.{volume}'

# drop old temp volume and recreate
spark.sql(f"DROP VOLUME IF EXISTS {volume_name}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {volume_name}")

# Display transformed data
streamingCountsDF.display(checkpointLocation=volume_path)

bot,window,count
False,"List(2025-11-08T21:45:00.000Z, 2025-11-08T21:50:00.000Z)",81
True,"List(2025-11-08T21:45:00.000Z, 2025-11-08T21:50:00.000Z)",16
True,"List(2025-11-08T21:40:00.000Z, 2025-11-08T21:45:00.000Z)",4
False,"List(2025-11-08T21:50:00.000Z, 2025-11-08T21:55:00.000Z)",82
False,"List(2025-11-08T21:40:00.000Z, 2025-11-08T21:45:00.000Z)",31
True,"List(2025-11-08T21:50:00.000Z, 2025-11-08T21:55:00.000Z)",15


In [0]:
%sql
SELECT *
FROM wikimedia_db.analytics.edits_per_5min_by_bot
LIMIT 20;


bot,window,count
False,"List(2025-11-09T18:55:00.000Z, 2025-11-09T19:00:00.000Z)",84
False,"List(2025-11-09T19:00:00.000Z, 2025-11-09T19:05:00.000Z)",58
True,"List(2025-11-08T21:45:00.000Z, 2025-11-08T21:50:00.000Z)",16
False,"List(2025-11-09T18:40:00.000Z, 2025-11-09T18:45:00.000Z)",89
True,"List(2025-11-09T18:25:00.000Z, 2025-11-09T18:30:00.000Z)",5
False,"List(2025-11-08T21:45:00.000Z, 2025-11-08T21:50:00.000Z)",81
False,"List(2025-11-09T18:25:00.000Z, 2025-11-09T18:30:00.000Z)",94
False,"List(2025-11-08T21:50:00.000Z, 2025-11-08T21:55:00.000Z)",82
False,"List(2025-11-09T18:50:00.000Z, 2025-11-09T18:55:00.000Z)",66
False,"List(2025-11-08T21:40:00.000Z, 2025-11-08T21:45:00.000Z)",31


In [0]:
%sql
SELECT *
FROM wikimedia_db.alerts.large_human_edits
LIMIT 20;


$schema,meta,id,type,namespace,title,comment,timestamp,user,bot,minor,patrolled,length,revision,server_url,server_name,wiki,parsedcomment
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Lyc%C3%A9e_fran%C3%A7ais_Charles-de-Gaulle, 44d4f955-83ab-4650-9f04-3bf965b66310, 8cddfa94-93ec-4b5d-aad8-79d73f4e5a98, 2025-11-08T21:43:30.803Z, fr.wikipedia.org, mediawiki.recentchange)",560744686,edit,0,Lycée français Charles-de-Gaulle,,1762638208,FraizTagada,False,False,False,"List(17169, 17181)","List(229042074, 230479006)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Utilisateur:%D9%85%D8%AD%D9%85%D8%AF_%D8%A3%D9%85%D9%8A%D9%86_%D8%A7%D9%84%D8%B7%D8%B1%D8%A7%D8%A8%D9%84%D8%B3%D9%8A/Brouillon1, 4952860e-1a5a-444f-9ad0-f2c8a593a61e, e2960453-d074-4b8a-a9a0-a2cf8c6b0549, 2025-11-08T21:43:37.353Z, fr.wikipedia.org, mediawiki.recentchange)",560744690,edit,2,Utilisateur:محمد أمين الطرابلسي/Brouillon1,,1762638214,محمد أمين الطرابلسي,False,False,True,"List(36850, 36906)","List(230091568, 230479010)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Bourgogne_c%C3%B4te-d%27or, e5cd8f91-46b5-400e-995c-ce245f23981e, a7273c4e-4dd4-4d69-ae12-84eb36138b61, 2025-11-08T21:43:47.874Z, fr.wikipedia.org, mediawiki.recentchange)",560744697,edit,0,Bourgogne côte-d'or,/* Vins */ repli d'AOC,1762638226,Lvcvlvs,False,False,True,"List(24387, 24920)","List(230452118, 230479016)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Vins : repli d'AOC
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Wikip%C3%A9dia:Vandalisme_en_cours, 63f6f660-baee-49a1-82db-4f07ab4ded53, 3760579b-776c-41c5-a999-1ae0655c4bb6, 2025-11-08T21:43:50.660Z, fr.wikipedia.org, mediawiki.recentchange)",560744699,edit,4,Wikipédia:Vandalisme en cours,/* Demande de blocage de Jeanfjddururkfjdjfj */ nouvelle section,1762638228,Leonidlednev,False,False,True,"List(28023, 28378)","List(230478751, 230479017)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Demande de blocage de Jeanfjddururkfjdjfj : nouvelle section
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Learner_Tien, a656c702-543c-464a-af6b-0ce89d54b57f, cf60f0e9-7cc5-4253-990f-44b0f8a01a28, 2025-11-08T21:43:52.357Z, fr.wikipedia.org, mediawiki.recentchange)",560744700,edit,0,Learner Tien,/* Palmarès */,1762638230,Kastiel21,False,False,True,"List(16745, 17132)","List(230474871, 230479018)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Palmarès
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Chester_A._Arthur, 79331e99-a255-4550-83f0-36937b443093, b5ae5910-547e-4a27-8a51-94e75be4187b, 2025-11-08T21:44:02.035Z, fr.wikipedia.org, mediawiki.recentchange)",560744704,edit,0,Chester A. Arthur,,1762638236,Philotam,False,False,True,"List(94503, 94503)","List(230478980, 230479020)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/J%C3%A9r%C3%B4me_Pauwels, 03b844de-70ae-4c6f-947a-eb8c6fac92c0, fe37c385-8ff6-44b6-b6d8-6a68d37905c7, 2025-11-08T21:44:01.776Z, fr.wikipedia.org, mediawiki.recentchange)",560744703,edit,0,Jérôme Pauwels,/* Films */Ajout. Source : Carton AVP VF,1762638238,~2025-32157-45,False,False,False,"List(78832, 78886)","List(230429272, 230479021)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Films : Ajout. Source : Carton AVP VF
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Utilisateur:%D9%85%D8%AD%D9%85%D8%AF_%D8%A3%D9%85%D9%8A%D9%86_%D8%A7%D9%84%D8%B7%D8%B1%D8%A7%D8%A8%D9%84%D8%B3%D9%8A/Brouillon2, b5ae30fd-73b6-47d3-a7dd-8a3df7a667af, 91a2243c-9842-49c7-adf3-6684511c4284, 2025-11-08T21:44:05.041Z, fr.wikipedia.org, mediawiki.recentchange)",560744708,edit,2,Utilisateur:محمد أمين الطرابلسي/Brouillon2,,1762638243,محمد أمين الطرابلسي,False,False,True,"List(15103, 15159)","List(230091560, 230479022)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Mort_aux_vaches_!, 2945e284-7b23-4b01-89b3-a8c385c83a04, e630602a-68b9-47a7-921a-f5fde8ea90f6, 2025-11-08T21:44:20.409Z, fr.wikipedia.org, mediawiki.recentchange)",560744721,edit,0,Mort aux vaches !,,1762638259,Lœnidra,False,False,True,"List(9285, 9303)","List(230359860, 230479028)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,
/mediawiki/recentchange/1.0.0,"List(https://fr.wikipedia.org/wiki/Canton_de_Castelsarrasin, 6d7bbf18-1314-40db-b098-ceb3d3f8e094, 80f21623-4bfc-490b-9c8e-885b871b9786, 2025-11-08T21:44:46.957Z, fr.wikipedia.org, mediawiki.recentchange)",560744733,edit,0,Canton de Castelsarrasin,/* Conseillers d'arrondissement (de 1833 à 1940) */,1762638285,Montgallet,False,False,True,"List(16627, 16695)","List(230478981, 230479032)",https://fr.wikipedia.org,fr.wikipedia.org,frwiki,→Conseillers d'arrondissement (de 1833 à 1940)
