## Requirements

In [None]:
# Fetch the project sources into /kaggle/working and switch into the checkout.
%cd /kaggle/working
!git clone https://github.com/cognitivetech/ollama-ebook-summary
# Install uv, then use it to install the project's requirements into the
# system environment (--system) instead of a virtual environment.
!pip install uv
%cd ollama-ebook-summary
!uv pip install -r requirements.txt --system
# Build llama-cpp-python from source (FORCE_CMAKE=1) with the cuBLAS CMake
# option switched on, bypassing pip's cache.
# NOTE(review): newer llama.cpp releases appear to have renamed this option to
# GGML_CUDA - confirm against the version being installed if the build fails.
!CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python --no-cache-dir

## Imports and Core Functions

In [None]:
import os, sys, csv, time, re, json
from typing import List, Tuple, Optional, Dict, Any  
from pathlib import Path
from llama_cpp import Llama

# Hardcoded configuration.
# PROMPTS maps a prompt alias to the instruction text handed to the model.
PROMPTS = {
    "bnotes": "Write comprehensive bulleted notes summarizing the provided text, with headings and terms in bold.",
    "concise": "Repeat the provided passage, with Concision.",
    "title": "Write 8-11 words describing this text.",
}

# Default alias / model names.
# NOTE(review): not read by any code visible in this notebook.
DEFAULTS = {
    "prompt": "bnotes",
    "summary": "summary_model",
    "title": "title_model",
}

def get_prompt(alias: str) -> str:
    """Look up the prompt text registered under ``alias`` in ``PROMPTS``.

    Prints a message and terminates via ``sys.exit(1)`` when the alias is
    unknown or maps to an empty prompt.
    """
    text = PROMPTS.get(alias)
    if text:
        return text

    print(f"Prompt alias '{alias}' not found in configuration.")
    sys.exit(1)

def load_model(path, model_type="Model"):
    """Instantiate a ``Llama`` model from the GGUF file at ``path``.

    ``model_type`` is only used to label the progress and error messages.
    Any exception raised while loading is reported on stdout and the
    interpreter is terminated with ``sys.exit(1)``.
    """
    # Fixed loader settings: n_gpu_layers=-1, a 4096-token context window,
    # and llama.cpp's own logging silenced.
    settings = {
        "model_path": path,
        "n_gpu_layers": -1,
        "n_ctx": 4096,
        "verbose": False,
    }
    try:
        print(f"Loading {model_type} model from {path}...")
        model = Llama(**settings)
    except Exception as err:
        print(f"Error loading {model_type} model: {err}")
        sys.exit(1)
    return model

def sanitize_text(text: str) -> str:
    """Return ``text`` with leading and trailing whitespace removed."""
    stripped = text.strip()
    return stripped

def generate_title(title_model: Llama, clean_text: str) -> Optional[str]:
    """Ask the title model for a short title describing ``clean_text``.

    The chat is primed with one worked example (a passage and its title) and
    then poses the same request for ``clean_text``. Returns the first line of
    the model's reply, or ``None`` if the request or the parsing of its
    response raises.
    """
    instruction = "The content between backticks is a subsection of a book-chapter. write 8-11 words describing it."
    example_passage = "This new understanding of the multifaceted roles of the cranial nerves in health and disease opens up new therapeutic possibilities. The exercises are noninvasive and do not involve medicine or surgery."
    example_title = "Restoring Autonomic Balance Through Cranial Nerve Techniques"

    def user_turn(passage: str) -> dict:
        # Passage wrapped in triple backticks, followed by the instruction.
        return {"role": "user", "content": f"```{passage}``` \n{instruction}"}

    messages = [
        user_turn(example_passage),
        {"role": "assistant", "content": example_title},
        user_turn(clean_text),
    ]

    try:
        completion = title_model.create_chat_completion(
            messages=messages,
            max_tokens=30,
        )
        reply = completion['choices'][0]['message']['content'].strip()
        # Only the first line of the reply is kept as the title.
        first_line, _, _ = reply.partition('\n')
        return first_line
    except Exception as e:
        print(f"Error during title generation: {e}")
        return None

