# PRIMMDebug Log Data Analysis Notebook
This notebook displays all of the analysis of the log data that took place in the PRIMMDebug initial research paper.

The log data was collected from five schools between December 2024-Month 2025. It is divided into the following sections:
1. **Summary statistics:** ...
2. **Establishing variables:**...
3. **Visualisation of variables:**...
4. **Students' written responses:**...

All you need to do is run the notebooks in order and the statistics that appear in the paper will be displayed. If there are any issues, please report them in the [Issues section of the GitHub repository](https://github.com/LaurieGale10/primmdebug-log-data-analysis/issues).

Before we run anything else, let's first import all of the necessary libraries and data.

In [None]:
from classes.stage_log import StageLog
from classes.exercise_log import ExerciseLog
from classes.student_id import StudentId
from classes.exercise_classes.exercise import Exercise
from classes.processors.exercise_log_processor import ExerciseLogProcessor
from classes.processors.stage_log_processor import StageLogProcessor

from loading_services.fetch_logs_from_file import fetch_data_from_json
from loading_services.parse_logs import *

from testing_service.docker_interface import DockerInterface
from testing_service.test_report import TestReport
from testing_service.docker_interface import DockerInterface

from constants import *
from notebook_utils import *

import plotly.express as px
from collections import Counter
from statistics import median
import datetime
from pandas import DataFrame

import sklearn
import kmedoids

EXERCISES: list[Exercise] = parse_exercises(fetch_data_from_json("data/exercises"))
STAGE_LOGS: list[StageLog] = parse_stage_logs(fetch_data_from_json("data/cleaned_stage_logs"))
EXERCISE_LOGS: list[ExerciseLog] = parse_exercise_logs(STAGE_LOGS, fetch_data_from_json("data/cleaned_exercise_logs"))
STUDENT_IDS: list[StudentId] = parse_student_ids(fetch_data_from_json("data/student_ids"))

EXERCISE_LOGS_PER_SESSION: dict[int, list[ExerciseLog]] = {}
for exercise_log in EXERCISE_LOGS:
    session_number: int = exercise_log.session
    EXERCISE_LOGS_PER_SESSION[session_number] = EXERCISE_LOGS_PER_SESSION.get(session_number, []) + [exercise_log]

## Summary Statistics

### Log Data Summary
This data displays the following summary statistics to give information into the scale of the data we collected. We report below on:
- Number of exercises (that contain at least one completed PRIMMDebug stage)
  - Successful
  - Unsuccessful
  - Completed
  - Per each PRIMMDebug challenge
- Number of PRIMMDebug stages.
- Time of data collection


In [None]:
print(f"Number of attempted PRIMMDebug challenges: {len(EXERCISE_LOGS)}")

test_harness_results: list[bool] = [ExerciseLogProcessor.did_final_program_pass_test(exercise_log) for exercise_log in EXERCISE_LOGS]
number_successful_exercises: int = len([test_harness_result for test_harness_result in test_harness_results if test_harness_result])
number_unsuccessful_exercises: int = len([test_harness_result for test_harness_result in test_harness_results if not test_harness_result])

print(f"- Number of successfully completed PRIMMDebug challenges: {display_percentage_string(number_successful_exercises, len(EXERCISE_LOGS))}")
print(f"- Number of unsuccessful PRIMMDebug challenges: {display_percentage_string(number_unsuccessful_exercises, len(EXERCISE_LOGS))}")

number_completed_exercises: int = len([exercise_log for exercise_log in EXERCISE_LOGS if ExerciseLogProcessor.get_last_stage(exercise_log).stage_name == DebuggingStage.modify])
print(f"- Number of entirely completed PRIMMDebug challenges (where students reached the Make stage): {display_percentage_string(number_completed_exercises, len(EXERCISE_LOGS))}")

final_program_states: list[bool] = [ExerciseLogProcessor.is_final_program_erroneous(exercise) for exercise in EXERCISE_LOGS]
number_successful_final_program_states: list[bool] = len([final_program_state for final_program_state in final_program_states if final_program_state])
print(f"- Proportion of PRIMMDebug challenges where last program run successfully executed: {display_percentage_string(number_successful_final_program_states, len(EXERCISE_LOGS))}\n")

total_time: float = sum([ExerciseLogProcessor.get_time_on_exercise(exercise_log) for exercise_log in EXERCISE_LOGS])
print(f"Total time on PRIMMDebug challenges: {datetime.timedelta(seconds=total_time)}\n")

print(f"Number of completed PRIMMDebug stages: {len(STAGE_LOGS)}")

#Number of attempts at each PRIMMDebug challenge
challenge_attempts: dict[str, int] = {}
for exercise_log in EXERCISE_LOGS:
    challenge_attempts[exercise_log.exercise_name] = challenge_attempts.get(exercise_log.exercise_name, 0) + 1
challenge_attempts = dict(sorted(challenge_attempts.items(), key=lambda item: item[1], reverse=True)) #Sort by frequency
challenge_attempts_fig = px.bar(x = challenge_attempts.keys(), y = challenge_attempts.values(), labels = {"x": "Challenge Name", "y": "Frequency"})
challenge_attempts_fig.show()

#Number of challenges attempted by each student
challenges_per_student: dict[str, int] = {}
for exercise in EXERCISE_LOGS:
    student_id: str = exercise.student_id
    challenges_per_student[student_id] = challenges_per_student.get(student_id, 0) + 1
challenges_per_student_fig = px.histogram(challenges_per_student.values(), marginal="box", labels = {"value": "Attempted challenges per student", "count": "Frequency"})
challenges_per_student_fig.show()

#Number of stages per PRIMMDebug challenge attempt
stages_per_challenge_attempt: list[int] = [len(exercise.stage_logs) for exercise in EXERCISE_LOGS]
stages_per_challenge_fig = px.histogram(stages_per_challenge_attempt, marginal="box", labels={"value": "Number of stages"})
stages_per_challenge_fig.show()

#Number of challenge attempts per session
attempts_per_session: dict[int, int] = {}
for session, logs in EXERCISE_LOGS_PER_SESSION.items():
    attempts_per_session[session] = len(logs)
attempts_per_session_fig = px.bar(x = attempts_per_session.keys(), y = attempts_per_session.values(), labels = {"x": "Session", "y": "Frequency"})
attempts_per_session_fig.show()

### Student Demographics

Number of students:
- By gender
- By year group
- By school


In [None]:
print(f"Number of participating students: {len(STUDENT_IDS)}")

gender_split_fig = px.bar(x = get_gender_split().keys(), y = get_gender_split().values(), labels = {"x": "Gender", "y": "Frequency"})
gender_split_fig.show()

year_group_split_fig = px.bar(x = get_year_group_split().keys(), y = get_year_group_split().values(), labels={"x": "Year Group", "y": "Frequency"})
year_group_split_fig.show()

school_split_fig = px.bar(x = get_school_split().keys(), y = get_school_split().values(), labels={"x": "School", "y": "Frequency"})
school_split_fig.show()

## Establishing Variables
Now we move onto introducing the variables that underpin our log data analysis. These are:
- Time taken
  - Per challenge attempt
  - Per stage log
  - Per PRIMMDebug stage
- Correctness
- Engagement

### Time Taken

#### Summary Data
This contains data on:
- Time taken per PRIMMDebug challenge attempt
- Time taken per PRIMMDebug stage

In [None]:
#Time taken per PRIMMDebug challenge attempt
time_per_challenge_attempt: list[float] = [ExerciseLogProcessor.get_time_on_exercise(exercise) for exercise in EXERCISE_LOGS if hasattr(exercise,"end_time")]
time_per_challenge_fig = px.histogram(time_per_challenge_attempt, marginal="box", labels={"value": "Time taken (seconds)", "count": "Count"})
time_per_challenge_fig.show()

#Time taken per stage log
time_per_stage: list[float] = [StageLogProcessor.get_time_on_stage(stage) for stage in STAGE_LOGS if StageLogProcessor.get_time_on_stage(stage) is not None]
time_per_stage_fig = px.histogram(time_per_stage, marginal="box", labels={"value": "Time taken (seconds)", "count": "Count"})
time_per_stage_fig.show()


#### Stage-Specific Data
This contains more of the interesting data relating to each stage of the PRIMMDebug process, including:
- Time taken per PRIMMDebug stage
- How this varies over number of sessions
- How time taken on PRIMMDebug challenge attempts varies over number of sessions

In [None]:
#Time taken for each stage of PRIMMDebug (TODO: Add confidence intervals)
time_by_primmdebug_stage: dict[DebuggingStage, list[float]] = {}
for stage in STAGE_LOGS:
    if stage.stage_name != DebuggingStage.exit:
        if stage.stage_name not in time_per_stage:
            time_by_primmdebug_stage[stage.stage_name] = [StageLogProcessor.get_time_on_stage(stage)]
        else:
            time_by_primmdebug_stage[stage.stage_name].append(StageLogProcessor.get_time_on_stage(stage))
display_dict = {"stage": [], "time": []}
for key in DebuggingStage:
    if key in time_by_primmdebug_stage:
        display_dict["stage"].append(key.value)
        display_dict["time"].append(median(time_by_primmdebug_stage[key]))

time_by_primmdebug_stage_fig = px.bar(display_dict, x="stage", y="time", labels={"stage": "PRIMMDebug stage", "time": "Median time on stage (seconds)"}, title="Median time spent on each PRIMMDebug stage")
time_by_primmdebug_stage_fig.show()

#Time series for time per PRIMMDebug stage
median_time_per_stage_per_session = {"session": [], "stage": [], "median_time": []}
# Iterate through each session and calculate median time for each stage
for session_id, logs in EXERCISE_LOGS_PER_SESSION.items():
    stage_times = {stage: [] for stage in DebuggingStage if stage != DebuggingStage.exit}
    for log in logs:
        for stage_log in log.stage_logs:
            if stage_log.stage_name != DebuggingStage.exit:
                time_on_stage = StageLogProcessor.get_time_on_stage(stage_log)
                if time_on_stage is not None:
                    stage_times[stage_log.stage_name].append(time_on_stage)
    for stage, times in stage_times.items():
        if times:
            median_time_per_stage_per_session["session"].append(session_id)
            median_time_per_stage_per_session["stage"].append(stage.value)
            median_time_per_stage_per_session["median_time"].append(median(times))

median_time_per_stage_per_session_df = DataFrame(median_time_per_stage_per_session).sort_values(by="session")
median_time_per_stage_per_session_fig = px.line(median_time_per_stage_per_session_df, x="session", y="median_time", color="stage", labels={"session": "Session", "median_time": "Median time (seconds)", "stage": "PRIMMDebug stage"}, title="Median time spent on each PRIMMDebug stage per session")
median_time_per_stage_per_session_fig.show()

#Time series for time per PRIMMDebug challenge attempt
median_time_per_challenge_per_session: dict[int, float] = {}
for session_id, logs in EXERCISE_LOGS_PER_SESSION.items():
    median_time_per_challenge = median([ExerciseLogProcessor.get_time_on_exercise(log) for log in logs if hasattr(log, "end_time")])
    median_time_per_challenge_per_session[session_id] = median_time_per_challenge

median_time_per_challenge_per_session = dict(sorted(median_time_per_challenge_per_session.items()))
median_time_per_challenge_per_session_fig = px.line(x=median_time_per_challenge_per_session.keys(), y=median_time_per_challenge_per_session.values(), labels={"x": "Session", "y": "Median time (seconds)"}, title="Median time spent on each PRIMMDebug challenge per session")
median_time_per_challenge_per_session_fig.show()

### Correctness of exercise
- Per challenge
- Per student

Also includes number of find the error stages where correct answer was inputted. This could be broken down into:
- First time correct answers
- Number of attempts taken to correctly identify erroneous line

In [None]:
#Initiate Docker container
docker_interface: DockerInterface = DockerInterface.get_instance()
docker_interface.create_docker_container()
test_reports: list[TestReport] = []

for exercise_log in EXERCISE_LOGS:
    test_reports.append(ExerciseLogProcessor.test_final_program(exercise_log, docker_interface))

total_tests: list[TestReport] = [test_report for test_report in test_reports if test_report is not None]
successful_attempts: int = len([test_report for test_report in total_tests if test_report is not None and test_report.n_successful_tests == test_report.n_total_tests])
print(f"Number of successful test harness runs: {display_percentage_string(successful_attempts, len(total_tests))}")

find_error_stages_with_correct_field: list[StageLog] = [stage_log for stage_log in STAGE_LOGS if stage_log.stage_name == DebuggingStage.find_error and stage_log.correct is not None]
correct_find_error_stages: int = len([stage_log for stage_log in find_error_stages_with_correct_field if stage_log.correct])
print(f"Number of find the error stages where the correct response was entered (for challenges where students had to pinpoint a line): {display_percentage_string(correct_find_error_stages, len(find_error_stages_with_correct_field))}")
docker_interface.close_docker_container()

### Engagement
Some contextual information about general engagement with the tool, including:
- Time focused on the window
- Runtime behaviour:
  - Quality/similarity of tests
- Number of exercises where Test stage is reached.

In [None]:
#Final stage of PRIMMDebug challenge attempts
challenge_end_stages: dict[str, int] = dict(Counter([ExerciseLogProcessor.get_last_stage(exercise_log).stage_name.name for exercise_log in EXERCISE_LOGS]))
final_stage_fig = px.bar(x = list(challenge_end_stages.keys()), y = list(challenge_end_stages.values()), labels = {"x": "Final stage of PRIMMDebug", "y": "Frequency"})
final_stage_fig.show()

#Time spent focused on PRIMMDebug window per exercise
time_spent_focused: list[float] = [ExerciseLogProcessor.get_time_focused(exercise) for exercise in EXERCISE_LOGS]
time_spent_focused_fig = px.histogram(time_spent_focused, marginal="box", labels={"value": "Time spent focused on PRIMMDebug window"})
time_spent_focused_fig.show()

#Challenge attempts where test case panes were viewed
exercises_with_test_case_views: int = len([exercise_log for exercise_log in EXERCISE_LOGS if ExerciseLogProcessor.were_test_cases_viewed(exercise_log)]) / len(EXERCISE_LOGS) * 100
print(f"Percentage of exercises where test cases were viewed at some point: {exercises_with_test_case_views:.2f}")
inspect_the_code_test_case_views: int = len([exercise_log for exercise_log in EXERCISE_LOGS if ExerciseLogProcessor.were_test_cases_viewed(exercise_log, [DebuggingStage.inspect_code])])
print(f"- In the Inspect the Code stage: {display_percentage_string(inspect_the_code_test_case_views, len(EXERCISE_LOGS))}")
test_stage_test_case_views: int = len([exercise_log for exercise_log in EXERCISE_LOGS if ExerciseLogProcessor.were_test_cases_viewed(exercise_log, [DebuggingStage.test])])
print(f"- In the Test stage: {display_percentage_string(test_stage_test_case_views, len(EXERCISE_LOGS))}")

#Exercises where test stage is reached
exercises_test_stage_reached: int = len([exercise_log for exercise_log in EXERCISE_LOGS if any(stage.stage_name == DebuggingStage.fix_error for stage in exercise_log.stage_logs)])
print(f"Number of PRIMMDebug challenge attempts where Test stage was reached: {display_percentage_string(exercises_test_stage_reached, len(EXERCISE_LOGS))}")

## Cluster Analysis

The exercise logs are structured in such a way that could benefit from sequence analysis, which can then be clustered. Given the study's focus on reflective debugging, we decided to cluster based on the time spent on each PRIMMDebug stage. Then, the success, quality of written responses, and survey responses can be calculated to see any difference between the clusters.

Procedure for clustering analysis was informed by Murphy et al., (2024), Everitt et al., (year) and Frades and Matthiesen (2010). As a result, a few options for each stage was selected in order that could in turn be compared and evaluated.

1. **What are we clustering**: Clustering for two main objects is being explored:
  - Feature: Median time on PRIMMDebug stage.
  - Item: PRIMMDebug challenge attempts
  - Item: Students (with potential addition of number of challenges attempted)
  - Both only contain continuous data.
2. **How does the data need to be transformed**: Some general preprocessing will need to be performed on the data.
  - Missing data: Check how much missing data there is (lots of challenge attempts won't get to the final stages - what to do about these? Labelling them as 0 will naturally influence the clusters). Check how much missing data there is.
  - For the partioning methods, data will need to be standardised.
  - Item-feature matrix also needs to be made for clustering
3. **Similarity measure**: Trying a mix of methods that work solely with continuous data, namely Euclidean, Manhattan (more robust to outliers), Minkowski distance, and maybe Pearson?
4. **Clustering algorithm**: Three clustering algorithms are being tried and compared: K-means, K-medoids, and agglomerative hierarchical clustering.
5. **Evaluation of clusters**: Widely different solutions may indicate abscence of clusters based on current variables. Here's some metrics used, specifically based on (refs) and (refs):
   - For cluster quality: Average Silhouette Width and silhouette plot can be used to determine whether each cluster is "good", as well as bootstrapping.
   - For influence (of a single point): Point biserial index.
Values for both measures range from -1-1.
6. **Interpretation**: Inspect clusters and give them names

First, we generate a set of matrices (`DataFrame`s) to act as parameters for clustering. Each of these matrices contains features related to the time spent on each debugging stage, but they're worked out slightly differently:
- `non_zero_times_per_stage`: Contains the *total* time spent on each debugging stage, *excluding* challenge attempts didn't complete every debugging stage (n=).
- `median_times_per_stage`: Contains the *median* time spent on each debugging stage, *including* challenge attempts that didn't complete every debugging stage (n=).
- `non_zero_median_times_per_stage`: Contains the *median* time spent on each debugging stage, *excluding* challenge attempts didn't complete every debugging stage (n=).

In [None]:
null_stages: list[DebuggingStage] = [DebuggingStage.completed_test, DebuggingStage.modify, DebuggingStage.make, DebuggingStage.exit]

#Contains the times per stage for each exercise log. Not created a DataFrame for this as clustering is not being performed on it
times_per_stage: list[dict[DebuggingStage, float]] = []
for exercise_log in EXERCISE_LOGS:
    time_per_stage: dict[DebuggingStage, float] = {stage: time for stage, time in ExerciseLogProcessor.get_time_per_stage(exercise_log).items() if stage not in null_stages}
    times_per_stage.append(time_per_stage)

#Filter zero values
non_zero_times_per_stage: list[dict[DebuggingStage, float]] = [time_per_stage for time_per_stage in times_per_stage if all(value > 0 for value in time_per_stage.values())]
non_zero_times_per_stage_df: DataFrame = DataFrame.from_dict(non_zero_times_per_stage)

median_times_per_stage: list[dict[DebuggingStage, float]] = []
for exercise_log in EXERCISE_LOGS:
    time_per_stage: dict[DebuggingStage, float] = {stage: time for stage, time in ExerciseLogProcessor.get_time_per_stage(exercise_log, True).items() if stage not in null_stages}
    median_times_per_stage.append(time_per_stage)
median_times_per_stage_df: DataFrame = DataFrame.from_dict(median_times_per_stage)

non_zero_median_times_per_stage: list[dict[DebuggingStage, float]] = [time_per_stage for time_per_stage in median_times_per_stage if all(value > 0 for value in time_per_stage.values())]
non_zero_median_times_per_stage_df: DataFrame = DataFrame.from_dict(non_zero_median_times_per_stage)

We then standardise these values using `StandardScaler.fit_transform` and plot scatter plot matrices to identify outliers

In [None]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

columns = non_zero_times_per_stage_df.columns

non_zero_times_per_stage_df = DataFrame(scaler.fit_transform(non_zero_times_per_stage_df), columns=columns)
median_times_per_stage_df = DataFrame(scaler.fit_transform(median_times_per_stage_df), columns=columns)
non_zero_median_times_per_stage_df = DataFrame(scaler.fit_transform(non_zero_median_times_per_stage_df), columns=columns)

px.scatter_matrix(non_zero_times_per_stage_df, dimensions=non_zero_times_per_stage_df.columns, title="Scatter matrix of non-zero times per stage (non_zero_times_per_stage_df)").show()
px.scatter_matrix(median_times_per_stage_df, dimensions=median_times_per_stage_df.columns, title="Scatter matrix of median times per stage (median_times_per_stage_df)").show()
px.scatter_matrix(non_zero_median_times_per_stage_df, dimensions=non_zero_median_times_per_stage_df.columns, title="Scatter matrix of non-zero median times per stage (non_zero_median_times_per_stage_df)").show()

**Removing outliers**

Each scatter plot matrix shows a few outliers, which are subsequently removed with the Z score threshold > +- 3.

(Note to self: this might prevent identifying which items are in which clusters. If this is the case, programmatically detect and remove outliers before standardising dfs. Hopefully there shouldn't be any Z scores > +- 3).

In [None]:
#Threshold for outliers:
non_zero_times_per_stage_df = non_zero_times_per_stage_df[(non_zero_times_per_stage_df.abs() <= 3).all(axis=1)]
median_times_per_stage_df = median_times_per_stage_df[(median_times_per_stage_df.abs() <= 3).all(axis=1)]
non_zero_median_times_per_stage_df = non_zero_median_times_per_stage_df[(non_zero_median_times_per_stage_df.abs() <= 3).all(axis=1)]

### K-means

Disadvantages:
- `k` has to be determined in advanced (although visual heuristics can be used to help optimise this Murphy et al., (2024)).
- Constrained to Euclidean distance. This is not a significant drawback as this was defined as a suitable distance metric given the continuous nature of the data.
- Sensitive to initial clusters and can converge to local minima. Can be done by running the algorithm within a large number of random centroids.

First, we work out the best value of k from elbow plot from visualising elbow plot of TWCSS.

In [None]:
from sklearn.cluster import KMeans

K: int = 10
k_means_solutions: DataFrame = DataFrame(columns=["k", "Total within-cluster sum of squares", "DataFrame"])

dataframe_names: dict[str, DataFrame] = {
    "Non-zero times per stage": non_zero_times_per_stage_df,
    "Median times per stage": median_times_per_stage_df,
    "Non-zero median times per stage": non_zero_median_times_per_stage_df,
}

i: int = 0
for name, df in dataframe_names.items():
    for k in range(1, K + 1):
        k_means = KMeans(n_clusters=k, random_state=0).fit(df)
        k_means_solutions.loc[i] = [k, k_means.inertia_, name]
        if k >= 2:
            print("Silhouette score for k =", k, "on", name, ":", sklearn.metrics.silhouette_score(df, k_means.predict(df)))
        i += 1
px.line(
    k_means_solutions,
    x = "k",
    y = "Total within-cluster sum of squares",
    color="DataFrame",
    title="Elbow plot for k-means clustering"#TODO: Could include ASW in title as sanity check
).show()

#### Parameter Tuning
From this we can see that the `non_zero_median_times_per_stage` DataFrame consistently has the lowest TWCSS for each `k`, so we use this DataFrame for the rest of the analysis (**is this ok?**). The elbow plot indicates `k=4` is optimal, so we continue by trying different `init` and `n_init` parameter values to see if the TWCSS can be improved any further.

In [None]:
for init in ["k-means++", "random"]:
    for n_init in [1, 50, 100]:
        k_means = KMeans(n_clusters=4, init=init, n_init=n_init, random_state=0).fit(non_zero_median_times_per_stage_df)
        print(f"{init}, {n_init}: {k_means.inertia_}")

#### Fitting Final Model
Shows n_init=50 with k_means++ (default argument) is suitable. We now fit this model and check the number of clusters per item. Evaluation of this model comes later on.

In [None]:
k_means_final_model = KMeans(n_clusters=4, n_init=50, random_state=0)
k_means_final_model.fit_transform(non_zero_median_times_per_stage_df)
#print(k_means_final_model.cluster_centers_) #TODO: Needs transformation back to original scale
items_per_cluster: dict[int, int] = {
    1: 0,
    2: 0,
    3: 0,
    4: 0
}
for label in k_means_final_model.labels_:
    items_per_cluster[label+1] += 1
print(items_per_cluster)

#### Evaluating Model
Now we plot an Average Silhouette Plot for the relevant k_means models

**Should I evaluate all the different models? Seems like there's too many variables to do this**

In [None]:
from matplotlib import pyplot as plt

cluster_labels = k_means_final_model.fit_predict(non_zero_median_times_per_stage_df)
silhouette_values = sklearn.metrics.silhouette_samples(non_zero_median_times_per_stage_df, cluster_labels)
silhouette_scores = sklearn.metrics.silhouette_score(non_zero_median_times_per_stage_df, k_means.predict(non_zero_median_times_per_stage_df))
fig, ax = plot_silhouette_plot(cluster_labels, silhouette_values, silhouette_scores, k_means_final_model.n_clusters)

plt.show()

### K-Medoids

Improvements on K-means:
- Not constrained to Euclidean distance; other dissimilarity measures can be used.
- Less affected by outliers.
- Data doesn't have to be continuous.

We've already standardised and removed missing values, so the first thing to do is to visualise an elbow plot.

Instead of deciding on the DataFrames, we visualise different dissimilarity measures (euclidean, manhattan, and minkowski). We have to pre-create the dissimilarity matrices to do this.

In [None]:
from sklearn import metrics

K: int = 10
k_medoids_solutions: DataFrame = DataFrame(columns=["k", "Total within-cluster sum of squares", "Dissimilarity measure"])

i: int = 0
for dissimilarity_measure in ["euclidean", "manhattan", "minkowski"]:
    df = sklearn.metrics.pairwise_distances(non_zero_median_times_per_stage_df, metric=dissimilarity_measure)
    for k in range(1, K + 1):
        k_medoids_intermediate = kmedoids.fastpam1(df, k)
        k_medoids_solutions.loc[i] = [k, k_medoids_intermediate.loss, dissimilarity_measure]
        i += 1

px.line(
    k_medoids_solutions,
    x = "k",
    y = "Total within-cluster sum of squares",
    color="Dissimilarity measure",
    title="Elbow plot for k-means clustering"
).show()

There's clearly not much of an elbow for this measure (and very little different between the euclidean and minkowski measures), so I'm going to move on for now.

### Agglomerative Hierarchical Clustering

Advantages:
- Not restricted to continuous data.
- Don't have to specify K (but do have to specify a linkage method)

Disadvantages:
- Complete linkage doesn't perform well with outliers.
- Single linkage can lead to "chaining" of clusters.

Procedure:
- Try a variation of linkage criteria and dissimilarity measures.
- Visualise the dendrogram and cut at the most appropriate places (could cut at several points).

After a manual inspection the following combinations of parameters:
- Any model with single linkage
- Average linkage and city block dissimilarity
- Average linkage and minkowski dissimilarity

In [None]:
from scipy.cluster.hierarchy import dendrogram, linkage

linkage_methods: list[str] = ["complete", "average", "ward"]  # NOTE THAT WARD ONLY WORKS WITH EUCLIDEAN DISTANCE
for dissimilarity_measure in ["euclidean", "cityblock", "minkowski"]:
    for linkage_method in linkage_methods:
        if (linkage_method == "ward" and dissimilarity_measure != "euclidean") or (linkage_method == "average" and dissimilarity_measure != "euclidean"):
            continue  # Ward only works with Euclidean distance
        plt.figure(figsize=(10, 7))
        plt.title(f"Dendrogram for {linkage_method} linkage with {dissimilarity_measure} dissimilarity")
        dendrogram(linkage(non_zero_median_times_per_stage_df, method=linkage_method, metric=dissimilarity_measure))
        plt.show()

### Based on sequence analysis
Aiming to perform some sequence analysis on the PRIMMDebug challenge attempts, as each student undertook a set of steps that can be mapped as a sequence

Other ideas:
- Could also try sequence analysis from an exercise logs' first find the error stage.

Sequence analysis (following guidance from .. (2024)):
- Define properties for sequencing:
  - **Alphabet**: The PRIMMDebug stages, as defined in `enums.py`
  - **Time scheme**: Whenever the PRIMMDebug stage switches (downside is that this doesn't take time into account; how to set a time window that allows this data to be categorised consistently? e.g. if the time window was every 60 seconds, the student could've gone through several PRIMMDebug stages, which would be missed if we just labelled this the last one)
  - **Actor**: A single PRIMMDebug challenge attempt.
- Create sessions through ordered actions (done automatically in the `stage_logs` attribute of the `ExerciseLogs` object upon loading data)
- Trim overly short or long sessions (outliers in terms of times between states or number of states)
- Calculate dissimilarity
- Fit data to format that clustering/sequencing package requires - *reshaping*
- Add a colour palette (not sure how to do this in Python)

## Written Responses

Another aspect of the log data was the written responses that students had to enter for many of the stages. Some descriptive statistics for these responses are displayed here, with qualitative analysis being performed separately.

Some things to display (all by PRIMMDebug stage):
- Number of responses containing actual words
- Average length of response
- First ~50 rows of responses table (logic for saving table should be in here)

In [None]:
from save_logs import *

import nltk
from nltk.corpus import words
from nltk.tokenize import word_tokenize

nltk.download("words", quiet=True)
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)

english_words = set(words.words("en"))  # Load English words into a set for fast lookup
written_responses: list[str] = [response for exercise_responses in [ExerciseLogProcessor.get_written_responses(exercise_log) for exercise_log in EXERCISE_LOGS] for response in exercise_responses]
print(f"Number of written responses: {len(written_responses)}")

responses_with_valid_words: list[str] = []
responses_with_invalid_words: list[str] = []

for response in written_responses:
    tokens = word_tokenize(response.lower())  # Convert to lowercase for case-insensitive matching
    # Check if any token is a valid English word
    if any(token in english_words for token in tokens):
        responses_with_valid_words.append(response)
    else:
        responses_with_invalid_words.append(response)

print(f"Number of written responses that contain at least one valid English word: {len(responses_with_valid_words)}/{len(written_responses)} ({(len(responses_with_valid_words) / len(written_responses)) * 100:.2f}%)")

number_inspect_code_stages: int = len([stage_log for stage_log in STAGE_LOGS if stage_log.stage_name == DebuggingStage.inspect_code])
number_no_response_inspect_code_stages: int = len([stage_log for stage_log in STAGE_LOGS if stage_log.stage_name == DebuggingStage.inspect_code and StageLogProcessor.does_inspect_the_code_contain_response(stage_log) is False])
print(f"Number of inspect the code stages which contain no response: {display_percentage_string(number_no_response_inspect_code_stages, number_inspect_code_stages)}")

## Contextual Information
Analysis that doesn't fit into the current framework but might come in handy later.

### Number of stages taken for a PRIMMDebug challenge
- Per exercise
- Per student

In [None]:


#Median number of stages that each student took on the PRIMMDebug challenges they attempted
average_stages_per_student: list[int] = []
for student in STUDENT_IDS:
    student_EXERCISE_LOGS: list[ExerciseLog] = [exercise for exercise in EXERCISE_LOGS if exercise.student_id == student.id]
    if len(student_EXERCISE_LOGS) > 0:
        average_stages_per_student.append(median([len(exercise.stage_logs) for exercise in student_EXERCISE_LOGS]))
average_stages_per_student_fig = px.histogram(average_stages_per_student, marginal="box", labels={"value": "Median number of stages per student", "count": "Count"})
average_stages_per_student_fig.show()

### Program Log Stats
For relevant PRIMMDebug stages that contain program logs

In [None]:
number_of_runs_inspect_the_code_and_test: list[int] = [StageLogProcessor.get_number_of_runs(stage_log) for stage_log in STAGE_LOGS if stage_log.stage_name in [DebuggingStage.inspect_code, DebuggingStage.test] and StageLogProcessor.get_number_of_runs(stage_log) > 0] #Remove stages where there's 0 runs
number_of_runs_inspect_the_code: list[int] = [StageLogProcessor.get_number_of_runs(stage_log) for stage_log in STAGE_LOGS if stage_log.stage_name == DebuggingStage.inspect_code and StageLogProcessor.get_number_of_runs(stage_log) > 0]
number_of_runs_test: list[int] = [StageLogProcessor.get_number_of_runs(stage_log) for stage_log in STAGE_LOGS if stage_log.stage_name == DebuggingStage.test and StageLogProcessor.get_number_of_runs(stage_log) > 0]
number_of_runs_fig = px.histogram(number_of_runs_inspect_the_code_and_test, marginal="box", labels={"x": "Time taken (seconds)"})
number_of_runs_fig.show()

time_between_runs: list[float] = [time for stage_log in STAGE_LOGS if stage_log.stage_name in [DebuggingStage.inspect_code, DebuggingStage.test] for time in StageLogProcessor.get_time_between_runs(stage_log) if StageLogProcessor.get_time_between_runs(stage_log) != []]
time_between_runs_fig = px.histogram(time_between_runs, marginal="box", labels={"x": "Time between runs (seconds)"})
time_between_runs_fig.show()

runs_per_minute: list[float] = [round(StageLogProcessor.get_runs_per_minute(stage_log), 2) for stage_log in STAGE_LOGS if stage_log.stage_name in [DebuggingStage.inspect_code, DebuggingStage.test]]
print(f"Runs per minute for inspect the code/test stages: {runs_per_minute}")

number_of_inputs: list[list[int]] = [StageLogProcessor.get_number_of_inputs_from_runs(stage_log) for stage_log in STAGE_LOGS if stage_log.stage_name in [DebuggingStage.inspect_code, DebuggingStage.test]]
print(f"Number of inputs per stage for test stages: {number_of_inputs}")

## References
Everitt, B.S., Landau, S., Leese, M. and Stahl, D., 2011. Cluster analysis 5th edition Wiley.
Frades I, Matthiesen R. Overview on techniques in cluster analysis. Methods Mol Biol. 2010;593:81-107. doi: 10.1007/978-1-60327-194-3_5. PMID: 19957146.
Keefe Murphy, Sonsoles López-Pernas, Mohammed Saqr (2024). Dissimilarity-based Cluster Analysis of Educational Data: A Comparative Tutorial using R. In M. Saqr & S. López-Pernas (Eds.), Learning analytics methods and tutorials: A practical guide using R   (pp. 231-283).Springer, Cham. doi: 10.1007/978-3-031-54464-4_8
Saqr, M. et al. (2024). Sequence Analysis in Education: Principles, Technique, and Tutorial with R. In: Saqr, M., López-Pernas, S. (eds) Learning Analytics Methods and Tutorials. Springer, Cham. https://doi.org/10.1007/978-3-031-54464-4_10

# Appendix

## Save logs