""" This script searches the database for files flagged "N"ew or "M"odified.
For each date in the configured range, it checks if other stations are
available and defines the new jobs to be processed. Those are inserted in the
*jobs* table of the database.
To run it from the console:
.. code-block:: sh
$ msnoise new_jobs
Upon first run, if you expect the number of jobs to be large (many days,
many stations), pass the ``--init`` parameter to optimize the insert. Only use
this flag once, otherwise problems will arise from duplicate entries in
the jobs table.
.. code-block:: sh
$ msnoise new_jobs --init
Performance / running on HPC
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
By setting the ``hpc`` configuration parameter to ``Y``, you will disable the
automatic creation of jobs during the workflow, to avoid numerous
interactions with the database (select & update or insert). The jobs have
then to be inserted manually:
.. code-block:: sh
$ msnoise new_jobs --hpc CC:STACK
should be run after the ``msnoise compute_cc`` step in order to create the
``STACK`` jobs.
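
For reference, each job is built in this script as a plain dictionary before
being written to the *jobs* table; a cross-correlation job looks roughly like
this (the station pair below is hypothetical; ``now`` is the insertion
timestamp):

.. code-block:: python

    {"day": "2020-01-01",
     "pair": "YA.FLR.00:YA.UV05.00",
     "jobtype": "CC",
     "flag": "T",
     "lastmod": now}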
"""
import datetime

import numpy as np
import pandas as pd
import logbook

# datetime and numpy are also re-exported by the star import below, but
# explicit imports keep this module self-contained.
from .api import *

logger = logbook.Logger(__name__)

def main(init=False, nocc=False):
logger.info('*** Starting: New Jobs ***')
db = connect()
params = get_params(db)
logger.debug("Checking plugins' entry points")
plugins = get_config(db, "plugins")
extra_jobtypes_scan_archive = []
extra_jobtypes_new_files = ["PSD"]
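    # Plugins may register additional job types through the
    # "msnoise.plugins.jobtypes" entry point; their "after" key decides
    # whether jobs are created per new file or per updated day.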
if plugins:
import pkg_resources
plugins = plugins.split(",")
for ep in pkg_resources.iter_entry_points(group='msnoise.plugins.jobtypes'):
module_name = ep.module_name.split(".")[0]
if module_name in plugins:
jobtypes = ep.load()()
for jobtype in jobtypes:
if jobtype["after"] == "scan_archive":
extra_jobtypes_scan_archive.append(jobtype["name"])
elif jobtype["after"] == "new_files":
extra_jobtypes_new_files.append(jobtype["name"])
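
    # Decide which kinds of CC jobs to create: cross-station jobs when
    # components_to_compute is set, single-station jobs when
    # components_to_compute_single_station is set.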
    crosscorr = False
    if len(params.components_to_compute):
        crosscorr = True
        logger.debug("components_to_compute is populated, creating "
                     "cross-station CC jobs")
    autocorr = False
    if len(params.components_to_compute_single_station):
        autocorr = True
        logger.debug("components_to_compute_single_station is populated, "
                     "creating single-station CC jobs")
logger.info('Scanning New/Modified files')
stations_to_analyse = []
error = False
for sta in get_stations(db, all=False):
if not len(sta.locs()):
logger.error("You haven't defined location codes to use for %s.%s, "
"you should run 'msnoise db update_loc_chan'; exiting." %
(sta.net, sta.sta))
error = True
for loc in sta.locs():
stations_to_analyse.append("%s.%s.%s" % (sta.net, sta.sta, loc))
if error:
return
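
    # Candidate jobs are accumulated in memory first; a set of serialized
    # job values lets us skip duplicates before touching the database.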
    all_jobs = []
    seen_jobs = set()
    updated_days = []
nfs = get_new_files(db)
now = datetime.datetime.utcnow()
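    # A new/modified file can span several days: expand its start/end dates
    # into individual "updated days" and create the per-file job types
    # (e.g. PSD) for each of them.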
for nf in nfs:
tmp = "%s.%s.%s" % (nf.net, nf.sta, nf.loc)
if tmp not in stations_to_analyse:
continue
start, end = nf.starttime.date(), nf.endtime.date()
for date in pd.date_range(start, end, freq="D"):
updated_days.append(date.date())
for jobtype in extra_jobtypes_new_files:
job = {"day": date.date().strftime("%Y-%m-%d"),
"pair": "%s.%s.%s" % (nf.net, nf.sta, nf.loc),
"jobtype": jobtype,
"flag": "T", "lastmod": now}
                jobtxt = ''.join(str(x) for x in job.values())
                if jobtxt not in seen_jobs:
                    all_jobs.append(job)
                    seen_jobs.add(jobtxt)
    updated_days = np.unique(updated_days)
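
    # For every day touched by a new/modified file, pair each modified
    # station with each available station to define the CC jobs.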
logger.info('Determining available data for each "updated date"')
count = 0
if len(extra_jobtypes_scan_archive) != 0 or not nocc:
for day in updated_days:
jobs = []
modified = []
available = []
            for data in get_data_availability(
                    db, starttime=day,
                    endtime=day + datetime.timedelta(days=1)):
sta = "%s.%s.%s" % (data.net, data.sta, data.loc)
if sta in stations_to_analyse:
available.append(sta)
if data.flag in ["N", "M"]:
modified.append(sta)
modified = np.unique(modified)
available = np.unique(available)
logger.debug("%s: modified: %s | available: %s"% (day, modified,available))
            for m in modified:
                for a in available:
                    if (m != a and crosscorr) or (m == a and autocorr):
                        pair = ':'.join(sorted([m, a]))
                        if pair not in jobs:
                            if not nocc:
                                all_jobs.append(
                                    {"day": day.strftime("%Y-%m-%d"),
                                     "pair": pair, "jobtype": "CC",
                                     "flag": "T", "lastmod": now})
                            for jobtype in extra_jobtypes_scan_archive:
                                all_jobs.append(
                                    {"day": day.strftime("%Y-%m-%d"),
                                     "pair": pair, "jobtype": jobtype,
                                     "flag": "T", "lastmod": now})
                            jobs.append(pair)
                        if init and len(all_jobs) > 1e5:
                            logger.debug('Already over 100,000 jobs, '
                                         'inserting/updating')
                            massive_insert_job(all_jobs)
                            count += len(all_jobs)
                            all_jobs = []
    else:
        logger.debug("skipping CC job creation and extra jobtype creation")
if len(all_jobs) != 0:
logger.debug('Inserting/Updating %i jobs' % len(all_jobs))
if init:
massive_insert_job(all_jobs)
else:
for job in all_jobs:
update_job(db, job['day'], job['pair'],
job['jobtype'], job['flag'],
commit=False)
db.commit()
count += len(all_jobs)
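
    # Reset the data_availability flags: files that have just been handled
    # are marked "A"rchived so they are not picked up again on the next run.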
for sta in get_stations(db, all=True):
mark_data_availability(db, sta.net, sta.sta, flag='A')
db.commit()
logger.info("Inserted %i jobs" % count)
logger.info('*** Finished: New Jobs ***')
return count


if __name__ == "__main__":
    main()