# Spider


In [1]:
import asyncio
import json
from contextlib import asynccontextmanager
from typing import Any, Literal

import aiohttp
from lxml.etree import HTML
from yarl import URL

from grade_dashboard.exception import SpiderIOException
from grade_dashboard.utils import (
    cached,
    chunked,
    find,
    first,
    get_var,
    identifier,
    submit,
    flatten,
    compose,
    retry,
)


In [2]:
@cached
async def cookie():
    """Build the cookie jar handed to the spider's HTTP session.

    The jar is created with ``unsafe=True``. The ``cached`` decorator is a
    project helper (presumably memoizes the result -- confirm in
    ``grade_dashboard.utils``).
    """
    jar = aiohttp.CookieJar(unsafe=True)
    return jar


@cached
async def session():
    """Create the ``aiohttp.ClientSession`` that the request helpers await.

    The session is configured with the jar from ``cookie()`` and a desktop
    Chrome ``User-Agent`` header.
    """
    jar = await cookie()
    browser_headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36"
    }
    return aiohttp.ClientSession(cookie_jar=jar, headers=browser_headers)


In [3]:
# Root URL of the portal; the login and dashboard URLs are built from it and
# it is the default base for `resolve_url`.
BASE_URL = URL("https://apps.gwinnett.k12.ga.us/")


def resolve_url(url: str | URL, base_url: URL = BASE_URL) -> URL:
    """Turn ``url`` into an absolute ``URL``.

    Args:
        url: An absolute or relative address, as text or as a ``URL``.
        base_url: Base used for relative addresses.

    Returns:
        ``URL(url)`` when it is already absolute; otherwise a relative ``str``
        is appended to ``base_url`` with ``/`` and a relative ``URL`` is
        resolved with ``base_url.join``.
    """
    if URL(url).is_absolute():
        return URL(url)
    if isinstance(url, str):
        # Text is appended as extra path segments.
        return URL(base_url / url)
    return base_url.join(url)


In [4]:
@retry
@cached
async def login(username: str, password: str) -> bool:
    """POST the credentials to the portal's ``pkmslogin.form`` endpoint.

    Args:
        username: Portal user name.
        password: Portal password.

    Returns:
        The response's ``ok`` flag. Only the HTTP status is inspected; the
        response body is not checked.
    """
    form = {
        "forgotpass": "p0/IZ7_3AM0I440J8GF30AIL6LB453082=CZ6_3AM0I440J8GF30AIL6LB4530G6=LA0=OC=Eaction!ResetPasswd==/#Z7_3AM0I440J8GF30AIL6LB453082",
        "login-form-type": "pwd",
        "username": username,
        "password": password,
    }
    http = await session()
    async with http.post(BASE_URL / "pkmslogin.form", data=form) as response:
        return response.ok


In [5]:
@retry(deps=[login])
@cached
async def apps() -> list[tuple[str, URL]]:
    """Scrape the "MY eCLASS Apps" list from the student dashboard page.

    Returns:
        ``(identifier(name), resolve_url(href))`` pairs, one for every list
        item that has both a link text and an ``href``.
    """
    http = await session()
    dashboard = BASE_URL / "dca" / "student" / "dashboard"
    async with http.get(dashboard) as response:
        page = HTML(await response.text())
        entries = []
        for item in page.xpath(
            '//*[text()="MY eCLASS Apps"]/following-sibling::ul/li'
        ):
            name = first(item.xpath("a/span/text()"))
            href = first(item.xpath("a/@href"))
            # Skip list items that lack either the label or the link.
            if href and name:
                entries.append((identifier(name), resolve_url(href)))
        return entries


@retry(deps=[apps])
@cached
async def vue_url() -> URL:
    """Look up the ``"my_student_vue"`` entry of ``apps()`` with ``find``."""
    app_links = await apps()
    return find(app_links, "my_student_vue")


