In [None]:
# Input file to process.
# NOTE(review): despite the name, this points at a .txt file; validate_pdf() rejects any
# path that does not end in ".pdf", so the PDF extraction cell yields nothing for it — confirm intent.
pdf_path = 'books/Principles_of_Microeconomics.txt'
# Ollama model used for the cleaning calls; make sure you have downloaded it using ollama
DEFAULT_MODEL = "llama3.2:1b"
##########
# 1. Pre-Process PDF to Text
##########

In [None]:
pip install -r requirements.txt

In [10]:
import PyPDF2
from typing import Optional
import os
import re

from tqdm.notebook import tqdm
import warnings

warnings.filterwarnings('ignore')

### Preliminary: Call LLM Model using Ollama

In [None]:
# System prompt for the text-cleaning pass: process_chunk() sends it as the "system"
# message, followed by one raw text chunk as the "user" message.
SYS_PROMPT = """
You are a world class text pre-processor, here is the raw data from a PDF, please parse and return it in a way that is crispy and usable to send to a podcast writer.

The raw data is messed up with new lines, Latex math and you will see fluff that we can remove completely. Basically take away any details that you think might be useless in a podcast author's transcript.

Remember, the podcast could be on any topic whatsoever so the issues listed above are not exhaustive

Please be smart with what you remove and be creative ok?

Remember DO NOT START SUMMARIZING THIS, YOU ARE ONLY CLEANING UP THE TEXT AND RE-WRITING WHEN NEEDED

Be very smart and aggressive with removing details, you will get a running portion of the text and keep returning the processed text.

PLEASE DO NOT ADD MARKDOWN FORMATTING, STOP ADDING SPECIAL CHARACTERS THAT MARKDOWN CAPATILISATION ETC LIKES

ALWAYS start your response directly with processed text and NO ACKNOWLEDGEMENTS about my questions ok?
Here is the text:
"""

In [None]:
def llm_call(model,messages,**kwargs):
    """Send a non-streaming chat request to the local Ollama server and return the reply text.

    Ollama must already be running on this machine (http://localhost:11434).

    Args:
        model: Name of the Ollama model to query.
        messages: Chat history as a list of {"role": ..., "content": ...} dicts.
        **kwargs: Optional settings:
            seed: sampling seed; only sent when a value is supplied.
            temperature: sampling temperature (default 0).
            timeout: seconds handed to ``requests.post``; the default ``None``
                waits indefinitely, matching the previous behavior.

    Returns:
        The "content" string of the "message" object in the JSON response.

    Raises:
        requests.HTTPError: if the server answers with an error status; the
            message carries the response body so the cause is visible.
        requests.RequestException: if the server cannot be reached.
    """
    import requests

    url = "http://localhost:11434/api/chat"

    options = {"temperature": kwargs.get("temperature", 0)}
    # Only include "seed" when the caller provided one instead of sending a JSON null.
    seed = kwargs.get("seed", None)
    if seed is not None:
        options["seed"] = seed

    data = {
        "model": model,
        "messages": messages,
        "options": options,
        "stream": False
    }

    # json= serializes the payload and sets the Content-Type header for us.
    response = requests.post(url, json=data, timeout=kwargs.get("timeout", None))
    if not response.ok:
        # Without this check an error reply would only show up as KeyError('message').
        raise requests.HTTPError(
            f"Ollama request failed with status {response.status_code}: {response.text}",
            response=response,
        )
    return response.json()["message"]["content"]

### 1. Pre-Process PDF to Text

