In [1]:
import pandas as pd 
import numpy as np
import os

In [2]:
import os
import io
import tempfile
import shutil
from typing import Optional, Dict, List, Tuple
import numpy as np
import pandas as pd
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor, as_completed


def _sample_file_to_temp(
    in_path: str,
    frac: float,
    rename_cols: Optional[Dict[str, str]],
    seed: int,
    tmpdir: str,
    read_csv_kwargs: Optional[Dict] = None,
) -> Tuple[str, int]:
    """
    Worker: read one CSV, Bernoulli-sample rows, write to a temporary CSV (with header),
    and return (temp_path, n_rows_written). If no rows selected, returns ("", 0).
    """
    if read_csv_kwargs is None:
        read_csv_kwargs = {}

    rng = np.random.default_rng(seed)
    df = pd.read_csv(in_path, **read_csv_kwargs)
    if rename_cols:
        df = df.rename(columns=rename_cols)

    if len(df) == 0:
        return ("", 0)

    mask = rng.random(len(df)) < frac
    sampled = df.loc[mask]
    if sampled.empty:
        return ("", 0)

    fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=tmpdir)
    os.close(fd)  # we'll open it via pandas
    sampled.to_csv(tmp_path, index=False)  # writes header + data
    return (tmp_path, len(sampled))


def _append_file_skipping_header(src_path: str, dst_fh):
    """
    Append `src_path` CSV to already-open destination file handle `dst_fh`,
    skipping the first line (header) in src.
    """
    with open(src_path, "rb") as sf:
        # Skip first line (header)
        _ = sf.readline()
        # Stream copy the rest
        shutil.copyfileobj(sf, dst_fh)


import fnmatch

def _read_csv_header(path: str) -> bytes:
    """Return the first line of `path` (its CSV header) as raw bytes, terminator included."""
    with open(path, "rb") as fh:
        return fh.readline()


def sample_from_csv_folder_parallel(
    folder: str,
    file_patterns: Optional[List[str]] = None,
    frac: float = 1,
    rename_cols: Optional[Dict[str, str]] = None,
    random_state: int = 42,
    max_workers: int = 4,
    out_csv: str = "sampled.csv",
    read_csv_kwargs: Optional[Dict] = None,
) -> str:
    """
    Bernoulli-sample rows from every matching CSV in `folder` in parallel and
    concatenate the samples into a single CSV at `out_csv`.

    Parameters
    ----------
    folder : directory scanned (non-recursively) for regular files ending in ".csv".
    file_patterns : glob-style patterns matched against file basenames with
        `fnmatch` (e.g. ["SPLUS-*.csv", "Gaia-*.csv"]); a file is used if it
        matches any of them. A bare string is treated as a single pattern.
        Defaults to ["SPLUS-s*.csv"].
    frac : per-row keep probability; 1 keeps every row.
    rename_cols : optional column rename mapping applied to each file before sampling.
    random_state : base seed; the i-th matching file in sorted-name order is
        sampled with seed `random_state + i`, so results are reproducible.
    max_workers : number of worker processes.
    out_csv : output path; an existing file is overwritten and a missing
        parent directory is created.
    read_csv_kwargs : extra keyword arguments forwarded to `pd.read_csv`.

    Returns
    -------
    str : `out_csv`. The file is empty (zero bytes) when no file matches or no
        row is selected.

    Raises
    ------
    ValueError : if the sampled parts do not all share the same header line;
        concatenating them would silently misalign columns.

    NOTE: the worker function must be importable by child processes. When it is
    defined in a notebook, that only works under the "fork" start method (the
    Linux default); on spawn platforms, move these helpers into a .py module.
    """
    if file_patterns is None:
        file_patterns = ["SPLUS-s*.csv"]
    elif isinstance(file_patterns, str):
        # A bare string would otherwise be iterated character by character.
        file_patterns = [file_patterns]

    # Sorting makes the file -> seed assignment (random_state + i) independent
    # of the arbitrary order in which os.listdir returns entries.
    files = [
        os.path.join(folder, name)
        for name in sorted(os.listdir(folder))
        if name.endswith(".csv")
        and os.path.isfile(os.path.join(folder, name))
        and any(fnmatch.fnmatch(name, pat) for pat in file_patterns)
    ]

    # Prepare the destination before the (possibly very long) sampling run,
    # so a missing directory does not surface only after all the work is done.
    out_dir = os.path.dirname(out_csv)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    if not files:
        open(out_csv, "w").close()
        return out_csv

    if os.path.exists(out_csv):
        os.remove(out_csv)

    tmpdir = tempfile.mkdtemp(prefix="csv_parts_")
    # index into `files` -> temp part path; only files that yielded rows are kept.
    parts: Dict[int, str] = {}

    try:
        with ProcessPoolExecutor(max_workers=max_workers) as ex:
            futures = {
                ex.submit(
                    _sample_file_to_temp,
                    path,
                    frac,
                    rename_cols,
                    random_state + i,
                    tmpdir,
                    read_csv_kwargs,
                ): i
                for i, path in enumerate(files)
            }

            for fut in tqdm(as_completed(futures), total=len(futures), desc="Sampling (parallel)"):
                tmp_path, n_rows = fut.result()
                if n_rows > 0 and tmp_path:
                    parts[futures[fut]] = tmp_path

        if not parts:
            open(out_csv, "w").close()
            return out_csv

        # Merge in file order (not completion order) so output row order is deterministic.
        ordered = [(files[i], parts[i]) for i in sorted(parts)]
        first_src, first_part = ordered[0]

        # Parts are concatenated by skipping their headers, which is only valid
        # when every part has exactly the same columns in the same order.
        expected_header = _read_csv_header(first_part)
        for src_path, part_path in ordered[1:]:
            if _read_csv_header(part_path) != expected_header:
                raise ValueError(
                    f"Column header of {src_path!r} differs from that of "
                    f"{first_src!r}; refusing to concatenate misaligned CSVs."
                )

        with open(out_csv, "wb") as out_fh:
            with open(first_part, "rb") as fp:
                shutil.copyfileobj(fp, out_fh)  # first part keeps its header
            for _, part_path in ordered[1:]:
                _append_file_skipping_header(part_path, out_fh)

    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)

    return out_csv

