Skip to content

Commit

Permalink
support lz4 if lib is installed.
Browse files Browse the repository at this point in the history
  • Loading branch information
wjo1212 committed Jan 15, 2019
1 parent c8e829c commit 714686a
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions aliyun/log/logclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
"""

try:
import logservice_lz4
import lz4framed as lz4

def lz_decompress(raw_size, data):
prefix = b'\x04"M\x18h@' + struct.pack('<I', raw_size) + b'\x00\x00\x00\x00p\xb4\x02\x00\x00'
return lz4.decompress(prefix + data + b'\x00\x00\x00\x00')

except ImportError:
pass
lz4 = None

import json
import requests
Expand Down Expand Up @@ -48,6 +53,7 @@
from .log_logs_raw_pb2 import LogGroupRaw as LogGroup
from .external_store_config import ExternalStoreConfig
from .external_store_config_response import *
import struct

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -305,14 +311,13 @@ def put_log_raw(self, project, logstore, log_group, compress=None):
raw_body_size = len(body)
headers = {'x-log-bodyrawsize': str(raw_body_size), 'Content-Type': 'application/x-protobuf'}

# if raw_body_size > 5 * 1024 * 1024: # 10 MB
# raise LogException('InvalidLogSize',
# "logItems' size exceeds maximum limitation: 5 MB. now: {0} MB.".format(
# raw_body_size / 1024.0 / 1024))

if compress is None or compress:
headers['x-log-compresstype'] = 'deflate'
body = zlib.compress(body)
if lz4:
headers['x-log-compresstype'] = 'lz4'
body = lz4.compress(body)[19:-4]
else:
headers['x-log-compresstype'] = 'deflate'
body = zlib.compress(body)

params = {}
resource = '/logstores/' + logstore + "/shards/lb"
Expand Down Expand Up @@ -370,8 +375,12 @@ def put_logs(self, request):

compress_data = None
if is_compress:
headers['x-log-compresstype'] = 'deflate'
compress_data = zlib.compress(body)
if lz4:
headers['x-log-compresstype'] = 'lz4'
compress_data = lz4.compress(body)[19:-4]
else:
headers['x-log-compresstype'] = 'deflate'
compress_data = zlib.compress(body)

params = {}
logstore = request.get_logstore()
Expand Down Expand Up @@ -801,7 +810,10 @@ def pull_logs(self, project_name, logstore_name, shard_id, cursor, count=None, e

headers = {}
if compress is None or compress:
headers['Accept-Encoding'] = 'gzip'
if lz4:
headers['Accept-Encoding'] = 'lz4'
else:
headers['Accept-Encoding'] = 'gzip'
else:
headers['Accept-Encoding'] = ''

Expand All @@ -820,8 +832,11 @@ def pull_logs(self, project_name, logstore_name, shard_id, cursor, count=None, e
compress_type = Util.h_v_td(header, 'x-log-compresstype', '').lower()
if compress_type == 'lz4':
raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize'))
raw_data = logservice_lz4.uncompress(raw_size, resp)
return PullLogResponse(raw_data, header)
if lz4:
raw_data = lz_decompress(raw_size, resp)
return PullLogResponse(raw_data, header)
else:
raise LogException("ClientHasNoLz4", "There's no Lz4 lib available to decompress the response", resp_header=header, resp_body=resp)
elif compress_type in ('gzip', 'deflate'):
raw_size = int(Util.h_v_t(header, 'x-log-bodyrawsize'))
raw_data = zlib.decompress(resp)
Expand Down

0 comments on commit 714686a

Please sign in to comment.