Merge pull request #203 from Azure/cache-leak
Cache leak
milanchandna committed Feb 7, 2018
2 parents d18e47d + f016cf1 commit fa69e4b
Showing 3 changed files with 417 additions and 42 deletions.
5 changes: 5 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,11 @@
 Release History
 ===============

+0.0.18 (2018-02-05)
+-------------------
+* Fixed read issue where the whole file was cached while doing positional reads #198
+* Fixed readline as well for the same issue
+
 0.0.17 (2017-09-21)
 -------------------
 * Fixed README.rst indentation error
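
For context on the two 0.0.18 entries above: issue #198 was about positional reads, where seeking into a large file and then reading a small range caused the file object to cache far more data than was asked for. A minimal sketch of that access pattern follows, assuming the library's usual lib.auth and AzureDLFileSystem entry points; the tenant, client, secret, store name, and path are placeholders.

from azure.datalake.store import core, lib

# Placeholder service-principal credentials and store name.
token = lib.auth(tenant_id='TENANT', client_id='CLIENT_ID', client_secret='CLIENT_SECRET')
adl = core.AzureDLFileSystem(token, store_name='mystore')

# Positional read on a large file: seek, then read a small range.
# Before 0.0.18 this pattern could grow the in-memory cache toward the
# whole file; after this change at most one blocksize stays cached.
with adl.open('/data/large.csv', 'rb', blocksize=2**25) as f:
    f.seek(10 * 2**20)      # jump 10 MB into the file
    chunk = f.read(4096)    # read only 4 KB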
102 changes: 62 additions & 40 deletions azure/datalake/store/core.py
@@ -662,8 +662,8 @@ def __init__(self, azure, path, mode='rb', blocksize=2**25,
         self.cache = b""
         self.loc = 0
         self.delimiter = delimiter
-        self.start = None
-        self.end = None
+        self.start = 0
+        self.end = 0
         self.closed = False
         self.trim = True
         self.buffer = io.BytesIO()
@@ -731,16 +731,29 @@ def readline(self, length=-1):
         If length is specified, at most size bytes will be read.
         """
-        self._fetch(self.loc, self.loc + 1)
+        if length < 0:
+            length = self.size
+
+        line = b""
         while True:
+
+            # if the cache holds the last bytes of the file and they have been read, return the line and exit the loop
+            if self.end >= self.size and self.loc >= self.end:
+                return line
+
+            self._read_blocksize()
+
             found = self.cache[self.loc - self.start:].find(b'\n') + 1
-            if length > 0 and found > length:
-                return self.read(length)
-            if found:
-                return self.read(found)
-            if self.end >= self.size:
-                return self.read(length)
-            self._fetch(self.start, self.end + self.blocksize)
+            if found:
+                partialLine = self.cache[self.loc-self.start: min(self.loc-self.start+found, self.loc-self.start+length)]
+            else:
+                partialLine = self.cache[self.loc-self.start:]
+
+            self.loc += len(partialLine)
+            line += partialLine
+
+            if found:
+                return line

     def __next__(self):
         out = self.readline()
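
The reworked readline above no longer returns through read() after growing the cache; it stitches a line together one cached block at a time and stops as soon as a newline turns up. The standalone sketch below is an illustration only, not library code: it ignores the length cap and uses block-aligned windows for simplicity, but it mimics the same loop against an in-memory buffer.

def readline_from_blocks(data, loc, blocksize):
    """Return (line, new_offset) reading from 'data' starting at offset 'loc'."""
    line = b""
    size = len(data)
    while True:
        if loc >= size:
            return line, loc
        start = (loc // blocksize) * blocksize       # block-aligned cache window
        cache = data[start:start + blocksize]        # at most one block in memory
        found = cache[loc - start:].find(b'\n') + 1
        if found:
            partial = cache[loc - start:loc - start + found]
        else:
            partial = cache[loc - start:]
        loc += len(partial)
        line += partial
        if found:
            return line, loc

data = b"alpha\nbeta gamma delta\nepsilon\n"
print(readline_from_blocks(data, 0, blocksize=8))    # (b'alpha\n', 6)
print(readline_from_blocks(data, 6, blocksize=8))    # (b'beta gamma delta\n', 23)

Only one block's worth of bytes is sliced per iteration, so a line that spans several blocks is assembled incrementally without ever caching the rest of the file.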
@@ -758,28 +771,34 @@ def readlines(self):
         return list(self)

     def _fetch(self, start, end):
-        if self.start is None and self.end is None:
-            # First read
-            self.start = start
-            self.end = min(end + self.blocksize, self.size)
-            response = _fetch_range_with_retry(
-                self.azure.azure, self.path.as_posix(), start, self.end, filesessionid=self.filesessionid)
-            self.cache = getattr(response, 'content', response)
-        if start < self.start:
-            response = _fetch_range_with_retry(
-                self.azure.azure, self.path.as_posix(), start, self.start, filesessionid=self.filesessionid)
-            new = getattr(response, 'content', response)
-            self.start = start
-            self.cache = new + self.cache
-        if end > self.end:
-            if self.end >= self.size:
-                return
-            newend = min(self.size, end + self.blocksize)
-            response = _fetch_range_with_retry(
-                self.azure.azure, self.path.as_posix(), self.end, newend, filesessionid=self.filesessionid)
-            new = getattr(response, 'content', response)
-            self.end = newend
-            self.cache = self.cache + new
+        self.start = start
+        self.end = min(end, self.size)
+        response = _fetch_range_with_retry(
+            self.azure.azure, self.path.as_posix(), self.start, self.end, filesessionid=self.filesessionid)
+        self.cache = getattr(response, 'content', response)
+
+    def _read_blocksize(self, offset=-1):
+        """
+        Read the next blocksize of data into the cache if the read offset is not already within it; otherwise do nothing.
+
+        Parameters
+        ----------
+        offset : int (-1)
+            Offset from which to read; if < 0, use the last read location (beginning of file on first read).
+        """
+        if offset < 0:
+            offset = self.loc
+        if offset >= self.size:
+            self.start = self.size
+            self.end = self.size
+            self.cache = b""
+            return
+        if offset >= self.start and offset < self.end:
+            return
+        if offset > self.size:
+            raise ValueError('Read offset is outside the File')
+        self._fetch(offset, offset + self.blocksize)

     def read(self, length=-1):
         """
@@ -796,15 +815,18 @@ def read(self, length=-1):
             length = self.size
         if self.closed:
             raise ValueError('I/O operation on closed file.')
-        self._fetch(self.loc, self.loc + length)
-        out = self.cache[self.loc - self.start:
-                         self.loc - self.start + length]
-        self.loc += len(out)
-        if self.trim and self.blocksize:
-            num = (self.loc - self.start) // self.blocksize - 1
-            if num > 0:
-                self.start += self.blocksize * num
-                self.cache = self.cache[self.blocksize * num:]
+
+        out = b""
+        while length > 0:
+            self._read_blocksize()
+            data_read = self.cache[self.loc - self.start:
+                                   min(self.loc - self.start + length, self.end - self.start)]
+            out += data_read
+            self.loc += len(data_read)
+            length -= len(data_read)
+            if self.loc >= self.size:
+                length = 0

         return out

     read1 = read
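
read() now drains the requested range in blocksize-bounded chunks, refilling the window as it goes, instead of fetching the whole range into the cache up front. A rough standalone sketch of that loop, illustrative only and with the same simplifications as the earlier models:

def read_at(data, loc, length, blocksize):
    """Return (bytes_read, new_offset); holds at most one block in memory at a time."""
    size = len(data)
    out = b""
    while length > 0 and loc < size:
        start, end = loc, min(loc + blocksize, size)
        cache = data[start:end]                      # the only data held in memory
        chunk = cache[:min(length, end - start)]
        out += chunk
        loc += len(chunk)
        length -= len(chunk)
    return out, loc

data = bytes(range(256)) * 40                        # 10240 bytes of sample data
out, loc = read_at(data, 3000, 4000, blocksize=1024)
print(len(out), loc)                                 # 4000 7000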
(Diff for the third changed file was not rendered on this page.)
