/
xlxtr.py
268 lines (239 loc) · 9.16 KB
/
xlxtr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from typing import TYPE_CHECKING
from fnmatch import fnmatch
import re
import io
import defusedxml
import functools
from refinery.units import Arg, Unit
from refinery.lib.structures import MemoryFile
from refinery.lib.tools import NoLogging
if TYPE_CHECKING:
from pyxlsb2.records import SheetRecord
defusedxml.defuse_stdlib()
def _ref2rc(ref: str):
match = re.match(R'^([A-Z]+)(\d+)$', ref)
if not match:
raise ValueError
col = functools.reduce(lambda acc, c: (acc * 26) + c, (ord(c) - 0x40 for c in match[1]), 0)
row = int(match[2], 10)
return row, col
def _rc2ref(row: int, col: int):
if row <= 0:
raise ValueError
if col <= 0:
raise ValueError
alphabetic = ''
while col:
col, letter = divmod(col - 1, 26)
alphabetic = chr(0x41 + letter) + alphabetic
return F'{alphabetic}{row}'
class SheetReference:
    """
    A parsed reference of the form `[sheet#][start[:end]]`. The optional sheet part is either a 1-based
    index or a sheet name, which may be quoted and is also tried as an fnmatch pattern. The cell part
    is a single cell or a range whose corners are given as `B1` or as `row.col`. When the cell part
    cannot be parsed, the entire input string is used as the sheet name.

    Attributes:
        sheet: `None` when no sheet was given, a zero-based integer index, or a name / pattern string.
        lbound: The `(row, col)` tuple of the first selected cell, 1-based.
        ubound: The `(row, col)` tuple of the last selected cell, or `None` when no cell range was given.
    """

    def _parse_sheet(self, token: str):
        """
        Split the optional `sheet#` prefix off the given reference and return `(sheet, remainder)`.
        The sheet is `None` when there is no prefix or the prefix is empty, a zero-based integer when
        the prefix is a number, and a string otherwise. One pair of matching quotes around a sheet
        name is removed.
        """
        try:
            sheet, token = token.rsplit('#', 1)
        except ValueError:
            # There is no hashtag in the reference, hence no sheet was specified.
            return None, token
        if not sheet:
            # A reference like "#B1" names no sheet; indexing into the empty string below would
            # raise an IndexError, so this is treated like a reference without a sheet.
            return None, token
        try:
            # Sheet indices are 1-based for the user and 0-based internally.
            sheet = int(sheet, 0) - 1
        except (TypeError, ValueError):
            if len(sheet) > 2 and sheet[0] in ('"', "'") and sheet[-1] == sheet[0]:
                # Remove exactly one quote character from either end.
                sheet = sheet[1:-1]
        return sheet, token

    def _parse_range(self, token):
        """
        Split `start:end` into its two corners. Anything that does not contain exactly one colon is
        returned as both the start and the end of the range.
        """
        try:
            start, end = token.split(':')
        except ValueError:
            return token, token
        return start, end

    @staticmethod
    def _parse_token(token):
        """
        Parse a single cell, given either as `B1` or as `row.col`, into a `(row, col)` tuple of
        positive integers. A `ValueError` is raised for anything else.
        """
        try:
            row, col = _ref2rc(token)
        except ValueError:
            row, col = (int(x, 0) for x in token.split('.'))
        if row <= 0:
            raise ValueError(F'row must be positive, {row} is an invalid value')
        if col <= 0:
            raise ValueError(F'col must be positive, {col} is an invalid value')
        return row, col

    def __init__(self, sheet_reference=None):
        """
        Parse the given reference string. Without an argument, the reference matches every sheet and
        imposes no bounds on the cells.
        """
        self.lbound = 1, 1
        self.ubound = None
        if sheet_reference is None:
            self.sheet = None
            return
        self.sheet, token = self._parse_sheet(sheet_reference)
        if not token:
            # Only a sheet was given, as in "sheet#": all of its cells are selected.
            return
        try:
            start, stop = (self._parse_token(x) for x in self._parse_range(token))
        except ValueError:
            # The cell part is not a valid cell or range: interpret the whole input as a sheet name.
            self.sheet = sheet_reference
        else:
            # The corners may be given in any order; normalize them.
            self.lbound = (min(start[0], stop[0]), min(start[1], stop[1]))
            self.ubound = (max(start[0], stop[0]), max(start[1], stop[1]))

    def match(self, index: int, name: str):
        """
        Return whether this reference applies to the sheet with the given zero-based index and name.
        """
        if self.sheet is None:
            return True
        if isinstance(self.sheet, int):
            return self.sheet == index
        return self.sheet == name or fnmatch(name, self.sheet)

    def cells(self, row_max, col_max):
        """
        Generate the `(row, col)` tuples of all selected cells, row by row. The arguments are the
        dimensions of the sheet; they are only used when the reference has no upper bound of its own.
        At least one cell is always generated.
        """
        if self.ubound is not None:
            row_max, col_max = self.ubound
        row, first_col = self.lbound
        col = first_col
        while True:
            yield row, col
            if col < col_max:
                col += 1
            elif row < row_max:
                row, col = row + 1, first_col
            else:
                break

    def __contains__(self, ref):
        """
        Return whether the given cell, either a `(row, col)` tuple or a string that `_parse_token`
        accepts, is inside the selected range. A reference without a cell range contains every cell.
        """
        if self.ubound is None:
            return True
        if not isinstance(ref, tuple):
            ref = self._parse_token(ref)
        row, col = ref
        return (
            self.lbound[0] <= row <= self.ubound[0]
            and self.lbound[1] <= col <= self.ubound[1]
        )
