Commit

More manual fixes
Shadowing variable names, f-strings, exception logging, star imports.
153957 committed Mar 7, 2024
1 parent e28fd8c commit 361d745
Showing 5 changed files with 54 additions and 62 deletions.
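Most of the churn below is the f-string conversion. A minimal before/after sketch of the pattern (the logger name and station number are illustrative, not taken from the diff):

import logging

logger = logging.getLogger(__name__)  # illustrative name
station_id = 501  # illustrative value

# old style: %-interpolation built into the message string
logger.debug('Station %d is unknown' % station_id)

# new style: f-string, as used throughout this commit
logger.debug(f'Station {station_id} is unknown')

# note: logging's own lazy style defers formatting until a record is
# actually emitted; both styles above format eagerly even if DEBUG is off
logger.debug('Station %s is unknown', station_id)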
25 changes: 12 additions & 13 deletions migration/migrate_eventwarehouse.py
@@ -15,21 +15,20 @@
import datetime
import logging
import os
+import pickle
import shutil
import tempfile

-import cPickle as pickle
import MySQLdb

-from settings import *
+import settings


class Database:
def __init__(self):
self.open()

def open(self):
-self.db = MySQLdb.connect(host=EWH_HOST, user=EWH_USER, passwd=EWH_PASSWD, db=EWH_DB, port=EWH_PORT)
+self.db = MySQLdb.connect(**settings.EWH_DATABASE)

def close(self):
self.db.close()
@@ -55,7 +54,7 @@ def read_migration_status():
"""Read migration status from file"""

try:
-with open(EWH_STATUS) as file:
+with open(settings.EWH_STATUS) as file:
status = pickle.load(file)
except OSError:
status = {}
@@ -66,15 +65,15 @@ def read_migration_status():
def write_migration_status(status):
"""Write migration status to file"""

-with open(EWH_STATUS, 'w') as file:
+with open(settings.EWH_STATUS, 'w') as file:
pickle.dump(status, file)


def read_station_list():
"""Read station, cluster combinations from file"""

station_list = {}
-with open(STATION_LIST) as file:
+with open(settings.STATION_LIST) as file:
reader = csv.reader(file)
for station in reader:
if station:
@@ -125,7 +124,7 @@ def get_event_batches(eventwarehouse, status, station):
get_event_data(eventwarehouse, events)
get_calculated_data(eventwarehouse, events)
status[station] = date, offset
-offset += BATCHSIZE
+offset += settings.BATCHSIZE

events = events.values()
dts = [x['header']['datetime'] for x in events]
@@ -204,24 +203,24 @@ def get_calculated_data(eventwarehouse, events):
def store_events(event_list, station, clusters):
"""Store an event batch in the datastore incoming directory"""

-if station in renumbered_stations:
-station = renumbered_stations[station]
+if station in settings.renumbered_stations:
+station = settings.renumbered_stations[station]

# Do not migrate KASCADE data
if station == 99999:
return

cluster = clusters[station]

-dir = os.path.join(DATASTORE_PATH, 'incoming')
-tmp_dir = os.path.join(DATASTORE_PATH, 'tmp')
+directory = os.path.join(settings.DATASTORE_PATH, 'incoming')
+tmp_dir = os.path.join(settings.DATASTORE_PATH, 'tmp')

file = tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False)
data = {'station_id': station, 'cluster': cluster, 'event_list': event_list}
pickle.dump(data, file)
file.close()

-shutil.move(file.name, dir)
+shutil.move(file.name, directory)


def execute_and_results(eventwarehouse, sql, *args):
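The import changes in both migration scripts swap the star import for an explicit module import, so every settings name is traceable at its call site. A sketch of the pattern, assuming settings.py is on the import path as in this repository:

import settings

# before: `from settings import *` injected EWH_HOST, BATCHSIZE, ... invisibly,
# and cPickle was imported under the name pickle
# after: each lookup names its module, and the stdlib pickle is used directly
print(settings.BATCHSIZE)               # 1000
print(settings.EWH_DATABASE['host'])    # 'peene'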
25 changes: 12 additions & 13 deletions migration/migrate_olddb.py
@@ -14,23 +14,22 @@
import csv
import logging
import os
+import pickle
import re
import shutil
import tempfile
import zlib

-import cPickle as pickle
import MySQLdb

-from settings import *
+import settings


class Database:
def __init__(self):
self.open()

def open(self):
-self.db = MySQLdb.connect(host=OLDDB_HOST, user=OLDDB_USER, db=OLDDB_DB, port=OLDDB_PORT)
+self.db = MySQLdb.connect(**settings.OLD_DATABASE)

def close(self):
self.db.close()
@@ -51,7 +50,7 @@ def migrate():
station = int(re.search('events([0-9]+)', table).group(1))

# strange tables, don't migrate
-if station == 0 or station == 30 or station == 97:
+if station in [0, 30, 97]:
continue
# removed stations (5001 = Sudan)
if station == 5001:
@@ -65,7 +64,7 @@ def read_migration_status():
"""Read migration status from file"""

try:
-with open(OLDDB_STATUS) as file:
+with open(settings.OLDDB_STATUS) as file:
status = pickle.load(file)
except OSError:
status = {}
@@ -76,7 +75,7 @@ def read_migration_status():
def write_migration_status(status):
"""Write migration status to file"""

-with open(OLDDB_STATUS, 'w') as file:
+with open(settings.OLDDB_STATUS, 'w') as file:
pickle.dump(status, file)


@@ -93,7 +92,7 @@ def read_station_list():
"""Read station, cluster combinations from file"""

station_list = {}
-with open(STATION_LIST) as file:
+with open(settings.STATION_LIST) as file:
reader = csv.reader(file)
for station in reader:
if station:
@@ -188,19 +187,19 @@ def add_data(datalist, key, value):
def store_events(event_list, station, clusters):
"""Store an event batch in the datastore incoming directory"""

-if station in renumbered_stations:
-station = renumbered_stations[station]
+if station in settings.renumbered_stations:
+station = settings.renumbered_stations[station]
cluster = clusters[station]

-dir = os.path.join(DATASTORE_PATH, 'incoming')
-tmp_dir = os.path.join(DATASTORE_PATH, 'tmp')
+directory = os.path.join(settings.DATASTORE_PATH, 'incoming')
+tmp_dir = os.path.join(settings.DATASTORE_PATH, 'tmp')

file = tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False)
data = {'station_id': station, 'cluster': cluster, 'event_list': event_list}
pickle.dump(data, file)
file.close()

-shutil.move(file.name, dir)
+shutil.move(file.name, directory)


def execute_and_results(eventwarehouse, sql, *args):
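The migrate() cleanup collapses a chain of equality tests into one membership test; a tiny self-contained illustration (a set literal {0, 30, 97} would behave the same):

station = 30

# before
if station == 0 or station == 30 or station == 97:
    print('skip')

# after: single membership test, as in the diff
if station in [0, 30, 97]:
    print('skip')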
22 changes: 13 additions & 9 deletions migration/settings.py
@@ -4,16 +4,20 @@
EWH_STATUS = '/databases/frome/migration-status'
BATCHSIZE = 1000

-OLDDB_HOST = 'oust'
-OLDDB_USER = 'webread'
-OLDDB_DB = 'hisparc'
-OLDDB_PORT = 3306
+OLD_DATABASE = {
+    'host': 'oust',
+    'user': 'webread',
+    'db': 'hisparc',
+    'port': 3306,
+}

-EWH_HOST = 'peene'
-EWH_USER = 'analysis'
-EWH_PASSWD = 'Data4analysis!'
-EWH_DB = 'eventwarehouse'
-EWH_PORT = 3306
+EWH_DATABASE = {
+    'host': 'peene',
+    'user': 'analysis',
+    'passwd': 'Data4analysis!',
+    'db': 'eventwarehouse',
+    'port': 3306,
+}

renumbered_stations = {
1: 501,
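Collecting the connection parameters in one dict lets callers expand them straight into keyword arguments with **, as both migration scripts now do. A stand-in sketch of the mechanism (the connect() here is a toy, not MySQLdb):

OLD_DATABASE = {
    'host': 'oust',
    'user': 'webread',
    'db': 'hisparc',
    'port': 3306,
}

def connect(host, user, db, port):
    # toy stand-in for MySQLdb.connect(), just to show the unpacking
    return f'{user}@{host}:{port}/{db}'

# ** expands the dict to connect(host='oust', user='webread', ...)
print(connect(**OLD_DATABASE))  # webread@oust:3306/hisparc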
22 changes: 6 additions & 16 deletions writer/store_events.py
@@ -1,10 +1,6 @@
import base64
import calendar
import logging
-import sys
-import traceback
-
-from io import StringIO

from writer import storage
from writer.upload_codes import eventtype_upload_codes
@@ -28,7 +24,7 @@ def store_event(datafile, cluster, station_id, event):
try:
upload_codes = eventtype_upload_codes[eventtype]
except KeyError:
-logger.error('Unknown event type: %s, discarding event (station: %s)' % (eventtype, station_id))
+logger.error(f'Unknown event type: {eventtype}, discarding event (station: {station_id})')
return

parentnode = storage.get_or_create_station_group(datafile, cluster, station_id)
@@ -83,12 +79,12 @@ def store_event(datafile, cluster, station_id, event):
if key in data:
data[key][index] = value
else:
-logger.warning('Datatype not known on server side: %s (%s)' % (key, eventtype))
+logger.warning(f'Datatype not known on server side: {key} ({eventtype})')
elif uploadcode in data:
# uploadcode: EVENTRATE, RED, etc.
data[uploadcode] = value
else:
-logger.warning('Datatype not known on server side: %s (%s)' % (uploadcode, eventtype))
+logger.warning(f'Datatype not known on server side: {uploadcode} ({eventtype})')

# write data values to row
for key, value in upload_codes.items():
@@ -129,15 +125,9 @@ def store_event_list(data_dir, station_id, cluster, event_list):
prev_date = date
store_event(datafile, cluster, station_id, event)
else:
-logger.error('Strange event (no timestamp!), discarding event (station: %s)' % station_id)
-except Exception as inst:
-logger.error('Cannot process event, discarding event (station: %s), exception: %s' % (station_id, inst))
-# get the full traceback. There must be a better way...
-exc_info = sys.exc_info()
-with StringIO() as tb:
-traceback.print_exception(*exc_info, file=tb)
-tb.seek(0)
-logger.debug('Traceback: %s', tb.read())
+logger.error(f'Strange event (no timestamp!), discarding event (station: {station_id})')
+except Exception:
+logger.exception(f'Cannot process event, discarding event (station: {station_id})')

if datafile:
datafile.close()
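logger.exception() makes the removed StringIO/traceback plumbing unnecessary: called inside an except block, it logs the message at ERROR level and appends the formatted traceback automatically. A self-contained sketch (the ValueError is illustrative):

import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
station_id = 501  # illustrative value

try:
    raise ValueError('corrupt event')
except Exception:
    # one call replaces sys.exc_info() + traceback.print_exception()
    logger.exception(f'Cannot process event, discarding event (station: {station_id})')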
22 changes: 11 additions & 11 deletions wsgi/wsgi_app.py
@@ -63,16 +63,16 @@ def application(environ, start_response, configfile):
try:
cluster, station_password = station_list[station_id]
except KeyError:
-logger.debug('Station %d is unknown' % station_id)
+logger.debug(f'Station {station_id} is unknown')
return [rcodes.RC_PE_INV_STATIONID]

if station_password != password:
-logger.debug('Station %d: password mismatch: %s' % (station_id, password))
+logger.debug(f'Station {station_id}: password mismatch: {password}')
return [rcodes.RC_PE_INV_AUTHCODE]
else:
our_checksum = hashlib.md5(data.encode('iso-8859-1')).hexdigest()
if our_checksum != checksum:
-logger.debug('Station %d: checksum mismatch' % station_id)
+logger.debug(f'Station {station_id}: checksum mismatch')
return [rcodes.RC_PE_INV_INPUT]
else:
try:
@@ -84,11 +84,11 @@ def application(environ, start_response, configfile):
logger.debug('UnicodeDecodeError on python 2 pickle. Decoding bytestrings.')
event_list = decode_object(pickle.loads(data.encode('iso-8859-1'), encoding='bytes'))
except (pickle.UnpicklingError, AttributeError):
-logger.debug('Station %d: pickling error' % station_id)
+logger.debug(f'Station {station_id}: pickling error')
return [rcodes.RC_PE_PICKLING_ERROR]

store_data(station_id, cluster, event_list)
-logger.debug('Station %d: succesfully completed' % station_id)
+logger.debug(f'Station {station_id}: succesfully completed')
return [rcodes.RC_OK]


@@ -119,7 +119,7 @@ def do_init(configfile):

# set up logger
if not logger.handlers:
-file = config.get('General', 'log') + '-wsgi.%d' % os.getpid()
+file = config.get('General', 'log') + f'-wsgi.{os.getpid()}'
handler = logging.handlers.TimedRotatingFileHandler(file, when='midnight', backupCount=14)
handler.setFormatter(formatter)
logger.addHandler(handler)
@@ -144,22 +144,22 @@ def do_init(configfile):
def store_data(station_id, cluster, event_list):
"""Store verified event data to temporary storage"""

-logger.debug('Storing data for station %d' % station_id)
+logger.debug(f'Storing data for station {station_id}')

-dir = os.path.join(config.get('General', 'data_dir'), 'incoming')
+directory = os.path.join(config.get('General', 'data_dir'), 'incoming')
tmp_dir = os.path.join(config.get('General', 'data_dir'), 'tmp')

if is_data_suspicious(event_list):
logger.debug('Event list marked as suspicious.')
-dir = os.path.join(config.get('General', 'data_dir'), 'suspicious')
+directory = os.path.join(config.get('General', 'data_dir'), 'suspicious')

file = tempfile.NamedTemporaryFile(dir=tmp_dir, delete=False)
-logger.debug('Filename: %s' % file.name)
+logger.debug(f'Filename: {file.name}')
data = {'station_id': station_id, 'cluster': cluster, 'event_list': event_list}
pickle.dump(data, file)
file.close()

-shutil.move(file.name, dir)
+shutil.move(file.name, directory)


def is_data_suspicious(event_list):
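The dir → directory renames stop shadowing the dir() builtin, which would otherwise be unusable (and misleading to readers) for the rest of the scope. A short illustration with a throwaway path:

directory = '/tmp/incoming'  # descriptive name, no collision
print(dir(directory)[:3])    # builtin still works

dir = '/tmp/incoming'  # shadows the builtin from here on
# dir(directory) would now raise TypeError: 'str' object is not callable
del dir  # sketch only: removing the binding restores the builtin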
