Skip to content

Commit

Permalink
increase fcs coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
scottgigante committed Mar 2, 2019
1 parent dec85a9 commit bf3ee2a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 21 deletions.
44 changes: 24 additions & 20 deletions scprep/io/fcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,45 @@ def _with_fcsparser(fun, *args, **kwargs):
return fun(*args, **kwargs)


def _channel_names_from_meta(meta, channel_numbers, naming="N"):
try:
return tuple([meta['$P{0}{1}'.format(i, naming)]
for i in channel_numbers])
except KeyError:
return []


def _get_channel_names(meta, channel_numbers, channel_naming="$PnS"):
"""Get list of channel names. Raises a warning if the names are not unique.
Credit: https://github.com/eyurtsev/fcsparser/blob/master/fcsparser/api.py
"""
try:
names_n = tuple([meta['$P{0}N'.format(i)] for i in channel_numbers])
except KeyError:
names_n = []

try:
names_s = tuple([meta['$P{0}S'.format(i)] for i in channel_numbers])
except KeyError:
names_s = []
names_n = _channel_names_from_meta(meta, channel_numbers, "N")
names_s = _channel_names_from_meta(meta, channel_numbers, "S")

# Figure out which channel names to use
if channel_naming == '$PnS':
channel_names, channel_names_alternate = names_s, names_n
else:
elif channel_naming == '$PnN':
channel_names, channel_names_alternate = names_n, names_s
else:
raise ValueError("Expected channel_naming in ['$PnS', '$PnN']. "
"Got '{}'".format(channel_naming))

if len(channel_names) == 0:
channel_names = channel_names_alternate

if len(set(channel_names)) != len(channel_names):
msg = (u'The default channel names (defined by the {} '
u'parameter in the FCS file) were not unique. To avoid '
u'problems in downstream analysis, the channel names '
u'have been switched to the alternate channel names '
u'defined in the FCS file. To avoid '
u'seeing this warning message, explicitly instruct '
u'the FCS parser to use the alternate channel names by '
u'specifying the channel_naming parameter.')
msg = msg.format(channel_naming)
warnings.warn(msg)
warnings.warn('The default channel names (defined by the {} '
'parameter in the FCS file) were not unique. To avoid '
'problems in downstream analysis, the channel names '
'have been switched to the alternate channel names '
'defined in the FCS file. To avoid '
'seeing this warning message, explicitly instruct '
'the FCS parser to use the alternate channel names by '
'specifying the channel_naming parameter.'.format(
channel_naming),
RuntimeWarning)
channel_names = channel_names_alternate

return channel_names
Expand Down
41 changes: 40 additions & 1 deletion test/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ def test_fcs():
else:
raise


def test_fcs_reformat_meta():
path = fcsparser.test_sample_path
meta, data = fcsparser.parse(path, reformat_meta=True)
X_meta, _, X = scprep.io.load_fcs(path, reformat_meta=True, override=True)
assert set(meta.keys()) == set(X_meta.keys())
Expand All @@ -342,14 +345,40 @@ def test_fcs():
meta[key][column], X_column, key + column)
else:
raise
assert 'Time' not in X.columns
assert len(set(X.columns).difference(data.columns)) == 0
np.testing.assert_array_equal(X.index, data.index)
np.testing.assert_array_equal(X.values, data[X.columns].values)


def test_fcs_PnN():
path = fcsparser.test_sample_path
meta, data = fcsparser.parse(path, reformat_meta=True,
channel_naming='$PnN')
X_meta, _, X = scprep.io.load_fcs(path, reformat_meta=True,
channel_naming='$PnN', override=True)
assert set(meta.keys()) == set(X_meta.keys())
for key in meta.keys():
try:
np.testing.assert_array_equal(meta[key], X_meta[key], key)
except AssertionError:
if key == "$NEXTDATA" or (key.startswith("$P") and key.endswith("B")):
np.testing.assert_array_equal(meta[key], int(X_meta[key]), key)
elif key == "_channels_":
for column in meta[key].columns:
X_column = X_meta[key][column].astype(
meta[key][column].dtype)
np.testing.assert_array_equal(
meta[key][column], X_column, key + column)
else:
raise
assert 'Time' not in X.columns
assert len(set(X.columns).difference(data.columns)) == 0
np.testing.assert_array_equal(X.index, data.index)
np.testing.assert_array_equal(X.values, data[X.columns].values)


def test_load_fcs_error():
def test_fcs_file_error():
assert_raise_message(
RuntimeError,
"fcsparser failed to load {}, likely due to"
Expand All @@ -361,6 +390,16 @@ def test_load_fcs_error():
os.path.join(data.data_dir, "test_small.csv"))


def test_fcs_naming_error():
path = fcsparser.test_sample_path
assert_raise_message(
ValueError,
"Expected channel_naming in ['$PnS', '$PnN']. "
"Got 'invalid'",
scprep.io.load_fcs, path,
override=True, channel_naming="invalid")


def test_parse_header():
header1 = np.arange(10)
header2 = os.path.join(data.data_dir, "gene_symbols.csv")
Expand Down

0 comments on commit bf3ee2a

Please sign in to comment.