In [None]:
import math
import os
import re
from multiprocessing import Pool

import pandas as pd


In [None]:
# Load gene and exon coordinates from the GENCODE annotation.
# The GTF file can be downloaded from GENCODE (https://www.gencodegenes.org/human/)
# version 36, GRCh37 reference genome
gtf_path = "/path/to/your/gencode.v36lift37.annotation.gtf.gz"
# Only six of the GTF columns are read; they are renamed below to
# chromosome / type / start / end / strand, plus the attribute column (8).
df_gtf = pd.read_csv(
    gtf_path,
    sep="\t",
    comment="#",
    header=None,
    usecols=[0, 2, 3, 4, 6, 8],
    compression="gzip",
)
# Replace the free-text attribute column with the unversioned Ensembl gene ID
# (the part of `gene_id "ENSG...."` before the first dot).
gene_id_pattern = re.compile(r'gene_id "(ENSG[0-9]+)\.')
attributes = df_gtf.pop(8)
df_gtf["gene_id"] = [
    gene_id_pattern.search(attribute).group(1) for attribute in attributes
]
df_gtf.columns = ["chromosome", "type", "start", "end", "strand", "gene_id"]

# Keep only the autosomes (chr1-chr22) and chrX.
kept_chromosomes = [f"chr{name}" for name in [*range(1, 23), "X"]]
df_gtf = df_gtf[df_gtf["chromosome"].isin(kept_chromosomes)]

# Split into one table of gene records and one of exon records; the feature
# type column is redundant after the split and is dropped.
df_gene_pos = df_gtf.loc[df_gtf["type"] == "gene"].drop(columns=["type"])
df_exon_pos = df_gtf.loc[df_gtf["type"] == "exon"].drop(columns=["type"])
del df_gtf  # the full annotation table is no longer needed
for position_table in (df_gene_pos, df_exon_pos):
    print(position_table.head())
    print(position_table.shape)


In [None]:
# Build the lookup "{gene ID -> {gene position}}": for every gene ID keep the
# first gene record, minus its trailing gene_id column, as a plain dict.
gene_pos_dict = {}
for gid, gene_rows in df_gene_pos.groupby("gene_id"):
    first_record = gene_rows.iloc[0, :-1]
    gene_pos_dict[gid] = first_record.to_dict()
del df_gene_pos

# Build the lookup "{gene ID -> exon position dataframe}": every exon record
# of the gene, without the now-redundant gene_id column.
exon_pos_dict = {}
for gid, exon_rows in df_exon_pos.groupby("gene_id"):
    exon_pos_dict[gid] = exon_rows.drop(columns=["gene_id"])
del df_exon_pos

# for example
example_gene_id = "ENSG00000223972"
print(gene_pos_dict[example_gene_id])
print(exon_pos_dict[example_gene_id])


In [None]:
def _covered_length(starts, ends) -> int:
    """
    Count the positions covered by at least one closed interval [start, end].

    Overlapping intervals are counted once. Intervals are merged after
    sorting, so the cost depends on the number of intervals rather than on
    the number of covered positions.

    Args:
        starts: iterable of interval start positions (inclusive).
        ends: iterable of interval end positions (inclusive), paired with starts.

    Returns:
        int: size of the union of all intervals; 0 when there are none.
    """
    covered = 0
    run_start = run_end = None
    for start, end in sorted(zip(starts, ends)):
        if end < start:
            # an inverted interval covers nothing
            continue
        if run_end is None or start > run_end:
            # the current run is finished: bank it and open a new one
            if run_end is not None:
                covered += run_end - run_start + 1
            run_start, run_end = start, end
        elif end > run_end:
            # overlapping interval that extends the current run
            run_end = end
    if run_end is not None:
        covered += run_end - run_start + 1
    return covered


