# Extracting catalogue entries from the Early Malay Printed Books catalogue

Collaboration with Annabel Gallop and Adi Keinan-Schoonbaert to create catalogue entries from the OCR of the Early Malay Printed Books catalogue (EMP). Match the extracted entries to a set of works AG is digitising to provide skeleton metadata records as part of the digitisation process.

In [None]:
from copy import copy
import json
from random import sample
import re

import pandas as pd
import pymupdf
from rapidfuzz import fuzz, utils, process

## Extract Description section

In [None]:
doc = pymupdf.open("../data/raw/emp.pdf") # open a document

In [None]:
len(doc)

In [None]:
# Page text per section, keyed by the 1-based pdf page number.
# "bl_list" is declared here but never filled by this loop.
text = {"desc": {}, "bl_list": {}, "titles": {}}

# Pdf page numbers (1-based, as shown in a pdf viewer) differ from the numbers
# printed on the pages: within the sections used here, printed = pdf - 28.
section = None
for i, page in enumerate(doc, start=1):
    if i == 126:  # Page 98: description section starts
        section = "desc"
    elif i == 596:  # Page 568: description section has ended
        section = None
    elif i == 711:  # Page 683: titles section starts
        section = "titles"
    elif i == 802:  # Page 774: nothing further is needed
        break

    if section:
        text[section][i] = page.get_text()  # plain text (is in UTF-8)

In [None]:
assert len(text["desc"]) == 470
assert len(text["titles"]) == 91

In [None]:
# Concatenate every description page, in insertion (page) order, into one string.
full_desc = "".join(text["desc"].values())

In [None]:
len(full_desc)

In [None]:
with open("../data/interim/full_description.txt", "w", encoding="utf8") as f:
    f.write(full_desc)

### Tidy desc page text

In [None]:
# First line of every even-numbered pdf page of the description section;
# checked below for the "EARL..." running header.
early_header = [text["desc"][i].split("\n")[0] for i in range(126, 596, 2)]

# First line of every odd-numbered pdf page; checked below for the "DESC..." header.
desc_header = [text["desc"][i].split("\n")[0] for i in range(127, 597, 2)]

# How many times the printed page number (pdf page - 28) occurs in each page's text.
# The range ends at 596 so the final description page (595) is counted as well.
has_page_num = [text["desc"][i].count(str(i - 28)) for i in range(126, 596)]

In [None]:
pd.Series(has_page_num).value_counts().sort_index()

I've investigated the below: in each instance the header was not transcribed (rather than transcribed in the wrong place), so the current logic of removing it only if present holds

In [None]:
[(i, e) for i, e in enumerate(early_header) if "EARL" not in e]

In [None]:
emp_head_counts = pd.Series([e for e in early_header if "EARL" in e]).value_counts()
print(emp_head_counts.sum())
emp_head_counts

The below is just some extra mistranscribed lines due to lines at the top of the photocopy, DESCRIPTION appears on the third line

In [None]:
[(i, d) for i, d in enumerate(desc_header) if "DESC" not in d]

In [None]:
pd.Series([d for d in desc_header if "DESC" in d]).value_counts()

#### Pre-treat mistranscriptions

In [None]:
# bad header
if text["desc"][383][0] == "_":
    text["desc"][383] = text["desc"][383][10:]
assert text["desc"][383][:5] == "DESCR"

#### Parsing the first page

In [None]:
split = text["desc"][126].split("\n")
if split[1] == "It should be assumed that the author/editor ":
    text["desc"][126] = "\n".join(split[9:49] + split[58:])
assert text["desc"][126][:5] == "Abbas"

#### Parse columns on remaining pages

