In [None]:
import polars as pl
import polars.selectors as cs

# Configuration
# Colony to analyse (alternative: "rh_SPF_U42")
colony = "rh_P51"

# Short colony label used in the chart title: the text after the last underscore
colony_name = colony.rsplit("_", 1)[-1]

# Demographics table (tab-separated); it is read later when building `df`
demographics = '/master/abagwell/workspace/demographics.MML.tsv'  # Last updated 2025-11-26

# Per-animal allele statuses; Id is forced to string
status = pl.read_csv(
    "/master/abagwell/workspace/MHC/output/allele_statuses.tsv",
    separator="\t",
    schema_overrides={"Id": pl.String},
)

In [None]:
# Determine quantiles
# Number of equal-probability birth-year bins (4 = quartiles; 11 is an alternative).
# Deliberately not called `max`: binding that name at notebook level would shadow
# the builtin max() for every cell that runs afterwards.
n_quantiles = 4

# Interior cut probabilities 1/n, 2/n, ..., (n-1)/n, passed to qcut further down
cuts = [x / n_quantiles for x in range(1, n_quantiles)]

In [None]:
# Status columns of the four monitored MHC alleles, as named in the statuses file
status_columns = ["A001 Status", "B003 Status", "B008 Status", "B017 Status"]

df = status.filter(
    # Remove animals where any of the monitored statuses is only inferred
    # (inferred values are written in parentheses).
    # str.contains returns null for a missing status, and filter discards rows
    # whose predicate is null, so nulls are mapped to False here. Otherwise every
    # animal that merely lacks one result would be dropped, and the null handling
    # below and downstream could never trigger.
    ~pl.any_horizontal(
        [pl.col(column).str.contains(r"\(").fill_null(False) for column in status_columns]
    )
).with_columns(
    # Remove parentheses from every column whose name ends in "Status"
    cs.ends_with("Status").str.replace(r"[(]", "").str.replace(r"[)]", "")
).with_columns(
    # Quadruple negative: POSITIVE when all four monitored alleles are NEGATIVE,
    # null when any of the four statuses is missing, NEGATIVE otherwise.
    pl.when(
        pl.all_horizontal([pl.col(column) == "NEGATIVE" for column in status_columns])
    ).then(
        pl.lit("POSITIVE")
    ).when(
        pl.any_horizontal([pl.col(column).is_null() for column in status_columns])
    ).then(
        pl.lit(None)
    ).otherwise(
        pl.lit("NEGATIVE")
    ).alias("Quadruple Negative")
).join(
    # Attach birth date and colony from the demographics table.
    # Only the three listed columns are loaded, so only they need dtype overrides.
    pl.read_csv(
        demographics,
        separator='\t',
        comment_prefix="#",
        columns=["Id", "Date of Birth", "Colony"],
        schema_overrides={
            "Id": pl.String,
            "Date of Birth": pl.Datetime,
        },
    ),
    on="Id",
    how="left",  # animals without a demographics row are kept with null Colony / birth date
).rename(
    # Display names used for the chart legend
    {
        "A001 Status": "A1*001+",
        "B003 Status": "B*003+",
        "B008 Status": "B*008+",
        "B017 Status": "B*017+",
    }
).with_columns(
    # Pull out just year
    pl.col("Date of Birth").dt.year().alias("Birth Year")
)

# Number of retained animals per colony (displayed as the cell output)
df.group_by("Colony").agg(pl.len())

