In [None]:
import re
import pandas as pd
import os
from tqdm import tqdm
import sys
sys.path.append('../')
from utils import *

In [None]:
# Check that all books are in the metadata file.
# For every metadata row, work out the name of the matching local
# "<book_name>.txt" file, report rows with no local file, and write the
# resolved names back to the `book_name` column of the metadata CSV.
metadata_path = "../../data/books/metadata/gutenberg.csv"
book_path = "../../data/books/"

df = pd.read_csv(metadata_path)
# Stems of the local .txt files. These stay strings for the whole loop so the
# name-based lookups below keep working after numeric rows have been seen.
book_txt_files = [book.split(".")[0] for book in os.listdir(book_path) if ".txt" in book]
# Numeric stems parsed once up front. Non-numeric stems are ignored here
# instead of raising ValueError in the middle of the loop.
numeric_book_ids = {int(stem) for stem in book_txt_files if stem.isdecimal()}

book_names = []
for i, row in tqdm(df.iterrows()):
    if type(row['book_name']) != int and type(row['book_name']) != float:
        # Name-based rows: first try the last word of the author name ...
        book_name = re.split(r'\W+', row['author'].lower().strip())[-1]
        if book_name not in book_txt_files:
            # ... then "<author last word>_<title word>" for each title word,
            # stopping at the first candidate that exists locally.
            for word in re.split(r'\W+', row['title'].lower()):
                book_name = row['author'].lower().split()[-1] + "_" + word.lower()
                if book_name in book_txt_files:
                    break
            if book_name not in book_txt_files:
                print(f"Could not find book {book_name} in local folder")
    else:       # new subset
        # Numeric rows: the book name is the integer id of the .txt file.
        book_name = int(row['book_name'])
        if book_name not in numeric_book_ids:
            print(f"Could not find book {book_name} in local folder")
    # The resolved (or last attempted) name is recorded either way so the
    # column keeps one entry per metadata row.
    book_names.append(book_name)

df['book_name'] = book_names
df.to_csv(metadata_path, index=False)

In [None]:
# Remove book folders without the expected number of chapters.
# A folder whose number of .txt files differs from the metadata's
# `num_chapters` is deleted so the book can be split again.
book_path = "../../data/books/"
count = 0
import subprocess
import shutil
for i, row in tqdm(df.iterrows()):
    book_name = row['book_name']
    num_chapters = row['num_chapters']
    book_dir = f"{book_path}{book_name}"

    # Books that were never split have no folder; nothing to validate.
    if not os.path.isdir(book_dir):
        continue
    chapters = [ch for ch in os.listdir(book_dir) if ".txt" in ch]
    if len(chapters) != num_chapters:
        print(f"Expected {num_chapters} chapters for {book_name}, but found {len(chapters)}")
        # shutil.rmtree avoids building a shell command from data, works on
        # every platform, and raises if the removal fails instead of
        # silently continuing.
        shutil.rmtree(book_dir)

In [None]:
# Which book has not been cleaned yet? aka no corresponding folder + number of chapters != ground truth
gtruth = pd.read_csv("../../data/books/metadata/gutenberg.csv")

# Collect the integer ids of the local "<id>.txt" files. Files whose stem is
# not numeric are reported and skipped instead of crashing int().
all_books = []
for entry in os.listdir("../../data/books/"):
    if not entry.endswith(".txt"):
        continue
    stem = entry.split(".")[0]
    if not stem.isdecimal():
        print(f"Skipping {entry}: file name is not a numeric book id.")
        continue
    all_books.append(int(stem))

uncleaned_books = []
for book_name in all_books:
    # Without a metadata row the expected chapter count is unknown, so the
    # book can be neither validated nor split; report it and move on.
    expected = gtruth[gtruth['book_name'] == book_name]['num_chapters'].values
    if len(expected) == 0:
        print(f"{book_name} has no entry in the metadata file; skipping.")
        continue

    book_dir = f"../../data/books/{book_name}/"
    if os.path.exists(book_dir):
        num_chapters = len([f for f in os.listdir(book_dir) if ".txt" in f])
        if num_chapters == expected[0]:
            # Folder exists and holds the expected number of chapters.
            continue
    # Either there is no chapter folder or its chapter count is wrong.
    print(book_name, "has not been cleaned yet.")
    uncleaned_books.append(book_name)
print("{} books have not been cleaned yet.".format(len(uncleaned_books)))

In [None]:
def write_chapters(book_name, chapters, base_dir="../../data/books"):
    """Write each chapter to its own numbered file in the book's folder.

    Args:
        book_name: Book identifier; converted with ``str`` to form the
            folder name under ``base_dir``.
        chapters: Iterable of chapter texts, in reading order.
        base_dir: Directory that holds one folder per book. Defaults to the
            notebook's books directory.

    The folder is created if needed and the chapters are written as
    ``1.txt``, ``2.txt``, ... Existing files with the same names are
    overwritten. Returns ``None``.
    """
    book_dir = os.path.join(base_dir, str(book_name))
    os.makedirs(book_dir, exist_ok=True)
    for i, chapter in enumerate(chapters, start=1):
        chapter_path = os.path.join(book_dir, f"{i}.txt")
        # Explicit UTF-8 so non-ASCII text is written the same way on every
        # platform instead of depending on the locale's default encoding.
        with open(chapter_path, "w", encoding="utf-8") as f:
            f.write(chapter)
    print(f"Successfully separated {book_name} into {len(chapters)} chapters")

