# In [1]:
import collections as col
import re
import datetime as dt
import pandas as pd
import pathlib as pl

# Minute-resolution timestamp; written as the first line of each
# "*.verified" marker file further down.
ts = dt.datetime.now().strftime("%Y%m%dT%H%M")

# Mount point of the project share, the restructured data tree below it
# (relative path), and the read-type folders that get scanned.
mount = pl.Path("/mounts/hilbert/project")
subfolder = pl.Path("projects/medbioinf/data/00_RESTRUCTURE")
data_folders = ["nanopore", "pacbio_hifi"]

# Folder with the cell listing tables (TSV files).
cell_metadata = pl.Path("/home/ebertp/work/code/cubi/project-run-hgsvc-hybrid-assemblies/annotations/external")

# Cleaned-up summary table, written one level above the listing folder.
clean_out = cell_metadata.parent / "hgsvc_cells.tsv"

# Iterators over the listing files, selected by read-type tag in the name.
ont_cells = cell_metadata.glob("*ont*.tsv")
hifi_cells = cell_metadata.glob("*hifi*.tsv")

def load_cell_metadata(fpath):
    """Load one cell listing and annotate it with source and read type.

    Both the data source and the read type are derived from substrings
    of the file name ("jax" / "uwash" / "umigs" and "hifi" / "ont").

    Args:
        fpath: pathlib.Path of a tab-separated file with a header row that
            contains (case-insensitively) the columns "sample" and "cell".

    Returns:
        pandas.DataFrame with the columns "sample", "cell", "sin",
        "source" and "read_type". "sin" is "SIN:" followed by the first
        run of digits in the sample name (missing if there are no digits).

    Raises:
        ValueError: if the file name carries no known source tag or no
            known read type tag.
    """
    file_name = fpath.name

    if "jax" in file_name:
        source = "JAX"
    elif "uwash" in file_name:
        source = "UW"
    elif "umigs" in file_name:
        source = "UMIGS"
    else:
        raise ValueError(f"Unknown project {fpath.name}")
    if "hifi" in file_name:
        read_type = "HiFi"
    elif "ont" in file_name:
        read_type = "ONT"
    else:
        raise ValueError(f"Unknown read type: {fpath.name}")

    df = pd.read_csv(fpath, header=0, sep="\t")
    df.columns = [c.lower() for c in df.columns]
    # Explicit copy: the assignments below must operate on an independent
    # frame, not on a column selection of the full table (this avoids
    # pandas' SettingWithCopyWarning).
    df = df[["sample", "cell"]].copy()
    df["sample"] = df["sample"].str.strip()
    df["cell"] = df["cell"].str.strip()
    # expand=False returns a Series instead of a one-column DataFrame,
    # which is what a single-column assignment expects.
    df["sin"] = "SIN:" + df["sample"].str.extract("([0-9]+)", expand=False)
    df["source"] = source
    df["read_type"] = read_type
    return df
    
def _assemble_cell_table(listing_files, read_type):
    """Merge all cell listings of one read type into a single sorted table.

    Args:
        listing_files: iterable of paths accepted by load_cell_metadata.
        read_type: label that is only used in the error message.

    Returns:
        Tuple (table, exp_count): the concatenated table, sorted by "sin"
        and "cell" with a fresh 0..n-1 index and the extra columns
        "exp_count" and "HHU_complete" (initialised to "no"), plus the
        dict mapping each "sin" value to its number of rows.

    Raises:
        ValueError: if listing_files is empty.
    """
    listing_files = list(listing_files)
    if not listing_files:
        # pd.concat would fail with an unspecific "No objects to
        # concatenate"; say which listing set is missing instead.
        raise ValueError(f"No {read_type} cell listing files found")
    table = pd.concat(
        [load_cell_metadata(fp) for fp in listing_files],
        axis=0, ignore_index=True
    )
    exp_count = table["sin"].value_counts().to_dict()
    # Series.map is a plain per-value lookup; Series.replace was being
    # used for the same purpose but applies dtype inference on top.
    table["exp_count"] = table["sin"].map(exp_count)
    table["HHU_complete"] = "no"
    table.sort_values(["sin", "cell"], inplace=True)
    table.reset_index(drop=True, inplace=True)
    return table, exp_count


# From here on, hifi_cells / ont_cells are DataFrames (they were file
# iterators before); the *_exp_count dicts hold rows per sample number.
hifi_cells, hifi_exp_count = _assemble_cell_table(hifi_cells, "HiFi")
ont_cells, ont_exp_count = _assemble_cell_table(ont_cells, "ONT")


