/
s3_mavenindex.py
69 lines (55 loc) · 2.48 KB
/
s3_mavenindex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""S3 storage for maven index."""
import botocore
import os
from tempfile import TemporaryDirectory
from f8a_worker.errors import TaskError
from f8a_worker.process import Archive
from . import AmazonS3
class S3MavenIndex(AmazonS3):
"""S3 storage for maven index."""
_INDEX_DIRNAME = 'central-index'
_INDEX_ARCHIVE = _INDEX_DIRNAME + '.zip'
_LAST_OFFSET_OBJECT_KEY = 'last_offset.json'
_DEFAULT_LAST_OFFSET = 0
def store_index(self, target_dir):
"""Zip files in target_dir/central-index dir and store to S3."""
with TemporaryDirectory() as temp_dir:
central_index_dir = os.path.join(target_dir, self._INDEX_DIRNAME)
archive_path = os.path.join(temp_dir, self._INDEX_ARCHIVE)
try:
Archive.zip_file(central_index_dir, archive_path, junk_paths=True)
except TaskError:
pass
else:
self.store_file(archive_path, self._INDEX_ARCHIVE)
def retrieve_index_if_exists(self, target_dir):
"""Retrieve central-index.zip from S3 and extract into target_dir/central-index."""
if self.object_exists(self._INDEX_ARCHIVE):
with TemporaryDirectory() as temp_dir:
archive_path = os.path.join(temp_dir, self._INDEX_ARCHIVE)
central_index_dir = os.path.join(target_dir, self._INDEX_DIRNAME)
self.retrieve_file(self._INDEX_ARCHIVE, archive_path)
Archive.extract_zip(archive_path, central_index_dir, mkdest=True)
return True
return False
def get_last_offset(self):
"""Get used offset pointing to maven index checker.
Last offset is used in jobs service to schedule new releases of maven packages.
We need to store offset that was used for the last time in order to keep track
where we left off.
"""
try:
content = self.retrieve_dict(self._LAST_OFFSET_OBJECT_KEY)
except botocore.exceptions.ClientError as exc:
if exc.response['Error']['Code'] == 'NoSuchKey':
return self._DEFAULT_LAST_OFFSET
else:
# Some another error, not no such file
raise
return content.get('last_offset', self._DEFAULT_LAST_OFFSET)
def set_last_offset(self, offset):
"""Set used offset pointing to maven index checker."""
content = {
'last_offset': offset
}
self.store_dict(content, self._LAST_OFFSET_OBJECT_KEY)