In [6]:
@retry(deps=[vue_url])
@cached
async def vue():
    """Open the StudentVUE link and pass the fetched page to ``submit``.

    ``submit`` is a project helper (presumably posts the form contained in
    the page -- confirm in ``grade_dashboard.utils``).

    Returns:
        A ``(base_url, html_text)`` tuple: the parent of the final response
        URL and that response's body text.
    """
    http = await session()
    landing_url = await vue_url()
    async with http.get(landing_url) as response:
        landing_page = await response.text()
    async with submit(http, landing_page) as response:
        base = response.url.parent
        body = await response.text()
        return base, body


@retry(deps=[vue])
@cached
async def vue_base_url():
    """Return the URL half of the ``(base_url, html_text)`` pair from ``vue()``."""
    base_url, _html_text = await vue()
    return base_url


@retry(deps=[vue])
@cached
async def vue_script():
    """Return the text of the first ``<script>`` in the page head from ``vue()``.

    Raises:
        IndexError: The XPath query matched nothing.
    """
    _base_url, markup = await vue()
    document = HTML(markup.encode("utf-8"))
    head_scripts = document.xpath("//head/script[1]/text()")
    return head_scripts[0]


# NOTE(review): this function awaits `navigations()`, yet its retry deps name
# `vue_script` while `navigations` names `grade_book_url`; the two dependency
# lists look swapped -- confirm against the semantics of `retry`.
@retry(deps=[vue_script])
@cached
async def grade_book_url():
    """Look up the ``"grade_book"`` entry of ``navigations()`` with ``find``."""
    nav_entries = await navigations()
    return find(nav_entries, "grade_book")


@retry(deps=[grade_book_url])
@cached
async def navigations():
    """List the navigation entries declared in the StudentVUE page script.

    Reads the ``PXP.NavigationData`` variable out of ``vue_script()`` with
    ``get_var`` and walks its ``"items"``.

    Returns:
        ``(identifier(description), url)`` pairs, each URL resolved against
        ``vue_base_url()``.
    """
    nav_data = get_var("PXP.NavigationData", await vue_script())
    links = []
    for nav in nav_data["items"]:
        label = identifier(nav.get("description"))
        target = resolve_url(URL(nav.get("url")), await vue_base_url())
        links.append((label, target))
    return links


In [7]:
@retry(deps=[grade_book_url])
@cached
async def grade_book():
    """Download the grade book page and return it parsed with ``lxml``."""
    http = await session()
    target = await grade_book_url()
    async with http.get(target) as response:
        body = await response.text()
        return HTML(body.encode("utf-8"))


@retry(deps=[grade_book])
@cached
async def courses():
    """Extract one record per course from the grade book page.

    The matched ``div`` elements are consumed two at a time via ``chunked``,
    each pair being treated as ``(header, content)``.

    Returns:
        A list of dicts with the keys ``course``, ``teacher``, ``grade`` and
        ``params`` (the JSON-decoded ``data-focus`` attribute of the header's
        button).
    """
    page = await grade_book()
    cells = page.xpath(
        '//div[@id="gradebook-content"]'
        '//div[contains(@class, "header")]'
        '/following-sibling::div[div[contains(@class, "row")]]'
        "/div"
    )
    found = []
    for header, content in chunked(cells, 2):
        title = first(header.xpath("div[1]/button/text()"))
        teacher = first(
            header.xpath('.//span[contains(@class, "teacher")]//a/text()')
        )
        mark = first(content.xpath('.//span[contains(@class, "mark")]/text()'))
        focus = json.loads(first(header.xpath(".//button/@data-focus")))
        found.append(
            {"course": title, "teacher": teacher, "grade": mark, "params": focus}
        )
    return found


In [8]:
async def load_control(control_name: str, params: dict[str, Any]) -> dict:
    """POST a ``LoadControl`` request to the StudentVUE communication service.

    Args:
        control_name: Value sent as ``request.control``.
        params: Value sent as ``request.parameters``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        SpiderIOException: The response status is not OK. The response body
            is shown with ``display`` first (not imported in this file;
            presumably the IPython notebook builtin).
    """
    s = await session()
    url = (await vue_base_url()) / "service" / "PXP2Communication.asmx" / "LoadControl"
    data = dict(request=dict(control=control_name, parameters=params))
    async with s.post(
        url,
        json=data,
        headers={"X-Requested-With": "XMLHttpRequest"},
    ) as r:
        if not r.ok:
            display(await r.text())
            raise SpiderIOException(f'Failed to load "{control_name}"', r)
        return await r.json()


