In [17]:
import pandas as pd
from datetime import datetime

In [40]:
class FileLoader:
    """Load a tabular data file into a pandas DataFrame.

    Supported formats are tab-separated values (``.tsv``) and JSON
    (``.json``). The format is chosen from the file extension.
    """

    def load(self, filepath) -> pd.DataFrame:
        """Read ``filepath`` and return its contents as a DataFrame.

        Parameters
        ----------
        filepath : str or path-like
            Location of the file. The extension is matched
            case-insensitively, so ``data.TSV`` is treated like ``data.tsv``.

        Returns
        -------
        pd.DataFrame
            The parsed file.

        Raises
        ------
        ValueError
            If the extension is neither ``.tsv`` nor ``.json``.
        """
        # str() lets pathlib.Path objects through; lower() makes the
        # extension test independent of how the file was named.
        extension_probe = str(filepath).lower()

        if extension_probe.endswith(".tsv"):
            return pd.read_csv(filepath, sep="\t")
        if extension_probe.endswith(".json"):
            return pd.read_json(filepath)

        raise ValueError(
            f"Unsupported format: {filepath!r} (expected a .tsv or .json file)"
        )


### Column Mapping

In [46]:
# Canonical schema for an ingested batch. Each canonical column name maps to
# a spec holding the Python type its values should have ("type") and whether
# missing values are allowed ("nullable").
essential_fields = {
    field_name: {"type": python_type, "nullable": allows_missing}
    for field_name, python_type, allows_missing in (
        # Identity / lineage
        ("plasmid_variant_index", str, False),
        ("parent_plasmid_variant", str, False),
        # Evolution round
        ("directed_evolution_generation", int, False),
        # Sequence
        ("assembled_dna_sequence", str, False),
        # Quantification
        ("dna_quantification_fg", float, False),
        ("protein_quantification_pg", float, False),
        # Control flag
        ("is_control", bool, False),
    )
}



In [42]:
# Accepted header spellings for each canonical field. Every list includes the
# canonical name itself, so a file that already uses the canonical headers is
# matched by name rather than by column position.
col_synonyms = {

    "plasmid_variant_index": ["plasmid_variant_index", "variant_index", "plasmid_id"],
    "parent_plasmid_variant": ["parent_plasmid_variant", "parent_variant", "parent_plasmid", "parent_id"],

    "directed_evolution_generation": ["directed_evolution_generation", "generation", "evolution_generation", "evo_gen"],

    "assembled_dna_sequence": ["assembled_dna_sequence", "dna_sequence", "sequence", "assembled_sequence"],

    "dna_quantification_fg": ["dna_quantification_fg", "dna_concentration_fg", "dna_qty_fg", "dna_fg"],
    "protein_quantification_pg": ["protein_quantification_pg", "protein_concentration_pg", "protein_qty_pg", "protein_pg"],

    "is_control": ["control", "is_control", "control_sample"]
}

def clean_cols(col: str) -> str:
    """Normalise a column header for matching.

    Surrounding whitespace is removed, the text is lower-cased, and every
    space or hyphen is replaced by an underscore.
    """
    normalised = col.strip().lower()
    for separator in (" ", "-"):
        normalised = normalised.replace(separator, "_")
    return normalised

def build_synonym_map(col_synonyms):
    """Build the reverse lookup ``cleaned header -> canonical field``.

    Parameters
    ----------
    col_synonyms : dict
        Maps each canonical field name to an iterable of accepted header
        spellings.

    Returns
    -------
    dict
        Keys are the spellings after ``clean_cols`` normalisation; values
        are the canonical field names.

    Notes
    -----
    The canonical field name is always registered as a spelling of itself,
    even when it is absent from its own synonym list. Canonical names are
    registered last so an exact canonical header can never be claimed by a
    synonym listed under a different field.
    """
    synonym_map = {}

    for canonical, variants in col_synonyms.items():
        for variant in variants:
            synonym_map[clean_cols(variant)] = canonical

    # Second pass: exact canonical names take priority over listed synonyms.
    for canonical in col_synonyms:
        synonym_map[clean_cols(canonical)] = canonical

    return synonym_map

def validate_mapping(mapping):
    """Reject a mapping in which two raw columns target the same field.

    Prevents duplicate assignments. Could be removed if confirmation is
    enforced in the front end instead.

    Parameters
    ----------
    mapping : dict
        Raw column name -> target field name.

    Raises
    ------
    ValueError
        On the first target field that is assigned to more than one raw
        column; the message names both columns.
    """
    first_column_for = {}
    for raw, field in mapping.items():
        try:
            earlier_column = first_column_for[field]
        except KeyError:
            # First time this target field is seen: remember its source.
            first_column_for[field] = raw
            continue
        raise ValueError(
            f"Multiple columns mapped to '{field}': "
            f"{earlier_column} and {raw}"
        )

