-
Notifications
You must be signed in to change notification settings - Fork 524
/
csv.py
92 lines (77 loc) · 3.03 KB
/
csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# -*- coding: utf-8 -*-
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
# vi: set ft=python sts=4 ts=4 sw=4 et:
"""CSV Handling utilities
"""
from ..base import traits, TraitedSpec, DynamicTraitedSpec, File, BaseInterface
from ..io import add_traits
class CSVReaderInputSpec(DynamicTraitedSpec, TraitedSpec):
    """Input specification for the ``CSVReader`` interface."""

    # File to read; declared as an existing, mandatory input.
    in_file = File(
        desc="Input comma-seperated value (CSV) file",
        mandatory=True,
        exists=True,
    )
    # Whether the first line names the columns; defaults to False and the
    # default is applied when the input is left unset.
    header = traits.Bool(
        False,
        desc="True if the first line is a column header",
        usedefault=True,
    )
class CSVReader(BaseInterface):
    """Read a comma-separated file and expose every column as an output list.

    The output fields are created dynamically from the first line of
    ``in_file``: when ``header`` is true the field names are taken from that
    line, otherwise they are ``column_0``, ``column_1``, ...  Every value is
    returned as a whitespace-stripped string.

    Examples
    --------

    >>> reader = CSVReader() # doctest: +SKIP
    >>> reader.inputs.in_file = 'noHeader.csv' # doctest: +SKIP
    >>> out = reader.run() # doctest: +SKIP
    >>> out.outputs.column_0 == ['foo', 'bar', 'baz'] # doctest: +SKIP
    True
    >>> out.outputs.column_1 == ['hello', 'world', 'goodbye'] # doctest: +SKIP
    True
    >>> out.outputs.column_2 == ['300.1', '5', '0.3'] # doctest: +SKIP
    True

    >>> reader = CSVReader() # doctest: +SKIP
    >>> reader.inputs.in_file = 'header.csv' # doctest: +SKIP
    >>> reader.inputs.header = True # doctest: +SKIP
    >>> out = reader.run() # doctest: +SKIP
    >>> out.outputs.files == ['foo', 'bar', 'baz'] # doctest: +SKIP
    True
    >>> out.outputs.labels == ['hello', 'world', 'goodbye'] # doctest: +SKIP
    True
    >>> out.outputs.erosion == ['300.1', '5', '0.3'] # doctest: +SKIP
    True

    """

    input_spec = CSVReaderInputSpec
    output_spec = DynamicTraitedSpec
    _always_run = True

    def _append_entry(self, outputs, entry):
        """Append each value of ``entry`` to the list of its column.

        Values are paired positionally with ``self._outfields``; ``zip``
        stops at the shorter of the two, so surplus values are dropped and
        missing trailing values leave their columns untouched.

        Returns the same ``outputs`` mapping that was passed in.
        """
        for key, value in zip(self._outfields, entry):
            outputs[key].append(value)
        return outputs

    def _parse_line(self, line):
        """Split one line of text on commas and strip each field.

        Returns a list of strings; an empty or blank line yields ``[""]``.
        """
        line = line.replace("\n", "")
        return [field.strip() for field in line.split(",")]

    def _get_outfields(self):
        """Derive the output field names from the first line of ``in_file``.

        Stores the names as a tuple in ``self._outfields`` and returns it.
        """
        with open(self.inputs.in_file, "r") as fid:
            entry = self._parse_line(fid.readline())
        if self.inputs.header:
            self._outfields = tuple(entry)
        else:
            self._outfields = tuple(
                "column_" + str(idx) for idx in range(len(entry))
            )
        return self._outfields

    def _run_interface(self, runtime):
        """Compute the output field names; the data itself is read later."""
        self._get_outfields()
        return runtime

    def _outputs(self):
        """Return the base output spec extended with one trait per column."""
        return self._add_output_traits(super(CSVReader, self)._outputs())

    def _add_output_traits(self, base):
        """Add a trait to ``base`` for every field found in ``in_file``."""
        return add_traits(base, self._get_outfields())

    def _list_outputs(self):
        """Read ``in_file`` and collect its values column by column.

        Returns the output mapping with one list of strings per name in
        ``self._outfields``.  The first line is skipped when ``header`` is
        set.
        """
        outputs = self.output_spec().get()
        for key in self._outfields:
            outputs[key] = []  # initialize outfields
        multi_column = len(self._outfields) > 1
        with open(self.inputs.in_file, "r") as fid:
            if self.inputs.header:
                next(fid, None)  # skip header line
            # Iterate the file lazily instead of loading every line at once.
            for line in fid:
                entry = self._parse_line(line)
                # A blank line parses to [""]; appending it would add a stray
                # empty value to the first column only and leave the columns
                # with different lengths.  With a single column it is kept,
                # since there it is indistinguishable from an empty value.
                if multi_column and entry == [""]:
                    continue
                outputs = self._append_entry(outputs, entry)
        return outputs