In [None]:
import re

from datetime import date, datetime
from pathlib import Path

import pandas as pd
import numpy as np

import janitor

## A Failing Test

In [None]:
# Expected decoding for each known merge-name shape. Every row below is
# (merge name, expected project, expected sample ID); the rows are folded into
# a {merge name: (project, sample ID)} mapping, in the order listed.
MERGE_TEST_CASES = {
    merge_name: (expected_project, expected_sample)
    for merge_name, expected_project, expected_sample in [
        # auto merge, standard
        (
            "2020-06-18T160015_ILWGS_TMGCUC_NWD190874_256256_2-FLOWCELL-H2JV7DSXY-H7NFJDSXY-H7NHMDSXY",
            "TMGCUC",
            "NWD190874",
        ),
        # special, two libraries
        (
            "NWD282009-LIB-ILWGS_TMGCUC_NWD282009_242134_2-ILWGS_TMGCUC_NWD282009_242134_3",
            "TMGCUC",
            "NWD282009",
        ),
        # special, single library
        (
            "NWD718492-LIB-ILWGS_TMGCUC_NWD718492_242146_2",
            "TMGCUC",
            "NWD718492",
        ),
        # special, "<project>.<sample>" prefix
        (
            "TMGCUC.NWD438629-1_2AMP-FLOWCELL-HNL7HDSXX-HWHLTDSXX-HMWGFDSXX",
            "TMGCUC",
            "NWD438629",
        ),
        # Control, standard
        (
            "2020-02-19T080013_ILWGS_Legacy_NWD628172_246024_2-FLOWCELL-H2CFFDSXY-H2J2VDSXY",
            "Legacy",
            "NWD628172",
        ),
        # Control, standard
        (
            "2019-06-09T083135_ILWGS_TMCONT_NWD687974_238352_1-FLOWCELL-HKT7JDSXX-HL2JKDSXX",
            "TMCONT",
            "NWD687974",
        ),
        # Control, special, "<project>.<sample>" prefix
        (
            "TMCONT.NWD218413-1_1AMP-FLOWCELL-HF7CFCCXY-HCW7LCCXY-HFFLFCCXY-HFGK2CCXY",
            "TMCONT",
            "NWD218413",
        ),
        # empty name: nothing to decode
        ("", None, None),
    ]
}

MERGE_TEST_CASES

In [None]:
def test_parser(parser):
    """Run ``parser`` over every merge name in MERGE_TEST_CASES and report each result.

    ``parser`` is called with one merge name at a time. A ``PASS`` line is
    printed when its return value equals the expected tuple stored for that
    name, a ``FAIL`` line otherwise. Nothing is returned, and a mismatch does
    not raise.
    """
    for merge_name, expected in MERGE_TEST_CASES.items():
        result = parser(merge_name)
        if result == expected:
            print(f"PASS: {merge_name!r} -> {result!r}")
            continue
        print(f"FAIL: {merge_name!r} -> {result!r}")

In [None]:
# get_cohort_and_sample_id is not defined in any cell shown up to this point,
# so the call fails; the error is caught and printed so the notebook keeps running.
try:
    test_parser(get_cohort_and_sample_id)
except Exception as exc:
    print(exc)

## Patterns

In [None]:
# One sample name per merge-name shape explored below:
#   merge_1, merge_5: timestamp-prefixed library name followed by a FLOWCELL list
#   merge_2, merge_3: "<sample>-LIB-" followed by one or more library names
#   merge_4:          "<project>.<sample>-..." followed by a FLOWCELL list
merge_1 = "2020-06-18T160015_ILWGS_TMGCUC_NWD190874_256256_2-FLOWCELL-H2JV7DSXY-H7NFJDSXY-H7NHMDSXY"
merge_2 = "NWD282009-LIB-ILWGS_TMGCUC_NWD282009_242134_2-ILWGS_TMGCUC_NWD282009_242134_3"
merge_3 = "NWD718492-LIB-ILWGS_TMGCUC_NWD718492_242146_2"
merge_4 = "TMGCUC.NWD438629-1_2AMP-FLOWCELL-HNL7HDSXX-HWHLTDSXX-HMWGFDSXX"
merge_5 = "2020-02-19T080013_ILWGS_Legacy_NWD628172_246024_2-FLOWCELL-H2CFFDSXY-H2J2VDSXY"

