-
Notifications
You must be signed in to change notification settings - Fork 2
/
annotation_vcf_from_cassandraDB.py
125 lines (105 loc) · 4.89 KB
/
annotation_vcf_from_cassandraDB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'''
multi-processing data retrieving of VCF from cassandraDB
'''
# Some references:
# http://kmdouglass.github.io/posts/learning-pythons-multiprocessing-module.html
# http://stackoverflow.com/questions/20887555/dead-simple-example-of-using-multiprocessing-queue-pool-and-locking
# http://www.datastax.com/dev/blog/datastax-python-driver-multiprocessing-example-for-improved-bulk-data-throughput
'''Avoid using multi-threading, because in python it is serialized threading,
pooled multi-processing will reduce the overhead, as well as maximize the
parallel efficiency in the program'''
import gzip, os, sys, csv, re, multiprocessing
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
class annotator_vcf(object):
'''annotator for plain vcf file, fetching data from cassandraDB'''
def __init__(self, input_filename=None, output_filename=None, contact_point_DB=None, keyspace_DB=None, table_DB=None, concurrent_number=None):
'''Initialize function'''
self.input_filename = input_filename
self.output_filename = output_filename
self.contact_point_DB = contact_point_DB
self.keyspace_DB = keyspace_DB
self.table_DB = table_DB
self.concurrent_number = concurrent_number
# establish the connection to the database
# entering the pre-defined keyspace
#cluster = Cluster(contact_points=self.contact_points)
#session = cluster.connect()
#session.set_keyspace(self.keyspace)
# database variables
self.primary_field = ["CHROM", "POS", "REF", "ALT"]
self.primary_key = []
self.fields = None
def generate_primary_key(self):
# file open for reading
f = open(self.input_filename, 'rb')
for line in f:
line = line.rstrip()
# Reading header lines
if line.startswith("#"):
line = line.lstrip("#")
if line.startswith("CHROM"):
self.fields = dict(zip(line.split(), range(len(line.split()))))
continue
# Error catching
if self.fields is None:
print "VCF file does not have a header line describing each column. Exiting."
sys.exit(1)
# Reading lines and collect primary keys
sample_primary_key = []
sample = line.split('\t')
for field in self.primary_field:
sample_primary_key.append(sample[self.fields[field]])
# Append to the primary key list
self.primary_key.append(sample_primary_key)
def run(self, sample):
'''single process run function'''
'''will return a single row in the whole table, iterative'''
# isolation creation of the connection and executions
profile_stmt = session.prepare("SELECT annotations From" + self.table_DB +\
" Where chrom = ? AND pos = ? AND ref = ? \
AND alt = ?")
bound_stmt = profile_stmt.bind(sample)
single_annotation_json = session.execute(bound_stmt)
vep = self.json_to_vep(single_annotation_json)
return sample.append(vep)
def json_to_vep(self, annotation_json):
'''deserialize the python json string format to a VEP format'''
# first parsing using '},{' to see if there is any
result_list = []
for annotation in annotation_json.split("},{"):
value_list = []
annotation = annotation.lstrip("[{")
annotation = annotation.rstrip("}]")
fields = annotation.split(",")
for value_pair in fields:
value = value_pair.split(":")[1]
value = value.strip()
if value == "''":
value_list.append("")
else:
value = value.strip("'")
value_list.append(value)
print value_list
result_list.append("|".join(value_list))
return '\t'.join(result_list)
def multiprocessing_annotate(self):
'''Main method to annotate the vcf file, and output a csv file'''
# calling to generate the primary key
self.generate_primary_key()
# creating processing pools
p = multiprocessing.Pool(self.concurrent_number)
annotated_list = p.map(self.run(), self.primary_key)
# file open for writing
w = open(self.output_filename, 'w')
wr = csv.writer(w, dialect='excel')
# creating headers for csv file
csv_header = [x.lower() for x in self.primary_field[:]]
csv_header.append("annotations")
wr.writerow(csv_header)
# file open for reading
for line in annotated_list:
wr.writerow(line)
a = annotator_vcf("./t/test.vcf")
a.generate_primary_key()
print a.primary_key