In [1363]:
import spacy
from spacy import displacy
from spacy.language import Language
from spacy.tokens import Doc, Span, Token
from spacy.matcher import Matcher
from spacy.pipeline import SpanRuler, EntityRuler
from pymongo import MongoClient
import re
from nltk import Tree
import itertools
from pprint import pprint

In [1364]:
def letter_generator():
    yield from itertools.cycle("ABCDEFGHIJKLMNOPQRSTUVWXYZ")

letters = letter_generator()

In [1365]:
# Connect to mongodb
client = MongoClient("mongodb://root:password@localhost:27017/")
catalog = client.get_database("catalog")

In [1366]:
# Sort by length of title
subject_codes_docs = list(catalog.get_collection("subject_codes").find())
subject_codes_docs.sort(key=lambda x: len(x["title"]), reverse=True)
subject_codes_map = {doc["title"]: doc["code"] for doc in subject_codes_docs}
subject_codes = [doc["code"] for doc in subject_codes_docs]

In [1367]:
# Base patterns
subject_code_regex = r"([A-Z]{3,4})"  # ART, MATH
course_number_regex = r"(\d{2}-\d|\d{3}\.\d{1,2}|\d{2,3})"  # 101, 30-1, 599.45
course_code_regex = rf"{subject_code_regex} {course_number_regex}"

In [1368]:
def replace_subject_code(sentence: str, loose: bool=False):
	for subject_code in subject_codes_docs:
		if loose:
			sentence = re.sub(rf"{subject_code["title"]}", rf"{subject_code["code"]}", sentence)
		else:
			sentence = re.sub(rf"{subject_code["title"]} {course_number_regex}", rf"{subject_code["code"]} \1", sentence)
	return sentence

In [1369]:
def get_replacement_letter():
    # Create an iterator that cycles through the alphabet
    for letter in itertools.cycle("ABCDEFGHIJKLMNOPQRSTUVWXYZ"):
        yield letter

replacement_letters = get_replacement_letter()

In [1370]:
Token.set_extension("course_code", default=None, force=True)
Doc.set_extension("replacements", default=[], force=True)
Doc.set_extension("json_logics", default=[], force=True)

In [1371]:
nlp = spacy.load("en_core_web_sm", exclude=["ner"])
expand_nlp = spacy.load("en_core_web_sm", exclude=["ner"])
constituency_nlp = spacy.load("en_core_web_sm", exclude=["ner"])

In [1372]:
@Language.component("set_custom_boundaries")
def set_custom_boundaries(doc):
    for token in doc:
        token.is_sent_start = False

    for token in doc[:-1]:
        if token.text == ";":
            doc[token.i + 1].is_sent_start = True
            
    return doc

nlp.add_pipe("set_custom_boundaries", before="parser")

<function __main__.set_custom_boundaries(doc)>

In [1373]:
@Language.component("fix_ent_head")
def fix_ent_head(doc: Doc):
    for sent in doc.sents:
        for token in sent:
            if token.pos_ == "NUM" and re.match(course_number_regex, token.text) is not None:
                ancestors = list(filter(lambda x: x.text in subject_codes, token.ancestors))
                ancestor = ancestors[0] if len(ancestors) > 0 else None

                if ancestor:
                    token.head = ancestor
                    token._.course_code = f"{ancestor.text}{token.text}"

    return doc

In [1374]:
@Language.component("expand_course_code")
def expand_course_code(doc: Doc):
    sent = ""

    for token in doc:
        if token.text in subject_codes:
            continue

        elif token.pos_ == "NUM" and re.match(course_number_regex, token.text) is not None:
            left_tokens = [token.head] + list(reversed(list(doc[: token.i])))

            for left_token in left_tokens:
                if left_token.text in subject_codes:
                    sent += left_token.text_with_ws
                    break

            sent += token.text_with_ws

        else:
            sent += token.text_with_ws

    new_doc = nlp(sent)
    new_doc.ents = []
    return new_doc


In [1375]:
entity_ruler = EntityRuler(nlp)
patterns = [
    {
        "label": "COURSE",
        "pattern": [
            {"TEXT": {"REGEX": subject_code_regex}},
            {"TEXT": {"REGEX": course_number_regex}},
        ],
    },
    {
        "label": "REQUISITE",
        "pattern": [
            {"TEXT": "RQ"},
            {"TEXT": {"REGEX": "[A-Z]"}},
        ],
    },
]
entity_ruler.clear()
entity_ruler.add_patterns(patterns)