In [None]:
def process_desc_page(page: str, page_num: str) -> list[str]:
    """Split one description page into its non-empty lines, minus header and page number.

    :param page: raw text of one page
    :param page_num: printed page number as a string, e.g. "98"
    :return: the remaining lines in order; an empty list if the page has no
        non-empty lines
    """
    # Drop a single trailing space before each newline, then discard empty lines.
    lines = [line for line in page.replace(" \n", "\n").split("\n") if line]
    if not lines:
        # Nothing to inspect; avoids an IndexError on lines[0] below.
        return lines

    # Remove the running header only when it was actually transcribed.
    if "DESC" in lines[0] or "EARL" in lines[0]:
        lines = lines[1:]

    # Only remove the page number when it is unambiguous: if a line equal to
    # the number appears several times (or not at all) the page is left alone.
    if lines.count(page_num) == 1:
        lines.remove(page_num)

    return lines

In [None]:
# Parse every description page (pdf pages 126-595); the printed page number
# handed to the parser is the pdf page number minus 28.
processed_desc_pages = [
    process_desc_page(page=text["desc"][pdf_page], page_num=str(pdf_page - 28))
    for pdf_page in range(126, 596)
]

#### Compare processed pages to ground truth

In [None]:
# Check the first five parsed pages line-by-line against the hand-made ground truth files.
for i, parsed in enumerate(processed_desc_pages[:5]):
    with open(f"../data/processed/ground_truth/p{i+1}_column_parse.txt", encoding="utf8") as f:
        gt = [row.strip("\n") for row in f.readlines()]
    print(i + 1)
    # Show the ground-truth lines that differ before the assert stops the cell.
    mismatched = [expected for expected, actual in zip(gt, parsed) if expected != actual]
    print(mismatched)
    assert gt == parsed

## Extract catalogue entries

### Tidy title page text

In [None]:
# First line of each odd-numbered pdf page from 713; checked below for the "EARL..." header.
titles_early_header = [text["titles"][n].split("\n")[0] for n in range(713, 802, 2)]

# First line of each even-numbered pdf page from 712; checked below for "...LES".
titles_title_header = [text["titles"][n].split("\n")[0] for n in range(712, 802, 2)]

# Occurrences of the printed page number (pdf page - 28) in each titles page.
titles_has_page_num = [text["titles"][n].count(str(n - 28)) for n in range(711, 802)]

In [None]:
pd.Series(titles_has_page_num).value_counts().sort_index()

I've investigated the below, the page has been mis-transcribed as having two columns and needs correcting

In [None]:
[(i, e) for i, e in enumerate(titles_early_header) if "EARL" not in e]

In [None]:
emp_head_counts = pd.Series([e for e in titles_early_header if "EARL" in e]).value_counts()
print(emp_head_counts.sum())
emp_head_counts

The below is just some extra mistranscribed lines due to lines at the top of the photocopy, DESCRIPTION appears on the third line

In [None]:
[(i, d) for i, d in enumerate(titles_title_header) if "LES" not in d]

In [None]:
pd.Series([d for d in titles_title_header if "LES" in d]).value_counts()

#### Pre-treat mistranscriptions

In [None]:
# bad reading order
text["titles"][724] = text["titles"][724].replace("\nTITI..ES ", "")
text["titles"][768] = text["titles"][768].replace("\nTfILES ", "")
text["titles"][770] = text["titles"][770].replace("\nTI1LES ", "")
text["titles"][774] = text["titles"][774].replace("\nTITLES ", "")

In [None]:
text["titles"][720][:1680]

In [None]:
# Each fix is guarded by a check on the untreated text, so re-running the cell
# after the page has been rewritten does nothing.

# semi-accidental double column
# the excluded section contains no main works among collected ceretera/cerita/ceritera/cetera
if text["titles"][720].endswith("692 \n"):
    text["titles"][720] = text["titles"][720][:1680]

# accidental double column
# the small amount of extracted is the only main work among collected ceretera/cerita/ceritera/cetera
if text["titles"][721].startswith('"Chre'):
    text["titles"][721] = text["titles"][721][2613:2657].replace("\n", "")

