Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ingest: don't ingest into temporary registry
Port commits b19a9d99e098d40d9a771986a210a6f5e0d70e6d and
e95da28b66f42889393ae6906d523d70de277761 (from DM-23213 and DM-23342)
from LSST. This allows an open registry (in a long-lived job, e.g., a
python notebook) to immediately see the results of the ingestion.
  • Loading branch information
PaulPrice committed Feb 16, 2021
1 parent e6f6329 commit cdc82a0
Showing 1 changed file with 81 additions and 0 deletions.
81 changes: 81 additions & 0 deletions python/lsst/obs/pfs/ingest.py
@@ -1,5 +1,6 @@
import os
import re
import sqlite3
from functools import partialmethod
import datetime
import dateutil.parser
Expand All @@ -16,13 +17,55 @@
from lsst.pex.config import Field
from lsst.afw.fits import readMetadata
from lsst.obs.pfs.utils import getLamps
import lsst.pipe.tasks.ingest

from pfs.datamodel.pfsConfig import PfsConfig, PfsDesign
from .translator import PfsTranslator

__all__ = ["PfsParseConfig", "PfsParseTask", "PfsIngestTask", "PfsIngestCalibsTask"]


class RegistryContext:
"""Context manager to provide a registry
This implementation, to be monkey-patched over the original, removes the
use of a temporary registry. This was originally done to allow the registry
to be used while ingest is running, but is no longer necessary, and now it
gets in the way of holding onto the registry (e.g., a butler in a
long-running process doesn't see the latest ingestion).
Parameters
----------
registryName : `str`
Name of registry file.
createTableFunc : callable
Function to create tables.
forceCreateTables : `bool`
Force the (re-)creation of tables?
permissions : `int`
Permissions to set on database file.
"""
def __init__(self, registryName, createTableFunc, forceCreateTables, permissions):
"""Construct a context manager"""
haveTable = os.path.exists(registryName)
self.conn = sqlite3.connect(registryName)
os.chmod(registryName, permissions)
if not haveTable or forceCreateTables:
createTableFunc(self.conn)

def __enter__(self):
"""Provide the 'as' value"""
return self.conn

def __exit__(self, excType, excValue, traceback):
self.conn.commit()
self.conn.close()
return False # Don't suppress any exceptions


lsst.pipe.tasks.ingest.RegistryContext = RegistryContext


class PfsParseConfig(ParseConfig):
"""Configuration for PfsParseTask"""
pfsDesignId = Field(dtype=int, default=0x0, doc="Default value for pfsDesignId")
Expand Down Expand Up @@ -356,6 +399,44 @@ def translate_visit0(self, md):


class PfsRegisterTask(RegisterTask):
def addRow(self, conn, info, dryrun=False, create=False, table=None):
"""Add a row to the file table (typically 'raw').
This is a copy from LSST 18.1.0, with the addition from commit
``b19a9d99e098d40d9a771986a210a6f5e0d70e6d``, to modify the database
directly (rather than a copy).
Parameters
----------
conn : context manager
Database connection.
info : `dict`
File properties to add to database.
dryrun : `bool`
Whether to actually write to the database?
create : `bool`
Whether to create teh database table? (Apparently unused here.)
table : `str`
Name of table in database.
"""
if table is None:
table = self.config.table
sql = "INSERT INTO %s (%s) SELECT " % (table, ",".join(self.config.columns))
sql += ",".join([self.placeHolder] * len(self.config.columns))
values = [self.typemap[tt](info[col]) for col, tt in self.config.columns.items()]

if self.config.ignore:
sql += " WHERE NOT EXISTS (SELECT 1 FROM %s WHERE " % table
sql += " AND ".join(["%s=%s" % (col, self.placeHolder) for col in self.config.unique])
sql += ")"
values += [info[col] for col in self.config.unique]

if dryrun:
print("Would execute: '%s' with %s" % (sql, ",".join([str(value) for value in values])))
else:
with conn:
conn.cursor().execute(sql, values)

def addVisits(self, conn, dryrun=False, table=None):
"""Generate the visits table (typically 'raw_visits') from the
file table (typically 'raw').
Expand Down

0 comments on commit cdc82a0

Please sign in to comment.