@Language.component("detect_entity")
def detect_entity(doc: Doc):
    ents = entity_ruler.match(doc)
    doc.ents = ents
    return doc

In [1376]:
@Language.component("merge_entity_spans")
def merge_entity_spans(doc: Doc):
    with doc.retokenize() as retokenizer:
        for ent in doc.ents:
            if ent.label_ is not None:
                retokenizer.merge(ent, attrs={"ENT_TYPE": ent.label_, "ENT_IOB": "B", "ENT_IOE": "E", "ENT_IOR": "", "pos": "PROPN"})
    return doc


In [1377]:
expand_nlp.add_pipe("fix_ent_head")
expand_nlp.add_pipe("expand_course_code")
expand_nlp.add_pipe("detect_entity")
expand_nlp.add_pipe("merge_entity_spans")

expand_nlp.pipe_names

['tok2vec',
 'tagger',
 'parser',
 'attribute_ruler',
 'lemmatizer',
 'fix_ent_head',
 'expand_course_code',
 'detect_entity',
 'merge_entity_spans']

In [1378]:
def get_repeating_array(head: list, repeat_elemnts: list, repeat_times: int, tail: list):
    repeated_part = repeat_elemnts * repeat_times
    return head + repeated_part + tail

def get_dynamic_patterns(head: list, repeat_tokens: list, repeat_range: range, tail: list):
    patterns = []
    for i in repeat_range:
        patterns.append(get_repeating_array(head, repeat_tokens, i, tail))
    return patterns

In [1379]:
matcher = Matcher(nlp.vocab)

### X Units of
def x_units_of(matcher, doc, i, matches):
    match_id, start, end = matches[i]
    span = doc[start:end]

    units_required = int(span[0].text)
    courses = [ent for ent in span.ents if ent.label_ == "COURSE"]

    json_logic = {
        "units": {
            "required": units_required,
            "from": [course.text for course in courses],
        }
    }

    doc._.json_logics.append((span, json_logic))

x_units_of_patterns = get_dynamic_patterns(
    [
        {"IS_DIGIT": True},
        {"LEMMA": "unit"},
        {"POS": "ADP", "OP": "+"},
    ],
    [
        {"ENT_TYPE": "COURSE"},
        {"TEXT": {"IN": ["and", "or", ","]}},
    ],
    range(1, 20),
    [
        {"ENT_TYPE": "COURSE"},
    ],
)

matcher.add("X units of", x_units_of_patterns, greedy="LONGEST", on_match=x_units_of)
### X Units of


### One of
def one_of(matcher, doc, i, matches):
    match_id, start, end = matches[i]
    span = doc[start:end]

    courses = [ent for ent in span.ents if ent.label_ == "COURSE"]

    json_logic = {
        "or": [course.text for course in courses],
    }

    doc._.json_logics.append((span, json_logic))

one_of_patterns = get_dynamic_patterns(
    [
        {"LEMMA": "one"},
        {"POS": "ADP", "OP": "+"},
    ],
    [
        {"ENT_TYPE": "COURSE"},
        {"TEXT": {"IN": ["or", ","]}},
    ],
    range(1, 20),
    [
        {"ENT_TYPE": "COURSE"},
    ],
)

matcher.add("One of", one_of_patterns, greedy="LONGEST", on_match=one_of)
### One of


### Consent of
def consent_of(matcher, doc: Doc, i, matches):
    match_id, start, end = matches[i]
    span = doc[start:end]
    consent_of = span[2:]
    json_logic = {"consent": consent_of.text}
    doc._.json_logics.append((span, json_logic))

matcher.add(
    "Consent of",
    [
        [
            {"LEMMA": "consent"},
            {"POS": "ADP", "OP": "+"},
            {"TEXT": {"REGEX": "[A-Za-z, ]"}, "OP": "*"},
            {"IS_SENT_START": False},
        ]
    ],
    greedy="LONGEST",
    on_match=consent_of,
)

def admission_of(matcher, doc: Doc, i, matches):
    match_id, start, end = matches[i]
    span = doc[start:end]
    admission_of = span[2:]
    json_logic = {"admission": admission_of.text}
    doc._.json_logics.append((span, json_logic))

matcher.add(
    "Admission to",
    [
        [
            {"LEMMA": "admission"},
            {"POS": "ADP", "OP": "+"},
            {"TEXT": {"REGEX": "[A-Za-z, ]", "NOT_IN": ["and", "or"]}, "OP": "*"},
        ]
    ],
    greedy="LONGEST",
    on_match=admission_of,
)
### Consent of