def dynamic_split_strategy(text, delimiter, expected_count, max_offset=3):
    """Split ``text`` on ``delimiter``, dropping leading parts until the count fits.

    The text is stripped and split on the literal ``delimiter``;
    whitespace-only parts are discarded (the kept parts are not stripped).
    Then 0, 1, ..., ``max_offset`` leading parts are dropped in turn, and the
    first remainder with exactly ``expected_count`` parts is returned.

    Args:
        text: Full text to split.
        delimiter: Literal separator string (escaped before use as a regex).
        expected_count: Number of chapters the result must contain.
        max_offset: Largest number of leading parts that may be dropped.
            The default of 3 tries offsets 0 through 3.

    Returns:
        The list of chapter strings, or ``[]`` if no offset gives
        ``expected_count`` parts.
    """
    parts = [part for part in re.split(re.escape(delimiter), text.strip()) if part.strip()]
    for offset in range(max_offset + 1):
        chapters = parts[offset:]
        if len(chapters) == expected_count:
            return chapters
    return []

def _split_after_first_heading(pattern, text, flags=0):
    """Split ``text`` on heading ``pattern`` and return the chapters after the first heading.

    The piece before the first heading (the preamble) is dropped *before*
    whitespace-only pieces are filtered out. Filtering first would remove an
    empty preamble and then discard the first real chapter instead whenever
    the text starts directly with a heading.
    """
    pieces = re.split(pattern, text, flags=flags)
    return [piece.strip() for piece in pieces[1:] if piece.strip()]


def attempt_chapter_splitting(book_name, file_texts, expected_count):
    """
    Try a series of splitting strategies until one produces the expected number
    of chapters. If successful, write out the chapters and return True.

    Args:
        book_name: Book identifier, passed through to ``write_chapters``.
        file_texts: Text of the book to split.
        expected_count: Number of chapters a strategy must produce.

    Returns:
        True as soon as a strategy yields exactly ``expected_count`` chapters
        (after writing them); False if every strategy fails. Each failed
        strategy prints the count it found.
    """
    # Strategies are tried in this order; the first exact match wins.
    strategies = [
        ("split_by_5_linebreaks",
         lambda text: [chap.strip() for chap in text.strip().split("\n\n\n\n\n") if chap.strip()]),

        ("regex_chapter",
         lambda text: _split_after_first_heading(
             r"(?i)^(?:CHAPTER\s+\d+|Chapter\s+[A-Za-z0-9]+)\b", text, flags=re.MULTILINE)),

        ("regex_chapter_alt",
         lambda text: _split_after_first_heading(
             r"^.*(?:CHAPTER|Chapter)\s[A-Za-z0-9]+.*$", text, flags=re.MULTILINE)),

        ("split_5_linebreaks_dynamic",
         lambda text: dynamic_split_strategy(text, "\n\n\n\n\n", expected_count)),

        ("split_4_linebreaks_dynamic",
         lambda text: dynamic_split_strategy(text, "\n\n\n\n", expected_count)),

        ("regex_dash_number",
         lambda text: _split_after_first_heading(r"\n-\s*\d+\s*-", text.strip())),

        ("regex_chapter_optional",
         lambda text: _split_after_first_heading(r"\n\s?Chapter\s*[A-Za-z0-9]+", text.strip())),

        ("regex_roman",
         lambda text: _split_after_first_heading(r"(?m)^\s*[IVXLCDM]+\.?", text.strip())),

        ("regex_all_caps",
         lambda text: _split_after_first_heading(r"\n[A-Z\s\.]+\n", text.strip())),
    ]

    for strategy_name, strategy in strategies:
        chapters = strategy(file_texts)
        if len(chapters) == expected_count:
            write_chapters(book_name, chapters)
            return True
        print(f"Strategy {strategy_name} found {len(chapters)} chapters for {book_name} (expected {expected_count}).")
    return False

# Split every book listed in `uncleaned_books` into chapter files.
for book_name in uncleaned_books:
    # Guard against an empty name before it is used to build any path.
    if not str(book_name):
        continue

    output_dir = os.path.join("../../data/output", str(book_name))
    # makedirs also creates the output root when it is missing, so no
    # listdir-based existence check is needed first.
    os.makedirs(output_dir, exist_ok=True)

    if os.path.exists(os.path.join(output_dir, "outline.md")):
        print(f"Skipping {book_name} because it has already been cleaned.")
        continue
    # A book that already has a chapter folder is still processed here, as
    # before: write_chapters overwrites the numbered files in place.

    # Without a metadata row there is no expected chapter count to split to.
    matching_counts = gtruth[gtruth['book_name'] == int(book_name)].num_chapters.tolist()
    if not matching_counts:
        print(f"Skipping {book_name} because it has no entry in the metadata file.")
        continue
    expected_count = matching_counts[0]

    book_path = os.path.join("../../data/books", f"{book_name}.txt")
    # Explicit UTF-8 so decoding does not depend on the platform default.
    with open(book_path, "r", encoding="utf-8") as f:
        chapter_text = f.read()

    # Keep only the text between the Project Gutenberg START and END marker
    # lines; a missing marker falls back to the start or end of the file.
    start_match = re.search(r"\*\*\* START OF THE PROJECT GUTENBERG EBOOK .+ \*\*\*", chapter_text)
    start_index = start_match.end() if start_match else 0
    end_match = re.search(r"\*\*\* END OF THE PROJECT GUTENBERG EBOOK .+ \*\*\*", chapter_text)
    end_index = end_match.start() if end_match else len(chapter_text)
    file_texts = chapter_text[start_index:end_index]
    # Remove every square-bracketed span, including ones that cross lines.
    file_texts = re.sub(r"\[.*?\]", "", file_texts, flags=re.DOTALL)

    if not attempt_chapter_splitting(book_name, file_texts, expected_count):
        print(f"Could not split {book_name} into {expected_count} chapters.")