# accidental double columns
# the small amount of extracted is the only main work among collected ceretera/cerita/ceritera/cetera
if text["titles"][722].startswith("TI1LE"):
    text["titles"][722] = "Cerita Rampai-Rampai 1916 (t) - see also Abu Nawas 1917"

In [None]:
text["titles"][722][:5]

### Create list of titles

In [None]:
with open("../data/processed/ground_truth/raw_title_list_p683.txt", encoding="utf8") as f:
    raw_title_gt_lines_683 = [x.strip("\n") for x in f.readlines()]

with open("../data/processed/ground_truth/raw_title_list_p688.txt", encoding="utf8") as f:
    raw_title_gt_lines_688 = [x.strip("\n") for x in f.readlines()]

In [None]:
def process_titles_page(page, page_num):
    """Turn one page of the titles list into a list of stripped lines.

    Wrapped entries are re-joined first: a newline directly before four
    digits is removed, and line breaks around a dash or before a lone "a"
    are folded back into the preceding line. The page is then split on
    newlines, empty pieces are dropped and the rest are stripped.

    :param page: raw text of one page
    :param page_num: printed page number as a string
    :return: the lines of the page without its first line, and without the
        page-number line when exactly one line equals ``page_num``
    """
    rejoined = re.sub(r"\n(\d{4,4})", r"\1", page)
    # Applied in this order: the three-character dash pattern must run first.
    for broken, mended in (
        ("\n-\n", "- "),
        ("-\n", "- "),
        ("\na ", "a "),
        ("\na, ", "a, "),
    ):
        rejoined = rejoined.replace(broken, mended)

    lines = [piece.strip() for piece in rejoined.split("\n") if piece]

    # Only drop the page number when it is unambiguous.
    if lines.count(page_num) == 1:
        lines.remove(page_num)

    # The first remaining line is always discarded.
    return lines[1:]

In [None]:
text["titles"][711][833:]

In [None]:
# Parse every titles page (pdf pages 711-801); printed page number = pdf page - 28.
processed_title_pages = [
    process_titles_page(page=text["titles"][pdf_page], page_num=str(pdf_page - 28))
    for pdf_page in range(711, 802)
]

# Discard the first nine parsed lines of the opening page.
first_page = processed_title_pages[0]
processed_title_pages[0] = first_page[9:]

In [None]:
len(processed_title_pages[5]), len(raw_title_gt_lines_688)

In [None]:
# Flatten the per-page lists into one list of title lines, keeping page order.
all_titles = [title_line for page_lines in processed_title_pages for title_line in page_lines]

In [None]:
len(all_titles)

#### Manual extraction of lines that should be concatenated but haven't

I went through all 4.3k title lines in all_titles and checked whether the line was a title or was a continuation of the previous line. Extracted all the lines that should be continued into lines_to_concatenate_with_text.  

Decided this was a good trade-off as not too many lines to extract (~4% error) vs adding new rules to find these errors. 

In [None]:
# Used this to check all_titles 1k at a time, scanning quickly through the first letters of titles to check
# took maybe half an hour
[str(i + 0) + " " + t for i,t in enumerate(all_titles[0:])]

In [None]:
# Each row of the file is "<index into all_titles>\t<text of that line>".
with open("../data/processed/lines_to_concatenate_with_text.txt", encoding="utf8") as f:
    lines = [raw.strip("\n").split("\t") for raw in f.readlines()]

bad_line_ids = [int(fields[0]) for fields in lines]
bad_line_texts = [fields[1] for fields in lines]

In [None]:
# check that the text of the lines we're going to merge with other lines matches our expectation
assert all([all_titles[line_id] == text for line_id, text in zip(bad_line_ids, bad_line_texts)])

In [None]:
# check I've coded the lines right
for l in bad_line_ids:
    print(l)
    print("\n".join(all_titles[l-2:l+3]))
    print("\n")

