/
tasks.py
147 lines (108 loc) · 4.53 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import gzip
from datetime import timedelta
import tempfile
import shutil
from celery.task import Task, PeriodicTask
from irgsh.packager import Packager
from irgsh.builders.pbuilder import Pbuilder
from irgsh_node.conf import settings
from irgsh_node import manager, consts
class TaskCancelled(Exception):
    """Raised when a build task is cancelled upstream or cannot be claimed."""
class BuildPackage(Task):
    """Celery task that builds a package inside a pbuilder chroot.

    The task claims the work item with the manager, runs the build via
    ``irgsh.packager.Packager``, and queues both the build results and the
    gzip-compressed build log on the local upload queue.
    """

    exchange = 'builder'
    ignore_result = True

    def run(self, spec_id, specification, distribution, **kwargs):
        """Build the package described by ``specification`` for ``distribution``.

        ``spec_id`` identifies the specification on the manager side and is
        used only for status checks; the Celery request id is used as the
        task id for directories, status updates, and uploads.
        """
        logger = None
        try:
            task_id = self.request.id

            # Abort early if the specification was cancelled upstream.
            self.check_spec_status(spec_id)

            # Try to claim the task; raises TaskCancelled if already taken.
            self.claim(task_id)

            # Prepare per-task working directories.
            self.update_status(task_id, manager.PREPARING)
            taskdir = os.path.join(settings.RESULT_DIR, task_id)
            logdir = os.path.join(taskdir, 'logs')
            resultdir = os.path.join(taskdir, 'result')
            metadir = os.path.join(taskdir, 'meta')
            for dirname in [logdir, resultdir, metadir]:
                if not os.path.exists(dirname):
                    os.makedirs(dirname)

            # Prepare the raw build log; the builder writes into this file.
            logname = os.path.join(logdir, 'build.log')
            logger = open(logname, 'wb')

            # Execute builder
            self._run(spec_id, task_id, taskdir, distribution, specification,
                      resultdir, logger, kwargs)

        except TaskCancelled:
            # `except X, e` was Python-2-only syntax and the exception object
            # was unused; this form works on both Python 2 and 3.
            self.update_status(task_id, manager.CANCELLED)

        finally:
            # Even on failure, compress whatever log we have and queue it
            # for upload so the manager can show why the build stopped.
            if logger is not None:
                logger.close()

                gzlogname = os.path.join(logdir, 'build.log.gz')
                src = open(logname, 'rb')
                try:
                    gz = gzip.open(gzlogname, 'wb')
                    try:
                        # Stream in chunks instead of reading the whole log
                        # into memory; handles are closed even if a write fails.
                        shutil.copyfileobj(src, gz)
                    finally:
                        gz.close()
                finally:
                    src.close()
                os.unlink(logname)

                self.upload_log(task_id)

    def _run(self, spec_id, task_id, taskdir, distribution, specification,
             resultdir, logger, kwargs):
        """Perform the actual build and queue the resulting files for upload."""
        clog = self.get_logger(**kwargs)

        # Create and prepare builder (pbuilder)
        pbuilder_path = settings.PBUILDER_PATH
        keyring = os.path.abspath(settings.KEYRING)
        builder = Pbuilder(distribution, pbuilder_path, keyring=keyring)

        # Build package
        clog.info('Building package %s for %s' % (specification.source,
                                                  distribution.name))
        self.update_status(task_id, manager.BUILDING)
        packager = Packager(specification, builder)
        changes = packager.build(resultdir, logger)

        self.upload_package(task_id, distribution, changes)

    def on_success(self, retval, task_id, args, kwargs):
        """Celery hook: mark the task BUILT after a successful run."""
        spec_id, specification, distribution = args
        self.update_status(task_id, manager.BUILT)

        clog = self.get_logger(**kwargs)
        clog.info('Package built successfully')

    def on_failure(self, exc, task_id, args, kwargs, einfo=None):
        """Celery hook: mark the task FAILED when run() raised."""
        spec_id, specification, distribution = args
        self.update_status(task_id, manager.FAILED)

        clog = self.get_logger(**kwargs)
        clog.info('Package failed to build')

    def upload_package(self, task_id, distribution, changes):
        """Queue the built package (.changes and friends) for upload,
        annotated with the target distribution's repository metadata."""
        extra = {'distribution': {'name': distribution.name,
                                  'mirror': distribution.mirror,
                                  'dist': distribution.dist,
                                  'components': distribution.components,
                                  'extra': distribution.extra}}
        self.upload(task_id, 'result/%s' % changes, consts.TYPE_RESULT, extra)

    def update_status(self, task_id, status):
        """Best-effort status report to the manager; a transient network
        failure (IOError) must not abort the build itself."""
        try:
            manager.update_status(task_id, status)
        except IOError:
            pass

    def upload_log(self, task_id):
        """Queue the compressed build log for upload."""
        self.upload(task_id, 'logs/build.log.gz', consts.TYPE_LOG)

    def upload(self, task_id, path, content_type, extra=None):
        """Put an upload job on the local persistent queue.

        ``extra`` defaults to None instead of a mutable ``{}`` default,
        which would be shared across calls and could leak keys between
        uploads; semantics for callers are unchanged.
        """
        from irgsh_node.localqueue import Queue

        data = {}
        if extra is not None:
            data.update(extra)
        data.update({'task_id': task_id,
                     'path': path,
                     'content_type': content_type})

        queue = Queue(settings.LOCAL_DATABASE)
        queue.put(data)

    def check_spec_status(self, spec_id):
        """Raise TaskCancelled if the manager reports the spec as cancelled
        (negative status code); return True otherwise."""
        res = manager.get_spec_status(spec_id)
        if res['code'] < 0:
            raise TaskCancelled()
        return True

    def claim(self, task_id):
        """Claim this task with the manager; raise TaskCancelled if the
        claim is rejected (negative result code)."""
        res = manager.claim_task(task_id)
        if res['code'] < 0:
            raise TaskCancelled()
        return True