In [None]:
# Embeds a project and a sample ID but follows none of the shapes above.
merge_bad = "hello-there-world_TMGCUC_NWD190874_absolutely-anything-can-go-here1233"

In [None]:
# Building blocks of the merge-name grammar. Each fragment is a plain string
# that gets interpolated into the larger patterns below.
TIMESTAMP_PAT = r"\d{4}-\d\d-\d\dT\d{4,6}"
# Wrapped in a non-capturing group so the alternation stays self-contained
# wherever this fragment is interpolated; capture-group numbering is unaffected.
PROJECT_PAT = r"(?:Legacy|TM[A-Z]{4})"
SAMPLE_PAT = r"NWD\d{6}"
SEQUENCE_PAT = r"\d{6}_\d"
FLOWCELL_PAT = r"[A-Z0-9]{9}"
# A single library name; captures (library type, project, sample).
LIB_PAT = fr"(IL[A-Z]{{3}})_({PROJECT_PAT})_({SAMPLE_PAT})_{SEQUENCE_PAT}"

# <timestamp>_<library>-FLOWCELL-<flowcell>[-<flowcell>...]
# Groups: (library type, project, sample).
MERGE_PAT_STANDARD = re.compile(
    fr"(?:{TIMESTAMP_PAT})_{LIB_PAT}-FLOWCELL(?:-{FLOWCELL_PAT})+$"
)
# <sample>-LIB-<library>[-<library>...]
# Group 1 is the leading sample and group 2 the whole "-<library>..." run;
# groups 3-5 come from LIB_PAT and hold only the last library matched.
# Anchored with "$" like the other two patterns, so text trailing the last
# library makes the name fail to match instead of being silently ignored.
MERGE_PAT_SPECIAL = re.compile(fr"({SAMPLE_PAT})-LIB((?:-{LIB_PAT})+)$")
# <project>.<sample>-<digit>_<digit>AMP-FLOWCELL-<flowcell>[-<flowcell>...]
# Groups: (project, sample).
MERGE_PAT_NEW = re.compile(
    fr"({PROJECT_PAT})\.({SAMPLE_PAT})-\d_\dAMP-FLOWCELL(?:-{FLOWCELL_PAT})+$"
)

In [None]:
# "<project>.<sample>" shape: the pattern's two capture groups are (project, sample).
m = MERGE_PAT_NEW.match(merge_4)
m

In [None]:
m.groups()

In [None]:
# Timestamped shape: the three capture groups all come from LIB_PAT,
# i.e. (library type, project, sample).
m = MERGE_PAT_STANDARD.match(merge_5)
m

In [None]:
m.groups()

In [None]:
# "<sample>-LIB-..." shape; the steps below take the match apart piece by piece.
m = MERGE_PAT_SPECIAL.match(merge_2)

In [None]:
# Only the first two groups are needed: the leading sample and the full
# "-<library>-<library>..." run. The remaining groups come from LIB_PAT inside
# the repetition and therefore hold just the last library matched.
sample, libs_string = m.groups()[:2]
sample, libs_string

In [None]:
# libs_string starts with "-", so the first piece produced by split() is empty
# and is dropped.
lib_strings = libs_string.split("-")[1:]
lib_strings

In [None]:
# Decode every library name into its (library type, project, sample) groups.
decoded = [re.match(LIB_PAT, l).groups() for l in lib_strings]
decoded

In [None]:
# The leading sample plus the sample embedded in each library name.
samples = [sample] + [d[2] for d in decoded]

In [None]:
# More than one element here would mean the name mixes different samples.
set(samples)

In [None]:
# Distinct library types across the libraries.
library_types = set(d[0] for d in decoded)

In [None]:
library_types

In [None]:
# Distinct projects across the libraries.
projects = set(d[1] for d in decoded)

In [None]:
projects

In [None]:
import logging

In [None]:
# Module-level logger; special_merge_rule and decode_merge_name below report
# inconsistent or undecodable merge names through it.
logger = logging.getLogger("pm_utils")

In [None]:
# Emits one error-level record with a placeholder message.
# NOTE(review): presumably a check of how logger output appears in the notebook — confirm it is still wanted.
logger.error("ugh")

## Implementation

