In [2]:
from pathlib import Path

try:
    from google.colab import drive
    drive.mount("/content/drive")
    ROOT = Path('/content/drive/MyDrive/textbook-topic-analysis')
except ImportError:
    ROOT = Path(".")

In [3]:
! pip install -r {str(ROOT / "requirements.txt")}

Collecting tensorflow-hub (from top2vec[sentence_encoders]->-r requirements.txt (line 4))
  Using cached tensorflow_hub-0.15.0-py2.py3-none-any.whl.metadata (1.3 kB)
INFO: pip is looking at multiple versions of top2vec[sentence-encoders] to determine which version is compatible with other requirements. This could take a while.
Collecting top2vec[sentence_encoders] (from -r requirements.txt (line 4))
  Using cached top2vec-1.0.34-py3-none-any.whl.metadata (18 kB)
  Using cached top2vec-1.0.32-py3-none-any.whl.metadata (18 kB)
  Using cached top2vec-1.0.31-py3-none-any.whl.metadata (18 kB)
  Using cached top2vec-1.0.30-py3-none-any.whl.metadata (18 kB)
  Using cached top2vec-1.0.29-py3-none-any.whl (26 kB)
  Using cached top2vec-1.0.28-py3-none-any.whl (25 kB)
  Using cached top2vec-1.0.27-py3-none-any.whl (25 kB)
INFO: pip is still looking at multiple versions of top2vec[sentence-encoders] to determine which version is compatible with other requirements. This could take a while.
  Using

In [4]:
import json
import re
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
import numpy as np
from pathlib import Path

from bs4 import BeautifulSoup, NavigableString
from bs4.element import Tag

# Parsing

In [5]:
def join_hyphenated_words(words):
    HYPHEN = "-"
    i = 0
    while i < len(words) - 1:
        if words[i].endswith(HYPHEN):
            words[i] = words[i].rstrip(HYPHEN) + words[i + 1]
            del words[i + 1]
        else:
            i += 1
    return words


def convert_xml_content_to_string(raw_content: Tag):
    content = []
    for child in raw_content.find_all("ab", attrs={"type": "Body"}):
        for grandchild in child.children:
            if not grandchild.text.strip():
                continue
            if grandchild.name == "w":
                content += [text.strip() for text in grandchild.stripped_strings]
    return " ".join(join_hyphenated_words(content))


def parse_file(path: Path) -> dict:
    """
    Parses a TEI-encoded XML file into a dictionary of TOC entries -> section contents
    """
    with open(path, encoding="utf-8") as f:
        soup = BeautifulSoup(f, features="xml")

    toc = soup.find("front").find("div", attrs={"type": "contents"}).find("list")
    body = soup.find("body")
    index = soup.find("div", attrs={"type": "index"})

    toc_entries = {}
    for entry in toc.find_all("item"):
        if not entry.find("ref").has_attr("target"):
            continue
        entry_text = "".join(
            child for child in entry.contents if isinstance(child, NavigableString)
        ).strip()
        section_number_match = re.search(r"\b\d+(\.\d+)*\b", entry_text)
        if section_number_match is not None:
            section_number = section_number_match.group()
            header = entry_text.replace(section_number, "").strip()
        else:
            section_number = None

        entry_id = entry.find("ref").attrs["target"]

        content_xml = body.find("div", attrs={"xml:id": entry_id})
        nested_list = entry.find_next_sibling("list")
        if nested_list is not None:
            subsection_refs = [
                ref.attrs["target"]
                for ref in nested_list.find_all("ref")
                if ref.has_attr("target")
            ]
        else:
            subsection_refs = []

        # Exclude content that is contained in a subsection
        if content_xml is not None:
            for sub_ref in subsection_refs:
                sub_contents = content_xml.find_all("div", {"xml:id": sub_ref})
                if sub_contents is None:
                    continue
                for sub_content in sub_contents:
                    sub_content.decompose()

        if content_xml is None:
            continue

        content_string = convert_xml_content_to_string(content_xml)
        word_count = len(content_xml.find_all("w"))

        index_refs = index.find_all("ref", attrs={"xml:id": entry_id})
        index_refs = [] if index_refs is None else index_refs
        annotations = [ref.parent for ref in index_refs]

        toc_entries[entry_id] = {
            "header": header,
            "section_number": section_number,
            "content_xml": content_xml,
            "content_string": content_string,
            "word_count": word_count,
            "subsections": subsection_refs,
            "annotations": annotations,
            "similar": [],
        }

    return toc_entries

In [6]:
class SoupJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, Tag):
            return str(o)
        return super().default(o)


def process_file(file):
    parsed_file = parse_file(file)
    new_file_name = PARSED_TEXTBOOKS_DIRECTORY / (file.stem + ".json")
    with open(new_file_name, "w") as f:
        json.dump(parsed_file, f, cls=SoupJSONEncoder)


PARSED_TEXTBOOKS_DIRECTORY = ROOT / "parsed_textbooks"
if not PARSED_TEXTBOOKS_DIRECTORY.exists():
    PARSED_TEXTBOOKS_DIRECTORY.mkdir()

#### CAN COMMENT THIS OUT IF TEXTBOOKS HAVE BEEN PARSED ####
# with ThreadPoolExecutor() as executor:
#     executor.map(process_file, Path(ROOT / "textbooks").glob("*.xml"))
############################################################

# TOC Integration

