In [None]:
# Install required packages
!pip install -q google-genai pandas openpyxl ipywidgets

# Import necessary libraries
import html
import logging
import os
import re
import shutil
import time
from pathlib import Path
from typing import Optional

import ipywidgets as widgets
import pandas as pd
from google import genai
from google.colab import files
from google.genai import types
from IPython.display import display, HTML, clear_output

# ============================================
# CREATE FOLDER STRUCTURE
# ============================================

# Working directories used by the notebook, keyed by the role each one plays.
FOLDERS = dict(
    txt_input='txt_input',
    excel_input='excel_input',
    results='results',
    prompts='prompts',
    log='logs',
)

# Make sure every directory exists; re-running the cell is harmless.
for folder_name, folder_path in FOLDERS.items():
    Path(folder_path).mkdir(parents=True, exist_ok=True)

# ============================================
# CREATE PROMPT FILES
# ============================================

# Prompt templates shipped with the notebook, keyed by the file name they are
# saved under. `{text}` marks where the document to summarise is inserted.
PROMPT_CONTENT = {
    "summary_prompt.md": """# Summary and Keywords Generation Prompt

Generate a concise summary of the following text in a few sentences, followed by a list of relevant keywords. The summary should allow users to quickly understand the main content.

## Instructions:
- Summary: A few concise sentences clearly identifying the main content
- Keywords: 5-10 relevant keywords or key phrases that capture the main topics and themes
- No introduction or additional commentary
- No markdown formatting in the output

## Response Format:
First provide the summary, then on a new line add "Keywords:" followed by the comma-separated keywords.

Example format:
[Your summary text here in a few sentences]

Keywords: keyword1, keyword2, keyword3, keyword4, keyword5

---

**Text:**
{text}
""",
}

# Save each template into the prompts folder, overwriting any earlier copy.
for filename, content in PROMPT_CONTENT.items():
    filepath = os.path.join(FOLDERS['prompts'], filename)
    Path(filepath).write_text(content, encoding='utf-8')

# Confirm the setup and show the directory layout that was just created.
print("\n".join((
    "‚úÖ Setup complete!",
    "",
    "üìÅ Folder structure created:",
    "   ‚îú‚îÄ‚îÄ üìÇ txt_input/        ‚Üê Upload .txt files here",
    "   ‚îú‚îÄ‚îÄ üìÇ excel_input/      ‚Üê Upload .xlsx files here",
    "   ‚îú‚îÄ‚îÄ üìÇ results/          ‚Üê Output summaries saved here",
    "   ‚îú‚îÄ‚îÄ üìÇ prompts/          ‚Üê System prompts",
    "   ‚îÇ   ‚îî‚îÄ‚îÄ summary_prompt.md",
    "   ‚îî‚îÄ‚îÄ üìÇ logs/             ‚Üê Processing logs",
)))

## Step 2: Enter Your API Key 🔑

Enter your Google Gemini API key below. 

**Don't have one?** Get it free at: https://aistudio.google.com/app/api-keys

Your API key is entered securely (hidden like a password).

In [None]:
# Password-style field, so the pasted key is masked while it is typed.
api_key_input = widgets.Password(
    description='API Key:',
    placeholder='Paste your API key here',
    style={'description_width': '80px'},
    layout=widgets.Layout(width='500px'),
)

# Feedback line rendered under the field; empty until something is typed.
api_key_status = widgets.HTML(value="")

def validate_api_key(change):
    """Refresh the status line whenever the API key field changes.

    The key is reported as entered once it is at least 20 characters long.
    That is the same minimum ``run_summarization`` accepts (it rejects only
    keys shorter than 20), so the hint shown here never contradicts what the
    start button will do.

    Args:
        change: Change notification delivered by the widget's ``observe``
            hook; only ``change['new']`` (the current field content) is read.
    """
    if len(change['new']) >= 20:
        api_key_status.value = "<span style='color: green;'>‚úÖ API key entered</span>"
    else:
        api_key_status.value = "<span style='color: orange;'>‚è≥ Please enter your full API key</span>"

# Re-run the length check every time the field's value changes.
api_key_input.observe(validate_api_key, names='value')

# Render, in order: heading, input field, status line, and a format hint.
display(
    HTML("<b>Enter your Gemini API key:</b>"),
    api_key_input,
    api_key_status,
    HTML("<br><i>üí° Tip: Your key starts with 'AIza...'</i>"),
)