### Both A and B
def both_a_and_b(matcher, doc, i, matches):
    match_id, start, end = matches[i]
    span = doc[start:end]
    a = span[1]
    b = span[3]
    json_logic = {"and": [a.text, b.text]}
    doc._.json_logics.append((span, json_logic))

matcher.add(
    "Both A and B",
    [
        [
            {"LEMMA": "both"},
            {"IS_ALPHA": True},
            {"LEMMA": "and"},
            {"IS_ALPHA": True},
            {"IS_SENT_START": False},
        ]
    ],
    greedy="LONGEST",
    on_match=both_a_and_b,
)
### Both A and B



@Language.component("constitute_requisite")
def constitute_requisite(doc: Doc):
    sent = doc.text
    matches = matcher(doc)
    replacements = []

    for match_id, start, end in matches:
        letter = next(letters)
        replacement = f"RQ {letter}"

        span = doc[start:end]
        replacements.append((replacement, span))

        sent = re.sub(re.escape(span.text), replacement, sent)

    new_doc = nlp(sent)
    new_doc._.replacements = replacements
    new_doc._.json_logics = doc._.json_logics
    return new_doc

In [1380]:
constituency_nlp.add_pipe("constitute_requisite")
constituency_nlp.add_pipe("detect_entity")
constituency_nlp.add_pipe("merge_entity_spans")

constituency_nlp.pipe_names

['tok2vec',
 'tagger',
 'parser',
 'attribute_ruler',
 'lemmatizer',
 'constitute_requisite',
 'detect_entity',
 'merge_entity_spans']

In [1381]:
def tok_format(tok):
    # return "_".join([tok.orth_, tok.tag_])
    return f"{tok.orth_} ({tok.dep_})"


def to_nltk_tree(node):
    if node.n_lefts + node.n_rights > 0:
        return Tree(tok_format(node), [to_nltk_tree(child) for child in node.children])
    else:
        return tok_format(node)

In [1382]:
def find_replacement(key: str, replacements: list[tuple[str, Span]]):
    if not replacements or not key:
        return

    key = key[:-1] if key[-1] == "." else key
    
    for _key, span in replacements:
        if key == _key:
            return span

def find_json_logic(span: Span, json_logic: list[tuple[Span, dict]]):
    if not json_logic or not span:
        return
    
    for _span, logic in json_logic:
        if span.text == _span.text:
            return logic

def extract_sentence(sent: Span, replacements: dict[str, Span], json_logics: list[tuple[Span, dict]]):
    logic_operator = "and"
    conditions = []

    for token in sent:
        if token.text.lower() in ["and", "or"]:
            logic_operator = token.text.lower()

        elif token.ent_type_ == "COURSE":
            conditions.append({"course": token.text})

        elif token.ent_type_ == "REQUISITE":
            replacement = find_replacement(token.text, replacements)
            json_logic = find_json_logic(replacement, json_logics)
            conditions.append(json_logic)

    if len(conditions) == 1:
        return conditions[0]

    return {
        logic_operator: conditions
    }

def extract_doc(doc: Doc):
    logic_operator = "and"
    conditions = []

    # In the last sentence, find coordinating conjunctions
    last_sentence = list(doc.sents)[-1]
    last_sentence_first_word = last_sentence[0]

    if last_sentence_first_word.dep_ == "cc" or last_sentence_first_word.pos_ == "CCONJ":
        logic_operator = last_sentence_first_word.text.lower()

    for sent in doc.sents:
        conditions.append(extract_sentence(sent, doc._.replacements, doc._.json_logics))

    if len(conditions) == 1:
        return conditions[0]
    
    return {
        logic_operator: conditions
    }

