-
Notifications
You must be signed in to change notification settings - Fork 1
/
storage.py
130 lines (95 loc) · 3.14 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from gcloud import storage
import io
import logging
import os
from . import GCLOUD, LOCAL
logger = logging.getLogger(__name__)
def _makedirs(dpath):
if not os.path.exists(dpath):
os.makedirs(dpath)
return dpath
class LocalData(object):
def __init__(self, name, root):
self.name = name
self.root = root
@property
def path(self):
return os.path.join(self.root, self.name)
class LocalBlob(LocalData):
def upload_from_string(self, bstream, content_type):
"""Upload data as a bytestring.
Parameters
----------
bstream : bytes
Bytestring to upload.
content_type : str
Not used; preserved for consistency with gcloud.storage.
"""
# obj = dict(bstream=bstream, content_type=content_type)
with open(self.path, 'wb') as fp:
fp.write(bstream)
def download_as_string(self):
"""Upload data as a bytestring.
Returns
-------
bstream : bytes
Bytestring format of the data.
"""
with open(self.path, 'rb') as fp:
fdata = fp.read()
return fdata
class LocalBucket(LocalData):
def __init__(self, name, root):
super(LocalBucket, self).__init__(name, root)
def blob(self, name):
return LocalBlob(name, root=_makedirs(self.path))
def get_blob(self, name):
return LocalBlob(name, root=self.path)
class LocalClient(object):
def __init__(self, project_id, root_dir):
self.project_id = project_id
self.root_dir = _makedirs(root_dir)
def get_bucket(self, name):
return LocalBucket(name=name, root=self.root_dir)
BACKENDS = {
GCLOUD: storage.Client,
LOCAL: LocalClient
}
class Storage(object):
def __init__(self, name, project_id, backend=GCLOUD,
local_dir=None):
if backend == LOCAL and local_dir is None:
raise ValueError(
"`local_dir` must be given if backend is '{}'".format(LOCAL))
self.name = name
self.project_id = project_id
self._backend = backend
self._backend_kwargs = dict(project_id=project_id)
if self._backend == LOCAL:
self._backend_kwargs.update(
root_dir=os.path.abspath(os.path.expanduser(local_dir)))
@property
def client(self):
return BACKENDS[self._backend](**self._backend_kwargs)
def upload(self, fdata, key):
"""Upload a local file to GCS.
Parameters
----------
fdata : str
File's bytestream.
key : str
Key for writing the file data.
Returns
-------
nothing?
Not sure what a sane response object is here.
"""
logger.debug("Uploading {} bytes to {}.".format(len(fdata), key))
bucket = self.client.get_bucket(self.name)
blob = bucket.blob(key)
blob.upload_from_string(fdata, content_type="application/octet-stream")
return
def download(self, key):
bucket = self.client.get_bucket(self.name)
blob = bucket.get_blob(key)
return blob.download_as_string()