## Step 3: Upload Your Documents 📁

Click the button below to select and upload your files.

You can upload:
1. **Text files (.txt)**: Will be summarized individually.
2. **Excel file (.xlsx)**: Must have an 'OCR' column. Summaries are written to a 'Summary' column and the extracted keywords to a 'Keywords' column (both are created if missing).

In [None]:
# Saved paths of the most recent upload, grouped by file kind.
uploaded_files = dict(txt=[], excel=[])

# Status line that the upload handler fills with its report.
upload_status = widgets.HTML(value="")

def upload_files_handler(b):
    """Open Colab's upload dialog and file the chosen documents by type.

    ``.txt`` files are written to the ``txt_input`` folder and ``.xlsx`` /
    ``.xls`` files to the ``excel_input`` folder; any other extension is
    skipped. When at least one file was selected, the global
    ``uploaded_files`` registry is replaced with the paths saved by this
    upload. ``upload_status`` always ends up holding an HTML report (the list
    of saved/skipped files, a "no files" notice, or the error message).

    File names and error text are HTML-escaped before they are placed in the
    report, and only the final path component of each name is used when
    saving, so a name cannot inject markup or point outside its folder.

    Args:
        b: The clicked button (unused; required by ``Button.on_click``).
    """
    global uploaded_files
    upload_status.value = "<span style='color: blue;'>üì§ Upload dialog opened... Select your files</span>"

    # extension -> (registry key, FOLDERS key, label shown in the report)
    destinations = {
        '.txt': ('txt', 'txt_input', 'TXT'),
        '.xlsx': ('excel', 'excel_input', 'EXCEL'),
        '.xls': ('excel', 'excel_input', 'EXCEL'),
    }

    try:
        uploaded = files.upload()

        if not uploaded:
            upload_status.value = "<span style='color: orange;'>‚ö†Ô∏è No files uploaded</span>"
            return

        uploaded_files = {'txt': [], 'excel': []}
        valid_files = []
        invalid_files = []

        for filename, content in uploaded.items():
            target = destinations.get(Path(filename).suffix.lower())
            if target is None:
                invalid_files.append(filename)
                continue

            kind, folder_key, label = target
            # Drop any directory part of the supplied name before joining.
            filepath = os.path.join(FOLDERS[folder_key], Path(filename).name)
            with open(filepath, 'wb') as f:
                f.write(content)
            uploaded_files[kind].append(filepath)
            valid_files.append(f"{label}: {filename}")

        status_html = ""
        if valid_files:
            status_html += f"<span style='color: green;'>‚úÖ Uploaded {len(valid_files)} file(s):</span><br>"
            for entry in valid_files:
                status_html += f"&nbsp;&nbsp;&nbsp;üìÑ {html.escape(entry)}<br>"
        if invalid_files:
            status_html += f"<span style='color: red;'>‚ùå Skipped {len(invalid_files)} unsupported file(s):</span><br>"
            for entry in invalid_files:
                status_html += f"&nbsp;&nbsp;&nbsp;‚ö†Ô∏è {html.escape(entry)}<br>"

        upload_status.value = status_html
    except Exception as e:
        upload_status.value = f"<span style='color: red;'>‚ùå Error: {html.escape(str(e))}</span>"

# Primary button that launches the upload dialog when clicked.
upload_button = widgets.Button(
    button_style='primary',
    description='üìÅ Click to Upload Files',
    layout=widgets.Layout(height='40px', width='250px'),
)
upload_button.on_click(upload_files_handler)

# Show the button, its status line, and a note on where files are stored.
display(
    upload_button,
    upload_status,
    HTML("<br><i>üí° Files will be saved to <code>txt_input/</code> or <code>excel_input/</code></i>"),
)

## Step 4: Summarization Settings 🎛️

Select the AI model to use for generating summaries.

In [None]:
# ============================================
# SETTINGS WIDGETS
# ============================================

# Drop-down of selectable models; each option is (label shown, model id used
# for the request). The Flash model is preselected.
model_dropdown = widgets.Dropdown(
    description='AI Model:',
    options=[
        ('Gemini 2.5 Flash (Fast, good for summaries)', 'gemini-2.5-flash'),
        ('Gemini 2.5 Pro (Higher quality)', 'gemini-2.5-pro'),
        ('Gemini 3.0 Pro (Latest preview)', 'gemini-3-pro-preview'),
    ],
    value='gemini-2.5-flash',
    layout=widgets.Layout(width='450px'),
    style={'description_width': '100px'},
)

