This repository has been archived by the owner on Dec 16, 2022. It is now read-only.
/
sqlite_sparse_sequence.py
104 lines (91 loc) · 3.65 KB
/
sqlite_sparse_sequence.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import os
import shutil
from os import PathLike
from typing import MutableSequence, Any, Union, Iterable
from sqlitedict import SqliteDict
from allennlp.common.sequences import SlicedSequence
class SqliteSparseSequence(MutableSequence[Any]):
def __init__(self, filename: Union[str, PathLike], read_only: bool = False):
self.table = SqliteDict(filename, "sparse_sequence", flag="r" if read_only else "c")
def __del__(self):
self.close()
def __getitem__(self, i: Union[int, slice]) -> Any:
if isinstance(i, int):
try:
return self.table[str(i)]
except KeyError:
current_length = len(self)
if i >= current_length or current_length <= 0:
raise IndexError("list index out of range")
elif i < 0 < current_length:
return self.__getitem__(i % current_length)
else:
return None
elif isinstance(i, slice):
return SlicedSequence(self, i)
else:
raise TypeError(f"list indices must be integers or slices, not {i.__class__.__name__}")
def __setitem__(self, i: Union[int, slice], value: Any):
if isinstance(i, int):
current_length = len(self)
if i < 0:
i %= current_length
self.table[str(i)] = value
self.table["_len"] = max(i, current_length)
self.table.commit()
else:
raise TypeError(f"list indices must be integers, not {i.__class__.__name__}")
def __delitem__(self, i: Union[int, slice]):
current_length = len(self)
if isinstance(i, int):
if i < 0:
i %= current_length
if i >= current_length:
raise IndexError("list assignment index out of range")
for index in range(i + 1, current_length):
self.table[str(index - 1)] = self.table.get(str(index))
del self.table[str(current_length - 1)]
self.table["_len"] = current_length - 1
self.table.commit()
elif isinstance(i, slice):
# This isn't very efficient for continuous slices.
for index in reversed(range(*i.indices(current_length))):
del self[index]
else:
raise TypeError(f"list indices must be integers or slices, not {i.__class__.__name__}")
def extend(self, values: Iterable[Any]) -> None:
current_length = len(self)
index = -1
for index, value in enumerate(values):
self.table[str(index + current_length)] = value
if index < 0:
return
self.table["_len"] = current_length + index + 1
self.table.commit()
def insert(self, i: int, value: Any) -> None:
current_length = len(self)
for index in reversed(range(i, current_length)):
self.table[str(index + 1)] = self.table.get(str(index))
self.table[str(i)] = value
self.table["_len"] = current_length + 1
self.table.commit()
def __len__(self) -> int:
try:
return self.table["_len"]
except KeyError:
return 0
def clear(self) -> None:
self.table.clear()
self.table.commit()
def close(self) -> None:
if self.table is not None:
self.table.close()
self.table = None
def copy_to(self, target: Union[str, PathLike]):
try:
os.link(self.table.filename, target)
except OSError as e:
if e.errno == 18: # Cross-device link
shutil.copy(self.table.filename, target)
else:
raise