diff --git a/andes/io/__init__.py b/andes/io/__init__.py index 1a8c86cda..0e982d539 100644 --- a/andes/io/__init__.py +++ b/andes/io/__init__.py @@ -1,9 +1,12 @@ import importlib +import io import logging import os +from typing import Union + from andes.utils.misc import elapsed -from andes.io import xlsx, psse # NOQA +from andes.io import xlsx, psse, json, matpower # NOQA logger = logging.getLogger(__name__) @@ -66,15 +69,15 @@ def guess(system): # second, guess by lines true_format = '' - with open(files.case, 'r') as fid: - for item in maybe: - parser = importlib.import_module('.' + item, __name__) - testlines = getattr(parser, 'testlines') - if testlines(fid): - true_format = item - files.input_format = true_format - logger.debug('Input format guessed as %s.', true_format) - break + + for item in maybe: + parser = importlib.import_module('.' + item, __name__) + testlines = getattr(parser, 'testlines') + if testlines(files.case): + true_format = item + files.input_format = true_format + logger.debug('Input format guessed as %s.', true_format) + break if not true_format: logger.error('Unable to determine case format.') @@ -180,3 +183,17 @@ def dump(system, output_format, full_path=None, overwrite=False, **kwargs): else: logger.error('Format conversion failed.') return False + + +def read_file_like(infile: Union[str, io.IOBase]): + if isinstance(infile, str): + f = open(infile, 'r') + else: + f = infile + + lines_list = f.read().splitlines() + + if f is not infile: + f.close() + + return lines_list diff --git a/andes/io/json.py b/andes/io/json.py index cd4b1ca09..ad5b748de 100644 --- a/andes/io/json.py +++ b/andes/io/json.py @@ -14,7 +14,7 @@ logger = logging.getLogger(__name__) -def testlines(fid): +def testlines(infile): return True diff --git a/andes/io/matpower.py b/andes/io/matpower.py index 299e52c65..23534fae2 100644 --- a/andes/io/matpower.py +++ b/andes/io/matpower.py @@ -3,17 +3,22 @@ import logging import re +import andes.io + from andes.shared import deg2rad, np logger = logging.getLogger(__name__) -def testlines(fid): +def testlines(infil): return True # hard coded def read(system, file): - """Read a MATPOWER data file into mpc and build andes device elements""" + """ + Read a MATPOWER data file into mpc, and build andes device elements. + """ + func = re.compile(r'function\s') mva = re.compile(r'\s*mpc.baseMVA\s*=\s*') bus = re.compile(r'\s*mpc.bus\s*=\s*\[?') @@ -39,9 +44,9 @@ def read(system, file): 'bus_name': [], } - fid = open(file, 'r') + input_list = andes.io.read_file_like(file) - for line in fid: + for line in input_list: line = line.strip().rstrip(';') if not line: continue @@ -103,8 +108,6 @@ def read(system, file): raise e mpc[field].append(data) - fid.close() - # convert mpc to np array mpc_array = dict() for key, val in mpc.items(): diff --git a/andes/io/psse.py b/andes/io/psse.py index c1b620a4f..ec998dacd 100644 --- a/andes/io/psse.py +++ b/andes/io/psse.py @@ -8,17 +8,24 @@ import re import os +import andes.io + from andes.shared import deg2rad, pd, yaml from andes.utils.misc import to_number from collections import defaultdict logger = logging.getLogger(__name__) -def testlines(fid): +def testlines(infile): """ - Check the raw file for frequency base + Check the raw file for frequency base. """ - first = fid.readline() + lines_list = andes.io.read_file_like(infile) + + if len(lines_list) == 0: + return False + + first = lines_list[0] first = first.strip().split('/') first = first[0].split(',') @@ -38,8 +45,9 @@ def testlines(fid): def get_block_lines(b, mdata): """ - Return the number of lines based on data + Return the number of lines based on the block index in the RAW file. """ + line_counts = [1, 1, 1, 1, 1, 4, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0] if b == 5: # for transformer @@ -52,7 +60,9 @@ def get_block_lines(b, mdata): def read(system, file): - """read PSS/E RAW file v32 format""" + """ + Read PSS/E RAW file v32/v33 formats. + """ blocks = [ 'bus', 'load', 'fshunt', 'gen', 'branch', 'transf', 'area', @@ -74,8 +84,7 @@ def read(system, file): dev_line = 0 # line counter for multi-line models # read file into `line_list` - with open(file, 'r') as f: - line_list = [line.rstrip('\n') for line in f] + line_list = andes.io.read_file_like(file) # parse file into `raw` with to_number conversions for num, line in enumerate(line_list): @@ -147,8 +156,7 @@ def _read_dyr_dict(file): """ Parse dyr file into a dict where keys are model names and values are dataframes. """ - with open(file, 'r') as f: - input_list = [line.strip() for line in f] + input_list = andes.io.read_file_like(file) # concatenate multi-line device data input_concat_dict = defaultdict(list) diff --git a/andes/io/xlsx.py b/andes/io/xlsx.py index ab2cf03d4..b47470ce8 100644 --- a/andes/io/xlsx.py +++ b/andes/io/xlsx.py @@ -1,9 +1,12 @@ """ Excel reader and writer for ANDES power system parameters -This module utilizes xlsxwriter and pandas.Frame. -While I like the simplicity of the dome format, spreadsheet data is easier to read and edit. +This module utilizes openpyxl, xlsxwriter and pandas.Frame. + +While I like the simplicity of the dome format, +spreadsheets are easier to view and edit. """ + import logging from andes.utils.paths import confirm_overwrite @@ -12,7 +15,7 @@ logger = logging.getLogger(__name__) -def testlines(fid): +def testlines(infile): return True @@ -89,8 +92,8 @@ def read(system, infile): ---------- system : System Empty System instance - infile : str - Path to the input file + infile : str or file-like + Path to the input file, or a file-like object Returns ------- diff --git a/tests/test_case.py b/tests/test_case.py index 8fda3e2c1..467440dc0 100644 --- a/tests/test_case.py +++ b/tests/test_case.py @@ -155,15 +155,50 @@ def test_npcc_raw2json_convert(self): self.assertEqual(self.ss2.exit_code, 0, "Exit code is not 0.") def test_read_json_from_memory(self): - fd = open(get_case('ieee14/ieee14_zip.json')) + fd = open(get_case('ieee14/ieee14_zip.json'), 'r') - ss = andes.main.System() + ss = andes.main.System(default_config=True, + no_output=True, + ) ss.undill() andes.io.json.read(ss, fd) ss.setup() ss.PFlow.run() fd.close() + self.assertEqual(ss.exit_code, 0, "Exit code is not 0.") + + def test_read_mpc_from_memory(self): + fd = open(get_case('matpower/case14.m'), 'r') + + ss = andes.main.System(default_config=True, + no_output=True, + ) + ss.undill() + andes.io.matpower.read(ss, fd) + ss.setup() + ss.PFlow.run() + + fd.close() + self.assertEqual(ss.exit_code, 0, "Exit code is not 0.") + + def test_read_psse_from_memory(self): + fd_raw = open(get_case('npcc/npcc.raw'), 'r') + fd_dyr = open(get_case('npcc/npcc_full.dyr'), 'r') + + ss = andes.main.System(default_config=True, + no_output=True, + ) + ss.undill() + andes.io.psse.read(ss, fd_raw) + andes.io.psse.read_add(ss, fd_dyr) + ss.setup() + ss.PFlow.run() + ss.TDS.init() + + fd_raw.close() + fd_dyr.close() + self.assertEqual(ss.exit_code, 0, "Exit code is not 0.") class TestPlot(unittest.TestCase):