In [7]:
# Sample all SPLUS-s*.csv catalogs from the matches folder into one CSV.
# Note: the function returns the output *path* (a str), not a DataFrame.
df = sample_from_csv_folder_parallel(
    '../data/field_catalogs/matches',
    file_patterns=["SPLUS-s*.csv"],
    frac=1,
    max_workers=8,
    rename_cols={'gaia_parallax': 'parallax'},
    out_csv='../data/oficial/SPLUS-s.csv',
)

Sampling (parallel): 100%|██████████| 1212/1212 [03:39<00:00,  5.51it/s]


In [3]:
# Combine SPLUS-n*, HYDRA* and STRIPE* catalogs into a single CSV;
# `df` receives the output path (str) returned by the function.
df = sample_from_csv_folder_parallel(
    folder='../data/field_catalogs/matches',
    file_patterns=["SPLUS-n*", "HYDRA*", "STRIPE*"],
    out_csv='../data/oficial/SPLUS-n_HYDRA_STRIPE.csv',
    rename_cols={'gaia_parallax': 'parallax'},
    frac=1,
    max_workers=8,
)

  df = pd.read_csv(in_path, **read_csv_kwargs)
Sampling (parallel): 100%|██████████| 1182/1182 [06:41<00:00,  2.94it/s]


In [5]:
# Combine the MC* catalogs from the crowded-field matches folder;
# `df` receives the output path (str) returned by the function.
df = sample_from_csv_folder_parallel(
    folder='../data/field_catalogs/splus_gaia_crowded/matches',
    file_patterns=["MC*"],
    out_csv='../data/oficial/MC.csv',
    rename_cols={'gaia_parallax': 'parallax'},
    frac=1,
    max_workers=8,
)

Sampling (parallel): 100%|██████████| 146/146 [06:24<00:00,  2.64s/it]


In [4]:
# Combine SPLUS-b* and SPLUS-d* catalogs from the crowded-field matches folder;
# `df` receives the output path (str) returned by the function.
# NOTE(review): the output goes to a hardcoded absolute, machine-specific path
# (/mnt/hdcasa/...); consider a configurable output directory.
df = sample_from_csv_folder_parallel(
    folder='../data/field_catalogs/splus_gaia_crowded/matches',
    file_patterns=["SPLUS-b*", "SPLUS-d*"],
    out_csv='/mnt/hdcasa/splus_gaia/oficial/SPLUS-b_SPLUS-d.csv',
    rename_cols={'gaia_parallax': 'parallax'},
    frac=1,
    max_workers=8,
)

Sampling (parallel): 100%|██████████| 460/460 [25:29<00:00,  3.32s/it]