In [79]:
def confirm_mapping_bulk(mapping):
    """Show the proposed mapping on the console until the user accepts it.

    Each round prints every ``raw → target`` pair and reads one response:

    * empty input accepts the mapping and ends the loop;
    * ``edit`` reads a comma-separated list of ``raw:target`` pairs and
      applies those whose ``raw`` is already a key of ``mapping``;
    * anything else prints a reminder and asks again.

    Parameters
    ----------
    mapping : dict
        Raw column name -> target field name. Modified in place by edits.

    Returns
    -------
    dict
        The same ``mapping`` object, including any edits.
    """
    accepted = False

    while not accepted:
        print("\nProposed Column Mapping:")
        for raw, canon in mapping.items():
            print(f"{raw} → {canon}")

        response = input("\nPress Enter to accept, or type 'edit' to modify: ").strip().lower()

        if response == "":
            accepted = True
        elif response == "edit":
            edits = input("Enter edits as raw:target,raw2:target2,... : ").strip()
            for pair in edits.split(","):
                raw_col, separator, target_field = pair.partition(":")
                if not separator:
                    print(f"Skipping invalid entry '{pair}'")
                    continue
                raw_col = raw_col.strip()
                if raw_col in mapping:
                    mapping[raw_col] = target_field.strip()
                else:
                    print(f"Column '{raw_col}' not in proposed mapping. Skipping.")
            # The next loop iteration reprints the updated mapping.
        else:
            print("Invalid input. Press Enter to accept or type 'edit' to modify.")

    return mapping




In [81]:
class ColumnMapper:
    """Propose a mapping from a file's column headers to canonical fields.

    Headers are first matched against known synonyms; any essential field
    still unclaimed is then paired, left to right, with the columns that
    matched nothing. The proposal is shown to the user for confirmation.
    """

    def __init__(self, essential_fields, col_synonyms):
        """Store the canonical field order and build the synonym lookup.

        Parameters
        ----------
        essential_fields : dict or iterable of str
            Canonical field names; only the names (in iteration order) are
            kept.
        col_synonyms : dict
            Canonical field name -> accepted header spellings.
        """
        self.essential_fields = list(essential_fields)
        self.synonym_map = build_synonym_map(col_synonyms)

    def auto_map_by_synonym(self, columns):
        """Map columns whose (cleaned) name is a known synonym.

        Parameters
        ----------
        columns : iterable of str
            Cleaned column names, in file order.

        Returns
        -------
        (dict, set)
            ``mapping`` (column -> canonical field; the first column to
            claim a field wins) and ``used_cols``, the set of every column
            that matched a synonym. A column whose field was already claimed
            is in ``used_cols`` but not in ``mapping``, so it is not later
            assigned by position to an unrelated field.
        """
        mapping = {}
        used_cols = set()
        claimed_fields = set()

        for col in columns:
            if col not in self.synonym_map:
                continue
            # Recognised header: never eligible for positional fallback.
            used_cols.add(col)
            official = self.synonym_map[col]
            if official not in claimed_fields:  # avoid duplicates
                mapping[col] = official
                claimed_fields.add(official)

        return mapping, used_cols

    def left_to_right_assign(self, columns, used_cols, existing_mapping):
        """Pair unmatched columns with unclaimed essential fields by position.

        Parameters
        ----------
        columns : iterable of str
            Cleaned column names, in file order.
        used_cols : set
            Columns excluded from positional assignment.
        existing_mapping : dict
            Mapping produced so far; updated in place.

        Returns
        -------
        dict
            ``existing_mapping`` with the positional assignments added.
            Pairing stops when either the columns or the fields run out.
        """
        remaining_cols = [c for c in columns if c not in used_cols]
        already_assigned_fields = set(existing_mapping.values())
        remaining_fields = [
            f for f in self.essential_fields if f not in already_assigned_fields
        ]

        for col, field in zip(remaining_cols, remaining_fields):
            existing_mapping[col] = field

        return existing_mapping

    def generate_mapping(self, df_columns):
        """Build, confirm and return the mapping for ``df_columns``.

        Parameters
        ----------
        df_columns : iterable of str
            Original column headers of the loaded file.

        Returns
        -------
        dict
            Original column header -> target field, suitable for
            ``DataFrame.rename(columns=...)``.

        Raises
        ------
        ValueError
            If two headers normalise to the same cleaned name, or if the
            mapping (before or after user edits) assigns one target field to
            more than one column.
        """
        # Track cleaned -> original names, refusing ambiguous headers instead
        # of silently keeping only one of them.
        clean_to_original = {}
        for original in df_columns:
            cleaned = clean_cols(original)
            if cleaned in clean_to_original:
                raise ValueError(
                    f"Columns {clean_to_original[cleaned]!r} and {original!r} "
                    f"both normalise to '{cleaned}'. Rename one of them before ingestion."
                )
            clean_to_original[cleaned] = original

        cleaned_cols = list(clean_to_original)

        # Synonym mapping
        mapping, used = self.auto_map_by_synonym(cleaned_cols)

        # Left-to-right
        mapping = self.left_to_right_assign(cleaned_cols, used, mapping)

        # Validate before user sees it
        validate_mapping(mapping)

        mapped_fields = set(mapping.values())
        missing_fields = [f for f in self.essential_fields if f not in mapped_fields]

        if missing_fields:
            print("\n⚠️ Missing essential fields (not found in file):", missing_fields)

        # User confirmation - do in front end later?
        mapping = confirm_mapping_bulk(mapping)

        # User edits can introduce duplicate targets; check again so the
        # renamed DataFrame never ends up with two columns of the same name.
        validate_mapping(mapping)

        # Convert cleaned names back to original DataFrame column names
        final_mapping = {clean_to_original[k]: v for k, v in mapping.items()}
        return final_mapping