async def load_course(course):
    """Issue the ``LoadControl`` request described by a course's ``params``.

    Args:
        course: A course record whose ``params`` hold
            ``LoadParams.ControlName`` and ``FocusArgs``.

    The response of ``load_control`` is discarded; nothing is returned.
    """
    focus = course["params"]
    control_name = focus["LoadParams"]["ControlName"]
    await load_control(control_name, focus["FocusArgs"])


# Held by the `course` context manager from the moment a course is loaded
# until the `async with course(...)` body finishes.
course_lock = asyncio.Lock()


@asynccontextmanager
async def course(course: dict[str, Any] | int | str):
    """Load a course and hold ``course_lock`` while the ``async with`` body runs.

    Args:
        course: A course record, an index into ``courses()``, or text matched
            case-insensitively as a substring of the course title.

    Yields:
        The resolved course record.

    Raises:
        ValueError: No course could be resolved from the argument.
        IndexError: An integer argument is out of range.
    """
    if isinstance(course, int):
        course = (await courses())[course]
    elif isinstance(course, str):
        needle = course.lower()
        course = first(
            c
            for c in await courses()
            # A record's title may be None; treat it as an empty title.
            if needle in (c.get("course") or "").lower()
        )
    if not course:
        raise ValueError("No course found")
    async with course_lock:
        await load_course(course)
        yield course


In [9]:
@retry(deps=[course])
async def call_api(action: str, data: dict[str, Any]) -> dict:
    """POST ``data`` to the grade book ``ClientSideData/Transfer`` endpoint.

    Args:
        action: Value of the ``action`` query parameter.
        data: JSON payload; its ``FriendlyName`` entry is used in the error
            message.

    Returns:
        The decoded JSON body of the response.

    Raises:
        SpiderIOException: The response status is not OK. The response body
            is shown with ``display`` first (not imported in this file;
            presumably the IPython notebook builtin).
    """
    s = await session()
    url = (
        (await vue_base_url()) / "api" / "GB" / "ClientSideData" / "Transfer"
    ).with_query(action=action)
    headers = {
        "CURRENT_WEB_PORTAL": "StudentVUE",
        "X-Requested-With": "XMLHttpRequest",
    }
    async with s.post(url, json=data, headers=headers) as r:
        if not r.ok:
            display(await r.text())
            name = data.get("FriendlyName", "Unknown")
            raise SpiderIOException(f'Failed to call "{name}"', r)
        return await r.json()


async def get_class_data():
    """Call the ``genericdata.classdata`` / ``GetClassData`` API method.

    Returns:
        Whatever ``call_api`` returns for that request.
    """
    payload = {
        "FriendlyName": "genericdata.classdata",
        "Method": "GetClassData",
        "Parameters": "{}",
    }
    return await call_api("genericdata.classdata-GetClassData", payload)


async def get_items(
    sort: str = "due_date",
    group_by: Literal["Week", "Subject", "AssignmentType", "Unit", "Date"] = "Week",
):
    """Call the ``pxp.course.content.items`` / ``LoadWithOptions`` API method.

    Args:
        sort: Field used as the ascending sort selector.
        group_by: Field used as the ascending group selector.

    Returns:
        Whatever ``call_api`` returns for that request.
    """
    # NOTE(review): the group entry uses the key "Selector" while the sort
    # entry uses "selector" -- confirm which casing the service expects.
    load_options = {
        "sort": [{"selector": sort, "desc": False}],
        "filter": [["isDone", "=", False]],
        "group": [{"Selector": group_by, "desc": False}],
        "requireTotalCount": True,
        "userData": {},
    }
    parameters = json.dumps({"loadOptions": load_options, "clientState": {}})
    payload = {
        "FriendlyName": "pxp.course.content.items",
        "Method": "LoadWithOptions",
        "Parameters": parameters,
    }
    # NOTE(review): the action ends in "LoadWithOption" although the method is
    # "LoadWithOptions" -- confirm whether the missing "s" is intentional.
    return await call_api("pxp.course.content.items-LoadWithOption", payload)


