Skip to content

Commit

Permalink
Refactor database generation
Browse files Browse the repository at this point in the history
Some basic profiling shows that loading the netcdf files is time-consuming
in generating the database. Formerly, :meth:`database.generateEventTable`
and :class:`database.processEvents` both opened each netCDF file. This was
updated to parse a :class:`netCDF4.Dataset` instance to the separate
functions that extract information for the two methods. This means that
:meth:`database.generateEventTable` has been subsumed into
:meth:`database.processEvents`.
  • Loading branch information
wcarthur committed Jun 9, 2016
1 parent 7fcee0f commit d96702b
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 27 deletions.
78 changes: 52 additions & 26 deletions database/__init__.py
Expand Up @@ -17,7 +17,7 @@
selected for more detailed modelling.
:class:`HazardDatabase` is initially intended to be created once, then
queried from subsequent scripts or interactive sessions. By
queried from subsequent scripts or interactive sessions. By
inheriting from the :class:`Singleton` class, this will only ever return
the first instance of :class:`HazardDatabase` created in a session.
Expand All @@ -31,10 +31,10 @@
be stored in the db (?).
- Separate the query functions to separate files.
- Modify :func:`windfieldAttributes` to take a :class:`netcdf4.Dataset`
instance. The instance would be created in
instance. The instance would be created in
:func:`HazardDatabase.processEvent`, where it will also be used to extract
the data. This will result in each file being opened only once. It will also
require :func:`HazardDatabase.generateEventTable` to be merged into
require :func:`HazardDatabase.generateEventTable` to be merged into
:func:`HazardDatabase.processEvents`
Expand Down Expand Up @@ -68,6 +68,9 @@
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())


# pylint: disable=R0914,R0902

