# OPTIMAL-EM: Accessibility and Complexity Analysis Pipeline

This notebook processes a target population of HTML files. It runs an accessibility analysis with Pa11y (using the Axe runner) and a complexity analysis, extracts structural features from the HTML, applies dimensionality reduction (t-SNE) and clustering (DBSCAN), and computes a correlation matrix between accessibility barriers and complexity metrics.


## Notes

- **External Dependencies**:
  - Check the import statements to ensure that all required libraries are installed. 
  - **Pa11y**: Ensure that Pa11y is installed and accessible in your environment.
    - Install [Pa11y](https://github.com/pa11y/pa11y): `npm install -g pa11y`
    - Ensure the Axe runner is available: Pa11y uses [htmlcs](https://github.com/squizlabs/HTML_CodeSniffer) (HTML_CodeSniffer) by default, so this notebook selects Axe explicitly with `--runner axe`.
- **Data Paths**:
  - Update `HTML_PATH` if your HTML files are located in a different directory.
  - Update `DATA_FILE` if you wish to save the processed data to a different location.
- **Visualisation**:
  - The code generates plots using Matplotlib and Seaborn.
- **Re-running Accessibility Analysis**:
  - Set `rerun_accessibility=True` in `process_accessibility_results()` to re-run the accessibility analysis, e.g. after you have updated the HTML files. On the first run, when `DATA_FILE` does not exist yet, the analysis runs regardless of this flag.
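
The caching behaviour behind `rerun_accessibility` can be sketched in isolation. This is a minimal illustration of the load-or-recompute pattern, not the notebook's implementation; `load_or_compute`, `fake_analysis`, and the temporary cache path are hypothetical names used only for this example.

```python
import json
import os
import tempfile


def load_or_compute(cache_path, compute, rerun=False):
    """Return cached JSON data unless `rerun` is set or no cache file exists."""
    if not rerun and os.path.exists(cache_path):
        with open(cache_path, "r", encoding="utf-8") as f:
            return json.load(f)
    data = compute()
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(data, f)
    return data


calls = []


def fake_analysis():
    """Stand-in for the expensive Pa11y run; records each invocation."""
    calls.append(1)
    return [{"filename": "a.html", "accessibility": [0, 1, 0, 2]}]


with tempfile.TemporaryDirectory() as tmp:
    cache = os.path.join(tmp, "cache.json")
    first = load_or_compute(cache, fake_analysis)               # no cache yet: computes
    second = load_or_compute(cache, fake_analysis)              # cache hit: no computation
    third = load_or_compute(cache, fake_analysis, rerun=True)   # forced recomputation

print(len(calls))  # 2
```

The stand-in analysis runs only twice across three calls: once because no cache exists, and once because `rerun=True` forces it.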


In [None]:
import os
import subprocess
import json
from collections import Counter
from collections.abc import Callable
from concurrent.futures import as_completed, ProcessPoolExecutor
from typing import Any

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from bs4 import BeautifulSoup
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.manifold import TSNE
from sklearn.preprocessing import StandardScaler

## Constants

- **HTML_PATH**: The path to the folder containing the HTML files.
- **DATA_FILE**: The path to save the processed data.
- **RICH_CONTENT_TAGS**: A list of HTML tags considered as rich content.
- **INLINE_TAGS**: A list of inline elements.
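
In `run_analysis`, `RICH_CONTENT_TAGS` feeds a simple complexity score: the share of a page's elements that are rich content. Below is a minimal sketch of that ratio on a plain list of tag names. The helper name `complexity_ratio`, the reduced tag set `RICH`, and the sample page are hypothetical and serve only as illustration.

```python
# Illustrative subset of the notebook's RICH_CONTENT_TAGS.
RICH = {"a", "button", "img", "input", "video"}


def complexity_ratio(tag_names, rich_tags=RICH):
    """Fraction of elements that are rich content; 0 for an empty page."""
    if not tag_names:
        return 0
    rich_count = sum(1 for name in tag_names if name in rich_tags)
    return rich_count / len(tag_names)


# Hypothetical page with 8 elements, 3 of which ('a', 'img', 'button') are rich.
page = ["html", "body", "div", "p", "a", "img", "div", "button"]
print(complexity_ratio(page))  # 0.375
```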

In [None]:
# The path to the folder containing the HTML files:
HTML_PATH = 'pages/res/'

# The path to save the processed data:
DATA_FILE = 'collected_data_step2_ec.json'

RICH_CONTENT_TAGS = ['a', 'audio', 'button', 'canvas', 'embed', 'iframe', 'img', 'input', 'keygen', 'label', 'math',
                     'object', 'select', 'svg', 'textarea', 'video']

INLINE_TAGS = [
    'a', 'abbr', 'acronym', 'b', 'bdo', 'big', 'br', 'button',
    'cite', 'code', 'dfn', 'em', 'i', 'img', 'input', 'kbd', 'label',
    'map', 'object', 'output', 'q', 'samp', 'script', 'select',
    'small', 'span', 'strong', 'sub', 'sup', 'textarea', 'time', 'tt', 'var'
]

## Helper Methods

### `get_html_structure(soup)`

Returns all HTML tags in a string format (e.g., `'head meta div div div'`).

### `get_html_block_structure(soup)`

Returns only block-level HTML tags in a string format, excluding inline tags (e.g., `'div div div'`).
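
To make the difference between the two helpers concrete, here is a minimal sketch that builds both tag strings for a tiny page. It uses the standard library's `html.parser` instead of BeautifulSoup so that it runs without third-party packages; `TagCollector`, the reduced `INLINE` set, and the sample markup are illustrative and not part of the notebook.

```python
from html.parser import HTMLParser

# Illustrative subset of the notebook's INLINE_TAGS.
INLINE = {"a", "b", "em", "img", "span", "strong"}


class TagCollector(HTMLParser):
    """Records the name of every opening tag in document order."""

    def __init__(self):
        super().__init__()
        self.tags = []

    def handle_starttag(self, tag, attrs):
        self.tags.append(tag)


sample = (
    "<html><head><meta charset='utf-8'></head>"
    "<body><div><p>Hi <b>there</b></p><a href='#'>link</a></div></body></html>"
)

collector = TagCollector()
collector.feed(sample)

all_tags = " ".join(collector.tags)
block_tags = " ".join(t for t in collector.tags if t not in INLINE)

print(all_tags)    # html head meta body div p b a
print(block_tags)  # html head meta body div p
```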


In [None]:
def get_html_structure(soup):
    """Returns the HTML tags in a string, e.g., 'head meta div div div'."""
    tags = " ".join(tag.name for tag in soup.find_all())
    return tags

def get_html_block_structure(soup):
    """Returns only block-level HTML tags in a string, e.g., 'div div div'."""
    tags = " ".join(tag.name for tag in soup.find_all() if tag.name not in INLINE_TAGS)
    return tags

## Compute Accessibility Results

### `get_accessibility_results(filename, output)`

Computes the accessibility results based on impact levels for a given HTML file using Pa11y output. Axe produces accessibility results categorised as critical, serious, moderate, or minor.
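
As a rough illustration of the counting step, the sketch below tallies impact levels for a small issue list. The issue list is hypothetical and heavily trimmed: it only mirrors the `runnerExtras` / `impact` fields the notebook reads. Unlike the notebook's function, `count_impacts` (an illustrative name) tolerates issues that lack a `runnerExtras` entry.

```python
from collections import Counter

IMPACT_LEVELS = ("critical", "serious", "moderate", "minor")


def count_impacts(issues):
    """Return barrier counts ordered as [critical, serious, moderate, minor]."""
    counter = Counter(
        issue.get("runnerExtras", {}).get("impact") for issue in issues
    )
    return [counter.get(level, 0) for level in IMPACT_LEVELS]


# Hypothetical issue list in the shape the notebook expects.
sample_issues = [
    {"code": "color-contrast", "runnerExtras": {"impact": "serious"}},
    {"code": "image-alt", "runnerExtras": {"impact": "critical"}},
    {"code": "link-name", "runnerExtras": {"impact": "serious"}},
    {"code": "region", "runnerExtras": {"impact": "moderate"}},
]

print(count_impacts(sample_issues))  # [1, 2, 1, 0]
```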

In [None]:
def get_accessibility_results(filename: str, output: list[dict[str, dict[str, str]]]) -> list[int]:
    """
    Computes the accessibility results based on impact levels for a given .html file.
    Axe produces accessibility results categorised as critical, serious, moderate, or minor.

    :param filename: str: The name of the .html file. Used for logging.
    :param output: list[dict[str, dict[str, str]]]: The list of barriers, each containing a 'runnerExtras' key with an
        'impact' field.
    :return: list[int]: Counts of barriers categorised as [critical, serious, moderate, minor].
    """
    print(f"Calculating accessibility for {filename}:")

    impacts = [issue['runnerExtras']['impact'] for issue in output]
    impact_counter = Counter(impacts)

    return [
        impact_counter.get('critical', 0),
        impact_counter.get('serious', 0),
        impact_counter.get('moderate', 0),
        impact_counter.get('minor', 0)
    ]

In [None]:
def process_html_file(filename: str) -> dict[str, Any]:
    """
    Processes a single HTML file for accessibility results and HTML content.

    :param filename: str: The name of the HTML file to process.
    :return: dict[str, Any]: A dictionary containing filename, accessibility results, and HTML content.
    """
    file_path = os.path.join(HTML_PATH, filename)
    print(f"Processing: {filename}")

    # Run Pa11y with the Axe runner and wait for it to finish. Pa11y exits with a
    # non-zero code when it finds issues, so the return code is not checked here.
    try:
        completed = subprocess.run(
            ['pa11y', '--runner', 'axe', '--reporter', 'json', file_path],
            capture_output=True,
            text=True
        )
        # Read the full output rather than only its first line:
        stdout = completed.stdout.strip()
        if not stdout:
            accessibility_results = [0, 0, 0, 0]
        else:
            pa11y_output = json.loads(stdout)
            accessibility_results = get_accessibility_results(filename, pa11y_output)
    except Exception as e:
        print(f"Error processing Pa11y output for {filename}: {e}")
        accessibility_results = [0, 0, 0, 0]

    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read()

    return {
        'filename': filename,
        'accessibility': accessibility_results,
        'html_content': content
    }

In [None]:
def process_accessibility_results(rerun_accessibility: bool = False) -> list[dict[str, Any]]:
    """
    Collects data from HTML files, runs accessibility analysis using Pa11y if needed, and saves the results to a JSON
    file.

    :param rerun_accessibility: bool: Flag to indicate whether to rerun the accessibility analysis.
    :return: a list of dictionaries where each dictionary contains:
        - 'filename' (str): The name of the HTML file.
        - 'accessibility' (list[int]): Accessibility results categorised as [critical, serious, moderate, minor].
        - 'html_content' (str): The HTML content of the file.
    """
    if not rerun_accessibility and os.path.exists(DATA_FILE):
        print("Loading data from JSON file...")
        with open(DATA_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data
    else:
        print("Collecting data and running accessibility analysis...")
        data = []

        with ProcessPoolExecutor() as executor:
            futures = [executor.submit(process_html_file, filename)
                       for filename in os.listdir(HTML_PATH)
                       if filename.endswith(('.html', '.htm'))]

            for future in as_completed(futures):
                try:
                    result = future.result()
                    data.append(result)
                except Exception as e:
                    print(f"Error processing file: {e}")

        print("Saving data to JSON file...")
        with open(DATA_FILE, 'w', encoding='utf-8') as f:
            json.dump(data, f)

        return data

## Method to Run Analysis

### `run_analysis(data, feature_extraction_func)`

Runs the analysis using the provided HTML files and feature extraction function. The analysis includes:

1. HTML parsing
2. Feature extraction
3. Dimensionality reduction
4. Clustering
5. Calculation of a correlation matrix for accessibility and complexity metrics
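
One detail of step 2 is worth checking by hand. `run_analysis` uses `CountVectorizer` with its defaults, and the default `token_pattern` (`(?u)\b\w\w+\b`) only keeps tokens of two or more characters, so single-letter tags such as `a`, `p`, or `b` do not contribute to the document-term matrix. The sketch below applies that pattern with the standard `re` module and counts the remaining tags; the sample tag string is illustrative.

```python
import re
from collections import Counter

# Default token pattern of scikit-learn's CountVectorizer.
DEFAULT_TOKEN_PATTERN = r"(?u)\b\w\w+\b"

# Illustrative tag string in the format returned by the helper functions.
structure = "html body div p a div h1 p img a"

tokens = re.findall(DEFAULT_TOKEN_PATTERN, structure)
counts = Counter(tokens)

print(tokens)         # ['html', 'body', 'div', 'div', 'h1', 'img']
print(counts["div"])  # 2
print(counts["a"])    # 0
```

If single-letter tags should count as features, passing `token_pattern=r"(?u)\b\w+\b"` to `CountVectorizer` is one option, bearing in mind that this changes the feature space and therefore the resulting clusters.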



In [None]:
def run_analysis(
    data: list[dict[str, Any]],
    feature_extraction_func: Callable[[BeautifulSoup], str]
) -> pd.DataFrame:
    """
    Runs the analysis using the provided .html files and feature extraction function. The analysis includes (i) HTML
    parsing, (ii) feature extraction, (iii) dimensionality reduction, (iv) clustering, and (v) the calculation of a
    correlation matrix for accessibility and complexity metrics.

    :param data: list[dict[str, Any]]
        A list of dictionaries containing 'filename', 'accessibility', and 'html_content' fields:
        - 'filename' (str): The name of the HTML file.
        - 'accessibility' (list[int]): Accessibility results as [critical, serious, moderate, minor].
        - 'html_content' (str): The HTML content of the file.

    :param feature_extraction_func: Callable[[BeautifulSoup], str]
        A function that takes a BeautifulSoup object (parsed HTML) and returns a string representing the extracted
        features (e.g., HTML tags).

    :return: pd.DataFrame
        A correlation matrix showing the relationships between the normalised metrics, including complexity and
        accessibility barriers.
    """
    filenames = []
    accessibility_data = []
    complexity_data = []
    html_structure_texts = []

    for item in data:
        filenames.append(item['filename'])
        accessibility_data.append(item['accessibility'])

        soup = BeautifulSoup(item['html_content'], 'html.parser')

        tags_text = feature_extraction_func(soup)
        html_structure_texts.append(tags_text)

        all_tags = soup.find_all()
        rich_content_tags = soup.find_all(RICH_CONTENT_TAGS)
        complexity = len(rich_content_tags) / len(all_tags) if len(all_tags) > 0 else 0
        complexity_data.append(complexity)

    vectorizer = CountVectorizer()
    dtm = vectorizer.fit_transform(html_structure_texts)

    print("Running t-SNE...")
    tsne = TSNE(perplexity=20)
    dtm_tsne = tsne.fit_transform(dtm.toarray())

    print("Running DBSCAN clustering...")
    dbscan = DBSCAN(eps=4, min_samples=2)
    clusters = dbscan.fit_predict(dtm_tsne)

    plt.figure(figsize=(10, 8))

    unique_clusters = set(clusters)
    # plt.cm.get_cmap was removed in Matplotlib 3.9; pyplot.get_cmap is still available:
    colors = plt.get_cmap('tab10', len(unique_clusters))

    for cluster in unique_clusters:
        # DBSCAN defines noise as -1:
        if cluster == -1:
            color = 'k'
            marker = 'x'
        else:
            color = colors(cluster)
            marker = 'o'

        cluster_points = dtm_tsne[clusters == cluster]

        plt.scatter(cluster_points[:, 0], cluster_points[:, 1], c=[color],
                    label=f'Cluster {cluster}' if cluster != -1 else 'Noise', marker=marker)

    plt.title('DBSCAN')
    plt.xlabel('t-SNE Dimension 1')
    plt.ylabel('t-SNE Dimension 2')
    plt.legend(loc='best')
    plt.show()

    cluster_data = pd.DataFrame({
        'filename': filenames,
        'cluster': clusters,
        'complexity': complexity_data,
        'accessibility': accessibility_data
    })

    # Expand 'accessibility' into separate columns:
    accessibility_df = pd.DataFrame(cluster_data['accessibility'].tolist(),
                                    columns=['critical', 'serious', 'moderate', 'minor'])

    cluster_data = pd.concat([cluster_data.drop('accessibility', axis=1), accessibility_df], axis=1)
    cluster_data['total_barriers'] = cluster_data[['critical', 'serious', 'moderate', 'minor']].sum(axis=1)

    # Group by cluster and compute the mean and variance:
    cluster_stats = cluster_data.groupby('cluster').agg({
        'complexity': ['mean', 'var'],
        'total_barriers': 'mean',
        'critical': 'mean',
        'serious': 'mean',
        'moderate': 'mean',
        'minor': 'mean'
    }).reset_index()

    # Replace the MultiIndex columns produced by agg() with flat names:
    cluster_stats.columns = [
        'Cluster', 'Avg. Complexity', 'Complexity Variance',
        'Total Barriers', 'Critical Barriers', 'Serious Barriers',
        'Moderate Barriers', 'Minor Barriers'
    ]

    columns_to_normalize = [
        'Avg. Complexity',
        'Complexity Variance',
        'Total Barriers',
        'Critical Barriers',
        'Serious Barriers',
        # 'Moderate Barriers',
        # 'Minor Barriers'
    ]

    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(cluster_stats[columns_to_normalize])
    normalized_df = pd.DataFrame(normalized_data, columns=columns_to_normalize)
    correlation_matrix = normalized_df.corr()
    # Keep the index so that the rows of the saved matrix stay labelled:
    correlation_matrix.to_csv('correlation_matrix.csv')

    # Visualise the correlation matrix as a heatmap:
    plt.figure(figsize=(12, 12))
    sns.heatmap(correlation_matrix, cmap='coolwarm', fmt='.2f', annot=True)
    plt.title('Correlation Matrix')
    plt.show()

    return correlation_matrix

## Run the Analysis

We will now process the accessibility results and run the analysis using the `get_html_structure` function for feature extraction.

In [None]:
data = process_accessibility_results(rerun_accessibility=False)
correlation_matrix_1 = run_analysis(data, get_html_structure)

## Optional: Run Analysis with Block-Level HTML Structure

You can uncomment the code below to run the analysis using block-level HTML structures and compare the results.


In [None]:
# correlation_matrix_2 = run_analysis(data, get_html_block_structure)

# difference_matrix = correlation_matrix_2 - correlation_matrix_1
# plt.figure(figsize=(14, 14))
# sns.heatmap(difference_matrix, annot=True, cmap='coolwarm', fmt='.2f', center=0)
# plt.title('Difference of Correlation Matrices')
# plt.show()