def group_files_by_sample(fastq_files, all_known):
    """Assign FASTQ files to samples via the cell identifier in their name.

    A file belongs to a table row if that row's "cell" value occurs as a
    substring of the file name. Files that match no row are dropped.

    Args:
        fastq_files: iterable of path objects (their ``.name`` is used).
        all_known: DataFrame with at least the columns "sample" and "cell".

    Returns:
        collections.defaultdict(list) mapping sample -> list of paths.

    Raises:
        ValueError: if a file name contains more than one known cell; the
            conflicting candidates are printed before raising.
    """
    grouped = col.defaultdict(list)
    for fastq_path in fastq_files:
        hits = [
            (entry.sample, entry.cell, fastq_path)
            for entry in all_known.itertuples()
            if entry.cell in fastq_path.name
        ]
        if not hits:
            # no known cell in this file name: ignore the file
            continue
        if len(hits) > 1:
            for hit_sample, hit_cell, hit_path in hits:
                print(hit_sample, ' - ', hit_cell, ' - ', hit_path.name)
            raise ValueError("Multi-match")
        hit_sample, _, hit_path = hits[0]
        grouped[hit_sample].append(hit_path)
    return grouped


def find_matching(sample_files, known_subset):
    """Check that the files and the known cells of one sample pair up 1:1.

    A cell matches a file if the cell identifier is a substring of the
    file name.

    Args:
        sample_files: list of FASTQ file names (plain strings).
        known_subset: DataFrame with the columns "sample" and "cell",
            restricted to the rows of a single sample.

    Returns:
        Tuple (sample_name, matched): the sample name, with a leading
        "GM" rewritten to "NA", and the number of matched cells.

    Raises:
        ValueError: if a cell matches more than one file, if no cell
            matches at all, if the number of matched cells differs from
            the number of files, or if a cell has no file.
        AssertionError: if known_subset holds more than one sample name.
    """
    missing = []
    matched = 0
    for cell in known_subset["cell"].values:
        hits = sum(cell in fn for fn in sample_files)
        if hits == 0:
            missing.append(cell)
        elif hits == 1:
            matched += 1
        else:
            # entries are file names (str), so they are joined as-is;
            # accessing ".name" on them raised AttributeError before
            pprint_mmatch = "\n".join(map(str, sample_files))
            raise ValueError(f"Multi-match: {hits} - {cell}", pprint_mmatch)

    if matched == 0:
        raise ValueError("No files matched ", sample_files, known_subset)
    if matched != len(sample_files):
        raise ValueError("Unidentified sample files ", sample_files, known_subset)
    if missing:
        raise ValueError("Missing sample files ", sample_files, known_subset)
    sample_names = known_subset["sample"].unique()
    assert sample_names.size == 1, "expected rows of exactly one sample"
    sample_name = sample_names[0]
    if sample_name.startswith("GM"):
        # rewrite only the prefix, not later occurrences of "GM"
        sample_name = sample_name.replace("GM", "NA", 1)
    return sample_name, matched


# Pattern for a four-digit year "20xx", and the years accepted for a dataset.
year = re.compile("20[0-9]{2}")
possible_years = [
    "2018", "2019", "2020",
    "2021", "2022", "2023",
]

# Data folder name -> read-type tag used in the fofn file names.
data_types = dict(nanopore="ont", pacbio_hifi="hifi")

