# Get samples from folder with zip files

The purpose of this notebook is to extract sample documents from a folder containing multiple zip files. Each zip file is expected to contain multiple documents. By default, documents are assumed to have .txt or .md extensions, but these can be changed in the parameters section below.

## Folder directory

The folder with zip files can be on the local file system or in Google Drive. You will be prompted to provide the path to the folder.


## Stratified sampling

You can choose to perform stratified sampling based on the number of documents in each zip file. With stratified sampling, the requested number of samples is split across the zip files in proportion to how many documents each one contains. For example, if one zip file contains 100 documents and another only 10, and you request 11 samples, then 10 are taken from the first zip and 1 from the second (i.e. 10% of each). If you disable stratified sampling, then the same number of documents will be sampled from each zip file regardless of how many documents each zip file contains.

## Minimum documents per zip file

You can also set a minimum number of documents that should be extracted from each zip file. For example, if you set this parameter to 5, then at least 5 documents will be extracted from each zip file. If a zip file contains fewer than 5 documents, then all documents from that zip file will be extracted.

This can be combined with stratified sampling to cover cases where stratified sampling alone would not extract enough documents from zip files with a small number of documents.

## Output

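You can provide the output folder where the sampled documents will be saved. The sampled documents are written into a single zip file named `sample_<N>_<timestamp>.zip`, together with a `sample_<N>_<timestamp>_summary.csv` file that lists how many documents were taken from each source zip. The form below uses `/content/drive/MyDrive/samples` as the default output folder; the folder is created if it does not exist.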

In [None]:
#@title 🛠️ Install dependencies
!pip install tqdm pandas ipywidgets --quiet


In [None]:
#@title 🔗 Mount Google Drive (optional)
# google.colab is only importable inside Colab. Outside Colab (e.g. local
# Jupyter) this step is skipped so the notebook can still be used with a folder
# on the local file system.
try:
    from google.colab import drive
except ImportError:
    print("ℹ️ Not running in Google Colab - skipping Google Drive mount. Use a local folder path instead.")
else:
    drive.mount('/content/drive')
    print("✅ Google Drive mounted. You can now select folders under /content/drive/MyDrive/")


In [None]:
import datetime
import random
import zipfile
from collections import defaultdict
from pathlib import Path, PurePosixPath

import pandas as pd
from tqdm import tqdm

def _list_documents(zip_path, extensions):
    """Return the member names in *zip_path* that end with one of *extensions*.

    Directory entries (names ending in "/") are skipped. The match is
    case-insensitive on the member name; *extensions* must already be a tuple
    of lower-cased strings.
    """
    with zipfile.ZipFile(zip_path, "r") as zf:
        return [
            name for name in zf.namelist()
            if not name.endswith("/") and name.lower().endswith(extensions)
        ]


def _allocate_samples(doc_counts, samples_n, min_selection, stratified, rng):
    """Decide how many documents to draw from each zip.

    Every zip first receives ``min(min_selection, available)``. The rest is
    handed out proportionally to the zip sizes (``stratified``) or equally,
    never exceeding what a zip actually contains; whatever a full zip cannot
    absorb is redistributed among the zips that still have room.

    The caller must ensure ``samples_n`` is between the sum of the per-zip
    minimums and the total number of documents; the returned counts then sum
    to exactly ``samples_n``.
    """
    allocation = {zp: min(min_selection, n) for zp, n in doc_counts.items()}
    remaining = samples_n - sum(allocation.values())

    order = list(doc_counts)
    if not stratified:
        # Equal shares tie on every remainder, so a random order decides
        # which zips receive the indivisible leftover documents.
        rng.shuffle(order)

    while remaining > 0:
        open_zips = [zp for zp in order if allocation[zp] < doc_counts[zp]]
        if not open_zips:
            break
        weights = {zp: doc_counts[zp] if stratified else 1 for zp in open_zips}
        weight_sum = sum(weights.values())

        grants, remainders = {}, {}
        for zp in open_zips:
            # Integer divmod keeps the quota exact (no float rounding drift).
            quota, remainders[zp] = divmod(remaining * weights[zp], weight_sum)
            grants[zp] = min(quota, doc_counts[zp] - allocation[zp])

        # Largest-remainder rule for the units the floors left over.
        leftover = remaining - sum(grants.values())
        for zp in sorted(open_zips, key=remainders.get, reverse=True):
            if leftover == 0:
                break
            if allocation[zp] + grants[zp] < doc_counts[zp]:
                grants[zp] += 1
                leftover -= 1

        for zp, granted in grants.items():
            allocation[zp] += granted
        # Anything still left was blocked by full zips; the next round
        # redistributes it among the zips that still have room.
        remaining = leftover

    return allocation


def _unique_name(candidate, used_names):
    """Return *candidate*, or a ``__<k>``-suffixed variant not in *used_names*.

    The returned name is recorded in *used_names*.
    """
    unique = candidate
    if unique in used_names:
        path = PurePosixPath(candidate)
        k = 2
        while unique in used_names:
            unique = path.with_name(f"{path.stem}__{k}{path.suffix}").as_posix()
            k += 1
    used_names.add(unique)
    return unique


def get_samples(
    src_folder,
    dst="temp",
    samples_n=10_000,
    stratified=True,
    include_ext=(".txt", ".md"),
    seed=None,
    exclude_zips_with_symbols=("ж", "д"),
    flatten=True,
    min_selection=0,
    progress=True,
    verbose=True
):
    """Sample documents from the zip files in a folder into one output zip.

    Args:
        src_folder: Folder whose ``*.zip`` files are sampled from.
        dst: Output folder (created if missing). Receives
            ``sample_<samples_n>_<timestamp>.zip`` and a matching
            ``..._summary.csv``.
        samples_n: Exact total number of documents to sample.
        stratified: If true, documents beyond the per-zip minimum are
            allocated proportionally to each zip's document count; otherwise
            they are allocated equally. In both modes a zip never receives
            more than it contains; the excess goes to zips with room left.
        include_ext: Extension, or iterable of extensions, a member name must
            end with (case-insensitive) to count as a document.
        seed: Seed for a private random generator. The sorted zip order makes
            a given seed reproducible; the global ``random`` state is not
            modified. With ``None`` the global generator is used.
        exclude_zips_with_symbols: Zip files whose name contains any of these
            strings are skipped. Empty strings are ignored.
        flatten: If true, documents are stored as ``<zip stem>__<file name>``;
            otherwise as ``<zip stem>/<path inside zip>``. Names that would
            collide get a ``__<k>`` suffix before the extension.
        min_selection: Minimum number of documents per zip; a zip holding
            fewer contributes all of its documents.
        progress: Show tqdm progress bars.
        verbose: Print a summary table of the ten largest zips.

    Returns:
        ``(output_zip_path, summary_df)`` - the path of the written zip and a
        DataFrame with one row per source zip.

    Raises:
        FileNotFoundError: No eligible zip file exists in ``src_folder``.
        ValueError: Negative ``samples_n``/``min_selection``, no matching
            documents, per-zip minimums exceeding ``samples_n``, or
            ``samples_n`` exceeding the number of available documents.
    """
    if samples_n < 0 or min_selection < 0:
        raise ValueError(
            f"samples_n and min_selection must be non-negative "
            f"(got samples_n={samples_n}, min_selection={min_selection})."
        )

    src_folder = Path(src_folder)
    dst = Path(dst)
    dst.mkdir(parents=True, exist_ok=True)
    # A private generator keeps seeded runs from reseeding the caller's
    # global random state.
    rng = random if seed is None else random.Random(seed)

    # str.endswith needs a tuple; member names are lower-cased before the
    # comparison, so the extensions have to be lower-case as well.
    extensions = (include_ext,) if isinstance(include_ext, str) else tuple(include_ext)
    extensions = tuple(ext.lower() for ext in extensions)
    # "" is a substring of every name and would exclude all zips.
    excluded = [sym for sym in exclude_zips_with_symbols if sym]

    # glob order depends on the file system; sorting makes seeded runs
    # reproducible across machines.
    zip_files = sorted(
        f for f in src_folder.glob("*.zip")
        if not any(sym in f.name for sym in excluded)
    )
    if not zip_files:
        raise FileNotFoundError("No eligible .zip files found in source folder.")

    # Read each zip directory once and reuse the listing for sampling.
    documents = {}
    iterable = tqdm(zip_files, desc="Counting files") if progress else zip_files
    for zip_path in iterable:
        documents[zip_path] = _list_documents(zip_path, extensions)
    zip_file_counts = {zp: len(names) for zp, names in documents.items()}

    total_files = sum(zip_file_counts.values())
    if total_files == 0:
        raise ValueError("No valid text files found in any zip.")

    # Zips smaller than min_selection only contribute what they contain.
    required = sum(min(min_selection, count) for count in zip_file_counts.values())
    if required > samples_n:
        raise ValueError(f"Impossible constraint: min_selection={min_selection} requires at least "
                         f"{required} samples, but samples_n={samples_n}.")
    if samples_n > total_files:
        raise ValueError(f"Cannot sample {samples_n} documents: only {total_files} "
                         f"matching documents are available.")

    sample_counts = _allocate_samples(zip_file_counts, samples_n, min_selection, stratified, rng)
    total_assigned = sum(sample_counts.values())
    if total_assigned != samples_n:
        raise RuntimeError(f"Allocation mismatch {total_assigned}/{samples_n}")

    # sampling
    sampled_by_zip = {zp: rng.sample(documents[zp], sample_counts[zp]) for zp in zip_files}
    sampled_counts = {zp: len(chosen) for zp, chosen in sampled_by_zip.items()}
    total_sampled = sum(sampled_counts.values())

    # datetime stamp in YYYY_MM_DD_HH_MM format
    timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")

    # write output
    output_zip_path = dst / f"sample_{samples_n}_{timestamp}.zip"
    iterable = tqdm(sampled_by_zip.items(), desc="Writing sampled zip") if progress else sampled_by_zip.items()
    used_names = set()
    with zipfile.ZipFile(output_zip_path, "w", compression=zipfile.ZIP_DEFLATED) as out_zip:
        for zip_path, file_list in iterable:
            if not file_list:
                continue
            with zipfile.ZipFile(zip_path, "r") as zf:
                for name in file_list:
                    # Zip member names always use "/"; dropping a leading "/"
                    # keeps the entry below the zip's own folder.
                    member = PurePosixPath(name.lstrip("/"))
                    if flatten:
                        out_name = f"{zip_path.stem}__{member.name}"
                    else:
                        out_name = (PurePosixPath(zip_path.stem) / member).as_posix()
                    # Same-named files from different subfolders would
                    # otherwise overwrite each other on extraction.
                    out_zip.writestr(_unique_name(out_name, used_names), zf.read(name))

    # summary
    summary_data = []
    for zp, total in zip_file_counts.items():
        sampled = sampled_counts.get(zp, 0)
        summary_data.append({
            "zip_name": zp.name,
            "total_files": total,
            "sampled_files": sampled,
            "proportion_in_corpus": total / total_files if total_files else 0,
            "proportion_in_sample": sampled / total_sampled if total_sampled else 0
        })
    summary_df = pd.DataFrame(summary_data).sort_values("total_files", ascending=False).reset_index(drop=True)
    summary_csv_path = dst / f"sample_{samples_n}_{timestamp}_summary.csv"
    summary_df.to_csv(summary_csv_path, index=False, encoding="utf-8")

    if verbose:
        print("\n📊 Sampling summary (top 10):")
        display_df = summary_df.head(10).copy()
        display_df["% corpus"] = (display_df["proportion_in_corpus"] * 100).round(2)
        display_df["% sample"] = (display_df["proportion_in_sample"] * 100).round(2)
        print(display_df[["zip_name","total_files","sampled_files","% corpus","% sample"]].to_string(index=False))
        if len(summary_df) > 10:
            print(f"... ({len(summary_df)-10} more zips not shown)")
        print(f"\nTotal sampled: {total_sampled} (requested {samples_n})\n")

    print(f"✅ Created: {output_zip_path}")
    print(f"📄 Summary: {summary_csv_path}")
    return output_zip_path, summary_df


In [None]:
#@title 🧩 Set Parameters & Run Sampling

import ipywidgets as widgets
from IPython.display import display, clear_output

# --- User Inputs ---
# ipywidgets truncates descriptions that exceed the default label width.
# "initial" lets each label take its natural width, and the wider layout keeps
# the input box usable next to a long label.
label_style = {"description_width": "initial"}

# Folder that is searched for *.zip files.
src_folder = widgets.Text(
    value="/content/drive/MyDrive/zips",
    description="📂 Source folder:",
    layout=widgets.Layout(width="600px"),
    style=label_style
)
# Folder that receives the sample zip and the summary CSV.
dst_folder = widgets.Text(
    value="/content/drive/MyDrive/samples",
    description="💾 Output folder:",
    layout=widgets.Layout(width="600px"),
    style=label_style
)
samples_n = widgets.IntText(value=1_000, description="Samples:", style=label_style)
stratified = widgets.Checkbox(value=True, description="Stratified")
flatten = widgets.Checkbox(value=True, description="Flatten paths")
min_selection = widgets.IntText(
    value=1,
    description="Min documents/per ZIP:",
    layout=widgets.Layout(width="600px"),
    style=label_style
)
seed = widgets.IntText(
    value=2025,
    description="Random seed for reproducibility:",
    layout=widgets.Layout(width="600px"),
    style=label_style
)
# Comma-separated lists; they are split into tuples when the button is clicked.
include_ext = widgets.Text(
    value=".txt,.md",
    description="Include file extensions:",
    layout=widgets.Layout(width="600px"),
    style=label_style
)
exclude_symbols = widgets.Text(
    value="ж,д",
    description="Exclude zips with following strings in their names:",
    layout=widgets.Layout(width="600px"),
    style=label_style
)
verbose = widgets.Checkbox(value=True, description="Verbose output")

run_button = widgets.Button(description="▶️ Run Sampling", button_style="success")

# Captures everything printed or displayed while sampling runs.
out = widgets.Output()

def run_sampling(_):
    """Button callback: read the form, run ``get_samples`` and report the result.

    The click event argument is ignored. All output, including any error
    message, is rendered inside the ``out`` widget, which is cleared first.
    """

    def split_csv(text):
        # "a, b,,c" -> ("a", "b", "c"): trim the pieces and drop empty ones.
        trimmed = (piece.strip() for piece in text.split(","))
        return tuple(piece for piece in trimmed if piece)

    with out:
        clear_output()
        try:
            output_dir = Path(dst_folder.value.strip())
            options = {
                "src_folder": Path(src_folder.value.strip()),
                "dst": output_dir,
                "samples_n": samples_n.value,
                "stratified": stratified.value,
                "include_ext": split_csv(include_ext.value),
                "seed": seed.value,
                "exclude_zips_with_symbols": split_csv(exclude_symbols.value),
                "flatten": flatten.value,
                "min_selection": min_selection.value,
                "progress": True,
                "verbose": verbose.value,
            }
            sample_zip, summary = get_samples(**options)
            print("\n✅ Sampling finished successfully.")
            print("Sample ZIP:", sample_zip)
            print("Summary saved in:", output_dir)
            display(summary.head(10))
        except Exception as e:
            # Notebook UI boundary: show the problem in the output area
            # instead of letting the callback fail silently.
            print("❌ Error:", e)

# Render the form top to bottom: the input controls, then the run button, and
# finally the output area that run_sampling writes into.
form_controls = (
    src_folder, dst_folder, samples_n, stratified, flatten,
    min_selection, seed, include_ext, exclude_symbols, verbose,
)
run_button.on_click(run_sampling)
display(*form_controls, run_button, out)
