/
sqlite.py
123 lines (110 loc) · 4.54 KB
/
sqlite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
SQLite-backed inventory storage.
"""
import sqlite3
import time
from threading import RLock
from helper_sql import SqlBulkExecute, sqlExecute, sqlQuery
from .storage import InventoryItem, InventoryStorage
class SqliteInventory(InventoryStorage):
    """Inventory using SQLite.

    Items added through ``__setitem__`` are kept in the in-memory
    ``_inventory`` dict until :meth:`flush` writes them to the
    ``inventory`` table. ``_objects`` is a hash -> stream number cache
    used by ``__contains__`` to avoid a database query per lookup.
    """

    def __init__(self):
        super(SqliteInventory, self).__init__()
        # A dict of objects (like msg payloads and pubkey payloads)
        # which have not been flushed to the database yet.
        # Does not include protocol headers
        # (the first 24 bytes of each packet).
        self._inventory = {}
        # cache for existing objects, used for quick lookups
        # if we have an object.
        # This is used for example whenever we receive an inv message
        # from a peer to check to see what items are new to us.
        # We don't delete things out of it; instead,
        # the singleCleaner thread clears and refills it.
        self._objects = {}
        # Guarantees that two receiveDataThreads don't receive
        # and process the same message concurrently
        # (probably sent by a malicious individual)
        self.lock = RLock()

    def __contains__(self, hash_):
        """Return True if *hash_* is in the cache or in the database."""
        with self.lock:
            if hash_ in self._objects:
                return True
            rows = sqlQuery(
                'SELECT streamnumber FROM inventory WHERE hash=?',
                sqlite3.Binary(hash_))
            if not rows:
                return False
            # remember the stream number so the next lookup skips the query
            self._objects[hash_] = rows[0][0]
            return True

    def __getitem__(self, hash_):
        """
        Return the InventoryItem stored under *hash_*.

        Unflushed items are looked up first, then the database.
        Raises KeyError if the hash is found in neither.
        """
        with self.lock:
            if hash_ in self._inventory:
                return self._inventory[hash_]
            rows = sqlQuery(
                'SELECT objecttype, streamnumber, payload, expirestime, tag'
                ' FROM inventory WHERE hash=?', sqlite3.Binary(hash_))
            if not rows:
                raise KeyError(hash_)
            return InventoryItem(*rows[0])

    def __setitem__(self, hash_, value):
        """Store *value* in memory under *hash_* and cache its stream."""
        with self.lock:
            value = InventoryItem(*value)
            self._inventory[hash_] = value
            self._objects[hash_] = value.stream

    def __delitem__(self, hash_):
        """Deleting single items is not supported."""
        raise NotImplementedError

    def __iter__(self):
        """Iterate over the hashes of unflushed and stored items."""
        with self.lock:
            # list() instead of keys()[:] - dict views are not
            # subscriptable on Python 3; the result is the same on Python 2
            hashes = list(self._inventory)
            hashes += (x for x, in sqlQuery('SELECT hash FROM inventory'))
            return iter(hashes)

    def __len__(self):
        """Return the number of unflushed items plus the stored rows."""
        with self.lock:
            return len(self._inventory) + sqlQuery(
                'SELECT count(*) FROM inventory')[0][0]

    def by_type_and_tag(self, objectType, tag=None):
        """
        Get all inventory items of certain *objectType*
        with *tag* if given.

        Returns a list of InventoryItem built from the unflushed items
        followed by the matching database rows.
        """
        query = [
            'SELECT objecttype, streamnumber, payload, expirestime, tag'
            ' FROM inventory WHERE objecttype=?', objectType]
        # NOTE(review): an empty tag adds no SQL filter here, while the
        # in-memory filter below only skips the tag check for None -
        # confirm which behaviour is intended for empty tags.
        if tag:
            query[0] += ' AND tag=?'
            query.append(sqlite3.Binary(tag))
        with self.lock:
            # The parentheses matter: without them `and` binds tighter
            # than `or`, so any cached item with a matching tag would be
            # returned regardless of its object type.
            cached = [
                value for value in self._inventory.values()
                if value.type == objectType
                and (tag is None or value.tag == tag)
            ]
            stored = [InventoryItem(*row) for row in sqlQuery(*query)]
            return cached + stored

    def unexpired_hashes_by_stream(self, stream):
        """Return unexpired inventory vectors filtered by stream"""
        with self.lock:
            t = int(time.time())
            hashes = [x for x, value in self._inventory.items()
                      if value.stream == stream and value.expires > t]
            # bytes is str on Python 2, so this matches the previous
            # str() conversion there and still yields raw bytes on Python 3
            hashes += (bytes(hash_) for hash_, in sqlQuery(
                'SELECT hash FROM inventory WHERE streamnumber=?'
                ' AND expirestime>?', stream, t))
            return hashes

    def flush(self):
        """Flush cache: write the unflushed items to the database"""
        with self.lock:
            # If you use both the inventoryLock and the sqlLock,
            # always use the inventoryLock OUTSIDE of the sqlLock.
            with SqlBulkExecute() as sql:
                for objectHash, value in self._inventory.items():
                    sql.execute(
                        'INSERT INTO inventory VALUES (?, ?, ?, ?, ?, ?)',
                        sqlite3.Binary(objectHash), *value)
                self._inventory.clear()

    def clean(self):
        """Free memory / perform garbage collection"""
        with self.lock:
            # drop rows which expired more than three hours ago
            sqlExecute(
                'DELETE FROM inventory WHERE expirestime<?',
                int(time.time()) - (60 * 60 * 3))
            # rebuild the lookup cache from the unflushed items only
            self._objects.clear()
            for objectHash, value in self._inventory.items():
                self._objects[objectHash] = value.stream