display(HTML("<h3>ü§ñ Select AI Model</h3>"), model_dropdown)

## Step 5: Start Summarization 🚀

Click the button below to start processing your files.

In [None]:
# ============================================
# SUMMARIZATION ENGINE
# ============================================

class ColabGeminiSummarizer:
    """Send text to a Gemini model and return the generated summary.

    The prompt template is read once, at construction time, from
    ``summary_prompt.md`` in the prompts folder; its ``{text}`` placeholder is
    filled with the document on every request.
    """

    def __init__(self, api_key: str, model_name: str):
        """Create the API client and load the prompt template.

        Args:
            api_key: Key passed to ``genai.Client``.
            model_name: Model identifier sent with every request.
        """
        self.client = genai.Client(api_key=api_key)
        self.model_name = model_name
        self.prompt_template = self._load_prompt_template()

    def _load_prompt_template(self) -> str:
        """Return the prompt template text.

        If the file cannot be read, the error is printed and the bare
        placeholder ``"{text}"`` is returned, so the document itself becomes
        the whole prompt.
        """
        prompt_file = os.path.join(FOLDERS['prompts'], 'summary_prompt.md')
        try:
            with open(prompt_file, 'r', encoding='utf-8') as f:
                return f.read()
        except Exception as e:
            print(f"‚ùå Error reading prompt template: {e}")
            return "{text}"

    def _build_prompt(self, text: str) -> str:
        """Insert ``text`` into the template.

        ``str.format`` is tried first so existing templates behave exactly as
        before. A template edited to contain other braces (a JSON example, a
        lone ``{``) makes ``format`` raise; in that case only the literal
        ``{text}`` marker is substituted and everything else is left as is.
        """
        try:
            return self.prompt_template.format(text=text)
        except (KeyError, IndexError, ValueError):
            return self.prompt_template.replace('{text}', text)

    def generate_summary(self, text: str) -> Optional[str]:
        """Ask the model to summarise ``text``.

        Args:
            text: Document to summarise.

        Returns:
            The response text, stripped and with every ``*`` removed, or
            ``None`` when ``text`` is empty/blank, the request raised (the
            error is printed), or the response carried no text.
        """
        if not text or not text.strip():
            return None

        prompt = self._build_prompt(text)
        try:
            gen_config = types.GenerateContentConfig(temperature=0.2)
            response = self.client.models.generate_content(
                model=self.model_name,
                contents=prompt,
                config=gen_config
            )
            # `text` can be None even on a successful call; treat that as
            # "no summary" instead of failing on `.strip()`.
            raw_text = getattr(response, 'text', None) if response else None
        except Exception as e:
            print(f"   ‚ùå API Error: {e}")
            return None

        if not raw_text:
            return None
        # Asterisks are removed to drop markdown emphasis from the output.
        return raw_text.strip().replace('*', '') or None

def extract_keywords_from_summary(summary_text: str):
    """Split a model response into its summary and a keyword string.

    The response is cut at the last keyword marker it contains ("Keywords:",
    "Mots-clés:", "Key words:", "Tags:" or the Arabic equivalent). Using the
    last marker means a marker-like word inside the summary itself (for
    example a sentence mentioning "Tags:") cannot truncate the summary when
    the real keyword line follows it.

    Args:
        summary_text: Raw text returned by the model; may be empty or None.

    Returns:
        A ``(cleaned_summary, keywords)`` tuple. ``keywords`` lists the terms
        joined with ``" | "``; terms are split on newlines, commas,
        semicolons and pipes, and empty terms are dropped. Without a marker
        the text is returned unchanged with an empty keyword string, and
        empty input gives ``("", "")``.
    """
    if not summary_text:
        return ("", "")

    # The French and Arabic markers were stored mis-encoded (UTF-8 read as
    # Mac Roman) and so could never match real output; they are spelled
    # correctly here.
    keyword_indicators = [
        "Keywords:", "Mots-clés:", "Key words:", "Tags:", "الكلمات المفتاحية:",
    ]

    # Locate the marker occurrence closest to the end of the response.
    split_at, marker_len = -1, 0
    for indicator in keyword_indicators:
        position = summary_text.rfind(indicator)
        if position > split_at:
            split_at, marker_len = position, len(indicator)

    if split_at < 0:
        return (summary_text, "")

    cleaned_summary = summary_text[:split_at].strip()
    keyword_section = summary_text[split_at + marker_len:]

    terms = (term.strip() for term in re.split(r'[\n,;|]', keyword_section))
    keywords = ' | '.join(term for term in terms if term)

    return (cleaned_summary, keywords)

