Commit

Import support for misc tarballs. (#7)
ketiltrout authored Mar 12, 2020
1 parent c5559de commit 2796cfd
Showing 1 changed file with 67 additions and 6 deletions.
73 changes: 67 additions & 6 deletions alpenhorn/auto_import.py
@@ -17,7 +17,9 @@

import peewee as pw
import numpy as np
import tarfile
import h5py
import json

from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
@@ -32,7 +34,7 @@
log = logger.get_log()

# File to use for caching files already imported
LOCAL_IMPORT_RECORD = "/etc/alpenhornd_import.dat" # default path
LOCAL_IMPORT_RECORD = "/var/lib/alpenhorn/alpenhornd_import.dat" # default path

if "ALPENHORN_IMPORT_RECORD" in os.environ:
    LOCAL_IMPORT_RECORD = os.environ["ALPENHORN_IMPORT_RECORD"]
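# Editorial note (not part of the commit): the record path can be overridden by
# setting ALPENHORN_IMPORT_RECORD in the daemon's environment before it starts,
# e.g. ALPENHORN_IMPORT_RECORD=/srv/alpenhorn/import.dat (hypothetical path).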
@@ -323,6 +325,57 @@ def get_fileflaginputinfo_keywords_from_h5(path):
return {"start_time": start_time, "finish_time": finish_time}


def get_miscfile_data(path):
    """Get metadata for a misc-type tarball by reading the METADATA.json file.
    """

    serial_number, data_type = di.util.parse_miscfile_name(os.path.basename(path))
    start_time = None
    finish_time = None

    with tarfile.open(name=path, mode="r") as f:
        try:
            metadata = json.loads(f.extractfile(f.getmember("./METADATA.json")).read())
            if "start_time" in metadata:
                try:
                    start_time = calendar.timegm(
                        datetime.datetime.strptime(
                            metadata["start_time"], "%Y%m%dT%H%M%SZ"
                        ).utctimetuple()
                    )
                    del metadata["start_time"]
                except ValueError:
                    log.warning(
                        "Invalid start_time in misc tarball metadata: {0}".format(
                            metadata["start_time"]
                        )
                    )

            if "finish_time" in metadata:
                try:
                    finish_time = calendar.timegm(
                        datetime.datetime.strptime(
                            metadata["finish_time"], "%Y%m%dT%H%M%SZ"
                        ).utctimetuple()
                    )
                    del metadata["finish_time"]
                except ValueError:
                    log.warning(
                        "Invalid finish_time in misc tarball metadata: {0}".format(
                            metadata["finish_time"]
                        )
                    )

        except KeyError:
            metadata = None

    return {
        "start_time": start_time,
        "finish_time": finish_time,
        "data_type": data_type,
        "metadata": metadata,
    }
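# Editorial usage sketch (not part of the commit): assuming the tarball contains a
# top-level member "./METADATA.json" such as
#
#     {"start_time": "20200311T120000Z",
#      "finish_time": "20200311T130000Z",
#      "notes": "site maintenance"}
#
# get_miscfile_data("/path/to/some_misc_tarball.tar.gz") would return the two
# timestamps converted to Unix time, the data_type parsed from the file name by
# di.util.parse_miscfile_name(), and the remaining keys (here just "notes") under
# "metadata". If "./METADATA.json" is missing, getmember() raises KeyError and
# "metadata" comes back as None.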


def get_filerawinfo_keywords(rawinfo, size_b, file_name):
    chunk_num = int(file_name[: file_name.find(".")])
    log.debug("Rawinfo: %d %d %d" % (size_b, rawinfo.nframe, rawinfo.packet_len))
@@ -344,7 +397,7 @@ def import_file(node, root, acq_name, file_name):
            done = True
        except pw.OperationalError:
            log.error(
-                "MySQL connexion dropped. Will attempt to reconnect in " "five seconds."
+                "MySQL connexion dropped. Will attempt to reconnect in five seconds."
            )
            time.sleep(5)
            db.connect(read_write=True, reconnect=True)
@@ -522,7 +575,7 @@ def _import_file(node, root, acq_name, file_name):
            log.info(
                'Added information for file "%s/%s" to DB.' % (acq_name, file_name)
            )
-    if ftype.name == "hk":
+    elif ftype.name == "hk":
        # Add if (1) there is no hkinfo or (2) the hkinfo is missing.
        if not file.hkinfos.count():
            try:
@@ -554,7 +607,7 @@ def _import_file(node, root, acq_name, file_name):
            log.info(
                'Added information for file "%s/%s" to DB.' % (acq_name, file_name)
            )
-    if ftype.name == "weather":
+    elif ftype.name == "weather":
        # Add if (1) there is no weatherinfo or (2) the weatherinfo is missing.
        if not file.weatherinfos.count():
            # try:
@@ -585,7 +638,7 @@ def _import_file(node, root, acq_name, file_name):
                'Added information for file "%s/%s" to DB.' % (acq_name, file_name)
            )

-    if ftype.name == "rawadc":
+    elif ftype.name == "rawadc":
        # Add if there is no rawadcinfo
        if not file.rawadcinfos.count():
            try:
@@ -603,7 +656,7 @@ def _import_file(node, root, acq_name, file_name):
                    % (acq_name, file_name)
                )

-    if ftype.name == "hkp":
+    elif ftype.name == "hkp":
        # Add if there is no hkpinfo
        if not file.hkpinfos.count():
            try:
@@ -669,6 +722,14 @@ def _import_file(node, root, acq_name, file_name):
                    % (acq_name, file_name)
                )

    elif atype == "misc" and ftype.name == "miscellaneous":
        with db.proxy.atomic():
            if not file.miscfileinfos.count():
                di.MiscFileInfo.create(file=file, **get_miscfile_data(fullpath))
                log.info(
                    'Added information for file "%s/%s" to DB.' % (acq_name, file_name)
                )

    if import_done is not None:
        bisect.insort_left(import_done, fullpath)
        with open(LOCAL_IMPORT_RECORD, "w") as fp:
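
For reference, a minimal sketch (not from the commit) of the timestamp handling used by get_miscfile_data(): values in METADATA.json are expected in the compact UTC form "%Y%m%dT%H%M%SZ" and are converted to Unix time with the standard-library datetime and calendar modules. The example value below is hypothetical.

    import calendar
    import datetime

    # Hypothetical METADATA.json timestamp in the compact UTC form.
    stamp = "20200312T000000Z"

    # Parse the string as a naive UTC datetime; "T" and "Z" are literal
    # characters in the format string, so no timezone conversion happens here.
    dt = datetime.datetime.strptime(stamp, "%Y%m%dT%H%M%SZ")

    # calendar.timegm() interprets the struct_time as UTC, giving Unix time.
    unix_time = calendar.timegm(dt.utctimetuple())
    print(unix_time)  # 1583971200 for 2020-03-12 00:00:00 UTC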
