Track processed files
Record the files that are processed for insertion into the database. If
files are recorded as processed (based on last modified date and/or
md5sum), then they are not re-processed.
wcarthur committed Sep 1, 2015
1 parent fb85eba commit ec02921
Showing 2 changed files with 92 additions and 64 deletions.
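The core of the change is a skip-if-processed pattern: stat each candidate file, compare its md5sum (and/or modification date) against the records loaded from the dat file, and only process files that are new or have changed. A minimal sketch of how the functions in this diff fit together is shown below; the ingest() wrapper, the process_event callback, the data_path argument and the dat file name are illustrative stand-ins, not part of the commit.

from os.path import join as pjoin

from Utilities.files import flGetStat
from Utilities.process import (pGetProcessedFiles, pAlreadyProcessed,
                               pWriteProcessedFile)

def ingest(data_path, file_list, process_event):
    """Process each file at most once per content change (illustrative)."""
    # Load the records written by previous runs from the dat file.
    pGetProcessedFiles('processed_files.dat')
    for f in sorted(file_list):
        directory, fname, md5sum, moddate = flGetStat(pjoin(data_path, f))
        if pAlreadyProcessed(directory, fname, 'md5sum', md5sum):
            continue  # unchanged since the last run, so skip it
        if process_event(f):
            # Record success so the next run skips this file.
            pWriteProcessedFile(pjoin(data_path, f))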
39 changes: 14 additions & 25 deletions Utilities/process.py
@@ -14,17 +14,14 @@
"""

import os
from os.path import join as pjoin
import logging

from Utilities.files import flGetStat, flModDate

global GLOBAL_DATFILE
global GLOBAL_PROCFILES
global GLOBAL_ARCHDIR
global GLOBAL_TIMESTAMP
global GLOBAL_DATEFMT
LOGGER = logging.getLogger()

GLOBAL_DATFILE = None
GLOBAL_PROCFILES = {}
GLOBAL_ARCHDIR = ''
GLOBAL_DATEFMT = '%Y%m%d%H%M'
@@ -41,7 +38,6 @@ def pSetProcessedEntry(directory, filename, attribute, value):
"""

global GLOBAL_DATFILE
global GLOBAL_PROCFILES
if directory in GLOBAL_PROCFILES:
if filename in GLOBAL_PROCFILES[directory]:
@@ -67,8 +63,6 @@ def pGetProcessedEntry(directory, filename, attribute):
"""

global GLOBAL_DATFILE
global GLOBAL_PROCFILES
try:
value = GLOBAL_PROCFILES[directory][filename][attribute]
rc = value
@@ -91,16 +85,14 @@ def pGetProcessedFiles(datFileName=None):
"""

global GLOBAL_DATFILE
global GLOBAL_PROCFILES
rc = 0
if datFileName:
GLOBAL_DATFILE = datFileName
try:
fh = open(datFileName)

except IOError:
LOGGER.warn("Couldn't open dat file %s",
(datFileName))
LOGGER.warn("Couldn't open dat file %s", datFileName)
return rc
else:
LOGGER.debug("Getting previously-processed files from %s",
@@ -117,8 +109,8 @@ def pGetProcessedFiles(datFileName=None):

else:
LOGGER.info("No dat file name provided - all files will be processed")

return rc

return rc

def pWriteProcessedFile(filename):
@@ -134,14 +126,13 @@ def pWriteProcessedFile(filename):
"""
global GLOBAL_DATFILE
global GLOBAL_PROCFILES
rc = 0
if GLOBAL_DATFILE:
directory, fname, md5sum, moddate = flGetStat(filename)
try:
fh = open(GLOBAL_DATFILE, 'a')
except IOError:
LOGGER.info("Cannot open %s"%(GLOBAL_DATFILE))
LOGGER.info("Cannot open %s", GLOBAL_DATFILE)

else:
pSetProcessedEntry(directory, fname, 'md5sum', md5sum)
@@ -150,7 +141,8 @@ def pWriteProcessedFile(filename):
fh.close()
rc = 1
else:
LOGGER.warn("Dat file name not provided. Can't record %s as processed."%(filename))
LOGGER.warn(("Dat file name not provided. "
"Can't record %s as processed."), filename)

return rc

@@ -165,13 +157,11 @@ def pDeleteDatFile():
"""

global GLOBAL_DATFILE
global GLOBAL_PROCFILES
rc = 0
if os.unlink(GLOBAL_DATFILE):
rc = 1
else:
LOGGER.warn("Cannot remove dat file %s"%(GLOBAL_DATFILE))
LOGGER.warn("Cannot remove dat file %s", GLOBAL_DATFILE)
return rc

def pAlreadyProcessed(directory, filename, attribute, value):
@@ -190,7 +180,6 @@ def pAlreadyProcessed(directory, filename, attribute, value):
"""
global GLOBAL_DATFILE
global GLOBAL_PROCFILES
rc = False
if pGetProcessedEntry(directory, filename, attribute) == value:
rc = True
@@ -220,7 +209,7 @@ def pArchiveDir(archive_dir=None):
try:
os.makedirs(GLOBAL_ARCHDIR)
except:
LOGGER.exception("Cannot create %s"%(GLOBAL_ARCHDIR))
LOGGER.exception("Cannot create %s", GLOBAL_ARCHDIR)
raise OSError

rc = GLOBAL_ARCHDIR
@@ -279,10 +268,10 @@ def pMoveFile(origin, destination):
try:
os.rename(origin, destination)
except OSError:
LOGGER.warn("Error moving %s to %s"%(origin, destination))
LOGGER.warn("Error moving %s to %s", origin, destination)
rc = 0
else:
LOGGER.debug("%s moved to %s"%(origin, destination))
LOGGER.debug("%s moved to %s", origin, destination)
rc = 1

return rc
@@ -309,14 +298,14 @@ def pArchiveFile(filename):
try:
os.makedirs(archive_dir)
except OSError:
LOGGER.critcal("Cannot create %s"%(archive_dir))
LOGGER.critcal("Cannot create %s", archive_dir)
raise

if pArchiveTimestamp():
archive_date = flModDate(filename, GLOBAL_DATEFMT)
archive_file_name = os.path.join(archive_dir, "%s.%s.%s"%(base, archive_date, ext))
archive_file_name = pjoin(archive_dir, "%s.%s.%s"%(base, archive_date, ext))
else:
archive_file_name = os.path.join(archive_dir, "%s.%s"%(base, ext))
archive_file_name = pjoin(archive_dir, "%s.%s"%(base, ext))

rc = pMoveFile(filename, archive_file_name)
return rc
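A hypothetical usage sketch of the archiving helpers touched above (the paths and file name are invented; per the hunks shown, pArchiveDir() sets and, if necessary, creates the archive directory, and pArchiveFile() inserts the file's modification date, formatted with GLOBAL_DATEFMT, into the archived name when the archive timestamp option is enabled):

from Utilities import process

# Set (and create, if missing) the archive directory.
process.pArchiveDir('/data/archive')

# Move a processed file into the archive; returns 1 on success, 0 on failure.
rc = process.pArchiveFile('/data/input/gust.000-00001.nc')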
117 changes: 78 additions & 39 deletions database/__init__.py
@@ -49,11 +49,12 @@
import numpy as np

from Utilities.config import ConfigParser
from Utilities.files import flModDate
from Utilities.files import flModDate, flGetStat
from Utilities.maputils import find_index
from Utilities.loadData import loadTrackFile
from Utilities.track import loadTracksFromFiles

from Utilities.process import pAlreadyProcessed, pWriteProcessedFile, \
pGetProcessedFiles
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())

@@ -96,13 +97,16 @@

# Insert statements:
# Insert locations:
INSLOCATIONS = "INSERT INTO tblLocations VALUES (?,?,?,?,?,?,?,?,?,?,?)"
INSLOCATIONS = ("INSERT OR REPLACE INTO tblLocations "
"VALUES (?,?,?,?,?,?,?,?,?,?,?)")

# Insert event record:
INSEVENTS = "INSERT INTO tblEvents VALUES (?,?,?,?,?,?,?,?,?,?,?)"

# Insert wind speed record:
INSWINDSPEED = "INSERT INTO tblWindSpeed VALUES (?,?,?,?,?,?,?,?)"
INSWINDSPEED = ("INSERT INTO tblWindSpeed (locId, eventId, wspd, "
"umax, vmax, pmin, Comments, dtCreated) "
"VALUES (?,?,?,?,?,?,?,?)")

# Insert hazard record:
INSHAZARD = "INSERT INTO tblHazard VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"
@@ -181,6 +185,11 @@ def __init__(self, configFile):
self.domain = config.geteval('Region', 'gridLimit')
self.hazardDB = pjoin(self.outputPath, 'hazard.db')
self.locationDB = pjoin(self.outputPath, 'locations.db')
self.datfile = config.get('Process', 'DatFile')
self.excludePastProcessed = config.getboolean('Process',
'ExcludePastProcessed')

rc = pGetProcessedFiles(self.datfile)

sqlite3.Connection.__init__(self, self.hazardDB,
detect_types=PARSE_DECLTYPES|PARSE_COLNAMES)
@@ -228,7 +237,11 @@ def setLocations(self):
"""
Populate _tblLocations_ in the hazard database with all
locations from the default locations database that lie
within the simulation domain.
within the simulation domain. If the table exists and is
populated, the records will be updated and any new records
inserted.
"""

@@ -318,55 +331,79 @@ def processEvents(self):
_tblWindSpeed_.
"""

log.info("Inserting records into tblWindSpeed")
fileList = os.listdir(self.windfieldPath)
fileList = [f for f in fileList if os.path.isfile(f)]

fileList = [f for f in fileList if
os.path.isfile(pjoin(self.windfieldPath, f))]

locations = self.getLocations()
pattern = re.compile(r'\d+')
for n, f in enumerate(sorted(fileList)):
log.debug("Processing {0}".format(f))
sim, num = pattern.findall(f)
eventId = "%s-%s" % (sim, num)
fname = pjoin(self.windfieldPath, f)
ncobj = Dataset(fname)
lon = ncobj.variables['lon'][:]
lat = ncobj.variables['lat'][:]

vmax = ncobj.variables['vmax'][:]
ua = ncobj.variables['ua'][:]
va = ncobj.variables['va'][:]
pmin = ncobj.variables['slp'][:]
params = []
directory, fname, md5sum, moddate = \
flGetStat(pjoin(self.windfieldPath, f))
if (pAlreadyProcessed(directory, fname, 'md5sum', md5sum) and
self.excludePastProcessed):
log.debug("Already processed %s", f)
else:
log.debug("Processing {0}".format(f))
if self.processEvent(f, locations):
pWriteProcessedFile(pjoin(self.windfieldPath, f))

for loc in locations:
locId, locLon, locLat = loc
i = find_index(lon, locLon)
j = find_index(lat, locLat)
locVm = vmax[j, i]
locUa = ua[j, i]
locVa = va[j, i]
locPr = pmin[j, i]
locParams = (locId, eventId, float(locVm), float(locUa),
float(locVa), float(locPr), " ", datetime.now())
def processEvent(self, filename, locations):
"""
Process an individual event file.

:param str filename: Name of the file to process (relative to the
windfield path).
:param list locations: List of locations to sample data for.

:returns: 1 if the records are successfully inserted, 0 otherwise.
"""
log.debug("Processing {0}".format(pjoin(self.windfieldPath, filename)))
pattern = re.compile(r'\d+')
sim, num = pattern.findall(filename)
eventId = "%s-%s" % (sim, num)

params.append(locParams)
ncobj = Dataset(pjoin(self.windfieldPath, filename))
lon = ncobj.variables['lon'][:]
lat = ncobj.variables['lat'][:]
vmax = ncobj.variables['vmax'][:]
ua = ncobj.variables['ua'][:]
va = ncobj.variables['va'][:]
pmin = ncobj.variables['slp'][:]
ncobj.close()
params = list()

for loc in locations:
locId, locLon, locLat = loc
i = find_index(lon, locLon)
j = find_index(lat, locLat)
locVm = vmax[j, i]
locUa = ua[j, i]
locVa = va[j, i]
locPr = pmin[j, i]
locParams = (locId, eventId, float(locVm), float(locUa),
float(locVa), float(locPr), " ", datetime.now())
params.append(locParams)

try:
self.executemany(INSWINDSPEED, params)
except sqlite3.ProgrammingError as err:
log.exception("Programming error: {0}".format(err.args[0]))
return 0
except sqlite3.Error as err:
log.exception("Cannot insert records into tblWindSpeed: {0}".
format(err.args[0]))
raise
else:
self.commit()


return 1

def processHazard(self):
"""
Update _tblHazard_ with the return period wind speed data.
"""

log.info("Inserting records into tblHazard")
locations = self.getLocations()
hazardFile = pjoin(self.hazardPath, 'hazard.nc')
ncobj = Dataset(hazardFile)
@@ -390,6 +427,7 @@ def processHazard(self):
locationParam = ncobj.variables['loc'][:]
scaleParam = ncobj.variables['scale'][:]
shpParam = ncobj.variables['shp'][:]
ncobj.close()

params = []
for k, year in enumerate(years):
@@ -428,6 +466,7 @@ def processTracks(self):
the locations in the domain.
"""
log.info("Inserting records into tblTracks")
locations = self.getLocations()
points = [Point(loc[1], loc[2]) for loc in locations]

@@ -583,14 +622,14 @@ def locationRecords(hazard_db, locId):
"""

query = ("SELECT l.locId, l.locName, w.wspd, w.eventId "
"FROM tblLocations l "
"INNER JOIN tblWindSpeed w "
"ON l.locId = w.locId "
"JOIN tblEvents e ON e.eventId = w.eventId "
query = ("SELECT w.locId, l.locName, w.wspd, w.eventId "
"FROM tblWindSpeed w "
"INNER JOIN tblLocations l "
"ON w.locId = l.locId "
"WHERE l.locId = ? ORDER BY w.wspd ASC")
cur = hazard_db.execute(query, (locId,))
results = cur.fetchall()

return results

def locationPassage(hazard_db, locId, distance=50):
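A hypothetical example of the revised locationRecords() query in use: the join is now driven from tblWindSpeed, so each returned row is a (locId, locName, wspd, eventId) tuple, ordered by wind speed ascending. The database file name and location id are invented.

import sqlite3

import database

conn = sqlite3.connect('hazard.db')
for locId, locName, wspd, eventId in database.locationRecords(conn, 1000):
    print(locId, locName, wspd, eventId)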