def get_unique_title_local(original_title: str, clean_text: str, previous_original_title: str, title_model: Llama) -> Tuple[str, bool]:
    """Pick a title that differs from the previous entry's title.

    Returns ``(title, was_generated)``. A non-empty ``original_title`` that
    differs from ``previous_original_title`` is returned unchanged with
    ``was_generated=False``. Otherwise the title model is asked up to three
    times; if no usable title comes back, the first 150 characters of
    ``clean_text`` followed by ``"..."`` are used instead.
    """
    must_generate = not original_title or original_title == previous_original_title
    if not must_generate:
        return original_title, False

    attempts_left = 3
    while attempts_left > 0:
        attempts_left -= 1
        candidate = generate_title(title_model, clean_text)
        # Reject failed/empty generations and repeats of the previous title.
        if candidate and candidate != previous_original_title:
            return candidate, True

    fallback_title = clean_text[:150].strip() + "..."
    print(f"Title generation failed. Using fallback title: {fallback_title}")
    return fallback_title, True

# Matches the "- Label:" prefix of a bullet at the start of ANY line.
# re.MULTILINE is required: without it "^" only anchors at the very start of
# the string, so only a bullet on the first line would ever be rewritten.
_BULLET_LABEL_RE = re.compile(r'^([ \t]*-[ \t]*)([a-zA-Z].*?):', re.MULTILINE)


def bold_text_before_colon(text: str) -> str:
    """Bold the label before the first colon of every bullet line.

    A bullet line is optional indentation, ``-``, optional spaces/tabs, then
    text starting with an ASCII letter. ``- Term: detail`` becomes
    ``- **Term:** detail``. Bullets whose label is already bolded start with
    ``*`` rather than a letter and are therefore left untouched, as are
    lines that are not bullets or contain no colon.

    Args:
        text: Possibly multi-line markdown text.

    Returns:
        The text with bullet labels wrapped in ``**``.
    """
    return _BULLET_LABEL_RE.sub(r'\1**\2:**', text)

## Processing Functions

In [None]:
def process_entry(clean_text: str, title: str, previous_original_title: str,
                  summary_model: Llama, title_model: Llama, prompt_alias: str) -> Tuple[str, bool, str, float, int, str]:
    """Title and summarize one passage with the local models.

    Returns ``(unique_title, was_generated, summary, elapsed_seconds,
    summary_length, title)`` where ``title`` is the value passed in and
    ``elapsed_seconds`` covers only the summary generation call.
    """
    unique_title, was_generated = get_unique_title_local(
        title, clean_text, previous_original_title, title_model
    )

    # Passages under 1000 characters always use the "concise" prompt;
    # longer ones use the caller's alias.
    alias = "concise" if len(clean_text) < 1000 else prompt_alias
    instruction = get_prompt(alias)
    request = f"```{clean_text}```\n\n{instruction}"

    started = time.time()
    try:
        completion = summary_model(
            prompt=request,
            max_tokens=1024,
            stop=["```"],
            echo=False,
        )
        summary = completion['choices'][0]['text'].strip()
    except Exception as e:
        # A failed generation is reported and replaced by a placeholder so
        # the caller can keep going with the remaining entries.
        print(f"Error during summary generation: {e}")
        summary = "Error: Failed to generate output."
    elapsed_time = time.time() - started

    summary = bold_text_before_colon(summary)
    return unique_title, was_generated, summary, elapsed_time, len(summary), title

def write_csv_header(writer):
    """Emit the column-name row of the summary CSV through ``writer``."""
    columns = [
        "chapter",
        "level",
        "title",
        "text",
        "text.len",
        "summary",
        "summary.len",
        "time",
    ]
    writer.writerow(columns)

def write_csv_entry(writer, unique_title: str, text: str, summary: str, elapsed_time: float, is_chapter: bool, heading_level: int):
    """Write one result row in the column order of ``write_csv_header``.

    Newlines in ``summary`` are stored as the two characters ``\\n``; the
    ``summary.len`` column holds the length of the summary as passed in,
    before that replacement.
    """
    text_length = len(text)
    summary_length = len(summary)
    single_line_summary = summary.replace('\n', '\\n')

    row = [
        is_chapter,
        heading_level,
        unique_title,
        text,
        text_length,
        single_line_summary,
        summary_length,
        elapsed_time,
    ]
    writer.writerow(row)

def determine_header_level(row, default_level=3):
    """Read the heading level from ``row['level']``.

    A missing or empty value yields ``default_level``; so does a value that
    cannot be parsed as an integer (after printing a warning). A parsed
    level of 0 is mapped to 2; any other integer is returned as is.
    """
    raw_level = row.get('level')
    if not raw_level:
        return default_level

    try:
        parsed = int(raw_level)
    except ValueError:
        print(f"Warning: Invalid level value '{raw_level}'. Using default level {default_level}.")
        return default_level

    return 2 if parsed == 0 else parsed

