# Google AI Studio — Drive File Search

Full-text search across **1 000+ extensionless files** stored in your
Google Drive `/Google AI Studio` folder.

---

## Expert Analysis & Critique

### Problem characteristics

| Constraint | Implication |
|---|---|
| 1 000+ files | API pagination required (`pageSize` max 1000) |
| No file extension | Cannot filter by extension; must rely on MIME metadata + binary heuristics |
| Mixed text / binary content | Must detect and skip binary *before* indexing |
| Colab environment | ~12 GB RAM, ephemeral runtime, network-bound I/O |

### Approach comparison

| Method | Throughput | Pre-filtering | Complexity |
|---|---|---|---|
| `drive.mount()` + `os.walk` | **Slow** — FUSE adds per-file latency | None — must open every file | Trivial |
| Drive API, sequential downloads | Slow — one HTTP round-trip at a time | MIME-based | Low |
| **Drive API + parallel downloads** | **Up to ~25× faster** than sequential (25 requests in flight, network-bound) | **MIME + binary heuristics** | Moderate |
| Google Cloud Search / Vertex AI Search | Very fast | Built-in | Requires Workspace admin / costs |

### Chosen architecture

```
Auth ➜ List files (paginated) ➜ Filter by MIME
     ➜ Parallel download (ThreadPoolExecutor, 25 workers)
     ➜ Binary detection (null-byte + control-character heuristic on first 8 KB)
     ➜ UTF-8 / Latin-1 decode
     ➜ In-memory dict  { filename : text }
     ➜ Regex / substring search ➜ Highlighted results
```
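
The detection and decode stages can be tried in isolation on a few byte strings. The sketch below is a simplified stand-in, not the notebook's own helper: `looks_binary` and `to_text` are hypothetical names, and only the null-byte part of the heuristic is shown.

```python
def looks_binary(data: bytes, sample_size: int = 8192) -> bool:
    """Heuristic only: a null byte in the leading sample suggests binary."""
    return b"\x00" in data[:sample_size]


def to_text(raw: bytes):
    """Return decoded text, or None when the bytes look binary."""
    if looks_binary(raw):
        return None
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        # Latin-1 maps every byte to a code point, so this cannot raise.
        return raw.decode("latin-1")


# Hypothetical sample inputs: the same text in two encodings, plus a PNG header.
samples = {
    "utf8": "naïve café".encode("utf-8"),
    "latin1": "naïve café".encode("latin-1"),
    "png_header": b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR",
}
decoded = {name: to_text(raw) for name, raw in samples.items()}
```

Because Latin-1 never fails, a file in some other legacy encoding would still decode, just with the wrong characters; the fallback trades accuracy for robustness.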

**Why this wins:**

1. **MIME pre-filter:** `image/*`, `video/*`, `audio/*`, `application/zip`
   and other archives, `application/pdf`, and native Google types
   (`application/vnd.google-apps.*`) are skipped
   *without downloading a single byte*.
2. **Parallel I/O:** 25 concurrent workers keep many requests in flight
   instead of waiting on one round-trip at a time. For 1 000 files this is
   likely the single biggest speedup.
3. **Binary detection:** Files typed as `application/octet-stream` (common
   for extensionless files) get a fast null-byte and control-character scan
   on the first 8 KB before the content is decoded.
4. **In-memory index:** After the one-time download pass, every subsequent
   search is pure in-memory string/regex matching with no further network
   calls.
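
To get a feel for the parallel I/O point, here is a small simulation that replaces real downloads with a sleep. `fake_fetch` and the 20 ms latency are assumptions made for illustration; real Drive latency varies, so the measured ratio is indicative only.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_fetch(file_id: int, latency: float = 0.02) -> int:
    """Stand-in for one HTTP download; sleeps to mimic network latency."""
    time.sleep(latency)
    return file_id


def run_sequential(ids):
    """One simulated request at a time."""
    return [fake_fetch(i) for i in ids]


def run_parallel(ids, workers: int = 25):
    """Up to `workers` simulated requests in flight at once."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # pool.map returns results in input order
        return list(pool.map(fake_fetch, ids))


def timed(fn, *args):
    """Run fn(*args) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


ids = list(range(50))
seq_result, seq_secs = timed(run_sequential, ids)
par_result, par_secs = timed(run_parallel, ids)
print(f"sequential {seq_secs:.2f}s, parallel {par_secs:.2f}s")
```

Threads suit this workload because each worker spends nearly all its time waiting on the network, not holding the interpreter lock.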

### Limitations

| Limitation | Mitigation |
|---|---|
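| ~12 GB RAM ceiling | 1 000 text files ≈ 50–200 MB (assuming 50–200 KB per file), well within budget; `MAX_FILE_SIZE` caps outliers |
| Drive API quota (12 000 req/min) | 1 000 files ≈ 1 000 download calls plus a few list calls, ~8 % of one minute's quota; the worker count changes the request rate, not the total |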
| Index lost on runtime restart | Re-run the indexing cell; optionally pickle the index |
| Keyword search only (no semantics) | Regex support covers complex patterns; add embeddings later if needed |
| Google-native docs (Docs/Sheets) not downloaded | These require `files().export()` rather than `get_media()`; add a dedicated cell if needed |
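
The quota row can be checked with simple arithmetic. This is a sketch that takes the 12 000 req/min figure from the table at face value and ignores retries and the single folder-lookup call; `api_calls` and `quota_fraction` are hypothetical helper names.

```python
import math

QUOTA_PER_MIN = 12_000   # figure quoted in the limitations table
PAGE_SIZE = 1_000        # files.list page size used in this notebook


def api_calls(n_files: int, page_size: int = PAGE_SIZE) -> int:
    """One download call per file plus the list calls needed to page through them."""
    list_calls = max(1, math.ceil(n_files / page_size))
    return n_files + list_calls


def quota_fraction(n_files: int, quota: int = QUOTA_PER_MIN) -> float:
    """Worst case: every call lands inside a single one-minute window."""
    return api_calls(n_files) / quota


calls = api_calls(1_000)
share = quota_fraction(1_000)
print(f"{calls} calls, {share:.1%} of the per-minute quota")
```

The number of workers does not appear in the formula: more threads only compress the same calls into a shorter window.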

In [None]:
# ── Setup & Authentication ───────────────────────────────────────────

from google.colab import auth
import google.auth
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from googleapiclient.errors import HttpError

import io, re, time, csv
from concurrent.futures import ThreadPoolExecutor, as_completed
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets

auth.authenticate_user()
creds, _ = google.auth.default()

drive = build('drive', 'v3', credentials=creds)
print('\u2713 Authenticated \u2014 Drive API ready.')

## Configuration

Adjust these knobs before running the indexing cell.

In [None]:
# ── Configuration ───────────────────────────────────────────────────

FOLDER_NAME    = 'Google AI Studio'   # Folder name (at Drive root)
MAX_WORKERS    = 25                    # Parallel download threads
MAX_FILE_SIZE  = 10 * 1024 * 1024     # Skip files larger than 10 MB
CONTEXT_LINES  = 2                     # Lines of context around each match

## Core Engine

Run this cell once to define all helper functions.

In [None]:
# ── Core Engine ─────────────────────────────────────────────────────

# MIME prefixes that are guaranteed non-text — skip without downloading
_SKIP_MIME = (
    'image/', 'video/', 'audio/',
    'application/zip', 'application/x-tar', 'application/x-rar',
    'application/pdf', 'application/vnd.google-apps.',
)


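def find_folder_id(name, parent='root'):
    """Resolve a folder name under *parent* to its Drive ID."""
    # Escape backslashes and single quotes so a name such as "Bob's files"
    # does not break the query string.
    safe = name.replace('\\', '\\\\').replace("'", "\\'")
    q = (f"name='{safe}' and '{parent}' in parents and "
         f"mimeType='application/vnd.google-apps.folder' and trashed=false")
    resp = drive.files().list(q=q, fields='files(id,name)',
                              pageSize=5).execute()
    hits = resp.get('files', [])
    if not hits:
        raise FileNotFoundError(f"Folder '{name}' not found under parent='{parent}'")
    if len(hits) > 1:
        print(f"Warning: {len(hits)} folders named '{name}'; using the first.")
    return hits[0]['id']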


def list_files(folder_id):
    """List every non-folder file inside *folder_id* (handles pagination)."""
    files, token = [], None
    while True:
        resp = drive.files().list(
            q=(f"'{folder_id}' in parents and trashed=false and "
               f"mimeType!='application/vnd.google-apps.folder'"),
            fields='nextPageToken,files(id,name,mimeType,size)',
            pageSize=1000,
            pageToken=token,
        ).execute()
        files.extend(resp.get('files', []))
        token = resp.get('nextPageToken')
        if not token:
            break
    return files


def _is_binary(data: bytes) -> bool:
    """Fast heuristic: null bytes or high ratio of control chars means binary."""
    sample = data[:8192]
    if b'\x00' in sample:
        return True
    non_text = sum(1 for b in sample if b < 8 or (13 < b < 32))
    return non_text / max(len(sample), 1) > 0.10


import threading

# httplib2 (which googleapiclient is built on) is not thread-safe, so each
# worker thread builds and reuses its own Drive client instead of sharing `drive`.
_tls = threading.local()


def _thread_drive():
    """Return this thread's Drive client, creating it on first use."""
    if not hasattr(_tls, 'drive'):
        _tls.drive = build('drive', 'v3', credentials=creds)
    return _tls.drive


def download(finfo):
    """Download one file. Returns (name, text|None, status_tag)."""
    fid   = finfo['id']
    name  = finfo['name']
    mime  = finfo.get('mimeType', '')
    size  = int(finfo.get('size', 0))

    # ── Pre-filters ────────────────────────────────────────
    if any(mime.startswith(p) for p in _SKIP_MIME):
        return name, None, 'skip_mime'
    if size > MAX_FILE_SIZE:
        return name, None, 'skip_size'

    # ── Download with retry ────────────────────────────────
    for attempt in range(4):
        try:
            svc = _thread_drive()
            buf = io.BytesIO()
            dl  = MediaIoBaseDownload(buf, svc.files().get_media(fileId=fid))
            done = False
            while not done:
                _, done = dl.next_chunk()
            raw = buf.getvalue()

            if not raw:
                return name, None, 'empty'
            if _is_binary(raw):
                return name, None, 'binary'

            # Decode: prefer UTF-8, fall back to Latin-1 (never fails)
            try:
                return name, raw.decode('utf-8'), 'ok'
            except UnicodeDecodeError:
                return name, raw.decode('latin-1'), 'ok'

        except HttpError as e:
            # Retry transient statuses with exponential backoff (1 s, 2 s, 4 s)
            if e.resp.status in (429, 500, 503) and attempt < 3:
                time.sleep(2 ** attempt)
                continue
            return name, None, f'http_{e.resp.status}'
        except Exception:
            return name, None, 'error'

    return name, None, 'max_retries'


def search(index, query, case_insensitive=True, use_regex=False,
           context=CONTEXT_LINES):
    """Search the in-memory index. Returns [(name, count, [context_blocks])]."""
    flags = re.IGNORECASE if case_insensitive else 0
    pat = re.compile(query if use_regex else re.escape(query), flags)

    results = []
    for name, content in index.items():
        lines = content.splitlines()   # also strips \r from CRLF files
        hits  = []
        for i, line in enumerate(lines):
            if pat.search(line):
                s = max(0, i - context)
                e = min(len(lines), i + context + 1)
                block = []
                for j in range(s, e):
                    marker = '\u25b6' if j == i else ' '
                    block.append(f'{marker} {j+1:>5} \u2502 {lines[j]}')
                hits.append('\n'.join(block))
        if hits:
            results.append((name, len(hits), hits))

    results.sort(key=lambda r: -r[1])
    return results


def print_results(results, max_hits_per_file=5, max_files=30):
    """Pretty-print search results to stdout."""
    if not results:
        print('No matches found.')
        return
    total = sum(c for _, c, _ in results)
    print(f'Matches in {len(results)} file(s)  ({total} total hits)\n')
    for name, count, hits in results[:max_files]:
        label = f'{count} match' + ('es' if count != 1 else '')
        print(f'\u2501\u2501\u2501 {name}  ({label}) \u2501\u2501\u2501')
        for h in hits[:max_hits_per_file]:
            print(h)
            print()
        if count > max_hits_per_file:
            print(f'    \u2026 and {count - max_hits_per_file} more match(es)\n')
    if len(results) > max_files:
        print(f'\u2026 and {len(results) - max_files} more file(s) with matches.')


print('\u2713 Core functions defined.')

## Build the Index

Run this cell **once** per session. It lists all files, downloads text
content in parallel, and builds the in-memory search index.
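
Because the index disappears when the runtime restarts, one option is to pickle it after the build. This is a minimal sketch: `save_index`, `load_index`, and the demo path are hypothetical, and in Colab the file would need to live somewhere persistent (for example a mounted Drive folder) to survive a restart.

```python
import os
import pickle
import tempfile


def save_index(index: dict, path: str) -> None:
    """Serialize the {name: text} index to disk."""
    with open(path, "wb") as fh:
        pickle.dump(index, fh, protocol=pickle.HIGHEST_PROTOCOL)


def load_index(path: str) -> dict:
    """Load an index previously written by save_index."""
    with open(path, "rb") as fh:
        return pickle.load(fh)


# Tiny stand-in index; the real one comes from the build cell.
demo_index = {"notes": "first line\nsecond line", "prompt": "temperature=0.7"}
demo_path = os.path.join(tempfile.gettempdir(), "drive_index_demo.pkl")
save_index(demo_index, demo_path)
restored = load_index(demo_path)
```

Only unpickle files you created yourself, since loading a pickle can execute arbitrary code.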

In [None]:
# ── Build Index ─────────────────────────────────────────────────────

folder_id = find_folder_id(FOLDER_NAME)
all_files = list_files(folder_id)
print(f'Found {len(all_files):,} files in /{FOLDER_NAME}/\n')

index = {}
stats = {'ok': 0, 'skip_mime': 0, 'skip_size': 0,
         'binary': 0, 'empty': 0, 'errors': 0}

t0 = time.time()
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = {pool.submit(download, f): f for f in all_files}
    done_count = 0
    for fut in as_completed(futures):
        done_count += 1
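        name, content, status = fut.result()
        if status == 'ok':
            # Drive allows duplicate names; suffix the file ID on collision
            # so one file does not silently overwrite another.
            key = name if name not in index else f"{name} [{futures[fut]['id']}]"
            index[key] = content
            stats['ok'] += 1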
        elif status in stats:
            stats[status] += 1
        else:
            stats['errors'] += 1
        if done_count % 100 == 0 or done_count == len(all_files):
            print(f'\r  Progress: {done_count:,}/{len(all_files):,}', end='', flush=True)

elapsed = time.time() - t0
mem_mb  = sum(len(v) for v in index.values()) / 1_048_576  # character count, a rough proxy for memory use

print(f'\n\n{"="*45}')
print(f'  Text files indexed : {stats["ok"]:,}')
print(f'  Skipped (MIME)     : {stats["skip_mime"]:,}')
print(f'  Skipped (size)     : {stats["skip_size"]:,}')
print(f'  Skipped (binary)   : {stats["binary"]:,}')
print(f'  Skipped (empty)    : {stats["empty"]:,}')
print(f'  Errors             : {stats["errors"]:,}')
print(f'  Index memory       : {mem_mb:.1f} MB')
print(f'  Elapsed            : {elapsed:.1f} s')
print(f'{"="*45}')

## Search

Edit `QUERY` below and re-run this cell as many times as you like.
The index stays in memory, so each search runs locally with no further Drive calls.
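
The difference between literal and regex mode comes down to whether the query is passed through `re.escape`. The sketch below isolates that step; `build_pattern` is a hypothetical helper and the sample lines are invented for illustration.

```python
import re


def build_pattern(query: str, use_regex: bool = False, case_insensitive: bool = True):
    """Compile the query as a regex, or as a literal string when use_regex is False."""
    flags = re.IGNORECASE if case_insensitive else 0
    return re.compile(query if use_regex else re.escape(query), flags)


lines = ["model.temperature = 0.7", "modelXtemperature", "MODEL.TEMPERATURE"]

literal = build_pattern("model.temperature")
as_regex = build_pattern("model.temperature", use_regex=True)

# In literal mode the dot matches only a dot; in regex mode it matches any character.
literal_hits = [ln for ln in lines if literal.search(ln)]
regex_hits = [ln for ln in lines if as_regex.search(ln)]
```

With `USE_REGEX = False`, punctuation in the query is therefore safe to type as-is.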

In [None]:
# ── Search ──────────────────────────────────────────────────────────

QUERY            = 'your search term here'   # <── EDIT THIS
CASE_INSENSITIVE = True
USE_REGEX        = False   # Set True for regex patterns (e.g. r'model.*temperature')

results = search(index, QUERY, CASE_INSENSITIVE, USE_REGEX)
print_results(results)

## Interactive Search (widget)

An interactive text box so you don't have to re-run a cell for each query.

In [None]:
# ── Interactive Widget Search ────────────────────────────────────────

query_box    = widgets.Text(placeholder='Enter search query\u2026',
                            layout=widgets.Layout(width='50%'))
case_toggle  = widgets.Checkbox(value=True,  description='Case insensitive')
regex_toggle = widgets.Checkbox(value=False, description='Regex')
search_btn   = widgets.Button(description='Search', button_style='primary')
out          = widgets.Output()

def _on_search(_=None):
    with out:
        clear_output()
        q = query_box.value.strip()
        if not q:
            print('Enter a search query.')
            return
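        try:
            r = search(index, q, case_toggle.value, regex_toggle.value)
        except re.error as err:
            # An invalid pattern in regex mode should not dump a traceback
            print(f'Invalid regex: {err}')
            return
        print_results(r)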

search_btn.on_click(_on_search)
query_box.on_submit(_on_search)

display(widgets.VBox([
    widgets.HBox([query_box, search_btn]),
    widgets.HBox([case_toggle, regex_toggle]),
    out
]))

## Export Results to CSV (optional)

Run a search first (the `results` variable from the manual search cell),
then execute this cell to download a CSV summary.

In [None]:
# ── Export to CSV ───────────────────────────────────────────────────

csv_path = '/content/search_results.csv'
with open(csv_path, 'w', newline='', encoding='utf-8') as f:
    w = csv.writer(f)
    w.writerow(['File', 'Matches', 'First match preview'])
    for name, count, hits in results:
        preview = hits[0].replace('\n', ' | ')[:300] if hits else ''
        w.writerow([name, count, preview])

from google.colab import files
files.download(csv_path)
print(f'Exported {len(results)} rows to {csv_path}')