In [None]:
# Obtain Canvas access token from .env file

import os
from dotenv import load_dotenv, find_dotenv
# Load variables from the nearest .env file into the process environment.
load_dotenv(find_dotenv())

# Fail fast with a clear message: without this check a missing TOKEN is read
# as None and is then handed to every Canvas request further down.
token = os.environ.get("TOKEN")
if not token:
    raise RuntimeError("Canvas access token not found: set TOKEN in your .env file or environment.")

In [None]:
import requests
import json
import pandas as pd
import numpy as np

# Overhead

In [None]:
# Send the token in the Authorization header rather than the query string so it
# is not embedded in the request URL, and set a timeout so the call cannot
# block indefinitely.
response = requests.get(
    "https://swinburne.instructure.com/api/v1/courses/",
    headers={"Authorization": f"Bearer {token}"},
    params={
        "enrollment_state":"complete",
        "per_page":100,
    },
    timeout=30,
)

In [None]:
from requests.exceptions import HTTPError
from datetime import datetime
from tzlocal import get_localzone

def parse_iso_timestamps(data : pd.DataFrame) -> pd.DataFrame:
    """
    Replace all string timestamps in a DataFrame with datetime objects.
    All timestamps must comply with ISO-8601.

    Args:
        data (DataFrame): DataFrame containing ISO-8601 string timestamps.

    Returns:
        data (DataFrame): DataFrame containing datetime timestamps.
    """

    # Obtain all columns which can be converted
    # into a timestamp through ugly brute-forcing
    timestamp_columns = []
    for key, value in data.iloc[0].to_dict().items():
        if type(value) is str:
            try:
                datetime.fromisoformat(value)
                timestamp_columns.append(key)
            except:
                pass

    def isotime_to_timestamp(value : str | None, use_local_timezone : bool = True, as_string : bool = False):
        if type(value) is not str: return None
        
        time = datetime.fromisoformat(value)
        if use_local_timezone:
            time = time.astimezone(get_localzone())

        if as_string:
            time = time.strftime("%A, %d %B %Y, %I:%M %p")
            
        return time
    
    for column in timestamp_columns:
        data[column] = data[column].map(isotime_to_timestamp)

    return data

def process_canvas_dataframe(response : pd.DataFrame) -> pd.DataFrame:
    """
    Apply the standard clean-up steps to a DataFrame built from Canvas data.

    Args:
        response (DataFrame): Frame to clean.

    Returns:
        DataFrame: The frame after timestamp parsing, without the columns
        that contain only NA values.
    """
    # Step 1: turn ISO-8601 strings into datetime objects
    with_timestamps = parse_iso_timestamps(response)
    # Step 2: discard columns in which every value is NA
    return with_timestamps.dropna(axis="columns", how="all")
    
def decode_canvas_response(response : requests.Response | list[dict]) -> pd.DataFrame:
    """
    Converts raw Canvas API response into a DataFrame for downstream use.

    Args:
        response (Response | list[dict] | dict): Either the HTTP response
            obtained from Canvas, or JSON data that has already been decoded
            (a list of records, or a single record).

    Returns:
        response_data (DataFrame): The response data after
        ``process_canvas_dataframe``; an empty DataFrame when the payload
        is empty.

    Raises:
        HTTPError: Propagated from ``raise_for_status()`` when an HTTP
            response carries an error status.
    """

    # isinstance (not an exact type check) so Response subclasses are decoded too.
    if isinstance(response, requests.Response):
        # No-op for successful responses, raises for error statuses.
        response.raise_for_status()

        # The body arrives as bytes: decode it to text, then parse the JSON.
        response = json.loads(response.content.decode('utf-8'))

    if not response:
        # Empty response
        return pd.DataFrame([])

    # A single record is wrapped so the DataFrame constructor sees a list of rows.
    if not isinstance(response, list): response = [response]

    return process_canvas_dataframe(pd.DataFrame(response))

# Get current units

In [None]:
# Two enrolment states:
# active   - Visible on home menu
# complete - Invisible
# NOTE(review): the section heading says "current units" but this requests
# enrollment_state "complete" - confirm which state is intended.

# The token goes in the Authorization header (not the query string) so it is
# not embedded in the request URL; the timeout stops the call hanging forever.
units_response = requests.get(
    "https://swinburne.instructure.com/api/v1/courses/",
    headers={"Authorization": f"Bearer {token}"},
    params={
        "enrollment_state":"complete",
        "per_page":100,
    },
    timeout=30,
)
units = decode_canvas_response(units_response)

In [None]:
# Swinburne Organisation (ORG) units sit under enrollment term ID 1 internally.
# Only academic units matter here, so keep every unit outside that term.
is_academic_unit = units["enrollment_term_id"].ne(1)
units = units.loc[is_academic_unit]

# Get assignments from current units

In [None]:
# One DataFrame of assignment groups per unit, concatenated at the end.
group_frames = []

for unit_id in units["id"].to_list():

    # Token in the Authorization header (not the query string) so it is not
    # embedded in the URL; timeout so a stalled request cannot hang the loop.
    response = requests.get(
        f"https://swinburne.instructure.com/api/v1/courses/{unit_id}/assignment_groups",
        headers={"Authorization": f"Bearer {token}"},
        params={
            "per_page":100, # Max items per unit
            "include":"assignments",
        },
        timeout=30,
    )

    response = decode_canvas_response(response)
    # Tag every group with the unit it was fetched for.
    response["course_id"] = unit_id

    group_frames.append(response)

assignment_groups = pd.concat(group_frames).reset_index(drop=True)

In [None]:
# IDs of the units which do NOT use weighting to calculate their total points
is_unweighted = units["apply_assignment_group_weights"].eq(False)
unweighted_units = units.loc[is_unweighted, "id"].tolist()

In [None]:
# Display the IDs computed in the previous cell instead of repeating the filter expression
unweighted_units

In [None]:
# Assignment groups that belong to unweighted units, ordered by course
in_unweighted_unit = assignment_groups["course_id"].isin(unweighted_units)
assignment_groups.loc[in_unweighted_unit].sort_values("course_id")

In [None]:
# Flatten the per-group assignment lists into one list of assignment records,
# skipping the missing entries, then turn the records into a DataFrame
assignment_records = assignment_groups["assignments"].explode().dropna().tolist()
assignments = decode_canvas_response(assignment_records)
# NOTE: points_possible is left unfilled here (no fillna(0) is applied).

In [None]:
# Assignments ordered from most to fewest possible points
(
    assignments
    .loc[:, ["course_id", "name", "points_possible"]]
    .sort_values("points_possible", ascending=False)
)

In [None]:
# Count the total amount of points possible for each assignment group
# and add it as a feature for the data

import numpy as np

# Remove the column left behind by an earlier run of this cell. Without this,
# re-running the cell merges the totals in a second time and pandas renames the
# clashing columns with _x/_y suffixes, which breaks the cells that follow.
assignment_groups = assignment_groups.drop(columns="total_points_possible", errors="ignore")

# One row per (assignment group, assignment)
split = assignment_groups.explode("assignments")

# Flatten each assignment record into columns prefixed with "assignment_"
entries = pd.json_normalize(split["assignments"])
entries = entries.add_prefix("assignment_")

# Keep only rows that actually carry an assignment record (a dict with an id)
split["assignment_id"] = split.assignments.map(lambda x: x['id'] if type(x) is dict else None)
split = split.dropna(subset="assignment_id")
split["assignment_id"] = split["assignment_id"].astype("int")

# Attach the flattened assignment columns to their group row
split = split.merge(entries, how="inner", left_on="assignment_id", right_on="assignment_id")
del split["assignments"]

# Sum the points of all assignments in each group ("id" is the group id)
total_points_possible = split.groupby("id")["assignment_points_possible"].sum().reset_index()
total_points_possible = total_points_possible.rename(columns={"assignment_points_possible" : "total_points_possible"})
assignment_groups = assignment_groups.merge(total_points_possible, how="outer", left_on="id", right_on="id")
split = split.merge(total_points_possible, how="outer", left_on="id", right_on="id")

In [None]:
# The five assignment groups with the highest total of possible points
(
    assignment_groups
    .loc[:, ["name", "total_points_possible"]]
    .sort_values("total_points_possible", ascending=False)
    .head()
)

In [None]:
def get_assignment_group_feature(assignment_id, group_feature_name : str, assignment_groups_split : pd.DataFrame = split):
    """
    Look up one column value of the row matching an assignment ID.

    Args:
        assignment_id: Value compared against the ``assignment_id`` column.
        group_feature_name (str): Name of the column to read from the match.
        assignment_groups_split (DataFrame): Frame to search. The default is
            the ``split`` frame as it exists when this definition is executed.

    Returns:
        The single matching value, or None when the lookup fails for any
        reason (for example, ``.item()`` rejects a selection that does not
        hold exactly one element).
    """
    try:
        match = assignment_groups_split.loc[assignment_groups_split.assignment_id == assignment_id]
        return match[group_feature_name].item()
    except Exception:
        # Best-effort lookup: ordinary failures yield None, but unlike a bare
        # `except:` this lets KeyboardInterrupt / SystemExit propagate.
        return None

# Copy two per-group values onto each assignment: (new assignment column, source group column)
for new_column, group_feature in (
    ("points_total_for_group", "total_points_possible"),
    ("group_weight", "group_weight"),
):
    # Bind the feature name as a default so each lambda keeps its own value
    assignments[new_column] = assignments["id"].map(
        lambda assignment_id, feature=group_feature: get_assignment_group_feature(assignment_id, feature)
    )

In [None]:
# Calculate grade contribution for each assignment (float value):
# share of the group's points, scaled by the group's weight (a percentage)
assignments["score"] = (assignments["points_possible"] / assignments["points_total_for_group"]) * (assignments["group_weight"] / 100)

# For units which are unweighted, score of each assignment is equal to points given
assignments.loc[assignments.course_id.isin(unweighted_units), "score"] = assignments["points_possible"] / 100

# Fill missing scores last, so the unweighted override above cannot
# reintroduce NaN for assignments whose points_possible is missing
assignments["score"] = assignments["score"].fillna(0)

In [None]:
# Score inputs and result per assignment, highest score first
(
    assignments
    .loc[:, ["name", "id", "course_id", "points_possible", "points_total_for_group", "group_weight", "score"]]
    .sort_values("score", ascending=False)
)

## Get course name for each assignment from course ID

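This would be so much more efficient in SQL — it's not even a contest. The lookup below scans `units` once per assignment, whereas a single join on the course ID (a SQL `JOIN`, or `DataFrame.merge` in pandas) would do the same work in one pass.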

In [None]:
def get_course_name(course_id, units = units):
    """
    Return the name of the unit with the given ID.

    Args:
        course_id: ID to look for in the ``id`` column of ``units``.
        units (DataFrame): Frame with ``id`` and ``name`` columns. The default
            is the ``units`` frame as it exists when this definition is executed.

    Returns:
        The ``name`` of the matching row, or None when the ID is not present.
    """
    known_ids = units.id.unique()
    if course_id in known_ids:
        matching_rows = units.loc[units.id == course_id]
        # .item() expects exactly one matching row
        return matching_rows.name.item()

    return None

# Resolve every assignment's course ID to the course name
course_names = assignments["course_id"].map(get_course_name)
assignments["course_name"] = course_names

In [None]:
# Final view: one row per assignment with its course, due date and points
assignments.loc[:, ["id", "course_name", "name", "due_at", "points_possible"]]