# TSV Operations

> TSV file generation and manipulation functions

In [None]:
#| default_exp tsv

In [None]:
#| export

import os
import csv
import re
from pathlib import Path
from suomi.core import ffr
from suomi.xlate import xtexts

In [None]:
#| export
def texts2tsv(
    texts: list[str],  # List of Finnish texts to translate
    output_path: str,  # Output TSV file path (e.g., "tsvs/06_Ruoka.tsv")
    tags: str | list[str] = "lang/fi"  # Tags (string or list), lang/fi auto-included
) -> None:
    """
    Translate Finnish texts with `xtexts` and write the result as a TSV file.

    The file has the columns Finnish, English, Japanese, mp3_path, img_path
    and tags. Every row receives the same comma-separated tag string. Columns
    missing from a translated row are written as empty strings (the
    `csv.DictWriter` default). Parent directories of `output_path` are created
    when needed and an existing file is overwritten.

    Tag handling:
    - a string is split on commas; a list is used item by item
    - surrounding whitespace is stripped, empty entries and duplicates are dropped
    - "lang/fi" is put first when it is not already present

    Example:
        >>> texts = ["omena", "banaani", "Minä syön omenaa"]
        >>> texts2tsv(texts, "tsvs/06_Ruoka.tsv", tags="src/daily")
        >>> texts2tsv(texts, "tsvs/06_Ruoka.tsv", tags=["src/class", "level/A1"])
    """
    # Normalise tags: strip whitespace, skip blanks (e.g. from "" or "a,,b"),
    # and keep only the first occurrence of each tag, preserving order.
    raw_tags = tags.split(",") if isinstance(tags, str) else tags
    tag_list = []
    for tag in raw_tags:
        tag = tag.strip()
        if tag and tag not in tag_list:
            tag_list.append(tag)

    # Always ensure lang/fi is included
    if "lang/fi" not in tag_list:
        tag_list.insert(0, "lang/fi")

    # Convert to comma-separated string for TSV
    tag_string = ",".join(tag_list)

    # Translate before touching the output file, so a failing translation
    # does not leave a truncated TSV behind.
    xsls = xtexts(texts)

    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    with open(out, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(
            f,
            fieldnames=["Finnish", "English", "Japanese", "mp3_path", "img_path", "tags"],
            delimiter="\t"
        )
        writer.writeheader()
        # Build a new dict per row instead of mutating what xtexts returned.
        writer.writerows({**x, "tags": tag_string} for x in xsls)

In [None]:
#| export
def cattsv(
    fname: str,  # Path to TSV file
    skip_empty: bool = False  # Skip empty rows
) -> tuple[list[dict], list[str], int]:
    """Load a tab-separated file and return (rows, fieldnames, num_entries).

    Each row is a dict keyed by the header columns. With `skip_empty=True`,
    rows whose values are all falsy (e.g. every cell is an empty string) are
    left out. `num_entries` is the number of rows actually returned, and
    `fieldnames` is an empty list when the file has no header line.

    Typical use:
        rows = cattsv(fname)[0]  # Just rows
        rows, fields, num = cattsv(fname)  # All metadata
    """
    with open(fname, "r", encoding="utf-8", newline="") as handle:
        parsed = csv.DictReader(handle, delimiter="\t")
        # Read the header first; fieldnames is None for a file without one
        header = list(parsed.fieldnames or [])
        records = [
            record
            for record in parsed
            if not skip_empty or any(record.values())
        ]

    return records, header, len(records)

## Example Usage

In [None]:
#| eval: false
# Example: translate two Finnish sentences, write them to a TSV file, then read the rows back
from suomi.xlate import xtexts
finnish_texts = ["Minulla on päänsärky.", "Satuttaako sinua?"]
fname = "tsvs/07_Test.tsv"
texts2tsv(finnish_texts, fname)  # tags left at the default, "lang/fi"
cattsv(fname)[0]  # Get just the rows

[{'Finnish': 'Minulla on päänsärky.',
  'English': 'I have a headache.',
  'Japanese': '頭が痛いです。(あたまがいたいです。)',
  'mp3_path': '',
  'img_path': '',
  'tags': 'lang/fi'},
 {'Finnish': 'Satuttaako sinua?',
  'English': 'Does it hurt you?',
  'Japanese': '痛いですか？(いたいですか？)',
  'mp3_path': '',
  'img_path': '',
  'tags': 'lang/fi'}]

In [None]:
#| exporti
def _assign_files_by_row(
    all_files: list[str],  # All files found matching stem prefix
    stem: str,             # File stem (TSV filename without extension)
    extensions: list[str], # List of extensions to process (e.g., [".mp3"] or [".png", ".jpg"])
    num_rows: int,         # Number of rows in TSV (determines result list length)
    use_common: bool = False # Whether to use common files ({stem}.ext) as fallback
) -> list[str]:            # List of file paths, one per row (empty string if not found)
    """Assign files to rows based on naming conventions.
    
    Returns a list of length num_rows where index i corresponds to row i:
    - First tries {stem}_NN.ext (row-specific)
    - If use_common=True and not found, tries {stem}.ext (common)
    - If still not found, returns empty string
    
    For multiple extensions (e.g., [".png", ".jpg"]), uses first found in order.
    """
    import re
    
    # Build extension pattern
    ext_pat = '|'.join(re.escape(ext.lstrip('.')) for ext in extensions)
    row_pat = re.compile(rf"^{re.escape(stem)}_(\\d{{2}})\\.({ext_pat})$", re.IGNORECASE)
    common_pat = re.compile(rf"^{re.escape(stem)}\\.({ext_pat})$", re.IGNORECASE)
    
    # Classify files
    by_row = {}  # {row_idx: {ext: path}}
    common = {}  # {ext: path}
    
    for path in all_files:
        name = Path(path).name
        m = row_pat.match(name)
        if m:
            idx, ext = int(m.group(1)), "." + m.group(2).lower()
            by_row.setdefault(idx, {})[ext] = path
        else:
            m = common_pat.match(name)
            if m:
                common["." + m.group(1).lower()] = path
    
    # Assign files to rows (respecting extension priority)
    result = []
    for i in range(num_rows):
        path = ""
        # First try row-specific files
        if i in by_row:
            for ext in extensions:
                if ext in by_row[i]:
                    path = by_row[i][ext]
                    break
        # If not found, try common files
        if not path:
            for ext in extensions:
                if ext in common:
                    path = common[ext]
                    break
        result.append(path)
    
    return result

In [None]:
#| export
def update_tsv_media_paths(
    tsv: str,               # Path to TSV file
    dirs: list[str] = ["audio", "images"], # Directories to search for media files
) -> None:
    """Update TSV file with mp3 and image paths based on file name conventions.

    Searches for media files matching the TSV filename stem and assigns them to rows,
    then rewrites the TSV in place. Only non-empty rows are counted as entries;
    empty rows are skipped when reading and are therefore not written back.
    If the TSV has rows but lacks an `mp3_path` or `img_path` column, the column
    is appended to the header.

    File naming rules (where {stem} = TSV filename without extension, NN = 2-digit row index):

    MP3 files (row-specific only):
    - Pattern: {stem}_NN.mp3 (e.g., "05_Keho_00.mp3" for row 0)
    - Priority: {stem}_NN.mp3 → "" (common files not used for MP3)
    - Searches in: dirs (recursively)

    Image files (row-specific + common fallback):
    - Row-specific: {stem}_NN.{png,jpg} (e.g., "05_Keho_00.png" for row 0)
       - If exists, use this (common files are ignored)
    - Common: {stem}.{png,jpg} (e.g., "05_Keho.png" for all rows)
       - Only used if row-specific file not found
    - Priority: {stem}_NN.png > {stem}_NN.jpg > {stem}.png > {stem}.jpg > ""
    - Searches in: dirs (recursively)

    Examples:
        >>> update_tsv_media_paths("tsvs/05_Keho.tsv")
        # For row 0:
        # - MP3: "05_Keho_00.mp3" if exists, else ""
        # - Image: "05_Keho_00.png" if exists, else "05_Keho_00.jpg" if exists,
        #          else "05_Keho.png" if exists, else "05_Keho.jpg" if exists, else ""
    """
    import io  # local import: only needed for the in-memory staging buffer below

    stem = Path(tsv).stem
    # Hand ffr a copy so the shared default list can never be modified through it.
    all_files = ffr(list(dirs), [".mp3", ".png", ".jpg"], prefix=stem)

    # Read TSV to get rows, fieldnames, and entry count (skipping empty rows)
    rows, fields, num_entries = cattsv(tsv, skip_empty=True)

    # MP3: row-specific only, so the common-file fallback is explicitly off
    mp3_files = _assign_files_by_row(
        all_files, stem, [".mp3"], num_entries, use_common=False
    )

    # Images: row-specific + common fallback, png > jpg priority
    img_files = _assign_files_by_row(
        all_files, stem, [".png", ".jpg"], num_entries, use_common=True
    )

    # The rows are about to gain these keys; DictWriter rejects keys that are
    # not in the header, so make sure the columns exist.
    if rows:
        for column in ("mp3_path", "img_path"):
            if column not in fields:
                fields.append(column)

    # Update rows with file paths (list index == entry index)
    for row, mp3, img in zip(rows, mp3_files, img_files):
        row["mp3_path"] = mp3
        row["img_path"] = img

    # Serialise in memory first: if a row cannot be written (e.g. it has more
    # cells than the header), the error is raised before the file is truncated.
    buffer = io.StringIO(newline="")
    w = csv.DictWriter(buffer, fieldnames=fields, delimiter="\t")
    w.writeheader()
    w.writerows(rows)

    # Write updated TSV
    with open(tsv, "w", encoding="utf-8", newline="") as f:
        f.write(buffer.getvalue())

## Tests

**Note on API Integration Tests:**

Full API integration tests require:
- `OPENAI_API_KEY` environment variable set
- Network access
- A budget for API usage (every run makes real, billable API calls)

For CI/CD environments, consider:
- Mocking the OpenAI API calls
- Using `#| eval: false` to skip expensive tests
- Separate integration test suite

In [None]:
#| test
# Test: texts2tsv creates correct TSV structure with tags
import csv
import tempfile
from pathlib import Path

# Mock xtexts to avoid API call
# NOTE(review): this patches the name inside the exported module. If the
# texts2tsv under test is the one defined in this notebook's own namespace, it
# resolves `xtexts` from that namespace instead — confirm the mock takes effect.
import suomi.tsv
original_xtexts = suomi.tsv.xtexts
suomi.tsv.xtexts = lambda texts: [
    {"Finnish": "kissa", "English": "cat", "Japanese": "猫"},
    {"Finnish": "koira", "English": "dog", "Japanese": "犬"}
]

try:
    with tempfile.TemporaryDirectory() as tmpdir:
        out = f"{tmpdir}/test.tsv"

        texts2tsv(["kissa", "koira"], out, tags="src/daily")

        # Verify file exists
        assert Path(out).exists()

        # Verify content
        with open(out, encoding="utf-8") as f:
            reader = csv.DictReader(f, delimiter="\t")
            rows = list(reader)

        assert len(rows) == 2
        assert rows[0]["Finnish"] == "kissa"
        assert rows[0]["English"] == "cat"
        # "lang/fi" is added automatically in front of the caller's tags
        assert rows[0]["tags"] == "lang/fi,src/daily"
        # Media columns exist even though the mocked rows do not provide them
        assert "mp3_path" in rows[0]
        assert "img_path" in rows[0]
finally:
    # Restore the real function even when an assertion above fails, so a
    # failure here cannot leave the mock installed for later cells.
    suomi.tsv.xtexts = original_xtexts

print("✓ texts2tsv TSV output test passed")

✓ texts2tsv TSV output test passed


In [None]:
#| eval: false
# Test: update_tsv_media_paths picks up a row-specific MP3 from a custom directory
# NOTE: The original author reported that this test works when run directly but
# fails in nbdev_test due to import issues.
# NOTE(review): the output recorded for this cell is an AssertionError, so the
# failure may not be import-related — re-check before re-enabling the cell.
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmpdir:
    tsv_path = f"{tmpdir}/test.tsv"
    audio_dir = Path(tmpdir) / "custom" / "audio"
    audio_dir.mkdir(parents=True, exist_ok=True)
    
    # Create test MP3 file named after the TSV stem ("test") and row index 00
    (audio_dir / "test_00.mp3").touch()

    # Create test TSV with a single row whose media columns are empty
    with open(tsv_path, "w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(
            f, fieldnames=["Finnish", "English", "Japanese", "mp3_path", "img_path", "tags"],
            delimiter="\t"
        )
        writer.writeheader()
        writer.writerow({
            "Finnish": "kissa",
            "English": "cat",
            "Japanese": "猫",
            "mp3_path": "",
            "img_path": "",
            "tags": "lang/fi"
        })

    # Update paths with custom directories (no image file is created for this test)
    update_tsv_media_paths(tsv_path, dirs=[str(audio_dir), "images"])

    # Verify paths: row 0 gets the MP3, and img_path stays empty
    with open(tsv_path, encoding="utf-8") as f:
        rows = list(csv.DictReader(f, delimiter="\t"))
        assert rows[0]["mp3_path"] == str(audio_dir / "test_00.mp3")
        assert rows[0]["img_path"] == ""

print("✓ update_tsv_media_paths custom directories test passed")

AssertionError: 

In [None]:
#| test
# Test: update_tsv_media_paths leaves a header-only TSV intact
expected_header = ["Finnish", "English", "Japanese", "mp3_path", "img_path", "tags"]

with tempfile.TemporaryDirectory() as tmpdir:
    tsv_path = f"{tmpdir}/empty.tsv"

    # Write the header line only, with no data rows
    with open(tsv_path, "w", encoding="utf-8", newline="") as f:
        csv.DictWriter(f, fieldnames=expected_header, delimiter="\t").writeheader()

    # Must complete without raising when there are no entries
    update_tsv_media_paths(tsv_path, dirs=["audio", "images"])

    # The header must survive the rewrite and no rows may have appeared
    with open(tsv_path, encoding="utf-8") as f:
        reader = csv.DictReader(f, delimiter="\t")
        header = reader.fieldnames
        remaining = list(reader)

    assert header == expected_header
    assert len(remaining) == 0

print("✓ update_tsv_media_paths empty TSV test passed")

In [None]:
#| hide
# Run nbdev's export step so the `#| export` cells are written to the package module
import nbdev; nbdev.nbdev_export()