# {gene ID->gene length}
gene_len = {gid: v["end"] - v["start"] + 1 for gid, v in gene_pos_dict.items()}
# {gene ID->exon length}: positions covered by at least one exon of the gene
exon_len = {
    gid: _covered_length(df_exon["start"], df_exon["end"])
    for gid, df_exon in exon_pos_dict.items()
}
# {gene ID->intron length}: whatever part of the gene span is not exonic
intron_len = {gid: length - exon_len[gid] for gid, length in gene_len.items()}
# for example
print(gene_len["ENSG00000000003"])
print(exon_len["ENSG00000000003"])
print(intron_len["ENSG00000000003"])


In [None]:
def _get_variant_type_and_distance(
    gene_id: str, var_chrom: str, var_pos: int
) -> tuple[str, int]:
    """
    Classify a variant relative to a gene and measure its distance.

    Reads the module-level lookups ``gene_pos_dict`` and ``exon_pos_dict``.
    "upstream"/"downstream" are strand-aware: for a "+" gene upstream is
    before ``start``, for any other strand it is after ``end``.

    Args:
        gene_id (str): ensembl ID.
        var_chrom (str): the chromosome of variant.
        var_pos (int): the position of variant.

    Returns:
        tuple[str, int]: variant type ("exon", "intron", "upstream" or
        "downstream") and distance. For up/downstream variants the distance
        is measured to the nearest gene boundary; for variants inside the
        gene it is measured from the strand-aware gene start (``start`` on
        "+", ``end`` otherwise). ``("", -1)`` is returned when the gene is
        missing from either lookup or lies on a different chromosome.
    """
    unresolved = ("", -1)
    gene_pos = gene_pos_dict.get(gene_id)
    exon_pos = exon_pos_dict.get(gene_id)
    # Explicit guards instead of assert + bare except: they survive
    # `python -O` and do not swallow unrelated errors or interrupts.
    if gene_pos is None or exon_pos is None:
        return unresolved
    if var_chrom != gene_pos["chromosome"]:
        return unresolved

    gene_start, gene_end = gene_pos["start"], gene_pos["end"]
    forward = gene_pos["strand"] == "+"

    if var_pos < gene_start:
        # before the gene in genomic coordinates
        return ("upstream" if forward else "downstream"), gene_start - var_pos
    if var_pos > gene_end:
        # after the gene in genomic coordinates
        return ("downstream" if forward else "upstream"), var_pos - gene_end

    # inside the gene span: exon if any exon interval contains the position
    in_exon = any(
        start <= var_pos <= end
        for start, end in zip(exon_pos["start"], exon_pos["end"])
    )
    var_type = "exon" if in_exon else "intron"
    var_dist = var_pos - gene_start if forward else gene_end - var_pos
    return var_type, var_dist


def get_variant_type_and_distance(args) -> tuple[str, int]:
    """
    Single-argument adapter around ``_get_variant_type_and_distance``.

    Args:
        args: iterable unpacked positionally into
            ``_get_variant_type_and_distance`` (gene_id, var_chrom, var_pos).

    Returns:
        tuple[str, int]: whatever the wrapped function returns.
    """
    type_and_distance = _get_variant_type_and_distance(*args)
    return type_and_distance


