This repository has been archived by the owner on Jun 10, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
taskdb.py
141 lines (119 loc) · 4.68 KB
/
taskdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
# Author: Binux<i@binux.me>
# http://binux.me
# Created on 2014-10-11 23:54:50
import json
import time
from pymongo import MongoClient
from pyspider.database.base.taskdb import TaskDB as BaseTaskDB
from .mongodbbase import SplitTableMixin
class TaskDB(SplitTableMixin, BaseTaskDB):
    """MongoDB-backed task database.

    Each project is stored in its own collection (naming handled by
    SplitTableMixin).  The nested task sub-documents ('schedule',
    'fetch', 'process', 'track') are persisted as JSON strings and
    decoded back into dicts when loaded.
    """

    collection_prefix = ''

    def __init__(self, url, database='taskdb'):
        """Connect to MongoDB at `url` and select `database`.

        Raises a pymongo connection error immediately if the server is
        unreachable (MongoClient itself connects lazily, so we issue an
        "ismaster" command to fail fast).
        """
        self.conn = MongoClient(url)
        self.conn.admin.command("ismaster")
        self.database = self.conn[database]
        self.projects = set()

        self._list_project()
        # we suggest manually build index in advance, instead of indexing
        # in the startup process,
        # for project in self.projects:
        #     collection_name = self._collection_name(project)
        #     self.database[collection_name].ensure_index('status')
        #     self.database[collection_name].ensure_index('taskid')

    def _create_project(self, project):
        """Create indexes for a new project's collection and refresh
        the known-project set.

        Indexes cover the two fields every query in this class filters
        on: 'status' (load_tasks/status_count) and 'taskid' (get_task).
        NOTE(review): ensure_index() is deprecated/removed in newer
        pymongo; switch to create_index() once the minimum pymongo
        version is known to support it.
        """
        collection_name = self._collection_name(project)
        self.database[collection_name].ensure_index('status')
        self.database[collection_name].ensure_index('taskid')
        self._list_project()

    def _parse(self, data):
        """Decode a raw MongoDB document into a task dict (in place).

        Drops mongo's '_id' and JSON-decodes the serialized sub-fields;
        empty or null sub-fields become {}.
        """
        data.pop('_id', None)
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each not in data:
                continue
            if data[each]:
                # BUGFIX: previously used str(bytearray(...)) — which on
                # Python 3 yields "bytearray(b'...')", not the decoded
                # text — and json.loads(..., encoding='utf8'), whose
                # `encoding` kwarg was removed in Python 3.9.
                if isinstance(data[each], (bytes, bytearray)):
                    data[each] = data[each].decode('utf8')
                data[each] = json.loads(data[each])
            else:
                data[each] = {}
        return data

    def _stringify(self, data):
        """JSON-encode the nested sub-fields (in place) for storage."""
        for each in ('schedule', 'fetch', 'process', 'track'):
            if each in data:
                data[each] = json.dumps(data[each])
        return data

    def load_tasks(self, status, project=None, fields=None):
        """Yield parsed tasks matching `status`.

        When `project` is None, the project list is refreshed and every
        known project is scanned.  `fields` is passed through to
        pymongo's find() as a projection.
        """
        if not project:
            self._list_project()
        projects = [project] if project else self.projects
        for project in projects:
            collection_name = self._collection_name(project)
            for task in self.database[collection_name].find({'status': status}, fields):
                yield self._parse(task)

    def get_task(self, project, taskid, fields=None):
        """Return the parsed task for `taskid`, or None if the project
        or task is unknown."""
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            return
        collection_name = self._collection_name(project)
        ret = self.database[collection_name].find_one({'taskid': taskid}, fields)
        if not ret:
            return ret
        return self._parse(ret)

    def status_count(self, project):
        """Return a {status: count} dict for the project's tasks."""
        if project not in self.projects:
            self._list_project()
        if project not in self.projects:
            return {}
        collection_name = self._collection_name(project)
        # when there are too many data in task collection , aggregate operation
        # will take a very long time, and this will cause scheduler module
        # startup to be particularly slow; instead of aggregate, use
        # find-count on the (indexed) status field.
        # NOTE(review): cursor.count() was removed in pymongo 4; switch to
        # collection.count_documents({'status': s}) once pymongo >= 3.7 can
        # be assumed.
        collection = self.database[collection_name]
        result = {}
        for status in (self.ACTIVE, self.SUCCESS, self.FAILED):
            total = collection.find({'status': status}).count()
            if total:
                result[status] = total
        # (removed dead code: the old `filter(...)` result can never be a
        # dict, so the `ret.get('result', [])` branch was unreachable.)
        return result

    def insert(self, project, taskid, obj=None):
        """Insert (upsert) a task, creating the project collection and
        its indexes on first use.  Returns pymongo's update result.

        `obj` default changed from the mutable `{}` to None (equivalent
        behavior, avoids the shared-default pitfall).
        """
        if project not in self.projects:
            self._create_project(project)
        obj = dict(obj or {})
        obj['taskid'] = taskid
        obj['project'] = project
        obj['updatetime'] = time.time()
        return self.update(project, taskid, obj=obj)

    def update(self, project, taskid, obj=None, **kwargs):
        """Upsert `obj` (merged with **kwargs) onto the task row,
        stamping 'updatetime'.  Nested sub-fields are JSON-encoded via
        _stringify before storage.

        NOTE(review): Collection.update() is deprecated in pymongo 3 and
        removed in 4; update_one() returns a different result type, so
        it is kept as-is pending a check of callers.
        """
        obj = dict(obj or {})
        obj.update(kwargs)
        obj['updatetime'] = time.time()
        collection_name = self._collection_name(project)
        return self.database[collection_name].update(
            {'taskid': taskid},
            {"$set": self._stringify(obj)},
            upsert=True
        )