Add start_with_unprocessed option to auto-pipeline #278

Merged · 2 commits · Jan 12, 2023
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -19,7 +19,7 @@ repos:
# E902 - IOError
# F822: undefined name in __all__
# F823: local variable name referenced before assignment
- repo: https://gitlab.com/pycqa/flake8
- repo: https://github.com/pycqa/flake8
rev: 3.7.9
hooks:
- id: flake8
4 changes: 3 additions & 1 deletion azure-pipelines.yml
@@ -10,7 +10,7 @@ resources:
repositories:
- repository: OpenAstronomy
type: github
endpoint: i4Ds
endpoint: samaloney
name: OpenAstronomy/azure-pipelines-templates
ref: master

@@ -31,6 +31,8 @@ trigger:
jobs:
- template: run-tox-env.yml@OpenAstronomy
parameters:
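    # keep tox below v4 (tox 4, released Dec 2022, introduced breaking changes)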
toxverspec: <4
toxdeps: tox-pypi-filter
submodules: true
coverage: codecov
envs:
22 changes: 18 additions & 4 deletions docs/pipelineconfiguration.rst
@@ -104,18 +104,32 @@ SETUP - Pipeline as systemd service

`sudo systemctl start stix-pipeline.service`

5: To check the status of the pipeline service
* To check the status of the pipeline service

`sudo systemctl status stix-pipeline.service`

6: To enable the service on every reboot
* To enable the service on every reboot

`sudo systemctl enable stix-pipeline.service`

7: To disable the service on every reboot
* To disable the service on every reboot

`sudo systemctl disable stix-pipeline.service`

8: to get/request detailed processing data of the running service you can use a local endpoint
* To get detailed processing data from the running service, use the local endpoint

`(venv) stixcore@pub099:~/STIXCore$ stix-pipeline-status -h`

Startup Behaviour
^^^^^^^^^^^^^^^^^

By default the service starts (and restarts after a reboot or error) with a search for unprocessed TM files.
This can be disabled with the `start_with_unprocessed` config parameter.

You might toggle the parameter when manually restarting the service after (re)processing some or all TM data in batch mode. This allows a transition from reprocessing everything at once back to the daily mode.

```
[Pipeline]
start_with_unprocessed = False

```
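For context, a minimal sketch of how the pipeline consumes this option (the `ConfigParser` setup and the `stixcore.ini` file name here are illustrative assumptions; the `getboolean` call with `fallback=True` mirrors the change in `stixcore/processing/pipeline.py` below):

```
from configparser import ConfigParser

# stand-in for the pipeline's shared CONFIG object; 'stixcore.ini' is a hypothetical file name
CONFIG = ConfigParser()
CONFIG.read('stixcore.ini')

# fallback=True preserves the previous behaviour when the option is absent
if CONFIG.getboolean('Pipeline', 'start_with_unprocessed', fallback=True):
    print("searching for unprocessed TM files")
else:
    print("skipping search for unprocessed TM files")
```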
1 change: 1 addition & 0 deletions setup.cfg
@@ -16,6 +16,7 @@ python_requires = >=3.7
setup_requires = setuptools_scm
install_requires =
sunpy>=3.0.1
numpy
spiceypy
bitstring
roentgen
39 changes: 34 additions & 5 deletions stixcore/processing/pipeline.py
@@ -1,4 +1,5 @@
import io
import os
import re
import sys
import time
@@ -49,7 +50,7 @@ class GFTSFileHandler(FileSystemEventHandler):
the file back to the original name `myfile.xml`. Can detect file move event that match the TM
filename pattern.
"""
def __init__(self, func, regex, name="name", **args):
def __init__(self, func, regex, *, name="name", **args):
"""

Parameters
@@ -73,6 +74,11 @@ def __init__(self, func, regex, name="name", **args):
self.lp = threading.Thread(target=self.process)
self.lp.start()

def add_to_queue(self, initlist):
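        """Put an initial list of detected file paths onto the processing queue."""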
if initlist:
for p in initlist:
self.queue.put(p)

@poll_decorator(step=1, poll_forever=True)
def process(self):
"""Worker function to process the queue of detected files."""
@@ -323,31 +329,54 @@ def status_server(self):
connection.close()


def search_unprocessed_tm_files(logging_dir, tm_dir):
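    """Return TM files in ``tm_dir`` newer than the last processed one, which is
    inferred from the most recently modified ``*.out`` file in ``logging_dir``."""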

    unprocessed_tm_files = list()
    list_of_log_files = list(logging_dir.glob('*.out'))
    if not list_of_log_files:
        return unprocessed_tm_files
    latest_log_file = max(list_of_log_files, key=os.path.getmtime)
    logger.debug(f"{latest_log_file}: {latest_log_file.stat().st_mtime}")
    # strip the '.out' suffix to recover the name of the last processed TM file
    tm_file = tm_dir / latest_log_file.name[0:-4]
if tm_file.exists():
ftime = tm_file.stat().st_mtime
for tmf in tm_dir.glob("*.xml"):
if TM_REGEX.match(tmf.name) and tmf.stat().st_mtime > ftime:
unprocessed_tm_files.append(tmf)
return unprocessed_tm_files


def main():
log_dir = Path(CONFIG.get('Pipeline', 'log_dir'))
log_dir.mkdir(parents=True, exist_ok=True)

time.perf_counter()
observer = Observer()
tmpath = Path(CONFIG.get('Paths', 'tm_archive'))
soop_path = Path(CONFIG.get('Paths', 'soop_files'))
spm = SpiceKernelManager(Path(CONFIG.get("Paths", "spice_kernels")))
Spice.instance = Spice(spm.get_latest_mk())

logging_handler = LoggingEventHandler(logger=logger)
tm_handler = GFTSFileHandler(process_tm, TM_REGEX, name="tm_xml", spm=spm)

soop_manager = SOOPManager(soop_path)
soop_handler = GFTSFileHandler(soop_manager.add_soop_file_to_index,
SOOPManager.SOOP_FILE_REGEX, name="soop")
SOOPManager.instance = soop_manager

tm_handler = GFTSFileHandler(process_tm, TM_REGEX, name="tm_xml", spm=spm)
PipelineStatus.instance = PipelineStatus(tm_handler)
if CONFIG.getboolean('Pipeline', 'start_with_unprocessed', fallback=True):
logger.info("Searching for unprocessed tm files")
unprocessed_tm_files = search_unprocessed_tm_files(log_dir, tmpath)
if unprocessed_tm_files:
fl = '\n '.join([f.name for f in unprocessed_tm_files])
logger.info(f"Found unprocessed tm files: \n {fl}\nadding to queue.")
tm_handler.add_to_queue(unprocessed_tm_files)
else:
logger.info("Skipping search for unprocessed tm files")

observer.schedule(soop_handler, soop_manager.data_root, recursive=False)
observer.schedule(logging_handler, tmpath, recursive=True)
observer.schedule(tm_handler, tmpath, recursive=True)

PipelineStatus.instance = PipelineStatus(tm_handler)

observer.start()
try:
while True:
145 changes: 85 additions & 60 deletions stixcore/products/level0/scienceL0.py
@@ -298,19 +298,25 @@ def from_levelb(cls, levelb, parent=''):
data.add_basic(name='rcr', nix='NIX00401', attr='value', packets=packets, dtype=np.ubyte)
data.add_basic(name='num_pixel_sets', nix='NIX00442', attr='value', packets=packets,
dtype=np.ubyte)

pixel_mask_ints = packets.get_value('NIXD0407')
pixel_indices = PIXEL_MASK_LOOKUP[pixel_mask_ints]
start = 0
end = 0
res = []
for npx in data['num_pixel_sets']:
end += npx
cur_pm = pixel_indices[start:end].astype(int).tolist()
full_pm = np.full((12), 0, dtype=np.ubyte)
full_pm[cur_pm] = 1
res.append(full_pm)
start = end
pixel_masks = np.array(res, dtype=np.uint8)
if cls is CompressedPixelData:
pixel_indices = PIXEL_MASK_LOOKUP[pixel_mask_ints]
start = 0
end = 0
res = []
for npx in data['num_pixel_sets']:
end += npx
cur_pm = pixel_indices[start:end].astype(int).tolist()
full_pm = np.full((12), 0, dtype=np.ubyte)
full_pm[cur_pm] = 1
res.append(full_pm)
start = end
pixel_masks = np.array(res, dtype=np.uint8)
elif cls is SummedPixelData:
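            # decode each 12-bit mask integer into per-pixel flags; [::-1] puts the
            # LSB first, e.g. 5 -> '000000000101' -> pixels 0 and 2 set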
pixel_masks = np.array([list(map(int, format(pm, '012b')))[::-1]
for pm in pixel_mask_ints])
pixel_masks = pixel_masks.astype(np.uint8).reshape(-1, data['num_pixel_sets'][0], 12)
param = packets.get('NIXD0407')[0]
pixel_meta = {'NIXS': 'NIXD0407', 'PCF_CURTX': param.idb_info.PCF_CURTX}
data.add_data('pixel_masks', (pixel_masks, pixel_meta))
@@ -338,49 +344,57 @@ def from_levelb(cls, levelb, parent=''):
counts_flat = np.array(packets.get_value('NIX00260'))
counts_var_flat = np.array(packets.get_value('NIX00260', attr='error'))

n_detectors = data['detector_masks'][0].sum()
start = 0
end = 0
counts = []
counts_var = []
pixel_mask_index = -1
for i, nc in enumerate(tmp['num_data_elements']):
if i % unique_energies_low.size == 0:
pixel_mask_index += 1
end += nc
cur_counts = counts_flat[start:end].reshape(n_detectors, -1)
cur_counts_var = counts_var_flat[start:end].reshape(n_detectors, -1)
if cur_counts.shape[1] != 12:
full_counts = np.zeros((n_detectors, 12))
full_counts_var = np.zeros((n_detectors, 12))
pix_m = data['pixel_masks'][pixel_mask_index].astype(bool)
                # Sometimes the change in pixel mask is reflected in the mask before the actual
                # count data, so try the matching pixel mask first; if this fails, use the most
                # recent matching value
try:
full_counts[:, pix_m] = cur_counts
full_counts_var[:, pix_m] = cur_counts_var
except ValueError:
last_match_index = np.where(data['pixel_masks'].sum(axis=1)
== cur_counts.shape[1])
pix_m = data['pixel_masks'][last_match_index[0][-1]].astype(bool)
full_counts[:, pix_m] = cur_counts
full_counts_var[:, pix_m] = cur_counts_var

counts.append(full_counts)
counts_var.append(full_counts_var)
else:
counts.append(cur_counts)
counts_var.append(cur_counts_var)
start = end

counts = np.array(counts).reshape(unique_times.size, unique_energies_low.size,
data['detector_masks'].sum(axis=1).max(),
12)
counts_var = np.array(counts_var).reshape(unique_times.size, unique_energies_low.size,
data['detector_masks'].sum(axis=1).max(),
12)

if cls is CompressedPixelData:
n_detectors = data['detector_masks'][0].sum()
start = 0
end = 0
counts = []
counts_var = []
pixel_mask_index = -1
for i, nc in enumerate(tmp['num_data_elements']):
if i % unique_energies_low.size == 0:
pixel_mask_index += 1
end += nc
cur_counts = counts_flat[start:end].reshape(n_detectors, -1)
cur_counts_var = counts_var_flat[start:end].reshape(n_detectors, -1)
if cur_counts.shape[1] != 12:
full_counts = np.zeros((n_detectors, 12))
full_counts_var = np.zeros((n_detectors, 12))
pix_m = data['pixel_masks'][pixel_mask_index].astype(bool)
                    # Sometimes the change in pixel mask is reflected in the mask before the
                    # actual count data, so try the matching pixel mask first; if this fails,
                    # use the most recent matching value
try:
full_counts[:, pix_m] = cur_counts
full_counts_var[:, pix_m] = cur_counts_var
except ValueError:
last_match_index = np.where(data['pixel_masks'].sum(axis=1)
== cur_counts.shape[1])
pix_m = data['pixel_masks'][last_match_index[0][-1]].astype(bool)
full_counts[:, pix_m] = cur_counts
full_counts_var[:, pix_m] = cur_counts_var

counts.append(full_counts)
counts_var.append(full_counts_var)
else:
counts.append(cur_counts)
counts_var.append(cur_counts_var)
start = end

counts = np.array(counts).reshape(unique_times.size, unique_energies_low.size,
data['detector_masks'].sum(axis=1).max(),
12)
counts_var = np.array(counts_var).reshape(unique_times.size, unique_energies_low.size,
data['detector_masks'].sum(axis=1).max(),
12)
elif cls is SummedPixelData:
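            # summed-pixel telemetry is already dense, so it can be reshaped directly
            # using the pixel-set count of the first record (no per-mask padding needed)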
counts = counts_flat.reshape(unique_times.size, unique_energies_low.size,
data['detector_masks'].sum(axis=1).max(),
data['num_pixel_sets'][0].sum())

counts_var = counts_var_flat.reshape(unique_times.size, unique_energies_low.size,
data['detector_masks'].sum(axis=1).max(),
data['num_pixel_sets'][0].sum())
# t x e x d x p -> t x d x p x e
counts = counts.transpose((0, 2, 3, 1))

@@ -431,18 +445,29 @@ def from_levelb(cls, levelb, parent=''):
energy_indices = np.full(32, True)
energy_indices[[0, -1]] = False

ix = np.ix_(np.full(unique_times.size, True), data['detector_masks'][0].astype(bool),
np.ones(12, dtype=bool), np.full(32, True))
if cls is CompressedPixelData:
ix = np.ix_(np.full(unique_times.size, True),
data['detector_masks'][0].astype(bool),
np.ones(12, dtype=bool), np.full(32, True))
elif cls is SummedPixelData:
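                # summed products carry 4 pixel sums rather than the full 12 pixels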
ix = np.ix_(np.full(unique_times.size, True),
data['detector_masks'][0].astype(bool),
np.ones(4, dtype=bool), np.full(32, True))

out_counts[ix] = rebinned_counts
out_var[ix] = rebinned_counts_var
else:
energy_indices = np.full(32, False)
energy_indices[unique_energies_low.min():unique_energies_high.max() + 1] = True

ix = np.ix_(np.full(unique_times.size, True),
data['detector_masks'][0].astype(bool),
np.ones(12, dtype=bool), energy_indices)
if cls is CompressedPixelData:
ix = np.ix_(np.full(unique_times.size, True),
data['detector_masks'][0].astype(bool),
np.ones(12, dtype=bool), energy_indices)
elif cls is SummedPixelData:
ix = np.ix_(np.full(unique_times.size, True),
data['detector_masks'][0].astype(bool),
np.ones(4, dtype=bool), energy_indices)

out_counts[ix] = counts
out_var[ix] = counts_var
8 changes: 4 additions & 4 deletions stixcore/time/datetime.py
@@ -408,8 +408,8 @@ def from_float(cls, scet_float):

@classmethod
def from_btime(cls, btime):
coarse = btime >> np.int(16)
fine = btime - (coarse << np.int(16))
coarse = btime >> 16
fine = btime - (coarse << 16)
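        # e.g. btime = 0x0002_8000 -> coarse = 2, fine = 0x8000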
return SCETime(coarse=coarse, fine=fine)

@classmethod
@@ -594,8 +594,8 @@ def from_btime(cls, btime):
neg_idx = np.where(btime < 0)
btime[neg_idx] = abs(btime[neg_idx])

coarse = btime >> np.int(16)
fine = btime - (coarse << np.int(16))
coarse = btime >> 16
fine = btime - (coarse << 16)
td = cls(coarse, fine)

td[neg_idx] = -td[neg_idx]
2 changes: 1 addition & 1 deletion stixcore/util/scripts/end2end_testing.py
@@ -169,7 +169,7 @@ def end2end_pipeline(indir, fitsdir):
level=logging.INFO)

rebuild_end2end(files, splits=1, outdir=datapath,
socdir=Path("/data/stix/SOLSOC/from_edds/tm/"))
socdir=Path("/data/stix/SOLSOC/from_edds/tm/incomming/"))

if zippath.parent.exists() and datapath.exists():
zipcmd = f"zip -r -j - {str(datapath)} > {str(zippath)}"