class xlxtr(Unit):
    """
    Extract data from Microsoft Excel documents, both Legacy and new XML type documents. A sheet reference is of the form `B1` or `1.2`,
    both specifying the first cell of the second column. A cell range can be specified as `B1:C12`, or `1.2:C12`, or `1.2:12.3`. A
    reference without a sheet applies to every sheet in the document; to change this, specify the sheet name or index separated by a
    hashtag, i.e. `sheet#B1:C12` or `1#B1:C12`. Note that indices are 1-based. To get all elements of one sheet, use `sheet#`. If
    parsing a sheet reference fails, the unit will assume that the given reference specifies a sheet.
    """

    def __init__(self, *references: Arg(metavar='reference', type=SheetReference, help=(
        'A sheet reference to be extracted. '
        'If no sheet references are given, the unit lists all sheet names.'
    ))):
        if not references:
            # NOTE(review): the wildcard reference selects every cell of every sheet, which does
            # not seem to agree with the help text above ("lists all sheet names") - confirm.
            references = [SheetReference('*')]
        super().__init__(references=references)

    @Unit.Requires('xlrd2', 'formats', 'office', 'extended')
    def _xlrd():
        import xlrd2
        return xlrd2

    @Unit.Requires('openpyxl', 'formats', 'office', 'extended')
    def _openpyxl():
        import openpyxl
        return openpyxl

    @Unit.Requires('pyxlsb2', 'formats', 'office', 'extended')
    def _pyxlsb2():
        import pyxlsb2
        return pyxlsb2

    def _rcmatch(self, sheet_index, sheet_name, row, col):
        """
        Return whether the 1-based cell `(row, col)` of the given sheet is selected by at least one
        of the references passed to the unit.
        """
        assert row > 0
        assert col > 0
        references = self.args.references
        if not references:
            return True
        return any(
            ref.match(sheet_index, sheet_name) and (row, col) in ref
            for ref in references
        )

    def _get_value(self, sheet_index, sheet, callable, row, col):
        """
        Generate at most one labelled chunk containing the value of the 1-based cell `(row, col)`.
        The given callable receives zero-based row and column indices and returns the cell value;
        an `IndexError` from it means that the cell does not exist. Nothing is generated for cells
        that are not selected, do not exist, or hold a falsy value. A `ValueError` is raised when
        either index is not positive.
        """
        if col <= 0 or row <= 0:
            raise ValueError(F'invalid cell reference ({row}, {col}) - indices must be positive numbers')
        if not self._rcmatch(sheet_index, sheet, row, col):
            return
        try:
            value = callable(row - 1, col - 1)
        except IndexError:
            return
        if not value:
            return
        if isinstance(value, float) and value.is_integer():
            # Emit whole numbers without a trailing ".0". Unlike a round trip through int(), the
            # is_integer check does not raise for NaN or infinite values.
            value = int(value)
        yield self.labelled(
            str(value).encode(self.codec),
            row=row,
            col=col,
            ref=_rc2ref(row, col),
            sheet=sheet
        )

    def _process_pyxlsb2(self, data):
        """
        Extract the selected cells by parsing the input with pyxlsb2.
        """
        with self._pyxlsb2.open_workbook(MemoryFile(data)) as wb:
            for ref in self.args.references:
                ref: SheetReference
                for k, rec in enumerate(wb.sheets):
                    rec: SheetRecord
                    self.log_info(rec)
                    name = rec.name
                    if not ref.match(k, name):
                        continue
                    sheet = wb.get_sheet_by_name(name)
                    rows = list(sheet.rows())
                    nrows = len(rows)
                    ncols = max((len(r) for r in rows), default=0)

                    # The accessor is created once per sheet rather than once per cell.
                    def get(row, col):
                        return rows[row][col].v

                    for row, col in ref.cells(nrows, ncols):
                        yield from self._get_value(k, name, get, row, col)

    def _process_xlrd(self, data):
        """
        Extract the selected cells by parsing the input with xlrd2. Log lines that xlrd2 writes
        while opening the workbook are forwarded to the unit log when they start with an upper case
        tag followed by a colon, or contain three asterisks.
        """
        with io.StringIO() as logfile:
            vb = max(self.log_level.verbosity - 1, 0)
            wb = self._xlrd.open_workbook(file_contents=data, logfile=logfile, verbosity=vb, on_demand=True)
            logfile.seek(0)
            for entry in logfile:
                entry = entry.strip()
                if re.search(R'^[A-Z]+:', entry) or '***' in entry:
                    self.log_info(entry)
            # The workbook was opened with on_demand=True and keeps a reference to the log buffer.
            # Presumably it may still write to it while sheets are loaded below, so the buffer is
            # kept open until extraction is complete.
            for ref in self.args.references:
                ref: SheetReference
                for k, name in enumerate(wb.sheet_names()):
                    if not ref.match(k, name):
                        continue
                    sheet = wb.sheet_by_name(name)
                    self.log_info(F'iterating {sheet.ncols} columns and {sheet.nrows} rows')
                    for row, col in ref.cells(sheet.nrows, sheet.ncols):
                        yield from self._get_value(k, name, sheet.cell_value, row, col)

    def _process_openpyxl(self, data):
        """
        Extract the selected cells by parsing the input with openpyxl.
        """
        with NoLogging():
            workbook = self._openpyxl.load_workbook(MemoryFile(data), read_only=True)
        for ref in self.args.references:
            ref: SheetReference
            for k, name in enumerate(workbook.sheetnames):
                if not ref.match(k, name):
                    continue
                sheet = workbook[name]
                with NoLogging():
                    cells = list(sheet.iter_rows(values_only=True))
                nrows = len(cells)
                ncols = max((len(row) for row in cells), default=0)

                # The accessor is created once per sheet rather than once per cell.
                def cell_value(r, c):
                    return cells[r][c]

                for row, col in ref.cells(nrows, ncols):
                    yield from self._get_value(k, name, cell_value, row, col)

    def process(self, data):
        """
        Try the openpyxl, xlrd, and pyxlsb2 based parsers in this order and generate the cells that
        the first working one extracts. When all of them fail, the error of the last one is raised.
        """
        last_error = None
        for name, processor in (
            ('openpyxl', self._process_openpyxl),
            ('xlrd', self._process_xlrd),
            ('pyxlsb2', self._process_pyxlsb2),
        ):
            try:
                # NOTE(review): when a parser fails only after it has generated some cells, the
                # next parser starts from scratch and cells may be emitted more than once.
                yield from processor(data)
            except Exception as e:
                last_error = e
                self.log_debug(F'failed processing with {name}: {e!s}')
            else:
                break
        else:
            raise last_error