In [4]:
from __future__ import annotations
from io import BytesIO

import numpy as np
import oxbow as ox
import pandas as pd
import polars as pl
import pyarrow as pa
import pysam

In [2]:
# Lookup table from the `Type` declared for a VCF INFO field to the name of
# the numpy/pandas dtype used when parsing that field's values.
TYPE_MAP = dict(
    Integer="int64",
    Float="float64",
    String="object",
    Flag="bool",
)


def read_vcf_as_pandas(vcf_path: str) -> pd.DataFrame:
    """
    Load a VCF into a pandas DataFrame via oxbow.

    The bytes returned by `ox.read_vcf` are wrapped in an in-memory buffer,
    opened as an Arrow IPC file with pyarrow, and converted to pandas.

    Parameters
    ----------
    vcf_path : str
        Path of the VCF file handed to `ox.read_vcf`.

    Returns
    -------
    pd.DataFrame
        The table read from the Arrow IPC payload.
    """
    buffer = BytesIO(ox.read_vcf(vcf_path))
    reader = pa.ipc.open_file(buffer)
    return reader.read_pandas()


def read_vcf_as_polars(vcf_path: str) -> pl.DataFrame:
    """
    Load a VCF into a polars DataFrame via oxbow.

    The payload returned by `ox.read_vcf` is passed directly to
    `pl.read_ipc`.

    Parameters
    ----------
    vcf_path : str
        Path of the VCF file handed to `ox.read_vcf`.

    Returns
    -------
    pl.DataFrame
        The table read from the Arrow IPC payload.
    """
    return pl.read_ipc(ox.read_vcf(vcf_path))


def read_info_schema(vcf_path: str) -> pd.DataFrame:
    """
    Collect the INFO field declarations from a VCF header.

    The header is read with `pysam.VariantFile`.

    Parameters
    ----------
    vcf_path : str
        Path to the VCF file. The original author's note expects a bgzipped
        VCF with a tabix `.tbi` index of matching name in the same folder.

    Returns
    -------
    pd.DataFrame
        One row per INFO field declared in the header, with the columns
        [name, number, type].

    Notes
    -----
    Possible values for `type` are: "Integer", "Float", "String", "Flag".

    Possible values for `number` are:
        - An integer (e.g. 0, 1, 2, 3, 4, etc.) - the field carries a fixed
          number of values per VCF record. 0 means the field is a "Flag".
        - A string ("A", "R", "G") - the number of values per VCF record is
          determined by the number of alternate alleles, the total number
          of alleles (reference included), or the number of possible
          genotypes, respectively.
        - A dot (".") - the number of values per VCF record varies, is
          unknown, or is unbounded.
    """
    with pysam.VariantFile(vcf_path) as vcf:
        # Materialise the rows while the file is still open
        rows = [
            (meta.name, meta.number, meta.type)
            for meta in vcf.header.info.values()
        ]
    return pd.DataFrame(rows, columns=["name", "number", "type"])


def _parse_into_info_record(
    pairs: list[str], 
    fields: set[str],
    dtype_map: pd.Series, 
    arity_map: pd.Series
) -> dict:
    """
    Parse a sequence of key-value INFO strings into a dictionary.

    Parameters
    ----------
    pairs : list[str]
        List of `{key}={value}` pairs in the INFO column of a VCF. Note that
        Flag fields will be represented as `{key}` only and variadic values
        will be separated by commas.
    fields : set[str]
        Set of fields to extract from the INFO column.
    dtype_map : pd.Series
        Series mapping INFO field names to a corresponding numpy dtype.
    arity_map : pd.Series
        Series mapping INFO field names to a corresponding number or "arity".

    Returns
    -------
    dict
        Dictionary mapping INFO field names to their values.
    """
    record = {}
    for pair in pairs:
        key, *value = pair.split("=")
        if key in fields:
            arity = arity_map[key]
            dtype = dtype_map[key]
            if arity == 0:
                record[key] = True
            elif arity == 1:
                record[key] = np.array(value[0], dtype=dtype)
            else:
                record[key] = np.array(value[0].split(","), dtype=dtype)
    return record
 