In [None]:
# Append every continuation line to the line before it. Walking the ids in
# reverse means a later continuation is merged before an earlier one, so a run
# of consecutive continuation lines ends up on the first real title line
# (assumes bad_line_ids is in ascending order — TODO confirm).
all_titles_concatenated = copy(all_titles)
for l in reversed(bad_line_ids):
    all_titles_concatenated[l-1] = all_titles_concatenated[l-1] + " " + all_titles_concatenated[l]

# Drop the continuation lines themselves; a set keeps each membership test O(1).
bad_line_id_set = set(bad_line_ids)
all_titles_corrected = [
    t for i, t in enumerate(all_titles_concatenated) if i not in bad_line_id_set
]

In [None]:
all_titles_corrected[62]

#### Select only main works

In [None]:
see_re = re.compile(r"see(?! also)")

In [None]:
see_re.search("hello see also")

In [None]:
# Keep a title unless it matches see_re (a "see" not followed by " also")
# or contains "look".
main_works = [
    title
    for title in all_titles_corrected
    if not (see_re.search(title) or "look" in title)
]

# This cf is the only incorrect one not caught by the 'see' regex
main_works.remove('Adab Kesopanan bagi Orang Muda-Muda Anak yang Bangsawan - cf Adab aI-Fatiy 1916')

In [None]:
len(main_works)

In [None]:
# template for gt based on first ~30 works
"""
with open("../data/processed/ground_truth/28_main_titles.txt", "w", encoding="utf8") as f:
    for w in main_works[:30]:
        f.write(w + "\n")
"""
with open("../data/processed/ground_truth/28_main_titles.txt", encoding="utf8") as f:
    gt_main_works = [l.strip("\n") for l in f.readlines()]

In [None]:
assert all([w == gt_w for w, gt_w in zip(main_works, gt_main_works)])

#### Compare main works to titles from BL shelf list

In [None]:
aac_list = pd.read_csv("../data/external/Proudfoot-BL collection-6.10.25.csv", header=None, names=["shelfmark", "short_title", "year"], usecols=[0,1,2])

In [None]:
aac_list.shape

In [None]:
aac_list.head()

In [None]:
works_date_re = re.compile(r"[ Â±]{1,2}[l0-9]{4,4}")
trailing_a_re = re.compile(r" a( |$)")
trailing_abc_re = re.compile(r" [abc] ?$")

In [None]:
def clean_short_title(row):
    """Return ``row["short_title"]`` cut at the first date and at a trailing a/b/c marker.

    The text before the first ``works_date_re`` match is kept, then the text
    before the first ``trailing_abc_re`` match of that remainder.

    :param row: mapping with a "short_title" string (used with DataFrame.apply, axis=1)
    :return: the shortened title
    """
    before_date = works_date_re.split(row["short_title"])[0]
    return trailing_abc_re.split(before_date)[0]

In [None]:
aac_list["short_title_no_year"] = aac_list.apply(clean_short_title, axis=1)

In [None]:
aac_list["short_title_no_year"].unique()

In [None]:
# Shorten each main work the same way as the AAC list: cut at the first date,
# then at the first standalone " a". Written as a comprehension so that no
# module-level name called clean_short_title is assigned — the previous loop
# replaced the clean_short_title function with a string.
main_work_short_titles = [
    trailing_a_re.split(works_date_re.split(w)[0])[0] for w in main_works
]

435 unique titles in the AAC list. 174 of these don't appear identically in the main works list extracted from the titles list.

In [None]:
# Compute the unique AAC titles once and test membership against a set
# instead of rescanning the main works list for every title.
aac_short_titles = aac_list["short_title_no_year"].unique()
main_work_short_title_set = set(main_work_short_titles)

# Exact matches are stored as (AAC title, main work title) pairs.
matched_works = [(w, w) for w in aac_short_titles if w in main_work_short_title_set]
missing_works = [w for w in aac_short_titles if w not in main_work_short_title_set]
len(missing_works), len(aac_short_titles)

In [None]:
missing_works

Check all missing works from the AAC list against the entire main works list, using the basic rapidfuzz ratio and returning up to 3 matches.