In [None]:
# Directory receiving one "<cancer type>.txt.gz" table per eQTL file.
output_dir = os.path.join("eQTL_distance")
os.makedirs(output_dir, exist_ok=True)
# path to the significant eQTLs of TCGA
# which can be downloaded in this repo
eqtl_dir = "/path/to/your/signif_cis_eQTL"
for eqtl_file in os.listdir(eqtl_dir):
    # the cancer type is everything before the first dot of the file name
    cancer_type = eqtl_file.split(".", maxsplit=1)[0]
    output_file = os.path.join(output_dir, f"{cancer_type}.txt.gz")
    if os.path.isfile(output_file):
        # already annotated in a previous run
        continue
    df_eqtl = pd.read_csv(
        os.path.join(eqtl_dir, eqtl_file), delimiter="\t", compression="gzip"
    )
    # variant_id is split on "_": first field is the chromosome, second the position
    variant_fields = [
        variant_id.split("_", maxsplit=2) for variant_id in df_eqtl["variant_id"]
    ]
    df_eqtl["chrom"] = [fields[0] for fields in variant_fields]
    df_eqtl["pos"] = [int(fields[1]) for fields in variant_fields]

    df_dist = df_eqtl[["variant_id", "gene_id"]].copy()
    with Pool(4) as pool:
        # Pool.map already returns a list, in input order. The previous
        # `list(..., total=...)` wrapper raised TypeError because list()
        # accepts no keyword arguments.
        rst = pool.map(
            get_variant_type_and_distance,
            zip(df_eqtl["gene_id"], df_eqtl["chrom"], df_eqtl["pos"]),
        )
    df_dist["variant_position"] = [var_type for var_type, _ in rst]
    df_dist["variant_distance"] = [var_dist for _, var_dist in rst]
    # `lineterminator` is the current spelling of this to_csv option; the old
    # `line_terminator` keyword was removed in pandas 2.0.
    df_dist.to_csv(
        output_file, sep="\t", compression="gzip", index=False, lineterminator="\n"
    )


In [None]:
def _region_density(df_region: pd.DataFrame, genes: set, region_len: dict) -> float:
    """
    eQTL density of a region type: total eQTL count / total region length.

    Args:
        df_region: eQTL rows of one region type, with "gene_id" and
            "variant_id" columns.
        genes: every gene ID of the cancer type; genes without an eQTL in
            this region count as 0 variants but still add their length.
        region_len: {gene ID -> region length}; genes missing from it are
            skipped in the length sum.

    Returns:
        float: summed variant count divided by summed region length.
    """
    n_variants = (
        df_region.groupby("gene_id")["variant_id"]
        .count()
        .reindex(sorted(genes), fill_value=0)
    )
    total_length = n_variants.index.to_series().map(region_len).sum()
    return n_variants.sum() / total_length


# calculate parameters
param_rows = []
# sorted so that the resulting table has a deterministic row order
for file in sorted(os.listdir(output_dir)):
    cancer_type = file.split(".", maxsplit=1)[0].removeprefix("TCGA-")
    df = pd.read_csv(
        os.path.join(output_dir, file),
        delimiter="\t",
        compression="gzip",
    )
    genes = set(df["gene_id"])

    # exon / intron: density = total count / total length
    density_exon = _region_density(
        df[df["variant_position"] == "exon"], genes, exon_len
    )
    density_intron = _region_density(
        df[df["variant_position"] == "intron"], genes, intron_len
    )

    # up/downstream
    # formula: The number of all up/downstream eQTLs * 0.5 / ((Dup + Ddown) * number of genes)
    is_upstream = df["variant_position"] == "upstream"
    is_downstream = df["variant_position"] == "downstream"
    # The number of up/downstream eQTLs
    nvariants = int((is_upstream | is_downstream).sum())
    # median distances up/downstream, rounded up to whole positions
    # (math.ceil raises ValueError if a cancer type has no such eQTL at all)
    dist_up = math.ceil(df.loc[is_upstream, "variant_distance"].median())
    dist_down = math.ceil(df.loc[is_downstream, "variant_distance"].median())
    # number of genes
    ngenes = len(genes)
    density_up_down_stream = nvariants * 0.5 / (dist_up + dist_down) / ngenes

    # scale the weight of up/downstream into 1
    # exon : up/downstream
    exon_weight = density_exon / density_up_down_stream
    # intron : up/downstream
    intron_weight = density_intron / density_up_down_stream
    param_rows.append(
        [cancer_type, exon_weight, intron_weight, 1.0, dist_up, dist_down]
    )
df_param = pd.DataFrame(
    param_rows,
    columns=[
        "cancer_type",
        "exon_weight",
        "intron_weight",
        "up_downstream_weight",
        "upstream_distance",
        "downstream_distance",
    ],
)
df_param
