-
Notifications
You must be signed in to change notification settings - Fork 1
/
FASTQFileHandler.py
134 lines (103 loc) · 6.25 KB
/
FASTQFileHandler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import datetime
import json
import logging
import os
import shutil
import subprocess
import sys
from pathlib import Path

from watchdog.events import FileSystemEventHandler

from .LinuxNotification import LinuxNotification
# from .LinuxNotification import LinuxNotification
# Module-level logger; every message in this file is emitted under the 'micas' logger name.
logger = logging.getLogger('micas')
class FASTQFileHandler(FileSystemEventHandler):
    """Watchdog handler that aligns new read files and raises alerts.

    For every filesystem event whose source path ends in ``.fastq`` or
    ``.fasta`` the handler:

    1. runs ``minimap2`` against the first ``*.mmi`` index found in
       ``<app_loc>/database``, writing to ``<app_loc>/minimap2/runs/``;
    2. reads the first record of the resulting PAF file and stores its
       percent match in ``<app_loc>/alertinfo.cfg``;
    3. writes an alert file, logs it and sends a notification when a query's
       threshold is reached;
    4. accumulates the alignment output in
       ``<app_loc>/minimap2/final.out.paf``.
    """

    # File-name suffixes that identify a read file worth aligning.
    READ_SUFFIXES = (".fastq", ".fasta")

    def __init__(self, app_loc):
        """Remember the application directory and reset the file counter.

        Args:
            app_loc: Application directory. The handler reads
                ``database/*.mmi`` and ``alertinfo.cfg`` below it and writes
                below ``minimap2/``.
        """
        super().__init__()
        self.app_loc = app_loc
        # Number of read files that went through the whole pipeline.
        self.num_files_classified = 0

    def on_moved(self, event):
        """Forward move events to :meth:`on_any_event`.

        NOTE(review): watchdog's default ``dispatch`` is documented to call
        ``on_any_event`` for every event before the specific hook, so this
        forwarding may process a moved file twice -- confirm against the
        installed watchdog version.
        """
        # This should be covered by on_any_event but isn't for some reason
        self.on_any_event(event)

    def on_any_event(self, event):
        """Align the read file named by ``event`` and update the alert state.

        Events whose ``src_path`` does not end in one of
        :attr:`READ_SUFFIXES` are only logged.

        Args:
            event: Watchdog filesystem event; only ``src_path`` and
                ``event_type`` are read.

        Raises:
            SystemExit: When ``<app_loc>/database`` holds no ``*.mmi`` index.
        """
        src_path = event.src_path
        if not src_path.endswith(self.READ_SUFFIXES):
            logger.debug(f"Debug: None Fasta/Fastq generated in MINION location: {src_path}")
            return

        logger.debug(f'Debug: File Event ({str(event.event_type)}), Path: {str(src_path)}')

        app_dir = Path(self.app_loc)
        # Per-read alignment output and the accumulated output of all reads.
        run_output = app_dir / 'minimap2' / 'runs' / (os.path.basename(src_path) + '.out.paf')
        final_output = app_dir / 'minimap2' / 'final.out.paf'

        # Sorted so the chosen index does not depend on directory order.
        mmi_files = sorted((app_dir / 'database').glob('*.mmi'))
        if not mmi_files:
            logger.error("Error: Database not found! No MMI files found at database location")
            sys.exit(1)
        index_file = mmi_files[0]

        if not self._run_minimap2(index_file, src_path, run_output):
            return

        alignment = self._read_first_alignment(run_output)
        if alignment is None:
            # An empty PAF (no hit) is not an error; there is just nothing to score.
            logger.debug(f"Debug: No usable alignment record in {run_output}, alerts left unchanged")
        else:
            self._update_alerts(app_dir, src_path, alignment)

        logger.debug("Debug: Minimap2 was successful")
        self._merge_into_final(run_output, final_output)

        # increase the # of files it has classified
        self.num_files_classified += 1

    @staticmethod
    def _run_minimap2(index_file, src_path, run_output):
        """Run minimap2 and return True when it produced ``run_output``."""
        cmd = ['minimap2', str(index_file), src_path, '-o', str(run_output)]
        try:
            # Argument list without a shell: paths containing spaces or shell
            # metacharacters are passed through verbatim.
            returncode = subprocess.call(cmd, stdout=subprocess.DEVNULL)
        except OSError:
            # minimap2 could not be started at all (e.g. not installed).
            returncode = None

        # The exit status is checked as well as the file, so that a stale
        # output left by an earlier event is not mistaken for a fresh result.
        if returncode == 0 and run_output.exists():
            return True

        cmd_text = ' '.join(cmd)
        logger.error(f"Error: Was unable to create minimap2 alignment of {index_file} with minimap2 command {cmd_text}")
        return False

    @staticmethod
    def _read_first_alignment(paf_path):
        """Return ``(header, num_match, tot_len)`` of the first PAF record.

        The values are taken from tab-separated columns 6, 10 and 7 of the
        first non-blank line (target name, residue matches and target length
        in the PAF format). Returns ``None`` when the file has no such line,
        the line is too short, the numbers do not parse, or the length is
        not positive.
        """
        with open(paf_path) as paf_file:
            first_line = next((line for line in paf_file if line.strip()), "")

        fields = first_line.strip().split("\t")
        if len(fields) < 10:
            return None
        try:
            num_match = int(fields[9])
            tot_len = int(fields[6])
        except ValueError:
            return None
        if tot_len <= 0:
            return None
        return fields[5], num_match, tot_len

    def _update_alerts(self, app_dir, src_path, alignment):
        """Store the new percent match in alertinfo.cfg and alert if needed."""
        match_alert_header, num_match, tot_len = alignment
        percent_match_value = (num_match / tot_len) * 100

        alertinfo_cfg_file = app_dir / 'alertinfo.cfg'
        with open(alertinfo_cfg_file) as cfg_in:
            alertinfo_cfg_data = json.load(cfg_in)
        queries = alertinfo_cfg_data["queries"]
        device = alertinfo_cfg_data["device"]

        logger.debug(f"Debug: Minimap2 Alignment ({src_path}), Header: {match_alert_header}, Matchs: {num_match}, Total Length: {tot_len}, % Match: {percent_match_value}")

        for query in queries:
            # Substring match: the query header tends to be longer than the
            # alignment's target name (which is the accession number).
            if match_alert_header not in query["header"]:
                continue
            query["current_value"] = percent_match_value
            if float(query["threshold"]) <= float(percent_match_value):
                self._send_alert(app_dir, query, percent_match_value, device)

        with open(alertinfo_cfg_file, 'w') as cfg_out:
            json.dump(alertinfo_cfg_data, cfg_out)

    @staticmethod
    def _send_alert(app_dir, query, percent_match_value, device):
        """Append the alert to its JSON file, log it and notify ``device``."""
        # Same text as "%Y-%m-%d %H:%M" with ' ', ':' and '-' replaced by '_'.
        file_date_time = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M")
        alert_data = {
            "alert": {
                "name": query["name"],
                "concentration": float(percent_match_value),
                "threshold": float(query["threshold"]),
                "date": file_date_time,
            }
        }
        alert_str = json.dumps(alert_data)

        alert_path = os.path.join(app_dir, query['name'] + "_" + file_date_time + ".json")
        with open(alert_path, 'a') as alert_file:
            alert_file.write(alert_str + '\n')

        logger.critical(alert_str)
        if device:
            LinuxNotification.send_notification(device, alert_str)

    @staticmethod
    def _merge_into_final(run_output, final_output):
        """Append ``run_output`` to ``final_output``, or move it there.

        When the final file already exists the run file is appended and kept;
        otherwise the run file itself becomes the final file.
        """
        if final_output.is_file():
            logger.debug(f"Debug: {final_output} file exits, appending onto it")
            with open(run_output) as run_file, open(final_output, "a") as final_file:
                final_file.writelines(run_file)
        else:
            logger.debug("Debug: Starting minimap2 file renaming.")
            shutil.move(str(run_output), str(final_output))