In [None]:
# For every unmatched AAC title, keep [title, up to three fuzzy candidates from the main works].
missing_work_matches = [
    [
        w,
        process.extract(
            w,
            main_work_short_titles,
            scorer=fuzz.ratio,
            limit=3,
            processor=utils.default_process,
        ),
    ]
    for w in missing_works
]

In [None]:
# Accept the first-listed candidate when its score is at least 90; otherwise
# keep the whole candidate list for manual inspection.
accepted_matches, failed_matches = [], []
for w, matches in missing_work_matches:
    best_title, best_score = matches[0][0], matches[0][1]
    if best_score < 90:
        failed_matches.append((w, matches))
        continue
    accepted_matches.append((w, best_title, best_score))

In [None]:
len(missing_works) - len(accepted_matches), len(accepted_matches), len(missing_works), len(aac_list["short_title_no_year"].unique())

In [None]:
accepted_matches

In [None]:
failed_matches

In [None]:
matched_works += [(w[0], w[1]) for w in accepted_matches]

In [None]:
matched_works.sort()

#### Matched work ground truth

In [None]:
# sample = sample(matched_works, 50)
# with open("../data/processed/ground_truth/50_matched_works.txt", "w", encoding="utf8") as f:
#     for s in sample:
#         f.write(f"{s[0]}, {s[1]}\n")

### Create catalogue entry ground truth

In [None]:
# Lines of the first ten parsed description pages, in order.
# `processed_pages` is not defined anywhere in this notebook; the parsed
# description pages are held in `processed_desc_pages`.
gt_lines = []
for p in processed_desc_pages[:10]:
    gt_lines += p

In [None]:
# Hand-picked slices of gt_lines, one per catalogue entry, keyed by entry name.
# The slices leave out indices 0, 42, 91, 572 and 677 — presumably the lines
# between consecutive entries; verify against gt_lines.
entry_gt = {
    "Abbas": gt_lines[1:42],
    "Abdau": gt_lines[43:91],
    "Abdullah": gt_lines[92:572],
    "Abdullah dan Sabat": gt_lines[573:677],
    "AbdulMuluk": gt_lines[678:1091]
}

In [None]:
with open("../data/processed/ground_truth/entry.json", "w") as f:
    json.dump(entry_gt, f, indent=4)

In [None]:
for name, lines in entry_gt.items():
    print(lines[-10:])

In [None]:
len(gt_lines)

In [None]:
def find_headings(lines: list[str]) -> tuple[list[str], list[list[int]], list[str]]:
    """
    Finds all headings from a list of lines

    A heading starts at a line for which find_shelfmark returns a truthy value
    and continues over at most the next 7 lines. It is only recorded if one of
    those lines passes date_check while the heading collected so far matches
    caps_regex; it is abandoned if another shelfmark line is reached first.

    NOTE(review): find_shelfmark, date_check and caps_regex are not defined in
    the visible part of this file; their exact behaviour should be confirmed.

    :param lines: list[str]
    :return: tuple of
        - the find_shelfmark result for every recorded heading,
        - for every recorded heading, the indices in ``lines`` of its
          continuation lines (the shelfmark line itself is not included),
        - a shallow copy of ``lines`` in which, for headings whose second line
          contains "Bought in", that line and the shelfmark line are swapped
    """
    sm_titles = []  # [shelfmark, heading lines] pairs for every recorded heading
    title_indices = []
    ordered_lines = copy(lines)  # copy so the caller's list is not reordered
    # TODO include the first catalogue entry as well
    for i, l in enumerate(lines):
        sm = find_shelfmark(l)
        if sm:
            title = [l]
            title_index = []
            j = 1
            # Look ahead over at most 7 following lines
            while i + j < len(lines) and j < 8:
                title_part = lines[i + j]
                if find_shelfmark(title_part):  # If a new catalogue entry begins during the current title
                    break

                title.append(title_part)
                title_index.append(i + j)
                j += 1

                if date_check(title_part) and caps_regex.search(" ". join(title)):  # Date marks the end of a heading
                    sm_titles.append([sm, title])
                    if "Bought in" in title[1]:  # not .lower() - these "Bought in" should all be capitalised
                        # Put the "Bought in" line before the shelfmark line in the reordered copy
                        sm, bought_in = lines[i], lines[i+1]
                        ordered_lines[i], ordered_lines[i+1] = bought_in, sm
                        # Drop the first index, i.e. the position of the "Bought in" line
                        title_indices.append(title_index[1:])
                    else:
                        title_indices.append(title_index)
                    break

    title_shelfmarks = [t[0] for t in sm_titles]

    return title_shelfmarks, title_indices, ordered_lines

