/
oxford_text_archive.py
285 lines (248 loc) · 11.4 KB
/
oxford_text_archive.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""
Oxford Text Archive literary works
----------------------------------
A collection of ~2.7k Creative Commons literary works from the Oxford Text Archive,
containing primarily English-language 16th-20th century literature and history.
Records include the following data:
* ``text``: Full text of the literary work.
* ``title``: Title of the literary work.
* ``author``: Author(s) of the literary work.
* ``year``: Year that the literary work was published.
* ``url``: URL at which literary work can be found online via the OTA.
* ``id``: Unique identifier of the literary work within the OTA.
This dataset was compiled by David Mimno from the Oxford Text Archive and
stored in his GitHub repo to avoid unnecessary scraping of the OTA site. It is
downloaded from that repo, and excluding some light cleaning of its metadata,
is reproduced exactly here.
"""
import csv
import io
import itertools
import logging
import os
import re
from .. import constants, utils
from .. import io as tio
from .base import Dataset
# module-level logger, named after this module's import path
LOGGER = logging.getLogger(__name__)

# canonical dataset name; also used as the default on-disk subdirectory name
NAME = "oxford_text_archive"
# dataset descriptors surfaced to users (e.g. via the ``info`` attribute
# shown in the class docstring below)
META = {
    "site_url": "https://ota.ox.ac.uk/",
    "description": (
        "Collection of ~2.7k Creative Commons texts from the Oxford Text "
        "Archive, containing primarily English-language 16th-20th century "
        "literature and history."
    ),
}
# zip archive of David Mimno's GitHub mirror of the OTA texts + metadata;
# downloaded instead of scraping the OTA site directly
DOWNLOAD_URL = "https://github.com/mimno/ota/archive/master.zip"
class OxfordTextArchive(Dataset):
    """
    Stream a collection of English-language literary works from text files on disk,
    either as texts or text + metadata pairs.

    Download the data (one time only!), saving and extracting its contents to disk::

        >>> ds = OxfordTextArchive()
        >>> ds.download()
        >>> ds.info
        {'name': 'oxford_text_archive',
         'site_url': 'https://ota.ox.ac.uk/',
         'description': 'Collection of ~2.7k Creative Commons texts from the Oxford Text Archive, containing primarily English-language 16th-20th century literature and history.'}

    Iterate over literary works as texts or records with both text and metadata::

        >>> for text in ds.texts(limit=3):
        ...     print(text[:200])
        >>> for text, meta in ds.records(limit=3):
        ...     print("\\n{}, {}".format(meta["title"], meta["year"]))
        ...     print(text[:300])

    Filter literary works by a variety of metadata fields and text length::

        >>> for text, meta in ds.records(author="Shakespeare, William", limit=1):
        ...     print("{}\\n{}".format(meta["title"], text[:500]))
        >>> for text, meta in ds.records(date_range=("1900-01-01", "1990-01-01"), limit=5):
        ...     print(meta["year"], meta["author"])
        >>> for text in ds.texts(min_len=4000000):
        ...     print(len(text))

    Stream literary works into a :class:`textacy.Corpus <textacy.corpus.Corpus>`::

        >>> textacy.Corpus("en", data=ds.records(limit=5))
        Corpus(5 docs; 182289 tokens)

    Args:
        data_dir (str or :class:`pathlib.Path`): Path to directory on disk
            under which dataset is stored, i.e. ``/path/to/data_dir/oxford_text_archive``.

    Attributes:
        full_date_range (Tuple[str]): First and last dates for which works
            are available, each as an ISO-formatted string (YYYY-MM-DD).
        authors (Set[str]): Full names of all distinct authors included in this
            dataset, e.g. "Shakespeare, William".
    """

    full_date_range = ("0018-01-01", "1990-01-01")

    def __init__(self, data_dir=constants.DEFAULT_DATA_DIR.joinpath(NAME)):
        super().__init__(NAME, meta=META)
        self.data_dir = utils.to_path(data_dir).resolve()
        # the GitHub zip archive extracts into a top-level "master" directory
        self._text_dirpath = self.data_dir.joinpath("master", "text")
        self._metadata_filepath = self.data_dir.joinpath("master", "metadata.tsv")
        # lazily loaded on first access of :attr:`OxfordTextArchive.metadata`
        self._metadata = None

    def download(self, *, force=False):
        """
        Download the data as a zip archive file, then save it to disk and
        extract its contents under the :attr:`OxfordTextArchive.data_dir` directory.

        Args:
            force (bool): If True, always download the dataset even if
                it already exists.
        """
        filepath = tio.download_file(
            DOWNLOAD_URL,
            filename=None,
            dirpath=self.data_dir,
            force=force,
        )
        # ``download_file`` returns a falsy value when nothing new was fetched
        if filepath:
            tio.unpack_archive(filepath, extract_dir=None)

    @property
    def metadata(self):
        """
        Dict[str, dict]: Per-work metadata parsed from ``metadata.tsv``, keyed by
        record ID; lazily loaded on first access. None if the metadata file
        isn't found on disk (i.e. the dataset hasn't been downloaded yet).
        """
        if not self._metadata:
            try:
                self._metadata = self._load_and_parse_metadata()
            except OSError as e:
                # don't crash on access; log and return None so callers can decide
                LOGGER.error(e)
        return self._metadata

    @property
    def authors(self):
        """
        Set[str]: Full names of all distinct authors included in this dataset,
        e.g. "Shakespeare, William". None if metadata isn't available on disk.

        Note:
            Computed lazily from :attr:`OxfordTextArchive.metadata`, so it's
            safe to use (e.g. for author-filter validation) before any
            iteration over the dataset has loaded the metadata.
        """
        metadata = self.metadata
        if not metadata:
            return None
        return {
            author
            for value in metadata.values()
            for author in value.get("author", ())
        }

    def _load_and_parse_metadata(self):
        """
        Read in ``metadata.tsv`` file from :attr:`OxfordTextArchive._metadata_filepath`;
        convert into a dictionary keyed by record ID; clean up some
        of the fields, and remove a couple fields that are identical throughout.

        Returns:
            Dict[str, dict]: Per-work metadata, keyed by record ID.

        Raises:
            OSError: If the metadata file isn't found on disk,
                i.e. the dataset probably hasn't been downloaded yet.
        """
        if not self._metadata_filepath.is_file():
            raise OSError(
                "metadata file {} not found;\n"
                "has the dataset been downloaded yet?".format(self._metadata_filepath)
            )
        # first 4-digit number appearing anywhere in the "Year" field
        re_extract_year = re.compile(r"(\d{4})")
        # author names are followed by life dates (e.g. ", 1564-1616", ", fl. 1623",
        # ", ca. 1600?") or century markers (e.g. ", 16th cent."); capture the
        # non-digit name part preceding them
        re_extract_authors = re.compile(
            r"(\D+)"
            r"(?:, "
            r"(?:[bdf]l?\. )?(?:ca. )?\d{4}(?:\?| or \d{1,2})?(?:-(?:[bdf]l?\. )?(?:ca. )?\d{4}(?:\?| or \d{1,2})?)?|"
            r"(?:\d{2}th(?:/\d{2}th)? cent\.)"
            r"\.?)"
        )
        # leading separators / trailing punctuation left over after extraction
        re_clean_authors = re.compile(r"^[,;. ]+|[,.]+\s*?$")
        metadata = {}
        with self._metadata_filepath.open(mode="rb") as f:
            # decode the whole file up front, then hand a text buffer to csv
            subf = io.StringIO(f.read().decode("utf-8"))
            for row in csv.DictReader(subf, delimiter="\t"):
                # only include English-language works (99.9% of all works)
                if not row["Language"].startswith("English"):
                    continue
                # clean up years
                year_match = re_extract_year.search(row["Year"])
                if year_match:
                    row["Year"] = year_match.group()
                else:
                    row["Year"] = None
                # extract and clean up authors; fall back to the raw field
                # when the pattern doesn't match
                authors = re_extract_authors.findall(row["Author"]) or [row["Author"]]
                row["Author"] = tuple(re_clean_authors.sub("", author) for author in authors)
                row["Title"] = row["Title"].strip()
                # get rid of uniform "Language" and "License" fields
                del row["Language"]
                del row["License"]
                metadata[row["ID"]] = {key.lower(): val for key, val in row.items()}
        return metadata

    def __iter__(self):
        """
        Iterate over works on disk in sorted-filepath order, yielding one dict
        per work with its metadata fields plus the full ``"text"``. Works with
        no metadata entry (i.e. non-English texts) are skipped.
        """
        if not self._text_dirpath.is_dir():
            raise OSError(
                "text directory {} not found;\n"
                "has the dataset been downloaded yet?".format(self._text_dirpath)
            )
        _metadata = self.metadata  # for performance
        for filepath in sorted(tio.get_filepaths(self._text_dirpath, extension=".txt")):
            # text filenames are record IDs, e.g. ".../1234.txt" => "1234"
            id_, _ = os.path.splitext(os.path.basename(filepath))
            # copy, so mutations by callers don't corrupt the cached metadata
            record = _metadata.get(id_, {}).copy()
            if not record:
                LOGGER.debug(
                    "no metadata found for record %s; probably non-English text...", id_)
                continue
            with io.open(filepath, mode="rt", encoding="utf-8") as f:
                record["text"] = f.read()
            yield record

    def _get_filters(self, author, date_range, min_len):
        """
        Build a list of record predicates from the user-specified filter options;
        validate options and raise ValueError on invalid values.
        """
        filters = []
        if min_len is not None:
            if min_len < 1:
                raise ValueError("`min_len` must be at least 1")
            filters.append(
                lambda record: len(record.get("text", "")) >= min_len
            )
        if author is not None:
            # ``self.authors`` is a lazy property, so validation works even
            # before any iteration has loaded the metadata
            author = utils.validate_set_members(
                author, (str, bytes), valid_vals=self.authors)
            # ANY of the given authors among a record's authors is a match
            filters.append(
                lambda record: record.get("author") and any(athr in author for athr in record["author"])
            )
        if date_range is not None:
            date_range = utils.validate_and_clip_range(
                date_range, self.full_date_range, val_type=(str, bytes))
            # NOTE: record years are bare "YYYY" strings compared lexically
            # against ISO "YYYY-MM-DD" bounds => start bound is exclusive for
            # works published in the start year itself
            filters.append(
                lambda record: record.get("year") and date_range[0] <= record["year"] < date_range[1]
            )
        return filters

    def _filtered_iter(self, filters):
        """
        Iterate over records, yielding only those that pass every predicate
        in ``filters``; with no filters, yield all records.
        """
        if filters:
            for record in self:
                if all(filter_(record) for filter_ in filters):
                    yield record
        else:
            yield from self

    def texts(self, *, author=None, date_range=None, min_len=None, limit=None):
        """
        Iterate over works in this dataset, optionally filtering by a variety
        of metadata and/or text length, and yield texts only.

        Args:
            author (str or Set[str]): Filter texts by the authors' name.
                For multiple values (Set[str]), ANY rather than ALL of the authors
                must be found among a given works's authors.
            date_range (List[str] or Tuple[str]): Filter texts by the date on
                which it was published; both start and end date must be specified,
                but a null value for either will be replaced by the min/max date
                available in the dataset.
            min_len (int): Filter texts by the length (number of characters)
                of their text content.
            limit (int): Return no more than ``limit`` texts.

        Yields:
            str: Text of the next work in dataset passing all filters.

        Raises:
            ValueError: If any filtering options are invalid.
        """
        filters = self._get_filters(author, date_range, min_len)
        for record in itertools.islice(self._filtered_iter(filters), limit):
            yield record["text"]

    def records(self, *, author=None, date_range=None, min_len=None, limit=None):
        """
        Iterate over works in this dataset, optionally filtering by a variety
        of metadata and/or text length, and yield text + metadata pairs.

        Args:
            author (str or Set[str]): Filter records by the authors' name;
                see :attr:`OxfordTextArchive.authors`.
            date_range (List[str] or Tuple[str]): Filter records by the date on
                which it was published; both start and end date must be specified,
                but a null value for either will be replaced by the min/max date
                available in the dataset.
            min_len (int): Filter records by the length (number of characters)
                of their text content.
            limit (int): Yield no more than ``limit`` records.

        Yields:
            str: Text of the next work in dataset passing all filters.
            dict: Metadata of the next work in dataset passing all filters.

        Raises:
            ValueError: If any filtering options are invalid.
        """
        filters = self._get_filters(author, date_range, min_len)
        for record in itertools.islice(self._filtered_iter(filters), limit):
            # pop "text" so the remaining dict is pure metadata
            yield record.pop("text"), record