In [1]:
import pathlib as pl
import pandas as pd
import gzip
import dnaio
import collections as col
import io

# Source
# Gencode V44 basic annotation / gene model
# https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_44/gencode.v44.basic.annotation.gtf.gz
# md5: 7450ef42cf9cb3d29625320b22d4bb45

# Gencode v44 protein coding sequences / cDNA
# https://ftp.ebi.ac.uk/pub/databases/gencode/Gencode_human/release_44/gencode.v44.pc_transcripts.fa.gz
# md5: 1d6f02c471e76f421e36921ddb5715a1

# 2023-11-08
# Filtered: 37 transcript entries on chrM (chromosome not in the retained set chr1-22, chrX, chrY)
# Remaining set:
# Counter({'known': 64385, 'auto': 61554, 'notbasic': 46577, 'female': 2669, 'male': 162})
# (content of object "count_records" after run)
# leaves at most 64385 - 46577 = 17808 transcripts in output

# Working directory holding the downloaded Gencode input files;
# resolved to an absolute path relative to the current directory.
local_wd = pl.Path("temp").resolve()

# Input 1: gene model / annotation (gzipped GTF)
local_gtf = local_wd / "gencode.v44.basic.annotation.gtf.gz"
assert local_gtf.is_file()

# Input 2: transcript sequences (gzipped FASTA)
local_fasta = local_wd / "gencode.v44.pc_transcripts.fa.gz"
assert local_fasta.is_file()

def cache_transcript_to_chromosome(gtf_path=None):
    """Build a transcript-ID -> annotation lookup from a gzip-compressed GTF file.

    Only lines whose third column is "transcript" are considered, and of
    those only the ones located on chr1-chr22, chrX or chrY are kept.
    Transcript lines on any other chromosome are tallied per chromosome
    and that tally is printed before returning.

    Args:
        gtf_path: path to the gzipped GTF file. If omitted (None), the
            module-level ``local_gtf`` is used.

    Returns:
        dict mapping each transcript ID to a dict with the keys
        "chrom", "position" (formatted as "chrom:start-end"),
        "karyotype" ("male" for chrY, "female" for chrX, "auto"
        otherwise), "gene_id", "gene_name", "gene_biotype" (taken from
        the "gene_type" attribute) and "transcript_id".

    Raises:
        ValueError: if a data line has fewer than 9 tab-separated
            columns, if a retained transcript line lacks one of the
            attributes gene_id / gene_name / gene_type / transcript_id,
            or if a transcript ID occurs more than once.
    """
    if gtf_path is None:
        # backward-compatible default: the file configured at module level
        gtf_path = local_gtf

    # frozenset because membership is tested once per transcript line
    USE_CHROMOSOMES = frozenset(
        [f"chr{i}" for i in range(1, 23)] + ["chrX", "chrY"]
    )
    CHROM_LOC = {
        "chrY": "male",
        "chrX": "female",
        # not reachable as long as chrM is absent from USE_CHROMOSOMES
        "chrM": "mito",
    }
    REQUIRED_ATTRIBUTES = ("gene_id", "gene_name", "gene_type", "transcript_id")

    discarded = col.Counter()
    transcript_infos = dict()
    with gzip.open(gtf_path, "rt") as gtf:
        for line_number, line in enumerate(gtf, start=1):
            if line.startswith("#") or not line.strip():
                continue
            parts = line.strip().split("\t")
            if len(parts) < 9:
                raise ValueError(
                    f"{gtf_path}, line {line_number}: expected 9 tab-separated "
                    f"GTF columns, found {len(parts)}"
                )
            chrom = parts[0]
            if parts[2] != "transcript":
                continue
            if chrom not in USE_CHROMOSOMES:
                discarded[chrom] += 1
                continue

            # The attribute column is a whitespace-separated sequence of
            # `key "value";` tokens: scan adjacent token pairs and pick up
            # the value following each key of interest.
            tokens = parts[-1].split()
            attributes = dict()
            for info_type, info_value in zip(tokens[:-1], tokens[1:]):
                if info_type not in REQUIRED_ATTRIBUTES:
                    continue
                attributes[info_type] = info_value.strip().strip(';"')
                if len(attributes) == len(REQUIRED_ATTRIBUTES):
                    # everything needed was found, skip the remaining tokens
                    break

            # Explicit exceptions instead of `assert`: these are checks on
            # the input data and must not vanish under `python -O`.
            missing = [key for key in REQUIRED_ATTRIBUTES if key not in attributes]
            if missing:
                raise ValueError(
                    f"{gtf_path}, line {line_number}: transcript entry lacks "
                    f"attribute(s): {', '.join(missing)}"
                )
            transcript_id = attributes["transcript_id"]
            if transcript_id in transcript_infos:
                raise ValueError(
                    f"{gtf_path}, line {line_number}: duplicate transcript ID "
                    f"{transcript_id}"
                )

            transcript_infos[transcript_id] = {
                "chrom": chrom,
                "position": f"{chrom}:{parts[3]}-{parts[4]}",
                "karyotype": CHROM_LOC.get(chrom, "auto"),
                "gene_id": attributes["gene_id"],
                "gene_name": attributes["gene_name"],
                "gene_biotype": attributes["gene_type"],
                "transcript_id": transcript_id,
            }
    print(discarded)
    return transcript_infos
                    