In [1383]:
# sent = "Actuarial Science 327; Statistics 323; 3 units from Mathematics 311, 313, 367 or 375; and 3 units from Computer Science 217, 231, 235 or Data Science 211."
# sent = "CPSC 457 and 3 units from SENG 300, 301 or ENSF 480; and admission to the Schulich School of Engineering."
# sent = "SGMA 395 or ENTI 317 or 381."
# sent = "One of FILM 321 or 323 and one of FILM 331 or 333."
# sent = "FILM 331 or 333."
# sent = "ENCI 473; and ENGG 319 or ENDG 319."
# sent = "3 units from ENCI 481, ENEE 377 or 519.09."
# sent = "ENEL 341, BMEN 327 or ENGG 225."
# sent = "ENEL 471; and one of BMEN 319 or ENGG 319 or ENEL 419."
# sent = "3 units from ENGG 319, ENDG 319 or ENEL 419."
# sent = "FILM 201 and 3 units from 305 or 321."
# sent = "INDG 201 and 3 units from INDG 303 or 345."
# sent = "One of GEOG 211, 251, 253, UBST 253, GLGY 201, 209; and consent of the Department."
# sent = "STAT 205 or 213; and admission to the Kinesiology Honours program; and consent of the Faculty."
sent = "MATH 209 and admission to the Energy Engineering program."
# sent = "Both MATH 349 and 353; or both MATH 283 and 381; or MATH 267."
# sent = "MATH 431 or PMAT 431; MATH 429 or PMAT 429 or MATH 327 or PMAT 427."
# sent = "MATH 445 or 447; 3 units of Mathematics in the Field of Mathematics at the 400 level or above."
# sent = "MATH 383; and 6 units of Mathematics in the Field of Mathematics at the 400 level or above."
# sent = "MRSC 451 and consent of the Department."
# sent = "Admission to the Haskayne School of Business and OBHR 317."
# sent = "PHYS 211 or 221 or 227."
# sent = "MATH 277 and PHYS 259 and admission to a program in Engineering."
# sent = "PHYS 341; and 3 units from CPSC 217, 231 or DATA 211."
# sent = "ACSC 327; and MATH 323 or STAT 323."
# sent = "ANTH 203."s
# sent = "One of GEOG 211, 251, 253, UBST 253, GLGY 201, 209; and consent of the Department."
# sent = "One of CORE 209, 435, KNES 355, NURS 303, 305, PSYC 203, 205, SOWK 300, 302, 304, 306, 363 or consent of the instructor(s)."

sent = replace_subject_code(sent)
print("Original:", sent)

doc = expand_nlp(sent)
print("Expand  :", doc)


doc = constituency_nlp(doc)
print("Constituency:", doc)

print(doc.ents)
print(doc._.replacements)
print(doc._.json_logics)

for key, replacement in doc._.replacements:
    replacement: Span
    print(key, replacement.text, replacement.ents)


j = extract_doc(doc)
pprint(j, indent=2, depth=10)


displacy.render(doc, style="ent", jupyter=True, options={"compact": True, "distance": 100})
displacy.render(doc, style="dep", jupyter=True, options={"compact": True, "distance": 100})

Original: MATH 209 and admission to the Energy Engineering program.
Expand  : MATH 209 and admission to the Energy Engineering program.
Constituency: MATH 209 and RQ A.
(MATH 209, RQ A.)
[('RQ A', admission to the Energy Engineering program)]
[(admission to the Energy Engineering program, {'admission': 'the Energy Engineering program'})]
RQ A admission to the Energy Engineering program []
{ 'and': [ {'course': 'MATH 209'},
           {'admission': 'the Energy Engineering program'}]}


In [1384]:
def try_nlp(course: dict, sent: str):
    sent = replace_subject_code(sent)
    doc = expand_nlp(sent)
    doc = constituency_nlp(doc)
    j = extract_doc(doc)
    return j

In [1385]:
# Get all courses
courses = list(
    catalog.get_collection("courses").find(
        {"prereq": {"$ne": None}, "career": "Undergraduate Programs"}
    )
)

courses_prereq = catalog.get_collection("courses_prereq")
courses_prereq.delete_many({})

for course in courses:
    prereq = course["prereq"]

    if prereq:
        print(prereq)

        result = try_nlp(course, prereq)

        print(result)
        print("")

        courses_prereq.insert_one(
            {"course": course["code"], "prereq_text": prereq, "prereq": result}
        )

Admission to the Haskayne School of Business and 12 units.
{'admission': 'the Haskayne School of Business'}

24 units including Entrepreneurship and Innovation 201.
{'course': 'ENTI 201'}

Admission to the Haskayne School of Business, and Accounting 217.
{'and': [{'admission': 'the Haskayne School of Business,'}, {'course': 'ACCT 217'}]}

Admission to the Haskayne School of Business and Accounting 217 and 323.
{'and': [{'admission': 'the Haskayne School of Business'}, {'course': 'ACCT 217'}, {'course': 'ACCT 323'}]}

Admission to the Haskayne School of Business and Accounting 341.
{'and': [{'admission': 'the Haskayne School of Business'}, {'course': 'ACCT 341'}]}

24 units including Accounting 217 or 301. For certain topics consent of the Haskayne School of Business will also be required.
{'or': [{'course': 'ACCT 217'}, {'course': 'ACCT 301'}, {'consent': 'the Haskayne School of Business will also be required.'}]}

Admission to the Haskayne School of Business and Accounting 323.
{'and'