### QC / Validation

In [99]:

def coerce_types(df, essential_fields):
    """Return a copy of ``df`` with essential columns coerced to their types.

    Parameters
    ----------
    df : pd.DataFrame
        Input frame; it is not modified.
    essential_fields : dict
        Column name -> spec. A spec is either a dict with a ``"type"`` entry
        (e.g. ``{"type": int, "nullable": False}``) or the bare Python type.

    Returns
    -------
    pd.DataFrame
        A copy in which, for every listed column present in ``df``:

        * ``int`` / ``float`` columns are converted with ``pd.to_numeric``;
          unparseable values become NaN;
        * ``str`` columns have non-missing values converted with ``str``;
          missing values stay missing rather than becoming the text "nan";
        * ``bool`` columns are parsed: textual true/false tokens are
          recognised case-insensitively, unrecognised text and missing
          values become ``pd.NA``, other values go through ``bool()``.

        Columns with any other (or no) declared type are left unchanged.
    """
    true_tokens = {"true", "t", "yes", "y", "1"}
    false_tokens = {"false", "f", "no", "n", "0"}

    def to_text(value):
        # Keep missing values missing so later QC can still detect them.
        return value if pd.isna(value) else str(value)

    def to_bool(value):
        if pd.isna(value):
            return pd.NA
        if isinstance(value, str):
            # bool("False") is True, so text must be parsed explicitly.
            token = value.strip().lower()
            if token in true_tokens:
                return True
            if token in false_tokens:
                return False
            return pd.NA
        return bool(value)

    df = df.copy()

    for col, spec in essential_fields.items():
        if col not in df.columns:
            continue

        # The schema stores a spec dict per field; unwrap the declared type.
        dtype = spec.get("type") if isinstance(spec, dict) else spec

        if dtype in (int, float):
            df[col] = pd.to_numeric(df[col], errors="coerce")
        elif dtype is str:
            df[col] = df[col].map(to_text)
        elif dtype is bool:
            df[col] = df[col].map(to_bool)

    return df


