Skip to content

Commit

Permalink
Concat should send source files in json (#248)
Browse files Browse the repository at this point in the history
* Json supported requests to fix msconcat issues with symbols in source files.

* Changed api-version to 2018-05-01 for all.
  • Loading branch information
akharit committed Oct 15, 2018
1 parent b7afbb7 commit 0e358c9
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 28 deletions.
10 changes: 7 additions & 3 deletions azure/datalake/store/core.py
Expand Up @@ -20,6 +20,7 @@
import sys
import time
import uuid
import json


# local imports
Expand Down Expand Up @@ -540,11 +541,14 @@ def concat(self, outfile, filelist, delete_source=False):
directory, and delete that whole directory when done.
"""
outfile = AzureDLPath(outfile).trim()
filelist = ','.join(AzureDLPath(f).as_posix() for f in filelist)
delete = 'true' if delete_source else 'false'
sourceList = [AzureDLPath(f).as_posix() for f in filelist]
sources = {}
sources["sources"] = sourceList
self.azure.call('MSCONCAT', outfile.as_posix(),
data='sources='+filelist,
deleteSourceDirectory=delete)
data=bytearray(json.dumps(sources,separators=(',', ':')), encoding="utf-8"),
deleteSourceDirectory=delete,
headers={'Content-Type': "application/json"},)
self.invalidate_cache(outfile)

merge = concat
Expand Down
40 changes: 21 additions & 19 deletions azure/datalake/store/lib.py
Expand Up @@ -211,7 +211,7 @@ class DatalakeRESTInterface:
url_suffix: str (None)
Domain to send REST requests to. The end-point URL is constructed
using this and the store_name. If None, use default.
api_version: str (2016-11-01)
api_version: str (2018-05-01)
The API version to target with requests. Changing this value will
change the behavior of the requests, and can cause unexpected behavior or
breaking changes. Changes to this value should be undergone with caution.
Expand Down Expand Up @@ -246,7 +246,7 @@ class DatalakeRESTInterface:
}

def __init__(self, store_name=default_store, token=None,
url_suffix=default_adls_suffix, api_version='2016-11-01', **kwargs):
url_suffix=default_adls_suffix, api_version='2018-05-01', **kwargs):
# in the case where an empty string is passed for the url suffix, it must be replaced with the default.
url_suffix = url_suffix or default_adls_suffix
self.local = threading.local()
Expand Down Expand Up @@ -288,7 +288,7 @@ def _check_token(self):
self.head = {'Authorization': cur_session.headers['Authorization']}
self.local.session = None

def _log_request(self, method, url, op, path, params, headers, retry_count):
def _log_request(self, method, url, op, path, params, headers, retry_count):
msg = "HTTP Request\n{} {}\n".format(method.upper(), url)
msg += "{} '{}' {}\n\n".format(
op, path,
Expand Down Expand Up @@ -334,7 +334,7 @@ def _is_json_response(self, response):
return False
return response.headers['content-type'].startswith('application/json')

def call(self, op, path='', is_extended=False, expected_error_code=None, retry_policy=None, **kwargs):
def call(self, op, path='', is_extended=False, expected_error_code=None, retry_policy=None, headers = {}, **kwargs):
""" Execute a REST call
Parameters
Expand Down Expand Up @@ -389,15 +389,16 @@ def call(self, op, path='', is_extended=False, expected_error_code=None, retry_p
retry_count += 1
last_exception = None
try:
response = self.__call_once(method,
url,
params,
data,
stream,
request_id,
retry_count,
op,
path,
response = self.__call_once(method=method,
url=url,
params=params,
data=data,
stream=stream,
request_id=request_id,
retry_count=retry_count,
op=op,
path=path,
headers=headers,
**kwargs)
except requests.exceptions.RequestException as e:
last_exception = e
Expand Down Expand Up @@ -449,13 +450,14 @@ def is_successful_response(self, response, exception):
return True
return False

def __call_once(self, method, url, params, data, stream, request_id, retry_count, op, path='', **kwargs):
def __call_once(self, method, url, params, data, stream, request_id, retry_count, op, path='', headers={}, **kwargs):
func = getattr(self.session, method)
headers = self.head.copy()
headers['x-ms-client-request-id'] = request_id + "." + str(retry_count)
headers['User-Agent'] = self.user_agent
self._log_request(method, url, op, urllib.quote(path), kwargs, headers, retry_count)
return func(url, params=params, headers=headers, data=data, stream=stream)
req_headers = self.head.copy()
req_headers['x-ms-client-request-id'] = request_id + "." + str(retry_count)
req_headers['User-Agent'] = self.user_agent
req_headers.update(headers)
self._log_request(method, url, op, urllib.quote(path), kwargs, req_headers, retry_count)
return func(url, params=params, headers=req_headers, data=data, stream=stream)

def __getstate__(self):
state = self.__dict__.copy()
Expand Down
15 changes: 9 additions & 6 deletions tests/test_core.py
Expand Up @@ -145,18 +145,21 @@ def test_seek(azure):

@my_vcr.use_cassette
def test_concat(azure):
with azure.open(a, 'wb') as f:
aplus = a + "+file1"
bplus = b + "+file2"
cplus = c + "+res"
with azure.open(aplus, 'wb') as f:
f.write(b'hello ')
with azure.open(b, 'wb') as f:
with azure.open(bplus, 'wb') as f:
f.write(b'world')
try:
azure.rm(c)
azure.rm(cplus)
except:
pass
azure.concat(c, [a, b])

out = azure.cat(c)
azure.rm(c)
azure.concat(cplus, [aplus, bplus])
out = azure.cat(cplus)
azure.rm(cplus)

assert out == b'hello world'

Expand Down

0 comments on commit 0e358c9

Please sign in to comment.