/
gero_drugs.py
executable file
·161 lines (142 loc) · 10.2 KB
/
gero_drugs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#!/usr/bin/env python3
from beartype import beartype
import polars as pl
import click
from pathlib import Path
from typing import Union
@beartype
def prepare_annotations(base: Path):
### Initial preparation of data - it is needed only once
inputdata = (base / "inputdata").absolute().resolve()
print(f"initial data preparation started, input folder is {str(inputdata)}")
## loading of source tables
# var_drug_ann is Variant-Drug table
var_drug_ann = pl.read_csv(f"{str(inputdata)}/var_drug_ann.tsv", has_header = True, sep = "\t")
# study_parameters is Study Parameters table
study_parameters = pl.read_csv(f"{str(inputdata)}/study_parameters.tsv", has_header = True, sep = "\t")
# ### RSID as an example
# rsid = "rs3745274"
# select only needed columns from Variant-Drug table: "Variant/Haplotypes", "Variant Annotation ID", "Drug(s)", "Phenotype Category", "Significance" and "Sentence" for future annotation tab
var_drug_ann_shrink = var_drug_ann.select(["Variant Annotation ID", "Variant/Haplotypes", "Drug(s)", "Phenotype Category", "Significance", "Sentence"])
# select only needed columns from Study Parameters table: "Variant Annotation ID", "P Value", "Ratio Stat Type", "Ratio Stat", "Confidence Interval Start", "Confidence Interval Stop"
study_parameters_shrink = study_parameters.select(["Variant Annotation ID", "Allele Of Frequency In Cases", "Allele Of Frequency In Controls", "P Value", "Ratio Stat Type", "Ratio Stat", "Confidence Interval Start", "Confidence Interval Stop"])
# merge shrinked Variant-Drug table with shrinked study_parameters table for assembly of Annotation table
annotation_tab = var_drug_ann_shrink.join(study_parameters_shrink, on="Variant Annotation ID")
return filter_annotations(annotation_tab, base)
@beartype
def filter_annotations(annotation_tab: pl.DataFrame, base: Path) -> pl.DataFrame:
assert base.exists(), "base path should exist!"
## Filtration
# ! exclude and save entries with multiple "Variant/Haplotypes" values (contains ",")
problematic_multi_var_hap_entries = annotation_tab.filter(pl.col("Variant/Haplotypes").str.contains(","))
annotation_tab = annotation_tab.filter(pl.col("Variant/Haplotypes").str.contains(",") == False)
# ! exclude and save entries with multiple "Drug(s)" values (contains "," or "/") !
problematic_multi_drug_entries = annotation_tab.filter(pl.col("Drug(s)").str.contains(",|/"))
annotation_tab = annotation_tab.filter(pl.col("Drug(s)").str.contains(",|/") == False)
# ! exclude and save entries with multiple "Phenotype Category"
problematic_multi_pheno_cat = annotation_tab.filter(pl.col("Phenotype Category").str.contains(",|/"))
annotation_tab = annotation_tab.filter(pl.col("Phenotype Category").str.contains(",|/") == False)
# ! exclude and save entries with drug class instead of ecact class in "Drug(s)" values (contains "s$")
# ...except "us$" because tacrolimus and sirolimus are not classes
problematic_drug_class = annotation_tab.filter(pl.col("Drug(s)").str.contains("[^u]s$"))
annotation_tab = annotation_tab.filter(pl.col("Drug(s)").str.contains("[^u]s$") == False)
# ! exclude and save entries with Haplotypes in "Variant/Haplotypes"
problematic_haps = annotation_tab.filter(pl.col("Variant/Haplotypes").str.contains("^rs") == False)
annotation_tab = annotation_tab.filter(pl.col("Variant/Haplotypes").str.contains("^rs"))
# ! exclude and save entries with "unknown" "Ratio Stat Type"
problematic_unknown_stat_type = annotation_tab.filter(pl.col("Ratio Stat Type").str.contains("Unknown"))
annotation_tab = annotation_tab.filter(pl.col("Ratio Stat Type").str.contains("Unknown") == False)
# ! exclude and save entries with null Ref and/or Alt nucleotudes in "Allele Of Frequency In Cases" and "Allele Of Frequency In Controls"
problematic_unclear_ref_alt_nucl = annotation_tab.filter(
(pl.col("Allele Of Frequency In Cases").is_null()) | (pl.col("Allele Of Frequency In Controls").is_null()))
annotation_tab = annotation_tab.filter(
(pl.col("Allele Of Frequency In Cases").is_not_null()) & (pl.col("Allele Of Frequency In Controls").is_not_null()))
# ! exclude and save entries with the same Ref and Alt nucleotudes in "Allele Of Frequency In Cases" and "Allele Of Frequency In Controls"
problematic_same_ref_alt_nucl = annotation_tab.filter(pl.col("Allele Of Frequency In Cases") == pl.col("Allele Of Frequency In Controls"))
annotation_tab = annotation_tab.filter(pl.col("Allele Of Frequency In Cases") != pl.col("Allele Of Frequency In Controls"))
# save data with problematic entries
excluded = base / "tempdata" / "excluded"
print(f"writing excluded data to {str(excluded)}")
excluded.mkdir(exist_ok=True)
problematic_multi_var_hap_entries.write_csv(f"{str(excluded)}/problematic_multi_var_hap_entries.tsv", sep = "\t")
problematic_multi_drug_entries.write_csv(f"{str(excluded)}/problematic_multi_drug_entries.tsv", sep = "\t")
problematic_multi_pheno_cat.write_csv(f"{str(excluded)}/problematic_multi_pheno_cat.tsv", sep = "\t")
problematic_drug_class.write_csv(f"{str(excluded)}/problematic_drug_class.tsv", sep = "\t")
problematic_haps.write_csv(f"{str(excluded)}/problematic_haps.csv", sep = "\t")
problematic_unknown_stat_type.write_csv(f"{str(excluded)}/problematic_unknown_stat_type.tsv", sep = "\t")
problematic_unclear_ref_alt_nucl.write_csv(f"{str(excluded)}/problematic_unclear_ref_alt_nucl.tsv", sep = "\t")
problematic_same_ref_alt_nucl.write_csv(f"{str(excluded)}/problematic_same_ref_alt_nucl.tsv", sep = "\t")
# drop entries with null Ratio Stat - with unknown effect value and useless for PRS construction
annotation_tab_filtered = annotation_tab.filter(pl.col("Ratio Stat").is_not_null())
filter_annotations_path = base / "tempdata" / "annotation_tab.tsv"
annotation_tab_filtered.write_csv(f"{str(filter_annotations_path)}", sep = "\t")
print(f"filtered annotations saved to {filter_annotations_path}")
return annotation_tab_filtered
@beartype
def analyze(sample: str, annotation_tab, base: Path) -> Union[str, Path]:
## Genotype analysis
# # load Anton"s VCF as sample
# personvcf = pl.read_csv("antonkulaga.vcf", has_header = False, sep = "\t", comment_char = "#")
# load toy sample
variants_tab = pl.read_csv(sample, has_header=True, sep="\t", comment_char="#")
# rename RSID
variants_tab = variants_tab.rename({"rsid": "Variant/Haplotypes"})
# prepare report table
report_tab = annotation_tab.join(variants_tab, on="Variant/Haplotypes")
# create a separate column for output
report_tab = report_tab.with_column(pl.col('Ratio Stat').alias('Effect'))
# perform interprepation - substitute 'Alt' from genome to 'Allele Of Frequency In Cases' to table, it they don't match - invert effect vale
report_tab = report_tab.with_column(pl.when(pl.col('Allele Of Frequency In Cases') == pl.col('alt')).then(pl.col('Effect')).otherwise((1/pl.col('Effect')).round(3)))
# remove useless now columns
report_tab = report_tab.drop(['Variant Annotation ID', 'Ratio Stat', 'Confidence Interval Start', 'Confidence Interval Stop', 'P Value', 'alt', 'ref'])
# aggregation by drugs in table2
report_tab2 = report_tab.drop(['Variant/Haplotypes', 'Phenotype Category', 'Significance', 'Sentence', 'Allele Of Frequency In Cases', 'Allele Of Frequency In Controls', 'Ratio Stat Type'])
report_tab2 = report_tab2.groupby("Drug(s)").mean()
# save report table
report_path = f"{str(base)}/output/report.tsv"
report_path2 = f"{str(base)}/output/report-aggr.tsv"
report_tab.write_csv(report_path, sep="\t")
report_tab2.write_csv(report_path2, sep="\t")
print(f"successfully wrote report to {report_path}")
return report_path
@click.group()
def app():
print("running gero-drugs application")
@app.command("init")
@click.argument("base", default=".")
def init(base: str = "."):
annotation_tab = prepare_annotations(Path(base).absolute().resolve())
return annotation_tab
@app.command("run")
@click.argument("sample", type=click.Path(exists=True), default="./inputdata/borysova.vcf.gz.pharma.tsv")
@click.argument("annotations", type=click.Path(exists=True), default="./tempdata/annotation_tab.tsv")
@click.argument("base", default=".")
def run(sample: str, annotations: str, base: str):
base_folder = Path(base).absolute().resolve()
print(f"initializing the annotations with basefolder ${str(base_folder)}")
annotation_tab = pl.read_csv(annotations, sep="\t", has_header=True, comment_char="#")
return analyze(sample, annotation_tab, base_folder)
@app.command("report")
@click.argument("report_tsv", type=click.Path(exists=True), default="./inputdata/anton-drugs-report.tsv")
@click.argument("report_aggr_tsv", type=click.Path(exists=True), default="./inputdata/anton-drugs-report-aggr.tsv")
@click.argument("drug_list", type=click.Path(exists=True), default="./inputdata/drug_list.tsv")
@click.argument("output", type=click.Path(exists=False), default="./output/anton_drugs.html")
def report(report_tsv: str, report_aggr_tsv: str, drug_list: str, output: str):
from mako.template import Template
result = Path(output)
result.unlink(missing_ok=True)
report = pl.read_csv(report_tsv, sep="\t", comment_char="#", ignore_errors=True).rename({'Drug(s)': "Drug"})
report_aggr = pl.read_csv(report_aggr_tsv, sep="\t", comment_char="#", ignore_errors=True).rename({'Drug(s)': "Drug"})
drug_list = pl.read_csv(drug_list, sep="\t", comment_char="#", ignore_errors=True)
to_num = pl.col("Longevity association").str.replace("None", "0").str.replace("low", "1").str.replace("average", "2").str.replace("strong", "3")\
.cast(pl.Int32).alias("longevity_number")
agg_rows = report_aggr.join(drug_list,on="Drug")\
.with_column(to_num)\
.with_column(pl.col("Effect").round(4))\
.sort(["longevity_number", "Effect"], reverse=True)
with result.open("w+") as f:
template_str = Template(filename='./templates/report.html').render(agg_rows = agg_rows, report = report)
f.write(template_str)
print(f"report based on {report_tsv} and {report_aggr_tsv} is written to {output}!")
if __name__ == "__main__":
app()