Skip to content

Commit

Permalink
Full parallelisation of database
Browse files Browse the repository at this point in the history
  • Loading branch information
wcarthur committed Jun 28, 2016
1 parent b4f58a4 commit a1ab460
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 34 deletions.
73 changes: 56 additions & 17 deletions database/__init__.py
Expand Up @@ -161,7 +161,16 @@ def windfieldAttributes(ncobj):

return (trackfile, trackfiledate, tcrm_vers, minslp, maxwind)

def HazardDatabase(configFile):
def HazardDatabase(configFile): # pylint: disable=C0103
    """
    Return the shared :class:`_HazardDatabase` instance.

    This is a thin factory wrapper: rather than constructing
    :class:`_HazardDatabase` directly, it delegates to
    :meth:`_HazardDatabase.getInstance`, which hands back the singleton
    instance (the existing one if it has already been created).

    :param str configFile: Path to configuration file

    :returns: Whatever :meth:`_HazardDatabase.getInstance` returns for
              ``configFile``.
    """
    instance = _HazardDatabase.getInstance(configFile)
    return instance

class _HazardDatabase(sqlite3.Connection, Singleton):
Expand Down Expand Up @@ -214,7 +223,7 @@ def createDatabase(self):
self.exists = True
self.commit()
return

@disableOnWorkers
def createTable(self, tblName, tblDef):
"""
Expand Down Expand Up @@ -384,7 +393,7 @@ def parallelProcessEvents(self):
fileList = os.listdir(self.windfieldPath)
fileList = [f for f in fileList if
os.path.isfile(pjoin(self.windfieldPath, f))]

work_tag = 0
result_tag = 1
if (pp.rank() == 0) and (pp.size() > 1):
Expand All @@ -393,43 +402,55 @@ def parallelProcessEvents(self):
p = pp.size() - 1
for d in range(1, pp.size()):
if w < len(fileList):
pp.send((fileList[w], locations, w), destination=d, tag=work_tag)
log.debug("Processing file %d of %d" % (w, len(fileList)))
pp.send((fileList[w], locations, w),
destination=d, tag=work_tag)
log.debug("Processing {0} ({1} of {2})".\
format(fileList[w], w, len(fileList)))
w += 1
else:
pp.send(None, destination=d, tag=work_tag)
p = w

terminated = 0
while(terminated < p):

while terminated < p:
result, status = pp.receive(pp.any_source, tag=result_tag,
return_status=True)
log.debug("Processing results from node {0}".\
format(status.source))
eventparams, wsparams = result
self.insertEvents(eventparams)
self.insertWindSpeeds(wsparams)
log.debug("Done inserting records from node {0}".\
format(status.source))

d = status.source
if w < len(fileList):
pp.send((fileList[w], locations, w), destination=d, tag=work_tag)
pp.send((fileList[w], locations, w),
destination=d, tag=work_tag)
log.debug("Processing file %d of %d" % (w, len(fileList)))
w += 1
else:
pp.send(None, destination=d, tag=work_tag)
terminated += 1

elif (pp.size() > 1) and (pp.rank() !=0):
while(True):
W = pp.receieve(source=0, tag=work_tag)
elif (pp.size() > 1) and (pp.rank() != 0):
while True:
W = pp.receive(source=0, tag=work_tag)
if W is None:
break

log.debug("Processing {0} on node {1}".\
format(W[0], pp.rank()))
results = self.processEvent(*W)
pp.send(results, destination=0, tag=work_tag)
log.debug("Results received on node {0}".format(pp.rank()))
pp.send(results, destination=0, tag=result_tag)

elif pp.size() == 1 and pp.rank() == 0:
# Assume no Pypar:
locations = self.getLocations()
for eventNum, filename in enumerate(fileList):
log.debug("Processing {0} ({1} of {2})".format(filename, eventNum, len(fileList)))
result = self.processEvent(filename, locations, eventNum)
eventparams, wsparams = result
self.insertEvents(eventparams)
Expand Down Expand Up @@ -466,10 +487,16 @@ def processEvent(self, filename, locations, eventNum):
pattern = re.compile(r'\d+')
sim, num = pattern.findall(filename)
eventId = "%03d-%05d" % (int(sim), int(num))
ncobj = Dataset(pjoin(self.windfieldPath, filename))
log.debug("Event ID: {0}".format(eventId))
try:
ncobj = Dataset(pjoin(self.windfieldPath, filename))
except:
log.warn("Cannot open {0}".\
format(pjoin(self.windfieldPath, filename)))

# First perform the event update for tblEvents:
fname = pjoin(self.windfieldPath, filename)
log.debug("Filename: {0}".format(fname))
si = os.stat(fname)
dtWindfieldFile = datetime.fromtimestamp(int(si.st_mtime))
trackfile, dtTrackFile, tcrm_version, minslp, maxwind = \
Expand Down Expand Up @@ -497,7 +524,8 @@ def processEvent(self, filename, locations, eventNum):
float(locVa), float(locPr), " ", datetime.now())
wsparams.append(locParams)

return eventparams, wsparams
log.debug("Finished extracting data from {0}".format(filename))
return (eventparams, wsparams,)

@disableOnWorkers
def processHazard(self):
Expand Down Expand Up @@ -562,6 +590,7 @@ def processHazard(self):
else:
self.commit()

@disableOnWorkers
def processTracks(self):
"""
Populate tblTracks with the details of tracks and their proximity to
Expand Down Expand Up @@ -618,15 +647,25 @@ def run(configFile):
pp = attemptParallel()

db = HazardDatabase(configFile)

db.createDatabase()
db.setLocations()

pp.barrier()

db.parallelProcessEvents()

pp.barrier()

db.processHazard()
db.processTracks()

pp.barrier()

db.close()
log.info("Created and populated database")

log.info("Finished running database creation")

@disableOnWorkers
def buildLocationDatabase(location_db, location_file, location_type='AWS'):
"""
Build a database of locations, using a point shape file of the locations.
Expand Down
18 changes: 1 addition & 17 deletions tcrm.py
Expand Up @@ -443,23 +443,7 @@ def doDatabaseUpdate(configFile):
log.info("Creating hazard database")
import database
database.run(configFile)
#config = ConfigParser()
#config.read(configFile)


#outputPath = config.get('Output', 'Path')
#location_db = pjoin(outputPath, 'locations.db')
#if not os.path.exists(location_db):
# location_file = config.get('Input', 'LocationFile')
# database.buildLocationDatabase(location_db, location_file)

#db = database.HazardDatabase(configFile)
#db.createDatabase()
#db.setLocations()
#db.processEvents()
#db.processHazard()
#db.processTracks()
#db.close()

log.info("Created and populated database")


Expand Down

0 comments on commit a1ab460

Please sign in to comment.