def generate_toc(toc_entries: List[Tuple[int, str]]) -> str:
    """Render ``(level, heading)`` pairs as a nested markdown link list.

    Each entry links to an anchor derived from the lower-cased heading, with
    every run of characters outside ``a-z0-9`` replaced by a single hyphen and
    outer hyphens trimmed. Entries are indented two spaces per level beyond 2.
    The result starts with a "Table of Contents" heading and ends with a
    blank line.
    """
    def render(level: int, text: str) -> str:
        anchor = re.sub(r'[^a-z0-9]+', '-', text.lower()).strip('-')
        padding = " " * ((level - 2) * 2)
        return f"{padding}- [{text}](#{anchor})"

    lines = ["## Table of Contents"]
    lines.extend(render(level, text) for level, text in toc_entries)
    return "\n".join(lines) + "\n\n"

def get_last_processed_text(csv_file: str, file_type: str) -> str:
    """Return the ``text`` column of the last non-empty data row of a summary CSV.

    Args:
        csv_file: Path of a CSV written with the ``write_csv_header`` layout
            (the text is the fourth column).
        file_type: Kind of input being resumed; accepted for interface
            compatibility and not used.

    Returns:
        The text of the last data row, or ``""`` when the file is missing,
        is empty, contains only the header, or its last row has fewer than
        four columns.
    """
    text_col_idx = 3
    try:
        with open(csv_file, 'r', newline='', encoding='utf-8') as f:
            reader = csv.reader(f)
            # Skip the header. Use a default so a zero-byte file returns ""
            # instead of leaking StopIteration to the caller.
            if next(reader, None) is None:
                return ""
            last_row = None
            for row in reader:
                if row:  # csv.reader yields [] for blank lines
                    last_row = row
            return last_row[text_col_idx] if last_row else ""
    except (FileNotFoundError, IndexError):
        return ""

### Main Processing Functions

In [None]:
def _row_field(row: Dict[Any, Any], column: str) -> str:
    """Return the stripped value of the first column named ``column``, ignoring case."""
    for key, value in row.items():
        # csv.DictReader files surplus cells under a None key and fills
        # missing cells with None; neither supports the str methods below.
        if isinstance(key, str) and key.lower() == column:
            return value.strip() if isinstance(value, str) else ""
    return ""