> refer to [NotebookLlama](https://github.com/meta-llama/llama-recipes/blob/main/recipes/quickstart/NotebookLlama/Step-1%20PDF-Pre-Processing-Logic.ipynb)

In [None]:
def validate_pdf(file_path: str) -> bool:
    """Check that ``file_path`` exists and has a ``.pdf`` extension (case-insensitive).

    Prints the reason and returns False when either check fails; the existence
    check is reported first.
    """
    path_exists = os.path.exists(file_path)
    if path_exists and file_path.lower().endswith('.pdf'):
        return True

    if not path_exists:
        print(f"Error: File not found at path: {file_path}")
    else:
        print("Error: File is not a PDF")
    return False

def extract_text_from_pdf(file_path: str, max_chars: int = 100000) -> Optional[str]:
    """Extract text from a PDF page by page, stopping once ``max_chars`` is reached.

    Args:
        file_path: Path to the PDF; checked with ``validate_pdf`` before opening.
        max_chars: Cap on the number of page characters collected. The newline
            separators inserted between pages are not counted against the cap.

    Returns:
        The page texts joined with newlines, or ``None`` if validation fails or
        the file cannot be read (the reason is printed).
    """
    if not validate_pdf(file_path):
        return None

    try:
        with open(file_path, 'rb') as file:
            # Create PDF reader object
            pdf_reader = PyPDF2.PdfReader(file)

            # Get total number of pages
            num_pages = len(pdf_reader.pages)
            print(f"Processing PDF with {num_pages} pages...")

            extracted_text = []
            total_chars = 0

            # Iterate through all pages
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                # Defensive: treat a falsy extraction result as an empty page so
                # the length arithmetic below cannot fail.
                text = page.extract_text() or ""

                # Check if adding this page's text would exceed the limit
                if total_chars + len(text) > max_chars:
                    # Only add text up to the limit
                    remaining_chars = max_chars - total_chars
                    extracted_text.append(text[:remaining_chars])
                    print(f"Reached {max_chars} character limit at page {page_num + 1}")
                    break

                extracted_text.append(text)
                total_chars += len(text)
                print(f"Processed page {page_num + 1}/{num_pages}")

            final_text = '\n'.join(extracted_text)
            print(f"\nExtraction complete! Total characters: {len(final_text)}")
            return final_text

    # PdfReadError lives in PyPDF2.errors; it is not exported from the package
    # root, so `PyPDF2.PdfReadError` would itself raise AttributeError here.
    except PyPDF2.errors.PdfReadError:
        print("Error: Invalid or corrupted PDF file")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {str(e)}")
        return None
# Read the page count and document metadata of a PDF
def get_pdf_metadata(file_path: str) -> Optional[dict]:
    """Return ``{'num_pages': ..., 'metadata': ...}`` for the PDF at ``file_path``.

    Returns ``None`` when ``validate_pdf`` rejects the path or when reading the
    file raises; in the latter case the error is printed.
    """
    if validate_pdf(file_path):
        try:
            with open(file_path, 'rb') as pdf_handle:
                reader = PyPDF2.PdfReader(pdf_handle)
                return {
                    'num_pages': len(reader.pages),
                    'metadata': reader.metadata,
                }
        except Exception as err:
            print(f"Error extracting metadata: {str(err)}")
    return None

In [None]:
# Extract metadata first
print("Extracting metadata...")
metadata = get_pdf_metadata(pdf_path)
if metadata:
    print("\nPDF Metadata:")
    print(f"Number of pages: {metadata['num_pages']}")
    print("Document info:")
    # The reader's metadata can be None when the PDF has no document-info
    # dictionary; fall back to an empty mapping instead of crashing on .items().
    for key, value in (metadata['metadata'] or {}).items():
        print(f"{key}: {value}")

# Extract text
print("\nExtracting text...")
extracted_text = extract_text_from_pdf(pdf_path)

if extracted_text:
    # Display first 500 characters of extracted text as preview
    print("\nPreview of extracted text (first 500 characters):")
    print("-" * 50)
    print(extracted_text[:500])
    print("-" * 50)
    print(f"\nTotal characters extracted: {len(extracted_text)}")

    # Optional: Save the extracted text to a file
    output_file = 'extracted_text.txt'
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(extracted_text)
    print(f"\nExtracted text has been saved to {output_file}")

### 2. Load txt file and clean by LLM

In [20]:
def load_txt_file(file_path: str,encoding='utf-8') -> Optional[str]:
    """Read a ``.txt`` file and return its full contents.

    Args:
        file_path: Path to the file; must exist and end in ``.txt`` (any case).
        encoding: Text encoding used to decode the file.

    Returns:
        The file contents, or ``None`` if the path is missing, has another
        extension, or reading/decoding fails. The reason is printed.
    """
    problem = None
    if not os.path.exists(file_path):
        problem = f"Error: File not found at path: {file_path}"
    elif not file_path.lower().endswith('.txt'):
        problem = "Error: File is not a text file"

    if problem is not None:
        print(problem)
        return None

    try:
        with open(file_path, 'r', encoding=encoding) as handle:
            return handle.read()
    except Exception as exc:
        print(f"An unexpected error occurred: {str(exc)}")
        return None

In [21]:
# Load the gb2312-encoded source text that the cells below chunk and clean.
# NOTE(review): absolute, machine-specific path. load_txt_file() returns None when the
# path does not exist, and the chunking cell then fails on None — point this at your own file.
extracted_text = load_txt_file('/Users/al/Learning/LLM/repo/PDF2Podcast/micro_economic.txt', encoding='gb2312')

In [None]:
def create_word_bounded_chunks(text, target_chunk_size):
    """
    Break ``text`` into space-joined chunks of whole words.

    Each word is costed as its length plus one (for the joining space). A chunk
    is closed as soon as adding the next word would push its cost above
    ``target_chunk_size``; a single over-long word still gets its own chunk.
    Whitespace runs (including newlines) collapse to single spaces.
    """
    chunks = []
    pending_words = []
    pending_cost = 0

    for token in text.split():
        token_cost = len(token) + 1  # the extra 1 accounts for the separator
        if pending_words and pending_cost + token_cost > target_chunk_size:
            # Close the chunk in progress before starting a new one with this word
            chunks.append(' '.join(pending_words))
            pending_words, pending_cost = [], 0
        pending_words.append(token)
        pending_cost += token_cost

    # Whatever is left over forms the final chunk
    if pending_words:
        chunks.append(' '.join(pending_words))

    return chunks

def process_chunk(text_chunk, chunk_num):
    """Clean one text chunk with the LLM and return the cleaned text.

    The chunk is sent as the user message after ``SYS_PROMPT``, using
    ``DEFAULT_MODEL`` at temperature 0.5. The first 500 characters of the input
    and of the output are printed for monitoring. ``chunk_num`` is accepted but
    not currently used.
    """
    cleaned = llm_call(
        DEFAULT_MODEL,
        [
            {"role": "system", "content": SYS_PROMPT},
            {"role": "user", "content": text_chunk},
        ],
        temperature=0.5,
    )

    # Truncated before/after previews, followed by a separator line
    input_preview = text_chunk[:500]
    output_preview = cleaned[:500]
    print(f"INPUT TEXT:\n{input_preview}...")
    print(f"\nPROCESSED TEXT:\n{output_preview}...")
    print(f"{'='*90}\n")

    return cleaned

In [38]:
# NOTE(review): in the cells visible here INPUT_FILE is only used to build the output
# file name; the chunks come from the in-memory `extracted_text`, not from this path.
INPUT_FILE = "extracted_text.txt"  # Replace with your file path
CHUNK_SIZE = 1000  # Target chunk size in characters (word length + 1 per word); adjust if needed

# Split on word boundaries so no word is cut in half between two LLM calls.
chunks = create_word_bounded_chunks(extracted_text, CHUNK_SIZE)
num_chunks = len(chunks)

In [39]:
# The cleaned text goes to "clean_<INPUT_FILE base name>", a relative path.
output_file = f"clean_{os.path.basename(INPUT_FILE)}"

In [None]:
# Clean every chunk with the LLM, keeping the result both in memory and on disk.
processed_text = ""
with open(output_file, 'w', encoding='utf-8') as out_file:

    for chunk_num, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
        # Process chunk and append to complete text
        processed_chunk = process_chunk(chunk, chunk_num)
        processed_text += processed_chunk + "\n"
        # Write chunk immediately to file
        out_file.write(processed_chunk + "\n")
        # Flush per chunk so completed work is already on disk if a later call fails
        out_file.flush()

### 3. Split the text file into sections with the LLM

- (1) Use regex to split the txt files
- (2) Use LLM to judge whether the split is reasonable

> Note: the chapter regex and the splitting logic below must be adapted for documents whose headings follow a different pattern.

In [None]:
import os

import re
import requests
import json

def llm_call(model, messages, **kwargs):
    """Send a non-streaming chat request to the local Ollama server and return the reply text.

    Ollama must already be running on this machine (http://localhost:11434).
    NOTE(review): this redefines the ``llm_call`` from the earlier cell; once this
    cell runs, this definition is the one every caller uses.

    Args:
        model: Name of the Ollama model to query.
        messages: Chat history as a list of {"role": ..., "content": ...} dicts.
        **kwargs: Optional settings:
            seed: sampling seed; only sent when a value is supplied.
            temperature: sampling temperature (default 0).
            timeout: seconds handed to ``requests.post``; the default ``None``
                waits indefinitely, matching the previous behavior.

    Returns:
        The "content" string of the "message" object in the JSON response.

    Raises:
        requests.HTTPError: if the server answers with an error status; the
            message carries the response body so the cause is visible.
        requests.RequestException: if the server cannot be reached.
    """
    url = "http://localhost:11434/api/chat"

    options = {"temperature": kwargs.get("temperature", 0)}
    # Only include "seed" when the caller provided one instead of sending a JSON null.
    seed = kwargs.get("seed", None)
    if seed is not None:
        options["seed"] = seed

    data = {
        "model": model,
        "messages": messages,
        "options": options,
        "stream": False
    }

    # json= serializes the payload and sets the Content-Type header for us.
    response = requests.post(url, json=data, timeout=kwargs.get("timeout", None))
    if not response.ok:
        # Without this check an error reply would only show up as KeyError('message').
        raise requests.HTTPError(
            f"Ollama request failed with status {response.status_code}: {response.text}",
            response=response,
        )
    return response.json()["message"]["content"]

def is_new_chapter(title, context):
    """Ask the LLM whether the text should be split into a new chapter at ``title``.

    Args:
        title: Candidate chapter heading line.
        context: Text that precedes the heading, passed to the model as context.

    Returns:
        True when the model's reply is '是' once surrounding whitespace, quotes
        and sentence-ending punctuation are removed; False for any other reply.
    """
    messages = [
        {"role": "system", "content": "你是一个帮助识别章节分割的助手。"},
        {"role": "user", "content": f"以下是一个章节标题和上下文，请判断是否应该在此处分割章节。\n\n章节标题: {title}\n上下文: {context}\n\n请回答 '是' 或 '否'。"}
    ]
    response = llm_call("llama3.2:1b", messages, temperature=0)
    # The prompt shows the answer in quotes and models often add a full stop, so
    # a bare equality test on the raw reply would reject answers like '是' or 是。
    answer = response.strip().strip("'\"‘’“”`。.！!").strip().lower()
    return answer == '是'

def is_directory_file(file_content):
    """Ask the local Ollama model whether ``file_content`` is a table of contents.

    Args:
        file_content: Excerpt of the file to classify.

    Returns:
        True when the model's reply is '是' once surrounding whitespace, quotes
        and sentence-ending punctuation are removed; False for any other reply.
    """
    messages = [
        {"role": "system", "content": "你是一个帮助识别目录文件的助手。"},
        {"role": "user", "content": f"以下是文件内容的一部分，请判断是否该文件是不是属于一个目录:\n\n{file_content}\n\n回答 '是' 或 '否'。"}
    ]
    response = llm_call("llama3.2:1b", messages, temperature=0)
    # The prompt shows the answer in quotes and models often add a full stop, so
    # a bare equality test on the raw reply would reject answers like '是' or 是。
    answer = response.strip().strip("'\"‘’“”`。.！!").strip().lower()
    return answer == '是'


def extract_chapter_titles(file_content):
    """Return, in order, every chapter heading found in ``file_content``.

    A heading is "第<Chinese numerals>章", at least one whitespace character,
    then the rest of that line. It may start anywhere in the text.
    """
    heading_regex = r'(第[一二三四五六七八九十]+章\s+.*)'
    return [hit.group(1) for hit in re.finditer(heading_regex, file_content)]

def find_matching_file(chapter_title, files_dir, toc_file):
    """Find the chapter file whose first line appears inside ``chapter_title``.

    Args:
        chapter_title: Chapter title taken from the table of contents.
        files_dir: Directory holding the candidate ``.txt`` files (UTF-8).
        toc_file: File name (not path) of the table-of-contents file to skip.

    Returns:
        Path of the first matching file in sorted file-name order, or ``None``
        when nothing matches. Characters that are illegal in file names are
        removed from both sides before comparing.
    """
    sanitized_title = re.sub(r'[\\/*?:"<>|]', "", chapter_title)
    # Sort so the result does not depend on the arbitrary order of os.listdir().
    for filename in sorted(os.listdir(files_dir)):
        if filename == toc_file or not filename.endswith('.txt'):
            continue
        candidate = os.path.join(files_dir, filename)
        with open(candidate, 'r', encoding='utf-8') as f:
            first_line = f.readline().strip()
        sanitized_line = re.sub(r'[\\/*?:"<>|]', "", first_line)
        # An empty first line must not count: '' is a substring of every title.
        if sanitized_line and sanitized_line in sanitized_title:
            return candidate
    return None

def split_chapters(file_path, encoding='gbk', output_dir='chapters'):
    """Split a text file into one file per chapter.

    Lines that look like a chapter heading ("第<Chinese numerals>章 ...") are
    confirmed with ``is_new_chapter`` before a split is made; rejected headings
    stay inside the current chapter. Text before the first accepted heading is
    saved as a chapter with an empty title.

    Args:
        file_path: Path of the text file to split.
        encoding: Encoding used to read ``file_path`` (default 'gbk').
        output_dir: Directory that receives the chapter files, created if
            missing (default 'chapters'). Files are written as UTF-8 and named
            "Chapter<idx>_<title>.txt".
    """
    with open(file_path, 'r', encoding=encoding) as file:
        lines = file.readlines()

    chapter_pattern = re.compile(r'^第[一二三四五六七八九十]+章\s+.*')
    chapters = []
    current_chapter = []
    chapter_title = ""

    for line in lines:
        if chapter_pattern.match(line):
            context = ''.join(current_chapter[-5:])  # the last 5 lines serve as context
            if is_new_chapter(line.strip(), context):
                if current_chapter:
                    chapters.append((chapter_title, ''.join(current_chapter)))
                chapter_title = line.strip()
                current_chapter = []
        # The line always belongs to the chapter that is current at this point,
        # i.e. an accepted heading becomes the first line of its new chapter.
        current_chapter.append(line)

    if current_chapter:
        chapters.append((chapter_title, ''.join(current_chapter)))

    os.makedirs(output_dir, exist_ok=True)

    for idx, (title, content) in enumerate(chapters, 1):
        # Truncate the title and drop characters that are illegal in file names.
        sanitized_title = re.sub(r'[\\/*?:"<>|]', "", title[:30])
        file_name = f"Chapter{idx}_{sanitized_title}.txt"
        # Separate name so the `file_path` argument is not overwritten.
        chapter_path = os.path.join(output_dir, file_name)
        with open(chapter_path, 'w', encoding='utf-8') as f:
            f.write(content)

    print(f'All chapters splitted saved under "{output_dir}"')


def merge_tocs_and_chapters(files_dir: str, output_dir: str):
    """
    Merge each table-of-contents file in ``files_dir`` with its chapter files.

    For every regular file in ``files_dir`` the first five lines are shown to
    ``is_directory_file``. When it is judged a table of contents, the chapter
    titles found in those five lines are matched to chapter files with
    ``find_matching_file``; the TOC text followed by every matched chapter is
    written to ``output_dir`` as "Merged_<name>", where <name> is derived from
    the last chapter file that matched.

    Args:
        files_dir: Directory containing the UTF-8 TOC and chapter files.
        output_dir: Directory for the merged files, created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    for filename in os.listdir(files_dir):
        file_path = os.path.join(files_dir, filename)
        if not os.path.isfile(file_path):
            continue

        # Context manager so the handle is closed even if a later step raises.
        with open(file_path, 'r', encoding='utf-8') as file:
            tocs = file.readlines()

        preview = ''.join(tocs[:5])
        if not is_directory_file(preview):
            continue

        print(f"Toc file detected: {filename}")
        # NOTE(review): titles are only taken from the 5-line preview, not from
        # the whole TOC file — confirm this is intended.
        chapter_titles = extract_chapter_titles(preview)
        merged_content = ''.join(tocs)
        # Remember the last successful match separately: using the loop variable
        # would discard the merged text whenever the final title has no match.
        last_matched_file = None
        for title in chapter_titles:
            chapter_file = find_matching_file(title, files_dir, filename)
            if chapter_file:
                with open(chapter_file, 'r', encoding='utf-8') as cf:
                    merged_content += f"\n\n{cf.read()}"
                last_matched_file = chapter_file
            else:
                print(f"Toc No detected: {title}")

        if last_matched_file:
            sanitized_title = re.sub(r'[\\/*?:"<>|]', "", os.path.basename(last_matched_file)[:30])
            merged_filename = f"Merged_{sanitized_title}"
            merged_file_path = os.path.join(output_dir, merged_filename)
            with open(merged_file_path, 'w', encoding='utf-8') as mf:
                mf.write(merged_content)
            print(f"Merged file saved into: {merged_filename}")
    print(f"All files merged, saved under '{output_dir}'")


In [None]:
# Split the source text into per-chapter files under "chapters/"; each candidate
# heading is confirmed by an LLM call, so Ollama must be running.
split_chapters('micro_economic.txt')

In [None]:
# Combine each detected table-of-contents file with its matching chapter files and
# write the results to "merged_chapters/".
merge_tocs_and_chapters('chapters', 'merged_chapters')

In [2]:
# Leftover scratch expression; no other cell in this notebook uses its result.
1 + 1

2