# Policy Analysis - Protection Status

This notebook presents code to analyse the protection status of a set of Wikipedia (WP) articles and how that status evolves over time. WP's protection policy states:

> In some circumstances, pages may need to be protected from modification by certain groups of editors. Pages are protected when there is disruption that cannot be prevented through other means, such as blocks. Protection is a technical restriction applied only by administrators, although any user may request protection. Protection can be indefinite or expire after a specified time.

## Requirements

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from matplotlib.patches import Patch
import re
import requests
import time
from datetime import datetime
from datetime import timezone
import csv
import mwparserfromhell
import sys
sys.path.append("/Users/garambois/Desktop/EPFL/MASTER/NX_MA1/MA_1/dhlab-cultural-weaponisation-ukraine-benchmark")

## Set of articles (Control and main set)

In [2]:
# Articles List
control_articles = [
    "Pop music",
    "Rock and roll",
    "Eric Clapton",
    "Rolling Stone",
    "Jazz",
    "Swing",
    "Classical music",
    "Ludwig van Beethoven",
    "Wolfgang Amadeus Mozart",
    "Joseph Haydn",
    "Country music",
    "BTS",
    "K-Pop",
    "Electronic music",
    "Daft Punk",
    "Paul Kalkbrenner",
    "Trumpet",
    "Music theory",
    "Fender",
    "Marshall Amplification",
    "Jimi Hendrix",
    "Bob Marley",
    "Edith Piaf",
    "Royal Albert Hall",
    "Piano",
    "Saxophone",
    "Pink Floyd",
    "Nirvana (band)",
    "Nina Simone",
    "Music of Africa",
    "Major scale",
    "Major chord",
    "Minor chord",
    "Red Hot Chili Peppers",
    "Funk rock",
    "James Brown",
    "Dire Straits",
    "Mark Knopfler",
    "John Frusciante",
    "Alan Clark",
    "Stevie Wonder",
    "Guitar"
]

control_articles = sorted(control_articles)
df = pd.read_csv('../../datas/final/all_users_analysis_final.csv')
articles = sorted(df['article'].unique().tolist())

## Policy Analysis and evolution through time

### Functions

In [3]:
URL = "https://en.wikipedia.org/w/api.php"
HEADERS = {
    "User-Agent": "DH_Project/1.0 (https://www.epfl.ch/labs/dhlab/; maxime.garambois@epfl.ch)"
}

# API --> these functions call the Wikipedia API

def get_article_creation_date(title):
    params = {
        "action": "query",
        "format": "json",
        "prop": "revisions",
        "titles": title,
        "rvlimit": 1,
        "rvprop": "timestamp|user|comment",
        "rvdir": "newer"    # first revision = creation
    }

    r = requests.get(URL, params=params, headers=HEADERS)
    data = r.json()

    page = next(iter(data["query"]["pages"].values()))

    if "missing" in page:
        return None

    rev = page["revisions"][0]
    return {
        "timestamp": rev["timestamp"],
        "user": rev.get("user"),
        "comment": rev.get("comment")
    }


def get_article_protection_status(article_title):
    """
    Get the current protection status of an article.
    The status can be :
    - fully protected 
    - semi-protected
    - template protected
    - ...
    """
    
    session = requests.Session()
    session.headers.update(HEADERS)

    params = {
        "action": "query",
        "titles": article_title,
        "prop": "info",
        "inprop": "protection",
        "format": "json"
    }

    # Reuse the session created above (it already carries HEADERS)
    response = session.get(URL, params=params)
    data = response.json()

    # Extract page object (the pageid is unknown => key is not fixed)
    page = next(iter(data["query"]["pages"].values()))

    if "missing" in page:
        return {"status": "missing", "protections": []}

    protections = page.get("protection", [])

    # If no protection entries → page is fully open
    if not protections:
        return {"status": "unprotected", "protections": []} 

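    # Translate the raw entries into a single readable status.
    # Edit protection takes precedence over move protection, whatever the
    # order of the entries (same priority as resolve_status below).
    edit_levels = {
        "autoconfirmed": "semi-protected",
        "extendedconfirmed": "extended-protected",
        "sysop": "fully protected",
    }
    status = "custom protection"
    move_protected = False
    for prot in protections:
        action = prot.get("type")
        level = prot.get("level")

        if action == "edit" and level in edit_levels:
            status = edit_levels[level]
        elif action == "move" and level == "sysop":
            move_protected = True

    if status == "custom protection" and move_protected:
        status = "move-protected"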

    return {
        "status": status,
        "protections": protections  # raw list for further analysis
    }

def get_article_protection_history(article_title):
    """
    Retrieve the full protection history of an article using MediaWiki logevents.
    Returns a sorted list of protection events including timestamp, action,
    protection levels, expiry, and comments.
    """

    session = requests.Session()
    session.headers.update(HEADERS)

    events = []
    cont = {}

    while True:
        params = {
            "action": "query",
            "list": "logevents",
            "letype": "protect",
            "letitle": article_title,
            "lelimit": "max",
            "format": "json",
            **cont
        }

        response = session.get(URL, params=params)
        data = response.json()

        for ev in data["query"]["logevents"]:
            event = {
                "timestamp": ev.get("timestamp"),
                "user": ev.get("user"),
                "action": ev.get("action"),        # protect / modify / unprotect / move_prot
                "comment": ev.get("comment"),

                # details with levels and expiry (may be missing)
                "protection": ev.get("params", {}).get("details", []),
            }
            events.append(event)

        # Continue if there's more data
        if "continue" in data:
            cont = data["continue"]
        else:
            break

    # Sort chronologically
    events.sort(key=lambda x: x["timestamp"])

    return events

# Helpers --> these functions do not call the Wikipedia API

def parse_old_protection(comment):
    """
    Extracts old-style MediaWiki protection settings from a log comment.
    Example: "edit war [edit=sysop:move=sysop]"
    Returns dict: {"edit": "sysop", "move": "sysop"}
    """
    match = re.search(r"\[(.*?)\]", comment)
    if not match:
        return {}

    content = match.group(1)
    rules = content.split(":")

    prot = {}
    for rule in rules:
        if "=" in rule:
            key, value = rule.split("=", 1)
            prot[key.strip()] = value.strip()

    return prot


def resolve_status(prot_dict):
    """
    Convert a dict of edit/move protection settings to a unified status string.
    Used for new-style AND old-style protections.
    """
    edit = prot_dict.get("edit")
    move = prot_dict.get("move")

    if edit == "sysop":
        return "fully protected"
    if edit == "extendedconfirmed":
        return "extended-protected"
    if edit == "autoconfirmed":
        return "semi-protected"
    if move == "sysop":
        return "move-protected"

    return "unprotected"

def build_protection_timeline(article_title):
    """
    Build chronological protection timeline with:
      - new-style MW protection data (post-2010)
      - old-style comment-embedded protection info (pre-2010)

    Output:
        [
          {"start": datetime, "end": datetime, "status": "unprotected"},
          {"start": datetime, "end": datetime, "status": "semi-protected"},
          ...
        ]
    """

    events = get_article_protection_history(article_title)

    # If no protection events at all --> always unprotected
    if not events:
        # Try to use creation date if available; otherwise return None/None
        creation_info = get_article_creation_date(article_title)
        if creation_info and "timestamp" in creation_info:
            creation_dt = datetime.fromisoformat(
                creation_info["timestamp"].replace("Z", "+00:00")
            )
            return [{
                "start": creation_dt,
                # timezone-aware "now", consistent with the parsed API timestamps
                "end": datetime.now(timezone.utc),
                "status": "unprotected"
            }]
        else:
            return [{
                "start": None,
                "end": None,
                "status": "unprotected"
            }]

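    # Initial state: article just born, unprotected.
    # If the creation date cannot be retrieved, fall back to the earliest
    # protection event so the timeline still has a valid start.
    creation_info = get_article_creation_date(article_title)
    first_ts = creation_info["timestamp"] if creation_info else min(e["timestamp"] for e in events)
    creation_dt = datetime.fromisoformat(first_ts.replace("Z", "+00:00"))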

    timeline = []
    current_status = "unprotected"
    current_start = creation_dt

    # events should already be chronological, but just to be safe:
    events = sorted(events, key=lambda e: e["timestamp"])

    for ev in events:
        timestamp = datetime.fromisoformat(ev["timestamp"].replace("Z", "+00:00"))
        action = ev.get("action")
        protections = ev.get("protection", [])
        comment = ev.get("comment", "")
        
        
        expiry_dates = []
        for p in protections:
            expiry = p.get('expiry')
            if expiry and expiry != 'infinite':
                expiry_dates.append(datetime.fromisoformat(expiry.replace('Z', '+00:00')))

        expiry_dt = min(expiry_dates) if expiry_dates else None

        # Start by assuming status stays the same
        new_status = current_status

        # CASE 1: explicit unprotect
        if action == "unprotect":
            new_status = "unprotected"

        # CASE 2: protect / modify / move_prot → set some level 
        elif action in ("protect", "modify", "move_prot", "move_protect"):
            prot_dict = {}

            # (1) Try new-style protection entries from API
            for prot in protections:
                ptype = prot.get("type")   # "edit", "move", ...
                level = prot.get("level")  # "sysop", "autoconfirmed", ...
                if ptype and level:
                    prot_dict[ptype] = level

            # (2) If nothing found → try old-style [edit=sysop:move=sysop]
            if not prot_dict:
                old = parse_old_protection(comment)
                prot_dict.update(old)

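            # (3) Resolve final status from prot_dict. An event without any
            # level information (e.g. a move_prot entry logged when the page
            # is moved) carries the protection over, so keep the current status.
            if prot_dict:
                new_status = resolve_status(prot_dict)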

        # else: action not recognised (rare) → keep current_status

        # If status changed, close previous interval and start a new one
        if new_status != current_status:
            # close previous interval
            timeline.append({
                "start": current_start,
                "end": timestamp,
                "status": current_status
            })

            # start new interval
            current_status = new_status
            current_start = timestamp
            

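        # Apply the expiry only if it has already passed and no later log
        # event supersedes it; otherwise the protection is still in force
        # and closing the interval here would create overlapping intervals.
        later = [e["timestamp"] for e in events if e["timestamp"] > ev["timestamp"]]
        next_dt = (
            datetime.fromisoformat(min(later).replace("Z", "+00:00")) if later else None
        )
        now = datetime.now(timezone.utc)
        if expiry_dt and expiry_dt <= now and (next_dt is None or expiry_dt <= next_dt):
            # Close current interval at expiry
            timeline.append({
                "start": current_start,
                "end": expiry_dt,
                "status": current_status
            })

            # After expiry, status becomes unprotected until next event
            current_status = "unprotected"
            current_start = expiry_dt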

    # Close the last interval, ending at "now" (timezone-aware, like the
    # timestamps parsed from the API)
    timeline.append({
        "start": current_start,
        "end": datetime.now(timezone.utc),
        "status": current_status
    })

    return timeline


PROTECTION_COLORS = {
    "unprotected": "#C0C0C0",
    "semi-protected": "#1f77b4",
    "extended-protected": "#ff7f0e",
    "fully protected": "#d62728",
    "move-protected": "#9467bd",
}

# Plots the timeline

def plot_protection_timelines(timelines, wanna_save_fig = False):
    
    """
    Gantt-style plot for multiple articles.
    timelines: dict => {article_name: [intervals]}
    """

    articles = sorted(timelines.keys())  # alphabetical
    fig, ax = plt.subplots(figsize=(14, len(articles) * 0.4))

    y_pos = 0
    height = 0.8

    for article in articles:
        intervals = timelines[article]

        for interval in intervals:
            start = interval["start"]
            end = interval["end"]
            status = interval["status"]

            if start is None or end is None:
                continue

            ax.broken_barh(
                [(mdates.date2num(start), mdates.date2num(end) - mdates.date2num(start))],
                (y_pos, height),
                facecolors=PROTECTION_COLORS.get(status, "black"),
                edgecolor="none"
            )

        ax.text(
            mdates.date2num(intervals[0]["start"]) - 50, 
            y_pos + height / 2,
            article, va="center", ha="right"
        )

        y_pos += 1

    # --- remove black frame / spines ---
    for spine in ax.spines.values():
        spine.set_visible(False)

    ax.set_yticks([])
    ax.set_xlabel("Year")
    ax.xaxis.set_major_locator(mdates.YearLocator(2))
    ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y"))

    # --- add legend ---
    legend_handles = [
        Patch(facecolor=color, label=status)
        for status, color in PROTECTION_COLORS.items()
    ]
    ax.legend(
        handles=legend_handles,
        title="Protection Status",
        bbox_to_anchor=(1.02, 1),
        loc="upper left",
        frameon=False
    )

    # Derive the article count from the data instead of hard-coding it
    fig_title = f"Wikipedia Protection Evolution Timeline ({len(articles)} Articles)"
    plt.title(fig_title, fontsize=14)
    plt.tight_layout()
    if wanna_save_fig:
        plt.savefig(f"../../plots/Policy Analysis/{fig_title}.png", dpi=300, bbox_inches='tight')
    plt.show()

def timelines_to_dataframe(timelines):
    """
    Convert the timelines dict into a pandas DataFrame with:
    article | start | end | status | diff (days)
    Ensures all datetimes are UTC-aware to avoid subtraction errors.
    """
    rows = []

    for article, intervals in timelines.items():
        for interval in intervals:
            start = interval["start"]
            end = interval["end"]
            status = interval["status"]

            # Normalize timezone to UTC
            if start is not None:
                if start.tzinfo is None:
                    start = start.replace(tzinfo=timezone.utc)
                else:
                    start = start.astimezone(timezone.utc)

            if end is not None:
                if end.tzinfo is None:
                    end = end.replace(tzinfo=timezone.utc)
                else:
                    end = end.astimezone(timezone.utc)

            # Compute difference in days
            diff_days = (end - start).days if start and end else None

            rows.append({
                "article": article,
                "start": start,
                "end": end,
                "status": status,
                "diff": diff_days
            })

    return pd.DataFrame(rows)
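
The interval logic can be checked offline, without calling the API. The sketch below is a simplified re-implementation, not the notebook's `build_protection_timeline`: it only looks at edit protection, assumes every event carries new-style `protection` details, and applies an expiry only when no later event supersedes it. The names `parse_ts`, `intervals_from_events` and `EDIT_STATUS` are hypothetical, and the creation date in the demo is a placeholder. The two demo events are the first two entries of the log shown further down for "2022 Russian invasion of Ukraine".

```python
from datetime import datetime, timezone

# Hypothetical names; the mapping mirrors resolve_status (edit levels only).
EDIT_STATUS = {
    "sysop": "fully protected",
    "extendedconfirmed": "extended-protected",
    "autoconfirmed": "semi-protected",
}


def parse_ts(ts):
    """Parse a timestamp such as '2022-02-24T04:10:47Z' into an aware datetime."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def intervals_from_events(events, created, now):
    """Turn chronological protection events into contiguous intervals."""
    timeline = []
    status, start, expiry = "unprotected", created, None

    def switch(to_status, at):
        """Close the running interval at `at` if the status actually changes."""
        nonlocal status, start
        if to_status != status:
            timeline.append({"start": start, "end": at, "status": status})
            status, start = to_status, at

    for ev in events:
        ts = parse_ts(ev["timestamp"])
        # A pending expiry counts only if it falls before this event.
        if expiry is not None and expiry <= ts:
            switch("unprotected", expiry)
            expiry = None
        edit = [p for p in ev.get("protection", []) if p.get("type") == "edit"]
        if ev.get("action") == "unprotect":
            switch("unprotected", ts)
            expiry = None
        elif edit:
            switch(EDIT_STATUS.get(edit[0].get("level"), "unprotected"), ts)
            raw = edit[0].get("expiry")
            expiry = None if raw in (None, "infinite") else parse_ts(raw)
        # Events without edit details (e.g. move_prot) leave the state unchanged.

    if expiry is not None and expiry <= now:
        switch("unprotected", expiry)
    timeline.append({"start": start, "end": now, "status": status})
    return timeline


demo_events = [
    {"timestamp": "2022-02-24T04:10:47Z", "action": "protect",
     "protection": [{"type": "edit", "level": "autoconfirmed",
                     "expiry": "2022-02-26T04:10:47Z"}]},
    {"timestamp": "2022-02-24T18:46:08Z", "action": "modify",
     "protection": [{"type": "edit", "level": "extendedconfirmed",
                     "expiry": "2023-02-24T18:46:08Z"}]},
]
demo = intervals_from_events(
    demo_events,
    created=parse_ts("2022-02-24T03:00:00Z"),  # placeholder, not the real creation date
    now=parse_ts("2022-06-01T00:00:00Z"),      # fixed "now" keeps the result reproducible
)
for interval in demo:
    print(interval["status"], interval["start"], interval["end"])
```

Because the first protection is modified before its two-day expiry, the semi-protected interval ends at the `modify` event rather than at the expiry, and no spurious unprotected gap appears.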

### Run Plot and Dataframe

In [4]:
timelines = {} # needed to build the timelines and build the plot 
hist = {} # needed to check if the results of 'timelines' is good

for title in articles:
    timelines[title] = build_protection_timeline(title)
    hist[title] = get_article_protection_history(title)
    
df_protect = timelines_to_dataframe(timelines)
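
One possible sanity check on `timelines` is to sum the time each article spends in each status. The sketch below works on a single list of intervals in the format returned by `build_protection_timeline`; the function name `days_per_status` is hypothetical and the toy dates are illustrative, not real protection data.

```python
from collections import defaultdict
from datetime import datetime, timezone


def days_per_status(intervals):
    """Sum the duration, in days, of each status over a list of intervals."""
    totals = defaultdict(float)
    for interval in intervals:
        start, end = interval["start"], interval["end"]
        if start is None or end is None:
            continue  # skip placeholder intervals with unknown bounds
        totals[interval["status"]] += (end - start).total_seconds() / 86400
    return dict(totals)


# Toy timeline (made-up dates) in the same format as timelines[title].
utc = timezone.utc
toy_timeline = [
    {"start": datetime(2020, 1, 1, tzinfo=utc),
     "end": datetime(2020, 1, 11, tzinfo=utc), "status": "unprotected"},
    {"start": datetime(2020, 1, 11, tzinfo=utc),
     "end": datetime(2020, 1, 14, tzinfo=utc), "status": "semi-protected"},
    {"start": datetime(2020, 1, 14, tzinfo=utc),
     "end": datetime(2020, 1, 31, tzinfo=utc), "status": "unprotected"},
]
toy_totals = days_per_status(toy_timeline)
print(toy_totals)
```

In the notebook this could be applied as `{title: days_per_status(tl) for title, tl in timelines.items()}`; a negative or implausibly large total would point to overlapping or inverted intervals.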

In [5]:
df_merge = df.copy()

In [8]:
df_merge.head()

Unnamed: 0,article,user,date,comment,llm_output,weaponised,ngram,df_index,row_index_matched,fg_row_index,...,propaganda_similarity,category_extracted_propaganda_mapped,aligned_before_chunk,aligned_after_chunk,similarity,significance_extracted,year,user_type,is_anon,is_bot
0,COVID-19 pandemic in Ukraine,Agathoclea,2020-03-11T20:56:06Z,removed [[Category:2019–20 coronavirus outbrea...,"Changed the category from ""2019–20 coronavirus...",Not Weaponised,annexation of Crimea by,0,15,6,...,0.418396,"Obfuscation, intentional vagueness",A referendum in the largely ethnic Russian Ukr...,A referendum in the largely ethnic Russian Ukr...,0.925267,The use of terms like 'bloodless' and 'bloody ...,2020,Registered,False,False
1,History of Ukraine,Icey,2006-05-21T14:09:22Z,/* Further reading */ Disambiguation link repa...,Changed the reference format for Andrew Wilson...,Not Weaponised,A referendum in the,4,384,6,...,0.418396,"Obfuscation, intentional vagueness",A referendum in the largely ethnic Russian Ukr...,A referendum in the largely ethnic Russian Ukr...,0.925267,The use of terms like 'bloodless' and 'bloody ...,2006,Registered,False,False
2,History of Ukraine,Irpen,2006-06-06T21:00:08Z,"this whole section doesn't belong here, speara...","Removed a section titled ""Ukraine and Nuclear ...",Not Weaponised,A referendum in the,4,389,6,...,0.418396,"Obfuscation, intentional vagueness",A referendum in the largely ethnic Russian Ukr...,A referendum in the largely ethnic Russian Ukr...,0.925267,The use of terms like 'bloodless' and 'bloody ...,2006,Registered,False,False
3,History of Ukraine,193.60.161.100,2006-05-23T11:39:26Z,,"Changed ""beyond"" to ""gayniss"" in the context o...",Not Weaponised,in the largely ethnic,4,383,6,...,0.418396,"Obfuscation, intentional vagueness",A referendum in the largely ethnic Russian Ukr...,A referendum in the largely ethnic Russian Ukr...,0.925267,The use of terms like 'bloodless' and 'bloody ...,2006,Anonymous (IP),True,False
4,History of Ukraine,Irpen,2006-06-14T17:49:44Z,revert to myself,Removed a POV (point of view) section regardin...,Not Weaponised,in the largely ethnic,4,392,6,...,0.418396,"Obfuscation, intentional vagueness",A referendum in the largely ethnic Russian Ukr...,A referendum in the largely ethnic Russian Ukr...,0.925267,The use of terms like 'bloodless' and 'bloody ...,2006,Registered,False,False


In [9]:
#df_protect[df_protect['article'] == '2022 Russian invasion of Ukraine']
hist['2022 Russian invasion of Ukraine']

[{'timestamp': '2022-02-24T04:10:47Z',
  'user': 'ST47',
  'action': 'protect',
  'comment': 'Persistent [[WP:Vandalism|vandalism]], highly visible current event',
  'protection': [{'type': 'edit',
    'level': 'autoconfirmed',
    'expiry': '2022-02-26T04:10:47Z'}]},
 {'timestamp': '2022-02-24T18:46:08Z',
  'user': 'Scottywong',
  'action': 'modify',
  'comment': '[[WP:30/500|Arbitration enforcement]], [[WP:ARBEE]]',
  'protection': [{'type': 'edit',
    'level': 'extendedconfirmed',
    'expiry': '2023-02-24T18:46:08Z'},
   {'type': 'move', 'level': 'extendedconfirmed', 'expiry': 'infinite'}]},
 {'timestamp': '2022-03-17T13:41:45Z',
  'user': 'Writ Keeper',
  'action': 'move_prot',
  'comment': '[[Ukraine 1, Russia 0]] moved to [[2022 Russian invasion of Ukraine]]: revert',
  'protection': []},
 {'timestamp': '2022-03-17T14:42:02Z',
  'user': 'Lectonar',
  'action': 'modify',
  'comment': 'request at rfp',
  'protection': [{'type': 'edit',
    'level': 'extendedconfirmed',
    'expir

In [11]:
df_merge[df_merge['article'] == '2022 Russian invasion of Ukraine']

Unnamed: 0,article,user,date,comment,llm_output,weaponised,ngram,df_index,row_index_matched,fg_row_index,...,propaganda_similarity,category_extracted_propaganda_mapped,aligned_before_chunk,aligned_after_chunk,similarity,significance_extracted,year,user_type,is_anon,is_bot
2730,2022 Russian invasion of Ukraine,Thriley,2024-02-26T00:36:25Z,Undid revision 1210309323 by [[Special:Contrib...,"Changed ""PUTIN, DIE IN HELL"" to ""#REDIRECT [[R...",Not Weaponised,declared their independence from,58,5,119,...,0.418396,"Obfuscation, intentional vagueness",''Crimea's accession to Russia''' is an ongoin...,\n+The '''accession of Republic of Crimea to t...,0.883564,The change from 'Autonomous Republic of Crimea...,2024,Registered,False,False
2731,2022 Russian invasion of Ukraine,Chessrat,2025-03-10T08:38:01Z,,Rephrased a sentence regarding Putin's announc...,Not Weaponised,declared their independence from,58,6,119,...,0.418396,"Obfuscation, intentional vagueness",''Crimea's accession to Russia''' is an ongoin...,\n+The '''accession of Republic of Crimea to t...,0.883564,The change from 'Autonomous Republic of Crimea...,2025,Registered,False,False


In [16]:
# Make sure protection_status column exists and has a default
df_merge["protection_status"] = "unprotected"
df_merge["date"] = pd.to_datetime(df_merge["date"], utc=True)

for ind, row in df_merge.iterrows():
    article = row["article"]
    date = row["date"]
    
    for ind2, row2 in df_protect.iterrows():
        if row2["article"] != article:
            continue

        start = row2["start"]
        end = row2["end"]

        # choose the inclusiveness you want; here: start <= date < end
        if (date >= start) and (date < end):
            df_merge.loc[ind, "protection_status"] = row2["status"]
            break   # we found the interval, no need to check others
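
The nested `iterrows` loop above compares every edit with every interval. A faster alternative, sketched here under the assumption that an article's intervals do not overlap, is a binary search over the sorted interval starts. `build_lookup` and `status_at` are hypothetical names, and the toy intervals are illustrative. The convention is the same as in the loop: `start <= date < end`, with "unprotected" as the default.

```python
from bisect import bisect_right
from datetime import datetime, timezone


def build_lookup(intervals):
    """Return a function mapping a datetime to the status in force at that time.

    Assumes non-overlapping intervals; dates outside every interval map to
    the default "unprotected".
    """
    known = sorted(
        (i for i in intervals if i["start"] is not None and i["end"] is not None),
        key=lambda i: i["start"],
    )
    starts = [i["start"] for i in known]

    def status_at(date):
        pos = bisect_right(starts, date) - 1  # last interval starting at or before date
        if pos >= 0 and date < known[pos]["end"]:
            return known[pos]["status"]
        return "unprotected"

    return status_at


# Toy intervals (made-up dates) for a single article.
utc = timezone.utc
toy_intervals = [
    {"start": datetime(2020, 1, 1, tzinfo=utc),
     "end": datetime(2020, 3, 1, tzinfo=utc), "status": "unprotected"},
    {"start": datetime(2020, 3, 1, tzinfo=utc),
     "end": datetime(2020, 6, 1, tzinfo=utc), "status": "semi-protected"},
]
status_at = build_lookup(toy_intervals)
print(status_at(datetime(2020, 3, 1, tzinfo=utc)))
print(status_at(datetime(2021, 1, 1, tzinfo=utc)))
```

In the notebook, one lookup per article could be built from `timelines[article]` and applied to the `date` column of the matching rows of `df_merge`.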

In [41]:
df_merge['protection_status'].value_counts()

protection_status
unprotected           6913
fully protected          5
semi-protected           3
extended-protected       1
Name: count, dtype: int64