def process_csv_input(input_file: str, summary_model: Llama, title_model: Llama,
                      prompt_alias: str, markdown_file: str, csv_file: str,
                      verbose: bool = False, continue_processing: bool = False):
    """Summarize every row of a CSV input file.

    Each row's ``text`` column (matched case-insensitively) is summarized and
    its ``title`` column supplies the heading; an optional ``level`` column
    sets the heading depth. One result row per input row is written to
    ``csv_file`` as it is produced. The markdown document is written to
    ``markdown_file`` once, after all rows have been processed.

    Args:
        input_file: Path of the CSV to read.
        summary_model: Model used to produce the summaries.
        title_model: Model used to generate missing or repeated titles.
        prompt_alias: Key of ``PROMPTS`` used for passages of 1000+ characters.
        markdown_file: Path of the markdown output (always overwritten).
        csv_file: Path of the CSV output.
        verbose: Print each markdown block as it is produced.
        continue_processing: Resume after the last row recorded in
            ``csv_file`` and append to it, if such a row exists.
    """
    last_processed_text = ""
    mode = "w"
    markdown_lines = []
    toc_entries = []

    if continue_processing:
        last_processed_text = get_last_processed_text(csv_file, 'csv')
        if last_processed_text:
            mode = "a"
            print(f"Continuing from text: {last_processed_text[:50]}...")

    # True while input rows up to and including the last processed one are
    # being skipped. last_processed_text is only set when continuing.
    skip_until_found = bool(last_processed_text)

    with open(csv_file, mode, newline="", encoding='utf-8') as csv_out:
        writer = csv.writer(csv_out)
        if mode == "w":
            write_csv_header(writer)

        with open(input_file, "r", encoding='utf-8') as csv_in:
            reader = csv.DictReader(csv_in)
            # fieldnames is None for an empty input file.
            has_level_column = 'level' in (reader.fieldnames or [])
            previous_original_title = ""

            for row in reader:
                clean = sanitize_text(_row_field(row, "text"))
                original_title = _row_field(row, "title")

                if skip_until_found:
                    # Carry the title state across skipped rows so the first
                    # resumed row is titled as in an uninterrupted run: a
                    # non-empty title always ends up as the "previous" one.
                    if original_title:
                        previous_original_title = original_title
                    if clean == last_processed_text:
                        skip_until_found = False
                        print(f"Resuming from: {last_processed_text[:50]}...")
                    continue

                # A row opens a new chapter when it brings a new, non-empty title.
                is_chapter = bool(original_title) and original_title != previous_original_title

                # A repeated title is passed as "" so a fresh one is generated.
                title_for_entry = "" if original_title == previous_original_title else original_title
                unique_title, was_generated, output, elapsed_time, _size, _ = process_entry(
                    clean, title_for_entry, previous_original_title,
                    summary_model, title_model, prompt_alias
                )

                # Generated titles sit one heading level below the row's own level.
                base_level = determine_header_level(row) if has_level_column else 3
                current_level = base_level + 1 if was_generated else base_level

                # Handle split titles: "Parent > Child" becomes two nested headings.
                if ' > ' in unique_title:
                    parts = unique_title.split(' > ', 1)
                    heading = f"{'#' * current_level} {parts[0]}\n\n{'#' * (current_level + 1)} {parts[1]}"
                    toc_entries.append((current_level, parts[0]))
                    toc_entries.append((current_level + 1, parts[1]))
                else:
                    heading = f"{'#' * current_level} {unique_title}"
                    toc_entries.append((current_level, unique_title))

                markdown_block = f"{heading}\n\n{output}\n\n"
                markdown_lines.append(markdown_block)

                if verbose:
                    print(markdown_block)

                write_csv_entry(writer, unique_title, clean, output, elapsed_time, is_chapter, current_level)

                if not was_generated:
                    previous_original_title = original_title

            if skip_until_found:
                print("Warning: the last processed text was not found in the input file; no new entries were processed.")

    # Generate ToC and write markdown.
    # NOTE: the markdown file is rewritten on every call and holds only the
    # entries processed in this run; when continuing, the ToC is left out.
    toc_content = generate_toc(toc_entries)

    with open(markdown_file, 'w', encoding='utf-8') as md_out:
        filename_no_ext = os.path.splitext(os.path.basename(input_file))[0]
        md_out.write(f"# {filename_no_ext}\n\n")
        if mode == "w":
            md_out.write(toc_content + "\n\n")
        md_out.write("\n".join(markdown_lines))

def _strip_title_prefix(text: str, title: str) -> str:
    """Remove a leading ``title +`` marker (title optionally double-quoted) from ``text``."""
    escaped = re.escape(title)
    return re.sub(f'^(?:"{escaped}"|{escaped})\\s*\\+\\s*', '', text, count=1).strip()