# Table definition statements
# Stations - we assume a geographic coordinate system:
TBLLOCATIONDEF = ("CREATE TABLE IF NOT EXISTS tblLocations "
Expand Down Expand Up @@ -133,22 +136,18 @@
# Select locId, locLon & locLat from the subset of locations:
SELECTLOCLONLAT = "SELECT locId, locLon, locLat FROM tblLocations "

def windfieldAttributes(ncfile):
def windfieldAttributes(ncobj):
"""
Extract the required attributes from a netCDF file.
:param str ncfile: Path to a valid netCDF file created by TCRM.
:param str ncobj: :class:`netCDF4.Dataset` instance.
:returns: A tuple containing the track filename, file modification date,
TCRM version, minimum pressure and maximum wind, stored as
global attributes in the netCDF file.
"""
try:
ncobj = Dataset(ncfile, 'r')
except Exception as err:
log.exception("Cannot open file: {0}: {1}".format(ncfile, err.args[0]))
raise

trackfile = getattr(ncobj, 'track_file')
trackfiledate = getattr(ncobj, 'track_file_date')
trackfiledate = datetime.strptime(trackfiledate, '%Y-%m-%d %H:%M:%S')
Expand All @@ -163,7 +162,7 @@ def windfieldAttributes(ncfile):

vmaxobj = ncobj.variables['vmax']
maxwind = getattr(vmaxobj, 'actual_range')[1]
ncobj.close()

return (trackfile, trackfiledate, tcrm_vers, minslp, maxwind)

def HazardDatabase(configFile):
Expand Down Expand Up @@ -301,7 +300,7 @@ def generateEventTable(self):
log.info("Inserting records into tblEvents")

fileList = os.listdir(self.windfieldPath)
fileList = [f for f in fileList if
fileList = [f for f in fileList if
os.path.isfile(pjoin(self.windfieldPath, f))]

pattern = re.compile(r'\d+')
Expand Down Expand Up @@ -348,17 +347,17 @@ def processEvents(self):
os.path.isfile(pjoin(self.windfieldPath, f))]

locations = self.getLocations()
for f in sorted(fileList):
fstat = flGetStat(pjoin(self.windfieldPath, f))
for eventNum, fname in enumerate(sorted(fileList)):
fstat = flGetStat(pjoin(self.windfieldPath, fname))
if (pAlreadyProcessed(fstat[0], fstat[1], 'md5sum', fstat[2]) and
self.excludePastProcessed):
log.debug("Already processed %s", f)
log.debug("Already processed %s", fname)
else:
log.debug("Processing {0}".format(f))
if self.processEvent(f, locations):
pWriteProcessedFile(pjoin(self.windfieldPath, f))
log.debug("Processing {0}".format(fname))
if self.processEvent(fname, locations, eventNum):
pWriteProcessedFile(pjoin(self.windfieldPath, fname))

def loadWindfieldFile(self, filename):
def loadWindfieldFile(self, ncobj):
"""
Load an individual dataset.
Expand All @@ -367,28 +366,55 @@ def loadWindfieldFile(self, filename):
:returns: tuple containing longitude, latitude, wind speed,
eastward and northward components and pressure grids.
"""
ncobj = Dataset(pjoin(self.windfieldPath, filename))
#ncobj = Dataset(pjoin(self.windfieldPath, filename))
lon = ncobj.variables['lon'][:]
lat = ncobj.variables['lat'][:]
vmax = ncobj.variables['vmax'][:]
ua = ncobj.variables['ua'][:]
va = ncobj.variables['va'][:]
pmin = ncobj.variables['slp'][:]
ncobj.close()
#ncobj.close()
return (lon, lat, vmax, ua, va, pmin)

def processEvent(self, filename, locations):
def processEvent(self, filename, locations, eventNum):
"""
Process an individual event file
:param str filename: Name of a file to process.
:param list locations: List of locations to sample data for.
:param int eventNum: Ordered event number.
"""
log.debug("Processing {0}".format(pjoin(self.windfieldPath, filename)))
pattern = re.compile(r'\d+')
sim, num = pattern.findall(filename)
eventId = "%03d-%05d" % (int(sim), int(num))
lon, lat, vmax, ua, va, pmin = self.loadWindfieldFile(filename)
params = list()
ncobj = Dataset(pjoin(self.windfieldPath, filename))

# First perform the event update for tblEvents:
fname = pjoin(self.windfieldPath, filename)
si = os.stat(fname)
dtWindfieldFile = datetime.fromtimestamp(int(si.st_mtime))
trackfile, dtTrackFile, tcrm_version, minslp, maxwind = \
windfieldAttributes(ncobj)

eventparams = ("%06d"%eventNum, eventId, os.path.basename(fname),
trackfile, float(maxwind), float(minslp),
dtTrackFile, dtWindfieldFile, tcrm_version,
"", datetime.now())

try:
self.execute(INSEVENTS, eventparams)
except sqlite3.Error as err:
log.exception("Cannot insert records into tblEvents: {0}".\
format(err.args[0]))
ncobj.close()
return 0
else:
self.commit()

# Perform update for tblWindSpeed:
lon, lat, vmax, ua, va, pmin = self.loadWindfieldFile(ncobj)
ncobj.close()
wsparams = list()

for loc in locations:
locId, locName, locLon, locLat = loc
Expand All @@ -400,10 +426,10 @@ def processEvent(self, filename, locations):
locPr = pmin[j, i]
locParams = (locId, eventId, float(locVm), float(locUa),
float(locVa), float(locPr), " ", datetime.now())
params.append(locParams)
wsparams.append(locParams)

try:
self.executemany(INSWINDSPEED, params)
self.executemany(INSWINDSPEED, wsparams)
except sqlite3.Error as err:
log.exception("Cannot insert records into tblWindSpeed: {0}".\
format(err.args[0]))
Expand Down
1 change: 0 additions & 1 deletion tcrm.py
Expand Up @@ -456,7 +456,6 @@ def doDatabaseUpdate(configFile):

db = database.HazardDatabase(configFile)
db.createDatabase()
db.generateEventTable()
db.setLocations()
db.processEvents()
db.processHazard()
Expand Down

0 comments on commit d96702b

Please sign in to comment.