@@ -352,33 +352,52 @@ def fetched_records(self):
352352 position )
353353 return dict (drained )
354354
355- def _unpack_message_set (self , tp , messages , relative_offset = 0 ):
355+ def _unpack_message_set (self , tp , messages ):
356356 try :
357357 for offset , size , msg in messages :
358358 if self .config ['check_crcs' ] and not msg .validate_crc ():
359359 raise Errors .InvalidMessageError (msg )
360360 elif msg .is_compressed ():
361- mset = msg .decompress ()
362- # new format uses relative offsets for compressed messages
361+ # If relative offset is used, we need to decompress the entire message first to compute
362+ # the absolute offset.
363+ inner_mset = msg .decompress ()
363364 if msg .magic > 0 :
364- last_offset , _ , _ = mset [- 1 ]
365- relative = offset - last_offset
365+ last_offset , _ , _ = inner_mset [- 1 ]
366+ absolute_base_offset = offset - last_offset
366367 else :
367- relative = 0
368- for record in self ._unpack_message_set (tp , mset , relative ):
369- yield record
368+ absolute_base_offset = - 1
369+
370+ for inner_offset , inner_size , inner_msg in inner_mset :
371+ if msg .magic > 0 :
372+ # When magic value is greater than 0, the timestamp
373+ # of a compressed message depends on the
374+ # typestamp type of the wrapper message:
375+
376+ if msg .timestamp_type == 0 : # CREATE_TIME (0)
377+ inner_timestamp = inner_msg .timestamp
378+
379+ elif msg .timestamp_type == 1 : # LOG_APPEND_TIME (1)
380+ inner_timestamp = msg .timestamp
381+
382+ else :
383+ raise ValueError ('Unknown timestamp type: {}' .format (msg .timestamp_type ))
384+ else :
385+ inner_timestamp = msg .timestamp
386+
387+ if absolute_base_offset >= 0 :
388+ inner_offset += absolute_base_offset
389+
390+ key , value = self ._deserialize (inner_msg )
391+ yield ConsumerRecord (tp .topic , tp .partition , inner_offset ,
392+ inner_timestamp , msg .timestamp_type ,
393+ key , value )
394+
370395 else :
371- # Message v1 adds timestamp
372- if msg .magic > 0 :
373- timestamp = msg .timestamp
374- timestamp_type = msg .timestamp_type
375- else :
376- timestamp = timestamp_type = None
377396 key , value = self ._deserialize (msg )
378- yield ConsumerRecord (tp .topic , tp .partition ,
379- offset + relative_offset ,
380- timestamp , timestamp_type ,
397+ yield ConsumerRecord (tp .topic , tp .partition , offset ,
398+ msg .timestamp , msg .timestamp_type ,
381399 key , value )
400+
382401 # If unpacking raises StopIteration, it is erroneously
383402 # caught by the generator. We want all exceptions to be raised
384403 # back to the user. See Issue 545
0 commit comments