In [None]:
def standard_merge_rule(merge_name: str) -> tuple:
    """Decode a merge name using MERGE_PAT_STANDARD.

    Returns ``(project, sample)`` taken from the pattern's second and third
    capture groups (the first group is not used), or ``(None, None)`` when
    ``merge_name`` does not match.
    """
    matched = MERGE_PAT_STANDARD.match(merge_name)
    if matched is None:
        return None, None
    return matched.group(2), matched.group(3)


def special_merge_rule(merge_name: str) -> tuple:
    """Decode a merge name using MERGE_PAT_SPECIAL.

    The name carries a leading sample ID followed by one or more library
    names, each of which embeds its own library type, project and sample.
    All of them must agree for the name to be decoded.

    Returns ``(project, sample)`` on success. Returns ``(None, None)`` when
    ``merge_name`` does not match the pattern, or when the libraries disagree
    on library type, project or sample; each kind of disagreement is logged
    as an error before returning.
    """
    matched = MERGE_PAT_SPECIAL.match(merge_name)
    if matched is None:
        return None, None

    # Groups past the second belong to LIB_PAT and only reflect the last library.
    leading_sample, joined_libs = matched.groups()[:2]
    # joined_libs begins with "-", so the first split piece is empty and skipped.
    lib_fields = [
        re.match(LIB_PAT, lib_name).groups() for lib_name in joined_libs.split("-")[1:]
    ]

    library_types = {lib_type for lib_type, _, _ in lib_fields}
    projects = {project for _, project, _ in lib_fields}
    samples = {leading_sample, *(sample for _, _, sample in lib_fields)}

    # Check every dimension so all inconsistencies get logged, not just the first.
    consistent = True
    if len(library_types) > 1:
        logger.error(f"inconsistent library types {library_types} in {merge_name}")
        consistent = False
    if len(projects) > 1:
        logger.error(f"inconsistent projects {projects} in {merge_name}")
        consistent = False
    if len(samples) > 1:
        logger.error(f"inconsistent samples {samples} in {merge_name}")
        consistent = False

    if not consistent:
        return None, None
    # Both sets hold exactly one element at this point.
    return projects.pop(), samples.pop()


def new_merge_rule(merge_name: str) -> tuple:
    """Decode a merge name using MERGE_PAT_NEW.

    Returns ``(project, sample)`` from the pattern's two capture groups, or
    ``(None, None)`` when ``merge_name`` does not match.
    """
    matched = MERGE_PAT_NEW.match(merge_name)
    if matched is None:
        return None, None
    return matched.group(1), matched.group(2)


# Decoders are tried in this order; the first one that yields a project wins.
RULES = [standard_merge_rule, special_merge_rule, new_merge_rule]


def decode_merge_name(merge_name: str) -> tuple:
    """Decode ``merge_name`` into ``(project, sample)`` using the rules in RULES.

    Each rule is called in turn and the first result whose project is truthy
    is returned. If no rule produces a project, an error is logged and
    ``(None, None)`` is returned.
    """
    for rule in RULES:
        decoded = rule(merge_name)
        if decoded[0]:
            return decoded
    logger.error(f"Cannot decode merge name '{merge_name}'")
    return None, None

In [None]:
# Timestamped shape -> ("TMGCUC", "NWD190874")
decode_merge_name(merge_1)

In [None]:
# "<sample>-LIB-" shape with two libraries -> ("TMGCUC", "NWD282009")
decode_merge_name(merge_2)

In [None]:
# "<sample>-LIB-" shape with one library -> ("TMGCUC", "NWD718492")
decode_merge_name(merge_3)

In [None]:
# "<project>.<sample>" shape -> ("TMGCUC", "NWD438629")
decode_merge_name(merge_4)

In [None]:
# Timestamped shape with the "Legacy" project -> ("Legacy", "NWD628172")
decode_merge_name(merge_5)

In [None]:
# Matches no rule: logs "Cannot decode merge name ..." and gives (None, None).
decode_merge_name(merge_bad)

In [None]:
# The two libraries have different library types (ILWGS vs ILWES), so the
# inconsistency is logged and the result is (None, None).
decode_merge_name("NWD282009-LIB-ILWGS_TMGCUC_NWD282009_242134_2-ILWES_TMGCUC_NWD282009_242134_3")

In [None]:
# Re-run the table of expected results, this time against the real decoder.
test_parser(decode_merge_name)