In [None]:
# `processed_pages` is never defined in this notebook; the parsed description
# pages are held in `processed_desc_pages`.
processed_desc_pages[0]

### Look for letter page headings

In [None]:
# A line consisting of "C" followed by at most two whitespace characters.
a_re = re.compile(r"^C\s{0,2}$", flags=re.MULTILINE)

# Description pages containing at least one such line.
matches = [page_text for page_text in text["desc"].values() if a_re.findall(page_text)]

print(len(matches))

In [None]:
page_lengths = pd.Series([len(v) for v in text["desc"].values()])

In [None]:
page_lengths.hist(bins=20)

In [None]:
[print(i, "content:", p) for i, p in text["desc"].items() if len(p) < 1000]

In [None]:
a_re = re.compile(r"^C\s{0,2}$", flags=re.MULTILINE)

In [None]:
a_re.findall(full_desc)

In [None]:
full_desc.split("\nA \n")[1][-100:]

In [None]:
# text["desc"] is a dict keyed by page number and cannot be sliced;
# the concatenated description text is full_desc.
full_desc[1100:1300]

In [None]:
# text["desc"] is a dict keyed by page number and cannot be sliced;
# the concatenated description text is full_desc.
print(full_desc[1100:5000])

In [None]:
# text["desc"] is a dict keyed by page number and cannot be sliced;
# the concatenated description text is full_desc.
print(full_desc[-1000:])

In [None]:
# text["bl_list"] is a dict of page texts and cannot be sliced; join the pages first.
# NOTE(review): nothing in this notebook fills text["bl_list"], so this prints an empty string.
print("".join(text["bl_list"].values())[:2000])

In [None]:
# text["bl_list"] is a dict of page texts and cannot be sliced; join the pages first.
# NOTE(review): nothing in this notebook fills text["bl_list"], so this prints an empty string.
print("".join(text["bl_list"].values())[-1000:])

#### Get AG's columns

In [None]:
# alphabet = {l:i for i,l in enumerate(sorted(list(set(full_desc)))[33:59])}

# Zero-based position of each capital letter: 'A' -> 0 ... 'Z' -> 25.
alphabet = {chr(ord("A") + offset): offset for offset in range(26)}

sample_df = pd.read_csv("../data/external/ALEPH sample Bollinger EMP.csv", encoding="utf8", nrows=13)
sample_df.drop(index=2, inplace=True)
cols = ["A", "E", "Q", "U", "AB", "AC", "AD", "AH", "AJ", "AQ", "AR", "CM"]

All before 1887 purchased, 1888 onwards legal deposit - AG

In [None]:
# Convert spreadsheet-style column letters into zero-based column positions
# ("A" -> 0, "Z" -> 25, "AA" -> 26, "AB" -> 27, ...). Each letter is a digit
# 1-26 in base 26, so names of any length are handled instead of only one- or
# two-letter names. Column names are assumed to be non-empty.
col_nums = []
for c in cols:
    position = 0
    for letter in c:
        position = position * 26 + alphabet[letter] + 1
    col_nums.append(position - 1)

In [None]:
sample_df.columns[col_nums]

In [None]:
gt_df = sample_df.iloc[2:, col_nums].copy()
gt_df