# ============================================
# PROCESSING LOGIC
# ============================================

# Output widget that shows everything printed while summarization runs
summary_output_area = widgets.Output()
# Filled by run_summarization: output filename -> {'path': saved file, 'type': 'excel' or 'txt'}
summary_results = {}  # Store results for download

def _summarize_excel_file(summarizer, excel_path):
    """Summarise each usable 'OCR' cell of one workbook and save a copy.

    Rows whose 'OCR' cell is empty, or starts with ``[ERROR:`` or
    ``[SKIPPED:``, are left untouched. Results go into the 'Summary' and
    'Keywords' columns (created when missing).

    Args:
        summarizer: Object exposing ``generate_summary(text)``.
        excel_path: Path of the uploaded workbook.

    Returns:
        ``(output_filename, output_path)`` of the saved workbook, or ``None``
        when the workbook has no 'OCR' column.

    Raises:
        Whatever reading or writing the workbook raises; the caller reports it.
    """
    df = pd.read_excel(excel_path)
    if 'OCR' not in df.columns:
        print("   ‚ùå Error: 'OCR' column not found in spreadsheet!")
        return None

    for column in ('Summary', 'Keywords'):
        if column not in df.columns:
            df[column] = ''
        else:
            # An existing but empty column is read back as numeric NaN; newer
            # pandas versions warn about or refuse storing strings in such a
            # column, so make it object-typed first.
            df[column] = df[column].astype(object)

    total_rows = len(df)
    print(f"   Found {total_rows} rows to process")

    processed_count = 0
    # `position` drives the progress display so it stays correct whatever the
    # index labels are; `idx` is still the label used to write results back.
    for position, (idx, row) in enumerate(df.iterrows(), start=1):
        ocr_text = row.get('OCR')

        # Skip empty cells and rows flagged as failed or skipped.
        if pd.isna(ocr_text) or not str(ocr_text).strip():
            continue
        if str(ocr_text).startswith(('[ERROR:', '[SKIPPED:')):
            continue

        print(f"   Processing row {position}/{total_rows}...", end='\r')
        full_summary = summarizer.generate_summary(str(ocr_text))

        if full_summary:
            clean_summary, keywords = extract_keywords_from_summary(full_summary)
            df.at[idx, 'Summary'] = clean_summary
            df.at[idx, 'Keywords'] = keywords
            processed_count += 1

    # Always write .xlsx: reusing the upload's extension made the save fail
    # for `.xls` (no writer available) and for upper-case extensions, after
    # all API calls had already been made.
    output_filename = f"Summarized_{Path(excel_path).stem}.xlsx"
    output_path = os.path.join(FOLDERS['results'], output_filename)
    df.to_excel(output_path, index=False)

    print(f"\n   ‚úÖ Complete! Processed {processed_count} rows.")
    print(f"   üíæ Saved to: {output_path}")
    return output_filename, output_path


def _summarize_txt_file(summarizer, txt_path):
    """Summarise one UTF-8 text file and save the result as ``Summary_<name>``.

    Args:
        summarizer: Object exposing ``generate_summary(text)``.
        txt_path: Path of the uploaded text file.

    Returns:
        ``(output_filename, output_path)`` of the saved summary, or ``None``
        when no summary was produced.

    Raises:
        Whatever reading or writing the file raises; the caller reports it.
    """
    with open(txt_path, 'r', encoding='utf-8') as f:
        text = f.read()

    summary = summarizer.generate_summary(text)
    if not summary:
        print("   ‚ö†Ô∏è Failed to generate summary")
        return None

    output_filename = f"Summary_{Path(txt_path).name}"
    output_path = os.path.join(FOLDERS['results'], output_filename)
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(summary)

    print("   ‚úÖ Summary generated")
    return output_filename, output_path


