Skip to content
This repository has been archived by the owner on Feb 10, 2021. It is now read-only.

Commit

Permalink
Flake8, add flake8 to tests. (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcrist committed Nov 14, 2017
1 parent 5fbe948 commit fd8a664
Show file tree
Hide file tree
Showing 14 changed files with 118 additions and 76 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ install:
script:
- pwd
- docker exec -it $CONTAINER_ID $DOCKER_ENV/bin/py.test hdfs3 -s -vv
- docker exec -it $CONTAINER_ID $DOCKER_ENV/bin/flake8 hdfs3
5 changes: 3 additions & 2 deletions continuous_integration/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ ENV PATH /opt/conda/bin:$PATH

# hdfs3 - python
ENV LIBHDFS3_CONF /etc/hadoop/conf/hdfs-site.xml
RUN conda install -y -q ipython pytest
RUN conda install -y -q ipython pytest flake8 -c conda-forge
RUN conda install -y -q libhdfs3 -c conda-forge

RUN conda create -y -n py3 python=3.5
RUN conda install -y -n py3 ipython pytest
RUN conda install -y -n py3 ipython pytest flake8 -c conda-forge
RUN conda install -y -n py3 libhdfs3 -c conda-forge

# Cloudera repositories
Expand Down
1 change: 1 addition & 0 deletions hdfs3/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import absolute_import

from .conf import conf
from .core import HDFileSystem, HDFile
Expand Down
10 changes: 6 additions & 4 deletions hdfs3/compatibility.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# flake8: noqa
import sys

if sys.version_info < (3,):
PY3 = sys.version_info.major == 3
PY2 = not PY3


if PY2:
class ConnectionError(OSError):
"""Connection to HDFS failed."""

Expand All @@ -10,13 +14,11 @@ class ConnectionError(OSError):
from urlparse import urlparse
unicode = unicode
bytes = str
PY3 = False

else:
ConnectionError = ConnectionError
PermissionError = PermissionError
FileNotFoundError = FileNotFoundError
from urllib.parse import urlparse
unicode = str
bytes = bytes
PY3 = True

5 changes: 3 additions & 2 deletions hdfs3/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@


def hdfs_conf(confd, more_files=None):
""" Load HDFS config from default locations.
""" Load HDFS config from default locations.
Parameters
----------
confd: str
Expand Down Expand Up @@ -113,4 +113,5 @@ def guess_config():
if os.path.exists(os.path.join(d, 'hdfs-site.xml')):
os.environ['LIBHDFS3_CONF'] = os.path.join(d, 'hdfs-site.xml')


guess_config()
82 changes: 46 additions & 36 deletions hdfs3/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import posixpath
from collections import deque

from .compatibility import FileNotFoundError, ConnectionError, PY3
from .compatibility import FileNotFoundError, ConnectionError
from .conf import conf
from .utils import (read_block, seek_delimiter, ensure_bytes, ensure_string,
ensure_trailing_slash, MyNone)
Expand Down Expand Up @@ -68,7 +68,7 @@ def __init__(self, host=MyNone, port=MyNone, connect=True, autoconf=True,

self._handle = None

if self.conf.get('ticket_cache', None) and self.conf.get('token', None):
if self.conf.get('ticket_cache') and self.conf.get('token'):
m = "It is not possible to use ticket_cache and token at same time"
raise RuntimeError(m)

Expand All @@ -86,7 +86,6 @@ def __setstate__(self, state):
self._handle = None
self.connect()


def connect(self):
""" Connect to the name node
Expand All @@ -110,7 +109,7 @@ def connect(self):
o = _lib.hdfsNewBuilder()

_lib.hdfsBuilderSetNameNode(o, ensure_bytes(conf.pop('host')))

port = conf.pop('port', None)
if port is not None:
_lib.hdfsBuilderSetNameNodePort(o, port)
Expand Down Expand Up @@ -163,7 +162,7 @@ def delegate_token(self, user=None):
def renew_token(self, token=None):
"""
Renew delegation token
Parameters
----------
token: str or None
Expand All @@ -182,7 +181,7 @@ def renew_token(self, token=None):
def cancel_token(self, token=None):
"""
Revoke delegation token
Parameters
----------
token: str or None
Expand All @@ -202,7 +201,8 @@ def cancel_token(self, token=None):
def disconnect(self):
""" Disconnect from name node """
if self._handle:
logger.debug("Disconnect from handle %d", self._handle.contents.filesystem)
logger.debug("Disconnect from handle %d",
self._handle.contents.filesystem)
_lib.hdfsDisconnect(self._handle)
self._handle = None

Expand Down Expand Up @@ -261,7 +261,9 @@ def df(self):
""" Used/free disc space on the HDFS system """
cap = _lib.hdfsGetCapacity(self._handle)
used = _lib.hdfsGetUsed(self._handle)
return {'capacity': cap, 'used': used, 'percent-free': 100*(cap-used)/cap}
return {'capacity': cap,
'used': used,
'percent-free': 100 * (cap - used) / cap}

def get_block_locations(self, path, start=0, length=0):
""" Fetch physical locations of blocks """
Expand All @@ -270,9 +272,11 @@ def get_block_locations(self, path, start=0, length=0):
start = int(start) or 0
length = int(length) or self.info(path)['size']
nblocks = ctypes.c_int(0)
out = _lib.hdfsGetFileBlockLocations(self._handle, ensure_bytes(path),
ctypes.c_int64(start), ctypes.c_int64(length),
ctypes.byref(nblocks))
out = _lib.hdfsGetFileBlockLocations(self._handle,
ensure_bytes(path),
ctypes.c_int64(start),
ctypes.c_int64(length),
ctypes.byref(nblocks))
locs = []
for i in range(nblocks.value):
block = out[i]
Expand Down Expand Up @@ -347,20 +351,19 @@ def glob(self, path):
pass
if '/' in path[:path.index('*')]:
ind = path[:path.index('*')].rindex('/')
root = path[:ind+1]
root = path[:ind + 1]
else:
root = '/'
allfiles = []
allpaths = []
for dirname, dirs, fils in self.walk(root):
allfiles.extend(posixpath.join(dirname, d) for d in dirs)
allfiles.extend(posixpath.join(dirname, f) for f in fils)
allpaths.extend(posixpath.join(dirname, d) for d in dirs)
allpaths.extend(posixpath.join(dirname, f) for f in fils)
pattern = re.compile("^" + path.replace('//', '/')
.rstrip('/')
.replace('*', '[^/]*')
.replace('?', '.') + "$")
out = [f for f in allfiles
if pattern.match(f.replace('//', '/').rstrip('/'))]
return out
return [p for p in allpaths
if pattern.match(p.replace('//', '/').rstrip('/'))]

def ls(self, path, detail=False):
""" List files at path
Expand Down Expand Up @@ -440,20 +443,21 @@ def mv(self, path1, path2):
""" Move file at path1 to path2 """
if not self.exists(path1):
raise FileNotFoundError(path1)
out = _lib.hdfsRename(self._handle, ensure_bytes(path1), ensure_bytes(path2))
out = _lib.hdfsRename(self._handle, ensure_bytes(path1),
ensure_bytes(path2))
return out == 0

def concat(self, destination, paths):
"""Concatenate inputs to destination
Source files *should* all have the same block size and replication.
The destination file must be in the same directory as
the source files. If the target exists, it will be appended to.
Some HDFSs impose that the target file must exist and be an exact
number of blocks long, and that each concated file except the last
is also a whole number of blocks.
The source files are deleted on successful
completion.
"""
Expand All @@ -478,7 +482,7 @@ def rm(self, path, recursive=True):

def exists(self, path):
""" Is there an entry at path? """
out = _lib.hdfsExists(self._handle, ensure_bytes(path) )
out = _lib.hdfsExists(self._handle, ensure_bytes(path))
return out == 0

def chmod(self, path, mode):
Expand All @@ -498,9 +502,14 @@ def chmod(self, path, mode):
Examples
--------
>>> hdfs.chmod('/path/to/file', 0o777) # make read/writeable to all # doctest: +SKIP
>>> hdfs.chmod('/path/to/file', 0o700) # make read/writeable only to user # doctest: +SKIP
>>> hdfs.chmod('/path/to/file', 0o100) # make read-only to user # doctest: +SKIP
Make read/writeable to all
>>> hdfs.chmod('/path/to/file', 0o777) # doctest: +SKIP
Make read/writeable only to user
>>> hdfs.chmod('/path/to/file', 0o700) # doctest: +SKIP
Make read-only to user
>>> hdfs.chmod('/path/to/file', 0o100) # doctest: +SKIP
"""
if not self.exists(path):
raise FileNotFoundError(path)
Expand All @@ -514,8 +523,8 @@ def chown(self, path, owner, group):
""" Change owner/group """
if not self.exists(path):
raise FileNotFoundError(path)
out = _lib.hdfsChown(self._handle, ensure_bytes(path), ensure_bytes(owner),
ensure_bytes(group))
out = _lib.hdfsChown(self._handle, ensure_bytes(path),
ensure_bytes(owner), ensure_bytes(group))
if out != 0:
msg = ensure_string(_lib.hdfsGetLastError()).split('\n')[0]
raise IOError("chown failed on %s %s" % (path, msg))
Expand All @@ -530,7 +539,7 @@ def cat(self, path):

def get(self, hdfs_path, local_path, blocksize=2**16):
""" Copy HDFS file to local """
#TODO: _lib.hdfsCopy() may do this more efficiently
# TODO: _lib.hdfsCopy() may do this more efficiently
if not self.exists(hdfs_path):
raise FileNotFoundError(hdfs_path)
with self.open(hdfs_path, 'rb') as f:
Expand All @@ -551,15 +560,16 @@ def getmerge(self, path, filename, blocksize=2**16):
out = f.read(blocksize)
f2.write(out)

def put(self, filename, path, chunk=2**16, replication=0,block_size=0):
def put(self, filename, path, chunk=2**16, replication=0, block_size=0):
""" Copy local file to path in HDFS """
with self.open(path, 'wb',replication=replication,block_size=block_size) as f:
with open(filename, 'rb') as f2:
with self.open(path, 'wb', replication=replication,
block_size=block_size) as target:
with open(filename, 'rb') as source:
while True:
out = f2.read(chunk)
out = source.read(chunk)
if len(out) == 0:
break
f.write(out)
target.write(out)

def tail(self, path, size=1024):
""" Return last bytes of file """
Expand Down Expand Up @@ -746,10 +756,10 @@ def readline(self, chunksize=2**8, lineterminator='\n'):
line-terminator).
Line iteration uses this method internally.
Note: this function requires many calls to HDFS and is slow; it is
in general better to wrap an HDFile with an ``io.TextIOWrapper`` for
buffering, text decoding and newline support.
buffering, text decoding and newline support.
"""
lineterminator = ensure_bytes(lineterminator)
start = self.tell()
Expand Down
25 changes: 15 additions & 10 deletions hdfs3/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
_lib = ct.cdll.LoadLibrary(name)
break
except OSError as e:
if not e.args or ("image not found" not in str(e.args[0])
and "No such file" not in str(e)):
if not e.args or ("image not found" not in str(e.args[0]) and
"No such file" not in str(e)):
raise
if _lib is None:
raise ImportError("Can not find the shared library: libhdfs3.so\n"
Expand Down Expand Up @@ -59,16 +59,17 @@ class EncryptionZoneInfo(ct.Structure):
class FileInfo(ct.Structure):
_fields_ = [('kind', ct.c_int8),
('name', ct.c_char_p),
('last_mod', ct.c_int64), #time_t, could be 32bit
('last_mod', ct.c_int64), # time_t, could be 32bit
('size', ct.c_int64),
('replication', ct.c_short),
('block_size', ct.c_int64),
('owner', ct.c_char_p),
('group', ct.c_char_p),
('permissions', ct.c_short), #view as octal
('last_access', ct.c_int64), #time_t, could be 32bit
('permissions', ct.c_short), # view as octal
('last_access', ct.c_int64), # time_t, could be 32bit
('encryption_info', ct.POINTER(EncryptionFileInfo))]


hdfsGetPathInfo = _lib.hdfsGetPathInfo
hdfsGetPathInfo.argtypes = [ct.c_char_p]
hdfsGetPathInfo.restype = ct.POINTER(FileInfo)
Expand All @@ -93,6 +94,7 @@ class hdfsFile(ct.Structure):
class hdfsFS(ct.Structure):
_fields_ = [('filesystem', ct.c_void_p)] # TODO: expand this if needed


hdfsGetFileBlockLocations = _lib.hdfsGetFileBlockLocations
hdfsGetFileBlockLocations.argtypes = [ct.POINTER(hdfsFS), ct.c_char_p,
tOffset, tOffset, ct.POINTER(ct.c_int)]
Expand Down Expand Up @@ -279,8 +281,11 @@ class hdfsFS(ct.Structure):
Open a hdfs file in given mode.
param fs The configured filesystem handle.
param path The full path to the file.
param flags - an | of bits/fcntl.h file flags - supported flags are O_RDONLY, O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
O_WRONLY|O_APPEND and O_SYNC. Other flags are generally ignored other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and set errno equal ENOTSUP.
param flags An | of bits/fcntl.h file flags. Supported flags are O_RDONLY,
O_WRONLY (meaning create or overwrite i.e., implies O_TRUNCAT),
O_WRONLY|O_APPEND and O_SYNC. Other flags are generally ignored
other than (O_RDWR || (O_EXCL & O_CREAT)) which return NULL and
set errno equal ENOTSUP.
param bufferSize Size of buffer for read/write - pass 0 if you want
to use the default configured values.
param replication Block replication - pass 0 if you want to use
Expand Down Expand Up @@ -348,7 +353,7 @@ class hdfsFS(ct.Structure):

hdfsWrite = _lib.hdfsWrite
hdfsWrite.argtypes = [ct.POINTER(hdfsFS), ct.POINTER(hdfsFile), ct.c_void_p,
tSize]
tSize]
hdfsWrite.restype = tSize
hdfsWrite.__doc__ = """Write data into an open file.
Expand Down Expand Up @@ -409,7 +414,7 @@ class hdfsFS(ct.Structure):
hdfsConcat.argtypes = [ct.POINTER(hdfsFS), ct.c_char_p,
ct.POINTER(ct.c_char_p)]
hdfsConcat.__doc__ = """Concatenate (move) the blocks in a list of source
files into a single file deleting the source files.
files into a single file deleting the source files.
Source files must all have the same block size and replication and all
but the last source file must be an integer number of full
Expand Down Expand Up @@ -660,5 +665,5 @@ class hdfsFS(ct.Structure):
param fs The configured filesystem handle.
param path The path of the directory.
param keyname The key name of the encryption zone
param keyname The key name of the encryption zone
return Returns 0 on success, -1 on error."""
4 changes: 2 additions & 2 deletions hdfs3/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def __init__(self, hdfs, root, check=False):
hdfs.mkdir(root)
if check:
hdfs.ls(root)
hdfs.touch(root+'/a')
hdfs.rm(root+'/a')
hdfs.touch(root + '/a')
hdfs.rm(root + '/a')

def clear(self):
"""Remove all keys below root - empties out mapping
Expand Down
2 changes: 1 addition & 1 deletion hdfs3/tests/test_conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,4 @@ def test_token_and_ticket_cache_in_same_time():
<value>false</value>
</property>
</configuration>
"""
"""

0 comments on commit fd8a664

Please sign in to comment.