# For each read-type folder: read every "sample-folder.lst" listing, check
# for each listed sample folder that the FASTQ files found there match the
# known cells of the sample, and for complete samples write a
# "<sample>.<n>-cells.verified" marker file plus a per-sample
# file-of-file-names (fofn) under "sample-centric". Complete samples are
# flagged with HHU_complete = "yes" in the lookup table.
for data_folder in data_folders:
    sample_folder_listings = mount.joinpath(
        subfolder,
        "project-centric",
        "hgsvc",
        data_folder
    )
    # table of known cells for the current read type
    cell_lut = hifi_cells if data_folder == "pacbio_hifi" else ont_cells
    # NOTE(review): assert is stripped under "python -O"; confirm that is acceptable
    assert sample_folder_listings.is_dir()
    for sample_folder_lst in sample_folder_listings.glob("**/sample-folder.lst"):
        with open(sample_folder_lst, "r") as listing:
            for line in listing:
                if not line.strip():
                    continue
                # each non-empty line is used as a path relative to mount/subfolder
                sample_folder = mount.joinpath(subfolder, line.strip())
                # the first "20xx" occurrence in the line is taken as dataset year
                mobj = year.search(line)
                if mobj is None:
                    raise ValueError("no year ", line.strip())
                else:
                    ds_year = mobj.group(0)
                    assert ds_year in possible_years
                # folder name without its first two characters, prefixed with "SIN:"
                sample_num = "SIN:" + sample_folder.name[2:]
                if sample_num not in cell_lut["sin"].values:
                    # folder name is not a known sample number: assign the
                    # FASTQ files below it to samples via their cell identifiers
                    unsorted_fastq = list(sample_folder.glob("**/*.fastq.gz"))
                    if not unsorted_fastq:
                        raise ValueError(sample_folder)
                    sample_file_groups = group_files_by_sample(
                        unsorted_fastq,
                        cell_lut
                    )
                    for sample, sample_files in sample_file_groups.items():
                        # same derivation as above, now from the sample name
                        sample_num = "SIN:" + sample[2:]
                        fastq_names = [sf.name for sf in sample_files]
                        subset = cell_lut.loc[cell_lut["sin"] == sample_num, :]
                        # number of known cells must equal number of files,
                        # otherwise the sample is reported and skipped
                        if subset.shape[0] != len(fastq_names):
                            print("Missing files ", line.strip(), subset.shape[0], len(fastq_names))
                            continue
                        sample_name, matched_files = find_matching(fastq_names, subset)
                        # not raising = dataset complete
                        check_file = sample_folder.joinpath(
                            f"{sample_name}.{matched_files}-cells.verified"
                        )
                        # NOTE(review): the existence check is disabled here, so
                        # marker and fofn are rewritten on every run, whereas the
                        # else-branch below only writes when the marker is
                        # missing - confirm this difference is intended
                        #if not check_file.is_file():
                        if True:
                            with open(check_file, "w") as dump:
                                dump.write(f"# {ts}\n")
                                subset.to_csv(dump, sep="\t", header=True, index=False)
                            # fofn entries are paths relative to mount/subfolder
                            relpaths_fastq = sorted(
                                [f.relative_to(mount.joinpath(subfolder)) for f in sample_files]
                            )
                            relpaths_fastq = list(map(str, relpaths_fastq))
                            fofn_path = mount.joinpath(
                                subfolder, "sample-centric", sample_name,
                                f"{sample_name}_{data_types[data_folder]}_fastq.hgsvc-{ds_year}.fofn"
                            )
                            fofn_path.parent.mkdir(exist_ok=True, parents=True)
                            with open(fofn_path, "w") as fofn:
                                fofn.write("\n".join(relpaths_fastq) + "\n")
                        # flag all cells of this sample as complete
                        cell_lut.loc[subset.index, "HHU_complete"] = "yes"
                        
                else:
                    # easy case: data per sample in subfolder
                    fastq = list(sample_folder.glob("**/*.fastq.gz"))
                    if not fastq:
                        raise ValueError(sample_folder)
                    fastq_names = [fp.name for fp in fastq]
                    subset = cell_lut.loc[cell_lut["sin"] == sample_num, :]
                    # incomplete samples are skipped silently in this branch
                    if subset.shape[0] != len(fastq):
                        #print("Missing files ", line.strip(), subset.shape[0], len(fastq))
                        #print("Skipping")
                        continue
                    sample_name, matched_files = find_matching(fastq_names, subset)
                    # not raising = dataset complete
                    check_file = sample_folder.joinpath(
                        f"{sample_name}.{matched_files}-cells.verified"
                    )
                    # marker and fofn are only written once per sample folder
                    if not check_file.is_file():
                        with open(check_file, "w") as dump:
                            dump.write(f"# {ts}\n")
                            subset.to_csv(dump, sep="\t", header=True, index=False)
                        # fofn entries are paths relative to mount/subfolder
                        relpaths_fastq = sorted([f.relative_to(mount.joinpath(subfolder)) for f in fastq])
                        relpaths_fastq = list(map(str, relpaths_fastq))
                        fofn_path = mount.joinpath(
                            subfolder, "sample-centric", sample_name,
                            f"{sample_name}_{data_types[data_folder]}_fastq.hgsvc-{ds_year}.fofn"
                        )
                        fofn_path.parent.mkdir(exist_ok=True, parents=True)
                        with open(fofn_path, "w") as fofn:
                            fofn.write("\n".join(relpaths_fastq) + "\n")
                    # flag all cells of this sample as complete
                    cell_lut.loc[subset.index, "HHU_complete"] = "yes"
                
# Stack both read-type tables, order the rows, and dump them as one TSV.
merged = pd.concat([hifi_cells, ont_cells], ignore_index=False, axis=0)
merged = merged.sort_values(by=["sin", "read_type", "source", "cell"])
merged.to_csv(clean_out, sep="\t", index=False, header=True)