def extract_info(
    info: pd.Series,
    schema: pd.DataFrame,
    fields: list[str] | None = None,
) -> pd.DataFrame:
    """
    Extracts fields from INFO column of a VCF.

    Parameters
    ----------
    info : pd.Series
        The info column of a VCF dataframe, one `;`-separated string per
        record.
    schema : pd.DataFrame
        Dataframe with [name, number, type] columns describing the INFO fields
        of a VCF.
    fields : list[str], optional
        List of fields to extract from the INFO column. If None, all fields
        in `schema` will be extracted.

    Returns
    -------
    pd.DataFrame
        Dataframe with one column per requested field, in the requested
        order, one row per entry of `info`, and the same index as `info` so
        it aligns with the dataframe `info` was taken from. Fields that never
        occur in `info` are filled with `pd.NA`.

    Raises
    ------
    ValueError
        If a requested field is not declared in `schema`.
    """
    if fields is None:
        fields = list(schema["name"])
    else:
        fields = list(fields)
        names_available = set(schema["name"])
        for field in fields:
            if field not in names_available:
                raise ValueError(f"Field '{field}' not found in INFO schema.")

    # Create series mapping field names to dtypes and arities
    schema_by_name = schema.set_index("name")
    dtype_map = schema_by_name["type"].map(TYPE_MAP)
    arity_map = schema_by_name["number"]

    # Split the key-value pairs in the info column into record dictionaries
    records = info.str.split(";").apply(
        _parse_into_info_record,
        fields=set(fields),
        dtype_map=dtype_map,
        arity_map=arity_map,
    )

    # Convert to dataframe. A plain list of dicts is passed together with the
    # original index so that (a) construction does not depend on the labels of
    # `records`, (b) the row count is preserved even when every record is
    # empty, and (c) the result aligns with the caller's dataframe.
    info_df = pd.DataFrame(records.tolist(), index=info.index).convert_dtypes()

    # Include columns for requested fields that are missing from INFO records
    # but still declared in the schema
    for field in fields:
        if field not in info_df.columns:
            info_df[field] = pd.NA

    # Reorder columns as requested
    return info_df[fields]

In [3]:
# Input VCF, given relative to the notebook's working directory
vcf_path = "DRR.vcf.gz"
# Load the VCF records into a pandas dataframe
df = read_vcf_as_pandas(vcf_path)
# Read the INFO field declarations (name, number, type) from the same file
schema = read_info_schema(vcf_path)
# Display the schema as the cell output
schema

Unnamed: 0,name,number,type
0,CONFLICT,.,String
1,AC,A,Integer
2,AF,A,Float
3,NS,1,Integer
4,AN,1,Integer
5,LV,1,Integer
6,PS,1,String
7,AT,R,String


In [4]:
# Parse the AC and LV entries of the INFO column into their own columns
info = extract_info(df["info"], schema, fields=["AC", "LV"])
# Swap the raw INFO column for the parsed fields, then replace missing
# entries with pd.NA
out = df.drop(columns=["info"]).assign(**info).fillna(pd.NA)
out

Unnamed: 0,chrom,pos,id,ref,alt,qual,filter,format,AC,LV
0,genome,18,>8>11,G,T,60.0,,GT:AD,[12],0
1,genome,26,>12>16,T,"G,C",60.0,,GT:AD,"[2, 5]",0
2,genome,28,>16>19,A,G,60.0,,GT:AD,[6],0
3,genome,30,>19>22,G,A,60.0,,GT:AD,[54],0
4,genome,35,>22>27,GG,"AG,GT",60.0,,GT:AD,"[2, 4]",0
...,...,...,...,...,...,...,...,...,...,...
482,genome,9544,>2561>2564,T,C,60.0,,GT:AD,[2],0
483,genome,9592,>2566>2569,C,T,60.0,,GT:AD,[2],0
484,genome,9598,>2569>2572,G,A,60.0,,GT:AD,[20],0
485,genome,9634,>2574>2577,T,C,60.0,,GT:AD,[10],0