def run_summarization(b):
    """Summarise every uploaded file and record the outputs for download.

    Runs inside ``summary_output_area`` so all progress messages appear under
    the start button. Stops early with a message when the API key is missing
    or shorter than 20 characters, or when nothing has been uploaded. Excel
    files are processed first, then text files; a failure in one file is
    reported and does not stop the others. The global ``summary_results`` is
    reset at the start and gains one entry per file that was saved.

    Args:
        b: The clicked button (unused; required by ``Button.on_click``).
    """
    global summary_results
    summary_results = {}

    with summary_output_area:
        clear_output()

        # Validate inputs
        if not api_key_input.value or len(api_key_input.value) < 20:
            print("‚ùå Please enter a valid API key in Step 2")
            return

        if not uploaded_files['txt'] and not uploaded_files['excel']:
            print("‚ùå Please upload at least one file in Step 3")
            return

        # Initialize
        api_key = api_key_input.value
        model = model_dropdown.value
        summarizer = ColabGeminiSummarizer(api_key, model)

        print(f"ü§ñ Model: {model}")
        print("\n" + "="*50)

        # 1. Process Excel Files
        if uploaded_files['excel']:
            print("üìä PROCESSING EXCEL FILES")
            for excel_path in uploaded_files['excel']:
                filename = Path(excel_path).name
                print(f"\nüìÑ Reading: {filename}")
                try:
                    saved = _summarize_excel_file(summarizer, excel_path)
                except Exception as e:
                    print(f"   ‚ùå Error processing Excel: {e}")
                    continue
                if saved:
                    output_filename, output_path = saved
                    summary_results[output_filename] = {'path': output_path, 'type': 'excel'}

        # 2. Process TXT Files
        if uploaded_files['txt']:
            print("\nüìù PROCESSING TEXT FILES")
            for txt_path in uploaded_files['txt']:
                filename = Path(txt_path).name
                print(f"\nüìÑ Reading: {filename}")
                try:
                    saved = _summarize_txt_file(summarizer, txt_path)
                except Exception as e:
                    print(f"   ‚ùå Error processing file: {e}")
                    continue
                if saved:
                    output_filename, output_path = saved
                    summary_results[output_filename] = {'path': output_path, 'type': 'txt'}

        print("\n" + "="*50)
        print("üéâ PROCESSING COMPLETE!")
        print(f"   Files generated: {len(summary_results)}")
        print("\nüëá Download your results in the next step")

# Green launch button; clicking it runs the whole summarization pass.
summary_button = widgets.Button(
    button_style='success',
    description='üöÄ Start Summarization',
    layout=widgets.Layout(height='50px', width='200px'),
)
summary_button.on_click(run_summarization)

# Button first, then a spacer, then the area that receives progress output.
display(summary_button, HTML("<br>"), summary_output_area)

## Step 6: Download Your Results 📥

After processing is complete, click below to download your files.

In [None]:
# Output widget that shows the messages printed by the download handlers in this cell
download_output = widgets.Output()

def download_results(b):
    """Start a browser download for each file recorded in ``summary_results``.

    Messages go to ``download_output``, which is cleared first. A file that
    cannot be downloaded is reported and the remaining files are still tried.

    Args:
        b: The clicked button (unused; required by ``Button.on_click``).
    """
    with download_output:
        clear_output()

        if summary_results:
            print("üì• Preparing downloads...\n")

            for result_name, result_info in summary_results.items():
                try:
                    result_path = result_info['path']
                    print(f"   Downloading: {result_name}")
                    files.download(result_path)
                except Exception as err:
                    print(f"   ‚ö†Ô∏è Could not download {result_name}: {err}")

            print("\n‚úÖ Downloads initiated! Check your browser's download folder.")
        else:
            print("‚ùå No results available yet. Please run Step 5 first.")

def download_all_zip(b):
    """Pack the whole results folder into one zip archive and download it.

    The archive is written as ``summary_results.zip`` in the current working
    directory. Messages go to ``download_output``, which is cleared first.
    Nothing is archived when the results folder has no entries.

    Args:
        b: The clicked button (unused; required by ``Button.on_click``).
    """
    with download_output:
        clear_output()

        results_dir = Path(FOLDERS['results'])
        entries = [entry for entry in results_dir.glob('*')]

        if len(entries) == 0:
            print("‚ùå No result files found.")
            return

        print(f"üì¶ Zipping {len(entries)} file(s)...")
        shutil.make_archive('summary_results', 'zip', results_dir)

        print("üì• Downloading zip file...")
        files.download('summary_results.zip')
        print("\n‚úÖ Download initiated!")