class QCValidator:
    """Row-level quality control for a frame with canonical column names.

    A row is rejected when a required field is missing, when a generation or
    quantification value is negative or not comparable to a number, or when
    the DNA sequence contains characters outside the accepted alphabet.
    """

    # (column, label used in the error message) for values that must be >= 0.
    _NON_NEGATIVE_RULES = (
        ("directed_evolution_generation", "Generation"),
        ("dna_quantification_fg", "DNA quantification"),
        ("protein_quantification_pg", "Protein quantification"),
    )

    # Included ambiguity codes (N, R, Y) for now.
    _SEQUENCE_ALPHABET = frozenset({"A", "T", "C", "G", "N", "R", "Y"})

    def __init__(self, essential_fields):
        """Keep the schema (field name -> spec, or an iterable of names)."""
        self.essential_fields = essential_fields

    def _required_fields(self):
        """Return the field names whose values must be present.

        A field is optional only when its spec is a dict with a truthy
        ``"nullable"`` entry; every other field is required.
        """
        fields = self.essential_fields
        if not isinstance(fields, dict):
            return list(fields)
        return [
            name
            for name, spec in fields.items()
            if not (isinstance(spec, dict) and spec.get("nullable", False))
        ]

    def _row_errors(self, row, required_fields):
        """Collect the QC error messages for one row (empty list = valid)."""
        errors = []

        # --- Required field presence ---
        for field in required_fields:
            if pd.isna(row.get(field)):
                errors.append(f"Missing value for {field}")

        # --- Logical / biological rules (NA-safe) ---
        for column, label in self._NON_NEGATIVE_RULES:
            value = row.get(column)
            if pd.isna(value):
                continue
            try:
                is_negative = value < 0
            except TypeError:
                # e.g. un-coerced text: report it instead of crashing.
                errors.append(f"{label} must be numeric")
                continue
            if is_negative:
                errors.append(f"{label} cannot be negative")

        seq = row.get("assembled_dna_sequence")
        if pd.notna(seq):
            if not set(str(seq).upper()).issubset(self._SEQUENCE_ALPHABET):
                errors.append("DNA sequence contains invalid characters")

        return errors

    def validate(self, df):
        """Split ``df`` into valid and rejected rows.

        Parameters
        ----------
        df : pd.DataFrame
            Frame to check; it is not modified.

        Returns
        -------
        (pd.DataFrame, pd.DataFrame)
            ``valid_df`` and ``rejected_df``. When at least one row is
            rejected, ``rejected_df`` carries a ``qc_error_reason`` column
            with that row's messages joined by ``"; "``.
        """
        df = df.copy()
        required_fields = self._required_fields()

        valid_mask = []   # True = valid row, False = rejected
        reasons = []      # one entry per rejected row, in row order

        for _, row in df.iterrows():
            errors = self._row_errors(row, required_fields)
            valid_mask.append(not errors)
            if errors:
                reasons.append("; ".join(errors))

        # Positional boolean mask: stays correct with duplicate index labels
        # and keeps a boolean dtype for an empty frame.
        mask = pd.Series(valid_mask, index=df.index, dtype=bool).to_numpy()

        valid_df = df[mask].copy()
        rejected_df = df[~mask].copy()

        if reasons:
            # Assigned by position, not by index label.
            rejected_df["qc_error_reason"] = reasons

        return valid_df, rejected_df
    
def summarize_qc_errors(rejected_df):
    """Print one summary line per distinct QC error reason.

    Parameters
    ----------
    rejected_df : pd.DataFrame
        Rejected rows with a ``qc_error_reason`` column. Index labels are
        incremented by one to produce the reported row numbers.

    Returns
    -------
    None
        Output goes to stdout. An empty frame prints ``No QC errors!``.
    """
    if rejected_df.empty:
        print("No QC errors!")
        return

    print("\nQC Summary:")

    # One group per unique error reason.
    for reason, rows_with_reason in rejected_df.groupby("qc_error_reason"):
        # Shift index labels to 1-based row numbers.
        rows = [label + 1 for label in rows_with_reason.index.tolist()]
        print(f"- {reason} (Count: {len(rows)}, Rows: {rows})")




### Pipeline

In [104]:
# NOTE(review): machine-specific absolute path. Consider moving it to a
# configuration cell so the notebook can run on other machines.
file_path = "c://Users//Leora//OneDrive - Queen Mary, University of London//Group_Project//Example_Data//DE_BSU_Pol_Batch_1.tsv"

# --- 1. Load the raw file ---
loader = FileLoader()
df = loader.load(file_path)

print(f"Loaded {len(df)} rows")
print("Original columns:", df.columns.tolist())

# --- 2. Map file headers onto canonical field names (interactive) ---
mapper = ColumnMapper(essential_fields, col_synonyms)
column_mapping = mapper.generate_mapping(df.columns)
df = df.rename(columns=column_mapping)

# --- 3. Ensure all essential columns exist after renaming ---
missing_fields = [f for f in essential_fields if f not in df.columns]
if missing_fields:
    raise ValueError(
        f"⚠️ Missing essential fields in file: {missing_fields}. "
        "Ingestion cannot continue until these columns are present."
    )

# --- 4. Coerce column types, then run row-level QC ---
df = coerce_types(df, essential_fields)

validator = QCValidator(essential_fields)
valid_df, rejected_df = validator.validate(df)

print("Valid rows:", len(valid_df))
print("Rejected rows:", len(rejected_df))

# Any rejected row aborts ingestion after the summary is printed.
if len(rejected_df) > 0:
    summarize_qc_errors(rejected_df)
    raise ValueError("QC FAILED — Fix file before ingestion.")

# Pause until the user acknowledges the QC result.
input("Press Enter to continue.")


Loaded 301 rows
Original columns: ['Plasmid_Variant_Index', 'Parent_Plasmid_Variant', 'Directed_Evolution_Generation', 'Assembled_DNA_Sequence', 'DNA_Quantification_fg', 'Protein_Quantification_pg', 'Control']

Proposed Column Mapping:
plasmid_variant_index → plasmid_variant_index
control → is_control
parent_plasmid_variant → parent_plasmid_variant
directed_evolution_generation → directed_evolution_generation
assembled_dna_sequence → assembled_dna_sequence
dna_quantification_fg → dna_quantification_fg
protein_quantification_pg → protein_quantification_pg
Valid rows: 301
Rejected rows: 0


''

SQL