Skip to content

Commit

Permalink
Merge branch 'dbrefactor' of github.com:GeoscienceAustralia/tcrm into…
Browse files Browse the repository at this point in the history
… dbrefactor

Conflicts:
	database/__init__.py
  • Loading branch information
wcarthur committed Jul 21, 2016
2 parents 868bdaa + a1ab460 commit eaa44f5
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 25 deletions.
47 changes: 39 additions & 8 deletions database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,14 @@ def windfieldAttributes(ncobj):

return (trackfile, trackfiledate, tcrm_vers, minslp, maxwind)

def HazardDatabase(configFile):
def HazardDatabase(configFile): # pylint: disable=C0103
"""
Wrapper function to create (or retrieve existing) instance of
:class:`HazardDatabase`.
A wrapper function to instantiate :class:`_HazardDatabase`. We actually
call :meth:`getInstance` to get a singleton instance of the
:class:`_HazardDatabase`. If one exists already, then that instance is
returned.
:param str configFile: Path to the simulation configuration file.
:param str configFile: Path to configuration file
"""
return _HazardDatabase.getInstance(configFile)
Expand Down Expand Up @@ -393,7 +395,8 @@ def processEvents(self):
if w < len(fileList):
pp.send((fileList[w], locations, w),
destination=d, tag=work_tag)
log.debug("Processing file %d of %d" % (w, len(fileList)))
log.debug("Processing {0} ({1} of {2})".\
format(fileList[w], w, len(fileList)))
w += 1
else:
pp.send(None, destination=d, tag=work_tag)
Expand All @@ -404,9 +407,13 @@ def processEvents(self):
while terminated < p:
result, status = pp.receive(pp.any_source, tag=result_tag,
return_status=True)
log.debug("Processing results from node {0}".\
format(status.source))
eventparams, wsparams = result
self.insertEvents(eventparams)
self.insertWindSpeeds(wsparams)
log.debug("Done inserting records from node {0}".\
format(status.source))

d = status.source
if w < len(fileList):
Expand All @@ -423,13 +430,20 @@ def processEvents(self):
W = pp.receive(source=0, tag=work_tag)
if W is None:
break

log.info("Processing {0} on node {1}".\
format(W[0], pp.rank()))
results = self.processEvent(*W)
pp.send(results, destination=0, tag=work_tag)
log.debug("Results received on node {0}".format(pp.rank()))
pp.send(results, destination=0, tag=result_tag)

elif pp.size() == 1 and pp.rank() == 0:
# Assume no Pypar:
locations = self.getLocations()
for eventNum, filename in enumerate(fileList):
log.debug("Processing {0} ({1} of {2})".format(filename,
eventNum,
len(fileList)))
result = self.processEvent(filename, locations, eventNum)
eventparams, wsparams = result
self.insertEvents(eventparams)
Expand Down Expand Up @@ -465,10 +479,16 @@ def processEvent(self, filename, locations, eventNum):
pattern = re.compile(r'\d+')
sim, num = pattern.findall(filename)
eventId = "%03d-%05d" % (int(sim), int(num))
ncobj = Dataset(pjoin(self.windfieldPath, filename))
log.debug("Event ID: {0}".format(eventId))
try:
ncobj = Dataset(pjoin(self.windfieldPath, filename))
except:
log.warn("Cannot open {0}".\
format(pjoin(self.windfieldPath, filename)))

# First perform the event update for tblEvents:
fname = pjoin(self.windfieldPath, filename)
log.debug("Filename: {0}".format(fname))
si = os.stat(fname)
dtWindfieldFile = datetime.fromtimestamp(int(si.st_mtime))
trackfile, dtTrackFile, tcrm_version, minslp, maxwind = \
Expand Down Expand Up @@ -496,7 +516,8 @@ def processEvent(self, filename, locations, eventNum):
float(locVa), float(locPr), " ", datetime.now())
wsparams.append(locParams)

return eventparams, wsparams
log.debug("Finished extracting data from {0}".format(filename))
return (eventparams, wsparams,)

@disableOnWorkers
def processHazard(self):
Expand Down Expand Up @@ -691,13 +712,23 @@ def run(configFile):
pp = attemptParallel()

db = HazardDatabase(configFile)

db.createDatabase()
db.setLocations()

pp.barrier()
db.processEvents()
pp.barrier()

db.processHazard()

pp.barrier()
db.processTracks()
pp.barrier()

db.close()
log.info("Created and populated database")
log.info("Finished running database creation")

@disableOnWorkers
def buildLocationDatabase(location_db, location_file, location_type='AWS'):
Expand Down
18 changes: 1 addition & 17 deletions tcrm.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,23 +443,7 @@ def doDatabaseUpdate(configFile):
log.info("Creating hazard database")
import database
database.run(configFile)
#config = ConfigParser()
#config.read(configFile)


#outputPath = config.get('Output', 'Path')
#location_db = pjoin(outputPath, 'locations.db')
#if not os.path.exists(location_db):
# location_file = config.get('Input', 'LocationFile')
# database.buildLocationDatabase(location_db, location_file)

#db = database.HazardDatabase(configFile)
#db.createDatabase()
#db.setLocations()
#db.processEvents()
#db.processHazard()
#db.processTracks()
#db.close()

log.info("Created and populated database")


Expand Down

0 comments on commit eaa44f5

Please sign in to comment.