# One button per download mode: individual files, or everything as one zip.
download_button = widgets.Button(
    layout=widgets.Layout(height='40px', width='250px'),
    button_style='info',
    description='üì• Download Results',
)
download_zip_button = widgets.Button(
    layout=widgets.Layout(height='40px', width='250px'),
    button_style='',
    description='üì¶ Download All as ZIP',
)

download_button.on_click(download_results)
download_zip_button.on_click(download_all_zip)

# Buttons side by side, with the shared message area underneath.
display(widgets.HBox([download_button, download_zip_button]), download_output)

## Step 7: Cleanup 🧹

Delete temporary files or clear everything when you're done.

In [None]:
# Output widget that shows the messages printed by the cleanup handlers in this cell
cleanup_output = widgets.Output()

def cleanup_inputs(b):
    """Delete the uploaded input files and reset the upload registry.

    Every file (or symlink) directly inside the ``txt_input`` and
    ``excel_input`` folders is removed. Sub-directories are left alone:
    ``unlink()`` raises on a directory, which previously aborted the cleanup
    before the registry was reset. Messages go to ``cleanup_output``, which is
    cleared first.

    Args:
        b: The clicked button, or ``None`` when called from ``cleanup_all``
            (unused).
    """
    global uploaded_files
    with cleanup_output:
        clear_output()
        for folder in ['txt_input', 'excel_input']:
            path = Path(FOLDERS[folder])
            if path.exists():
                # Snapshot the listing before deleting from the folder.
                for entry in list(path.glob('*')):
                    if entry.is_file() or entry.is_symlink():
                        entry.unlink()
        print("üßπ Deleted input files")
        uploaded_files = {'txt': [], 'excel': []}

def cleanup_results(b):
    """Delete the generated result files and reset the results registry.

    Every file (or symlink) directly inside the ``results`` folder is removed.
    Sub-directories are left alone: ``unlink()`` raises on a directory, which
    previously aborted the cleanup before the registry was reset. Messages go
    to ``cleanup_output``, which is cleared first.

    Args:
        b: The clicked button, or ``None`` when called from ``cleanup_all``
            (unused).
    """
    global summary_results
    with cleanup_output:
        clear_output()
        path = Path(FOLDERS['results'])
        if path.exists():
            # Snapshot the listing before deleting from the folder.
            for entry in list(path.glob('*')):
                if entry.is_file() or entry.is_symlink():
                    entry.unlink()
        print("üßπ Deleted result files")
        summary_results = {}

def cleanup_all(b):
    """Run both cleanups (inputs, then results) and print a final confirmation.

    Note that each of the two cleanup functions clears ``cleanup_output``
    itself when it starts.

    Args:
        b: The clicked button (unused; required by ``Button.on_click``).
    """
    with cleanup_output:
        clear_output()
        for cleanup_step in (cleanup_inputs, cleanup_results):
            cleanup_step(None)
        print("‚ú® All temporary files cleared!")

# Three cleanup buttons: inputs only, results only, or both.
btn_in = widgets.Button(
    layout=widgets.Layout(width='180px'),
    button_style='warning',
    description='üóëÔ∏è Delete Inputs',
)
btn_res = widgets.Button(
    layout=widgets.Layout(width='180px'),
    button_style='warning',
    description='üóëÔ∏è Delete Results',
)
btn_all = widgets.Button(
    layout=widgets.Layout(width='180px'),
    button_style='danger',
    description='üóëÔ∏è Delete Everything',
)

btn_in.on_click(cleanup_inputs)
btn_res.on_click(cleanup_results)
btn_all.on_click(cleanup_all)

# Heading, the row of buttons, then the shared message area.
display(
    HTML("<b>Cleanup options:</b>"),
    widgets.HBox([btn_in, btn_res, btn_all]),
    cleanup_output,
)

---

### About

**ZMO AI Pipelines** created by [Frédérick Madore](https://www.frederickmadore.com/)

Part of the [Leibniz-Zentrum Moderner Orient (ZMO)](https://www.zmo.de/) research tools.