In [10]:
USERNAME = "202016378"
PASSWORD = "202016378"
await login(USERNAME, PASSWORD)


True

In [12]:
# Parsed grade book page, kept around for the XPath experiments below.
html = await grade_book()

In [15]:
import re

# Loose address pattern: word/dot/hyphen characters on both sides of an "@".
# Used below to pull the address out of a teacher link's href.
email = re.compile(r"[\w\.-]+@[\w\.-]+")


In [16]:
def _teacher_email(header):
    """Return the address found in the teacher link's href, or None if absent."""
    href = first(header.xpath('.//span[contains(@class, "teacher")]//a/@href'))
    # Guard both a missing href and an href without an address, so one
    # incomplete row cannot abort the whole listing.
    match = email.search(href) if href else None
    return match.group(0) if match else None


# Same extraction as `courses()`, plus the teacher's e-mail address.
rows = chunked(
    html.xpath(
        '//div[@id="gradebook-content"]'
        '//div[contains(@class, "header")]'
        '/following-sibling::div[div[contains(@class, "row")]]'
        "/div"
    ),
    2,
)
[
    dict(
        course=first(header.xpath("div[1]/button/text()")),
        teacher=first(header.xpath('.//span[contains(@class, "teacher")]//a/text()')),
        email=_teacher_email(header),
        grade=first(content.xpath('.//span[contains(@class, "mark")]/text()')),
        params=json.loads(first(header.xpath(".//button/@data-focus"))),
    )
    for header, content in rows
]