In [None]:
# Positive/negative counts and positive fraction per birth-year interval and allele,
# restricted to the colony selected in the configuration cell.
df2 = (
    df.filter(pl.col("Colony").eq(colony))
    # Assign each row to a right-closed birth-year quantile interval
    .with_columns(Interval=pl.col("Birth Year").qcut(cuts, left_closed=False))
    .drop(["Date of Birth", "Birth Year"])
    # Long format: one row per input row and allele column
    .unpivot(
        index="Interval",
        on=["A1*001+", "B*003+", "B*008+", "B*017+", "Quadruple Negative"],
        variable_name="Allele",
        value_name="Status",
    )
    # Rows without a status do not count towards that allele
    .filter(pl.col("Status").is_not_null())
    .group_by(["Interval", "Allele"])
    # Keep the raw statuses as a list so both outcomes can be counted from it
    .agg(Count=pl.len(), Status=pl.col("Status"))
    .with_columns(
        Positives=pl.col("Status").list.count_matches("POSITIVE"),
        Negatives=pl.col("Status").list.count_matches("NEGATIVE"),
    )
    .drop("Status")
    .with_columns(
        pl.col("Positives")
        .truediv(pl.col("Positives") + pl.col("Negatives"))
        .alias("Fraction Positive")
    )
)

In [None]:
# Turn the qcut interval labels into readable "Years" labels for the x axis.
#
# Labels are expected in the right-closed form "(lower, upper]" produced by
# qcut(..., left_closed=False); the outermost intervals use "-inf" / "inf".
# Resulting labels:
#   open lower bound  -> " –<last year>"   (leading space)
#   open upper bound  -> "<first year> "   (trailing space)
#   single-year bin   -> "<year>"
#   otherwise         -> "<first year>–<last year>"
df3 = df2.with_columns(
    # Split the label text into its two bounds
    pl.col("Interval").cast(pl.String).str.extract(r"^\((.+), (.+)\]$", 1).alias("Lower Bound"),
    pl.col("Interval").cast(pl.String).str.extract(r"^\((.+), (.+)\]$", 2).alias("Upper Bound"),
).with_columns(
    # First whole year inside the interval. The lower bound is exclusive, so it is
    # floor(lower) + 1. Bounds are parsed as floats because quantile breakpoints
    # may be fractional (e.g. "2003.5"), which a direct integer cast cannot parse.
    # Open-endedness is decided from the text; the non-strict casts only make sure
    # the unused branch cannot raise on "-inf" / "inf".
    pl.when(pl.col("Lower Bound") == "-inf"
    ).then(
        pl.lit(" ")
    ).otherwise(
        (pl.col("Lower Bound").cast(pl.Float64, strict=False).floor() + 1)
            .cast(pl.Int32, strict=False)
            .cast(pl.String)
    ).alias("Starting Year"),
    # Last whole year inside the interval: the upper bound is inclusive -> floor(upper)
    pl.when(pl.col("Upper Bound") == "inf"
    ).then(
        pl.lit(" ")
    ).otherwise(
        pl.col("Upper Bound").cast(pl.Float64, strict=False).floor()
            .cast(pl.Int32, strict=False)
            .cast(pl.String)
    ).alias("Ending Year"),
).with_columns(
    # If start and end year are the same, just show that year
    pl.when(pl.col("Starting Year") == pl.col("Ending Year")
    ).then(
        pl.col("Starting Year")
    # Most recent interval has no end year: show the start year without "–"
    ).when(pl.col("Ending Year") == " "
    ).then(
        pl.col("Starting Year") + pl.lit(" ")
    ).otherwise(
        pl.col("Starting Year") + pl.lit("–") + pl.col("Ending Year")
    ).alias("Years")
).drop("Lower Bound", "Upper Bound", "Starting Year", "Ending Year")

In [None]:
# Display the per-interval table that is passed to the chart below
df3

In [None]:
import altair as alt

# Line chart of the monitored MHC alleles by birth-year interval.
# The intervals come from quantile cuts of birth year, so each one holds a
# near-equal number of animals.
# One line per allele; hovering a point shows the positive and negative counts.
alt.Chart(df3).mark_line().encode(
    x=alt.X("Years", title="Birth Years"),
    y=alt.Y("Fraction Positive", title="Fraction of Animals"),
    color=alt.Color("Allele", title="Allele Status"),
    tooltip=[alt.Tooltip("Positives"), alt.Tooltip("Negatives")],
).properties(
    title=f"Monitored MHC Alleles in {colony_name}"
)#.save(f"/master/abagwell/workspace/MHC/plots/monitored_alleles.-SNPRC23.{colony}.no_inferred_animals.html")