# Annotation lookup: transcript ID -> info dict (chromosome, position, gene, ...)
transcript_infos = cache_transcript_to_chromosome()

# One in-memory FASTA buffer per karyotype class, so the classes can be
# combined into different output files later on.
buffer_auto = io.StringIO()
buffer_female = io.StringIO()
buffer_male = io.StringIO()
buffers = {
    "auto": buffer_auto,
    "male": buffer_male,
    "female": buffer_female,
}

count_records = col.Counter()
record_table = []
with dnaio.open(local_fasta) as fasta:
    for record in fasta:
        # the transcript ID is the first "|"-separated field of the record name
        transcript_id = record.name.split("|")[0]
        transcript_info = transcript_infos.get(transcript_id)
        if transcript_info is None:
            # sequence has no entry in the annotation lookup
            count_records["notbasic"] += 1
            continue
        count_records["known"] += 1

        # New header: position first, then the original header fields
        # minus all fields starting with "OTT".
        header_fields = [transcript_info["position"]]
        for p in record.name.strip("|").split("|"):
            if p.startswith("OTT"):
                continue
            header_fields.append(p)
        shorter_name = "|".join(header_fields)
        # stored in the info dict, hence also ends up in the record table
        transcript_info["fasta_header"] = shorter_name

        karyotype = transcript_info["karyotype"]
        assert karyotype in ["male", "female", "auto"]
        target_buffer = buffers.get(karyotype)
        if target_buffer is not None:
            target_buffer.write(f">{shorter_name}\n{record.sequence}\n")
            count_records[karyotype] += 1
        record_table.append(transcript_info)

# one table row per transcript that was found in the annotation lookup
record_table = pd.DataFrame.from_records(record_table)

# Output FASTA files:
#   122XY -> auto + female + male buffers
#   122X  -> auto + female buffers
#   122Y  -> auto + male buffers
out_all = local_wd / "gencode.v44.pc_transcripts.122XY.fa.gz"
out_female = local_wd / "gencode.v44.pc_transcripts.122X.fa.gz"
out_male = local_wd / "gencode.v44.pc_transcripts.122Y.fa.gz"

fasta_outputs = [
    (out_all, (buffer_auto, buffer_female, buffer_male)),
    (out_female, (buffer_auto, buffer_female)),
    (out_male, (buffer_auto, buffer_male)),
]
for fasta_path, fasta_buffers in fasta_outputs:
    with gzip.open(fasta_path, "wt") as dump:
        for fasta_buffer in fasta_buffers:
            # assignment to "_" swallows the character count returned by write()
            _ = dump.write(fasta_buffer.getvalue())

# Tab-separated table of all transcripts written to the FASTA outputs
out_table = local_wd / "gencode.v44.pc_transcripts.tsv.gz"
record_table.to_csv(out_table, sep="\t", header=True, index=False)

print(count_records.most_common())

Counter({'chrM': 37})
[('known', 64372), ('auto', 61541), ('notbasic', 46590), ('female', 2669), ('male', 162)]