In [7]:
HEADER_TERMS_TO_EXCLUDE = ["exercises", "questions", "solutions"]
textbooks = {}
for file in Path(PARSED_TEXTBOOKS_DIRECTORY).glob("*.json"):
    with open(file) as f:
        textbook_json = json.load(f)
    parsed_textbook = {}
    for entry_id, section in textbook_json.items():
        if any(term in section["header"] for term in HEADER_TERMS_TO_EXCLUDE):
            continue
        if "content_xml" not in section:
            continue 
        section["content_xml"] = BeautifulSoup(section["content_xml"], features="xml")
        section['similar'] = []
        parsed_textbook[entry_id] = section
    textbooks[file.stem] = parsed_textbook

In [8]:
def extract_text(text_extraction_fn):
    return [
        (textbook, entry_id, text_extraction_fn(entry))
        for textbook, entries in textbooks.items()
        for entry_id, entry in entries.items()
    ]


def link_similar_entries(texts, is_similar_fn):
    result = deepcopy(textbooks)
    for textbook1, entry1, text1 in texts:
        for textbook2, entry2, text2 in texts:
            if textbook1 == textbook2 and entry1 == entry2:
                continue
            if is_similar_fn(text1, text2):
                result[textbook1][entry1]["similar"].append(
                    {"textbook": textbook2, "entry": entry2}
                )
                result[textbook2][entry2]["similar"].append(
                    {"textbook": textbook1, "entry": entry1}
                )
    return result


def count_linked(result):
    count = 0
    for textbook in result.values():
        for entry in textbook.values():
            count += len(entry["similar"])
    return count // 2

## Cosine Similarity

In [9]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity


def compute_cosine_similarity(text1, text2):
    if not text1 or not text2:
        return 0
    # Vectorizing the text
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform([text1, text2])
    # Calculating cosine similarity
    return cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]

### Headers only

In [10]:
headers = extract_text(lambda entry: entry["header"])
linked_by_headers = link_similar_entries(
    headers, lambda t1, t2: compute_cosine_similarity(t1, t2) > 0.9
)
count_linked(linked_by_headers)

68

### Text only

In [11]:
text_contents = extract_text(lambda entry: entry["content_string"])
linked_by_text_contents = link_similar_entries(
    text_contents, lambda t1, t2: compute_cosine_similarity(t1, t2) > 0.48
)
count_linked(linked_by_text_contents)

2412

### Weighted average text and headers

In [12]:
headers_and_text_contents = extract_text(
    lambda entry: {"h": entry["header"], "c": entry["content_string"]}
)
linked_by_headers_and_text_contents = link_similar_entries(
    headers_and_text_contents,
    lambda t1, t2: np.average(
        [
            compute_cosine_similarity(t1["h"], t2["h"]),
            compute_cosine_similarity(t1["c"], t2["c"]),
        ],
        weights=[7, 3],
    )
    > 0.5,
)
count_linked(linked_by_headers_and_text_contents)

108

## Semantic Similarity

### Doc2Vec

In [28]:
from gensim.models.doc2vec import Doc2Vec, TaggedDocument
from nltk.tokenize import word_tokenize
import nltk

nltk.download("punkt")


def preprocess(doc):
    return word_tokenize(doc.lower())


train_corpus = [
    TaggedDocument(
        words=preprocess(section_contents["content_string"]), tags=[(textbook, section)]
    )
    for textbook, textbook_contents in textbooks.items()
    for section, section_contents in textbook_contents.items()
    if section_contents["content_string"]
]

model = Doc2Vec(vector_size=50, min_count=1, epochs=40)
model.build_vocab(train_corpus)
model.train(train_corpus, total_examples=model.corpus_count, epochs=model.epochs)


def cos(vec1, vec2):
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))


def compute_doc2vec_similarity(doc1, doc2):
    vector1 = model.infer_vector(doc1)
    vector2 = model.infer_vector(doc2)
    similarity = cos(vector1, vector2)
    return similarity


doc2vec = extract_text(lambda entry: preprocess(entry["content_string"]))
linked_by_doc2vec = link_similar_entries(
    doc2vec, lambda t1, t2: compute_doc2vec_similarity(t1, t2) > 0.6
)
count_linked(linked_by_doc2vec)

[nltk_data] Downloading package punkt to /Users/coby/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


3621

In [64]:
from textwrap import wrap

INDENT = " " * 4


def print_similar(linked, textbook, section=1):
    section = list(linked[textbook].values())[0]
    print("textbook = ", textbook)
    print("header =", section["header"])
    print("content =")
    for string in wrap(
        section["content_string"],
        width=80,
        initial_indent=INDENT,
        subsequent_indent=INDENT,
    ):
        print(string)
    print({k: v for k, v in section.items() if k not in {"similar", "content_xml"}})
    similar = section["similar"]
    print("*******")
    for s in similar:
        similar_entry = linked[s["textbook"]][s["entry"]]
        print("textbook = ", s["textbook"])
        print("header =", similar_entry["header"])
        print("content =")
        for string in wrap(
            similar_entry["content_string"],
            width=80,
            initial_indent=INDENT,
            subsequent_indent=INDENT,
        ):
            print(string)
        print("")


print_similar(
    linked_by_headers_and_text_contents, "2011_Book_StatisticsForNon-Statisticians", section=1
)

textbook =  2011_Book_StatisticsForNon-Statisticians
header = data collection
content =
    This chapter explains some basic concepts within statistics. Also, we look
    at the most important ways to collect data in surveys. Statistics can be
    defined as a collection of techniques used when planning a data collection,
    and when subsequently analyzing and presenting data. Dating back to ancient
    times people have needed knowledge about population size, to carry out a
    census of the armies or calculate expected taxes. The word statistics is
    derived from the word “status” (originally coming from Latin); and it was
    exactly the status of society, which was the subject of the first
    statistics! Later emerged probability theory (in connection with games!),
    demographics and insurance science as areas, in which statistical thinking
    was essential. In today’s digital age it is easy to collect as well as
    process and disseminate data, and therefore statistics is 

### 

## 