# -*- coding: utf-8 -*-
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
import array
from hashlib import md5
import os
import platform
import sys
import threading
PY2 = sys.version_info.major == 2
WIN = platform.system() == 'Windows'

if WIN:
    datadir = os.path.join(os.environ['APPDATA'], 'azure-datalake-store')
else:
    datadir = os.sep.join([os.path.expanduser("~"), '.config', 'azure-datalake-store'])

try:
    os.makedirs(datadir)
except OSError:
    # the directory may already exist
    pass
def ensure_writable(b):
    """ Return a writable bytestring for ``b`` (Python 2 ``array.array`` compatibility) """
    if PY2 and isinstance(b, array.array):
        return b.tostring()
    return b
def write_stdout(data):
    """ Write bytes or strings to standard output
    """
    try:
        sys.stdout.buffer.write(data)
    except AttributeError:
        # Python 2 file objects have no .buffer attribute; fall back to text mode
        sys.stdout.write(data.decode('ascii', 'replace'))
def read_block(f, offset, length, delimiter=None):
    """ Read a block of bytes from a file

    Parameters
    ----------
    f: file object
        a file object that supports seek, tell and read.
    offset: int
        Byte offset to start read
    length: int
        Maximum number of bytes to read
    delimiter: bytes (optional)
        Ensure reading stops at delimiter bytestring

    If using the ``delimiter=`` keyword argument we ensure that the read
    stops at or before the delimiter boundaries that follow the location
    ``offset + length``. For ADL, if no delimiter is found and the data
    requested is > 4MB an exception is raised, since a single record cannot
    exceed 4MB and be guaranteed to land contiguously in ADL.
    The bytestring returned WILL include the terminating delimiter string.

    Examples
    --------
    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'
    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\n'
    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
    b'\\nCharlie, 300'
    >>> f = BytesIO(bytearray(2**22))  # doctest: +SKIP
    >>> read_block(f, 0, 2**22, delimiter=b'\\n')  # doctest: +SKIP
    IndexError: No delimiter found within max record size of 4MB.
    Transfer without specifying a delimiter (as binary) instead.
    """
    f.seek(offset)
    data = f.read(length)
    if delimiter:
        # max record size is 4MB
        max_record = 2**22
        if length > max_record:
            raise IndexError('Records larger than ' + str(max_record) +
                             ' bytes are not supported. The length requested was: ' +
                             str(length) + ' bytes')
        # find the start of the last occurrence of the delimiter, if present;
        # rindex raises ValueError when the delimiter is absent
        try:
            last_delim_index = data.rindex(delimiter)
            # this ensures the length includes all of the last delimiter
            # (in the event that it is more than one character)
            length = last_delim_index + len(delimiter)
            return data[0:length]
        except ValueError:
            # TODO: Before delimiters can be supported through the ADLUploader logic, the number of chunks
            # being uploaded needs to be visible to this method, since it needs to throw if:
            # 1. We cannot find a delimiter in <= 4MB of data
            # 2. If the remaining size is less than 4MB but there are multiple chunks that need to be stitched
            #    together, since the delimiter could be split across chunks.
            # 3. If delimiters are specified, there must be logic during segment determination that ensures all
            #    chunks terminate at the end of a record (on a new line), even if that makes the chunk < 256MB.
            if length >= max_record:
                raise IndexError('No delimiter found within max record size of ' + str(max_record) +
                                 ' bytes. Transfer without specifying a delimiter (as binary) instead.')
    return data
def tokenize(*args, **kwargs):
    """ Deterministic token

    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args = args + (kwargs,)
    return md5(str(tuple(args)).encode()).hexdigest()
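# Note (illustrative): keyword arguments are folded into the hashed tuple, so
# tokens differ whenever kwargs differ, e.g. tokenize(1) != tokenize(1, x=2),
# while repeated identical calls always agree.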
def commonprefix(paths):
    """ Find common directory for all paths

    Python's ``os.path.commonprefix`` will not return a valid directory path in
    some cases, so we wrote this convenience method.

    Examples
    --------
    >>> # os.path.commonprefix returns '/disk1/foo'
    >>> commonprefix(['/disk1/foobar', '/disk1/foobaz'])
    '/disk1'
    >>> commonprefix(['a/b/c', 'a/b/d', 'a/c/d'])
    'a'
    >>> commonprefix(['a/b/c', 'd/e/f', 'g/h/i'])
    ''
    """
    return os.path.dirname(os.path.commonprefix(paths))
def clamp(n, smallest, largest):
    """ Limit a value to a given range

    The return value is guaranteed to satisfy ``smallest <= result <= largest``.

    Examples
    --------
    >>> clamp(0, 1, 100)
    1
    >>> clamp(42, 2, 128)
    42
    >>> clamp(1024, 1, 32)
    32
    """
    return max(smallest, min(n, largest))
class CountUpDownLatch:
    """CountUpDownLatch provides a thread safe implementation of an Up Down latch
    """
    def __init__(self):
        self.lock = threading.Condition()
        self.val = 0
        self.total = 0

    def increment(self):
        self.lock.acquire()
        self.val += 1
        self.total += 1
        self.lock.release()

    def decrement(self):
        self.lock.acquire()
        self.val -= 1
        if self.val <= 0:
            # wake up all threads blocked in is_zero()
            self.lock.notify_all()
        self.lock.release()

    def total_processed(self):
        self.lock.acquire()
        temp = self.total
        self.lock.release()
        return temp

    def is_zero(self):
        self.lock.acquire()
        while self.val > 0:
            self.lock.wait()
        self.lock.release()
        return True
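

# A minimal usage sketch of CountUpDownLatch (illustrative only, not part of
# the module's public API): the latch is counted up once per unit of work,
# worker threads count it down as they finish, and the main thread blocks in
# is_zero() until everything outstanding is accounted for.
if __name__ == '__main__':
    import time

    latch = CountUpDownLatch()

    def worker(latch):
        time.sleep(0.1)   # simulate some work
        latch.decrement() # signal completion of one unit

    for _ in range(4):
        latch.increment() # count the unit of work before starting it
        threading.Thread(target=worker, args=(latch,)).start()

    latch.is_zero()       # blocks until all four workers have decremented
    print('processed: %d' % latch.total_processed())  # processed: 4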