def process_text_input(input_file: str, summary_model: Llama, title_model: Llama,
                       prompt_alias: str, markdown_file: str, csv_file: str,
                       verbose: bool = False, continue_processing: bool = False):
    """Summarize a plain-text file, one passage per line.

    Each non-blank line is treated as a passage. The part before the first
    ``+`` within the first 150 characters is taken as its title. Results are
    written to ``markdown_file`` and ``csv_file`` as they are produced.

    Args:
        input_file: Path of the text file to read.
        summary_model: Model used to produce the summaries.
        title_model: Model used to generate missing or repeated titles.
        prompt_alias: Key of ``PROMPTS`` used for passages of 1000+ characters.
        markdown_file: Path of the markdown output.
        csv_file: Path of the CSV output.
        verbose: Print each markdown block as it is produced.
        continue_processing: Append to existing outputs and resume after the
            last passage recorded in ``csv_file``.
    """
    last_processed_text = ""
    if continue_processing:
        last_processed_text = get_last_processed_text(csv_file, 'txt')
        if last_processed_text:
            print(f"Continuing from text: {last_processed_text[:50]}...")

    # Only append when there is earlier output to extend; appending to a
    # missing or empty CSV would leave it without a header row.
    has_previous_output = os.path.isfile(csv_file) and os.path.getsize(csv_file) > 0
    mode = "a" if continue_processing and has_previous_output else "w"

    with open(csv_file, mode, newline="", encoding='utf-8') as csv_out:
        writer = csv.writer(csv_out)
        if mode == "w":
            write_csv_header(writer)

        with open(input_file, "r", encoding='utf-8') as txt_in:
            previous_original_title = ""
            looking_for_start = bool(continue_processing and last_processed_text)

            with open(markdown_file, mode, encoding='utf-8') as md_out:
                if mode == "w":
                    filename_no_ext = os.path.splitext(os.path.basename(input_file))[0]
                    md_out.write(f"# {filename_no_ext}\n\n")

                for line in txt_in:
                    trimmed = line.strip().strip('()')
                    clean = sanitize_text(trimmed)
                    if not clean:
                        # Blank lines carry nothing to title or summarize.
                        continue
                    extracted_title = clean[:150].strip().split('+')[0].strip()

                    if looking_for_start:
                        # The CSV stores the passage with its "title +" prefix
                        # removed, so compare against both forms of the line.
                        candidates = (clean, _strip_title_prefix(clean, extracted_title.strip('"')))
                        if last_processed_text in candidates:
                            looking_for_start = False
                        # Keep the title state in step with an uninterrupted
                        # run, and skip the matched line too: it is already
                        # recorded in the outputs.
                        previous_original_title = extracted_title
                        continue

                    unique_title, was_generated, output, elapsed_time, _size, original_title = process_entry(
                        clean, extracted_title, previous_original_title,
                        summary_model, title_model, prompt_alias
                    )
                    unique_title = unique_title.strip('"')

                    # Remove title and '+' from text
                    clean_text = _strip_title_prefix(clean, unique_title)

                    # Generated titles are rendered one heading level deeper.
                    heading = f"#### {unique_title}" if was_generated else f"### {unique_title}"
                    markdown_text = f"{heading}\n\n{output}\n\n"
                    md_out.write(markdown_text)

                    if verbose:
                        print(markdown_text)

                    write_csv_entry(writer, unique_title, clean_text, output, elapsed_time, False, 3)
                    previous_original_title = original_title

                if looking_for_start:
                    print("Warning: the last processed text was not found in the input file; no new entries were processed.")

## Configuration and Execution

In [None]:
# --- CONFIGURATION - MODIFY THESE VALUES ---
# Define model paths (update these to match your Kaggle input paths)
TITLE_MODEL_PATH = "/kaggle/input/mistral-7b-instruct-v0.2/gguf/default/1/mistral-7b-instruct-v0.2.Q8_0.gguf"
SUMMARY_MODEL_PATH = "/kaggle/input/mistral-0.2-instruct-bulleted-notes/gguf/default/1/mistral-7b-inst-0.2-bulleted-notes.Q8_0.gguf"

# Processing parameters
INPUT_FILE = "/kaggle/working/ollama-ebook-summary/your_input_file.csv"  # UPDATE THIS
PROCESSING_MODE = 'csv'  # 'csv' or 'txt'
PROMPT_ALIAS = 'bnotes'  # must be a key of PROMPTS: 'bnotes', 'concise', or 'title'
VERBOSE = True
CONTINUE_PROCESSING = False

# Load models (load_model prints an error and exits if a model cannot be loaded)
title_model = load_model(TITLE_MODEL_PATH, "Title")
summary_model = load_model(SUMMARY_MODEL_PATH, "Summary")

# Generate output filenames: "<input name without extension>_summary.md/.csv".
# These are relative paths, so the files land in the current working directory.
filename = os.path.basename(INPUT_FILE)
filename_no_ext, _ = os.path.splitext(filename)
markdown_file = f"{filename_no_ext}_summary.md"
csv_file = f"{filename_no_ext}_summary.csv"

# Echo the effective settings before the run starts.
print(f"Input file: {INPUT_FILE}")
print(f"Output files: {markdown_file}, {csv_file}")
print(f"Processing mode: {PROCESSING_MODE}")
print(f"Prompt alias: {PROMPT_ALIAS}")

## Run Processing

In [None]:
# Execute processing: dispatch on PROCESSING_MODE to the matching processor.
_PROCESSORS = {
    'csv': process_csv_input,
    'txt': process_text_input,
}

processor = _PROCESSORS.get(PROCESSING_MODE)
if processor is None:
    print("Error: Invalid processing mode. Use 'csv' or 'txt'.")
else:
    print(f"Starting {PROCESSING_MODE.upper()} processing...")
    processor(
        INPUT_FILE,
        summary_model,
        title_model,
        PROMPT_ALIAS,
        markdown_file,
        csv_file,
        VERBOSE,
        CONTINUE_PROCESSING,
    )
    # Report completion only when a processor actually ran; an invalid mode
    # produces no output files.
    print(f"Processing completed! Output saved to {markdown_file} and {csv_file}")