[{'course': '0: MSTRY BAND II',
  'teacher': 'Matthew Haynor ',
  'email': 'Matthew.I.Haynor@gcpsk12.org',
  'grade': '99',
  'params': {'LoadParams': {'ControlName': 'Gradebook_RichContentClassDetails',
    'HideHeader': False},
   'FocusArgs': {'viewName': None,
    'studentGU': 'B2B4330C-5388-4E9E-9A8B-B886A4FBE89B',
    'schoolID': 136,
    'classID': 1447253,
    'markPeriodGU': '5C0E30B8-1C5C-4DE1-B0ED-9B34C27DEB6F',
    'gradePeriodGU': '775DDCAD-2F7C-406E-BAB9-A4241F16900E',
    'subjectID': -1,
    'teacherID': -1,
    'assignmentID': -1,
    'standardIdentifier': None,
    'AGU': '0',
    'OrgYearGU': '5461EE39-F6EA-44E9-9A06-61A8B9D51650',
    'gradingPeriodGroup': None}}},
 {'course': '1: AP CAL BC GF',
  'teacher': 'Joshua Cook ',
  'email': 'Josh.Cook@gcpsk12.org',
  'grade': '100',
  'params': {'LoadParams': {'ControlName': 'Gradebook_RichContentClassDetails',
    'HideHeader': False},
   'FocusArgs': {'viewName': None,
    'studentGU': 'B2B4330C-5388-4E9E-9A8B-B886A4FBE

In [None]:
result = []
for i in range(len(await courses())):
    async with course(i) as c:
        result.append(await asyncio.gather(get_items(), get_class_data()))


In [None]:
# Inspect the class-data response (second element of the pair) of the first course.
result[0][1]

# Analysis


In [None]:
from decimal import Decimal

import pandas as pd
import plotly.express as px
import plotly.graph_objects as go


## Parse


In [None]:
def parse_class_data(cd: dict[str, Any]) -> dict[str, dict | pd.DataFrame]:
    """Split a class-data response into metadata and three data frames.

    Column names are passed through ``identifier`` before selection.

    Args:
        cd: Response dict; the keys read are ``classId``, ``className``,
            ``rigorPoints``, ``measureTypes``, ``assignments`` and
            ``comments``.

    Returns:
        A dict with the keys ``meta`` (plain dict), ``measure_types`` (indexed
        by ``id``, only rows with a positive weight), ``assignments`` (indexed
        by ``grade_book_id``) and ``comments`` (indexed by ``comment_code``).
    """
    meta = dict(
        class_id=cd.get("classId"),
        name=cd.get("className"),
        rigor_points=cd.get("rigorPoints"),
    )
    # measure types
    mt_df = (
        pd.DataFrame(cd.get("measureTypes"))
        .set_index("id")
        .rename(identifier, axis="columns")[["name", "drop_scores", "weight"]]
    )
    mt_df = mt_df[mt_df.weight > 0]
    # assignments
    as_df = (
        pd.DataFrame(cd.get("assignments"))
        .rename(identifier, axis="columns")
        .set_index("grade_book_id")
    )[
        [
            "measure_type_id",
            "score",
            "max_value",
            "max_score",
            "due_date",
            "is_for_grading",
            "comment_code",
        ]
    ].copy()  # own the data before assigning columns (avoids SettingWithCopyWarning)
    as_df.due_date = pd.to_datetime(as_df.due_date)
    cols = ["score", "max_score", "max_value"]
    as_df[cols] = as_df[cols].astype(float)
    # comments
    co_df = (
        pd.DataFrame(cd.get("comments"))
        .rename(identifier, axis="columns")
        .set_index("comment_code")[["comment", "assignment_value", "penalty_pct"]]
        .copy()  # same reason as above
    )
    co_df.assignment_value = co_df.assignment_value.astype(float)
    co_df.penalty_pct = co_df.penalty_pct.astype(float)
    return dict(meta=meta, measure_types=mt_df, assignments=as_df, comments=co_df)


In [None]:
def parse_items(items: dict[str, Any]) -> pd.DataFrame:
    """Flatten an items response into one row per item, indexed by ``item_id``.

    Args:
        items: Response dict; ``items["responseData"]["data"]`` is a list of
            groups, each optionally carrying an ``"items"`` list.

    Returns:
        A frame with the columns ``title``, ``assignment_type``, ``due_date``
        (datetime) and ``points`` (float; unparsable values become NaN).
    """
    groups = items["responseData"]["data"]
    df = (
        pd.DataFrame(flatten(e.get("items", []) for e in groups))
        .rename(identifier, axis="columns")[
            ["item_id", "title", "assignment_type", "due_date", "points"]
        ]  # grade_mark
        .copy()  # own the data before assigning columns (avoids SettingWithCopyWarning)
    )
    df.points = pd.to_numeric(df.points, errors="coerce").astype(float)
    df.due_date = pd.to_datetime(df.due_date)
    return df.set_index("item_id")


In [None]:
def parse_gradebook_items(class_data, items) -> pd.DataFrame:
    """Combine a class-data response and an items response into one frame.

    Args:
        class_data: Raw response accepted by ``parse_class_data``.
        items: Raw response accepted by ``parse_items``.

    Returns:
        One row per assignment that received a title from ``items``, with the
        measure-type columns merged in, ``score`` replaced by an adjusted
        percentage, and the helper columns dropped.
    """
    class_data = parse_class_data(class_data)
    items = parse_items(items)
    # measure types -> assignments (by measure_type_id), then item titles (by
    # index), then comments (by comment_code).
    # NOTE(review): the title join relies on the assignment index and the item
    # index holding the same ids -- confirm.
    df: pd.DataFrame = (
        class_data["measure_types"]
        .merge(class_data["assignments"], left_index=True, right_on="measure_type_id")
        .join(items["title"])
        .join(class_data["comments"], on="comment_code", how="left")
    )
    # Rename the clashing columns, then convert every numeric-dtype column to
    # Decimal element by element (the outer apply visits one column at a time).
    df = df.rename(
        columns={
            "name": "measure_type",
            "title": "name",
            "assignment_value": "comment_assignment_value",
        }
    ).apply(
        lambda x: x.apply(lambda e: Decimal(e))
        if pd.api.types.is_numeric_dtype(x)
        else x,
        axis="rows",
    )
    # The comment's assignment value takes precedence over the raw score.
    # NOTE(review): drop_scores is subtracted from the score itself -- confirm
    # this is the intended use of that column.
    adjusted_score = df.comment_assignment_value.fillna(
        df.score
    ) - df.drop_scores.fillna(Decimal(0.0))
    # Scale to a percentage of max_score (100 when missing) and apply the
    # comment's penalty percentage (0 when missing).
    adjusted_score = (
        adjusted_score
        / df.max_score.fillna(Decimal(100.0))
        * 100
        * (1 - df.penalty_pct.fillna(Decimal(0.0)) / 100)
    )
    df.score = adjusted_score
    # Drop the helper columns and keep only rows that received a title.
    return df.drop(
        columns=["drop_scores", "max_value", "max_score", "comment_assignment_value"]
    )[~pd.isna(df.name)]


In [None]:
def get_grade_df(df: pd.DataFrame) -> pd.DataFrame:
    """Keep the rows that count toward the grade and have a score.

    Args:
        df: Frame with ``is_for_grading`` and ``score`` columns.

    Returns:
        The rows where ``is_for_grading == 1`` and ``score`` is not NA.
    """
    # One combined mask: chaining two boolean selections would index the
    # already-filtered frame with a mask built from the unfiltered one, which
    # makes pandas reindex the mask and emit a UserWarning.
    keep = (df.is_for_grading == 1) & ~pd.isna(df.score)
    return df[keep]


def grade_df(fn):
    """Decorate ``fn`` so its first argument is filtered by ``get_grade_df``.

    Args:
        fn: Function taking a data frame as its first positional argument.

    Returns:
        A wrapper that keeps ``fn``'s name and docstring and forwards any
        remaining arguments unchanged.
    """
    from functools import wraps

    @wraps(fn)  # preserve __name__/__doc__ of the decorated function
    def wrapper(df, *args, **kwargs):
        return fn(get_grade_df(df), *args, **kwargs)

    return wrapper


@grade_df
def get_total_score(df: pd.DataFrame) -> Decimal:
    """Return the weighted total: per-type mean score times the type's share of weight.

    Args:
        df: Grade book frame; filtered by ``grade_df`` before use.
    """
    total_weight = get_total_weight(df)

    def weighted_mean(group):
        # sum / len rather than .mean(): hack to prevent Decimal values from
        # being converted to float.
        return group.score.sum() / len(group) * group.weight.iloc[0] / total_weight

    per_type = df.groupby("measure_type").apply(weighted_mean)
    return per_type.sum()


@grade_df
def get_total_weight(df) -> Decimal:
    """Sum ``weight`` over one row per distinct ``measure_type_id``.

    Args:
        df: Grade book frame; filtered by ``grade_df`` before use.
    """
    one_row_per_type = df.drop_duplicates(subset="measure_type_id")
    return one_row_per_type.weight.sum()


@grade_df
def get_score_by_type(df: pd.DataFrame) -> pd.Series:
    """Return the mean score of each ``measure_type`` group.

    Args:
        df: Grade book frame; filtered by ``grade_df`` before use.
    """

    def mean_score(group):
        # sum / len keeps the arithmetic on the stored values themselves.
        return group.score.sum() / len(group)

    return df.groupby("measure_type").apply(mean_score)


@grade_df
def get_blame(df: pd.DataFrame) -> pd.Series:
    """Return each row's share of its type's score sum, scaled by relative weight.

    Args:
        df: Grade book frame; filtered by ``grade_df`` before use.
    """
    type_totals = df.groupby("measure_type").score.transform("sum")
    share_within_type = df.score / type_totals
    return share_within_type * df.weight / get_total_weight(df)


@grade_df
def get_contrib(df: pd.DataFrame) -> pd.Series:
    """Return each row's score divided by its type's row count, scaled by relative weight.

    Args:
        df: Grade book frame; filtered by ``grade_df`` before use.
    """
    per_type_count = df.groupby("measure_type_id").measure_type.count()
    # The joined count column is named "measure_type_count" via the suffix.
    joined = df.join(per_type_count, on="measure_type_id", rsuffix="_count")
    per_item = joined.score / joined.measure_type_count
    return per_item * joined.weight / get_total_weight(joined)


In [None]:
def plot_blame(assignments):
    """Show a horizontal grouped bar chart of ``get_blame`` per assignment.

    Args:
        assignments: Grade book frame. A ``"blame"`` column is written into
            this frame in place.
    """
    assignments["blame"] = get_blame(assignments)
    chart_options = dict(
        x="blame",
        y="name",
        color="measure_type",
        barmode="group",
        color_discrete_sequence=px.colors.qualitative.Pastel,
        orientation="h",
        title="Blame",
    )
    px.bar(assignments, **chart_options).show()


def plot_score_by_type(assignments):
    """Show a horizontal bar chart of the mean score per measure type.

    Args:
        assignments: Grade book frame passed to ``get_score_by_type``.
    """
    per_type = get_score_by_type(assignments)
    table = pd.DataFrame(per_type, columns=["score"]).reset_index()
    chart_options = dict(
        x="score",
        y="measure_type",
        color_discrete_sequence=px.colors.qualitative.Pastel,
        orientation="h",
        title="Score by Type",
    )
    px.bar(table, **chart_options).show()


def plot_contrib(assignments):
    """Show a horizontal grouped bar chart of ``get_contrib`` per assignment.

    Args:
        assignments: Grade book frame. A ``"contrib"`` column is written into
            this frame in place.
    """
    assignments["contrib"] = get_contrib(assignments)
    chart_options = dict(
        x="contrib",
        y="name",
        color="measure_type",
        barmode="group",
        color_discrete_sequence=px.colors.qualitative.Pastel,
        orientation="h",
        title="Contrib",
    )
    px.bar(assignments, **chart_options).show()


In [None]:
# Each `result` entry is an (items, class_data) pair; the parser takes them in
# the opposite order.
grade_books = [parse_gradebook_items(class_data, items) for items, class_data in result]


In [None]:
# Deliberately not named `grade_book`: rebinding that name would replace the
# async `grade_book()` fetcher that `courses()` calls.
selected_grade_book = grade_books[0]

plot_score_by_type(selected_grade_book)
plot_blame(selected_grade_book)
plot_contrib(selected_grade_book)


## Multi-course


In [None]:
total_scores = pd.DataFrame(
    [
        {
            "name": (await courses())[i]["course"].split(": ")[1].capitalize(),
            "score": get_total_score(grade_books[i]),
            "ap": "AP" in (await courses())[i]["course"],
        }
        for i in range(len(grade_books))
    ]
)


In [None]:
def plot_scores(
    df: pd.DataFrame,
    type: Literal["bar", "radar"] = "bar",
    normalize: bool = False,
    weighted: bool = False,
):
    """Plot the total score of every course.

    Args:
        df: Frame with ``name``, ``score`` and ``ap`` columns; it is copied,
            so the caller's frame is left untouched.
        type: ``"bar"`` draws a horizontal bar chart; any other value draws a
            filled polar (radar) chart.
        normalize: Standardize the scores (subtract the mean, divide by the
            standard deviation) and shift them so the smallest value is 1.
        weighted: Add 10 points to every row whose ``ap`` flag is set.
    """
    df = df.copy()
    if weighted:
        df.score += 10 * df.ap
    mean = df.score.sum() / len(df)  # hack: prevent decimal convert to float
    title = f"Total Scores {'Normalized' if normalize else ''}—{mean:.2f}"
    if normalize:
        variance = (df.score - mean).pow(2).sum() / len(df)
        # Standard deviation is the square root of the variance; Decimal has
        # its own sqrt and cannot be raised to a float power.
        std = variance.sqrt() if isinstance(variance, Decimal) else variance**0.5
        centered = df.score - mean
        # With zero spread every centered score is already 0; skip the
        # division instead of dividing by zero.
        df.score = centered / std if std else centered
        df.score -= df.score.min() - 1
    if type == "bar":
        fig = px.bar(
            df,
            x="score",
            y="name",
            color_discrete_sequence=px.colors.qualitative.Pastel,
            orientation="h",
            title=title,
        )
        fig.show()
    else:
        fig = px.line_polar(
            df,
            r="score",
            theta="name",
            line_close=True,
            color_discrete_sequence=px.colors.qualitative.Pastel,
            title=title,
        )
        fig.update_traces(fill="toself")
        fig.show()


In [None]:
# Radar chart of the raw (non-normalized) totals; weighted=True adds 10 points
# to the courses flagged as AP.
plot_scores(total_scores, type="radar", normalize=False, weighted=True)
